Skip to content
Permalink
Browse files
Making MFTApiClient closable to avoid stale connections
  • Loading branch information
DImuthuUpe committed Sep 2, 2021
1 parent 6232bd0 commit 6959839f5d456aa5070e1f92e0d43b0250bd92e1
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 36 deletions.
@@ -204,10 +204,6 @@ public void run() {
AnyStoragePreference sourceSP = sourceSPOp.get();
AnyStoragePreference destSP = destSPOp.get();

MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = MFTApiClient.buildClient(
this.configuration.getOutboundEventProcessor().getMftHost(),
this.configuration.getOutboundEventProcessor().getMftPort());

String decodedAuth = new String(Base64.getDecoder().decode(notificationEvent.getAuthToken()));
String[] authParts = decodedAuth.split(":");

@@ -240,7 +236,14 @@ public void run() {

// Fetching file list for parent resource

DirectoryMetadataResponse directoryResourceMetadata = mftClient.getDirectoryResourceMetadata(resourceMetadataReq.build());
DirectoryMetadataResponse directoryResourceMetadata;

try (MFTApiClient mftApiClient = new MFTApiClient(
this.configuration.getOutboundEventProcessor().getMftHost(),
this.configuration.getOutboundEventProcessor().getMftPort())) {
MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClientStub = mftApiClient.get();
directoryResourceMetadata = mftClientStub.getDirectoryResourceMetadata(resourceMetadataReq.build());
}

List<String> resourceIDsToProcess = new ArrayList<>();
for (FileMetadataResponse fileMetadata : directoryResourceMetadata.getFilesList()) {
@@ -72,7 +72,6 @@ public MFTDownloadResponse mftDownload(@RequestHeader("Authorization") String au

authTokenStr = authTokenStr.substring(7).trim();

MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = MFTApiClient.buildClient(mftHost, mftPort);

ManagedChannel channel = ManagedChannelBuilder.forAddress(drmsHost, drmsPort).usePlaintext().build();
ResourceServiceGrpc.ResourceServiceBlockingStub resourceClient = ResourceServiceGrpc.newBlockingStub(channel);
@@ -140,7 +139,11 @@ public MFTDownloadResponse mftDownload(@RequestHeader("Authorization") String au
downloadRequest.setMftAuthorizationToken(AuthToken.newBuilder()
.setUserTokenAuth(UserTokenAuth.newBuilder().setToken(authTokenStr).build()).build());

HttpDownloadApiResponse downloadResponse = mftClient.submitHttpDownload(downloadRequest.build());
HttpDownloadApiResponse downloadResponse;
try (MFTApiClient mftApiClient = new MFTApiClient(mftHost, mftPort)) {
MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClientStub = mftApiClient.get();
downloadResponse = mftClientStub.submitHttpDownload(downloadRequest.build());
}

return new MFTDownloadResponse().setUrl(downloadResponse.getUrl()).setAgentId(downloadResponse.getTargetAgent());
}
@@ -100,19 +100,22 @@ public void submitDataParsingWorkflow(WorkflowInvocationRequest request) throws
for (String sourceResourceId : workflowMessage.getSourceResourceIdsList()) {
logger.info("Processing parsing workflow for resource {}", sourceResourceId);

MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = MFTApiClient.buildClient(mftHost, mftPort);

DelegateAuth delegateAuth = DelegateAuth.newBuilder()
.setUserId(workflowMessage.getUsername())
.setClientId(mftClientId)
.setClientSecret(mftClientSecret)
.putProperties("TENANT_ID", workflowMessage.getTenantId()).build();

FileMetadataResponse metadata = mftClient.getFileResourceMetadata(FetchResourceMetadataRequest.newBuilder()
.setResourceType("SCP")
.setResourceId(sourceResourceId)
.setResourceToken(workflowMessage.getSourceCredentialToken())
.setMftAuthorizationToken(AuthToken.newBuilder().setDelegateAuth(delegateAuth).build()).build());
FileMetadataResponse metadata;
try (MFTApiClient mftClient = new MFTApiClient(mftHost, mftPort)) {
MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClientStub = mftClient.get();

DelegateAuth delegateAuth = DelegateAuth.newBuilder()
.setUserId(workflowMessage.getUsername())
.setClientId(mftClientId)
.setClientSecret(mftClientSecret)
.putProperties("TENANT_ID", workflowMessage.getTenantId()).build();

metadata = mftClientStub.getFileResourceMetadata(FetchResourceMetadataRequest.newBuilder()
.setResourceType("SCP")
.setResourceId(sourceResourceId)
.setResourceToken(workflowMessage.getSourceCredentialToken())
.setMftAuthorizationToken(AuthToken.newBuilder().setDelegateAuth(delegateAuth).build()).build());
}

ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6566).usePlaintext().build();
DataParserServiceGrpc.DataParserServiceBlockingStub parserClient = DataParserServiceGrpc.newBlockingStub(channel);
@@ -65,7 +65,7 @@ public class AsyncDataTransferTask extends BiSectionNonBlockingTask {


public TaskResult beforeSection() {
MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = MFTApiClient.buildClient(getMftHost(), getMftPort());
MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = new MFTApiClient(getMftHost(), getMftPort()).get();
TransferApiResponse submitResponse = mftClient.submitTransfer(TransferApiRequest.newBuilder()
.setMftAuthorizationToken(AuthToken.newBuilder()
.setDelegateAuth(
@@ -82,28 +82,39 @@ public static void main(String args[]) {
@Override
public TaskResult runBlockingCode() {

MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = MFTApiClient.buildClient(getMftHost(), getMftPort());

DelegateAuth delegateAuth = DelegateAuth.newBuilder()
.setUserId(getUserId())
.setClientId(getMftClientId())
.setClientSecret(getMftClientSecret())
.putProperties("TENANT_ID", getTenantId()).build();

HttpDownloadApiResponse httpDownloadApiResponse = mftClient.submitHttpDownload(HttpDownloadApiRequest
.newBuilder()
.setMftAuthorizationToken(AuthToken.newBuilder().setDelegateAuth(delegateAuth).build())
.setSourceResourceId(getSourceResourceId())
.setSourceToken(getSourceCredToken())
.setSourceType("SCP")
.setSourceResourceChildPath("")
.build());

FileMetadataResponse metadata = mftClient.getFileResourceMetadata(FetchResourceMetadataRequest.newBuilder()
.setResourceType("SCP")
.setResourceId(getSourceResourceId())
.setResourceToken(getSourceCredToken())
.setMftAuthorizationToken(AuthToken.newBuilder().setDelegateAuth(delegateAuth).build()).build());
HttpDownloadApiResponse httpDownloadApiResponse;
FileMetadataResponse metadata;

try (MFTApiClient mftClient = new MFTApiClient(getMftHost(), getMftPort())) {

MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClientStub = mftClient.get();

httpDownloadApiResponse = mftClientStub.submitHttpDownload(HttpDownloadApiRequest
.newBuilder()
.setMftAuthorizationToken(AuthToken.newBuilder().setDelegateAuth(delegateAuth).build())
.setSourceResourceId(getSourceResourceId())
.setSourceToken(getSourceCredToken())
.setSourceType("SCP")
.setSourceResourceChildPath("")
.build());

metadata = mftClientStub.getFileResourceMetadata(FetchResourceMetadataRequest.newBuilder()
.setResourceType("SCP")
.setResourceId(getSourceResourceId())
.setResourceToken(getSourceCredToken())
.setMftAuthorizationToken(AuthToken.newBuilder().setDelegateAuth(delegateAuth).build()).build());

} catch (IOException e) {
logger.error("Failed to create the mft client", e);
return new TaskResult(TaskResult.Status.FAILED, "Failed to create the mft client");
}

String downloadUrl = httpDownloadApiResponse.getUrl();
logger.info("Using download URL {}", downloadUrl);

0 comments on commit 6959839

Please sign in to comment.