Skip to content
Permalink
Browse files
Support dynamic client providers
  • Loading branch information
isururanawaka committed Feb 26, 2021
2 parents 0701959 + e70c21e commit eb158056162d1cfdd6117f6f3f79004192c15425
Showing 59 changed files with 733 additions and 421 deletions.
@@ -17,6 +17,8 @@

package org.apache.airavata.mft.admin.models;

import org.apache.airavata.mft.common.AuthToken;

public class TransferCommand {

private String transferId;
@@ -32,7 +34,7 @@ public class TransferCommand {
private String destinationToken;
private String destResourceBackend;
private String destCredentialBackend;
private String mftAuthorizationToken;
private AuthToken mftAuthorizationToken;

public String getTransferId() {
return transferId;
@@ -151,11 +153,12 @@ public TransferCommand setDestCredentialBackend(String destCredentialBackend) {
return this;
}

public String getMftAuthorizationToken() {
public AuthToken getMftAuthorizationToken() {
return mftAuthorizationToken;
}

public void setMftAuthorizationToken(String mftAuthorizationToken) {
public TransferCommand setMftAuthorizationToken(AuthToken mftAuthorizationToken) {
this.mftAuthorizationToken = mftAuthorizationToken;
return this;
}
}
@@ -17,6 +17,8 @@

package org.apache.airavata.mft.admin.models;

import org.apache.airavata.mft.common.AuthToken;

import java.util.Map;

public class TransferRequest {
@@ -33,7 +35,7 @@ public class TransferRequest {
private String destinationToken;
private String destResourceBackend;
private String destCredentialBackend;
private String mftAuthorizationToken;
private AuthToken mftAuthorizationToken;
private boolean affinityTransfer;
private Map<String, Integer> targetAgents;

@@ -163,11 +165,12 @@ public TransferRequest setTargetAgents(Map<String, Integer> targetAgents) {
return this;
}

public String getMftAuthorizationToken() {
public AuthToken getMftAuthorizationToken() {
return mftAuthorizationToken;
}

public void setMftAuthorizationToken(String mftAuthorizationToken) {
public TransferRequest setMftAuthorizationToken(AuthToken mftAuthorizationToken) {
this.mftAuthorizationToken = mftAuthorizationToken;
return this;
}
}
@@ -33,7 +33,7 @@
import org.apache.airavata.mft.agent.http.HttpServer;
import org.apache.airavata.mft.agent.http.HttpTransferRequestsStore;
import org.apache.airavata.mft.agent.rpc.RPCParser;
import org.apache.airavata.mft.core.AuthZToken;
import org.apache.airavata.mft.common.AuthToken;
import org.apache.airavata.mft.core.ConnectorResolver;
import org.apache.airavata.mft.core.MetadataCollectorResolver;
import org.apache.airavata.mft.core.api.Connector;
@@ -164,14 +164,13 @@ private void acceptTransferRequests() {
.setPublisher(agentId)
.setDescription("Starting the transfer"));

AuthZToken authZToken = new AuthZToken(request.getMftAuthorizationToken(), agentId, agentSecret);
Optional<Connector> inConnectorOpt = ConnectorResolver.resolveConnector(request.getSourceType(), "IN");
Connector inConnector = inConnectorOpt.orElseThrow(() -> new Exception("Could not find an in connector for given input"));
inConnector.init(authZToken,request.getSourceStorageId(), request.getSourceToken(), resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
inConnector.init(request.getMftAuthorizationToken(), request.getSourceStorageId(), request.getSourceToken(), resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);

Optional<Connector> outConnectorOpt = ConnectorResolver.resolveConnector(request.getDestinationType(), "OUT");
Connector outConnector = outConnectorOpt.orElseThrow(() -> new Exception("Could not find an out connector for given input"));
outConnector.init(authZToken, request.getDestinationStorageId(), request.getDestinationToken(), resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
outConnector.init(request.getMftAuthorizationToken(), request.getDestinationStorageId(), request.getDestinationToken(), resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);

Optional<MetadataCollector> srcMetadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(request.getSourceType());
MetadataCollector srcMetadataCollector = srcMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for source"));
@@ -189,7 +188,7 @@ private void acceptTransferRequests() {
.setDescription("Started the transfer"));


String transferId = mediator.transfer(authZToken,request, inConnector, outConnector, srcMetadataCollector, dstMetadataCollector,
String transferId = mediator.transfer(request.getMftAuthorizationToken(), request, inConnector, outConnector, srcMetadataCollector, dstMetadataCollector,
(id, st) -> {
try {
mftConsulClient.submitTransferStateToProcess(id, agentId, st.setPublisher(agentId));
@@ -19,6 +19,7 @@

import org.apache.airavata.mft.admin.models.TransferCommand;
import org.apache.airavata.mft.admin.models.TransferState;
import org.apache.airavata.mft.common.AuthToken;
import org.apache.airavata.mft.core.*;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.core.api.MetadataCollector;
@@ -48,7 +49,7 @@ public void destroy() {
executor.shutdown();
}

public String transfer(AuthZToken authZToken, TransferCommand command, Connector inConnector, Connector outConnector, MetadataCollector srcMetadataCollector,
public String transfer(AuthToken authZToken, TransferCommand command, Connector inConnector, Connector outConnector, MetadataCollector srcMetadataCollector,
MetadataCollector destMetadataCollector, BiConsumer<String, TransferState> onStatusCallback,
BiConsumer<String, Boolean> exitingCallback) throws Exception {

@@ -124,10 +125,9 @@ public void run() {
}

if (!transferErrored) {
Boolean transferred = destMetadataCollector.isAvailable(
Boolean transferred = destMetadataCollector.isAvailable(authZToken,
command.getDestinationStorageId(),
command.getDestinationPath(),
command.getDestinationToken());
command.getDestinationPath(), command.getDestinationToken());


if (!transferred) {
@@ -22,6 +22,7 @@
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedStream;
import io.netty.util.CharsetUtil;
import org.apache.airavata.mft.common.AuthToken;
import org.apache.airavata.mft.core.*;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.core.api.MetadataCollector;
@@ -76,17 +77,18 @@ public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) thr

ConnectorParams params = httpTransferRequest.getConnectorParams();

AuthZToken authZToken = new AuthZToken();
// TODO Load from HTTP Headers
AuthToken authToken = AuthToken.newBuilder().build();

connector.init(authZToken, params.getStorageId(), params.getCredentialToken(), params.getResourceServiceHost(),
connector.init(authToken, params.getStorageId(), params.getCredentialToken(), params.getResourceServiceHost(),
params.getResourceServicePort(), params.getSecretServiceHost(), params.getSecretServicePort());

metadataCollector.init(params.getResourceServiceHost(), params.getResourceServicePort(),
params.getSecretServiceHost(), params.getSecretServicePort());

Boolean available = metadataCollector.isAvailable(params.getStorageId(),
httpTransferRequest.getTargetResourcePath(),
params.getCredentialToken());
Boolean available = metadataCollector.isAvailable(authToken,
params.getStorageId(),
httpTransferRequest.getTargetResourcePath(), params.getCredentialToken());


if (!available) {
@@ -95,7 +97,7 @@ public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) thr
return;
}

FileResourceMetadata fileResourceMetadata = metadataCollector.getFileResourceMetadata(authZToken, params.getStorageId(),
FileResourceMetadata fileResourceMetadata = metadataCollector.getFileResourceMetadata(authToken, params.getStorageId(),
httpTransferRequest.getTargetResourcePath(),
params.getCredentialToken());

@@ -18,13 +18,14 @@
package org.apache.airavata.mft.agent.rpc;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.util.JsonFormat;
import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
import org.apache.airavata.mft.admin.models.rpc.SyncRPCResponse;
import org.apache.airavata.mft.agent.http.ConnectorParams;
import org.apache.airavata.mft.agent.http.HttpTransferRequest;
import org.apache.airavata.mft.agent.http.HttpTransferRequestsStore;
import org.apache.airavata.mft.common.AuthToken;
import org.apache.airavata.mft.core.ConnectorResolver;
import org.apache.airavata.mft.core.AuthZToken;
import org.apache.airavata.mft.core.DirectoryResourceMetadata;
import org.apache.airavata.mft.core.FileResourceMetadata;
import org.apache.airavata.mft.core.MetadataCollectorResolver;
@@ -73,16 +74,17 @@ public String resolveRPCRequest(SyncRPCRequest request) throws Exception {
String resourceId = request.getParameters().get("resourceId");
String resourceType = request.getParameters().get("resourceType");
String resourceToken = request.getParameters().get("resourceToken");
String mftAuthorizationToken = request.getParameters().get("mftAuthorizationToken");
String agentId = request.getAgentId();
String agentSecret = request.getParameters().get("agentSecret");

AuthToken.Builder tokenBuilder = AuthToken.newBuilder();
JsonFormat.parser().merge(request.getParameters().get("mftAuthorizationToken"), tokenBuilder);
AuthToken mftAuthorizationToken = tokenBuilder.build();

Optional<MetadataCollector> metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(resourceType);
if (metadataCollectorOp.isPresent()) {
MetadataCollector metadataCollector = metadataCollectorOp.get();
metadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
FileResourceMetadata fileResourceMetadata = metadataCollector
.getFileResourceMetadata(new AuthZToken(mftAuthorizationToken, agentId, agentSecret), resourceId, resourceToken);
.getFileResourceMetadata(mftAuthorizationToken, resourceId, resourceToken);
return mapper.writeValueAsString(fileResourceMetadata);
}
break;
@@ -92,16 +94,17 @@ public String resolveRPCRequest(SyncRPCRequest request) throws Exception {
resourceType = request.getParameters().get("resourceType");
resourceToken = request.getParameters().get("resourceToken");
String childPath = request.getParameters().get("childPath");
mftAuthorizationToken = request.getParameters().get("mftAuthorizationToken");
agentId = request.getAgentId();
agentSecret = request.getParameters().get("agentSecret");

tokenBuilder = AuthToken.newBuilder();
JsonFormat.parser().merge(request.getParameters().get("mftAuthorizationToken"), tokenBuilder);
mftAuthorizationToken = tokenBuilder.build();

metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(resourceType);
if (metadataCollectorOp.isPresent()) {
MetadataCollector metadataCollector = metadataCollectorOp.get();
metadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
FileResourceMetadata fileResourceMetadata = metadataCollector
.getFileResourceMetadata(new AuthZToken(mftAuthorizationToken, agentId, agentSecret), resourceId, childPath, resourceToken);
.getFileResourceMetadata(mftAuthorizationToken, resourceId, childPath, resourceToken);
return mapper.writeValueAsString(fileResourceMetadata);
}
break;
@@ -110,16 +113,17 @@ public String resolveRPCRequest(SyncRPCRequest request) throws Exception {
resourceId = request.getParameters().get("resourceId");
resourceType = request.getParameters().get("resourceType");
resourceToken = request.getParameters().get("resourceToken");
mftAuthorizationToken = request.getParameters().get("mftAuthorizationToken");
agentId = request.getAgentId();
agentSecret = request.getParameters().get("agentSecret");

tokenBuilder = AuthToken.newBuilder();
JsonFormat.parser().merge(request.getParameters().get("mftAuthorizationToken"), tokenBuilder);
mftAuthorizationToken = tokenBuilder.build();

metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(resourceType);
if (metadataCollectorOp.isPresent()) {
MetadataCollector metadataCollector = metadataCollectorOp.get();
metadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
DirectoryResourceMetadata dirResourceMetadata = metadataCollector
.getDirectoryResourceMetadata(new AuthZToken(mftAuthorizationToken, agentId, agentSecret), resourceId, resourceToken);
.getDirectoryResourceMetadata(mftAuthorizationToken, resourceId, resourceToken);
return mapper.writeValueAsString(dirResourceMetadata);
}
break;
@@ -129,16 +133,17 @@ public String resolveRPCRequest(SyncRPCRequest request) throws Exception {
resourceType = request.getParameters().get("resourceType");
resourceToken = request.getParameters().get("resourceToken");
childPath = request.getParameters().get("childPath");
mftAuthorizationToken = request.getParameters().get("mftAuthorizationToken");
agentId = request.getAgentId();
agentSecret = request.getParameters().get("agentSecret");

tokenBuilder = AuthToken.newBuilder();
JsonFormat.parser().merge(request.getParameters().get("mftAuthorizationToken"), tokenBuilder);
mftAuthorizationToken = tokenBuilder.build();

metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(resourceType);
if (metadataCollectorOp.isPresent()) {
MetadataCollector metadataCollector = metadataCollectorOp.get();
metadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
DirectoryResourceMetadata dirResourceMetadata = metadataCollector
.getDirectoryResourceMetadata(new AuthZToken(mftAuthorizationToken, agentId, agentSecret), resourceId, childPath, resourceToken);
.getDirectoryResourceMetadata(mftAuthorizationToken, resourceId, childPath, resourceToken);
return mapper.writeValueAsString(dirResourceMetadata);
}
break;
@@ -148,7 +153,10 @@ public String resolveRPCRequest(SyncRPCRequest request) throws Exception {
String sourcePath = request.getParameters().get("sourcePath");
String sourceToken = request.getParameters().get("sourceToken");
String storeType = request.getParameters().get("storeType");
mftAuthorizationToken = request.getParameters().get("mftAuthorizationToken");

tokenBuilder = AuthToken.newBuilder();
JsonFormat.parser().merge(request.getParameters().get("mftAuthorizationToken"), tokenBuilder);
mftAuthorizationToken = tokenBuilder.build();

metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(storeType);
Optional<Connector> connectorOp = ConnectorResolver.resolveConnector(storeType, "IN");
@@ -0,0 +1,20 @@
package org.apache.airavata.mft.api.client.examples;

import org.apache.airavata.mft.api.client.MFTApiClient;
import org.apache.airavata.mft.api.service.FetchResourceMetadataRequest;
import org.apache.airavata.mft.api.service.MFTApiServiceGrpc;
import org.apache.airavata.mft.api.service.ResourceAvailabilityRequest;

public class Example {

public static void main(String a[]) {
MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = MFTApiClient.buildClient("localhost", 7004);
mftClient.getResourceAvailability(ResourceAvailabilityRequest.newBuilder()
.setResourceId("a")
.setResourceToken("b")
.setResourceType("SCP")
.setResourceBackend("AIRAVATA")
.setResourceCredentialBackend("AIRAVATA").build());
System.out.println("Hooooo");
}
}
@@ -66,6 +66,12 @@
<version>2.3.1</version>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.15.0</version>
</dependency>

<!-- To be removed -->

<dependency>

0 comments on commit eb15805

Please sign in to comment.