From 329f1c6042e98b578eba27e98e0b380338c638b5 Mon Sep 17 00:00:00 2001 From: Mike Miller Date: Tue, 29 Jun 2021 10:42:53 -0400 Subject: [PATCH 1/3] Add upgradeFiles to Upgrade code * Add upgradeFiles method for upgrading files to Upgrader * Refactor status handling in UpgradeCoordinator to allow upgradeFiles to run concurrently and wait for metadata upgrade to complete * Implement upgradeFiles method in Upgrader9to10 * Add dropSortedMapWALFiles for resolving sorted map files that may still be around during upgrade. Follow up for #2117 * Closes #2179 --- .../org/apache/accumulo/manager/Manager.java | 5 ++ .../manager/upgrade/UpgradeCoordinator.java | 58 +++++++++++++++++-- .../accumulo/manager/upgrade/Upgrader.java | 2 + .../manager/upgrade/Upgrader8to9.java | 5 ++ .../manager/upgrade/Upgrader9to10.java | 44 ++++++++++++++ .../manager/upgrade/Upgrader9to10Test.java | 42 ++++++++++++++ 6 files changed, 150 insertions(+), 6 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 0f289454c14..8b58ea5cbce 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -272,12 +272,14 @@ synchronized void setManagerState(ManagerState newState) { + " all logs and file a bug."); } upgradeMetadataFuture = upgradeCoordinator.upgradeMetadata(getContext(), nextEvent); + upgradeFilesFuture = upgradeCoordinator.upgradeFiles(getContext(), nextEvent); } } private final UpgradeCoordinator upgradeCoordinator = new UpgradeCoordinator(); private Future upgradeMetadataFuture; + private Future upgradeFilesFuture; private ManagerClientServiceHandler clientHandler; @@ -1119,6 +1121,9 @@ boolean canSuspendTablets() { if (null != upgradeMetadataFuture) { upgradeMetadataFuture.get(); } + if (null != upgradeFilesFuture) { + upgradeFilesFuture.get(); + } } catch (ExecutionException | InterruptedException e) { throw new IllegalStateException("Metadata upgrade failed", e); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java index fb5f28344ad..25c405d540a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java @@ -27,6 +27,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.fate.util.UtilWaitThread; import org.apache.accumulo.manager.EventCoordinator; import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.ServerContext; @@ -69,6 +70,16 @@ public boolean isParentLevelUpgraded(KeyExtent extent) { return extent.isMeta(); } }, + + /** + * This signifies that only zookeeper, root and metadata table have been upgraded. + */ + UPGRADED_METADATA { + @Override + public boolean isParentLevelUpgraded(KeyExtent extent) { + return !extent.isMeta(); + } + }, /** * This signifies that everything (zookeeper, root table, metadata table) is upgraded. */ @@ -98,7 +109,7 @@ public boolean isParentLevelUpgraded(KeyExtent extent) { private static Logger log = LoggerFactory.getLogger(UpgradeCoordinator.class); private int currentVersion; - private Map upgraders = Map.of(ServerConstants.SHORTEN_RFILE_KEYS, + private final Map upgraders = Map.of(ServerConstants.SHORTEN_RFILE_KEYS, new Upgrader8to9(), ServerConstants.CRYPTO_CHANGES, new Upgrader9to10()); private volatile UpgradeStatus status; @@ -166,7 +177,7 @@ public synchronized Future upgradeMetadata(ServerContext context, if (currentVersion < ServerConstants.DATA_VERSION) { return ThreadPools.createThreadPool(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, - "UpgradeMetadataThreads", new SynchronousQueue(), OptionalInt.empty(), false) + "UpgradeMetadataThreads", new SynchronousQueue<>(), OptionalInt.empty(), false) .submit(() -> { try { for (int v = currentVersion; v < ServerConstants.DATA_VERSION; v++) { @@ -181,10 +192,7 @@ public synchronized Future upgradeMetadata(ServerContext context, upgraders.get(v).upgradeMetadata(context); } - log.info("Updating persistent data version."); - ServerUtil.updateAccumuloVersion(context.getVolumeManager(), currentVersion); - log.info("Upgrade complete"); - setStatus(UpgradeStatus.COMPLETE, eventCoordinator); + setStatus(UpgradeStatus.UPGRADED_METADATA, eventCoordinator); } catch (Exception e) { handleFailure(e); } @@ -195,6 +203,44 @@ public synchronized Future upgradeMetadata(ServerContext context, } } + public synchronized Future upgradeFiles(ServerContext context, + EventCoordinator eventCoordinator) { + if (status == UpgradeStatus.COMPLETE) + return CompletableFuture.completedFuture(null); + + if (currentVersion < ServerConstants.DATA_VERSION) { + return ThreadPools.createThreadPool(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, + "UpgradeFilesThreads", new SynchronousQueue<>(), OptionalInt.empty(), false) + .submit(() -> { + try { + for (int v = currentVersion; v < ServerConstants.DATA_VERSION; v++) { + log.info("Upgrading files from data version {}", v); + upgraders.get(v).upgradeFiles(context); + } + + log.info("Upgrade files completed"); + while (status != UpgradeStatus.UPGRADED_METADATA) { + log.info("Waiting for upgrade metadata to complete"); + UtilWaitThread.sleepUninterruptibly(5, TimeUnit.SECONDS); + } + completeUpgrade(context, eventCoordinator); + } catch (Exception e) { + handleFailure(e); + } + return null; + }); + } else { + return CompletableFuture.completedFuture(null); + } + } + + private void completeUpgrade(ServerContext context, EventCoordinator eventCoordinator) { + log.info("Updating persistent data version."); + ServerUtil.updateAccumuloVersion(context.getVolumeManager(), currentVersion); + log.info("Upgrade complete"); + setStatus(UpgradeStatus.COMPLETE, eventCoordinator); + } + public UpgradeStatus getStatus() { return status; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader.java index 157a4740c6e..208fdc5cfbb 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader.java @@ -35,4 +35,6 @@ public interface Upgrader { void upgradeRoot(ServerContext ctx); void upgradeMetadata(ServerContext ctx); + + void upgradeFiles(ServerContext ctx); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader8to9.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader8to9.java index 81da2d8ec21..e9f1290cea6 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader8to9.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader8to9.java @@ -41,4 +41,9 @@ public void upgradeMetadata(ServerContext ctx) { // There is no action that needs to be taken for metadata } + @Override + public void upgradeFiles(ServerContext ctx) { + // There is no action that needs to be taken for the files + } + } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java index 5e874c51063..4f96b303885 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java @@ -105,6 +105,12 @@ * #1642, and * #1643 as well. * + * + * Sorted recovery was updated to use RFiles instead of map files. So to prevent issues during + * tablet recovery, remove the old temporary map files and resort using RFiles. This is done in + * {@link #dropSortedMapWALFiles(VolumeManager)}. For more information see the following issues: + * #2117 and + * #2179 */ public class Upgrader9to10 implements Upgrader { @@ -144,6 +150,11 @@ public void upgradeMetadata(ServerContext ctx) { upgradeFileDeletes(ctx, Ample.DataLevel.USER); } + @Override + public void upgradeFiles(ServerContext ctx) { + dropSortedMapWALFiles(ctx.getVolumeManager()); + } + private void setMetaTableProps(ServerContext ctx) { try { TablePropUtil.setTableProperty(ctx, RootTable.ID, @@ -726,4 +737,37 @@ static Path resolveRelativeDelete(String oldDelete, String upgradeProperty) { } return new Path(upgradeProperty, VolumeManager.FileType.TABLE.getDirectory() + oldDelete); } + + /** + * Remove old temporary map files to prevent problems during recovery. + */ + static void dropSortedMapWALFiles(VolumeManager vm) { + Path recoveryDir = new Path("/accumulo/recovery"); + try { + if (!vm.exists(recoveryDir)) { + log.info("There are no recover files in /accumulo/recovery"); + return; + } + List directoriesToDrop = new ArrayList<>(); + for (FileStatus walDir : vm.listStatus(recoveryDir)) { + // map files will be in a directory starting with "part" + Path walDirPath = walDir.getPath(); + for (FileStatus dirOrFile : vm.listStatus(walDirPath)) { + if (dirOrFile.isDirectory()) { + directoriesToDrop.add(walDirPath); + break; + } + } + } + if (!directoriesToDrop.isEmpty()) { + log.info("Found {} old sorted map directories to delete.", directoriesToDrop.size()); + for (Path dir : directoriesToDrop) { + log.info("Deleting everything in old sorted map directory: {}", dir); + vm.deleteRecursively(dir); + } + } + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); + } + } } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader9to10Test.java b/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader9to10Test.java index a5752b917a1..25800da5b50 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader9to10Test.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader9to10Test.java @@ -24,11 +24,13 @@ import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.reset; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -49,6 +51,7 @@ import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.gc.GcVolumeUtil; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.junit.Test; @@ -324,4 +327,43 @@ private void verifyPathsReplaced(List expected, List results assertEquals("Replacements should have update for every delete", deleteCount, updateCount); } + + @Test + public void testDropSortedMapWALs() throws IOException { + Path recoveryDir = new Path("/accumulo/recovery"); + VolumeManager fs = createMock(VolumeManager.class); + FileStatus[] dirs = new FileStatus[2]; + dirs[0] = createMock(FileStatus.class); + Path dir0 = new Path("/accumulo/recovery/A123456789"); + FileStatus[] dir0Files = new FileStatus[1]; + dir0Files[0] = createMock(FileStatus.class); + dirs[1] = createMock(FileStatus.class); + Path dir1 = new Path("/accumulo/recovery/B123456789"); + FileStatus[] dir1Files = new FileStatus[1]; + dir1Files[0] = createMock(FileStatus.class); + Path part1Dir = new Path("/accumulo/recovery/B123456789/part-r-0000"); + + expect(fs.exists(recoveryDir)).andReturn(true).once(); + expect(fs.listStatus(recoveryDir)).andReturn(dirs).once(); + expect(dirs[0].getPath()).andReturn(dir0).once(); + expect(fs.listStatus(dir0)).andReturn(dir0Files).once(); + expect(dir0Files[0].isDirectory()).andReturn(false).once(); + + expect(dirs[1].getPath()).andReturn(dir1).once(); + expect(fs.listStatus(dir1)).andReturn(dir1Files).once(); + expect(dir1Files[0].isDirectory()).andReturn(true).once(); + expect(dir1Files[0].getPath()).andReturn(part1Dir).once(); + + expect(fs.deleteRecursively(dir1)).andReturn(true).once(); + + replay(fs, dirs[0], dirs[1], dir0Files[0], dir1Files[0]); + Upgrader9to10.dropSortedMapWALFiles(fs); + + reset(fs); + + // test case where there is no recovery + expect(fs.exists(recoveryDir)).andReturn(false).once(); + replay(fs); + Upgrader9to10.dropSortedMapWALFiles(fs); + } } From 9eb10dca5443905a92e2c76ce125a620ca4e699b Mon Sep 17 00:00:00 2001 From: Mike Miller Date: Fri, 2 Jul 2021 09:07:09 -0400 Subject: [PATCH 2/3] Fix typo --- .../java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java index 4f96b303885..7167911b42d 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java @@ -745,7 +745,7 @@ static void dropSortedMapWALFiles(VolumeManager vm) { Path recoveryDir = new Path("/accumulo/recovery"); try { if (!vm.exists(recoveryDir)) { - log.info("There are no recover files in /accumulo/recovery"); + log.info("There are no recovery files in /accumulo/recovery"); return; } List directoriesToDrop = new ArrayList<>(); From 03e72b38fcc07e59b70aa8ecce279d1f2f925ca1 Mon Sep 17 00:00:00 2001 From: Mike Miller Date: Fri, 30 Jul 2021 10:13:37 -0400 Subject: [PATCH 3/3] Revert new upgradeFiles and put method call in metadata upgrade --- .../org/apache/accumulo/manager/Manager.java | 5 -- .../manager/upgrade/UpgradeCoordinator.java | 58 ++----------------- .../accumulo/manager/upgrade/Upgrader.java | 2 - .../manager/upgrade/Upgrader8to9.java | 5 -- .../manager/upgrade/Upgrader9to10.java | 5 +- 5 files changed, 7 insertions(+), 68 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 8b58ea5cbce..0f289454c14 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -272,14 +272,12 @@ synchronized void setManagerState(ManagerState newState) { + " all logs and file a bug."); } upgradeMetadataFuture = upgradeCoordinator.upgradeMetadata(getContext(), nextEvent); - upgradeFilesFuture = upgradeCoordinator.upgradeFiles(getContext(), nextEvent); } } private final UpgradeCoordinator upgradeCoordinator = new UpgradeCoordinator(); private Future upgradeMetadataFuture; - private Future upgradeFilesFuture; private ManagerClientServiceHandler clientHandler; @@ -1121,9 +1119,6 @@ boolean canSuspendTablets() { if (null != upgradeMetadataFuture) { upgradeMetadataFuture.get(); } - if (null != upgradeFilesFuture) { - upgradeFilesFuture.get(); - } } catch (ExecutionException | InterruptedException e) { throw new IllegalStateException("Metadata upgrade failed", e); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java index 25c405d540a..fb5f28344ad 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java @@ -27,7 +27,6 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.util.threads.ThreadPools; -import org.apache.accumulo.fate.util.UtilWaitThread; import org.apache.accumulo.manager.EventCoordinator; import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.ServerContext; @@ -70,16 +69,6 @@ public boolean isParentLevelUpgraded(KeyExtent extent) { return extent.isMeta(); } }, - - /** - * This signifies that only zookeeper, root and metadata table have been upgraded. - */ - UPGRADED_METADATA { - @Override - public boolean isParentLevelUpgraded(KeyExtent extent) { - return !extent.isMeta(); - } - }, /** * This signifies that everything (zookeeper, root table, metadata table) is upgraded. */ @@ -109,7 +98,7 @@ public boolean isParentLevelUpgraded(KeyExtent extent) { private static Logger log = LoggerFactory.getLogger(UpgradeCoordinator.class); private int currentVersion; - private final Map upgraders = Map.of(ServerConstants.SHORTEN_RFILE_KEYS, + private Map upgraders = Map.of(ServerConstants.SHORTEN_RFILE_KEYS, new Upgrader8to9(), ServerConstants.CRYPTO_CHANGES, new Upgrader9to10()); private volatile UpgradeStatus status; @@ -177,7 +166,7 @@ public synchronized Future upgradeMetadata(ServerContext context, if (currentVersion < ServerConstants.DATA_VERSION) { return ThreadPools.createThreadPool(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, - "UpgradeMetadataThreads", new SynchronousQueue<>(), OptionalInt.empty(), false) + "UpgradeMetadataThreads", new SynchronousQueue(), OptionalInt.empty(), false) .submit(() -> { try { for (int v = currentVersion; v < ServerConstants.DATA_VERSION; v++) { @@ -192,7 +181,10 @@ public synchronized Future upgradeMetadata(ServerContext context, upgraders.get(v).upgradeMetadata(context); } - setStatus(UpgradeStatus.UPGRADED_METADATA, eventCoordinator); + log.info("Updating persistent data version."); + ServerUtil.updateAccumuloVersion(context.getVolumeManager(), currentVersion); + log.info("Upgrade complete"); + setStatus(UpgradeStatus.COMPLETE, eventCoordinator); } catch (Exception e) { handleFailure(e); } @@ -203,44 +195,6 @@ public synchronized Future upgradeMetadata(ServerContext context, } } - public synchronized Future upgradeFiles(ServerContext context, - EventCoordinator eventCoordinator) { - if (status == UpgradeStatus.COMPLETE) - return CompletableFuture.completedFuture(null); - - if (currentVersion < ServerConstants.DATA_VERSION) { - return ThreadPools.createThreadPool(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, - "UpgradeFilesThreads", new SynchronousQueue<>(), OptionalInt.empty(), false) - .submit(() -> { - try { - for (int v = currentVersion; v < ServerConstants.DATA_VERSION; v++) { - log.info("Upgrading files from data version {}", v); - upgraders.get(v).upgradeFiles(context); - } - - log.info("Upgrade files completed"); - while (status != UpgradeStatus.UPGRADED_METADATA) { - log.info("Waiting for upgrade metadata to complete"); - UtilWaitThread.sleepUninterruptibly(5, TimeUnit.SECONDS); - } - completeUpgrade(context, eventCoordinator); - } catch (Exception e) { - handleFailure(e); - } - return null; - }); - } else { - return CompletableFuture.completedFuture(null); - } - } - - private void completeUpgrade(ServerContext context, EventCoordinator eventCoordinator) { - log.info("Updating persistent data version."); - ServerUtil.updateAccumuloVersion(context.getVolumeManager(), currentVersion); - log.info("Upgrade complete"); - setStatus(UpgradeStatus.COMPLETE, eventCoordinator); - } - public UpgradeStatus getStatus() { return status; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader.java index 208fdc5cfbb..157a4740c6e 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader.java @@ -35,6 +35,4 @@ public interface Upgrader { void upgradeRoot(ServerContext ctx); void upgradeMetadata(ServerContext ctx); - - void upgradeFiles(ServerContext ctx); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader8to9.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader8to9.java index e9f1290cea6..81da2d8ec21 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader8to9.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader8to9.java @@ -41,9 +41,4 @@ public void upgradeMetadata(ServerContext ctx) { // There is no action that needs to be taken for metadata } - @Override - public void upgradeFiles(ServerContext ctx) { - // There is no action that needs to be taken for the files - } - } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java index 7167911b42d..597f88c875d 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java @@ -148,10 +148,7 @@ public void upgradeMetadata(ServerContext ctx) { upgradeRelativePaths(ctx, Ample.DataLevel.USER); upgradeDirColumns(ctx, Ample.DataLevel.USER); upgradeFileDeletes(ctx, Ample.DataLevel.USER); - } - - @Override - public void upgradeFiles(ServerContext ctx) { + // special case where old files need to be deleted dropSortedMapWALFiles(ctx.getVolumeManager()); }