Skip to content
Permalink
Browse files
Making storage as first class entity and supporting http transport in…
… the agent
  • Loading branch information
DImuthuUpe committed Jan 18, 2021
1 parent 774afbc commit fe56f1c0962ce48f380db0a491a27a29aa8346e6
Show file tree
Hide file tree
Showing 64 changed files with 2,557 additions and 763 deletions.
@@ -22,12 +22,14 @@
public class TransferCommand {

private String transferId;
private String sourceId;
private String sourceStorageId;
private String sourcePath;
private String sourceType;
private String sourceToken;
private String sourceResourceBackend;
private String sourceCredentialBackend;
private String destinationId;
private String destinationStorageId;
private String destinationPath;
private String destinationType;
private String destinationToken;
private String destResourceBackend;
@@ -42,12 +44,21 @@ public TransferCommand setTransferId(String transferId) {
return this;
}

public String getSourceId() {
return sourceId;
public String getSourceStorageId() {
return sourceStorageId;
}

public TransferCommand setSourceId(String sourceId) {
this.sourceId = sourceId;
public TransferCommand setSourceStorageId(String sourceStorageId) {
this.sourceStorageId = sourceStorageId;
return this;
}

public String getSourcePath() {
return sourcePath;
}

public TransferCommand setSourcePath(String sourcePath) {
this.sourcePath = sourcePath;
return this;
}

@@ -87,12 +98,21 @@ public TransferCommand setSourceCredentialBackend(String sourceCredentialBackend
return this;
}

public String getDestinationId() {
return destinationId;
public String getDestinationStorageId() {
return destinationStorageId;
}

public TransferCommand setDestinationStorageId(String destinationStorageId) {
this.destinationStorageId = destinationStorageId;
return this;
}

public String getDestinationPath() {
return destinationPath;
}

public TransferCommand setDestinationId(String destinationId) {
this.destinationId = destinationId;
public TransferCommand setDestinationPath(String destinationPath) {
this.destinationPath = destinationPath;
return this;
}

@@ -21,25 +21,36 @@

public class TransferRequest {

private String sourceId;
private String sourceStorageId;
private String sourcePath;
private String sourceType;
private String sourceToken;
private String sourceResourceBackend;
private String sourceCredentialBackend;
private String destinationId;
private String destinationStorageId;
private String destinationPath;
private String destinationType;
private String destinationToken;
private String destResourceBackend;
private String destCredentialBackend;
private boolean affinityTransfer;
private Map<String, Integer> targetAgents;

public String getSourceId() {
return sourceId;
public String getSourceStorageId() {
return sourceStorageId;
}

public TransferRequest setSourceId(String sourceId) {
this.sourceId = sourceId;
public TransferRequest setSourceStorageId(String sourceStorageId) {
this.sourceStorageId = sourceStorageId;
return this;
}

public String getSourcePath() {
return sourcePath;
}

public TransferRequest setSourcePath(String sourcePath) {
this.sourcePath = sourcePath;
return this;
}

@@ -79,12 +90,21 @@ public TransferRequest setSourceCredentialBackend(String sourceCredentialBackend
return this;
}

public String getDestinationId() {
return destinationId;
public String getDestinationStorageId() {
return destinationStorageId;
}

public TransferRequest setDestinationStorageId(String destinationStorageId) {
this.destinationStorageId = destinationStorageId;
return this;
}

public String getDestinationPath() {
return destinationPath;
}

public TransferRequest setDestinationId(String destinationId) {
this.destinationId = destinationId;
public TransferRequest setDestinationPath(String destinationPath) {
this.destinationPath = destinationPath;
return this;
}

@@ -99,6 +99,11 @@
<artifactId>log4j-over-slf4j</artifactId>
<version>${log4j.over.slf4j}</version>
</dependency>
<dependency>
<groupId>com.sun.activation</groupId>
<artifactId>javax.activation</artifactId>
<version>1.2.0</version>
</dependency>
</dependencies>

<build>
@@ -18,10 +18,10 @@
package org.apache.airavata.mft.agent;

import org.apache.airavata.mft.admin.MFTConsulClient;
import org.apache.airavata.mft.agent.http.HttpTransferRequestsStore;
import org.apache.airavata.mft.agent.rpc.RPCParser;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;

@Configuration
public class AppConfig {
@@ -41,4 +41,9 @@ public MFTConsulClient mftConsulClient() {
public RPCParser rpcParser() {
return new RPCParser();
}

@Bean
public HttpTransferRequestsStore transferRequestStore() {
return new HttpTransferRequestsStore();
}
}
@@ -30,6 +30,8 @@
import org.apache.airavata.mft.admin.models.TransferCommand;
import org.apache.airavata.mft.admin.models.TransferState;
import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
import org.apache.airavata.mft.agent.http.HttpServer;
import org.apache.airavata.mft.agent.http.HttpTransferRequestsStore;
import org.apache.airavata.mft.agent.rpc.RPCParser;
import org.apache.airavata.mft.core.ConnectorResolver;
import org.apache.airavata.mft.core.MetadataCollectorResolver;
@@ -101,6 +103,9 @@ public class MFTAgent implements CommandLineRunner {
@Autowired
private MFTConsulClient mftConsulClient;

@Autowired
private HttpTransferRequestsStore transferRequestsStore;

public void init() {
transferMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_TRANSFER_REQUEST_MESSAGE_PATH + agentId );
rpcMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_RPC_REQUEST_MESSAGE_PATH + agentId );
@@ -147,11 +152,11 @@ private void acceptTransferRequests() {

Optional<Connector> inConnectorOpt = ConnectorResolver.resolveConnector(request.getSourceType(), "IN");
Connector inConnector = inConnectorOpt.orElseThrow(() -> new Exception("Could not find an in connector for given input"));
inConnector.init(request.getSourceId(), request.getSourceToken(), resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
inConnector.init(request.getSourceStorageId(), request.getSourceToken(), resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);

Optional<Connector> outConnectorOpt = ConnectorResolver.resolveConnector(request.getDestinationType(), "OUT");
Connector outConnector = outConnectorOpt.orElseThrow(() -> new Exception("Could not find an out connector for given input"));
outConnector.init(request.getDestinationId(), request.getDestinationToken(), resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
outConnector.init(request.getDestinationStorageId(), request.getDestinationToken(), resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);

Optional<MetadataCollector> srcMetadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(request.getSourceType());
MetadataCollector srcMetadataCollector = srcMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for source"));
@@ -221,6 +226,19 @@ private void acceptTransferRequests() {
transferMessageCache.start();
}

private void acceptHTTPRequests() {
logger.info("Starting the HTTP front end");

new Thread(() -> {
HttpServer httpServer = new HttpServer(3333, false, transferRequestsStore);
try {
httpServer.run();
} catch (Exception e) {
logger.error("Http frontend server start failed", e);
}
}).start();
}

private boolean connectAgent() throws MFTConsulClientException {
final ImmutableSession session = ImmutableSession.builder()
.name(agentId)
@@ -316,6 +334,7 @@ public void start() throws Exception {

acceptTransferRequests();
acceptRPCRequests();
acceptHTTPRequests();
}

@Override
@@ -52,7 +52,10 @@ public String transfer(TransferCommand command, Connector inConnector, Connector
MetadataCollector destMetadataCollector, BiConsumer<String, TransferState> onStatusCallback,
BiConsumer<String, Boolean> exitingCallback) throws Exception {

FileResourceMetadata srcMetadata = srcMetadataCollector.getFileResourceMetadata(command.getSourceId(), command.getSourceToken());
FileResourceMetadata srcMetadata = srcMetadataCollector.getFileResourceMetadata(
command.getSourceStorageId(),
command.getSourcePath(),
command.getSourceToken());

final long resourceSize = srcMetadata.getResourceSize();
logger.debug("Source file size {}. MD5 {}", resourceSize, srcMetadata.getMd5sum());
@@ -65,8 +68,8 @@ public String transfer(TransferCommand command, Connector inConnector, Connector
context.setStreamBuffer(streamBuffer);
context.setTransferId(command.getTransferId());

TransferTask recvTask = new TransferTask(inConnector, context);
TransferTask sendTask = new TransferTask(outConnector, context);
TransferTask recvTask = new TransferTask(inConnector, context, command.getSourcePath());
TransferTask sendTask = new TransferTask(outConnector, context, command.getDestinationPath());
List<Future<Integer>> futureList = new ArrayList<>();

ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
@@ -121,14 +124,19 @@ public void run() {
}

if (!transferErrored) {
Boolean transferred = destMetadataCollector.isAvailable(command.getDestinationId(), command.getDestinationToken());
Boolean transferred = destMetadataCollector.isAvailable(
command.getDestinationStorageId(),
command.getDestinationPath(),
command.getDestinationToken());

if (!transferred) {
logger.error("Transfer completed but resource is not available in destination");
throw new Exception("Transfer completed but resource is not available in destination");
}

FileResourceMetadata destMetadata = destMetadataCollector.getFileResourceMetadata(command.getDestinationId(),
FileResourceMetadata destMetadata = destMetadataCollector.getFileResourceMetadata(
command.getDestinationStorageId(),
command.getDestinationPath(),
command.getDestinationToken());

boolean doIntegrityVerify = true;
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.airavata.mft.agent.http;

public class ConnectorParams {

private String storageId, credentialToken, resourceServiceHost, secretServiceHost;
private int resourceServicePort, secretServicePort;

public String getStorageId() {
return storageId;
}

public ConnectorParams setStorageId(String storageId) {
this.storageId = storageId;
return this;
}

public String getCredentialToken() {
return credentialToken;
}

public ConnectorParams setCredentialToken(String credentialToken) {
this.credentialToken = credentialToken;
return this;
}

public String getResourceServiceHost() {
return resourceServiceHost;
}

public ConnectorParams setResourceServiceHost(String resourceServiceHost) {
this.resourceServiceHost = resourceServiceHost;
return this;
}

public String getSecretServiceHost() {
return secretServiceHost;
}

public ConnectorParams setSecretServiceHost(String secretServiceHost) {
this.secretServiceHost = secretServiceHost;
return this;
}

public int getResourceServicePort() {
return resourceServicePort;
}

public ConnectorParams setResourceServicePort(int resourceServicePort) {
this.resourceServicePort = resourceServicePort;
return this;
}

public int getSecretServicePort() {
return secretServicePort;
}

public ConnectorParams setSecretServicePort(int secretServicePort) {
this.secretServicePort = secretServicePort;
return this;
}
}

0 comments on commit fe56f1c

Please sign in to comment.