Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ public class NSSummaryTask implements ReconOmTask {
private final NSSummaryTaskWithLegacy nsSummaryTaskWithLegacy;
private final NSSummaryTaskWithOBS nsSummaryTaskWithOBS;

// Shared executor for the three FSO/Legacy/OBS sub-tasks during process().
// The sub-tasks operate on disjoint slices of the event stream (filtered by
// table and bucket layout) and write to disjoint NSSummary entries, so they
// are safe to run in parallel.
private final ExecutorService subTaskExecutor = Executors.newFixedThreadPool(
3, new ThreadFactoryBuilder().setNameFormat("NSSummarySubTask-%d").setDaemon(true).build());

/**
* Rebuild state enum to track NSSummary tree rebuild status.
*/
Expand Down Expand Up @@ -172,44 +179,58 @@ public String getDescription() {
@Override
public TaskResult process(
OMUpdateEventBatch events, Map<String, Integer> subTaskSeekPosMap) {
boolean anyFailure = false; // Track if any bucket fails
Map<String, Integer> updatedSeekPositions = new HashMap<>();

// Process FSO bucket
Integer bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.FSO.name(), 0);
Pair<Integer, Boolean> bucketResult = nsSummaryTaskWithFSO.processWithFSO(events, bucketSeek);
updatedSeekPositions.put(BucketType.FSO.name(), bucketResult.getLeft());
if (!bucketResult.getRight()) {
LOG.error("processWithFSO failed.");
anyFailure = true;
}

// Process Legacy bucket
bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.LEGACY.name(), 0);
bucketResult = nsSummaryTaskWithLegacy.processWithLegacy(events, bucketSeek);
updatedSeekPositions.put(BucketType.LEGACY.name(), bucketResult.getLeft());
if (!bucketResult.getRight()) {
LOG.error("processWithLegacy failed.");
anyFailure = true;
}

// Process OBS bucket
bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.OBS.name(), 0);
bucketResult = nsSummaryTaskWithOBS.processWithOBS(events, bucketSeek);
updatedSeekPositions.put(BucketType.OBS.name(), bucketResult.getLeft());
if (!bucketResult.getRight()) {
LOG.error("processWithOBS failed.");
anyFailure = true;
}
int fsoSeek = subTaskSeekPosMap.getOrDefault(BucketType.FSO.name(), 0);
int legacySeek = subTaskSeekPosMap.getOrDefault(BucketType.LEGACY.name(), 0);
int obsSeek = subTaskSeekPosMap.getOrDefault(BucketType.OBS.name(), 0);

Future<Pair<Integer, Boolean>> fsoFuture = subTaskExecutor.submit(
() -> nsSummaryTaskWithFSO.processWithFSO(events, fsoSeek));
Future<Pair<Integer, Boolean>> legacyFuture = subTaskExecutor.submit(
() -> nsSummaryTaskWithLegacy.processWithLegacy(events, legacySeek));
Future<Pair<Integer, Boolean>> obsFuture = subTaskExecutor.submit(
() -> nsSummaryTaskWithOBS.processWithOBS(events, obsSeek));

boolean anyFailure = false;
anyFailure |= !awaitSubTask("processWithFSO", BucketType.FSO,
fsoFuture, fsoSeek, updatedSeekPositions);
anyFailure |= !awaitSubTask("processWithLegacy", BucketType.LEGACY,
legacyFuture, legacySeek, updatedSeekPositions);
anyFailure |= !awaitSubTask("processWithOBS", BucketType.OBS,
obsFuture, obsSeek, updatedSeekPositions);

// Return task failure if any bucket failed, while keeping each bucket's latest seek position
return new TaskResult.Builder()
.setTaskName(getTaskName())
.setSubTaskSeekPositions(updatedSeekPositions)
.setTaskSuccess(!anyFailure)
.build();
}

private boolean awaitSubTask(String name, BucketType type,
Future<Pair<Integer, Boolean>> future,
int fallbackSeek,
Map<String, Integer> updatedSeekPositions) {
try {
Pair<Integer, Boolean> result = future.get();
updatedSeekPositions.put(type.name(), result.getLeft());
if (!result.getRight()) {
LOG.error("{} failed.", name);
return false;
}
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("{} interrupted.", name, e);
updatedSeekPositions.put(type.name(), fallbackSeek);
return false;
} catch (ExecutionException e) {
LOG.error("{} threw an exception.", name, e.getCause());
updatedSeekPositions.put(type.name(), fallbackSeek);
return false;
}
}

@Override
public TaskResult reprocess(OMMetadataManager omMetadataManager) {
// Unified control for all NSS tree rebuild operations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.recon.ReconUtils;
Expand All @@ -43,6 +45,14 @@ public class NSSummaryTaskDbEventHandler {
private ReconNamespaceSummaryManager reconNamespaceSummaryManager;
private ReconOMMetadataManager reconOMMetadataManager;

// Bucket layout never changes for an existing bucket, so cache OmBucketInfo
// lookups across process() calls. Each delta loop hits at most a few buckets;
// without this cache, every event pays a RocksDB point read in the Legacy and
// OBS sub-tasks.
//
// Single-thread access only (one dispatcher thread per task). HashMap is fine.
private final Map<String, OmBucketInfo> bucketInfoCache = new HashMap<>();

public NSSummaryTaskDbEventHandler(ReconNamespaceSummaryManager
reconNamespaceSummaryManager,
ReconOMMetadataManager
Expand All @@ -51,6 +61,22 @@ public NSSummaryTaskDbEventHandler(ReconNamespaceSummaryManager
this.reconOMMetadataManager = reconOMMetadataManager;
}

/** Look up an {@link OmBucketInfo} via {@code getBucketTable().getSkipCache}
* and cache the result. Bucket layout/object-id are immutable for an existing
* bucket, so an unbounded field-level cache is safe and avoids one RocksDB
* point read per event in the per-event sub-task loops. */
protected OmBucketInfo lookupBucketCached(String bucketDBKey) throws IOException {
OmBucketInfo cached = bucketInfoCache.get(bucketDBKey);
if (cached != null) {
return cached;
}
OmBucketInfo info = reconOMMetadataManager.getBucketTable().getSkipCache(bucketDBKey);
if (info != null) {
bucketInfoCache.put(bucketDBKey, info);
}
return info;
}

public ReconNamespaceSummaryManager getReconNamespaceSummaryManager() {
return reconNamespaceSummaryManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,7 @@ private long setParentBucketId(OmKeyInfo keyInfo)
throws IOException {
String bucketKey = getReconOMMetadataManager()
.getBucketKey(keyInfo.getVolumeName(), keyInfo.getBucketName());
OmBucketInfo parentBucketInfo =
getReconOMMetadataManager().getBucketTable().getSkipCache(bucketKey);
OmBucketInfo parentBucketInfo = lookupBucketCached(bucketKey);

if (parentBucketInfo != null) {
return parentBucketInfo.getObjectID();
Expand All @@ -388,8 +387,7 @@ private boolean isBucketLayoutValid(ReconOMMetadataManager metadataManager,
String volumeName = keyInfo.getVolumeName();
String bucketName = keyInfo.getBucketName();
String bucketDBKey = metadataManager.getBucketKey(volumeName, bucketName);
OmBucketInfo omBucketInfo =
metadataManager.getBucketTable().getSkipCache(bucketDBKey);
OmBucketInfo omBucketInfo = lookupBucketCached(bucketDBKey);

if (omBucketInfo.getBucketLayout() != LEGACY_BUCKET_LAYOUT) {
LOG.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,15 +234,13 @@ public Pair<Integer, Boolean> processWithOBS(OMUpdateEventBatch events,
String bucketName = updatedKeyInfo.getBucketName();
String bucketDBKey =
getReconOMMetadataManager().getBucketKey(volumeName, bucketName);
// Get bucket info from bucket table
OmBucketInfo omBucketInfo = getReconOMMetadataManager().getBucketTable()
.getSkipCache(bucketDBKey);
OmBucketInfo omBucketInfo = lookupBucketCached(bucketDBKey);

if (omBucketInfo.getBucketLayout() != BUCKET_LAYOUT) {
continue;
}

long parentObjectID = getKeyParentID(updatedKeyInfo);
long parentObjectID = omBucketInfo.getObjectID();

switch (action) {
case PUT:
Expand All @@ -253,9 +251,10 @@ public Pair<Integer, Boolean> processWithOBS(OMUpdateEventBatch events,
break;
case UPDATE:
if (oldKeyInfo != null) {
// delete first, then put
long oldKeyParentObjectID = getKeyParentID(oldKeyInfo);
handleDeleteKeyEvent(oldKeyInfo, nsSummaryMap, oldKeyParentObjectID);
// For OBS, parent is always the bucket, so same parentObjectID
// applies to old and new (a key cannot move between buckets via
// an UPDATE event — that would be a delete+put).
handleDeleteKeyEvent(oldKeyInfo, nsSummaryMap, parentObjectID);
} else {
LOG.warn("Update event does not have the old keyInfo for {}.",
updatedKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -136,24 +137,29 @@ public class TestProcess {

private NSSummary nsSummaryForBucket1;
private NSSummary nsSummaryForBucket2;
private NSSummary nsSummaryForBucket3;
private ReconOmTask.TaskResult processResult;

@BeforeEach
public void setUp() throws IOException {
nSSummaryTask.reprocess(getReconOMMetadataManager());
nSSummaryTask.process(processEventBatch(), Collections.emptyMap());
// Exercise process() across all three bucket layouts in a single batch
// so the parallel sub-task dispatch is covered end-to-end.
processResult = nSSummaryTask.process(processEventBatch(), Collections.emptyMap());

nsSummaryForBucket1 =
getReconNamespaceSummaryManager().getNSSummary(BUCKET_ONE_OBJECT_ID);
assertNotNull(nsSummaryForBucket1);
nsSummaryForBucket2 =
getReconNamespaceSummaryManager().getNSSummary(BUCKET_TWO_OBJECT_ID);
assertNotNull(nsSummaryForBucket2);
NSSummary nsSummaryForBucket3 = getReconNamespaceSummaryManager().getNSSummary(BUCKET_THREE_OBJECT_ID);
nsSummaryForBucket3 =
getReconNamespaceSummaryManager().getNSSummary(BUCKET_THREE_OBJECT_ID);
assertNotNull(nsSummaryForBucket3);
}

private OMUpdateEventBatch processEventBatch() throws IOException {
// put file5 under bucket 2
// PUT file5 under bucket 2 (Legacy)
String omPutKey =
OM_KEY_PREFIX + VOL +
OM_KEY_PREFIX + BUCKET_TWO +
Expand All @@ -169,7 +175,7 @@ private OMUpdateEventBatch processEventBatch() throws IOException {
.setAction(OMDBUpdateEvent.OMDBUpdateAction.PUT)
.build();

// delete file 1 under bucket 1
// DELETE file1 under bucket 1 (FSO)
String omDeleteKey = BUCKET_ONE_OBJECT_ID + OM_KEY_PREFIX + FILE_ONE;
OmKeyInfo omDeleteInfo = buildOmKeyInfo(
VOL, BUCKET_ONE, KEY_ONE, FILE_ONE,
Expand All @@ -183,7 +189,24 @@ private OMUpdateEventBatch processEventBatch() throws IOException {
.setAction(OMDBUpdateEvent.OMDBUpdateAction.DELETE)
.build();

return new OMUpdateEventBatch(Arrays.asList(keyEvent1, keyEvent2), 0L);
// PUT file4 under bucket 3 (OBS) — exercises the OBS sub-task path so a
// regression in the OBS branch (e.g. missed events) is caught.
String omObsPutKey =
OM_KEY_PREFIX + VOL +
OM_KEY_PREFIX + BUCKET_THREE +
OM_KEY_PREFIX + KEY_FOUR;
OmKeyInfo omObsPutKeyInfo = buildOmKeyInfo(VOL, BUCKET_THREE, KEY_FOUR,
KEY_FOUR, KEY_FOUR_OBJECT_ID, BUCKET_THREE_OBJECT_ID, KEY_FOUR_SIZE);
OMDBUpdateEvent keyEvent3 = new OMDBUpdateEvent.
OMUpdateEventBuilder<String, OmKeyInfo>()
.setKey(omObsPutKey)
.setValue(omObsPutKeyInfo)
.setTable(getOmMetadataManager().getKeyTable(getOBSBucketLayout())
.getName())
.setAction(OMDBUpdateEvent.OMDBUpdateAction.PUT)
.build();

return new OMUpdateEventBatch(Arrays.asList(keyEvent1, keyEvent2, keyEvent3), 0L);
}

@Test
Expand Down Expand Up @@ -216,5 +239,24 @@ public void testProcessBucket() throws IOException {
assertEquals(0, fileSizeDist[i]);
}
}

@Test
public void testProcessObsBucket() {
// bucket 3 (OBS) had file3 from reprocess; the batch added file4.
assertEquals(2, nsSummaryForBucket3.getNumOfFiles());
assertEquals(KEY_THREE_SIZE + KEY_FOUR_SIZE,
nsSummaryForBucket3.getSizeOfFiles());
}

@Test
public void testProcessTaskResult() {
// Sub-task seek positions must be reported for all three layouts so the
// dispatcher can resume each sub-task independently on retry.
assertNotNull(processResult);
assertTrue(processResult.isTaskSuccess());
assertNotNull(processResult.getSubTaskSeekPositions().get(NSSummaryTask.BucketType.FSO.name()));
assertNotNull(processResult.getSubTaskSeekPositions().get(NSSummaryTask.BucketType.LEGACY.name()));
assertNotNull(processResult.getSubTaskSeekPositions().get(NSSummaryTask.BucketType.OBS.name()));
}
}
}