Skip to content
Permalink
Browse files
Load balancing transfer requests across over agents
  • Loading branch information
DImuthuUpe committed May 17, 2022
1 parent 61f00a2 commit 555fc422372e735658caf002a5e85aa777e0b3c6
Showing 4 changed files with 63 additions and 15 deletions.
@@ -366,6 +366,7 @@ private boolean connectAgent() throws MFTConsulClientException {
.setId(agentId)
.setHost(agentHost)
.setUser(agentUser)
.setSessionId(this.session)
.setSupportedProtocols(Arrays.asList(supportedProtocols.split(",")))
.setLocalStorages(new ArrayList<>()));
}
@@ -207,6 +207,27 @@ public List<String> getLiveAgentIds() throws MFTConsulClientException {
}
}

/**
* Lists all currently processing transfer id for the given agent
*
* @param agentInfo
* @return
* @throws MFTConsulClientException
*/
public List<String> getAgentActiveTransferIds(AgentInfo agentInfo) throws MFTConsulClientException {
try {
List<String> keys = kvClient.getKeys(MFTConsulClient.AGENTS_SCHEDULED_PATH + agentInfo.getId() + "/" + agentInfo.getSessionId());
return keys.stream().map(key -> key.substring(key.lastIndexOf("/") + 1)).collect(Collectors.toList());
} catch (ConsulException e) {
if (e.getCode() == 404) {
return Collections.emptyList();
}
throw new MFTConsulClientException("Error in fetching active transfers for agent " + agentInfo.getId(), e);
} catch (Exception e) {
throw new MFTConsulClientException("Error in fetching active transfers for agent " + agentInfo.getId(), e);
}
}

/**
* Agents should call this method to submit {@link TransferState}. These status are received by the controller and reorder
* status messages and put in the final status array.
@@ -237,12 +258,8 @@ public void submitTransferStateToProcess(String transferId, String agentId, Tran
*/
public void saveTransferState(String transferId, TransferState transferState) throws MFTConsulClientException {
try {
List<TransferState> allStates = getTransferStates(transferId);
// TODO implement sequence consistency
allStates.add(transferState);
String asStr = mapper.writeValueAsString(allStates);
kvClient.putValue(TRANSFER_STATE_PATH + transferId, asStr);

String asStr = mapper.writeValueAsString(transferState);
kvClient.putValue(TRANSFER_STATE_PATH + transferId + "/" + UUID.randomUUID().toString(), asStr);
logger.info("Saved transfer status " + asStr);

} catch (Exception e) {
@@ -287,15 +304,20 @@ public Optional<TransferState> getTransferState(String transferId) throws MFTCon
* @throws IOException
*/
public List<TransferState> getTransferStates(String transferId) throws IOException {
Optional<Value> valueOp = kvClient.getValue(TRANSFER_STATE_PATH + transferId);
List<TransferState> allStates;
if (valueOp.isPresent()) {
String prevStates = valueOp.get().getValueAsString().get();
allStates = new ArrayList<>(Arrays.asList(mapper.readValue(prevStates, TransferState[].class)));
} else {
allStates = new ArrayList<>();
List<String> keys = kvClient.getKeys(TRANSFER_STATE_PATH + transferId);

List<TransferState> allStates = new ArrayList<>();

for (String key: keys) {
Optional<Value> valueOp = kvClient.getValue(key);
String stateAsStr = valueOp.get().getValueAsString().get();
TransferState transferState = mapper.readValue(stateAsStr, TransferState.class);
allStates.add(transferState);
}
return allStates;
List<TransferState> sortedStates = allStates.stream().sorted((o1, o2) ->
(o1.getUpdateTimeMils() - o2.getUpdateTimeMils()) < 0 ? -1 :
(o1.getUpdateTimeMils() - o2.getUpdateTimeMils()) == 0 ? 0 : 1).collect(Collectors.toList());
return sortedStates;
}

public List<AgentInfo> getLiveAgentInfos() throws MFTConsulClientException {
@@ -24,6 +24,7 @@ public class AgentInfo {
private String host;
private String user;
private boolean sudo;
private String sessionId;
private List<String> supportedProtocols;
private List<String> localStorages;

@@ -80,4 +81,13 @@ public AgentInfo setLocalStorages(List<String> localStorages) {
this.localStorages = localStorages;
return this;
}

public String getSessionId() {
return sessionId;
}

public AgentInfo setSessionId(String sessionId) {
this.sessionId = sessionId;
return this;
}
}
@@ -25,6 +25,7 @@
import com.orbitz.consul.model.kv.Value;
import org.apache.airavata.mft.admin.MFTConsulClient;
import org.apache.airavata.mft.admin.MFTConsulClientException;
import org.apache.airavata.mft.admin.models.AgentInfo;
import org.apache.airavata.mft.admin.models.TransferState;
import org.apache.airavata.mft.api.service.TransferApiRequest;
import org.slf4j.Logger;
@@ -40,10 +41,12 @@
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@SpringBootApplication()
@ComponentScan(basePackages = {"org.apache.airavata.mft"})
@@ -247,7 +250,19 @@ private Optional<String> selectAgent(String transferId, TransferApiRequest trans
selectedAgent = possibleAgent.get();
}
} else if (!transferRequest.getAffinityTransfer()){
selectedAgent = liveAgentIds.get(0);
List<Optional<AgentInfo>> agentInfos = liveAgentIds.stream().map(id -> mftConsulClient.getAgentInfo(id)).collect(Collectors.toList());
int transferCount = -1;
for (Optional<AgentInfo> agentInfo : agentInfos) {
if (agentInfo.isPresent()) {
if (transferCount == -1) {
transferCount = mftConsulClient.getAgentActiveTransferIds(agentInfo.get()).size();
selectedAgent = agentInfo.get().getId();
} else if (transferCount > mftConsulClient.getAgentActiveTransferIds(agentInfo.get()).size()) {
transferCount = mftConsulClient.getAgentActiveTransferIds(agentInfo.get()).size();
selectedAgent = agentInfo.get().getId();
}
}
}
}

if (selectedAgent == null) {

0 comments on commit 555fc42

Please sign in to comment.