Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions grpc-client-utils/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies {

testImplementation("org.junit.jupiter:junit-jupiter:5.7.0")
testImplementation("org.mockito:mockito-core:3.4.4")
testRuntimeOnly("io.grpc:grpc-netty:1.36.0")
}

tasks.test {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.hypertrace.core.grpcutils.client;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GrpcChannelRegistry {
private static final Logger LOG = LoggerFactory.getLogger(GrpcChannelRegistry.class);
private final Map<String, ManagedChannel> channelMap = new ConcurrentHashMap<>();
private volatile boolean isShutdown = false;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the integrated shutdown on this one so it didn't bring in more dependencies (the shutdown lifecycle is from), but I can bring it back if that seems more useful. The more I think on it, the more I think I'll optionally add just the future part of it (to avoid the dependency)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed my mind again - the difference of accepting a hook is really just the below anyway:

new GrpcChannelRegistry(future);
// vs
registry = new GrpcChannelRegistry();
future.thenRun(registry::shutdown);


public ManagedChannel forAddress(String host, int port) {
assert !this.isShutdown;
String channelId = this.getChannelId(host, port);
return this.channelMap.computeIfAbsent(channelId, unused -> this.buildNewChannel(host, port));
}

private ManagedChannel buildNewChannel(String host, int port) {
LOG.info("Creating new channel for {}:{}", host, port);
return ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
}

private String getChannelId(String host, int port) {
return host + ":" + port;
}

public void shutdown() {
channelMap.values().forEach(ManagedChannel::shutdown);
this.isShutdown = true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.hypertrace.core.grpcutils.client;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class GrpcChannelRegistryTest {

GrpcChannelRegistry channelRegistry;

@BeforeEach
void beforeEach() {
this.channelRegistry = new GrpcChannelRegistry();
}

@Test
void createsNewChannelsAsRequested() {
assertNotNull(this.channelRegistry.forAddress("foo", 1000));
}

@Test
void reusesChannelsForDuplicateRequests() {
Channel firstChannel = this.channelRegistry.forAddress("foo", 1000);
assertSame(firstChannel, this.channelRegistry.forAddress("foo", 1000));
assertNotSame(firstChannel, this.channelRegistry.forAddress("foo", 1001));
assertNotSame(firstChannel, this.channelRegistry.forAddress("bar", 1000));
}

@Test
void shutdownAllChannelsOnShutdown() {
ManagedChannel firstChannel = this.channelRegistry.forAddress("foo", 1000);
ManagedChannel secondChannel = this.channelRegistry.forAddress("foo", 1002);
assertFalse(firstChannel.isShutdown());
assertFalse(secondChannel.isShutdown());
this.channelRegistry.shutdown();
assertTrue(firstChannel.isShutdown());
assertTrue(secondChannel.isShutdown());
}

@Test
void throwsIfNewChannelRequestedAfterShutdown() {
this.channelRegistry.shutdown();
assertThrows(AssertionError.class, () -> this.channelRegistry.forAddress("foo", 1000));
}
}