Skip to content

Commit

Permalink
[NO ISSUE][NET] Ensure CLOSE Is Not Sent After Channel ERROR
Browse files Browse the repository at this point in the history
- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Currently it is possible to send network channel
  CLOSE command after a channel ERROR was sent. When this
  happens and the channel was recycled to be reused
  on the receiver side, the CLOSE command will result
  in NPE. There is no need to send a CLOSE command
  after an ERROR command because when an ERROR command
  is received, it is treated as ERROR + CLOSE on the
  receiver side.
- Avoid registering partition requests for failed jobs.

Change-Id: I17a769a46f4d13220adb22dd255e56dc4ccc458d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2954
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Michael Blow <mblow@apache.org>
  • Loading branch information
mhubail committed Sep 4, 2018
1 parent 3a68469 commit d237f0c
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 3 deletions.
Expand Up @@ -54,7 +54,7 @@ public void write(IConnectionWriterState writerState) throws NetException {
ecodeSent = true;
ccb.reportLocalEOS();
adjustChannelWritability();
} else if (eos && !eosSent) {
} else if (isPendingCloseWrite()) {
writerState.getCommand().setChannelId(channelId);
writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL);
writerState.getCommand().setData(0);
Expand Down
Expand Up @@ -107,6 +107,7 @@ public synchronized IPartition getPartition(PartitionId pid) {
public synchronized void registerPartitionRequest(PartitionId partitionId, NetworkOutputChannel writer) {
if (failedJobsCache.getIfPresent(partitionId.getJobId()) != null) {
writer.abort(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
return;
}
List<IPartition> pList = availablePartitionMap.get(partitionId);
if (pList != null && !pList.isEmpty()) {
Expand Down
Expand Up @@ -75,7 +75,7 @@ private boolean computeWritability() {
if (writableDataPresent) {
return credits > 0;
}
if (eos && !eosSent) {
if (isPendingCloseWrite()) {
return true;
}
return ecode.get() == REMOTE_ERROR_CODE && !ecodeSent;
Expand Down Expand Up @@ -116,6 +116,10 @@ public int getCredits() {
return credits;
}

protected boolean isPendingCloseWrite() {
return eos && !eosSent && !ecodeSent;
}

private class CloseableBufferAcceptor implements ICloseableBufferAcceptor {
@Override
public void accept(ByteBuffer buffer) {
Expand Down
Expand Up @@ -58,7 +58,7 @@ public void write(IConnectionWriterState writerState) throws NetException {
ecodeSent = true;
ccb.reportLocalEOS();
adjustChannelWritability();
} else if (eos && !eosSent) {
} else if (isPendingCloseWrite()) {
writerState.getCommand().setChannelId(channelId);
writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL);
writerState.getCommand().setData(0);
Expand Down

0 comments on commit d237f0c

Please sign in to comment.