Skip to content
Permalink
Browse files
Handling transfers in case of agent and controller restarts
  • Loading branch information
DImuthuUpe committed Apr 8, 2020
1 parent 402587f commit 634bab9c478ca7a016a4ed1f8a49d20e9401880e
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 11 deletions.
@@ -58,6 +58,7 @@ public class MFTConsulClient {
// Consul KV prefix watched by the controller's transfer message cache
public static final String CONTROLLER_TRANSFER_MESSAGE_PATH = "mft/controller/messages/transfers/";
// Consul KV prefix watched by the controller's state cache
public static final String CONTROLLER_STATE_MESSAGE_PATH = "mft/controller/messages/states/";
// Prefix of the per-agent message path; an agent watches AGENTS_MESSAGE_PATH + agentId
public static final String AGENTS_MESSAGE_PATH = "mft/agents/messages/";
// Prefix for transfers an agent has started; full keys are AGENTS_SCHEDULED_PATH + agentId / sessionId / transferId
public static final String AGENTS_SCHEDULED_PATH = "mft/agents/scheduled/";
// NOTE(review): presumably holds per-agent information; no usage is visible in this excerpt - confirm
public static final String AGENTS_INFO_PATH = "mft/agents/info/";
// Prefix of the per-agent lock key; an agent acquires LIVE_AGENTS_PATH + agentId with its Consul session
public static final String LIVE_AGENTS_PATH = "mft/agent/live/";
// Prefix under which the controller stores processed transfer requests, keyed by transfer id
public static final String TRANSFER_PROCESSED_PATH = "mft/transfer/processed/";
@@ -87,14 +87,15 @@ public class MFTAgent implements CommandLineRunner {
// Single-threaded scheduler on which the periodic Consul session renewal is scheduled after the live lock is acquired
private final ScheduledExecutorService sessionRenewPool = Executors.newSingleThreadScheduledExecutor();
// NOTE(review): presumably the session renewal period in seconds; the scheduling arguments are not visible here - confirm
private long sessionRenewSeconds = 4;
// Session TTL in seconds; passed to the session builder as "<value>s"
private long sessionTTLSeconds = 10;
// Id of the Consul session that acquired this agent's live lock; set in connectAgent() and used in scheduled transfer keys
private String session;

// Object mapper owned by this agent (its usage is not visible in this excerpt)
private ObjectMapper mapper = new ObjectMapper();

// Injected client that exposes the Consul KV and session clients used by this agent
@Autowired
private MFTConsulClient mftConsulClient;

/**
 * Creates the KV cache that watches this agent's message path,
 * {@code MFTConsulClient.AGENTS_MESSAGE_PATH + agentId}.
 */
public void init() {
    // Build the cache exactly once and take the path from the shared constant, so the agent and
    // the rest of the code base cannot drift apart on the key layout.
    messageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_MESSAGE_PATH + agentId);
}

private void acceptRequests() {
@@ -138,16 +139,29 @@ private void acceptRequests() {
.setPublisher(agentId)
.setDescription("Started the transfer"));

String transferId = mediator.transfer(request, inConnector, outConnector, srcMetadataCollector, dstMetadataCollector, (id, st) -> {
try {
mftConsulClient.submitTransferStateToProcess(id, agentId, st.setPublisher(agentId));
} catch (MFTConsulClientException e) {
logger.error("Failed while updating transfer state", e);

String transferId = mediator.transfer(request, inConnector, outConnector, srcMetadataCollector, dstMetadataCollector,
(id, st) -> {
try {
mftConsulClient.submitTransferStateToProcess(id, agentId, st.setPublisher(agentId));
} catch (MFTConsulClientException e) {
logger.error("Failed while updating transfer state", e);
}
},
(id, transferSuccess) -> {
try {
// Delete scheduled key as the transfer completed / failed if it was placed in current session
mftConsulClient.getKvClient().deleteKey(MFTConsulClient.AGENTS_SCHEDULED_PATH + agentId + "/" + session + "/" + id);
} catch (Exception e) {
logger.error("Failed while deleting scheduled path for transfer {}", id);
}
}
});
);

logger.info("Submitted transfer " + transferId);
logger.info("Started the transfer " + transferId);

// Save transfer metadata in the scheduled path so it can be recovered after an agent failure. Recovery is done by the controller
mftConsulClient.getKvClient().putValue(MFTConsulClient.AGENTS_SCHEDULED_PATH + agentId + "/" + session + "/" + transferId, v);
} catch (Exception e) {
if (request != null) {
try {
@@ -185,11 +199,12 @@ private boolean connectAgent() throws MFTConsulClientException {
.ttl(sessionTTLSeconds + "s").build();

final SessionCreatedResponse sessResp = mftConsulClient.getSessionClient().createSession(session);
final String lockPath = "mft/agent/live/" + agentId;
final String lockPath = MFTConsulClient.LIVE_AGENTS_PATH + agentId;

boolean acquired = mftConsulClient.getKvClient().acquireLock(lockPath, sessResp.getId());

if (acquired) {
this.session = sessResp.getId();
sessionRenewPool.scheduleAtFixedRate(() -> {
try {
mftConsulClient.getSessionClient().renewSession(sessResp.getId());
@@ -255,7 +270,7 @@ public void start() throws Exception {
while (!connected) {
connected = connectAgent();
if (connected) {
logger.info("Successfully connected to consul");
logger.info("Successfully connected to consul with session id {}", session);
} else {
logger.info("Retrying to connect to consul");
Thread.sleep(5000);
@@ -265,6 +280,7 @@ public void start() throws Exception {
}
}
}

acceptRequests();
}

@@ -49,7 +49,8 @@ public void destroy() {
}

public String transfer(TransferCommand command, Connector inConnector, Connector outConnector, MetadataCollector srcMetadataCollector,
MetadataCollector destMetadataCollector, BiConsumer<String, TransferState> onStatusCallback) throws Exception {
MetadataCollector destMetadataCollector, BiConsumer<String, TransferState> onStatusCallback,
BiConsumer<String, Boolean> exitingCallback) throws Exception {

ResourceMetadata srcMetadata = srcMetadataCollector.getGetResourceMetadata(command.getSourceId(), command.getSourceToken());

@@ -76,6 +77,8 @@ public String transfer(TransferCommand command, Connector inConnector, Connector
futureList.add(completionService.submit(sendTask));

final AtomicBoolean transferInProgress = new AtomicBoolean(true);
final AtomicBoolean transferSuccess = new AtomicBoolean(true);


// Monitoring the completeness of the transfer
Thread monitorThread = new Thread(new Runnable() {
@@ -100,6 +103,7 @@ public void run() {
.setUpdateTimeMils(System.currentTimeMillis())
.setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));
transferInProgress.set(false);
transferSuccess.set(false);
statusLock.unlock();

for (Future<Integer> f : futureList) {
@@ -144,6 +148,7 @@ public void run() {
.setUpdateTimeMils(System.currentTimeMillis())
.setDescription("Transfer successfully completed"));
transferInProgress.set(false);
transferSuccess.set(true);
statusLock.unlock();

logger.info("Transfer {} completed. Speed {} MB/s", command.getTransferId(),
@@ -158,13 +163,15 @@ public void run() {
.setUpdateTimeMils(System.currentTimeMillis())
.setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));
transferInProgress.set(false);
transferSuccess.set(false);
statusLock.unlock();

logger.error("Transfer {} failed", command.getTransferId(), e);
} finally {
inConnector.destroy();
outConnector.destroy();
transferInProgress.set(false);
exitingCallback.accept(command.getTransferId(),transferSuccess.get());
}
}
});
@@ -57,6 +57,7 @@ public class MFTController implements CommandLineRunner {

// Cache over MFTConsulClient.CONTROLLER_TRANSFER_MESSAGE_PATH (created in init())
private KVCache messageCache;
// Cache over MFTConsulClient.CONTROLLER_STATE_MESSAGE_PATH (created in init())
private KVCache stateCache;
// Cache over MFTConsulClient.LIVE_AGENTS_PATH; its listener is registered in acceptLiveAgents()
private KVCache liveAgentCache;
// Single-threaded scheduler that runs processPending with a fixed delay (2000 ms initial, 4000 ms between runs)
private ScheduledExecutorService pendingMonitor;

@Autowired
@@ -68,6 +69,7 @@ public void init() {
logger.info("Initializing the Controller");
messageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.CONTROLLER_TRANSFER_MESSAGE_PATH);
stateCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.CONTROLLER_STATE_MESSAGE_PATH);
liveAgentCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.LIVE_AGENTS_PATH);
pendingMonitor = Executors.newSingleThreadScheduledExecutor();

pendingMonitor.scheduleWithFixedDelay(this::processPending, 2000, 4000, TimeUnit.MILLISECONDS);
@@ -154,6 +156,61 @@ private void acceptStates() {
stateCache.start();
}

/**
 * Registers a listener on the live agent cache and starts the cache.
 *
 * <p>For every entry the cache reports, the listener reads the session attached to the entry's key.
 * When a session is present it lists all values under {@code AGENTS_SCHEDULED_PATH + agentId}.
 * A well-formed scheduled key has the shape
 * {@code AGENTS_SCHEDULED_PATH / agentId / sessionId / transferId}:
 * <ul>
 *   <li>if the key's session differs from the session currently attached to the live key, the stored
 *       {@code TransferCommand} is sent to the same agent again and the scheduled key is deleted;</li>
 *   <li>if the sessions match, the key is left untouched;</li>
 *   <li>keys with an unexpected number of segments, or without a value, are logged and skipped.</li>
 * </ul>
 *
 * <p>Failures are logged and never propagated: a failure on one scheduled key does not stop the
 * remaining keys from being processed, and a failure on one agent does not affect the others.
 */
private void acceptLiveAgents() {
    // Segments of a well-formed scheduled key: prefix segments + agent id + session id + transfer id
    final int expectedKeyParts = MFTConsulClient.AGENTS_SCHEDULED_PATH.split("/").length + 3;

    ConsulCache.Listener<String, Value> liveAgentCacheListener = newValues -> newValues.forEach((agentId, value) -> {
        try {
            Optional<String> session = mftConsulClient.getKvClient().getSession(value.getKey());
            if (!session.isPresent()) {
                // No session is attached to the live key, so there is nothing to compare against
                return;
            }

            String sessionId = session.get();
            logger.info("Agent connected in path {} agent id {} session {}", value.getKey(), agentId, sessionId);

            List<Value> scheduledTransfers = mftConsulClient.getKvClient().getValues(MFTConsulClient.AGENTS_SCHEDULED_PATH + agentId);
            for (Value v : scheduledTransfers) {
                logger.info("Found scheduled key {}", v.getKey());

                try {
                    // Key = AGENTS_SCHEDULED_PATH agent id / session id / transfer id
                    String[] parts = v.getKey().split("/");
                    if (parts.length != expectedKeyParts) {
                        logger.warn("Invalid schedule key {}", v.getKey());
                        continue;
                    }

                    String scheduledSession = parts[parts.length - 2];
                    String scheduledTransfer = parts[parts.length - 1];

                    logger.info("Scheduled session {} transfer {}", scheduledSession, scheduledTransfer);

                    if (scheduledSession.equals(sessionId)) {
                        logger.info("Session {} is already active so skipping scheduled transfer", scheduledSession);
                        continue;
                    }

                    logger.info("Old transfer session found. Re scheduling to agent {}", agentId);

                    // Without a stored command there is nothing to re-submit; keep the key for inspection
                    Optional<String> payload = v.getValueAsString();
                    if (!payload.isPresent()) {
                        logger.warn("Scheduled key {} has no value. Skipping re scheduling", v.getKey());
                        continue;
                    }

                    mftConsulClient.commandTransferToAgent(agentId,
                            mapper.readValue(payload.get(), TransferCommand.class));

                    // Delete the key only after the command was handed over successfully
                    mftConsulClient.getKvClient().deleteKey(v.getKey());

                } catch (Exception e) {
                    // Pass the exception as the last argument so the cause and stack trace are logged
                    logger.error("Failed to process schedule key {}", v.getKey(), e);
                }
            }
        } catch (Exception e) {
            logger.error("Errored while processing live agent cache key {}", agentId, e);
        }
    });

    liveAgentCache.addListener(liveAgentCacheListener);
    liveAgentCache.start();
}

/**
 * Stores a transfer request under the processed path, keyed by its transfer id.
 *
 * @param transferId      id appended to {@code MFTConsulClient.TRANSFER_PROCESSED_PATH} to form the key
 * @param transferRequest request written as the key's value, serialized with {@code jsonMapper}
 * @throws JsonProcessingException if the request cannot be serialized
 */
private void markAsProcessed(String transferId, TransferRequest transferRequest) throws JsonProcessingException {
    final String processedKey = MFTConsulClient.TRANSFER_PROCESSED_PATH + transferId;
    final String requestJson = jsonMapper.writeValueAsString(transferRequest);
    mftConsulClient.getKvClient().putValue(processedKey, requestJson);
}
@@ -249,6 +306,7 @@ public void run(String... args) throws Exception {
init();
acceptRequests();
acceptStates();
acceptLiveAgents();
mainHold.acquire();
}

0 comments on commit 634bab9

Please sign in to comment.