diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java index 139190e4baa1..05dc8f4cd430 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java @@ -83,6 +83,13 @@ public class NSSummaryTask implements ReconOmTask { private final NSSummaryTaskWithLegacy nsSummaryTaskWithLegacy; private final NSSummaryTaskWithOBS nsSummaryTaskWithOBS; + // Shared executor for the three FSO/Legacy/OBS sub-tasks during process(). + // The sub-tasks operate on disjoint slices of the event stream (filtered by + // table and bucket layout) and write to disjoint NSSummary entries, so they + // are safe to run in parallel. + private final ExecutorService subTaskExecutor = Executors.newFixedThreadPool( + 3, new ThreadFactoryBuilder().setNameFormat("NSSummarySubTask-%d").setDaemon(true).build()); + /** * Rebuild state enum to track NSSummary tree rebuild status. */ @@ -172,37 +179,27 @@ public String getDescription() { @Override public TaskResult process( OMUpdateEventBatch events, Map subTaskSeekPosMap) { - boolean anyFailure = false; // Track if any bucket fails Map updatedSeekPositions = new HashMap<>(); - // Process FSO bucket - Integer bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.FSO.name(), 0); - Pair bucketResult = nsSummaryTaskWithFSO.processWithFSO(events, bucketSeek); - updatedSeekPositions.put(BucketType.FSO.name(), bucketResult.getLeft()); - if (!bucketResult.getRight()) { - LOG.error("processWithFSO failed."); - anyFailure = true; - } - - // Process Legacy bucket - bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.LEGACY.name(), 0); - bucketResult = nsSummaryTaskWithLegacy.processWithLegacy(events, bucketSeek); - updatedSeekPositions.put(BucketType.LEGACY.name(), bucketResult.getLeft()); - if (!bucketResult.getRight()) { - LOG.error("processWithLegacy failed."); - anyFailure = true; - } - - // Process OBS bucket - bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.OBS.name(), 0); - bucketResult = nsSummaryTaskWithOBS.processWithOBS(events, bucketSeek); - updatedSeekPositions.put(BucketType.OBS.name(), bucketResult.getLeft()); - if (!bucketResult.getRight()) { - LOG.error("processWithOBS failed."); - anyFailure = true; - } + int fsoSeek = subTaskSeekPosMap.getOrDefault(BucketType.FSO.name(), 0); + int legacySeek = subTaskSeekPosMap.getOrDefault(BucketType.LEGACY.name(), 0); + int obsSeek = subTaskSeekPosMap.getOrDefault(BucketType.OBS.name(), 0); + + Future> fsoFuture = subTaskExecutor.submit( + () -> nsSummaryTaskWithFSO.processWithFSO(events, fsoSeek)); + Future> legacyFuture = subTaskExecutor.submit( + () -> nsSummaryTaskWithLegacy.processWithLegacy(events, legacySeek)); + Future> obsFuture = subTaskExecutor.submit( + () -> nsSummaryTaskWithOBS.processWithOBS(events, obsSeek)); + + boolean anyFailure = false; + anyFailure |= !awaitSubTask("processWithFSO", BucketType.FSO, + fsoFuture, fsoSeek, updatedSeekPositions); + anyFailure |= !awaitSubTask("processWithLegacy", BucketType.LEGACY, + legacyFuture, legacySeek, updatedSeekPositions); + anyFailure |= !awaitSubTask("processWithOBS", BucketType.OBS, + obsFuture, obsSeek, updatedSeekPositions); - // Return task failure if any bucket failed, while keeping each bucket's latest seek position return new TaskResult.Builder() .setTaskName(getTaskName()) .setSubTaskSeekPositions(updatedSeekPositions) @@ -210,6 +207,30 @@ public TaskResult process( .build(); } + private boolean awaitSubTask(String name, BucketType type, + Future> future, + int fallbackSeek, + Map updatedSeekPositions) { + try { + Pair result = future.get(); + updatedSeekPositions.put(type.name(), result.getLeft()); + if (!result.getRight()) { + LOG.error("{} failed.", name); + return false; + } + return true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("{} interrupted.", name, e); + updatedSeekPositions.put(type.name(), fallbackSeek); + return false; + } catch (ExecutionException e) { + LOG.error("{} threw an exception.", name, e.getCause()); + updatedSeekPositions.put(type.name(), fallbackSeek); + return false; + } + } + @Override public TaskResult reprocess(OMMetadataManager omMetadataManager) { // Unified control for all NSS tree rebuild operations diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java index cd0d10c6f9ea..7f1486bb6d56 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java @@ -20,8 +20,10 @@ import java.io.IOException; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import org.apache.hadoop.hdds.utils.db.RDBBatchOperation; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.recon.ReconUtils; @@ -43,6 +45,14 @@ public class NSSummaryTaskDbEventHandler { private ReconNamespaceSummaryManager reconNamespaceSummaryManager; private ReconOMMetadataManager reconOMMetadataManager; + // Bucket layout never changes for an existing bucket, so cache OmBucketInfo + // lookups across process() calls. Each delta loop hits at most a few buckets; + // without this cache, every event pays a RocksDB point read in the Legacy and + // OBS sub-tasks. + // + // Single-thread access only (one dispatcher thread per task). HashMap is fine. + private final Map bucketInfoCache = new HashMap<>(); + public NSSummaryTaskDbEventHandler(ReconNamespaceSummaryManager reconNamespaceSummaryManager, ReconOMMetadataManager @@ -51,6 +61,22 @@ public NSSummaryTaskDbEventHandler(ReconNamespaceSummaryManager this.reconOMMetadataManager = reconOMMetadataManager; } + /** Look up an {@link OmBucketInfo} via {@code getBucketTable().getSkipCache} + * and cache the result. Bucket layout/object-id are immutable for an existing + * bucket, so an unbounded field-level cache is safe and avoids one RocksDB + * point read per event in the per-event sub-task loops. */ + protected OmBucketInfo lookupBucketCached(String bucketDBKey) throws IOException { + OmBucketInfo cached = bucketInfoCache.get(bucketDBKey); + if (cached != null) { + return cached; + } + OmBucketInfo info = reconOMMetadataManager.getBucketTable().getSkipCache(bucketDBKey); + if (info != null) { + bucketInfoCache.put(bucketDBKey, info); + } + return info; + } + public ReconNamespaceSummaryManager getReconNamespaceSummaryManager() { return reconNamespaceSummaryManager; } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithLegacy.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithLegacy.java index 186a89e294ab..5225ffa1464a 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithLegacy.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithLegacy.java @@ -363,8 +363,7 @@ private long setParentBucketId(OmKeyInfo keyInfo) throws IOException { String bucketKey = getReconOMMetadataManager() .getBucketKey(keyInfo.getVolumeName(), keyInfo.getBucketName()); - OmBucketInfo parentBucketInfo = - getReconOMMetadataManager().getBucketTable().getSkipCache(bucketKey); + OmBucketInfo parentBucketInfo = lookupBucketCached(bucketKey); if (parentBucketInfo != null) { return parentBucketInfo.getObjectID(); @@ -388,8 +387,7 @@ private boolean isBucketLayoutValid(ReconOMMetadataManager metadataManager, String volumeName = keyInfo.getVolumeName(); String bucketName = keyInfo.getBucketName(); String bucketDBKey = metadataManager.getBucketKey(volumeName, bucketName); - OmBucketInfo omBucketInfo = - metadataManager.getBucketTable().getSkipCache(bucketDBKey); + OmBucketInfo omBucketInfo = lookupBucketCached(bucketDBKey); if (omBucketInfo.getBucketLayout() != LEGACY_BUCKET_LAYOUT) { LOG.debug( diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithOBS.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithOBS.java index a78439616729..aa5c3576ea8a 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithOBS.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithOBS.java @@ -234,15 +234,13 @@ public Pair processWithOBS(OMUpdateEventBatch events, String bucketName = updatedKeyInfo.getBucketName(); String bucketDBKey = getReconOMMetadataManager().getBucketKey(volumeName, bucketName); - // Get bucket info from bucket table - OmBucketInfo omBucketInfo = getReconOMMetadataManager().getBucketTable() - .getSkipCache(bucketDBKey); + OmBucketInfo omBucketInfo = lookupBucketCached(bucketDBKey); if (omBucketInfo.getBucketLayout() != BUCKET_LAYOUT) { continue; } - long parentObjectID = getKeyParentID(updatedKeyInfo); + long parentObjectID = omBucketInfo.getObjectID(); switch (action) { case PUT: @@ -253,9 +251,10 @@ public Pair processWithOBS(OMUpdateEventBatch events, break; case UPDATE: if (oldKeyInfo != null) { - // delete first, then put - long oldKeyParentObjectID = getKeyParentID(oldKeyInfo); - handleDeleteKeyEvent(oldKeyInfo, nsSummaryMap, oldKeyParentObjectID); + // For OBS, parent is always the bucket, so same parentObjectID + // applies to old and new (a key cannot move between buckets via + // an UPDATE event — that would be a delete+put). + handleDeleteKeyEvent(oldKeyInfo, nsSummaryMap, parentObjectID); } else { LOG.warn("Update event does not have the old keyInfo for {}.", updatedKey); diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTask.java index 7de5f54b8815..9ef61d94b5cf 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTask.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTask.java @@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.File; import java.io.IOException; @@ -136,11 +137,15 @@ public class TestProcess { private NSSummary nsSummaryForBucket1; private NSSummary nsSummaryForBucket2; + private NSSummary nsSummaryForBucket3; + private ReconOmTask.TaskResult processResult; @BeforeEach public void setUp() throws IOException { nSSummaryTask.reprocess(getReconOMMetadataManager()); - nSSummaryTask.process(processEventBatch(), Collections.emptyMap()); + // Exercise process() across all three bucket layouts in a single batch + // so the parallel sub-task dispatch is covered end-to-end. + processResult = nSSummaryTask.process(processEventBatch(), Collections.emptyMap()); nsSummaryForBucket1 = getReconNamespaceSummaryManager().getNSSummary(BUCKET_ONE_OBJECT_ID); @@ -148,12 +153,13 @@ public void setUp() throws IOException { nsSummaryForBucket2 = getReconNamespaceSummaryManager().getNSSummary(BUCKET_TWO_OBJECT_ID); assertNotNull(nsSummaryForBucket2); - NSSummary nsSummaryForBucket3 = getReconNamespaceSummaryManager().getNSSummary(BUCKET_THREE_OBJECT_ID); + nsSummaryForBucket3 = + getReconNamespaceSummaryManager().getNSSummary(BUCKET_THREE_OBJECT_ID); assertNotNull(nsSummaryForBucket3); } private OMUpdateEventBatch processEventBatch() throws IOException { - // put file5 under bucket 2 + // PUT file5 under bucket 2 (Legacy) String omPutKey = OM_KEY_PREFIX + VOL + OM_KEY_PREFIX + BUCKET_TWO + @@ -169,7 +175,7 @@ private OMUpdateEventBatch processEventBatch() throws IOException { .setAction(OMDBUpdateEvent.OMDBUpdateAction.PUT) .build(); - // delete file 1 under bucket 1 + // DELETE file1 under bucket 1 (FSO) String omDeleteKey = BUCKET_ONE_OBJECT_ID + OM_KEY_PREFIX + FILE_ONE; OmKeyInfo omDeleteInfo = buildOmKeyInfo( VOL, BUCKET_ONE, KEY_ONE, FILE_ONE, @@ -183,7 +189,24 @@ private OMUpdateEventBatch processEventBatch() throws IOException { .setAction(OMDBUpdateEvent.OMDBUpdateAction.DELETE) .build(); - return new OMUpdateEventBatch(Arrays.asList(keyEvent1, keyEvent2), 0L); + // PUT file4 under bucket 3 (OBS) — exercises the OBS sub-task path so a + // regression in the OBS branch (e.g. missed events) is caught. + String omObsPutKey = + OM_KEY_PREFIX + VOL + + OM_KEY_PREFIX + BUCKET_THREE + + OM_KEY_PREFIX + KEY_FOUR; + OmKeyInfo omObsPutKeyInfo = buildOmKeyInfo(VOL, BUCKET_THREE, KEY_FOUR, + KEY_FOUR, KEY_FOUR_OBJECT_ID, BUCKET_THREE_OBJECT_ID, KEY_FOUR_SIZE); + OMDBUpdateEvent keyEvent3 = new OMDBUpdateEvent. + OMUpdateEventBuilder() + .setKey(omObsPutKey) + .setValue(omObsPutKeyInfo) + .setTable(getOmMetadataManager().getKeyTable(getOBSBucketLayout()) + .getName()) + .setAction(OMDBUpdateEvent.OMDBUpdateAction.PUT) + .build(); + + return new OMUpdateEventBatch(Arrays.asList(keyEvent1, keyEvent2, keyEvent3), 0L); } @Test @@ -216,5 +239,24 @@ public void testProcessBucket() throws IOException { assertEquals(0, fileSizeDist[i]); } } + + @Test + public void testProcessObsBucket() { + // bucket 3 (OBS) had file3 from reprocess; the batch added file4. + assertEquals(2, nsSummaryForBucket3.getNumOfFiles()); + assertEquals(KEY_THREE_SIZE + KEY_FOUR_SIZE, + nsSummaryForBucket3.getSizeOfFiles()); + } + + @Test + public void testProcessTaskResult() { + // Sub-task seek positions must be reported for all three layouts so the + // dispatcher can resume each sub-task independently on retry. + assertNotNull(processResult); + assertTrue(processResult.isTaskSuccess()); + assertNotNull(processResult.getSubTaskSeekPositions().get(NSSummaryTask.BucketType.FSO.name())); + assertNotNull(processResult.getSubTaskSeekPositions().get(NSSummaryTask.BucketType.LEGACY.name())); + assertNotNull(processResult.getSubTaskSeekPositions().get(NSSummaryTask.BucketType.OBS.name())); + } } }