From 63d5771992db7f56b051df8a1fa92b2007e5e967 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Wed, 25 Jan 2017 16:57:03 -0500 Subject: [PATCH 1/2] ACCUMULO-4575 Fixed concurrent delete issue in FATE ops --- .../accumulo/master/FateServiceHandler.java | 15 +- .../master/tableOps/CancelCompactions.java | 19 +- .../master/tableOps/ChangeTableState.java | 20 ++- .../master/tableOps/CompactRange.java | 30 ++-- .../master/tableOps/CompactionDriver.java | 14 +- .../accumulo/master/tableOps/DeleteTable.java | 29 ++- .../accumulo/master/tableOps/RenameTable.java | 19 +- .../master/tableOps/TableRangeOp.java | 21 ++- .../master/tableOps/TableRangeOpWait.java | 9 +- .../accumulo/master/tableOps/Utils.java | 9 +- .../functional/ConcurrentDeleteTableIT.java | 169 +++++++++++++++++- 11 files changed, 263 insertions(+), 91 deletions(-) diff --git a/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java b/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java index 5f0ddd24f5d..5af612c3dca 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java +++ b/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java @@ -185,7 +185,7 @@ public String invalidMessage(String argument) { throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); try { - master.fate.seedTransaction(opid, new TraceRepo<>(new RenameTable(tableId, oldTableName, newTableName)), autoCleanup); + master.fate.seedTransaction(opid, new TraceRepo<>(new RenameTable(namespaceId, tableId, oldTableName, newTableName)), autoCleanup); } catch (NamespaceNotFoundException e) { throw new ThriftTableOperationException(null, oldTableName, tableOp, TableOperationExceptionType.NAMESPACE_NOTFOUND, ""); } @@ -273,7 +273,7 @@ public String invalidMessage(String argument) { if (!canOnlineOfflineTable) throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); - master.fate.seedTransaction(opid, new TraceRepo<>(new ChangeTableState(tableId, tableOp)), autoCleanup); + master.fate.seedTransaction(opid, new TraceRepo<>(new ChangeTableState(namespaceId, tableId, tableOp)), autoCleanup); break; } case TABLE_OFFLINE: { @@ -292,7 +292,7 @@ public String invalidMessage(String argument) { if (!canOnlineOfflineTable) throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); - master.fate.seedTransaction(opid, new TraceRepo<>(new ChangeTableState(tableId, tableOp)), autoCleanup); + master.fate.seedTransaction(opid, new TraceRepo<>(new ChangeTableState(namespaceId, tableId, tableOp)), autoCleanup); break; } case TABLE_MERGE: { @@ -316,7 +316,7 @@ public String invalidMessage(String argument) { throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); Master.log.debug("Creating merge op: " + tableId + " " + startRow + " " + endRow); - master.fate.seedTransaction(opid, new TraceRepo<>(new TableRangeOp(MergeInfo.Operation.MERGE, tableId, startRow, endRow)), autoCleanup); + master.fate.seedTransaction(opid, new TraceRepo<>(new TableRangeOp(MergeInfo.Operation.MERGE, namespaceId, tableId, startRow, endRow)), autoCleanup); break; } case TABLE_DELETE_RANGE: { @@ -339,7 +339,7 @@ public String invalidMessage(String argument) { if (!canDeleteRange) throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); - master.fate.seedTransaction(opid, new TraceRepo<>(new TableRangeOp(MergeInfo.Operation.DELETE, tableId, startRow, endRow)), autoCleanup); + master.fate.seedTransaction(opid, new TraceRepo<>(new TableRangeOp(MergeInfo.Operation.DELETE, namespaceId, tableId, startRow, endRow)), autoCleanup); break; } case TABLE_BULK_IMPORT: { @@ -386,7 +386,8 @@ public String invalidMessage(String argument) { if (!canCompact) throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); - master.fate.seedTransaction(opid, new TraceRepo<>(new CompactRange(tableId, startRow, endRow, iterators, compactionStrategy)), autoCleanup); + master.fate + .seedTransaction(opid, new TraceRepo<>(new CompactRange(namespaceId, tableId, startRow, endRow, iterators, compactionStrategy)), autoCleanup); break; } case TABLE_CANCEL_COMPACT: { @@ -405,7 +406,7 @@ public String invalidMessage(String argument) { if (!canCancelCompact) throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); - master.fate.seedTransaction(opid, new TraceRepo<>(new CancelCompactions(tableId)), autoCleanup); + master.fate.seedTransaction(opid, new TraceRepo<>(new CancelCompactions(namespaceId, tableId)), autoCleanup); break; } case TABLE_IMPORT: { diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java index e268f17771b..c98174e7fa8 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java @@ -19,7 +19,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.impl.thrift.TableOperation; import org.apache.accumulo.fate.Repo; import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; @@ -31,15 +30,20 @@ public class CancelCompactions extends MasterRepo { private static final long serialVersionUID = 1L; private String tableId; + private String namespaceId; - public CancelCompactions(String tableId) { + private String getNamespaceId(Master env) throws Exception { + return Utils.getNamespaceId(env.getInstance(), tableId, TableOperation.COMPACT_CANCEL, this.namespaceId); + } + + public CancelCompactions(String namespaceId, String tableId) { this.tableId = tableId; + this.namespaceId = namespaceId; } @Override - public long isReady(long tid, Master environment) throws Exception { - String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId); - return Utils.reserveNamespace(namespaceId, tid, false, true, TableOperation.COMPACT_CANCEL) + public long isReady(long tid, Master env) throws Exception { + return Utils.reserveNamespace(getNamespaceId(env), tid, false, true, TableOperation.COMPACT_CANCEL) + Utils.reserveTable(tableId, tid, false, true, TableOperation.COMPACT_CANCEL); } @@ -73,9 +77,8 @@ public byte[] mutate(byte[] currentValue) throws Exception { } @Override - public void undo(long tid, Master environment) throws Exception { - String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId); - Utils.unreserveNamespace(namespaceId, tid, false); + public void undo(long tid, Master env) throws Exception { Utils.unreserveTable(tableId, tid, false); + Utils.unreserveNamespace(getNamespaceId(env), tid, false); } } diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChangeTableState.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChangeTableState.java index 86495702c92..ee6efa4bee3 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChangeTableState.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChangeTableState.java @@ -16,7 +16,6 @@ */ package org.apache.accumulo.master.tableOps; -import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.impl.thrift.TableOperation; import org.apache.accumulo.core.master.state.tables.TableState; import org.apache.accumulo.fate.Repo; @@ -28,10 +27,16 @@ public class ChangeTableState extends MasterRepo { private static final long serialVersionUID = 1L; private String tableId; + private String namespaceId; private TableOperation top; - public ChangeTableState(String tableId, TableOperation top) { + private String getNamespaceId(Master env) throws Exception { + return Utils.getNamespaceId(env.getInstance(), tableId, top, this.namespaceId); + } + + public ChangeTableState(String namespaceId, String tableId, TableOperation top) { this.tableId = tableId; + this.namespaceId = namespaceId; this.top = top; if (top != TableOperation.ONLINE && top != TableOperation.OFFLINE) @@ -39,21 +44,19 @@ public ChangeTableState(String tableId, TableOperation top) { } @Override - public long isReady(long tid, Master environment) throws Exception { - String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId); + public long isReady(long tid, Master env) throws Exception { // reserve the table so that this op does not run concurrently with create, clone, or delete table - return Utils.reserveNamespace(namespaceId, tid, false, true, top) + Utils.reserveTable(tableId, tid, true, true, top); + return Utils.reserveNamespace(getNamespaceId(env), tid, false, true, top) + Utils.reserveTable(tableId, tid, true, true, top); } @Override public Repo call(long tid, Master env) throws Exception { - String namespaceId = Tables.getNamespaceId(env.getInstance(), tableId); TableState ts = TableState.ONLINE; if (top == TableOperation.OFFLINE) ts = TableState.OFFLINE; TableManager.getInstance().transitionTableState(tableId, ts); - Utils.unreserveNamespace(namespaceId, tid, false); + Utils.unreserveNamespace(getNamespaceId(env), tid, false); Utils.unreserveTable(tableId, tid, true); LoggerFactory.getLogger(ChangeTableState.class).debug("Changed table state " + tableId + " " + ts); env.getEventCoordinator().event("Set table state of %s to %s", tableId, ts); @@ -62,8 +65,7 @@ public Repo call(long tid, Master env) throws Exception { @Override public void undo(long tid, Master env) throws Exception { - String namespaceId = Tables.getNamespaceId(env.getInstance(), tableId); - Utils.unreserveNamespace(namespaceId, tid, false); + Utils.unreserveNamespace(getNamespaceId(env), tid, false); Utils.unreserveTable(tableId, tid, true); } } diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java index 7a9c5d6e3b2..e641479373d 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java @@ -25,7 +25,6 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.admin.CompactionStrategyConfig; import org.apache.accumulo.core.client.impl.CompactionStrategyConfigUtil; -import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.impl.thrift.TableOperation; import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; @@ -47,18 +46,25 @@ public class CompactRange extends MasterRepo { private static final long serialVersionUID = 1L; private final String tableId; + private final String namespaceId; private byte[] startRow; private byte[] endRow; private byte[] config; - public CompactRange(String tableId, byte[] startRow, byte[] endRow, List iterators, CompactionStrategyConfig compactionStrategy) - throws ThriftTableOperationException { + private String getNamespaceId(Master env) throws Exception { + return Utils.getNamespaceId(env.getInstance(), tableId, TableOperation.COMPACT, this.namespaceId); + } + + public CompactRange(String namespaceId, String tableId, byte[] startRow, byte[] endRow, List iterators, + CompactionStrategyConfig compactionStrategy) throws ThriftTableOperationException { + requireNonNull(namespaceId, "Invalid argument: null namespaceId"); requireNonNull(tableId, "Invalid argument: null tableId"); requireNonNull(iterators, "Invalid argument: null iterator list"); requireNonNull(compactionStrategy, "Invalid argument: null compactionStrategy"); this.tableId = tableId; + this.namespaceId = namespaceId; this.startRow = startRow.length == 0 ? null : startRow; this.endRow = endRow.length == 0 ? null : endRow; @@ -74,15 +80,14 @@ public CompactRange(String tableId, byte[] startRow, byte[] endRow, List call(final long tid, Master environment) throws Exception { - String zTablePath = Constants.ZROOT + "/" + environment.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_COMPACT_ID; + public Repo call(final long tid, Master env) throws Exception { + String zTablePath = Constants.ZROOT + "/" + env.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_COMPACT_ID; IZooReaderWriter zoo = ZooReaderWriter.getInstance(); byte[] cid; @@ -122,7 +127,7 @@ public byte[] mutate(byte[] currentValue) throws Exception { } }); - return new CompactionDriver(Long.parseLong(new String(cid, UTF_8).split(",")[0]), tableId, startRow, endRow); + return new CompactionDriver(Long.parseLong(new String(cid, UTF_8).split(",")[0]), getNamespaceId(env), tableId, startRow, endRow); } catch (NoNodeException nne) { throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.NOTFOUND, null); } @@ -158,12 +163,11 @@ public byte[] mutate(byte[] currentValue) throws Exception { } @Override - public void undo(long tid, Master environment) throws Exception { - String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId); + public void undo(long tid, Master env) throws Exception { try { - removeIterators(environment, tid, tableId); + removeIterators(env, tid, tableId); } finally { - Utils.unreserveNamespace(namespaceId, tid, false); + Utils.unreserveNamespace(getNamespaceId(env), tid, false); Utils.unreserveTable(tableId, tid, false); } } diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactionDriver.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactionDriver.java index f6301211fc3..da60f890721 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactionDriver.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactionDriver.java @@ -57,13 +57,18 @@ class CompactionDriver extends MasterRepo { private long compactId; private final String tableId; + private final String namespaceId; private byte[] startRow; private byte[] endRow; - public CompactionDriver(long compactId, String tableId, byte[] startRow, byte[] endRow) { + private String getNamespaceId(Master env) throws Exception { + return Utils.getNamespaceId(env.getInstance(), tableId, TableOperation.COMPACT, this.namespaceId); + } + public CompactionDriver(long compactId, String namespaceId, String tableId, byte[] startRow, byte[] endRow) { this.compactId = compactId; this.tableId = tableId; + this.namespaceId = namespaceId; this.startRow = startRow; this.endRow = endRow; } @@ -172,11 +177,10 @@ public long isReady(long tid, Master master) throws Exception { } @Override - public Repo call(long tid, Master environment) throws Exception { - String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId); - CompactRange.removeIterators(environment, tid, tableId); + public Repo call(long tid, Master env) throws Exception { + CompactRange.removeIterators(env, tid, tableId); Utils.getReadLock(tableId, tid).unlock(); - Utils.getReadLock(namespaceId, tid).unlock(); + Utils.getReadLock(getNamespaceId(env), tid).unlock(); return null; } diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java index 1eae5b94a6d..e6267dfa544 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java @@ -29,14 +29,8 @@ public class DeleteTable extends MasterRepo { private String tableId; private String namespaceId; - private String getNamespaceId(Master environment) throws Exception { - if (namespaceId == null) { - // For ACCUMULO-4575 namespaceId was added in a bug fix release. Since it was added in bug fix release, we have to ensure we can properly deserialize - // older versions. When deserializing an older version, namespaceId will be null. For this case revert to the old buggy behavior. - return Utils.getNamespaceId(environment.getInstance(), tableId, TableOperation.DELETE); - } - - return namespaceId; + private String getNamespaceId(Master env) throws Exception { + return Utils.getNamespaceId(env.getInstance(), tableId, TableOperation.DELETE, this.namespaceId); } public DeleteTable(String namespaceId, String tableId) { @@ -45,24 +39,21 @@ public DeleteTable(String namespaceId, String tableId) { } @Override - public long isReady(long tid, Master environment) throws Exception { - String namespaceId = getNamespaceId(environment); - return Utils.reserveNamespace(namespaceId, tid, false, false, TableOperation.DELETE) + Utils.reserveTable(tableId, tid, true, true, TableOperation.DELETE); + public long isReady(long tid, Master env) throws Exception { + return Utils.reserveNamespace(getNamespaceId(env), tid, false, false, TableOperation.DELETE) + + Utils.reserveTable(tableId, tid, true, true, TableOperation.DELETE); } @Override - public Repo call(long tid, Master environment) throws Exception { - String namespaceId = getNamespaceId(environment); + public Repo call(long tid, Master env) throws Exception { TableManager.getInstance().transitionTableState(tableId, TableState.DELETING); - environment.getEventCoordinator().event("deleting table %s ", tableId); - return new CleanUp(tableId, namespaceId); + env.getEventCoordinator().event("deleting table %s ", tableId); + return new CleanUp(tableId, getNamespaceId(env)); } @Override - public void undo(long tid, Master environment) throws Exception { - if (namespaceId != null) { - Utils.unreserveNamespace(namespaceId, tid, false); - } + public void undo(long tid, Master env) throws Exception { Utils.unreserveTable(tableId, tid, true); + Utils.unreserveNamespace(getNamespaceId(env), tid, false); } } diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/RenameTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/RenameTable.java index 053749fa9ec..80d3293afff 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/RenameTable.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/RenameTable.java @@ -39,16 +39,22 @@ public class RenameTable extends MasterRepo { private static final long serialVersionUID = 1L; private String tableId; + private String namespaceId; private String oldTableName; private String newTableName; + private String getNamespaceId(Master env) throws Exception { + return Utils.getNamespaceId(env.getInstance(), tableId, TableOperation.RENAME, this.namespaceId); + } + @Override - public long isReady(long tid, Master environment) throws Exception { - String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId); - return Utils.reserveNamespace(namespaceId, tid, false, true, TableOperation.RENAME) + Utils.reserveTable(tableId, tid, true, true, TableOperation.RENAME); + public long isReady(long tid, Master env) throws Exception { + return Utils.reserveNamespace(getNamespaceId(env), tid, false, true, TableOperation.RENAME) + + Utils.reserveTable(tableId, tid, true, true, TableOperation.RENAME); } - public RenameTable(String tableId, String oldTableName, String newTableName) throws NamespaceNotFoundException { + public RenameTable(String namespaceId, String tableId, String oldTableName, String newTableName) throws NamespaceNotFoundException { + this.namespaceId = namespaceId; this.tableId = tableId; this.oldTableName = oldTableName; this.newTableName = newTableName; @@ -57,7 +63,7 @@ public RenameTable(String tableId, String oldTableName, String newTableName) thr @Override public Repo call(long tid, Master master) throws Exception { Instance instance = master.getInstance(); - String namespaceId = Tables.getNamespaceId(instance, tableId); + String namespaceId = getNamespaceId(master); Pair qualifiedOldTableName = Tables.qualify(oldTableName); Pair qualifiedNewTableName = Tables.qualify(newTableName); @@ -104,9 +110,8 @@ public byte[] mutate(byte[] current) throws Exception { @Override public void undo(long tid, Master env) throws Exception { - String namespaceId = Tables.getNamespaceId(env.getInstance(), tableId); Utils.unreserveTable(tableId, tid, true); - Utils.unreserveNamespace(namespaceId, tid, false); + Utils.unreserveNamespace(getNamespaceId(env), tid, false); } } diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java index 879470b8968..64d08beb28c 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java @@ -16,7 +16,6 @@ */ package org.apache.accumulo.master.tableOps; -import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.impl.thrift.TableOperation; import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; @@ -38,19 +37,24 @@ public class TableRangeOp extends MasterRepo { private static final long serialVersionUID = 1L; private final String tableId; + private final String namespaceId; private byte[] startRow; private byte[] endRow; private Operation op; - @Override - public long isReady(long tid, Master environment) throws Exception { - String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId); - return Utils.reserveNamespace(namespaceId, tid, false, true, TableOperation.MERGE) + Utils.reserveTable(tableId, tid, true, true, TableOperation.MERGE); + private String getNamespaceId(Master env) throws Exception { + return Utils.getNamespaceId(env.getInstance(), tableId, TableOperation.MERGE, this.namespaceId); } - public TableRangeOp(MergeInfo.Operation op, String tableId, Text startRow, Text endRow) throws ThriftTableOperationException { + @Override + public long isReady(long tid, Master env) throws Exception { + return Utils.reserveNamespace(getNamespaceId(env), tid, false, true, TableOperation.MERGE) + + Utils.reserveTable(tableId, tid, true, true, TableOperation.MERGE); + } + public TableRangeOp(MergeInfo.Operation op, String namespaceId, String tableId, Text startRow, Text endRow) throws ThriftTableOperationException { this.tableId = tableId; + this.namespaceId = namespaceId; this.startRow = TextUtil.getBytes(startRow); this.endRow = TextUtil.getBytes(endRow); this.op = op; @@ -81,20 +85,19 @@ public Repo call(long tid, Master env) throws Exception { env.setMergeState(new MergeInfo(range, op), MergeState.STARTED); } - return new TableRangeOpWait(tableId); + return new TableRangeOpWait(getNamespaceId(env), tableId); } @Override public void undo(long tid, Master env) throws Exception { - String namespaceId = Tables.getNamespaceId(env.getInstance(), tableId); // Not sure this is a good thing to do. The Master state engine should be the one to remove it. Text tableIdText = new Text(tableId); MergeInfo mergeInfo = env.getMergeInfo(tableIdText); if (mergeInfo.getState() != MergeState.NONE) log.info("removing merge information " + mergeInfo); env.clearMergeState(tableIdText); - Utils.unreserveNamespace(namespaceId, tid, false); Utils.unreserveTable(tableId, tid, true); + Utils.unreserveNamespace(getNamespaceId(env), tid, false); } } diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOpWait.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOpWait.java index 668c79083e4..5feb06d2f51 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOpWait.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOpWait.java @@ -16,7 +16,7 @@ */ package org.apache.accumulo.master.tableOps; -import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.client.impl.thrift.TableOperation; import org.apache.accumulo.fate.Repo; import org.apache.accumulo.master.Master; import org.apache.accumulo.server.master.state.MergeInfo; @@ -43,9 +43,11 @@ class TableRangeOpWait extends MasterRepo { private static final long serialVersionUID = 1L; private String tableId; + private String namespaceId; - public TableRangeOpWait(String tableId) { + public TableRangeOpWait(String namespaceId, String tableId) { this.tableId = tableId; + this.namespaceId = namespaceId; } @Override @@ -59,13 +61,12 @@ public long isReady(long tid, Master env) throws Exception { @Override public Repo call(long tid, Master master) throws Exception { - String namespaceId = Tables.getNamespaceId(master.getInstance(), tableId); Text tableIdText = new Text(tableId); MergeInfo mergeInfo = master.getMergeInfo(tableIdText); log.info("removing merge information " + mergeInfo); master.clearMergeState(tableIdText); - Utils.unreserveNamespace(namespaceId, tid, false); Utils.unreserveTable(tableId, tid, true); + Utils.unreserveNamespace(Utils.getNamespaceId(master.getInstance(), tableId, TableOperation.MERGE, this.namespaceId), tid, false); return null; } diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java index 9b921e2ca89..d47bedf307e 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java @@ -116,7 +116,14 @@ public static long reserveNamespace(String namespaceId, long id, boolean writeLo return 100; } - public static String getNamespaceId(Instance instance, String tableId, TableOperation op) throws Exception { + public static String getNamespaceId(Instance instance, String tableId, TableOperation op, String namespaceId) throws Exception { + if (namespaceId != null) { + return namespaceId; + } + + // For ACCUMULO-4575 namespaceId was added in a bug fix release. Since it was added in bug fix release, we have to ensure we can properly deserialize + // older versions. When deserializing an older version, namespaceId will be null. For this case revert to the old buggy behavior. + try { return Tables.getNamespaceId(instance, tableId); } catch (RuntimeException e) { diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java index 47980953b39..b26740a374e 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java @@ -18,8 +18,11 @@ package org.apache.accumulo.test.functional; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.TreeSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -35,6 +38,8 @@ import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.TableOfflineException; +import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.security.Authorizations; @@ -57,12 +62,7 @@ public void testConcurrentDeleteTablesOps() throws Exception { final Connector c = getConnector(); String[] tables = getUniqueNames(2); - TreeSet splits = new TreeSet<>(); - - for (int i = 0; i < 1000; i++) { - Text split = new Text(String.format("%09x", i * 100000)); - splits.add(split); - } + TreeSet splits = createSplits(); ExecutorService es = Executors.newFixedThreadPool(20); @@ -76,11 +76,12 @@ public void testConcurrentDeleteTablesOps() throws Exception { } count++; - final CountDownLatch cdl = new CountDownLatch(20); + int numDeleteOps = 20; + final CountDownLatch cdl = new CountDownLatch(numDeleteOps); List> futures = new ArrayList<>(); - for (int i = 0; i < 20; i++) { + for (int i = 0; i < numDeleteOps; i++) { Future future = es.submit(new Runnable() { @Override @@ -89,7 +90,7 @@ public void run() { cdl.countDown(); cdl.await(); c.tableOperations().delete(table); - } catch (TableNotFoundException e) { + } catch (TableNotFoundException | TableOfflineException e) { // expected } catch (InterruptedException | AccumuloException | AccumuloSecurityException e) { throw new RuntimeException(e); @@ -121,6 +122,156 @@ public void run() { es.shutdown(); } + private TreeSet createSplits() { + TreeSet splits = new TreeSet<>(); + + for (int i = 0; i < 1000; i++) { + Text split = new Text(String.format("%09x", i * 100000)); + splits.add(split); + } + return splits; + } + + private static abstract class DelayedTableOp implements Runnable { + private CountDownLatch cdl; + + DelayedTableOp(CountDownLatch cdl) { + this.cdl = cdl; + } + + public void run() { + try { + cdl.countDown(); + cdl.await(); + Thread.sleep(10); + doTableOp(); + } catch (TableNotFoundException e) { + // expected + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + protected abstract void doTableOp() throws Exception; + } + + @Test + public void testConcurrentFateOpsWithDelete() throws Exception { + final Connector c = getConnector(); + String[] tables = getUniqueNames(2); + + TreeSet splits = createSplits(); + + int numOperations = 8; + + ExecutorService es = Executors.newFixedThreadPool(numOperations); + + int count = 0; + for (final String table : tables) { + c.tableOperations().create(table); + c.tableOperations().addSplits(table, splits); + writeData(c, table); + if (count == 1) { + c.tableOperations().flush(table, null, null, true); + } + count++; + + // increment this for each test + final CountDownLatch cdl = new CountDownLatch(numOperations); + + List> futures = new ArrayList<>(); + + futures.add(es.submit(new Runnable() { + @Override + public void run() { + try { + cdl.countDown(); + cdl.await(); + c.tableOperations().delete(table); + } catch (TableNotFoundException e) { + // expected + } catch (InterruptedException | AccumuloException | AccumuloSecurityException e) { + throw new RuntimeException(e); + } + } + })); + + futures.add(es.submit(new DelayedTableOp(cdl) { + @Override + protected void doTableOp() throws Exception { + c.tableOperations().compact(table, new CompactionConfig()); + } + })); + + futures.add(es.submit(new DelayedTableOp(cdl) { + @Override + protected void doTableOp() throws Exception { + c.tableOperations().merge(table, null, null); + } + })); + + futures.add(es.submit(new DelayedTableOp(cdl) { + @Override + protected void doTableOp() throws Exception { + Map m = Collections.emptyMap(); + Set s = Collections.emptySet(); + c.tableOperations().clone(table, table + "_clone", true, m, s); + } + })); + + futures.add(es.submit(new DelayedTableOp(cdl) { + @Override + protected void doTableOp() throws Exception { + c.tableOperations().deleteRows(table, null, null); + } + })); + + futures.add(es.submit(new DelayedTableOp(cdl) { + @Override + protected void doTableOp() throws Exception { + c.tableOperations().cancelCompaction(table); + } + })); + + futures.add(es.submit(new DelayedTableOp(cdl) { + @Override + protected void doTableOp() throws Exception { + c.tableOperations().rename(table, table + "_renamed"); + } + })); + + futures.add(es.submit(new DelayedTableOp(cdl) { + @Override + protected void doTableOp() throws Exception { + c.tableOperations().offline(table); + } + })); + + Assert.assertEquals(numOperations, futures.size()); + + for (Future future : futures) { + future.get(); + } + + try { + c.createScanner(table, Authorizations.EMPTY); + Assert.fail("Expected table " + table + " to be gone."); + } catch (TableNotFoundException tnfe) { + // expected + } + + FateStatus fateStatus = getFateStatus(); + + // ensure there are no dangling locks... before ACCUMULO-4575 was fixed concurrent delete tables could fail and leave dangling locks. + Assert.assertEquals(0, fateStatus.getDanglingHeldLocks().size()); + Assert.assertEquals(0, fateStatus.getDanglingWaitingLocks().size()); + } + + es.shutdown(); + } + private FateStatus getFateStatus() throws KeeperException, InterruptedException { Instance instance = getConnector().getInstance(); AdminUtil admin = new AdminUtil<>(false); From cef83a6525fe4cd588b758f34a8bc8b03628a913 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Thu, 26 Jan 2017 17:33:19 -0500 Subject: [PATCH 2/2] Fix exception handling --- .../accumulo/test/functional/ConcurrentDeleteTableIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java index b26740a374e..0c63e5977a2 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java @@ -90,7 +90,7 @@ public void run() { cdl.countDown(); cdl.await(); c.tableOperations().delete(table); - } catch (TableNotFoundException | TableOfflineException e) { + } catch (TableNotFoundException e) { // expected } catch (InterruptedException | AccumuloException | AccumuloSecurityException e) { throw new RuntimeException(e); @@ -190,7 +190,7 @@ public void run() { cdl.countDown(); cdl.await(); c.tableOperations().delete(table); - } catch (TableNotFoundException e) { + } catch (TableNotFoundException | TableOfflineException e) { // expected } catch (InterruptedException | AccumuloException | AccumuloSecurityException e) { throw new RuntimeException(e);