Skip to content
Permalink
Browse files
Merge branch 'master' into develop
  • Loading branch information
pokearu committed Apr 9, 2020
2 parents 10b15ac + 634bab9 commit d353ab0f1668b6463cc12c305fed4bfed88fe16a
Show file tree
Hide file tree
Showing 40 changed files with 1,713 additions and 379 deletions.
@@ -18,7 +18,6 @@
package org.apache.airavata.mft.admin;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.net.HostAndPort;
import com.orbitz.consul.Consul;
@@ -36,7 +35,6 @@
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.google.common.net.HostAndPort.*;

@@ -56,6 +54,16 @@ public class MFTConsulClient {
private SessionClient sessionClient;
private ObjectMapper mapper = new ObjectMapper();

public static final String TRANSFER_STATE_PATH = "mft/transfer/state/";
public static final String CONTROLLER_TRANSFER_MESSAGE_PATH = "mft/controller/messages/transfers/";
public static final String CONTROLLER_STATE_MESSAGE_PATH = "mft/controller/messages/states/";
public static final String AGENTS_MESSAGE_PATH = "mft/agents/messages/";
public static final String AGENTS_SCHEDULED_PATH = "mft/agents/scheduled/";
public static final String AGENTS_INFO_PATH = "mft/agents/info/";
public static final String LIVE_AGENTS_PATH = "mft/agent/live/";
public static final String TRANSFER_PROCESSED_PATH = "mft/transfer/processed/";
public static final String TRANSFER_PENDING_PATH = "mft/transfer/pending/";

public MFTConsulClient(Map<String, Integer> consulHostPorts) {
List<HostAndPort> hostAndPorts = consulHostPorts.entrySet().stream()
.map(entry -> fromParts(entry.getKey(), entry.getValue()))
@@ -71,81 +79,162 @@ public MFTConsulClient(String host, int port) {
this.sessionClient = client.sessionClient();
}

public String submitTransfer(TransferRequest transferRequest) throws MFTAdminException{
public String submitTransfer(TransferRequest transferRequest) throws MFTConsulClientException {
try {
String asStr = mapper.writeValueAsString(transferRequest);
String transferId = UUID.randomUUID().toString();
kvClient.putValue("mft/controller/messages/" + transferId, asStr);
kvClient.putValue(CONTROLLER_TRANSFER_MESSAGE_PATH + transferId, asStr);
return transferId;
} catch (JsonProcessingException e) {
throw new MFTAdminException("Error in serializing transfer request", e);
throw new MFTConsulClientException("Error in serializing transfer request", e);
}
}

public void commandTransferToAgent(String agentId, TransferCommand transferCommand) throws MFTAdminException {
/**
* Submits a {@link TransferCommand} to a target agent
*
* @param agentId Agent Id
* @param transferCommand Target transfer command
* @throws MFTConsulClientException If {@link TransferCommand} can not be delivered to consul store
*/
public void commandTransferToAgent(String agentId, TransferCommand transferCommand) throws MFTConsulClientException {
try {
submitTransferState(transferCommand.getTransferId(), new TransferState()
submitTransferStateToProcess(transferCommand.getTransferId(), "controller", new TransferState()
.setState("INITIALIZING")
.setPercentage(0)
.setUpdateTimeMils(System.currentTimeMillis())
.setPublisher("controller")
.setDescription("Initializing the transfer"));
String asString = mapper.writeValueAsString(transferCommand);
kvClient.putValue("mft/agents/messages/" + agentId + "/" + transferCommand.getTransferId(), asString);
kvClient.putValue(AGENTS_MESSAGE_PATH + agentId + "/" + transferCommand.getTransferId(), asString);

} catch (JsonProcessingException e) {
throw new MFTAdminException("Error in serializing transfer request", e);
throw new MFTConsulClientException("Error in serializing transfer request", e);
}
}

/**
* List all currently registered agents.
*
* @return A list of {@link AgentInfo}
*/
public List<AgentInfo> listAgents() {
List<AgentInfo> agents = new ArrayList<>();
List<String> keys = kvClient.getKeys("mft/agents/info");
List<String> keys = kvClient.getKeys(AGENTS_INFO_PATH);
for (String key : keys) {
Optional<AgentInfo> agentInfo = getAgentInfo(key.substring(key.lastIndexOf("/") + 1));
agentInfo.ifPresent(agents::add);
}
return agents;
}

/**
* Get the {@link AgentInfo} for a given agent id
* @param agentId Agent Id
* @return AgentInfo if such agent is available
*/
public Optional<AgentInfo> getAgentInfo(String agentId) {
Optional<Value> value = kvClient.getValue("mft/agents/info/" + agentId);
Optional<Value> value = kvClient.getValue(AGENTS_INFO_PATH + agentId);
if (value.isPresent()) {
Value absVal = value.get();
if (absVal.getValue().isPresent()) {
String asStr = absVal.getValueAsString().get();
try {
return Optional.of(mapper.readValue(asStr, AgentInfo.class));
} catch (IOException e) {
e.printStackTrace();
logger.error("Errored while fetching agent {} info", agentId, e);
}
}
}
return Optional.empty();
}

public void registerAgent(AgentInfo agentInfo) throws MFTAdminException {
/**
* Agents are supposed to register themselves in MFT using this method
*
* @param agentInfo {@link AgentInfo} of the source Agents
* @throws MFTConsulClientException If {@link AgentInfo} can not be saved in consul store
*/
public void registerAgent(AgentInfo agentInfo) throws MFTConsulClientException {
try {
String asString = mapper.writeValueAsString(agentInfo);
kvClient.putValue("mft/agents/info/" + agentInfo.getId(), asString);
kvClient.putValue(AGENTS_INFO_PATH + agentInfo.getId(), asString);
} catch (JsonProcessingException e) {
throw new MFTAdminException("Error in serializing agent information", e);
throw new MFTConsulClientException("Error in serializing agent information", e);
}
}

public List<String> getLiveAgentIds() throws MFTAdminException {
/**
* List all currently live agents
*
* @return A list of live agent ids
* @throws MFTConsulClientException If live agents can not be fetched from consul store
*/
public List<String> getLiveAgentIds() throws MFTConsulClientException {
try {
List<String> keys = kvClient.getKeys("mft/agent/live/");
List<String> keys = kvClient.getKeys(LIVE_AGENTS_PATH);
return keys.stream().map(key -> key.substring(key.lastIndexOf("/") + 1)).collect(Collectors.toList());
} catch (ConsulException e) {
if (e.getCode() == 404) {
return Collections.emptyList();
}
throw new MFTAdminException("Error in fetching live agents", e);
throw new MFTConsulClientException("Error in fetching live agents", e);
} catch (Exception e) {
throw new MFTAdminException("Error in fetching live agents", e);
throw new MFTConsulClientException("Error in fetching live agents", e);
}
}

public Optional<TransferState> getTransferState(String transferId) throws MFTAdminException {
/**
* Agents should call this method to submit {@link TransferState}. These status are received by the controller and reorder
* status messages and put in the final status array.
*
* @param transferId
* @param agentId
* @param transferState
* @throws MFTConsulClientException
*/
public void submitTransferStateToProcess(String transferId, String agentId, TransferState transferState) throws MFTConsulClientException {
try {
kvClient.putValue(CONTROLLER_STATE_MESSAGE_PATH + transferId + "/" + agentId + "/" + transferState.getUpdateTimeMils(),
mapper.writeValueAsString(transferState));
} catch (Exception e) {
logger.error("Error in submitting transfer status to process for transfer {} and agent {}", transferId, agentId, e);
throw new MFTConsulClientException("Error in submitting transfer status", e);
}
}

/**
* Add the {@link TransferState} to the aggregated state array. This method should only be called by the
* Controller and API server once the transfer is accepted. Agents should NEVER call this method as it would corrupt
* state array when multiple clients are writing at the same time
*
* @param transferId
* @param transferState
* @throws MFTConsulClientException
*/
public void saveTransferState(String transferId, TransferState transferState) throws MFTConsulClientException {
try {
List<TransferState> allStates = getTransferStates(transferId);
// TODO implement sequence consistency
allStates.add(transferState);
String asStr = mapper.writeValueAsString(allStates);
kvClient.putValue(TRANSFER_STATE_PATH + transferId, asStr);

logger.info("Saved transfer status " + asStr);

} catch (Exception e) {
throw new MFTConsulClientException("Error in serializing transfer status", e);
}
}

/**
* Get the latest {@link TransferState} for given transfer id
*
* @param transferId Transfer Id
* @return Optional {@link TransferState } is there is any
* @throws MFTConsulClientException
*/
public Optional<TransferState> getTransferState(String transferId) throws MFTConsulClientException {

try {
List<TransferState> states = getTransferStates(transferId);
@@ -161,29 +250,21 @@ public Optional<TransferState> getTransferState(String transferId) throws MFTAdm
return lastStatusOp;

} catch (ConsulException e) {
throw new MFTAdminException("Error in fetching transfer status " + transferId, e);
} catch (Exception e) {
throw new MFTAdminException("Error in fetching transfer status " + transferId, e);
}
}

public void submitTransferState(String transferId, TransferState transferState) throws MFTAdminException {
try {
List<TransferState> allStates = getTransferStates(transferId);
System.out.println(allStates);
allStates.add(transferState);
String asStr = mapper.writeValueAsString(allStates);
kvClient.putValue("mft/transfer/state/" + transferId, asStr);

logger.info("Saved transfer status " + asStr);

throw new MFTConsulClientException("Error in fetching transfer status " + transferId, e);
} catch (Exception e) {
throw new MFTAdminException("Error in serializing transfer status", e);
throw new MFTConsulClientException("Error in fetching transfer status " + transferId, e);
}
}

/**
* Provide all {@link TransferState} for given transfer id
*
* @param transferId Transfer Id
* @return The list of all {@link TransferState}
* @throws IOException
*/
public List<TransferState> getTransferStates(String transferId) throws IOException {
Optional<Value> valueOp = kvClient.getValue("mft/transfer/state/" + transferId);
Optional<Value> valueOp = kvClient.getValue(TRANSFER_STATE_PATH + transferId);
List<TransferState> allStates;
if (valueOp.isPresent()) {
String prevStates = valueOp.get().getValueAsString().get();
@@ -194,7 +275,7 @@ public List<TransferState> getTransferStates(String transferId) throws IOExcepti
return allStates;
}

public List<AgentInfo> getLiveAgentInfos() throws MFTAdminException {
public List<AgentInfo> getLiveAgentInfos() throws MFTConsulClientException {
List<String> liveAgentIds = getLiveAgentIds();
return liveAgentIds.stream().map(id -> getAgentInfo(id).get()).collect(Collectors.toList());
}
@@ -17,24 +17,24 @@

package org.apache.airavata.mft.admin;

public class MFTAdminException extends Exception {
public MFTAdminException() {
public class MFTConsulClientException extends Exception {
public MFTConsulClientException() {
super();
}

public MFTAdminException(String message) {
public MFTConsulClientException(String message) {
super(message);
}

public MFTAdminException(String message, Throwable cause) {
public MFTConsulClientException(String message, Throwable cause) {
super(message, cause);
}

public MFTAdminException(Throwable cause) {
public MFTConsulClientException(Throwable cause) {
super(cause);
}

protected MFTAdminException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
protected MFTConsulClientException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
@@ -19,6 +19,7 @@

public class TransferState {
private String state;
private String publisher;
private long updateTimeMils;
private double percentage;
private String description;
@@ -58,4 +59,13 @@ public TransferState setDescription(String description) {
this.description = description;
return this;
}

public String getPublisher() {
return publisher;
}

public TransferState setPublisher(String publisher) {
this.publisher = publisher;
return this;
}
}
@@ -43,6 +43,11 @@
<artifactId>mft-local-transport</artifactId>
<version>0.01-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.airavata</groupId>
<artifactId>mft-s3-transport</artifactId>
<version>0.01-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.airavata</groupId>
<artifactId>mft-admin</artifactId>

0 comments on commit d353ab0

Please sign in to comment.