Skip to content
Permalink
Browse files
Adding MFT Controller
  • Loading branch information
DImuthuUpe committed Jan 4, 2020
1 parent e111bfe commit 3a1a3262b69c749a84d69a0bd094953d2dbb087f
Show file tree
Hide file tree
Showing 17 changed files with 1,030 additions and 35 deletions.
@@ -24,6 +24,7 @@
import com.orbitz.consul.KeyValueClient;
import com.orbitz.consul.model.kv.Value;
import org.apache.airavata.mft.admin.models.AgentInfo;
import org.apache.airavata.mft.admin.models.TransferCommand;
import org.apache.airavata.mft.admin.models.TransferRequest;
import org.apache.airavata.mft.admin.models.TransferState;

@@ -44,20 +45,27 @@ public class MFTAdmin {
private KeyValueClient kvClient = client.keyValueClient();
private ObjectMapper mapper = new ObjectMapper();

public String submitTransfer(String agentId, TransferRequest transferRequest) throws MFTAdminException {
public String submitTransfer(TransferRequest transferRequest) throws MFTAdminException{
try {

String asStr = mapper.writeValueAsString(transferRequest);
String transferId = UUID.randomUUID().toString();
updateTransferState(transferId, new TransferState().setState("INITIALIZING").setPercentage(0).setUpdateTimeMils(System.currentTimeMillis()));
transferRequest.setTransferId(transferId);
String asString = mapper.writeValueAsString(transferRequest);
kvClient.putValue("mft/agents/messages/" + agentId + "/" + transferId, asString);
kvClient.putValue("mft/controller/messages/" + transferId, asStr);
return transferId;
} catch (JsonProcessingException e) {
throw new MFTAdminException("Error in serializing transfer request", e);
}
}

public void commandTransferToAgent(String agentId, TransferCommand transferCommand) throws MFTAdminException {
try {
updateTransferState(transferCommand.getTransferId(), new TransferState().setState("INITIALIZING").setPercentage(0).setUpdateTimeMils(System.currentTimeMillis()));
String asString = mapper.writeValueAsString(transferCommand);
kvClient.putValue("mft/agents/messages/" + agentId + "/" + transferCommand.getTransferId(), asString);
} catch (JsonProcessingException e) {
throw new MFTAdminException("Error in serializing transfer request", e);
}
}

public List<AgentInfo> listAgents() {
List<AgentInfo> agents = new ArrayList<>();
List<String> keys = kvClient.getKeys("mft/agents/info");
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.airavata.mft.admin.models;

import java.util.List;

public class TransferCommand {

private String transferId;
private String sourceId;
private String sourceType;
private String sourceToken;
private String sourceResourceBackend;
private String sourceCredentialBackend;
private String destinationId;
private String destinationType;
private String destinationToken;
private String destResourceBackend;
private String destCredentialBackend;

public String getTransferId() {
return transferId;
}

public TransferCommand setTransferId(String transferId) {
this.transferId = transferId;
return this;
}

public String getSourceId() {
return sourceId;
}

public TransferCommand setSourceId(String sourceId) {
this.sourceId = sourceId;
return this;
}

public String getSourceType() {
return sourceType;
}

public TransferCommand setSourceType(String sourceType) {
this.sourceType = sourceType;
return this;
}

public String getSourceToken() {
return sourceToken;
}

public TransferCommand setSourceToken(String sourceToken) {
this.sourceToken = sourceToken;
return this;
}

public String getSourceResourceBackend() {
return sourceResourceBackend;
}

public TransferCommand setSourceResourceBackend(String sourceResourceBackend) {
this.sourceResourceBackend = sourceResourceBackend;
return this;
}

public String getSourceCredentialBackend() {
return sourceCredentialBackend;
}

public TransferCommand setSourceCredentialBackend(String sourceCredentialBackend) {
this.sourceCredentialBackend = sourceCredentialBackend;
return this;
}

public String getDestinationId() {
return destinationId;
}

public TransferCommand setDestinationId(String destinationId) {
this.destinationId = destinationId;
return this;
}

public String getDestinationType() {
return destinationType;
}

public TransferCommand setDestinationType(String destinationType) {
this.destinationType = destinationType;
return this;
}

public String getDestinationToken() {
return destinationToken;
}

public TransferCommand setDestinationToken(String destinationToken) {
this.destinationToken = destinationToken;
return this;
}

public String getDestResourceBackend() {
return destResourceBackend;
}

public TransferCommand setDestResourceBackend(String destResourceBackend) {
this.destResourceBackend = destResourceBackend;
return this;
}

public String getDestCredentialBackend() {
return destCredentialBackend;
}

public TransferCommand setDestCredentialBackend(String destCredentialBackend) {
this.destCredentialBackend = destCredentialBackend;
return this;
}
}
@@ -17,71 +17,128 @@

package org.apache.airavata.mft.admin.models;

import java.util.List;
import java.util.Map;

public class TransferRequest {

private String transferId;
private String sourceId;
private String sourceType;
private String sourceToken;
private String sourceResourceBackend;
private String sourceCredentialBackend;
private String destinationId;
private String destinationType;
private String destinationToken;
private String destResourceBackend;
private String destCredentialBackend;
private boolean affinityTransfer;
private Map<String, Integer> targetAgents;

public String getSourceId() {
return sourceId;
}

public void setSourceId(String sourceId) {
public TransferRequest setSourceId(String sourceId) {
this.sourceId = sourceId;
return this;
}

public String getDestinationId() {
return destinationId;
public String getSourceType() {
return sourceType;
}

public void setDestinationId(String destinationId) {
this.destinationId = destinationId;
public TransferRequest setSourceType(String sourceType) {
this.sourceType = sourceType;
return this;
}

public String getSourceType() {
return sourceType;
public String getSourceToken() {
return sourceToken;
}

public void setSourceType(String sourceType) {
this.sourceType = sourceType;
public TransferRequest setSourceToken(String sourceToken) {
this.sourceToken = sourceToken;
return this;
}

public String getDestinationType() {
return destinationType;
public String getSourceResourceBackend() {
return sourceResourceBackend;
}

public void setDestinationType(String destinationType) {
this.destinationType = destinationType;
public TransferRequest setSourceResourceBackend(String sourceResourceBackend) {
this.sourceResourceBackend = sourceResourceBackend;
return this;
}

public String getSourceToken() {
return sourceToken;
public String getSourceCredentialBackend() {
return sourceCredentialBackend;
}

public void setSourceToken(String sourceToken) {
this.sourceToken = sourceToken;
public TransferRequest setSourceCredentialBackend(String sourceCredentialBackend) {
this.sourceCredentialBackend = sourceCredentialBackend;
return this;
}

public String getDestinationId() {
return destinationId;
}

public TransferRequest setDestinationId(String destinationId) {
this.destinationId = destinationId;
return this;
}

public String getDestinationType() {
return destinationType;
}

public TransferRequest setDestinationType(String destinationType) {
this.destinationType = destinationType;
return this;
}

public String getDestinationToken() {
return destinationToken;
}

public void setDestinationToken(String destinationToken) {
public TransferRequest setDestinationToken(String destinationToken) {
this.destinationToken = destinationToken;
return this;
}

public String getDestResourceBackend() {
return destResourceBackend;
}

public TransferRequest setDestResourceBackend(String destResourceBackend) {
this.destResourceBackend = destResourceBackend;
return this;
}

public String getDestCredentialBackend() {
return destCredentialBackend;
}

public TransferRequest setDestCredentialBackend(String destCredentialBackend) {
this.destCredentialBackend = destCredentialBackend;
return this;
}

public boolean isAffinityTransfer() {
return affinityTransfer;
}

public TransferRequest setAffinityTransfer(boolean affinityTransfer) {
this.affinityTransfer = affinityTransfer;
return this;
}

public String getTransferId() {
return transferId;
public Map<String, Integer> getTargetAgents() {
return targetAgents;
}

public void setTransferId(String transferId) {
this.transferId = transferId;
public TransferRequest setTargetAgents(Map<String, Integer> targetAgents) {
this.targetAgents = targetAgents;
return this;
}
}
@@ -56,7 +56,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.7.26</version>
<version>${log4j.over.slf4j}</version>
</dependency>
</dependencies>

@@ -29,7 +29,7 @@
import org.apache.airavata.mft.admin.MFTAdmin;
import org.apache.airavata.mft.admin.MFTAdminException;
import org.apache.airavata.mft.admin.models.AgentInfo;
import org.apache.airavata.mft.admin.models.TransferRequest;
import org.apache.airavata.mft.admin.models.TransferCommand;
import org.apache.airavata.mft.admin.models.TransferState;
import org.apache.airavata.mft.core.ResourceMetadata;
import org.apache.airavata.mft.core.api.Connector;
@@ -83,6 +83,8 @@ public class MFTAgent implements CommandLineRunner {
private long sessionRenewSeconds = 4;
private long sessionTTLSeconds = 10;

private ObjectMapper mapper = new ObjectMapper();

private MFTAdmin admin;

public void init() {
@@ -99,10 +101,9 @@ private void acceptRequests() {
Optional<String> decodedValue = value.getValueAsString();
decodedValue.ifPresent(v -> {
System.out.println(String.format("Value is: %s", v));
ObjectMapper mapper = new ObjectMapper();
TransferRequest request = null;
TransferCommand request = null;
try {
request = mapper.readValue(v, TransferRequest.class);
request = mapper.readValue(v, TransferCommand.class);
logger.info("Received request " + request.getTransferId());
admin.updateTransferState(request.getTransferId(), new TransferState().setState("STARTING")
.setPercentage(0).setUpdateTimeMils(System.currentTimeMillis()));

0 comments on commit 3a1a326

Please sign in to comment.