Skip to content
Permalink
Browse files
improve: edge triggered sendAvailable (#140)
  • Loading branch information
coderzc committed Dec 13, 2021
1 parent e081a0a commit 875bf7fb5a273316c52a860f90016679a20d79c9
Showing 2 changed files with 9 additions and 5 deletions.
@@ -82,7 +82,7 @@ protected void processAckMessage(ChannelHandlerContext ctx,
int ackId = ackMessage.ackId();
assert ackId > AbstractMessage.UNKNOWN_SEQ;
this.session().onRecvAck(ackId);
this.client.checkAndNoticeSendAvailable();
this.client.checkAndNotifySendAvailable();
}

@Override
@@ -92,7 +92,7 @@ protected void processFailMessage(ChannelHandlerContext ctx,
int failId = failMessage.ackId();
if (failId > AbstractMessage.START_SEQ) {
this.session().onRecvAck(failId);
this.client.checkAndNoticeSendAvailable();
this.client.checkAndNotifySendAvailable();
}

super.processFailMessage(ctx, channel, failMessage);
@@ -50,6 +50,7 @@ public class NettyTransportClient implements TransportClient {
private final ClientSession session;
private final long timeoutSyncRequest;
private final long timeoutFinishSession;
private volatile boolean preSendAvailable;

protected NettyTransportClient(Channel channel, ConnectionId connectionId,
NettyClientFactory clientFactory,
@@ -67,6 +68,7 @@ protected NettyTransportClient(Channel channel, ConnectionId connectionId,
this.timeoutSyncRequest = conf.timeoutSyncRequest();
this.timeoutFinishSession = conf.timeoutFinishSession();
this.session = new ClientSession(conf, this.createSendFunction());
this.preSendAvailable = false;
}

public Channel channel() {
@@ -163,10 +165,12 @@ protected boolean checkSendAvailable() {
return !this.session.flowBlocking() && this.channel.isWritable();
}

protected void checkAndNoticeSendAvailable() {
if (this.checkSendAvailable()) {
protected void checkAndNotifySendAvailable() {
boolean sendAvailable = this.checkSendAvailable();
if (sendAvailable && !this.preSendAvailable) {
this.handler.sendAvailable(this.connectionId);
}
this.preSendAvailable = sendAvailable;
}

@Override
@@ -188,7 +192,7 @@ public void close() {
@Override
public void onSuccess(Channel channel, ChannelFuture future) {
super.onSuccess(channel, future);
NettyTransportClient.this.checkAndNoticeSendAvailable();
NettyTransportClient.this.checkAndNotifySendAvailable();
}
}
}

0 comments on commit 875bf7f

Please sign in to comment.