Skip to content
Permalink
Browse files
Fetching file metadata directly through agents using a custom sync - …
…rpc protocol with consul backend
  • Loading branch information
DImuthuUpe committed Aug 15, 2020
1 parent 7515c3b commit d6bd3d99ed08fdce95fa0b2b89821884bc8531fc
Show file tree
Hide file tree
Showing 17 changed files with 345 additions and 14 deletions.
@@ -17,6 +17,7 @@

package org.apache.airavata.mft.admin.models.rpc;

import java.util.HashMap;
import java.util.Map;

public class SyncRPCRequest {
@@ -70,4 +71,54 @@ public SyncRPCRequest setMessageId(String messageId) {
this.messageId = messageId;
return this;
}

public static final class SyncRPCRequestBuilder {
private String agentId;
private String method;
private Map<String, String> parameters = new HashMap<>();
private String returnAddress;
private String messageId;

private SyncRPCRequestBuilder() {
}

public static SyncRPCRequestBuilder builder() {
return new SyncRPCRequestBuilder();
}

public SyncRPCRequestBuilder withAgentId(String agentId) {
this.agentId = agentId;
return this;
}

public SyncRPCRequestBuilder withMethod(String method) {
this.method = method;
return this;
}

public SyncRPCRequestBuilder withParameter(String key, String value) {
this.parameters.put(key, value);
return this;
}

public SyncRPCRequestBuilder withReturnAddress(String returnAddress) {
this.returnAddress = returnAddress;
return this;
}

public SyncRPCRequestBuilder withMessageId(String messageId) {
this.messageId = messageId;
return this;
}

public SyncRPCRequest build() {
SyncRPCRequest syncRPCRequest = new SyncRPCRequest();
syncRPCRequest.setAgentId(agentId);
syncRPCRequest.setMethod(method);
syncRPCRequest.setParameters(parameters);
syncRPCRequest.setReturnAddress(returnAddress);
syncRPCRequest.setMessageId(messageId);
return syncRPCRequest;
}
}
}
@@ -18,6 +18,7 @@
package org.apache.airavata.mft.agent;

import org.apache.airavata.mft.admin.MFTConsulClient;
import org.apache.airavata.mft.agent.rpc.RPCParser;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
@@ -35,4 +36,9 @@ public class AppConfig {
public MFTConsulClient mftConsulClient() {
return new MFTConsulClient(consulHost, consulPort);
}

@Bean
public RPCParser rpcParser() {
return new RPCParser();
}
}
@@ -30,7 +30,7 @@
import org.apache.airavata.mft.admin.models.TransferCommand;
import org.apache.airavata.mft.admin.models.TransferState;
import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
import org.apache.airavata.mft.admin.models.rpc.SyncRPCResponse;
import org.apache.airavata.mft.agent.rpc.RPCParser;
import org.apache.airavata.mft.core.ConnectorResolver;
import org.apache.airavata.mft.core.MetadataCollectorResolver;
import org.apache.airavata.mft.core.api.Connector;
@@ -95,6 +95,9 @@ public class MFTAgent implements CommandLineRunner {

private ObjectMapper mapper = new ObjectMapper();

@Autowired
private RPCParser rpcParser;

@Autowired
private MFTConsulClient mftConsulClient;

@@ -110,7 +113,7 @@ private void acceptRPCRequests() {
decodedValue.ifPresent(v -> {
try {
SyncRPCRequest rpcRequest = mapper.readValue(v, SyncRPCRequest.class);
mftConsulClient.sendSyncRPCResponseFromAgent(rpcRequest.getReturnAddress(), processRPCRequest(rpcRequest));
mftConsulClient.sendSyncRPCResponseFromAgent(rpcRequest.getReturnAddress(), rpcParser.processRPCRequest(rpcRequest));
} catch (Throwable e) {
logger.error("Error processing the RPC request {}", value.getKey(), e);
} finally {
@@ -124,11 +127,6 @@ private void acceptRPCRequests() {
rpcMessageCache.start();
}

private SyncRPCResponse processRPCRequest(SyncRPCRequest request) {
// TODO implement using the reflection
return null;
}

private void acceptTransferRequests() {

transferCacheListener = newValues -> {
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.airavata.mft.agent.rpc;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
import org.apache.airavata.mft.admin.models.rpc.SyncRPCResponse;
import org.apache.airavata.mft.core.FileResourceMetadata;
import org.apache.airavata.mft.core.MetadataCollectorResolver;
import org.apache.airavata.mft.core.api.MetadataCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;

public class RPCParser {

private static final Logger logger = LoggerFactory.getLogger(RPCParser.class);

@org.springframework.beans.factory.annotation.Value("${resource.service.host}")
private String resourceServiceHost;

@org.springframework.beans.factory.annotation.Value("${resource.service.port}")
private int resourceServicePort;

@org.springframework.beans.factory.annotation.Value("${secret.service.host}")
private String secretServiceHost;

@org.springframework.beans.factory.annotation.Value("${secret.service.port}")
private int secretServicePort;

public String resolveRPCRequest(SyncRPCRequest request) throws Exception {
// TODO implement using the reflection
ObjectMapper mapper = new ObjectMapper();

switch (request.getMethod()) {
case "getFileResourceMetadata":
String resourceId = request.getParameters().get("resourceId");
String resourceType = request.getParameters().get("resourceType");
String resourceToken = request.getParameters().get("resourceToken");
String mftAuthorizationToken = request.getParameters().get("mftAuthorizationToken");

Optional<MetadataCollector> metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(resourceType);
if (metadataCollectorOp.isPresent()) {
MetadataCollector metadataCollector = metadataCollectorOp.get();
metadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
FileResourceMetadata fileResourceMetadata = metadataCollector.getFileResourceMetadata(resourceId, resourceToken);
return mapper.writeValueAsString(fileResourceMetadata);
}
break;
}
logger.error("Unknown method type specified {}", request.getMethod());
throw new Exception("Unknown method " + request.getMethod());
}

public SyncRPCResponse processRPCRequest(SyncRPCRequest request) {
SyncRPCResponse response = new SyncRPCResponse();
response.setMessageId(request.getMessageId());
try {
String respStr = resolveRPCRequest(request);
response.setResponseAsStr(respStr);
response.setResponseStatus(SyncRPCResponse.ResponseStatus.SUCCESS);
} catch (Exception e) {
logger.error("Errored while processing the rpc request for message {} and method {}",
request.getMessageId(), request.getMethod(), e);
response.setErrorAsStr(e.getMessage());
response.setResponseStatus(SyncRPCResponse.ResponseStatus.FAIL);
}
return response;
}
}
@@ -18,6 +18,7 @@
package org.apache.airavata.mft.api;

import org.apache.airavata.mft.admin.MFTConsulClient;
import org.apache.airavata.mft.admin.SyncRPCClient;
import org.apache.airavata.mft.admin.models.TransferRequest;
import org.apache.airavata.mft.api.service.TransferApiRequest;
import org.dozer.DozerBeanMapper;
@@ -34,11 +35,21 @@ public class AppConfig {
@org.springframework.beans.factory.annotation.Value("${consul.port}")
public Integer consulPort;

@org.springframework.beans.factory.annotation.Value("${api.id}")
public String apiId;

@Bean
public MFTConsulClient mftConsulClient() {
return new MFTConsulClient(consulHost, consulPort);
}

@Bean
public SyncRPCClient agentRPCClient() {
SyncRPCClient client = new SyncRPCClient("api-server-" + apiId, mftConsulClient());
client.init();
return client;
}

@Bean
public DozerBeanMapper dozerBeanMapper() {
DozerBeanMapper mapper = new DozerBeanMapper();
@@ -17,11 +17,16 @@

package org.apache.airavata.mft.api.handler;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.grpc.stub.StreamObserver;
import org.apache.airavata.mft.admin.MFTConsulClient;
import org.apache.airavata.mft.admin.SyncRPCClient;
import org.apache.airavata.mft.admin.models.TransferRequest;
import org.apache.airavata.mft.admin.models.TransferState;
import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
import org.apache.airavata.mft.admin.models.rpc.SyncRPCResponse;
import org.apache.airavata.mft.api.service.*;
import org.apache.airavata.mft.core.FileResourceMetadata;
import org.apache.airavata.mft.core.MetadataCollectorResolver;
import org.apache.airavata.mft.core.api.MetadataCollector;
import org.dozer.DozerBeanMapper;
@@ -32,6 +37,7 @@

import java.util.List;
import java.util.Optional;
import java.util.UUID;

@GRpcService
public class MFTApiHandler extends MFTApiServiceGrpc.MFTApiServiceImplBase {
@@ -44,6 +50,11 @@ public class MFTApiHandler extends MFTApiServiceGrpc.MFTApiServiceImplBase {
@Autowired
private DozerBeanMapper dozerBeanMapper;

private ObjectMapper jsonMapper = new ObjectMapper();

@Autowired
private SyncRPCClient agentRPCClient;

@org.springframework.beans.factory.annotation.Value("${resource.service.host}")
private String resourceServiceHost;

@@ -131,4 +142,43 @@ public void getResourceAvailability(ResourceAvailabilityRequest request, StreamO
responseObserver.onError(new Exception("Failed to check the availability", e));
}
}

@Override
public void getFileResourceMetadata(FetchResourceMetadataRequest request, StreamObserver<FileMetadataResponse> responseObserver) {

try {
SyncRPCResponse rpcResponse = agentRPCClient.sendSyncRequest(SyncRPCRequest.SyncRPCRequestBuilder.builder()
.withAgentId(request.getTargetAgentId())
.withMessageId(UUID.randomUUID().toString())
.withMethod("getFileResourceMetadata")
.withParameter("resourceId", request.getResourceId())
.withParameter("resourceType", request.getResourceType())
.withParameter("resourceToken", request.getResourceToken())
.withParameter("mftAuthorizationToken", request.getMftAuthorizationToken())
.build());

switch (rpcResponse.getResponseStatus()) {
case SUCCESS:
FileResourceMetadata fileResourceMetadata = jsonMapper.readValue(rpcResponse.getResponseAsStr(), FileResourceMetadata.class);
FileMetadataResponse.Builder responseBuilder = FileMetadataResponse.newBuilder();
dozerBeanMapper.map(fileResourceMetadata, responseBuilder);
responseObserver.onNext(responseBuilder.build());
responseObserver.onCompleted();
return;
case FAIL:
logger.error("Errored while processing the fetch file metadata response for resource id {}. Error msg : {}",
request.getResourceId(), rpcResponse.getErrorAsStr());
responseObserver.onError(new Exception("Errored while processing the the fetch file metadata response. Error msg : " +
rpcResponse.getErrorAsStr()));
}
} catch (Exception e) {
logger.error("Error while fetching resource metadata for resource " + request.getResourceId(), e);
responseObserver.onError(new Exception("Failed to fetch resource metadata", e));
}
}

@Override
public void getDirectoryResourceMetadata(FetchResourceMetadataRequest request, StreamObserver<DirectoryMetadataResponse> responseObserver) {
super.getDirectoryResourceMetadata(request, responseObserver);
}
}
@@ -19,6 +19,7 @@ server.port=8088
grpc.port=7004
consul.host=localhost
consul.port=8500
api.id=0

resource.service.host=localhost
resource.service.port=7002
@@ -28,6 +28,7 @@ message TransferApiResponse {

message TransferStateApiRequest {
string transferId = 1;
string mftAuthorizationToken = 2;
}

message TransferStateApiResponse {
@@ -43,12 +44,46 @@ message ResourceAvailabilityRequest {
string resourceToken = 3;
string resourceBackend = 4;
string resourceCredentialBackend = 5;
string mftAuthorizationToken = 6;
}

message ResourceAvailabilityResponse {
bool available = 1;
}

message FileMetadataResponse {
string friendlyName = 1;
int64 resourceSize = 2;
int64 createdTime = 3;
int64 updateTime = 4;
string md5sum = 5;
string resourcePath = 6;
string parentResourceId = 7;
string parentResourceType = 8;
}

message DirectoryMetadataResponse {
string friendlyName = 1;
int64 createdTime = 2;
int64 updateTime = 3;
string resourcePath = 4;
string parentResourceId = 5;
string parentResourceType = 6;
repeated DirectoryMetadataResponse directories = 7;
repeated FileMetadataResponse files = 8;
bool lazyInitialized = 9;
}

message FetchResourceMetadataRequest {
string resourceId = 1;
string resourceType = 2;
string resourceToken = 3;
string resourceBackend = 4;
string resourceCredentialBackend = 5;
string targetAgentId = 7;
string mftAuthorizationToken = 8;
}

service MFTApiService {

rpc submitTransfer(TransferApiRequest) returns (TransferApiResponse) {
@@ -70,4 +105,16 @@ service MFTApiService {
}

rpc getResourceAvailability(ResourceAvailabilityRequest) returns (ResourceAvailabilityResponse) {}

rpc getFileResourceMetadata(FetchResourceMetadataRequest) returns (FileMetadataResponse) {
option (google.api.http) = {
get: "/v1.0/api/resource/metadata/file"
};
}

rpc getDirectoryResourceMetadata(FetchResourceMetadataRequest) returns (DirectoryMetadataResponse) {
option (google.api.http) = {
get: "/v1.0/api/resource/metadata/directory"
};
}
}

0 comments on commit d6bd3d9

Please sign in to comment.