Skip to content
Permalink
Browse files
[NO ISSUE][HYR] += ability to bypass work queue for high priority app…
… messages

- use said ability for active stats responses

Change-Id: I5d4747e08a380a585d0c4a9312873ea39b80abbf
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10964
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Till Westmann <tillw@apache.org>
Contrib: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
  • Loading branch information
mblow committed Apr 8, 2021
1 parent 9ba7487 commit 1f15c812c692ece6a32eeea116fa6140b21030f1
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 10 deletions.
@@ -153,8 +153,8 @@ private void requestStats(ActiveStatsRequestMessage message) throws HyracksDataE
String stats = runtime.getStats();
LOGGER.debug("Sending stats response for {} ", runtimeId);
ActiveStatsResponse response = new ActiveStatsResponse(reqId, stats, null);
((NodeControllerService) serviceCtx.getControllerService()).sendApplicationMessageToCC(message.getCcId(),
JavaSerializationUtils.serialize(response), null);
((NodeControllerService) serviceCtx.getControllerService()).sendRealTimeApplicationMessageToCC(
message.getCcId(), JavaSerializationUtils.serialize(response), null);
} catch (Exception e) {
throw HyracksDataException.create(e);
}
@@ -129,8 +129,16 @@ public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid,
break;
case SEND_APPLICATION_MESSAGE:
CCNCFunctions.SendApplicationMessageFunction rsf = (CCNCFunctions.SendApplicationMessageFunction) fn;
ccs.getWorkQueue().schedule(
new ApplicationMessageWork(ccs, rsf.getMessage(), rsf.getDeploymentId(), rsf.getNodeId()));
ApplicationMessageWork work =
new ApplicationMessageWork(ccs, rsf.getMessage(), rsf.getDeploymentId(), rsf.getNodeId());
if (rsf.isRealTime()) {
final ExecutorService executor = ccs.getExecutor();
if (executor != null) {
executor.execute(work);
}
} else {
ccs.getWorkQueue().schedule(work);
}
break;
case GET_NODE_CONTROLLERS_INFO:
ccs.getWorkQueue().schedule(new GetNodeControllersInfoWork(ccs.getNodeManager(),
@@ -66,6 +66,8 @@ void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Ex

void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;

void sendRealTimeApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;

void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, IResultMetadata metadata, boolean emptyResult,
int partition, int nPartitions, NetworkAddress networkAddress) throws Exception;

@@ -127,9 +127,10 @@ public enum FunctionId {
}

public static class SendApplicationMessageFunction extends Function {
private static final long serialVersionUID = 1L;
private byte[] serializedMessage;
private DeploymentId deploymentId;
private static final long serialVersionUID = 2L;
private final byte[] serializedMessage;
private final DeploymentId deploymentId;
private final boolean realTime;
private String nodeId;

public DeploymentId getDeploymentId() {
@@ -148,9 +149,14 @@ public byte[] getMessage() {
return serializedMessage;
}

public SendApplicationMessageFunction(byte[] data, DeploymentId deploymentId, String nodeId) {
public boolean isRealTime() {
return realTime;
}

public SendApplicationMessageFunction(byte[] data, DeploymentId deploymentId, boolean realTime, String nodeId) {
this.serializedMessage = data;
this.deploymentId = deploymentId;
this.realTime = realTime;
this.nodeId = nodeId;
}

@@ -128,7 +128,14 @@ public void registerPartitionRequest(PartitionRequest partitionRequest) throws E

@Override
public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception {
SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, nodeId);
SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, false, nodeId);
ipcHandle.send(-1, fn, null);
}

@Override
public void sendRealTimeApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId)
throws Exception {
SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, true, nodeId);
ipcHandle.send(-1, fn, null);
}

@@ -129,7 +129,7 @@ public void shutdown(boolean terminateNCService) throws Exception {

@Override
public void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception {
SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, nodeId);
SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, false, nodeId);
ipcHandle.send(-1, fn, null);
}

@@ -633,6 +633,10 @@ public void sendApplicationMessageToCC(CcId ccId, byte[] data, DeploymentId depl
getClusterController(ccId).sendApplicationMessageToCC(data, deploymentId, id);
}

public void sendRealTimeApplicationMessageToCC(CcId ccId, byte[] data, DeploymentId deploymentId) throws Exception {
getClusterController(ccId).sendRealTimeApplicationMessageToCC(data, deploymentId, id);
}

public IResultPartitionManager getResultPartitionManager() {
return resultPartitionManager;
}

0 comments on commit 1f15c81

Please sign in to comment.