From 7909a0cb1c15e2cf7fe6e68731db224ed03e8f33 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Wed, 18 Mar 2026 01:35:17 +0000 Subject: [PATCH 1/6] Moves set of shutting down tservers from manager memory to ZK The set of shutting down tservers was causing system fate operations to have to run on the primary manager because this was an in-memory set. This caused fate to have different code paths for user vs system fate, which in turn caused problems when trying to distribute compaction coordination. To fix this problem, the set was moved from an in-memory set to a set in zookeeper. The set is managed by fate operations, which simplifies the existing code. Only fate operations add and remove from the set, and fate keys are used to ensure only one fate operation runs at a time for a tserver instance. The previous in-memory set had a lot of code to try to keep it in sync with reality; that is all gone now. There were many bugs with this code in the past. After this change is made, fate can be simplified in a follow-on commit to remove all specialization for the primary manager. Also, the monitor can now directly access this set instead of making an RPC to the manager; a follow-on issue will be opened for this. 
--- .../org/apache/accumulo/core/Constants.java | 2 + .../apache/accumulo/core/fate/FateKey.java | 43 +++++++- .../server/init/ZooKeeperInitializer.java | 2 + .../org/apache/accumulo/manager/Manager.java | 76 ++++++------- .../manager/ManagerClientServiceHandler.java | 39 +++---- .../tserverOps/BeginTserverShutdown.java | 80 ++++++++++++++ .../manager/tserverOps/ShutdownTServer.java | 75 +++++++------ .../manager/upgrade/Upgrader11to12.java | 11 ++ .../manager/tableOps/ShutdownTServerTest.java | 100 ------------------ 9 files changed, 228 insertions(+), 200 deletions(-) create mode 100644 server/manager/src/main/java/org/apache/accumulo/manager/tserverOps/BeginTserverShutdown.java delete mode 100644 server/manager/src/test/java/org/apache/accumulo/manager/tableOps/ShutdownTServerTest.java diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java index eb8ba1059eb..d2c556f2c2a 100644 --- a/core/src/main/java/org/apache/accumulo/core/Constants.java +++ b/core/src/main/java/org/apache/accumulo/core/Constants.java @@ -74,6 +74,8 @@ public class Constants { public static final String ZDEAD = "/dead"; public static final String ZDEADTSERVERS = ZDEAD + "/tservers"; + public static final String ZSHUTTING_DOWN_TSERVERS = "/shutting-down-tservers"; + public static final String ZFATE = "/fate"; public static final String ZNEXT_FILE = "/next_file"; diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java b/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java index 650dca01af3..19641c000a3 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java @@ -27,6 +27,7 @@ import java.util.Optional; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import 
org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; @@ -41,12 +42,14 @@ public class FateKey { private final FateKeyType type; private final Optional keyExtent; private final Optional compactionId; + private final Optional tServerInstance; private final byte[] serialized; private FateKey(FateKeyType type, KeyExtent keyExtent) { this.type = Objects.requireNonNull(type); this.keyExtent = Optional.of(keyExtent); this.compactionId = Optional.empty(); + this.tServerInstance = Optional.empty(); this.serialized = serialize(type, keyExtent); } @@ -54,15 +57,25 @@ private FateKey(FateKeyType type, ExternalCompactionId compactionId) { this.type = Objects.requireNonNull(type); this.keyExtent = Optional.empty(); this.compactionId = Optional.of(compactionId); + this.tServerInstance = Optional.empty(); this.serialized = serialize(type, compactionId); } + private FateKey(FateKeyType type, TServerInstance tServerInstance) { + this.type = Objects.requireNonNull(type); + this.keyExtent = Optional.empty(); + this.compactionId = Optional.empty(); + this.tServerInstance = Optional.of(tServerInstance); + this.serialized = serialize(type, tServerInstance); + } + private FateKey(byte[] serialized) { try (DataInputBuffer buffer = new DataInputBuffer()) { buffer.reset(serialized, serialized.length); this.type = FateKeyType.valueOf(buffer.readUTF()); this.keyExtent = deserializeKeyExtent(type, buffer); this.compactionId = deserializeCompactionId(type, buffer); + this.tServerInstance = deserializeTserverId(type, buffer); this.serialized = serialized; } catch (IOException e) { throw new UncheckedIOException(e); @@ -127,8 +140,12 @@ public static FateKey forMerge(KeyExtent extent) { return new FateKey(FateKeyType.MERGE, extent); } + public static FateKey forShutdown(TServerInstance tServerInstance) { + return new FateKey(FateKeyType.TSERVER_SHUTDOWN, tServerInstance); + } + public enum FateKeyType { - SPLIT, COMPACTION_COMMIT, MERGE + SPLIT, 
COMPACTION_COMMIT, MERGE, TSERVER_SHUTDOWN } private static byte[] serialize(FateKeyType type, KeyExtent ke) { @@ -155,22 +172,42 @@ private static byte[] serialize(FateKeyType type, ExternalCompactionId compactio } } + private static byte[] serialize(FateKeyType type, TServerInstance tServerInstance) { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos)) { + dos.writeUTF(type.toString()); + dos.writeUTF(tServerInstance.getHostPortSession()); + dos.close(); + return baos.toByteArray(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + private static Optional deserializeKeyExtent(FateKeyType type, DataInputBuffer buffer) throws IOException { return switch (type) { case SPLIT, MERGE -> Optional.of(KeyExtent.readFrom(buffer)); - case COMPACTION_COMMIT -> Optional.empty(); + case COMPACTION_COMMIT, TSERVER_SHUTDOWN -> Optional.empty(); }; } private static Optional deserializeCompactionId(FateKeyType type, DataInputBuffer buffer) throws IOException { return switch (type) { - case SPLIT, MERGE -> Optional.empty(); + case SPLIT, MERGE, TSERVER_SHUTDOWN -> Optional.empty(); case COMPACTION_COMMIT -> Optional.of(ExternalCompactionId.of(buffer.readUTF())); }; } + private static Optional deserializeTserverId(FateKeyType type, + DataInputBuffer buffer) throws IOException { + return switch (type) { + case SPLIT, MERGE, COMPACTION_COMMIT -> Optional.empty(); + case TSERVER_SHUTDOWN -> Optional.of(new TServerInstance(buffer.readUTF())); + }; + } + @Override public String toString() { var buf = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE); diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java b/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java index 464a6dae5a6..b23c6746803 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java +++ 
b/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java @@ -178,6 +178,8 @@ void initialize(final ServerContext context, final String rootTabletDirName, ZooUtil.NodeExistsPolicy.FAIL); zrwChroot.putPersistentData(Constants.ZMANAGER_ASSISTANT_LOCK, EMPTY_BYTE_ARRAY, ZooUtil.NodeExistsPolicy.FAIL); + zrwChroot.putPersistentData(Constants.ZSHUTTING_DOWN_TSERVERS, EMPTY_BYTE_ARRAY, + ZooUtil.NodeExistsPolicy.FAIL); } /** diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 5ff232765c4..8e639e6ae19 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -58,6 +58,7 @@ import java.util.function.BiFunction; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.cli.ServerOpts; @@ -206,7 +207,6 @@ public class Manager extends AbstractServer private final List watchers = new ArrayList<>(); final Map badServers = Collections.synchronizedMap(new HashMap<>()); - final Set serversToShutdown = Collections.synchronizedSet(new HashSet<>()); final EventCoordinator nextEvent = new EventCoordinator(); RecoveryManager recoveryManager = null; private final ManagerTime timeKeeper; @@ -557,10 +557,6 @@ protected Manager(ServerOpts opts, aconf.getTimeInMillis(Property.MANAGER_RECOVERY_WAL_EXISTENCE_CACHE_TIME); } - public TServerConnection getConnection(TServerInstance server) { - return tserverSet.getConnection(server); - } - void setManagerGoalState(ManagerGoalState state) { try { getContext().getZooSession().asReaderWriter().putPersistentData(Constants.ZMANAGER_GOAL_STATE, @@ -667,6 +663,7 @@ public void run() { } boolean canBalance(DataLevel dataLevel, TServerStatus tServerStatus) { + Set serversToShutdown; if 
(!badServers.isEmpty()) { log.debug("not balancing {} because the balance information is out-of-date {}", dataLevel, badServers.keySet()); @@ -674,7 +671,7 @@ boolean canBalance(DataLevel dataLevel, TServerStatus tServerStatus) { } else if (getManagerGoalState() == ManagerGoalState.CLEAN_STOP) { log.debug("not balancing {} because the manager is attempting to stop cleanly", dataLevel); return false; - } else if (!serversToShutdown.isEmpty()) { + } else if (!(serversToShutdown = shutdownServers()).isEmpty()) { log.debug("not balancing {} while shutting down servers {}", dataLevel, serversToShutdown); return false; } else { @@ -766,7 +763,6 @@ public void run() { log.debug("stopping {} tablet servers", currentServers.size()); for (TServerInstance server : currentServers) { try { - serversToShutdown.add(server); tserverSet.getConnection(server).fastHalt(primaryManagerLock); } catch (TException e) { // its probably down, and we don't care @@ -1591,38 +1587,35 @@ public void update(LiveTServerSet current, Set deleted, obit.delete(up.getHostPort()); } } - for (TServerInstance dead : deleted) { - String cause = "unexpected failure"; - if (serversToShutdown.contains(dead)) { - cause = "clean shutdown"; // maybe an incorrect assumption + + if (!deleted.isEmpty()) { + // This set is read from zookeeper, so only get it if its actually needed + var serversToShutdown = shutdownServers(); + for (TServerInstance dead : deleted) { + String cause = "unexpected failure"; + if (serversToShutdown.contains(dead)) { + cause = "clean shutdown"; // maybe an incorrect assumption + } + if (!getManagerGoalState().equals(ManagerGoalState.CLEAN_STOP)) { + obit.post(dead.getHostPort(), cause); + } } - if (!getManagerGoalState().equals(ManagerGoalState.CLEAN_STOP)) { - obit.post(dead.getHostPort(), cause); + + Set unexpected = new HashSet<>(deleted); + unexpected.removeAll(serversToShutdown); + if (!unexpected.isEmpty() + && (stillManager() && 
!getManagerGoalState().equals(ManagerGoalState.CLEAN_STOP))) { + log.warn("Lost servers {}", unexpected); } + badServers.keySet().removeAll(deleted); } - Set unexpected = new HashSet<>(deleted); - unexpected.removeAll(this.serversToShutdown); - if (!unexpected.isEmpty() - && (stillManager() && !getManagerGoalState().equals(ManagerGoalState.CLEAN_STOP))) { - log.warn("Lost servers {}", unexpected); - } - serversToShutdown.removeAll(deleted); - badServers.keySet().removeAll(deleted); // clear out any bad server with the same host/port as a new server synchronized (badServers) { cleanListByHostAndPort(badServers.keySet(), deleted, added); } - synchronized (serversToShutdown) { - cleanListByHostAndPort(serversToShutdown, deleted, added); - } nextEvent.event("There are now %d tablet servers", current.size()); } - - // clear out any servers that are no longer current - // this is needed when we are using a fate operation to shutdown a tserver as it - // will continue to add the server to the serversToShutdown (ACCUMULO-4410) - serversToShutdown.retainAll(current.getCurrentServers()); } static void cleanListByHostAndPort(Collection badServers, @@ -1669,15 +1662,6 @@ public LiveTServersSnapshot tserversSnapshot() { return tserverSet.getSnapshot(); } - // recovers state from the persistent transaction to shutdown a server - public boolean shutdownTServer(TServerInstance server) { - if (serversToShutdown.add(server)) { - nextEvent.event("Tablet Server shutdown requested for %s", server); - return true; - } - return false; - } - public EventCoordinator getEventCoordinator() { return nextEvent; } @@ -1725,12 +1709,8 @@ public ManagerMonitorInfo getManagerMonitorInfo() { result.state = getManagerState(); result.goalState = getManagerGoalState(); result.unassignedTablets = displayUnassigned(); - result.serversShuttingDown = new HashSet<>(); - synchronized (serversToShutdown) { - for (TServerInstance server : serversToShutdown) { - 
result.serversShuttingDown.add(server.getHostPort()); - } - } + result.serversShuttingDown = + shutdownServers().stream().map(TServerInstance::getHostPort).collect(Collectors.toSet()); DeadServerList obit = new DeadServerList(getContext()); result.deadTabletServers = obit.getList(); return result; @@ -1744,8 +1724,12 @@ public boolean delegationTokensAvailable() { } public Set shutdownServers() { - synchronized (serversToShutdown) { - return Set.copyOf(serversToShutdown); + try { + List children = + getContext().getZooSession().asReader().getChildren(Constants.ZSHUTTING_DOWN_TSERVERS); + return children.stream().map(TServerInstance::new).collect(Collectors.toSet()); + } catch (KeeperException | InterruptedException e) { + throw new RuntimeException(e); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java index f96332dc9d6..d88a94516d6 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java @@ -33,7 +33,9 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.Constants; @@ -62,6 +64,7 @@ import org.apache.accumulo.core.fate.FateClient; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.fate.TraceRepo; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.manager.state.tables.TableState; @@ -85,7 +88,7 @@ import org.apache.accumulo.core.securityImpl.thrift.TDelegationTokenConfig; import org.apache.accumulo.core.util.ByteBufferUtil; 
import org.apache.accumulo.manager.tableOps.FateEnv; -import org.apache.accumulo.manager.tserverOps.ShutdownTServer; +import org.apache.accumulo.manager.tserverOps.BeginTserverShutdown; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.client.ClientServiceHandler; import org.apache.accumulo.server.conf.store.NamespacePropKey; @@ -336,16 +339,16 @@ public void shutdownTabletServer(TInfo info, TCredentials c, String tabletServer } FateClient fate = manager.fateClient(FateInstanceType.META); - FateId fateId = fate.startTransaction(); - String msg = "Shutdown tserver " + tabletServer; + var repo = new TraceRepo<>( + new BeginTserverShutdown(doomed, manager.tserverSet.getResourceGroup(doomed), force)); - fate.seedTransaction(Fate.FateOperation.SHUTDOWN_TSERVER, fateId, - new TraceRepo<>( - new ShutdownTServer(doomed, manager.tserverSet.getResourceGroup(doomed), force)), - false, msg); - fate.waitForCompletion(fateId); - fate.delete(fateId); + CompletableFuture> future; + try (var seeder = fate.beginSeeding()) { + future = seeder.attemptToSeedTransaction(Fate.FateOperation.SHUTDOWN_TSERVER, + FateKey.forShutdown(doomed), repo, true); + } + future.join().ifPresent(fate::waitForCompletion); log.debug("FATE op shutting down " + tabletServer + " finished"); } @@ -360,17 +363,15 @@ public void tabletServerStopping(TInfo tinfo, TCredentials credentials, String t } log.info("Tablet Server {} has reported it's shutting down", tabletServer); var tserver = new TServerInstance(tabletServer); - if (manager.shutdownTServer(tserver)) { - // If there is an exception seeding the fate tx this should cause the RPC to fail which should - // cause the tserver to halt. Because of that not making an attempt to handle failure here. 
- FateClient fate = manager.fateClient(FateInstanceType.META); - var tid = fate.startTransaction(); - String msg = "Shutdown tserver " + tabletServer; + // If there is an exception seeding the fate tx this should cause the RPC to fail which should + // cause the tserver to halt. Because of that not making an attempt to handle failure here. + FateClient fate = manager.fateClient(FateInstanceType.META); - fate.seedTransaction(Fate.FateOperation.SHUTDOWN_TSERVER, tid, - new TraceRepo<>(new ShutdownTServer(tserver, ResourceGroupId.of(resourceGroup), false)), - true, msg); - } + var repo = new TraceRepo<>( + new BeginTserverShutdown(tserver, ResourceGroupId.of(resourceGroup), false)); + // only seed a new transaction if nothing is running for this tserver + fate.seedTransaction(Fate.FateOperation.SHUTDOWN_TSERVER, FateKey.forShutdown(tserver), repo, + true); } @Override diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tserverOps/BeginTserverShutdown.java b/server/manager/src/main/java/org/apache/accumulo/manager/tserverOps/BeginTserverShutdown.java new file mode 100644 index 00000000000..358595ebcac --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tserverOps/BeginTserverShutdown.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.tserverOps; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.data.ResourceGroupId; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.manager.tableOps.AbstractFateOperation; +import org.apache.accumulo.manager.tableOps.FateEnv; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.net.HostAndPort; + +public class BeginTserverShutdown extends AbstractFateOperation { + + private static final long serialVersionUID = 1L; + + private static final Logger log = LoggerFactory.getLogger(BeginTserverShutdown.class); + + private final ResourceGroupId resourceGroup; + private final HostAndPort hostAndPort; + private final String serverSession; + private final boolean force; + + public BeginTserverShutdown(TServerInstance server, ResourceGroupId resourceGroup, + boolean force) { + this.hostAndPort = server.getHostAndPort(); + this.resourceGroup = resourceGroup; + this.serverSession = server.getSession(); + this.force = force; + } + + static String createPath(HostAndPort hostPort, String session) { + return Constants.ZSHUTTING_DOWN_TSERVERS + "/" + + new TServerInstance(hostPort, session).getHostPortSession(); + } + + @Override + public Repo call(FateId fateId, FateEnv environment) throws Exception { + if (!force) { + String path = createPath(hostAndPort, serverSession); + // Because these fate operation are seeded with a fate key there should only ever be one fate + // operation running for a tserver instance, so do not need to worry about race conditions + // here + environment.getContext().getZooSession().asReaderWriter().putPersistentData(path, new byte[0], + 
ZooUtil.NodeExistsPolicy.SKIP); + log.trace("{} created {}", fateId, path); + } + return new ShutdownTServer(hostAndPort, serverSession, resourceGroup, force); + } + + @Override + public void undo(FateId fateId, FateEnv environment) throws Exception { + if (!force) { + String path = createPath(hostAndPort, serverSession); + environment.getContext().getZooSession().asReaderWriter().delete(path); + log.trace("{} removed {}", fateId, path); + } + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tserverOps/ShutdownTServer.java b/server/manager/src/main/java/org/apache/accumulo/manager/tserverOps/ShutdownTServer.java index 147b8206cca..eb456a8a3c6 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tserverOps/ShutdownTServer.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tserverOps/ShutdownTServer.java @@ -19,6 +19,7 @@ package org.apache.accumulo.manager.tserverOps; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.manager.tserverOps.BeginTserverShutdown.createPath; import org.apache.accumulo.core.data.ResourceGroupId; import org.apache.accumulo.core.fate.FateId; @@ -28,10 +29,12 @@ import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.manager.thrift.TabletServerStatus; import org.apache.accumulo.core.metadata.TServerInstance; -import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; +import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService; +import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.manager.tableOps.AbstractFateOperation; import org.apache.accumulo.manager.tableOps.FateEnv; -import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection; import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,10 +50,11 @@ 
public class ShutdownTServer extends AbstractFateOperation { private final String serverSession; private final boolean force; - public ShutdownTServer(TServerInstance server, ResourceGroupId resourceGroup, boolean force) { - this.hostAndPort = server.getHostAndPort(); + public ShutdownTServer(HostAndPort hostAndPort, String serverSession, + ResourceGroupId resourceGroup, boolean force) { + this.hostAndPort = hostAndPort; this.resourceGroup = resourceGroup; - this.serverSession = server.getSession(); + this.serverSession = serverSession; this.force = force; } @@ -62,35 +66,38 @@ public long isReady(FateId fateId, FateEnv env) { return 0; } - // Inform the manager that we want this server to shutdown - Manager manager = (Manager) env; - manager.shutdownTServer(server); - - if (manager.onlineTabletServers().contains(server)) { - TServerConnection connection = manager.getConnection(server); - if (connection != null) { - try { - TabletServerStatus status = connection.getTableMap(false); - if (status.tableMap != null && status.tableMap.isEmpty()) { - log.info("tablet server hosts no tablets {}", server); - connection.halt(manager.getServiceLock()); - log.info("tablet server asked to halt {}", server); - return 0; - } else { - log.info("tablet server {} still has tablets for tables: {}", server, - (status.tableMap == null) ? "null" : status.tableMap.keySet()); - } - } catch (TTransportException ex) { - // expected - } catch (Exception ex) { - log.error("Error talking to tablet server {}: ", server, ex); - } + if (env.onlineTabletServers().contains(server)) { + + TabletServerClientService.Client client = null; - // If the connection was non-null and we could communicate with it - // give the manager some more time to tell it to stop and for the - // tserver to ack the request and stop itself. 
- return 1000; + try { + client = + ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, hostAndPort, env.getContext()); + TabletServerStatus status = + client.getTabletServerStatus(TraceUtil.traceInfo(), env.getContext().rpcCreds()); + if (status.tableMap != null && status.tableMap.isEmpty()) { + log.info("tablet server hosts no tablets {}", server); + client.halt(TraceUtil.traceInfo(), env.getContext().rpcCreds(), + env.getServiceLock().getLockID().serialize()); + log.info("tablet server asked to halt {}", server); + return 0; + } else { + log.info("tablet server {} still has tablets for tables: {}", server, + (status.tableMap == null) ? "null" : status.tableMap.keySet()); + } + } catch (TTransportException ex) { + // expected + } catch (Exception ex) { + log.error("Error talking to tablet server {}: ", server, ex); + } finally { + ThriftUtil.returnClient(client, env.getContext()); } + + // If the connection was non-null and we could communicate with it + // give the manager some more time to tell it to stop and for the + // tserver to ack the request and stop itself. 
+ return 1000; + } return 0; @@ -108,6 +115,10 @@ public Repo call(FateId fateId, FateEnv env) throws Exception { env.getContext().getServerPaths().createDeadTabletServerPath(resourceGroup, hostAndPort); zoo.putPersistentData(path.toString(), "forced down".getBytes(UTF_8), NodeExistsPolicy.OVERWRITE); + } else { + String path = createPath(hostAndPort, serverSession); + env.getContext().getZooSession().asReaderWriter().delete(path); + log.trace("{} removed {}", fateId, path); } return null; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java index a230dc7149a..516d0462da4 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java @@ -255,6 +255,8 @@ public void upgradeZookeeper(ServerContext context) { moveTableProperties(context); LOG.info("Add assistant manager node"); addAssistantManager(context); + LOG.info("Adding shutting down tservers node"); + addShuttingDownTservers(context); } @Override @@ -308,6 +310,15 @@ private static void addAssistantManager(ServerContext context) { } } + private static void addShuttingDownTservers(ServerContext context) { + try { + context.getZooSession().asReaderWriter().putPersistentData(Constants.ZSHUTTING_DOWN_TSERVERS, + new byte[0], ZooUtil.NodeExistsPolicy.SKIP); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException(e); + } + } + private static void addCompactionsNode(ServerContext context) { try { context.getZooSession().asReaderWriter().putPersistentData(Constants.ZCOMPACTIONS, diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/ShutdownTServerTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/ShutdownTServerTest.java deleted file mode 100644 index 8dbe53bf122..00000000000 --- 
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/ShutdownTServerTest.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.manager.tableOps; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.util.Collections; -import java.util.HashMap; -import java.util.UUID; - -import org.apache.accumulo.core.data.ResourceGroupId; -import org.apache.accumulo.core.fate.FateId; -import org.apache.accumulo.core.fate.FateInstanceType; -import org.apache.accumulo.core.manager.thrift.TableInfo; -import org.apache.accumulo.core.manager.thrift.TabletServerStatus; -import org.apache.accumulo.core.metadata.TServerInstance; -import org.apache.accumulo.manager.Manager; -import org.apache.accumulo.manager.tserverOps.ShutdownTServer; -import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection; -import org.easymock.EasyMock; -import org.junit.jupiter.api.Test; - -import com.google.common.net.HostAndPort; - -public class ShutdownTServerTest { - - @Test - public void testSingleShutdown() throws 
Exception { - HostAndPort hap = HostAndPort.fromParts("localhost", 1234); - final TServerInstance tserver = new TServerInstance(hap, "fake"); - final boolean force = false; - - final ShutdownTServer op = new ShutdownTServer(tserver, ResourceGroupId.DEFAULT, force); - - final Manager manager = EasyMock.createMock(Manager.class); - final FateId fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID()); - - final TServerConnection tserverCnxn = EasyMock.createMock(TServerConnection.class); - final TabletServerStatus status = new TabletServerStatus(); - status.tableMap = new HashMap<>(); - // Put in a table info record, don't care what - status.tableMap.put("a_table", new TableInfo()); - - EasyMock.expect(manager.shutdownTServer(tserver)).andReturn(true).once(); - EasyMock.expect(manager.onlineTabletServers()).andReturn(Collections.singleton(tserver)); - EasyMock.expect(manager.getConnection(tserver)).andReturn(tserverCnxn); - EasyMock.expect(tserverCnxn.getTableMap(false)).andReturn(status); - - EasyMock.replay(tserverCnxn, manager); - - // FATE op is not ready - long wait = op.isReady(fateId, manager); - assertTrue(wait > 0, "Expected wait to be greater than 0"); - - EasyMock.verify(tserverCnxn, manager); - - // Reset the mocks - EasyMock.reset(tserverCnxn, manager); - - // reset the table map to the empty set to simulate all tablets unloaded - status.tableMap = new HashMap<>(); - EasyMock.expect(manager.shutdownTServer(tserver)).andReturn(false).once(); - EasyMock.expect(manager.onlineTabletServers()).andReturn(Collections.singleton(tserver)); - EasyMock.expect(manager.getConnection(tserver)).andReturn(tserverCnxn); - EasyMock.expect(tserverCnxn.getTableMap(false)).andReturn(status); - EasyMock.expect(manager.getServiceLock()).andReturn(null); - tserverCnxn.halt(null); - EasyMock.expectLastCall().once(); - - EasyMock.replay(tserverCnxn, manager); - - // FATE op is not ready - wait = op.isReady(fateId, manager); - assertEquals(0, wait, "Expected wait to be 0"); 
- - var op2 = op.call(fateId, manager); - assertNull(op2, "Expected no follow on step"); - - EasyMock.verify(tserverCnxn, manager); - } - -} From eeac89f2af0c3f15add0b71b5742f33dda196560 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Wed, 18 Mar 2026 18:59:11 +0000 Subject: [PATCH 2/6] format code --- .../org/apache/accumulo/manager/upgrade/Upgrader11to12.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java index 516d0462da4..f88a9d59c4d 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java @@ -313,7 +313,7 @@ private static void addAssistantManager(ServerContext context) { private static void addShuttingDownTservers(ServerContext context) { try { context.getZooSession().asReaderWriter().putPersistentData(Constants.ZSHUTTING_DOWN_TSERVERS, - new byte[0], ZooUtil.NodeExistsPolicy.SKIP); + new byte[0], ZooUtil.NodeExistsPolicy.SKIP); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException(e); } From c7eb2199f6f9da6bdacf9c61c8ff13f862e39181 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Wed, 18 Mar 2026 23:41:16 +0000 Subject: [PATCH 3/6] Removed specialization for meta fate in the manager After this change meta fate and user fate are both treated mostly the same in the managers. One difference is in assignment, the entire meta fate range is assigned to a single manager. User fate is spread across all managers. But both are assigned out by the primary manager using the same RPCs now. The primary manager used to directly start a meta fate instance. Was able to remove the extension of FateEnv from the manager class in this change, that caused a ripple of test changes. 
But now there are no longer two different implementations of FateEnv --- .../accumulo/core/fate/FatePartition.java | 4 + .../org/apache/accumulo/manager/Manager.java | 92 ++----------------- .../accumulo/manager/fate/FateManager.java | 90 ++++++++++++------ .../accumulo/manager/fate/FateWorker.java | 81 +++++++++------- .../compact/CompactionDriverTest.java | 13 +-- .../tableOps/merge/MergeTabletsTest.java | 12 +-- .../tableOps/split/UpdateTabletsTest.java | 26 +++--- .../accumulo/test/MultipleManagerFateIT.java | 5 +- .../accumulo/test/ample/TestAmpleUtil.java | 24 ++--- .../test/fate/ManagerRepoIT_SimpleSuite.java | 66 ++++++------- 10 files changed, 198 insertions(+), 215 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FatePartition.java b/core/src/main/java/org/apache/accumulo/core/fate/FatePartition.java index 33cbbc97240..44907cb788c 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FatePartition.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FatePartition.java @@ -60,4 +60,8 @@ public boolean contains(FateId fateId) { } } + + public FateInstanceType getType() { + return start.getType(); + } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 8e639e6ae19..204de1d79d9 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -50,7 +50,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -77,7 +76,6 @@ import org.apache.accumulo.core.fate.FateClient; import org.apache.accumulo.core.fate.FateId; import 
org.apache.accumulo.core.fate.FateInstanceType; -import org.apache.accumulo.core.fate.FatePartition; import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.TraceRepo; import org.apache.accumulo.core.fate.user.UserFateStore; @@ -108,7 +106,6 @@ import org.apache.accumulo.core.metadata.SystemTables; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; -import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.trace.TraceUtil; @@ -123,11 +120,9 @@ import org.apache.accumulo.manager.fate.FateManager; import org.apache.accumulo.manager.fate.FateWorker; import org.apache.accumulo.manager.merge.FindMergeableRangeTask; -import org.apache.accumulo.manager.metrics.fate.FateExecutorMetricsProducer; import org.apache.accumulo.manager.metrics.fate.meta.MetaFateMetrics; import org.apache.accumulo.manager.metrics.fate.user.UserFateMetrics; import org.apache.accumulo.manager.recovery.RecoveryManager; -import org.apache.accumulo.manager.split.FileRangeCache; import org.apache.accumulo.manager.split.Splitter; import org.apache.accumulo.manager.state.TableCounts; import org.apache.accumulo.manager.tableOps.FateEnv; @@ -179,10 +174,8 @@ *

* The manager will also coordinate log recoveries and reports general status. */ -// TODO create standalone PrimaryFateEnv class and pull everything into there relatated to -// FateEnv... this will make it much more clear the env is for metadata ops only public class Manager extends AbstractServer - implements LiveTServerSet.Listener, FateEnv, PrimaryManagerThriftService { + implements LiveTServerSet.Listener, PrimaryManagerThriftService { static final Logger log = LoggerFactory.getLogger(Manager.class); @@ -228,8 +221,6 @@ public class Manager extends AbstractServer private final CountDownLatch fateReadyLatch = new CountDownLatch(1); private final AtomicReference>> fateClients = new AtomicReference<>(); - private final AtomicReference>> fateRefs = - new AtomicReference<>(); private volatile FateManager fateManager; static class TServerStatus { @@ -287,7 +278,6 @@ void setTserverStatus(LiveTServersSnapshot snapshot, private final long timeToCacheRecoveryWalExistence; private ExecutorService tableInformationStatusPool = null; - private ThreadPoolExecutor tabletRefreshThreadPool; private final TabletStateStore rootTabletStore; private final TabletStateStore metadataTabletStore; @@ -341,21 +331,15 @@ private void waitForFate() { } /** - * Retrieve the Fate object, blocking until it is ready. This could cause problems if Fate + * Retrieve the FateClient object, blocking until it is ready. This could cause problems if Fate * operations are attempted to be used prior to the Manager being ready for them. If these * operations are triggered by a client side request from a tserver or client, it should be safe * to wait to handle those until Fate is ready, but if it occurs during an upgrade, or some other * time in the Manager before Fate is started, that may result in a deadlock and will need to be * fixed. 
* - * @return the Fate object, only after the fate components are running and ready + * @return the FateClient object, only after the fate components are running and ready */ - public Fate fate(FateInstanceType type) { - waitForFate(); - var fate = requireNonNull(fateRefs.get(), "fateRefs is not set yet").get(type); - return requireNonNull(fate, () -> "fate type " + type + " is not present"); - } - public FateClient fateClient(FateInstanceType type) { waitForFate(); var client = requireNonNull(fateClients.get(), "fateClients is not set yet").get(type); @@ -498,16 +482,10 @@ int displayUnassigned() { return result; } - @Override public TableManager getTableManager() { return getContext().getTableManager(); } - @Override - public ThreadPoolExecutor getTabletRefreshThreadPool() { - return tabletRefreshThreadPool; - } - public static void main(String[] args) throws Exception { AbstractServer.startServer(new Manager(new ServerOpts(), ServerContext::new, args), log); } @@ -586,17 +564,11 @@ ManagerGoalState getManagerGoalState() { } private Splitter splitter; - private FileRangeCache fileRangeCache; public Splitter getSplitter() { return splitter; } - @Override - public FileRangeCache getFileRangeCache() { - return fileRangeCache; - } - public UpgradeCoordinator.UpgradeStatus getUpgradeStatus() { return upgradeCoordinator.getStatus(); } @@ -605,11 +577,6 @@ public CompactionCoordinator getCompactionCoordinator() { return compactionCoordinator; } - @Override - public void recordCompactionCompletion(ExternalCompactionId ecid) { - getCompactionCoordinator().recordCompletion(ecid); - } - public void hostOndemand(List extents) { extents.forEach(e -> Preconditions.checkArgument(DataLevel.of(e.tableId()) == DataLevel.USER)); @@ -723,14 +690,14 @@ public void run() { case CLEAN_STOP: switch (getManagerState()) { case NORMAL: - fateManager.stop(Duration.ofMinutes(1)); + fateManager.stop(FateInstanceType.USER, Duration.ofMinutes(1)); setManagerState(ManagerState.SAFE_MODE); break; 
case SAFE_MODE: { // META fate stores its data in Zookeeper and its operations interact with // metadata and root tablets, need to completely shut it down before unloading // metadata and root tablets - fate(FateInstanceType.META).shutdown(1, MINUTES); + fateManager.stop(FateInstanceType.META, Duration.ofMinutes(1)); int count = nonMetaDataTabletsAssignedOrHosted(); log.debug( String.format("There are %d non-metadata tablets assigned or hosted", count)); @@ -959,9 +926,6 @@ private void setupPrimaryMetrics() { getConfiguration().getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL))); metricsInfo.addMetricsProducers(new UserFateMetrics(getContext(), getConfiguration().getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL))); - metricsInfo.addMetricsProducers(new FateExecutorMetricsProducer(getContext(), - fate(FateInstanceType.META).getFateExecutors(), - getConfiguration().getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL))); metricsInfo.addMetricsProducers(this); } @@ -1051,11 +1015,6 @@ public void run() { tableInformationStatusPool = ThreadPools.getServerThreadPools() .createExecutorService(getConfiguration(), Property.MANAGER_STATUS_THREAD_POOL_SIZE, false); - tabletRefreshThreadPool = ThreadPools.getServerThreadPools().getPoolBuilder("Tablet refresh ") - .numCoreThreads(getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MINTHREADS)) - .numMaxThreads(getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MAXTHREADS)) - .build(); - Thread statusThread = Threads.createCriticalThread("Status Thread", new StatusThread()); statusThread.start(); @@ -1201,7 +1160,6 @@ boolean canSuspendTablets() { this.splitter = new Splitter(this); this.splitter.start(); - this.fileRangeCache = new FileRangeCache(context); setupFate(context); @@ -1289,8 +1247,8 @@ boolean canSuspendTablets() { } log.debug("Shutting down fate."); - fate(FateInstanceType.META).close(); - fateManager.stop(Duration.ZERO); + 
fateManager.stop(FateInstanceType.USER, Duration.ZERO); + fateManager.stop(FateInstanceType.META, Duration.ZERO); splitter.stop(); @@ -1302,7 +1260,6 @@ boolean canSuspendTablets() { } tableInformationStatusPool.shutdownNow(); - tabletRefreshThreadPool.shutdownNow(); compactionCoordinator.shutdown(); @@ -1350,12 +1307,10 @@ private void setupFate(ServerContext context) { lock -> ServiceLock.isLockHeld(context.getZooCache(), lock); var metaStore = new MetaFateStore(context.getZooSession(), primaryManagerLock.getLockID(), isLockHeld); - var metaInstance = createFateInstance(this, metaStore, context); - // configure this instance to process all data - metaInstance.setPartitions(Set.of(FatePartition.all(FateInstanceType.META))); + var metaFateClient = new FateClient<>(metaStore, TraceRepo::toLogString); var userStore = new UserFateStore(context, SystemTables.FATE.tableName(), managerLock.getLockID(), isLockHeld); - var userFateClient = new FateClient(userStore, TraceRepo::toLogString); + var userFateClient = new FateClient<>(userStore, TraceRepo::toLogString); var metaCleaner = new FateCleaner<>(metaStore, Duration.ofHours(8), this::getSteadyTime); ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor() @@ -1365,14 +1320,10 @@ private void setupFate(ServerContext context) { .scheduleWithFixedDelay(userCleaner::ageOff, 10, 4 * 60, MINUTES)); if (!fateClients.compareAndSet(null, - Map.of(FateInstanceType.META, metaInstance, FateInstanceType.USER, userFateClient))) { + Map.of(FateInstanceType.META, metaFateClient, FateInstanceType.USER, userFateClient))) { throw new IllegalStateException( "Unexpected previous fateClient reference map already initialized"); } - if (!fateRefs.compareAndSet(null, Map.of(FateInstanceType.META, metaInstance))) { - throw new IllegalStateException( - "Unexpected previous fate reference map already initialized"); - } fateReadyLatch.countDown(); } catch (KeeperException | InterruptedException e) { @@ -1528,11 +1479,6 @@ private void 
getAssistantManagerLock() throws KeeperException, InterruptedExcept } } - @Override - public ServiceLock getServiceLock() { - return primaryManagerLock; - } - private ServiceLockData getPrimaryManagerLock(final ServiceLockPath zManagerLoc) throws KeeperException, InterruptedException { var zooKeeper = getContext().getZooSession(); @@ -1653,7 +1599,6 @@ public Set onlineTables() { return result; } - @Override public Set onlineTabletServers() { return tserverSet.getSnapshot().getTservers(); } @@ -1666,12 +1611,6 @@ public EventCoordinator getEventCoordinator() { return nextEvent; } - @Override - public EventPublisher getEventPublisher() { - return nextEvent; - } - - @Override public VolumeManager getVolumeManager() { return getContext().getVolumeManager(); } @@ -1738,7 +1677,6 @@ public Set shutdownServers() { * monotonic clock, which will be approximately consistent between different managers or different * runs of the same manager. SteadyTime supports both nanoseconds and milliseconds. */ - @Override public SteadyTime getSteadyTime() { return timeKeeper.getTime(); } @@ -1766,14 +1704,4 @@ public void registerMetrics(MeterRegistry registry) { public ServiceLock getLock() { return managerLock; } - - /** - * Get Threads Pool instance which is used by blocked I/O - * - * @return {@link ExecutorService} - */ - @Override - public ExecutorService getRenamePool() { - return this.renamePool; - } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java index 6b294e0b8ad..9d934913098 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java @@ -32,7 +32,9 @@ import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FatePartition; +import 
org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.manager.thrift.FateWorkerService; import org.apache.accumulo.core.metadata.SystemTables; @@ -44,6 +46,7 @@ import org.apache.accumulo.manager.tableOps.FateEnv; import org.apache.accumulo.server.ServerContext; import org.apache.thrift.TException; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,7 +106,8 @@ private void manageAssistants() throws TException, InterruptedException { } Map> currentAssignments = new HashMap<>(); currentPartitions.forEach((k, v) -> currentAssignments.put(k, v.partitions())); - Set desiredParititions = getDesiredPartitions(currentAssignments.size()); + Map> desiredParititions = + getDesiredPartitions(currentAssignments.size()); Map> desired = computeDesiredAssignments(currentAssignments, desiredParititions); @@ -191,7 +195,7 @@ public synchronized void start() { @SuppressFBWarnings(value = "SWL_SLEEP_WITH_LOCK_HELD", justification = "Sleep is okay. Can hold the lock as long as needed, as we are shutting down." 
+ " Don't need or want other operations to run.") - public synchronized void stop(Duration timeout) { + public synchronized void stop(FateInstanceType fateType, Duration timeout) { if (!stop.compareAndSet(false, true)) { return; } @@ -222,7 +226,9 @@ public synchronized void stop(Duration timeout) { var currentPartitions = entry.getValue(); if (!currentPartitions.partitions.isEmpty()) { try { - setPartitions(hostPort, currentPartitions.updateId(), Set.of()); + var copy = new HashSet<>(currentPartitions.partitions); + copy.removeIf(fp -> fp.getType() == fateType); + setPartitions(hostPort, currentPartitions.updateId(), copy); } catch (TException e) { log.warn("Failed to unassign fate partitions {}", hostPort, e); } @@ -232,8 +238,16 @@ public synchronized void stop(Duration timeout) { stableAssignments.set(TreeRangeMap.create()); if (!timer.isExpired()) { - var store = new UserFateStore(context, SystemTables.FATE.tableName(), null, null); - + FateStore store = switch (fateType) { + case USER -> new UserFateStore(context, SystemTables.FATE.tableName(), null, null); + case META -> { + try { + yield new MetaFateStore<>(context.getZooSession(), null, null); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException(e); + } + } + }; var reserved = store.getActiveReservations(Set.of(FatePartition.all(FateInstanceType.USER))); while (!reserved.isEmpty() && !timer.isExpired()) { if (log.isTraceEnabled()) { @@ -333,28 +347,41 @@ private boolean setPartitions(HostAndPort address, long updateId, Set> computeDesiredAssignments( Map> currentAssignments, - Set desiredParititions) { + Map> desiredParititions) { - Preconditions.checkArgument(currentAssignments.size() == desiredParititions.size()); Map> desiredAssignments = new HashMap<>(); - var copy = new HashSet<>(desiredParititions); + currentAssignments.keySet().forEach(hp -> { + desiredAssignments.put(hp, new HashSet<>()); + }); + + desiredParititions.forEach((fateType, desiredForType) -> { + // 
This code can not handle more than one partition per host + Preconditions.checkState(desiredForType.size() <= currentAssignments.size()); + + var added = new HashSet(); + + currentAssignments.forEach((hp, partitions) -> { + var hostAssignments = desiredAssignments.get(hp); + partitions.forEach(partition -> { + if (desiredForType.contains(partition) + && hostAssignments.stream().noneMatch(fp -> fp.getType() == fateType) + && !added.contains(partition)) { + hostAssignments.add(partition); + Preconditions.checkState(added.add(partition)); + } + }); + }); - currentAssignments.forEach((hp, partitions) -> { - if (!partitions.isEmpty()) { - var firstPart = partitions.iterator().next(); - if (copy.contains(firstPart)) { - desiredAssignments.put(hp, Set.of(firstPart)); - copy.remove(firstPart); + var iter = Sets.difference(desiredForType, added).iterator(); + currentAssignments.forEach((hp, partitions) -> { + var hostAssignments = desiredAssignments.get(hp); + if (iter.hasNext() && hostAssignments.stream().noneMatch(fp -> fp.getType() == fateType)) { + hostAssignments.add(iter.next()); } - } - }); + }); - var iter = copy.iterator(); - currentAssignments.forEach((hp, partitions) -> { - if (!desiredAssignments.containsKey(hp)) { - desiredAssignments.put(hp, Set.of(iter.next())); - } + Preconditions.checkState(!iter.hasNext()); }); if (log.isTraceEnabled()) { @@ -363,7 +390,6 @@ private Map> computeDesiredAssignments( log.trace(" desired {} {} {}", hp, parts.size(), parts); }); } - return desiredAssignments; } @@ -371,15 +397,23 @@ private Map> computeDesiredAssignments( * Computes a single partition for each worker such that the partition cover all possible UUIDs * and evenly divide the UUIDs. 
*/ - private Set getDesiredPartitions(int numWorkers) { + private Map> getDesiredPartitions(int numWorkers) { Preconditions.checkArgument(numWorkers >= 0); if (numWorkers == 0) { - return Set.of(); + return Map.of(FateInstanceType.META, Set.of(), FateInstanceType.USER, Set.of()); } // create a single partition per worker that equally divides the space - HashSet desired = new HashSet<>(); + Map> desired = new HashMap<>(); + + // meta fate will never see much activity, so give it a single partition. + desired.put(FateInstanceType.META, + Set.of(new FatePartition(FateId.from(FateInstanceType.META, new UUID(0, 0)), + FateId.from(FateInstanceType.META, new UUID(-1, -1))))); + + Set desiredUser = new HashSet<>(); + // All the shifting is because java does not have unsigned integers. Want to evenly partition // [0,2^64) into numWorker ranges, but can not directly do that. Work w/ 60 bit unsigned // integers to partition the space and then shift over by 4. Used 60 bits instead of 63 so it @@ -392,7 +426,7 @@ private Set getDesiredPartitions(int numWorkers) { UUID startUuid = new UUID(start, 0); UUID endUuid = new UUID(end, 0); - desired.add(new FatePartition(FateId.from(FateInstanceType.USER, startUuid), + desiredUser.add(new FatePartition(FateId.from(FateInstanceType.USER, startUuid), FateId.from(FateInstanceType.USER, endUuid))); } @@ -400,9 +434,11 @@ private Set getDesiredPartitions(int numWorkers) { UUID startUuid = new UUID(start, 0); // last partition has a special end uuid that is all f nibbles. 
UUID endUuid = new UUID(-1, -1); - desired.add(new FatePartition(FateId.from(FateInstanceType.USER, startUuid), + desiredUser.add(new FatePartition(FateId.from(FateInstanceType.USER, startUuid), FateId.from(FateInstanceType.USER, endUuid))); + desired.put(FateInstanceType.USER, desiredUser); + return desired; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateWorker.java b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateWorker.java index a1409af98ee..5b0261171d0 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateWorker.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateWorker.java @@ -20,8 +20,11 @@ import static org.apache.accumulo.core.util.LazySingletons.RANDOM; +import java.util.Arrays; import java.util.List; -import java.util.concurrent.TimeUnit; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -31,9 +34,11 @@ import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FatePartition; import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.manager.thrift.FateWorkerService; @@ -49,6 +54,7 @@ import org.apache.accumulo.server.manager.LiveTServerSet; import org.apache.accumulo.server.security.AuditedSecurityOperation; import org.apache.thrift.TException; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +67,7 @@ public class FateWorker 
implements FateWorkerService.Iface { private final AuditedSecurityOperation security; private final LiveTServerSet liveTserverSet; private final FateFactory fateFactory; - private Fate fate; + private final Map> fates = new ConcurrentHashMap<>(); private FateWorkerEnv fateWorkerEnv; public interface FateFactory { @@ -71,17 +77,24 @@ public interface FateFactory { public FateWorker(ServerContext ctx, LiveTServerSet liveTServerSet, FateFactory fateFactory) { this.context = ctx; this.security = ctx.getSecurityOperation(); - this.fate = null; this.liveTserverSet = liveTServerSet; this.fateFactory = fateFactory; + Thread.interrupted() } public synchronized void setLock(ServiceLock lock) { fateWorkerEnv = new FateWorkerEnv(context, lock, liveTserverSet); Predicate isLockHeld = l -> ServiceLock.isLockHeld(context.getZooCache(), l); + try { + MetaFateStore metaStore = + new MetaFateStore<>(context.getZooSession(), lock.getLockID(), isLockHeld); + this.fates.put(FateInstanceType.META, fateFactory.create(fateWorkerEnv, metaStore, context)); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException(e); + } UserFateStore store = new UserFateStore<>(context, SystemTables.FATE.tableName(), lock.getLockID(), isLockHeld); - this.fate = fateFactory.create(fateWorkerEnv, store, context); + this.fates.put(FateInstanceType.USER, fateFactory.create(fateWorkerEnv, store, context)); } private Long expectedUpdateId = null; @@ -105,12 +118,8 @@ public TFatePartitions getPartitions(TInfo tinfo, TCredentials credentials) // id expectedUpdateId = updateId; - if (fate == null) { - return new TFatePartitions(updateId, List.of()); - } else { - return new TFatePartitions(updateId, - fate.getPartitions().stream().map(FatePartition::toThrift).toList()); - } + return new TFatePartitions(updateId, fates.values().stream() + .flatMap(fate -> fate.getPartitions().stream()).map(FatePartition::toThrift).toList()); } } @@ -137,16 +146,22 @@ public boolean setPartitions(TInfo 
tinfo, TCredentials credentials, long updateI synchronized (this) { // The primary manager should not assign any fate partitions until after upgrade is complete. Preconditions.checkState(isUpgradeComplete()); - if (fate != null && expectedUpdateId != null && updateId == expectedUpdateId) { + + if (expectedUpdateId != null && updateId == expectedUpdateId) { // Set to null which makes it so that an update id can only be used once. expectedUpdateId = null; - var desiredSet = desired.stream().map(FatePartition::from).collect(Collectors.toSet()); - var oldPartitions = fate.setPartitions(desiredSet); - log.info("Changed partitions from {} to {}", oldPartitions, desiredSet); + for (var fateType : FateInstanceType.values()) { + var fate = fates.get(fateType); + var desiredSet = desired.stream().map(FatePartition::from) + .filter(fp -> fp.getType() == fateType).collect(Collectors.toSet()); + var oldPartitions = fate.setPartitions(desiredSet); + log.info("Changed partitions for {} from {} to {}", fateType, oldPartitions, desiredSet); + } + return true; } else { - log.debug("Did not change partitions to {} expectedUpdateId:{} updateId:{} fate==null:{}", - desired, expectedUpdateId, updateId, fate == null); + log.debug("Did not change partitions to {} expectedUpdateId:{} updateId:{} fates:{}", + desired, expectedUpdateId, updateId, fates.keySet()); return false; } } @@ -161,28 +176,24 @@ public void seeded(TInfo tinfo, TCredentials credentials, List t SecurityErrorCode.PERMISSION_DENIED).asThriftException(); } - Fate localFate; - synchronized (this) { - localFate = fate; - } + Map> partitions = + tpartitions.stream().map(FatePartition::from) + .collect(Collectors.groupingBy(FatePartition::getType, Collectors.toSet())); - if (localFate != null) { - localFate.seeded(tpartitions.stream().map(FatePartition::from).collect(Collectors.toSet())); - } - } - - public synchronized void stop() { - fate.shutdown(1, TimeUnit.MINUTES); - fate.close(); - fateWorkerEnv.stop(); - fate = null; - 
fateWorkerEnv = null; + partitions.forEach((fateType, typePartitions) -> { + var fate = fates.get(fateType); + if (fate != null) { + fate.seeded(typePartitions); + } + }); } public synchronized MetricsProducer[] getMetricsProducers() { - Preconditions.checkState(fate != null, "Not started yet"); - return new MetricsProducer[] { - new FateExecutorMetricsProducer(context, fate.getFateExecutors(), context.getConfiguration() - .getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL))}; + Preconditions.checkState(!fates.isEmpty(), "Not started yet"); + return Arrays.stream(FateInstanceType.values()).map(fates::get) + .map(fate -> new FateExecutorMetricsProducer(context, fate.getFateExecutors(), + context.getConfiguration() + .getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL))) + .toArray(MetricsProducer[]::new); } } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java index c95dcd9ad6a..b1ec160734e 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java @@ -39,6 +39,7 @@ import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.zookeeper.ZooSession; import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.FateEnv; import org.apache.accumulo.manager.tableOps.delete.PreDeleteTable; import org.apache.accumulo.server.ServerContext; import org.apache.zookeeper.data.Stat; @@ -74,24 +75,24 @@ protected boolean isCancelled(FateId fateId, ServerContext context) { } } - private Manager manager; + private FateEnv fateEnv; private ServerContext ctx; private ZooSession zk; @BeforeEach public void setup() { - manager = createMock(Manager.class); + fateEnv = 
createMock(Manager.class); ctx = createMock(ServerContext.class); zk = createMock(ZooSession.class); expect(ctx.getInstanceID()).andReturn(instance).anyTimes(); expect(ctx.getZooSession()).andReturn(zk).anyTimes(); expect(zk.asReaderWriter()).andReturn(new ZooReaderWriter(zk)).anyTimes(); - expect(manager.getContext()).andReturn(ctx).anyTimes(); + expect(fateEnv.getContext()).andReturn(ctx).anyTimes(); } @AfterEach public void teardown() { - verify(manager, ctx, zk); + verify(fateEnv, ctx, zk); } @Test @@ -107,9 +108,9 @@ public void testTableBeingDeleted() throws Exception { } private void runDriver(CompactionDriver driver, String expectedMessage) { - replay(manager, ctx, zk); + replay(fateEnv, ctx, zk); var e = assertThrows(AcceptableThriftTableOperationException.class, - () -> driver.isReady(FateId.from(FateInstanceType.USER, UUID.randomUUID()), manager)); + () -> driver.isReady(FateId.from(FateInstanceType.USER, UUID.randomUUID()), fateEnv)); assertEquals(e.getTableId(), tableId.toString()); assertEquals(e.getOp(), TableOperation.COMPACT); assertEquals(e.getType(), TableOperationExceptionType.OTHER); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java index 9e3037dbe01..7bd6824f2f6 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java @@ -84,7 +84,7 @@ import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.util.time.SteadyTime; -import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.FateEnv; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.gc.AllVolumesDirectory; import 
org.apache.accumulo.server.metadata.ConditionalTabletMutatorImpl; @@ -425,7 +425,7 @@ private static void testMerge(List inputTablets, TableId tableId end == null ? null : end.getBytes(UTF_8), MergeInfo.Operation.MERGE); MergeTablets mergeTablets = new MergeTablets(mergeInfo); - Manager manager = EasyMock.mock(Manager.class); + FateEnv fateEnv = EasyMock.mock(FateEnv.class); ServerContext context = EasyMock.mock(ServerContext.class); Ample ample = EasyMock.mock(Ample.class); TabletsMetadata.Builder tabletBuilder = EasyMock.mock(TabletsMetadata.Builder.class); @@ -438,7 +438,7 @@ private static void testMerge(List inputTablets, TableId tableId EasyMock.expect(context.getServiceLock()).andReturn(managerLock).anyTimes(); // setup reading the tablets - EasyMock.expect(manager.getContext()).andReturn(context).atLeastOnce(); + EasyMock.expect(fateEnv.getContext()).andReturn(context).atLeastOnce(); EasyMock.expect(context.getAmple()).andReturn(ample).atLeastOnce(); EasyMock.expect(ample.readTablets()).andReturn(tabletBuilder).once(); EasyMock.expect(tabletBuilder.forTable(tableId)).andReturn(tabletBuilder).once(); @@ -478,12 +478,12 @@ private static void testMerge(List inputTablets, TableId tableId ample.putGcFileAndDirCandidates(tableId, dirs); EasyMock.expectLastCall().once(); - EasyMock.replay(manager, context, ample, tabletBuilder, tabletsMetadata, tabletsMutator, + EasyMock.replay(fateEnv, context, ample, tabletBuilder, tabletsMetadata, tabletsMutator, tabletMutator, cr, managerLock); - mergeTablets.call(fateId, manager); + mergeTablets.call(fateId, fateEnv); - EasyMock.verify(manager, context, ample, tabletBuilder, tabletsMetadata, tabletsMutator, + EasyMock.verify(fateEnv, context, ample, tabletBuilder, tabletsMetadata, tabletsMutator, tabletMutator, cr, managerLock); } } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java 
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java index 6ceac3d6810..dad40a0cb13 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java @@ -66,8 +66,8 @@ import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.util.time.SteadyTime; -import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.split.FileRangeCache; +import org.apache.accumulo.manager.tableOps.FateEnv; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.metadata.ConditionalTabletMutatorImpl; import org.apache.hadoop.fs.Path; @@ -233,9 +233,9 @@ public void testManyColumns() throws Exception { String dir1 = "dir1"; String dir2 = "dir2"; - Manager manager = EasyMock.mock(Manager.class); + FateEnv fateEnv = EasyMock.mock(FateEnv.class); ServerContext context = EasyMock.mock(ServerContext.class); - EasyMock.expect(manager.getContext()).andReturn(context).atLeastOnce(); + EasyMock.expect(fateEnv.getContext()).andReturn(context).atLeastOnce(); Ample ample = EasyMock.mock(Ample.class); EasyMock.expect(context.getAmple()).andReturn(ample).atLeastOnce(); FileRangeCache fileRangeCache = EasyMock.mock(FileRangeCache.class); @@ -247,8 +247,8 @@ public void testManyColumns() throws Exception { .andReturn(newFileInfo("d", "f")); EasyMock.expect(fileRangeCache.getCachedFileInfo(tableId, file4)) .andReturn(newFileInfo("d", "j")); - EasyMock.expect(manager.getFileRangeCache()).andReturn(fileRangeCache).atLeastOnce(); - EasyMock.expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100_000, TimeUnit.SECONDS)) + EasyMock.expect(fateEnv.getFileRangeCache()).andReturn(fileRangeCache).atLeastOnce(); + 
EasyMock.expect(fateEnv.getSteadyTime()).andReturn(SteadyTime.from(100_000, TimeUnit.SECONDS)) .atLeastOnce(); ServiceLock managerLock = EasyMock.mock(ServiceLock.class); @@ -394,7 +394,7 @@ public void testManyColumns() throws Exception { tabletsMutator.close(); EasyMock.expectLastCall().anyTimes(); - EasyMock.replay(manager, context, ample, tabletMeta, fileRangeCache, tabletsMutator, + EasyMock.replay(fateEnv, context, ample, tabletMeta, fileRangeCache, tabletsMutator, tablet1Mutator, tablet2Mutator, tablet3Mutator, cr, compactions); // Now we can actually test the split code that writes the new tablets with a bunch columns in // the original tablet @@ -404,9 +404,9 @@ public void testManyColumns() throws Exception { dirNames.add(dir2); UpdateTablets updateTablets = new UpdateTablets( new SplitInfo(origExtent, TabletMergeabilityUtil.systemDefaultSplits(splits)), dirNames); - updateTablets.call(fateId, manager); + updateTablets.call(fateId, fateEnv); - EasyMock.verify(manager, context, ample, tabletMeta, fileRangeCache, tabletsMutator, + EasyMock.verify(fateEnv, context, ample, tabletMeta, fileRangeCache, tabletsMutator, tablet1Mutator, tablet2Mutator, tablet3Mutator, cr, compactions); } @@ -469,15 +469,15 @@ public void testErrors() throws Exception { private static void testError(KeyExtent origExtent, TabletMetadata tm1, FateId fateId) throws Exception { - Manager manager = EasyMock.mock(Manager.class); + FateEnv fateEnv = EasyMock.mock(FateEnv.class); ServerContext context = EasyMock.mock(ServerContext.class); - EasyMock.expect(manager.getContext()).andReturn(context).atLeastOnce(); + EasyMock.expect(fateEnv.getContext()).andReturn(context).atLeastOnce(); Ample ample = EasyMock.mock(Ample.class); EasyMock.expect(context.getAmple()).andReturn(ample).atLeastOnce(); EasyMock.expect(ample.readTablet(origExtent)).andReturn(tm1); - EasyMock.replay(manager, context, ample); + EasyMock.replay(fateEnv, context, ample); // Now we can actually test the split code that 
writes the new tablets with a bunch columns in // the original tablet SortedSet splits = new TreeSet<>(List.of(new Text("c"))); @@ -485,8 +485,8 @@ private static void testError(KeyExtent origExtent, TabletMetadata tm1, FateId f dirNames.add("d1"); var updateTablets = new UpdateTablets( new SplitInfo(origExtent, TabletMergeabilityUtil.systemDefaultSplits(splits)), dirNames); - updateTablets.call(fateId, manager); + updateTablets.call(fateId, fateEnv); - EasyMock.verify(manager, context, ample); + EasyMock.verify(fateEnv, context, ample); } } diff --git a/test/src/main/java/org/apache/accumulo/test/MultipleManagerFateIT.java b/test/src/main/java/org/apache/accumulo/test/MultipleManagerFateIT.java index d0d8bdb5ed8..2367040c4a6 100644 --- a/test/src/main/java/org/apache/accumulo/test/MultipleManagerFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/MultipleManagerFateIT.java @@ -84,7 +84,6 @@ * */ public class MultipleManagerFateIT extends ConfigurableMacBase { - // A manager that will quickly clean up fate reservations held by dead managers public static class FastFateCleanupManager extends Manager { protected FastFateCleanupManager(ServerOpts opts, String[] args) throws IOException { @@ -111,6 +110,10 @@ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSit cfg.getClusterServerConfiguration().setNumDefaultCompactors(8); // Set this lower so that locks timeout faster cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); + // This test could kill a manager after it's written a compaction to the metadata table, but + // before it returns it to the compactor via RPC which creates a dead compaction. Need to speed + // up the dead compaction detection to handle this or else the test will hang.
+ cfg.setProperty(Property.COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL, "5s"); cfg.setServerClass(ServerType.MANAGER, r -> FastFateCleanupManager.class); super.configure(cfg, hadoopCoreSite); } diff --git a/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java b/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java index 8b4283f31db..834e9a10bdc 100644 --- a/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java +++ b/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java @@ -23,29 +23,29 @@ import java.time.Duration; import org.apache.accumulo.core.util.time.SteadyTime; -import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.FateEnv; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.ample.metadata.TestAmple.TestServerAmpleImpl; import org.easymock.EasyMock; public class TestAmpleUtil { - public static Manager mockWithAmple(ServerContext context, TestServerAmpleImpl ample) { - Manager manager = EasyMock.mock(Manager.class); - EasyMock.expect(manager.getContext()).andReturn(testAmpleServerContext(context, ample)) + public static FateEnv mockWithAmple(ServerContext context, TestServerAmpleImpl ample) { + FateEnv fateEnv = EasyMock.mock(FateEnv.class); + EasyMock.expect(fateEnv.getContext()).andReturn(testAmpleServerContext(context, ample)) .atLeastOnce(); - EasyMock.replay(manager); - return manager; + EasyMock.replay(fateEnv); + return fateEnv; } - public static Manager mockWithAmple(ServerContext context, TestServerAmpleImpl ample, + public static FateEnv mockWithAmple(ServerContext context, TestServerAmpleImpl ample, Duration currentTime) { - Manager manager = EasyMock.mock(Manager.class); - EasyMock.expect(manager.getContext()).andReturn(testAmpleServerContext(context, ample)) + FateEnv fateEnv = EasyMock.mock(FateEnv.class); + EasyMock.expect(fateEnv.getContext()).andReturn(testAmpleServerContext(context, ample)) .atLeastOnce(); - 
EasyMock.expect(manager.getSteadyTime()).andReturn(SteadyTime.from(currentTime)).anyTimes(); - EasyMock.replay(manager); - return manager; + EasyMock.expect(fateEnv.getSteadyTime()).andReturn(SteadyTime.from(currentTime)).anyTimes(); + EasyMock.replay(fateEnv); + return fateEnv; } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT_SimpleSuite.java b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT_SimpleSuite.java index 1178f2d8ad8..a36f8704460 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT_SimpleSuite.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT_SimpleSuite.java @@ -75,9 +75,9 @@ import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.harness.SharedMiniClusterBase; -import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.merge.FindMergeableRangeTask.UnmergeableReason; import org.apache.accumulo.manager.tableOps.AbstractFateOperation; +import org.apache.accumulo.manager.tableOps.FateEnv; import org.apache.accumulo.manager.tableOps.compact.CompactionDriver; import org.apache.accumulo.manager.tableOps.merge.DeleteRows; import org.apache.accumulo.manager.tableOps.merge.MergeInfo; @@ -133,7 +133,7 @@ public void testNoWalsMergeRepos(MergeInfo.Operation operation) throws Exception TestServerAmpleImpl testAmple = (TestServerAmpleImpl) TestAmple .create(getCluster().getServerContext(), Map.of(DataLevel.USER, metadataTable)); testAmple.createMetadataFromExisting(client, tableId); - Manager manager = mockWithAmple(getCluster().getServerContext(), testAmple); + FateEnv fateEnv = mockWithAmple(getCluster().getServerContext(), testAmple); // Create a test operation and fate id for testing merge and delete rows // and add operation to test metadata for the tablet @@ -148,15 +148,15 @@ public void testNoWalsMergeRepos(MergeInfo.Operation operation) throws Exception // Build either 
MergeTablets or DeleteRows repo for testing no WALs, both should check this // condition final MergeInfo mergeInfo = new MergeInfo(tableId, - manager.getContext().getNamespaceId(tableId), null, null, operation); + fateEnv.getContext().getNamespaceId(tableId), null, null, operation); final AbstractFateOperation repo = operation == Operation.MERGE ? new MergeTablets(mergeInfo) : new DeleteRows(mergeInfo); // Also test ReserveTablets isReady() final AbstractFateOperation reserve = new ReserveTablets(mergeInfo); // First, check no errors with the default case - assertEquals(0, reserve.isReady(fateId, manager)); - assertNotNull(repo.call(fateId, manager)); + assertEquals(0, reserve.isReady(fateId, fateEnv)); + assertNotNull(repo.call(fateId, fateEnv)); // Write a WAL to the test metadata and then re-run the repo to check for an error try (TabletsMutator tm = testAmple.mutateTablets()) { @@ -165,10 +165,10 @@ public void testNoWalsMergeRepos(MergeInfo.Operation operation) throws Exception } // Should not be ready due to the presence of a WAL - assertTrue(reserve.isReady(fateId, manager) > 0); + assertTrue(reserve.isReady(fateId, fateEnv) > 0); // Repo should throw an exception due to the WAL existence - var thrown = assertThrows(IllegalStateException.class, () -> repo.call(fateId, manager)); + var thrown = assertThrows(IllegalStateException.class, () -> repo.call(fateId, fateEnv)); assertTrue(thrown.getMessage().contains("has unexpected walogs")); } } @@ -200,57 +200,57 @@ public void testVerifyMergeability() throws Exception { TestServerAmpleImpl testAmple = (TestServerAmpleImpl) TestAmple .create(getCluster().getServerContext(), Map.of(DataLevel.USER, metadataTable)); testAmple.createMetadataFromExisting(client, tableId); - Manager manager = + FateEnv fateEnv = mockWithAmple(getCluster().getServerContext(), testAmple, Duration.ofDays(1)); // Create a test fate id var fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID()); // Tablet c is set to never merge - 
MergeInfo mergeInfo = new MergeInfo(tableId, manager.getContext().getNamespaceId(tableId), + MergeInfo mergeInfo = new MergeInfo(tableId, fateEnv.getContext().getNamespaceId(tableId), null, new Text("c").getBytes(), Operation.SYSTEM_MERGE); - var repo = new VerifyMergeability(mergeInfo).call(fateId, manager); + var repo = new VerifyMergeability(mergeInfo).call(fateId, fateEnv); assertInstanceOf(UnreserveSystemMerge.class, repo); assertEquals(UnmergeableReason.TABLET_MERGEABILITY, ((UnreserveSystemMerge) repo).getReason()); // Tablets a and b are always merge - mergeInfo = new MergeInfo(tableId, manager.getContext().getNamespaceId(tableId), null, + mergeInfo = new MergeInfo(tableId, fateEnv.getContext().getNamespaceId(tableId), null, new Text("b").getBytes(), Operation.SYSTEM_MERGE); - assertInstanceOf(MergeTablets.class, new VerifyMergeability(mergeInfo).call(fateId, manager)); + assertInstanceOf(MergeTablets.class, new VerifyMergeability(mergeInfo).call(fateId, fateEnv)); - var context = manager.getContext(); + var context = fateEnv.getContext(); // split threshold is 10k so default max merge size is 2500 bytes. 
// this adds 6 files of 450 each which puts the tablets over the 2500 threshold addFileMetadata(context, tableId, null, new Text("c"), 3, 450); // Data written to the first two tablets totals 2700 bytes and is too large - repo = new VerifyMergeability(mergeInfo).call(fateId, manager); + repo = new VerifyMergeability(mergeInfo).call(fateId, fateEnv); assertInstanceOf(UnreserveSystemMerge.class, repo); assertEquals(UnmergeableReason.MAX_TOTAL_SIZE, ((UnreserveSystemMerge) repo).getReason()); // Not enough time has passed for Tablet, should be able to merge d and e - mergeInfo = new MergeInfo(tableId, manager.getContext().getNamespaceId(tableId), + mergeInfo = new MergeInfo(tableId, fateEnv.getContext().getNamespaceId(tableId), new Text("c").getBytes(), new Text("e").getBytes(), Operation.SYSTEM_MERGE); - repo = new VerifyMergeability(mergeInfo).call(fateId, manager); + repo = new VerifyMergeability(mergeInfo).call(fateId, fateEnv); assertInstanceOf(UnreserveSystemMerge.class, repo); assertEquals(UnmergeableReason.TABLET_MERGEABILITY, ((UnreserveSystemMerge) repo).getReason()); // update time to 3 days so enough time has passed - manager = mockWithAmple(getCluster().getServerContext(), testAmple, Duration.ofDays(3)); - assertInstanceOf(MergeTablets.class, new VerifyMergeability(mergeInfo).call(fateId, manager)); + fateEnv = mockWithAmple(getCluster().getServerContext(), testAmple, Duration.ofDays(3)); + assertInstanceOf(MergeTablets.class, new VerifyMergeability(mergeInfo).call(fateId, fateEnv)); // last 3 tablets should total 9 files which is < max of 10 - mergeInfo = new MergeInfo(tableId, manager.getContext().getNamespaceId(tableId), + mergeInfo = new MergeInfo(tableId, fateEnv.getContext().getNamespaceId(tableId), new Text("c").getBytes(), null, Operation.SYSTEM_MERGE); addFileMetadata(context, tableId, new Text("c"), null, 3, 10); - assertInstanceOf(MergeTablets.class, new VerifyMergeability(mergeInfo).call(fateId, manager)); + 
assertInstanceOf(MergeTablets.class, new VerifyMergeability(mergeInfo).call(fateId, fateEnv)); // last 3 tablets should total 12 files which is > max of 10 addFileMetadata(context, tableId, new Text("c"), null, 4, 10); - repo = new VerifyMergeability(mergeInfo).call(fateId, manager); + repo = new VerifyMergeability(mergeInfo).call(fateId, fateEnv); assertInstanceOf(UnreserveSystemMerge.class, repo); assertEquals(UnmergeableReason.MAX_FILE_COUNT, ((UnreserveSystemMerge) repo).getReason()); } @@ -306,7 +306,7 @@ public void testSplitOffline() throws Exception { testAmple.mutateTablet(extent) .putOperation(TabletOperationId.from(TabletOperationType.SPLITTING, fateId)).mutate(); - Manager manager = mockWithAmple(getCluster().getServerContext(), testAmple); + FateEnv fateEnv = mockWithAmple(getCluster().getServerContext(), testAmple); assertEquals(opid, testAmple.readTablet(extent).getOperationId()); @@ -314,7 +314,7 @@ public void testSplitOffline() throws Exception { TabletMergeabilityUtil.systemDefaultSplits(new TreeSet<>(List.of(new Text("sp1")))))); // The repo should delete the opid and throw an exception - assertThrows(ThriftTableOperationException.class, () -> eoRepo.call(fateId, manager)); + assertThrows(ThriftTableOperationException.class, () -> eoRepo.call(fateId, fateEnv)); // the operation id should have been cleaned up before the exception was thrown assertNull(testAmple.readTablet(extent).getOperationId()); @@ -347,11 +347,11 @@ public void testFindSplitsUnsplittable() throws Exception { not(SplitColumnFamily.UNSPLITTABLE_COLUMN)); KeyExtent extent = new KeyExtent(tableId, null, null); - Manager manager = mockWithAmple(getCluster().getServerContext(), testAmple); + FateEnv fateEnv = mockWithAmple(getCluster().getServerContext(), testAmple); FindSplits findSplits = new FindSplits(extent); PreSplit preSplit = (PreSplit) findSplits - .call(FateId.from(FateInstanceType.USER, UUID.randomUUID()), manager); + .call(FateId.from(FateInstanceType.USER, 
UUID.randomUUID()), fateEnv); // The table should not need splitting assertNull(preSplit); @@ -366,7 +366,7 @@ public void testFindSplitsUnsplittable() throws Exception { findSplits = new FindSplits(extent); preSplit = (PreSplit) findSplits.call(FateId.from(FateInstanceType.USER, UUID.randomUUID()), - manager); + fateEnv); // The table SHOULD now need splitting assertNotNull(preSplit); @@ -408,11 +408,11 @@ public void testFindSplitsDeleteUnsplittable() throws Exception { not(SplitColumnFamily.UNSPLITTABLE_COLUMN)); KeyExtent extent = new KeyExtent(tableId, null, null); - Manager manager = mockWithAmple(getCluster().getServerContext(), testAmple); + FateEnv fateEnv = mockWithAmple(getCluster().getServerContext(), testAmple); FindSplits findSplits = new FindSplits(extent); PreSplit preSplit = (PreSplit) findSplits - .call(FateId.from(FateInstanceType.USER, UUID.randomUUID()), manager); + .call(FateId.from(FateInstanceType.USER, UUID.randomUUID()), fateEnv); // The table should not need splitting assertNull(preSplit); @@ -428,7 +428,7 @@ public void testFindSplitsDeleteUnsplittable() throws Exception { findSplits = new FindSplits(extent); preSplit = (PreSplit) findSplits.call(FateId.from(FateInstanceType.USER, UUID.randomUUID()), - manager); + fateEnv); // The table SHOULD not need splitting assertNull(preSplit); @@ -462,8 +462,8 @@ public void testCompactionDriverCleanup(Pair rangeText) throws Except TestServerAmpleImpl testAmple = (TestServerAmpleImpl) TestAmple .create(getCluster().getServerContext(), Map.of(DataLevel.USER, metadataTable)); testAmple.createMetadataFromExisting(client, tableId); - Manager manager = mockWithAmple(getCluster().getServerContext(), testAmple); - var ctx = manager.getContext(); + FateEnv fateEnv = mockWithAmple(getCluster().getServerContext(), testAmple); + var ctx = fateEnv.getContext(); // Create the CompactionDriver to test with the given range passed into the method final AbstractFateOperation repo = new 
CompactionDriver(ctx.getNamespaceId(tableId), tableId, @@ -489,7 +489,7 @@ public void testCompactionDriverCleanup(Pair rangeText) throws Except assertEquals(4, extents.size()); // First call undo using the second fateId and verify there's still metadata for the first one - repo.undo(fateId2, manager); + repo.undo(fateId2, fateEnv); try (TabletsMetadata tabletsMetadata = testAmple.readTablets().forTable(tableId).build()) { tabletsMetadata.forEach(tm -> { assertHasCompactionMetadata(fateId1, tm); @@ -499,7 +499,7 @@ public void testCompactionDriverCleanup(Pair rangeText) throws Except // Now call undo on the first fateId which would clean up all the metadata for all the // tablets that overlap with the given range that was provided to the CompactionDriver // during the creation of the repo - repo.undo(fateId1, manager); + repo.undo(fateId1, fateEnv); // First, iterate over only the overlapping tablets and verify that those tablets // were cleaned up and remove any visited tablets from the extents set From d73919c3fb8ea1543eefc39c10f3e68e62afb461 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Fri, 20 Mar 2026 19:02:34 +0000 Subject: [PATCH 4/6] fix compile error --- .../java/org/apache/accumulo/manager/fate/FateWorker.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateWorker.java b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateWorker.java index 5b0261171d0..f60fc453c0a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateWorker.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateWorker.java @@ -68,7 +68,6 @@ public class FateWorker implements FateWorkerService.Iface { private final LiveTServerSet liveTserverSet; private final FateFactory fateFactory; private final Map> fates = new ConcurrentHashMap<>(); - private FateWorkerEnv fateWorkerEnv; public interface FateFactory { Fate create(FateEnv env, FateStore 
store, ServerContext context); @@ -79,11 +78,10 @@ public FateWorker(ServerContext ctx, LiveTServerSet liveTServerSet, FateFactory this.security = ctx.getSecurityOperation(); this.liveTserverSet = liveTServerSet; this.fateFactory = fateFactory; - Thread.interrupted() } public synchronized void setLock(ServiceLock lock) { - fateWorkerEnv = new FateWorkerEnv(context, lock, liveTserverSet); + FateWorkerEnv fateWorkerEnv = new FateWorkerEnv(context, lock, liveTserverSet); Predicate isLockHeld = l -> ServiceLock.isLockHeld(context.getZooCache(), l); try { MetaFateStore metaStore = From b091f7bacdcf631c243b16b13117e822e97194db Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Fri, 20 Mar 2026 19:18:08 +0000 Subject: [PATCH 5/6] fix test --- .../manager/tableOps/compact/CompactionDriverTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java index b1ec160734e..8d407d763dc 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java @@ -38,7 +38,6 @@ import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.zookeeper.ZooSession; -import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.FateEnv; import org.apache.accumulo.manager.tableOps.delete.PreDeleteTable; import org.apache.accumulo.server.ServerContext; @@ -81,7 +80,7 @@ protected boolean isCancelled(FateId fateId, ServerContext context) { @BeforeEach public void setup() { - fateEnv = createMock(Manager.class); + fateEnv = createMock(FateEnv.class); ctx = createMock(ServerContext.class); zk = createMock(ZooSession.class); 
expect(ctx.getInstanceID()).andReturn(instance).anyTimes(); From 380e5515a5d8b921468619112846a40b1 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Fri, 20 Mar 2026 23:36:07 +0000 Subject: [PATCH 6/6] Makes fate client available on all managers Before this change a fate client was only available on the primary manager. Now fate clients are available on all managers. The primary manager publishes fate assignment locations in zookeeper. These locations are used by managers to send notifications to other managers when they seed a fate operation. --- .../org/apache/accumulo/core/Constants.java | 2 + .../core/clientImpl/ClientContext.java | 10 +- .../server/init/ZooKeeperInitializer.java | 5 + .../server/manager/FateLocations.java | 107 +++++++++++++ .../org/apache/accumulo/manager/Manager.java | 93 ++++++----- .../accumulo/manager/fate/FateManager.java | 94 ++--------- .../accumulo/manager/fate/FateNotifier.java | 150 ++++++++++++++++++ 7 files changed, 332 insertions(+), 129 deletions(-) create mode 100644 server/base/src/main/java/org/apache/accumulo/server/manager/FateLocations.java create mode 100644 server/manager/src/main/java/org/apache/accumulo/manager/fate/FateNotifier.java diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java index d2c556f2c2a..bb6e49eda45 100644 --- a/core/src/main/java/org/apache/accumulo/core/Constants.java +++ b/core/src/main/java/org/apache/accumulo/core/Constants.java @@ -52,6 +52,8 @@ public class Constants { public static final String ZMANAGER_ASSISTANT_LOCK = ZMANAGERS + "/assistants"; public static final String ZMANAGER_GOAL_STATE = ZMANAGERS + "/goal_state"; public static final String ZMANAGER_TICK = ZMANAGERS + "/tick"; + public static final String ZMANAGER_ASSIGNMENTS = ZMANAGERS + "/assignments"; + public static final String ZMANAGER_FATE_ASSIGNMENTS = ZMANAGER_ASSIGNMENTS + "/fate"; public static final String ZGC = "/gc"; public static final
String ZGC_LOCK = ZGC + "/lock"; diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index 8b65091c491..c9a1fb4d610 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -37,7 +37,6 @@ import java.util.Collections; import java.util.EnumMap; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -1328,15 +1327,12 @@ public void clearTabletLocationCache() { } private static Set createPersistentWatcherPaths() { - Set pathsToWatch = new HashSet<>(); - for (String path : Set.of(Constants.ZCOMPACTORS, Constants.ZDEADTSERVERS, Constants.ZGC_LOCK, + return Set.of(Constants.ZCOMPACTORS, Constants.ZDEADTSERVERS, Constants.ZGC_LOCK, Constants.ZMANAGER_LOCK, Constants.ZMINI_LOCK, Constants.ZMONITOR_LOCK, Constants.ZNAMESPACES, Constants.ZRECOVERY, Constants.ZSSERVERS, Constants.ZTABLES, Constants.ZTSERVERS, Constants.ZUSERS, RootTable.ZROOT_TABLET, Constants.ZTEST_LOCK, - Constants.ZMANAGER_ASSISTANT_LOCK, Constants.ZRESOURCEGROUPS)) { - pathsToWatch.add(path); - } - return pathsToWatch; + Constants.ZMANAGER_ASSISTANT_LOCK, Constants.ZRESOURCEGROUPS, + Constants.ZMANAGER_ASSIGNMENTS); } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java b/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java index b23c6746803..9d6a59607d3 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java @@ -50,6 +50,7 @@ import org.apache.accumulo.server.conf.store.ResourceGroupPropKey; import org.apache.accumulo.server.conf.store.SystemPropKey; import org.apache.accumulo.server.log.WalStateManager; 
+import org.apache.accumulo.server.manager.FateLocations; import org.apache.accumulo.server.metadata.RootGcCandidates; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; @@ -180,6 +181,10 @@ void initialize(final ServerContext context, final String rootTabletDirName, ZooUtil.NodeExistsPolicy.FAIL); zrwChroot.putPersistentData(Constants.ZSHUTTING_DOWN_TSERVERS, EMPTY_BYTE_ARRAY, ZooUtil.NodeExistsPolicy.FAIL); + zrwChroot.putPersistentData(Constants.ZMANAGER_ASSIGNMENTS, EMPTY_BYTE_ARRAY, + ZooUtil.NodeExistsPolicy.FAIL); + FateLocations.storeLocations(zrwChroot, Map.of(), ZooUtil.NodeExistsPolicy.FAIL); + } /** diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/FateLocations.java b/server/base/src/main/java/org/apache/accumulo/server/manager/FateLocations.java new file mode 100644 index 00000000000..4a71ff2e085 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/FateLocations.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.server.manager; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.stream.Collectors.toUnmodifiableSet; +import static org.apache.accumulo.core.util.LazySingletons.GSON; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FatePartition; +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.server.ServerContext; +import org.apache.zookeeper.KeeperException; + +import com.google.common.base.Preconditions; +import com.google.common.net.HostAndPort; +import com.google.common.reflect.TypeToken; + +public class FateLocations { + + private final ServerContext context; + + private long lastUpdateCount; + private Map> lastLocations = null; + + public FateLocations(ServerContext context) { + this.context = context; + } + + public synchronized Map> getLocations() { + + var zooCache = context.getZooCache(); + + if (lastLocations == null || lastUpdateCount != zooCache.getUpdateCount()) { + lastUpdateCount = zooCache.getUpdateCount(); + var json = new String(context.getZooCache().get(Constants.ZMANAGER_FATE_ASSIGNMENTS), UTF_8); + var type = new TypeToken>>>() {}.getType(); + Map>> stringMap = GSON.get().fromJson(json, type); + Map> locations = new HashMap<>(); + stringMap.forEach((hp, parts) -> { + var partsSet = parts.stream().peek(part -> Preconditions.checkArgument(part.size() == 2)) + .map(part -> new FatePartition(FateId.from(part.get(0)), FateId.from(part.get(1)))) + .collect(toUnmodifiableSet()); + locations.put(HostAndPort.fromString(hp), partsSet); + }); + lastLocations = Map.copyOf(locations); + } + + return lastLocations; + } + + private static byte[] serialize(Map> assignments) { + Map>> jsonMap = new HashMap<>(); + 
assignments.forEach((hp, parts) -> { + var listParts = parts.stream() + .map(part -> List.of(part.start().canonical(), part.end().canonical())).toList(); + jsonMap.put(hp.toString(), listParts); + }); + + var json = GSON.get().toJson(jsonMap); + return json.getBytes(UTF_8); + } + + public static void storeLocations(ZooReaderWriter zoo, + Map> assignments, NodeExistsPolicy nodeExistsPolicy) { + try { + zoo.putPersistentData(Constants.ZMANAGER_FATE_ASSIGNMENTS, serialize(assignments), + nodeExistsPolicy); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException("Unable to set fate locations in zookeeper", e); + } + } + + public static void storeLocations(ServerContext context, + Map> assignments) { + try { + context.getZooSession().setData(Constants.ZMANAGER_FATE_ASSIGNMENTS, serialize(assignments), + -1); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException("Unable to set fate locations in zookeeper", e); + } + } + +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 204de1d79d9..9fb59deb181 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -46,7 +46,6 @@ import java.util.SortedMap; import java.util.UUID; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -54,6 +53,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.Predicate; import 
java.util.function.Supplier; @@ -118,6 +119,7 @@ import org.apache.accumulo.core.zookeeper.ZcStat; import org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator; import org.apache.accumulo.manager.fate.FateManager; +import org.apache.accumulo.manager.fate.FateNotifier; import org.apache.accumulo.manager.fate.FateWorker; import org.apache.accumulo.manager.merge.FindMergeableRangeTask; import org.apache.accumulo.manager.metrics.fate.meta.MetaFateMetrics; @@ -129,6 +131,7 @@ import org.apache.accumulo.manager.upgrade.UpgradeCoordinator; import org.apache.accumulo.manager.upgrade.UpgradeCoordinator.UpgradeStatus; import org.apache.accumulo.server.AbstractServer; +import org.apache.accumulo.server.AccumuloDataVersion; import org.apache.accumulo.server.PrimaryManagerThriftService; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.compaction.CompactionConfigStorage; @@ -215,10 +218,6 @@ public class Manager extends AbstractServer private ManagerState state = ManagerState.INITIAL; - // fateReadyLatch and fateRefs go together; when this latch is ready, then the fate references - // should already have been set; ConcurrentHashMap will guarantee that all threads will see - // the initialized fate references after the latch is ready - private final CountDownLatch fateReadyLatch = new CountDownLatch(1); private final AtomicReference>> fateClients = new AtomicReference<>(); private volatile FateManager fateManager; @@ -307,28 +306,7 @@ public boolean stillManager() { return getManagerState() != ManagerState.STOP; } - private void waitForFate() { - try { - // block up to 30 seconds until it's ready; if it's still not ready, introduce some logging - if (!fateReadyLatch.await(30, SECONDS)) { - String msgPrefix = "Unexpected use of fate in thread " + Thread.currentThread().getName() - + " at time " + System.currentTimeMillis(); - // include stack trace so we know where it's coming from, in case we need to troubleshoot it - 
log.warn("{} blocked until fate starts", msgPrefix, - new IllegalStateException("Attempted fate action before manager finished starting up; " - + "if this doesn't make progress, please report it as a bug to the developers")); - int minutes = 0; - while (!fateReadyLatch.await(5, MINUTES)) { - minutes += 5; - log.warn("{} still blocked after {} minutes; this is getting weird", msgPrefix, minutes); - } - log.debug("{} no longer blocked", msgPrefix); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("Thread was interrupted; cannot proceed"); - } - } + private final Lock fateSetupLock = new ReentrantLock(); /** * Retrieve the FateClient object, blocking until it is ready. This could cause problems if Fate @@ -341,7 +319,23 @@ private void waitForFate() { * @return the FateClient object, only after the fate components are running and ready */ public FateClient fateClient(FateInstanceType type) { - waitForFate(); + if (fateClients.get() == null) { + // only want one thread trying to setup fate, lots of threads could call this before its setup + fateSetupLock.lock(); + try { + // check to see if another thread setup fate while we were waiting on the lock + if (fateClients.get() == null) { + // wait for upgrade to be complete + while (AccumuloDataVersion.getCurrentVersion(getContext()) < AccumuloDataVersion.get()) { + log.info("Attempted use of fate before upgrade complete, waiting for upgrade"); + UtilWaitThread.sleep(5000); + } + setupFate(getContext()); + } + } finally { + fateSetupLock.unlock(); + } + } var client = requireNonNull(fateClients.get(), "fateClients is not set yet").get(type); return requireNonNull(client, () -> "fate client type " + type + " is not present"); } @@ -921,7 +915,6 @@ private void setupPrimaryMetrics() { watchers.forEach(watcher -> metricsInfo.addMetricsProducers(watcher.getMetrics())); metricsInfo.addMetricsProducers(requireNonNull(compactionCoordinator)); // ensure fate is 
completely setup - Preconditions.checkState(fateReadyLatch.getCount() == 0); metricsInfo.addMetricsProducers(new MetaFateMetrics(getContext(), getConfiguration().getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL))); metricsInfo.addMetricsProducers(new UserFateMetrics(getContext(), @@ -1165,7 +1158,7 @@ boolean canSuspendTablets() { fateManager = new FateManager(getContext()); fateManager.start(); - fateClient(FateInstanceType.USER).setSeedingConsumer(fateManager::notifySeeded); + startFateMaintenance(); setupPrimaryMetrics(); @@ -1303,29 +1296,51 @@ protected Fate createFateInstance(FateEnv env, FateStore store private void setupFate(ServerContext context) { try { + Predicate isLockHeld = lock -> ServiceLock.isLockHeld(context.getZooCache(), lock); - var metaStore = new MetaFateStore(context.getZooSession(), - primaryManagerLock.getLockID(), isLockHeld); + var metaStore = + new MetaFateStore(context.getZooSession(), managerLock.getLockID(), isLockHeld); var metaFateClient = new FateClient<>(metaStore, TraceRepo::toLogString); var userStore = new UserFateStore(context, SystemTables.FATE.tableName(), managerLock.getLockID(), isLockHeld); var userFateClient = new FateClient<>(userStore, TraceRepo::toLogString); - var metaCleaner = new FateCleaner<>(metaStore, Duration.ofHours(8), this::getSteadyTime); - ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor() - .scheduleWithFixedDelay(metaCleaner::ageOff, 10, 4 * 60, MINUTES)); - var userCleaner = new FateCleaner<>(userStore, Duration.ofHours(8), this::getSteadyTime); - ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor() - .scheduleWithFixedDelay(userCleaner::ageOff, 10, 4 * 60, MINUTES)); + // wire up notifying the correct manager when a fate operation is seeded + FateNotifier fateNotifier = new FateNotifier(context); + fateNotifier.start(); + metaFateClient.setSeedingConsumer(fateNotifier::notifySeeded); + userFateClient.setSeedingConsumer(fateNotifier::notifySeeded); 
if (!fateClients.compareAndSet(null, Map.of(FateInstanceType.META, metaFateClient, FateInstanceType.USER, userFateClient))) { throw new IllegalStateException( "Unexpected previous fateClient reference map already initialized"); } + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException("Exception setting up Fate clients", e); + } + } + + /** + * Run fate maintenance tasks that only run in the primary manager. + */ + private void startFateMaintenance() { + try { + Predicate isLockHeld = + lock -> ServiceLock.isLockHeld(getContext().getZooCache(), lock); + + var metaStore = new MetaFateStore(getContext().getZooSession(), + managerLock.getLockID(), isLockHeld); + var userStore = new UserFateStore(getContext(), SystemTables.FATE.tableName(), + managerLock.getLockID(), isLockHeld); - fateReadyLatch.countDown(); + var metaCleaner = new FateCleaner<>(metaStore, Duration.ofHours(8), this::getSteadyTime); + ThreadPools.watchCriticalScheduledTask(getContext().getScheduledExecutor() + .scheduleWithFixedDelay(metaCleaner::ageOff, 10, 4 * 60, MINUTES)); + var userCleaner = new FateCleaner<>(userStore, Duration.ofHours(8), this::getSteadyTime); + ThreadPools.watchCriticalScheduledTask(getContext().getScheduledExecutor() + .scheduleWithFixedDelay(userCleaner::ageOff, 10, 4 * 60, MINUTES)); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException("Exception setting up FaTE cleanup thread", e); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java index 9d934913098..a5e64497ab1 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java @@ -26,7 +26,6 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import
java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.apache.accumulo.core.fate.FateId; @@ -45,16 +44,14 @@ import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.manager.tableOps.FateEnv; import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.manager.FateLocations; import org.apache.thrift.TException; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -import com.google.common.collect.Range; -import com.google.common.collect.RangeMap; import com.google.common.collect.Sets; -import com.google.common.collect.TreeRangeMap; import com.google.common.net.HostAndPort; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -80,17 +77,10 @@ public FateManager(ServerContext context) { private final AtomicBoolean stop = new AtomicBoolean(false); - record FateHostPartition(HostAndPort hostPort, FatePartition partition) { - } - - private final AtomicReference> stableAssignments = - new AtomicReference<>(TreeRangeMap.create()); - - private final Map> pendingNotifications = new HashMap<>(); - private void manageAssistants() throws TException, InterruptedException { log.debug("Started Fate Manager"); long stableCount = 0; + long unstableCount = 0; outer: while (!stop.get()) { long sleepTime = Math.min(stableCount * 100, 5_000); @@ -113,18 +103,18 @@ private void manageAssistants() throws TException, InterruptedException { computeDesiredAssignments(currentAssignments, desiredParititions); if (desired.equals(currentAssignments)) { - RangeMap rangeMap = TreeRangeMap.create(); - currentAssignments.forEach((hostAndPort, partitions) -> { - partitions.forEach(partition -> { - rangeMap.put(Range.closed(partition.start(), partition.end()), - new FateHostPartition(hostAndPort, partition)); - }); - }); - stableAssignments.set(rangeMap); + if (stableCount == 0) { + 
FateLocations.storeLocations(context, currentAssignments); + } stableCount++; + unstableCount = 0; + continue; } else { - stableAssignments.set(TreeRangeMap.create()); + if (unstableCount == 0) { + FateLocations.storeLocations(context, Map.of()); + } stableCount = 0; + unstableCount++; } // are there any workers with extra partitions? If so need to unload those first. @@ -172,11 +162,9 @@ private void manageAssistants() throws TException, InterruptedException { } private Thread assignmentThread = null; - private Thread ntfyThread = null; public synchronized void start() { Preconditions.checkState(assignmentThread == null); - Preconditions.checkState(ntfyThread == null); Preconditions.checkState(!stop.get()); assignmentThread = Threads.createCriticalThread("Fate Manager", () -> { @@ -187,9 +175,6 @@ public synchronized void start() { } }); assignmentThread.start(); - - ntfyThread = Threads.createCriticalThread("Fate Notify", new NotifyTask()); - ntfyThread.start(); } @SuppressFBWarnings(value = "SWL_SLEEP_WITH_LOCK_HELD", @@ -206,9 +191,6 @@ public synchronized void stop(FateInstanceType fateType, Duration timeout) { if (assignmentThread != null) { assignmentThread.join(); } - if (ntfyThread != null) { - ntfyThread.join(); - } } catch (InterruptedException e) { throw new IllegalStateException(e); } @@ -235,8 +217,6 @@ public synchronized void stop(FateInstanceType fateType, Duration timeout) { } } - stableAssignments.set(TreeRangeMap.create()); - if (!timer.isExpired()) { FateStore store = switch (fateType) { case USER -> new UserFateStore(context, SystemTables.FATE.tableName(), null, null); @@ -264,58 +244,6 @@ public synchronized void stop(FateInstanceType fateType, Duration timeout) { } } - /** - * Makes a best effort to notify this fate operation was seeded. 
- */ - public void notifySeeded(FateId fateId) { - var hostPartition = stableAssignments.get().get(fateId); - if (hostPartition != null) { - synchronized (pendingNotifications) { - pendingNotifications.computeIfAbsent(hostPartition.hostPort(), k -> new HashSet<>()) - .add(hostPartition.partition()); - pendingNotifications.notify(); - } - } - } - - private class NotifyTask implements Runnable { - - @Override - public void run() { - while (!stop.get()) { - try { - Map> copy; - synchronized (pendingNotifications) { - if (pendingNotifications.isEmpty()) { - pendingNotifications.wait(100); - } - copy = Map.copyOf(pendingNotifications); - pendingNotifications.clear(); - } - - for (var entry : copy.entrySet()) { - HostAndPort address = entry.getKey(); - Set partitions = entry.getValue(); - FateWorkerService.Client client = - ThriftUtil.getClient(ThriftClientTypes.FATE_WORKER, address, context); - try { - log.trace("Notifying about seeding {} {}", address, partitions); - client.seeded(TraceUtil.traceInfo(), context.rpcCreds(), - partitions.stream().map(FatePartition::toThrift).toList()); - } finally { - ThriftUtil.returnClient(client, context); - } - } - - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } catch (TException e) { - log.warn("Failed to send notification that fate was seeded", e); - } - } - } - } - /** * Sets the complete set of partitions an assistant manager should work on. It will only succeed * if the update id is valid. 
The update id avoids race conditions w/ previously queued network diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateNotifier.java b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateNotifier.java new file mode 100644 index 00000000000..5789832277a --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateNotifier.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.manager.fate; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FatePartition; +import org.apache.accumulo.core.manager.thrift.FateWorkerService; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.manager.FateLocations; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Range; +import com.google.common.collect.RangeMap; +import com.google.common.collect.TreeRangeMap; +import com.google.common.net.HostAndPort; + +/** + * Responsible for sending notifications that fate operations were seeded between managers. These + * notifications are best effort, it's ok if they are lost. When lost it means fate operations will + * not be as responsive, but they will still eventually run. These notifications are important for + * interactive use of Accumulo where something like create table run in the shell should be + * responsive. Responsiveness is also important for Accumulo's integration test. 
+ */ +public class FateNotifier { + + private static final Logger log = LoggerFactory.getLogger(FateNotifier.class); + + private final Map> pendingNotifications = new HashMap<>(); + private final ServerContext context; + private final AtomicBoolean stop = new AtomicBoolean(); + private final FateLocations fateLocations; + + private Map> lastLocations; + private RangeMap hostMapping; + + private Thread ntfyThread; + + public FateNotifier(ServerContext context) { + this.context = context; + this.fateLocations = new FateLocations(context); + } + + public synchronized void start() { + Preconditions.checkState(ntfyThread == null); + ntfyThread = Threads.createCriticalThread("Fate Notification Sender", new NotifyTask()); + ntfyThread.start(); + } + + record FateHostPartition(HostAndPort hostPort, FatePartition partition) { + } + + private synchronized RangeMap getHostMapping() { + + if (hostMapping == null || lastLocations != fateLocations.getLocations()) { + lastLocations = fateLocations.getLocations(); + RangeMap rangeMap = TreeRangeMap.create(); + lastLocations.forEach((hostAndPort, partitions) -> { + partitions.forEach(partition -> { + rangeMap.put(Range.closed(partition.start(), partition.end()), + new FateHostPartition(hostAndPort, partition)); + }); + }); + hostMapping = rangeMap; + } + + return hostMapping; + } + + /** + * Makes a best effort to notify the appropriate manager this fate operation was seeded. 
+ */ + public void notifySeeded(FateId fateId) { + var hostPartition = getHostMapping().get(fateId); + if (hostPartition != null) { + synchronized (pendingNotifications) { + pendingNotifications.computeIfAbsent(hostPartition.hostPort(), k -> new HashSet<>()) + .add(hostPartition.partition()); + pendingNotifications.notify(); + } + } + } + + private class NotifyTask implements Runnable { + + @Override + public void run() { + while (!stop.get()) { + try { + Map> copy; + synchronized (pendingNotifications) { + if (pendingNotifications.isEmpty()) { + pendingNotifications.wait(100); + } + copy = Map.copyOf(pendingNotifications); + pendingNotifications.clear(); + } + + for (var entry : copy.entrySet()) { + HostAndPort address = entry.getKey(); + Set partitions = entry.getValue(); + FateWorkerService.Client client = + ThriftUtil.getClient(ThriftClientTypes.FATE_WORKER, address, context); + try { + log.trace("Notifying about seeding {} {}", address, partitions); + client.seeded(TraceUtil.traceInfo(), context.rpcCreds(), + partitions.stream().map(FatePartition::toThrift).toList()); + } finally { + ThriftUtil.returnClient(client, context); + } + } + + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } catch (TException e) { + log.warn("Failed to send notification that fate was seeded", e); + } + } + } + } + +}