Skip to content
Permalink
Browse files
Passing TransferStates through controller to avoid corrupted final st…
…ates
  • Loading branch information
DImuthuUpe committed Apr 4, 2020
1 parent 58efd2d commit 8d3df3d4766d7a1c3daa9e3030e5e89901b18765
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 63 deletions.
@@ -18,7 +18,6 @@
package org.apache.airavata.mft.admin;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.net.HostAndPort;
import com.orbitz.consul.Consul;
@@ -36,7 +35,6 @@
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.google.common.net.HostAndPort.*;

@@ -57,7 +55,8 @@ public class MFTConsulClient {
private ObjectMapper mapper = new ObjectMapper();

public static final String TRANSFER_STATE_PATH = "mft/transfer/state/";
public static final String CONTROLLER_MESSAGE_TRANSFER_PATH = "mft/controller/messages/transfers/";
public static final String CONTROLLER_TRANSFER_MESSAGE_PATH = "mft/controller/messages/transfers/";
public static final String CONTROLLER_STATE_MESSAGE_PATH = "mft/controller/messages/states/";
public static final String AGENTS_MESSAGE_PATH = "mft/agents/messages/";
public static final String AGENTS_INFO_PATH = "mft/agents/info/";
public static final String LIVE_AGENTS_PATH = "mft/agent/live/";
@@ -79,31 +78,45 @@ public MFTConsulClient(String host, int port) {
this.sessionClient = client.sessionClient();
}

public String submitTransfer(TransferRequest transferRequest) throws MFTAdminException{
public String submitTransfer(TransferRequest transferRequest) throws MFTConsulClientException {
try {
String asStr = mapper.writeValueAsString(transferRequest);
String transferId = UUID.randomUUID().toString();
kvClient.putValue(CONTROLLER_MESSAGE_TRANSFER_PATH + transferId, asStr);
kvClient.putValue(CONTROLLER_TRANSFER_MESSAGE_PATH + transferId, asStr);
return transferId;
} catch (JsonProcessingException e) {
throw new MFTAdminException("Error in serializing transfer request", e);
throw new MFTConsulClientException("Error in serializing transfer request", e);
}
}

public void commandTransferToAgent(String agentId, TransferCommand transferCommand) throws MFTAdminException {
/**
* Submits a {@link TransferCommand} to a target agent
*
* @param agentId Agent Id
* @param transferCommand Target transfer command
* @throws MFTConsulClientException If {@link TransferCommand} can not be delivered to consul store
*/
public void commandTransferToAgent(String agentId, TransferCommand transferCommand) throws MFTConsulClientException {
try {
submitTransferState(transferCommand.getTransferId(), new TransferState()
submitTransferStateToProcess(transferCommand.getTransferId(), "controller", new TransferState()
.setState("INITIALIZING")
.setPercentage(0)
.setUpdateTimeMils(System.currentTimeMillis())
.setPublisher("controller")
.setDescription("Initializing the transfer"));
String asString = mapper.writeValueAsString(transferCommand);
kvClient.putValue(AGENTS_MESSAGE_PATH + agentId + "/" + transferCommand.getTransferId(), asString);

} catch (JsonProcessingException e) {
throw new MFTAdminException("Error in serializing transfer request", e);
throw new MFTConsulClientException("Error in serializing transfer request", e);
}
}

/**
* List all currently registered agents.
*
* @return A list of {@link AgentInfo}
*/
public List<AgentInfo> listAgents() {
List<AgentInfo> agents = new ArrayList<>();
List<String> keys = kvClient.getKeys(AGENTS_INFO_PATH);
@@ -114,6 +127,11 @@ public List<AgentInfo> listAgents() {
return agents;
}

/**
* Get the {@link AgentInfo} for a given agent id
* @param agentId Agent Id
* @return AgentInfo if such agent is available
*/
public Optional<AgentInfo> getAgentInfo(String agentId) {
Optional<Value> value = kvClient.getValue(AGENTS_INFO_PATH + agentId);
if (value.isPresent()) {
@@ -130,30 +148,92 @@ public Optional<AgentInfo> getAgentInfo(String agentId) {
return Optional.empty();
}

public void registerAgent(AgentInfo agentInfo) throws MFTAdminException {
/**
* Agents are supposed to register themselves in MFT using this method
*
* @param agentInfo {@link AgentInfo} of the source Agents
* @throws MFTConsulClientException If {@link AgentInfo} can not be saved in consul store
*/
public void registerAgent(AgentInfo agentInfo) throws MFTConsulClientException {
try {
String asString = mapper.writeValueAsString(agentInfo);
kvClient.putValue(AGENTS_INFO_PATH + agentInfo.getId(), asString);
} catch (JsonProcessingException e) {
throw new MFTAdminException("Error in serializing agent information", e);
throw new MFTConsulClientException("Error in serializing agent information", e);
}
}

public List<String> getLiveAgentIds() throws MFTAdminException {
/**
* List all currently live agents
*
* @return A list of live agent ids
* @throws MFTConsulClientException If live agents can not be fetched from consul store
*/
public List<String> getLiveAgentIds() throws MFTConsulClientException {
try {
List<String> keys = kvClient.getKeys(LIVE_AGENTS_PATH);
return keys.stream().map(key -> key.substring(key.lastIndexOf("/") + 1)).collect(Collectors.toList());
} catch (ConsulException e) {
if (e.getCode() == 404) {
return Collections.emptyList();
}
throw new MFTAdminException("Error in fetching live agents", e);
throw new MFTConsulClientException("Error in fetching live agents", e);
} catch (Exception e) {
throw new MFTConsulClientException("Error in fetching live agents", e);
}
}

/**
* Agents should call this method to submit {@link TransferState}. These status are received by the controller and reorder
* status messages and put in the final status array.
*
* @param transferId
* @param agentId
* @param transferState
* @throws MFTConsulClientException
*/
public void submitTransferStateToProcess(String transferId, String agentId, TransferState transferState) throws MFTConsulClientException {
try {
kvClient.putValue(CONTROLLER_STATE_MESSAGE_PATH + transferId + "/" + agentId + "/" + transferState.getUpdateTimeMils(),
mapper.writeValueAsString(transferState));
} catch (Exception e) {
logger.error("Error in submitting transfer status to process for transfer {} and agent {}", transferId, agentId, e);
throw new MFTConsulClientException("Error in submitting transfer status", e);
}
}

/**
* Add the {@link TransferState} to the aggregated state array. This method should only be called by the
* Controller and API server once the transfer is accepted. Agents should NEVER call this method as it would corrupt
* state array when multiple clients are writing at the same time
*
* @param transferId
* @param transferState
* @throws MFTConsulClientException
*/
public void saveTransferState(String transferId, TransferState transferState) throws MFTConsulClientException {
try {
List<TransferState> allStates = getTransferStates(transferId);
// TODO implement sequence consistency
allStates.add(transferState);
String asStr = mapper.writeValueAsString(allStates);
kvClient.putValue(TRANSFER_STATE_PATH + transferId, asStr);

logger.info("Saved transfer status " + asStr);

} catch (Exception e) {
throw new MFTAdminException("Error in fetching live agents", e);
throw new MFTConsulClientException("Error in serializing transfer status", e);
}
}

public Optional<TransferState> getTransferState(String transferId) throws MFTAdminException {
/**
* Get the latest {@link TransferState} for given transfer id
*
* @param transferId Transfer Id
* @return Optional {@link TransferState } is there is any
* @throws MFTConsulClientException
*/
public Optional<TransferState> getTransferState(String transferId) throws MFTConsulClientException {

try {
List<TransferState> states = getTransferStates(transferId);
@@ -169,29 +249,19 @@ public Optional<TransferState> getTransferState(String transferId) throws MFTAdm
return lastStatusOp;

} catch (ConsulException e) {
throw new MFTAdminException("Error in fetching transfer status " + transferId, e);
} catch (Exception e) {
throw new MFTAdminException("Error in fetching transfer status " + transferId, e);
}
}

public void submitTransferStateToProcess(String transferId, String agentId, TransferState transferState) throws MFTAdminException {

}
public void submitTransferState(String transferId, TransferState transferState) throws MFTAdminException {
try {
List<TransferState> allStates = getTransferStates(transferId);
allStates.add(transferState);
String asStr = mapper.writeValueAsString(allStates);
kvClient.putValue(TRANSFER_STATE_PATH + transferId, asStr);

logger.info("Saved transfer status " + asStr);

throw new MFTConsulClientException("Error in fetching transfer status " + transferId, e);
} catch (Exception e) {
throw new MFTAdminException("Error in serializing transfer status", e);
throw new MFTConsulClientException("Error in fetching transfer status " + transferId, e);
}
}

/**
* Provide all {@link TransferState} for given transfer id
*
* @param transferId Transfer Id
* @return The list of all {@link TransferState}
* @throws IOException
*/
public List<TransferState> getTransferStates(String transferId) throws IOException {
Optional<Value> valueOp = kvClient.getValue(TRANSFER_STATE_PATH + transferId);
List<TransferState> allStates;
@@ -204,7 +274,7 @@ public List<TransferState> getTransferStates(String transferId) throws IOExcepti
return allStates;
}

public List<AgentInfo> getLiveAgentInfos() throws MFTAdminException {
public List<AgentInfo> getLiveAgentInfos() throws MFTConsulClientException {
List<String> liveAgentIds = getLiveAgentIds();
return liveAgentIds.stream().map(id -> getAgentInfo(id).get()).collect(Collectors.toList());
}
@@ -17,24 +17,24 @@

package org.apache.airavata.mft.admin;

public class MFTAdminException extends Exception {
public MFTAdminException() {
public class MFTConsulClientException extends Exception {
public MFTConsulClientException() {
super();
}

public MFTAdminException(String message) {
public MFTConsulClientException(String message) {
super(message);
}

public MFTAdminException(String message, Throwable cause) {
public MFTConsulClientException(String message, Throwable cause) {
super(message, cause);
}

public MFTAdminException(Throwable cause) {
public MFTConsulClientException(Throwable cause) {
super(cause);
}

protected MFTAdminException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
protected MFTConsulClientException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
@@ -19,6 +19,7 @@

public class TransferState {
private String state;
private String publisher;
private long updateTimeMils;
private double percentage;
private String description;
@@ -58,4 +59,13 @@ public TransferState setDescription(String description) {
this.description = description;
return this;
}

public String getPublisher() {
return publisher;
}

public TransferState setPublisher(String publisher) {
this.publisher = publisher;
return this;
}
}
@@ -25,7 +25,7 @@
import com.orbitz.consul.model.session.ImmutableSession;
import com.orbitz.consul.model.session.SessionCreatedResponse;
import org.apache.airavata.mft.admin.MFTConsulClient;
import org.apache.airavata.mft.admin.MFTAdminException;
import org.apache.airavata.mft.admin.MFTConsulClientException;
import org.apache.airavata.mft.admin.models.AgentInfo;
import org.apache.airavata.mft.admin.models.TransferCommand;
import org.apache.airavata.mft.admin.models.TransferState;
@@ -108,10 +108,11 @@ private void acceptRequests() {
try {
request = mapper.readValue(v, TransferCommand.class);
logger.info("Received request " + request.getTransferId());
mftConsulClient.submitTransferState(request.getTransferId(), new TransferState()
mftConsulClient.submitTransferStateToProcess(request.getTransferId(), agentId, new TransferState()
.setState("STARTING")
.setPercentage(0)
.setUpdateTimeMils(System.currentTimeMillis())
.setPublisher(agentId)
.setDescription("Starting the transfer"));

Optional<Connector> inConnectorOpt = ConnectorResolver.resolveConnector(request.getSourceType(), "IN");
@@ -128,16 +129,17 @@ private void acceptRequests() {

ResourceMetadata metadata = metadataCollector.getGetResourceMetadata(request.getSourceId(), request.getSourceToken());
logger.debug("File size " + metadata.getResourceSize());
mftConsulClient.submitTransferState(request.getTransferId(), new TransferState()
mftConsulClient.submitTransferStateToProcess(request.getTransferId(), agentId, new TransferState()
.setState("STARTED")
.setPercentage(0)
.setUpdateTimeMils(System.currentTimeMillis())
.setPublisher(agentId)
.setDescription("Started the transfer"));

String transferId = mediator.transfer(request.getTransferId(), inConnector, outConnector, metadata, (id, st) -> {
try {
mftConsulClient.submitTransferState(id, st);
} catch (MFTAdminException e) {
mftConsulClient.submitTransferStateToProcess(id, agentId, st.setPublisher(agentId));
} catch (MFTConsulClientException e) {
logger.error("Failed while updating transfer state", e);
}
});
@@ -149,12 +151,13 @@ private void acceptRequests() {
try {
logger.error("Error in submitting transfer {}", request.getTransferId(), e);

mftConsulClient.submitTransferState(request.getTransferId(), new TransferState()
mftConsulClient.submitTransferStateToProcess(request.getTransferId(), agentId, new TransferState()
.setState("FAILED")
.setPercentage(0)
.setUpdateTimeMils(System.currentTimeMillis())
.setPublisher(agentId)
.setDescription(ExceptionUtils.getStackTrace(e)));
} catch (MFTAdminException ex) {
} catch (MFTConsulClientException ex) {
logger.warn(ex.getMessage());
// Ignore
}
@@ -173,7 +176,7 @@ private void acceptRequests() {
messageCache.start();
}

private boolean connectAgent() throws MFTAdminException {
private boolean connectAgent() throws MFTConsulClientException {
final ImmutableSession session = ImmutableSession.builder()
.name(agentId)
.behavior("delete")
@@ -65,9 +65,10 @@ public void submitTransfer(TransferApiRequest request, StreamObserver<TransferAp
String transferId = mftConsulClient.submitTransfer(transferRequest);
logger.info("Submitted the transfer request {}", transferId);

mftConsulClient.submitTransferState(transferId, new TransferState()
mftConsulClient.saveTransferState(transferId, new TransferState()
.setUpdateTimeMils(System.currentTimeMillis())
.setState("RECEIVED").setPercentage(0)
.setPublisher("api")
.setDescription("Received transfer job " + transferId));

responseObserver.onNext(TransferApiResponse.newBuilder().setTransferId(transferId).build());

0 comments on commit 8d3df3d

Please sign in to comment.