Skip to content
Permalink
Browse files
Proper error handling at transfer status delivery and logging improve…
…ments
  • Loading branch information
DImuthuUpe committed Mar 5, 2020
1 parent a5bd69a commit 65a43e25e3f3cf39f2d24c2eb545872f636321d9
Show file tree
Hide file tree
Showing 15 changed files with 209 additions and 83 deletions.
10 README
@@ -1 +1,9 @@
Airavata Managed File Transfer Service and Clients
Airavata Managed File Transfer Service and Clients

Initialization steps

Start consul server - ./consul agent -dev
Start ResourceServiceApplication
Start SecretServiceApplication
Start MftController
Start ApiServiceApplication
@@ -58,7 +58,11 @@ public String submitTransfer(TransferRequest transferRequest) throws MFTAdminExc

public void commandTransferToAgent(String agentId, TransferCommand transferCommand) throws MFTAdminException {
try {
updateTransferState(transferCommand.getTransferId(), new TransferState().setState("INITIALIZING").setPercentage(0).setUpdateTimeMils(System.currentTimeMillis()));
updateTransferState(transferCommand.getTransferId(), new TransferState()
.setState("INITIALIZING")
.setPercentage(0)
.setUpdateTimeMils(System.currentTimeMillis())
.setDescription("Initializing the transfer"));
String asString = mapper.writeValueAsString(transferCommand);
kvClient.putValue("mft/agents/messages/" + agentId + "/" + transferCommand.getTransferId(), asString);
} catch (JsonProcessingException e) {
@@ -25,6 +25,7 @@ public class AgentInfo {
private String user;
private boolean sudo;
private List<String> supportedProtocols;
private List<String> localStorages;

public String getId() {
return id;
@@ -70,4 +71,13 @@ public AgentInfo setSupportedProtocols(List<String> supportedProtocols) {
this.supportedProtocols = supportedProtocols;
return this;
}

public List<String> getLocalStorages() {
return localStorages;
}

public AgentInfo setLocalStorages(List<String> localStorages) {
this.localStorages = localStorages;
return this;
}
}
@@ -21,6 +21,7 @@ public class TransferState {
private String state;
private long updateTimeMils;
private double percentage;
private String description;

public String getState() {
return state;
@@ -48,4 +49,13 @@ public TransferState setPercentage(double percentage) {
this.percentage = percentage;
return this;
}

public String getDescription() {
return description;
}

public TransferState setDescription(String description) {
this.description = description;
return this;
}
}
@@ -40,6 +40,7 @@
import org.apache.airavata.mft.transport.scp.SCPMetadataCollector;
import org.apache.airavata.mft.transport.scp.SCPReceiver;
import org.apache.airavata.mft.transport.scp.SCPSender;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
@@ -48,7 +49,9 @@
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.PropertySource;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -100,13 +103,16 @@ private void acceptRequests() {
newValues.values().forEach(value -> {
Optional<String> decodedValue = value.getValueAsString();
decodedValue.ifPresent(v -> {
System.out.println(String.format("Value is: %s", v));
logger.info("Received raw message: {}", v);
TransferCommand request = null;
try {
request = mapper.readValue(v, TransferCommand.class);
logger.info("Received request " + request.getTransferId());
admin.updateTransferState(request.getTransferId(), new TransferState().setState("STARTING")
.setPercentage(0).setUpdateTimeMils(System.currentTimeMillis()));
admin.updateTransferState(request.getTransferId(), new TransferState()
.setState("STARTING")
.setPercentage(0)
.setUpdateTimeMils(System.currentTimeMillis())
.setDescription("Starting the transfer"));

Connector inConnector = MFTAgent.this.resolveConnector(request.getSourceType(), "IN");
inConnector.init(request.getSourceId(), request.getSourceToken());
@@ -116,8 +122,11 @@ private void acceptRequests() {
MetadataCollector metadataCollector = MFTAgent.this.resolveMetadataCollector(request.getSourceType());
ResourceMetadata metadata = metadataCollector.getGetResourceMetadata(request.getSourceId(), request.getSourceToken());
logger.debug("File size " + metadata.getResourceSize());
admin.updateTransferState(request.getTransferId(), new TransferState().setState("STARTED")
.setPercentage(0).setUpdateTimeMils(System.currentTimeMillis()));
admin.updateTransferState(request.getTransferId(), new TransferState()
.setState("STARTED")
.setPercentage(0)
.setUpdateTimeMils(System.currentTimeMillis())
.setDescription("Started the transfer"));

String transferId = mediator.transfer(request.getTransferId(), inConnector, outConnector, metadata, (id, st) -> {
try {
@@ -130,16 +139,21 @@ private void acceptRequests() {
logger.info("Submitted transfer " + transferId);

} catch (Exception e) {
e.printStackTrace();
if (request != null) {
try {
admin.updateTransferState(request.getTransferId(), new TransferState().setState("FAILED")
.setPercentage(0).setUpdateTimeMils(System.currentTimeMillis()));
logger.error("Error in submitting transfer {}", request.getTransferId(), e);

admin.updateTransferState(request.getTransferId(), new TransferState()
.setState("FAILED")
.setPercentage(0)
.setUpdateTimeMils(System.currentTimeMillis())
.setDescription(ExceptionUtils.getStackTrace(e)));
} catch (MFTAdminException ex) {
ex.printStackTrace();
logger.warn(ex.getMessage());
// Ignore
}
} else {
logger.error("Unknown error in processing message {}", v, e);
}
} finally {
logger.info("Deleting key " + value.getKey());
@@ -202,7 +216,8 @@ private boolean connectAgent() throws MFTAdminException {
.setId(agentId)
.setHost(agentHost)
.setUser(agentUser)
.setSupportedProtocols(Arrays.asList(supportedProtocols.split(","))));
.setSupportedProtocols(Arrays.asList(supportedProtocols.split(",")))
.setLocalStorages(new ArrayList<>()));
}

logger.info("Acquired lock " + acquired);
@@ -224,9 +239,20 @@ public void stop() {

public void start() throws Exception {
init();
boolean connected = connectAgent();
if (!connected) {
throw new Exception("Failed to connect to the cluster");
boolean connected = false;
int connectionRetries = 0;
while (!connected) {
connected = connectAgent();
if (connected) {
logger.info("Successfully connected to consul");
} else {
logger.info("Retrying to connect to consul");
Thread.sleep(5000);
connectionRetries++;
if (connectionRetries > 10) {
throw new Exception("Failed to connect to the cluster");
}
}
}
acceptRequests();
}
@@ -23,6 +23,9 @@
import org.apache.airavata.mft.core.ResourceMetadata;
import org.apache.airavata.mft.core.TransferTask;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
@@ -33,6 +36,8 @@

public class TransportMediator {

private static final Logger logger = LoggerFactory.getLogger(TransportMediator.class);

private ExecutorService executor = Executors.newFixedThreadPool(10);
private ExecutorService monitor = Executors.newFixedThreadPool(10);

@@ -71,17 +76,21 @@ public void run() {
ft.get();
} catch (InterruptedException e) {
// Interrupted
e.printStackTrace();
logger.error("Transfer task interrupted", e);
} catch (ExecutionException e) {
// Snap, something went wrong in the task! Abort! Abort! Abort!
System.out.println("One task failed with error: " + e.getMessage());
e.printStackTrace();
onCallback.accept(transferId, new TransferState().setPercentage(0).setState("FAILED").setUpdateTimeMils(System.currentTimeMillis()));
logger.error("One task failed with error", e);

onCallback.accept(transferId, new TransferState()
.setPercentage(0)
.setState("FAILED")
.setUpdateTimeMils(System.currentTimeMillis())
.setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));
for (Future<Integer> f : futureList) {
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
ex.printStackTrace();
logger.error("Sleep failed", e);
}
f.cancel(true);
}
@@ -92,9 +101,13 @@ public void run() {
long endTime = System.nanoTime();

double time = (endTime - startTime) * 1.0 /1000000000;
onCallback.accept(transferId, new TransferState().setPercentage(100).setState("COMPLETED").setUpdateTimeMils(System.currentTimeMillis()));
System.out.println("Transfer Speed " + (metadata.getResourceSize() * 1.0 / time) / (1024 * 1024) + " MB/s");
System.out.println("Transfer " + transferId + " completed");
onCallback.accept(transferId, new TransferState()
.setPercentage(100)
.setState("COMPLETED")
.setUpdateTimeMils(System.currentTimeMillis())
.setDescription("Transfer successfully completed"));
logger.info("Transfer Speed " + (metadata.getResourceSize() * 1.0 / time) / (1024 * 1024) + " MB/s");
logger.info("Transfer " + transferId + " completed");
} catch (Exception e) {
e.printStackTrace();
} finally {
@@ -39,6 +39,9 @@ public class TransferStatusEntity {
@Column(name = "PERCENTAGE")
private double percentage;

@Column(name = "DESCRIPTION", columnDefinition = "varchar(2048) default ''")
private String description;

public int getId() {
return id;
}
@@ -83,4 +86,13 @@ public TransferStatusEntity setPercentage(double percentage) {
this.percentage = percentage;
return this;
}

public String getDescription() {
return description;
}

public TransferStatusEntity setDescription(String description) {
this.description = description;
return this;
}
}
@@ -87,10 +87,15 @@ public void getTransferStates(TransferStateApiRequest request, StreamObserver<Tr
public void getTransferState(TransferStateApiRequest request, StreamObserver<TransferStateApiResponse> responseObserver) {
try {
List<TransferStatusEntity> states = statusRepository.getByTransferId(request.getTransferId());
states.stream().findFirst().ifPresent(st -> {
TransferStateApiResponse s = dozerBeanMapper.map(st, TransferStateApiResponse.newBuilder().getClass()).build();

Optional<TransferStatusEntity> firstElement = states.stream().findFirst();
if (firstElement.isPresent()) {
TransferStateApiResponse s = dozerBeanMapper.map(firstElement.get(),
TransferStateApiResponse.newBuilder().getClass()).build();
responseObserver.onNext(s);
});
} else {
responseObserver.onNext(TransferStateApiResponse.getDefaultInstance());
}
responseObserver.onCompleted();
} catch (Exception e) {
logger.error("Error in fetching transfer state", e);
@@ -33,6 +33,7 @@ message TransferStateApiResponse {
string state = 1;
int64 updateTimeMils = 2;
double percentage = 3;
string description = 4;
}

service MFTApiService {

0 comments on commit 65a43e2

Please sign in to comment.