Skip to content

RATIS-1853. When the stream server channelInactive, all requests in the channel.#889

Merged
szetszwo merged 6 commits intoapache:masterfrom
guohao-rosicky:guohao-RATIS-1853-dev
Jul 1, 2023
Merged

RATIS-1853. When the stream server channelInactive, all requests in the channel.#889
szetszwo merged 6 commits intoapache:masterfrom
guohao-rosicky:guohao-RATIS-1853-dev

Conversation

@guohao-rosicky
Copy link
Contributor

What changes were proposed in this pull request?

When the stream server channelInactive, all requests in the channel.

At the same time, there will be multiple stream sessions, using the same channel, for transmission, and when the connection is broken, we should clean up more stream sessions

What is the link to the Apache JIRA

https://issues.apache.org/jira/browse/RATIS-1853

Copy link
Contributor

@szetszwo szetszwo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guohao-rosicky , thanks a lot for working on this! Please see the comments inlined. Most of them are cosmetic changes.

}
}

public static class RequestTracker {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's call it ChannelMap.

}

public static class RequestTracker {
private final Map<ChannelId, Set<ClientInvocationId>> ref = new ConcurrentHashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the field is only used locally in this class, let's simply call it map.


public void add(ChannelId channelId,
ClientInvocationId clientInvocationId) {
ref.computeIfAbsent(channelId, (e) -> new CopyOnWriteArraySet<>()).add(clientInvocationId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConcurrentHashMap is more efficient than CopyOnWriteArraySet. Let's declare it as

    private final Map<ChannelId, Map<ClientInvocationId, ClientInvocationId>> map = new ConcurrentHashMap<>();

and use ConcurrentHashMap.

Comment on lines +246 to +248
public Set<ClientInvocationId> removeAll(ChannelId channelId) {
return ref.remove(channelId);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's simply call it remove.

    Set<ClientInvocationId> remove(ChannelId channelId) {
      return Optional.ofNullable(map.remove(channelId))
          .map(Map::keySet)
          .orElse(Collections.emptySet());
    }


void cleanUpOnChannelInactive(ClientInvocationId key) {
Optional.ofNullable(streams.remove(key)).ifPresent(removed -> removed.getLocal().cleanUp());
void cleanUpOnChannelInactive(ChannelHandlerContext ctx, TimeDuration channelInactiveGracePeriod) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's pass ChannelId instead of the ChannelHandlerContext.

// Delayed memory garbage cleanup
Optional.ofNullable(requestTracker.removeAll(ctx.channel().id())).ifPresent(ids -> {
final String streamIds = Arrays.toString(ids.toArray());
LOG.info("server {} is disconnect, cleanup clientInvocationIds={}", ctx.channel(), streamIds);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use the same wording.

      LOG.info("Channel {} is inactive, cleanup clientInvocationIds={}", channelId, ids);

Comment on lines +418 to +423
() -> {
for (ClientInvocationId clientInvocationId : ids) {
Optional.ofNullable(streams.remove(clientInvocationId)).ifPresent(removed ->
removed.getLocal().cleanUp());
}
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be easier to read if it is refactored out

  void cleanUp(Set<ClientInvocationId> ids) {
    for (ClientInvocationId clientInvocationId : ids) {
      Optional.ofNullable(streams.remove(clientInvocationId))
          .map(StreamInfo::getLocal)
          .ifPresent(LocalStream::cleanUp);
    }
  }

  void cleanUpOnChannelInactive(ChannelId channelId, TimeDuration channelInactiveGracePeriod) {
    // Delayed memory garbage cleanup
    Optional.ofNullable(channels.remove(channelId)).ifPresent(ids -> {
      LOG.info("Channel {} is inactive, cleanup clientInvocationIds={}", channelId, ids);
      TimeoutExecutor.getInstance().onTimeout(channelInactiveGracePeriod, () -> cleanUp(ids),
          LOG, () -> "Timeout check failed, clientInvocationIds=" + ids);
    });
  }

@guohao-rosicky guohao-rosicky requested a review from szetszwo June 30, 2023 07:19
@guohao-rosicky
Copy link
Contributor Author

Thanks for the review @szetszwo .

Copy link
Contributor

@szetszwo szetszwo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 the change looks good.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants