Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-8451. [Snapshot] Catch Exception and log it in generateSnapshotDiffReport #4600

Merged
merged 4 commits into from
May 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public final class OmSnapshotManager implements AutoCloseable {
private static final String SNAP_DIFF_PURGED_JOB_TABLE_NAME =
"snap-diff-purged-job-table";

private final long diffCleanupServiceInterval;
private final ManagedColumnFamilyOptions columnFamilyOptions;
private final ManagedDBOptions options;
private final List<ColumnFamilyDescriptor> columnFamilyDescriptors;
Expand Down Expand Up @@ -234,7 +235,7 @@ public OmSnapshotManager(OzoneManager ozoneManager) {
ozoneManager, snapshotCache, snapDiffJobCf, snapDiffReportCf,
columnFamilyOptions, codecRegistry);

long diffCleanupServiceInterval = ozoneManager.getConfiguration()
diffCleanupServiceInterval = ozoneManager.getConfiguration()
.getTimeDuration(OZONE_OM_SNAPSHOT_DIFF_CLEANUP_SERVICE_RUN_INTERVAL,
OZONE_OM_SNAPSHOT_DIFF_CLEANUP_SERVICE_RUN_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -748,4 +749,8 @@ public void close() {
options.close();
}
}

public long getDiffCleanupServiceInterval() {
return diffCleanupServiceInterval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,11 @@ public SnapshotDiffManager(ManagedRocksDB db,
this.executorService = new ThreadPoolExecutor(threadPoolSize,
threadPoolSize,
0,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(threadPoolSize)
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(threadPoolSize),
new ThreadFactoryBuilder()
.setNameFormat("snapshot-diff-job-thread-id-%d")
.build()
);

Path path = Paths.get(differ.getMetadataDir(), "snapDiff");
Expand All @@ -222,6 +225,7 @@ public SnapshotDiffManager(ManagedRocksDB db,
// When we build snapDiff HA aware, we will revisit this.
// Details: https://github.com/apache/ozone/pull/4438#discussion_r1149788226
this.loadJobsOnStartUp();

isNativeRocksToolsLoaded = NativeLibraryLoader.getInstance()
.loadLibrary(NativeConstants.ROCKS_TOOLS_NATIVE_LIBRARY_NAME);
if (isNativeRocksToolsLoaded) {
Expand Down Expand Up @@ -363,8 +367,7 @@ public SnapshotDiffResponse getSnapshotDiffReport(
tsInfo.getSnapshotID();

SnapshotDiffJob snapDiffJob = getSnapDiffReportStatus(snapDiffJobKey,
volume, bucket, fromSnapshot.getName(), toSnapshot.getName(),
forceFullDiff);
volume, bucket, fsInfo.getName(), tsInfo.getName(), forceFullDiff);

OFSPath snapshotRoot = getSnapshotRootPath(volume, bucket);

Expand All @@ -383,10 +386,14 @@ public SnapshotDiffResponse getSnapshotDiffReport(
fromSnapshot.getName(), toSnapshot.getName(), new ArrayList<>(),
null), FAILED, defaultWaitTime);
case DONE:
SnapshotDiffReportOzone report =
createPageResponse(snapDiffJob.getJobId(), volume, bucket,
fromSnapshot, toSnapshot, index, pageSize);
SnapshotDiffReportOzone report = createPageResponse(snapDiffJob, volume,
bucket, fromSnapshot, toSnapshot, index, pageSize);
return new SnapshotDiffResponse(report, DONE, 0L);
case REJECTED:
return new SnapshotDiffResponse(
new SnapshotDiffReportOzone(snapshotRoot.toString(), volume, bucket,
fromSnapshot.getName(), toSnapshot.getName(), new ArrayList<>(),
null), REJECTED, defaultWaitTime);
default:
throw new IllegalStateException("Unknown snapshot job status: " +
snapDiffJob.getStatus());
Expand All @@ -397,26 +404,28 @@ public SnapshotDiffResponse getSnapshotDiffReport(
private static OFSPath getSnapshotRootPath(String volume, String bucket) {
org.apache.hadoop.fs.Path bucketPath = new org.apache.hadoop.fs.Path(
OZONE_URI_DELIMITER + volume + OZONE_URI_DELIMITER + bucket);
OFSPath path = new OFSPath(bucketPath, new OzoneConfiguration());
return path;
return new OFSPath(bucketPath, new OzoneConfiguration());
}

private SnapshotDiffReportOzone createPageResponse(final String jobId,
final String volume,
final String bucket,
final OmSnapshot fromSnapshot,
final OmSnapshot toSnapshot,
final int index,
final int pageSize)
throws IOException {
private SnapshotDiffReportOzone createPageResponse(
final SnapshotDiffJob snapDiffJob,
final String volume,
final String bucket,
final OmSnapshot fromSnapshot,
final OmSnapshot toSnapshot,
final int index,
final int pageSize
) throws IOException {
List<DiffReportEntry> diffReportList = new ArrayList<>();

OFSPath path = getSnapshotRootPath(volume, bucket);

boolean hasMoreEntries = true;

for (int idx = index; idx - index < pageSize; idx++) {
byte[] rawKey = codecRegistry.asRawData(jobId + DELIMITER + idx);
int idx;
for (idx = index; idx - index < pageSize; idx++) {
byte[] rawKey =
codecRegistry.asRawData(snapDiffJob.getJobId() + DELIMITER + idx);
byte[] bytes = snapDiffReportTable.get(rawKey);
if (bytes == null) {
hasMoreEntries = false;
Expand All @@ -425,14 +434,37 @@ private SnapshotDiffReportOzone createPageResponse(final String jobId,
diffReportList.add(codecRegistry.asObject(bytes, DiffReportEntry.class));
}

String tokenString = hasMoreEntries ?
String.valueOf(index + pageSize) : null;
String tokenString = hasMoreEntries ? String.valueOf(idx) : null;

if (!hasMoreEntries) {
checkReportsIntegrity(snapDiffJob, idx);
}

return new SnapshotDiffReportOzone(path.toString(), volume, bucket,
fromSnapshot.getName(), toSnapshot.getName(), diffReportList,
tokenString);
}

/**
* Check that total number of entries after creating the last page matches
* that the total number of entries set after the diff report generation.
* If check fails, it marks the job failed so that it is GC-ed by clean up
* service and throws the exception to client.
*/
private void checkReportsIntegrity(final SnapshotDiffJob diffJob,
final int totalDiffEntries)
throws IOException {
if (diffJob.getTotalDiffEntries() != totalDiffEntries) {
LOG.error("Expected TotalDiffEntries: {} but found only " +
"TotalDiffEntries: {}",
diffJob.getTotalDiffEntries(),
totalDiffEntries);
updateJobStatus(diffJob.getJobId(), DONE, FAILED);
throw new IOException("Report integrity check failed. Retry after: " +
ozoneManager.getOmSnapshotManager().getDiffCleanupServiceInterval());
}
}

@SuppressWarnings("parameternumber")
private synchronized SnapshotDiffResponse submitSnapDiffJob(
final String jobKey,
Expand Down Expand Up @@ -469,9 +501,8 @@ private synchronized SnapshotDiffResponse submitSnapDiffJob(
if (snapDiffJob.getStatus() != QUEUED) {
// Same request is submitted by another thread and already completed.
if (snapDiffJob.getStatus() == DONE) {
SnapshotDiffReportOzone report =
createPageResponse(snapDiffJob.getJobId(), volume, bucket,
fromSnapshot, toSnapshot, index, pageSize);
SnapshotDiffReportOzone report = createPageResponse(snapDiffJob, volume,
bucket, fromSnapshot, toSnapshot, index, pageSize);
return new SnapshotDiffResponse(report, DONE, 0L);
} else {
// Otherwise, return the same status as in DB with wait time.
Expand Down Expand Up @@ -501,7 +532,7 @@ private synchronized SnapshotDiffResponse submitSnapDiffJob(

LOG.info("Submitting snap diff report generation request for" +
" volume: {}, bucket: {}, fromSnapshot: {} and toSnapshot: {}",
volume, bucket, fromSnapshot.getName(), toSnapshot.getName());
volume, bucket, fsInfo.getName(), tsInfo.getName());

OFSPath snapshotRoot = getSnapshotRootPath(volume, bucket);

Expand All @@ -524,11 +555,21 @@ private synchronized SnapshotDiffResponse submitSnapDiffJob(
// before the cleanup kicks in.
snapDiffJobTable.remove(jobKey);
LOG.info("Exceeded the snapDiff parallel requests progressing " +
"limit. Please retry after {}.", defaultWaitTime);
"limit. Removed the jobKey: {}. Please retry after {}.",
jobKey, defaultWaitTime);
return new SnapshotDiffResponse(
new SnapshotDiffReportOzone(snapshotRoot.toString(), volume, bucket,
fromSnapshot.getName(), toSnapshot.getName(), new ArrayList<>(),
null), REJECTED, defaultWaitTime);
} catch (Exception exception) {
// Remove the entry from job table as well.
snapDiffJobTable.remove(jobKey);
LOG.error("Failure in job submission to the executor. Removed the" +
" jobKey: {}.", jobKey, exception);
return new SnapshotDiffResponse(
new SnapshotDiffReportOzone(snapshotRoot.toString(), volume, bucket,
fromSnapshot.getName(), toSnapshot.getName(), new ArrayList<>(),
null), FAILED, defaultWaitTime);
}
}

Expand Down Expand Up @@ -567,6 +608,12 @@ private void generateSnapshotDiffReport(final String jobKey,
final SnapshotInfo fsInfo,
final SnapshotInfo tsInfo,
final boolean forceFullDiff) {
LOG.info("Started snap diff report generation for volume: {} " +
"bucket: {}, fromSnapshot: {} and toSnapshot: {}," +
" fromSnapshot: {}, toSnapshot: {} ",
volume, bucket, fsInfo.getName(), tsInfo.getName(), fromSnapshot,
toSnapshot);

ColumnFamilyHandle fromSnapshotColumnFamily = null;
ColumnFamilyHandle toSnapshotColumnFamily = null;
ColumnFamilyHandle objectIDsColumnFamily = null;
Expand Down Expand Up @@ -644,8 +691,8 @@ private void generateSnapshotDiffReport(final String jobKey,
objectIdToKeyNameMapForToSnapshot, objectIDsToCheckMap,
tablePrefixes);
} catch (NativeLibraryNotLoadedException e) {
// Workaround to handle deletes if use of native rockstools for reading
// tombstone fails.
// Workaround to handle deletes if use of native rocksDb tool for
// reading tombstone fails.
// TODO: [SNAPSHOT] Update Rocksdb SSTFileIterator to read tombstone
deltaFilesForKeyOrFileTable.addAll(getSSTFileListForSnapshot(
fromSnapshot, tablesToLookUp));
Expand Down Expand Up @@ -704,11 +751,22 @@ private void generateSnapshotDiffReport(final String jobKey,
objectIDsToCheckMap,
objectIdToKeyNameMapForFromSnapshot,
objectIdToKeyNameMapForToSnapshot);

updateJobStatusToDone(jobKey, totalDiffEntries);
} catch (IOException | RocksDBException exception) {
updateJobStatus(jobKey, IN_PROGRESS, FAILED);
LOG.error("Caught checked exception during diff report generation for " +
"volume: {} bucket: {}, fromSnapshot: {} and toSnapshot: {}",
volume, bucket, fsInfo.getName(), tsInfo.getName(), exception);
// TODO: [SNAPSHOT] Fail gracefully.
throw new RuntimeException(exception);
} catch (Exception exception) {
updateJobStatus(jobKey, IN_PROGRESS, FAILED);
LOG.error("Caught unchecked exception during diff report generation " +
"for volume: {} bucket: {}, fromSnapshot: {} and toSnapshot: {}",
volume, bucket, fsInfo.getName(), tsInfo.getName(), exception);
// TODO: [SNAPSHOT] Fail gracefully.
throw new RuntimeException(exception.getCause());
throw new RuntimeException(exception);
} finally {
// Clean up: drop the intermediate column family and close them.
dropAndCloseColumnFamilyHandle(fromSnapshotColumnFamily);
Expand Down Expand Up @@ -790,7 +848,8 @@ private String getKeyOrDirectoryName(boolean isDirectory,
private Set<String> getDeltaFiles(OmSnapshot fromSnapshot,
OmSnapshot toSnapshot,
List<String> tablesToLookUp,
SnapshotInfo fsInfo, SnapshotInfo tsInfo,
SnapshotInfo fsInfo,
SnapshotInfo tsInfo,
boolean useFullDiff,
Map<String, String> tablePrefixes,
String diffDir)
Expand Down Expand Up @@ -847,7 +906,9 @@ private long generateDiffReport(
final PersistentSet<byte[]> objectIDsToCheck,
final PersistentMap<byte[], byte[]> oldObjIdToKeyMap,
final PersistentMap<byte[], byte[]> newObjIdToKeyMap
) throws IOException {
) {

LOG.debug("Starting diff report generation for jobId: {}.", jobId);
ColumnFamilyHandle deleteDiffColumnFamily = null;
ColumnFamilyHandle renameDiffColumnFamily = null;
ColumnFamilyHandle createDiffColumnFamily = null;
Expand Down Expand Up @@ -973,7 +1034,6 @@ private long generateDiffReport(
// TODO: [SNAPSHOT] Fail gracefully.
throw new RuntimeException(e);
} finally {

dropAndCloseColumnFamilyHandle(deleteDiffColumnFamily);
dropAndCloseColumnFamilyHandle(renameDiffColumnFamily);
dropAndCloseColumnFamilyHandle(createDiffColumnFamily);
Expand Down Expand Up @@ -1030,9 +1090,10 @@ private synchronized void updateJobStatus(String jobKey,
JobStatus newStatus) {
SnapshotDiffJob snapshotDiffJob = snapDiffJobTable.get(jobKey);
if (snapshotDiffJob.getStatus() != oldStatus) {
throw new IllegalStateException("Invalid job status. Current job " +
"status is '" + snapshotDiffJob.getStatus() + "', while '" +
oldStatus + "' is expected.");
throw new IllegalStateException("Invalid job status for jobID: " +
snapshotDiffJob.getJobId() + ". Job's current status is '" +
snapshotDiffJob.getStatus() + "', while '" + oldStatus +
"' is expected.");
}
snapshotDiffJob.setStatus(newStatus);
snapDiffJobTable.put(jobKey, snapshotDiffJob);
Expand All @@ -1042,10 +1103,12 @@ private synchronized void updateJobStatusToDone(String jobKey,
long totalNumberOfEntries) {
SnapshotDiffJob snapshotDiffJob = snapDiffJobTable.get(jobKey);
if (snapshotDiffJob.getStatus() != IN_PROGRESS) {
throw new IllegalStateException("Invalid job status. Current job " +
"status is '" + snapshotDiffJob.getStatus() + "', while '" +
IN_PROGRESS + "' is expected.");
throw new IllegalStateException("Invalid job status for jobID: " +
snapshotDiffJob.getJobId() + ". Job's current status is '" +
snapshotDiffJob.getStatus() + "', while '" + IN_PROGRESS +
"' is expected.");
}

snapshotDiffJob.setStatus(DONE);
snapshotDiffJob.setTotalDiffEntries(totalNumberOfEntries);
snapDiffJobTable.put(jobKey, snapshotDiffJob);
Expand Down