Skip to content
Permalink
Browse files
Proper logging and refactoring for MFTController
  • Loading branch information
DImuthuUpe committed Apr 4, 2020
1 parent f0d88b8 commit 58efd2db8b50a0505255a7ad996a57f467e8ab09
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 85 deletions.
@@ -56,6 +56,14 @@ public class MFTConsulClient {
private SessionClient sessionClient;
private ObjectMapper mapper = new ObjectMapper();

public static final String TRANSFER_STATE_PATH = "mft/transfer/state/";
public static final String CONTROLLER_MESSAGE_TRANSFER_PATH = "mft/controller/messages/transfers/";
public static final String AGENTS_MESSAGE_PATH = "mft/agents/messages/";
public static final String AGENTS_INFO_PATH = "mft/agents/info/";
public static final String LIVE_AGENTS_PATH = "mft/agent/live/";
public static final String TRANSFER_PROCESSED_PATH = "mft/transfer/processed/";
public static final String TRANSFER_PENDING_PATH = "mft/transfer/pending/";

public MFTConsulClient(Map<String, Integer> consulHostPorts) {
List<HostAndPort> hostAndPorts = consulHostPorts.entrySet().stream()
.map(entry -> fromParts(entry.getKey(), entry.getValue()))
@@ -75,7 +83,7 @@ public String submitTransfer(TransferRequest transferRequest) throws MFTAdminExc
try {
String asStr = mapper.writeValueAsString(transferRequest);
String transferId = UUID.randomUUID().toString();
kvClient.putValue("mft/controller/messages/" + transferId, asStr);
kvClient.putValue(CONTROLLER_MESSAGE_TRANSFER_PATH + transferId, asStr);
return transferId;
} catch (JsonProcessingException e) {
throw new MFTAdminException("Error in serializing transfer request", e);
@@ -90,15 +98,15 @@ public void commandTransferToAgent(String agentId, TransferCommand transferComma
.setUpdateTimeMils(System.currentTimeMillis())
.setDescription("Initializing the transfer"));
String asString = mapper.writeValueAsString(transferCommand);
kvClient.putValue("mft/agents/messages/" + agentId + "/" + transferCommand.getTransferId(), asString);
kvClient.putValue(AGENTS_MESSAGE_PATH + agentId + "/" + transferCommand.getTransferId(), asString);
} catch (JsonProcessingException e) {
throw new MFTAdminException("Error in serializing transfer request", e);
}
}

public List<AgentInfo> listAgents() {
List<AgentInfo> agents = new ArrayList<>();
List<String> keys = kvClient.getKeys("mft/agents/info");
List<String> keys = kvClient.getKeys(AGENTS_INFO_PATH);
for (String key : keys) {
Optional<AgentInfo> agentInfo = getAgentInfo(key.substring(key.lastIndexOf("/") + 1));
agentInfo.ifPresent(agents::add);
@@ -107,15 +115,15 @@ public List<AgentInfo> listAgents() {
}

public Optional<AgentInfo> getAgentInfo(String agentId) {
Optional<Value> value = kvClient.getValue("mft/agents/info/" + agentId);
Optional<Value> value = kvClient.getValue(AGENTS_INFO_PATH + agentId);
if (value.isPresent()) {
Value absVal = value.get();
if (absVal.getValue().isPresent()) {
String asStr = absVal.getValueAsString().get();
try {
return Optional.of(mapper.readValue(asStr, AgentInfo.class));
} catch (IOException e) {
e.printStackTrace();
logger.error("Errored while fetching agent {} info", agentId, e);
}
}
}
@@ -125,15 +133,15 @@ public Optional<AgentInfo> getAgentInfo(String agentId) {
public void registerAgent(AgentInfo agentInfo) throws MFTAdminException {
try {
String asString = mapper.writeValueAsString(agentInfo);
kvClient.putValue("mft/agents/info/" + agentInfo.getId(), asString);
kvClient.putValue(AGENTS_INFO_PATH + agentInfo.getId(), asString);
} catch (JsonProcessingException e) {
throw new MFTAdminException("Error in serializing agent information", e);
}
}

public List<String> getLiveAgentIds() throws MFTAdminException {
try {
List<String> keys = kvClient.getKeys("mft/agent/live/");
List<String> keys = kvClient.getKeys(LIVE_AGENTS_PATH);
return keys.stream().map(key -> key.substring(key.lastIndexOf("/") + 1)).collect(Collectors.toList());
} catch (ConsulException e) {
if (e.getCode() == 404) {
@@ -167,13 +175,15 @@ public Optional<TransferState> getTransferState(String transferId) throws MFTAdm
}
}

public void submitTransferStateToProcess(String transferId, String agentId, TransferState transferState) throws MFTAdminException {

}
public void submitTransferState(String transferId, TransferState transferState) throws MFTAdminException {
try {
List<TransferState> allStates = getTransferStates(transferId);
System.out.println(allStates);
allStates.add(transferState);
String asStr = mapper.writeValueAsString(allStates);
kvClient.putValue("mft/transfer/state/" + transferId, asStr);
kvClient.putValue(TRANSFER_STATE_PATH + transferId, asStr);

logger.info("Saved transfer status " + asStr);

@@ -183,7 +193,7 @@ public void submitTransferState(String transferId, TransferState transferState)
}

public List<TransferState> getTransferStates(String transferId) throws IOException {
Optional<Value> valueOp = kvClient.getValue("mft/transfer/state/" + transferId);
Optional<Value> valueOp = kvClient.getValue(TRANSFER_STATE_PATH + transferId);
List<TransferState> allStates;
if (valueOp.isPresent()) {
String prevStates = valueOp.get().getValueAsString().get();
@@ -20,10 +20,10 @@ agent.id=agent0
agent.host=localhost
agent.user=dimuthu
agent.supported.protocols=SCP,LOCAL
consul.host=149.165.156.124
consul.host=localhost
consul.port=8500

resource.service.host=149.165.156.124
resource.service.host=localhost
resource.service.port=7002
secret.service.host=149.165.156.124
secret.service.host=localhost
secret.service.port=7003
@@ -26,7 +26,6 @@
import org.apache.airavata.mft.admin.MFTAdminException;
import org.apache.airavata.mft.admin.models.TransferCommand;
import org.apache.airavata.mft.admin.models.TransferRequest;
import org.dozer.DozerBeanMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -36,6 +35,8 @@
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.context.annotation.ComponentScan;

import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
@@ -54,86 +55,100 @@ public class MFTController implements CommandLineRunner {

private KVCache messageCache;
private KVCache stateCache;
private ConsulCache.Listener<String, Value> messageCacheListener;
private ConsulCache.Listener<String, Value> stateCacheListener;
private ScheduledExecutorService pendingMonitor;

@Autowired
private MFTConsulClient mftConsulClient;

private ObjectMapper jsonMapper = new ObjectMapper();
private DozerBeanMapper dozerBeanMapper = new DozerBeanMapper();


public void init() {
messageCache = KVCache.newCache(mftConsulClient.getKvClient(), "mft/controller/messages");
stateCache = KVCache.newCache(mftConsulClient.getKvClient(), "mft/transfer/state");
logger.info("Initializing the Controller");
messageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.CONTROLLER_MESSAGE_TRANSFER_PATH);
stateCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.TRANSFER_STATE_PATH);
pendingMonitor = Executors.newSingleThreadScheduledExecutor();

pendingMonitor.scheduleWithFixedDelay(this::processPending, 2000, 4000, TimeUnit.MILLISECONDS);
logger.info("Controller initialized");
}

@PreDestroy
public void destroy() {
logger.info("Destroying the Controller");
try {
if (this.pendingMonitor != null) {
this.pendingMonitor.shutdown();
}
} catch (Exception e) {
logger.warn("Errored while shutting down the pending monitor", e);
}
logger.info("Controller destroyed");
}

/**
* Accepts transfer requests coming from the API and put it into the pending queue
*/
private void acceptRequests() {
messageCacheListener = newValues -> {
newValues.forEach((key, value) -> {
String transferId = key.substring(key.lastIndexOf("/") + 1);
Optional<String> decodedValue = value.getValueAsString();
decodedValue.ifPresent(v -> {
logger.info("Value is: {}", v);
try {
TransferRequest transferRequest = jsonMapper.readValue(v, TransferRequest.class);
String selectedAgent = selectAgent(transferRequest);

if (selectedAgent != null) {
logger.info("Found agent {} to initiate the transfer {}", selectedAgent, transferId);
TransferCommand transferCommand = convertRequestToCommand(transferId, transferRequest);
mftConsulClient.commandTransferToAgent(selectedAgent, transferCommand);
markAsProcessed(transferId, transferRequest);
logger.info("Marked transfer {} as processed", transferId);
} else {
markAsPending(transferId, transferRequest);
logger.info("Marked transfer {} as pending", transferId);
}
} catch (Exception e) {
logger.error("Failed to process the request", e);
} finally {
logger.info("Deleting key " + value.getKey());
mftConsulClient.getKvClient().deleteKey(value.getKey()); // Due to bug in consul https://github.com/hashicorp/consul/issues/571
}
});
// Due to bug in consul https://github.com/hashicorp/consul/issues/571
ConsulCache.Listener<String, Value> messageCacheListener = newValues -> newValues.forEach((key, value) -> {
String transferId = key.substring(key.lastIndexOf("/") + 1);
Optional<String> decodedValue = value.getValueAsString();
decodedValue.ifPresent(v -> {
logger.info("Received transfer request : {} with id {}", v, transferId);

TransferRequest transferRequest;
try {
transferRequest = jsonMapper.readValue(v, TransferRequest.class);
} catch (IOException e) {
logger.error("Failed to parse the transfer request {}", v, e);
return;
}

try {
markAsPending(transferId, transferRequest);
logger.info("Marked transfer {} as pending", transferId);

} catch (Exception e) {
logger.error("Failed to store transfer request {}", transferId, e);

} finally {
logger.info("Deleting key " + value.getKey());
mftConsulClient.getKvClient().deleteKey(value.getKey()); // Due to bug in consul https://github.com/hashicorp/consul/issues/571
}
});
};
});
messageCache.addListener(messageCacheListener);
messageCache.start();
}

private void acceptStates() {
stateCacheListener = newValues -> {
newValues.forEach((key, value) -> {
try {
if (value.getValueAsString().isPresent()) {
String asStr = value.getValueAsString().get();
//logger.info("Received state Key {} val {}", key, asStr);
//logger.info("Deleting key " + value.getKey());
//kvClient.deleteKey(value.getKey()); // Due to bug in consul https://github.com/hashicorp/consul/issues/571
ConsulCache.Listener<String, Value> stateCacheListener = newValues -> newValues.forEach((key, value) -> {
try {
if (value.getValueAsString().isPresent()) {
String asStr = value.getValueAsString().get();

//logger.info("Received state Key {} val {}", key, asStr);
}
} catch (Exception e) {
logger.error("Error while processing the state message", e);
} finally {
//logger.info("Deleting key " + value.getKey());
//kvClient.deleteKey(value.getKey()); // Due to bug in consul https://github.com/hashicorp/consul/issues/571
//logger.info("Received state Key {} val {}", key, asStr);
}
});
};
} catch (Exception e) {
logger.error("Error while processing the state message", e);
} finally {
//logger.info("Deleting key " + value.getKey());
//kvClient.deleteKey(value.getKey()); // Due to bug in consul https://github.com/hashicorp/consul/issues/571
}
});
//stateCache.addListener(stateCacheListener);
//stateCache.start();
}

private void markAsProcessed(String transferId, TransferRequest transferRequest) throws JsonProcessingException {
mftConsulClient.getKvClient().putValue("mft/transfer/processed/" +transferId, jsonMapper.writeValueAsString(transferRequest));
mftConsulClient.getKvClient().putValue(MFTConsulClient.TRANSFER_PROCESSED_PATH +transferId, jsonMapper.writeValueAsString(transferRequest));
}

private void markAsPending(String transferId, TransferRequest transferRequest) throws JsonProcessingException {
mftConsulClient.getKvClient().putValue("mft/transfer/pending/" +transferId, jsonMapper.writeValueAsString(transferRequest));
mftConsulClient.getKvClient().putValue(MFTConsulClient.TRANSFER_PENDING_PATH +transferId, jsonMapper.writeValueAsString(transferRequest));
}

private TransferCommand convertRequestToCommand(String transferId, TransferRequest transferRequest) {
@@ -152,12 +167,18 @@ private TransferCommand convertRequestToCommand(String transferId, TransferReque
return transferCommand;
}

private String selectAgent(TransferRequest transferRequest) throws ControllerException, MFTAdminException {
private Optional<String> selectAgent(String transferId, TransferRequest transferRequest) throws MFTAdminException {

List<String> liveAgentIds = mftConsulClient.getLiveAgentIds();
if (liveAgentIds.isEmpty()) {
logger.error("Live agents are not available. Skipping for now");
return null;
return Optional.empty();
}

if (logger.isDebugEnabled()) {
logger.debug("Processing transfer request {} with target agents {}", transferId, transferRequest.getTargetAgents());
logger.debug("Printing live agents");
liveAgentIds.forEach(a -> logger.debug("Agent {} is live", a));
}

String selectedAgent = null;
@@ -173,33 +194,41 @@ private String selectAgent(TransferRequest transferRequest) throws ControllerExc

if (selectedAgent == null) {
logger.warn("Couldn't find an Agent that meet transfer requirements");
return Optional.empty();
}

return selectedAgent;
return Optional.of(selectedAgent);
}

/**
* Fetch pending transfer requests and check for available agents. If an agent is found, forwards the transfer request
* to that agent and mark transfer state as processed.
*/
private void processPending() {
List<Value> values = mftConsulClient.getKvClient().getValues("mft/transfer/pending");
List<Value> values = mftConsulClient.getKvClient().getValues(MFTConsulClient.TRANSFER_PENDING_PATH);
logger.debug("Scanning pending transfers");

values.forEach(value -> {
logger.debug("Pending " + value.getKey() + " : " + value.getValueAsString().get());
try {
TransferRequest transferRequest = jsonMapper.readValue(value.getValueAsString().get(), TransferRequest.class);
String transferId = value.getKey().substring(value.getKey().lastIndexOf("/") + 1);
String agent = selectAgent(transferRequest);

if (agent != null) {
logger.info("Found agent {} to initiate the transfer {}", agent, transferId);
TransferCommand transferCommand = convertRequestToCommand(transferId, transferRequest);

mftConsulClient.commandTransferToAgent(agent, transferCommand);
markAsProcessed(transferId, transferRequest);
mftConsulClient.getKvClient().deleteKey(value.getKey());
logger.info("Marked transfer {} as processed", transferId);

if (value.getValueAsString().isPresent()) {
logger.debug("Pending " + value.getKey() + " : " + value.getValueAsString().get());
try {
TransferRequest transferRequest = jsonMapper.readValue(value.getValueAsString().get(), TransferRequest.class);
String transferId = value.getKey().substring(value.getKey().lastIndexOf("/") + 1);
Optional<String> agent = selectAgent(transferId, transferRequest);

if (agent.isPresent()) {
logger.info("Found agent {} to initiate the transfer {}", agent, transferId);
TransferCommand transferCommand = convertRequestToCommand(transferId, transferRequest);

mftConsulClient.commandTransferToAgent(agent.get(), transferCommand);
markAsProcessed(transferId, transferRequest);
mftConsulClient.getKvClient().deleteKey(value.getKey());
logger.info("Marked transfer {} as processed", transferId);
}
} catch (Exception e) {
logger.error("Failed to process pending transfer in key {}", value.getKey(), e);
}
} catch (Exception e) {
logger.error("Failed to process pending transfer in key {}", value.getKey(), e);
}
});
}
@@ -14,5 +14,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
consul.host=149.165.156.124
consul.host=localhost
consul.port=8500

0 comments on commit 58efd2d

Please sign in to comment.