Skip to content
Permalink
Browse files
Optimizations to speedup resource registration + scanning internal di…
…rectories
  • Loading branch information
DImuthuUpe committed Oct 13, 2021
1 parent 875f809 commit 8227ff7c735be46fb182af688ea6bc753df9a0af
Showing 1 changed file with 72 additions and 48 deletions.
@@ -62,8 +62,9 @@ public OrchestratorEventProcessor(Configuration configuration, Notification noti
this.notificationClient = notificationClient;
}

private List<GenericResource> createResourceRecursively(String hostName, String storageId, String basePath,
String resourcePath, String resourceType, String user)
private List<GenericResource> createResourceWithParentDirectories(String hostName, String storageId, String basePath,
String resourcePath, String resourceType, String user,
Map<String, GenericResource> resourceCache)
throws Exception {

List<GenericResource> resourceList = new ArrayList<>();
@@ -77,6 +78,12 @@ private List<GenericResource> createResourceRecursively(String hostName, String
for (int i = 0; i < splitted.length - 1; i++) {
String resourceName = splitted[i];
currentPath = currentPath + "/" + resourceName;

if (resourceCache.containsKey(currentPath)) {
resourceList.add(resourceCache.get(currentPath));
continue;
}

String resourceId = Utils.getId(storageId + ":" + currentPath);
Optional<GenericResource> optionalGenericResource =
this.drmsConnector.createResource(notification.getAuthToken(),
@@ -92,6 +99,7 @@ private List<GenericResource> createResourceRecursively(String hostName, String
this.drmsConnector.addResourceMetadata(notification.getAuthToken(),
notification.getTenantId(), parentId, user, parentType, metadata);

resourceCache.put(currentPath, optionalGenericResource.get());
resourceList.add(optionalGenericResource.get());
} else {
logger.error("Could not create a resource for path {}", currentPath);
@@ -150,6 +158,7 @@ public void run() {
logger.info("Processing resource path {} on storage {}", notification.getResourcePath(),
notification.getBasePath());

Map<String, GenericResource> resourceCache = new HashMap<>();
try {

this.notificationClient.get().registerNotificationStatus(NotificationStatusRegisterRequest.newBuilder()
@@ -196,10 +205,10 @@ public void run() {

// Creating parent resource

List<GenericResource> resourceList = createResourceRecursively(sourceHostName, sourceStorageId,
List<GenericResource> resourceList = createResourceWithParentDirectories(sourceHostName, sourceStorageId,
notification.getBasePath(),
notification.getResourcePath(),
"COLLECTION", adminUser);
"COLLECTION", adminUser, resourceCache);

shareResourcesWithUsers(Collections.singletonList(resourceList.get(resourceList.size() - 1)),
adminUser, owner, "VIEWER");
@@ -245,54 +254,15 @@ public void run() {
.putProperties("TENANT_ID", notification.getTenantId()).build();

AuthToken mftAuth = AuthToken.newBuilder().setDelegateAuth(delegateAuth).build();

FetchResourceMetadataRequest.Builder resourceMetadataReq = FetchResourceMetadataRequest.newBuilder()
.setMftAuthorizationToken(mftAuth)
.setResourceId(resourceObj.getResourceId());

switch (sourceSP.getStorageCase()) {
case SSH_STORAGE_PREFERENCE:
resourceMetadataReq.setResourceType("SCP");
resourceMetadataReq.setResourceToken(sourceSP.getSshStoragePreference().getStoragePreferenceId());
break;
case S3_STORAGE_PREFERENCE:
resourceMetadataReq.setResourceType("S3");
resourceMetadataReq.setResourceToken(sourceSP.getS3StoragePreference().getStoragePreferenceId());
break;
}

// Fetching file list for parent resource

DirectoryMetadataResponse directoryResourceMetadata;

try (MFTApiClient mftApiClient = new MFTApiClient(
this.configuration.getOutboundEventProcessor().getMftHost(),
this.configuration.getOutboundEventProcessor().getMftPort())) {
MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClientStub = mftApiClient.get();
directoryResourceMetadata = mftClientStub.getDirectoryResourceMetadata(resourceMetadataReq.build());
}

List<String> resourceIDsToProcess = new ArrayList<>();
for (FileMetadataResponse fileMetadata : directoryResourceMetadata.getFilesList()) {
logger.info("Registering file {} for source storage {}", fileMetadata.getResourcePath(), sourceStorageId);
resourceList = createResourceRecursively(sourceHostName, sourceStorageId, notification.getBasePath(),
fileMetadata.getResourcePath(), "FILE", adminUser);
GenericResource fileResource = resourceList.get(resourceList.size() - 1);

resourceIDsToProcess.add(fileResource.getResourceId());
}

for (DirectoryMetadataResponse directoryMetadata : directoryResourceMetadata.getDirectoriesList()) {
logger.info("Registering directory {} for source storage {}", directoryMetadata.getResourcePath(), sourceStorageId);
createResourceRecursively(sourceHostName, sourceStorageId, notification.getBasePath(),
directoryMetadata.getResourcePath(),
"COLLECTION", adminUser);
// TODO scan directories
}
// Fetching file list for parent resource
scanResourceForChildResources(resourceObj, mftAuth, sourceSP, sourceStorageId, sourceHostName,
adminUser, resourceIDsToProcess, resourceCache, 4);

logger.info("Creating destination zip resource for directory {}", notification.getResourcePath());
resourceList = createResourceRecursively(destinationHostName, destinationStorageId, notification.getBasePath(),
notification.getResourcePath(), "FILE", adminUser);
resourceList = createResourceWithParentDirectories(destinationHostName, destinationStorageId, notification.getBasePath(),
notification.getResourcePath(), "FILE", adminUser, resourceCache);

GenericResource destinationResource = resourceList.get(resourceList.size() - 1);

@@ -328,4 +298,58 @@ public void run() {
this.eventCache.remove(notification.getResourcePath() + ":" + notification.getHostName());
}
}

private void scanResourceForChildResources(GenericResource resourceObj, AuthToken mftAuth, AnyStoragePreference sourceSP,
String sourceStorageId, String sourceHostName, String adminUser,
List<String> resourceIDsToProcess, Map<String, GenericResource> resourceCache,
int scanDepth)
throws Exception {

FetchResourceMetadataRequest.Builder resourceMetadataReq = FetchResourceMetadataRequest.newBuilder()
.setMftAuthorizationToken(mftAuth)
.setResourceId(resourceObj.getResourceId());

switch (sourceSP.getStorageCase()) {
case SSH_STORAGE_PREFERENCE:
resourceMetadataReq.setResourceType("SCP");
resourceMetadataReq.setResourceToken(sourceSP.getSshStoragePreference().getStoragePreferenceId());
break;
case S3_STORAGE_PREFERENCE:
resourceMetadataReq.setResourceType("S3");
resourceMetadataReq.setResourceToken(sourceSP.getS3StoragePreference().getStoragePreferenceId());
break;
}

DirectoryMetadataResponse directoryResourceMetadata;

try (MFTApiClient mftApiClient = new MFTApiClient(
this.configuration.getOutboundEventProcessor().getMftHost(),
this.configuration.getOutboundEventProcessor().getMftPort())) {
MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClientStub = mftApiClient.get();
directoryResourceMetadata = mftClientStub.getDirectoryResourceMetadata(resourceMetadataReq.build());
}

for (FileMetadataResponse fileMetadata : directoryResourceMetadata.getFilesList()) {
logger.info("Registering file {} for source storage {}", fileMetadata.getResourcePath(), sourceStorageId);
List<GenericResource> resourceList = createResourceWithParentDirectories(sourceHostName, sourceStorageId, notification.getBasePath(),
fileMetadata.getResourcePath(), "FILE", adminUser, resourceCache);
GenericResource fileResource = resourceList.get(resourceList.size() - 1);

resourceIDsToProcess.add(fileResource.getResourceId());
}

for (DirectoryMetadataResponse directoryMetadata : directoryResourceMetadata.getDirectoriesList()) {
logger.info("Registering directory {} for source storage {}", directoryMetadata.getResourcePath(), sourceStorageId);
List<GenericResource> createResources = createResourceWithParentDirectories(sourceHostName, sourceStorageId, notification.getBasePath(),
directoryMetadata.getResourcePath(),
"COLLECTION", adminUser, resourceCache);
GenericResource dirResource = createResources.get(createResources.size() - 1);

if (scanDepth > 0) {
// Scanning the directories recursively
scanResourceForChildResources(dirResource, mftAuth, sourceSP, sourceStorageId, sourceHostName, adminUser,
resourceIDsToProcess, resourceCache, scanDepth - 1);
}
}
}
}

0 comments on commit 8227ff7

Please sign in to comment.