From 8d097c5f4388be83e90325e5b6d674ad21a6121b Mon Sep 17 00:00:00 2001 From: Shawn Walker Date: Tue, 21 Jun 2016 13:34:34 -0400 Subject: [PATCH 1/7] ACCUMULO-4353: Stabilize tablet assignment during transient failure. Added configuration property `table.suspend.duration` (default 0s): When a tablet server dies, instead of immediately placing its tablets in the TabletState.UNASSIGNED state, they are instead moved to the new TabletState.SUSPENDED state. A suspended tablet will only be reassigned if (a) table.suspend.duration has passed since the tablet was suspended, or (b) the tablet server most recently hosting the tablet has come back online. In the latter case, the tablet will be assigned back to its previous host. Added configuration property `master.metadata.suspendable` (default false): The above functionality is really meant to be used only on user tablets. Suspending metadata tablets can lead to much more significant loss of availability. Despite this, it is possible to set `table.suspend.duration` on `accumulo.metadata`. If one really wishes to allow metadata tablets to be suspended as well, one must also set the `master.metadata.suspendable` to true. I chose not to implement suspension of the root tablet. Implementation outline: * `master.MasterTime` maintains an approximately monotonic clock; this is used by suspension to determine how much time has passed since a tablet was suspended. `MasterTime` periodically writes its time base to ZooKeeper for persistence. * The `server.master.state.TabletState` now has a `TabletState.SUSPENDED` state. `TabletLocationState`, `MetaDataStateStore` were updated to properly read and write suspensions. * `server.master.state.TabletStateStore` now features a `suspend(...)` method, for suspending a tablet, with implementations in `MetaDataStateStore`. `suspend(...)` acts just as `unassign(...)`, except that it writes additional metadata indicating when each tablet was suspended, and which tablet server it was suspended from. * `master.TabletServerWatcher` updated to properly transition to/from `TabletState.SUSPENDED`. * `master.Master` updated to avoid balancing while any tablets remain suspended. --- .../org/apache/accumulo/core/Constants.java | 1 + .../apache/accumulo/core/conf/Property.java | 6 + .../core/metadata/schema/MetadataSchema.java | 7 + .../impl/MiniAccumuloClusterControl.java | 25 +- .../impl/MiniAccumuloClusterImpl.java | 12 +- .../constraints/MetadataConstraints.java | 6 +- .../master/state/MetaDataStateStore.java | 37 +++ .../master/state/MetaDataTableScanner.java | 6 +- .../master/state/SuspendingTServer.java | 71 +++++ .../master/state/TabletLocationState.java | 9 +- .../server/master/state/TabletState.java | 2 +- .../state/TabletStateChangeIterator.java | 1 + .../server/master/state/TabletStateStore.java | 11 + .../master/state/ZooTabletStateStore.java | 14 +- .../master/state/TabletLocationStateTest.java | 28 +- .../gc/GarbageCollectWriteAheadLogsTest.java | 4 +- .../org/apache/accumulo/master/Master.java | 39 ++- .../apache/accumulo/master/MasterTime.java | 108 ++++++++ .../accumulo/master/TabletGroupWatcher.java | 72 ++++- .../accumulo/master/state/MergeStats.java | 4 +- .../accumulo/master/state/TableCounts.java | 4 + .../state/RootTabletStateStoreTest.java | 4 +- .../apache/accumulo/tserver/TabletServer.java | 2 +- .../accumulo/test/master/MergeStateIT.java | 2 +- .../test/master/SuspendedTabletsIT.java | 256 ++++++++++++++++++ 25 files changed, 677 insertions(+), 54 deletions(-) create mode 100644 server/base/src/main/java/org/apache/accumulo/server/master/state/SuspendingTServer.java create mode 100644 server/master/src/main/java/org/apache/accumulo/master/MasterTime.java create mode 100644 test/src/main/java/org/apache/accumulo/test/master/SuspendedTabletsIT.java diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java index 94ada7a3d6f..eebd81d2921 100644 --- a/core/src/main/java/org/apache/accumulo/core/Constants.java +++ b/core/src/main/java/org/apache/accumulo/core/Constants.java @@ -50,6 +50,7 @@ public class Constants { public static final String ZMASTER_LOCK = ZMASTERS + "/lock"; public static final String ZMASTER_GOAL_STATE = ZMASTERS + "/goal_state"; public static final String ZMASTER_REPLICATION_COORDINATOR_ADDR = ZMASTERS + "/repl_coord_addr"; + public static final String ZMASTER_TICK = ZMASTERS + "/tick"; public static final String ZGC = "/gc"; public static final String ZGC_LOCK = ZGC + "/lock"; diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index bc1e60e4df4..52449f103a5 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -30,6 +30,7 @@ import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.util.format.DefaultFormatter; import org.apache.accumulo.core.util.interpret.DefaultScanInterpreter; import org.apache.accumulo.start.classloader.AccumuloClassLoader; @@ -231,6 +232,8 @@ public enum Property { "The time between adjustments of the coordinator thread pool"), MASTER_STATUS_THREAD_POOL_SIZE("master.status.threadpool.size", "1", PropertyType.COUNT, "The number of threads to use when fetching the tablet server status for balancing."), + MASTER_METADATA_SUSPENDABLE("master.metadata.suspendable", "false", PropertyType.BOOLEAN, "Allow tablets for the " + MetadataTable.NAME + + " table to be suspended via table.suspend.duration."), // properties that are specific to tablet server behavior TSERV_PREFIX("tserver.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the tablet servers"), @@ -543,6 +546,9 @@ public enum Property { TABLE_SAMPLER_OPTS("table.sampler.opt.", null, PropertyType.PREFIX, "The property is used to set options for a sampler. If a sample had two options like hasher and modulous, then the two properties " + "table.sampler.opt.hasher=${hash algorithm} and table.sampler.opt.modulous=${mod} would be set."), + TABLE_SUSPEND_DURATION("table.suspend.duration", "0s", PropertyType.TIMEDURATION, + "For tablets belonging to this table: When a tablet server dies, allow the tablet server this duration to revive before reassigning its tablets" + + "to other tablet servers."), // VFS ClassLoader properties VFS_CLASSLOADER_SYSTEM_CLASSPATH_PROPERTY(AccumuloVFSClassLoader.VFS_CLASSLOADER_SYSTEM_CLASSPATH_PROPERTY, "", PropertyType.STRING, diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java index 7426fede795..c93987dd936 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java @@ -136,6 +136,13 @@ public static class LastLocationColumnFamily { public static final Text NAME = new Text("last"); } + /** + * Column family for storing suspension location, as a demand for assignment. + */ + public static class SuspendLocationColumn { + public static final ColumnFQ SUSPEND_COLUMN = new ColumnFQ(new Text("suspend"), new Text("loc")); + } + /** * Temporary markers that indicate a tablet loaded a bulk file */ diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterControl.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterControl.java index 80c4edc9a7b..6d7195eabe7 100644 --- a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterControl.java +++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterControl.java @@ -42,6 +42,8 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Maps; +import java.util.Collections; +import java.util.Map; /** * @@ -132,37 +134,46 @@ public synchronized void startAllServers(ServerType server) throws IOException { @Override public synchronized void start(ServerType server, String hostname) throws IOException { + start(server, hostname, Collections.EMPTY_MAP, Integer.MAX_VALUE); + } + + public synchronized void start(ServerType server, String hostname, Map configOverrides, int limit) throws IOException { + if (limit <= 0) { + return; + } + switch (server) { case TABLET_SERVER: synchronized (tabletServerProcesses) { - for (int i = tabletServerProcesses.size(); i < cluster.getConfig().getNumTservers(); i++) { - tabletServerProcesses.add(cluster._exec(TabletServer.class, server)); + int count = 0; + for (int i = tabletServerProcesses.size(); count < limit && i < cluster.getConfig().getNumTservers(); i++, ++count) { + tabletServerProcesses.add(cluster._exec(TabletServer.class, server, configOverrides)); } } break; case MASTER: if (null == masterProcess) { - masterProcess = cluster._exec(Master.class, server); + masterProcess = cluster._exec(Master.class, server, configOverrides); } break; case ZOOKEEPER: if (null == zooKeeperProcess) { - zooKeeperProcess = cluster._exec(ZooKeeperServerMain.class, server, cluster.getZooCfgFile().getAbsolutePath()); + zooKeeperProcess = cluster._exec(ZooKeeperServerMain.class, server, configOverrides, cluster.getZooCfgFile().getAbsolutePath()); } break; case GARBAGE_COLLECTOR: if (null == gcProcess) { - gcProcess = cluster._exec(SimpleGarbageCollector.class, server); + gcProcess = cluster._exec(SimpleGarbageCollector.class, server, configOverrides); } break; case MONITOR: if (null == monitor) { - monitor = cluster._exec(Monitor.class, server); + monitor = cluster._exec(Monitor.class, server, configOverrides); } break; case TRACER: if (null == tracer) { - tracer = cluster._exec(TraceServer.class, server); + tracer = cluster._exec(TraceServer.class, server, configOverrides); } break; default: diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java index 7bbfd070a8b..eb74e6e2925 100644 --- a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java @@ -111,6 +111,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.collect.Maps; +import java.nio.file.Files; /** * This class provides the backing implementation for {@link MiniAccumuloCluster}, and may contain features for internal testing which have not yet been @@ -341,10 +342,17 @@ private Process _exec(Class clazz, List extraJvmOpts, String... args) return process; } - Process _exec(Class clazz, ServerType serverType, String... args) throws IOException { - + Process _exec(Class clazz, ServerType serverType, Map configOverrides, String... args) throws IOException { List jvmOpts = new ArrayList(); jvmOpts.add("-Xmx" + config.getMemory(serverType)); + if (configOverrides != null && !configOverrides.isEmpty()) { + File siteFile = Files.createTempFile(config.getConfDir().toPath(), "accumulo-site", ".xml").toFile(); + Map confMap = new HashMap<>(); + confMap.putAll(config.getSiteConfig()); + confMap.putAll(configOverrides); + writeConfig(siteFile, confMap.entrySet()); + jvmOpts.add("-Dorg.apache.accumulo.config.file=" + siteFile.getName()); + } if (config.isJDWPEnabled()) { Integer port = PortUtils.getRandomFreePort(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java index fd5af143a1a..aad5caae2d9 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java +++ b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java @@ -65,9 +65,9 @@ public class MetadataConstraints implements Constraint { } private static final HashSet validColumnQuals = new HashSet(Arrays.asList(TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN, - TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN, TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN, - TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN, TabletsSection.ServerColumnFamily.TIME_COLUMN, TabletsSection.ServerColumnFamily.LOCK_COLUMN, - TabletsSection.ServerColumnFamily.FLUSH_COLUMN, TabletsSection.ServerColumnFamily.COMPACT_COLUMN)); + TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN, TabletsSection.SuspendLocationColumn.SUSPEND_COLUMN, + TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN, TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN, TabletsSection.ServerColumnFamily.TIME_COLUMN, + TabletsSection.ServerColumnFamily.LOCK_COLUMN, TabletsSection.ServerColumnFamily.FLUSH_COLUMN, TabletsSection.ServerColumnFamily.COMPACT_COLUMN)); private static final HashSet validColumnFams = new HashSet(Arrays.asList(TabletsSection.BulkFileColumnFamily.NAME, LogColumnFamily.NAME, ScanFileColumnFamily.NAME, DataFileColumnFamily.NAME, TabletsSection.CurrentLocationColumnFamily.NAME, TabletsSection.LastLocationColumnFamily.NAME, diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java index 7763c258ba2..c549adce958 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java @@ -74,6 +74,7 @@ public void setLocations(Collection assignments) throws DistributedS Mutation m = new Mutation(assignment.tablet.getMetadataEntry()); assignment.server.putLocation(m); assignment.server.clearFutureLocation(m); + SuspendingTServer.clearSuspension(m); writer.addMutation(m); } } catch (Exception ex) { @@ -105,6 +106,7 @@ public void setFutureLocations(Collection assignments) throws Distri try { for (Assignment assignment : assignments) { Mutation m = new Mutation(assignment.tablet.getMetadataEntry()); + SuspendingTServer.clearSuspension(m); assignment.server.putFutureLocation(m); writer.addMutation(m); } @@ -121,7 +123,12 @@ public void setFutureLocations(Collection assignments) throws Distri @Override public void unassign(Collection tablets, Map> logsForDeadServers) throws DistributedStoreException { + suspend(tablets, logsForDeadServers, -1); + } + @Override + public void suspend(Collection tablets, Map> logsForDeadServers, long suspensionTimestamp) + throws DistributedStoreException { BatchWriter writer = createBatchWriter(); try { for (TabletLocationState tls : tablets) { @@ -137,6 +144,13 @@ public void unassign(Collection tablets, Map= 0) { + SuspendingTServer suspender = new SuspendingTServer(tls.current.getLocation(), suspensionTimestamp); + suspender.setSuspension(m); + } + } + if (tls.suspend != null && suspensionTimestamp < 0) { + SuspendingTServer.clearSuspension(m); } if (tls.future != null) { tls.future.clearFutureLocation(m); @@ -154,6 +168,29 @@ public void unassign(Collection tablets, Map tablets) throws DistributedStoreException { + BatchWriter writer = createBatchWriter(); + try { + for (TabletLocationState tls : tablets) { + if (tls.suspend != null) { + continue; + } + Mutation m = new Mutation(tls.extent.getMetadataEntry()); + SuspendingTServer.clearSuspension(m); + writer.addMutation(m); + } + } catch (Exception ex) { + throw new DistributedStoreException(ex); + } finally { + try { + writer.close(); + } catch (MutationsRejectedException e) { + throw new DistributedStoreException(e); + } + } + } + @Override public String name() { return "Normal Tablets"; diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java index 536e1a50fbf..bfe79ec2e3b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java @@ -78,6 +78,7 @@ static public void configureScanner(ScannerBase scanner, CurrentState state) { scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME); scanner.fetchColumnFamily(TabletsSection.FutureLocationColumnFamily.NAME); scanner.fetchColumnFamily(TabletsSection.LastLocationColumnFamily.NAME); + scanner.fetchColumnFamily(TabletsSection.SuspendLocationColumn.SUSPEND_COLUMN.getColumnFamily()); scanner.fetchColumnFamily(LogColumnFamily.NAME); scanner.fetchColumnFamily(ChoppedColumnFamily.NAME); scanner.addScanIterator(new IteratorSetting(1000, "wholeRows", WholeRowIterator.class)); @@ -136,6 +137,7 @@ public static TabletLocationState createTabletLocationState(Key k, Value v) thro TServerInstance future = null; TServerInstance current = null; TServerInstance last = null; + SuspendingTServer suspend = null; long lastTimestamp = 0; List> walogs = new ArrayList>(); boolean chopped = false; @@ -171,6 +173,8 @@ public static TabletLocationState createTabletLocationState(Key k, Value v) thro chopped = true; } else if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.equals(cf, cq)) { extent = new KeyExtent(row, entry.getValue()); + } else if (TabletsSection.SuspendLocationColumn.SUSPEND_COLUMN.equals(cf, cq)) { + suspend = SuspendingTServer.fromValue(entry.getValue()); } } if (extent == null) { @@ -178,7 +182,7 @@ public static TabletLocationState createTabletLocationState(Key k, Value v) thro log.error(msg); throw new BadLocationStateException(msg, k.getRow()); } - return new TabletLocationState(extent, future, current, last, walogs, chopped); + return new TabletLocationState(extent, future, current, last, suspend, walogs, chopped); } private TabletLocationState fetch() { diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/SuspendingTServer.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/SuspendingTServer.java new file mode 100644 index 00000000000..3f4e49e3355 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/SuspendingTServer.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.master.state; + +import com.google.common.net.HostAndPort; +import java.util.Objects; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn.SUSPEND_COLUMN; + +/** For a suspended tablet, the time of suspension and the server it was suspended from. */ +public class SuspendingTServer { + public final HostAndPort server; + public final long suspensionTime; + + SuspendingTServer(HostAndPort server, long suspensionTime) { + this.server = Objects.requireNonNull(server); + this.suspensionTime = suspensionTime; + } + + public static SuspendingTServer fromValue(Value value) { + String valStr = value.toString(); + String[] parts = valStr.split("[|]", 2); + return new SuspendingTServer(HostAndPort.fromString(parts[0]), Long.parseLong(parts[1])); + } + + public Value toValue() { + return new Value(server.toString() + "|" + suspensionTime); + } + + @Override + public boolean equals(Object rhsObject) { + if (!(rhsObject instanceof SuspendingTServer)) { + return false; + } + SuspendingTServer rhs = (SuspendingTServer) rhsObject; + return server.equals(rhs.server) && suspensionTime == rhs.suspensionTime; + } + + public void setSuspension(Mutation m) { + m.put(SUSPEND_COLUMN.getColumnFamily(), SUSPEND_COLUMN.getColumnQualifier(), toValue()); + } + + public static void clearSuspension(Mutation m) { + m.putDelete(SUSPEND_COLUMN.getColumnFamily(), SUSPEND_COLUMN.getColumnQualifier()); + } + + @Override + public int hashCode() { + return Objects.hash(server, suspensionTime); + } + + @Override + public String toString() { + return server.toString() + "[" + suspensionTime + "]"; + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java index 8116ecf9e30..369a14bab2f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java @@ -46,12 +46,13 @@ public Text getEncodedEndRow() { } } - public TabletLocationState(KeyExtent extent, TServerInstance future, TServerInstance current, TServerInstance last, Collection> walogs, - boolean chopped) throws BadLocationStateException { + public TabletLocationState(KeyExtent extent, TServerInstance future, TServerInstance current, TServerInstance last, SuspendingTServer suspend, + Collection> walogs, boolean chopped) throws BadLocationStateException { this.extent = extent; this.future = future; this.current = current; this.last = last; + this.suspend = suspend; if (walogs == null) walogs = Collections.emptyList(); this.walogs = walogs; @@ -65,6 +66,7 @@ public TabletLocationState(KeyExtent extent, TServerInstance future, TServerInst final public TServerInstance future; final public TServerInstance current; final public TServerInstance last; + final public SuspendingTServer suspend; final public Collection> walogs; final public boolean chopped; @@ -93,6 +95,9 @@ public TServerInstance getServer() { } public TabletState getState(Set liveServers) { + if (suspend != null) { + return TabletState.SUSPENDED; + } TServerInstance server = getServer(); if (server == null) return TabletState.UNASSIGNED; diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletState.java index d69ca198e2c..bd0e8858aea 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletState.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletState.java @@ -17,5 +17,5 @@ package org.apache.accumulo.server.master.state; public enum TabletState { - UNASSIGNED, ASSIGNED, HOSTED, ASSIGNED_TO_DEAD_SERVER + UNASSIGNED, ASSIGNED, HOSTED, ASSIGNED_TO_DEAD_SERVER, SUSPENDED } diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java index fbaa1d9e0c1..0f85b3bbf16 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java @@ -194,6 +194,7 @@ protected void consume() throws IOException { break; case ASSIGNED_TO_DEAD_SERVER: return; + case SUSPENDED: case UNASSIGNED: if (shouldBeOnline) return; diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java index 3ead237affe..4cfa12b6ebf 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java @@ -64,6 +64,17 @@ public abstract class TabletStateStore implements Iterable */ abstract public void unassign(Collection tablets, Map> logsForDeadServers) throws DistributedStoreException; + /** + * Mark tablets as having no known or future location, but desiring to be returned to their previous tserver. + */ + abstract public void suspend(Collection tablets, Map> logsForDeadServers, long suspensionTimestamp) + throws DistributedStoreException; + + /** + * Remove a suspension marker for a collection of tablets, moving them to being simply unassigned. + */ + abstract public void unsuspend(Collection tablets) throws DistributedStoreException; + public static void unassign(AccumuloServerContext context, TabletLocationState tls, Map> logsForDeadServers) throws DistributedStoreException { TabletStateStore store; diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java index 720046fb4aa..101f949c9be 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java @@ -93,7 +93,7 @@ public TabletLocationState next() { log.debug("root tablet log " + logEntry.filename); } } - TabletLocationState result = new TabletLocationState(RootTable.EXTENT, futureSession, currentSession, lastSession, logs, false); + TabletLocationState result = new TabletLocationState(RootTable.EXTENT, futureSession, currentSession, lastSession, null, logs, false); log.debug("Returning root tablet state: " + result); return result; } catch (Exception ex) { @@ -189,6 +189,18 @@ public void unassign(Collection tablets, Map tablets, Map> logsForDeadServers, long suspensionTimestamp) + throws DistributedStoreException { + // No support for suspending root tablet. + unassign(tablets, logsForDeadServers); + } + + @Override + public void unsuspend(Collection tablets) throws DistributedStoreException { + // no support for suspending root tablet. + } + @Override public String name() { return "Root Table"; diff --git a/server/base/src/test/java/org/apache/accumulo/server/master/state/TabletLocationStateTest.java b/server/base/src/test/java/org/apache/accumulo/server/master/state/TabletLocationStateTest.java index 0a0afd1ce03..f67c15f004e 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/master/state/TabletLocationStateTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/master/state/TabletLocationStateTest.java @@ -60,7 +60,7 @@ public void setUp() throws Exception { @Test public void testConstruction_NoFuture() throws Exception { - tls = new TabletLocationState(keyExtent, null, current, last, walogs, true); + tls = new TabletLocationState(keyExtent, null, current, last, null, walogs, true); assertSame(keyExtent, tls.extent); assertNull(tls.future); assertSame(current, tls.current); @@ -71,7 +71,7 @@ public void testConstruction_NoFuture() throws Exception { @Test public void testConstruction_NoCurrent() throws Exception { - tls = new TabletLocationState(keyExtent, future, null, last, walogs, true); + tls = new TabletLocationState(keyExtent, future, null, last, null, walogs, true); assertSame(keyExtent, tls.extent); assertSame(future, tls.future); assertNull(tls.current); @@ -85,7 +85,7 @@ public void testConstruction_FutureAndCurrent() throws Exception { expect(keyExtent.getMetadataEntry()).andReturn(new Text("entry")); replay(keyExtent); try { - new TabletLocationState(keyExtent, future, current, last, walogs, true); + new TabletLocationState(keyExtent, future, current, last, null, walogs, true); } catch (TabletLocationState.BadLocationStateException e) { assertEquals(new Text("entry"), e.getEncodedEndRow()); throw (e); @@ -94,44 +94,44 @@ public void testConstruction_FutureAndCurrent() throws Exception { @Test public void testConstruction_NoFuture_NoWalogs() throws Exception { - tls = new TabletLocationState(keyExtent, null, current, last, null, true); + tls = new TabletLocationState(keyExtent, null, current, last, null, null, true); assertNotNull(tls.walogs); assertEquals(0, tls.walogs.size()); } @Test public void testGetServer_Current() throws Exception { - tls = new TabletLocationState(keyExtent, null, current, last, walogs, true); + tls = new TabletLocationState(keyExtent, null, current, last, null, walogs, true); assertSame(current, tls.getServer()); } @Test public void testGetServer_Future() throws Exception { - tls = new TabletLocationState(keyExtent, future, null, last, walogs, true); + tls = new TabletLocationState(keyExtent, future, null, last, null, walogs, true); assertSame(future, tls.getServer()); } @Test public void testGetServer_Last() throws Exception { - tls = new TabletLocationState(keyExtent, null, null, last, walogs, true); + tls = new TabletLocationState(keyExtent, null, null, last, null, walogs, true); assertSame(last, tls.getServer()); } @Test public void testGetServer_None() throws Exception { - tls = new TabletLocationState(keyExtent, null, null, null, walogs, true); + tls = new TabletLocationState(keyExtent, null, null, null, null, walogs, true); assertNull(tls.getServer()); } @Test public void testGetState_Unassigned1() throws Exception { - tls = new TabletLocationState(keyExtent, null, null, null, walogs, true); + tls = new TabletLocationState(keyExtent, null, null, null, null, walogs, true); assertEquals(TabletState.UNASSIGNED, tls.getState(null)); } @Test public void testGetState_Unassigned2() throws Exception { - tls = new TabletLocationState(keyExtent, null, null, last, walogs, true); + tls = new TabletLocationState(keyExtent, null, null, last, null, walogs, true); assertEquals(TabletState.UNASSIGNED, tls.getState(null)); } @@ -139,7 +139,7 @@ public void testGetState_Unassigned2() throws Exception { public void testGetState_Assigned() throws Exception { Set liveServers = new java.util.HashSet(); liveServers.add(future); - tls = new TabletLocationState(keyExtent, future, null, last, walogs, true); + tls = new TabletLocationState(keyExtent, future, null, last, null, walogs, true); assertEquals(TabletState.ASSIGNED, tls.getState(liveServers)); } @@ -147,7 +147,7 @@ public void testGetState_Assigned() throws Exception { public void testGetState_Hosted() throws Exception { Set liveServers = new java.util.HashSet(); liveServers.add(current); - tls = new TabletLocationState(keyExtent, null, current, last, walogs, true); + tls = new TabletLocationState(keyExtent, null, current, last, null, walogs, true); assertEquals(TabletState.HOSTED, tls.getState(liveServers)); } @@ -155,7 +155,7 @@ public void testGetState_Hosted() throws Exception { public void testGetState_Dead1() throws Exception { Set liveServers = new java.util.HashSet(); liveServers.add(current); - tls = new TabletLocationState(keyExtent, future, null, last, walogs, true); + tls = new TabletLocationState(keyExtent, future, null, last, null, walogs, true); assertEquals(TabletState.ASSIGNED_TO_DEAD_SERVER, tls.getState(liveServers)); } @@ -163,7 +163,7 @@ public void testGetState_Dead1() throws Exception { public void testGetState_Dead2() throws Exception { Set liveServers = new java.util.HashSet(); liveServers.add(future); - tls = new TabletLocationState(keyExtent, null, current, last, walogs, true); + tls = new TabletLocationState(keyExtent, null, current, last, null, walogs, true); assertEquals(TabletState.ASSIGNED_TO_DEAD_SERVER, tls.getState(liveServers)); } } diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java index 60d60262e9b..efa015df480 100644 --- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java @@ -65,8 +65,8 @@ public class GarbageCollectWriteAheadLogsTest { private final TabletLocationState tabletAssignedToServer2; { try { - tabletAssignedToServer1 = new TabletLocationState(extent, (TServerInstance) null, server1, (TServerInstance) null, walogs, false); - tabletAssignedToServer2 = new TabletLocationState(extent, (TServerInstance) null, server2, (TServerInstance) null, walogs, false); + tabletAssignedToServer1 = new TabletLocationState(extent, (TServerInstance) null, server1, (TServerInstance) null, null, walogs, false); + tabletAssignedToServer2 = new TabletLocationState(extent, (TServerInstance) null, server2, (TServerInstance) null, null, walogs, false); } catch (Exception ex) { throw new RuntimeException(ex); } diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java index 6af04f3b461..3760874d35a 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@ -162,6 +162,7 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Iterables; +import org.apache.accumulo.master.TabletGroupWatcher.SuspensionPolicy; /** * The Master is responsible for assigning and balancing tablets to tablet servers. @@ -196,6 +197,7 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List private ReplicationDriver replicationWorkDriver; private WorkDriver replicationWorkAssigner; RecoveryManager recoveryManager = null; + private final MasterTime timeKeeper; // Delegation Token classes private final boolean delegationTokensAvailable; @@ -533,7 +535,7 @@ private int notHosted() { int result = 0; for (TabletGroupWatcher watcher : watchers) { for (TableCounts counts : watcher.getStats().values()) { - result += counts.assigned() + counts.assignedToDeadServers(); + result += counts.assigned() + counts.assignedToDeadServers() + counts.suspended(); } } return result; @@ -552,7 +554,7 @@ int displayUnassigned() { TableCounts counts = entry.getValue(); TableState tableState = manager.getTableState(tableId); if (tableState != null && tableState.equals(TableState.ONLINE)) { - result += counts.unassigned() + counts.assignedToDeadServers() + counts.assigned(); + result += counts.unassigned() + counts.assignedToDeadServers() + counts.assigned() + counts.suspended(); } } } @@ -560,13 +562,15 @@ int displayUnassigned() { case SAFE_MODE: // Count offline tablets for the metadata table for (TabletGroupWatcher watcher : watchers) { - result += watcher.getStats(MetadataTable.ID).unassigned(); + TableCounts counts = watcher.getStats(MetadataTable.ID); + result += counts.unassigned() + counts.suspended(); } break; case UNLOAD_METADATA_TABLETS: case UNLOAD_ROOT_TABLET: for (TabletGroupWatcher watcher : watchers) { - result += watcher.getStats(MetadataTable.ID).unassigned(); + TableCounts counts = watcher.getStats(MetadataTable.ID); + result += counts.unassigned() + counts.suspended(); } break; default: @@ -591,6 +595,8 @@ public Master(ServerConfigurationFactory config, VolumeManager fs, String hostna log.info("Version " + Constants.VERSION); log.info("Instance " + getInstance().getInstanceID()); + timeKeeper = new MasterTime(this); + ThriftTransportPool.getInstance().setIdleTime(aconf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)); tserverSet = new LiveTServerSet(this, this); this.tabletBalancer = aconf.instantiateClassProperty(Property.MASTER_TABLET_BALANCER, TabletBalancer.class, new DefaultLoadBalancer()); @@ -626,6 +632,7 @@ public Iterator> iterator() { log.info("SASL is not enabled, delegation tokens will not be available"); delegationTokensAvailable = false; } + } public TServerConnection getConnection(TServerInstance server) { @@ -1133,9 +1140,16 @@ public void process(WatchedEvent event) { } }); - watchers.add(new TabletGroupWatcher(this, new MetaDataStateStore(this, this), null)); - watchers.add(new TabletGroupWatcher(this, new RootTabletStateStore(this, this), watchers.get(0))); - watchers.add(new TabletGroupWatcher(this, new ZooTabletStateStore(new ZooStore(zroot)), watchers.get(1))); + // Always allow user data tablets to enter suspended state. + watchers.add(new TabletGroupWatcher(this, new MetaDataStateStore(this, this), null, SuspensionPolicy.SUSPEND)); + + // Allow metadata tablets to enter suspended state only if so configured. Generally we'll want metadata tablets to + // be immediately reassigned, even if there's a global table.suspension.duration setting. + watchers.add(new TabletGroupWatcher(this, new RootTabletStateStore(this, this), watchers.get(0), getConfiguration().getBoolean( + Property.MASTER_METADATA_SUSPENDABLE) ? SuspensionPolicy.SUSPEND : SuspensionPolicy.UNASSIGN)); + + // Never allow root tablet to enter suspended state. + watchers.add(new TabletGroupWatcher(this, new ZooTabletStateStore(new ZooStore(zroot)), watchers.get(1), SuspensionPolicy.UNASSIGN)); for (TabletGroupWatcher watcher : watchers) { watcher.start(); } @@ -1248,6 +1262,9 @@ public void run() { log.info("Shutting down fate."); fate.shutdown(); + log.info("Shutting down timekeeping."); + timeKeeper.shutdown(); + final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME; statusThread.join(remaining(deadline)); replicationWorkAssigner.join(remaining(deadline)); @@ -1620,4 +1637,12 @@ public void updateBulkImportStatus(String directory, BulkImportState state) { public void removeBulkImportStatus(String directory) { bulkImportStatus.removeBulkImportStatus(Collections.singletonList(directory)); } + + /** + * Return how long (in milliseconds) there has been a master overseeing this cluster. This is an approximately monotonic clock, which will be approximately + * consistent between different masters or different runs of the same master. + */ + public Long getSteadyTime() { + return timeKeeper.getTime(); + } } diff --git a/server/master/src/main/java/org/apache/accumulo/master/MasterTime.java b/server/master/src/main/java/org/apache/accumulo/master/MasterTime.java new file mode 100644 index 00000000000..27c57f0cfeb --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/MasterTime.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Timer; +import java.util.TimerTask; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Keep a persistent roughly monotone view of how long a master has been overseeing this cluster. */ +public class MasterTime extends TimerTask { + private static final Logger log = LoggerFactory.getLogger(MasterTime.class); + + private final String zPath; + private final ZooReaderWriter zk; + private final Master master; + private final Timer timer; + + /** Difference between time stored in ZooKeeper and System.nanoTime() when we last read from ZooKeeper. */ + private long skewAmount; + + public MasterTime(Master master) throws IOException { + this.zPath = ZooUtil.getRoot(master.getInstance()) + Constants.ZMASTER_TICK; + this.zk = ZooReaderWriter.getInstance(); + this.master = master; + + try { + zk.putPersistentData(zPath, "0".getBytes(StandardCharsets.UTF_8), NodeExistsPolicy.SKIP); + skewAmount = Long.parseLong(new String(zk.getData(zPath, null), StandardCharsets.UTF_8)) - System.nanoTime(); + } catch (Exception ex) { + throw new IOException("Error updating master time", ex); + } + + this.timer = new Timer(); + timer.schedule(this, 0, MILLISECONDS.convert(10, SECONDS)); + } + + /** + * How long has this cluster had a Master? + * + * @returns Approximate total duration this cluster has had a Master, in milliseconds. + */ + public synchronized long getTime() { + return MILLISECONDS.convert(System.nanoTime() + skewAmount, NANOSECONDS); + } + + /** Shut down the time keeping. */ + public void shutdown() { + timer.cancel(); + } + + @Override + public void run() { + switch (master.getMasterState()) { + // If we don't have the lock, periodically re-read the value in ZooKeeper, in case there's another master we're + // shadowing for. + case INITIAL: + case STOP: + try { + long zkTime = Long.parseLong(new String(zk.getData(zPath, null), StandardCharsets.UTF_8)); + synchronized (this) { + skewAmount = zkTime - System.nanoTime(); + } + } catch (Exception ex) { + if (log.isDebugEnabled()) { + log.debug("Failed to retrieve master tick time", ex); + } + } + break; + // If we do have the lock, periodically write our clock to ZooKeeper. + case HAVE_LOCK: + case SAFE_MODE: + case NORMAL: + case UNLOAD_METADATA_TABLETS: + case UNLOAD_ROOT_TABLET: + try { + zk.putPersistentData(zPath, Long.toString(System.nanoTime() + skewAmount).getBytes(StandardCharsets.UTF_8), NodeExistsPolicy.OVERWRITE); + } catch (Exception ex) { + if (log.isDebugEnabled()) { + log.debug("Failed to update master tick time", ex); + } + } + } + } +} diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java index b25a2e69a79..18d9580d61f 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java +++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java @@ -17,7 +17,6 @@ package org.apache.accumulo.master; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; -import static java.lang.Math.min; import java.io.IOException; import java.util.ArrayList; @@ -92,8 +91,15 @@ import org.apache.thrift.TException; import com.google.common.collect.Iterators; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.server.conf.TableConfiguration; +import static java.lang.Math.min; class TabletGroupWatcher extends Daemon { + public static enum SuspensionPolicy { + UNASSIGN, SUSPEND + } + // Constants used to make sure assignment logging isn't excessive in quantity or size private static final String ASSIGNMENT_BUFFER_SEPARATOR = ", "; private static final int ASSINGMENT_BUFFER_MAX_LENGTH = 4096; @@ -101,14 +107,21 @@ class TabletGroupWatcher extends Daemon { private final Master master; final TabletStateStore store; final TabletGroupWatcher dependentWatcher; + /** + * When false, move tablets in state {@code TabletState.ASSIGNED_TO_DEAD_SERVER} to state {@code TabletState.UNASSIGNED}. When true, move such tablets to + * state {@code TabletState.SUSPENDED}. + */ + private final SuspensionPolicy suspensionPolicy; + private MasterState masterState; final TableStats stats = new TableStats(); - TabletGroupWatcher(Master master, TabletStateStore store, TabletGroupWatcher dependentWatcher) { + TabletGroupWatcher(Master master, TabletStateStore store, TabletGroupWatcher dependentWatcher, SuspensionPolicy suspensionPolicy) { this.master = master; this.store = store; this.dependentWatcher = dependentWatcher; + this.suspensionPolicy = suspensionPolicy; } Map getStats() { @@ -168,6 +181,7 @@ public void run() { List assignments = new ArrayList(); List assigned = new ArrayList(); List assignedToDeadServers = new ArrayList(); + List suspendedToGoneServers = new ArrayList(); Map unassigned = new HashMap(); Map> logsForDeadServers = new TreeMap<>(); @@ -192,15 +206,18 @@ public void run() { // Don't overwhelm the tablet servers with work if (unassigned.size() + unloaded > Master.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) { - flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers, unassigned); + flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers, suspendedToGoneServers, unassigned); assignments.clear(); assigned.clear(); assignedToDeadServers.clear(); + suspendedToGoneServers.clear(); unassigned.clear(); unloaded = 0; eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS); } String tableId = tls.extent.getTableId(); + TableConfiguration tableConf = this.master.getConfigurationFactory().getTableConfiguration(tableId); + MergeStats mergeStats = mergeStatsCache.get(tableId); if (mergeStats == null) { mergeStats = currentMerges.get(tableId); @@ -253,6 +270,29 @@ public void run() { logsForDeadServers.put(tserver, wals.getWalsInUse(tserver)); } break; + case SUSPENDED: + if (master.getSteadyTime() - tls.suspend.suspensionTime < tableConf.getTimeInMillis(Property.TABLE_SUSPEND_DURATION)) { + // Tablet is suspended. See if its tablet server is back. + TServerInstance returnInstance = null; + Iterator find = currentTServers.tailMap(new TServerInstance(tls.suspend.server, " ")).keySet().iterator(); + if (find.hasNext()) { + TServerInstance found = find.next(); + if (found.getLocation().equals(tls.suspend.server)) { + returnInstance = found; + } + } + + // Old tablet server is back. Return this tablet to its previous owner. + if (returnInstance != null) { + assignments.add(new Assignment(tls.extent, returnInstance)); + } else { + // leave suspended, don't ask for a new assignment. + } + } else { + // Treat as unassigned, ask for a new assignment. + unassigned.put(tls.extent, server); + } + break; case UNASSIGNED: // maybe it's a finishing migration TServerInstance dest = this.master.migrations.get(tls.extent); @@ -276,6 +316,10 @@ public void run() { } } else { switch (state) { + case SUSPENDED: + // Request a move to UNASSIGNED, so as to allow balancing to continue. + suspendedToGoneServers.add(tls); + // Fall through to unassigned to cancel migrations. case UNASSIGNED: TServerInstance dest = this.master.migrations.get(tls.extent); TableState tableState = TableManager.getInstance().getTableState(tls.extent.getTableId()); @@ -306,7 +350,7 @@ public void run() { counts[state.ordinal()]++; } - flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers, unassigned); + flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers, suspendedToGoneServers, unassigned); // provide stats after flushing changes to avoid race conditions w/ delete table stats.end(masterState); @@ -749,15 +793,27 @@ private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException { } private void flushChanges(SortedMap currentTServers, List assignments, List assigned, - List assignedToDeadServers, Map> logsForDeadServers, Map unassigned) - throws DistributedStoreException, TException, WalMarkerException { + List assignedToDeadServers, Map> logsForDeadServers, List suspendedToGoneServers, + Map unassigned) throws DistributedStoreException, TException, WalMarkerException { if (!assignedToDeadServers.isEmpty()) { int maxServersToShow = min(assignedToDeadServers.size(), 100); Master.log.debug(assignedToDeadServers.size() + " assigned to dead servers: " + assignedToDeadServers.subList(0, maxServersToShow) + "..."); Master.log.debug("logs for dead servers: " + logsForDeadServers); - store.unassign(assignedToDeadServers, logsForDeadServers); + switch (suspensionPolicy) { + case SUSPEND: + store.suspend(assignedToDeadServers, logsForDeadServers, master.getSteadyTime()); + break; + case UNASSIGN: + store.unassign(assignedToDeadServers, logsForDeadServers); + break; + } this.master.markDeadServerLogsAsClosed(logsForDeadServers); - this.master.nextEvent.event("Marked %d tablets as unassigned because they don't have current servers", assignedToDeadServers.size()); + this.master.nextEvent.event("Marked %d tablets as suspended because they don't have current servers", assignedToDeadServers.size()); + } + if (!suspendedToGoneServers.isEmpty()) { + int maxServersToShow = min(assignedToDeadServers.size(), 100); + Master.log.debug(assignedToDeadServers.size() + " suspended to gone servers: " + assignedToDeadServers.subList(0, maxServersToShow) + "..."); + store.unsuspend(suspendedToGoneServers); } if (!currentTServers.isEmpty()) { diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java index a89100eb9f6..4cb858c1b34 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java +++ b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java @@ -99,7 +99,7 @@ public void update(KeyExtent ke, TabletState state, boolean chopped, boolean has this.total++; if (state.equals(TabletState.HOSTED)) this.hosted++; - if (state.equals(TabletState.UNASSIGNED)) + if (state.equals(TabletState.UNASSIGNED) || state.equals(TabletState.SUSPENDED)) this.unassigned++; } @@ -217,7 +217,7 @@ private boolean verifyMergeConsistency(Connector connector, CurrentState master) return false; } - if (tls.getState(master.onlineTabletServers()) != TabletState.UNASSIGNED) { + if (tls.getState(master.onlineTabletServers()) != TabletState.UNASSIGNED && tls.getState(master.onlineTabletServers()) != TabletState.SUSPENDED) { log.debug("failing consistency: assigned or hosted " + tls); return false; } diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/TableCounts.java b/server/master/src/main/java/org/apache/accumulo/master/state/TableCounts.java index 73395eaf026..dd44bc625c6 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/state/TableCounts.java +++ b/server/master/src/main/java/org/apache/accumulo/master/state/TableCounts.java @@ -36,4 +36,8 @@ public int assignedToDeadServers() { public int hosted() { return counts[TabletState.HOSTED.ordinal()]; } + + public int suspended() { + return counts[TabletState.SUSPENDED.ordinal()]; + } } diff --git a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java index 0a78d59a829..7381dcd527f 100644 --- a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java +++ b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java @@ -176,7 +176,7 @@ public void testRootTabletStateStore() throws DistributedStoreException { assertEquals(count, 1); TabletLocationState assigned = null; try { - assigned = new TabletLocationState(root, server, null, null, null, false); + assigned = new TabletLocationState(root, server, null, null, null, null, false); } catch (BadLocationStateException e) { fail("Unexpected error " + e); } @@ -203,7 +203,7 @@ public void testRootTabletStateStore() throws DistributedStoreException { TabletLocationState broken = null; try { - broken = new TabletLocationState(notRoot, server, null, null, null, false); + broken = new TabletLocationState(notRoot, server, null, null, null, null, false); } catch (BadLocationStateException e) { fail("Unexpected error " + e); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 5626f126a42..4ced721a774 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -2004,7 +2004,7 @@ public void run() { TServerInstance instance = new TServerInstance(clientAddress, getLock().getSessionId()); TabletLocationState tls = null; try { - tls = new TabletLocationState(extent, null, instance, null, null, false); + tls = new TabletLocationState(extent, null, instance, null, null, null, false); } catch (BadLocationStateException e) { log.error("Unexpected error ", e); } diff --git a/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java b/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java index 30584a6d56b..2d233c41655 100644 --- a/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java @@ -185,7 +185,7 @@ public void test() throws Exception { // take it offline m = tablet.getPrevRowUpdateMutation(); Collection> walogs = Collections.emptyList(); - metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, walogs, false)), null); + metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, null, walogs, false)), null); // now we can split stats = scan(state, metaDataStateStore); diff --git a/test/src/main/java/org/apache/accumulo/test/master/SuspendedTabletsIT.java b/test/src/main/java/org/apache/accumulo/test/master/SuspendedTabletsIT.java new file mode 100644 index 00000000000..5981be6f7f7 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/master/SuspendedTabletsIT.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.master; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.SetMultimap; +import com.google.common.net.HostAndPort; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.ThreadFactory; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.impl.ClientContext; +import org.apache.accumulo.core.client.impl.Credentials; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.minicluster.impl.ProcessReference; +import org.apache.accumulo.server.master.state.MetaDataTableScanner; +import org.apache.accumulo.server.master.state.TabletLocationState; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SuspendedTabletsIT extends ConfigurableMacBase { + private static final Logger log = LoggerFactory.getLogger(SuspendedTabletsIT.class); + private static ExecutorService threadPool; + + public static final int TSERVERS = 5; + public static final long SUSPEND_DURATION = MILLISECONDS.convert(2, MINUTES); + public static final int TABLETS = 100; + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration fsConf) { + cfg.setProperty(Property.TABLE_SUSPEND_DURATION, SUSPEND_DURATION + "ms"); + cfg.setNumTservers(TSERVERS); + } + + @Test + public void suspendAndResumeTserver() throws Exception { + String tableName = getUniqueNames(1)[0]; + + Credentials creds = new Credentials("root", new PasswordToken(ROOT_PASSWORD)); + Instance instance = new ZooKeeperInstance(getCluster().getClientConfig()); + ClientContext ctx = new ClientContext(instance, creds, getCluster().getClientConfig()); + + Connector conn = ctx.getConnector(); + + // Create a table with a bunch of splits + log.info("Creating table " + tableName); + conn.tableOperations().create(tableName); + SortedSet splitPoints = new TreeSet<>(); + for (int i = 1; i < TABLETS; ++i) { + splitPoints.add(new Text("" + i)); + } + conn.tableOperations().addSplits(tableName, splitPoints); + + // Wait for all of the tablets to hosted ... + log.info("Waiting on hosting and balance"); + TabletLocations ds; + for (ds = TabletLocations.retrieve(ctx, tableName); ds.hostedCount != TABLETS; ds = TabletLocations.retrieve(ctx, tableName)) { + Thread.sleep(1000); + } + + // ... and balanced. + conn.instanceOperations().waitForBalance(); + do { + // Give at least another 5 seconds for migrations to finish up + Thread.sleep(5000); + ds = TabletLocations.retrieve(ctx, tableName); + } while (ds.hostedCount != TABLETS); + + // Pray all of our tservers have at least 1 tablet. + Assert.assertEquals(TSERVERS, ds.hosted.keySet().size()); + + // Kill two tablet servers hosting our tablets. This should put tablets into suspended state, and thus halt balancing. + log.info("Killing tservers"); + + TabletLocations beforeDeathState = ds; + { + Iterator prIt = getCluster().getProcesses().get(ServerType.TABLET_SERVER).iterator(); + ProcessReference first = prIt.next(); + ProcessReference second = prIt.next(); + getCluster().killProcess(ServerType.TABLET_SERVER, first); + getCluster().killProcess(ServerType.TABLET_SERVER, second); + } + + // Eventually some tablets will be suspended. + log.info("Waiting on suspended tablets"); + ds = TabletLocations.retrieve(ctx, tableName); + // Until we can scan the metadata table, the master probably can't either, so won't have been able to suspend the tablets. + // So we note the time that we were first able to successfully scan the metadata table. + long killTime = System.nanoTime(); + while (ds.suspended.keySet().size() != 2) { + Thread.sleep(1000); + ds = TabletLocations.retrieve(ctx, tableName); + } + + SetMultimap deadTabletsByServer = ds.suspended; + + // By this point, all tablets should be either hosted or suspended. All suspended tablets should + // "belong" to the dead tablet servers, and should be in exactly the same place as before any tserver death. + for (HostAndPort server : deadTabletsByServer.keySet()) { + Assert.assertEquals(deadTabletsByServer.get(server), beforeDeathState.hosted.get(server)); + } + Assert.assertEquals(TABLETS, ds.hostedCount + ds.suspendedCount); + + // Restart the first tablet server, making sure it ends up on the same port + HostAndPort restartedServer = deadTabletsByServer.keySet().iterator().next(); + log.info("Restarting " + restartedServer); + getCluster().getClusterControl().start(ServerType.TABLET_SERVER, null, + ImmutableMap.of(Property.TSERV_CLIENTPORT.getKey(), "" + restartedServer.getPort(), Property.TSERV_PORTSEARCH.getKey(), "false"), 1); + + // Eventually, the suspended tablets should be reassigned to the newly alive tserver. + log.info("Awaiting tablet unsuspension for tablets belonging to " + restartedServer); + for (ds = TabletLocations.retrieve(ctx, tableName); ds.suspended.containsKey(restartedServer) || ds.assignedCount != 0; ds = TabletLocations.retrieve(ctx, + tableName)) { + Thread.sleep(1000); + } + Assert.assertEquals(deadTabletsByServer.get(restartedServer), ds.hosted.get(restartedServer)); + + // Finally, after much longer, remaining suspended tablets should be reassigned. + log.info("Awaiting tablet reassignment for remaining tablets"); + for (ds = TabletLocations.retrieve(ctx, tableName); ds.hostedCount != TABLETS; ds = TabletLocations.retrieve(ctx, tableName)) { + Thread.sleep(1000); + } + + long recoverTime = System.nanoTime(); + Assert.assertTrue(recoverTime - killTime >= NANOSECONDS.convert(SUSPEND_DURATION, MILLISECONDS)); + Assert.assertTrue(recoverTime - killTime <= NANOSECONDS.convert(SUSPEND_DURATION, MILLISECONDS) + NANOSECONDS.convert(3, MINUTES)); + } + + private static final AtomicInteger threadCounter = new AtomicInteger(0); + + @BeforeClass + public static void init() { + threadPool = Executors.newCachedThreadPool(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "Scanning deadline thread #" + threadCounter.incrementAndGet()); + } + }); + } + + @AfterClass + public static void cleanup() { + threadPool.shutdownNow(); + } + + private static class TabletLocations { + public final Map locationStates = new HashMap<>(); + public final SetMultimap hosted = HashMultimap.create(); + public final SetMultimap suspended = HashMultimap.create(); + public int hostedCount = 0; + public int assignedCount = 0; + public int suspendedCount = 0; + public int unassignedCount = 0; + + private TabletLocations() {} + + public static TabletLocations retrieve(final ClientContext ctx, final String tableName) throws Exception { + int sleepTime = 200; + int remainingAttempts = 30; + + while (true) { + try { + FutureTask tlsFuture = new FutureTask<>(new Callable() { + @Override + public TabletLocations call() throws Exception { + TabletLocations answer = new TabletLocations(); + answer.scan(ctx, tableName); + return answer; + } + }); + threadPool.submit(tlsFuture); + return tlsFuture.get(5, SECONDS); + } catch (TimeoutException ex) { + log.debug("Retrieval timed out", ex); + } catch (Exception ex) { + log.warn("Failed to scan metadata", ex); + } + sleepTime = Math.min(2 * sleepTime, 10000); + Thread.sleep(sleepTime); + --remainingAttempts; + if (remainingAttempts == 0) { + Assert.fail("Scanning of metadata failed, aborting"); + } + } + } + + private void scan(ClientContext ctx, String tableName) throws Exception { + Map idMap = ctx.getConnector().tableOperations().tableIdMap(); + String tableId = Objects.requireNonNull(idMap.get(tableName)); + try (MetaDataTableScanner scanner = new MetaDataTableScanner(ctx, new Range())) { + while (scanner.hasNext()) { + TabletLocationState tls = scanner.next(); + + if (!tls.extent.getTableId().equals(tableId)) { + continue; + } + locationStates.put(tls.extent, tls); + if (tls.suspend != null) { + suspended.put(tls.suspend.server, tls.extent); + ++suspendedCount; + } else if (tls.current != null) { + hosted.put(tls.current.getLocation(), tls.extent); + ++hostedCount; + } else if (tls.future != null) { + ++assignedCount; + } else { + unassignedCount += 1; + } + } + } + } + } +} From e0e1523b96b83ee289dbae5ec785ca9b7d3761e7 Mon Sep 17 00:00:00 2001 From: Shawn Walker Date: Tue, 28 Jun 2016 13:27:57 -0400 Subject: [PATCH 2/7] ACCUMULO-4353: Now block balancing until live tservers settles to avoid balance/suspend race --- .../org/apache/accumulo/master/Master.java | 12 +++++++--- .../accumulo/master/TabletGroupWatcher.java | 24 ++++++++++++++----- 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java index 3760874d35a..d29d1234895 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@ -981,7 +981,8 @@ public void run() { } private long updateStatus() throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - tserverStatus = Collections.synchronizedSortedMap(gatherTableInformation()); + Set currentServers = tserverSet.getCurrentServers(); + tserverStatus = Collections.synchronizedSortedMap(gatherTableInformation(currentServers)); checkForHeldServer(tserverStatus); if (!badServers.isEmpty()) { @@ -993,6 +994,12 @@ private long updateStatus() throws AccumuloException, AccumuloSecurityException, } else if (!serversToShutdown.isEmpty()) { log.debug("not balancing while shutting down servers " + serversToShutdown); } else { + for (TabletGroupWatcher tgw : watchers) { + if (!tgw.isSameTserversAsLastScan(currentServers)) { + log.debug("not balancing just yet, as collection of live tservers is in flux"); + return DEFAULT_WAIT_FOR_WATCHER; + } + } return balanceTablets(); } return DEFAULT_WAIT_FOR_WATCHER; @@ -1049,12 +1056,11 @@ private long balanceTablets() { } - private SortedMap gatherTableInformation() { + private SortedMap gatherTableInformation(Set currentServers) { long start = System.currentTimeMillis(); int threads = Math.max(getConfiguration().getCount(Property.MASTER_STATUS_THREAD_POOL_SIZE), 1); ExecutorService tp = Executors.newFixedThreadPool(threads); final SortedMap result = new TreeMap(); - Set currentServers = tserverSet.getCurrentServers(); for (TServerInstance serverInstance : currentServers) { final TServerInstance server = serverInstance; tp.submit(new Runnable() { diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java index 18d9580d61f..cd30e3799cc 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java +++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java @@ -16,6 +16,7 @@ */ package org.apache.accumulo.master; +import com.google.common.collect.ImmutableSortedSet; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import java.io.IOException; @@ -94,10 +95,14 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.server.conf.TableConfiguration; import static java.lang.Math.min; +import java.util.SortedSet; class TabletGroupWatcher extends Daemon { public static enum SuspensionPolicy { - UNASSIGN, SUSPEND + /** Move tablets in state TabletState.ASSIGNED_TO_DEAD_SERVER to state TabletState.UNASSIGNED, and attempt reassignment. */ + UNASSIGN, + /** Move tablets in state TabletState.ASSIGNED_TO_DEAD_SERVER to state TabletState.SUSPENDED. */ + SUSPEND } // Constants used to make sure assignment logging isn't excessive in quantity or size @@ -107,15 +112,12 @@ public static enum SuspensionPolicy { private final Master master; final TabletStateStore store; final TabletGroupWatcher dependentWatcher; - /** - * When false, move tablets in state {@code TabletState.ASSIGNED_TO_DEAD_SERVER} to state {@code TabletState.UNASSIGNED}. When true, move such tablets to - * state {@code TabletState.SUSPENDED}. - */ private final SuspensionPolicy suspensionPolicy; private MasterState masterState; final TableStats stats = new TableStats(); + private SortedSet lastScanServers = ImmutableSortedSet.of(); TabletGroupWatcher(Master master, TabletStateStore store, TabletGroupWatcher dependentWatcher, SuspensionPolicy suspensionPolicy) { this.master = master; @@ -137,9 +139,13 @@ TableCounts getStats(String tableId) { return stats.getLast(tableId); } + /** True if the collection of live tservers specified in 'candidates' hasn't changed since the last time an assignment scan was started. */ + public synchronized boolean isSameTserversAsLastScan(Set candidates) { + return candidates.equals(lastScanServers); + } + @Override public void run() { - Thread.currentThread().setName("Watching " + store.name()); int[] oldCounts = new int[TabletState.values().length]; EventCoordinator.Listener eventListener = this.master.nextEvent.getListener(); @@ -171,6 +177,9 @@ public void run() { if (currentTServers.size() == 0) { eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS); + synchronized (this) { + lastScanServers = ImmutableSortedSet.of(); + } continue; } @@ -370,6 +379,9 @@ public void run() { updateMergeState(mergeStatsCache); + synchronized (this) { + lastScanServers = ImmutableSortedSet.copyOf(currentTServers.keySet()); + } if (this.master.tserverSet.getCurrentServers().equals(currentTServers.keySet())) { Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.)); eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS); From 96e3ddd742ced7010596f994d912662448743a0a Mon Sep 17 00:00:00 2001 From: Shawn Walker Date: Wed, 29 Jun 2016 13:18:58 -0400 Subject: [PATCH 3/7] ACCUMULO-4353: Rewrote `TabletLocationState.getState()` to better account for concurrent change --- .../master/state/TabletLocationState.java | 41 ++++++++++--------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java index 369a14bab2f..784bd337a8e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java @@ -94,26 +94,29 @@ public TServerInstance getServer() { return result; } + private static final int _HAS_CURRENT = 1 << 0; + private static final int _HAS_FUTURE = 1 << 1; + private static final int _HAS_SUSPEND = 1 << 2; + public TabletState getState(Set liveServers) { - if (suspend != null) { - return TabletState.SUSPENDED; - } - TServerInstance server = getServer(); - if (server == null) - return TabletState.UNASSIGNED; - if (server.equals(current) || server.equals(future)) { - if (liveServers.contains(server)) - if (server.equals(future)) { - return TabletState.ASSIGNED; - } else { - return TabletState.HOSTED; - } - else { - return TabletState.ASSIGNED_TO_DEAD_SERVER; - } + switch ((current == null ? 0 : _HAS_CURRENT) | (future == null ? 0 : _HAS_FUTURE) | (suspend == null ? 0 : _HAS_SUSPEND)) { + case 0: + return TabletState.UNASSIGNED; + + case _HAS_SUSPEND: + return TabletState.SUSPENDED; + + case _HAS_FUTURE: + case (_HAS_FUTURE | _HAS_SUSPEND): + return liveServers.contains(future) ? TabletState.ASSIGNED : TabletState.ASSIGNED_TO_DEAD_SERVER; + + case _HAS_CURRENT: + case (_HAS_CURRENT | _HAS_SUSPEND): + return liveServers.contains(current) ? TabletState.HOSTED : TabletState.ASSIGNED_TO_DEAD_SERVER; + + default: + // Both current and future are set, which is prevented by constructor. + throw new IllegalStateException(); } - // server == last - return TabletState.UNASSIGNED; } - } From fc861c2b84773567d2fdb2c3e863eeffd5fb701c Mon Sep 17 00:00:00 2001 From: Shawn Walker Date: Tue, 5 Jul 2016 14:22:37 -0400 Subject: [PATCH 4/7] ACCUMULO-4353: TServers undergoing "clean" shutdown will suspend their tablets now, too. `master.metadata.suspendable` is now checked more often than just at startup --- .../thrift/TUnloadTabletGoal.java | 67 +++++ .../thrift/TabletClientService.java | 275 +++++++++++++----- core/src/main/thrift/tabletserver.thrift | 9 +- .../server/master/LiveTServerSet.java | 5 +- .../server/master/state/TabletStateStore.java | 32 +- .../org/apache/accumulo/master/Master.java | 47 ++- .../accumulo/master/TabletGroupWatcher.java | 35 +-- .../apache/accumulo/tserver/TabletServer.java | 27 +- .../test/performance/thrift/NullTserver.java | 3 +- 9 files changed, 364 insertions(+), 136 deletions(-) create mode 100644 core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TUnloadTabletGoal.java diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TUnloadTabletGoal.java b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TUnloadTabletGoal.java new file mode 100644 index 00000000000..3ce0b31273c --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TUnloadTabletGoal.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Autogenerated by Thrift Compiler (0.9.3) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package org.apache.accumulo.core.tabletserver.thrift; + + +import java.util.Map; +import java.util.HashMap; +import org.apache.thrift.TEnum; + +@SuppressWarnings({"unused"}) public enum TUnloadTabletGoal implements org.apache.thrift.TEnum { + UNKNOWN(0), + UNASSIGNED(1), + SUSPENDED(2), + DELETED(3); + + private final int value; + + private TUnloadTabletGoal(int value) { + this.value = value; + } + + /** + * Get the integer value of this enum value, as defined in the Thrift IDL. + */ + public int getValue() { + return value; + } + + /** + * Find a the enum type by its integer value, as defined in the Thrift IDL. + * @return null if the value is not found. + */ + public static TUnloadTabletGoal findByValue(int value) { + switch (value) { + case 0: + return UNKNOWN; + case 1: + return UNASSIGNED; + case 2: + return SUSPENDED; + case 3: + return DELETED; + default: + return null; + } + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java index 3d4fa061c9f..4ce9927122b 100644 --- a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java +++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java @@ -89,7 +89,7 @@ public interface Iface extends org.apache.accumulo.core.client.impl.thrift.Clien public void loadTablet(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, String lock, org.apache.accumulo.core.data.thrift.TKeyExtent extent) throws org.apache.thrift.TException; - public void unloadTablet(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, String lock, org.apache.accumulo.core.data.thrift.TKeyExtent extent, boolean save) throws org.apache.thrift.TException; + public void unloadTablet(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, String lock, org.apache.accumulo.core.data.thrift.TKeyExtent extent, TUnloadTabletGoal goal, long requestTime) throws org.apache.thrift.TException; public void flush(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow) throws org.apache.thrift.TException; @@ -155,7 +155,7 @@ public interface AsyncIface extends org.apache.accumulo.core.client.impl.thrift. public void loadTablet(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, String lock, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; - public void unloadTablet(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, String lock, org.apache.accumulo.core.data.thrift.TKeyExtent extent, boolean save, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; + public void unloadTablet(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, String lock, org.apache.accumulo.core.data.thrift.TKeyExtent extent, TUnloadTabletGoal goal, long requestTime, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; public void flush(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; @@ -667,19 +667,20 @@ public void send_loadTablet(org.apache.accumulo.core.trace.thrift.TInfo tinfo, o sendBaseOneway("loadTablet", args); } - public void unloadTablet(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, String lock, org.apache.accumulo.core.data.thrift.TKeyExtent extent, boolean save) throws org.apache.thrift.TException + public void unloadTablet(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, String lock, org.apache.accumulo.core.data.thrift.TKeyExtent extent, TUnloadTabletGoal goal, long requestTime) throws org.apache.thrift.TException { - send_unloadTablet(tinfo, credentials, lock, extent, save); + send_unloadTablet(tinfo, credentials, lock, extent, goal, requestTime); } - public void send_unloadTablet(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, String lock, org.apache.accumulo.core.data.thrift.TKeyExtent extent, boolean save) throws org.apache.thrift.TException + public void send_unloadTablet(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, String lock, org.apache.accumulo.core.data.thrift.TKeyExtent extent, TUnloadTabletGoal goal, long requestTime) throws org.apache.thrift.TException { unloadTablet_args args = new unloadTablet_args(); args.setTinfo(tinfo); args.setCredentials(credentials); args.setLock(lock); args.setExtent(extent); - args.setSave(save); + args.setGoal(goal); + args.setRequestTime(requestTime); sendBaseOneway("unloadTablet", args); } @@ -1692,9 +1693,9 @@ public void getResult() throws org.apache.thrift.TException { } } - public void unloadTablet(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, String lock, org.apache.accumulo.core.data.thrift.TKeyExtent extent, boolean save, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { + public void unloadTablet(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, String lock, org.apache.accumulo.core.data.thrift.TKeyExtent extent, TUnloadTabletGoal goal, long requestTime, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { checkReady(); - unloadTablet_call method_call = new unloadTablet_call(tinfo, credentials, lock, extent, save, resultHandler, this, ___protocolFactory, ___transport); + unloadTablet_call method_call = new unloadTablet_call(tinfo, credentials, lock, extent, goal, requestTime, resultHandler, this, ___protocolFactory, ___transport); this.___currentMethod = method_call; ___manager.call(method_call); } @@ -1704,14 +1705,16 @@ public static class unloadTablet_call extends org.apache.thrift.async.TAsyncMeth private org.apache.accumulo.core.security.thrift.TCredentials credentials; private String lock; private org.apache.accumulo.core.data.thrift.TKeyExtent extent; - private boolean save; - public unloadTablet_call(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, String lock, org.apache.accumulo.core.data.thrift.TKeyExtent extent, boolean save, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { + private TUnloadTabletGoal goal; + private long requestTime; + public unloadTablet_call(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, String lock, org.apache.accumulo.core.data.thrift.TKeyExtent extent, TUnloadTabletGoal goal, long requestTime, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { super(client, protocolFactory, transport, resultHandler, true); this.tinfo = tinfo; this.credentials = credentials; this.lock = lock; this.extent = extent; - this.save = save; + this.goal = goal; + this.requestTime = requestTime; } public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { @@ -1721,7 +1724,8 @@ public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apa args.setCredentials(credentials); args.setLock(lock); args.setExtent(extent); - args.setSave(save); + args.setGoal(goal); + args.setRequestTime(requestTime); args.write(prot); prot.writeMessageEnd(); } @@ -2700,7 +2704,7 @@ protected boolean isOneway() { } public org.apache.thrift.TBase getResult(I iface, unloadTablet_args args) throws org.apache.thrift.TException { - iface.unloadTablet(args.tinfo, args.credentials, args.lock, args.extent, args.save); + iface.unloadTablet(args.tinfo, args.credentials, args.lock, args.extent, args.goal, args.requestTime); return null; } } @@ -3953,7 +3957,7 @@ protected boolean isOneway() { } public void start(I iface, unloadTablet_args args, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws TException { - iface.unloadTablet(args.tinfo, args.credentials, args.lock, args.extent, args.save,resultHandler); + iface.unloadTablet(args.tinfo, args.credentials, args.lock, args.extent, args.goal, args.requestTime,resultHandler); } } @@ -24718,7 +24722,8 @@ public static class unloadTablet_args implements org.apache.thrift.TBase, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -24730,7 +24735,12 @@ public static class unloadTablet_args implements org.apache.thrift.TBase byName = new HashMap(); @@ -24761,8 +24776,10 @@ public static _Fields findByThriftId(int fieldId) { return LOCK; case 2: // EXTENT return EXTENT; - case 3: // SAVE - return SAVE; + case 6: // GOAL + return GOAL; + case 7: // REQUEST_TIME + return REQUEST_TIME; default: return null; } @@ -24803,7 +24820,7 @@ public String getFieldName() { } // isset id assignments - private static final int __SAVE_ISSET_ID = 0; + private static final int __REQUESTTIME_ISSET_ID = 0; private byte __isset_bitfield = 0; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { @@ -24816,8 +24833,10 @@ public String getFieldName() { new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.EXTENT, new org.apache.thrift.meta_data.FieldMetaData("extent", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.data.thrift.TKeyExtent.class))); - tmpMap.put(_Fields.SAVE, new org.apache.thrift.meta_data.FieldMetaData("save", org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); + tmpMap.put(_Fields.GOAL, new org.apache.thrift.meta_data.FieldMetaData("goal", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, TUnloadTabletGoal.class))); + tmpMap.put(_Fields.REQUEST_TIME, new org.apache.thrift.meta_data.FieldMetaData("requestTime", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(unloadTablet_args.class, metaDataMap); } @@ -24830,15 +24849,17 @@ public unloadTablet_args( org.apache.accumulo.core.security.thrift.TCredentials credentials, String lock, org.apache.accumulo.core.data.thrift.TKeyExtent extent, - boolean save) + TUnloadTabletGoal goal, + long requestTime) { this(); this.tinfo = tinfo; this.credentials = credentials; this.lock = lock; this.extent = extent; - this.save = save; - setSaveIsSet(true); + this.goal = goal; + this.requestTime = requestTime; + setRequestTimeIsSet(true); } /** @@ -24858,7 +24879,10 @@ public unloadTablet_args(unloadTablet_args other) { if (other.isSetExtent()) { this.extent = new org.apache.accumulo.core.data.thrift.TKeyExtent(other.extent); } - this.save = other.save; + if (other.isSetGoal()) { + this.goal = other.goal; + } + this.requestTime = other.requestTime; } public unloadTablet_args deepCopy() { @@ -24871,8 +24895,9 @@ public void clear() { this.credentials = null; this.lock = null; this.extent = null; - setSaveIsSet(false); - this.save = false; + this.goal = null; + setRequestTimeIsSet(false); + this.requestTime = 0; } public org.apache.accumulo.core.trace.thrift.TInfo getTinfo() { @@ -24971,27 +24996,59 @@ public void setExtentIsSet(boolean value) { } } - public boolean isSave() { - return this.save; + /** + * + * @see TUnloadTabletGoal + */ + public TUnloadTabletGoal getGoal() { + return this.goal; + } + + /** + * + * @see TUnloadTabletGoal + */ + public unloadTablet_args setGoal(TUnloadTabletGoal goal) { + this.goal = goal; + return this; + } + + public void unsetGoal() { + this.goal = null; + } + + /** Returns true if field goal is set (has been assigned a value) and false otherwise */ + public boolean isSetGoal() { + return this.goal != null; + } + + public void setGoalIsSet(boolean value) { + if (!value) { + this.goal = null; + } + } + + public long getRequestTime() { + return this.requestTime; } - public unloadTablet_args setSave(boolean save) { - this.save = save; - setSaveIsSet(true); + public unloadTablet_args setRequestTime(long requestTime) { + this.requestTime = requestTime; + setRequestTimeIsSet(true); return this; } - public void unsetSave() { - __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __SAVE_ISSET_ID); + public void unsetRequestTime() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __REQUESTTIME_ISSET_ID); } - /** Returns true if field save is set (has been assigned a value) and false otherwise */ - public boolean isSetSave() { - return EncodingUtils.testBit(__isset_bitfield, __SAVE_ISSET_ID); + /** Returns true if field requestTime is set (has been assigned a value) and false otherwise */ + public boolean isSetRequestTime() { + return EncodingUtils.testBit(__isset_bitfield, __REQUESTTIME_ISSET_ID); } - public void setSaveIsSet(boolean value) { - __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __SAVE_ISSET_ID, value); + public void setRequestTimeIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __REQUESTTIME_ISSET_ID, value); } public void setFieldValue(_Fields field, Object value) { @@ -25028,11 +25085,19 @@ public void setFieldValue(_Fields field, Object value) { } break; - case SAVE: + case GOAL: if (value == null) { - unsetSave(); + unsetGoal(); } else { - setSave((Boolean)value); + setGoal((TUnloadTabletGoal)value); + } + break; + + case REQUEST_TIME: + if (value == null) { + unsetRequestTime(); + } else { + setRequestTime((Long)value); } break; @@ -25053,8 +25118,11 @@ public Object getFieldValue(_Fields field) { case EXTENT: return getExtent(); - case SAVE: - return isSave(); + case GOAL: + return getGoal(); + + case REQUEST_TIME: + return getRequestTime(); } throw new IllegalStateException(); @@ -25075,8 +25143,10 @@ public boolean isSet(_Fields field) { return isSetLock(); case EXTENT: return isSetExtent(); - case SAVE: - return isSetSave(); + case GOAL: + return isSetGoal(); + case REQUEST_TIME: + return isSetRequestTime(); } throw new IllegalStateException(); } @@ -25130,12 +25200,21 @@ public boolean equals(unloadTablet_args that) { return false; } - boolean this_present_save = true; - boolean that_present_save = true; - if (this_present_save || that_present_save) { - if (!(this_present_save && that_present_save)) + boolean this_present_goal = true && this.isSetGoal(); + boolean that_present_goal = true && that.isSetGoal(); + if (this_present_goal || that_present_goal) { + if (!(this_present_goal && that_present_goal)) return false; - if (this.save != that.save) + if (!this.goal.equals(that.goal)) + return false; + } + + boolean this_present_requestTime = true; + boolean that_present_requestTime = true; + if (this_present_requestTime || that_present_requestTime) { + if (!(this_present_requestTime && that_present_requestTime)) + return false; + if (this.requestTime != that.requestTime) return false; } @@ -25166,10 +25245,15 @@ public int hashCode() { if (present_extent) list.add(extent); - boolean present_save = true; - list.add(present_save); - if (present_save) - list.add(save); + boolean present_goal = true && (isSetGoal()); + list.add(present_goal); + if (present_goal) + list.add(goal.getValue()); + + boolean present_requestTime = true; + list.add(present_requestTime); + if (present_requestTime) + list.add(requestTime); return list.hashCode(); } @@ -25222,12 +25306,22 @@ public int compareTo(unloadTablet_args other) { return lastComparison; } } - lastComparison = Boolean.valueOf(isSetSave()).compareTo(other.isSetSave()); + lastComparison = Boolean.valueOf(isSetGoal()).compareTo(other.isSetGoal()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetGoal()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.goal, other.goal); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetRequestTime()).compareTo(other.isSetRequestTime()); if (lastComparison != 0) { return lastComparison; } - if (isSetSave()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.save, other.save); + if (isSetRequestTime()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.requestTime, other.requestTime); if (lastComparison != 0) { return lastComparison; } @@ -25284,8 +25378,16 @@ public String toString() { } first = false; if (!first) sb.append(", "); - sb.append("save:"); - sb.append(this.save); + sb.append("goal:"); + if (this.goal == null) { + sb.append("null"); + } else { + sb.append(this.goal); + } + first = false; + if (!first) sb.append(", "); + sb.append("requestTime:"); + sb.append(this.requestTime); first = false; sb.append(")"); return sb.toString(); @@ -25376,10 +25478,18 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, unloadTablet_args s org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; - case 3: // SAVE - if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) { - struct.save = iprot.readBool(); - struct.setSaveIsSet(true); + case 6: // GOAL + if (schemeField.type == org.apache.thrift.protocol.TType.I32) { + struct.goal = org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal.findByValue(iprot.readI32()); + struct.setGoalIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 7: // REQUEST_TIME + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.requestTime = iprot.readI64(); + struct.setRequestTimeIsSet(true); } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -25409,9 +25519,6 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, unloadTablet_args struct.extent.write(oprot); oprot.writeFieldEnd(); } - oprot.writeFieldBegin(SAVE_FIELD_DESC); - oprot.writeBool(struct.save); - oprot.writeFieldEnd(); if (struct.lock != null) { oprot.writeFieldBegin(LOCK_FIELD_DESC); oprot.writeString(struct.lock); @@ -25422,6 +25529,14 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, unloadTablet_args struct.tinfo.write(oprot); oprot.writeFieldEnd(); } + if (struct.goal != null) { + oprot.writeFieldBegin(GOAL_FIELD_DESC); + oprot.writeI32(struct.goal.getValue()); + oprot.writeFieldEnd(); + } + oprot.writeFieldBegin(REQUEST_TIME_FIELD_DESC); + oprot.writeI64(struct.requestTime); + oprot.writeFieldEnd(); oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -25452,10 +25567,13 @@ public void write(org.apache.thrift.protocol.TProtocol prot, unloadTablet_args s if (struct.isSetExtent()) { optionals.set(3); } - if (struct.isSetSave()) { + if (struct.isSetGoal()) { optionals.set(4); } - oprot.writeBitSet(optionals, 5); + if (struct.isSetRequestTime()) { + optionals.set(5); + } + oprot.writeBitSet(optionals, 6); if (struct.isSetTinfo()) { struct.tinfo.write(oprot); } @@ -25468,15 +25586,18 @@ public void write(org.apache.thrift.protocol.TProtocol prot, unloadTablet_args s if (struct.isSetExtent()) { struct.extent.write(oprot); } - if (struct.isSetSave()) { - oprot.writeBool(struct.save); + if (struct.isSetGoal()) { + oprot.writeI32(struct.goal.getValue()); + } + if (struct.isSetRequestTime()) { + oprot.writeI64(struct.requestTime); } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, unloadTablet_args struct) throws org.apache.thrift.TException { TTupleProtocol iprot = (TTupleProtocol) prot; - BitSet incoming = iprot.readBitSet(5); + BitSet incoming = iprot.readBitSet(6); if (incoming.get(0)) { struct.tinfo = new org.apache.accumulo.core.trace.thrift.TInfo(); struct.tinfo.read(iprot); @@ -25497,8 +25618,12 @@ public void read(org.apache.thrift.protocol.TProtocol prot, unloadTablet_args st struct.setExtentIsSet(true); } if (incoming.get(4)) { - struct.save = iprot.readBool(); - struct.setSaveIsSet(true); + struct.goal = org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal.findByValue(iprot.readI32()); + struct.setGoalIsSet(true); + } + if (incoming.get(5)) { + struct.requestTime = iprot.readI64(); + struct.setRequestTimeIsSet(true); } } } diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift index 6a455626fb7..7697a2d256e 100644 --- a/core/src/main/thrift/tabletserver.thrift +++ b/core/src/main/thrift/tabletserver.thrift @@ -146,6 +146,13 @@ struct TSamplerConfiguration { 2:map options } +enum TUnloadTabletGoal { + UNKNOWN, + UNASSIGNED, + SUSPENDED, + DELETED +} + service TabletClientService extends client.ClientService { // scan a range of keys data.InitialScan startScan(11:trace.TInfo tinfo, @@ -207,7 +214,7 @@ service TabletClientService extends client.ClientService { void splitTablet(4:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:data.TKeyExtent extent, 3:binary splitPoint) throws (1:client.ThriftSecurityException sec, 2:NotServingTabletException nste) oneway void loadTablet(5:trace.TInfo tinfo, 1:security.TCredentials credentials, 4:string lock, 2:data.TKeyExtent extent), - oneway void unloadTablet(5:trace.TInfo tinfo, 1:security.TCredentials credentials, 4:string lock, 2:data.TKeyExtent extent, 3:bool save), + oneway void unloadTablet(5:trace.TInfo tinfo, 1:security.TCredentials credentials, 4:string lock, 2:data.TKeyExtent extent, 6:TUnloadTabletGoal goal, 7:i64 requestTime), oneway void flush(4:trace.TInfo tinfo, 1:security.TCredentials credentials, 3:string lock, 2:string tableId, 5:binary startRow, 6:binary endRow), oneway void flushTablet(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:string lock, 4:data.TKeyExtent extent), oneway void chop(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:string lock, 4:data.TKeyExtent extent), diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java index 0c0cceba631..2e08cea7359 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java @@ -57,6 +57,7 @@ import org.slf4j.LoggerFactory; import com.google.common.net.HostAndPort; +import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal; public class LiveTServerSet implements Watcher { @@ -105,10 +106,10 @@ public void assignTablet(ZooLock lock, KeyExtent extent) throws TException { } } - public void unloadTablet(ZooLock lock, KeyExtent extent, boolean save) throws TException { + public void unloadTablet(ZooLock lock, KeyExtent extent, TUnloadTabletGoal goal, long requestTime) throws TException { TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, context); try { - client.unloadTablet(Tracer.traceInfo(), context.rpcCreds(), lockString(lock), extent.toThrift(), save); + client.unloadTablet(Tracer.traceInfo(), context.rpcCreds(), lockString(lock), extent.toThrift(), goal, requestTime); } finally { ThriftUtil.returnClient(client); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java index 4cfa12b6ebf..6872466e9c7 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java @@ -20,6 +20,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.server.AccumuloServerContext; import org.apache.hadoop.fs.Path; @@ -77,26 +78,25 @@ abstract public void suspend(Collection tablets, Map> logsForDeadServers) throws DistributedStoreException { - TabletStateStore store; - if (tls.extent.isRootTablet()) { - store = new ZooTabletStateStore(); - } else if (tls.extent.isMeta()) { - store = new RootTabletStateStore(context); - } else { - store = new MetaDataStateStore(context); - } - store.unassign(Collections.singletonList(tls), logsForDeadServers); + getStoreForTablet(tls.extent, context).unassign(Collections.singletonList(tls), logsForDeadServers); + } + + public static void suspend(AccumuloServerContext context, TabletLocationState tls, Map> logsForDeadServers, + long suspensionTimestamp) throws DistributedStoreException { + getStoreForTablet(tls.extent, context).suspend(Collections.singletonList(tls), logsForDeadServers, suspensionTimestamp); } public static void setLocation(AccumuloServerContext context, Assignment assignment) throws DistributedStoreException { - TabletStateStore store; - if (assignment.tablet.isRootTablet()) { - store = new ZooTabletStateStore(); - } else if (assignment.tablet.isMeta()) { - store = new RootTabletStateStore(context); + getStoreForTablet(assignment.tablet, context).setLocations(Collections.singletonList(assignment)); + } + + protected static TabletStateStore getStoreForTablet(KeyExtent extent, AccumuloServerContext context) throws DistributedStoreException { + if (extent.isRootTablet()) { + return new ZooTabletStateStore(); + } else if (extent.isMeta()) { + return new RootTabletStateStore(context); } else { - store = new MetaDataStateStore(context); + return new MetaDataStateStore(context); } - store.setLocations(Collections.singletonList(assignment)); } } diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java index d29d1234895..0497ee27b9d 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@ -162,7 +162,7 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Iterables; -import org.apache.accumulo.master.TabletGroupWatcher.SuspensionPolicy; +import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal; /** * The Master is responsible for assigning and balancing tablets to tablet servers. @@ -733,7 +733,18 @@ public void clearMigrations(String tableId) { } static enum TabletGoalState { - HOSTED, UNASSIGNED, DELETED + HOSTED(TUnloadTabletGoal.UNKNOWN), UNASSIGNED(TUnloadTabletGoal.UNASSIGNED), DELETED(TUnloadTabletGoal.DELETED), SUSPENDED(TUnloadTabletGoal.SUSPENDED); + + private final TUnloadTabletGoal unloadGoal; + + TabletGoalState(TUnloadTabletGoal unloadGoal) { + this.unloadGoal = unloadGoal; + } + + /** The purpose of unloading this tablet. */ + public TUnloadTabletGoal howUnload() { + return unloadGoal; + } }; TabletGoalState getSystemGoalState(TabletLocationState tls) { @@ -780,7 +791,7 @@ TabletGoalState getGoalState(TabletLocationState tls, MergeInfo mergeInfo) { TabletGoalState state = getSystemGoalState(tls); if (state == TabletGoalState.HOSTED) { if (tls.current != null && serversToShutdown.contains(tls.current)) { - return TabletGoalState.UNASSIGNED; + return TabletGoalState.SUSPENDED; } // Handle merge transitions if (mergeInfo.getExtent() != null) { @@ -1146,16 +1157,30 @@ public void process(WatchedEvent event) { } }); - // Always allow user data tablets to enter suspended state. - watchers.add(new TabletGroupWatcher(this, new MetaDataStateStore(this, this), null, SuspensionPolicy.SUSPEND)); + watchers.add(new TabletGroupWatcher(this, new MetaDataStateStore(this, this), null) { + @Override + boolean canSuspendTablets() { + // Always allow user data tablets to enter suspended state. + return true; + } + }); - // Allow metadata tablets to enter suspended state only if so configured. Generally we'll want metadata tablets to - // be immediately reassigned, even if there's a global table.suspension.duration setting. - watchers.add(new TabletGroupWatcher(this, new RootTabletStateStore(this, this), watchers.get(0), getConfiguration().getBoolean( - Property.MASTER_METADATA_SUSPENDABLE) ? SuspensionPolicy.SUSPEND : SuspensionPolicy.UNASSIGN)); + watchers.add(new TabletGroupWatcher(this, new RootTabletStateStore(this, this), watchers.get(0)) { + @Override + boolean canSuspendTablets() { + // Allow metadata tablets to enter suspended state only if so configured. Generally we'll want metadata tablets to + // be immediately reassigned, even if there's a global table.suspension.duration setting. + return getConfiguration().getBoolean(Property.MASTER_METADATA_SUSPENDABLE); + } + }); - // Never allow root tablet to enter suspended state. - watchers.add(new TabletGroupWatcher(this, new ZooTabletStateStore(new ZooStore(zroot)), watchers.get(1), SuspensionPolicy.UNASSIGN)); + watchers.add(new TabletGroupWatcher(this, new ZooTabletStateStore(new ZooStore(zroot)), watchers.get(1)) { + @Override + boolean canSuspendTablets() { + // Never allow root tablet to enter suspended state. + return false; + } + }); for (TabletGroupWatcher watcher : watchers) { watcher.start(); } diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java index cd30e3799cc..dea0d20733a 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java +++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java @@ -96,15 +96,9 @@ import org.apache.accumulo.server.conf.TableConfiguration; import static java.lang.Math.min; import java.util.SortedSet; +import static java.lang.Math.min; -class TabletGroupWatcher extends Daemon { - public static enum SuspensionPolicy { - /** Move tablets in state TabletState.ASSIGNED_TO_DEAD_SERVER to state TabletState.UNASSIGNED, and attempt reassignment. */ - UNASSIGN, - /** Move tablets in state TabletState.ASSIGNED_TO_DEAD_SERVER to state TabletState.SUSPENDED. */ - SUSPEND - } - +abstract class TabletGroupWatcher extends Daemon { // Constants used to make sure assignment logging isn't excessive in quantity or size private static final String ASSIGNMENT_BUFFER_SEPARATOR = ", "; private static final int ASSINGMENT_BUFFER_MAX_LENGTH = 4096; @@ -112,20 +106,21 @@ public static enum SuspensionPolicy { private final Master master; final TabletStateStore store; final TabletGroupWatcher dependentWatcher; - private final SuspensionPolicy suspensionPolicy; private MasterState masterState; final TableStats stats = new TableStats(); private SortedSet lastScanServers = ImmutableSortedSet.of(); - TabletGroupWatcher(Master master, TabletStateStore store, TabletGroupWatcher dependentWatcher, SuspensionPolicy suspensionPolicy) { + TabletGroupWatcher(Master master, TabletStateStore store, TabletGroupWatcher dependentWatcher) { this.master = master; this.store = store; this.dependentWatcher = dependentWatcher; - this.suspensionPolicy = suspensionPolicy; } + /** Should this {@code TabletGroupWatcher} suspend tablets? */ + abstract boolean canSuspendTablets(); + Map getStats() { return stats.getLast(); } @@ -252,7 +247,7 @@ public void run() { } // if we are shutting down all the tabletservers, we have to do it in order - if (goal == TabletGoalState.UNASSIGNED && state == TabletState.HOSTED) { + if (goal == TabletGoalState.SUSPENDED && state == TabletState.HOSTED) { if (this.master.serversToShutdown.equals(currentTServers.keySet())) { if (dependentWatcher != null && dependentWatcher.assignedOrHosted() > 0) { goal = TabletGoalState.HOSTED; @@ -283,7 +278,7 @@ public void run() { if (master.getSteadyTime() - tls.suspend.suspensionTime < tableConf.getTimeInMillis(Property.TABLE_SUSPEND_DURATION)) { // Tablet is suspended. See if its tablet server is back. TServerInstance returnInstance = null; - Iterator find = currentTServers.tailMap(new TServerInstance(tls.suspend.server, " ")).keySet().iterator(); + Iterator find = destinations.tailMap(new TServerInstance(tls.suspend.server, " ")).keySet().iterator(); if (find.hasNext()) { TServerInstance found = find.next(); if (found.getLocation().equals(tls.suspend.server)) { @@ -345,7 +340,7 @@ public void run() { case HOSTED: TServerConnection conn = this.master.tserverSet.getConnection(server); if (conn != null) { - conn.unloadTablet(this.master.masterLock, tls.extent, goal != TabletGoalState.DELETED); + conn.unloadTablet(this.master.masterLock, tls.extent, goal.howUnload(), master.getSteadyTime()); unloaded++; totalUnloaded++; } else { @@ -807,17 +802,15 @@ private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException { private void flushChanges(SortedMap currentTServers, List assignments, List assigned, List assignedToDeadServers, Map> logsForDeadServers, List suspendedToGoneServers, Map unassigned) throws DistributedStoreException, TException, WalMarkerException { + boolean tabletsSuspendable = canSuspendTablets(); if (!assignedToDeadServers.isEmpty()) { int maxServersToShow = min(assignedToDeadServers.size(), 100); Master.log.debug(assignedToDeadServers.size() + " assigned to dead servers: " + assignedToDeadServers.subList(0, maxServersToShow) + "..."); Master.log.debug("logs for dead servers: " + logsForDeadServers); - switch (suspensionPolicy) { - case SUSPEND: - store.suspend(assignedToDeadServers, logsForDeadServers, master.getSteadyTime()); - break; - case UNASSIGN: - store.unassign(assignedToDeadServers, logsForDeadServers); - break; + if (tabletsSuspendable) { + store.suspend(assignedToDeadServers, logsForDeadServers, master.getSteadyTime()); + } else { + store.unassign(assignedToDeadServers, logsForDeadServers); } this.master.markDeadServerLogsAsClosed(logsForDeadServers); this.master.nextEvent.event("Marked %d tablets as suspended because they don't have current servers", assignedToDeadServers.size()); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 4ced721a774..b9995fc45a8 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -258,6 +258,7 @@ import org.slf4j.LoggerFactory; import com.google.common.net.HostAndPort; +import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal; public class TabletServer extends AccumuloServerContext implements Runnable { @@ -329,7 +330,7 @@ public Metrics getMinCMetrics() { private final ZooAuthenticationKeyWatcher authKeyWatcher; private final WalStateManager walMarker; - public TabletServer(ServerConfigurationFactory confFactory, VolumeManager fs) { + public TabletServer(ServerConfigurationFactory confFactory, VolumeManager fs) throws IOException { super(confFactory); this.confFactory = confFactory; this.fs = fs; @@ -1550,7 +1551,7 @@ public void run() { } @Override - public void unloadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent, boolean save) { + public void unloadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent, TUnloadTabletGoal goal, long requestTime) { try { checkPermission(credentials, lock, "unloadTablet"); } catch (ThriftSecurityException e) { @@ -1560,7 +1561,7 @@ public void unloadTablet(TInfo tinfo, TCredentials credentials, String lock, TKe KeyExtent extent = new KeyExtent(textent); - resourceManager.addMigration(extent, new LoggingRunnable(log, new UnloadTabletHandler(extent, save))); + resourceManager.addMigration(extent, new LoggingRunnable(log, new UnloadTabletHandler(extent, goal, requestTime))); } @Override @@ -1940,11 +1941,13 @@ public void enqueueMasterMessage(MasterMessage m) { private class UnloadTabletHandler implements Runnable { private final KeyExtent extent; - private final boolean saveState; + private final TUnloadTabletGoal goalState; + private final long requestTimeSkew; - public UnloadTabletHandler(KeyExtent extent, boolean saveState) { + public UnloadTabletHandler(KeyExtent extent, TUnloadTabletGoal goalState, long requestTime) { this.extent = extent; - this.saveState = saveState; + this.goalState = goalState; + this.requestTimeSkew = requestTime - System.nanoTime(); } @Override @@ -1983,7 +1986,7 @@ public void run() { } try { - t.close(saveState); + t.close(!goalState.equals(TUnloadTabletGoal.DELETED)); } catch (Throwable e) { if ((t.isClosing() || t.isClosed()) && e instanceof IllegalStateException) { @@ -2008,8 +2011,14 @@ public void run() { } catch (BadLocationStateException e) { log.error("Unexpected error ", e); } - log.debug("Unassigning " + tls); - TabletStateStore.unassign(TabletServer.this, tls, null); + if (!goalState.equals(TUnloadTabletGoal.SUSPENDED) || extent.isRootTablet() + || (extent.isMeta() && !getConfiguration().getBoolean(Property.MASTER_METADATA_SUSPENDABLE))) { + log.debug("Unassigning " + tls); + TabletStateStore.unassign(TabletServer.this, tls, null); + } else { + log.debug("Suspending " + tls); + TabletStateStore.suspend(TabletServer.this, tls, null, requestTimeSkew + System.nanoTime()); + } } catch (DistributedStoreException ex) { log.warn("Unable to update storage", ex); } catch (KeeperException e) { diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java index 4d4402b522b..2b0185aed01 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java @@ -79,6 +79,7 @@ import com.google.common.net.HostAndPort; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal; /** * The purpose of this class is to server as fake tserver that is a data sink like /dev/null. NullTserver modifies the metadata location entries for a table to @@ -179,7 +180,7 @@ public void fastHalt(TInfo tinfo, TCredentials credentials, String lock) {} public void loadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent extent) throws TException {} @Override - public void unloadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent extent, boolean save) throws TException {} + public void unloadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent extent, TUnloadTabletGoal goal, long requestTime) throws TException {} @Override public List getActiveScans(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException { From 35fcdd09a542680fc012860fc33151b7933cda02 Mon Sep 17 00:00:00 2001 From: Shawn Walker Date: Fri, 8 Jul 2016 12:40:03 -0400 Subject: [PATCH 5/7] ACCUMULO-4353: Fixed time unit mismatch in recent change to TabletServer --- .../main/java/org/apache/accumulo/tserver/TabletServer.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index b9995fc45a8..1c33d05977b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -258,6 +258,8 @@ import org.slf4j.LoggerFactory; import com.google.common.net.HostAndPort; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal; public class TabletServer extends AccumuloServerContext implements Runnable { @@ -1947,7 +1949,7 @@ private class UnloadTabletHandler implements Runnable { public UnloadTabletHandler(KeyExtent extent, TUnloadTabletGoal goalState, long requestTime) { this.extent = extent; this.goalState = goalState; - this.requestTimeSkew = requestTime - System.nanoTime(); + this.requestTimeSkew = requestTime - MILLISECONDS.convert(System.nanoTime(), NANOSECONDS); } @Override @@ -2017,7 +2019,7 @@ public void run() { TabletStateStore.unassign(TabletServer.this, tls, null); } else { log.debug("Suspending " + tls); - TabletStateStore.suspend(TabletServer.this, tls, null, requestTimeSkew + System.nanoTime()); + TabletStateStore.suspend(TabletServer.this, tls, null, requestTimeSkew + MILLISECONDS.convert(System.nanoTime(), NANOSECONDS)); } } catch (DistributedStoreException ex) { log.warn("Unable to update storage", ex); From 24223c6ada605250c1d5b7b1da7abfd57e105085 Mon Sep 17 00:00:00 2001 From: Shawn Walker Date: Mon, 11 Jul 2016 11:24:13 -0400 Subject: [PATCH 6/7] ACCUMULO-4353: Refactored SuspendedTabletsIT to test for suspension upon both (a) clean tserver shutdown and (b) tserver crash --- .../test/master/SuspendedTabletsIT.java | 122 +++++++++++++++--- 1 file changed, 104 insertions(+), 18 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/master/SuspendedTabletsIT.java b/test/src/main/java/org/apache/accumulo/test/master/SuspendedTabletsIT.java index 5981be6f7f7..455074b3bdb 100644 --- a/test/src/main/java/org/apache/accumulo/test/master/SuspendedTabletsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/master/SuspendedTabletsIT.java @@ -20,10 +20,15 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.SetMultimap; import com.google.common.net.HostAndPort; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Random; +import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.Callable; @@ -32,7 +37,6 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.ThreadFactory; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import java.util.concurrent.TimeoutException; @@ -41,15 +45,19 @@ import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.client.impl.ClientContext; +import org.apache.accumulo.core.client.impl.ClientExec; import org.apache.accumulo.core.client.impl.Credentials; +import org.apache.accumulo.core.client.impl.MasterClient; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.master.thrift.MasterClientService; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; import org.apache.accumulo.minicluster.impl.ProcessReference; import org.apache.accumulo.server.master.state.MetaDataTableScanner; +import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.accumulo.server.master.state.TabletLocationState; import org.apache.accumulo.test.functional.ConfigurableMacBase; import org.apache.hadoop.conf.Configuration; @@ -63,26 +71,107 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { private static final Logger log = LoggerFactory.getLogger(SuspendedTabletsIT.class); - private static ExecutorService threadPool; + private static final Random RANDOM = new Random(); + private static ExecutorService THREAD_POOL; public static final int TSERVERS = 5; - public static final long SUSPEND_DURATION = MILLISECONDS.convert(2, MINUTES); + public static final long SUSPEND_DURATION = MILLISECONDS.convert(30, SECONDS); public static final int TABLETS = 100; @Override public void configure(MiniAccumuloConfigImpl cfg, Configuration fsConf) { cfg.setProperty(Property.TABLE_SUSPEND_DURATION, SUSPEND_DURATION + "ms"); + cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); cfg.setNumTservers(TSERVERS); } @Test - public void suspendAndResumeTserver() throws Exception { - String tableName = getUniqueNames(1)[0]; + public void crashAndResumeTserver() throws Exception { + // Run the test body. When we get to the point where we need a tserver to go away, get rid of it via crashing + suspensionTestBody(new TServerKiller() { + @Override + public void eliminateTabletServers(ClientContext ctx, TabletLocations locs, int count) throws Exception { + List procs = new ArrayList<>(getCluster().getProcesses().get(ServerType.TABLET_SERVER)); + Collections.shuffle(procs); + + for (int i = 0; i < count; ++i) { + ProcessReference pr = procs.get(i); + log.info("Crashing {}", pr.getProcess()); + getCluster().killProcess(ServerType.TABLET_SERVER, pr); + } + } + }); + } + + @Test + public void shutdownAndResumeTserver() throws Exception { + // Run the test body. When we get to the point where we need tservers to go away, stop them via a clean shutdown. + suspensionTestBody(new TServerKiller() { + @Override + public void eliminateTabletServers(ClientContext ctx, TabletLocations locs, int count) throws Exception { + Set tserversSet = new HashSet<>(); + for (TabletLocationState tls : locs.locationStates.values()) { + if (tls.current != null) { + tserversSet.add(tls.current); + } + } + List tserversList = new ArrayList<>(tserversSet); + Collections.shuffle(tserversList, RANDOM); + + for (int i = 0; i < count; ++i) { + final String tserverName = tserversList.get(i).toString(); + MasterClient.execute(ctx, new ClientExec() { + @Override + public void execute(MasterClientService.Client client) throws Exception { + log.info("Sending shutdown command to {} via MasterClientService", tserverName); + client.shutdownTabletServer(null, ctx.rpcCreds(), tserverName, false); + } + }); + } + + log.info("Waiting for tserver process{} to die", count == 1 ? "" : "es"); + for (int i = 0; i < 10; ++i) { + List deadProcs = new ArrayList<>(); + for (ProcessReference pr : getCluster().getProcesses().get(ServerType.TABLET_SERVER)) { + Process p = pr.getProcess(); + if (!p.isAlive()) { + deadProcs.add(pr); + } + } + for (ProcessReference pr : deadProcs) { + log.info("Process {} is dead, informing cluster control about this", pr.getProcess()); + getCluster().getClusterControl().killProcess(ServerType.TABLET_SERVER, pr); + --count; + } + if (count == 0) { + return; + } else { + Thread.sleep(MILLISECONDS.convert(2, SECONDS)); + } + } + throw new IllegalStateException("Tablet servers didn't die!"); + } + }); + } + /** + * Main test body for suspension tests. + * + * @param ctx + * client context for cluster. + * @param tableName + * name of table to create + * @param serverStopper + * callback which shuts down some tablet servers. + * @throws Exception + */ + private void suspensionTestBody(TServerKiller serverStopper) throws Exception { Credentials creds = new Credentials("root", new PasswordToken(ROOT_PASSWORD)); Instance instance = new ZooKeeperInstance(getCluster().getClientConfig()); ClientContext ctx = new ClientContext(instance, creds, getCluster().getClientConfig()); + String tableName = getUniqueNames(1)[0]; + Connector conn = ctx.getConnector(); // Create a table with a bunch of splits @@ -113,16 +202,10 @@ public void suspendAndResumeTserver() throws Exception { Assert.assertEquals(TSERVERS, ds.hosted.keySet().size()); // Kill two tablet servers hosting our tablets. This should put tablets into suspended state, and thus halt balancing. - log.info("Killing tservers"); TabletLocations beforeDeathState = ds; - { - Iterator prIt = getCluster().getProcesses().get(ServerType.TABLET_SERVER).iterator(); - ProcessReference first = prIt.next(); - ProcessReference second = prIt.next(); - getCluster().killProcess(ServerType.TABLET_SERVER, first); - getCluster().killProcess(ServerType.TABLET_SERVER, second); - } + log.info("Eliminating tablet servers"); + serverStopper.eliminateTabletServers(ctx, beforeDeathState, 2); // Eventually some tablets will be suspended. log.info("Waiting on suspended tablets"); @@ -166,14 +249,17 @@ public void suspendAndResumeTserver() throws Exception { long recoverTime = System.nanoTime(); Assert.assertTrue(recoverTime - killTime >= NANOSECONDS.convert(SUSPEND_DURATION, MILLISECONDS)); - Assert.assertTrue(recoverTime - killTime <= NANOSECONDS.convert(SUSPEND_DURATION, MILLISECONDS) + NANOSECONDS.convert(3, MINUTES)); + } + + private static interface TServerKiller { + public void eliminateTabletServers(ClientContext ctx, TabletLocations locs, int count) throws Exception; } private static final AtomicInteger threadCounter = new AtomicInteger(0); @BeforeClass public static void init() { - threadPool = Executors.newCachedThreadPool(new ThreadFactory() { + THREAD_POOL = Executors.newCachedThreadPool(new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "Scanning deadline thread #" + threadCounter.incrementAndGet()); @@ -183,7 +269,7 @@ public Thread newThread(Runnable r) { @AfterClass public static void cleanup() { - threadPool.shutdownNow(); + THREAD_POOL.shutdownNow(); } private static class TabletLocations { @@ -211,7 +297,7 @@ public TabletLocations call() throws Exception { return answer; } }); - threadPool.submit(tlsFuture); + THREAD_POOL.submit(tlsFuture); return tlsFuture.get(5, SECONDS); } catch (TimeoutException ex) { log.debug("Retrieval timed out", ex); From 2e30d9178ec4352eaa724d2a9d5ea033f90e7d67 Mon Sep 17 00:00:00 2001 From: Shawn Walker Date: Mon, 11 Jul 2016 12:15:57 -0400 Subject: [PATCH 7/7] ACCUMULO-4353: Added short blurb in user manual on rolling restarts and `table.suspend.duration` --- docs/src/main/asciidoc/chapters/administration.txt | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/docs/src/main/asciidoc/chapters/administration.txt b/docs/src/main/asciidoc/chapters/administration.txt index 1935181d513..a2dab8e503e 100644 --- a/docs/src/main/asciidoc/chapters/administration.txt +++ b/docs/src/main/asciidoc/chapters/administration.txt @@ -476,6 +476,20 @@ from the +$ACCUMULO_HOME/conf/slaves+ file) to gracefully stop a node. This will ensure that the tabletserver is cleanly stopped and recovery will not need to be performed when the tablets are re-hosted. +===== A note on rolling restarts + +For sufficiently large Accumulo clusters, restarting multiple TabletServers within a short window can place significant +load on the Master server. If slightly lower availability is acceptable, this load can be reduced by globally setting ++table.suspend.duration+ to a positive value. + +With +table.suspend.duration+ set to, say, +5m+, Accumulo will wait +for 5 minutes for any dead TabletServer to return before reassigning that TabletServer's responsibilities to other TabletServers. +If the TabletServer returns to the cluster before the specified timeout has elapsed, Accumulo will assign the TabletServer +its original responsibilities. + +It is important not to choose too large a value for +table.suspend.duration+, as during this time, all scans against the +data that TabletServer had hosted will block (or time out). + ==== Running multiple TabletServers on a single node With very powerful nodes, it may be beneficial to run more than one TabletServer on a given