From f6ae60f1c38e8fa0e13628f6c9c99a90858f737b Mon Sep 17 00:00:00 2001 From: Yuri Golobokov Date: Mon, 6 May 2024 21:37:17 +0000 Subject: [PATCH] feat: manual affinity via call options or context --- .../google/cloud/grpc/GcpManagedChannel.java | 26 ++- .../cloud/grpc/SpannerIntegrationTest.java | 184 ++++++++++++++++++ 2 files changed, 206 insertions(+), 4 deletions(-) diff --git a/grpc-gcp/src/main/java/com/google/cloud/grpc/GcpManagedChannel.java b/grpc-gcp/src/main/java/com/google/cloud/grpc/GcpManagedChannel.java index 5f5f661..6559f37 100644 --- a/grpc-gcp/src/main/java/com/google/cloud/grpc/GcpManagedChannel.java +++ b/grpc-gcp/src/main/java/com/google/cloud/grpc/GcpManagedChannel.java @@ -80,6 +80,8 @@ public class GcpManagedChannel extends ManagedChannel { Context.keyWithDefault("DisableAffinity", false); public static final CallOptions.Key DISABLE_AFFINITY_KEY = CallOptions.Key.createWithDefault("DisableAffinity", false); + public static final Context.Key AFFINITY_CTX_KEY = Context.key("AffinityKey"); + public static final CallOptions.Key AFFINITY_KEY = CallOptions.Key.create("AffinityKey"); @GuardedBy("this") private Integer bindingIndex = -1; @@ -1269,14 +1271,30 @@ public String authority() { @Override public ClientCall newCall( MethodDescriptor methodDescriptor, CallOptions callOptions) { - AffinityConfig affinity = methodToAffinity.get(methodDescriptor.getFullMethodName()); - if (affinity == null - || callOptions.getOption(DISABLE_AFFINITY_KEY) + if (callOptions.getOption(DISABLE_AFFINITY_KEY) || DISABLE_AFFINITY_CTX_KEY.get(Context.current())) { return new GcpClientCall.SimpleGcpClientCall<>( getChannelRef(null), methodDescriptor, callOptions); } - return new GcpClientCall<>(this, methodDescriptor, callOptions, affinity); + + AffinityConfig affinity = methodToAffinity.get(methodDescriptor.getFullMethodName()); + String key = keyFromOptsCtx(callOptions); + if (affinity != null && key == null) { + return new GcpClientCall<>(this, methodDescriptor, callOptions, affinity); + } + + return new GcpClientCall.SimpleGcpClientCall<>( + getChannelRef(key), methodDescriptor, callOptions); + } + + @Nullable + private String keyFromOptsCtx(CallOptions callOptions) { + String key = callOptions.getOption(AFFINITY_KEY); + if (key != null) { + return key; + } + + return AFFINITY_CTX_KEY.get(Context.current()); } @Override diff --git a/grpc-gcp/src/test/java/com/google/cloud/grpc/SpannerIntegrationTest.java b/grpc-gcp/src/test/java/com/google/cloud/grpc/SpannerIntegrationTest.java index 08b0a4b..c788a2d 100644 --- a/grpc-gcp/src/test/java/com/google/cloud/grpc/SpannerIntegrationTest.java +++ b/grpc-gcp/src/test/java/com/google/cloud/grpc/SpannerIntegrationTest.java @@ -1419,6 +1419,190 @@ public void testExecuteStreamingSqlWithAffinityDisabledViaCallOptions() throws E } } + @Test + public void testExecuteStreamingSqlWithAffinityViaContext() throws Exception { + SpannerStub stub = getSpannerStub(); + String sessionName = createAsyncSessions(stub).get(0); + ExecuteSqlRequest executeSqlRequest = + ExecuteSqlRequest.newBuilder() + .setSession(sessionName) + .setSql("select * FROM Users") + .build(); + List> resps = new ArrayList<>(); + AsyncResponseObserver resp = new AsyncResponseObserver<>(); + resps.add(resp); + stub.executeStreamingSql(executeSqlRequest, resp); + // The ChannelRef which is bound with the current affinity key. + ChannelRef currentChannel = gcpChannel.affinityKeyToChannelRef.get(sessionName); + // Verify the channel is in use. + assertEquals(1, currentChannel.getActiveStreamsCount()); + + final String key = "overriden-key"; + + // Override affinity key via context. + Context ctx = Context.current().withValue(GcpManagedChannel.AFFINITY_CTX_KEY, key); + ctx.run( + () -> { + AsyncResponseObserver r = new AsyncResponseObserver<>(); + resps.add(r); + stub.executeStreamingSql(executeSqlRequest, r); + }); + + ChannelRef newChannel = gcpChannel.affinityKeyToChannelRef.get(key); + // Make sure it is mapped to a different channel, because current channel is the busiest. + assertThat(currentChannel.getId()).isNotEqualTo(newChannel.getId()); + assertEquals(1, newChannel.getActiveStreamsCount()); + + // Make another call. + ctx.run( + () -> { + AsyncResponseObserver r = new AsyncResponseObserver<>(); + resps.add(r); + stub.executeStreamingSql(executeSqlRequest, r); + }); + assertEquals(2, newChannel.getActiveStreamsCount()); + + // Make sure non-overriden affinty still works. + resp = new AsyncResponseObserver<>(); + resps.add(resp); + stub.executeStreamingSql(executeSqlRequest, resp); + assertEquals(2, currentChannel.getActiveStreamsCount()); + + // Complete the requests. + resps.forEach( + r -> { + try { + r.get(); + } catch (InterruptedException e) { + // Noop if interrupted. + } + }); + } + + @Test + public void testExecuteStreamingSqlWithAffinityViaCallOptions() throws Exception { + SpannerStub stub = getSpannerStub(); + String sessionName = createAsyncSessions(stub).get(0); + ExecuteSqlRequest executeSqlRequest = + ExecuteSqlRequest.newBuilder() + .setSession(sessionName) + .setSql("select * FROM Users") + .build(); + List> resps = new ArrayList<>(); + AsyncResponseObserver resp = new AsyncResponseObserver<>(); + resps.add(resp); + stub.executeStreamingSql(executeSqlRequest, resp); + // The ChannelRef which is bound with the current affinity key. + ChannelRef currentChannel = gcpChannel.affinityKeyToChannelRef.get(sessionName); + // Verify the channel is in use. + assertEquals(1, currentChannel.getActiveStreamsCount()); + + final String key = "overriden-key"; + + // Override affinity key via call options. + resp = new AsyncResponseObserver<>(); + resps.add(resp); + stub.withOption(GcpManagedChannel.AFFINITY_KEY, key) + .executeStreamingSql(executeSqlRequest, resp); + + ChannelRef newChannel = gcpChannel.affinityKeyToChannelRef.get(key); + // Make sure it is mapped to a different channel, because current channel is the busiest. + assertThat(currentChannel.getId()).isNotEqualTo(newChannel.getId()); + assertEquals(1, newChannel.getActiveStreamsCount()); + + // Make another call. + resp = new AsyncResponseObserver<>(); + resps.add(resp); + stub.withOption(GcpManagedChannel.AFFINITY_KEY, key) + .executeStreamingSql(executeSqlRequest, resp); + assertEquals(2, newChannel.getActiveStreamsCount()); + + // Make sure non-overriden affinty still works. + resp = new AsyncResponseObserver<>(); + resps.add(resp); + stub.executeStreamingSql(executeSqlRequest, resp); + assertEquals(2, currentChannel.getActiveStreamsCount()); + + // Complete the requests. + resps.forEach( + r -> { + try { + r.get(); + } catch (InterruptedException e) { + // Noop if interrupted. + } + }); + } + + @Test + public void testExecuteStreamingSqlWithAffinityViaContextAndCallOptions() throws Exception { + SpannerStub stub = getSpannerStub(); + String sessionName = createAsyncSessions(stub).get(0); + ExecuteSqlRequest executeSqlRequest = + ExecuteSqlRequest.newBuilder() + .setSession(sessionName) + .setSql("select * FROM Users") + .build(); + List> resps = new ArrayList<>(); + AsyncResponseObserver resp = new AsyncResponseObserver<>(); + resps.add(resp); + stub.executeStreamingSql(executeSqlRequest, resp); + // The ChannelRef which is bound with the current affinity key. + ChannelRef currentChannel = gcpChannel.affinityKeyToChannelRef.get(sessionName); + // Verify the channel is in use. + assertEquals(1, currentChannel.getActiveStreamsCount()); + + final String contextKey = "context-key"; + final String optionsKey = "options-key"; + + // Override affinity key via context. + Context ctx = Context.current().withValue(GcpManagedChannel.AFFINITY_CTX_KEY, contextKey); + ctx.run( + () -> { + AsyncResponseObserver r = new AsyncResponseObserver<>(); + resps.add(r); + stub.executeStreamingSql(executeSqlRequest, r); + }); + + ChannelRef contextChannel = gcpChannel.affinityKeyToChannelRef.get(contextKey); + // Make sure it is mapped to a different channel, because current channel is the busiest. + assertThat(currentChannel.getId()).isNotEqualTo(contextChannel.getId()); + assertEquals(1, contextChannel.getActiveStreamsCount()); + + // Make another call overriding affinity with call options. + resp = new AsyncResponseObserver<>(); + resps.add(resp); + stub.withOption(GcpManagedChannel.AFFINITY_KEY, optionsKey) + .executeStreamingSql(executeSqlRequest, resp); + // Make sure it is mapped to a different channel, because the current channel and "context" + // channel are the busiest. + ChannelRef optionsChannel = gcpChannel.affinityKeyToChannelRef.get(optionsKey); + assertThat(currentChannel.getId()).isNotEqualTo(optionsChannel.getId()); + assertThat(optionsChannel.getId()).isNotEqualTo(contextChannel.getId()); + assertEquals(1, optionsChannel.getActiveStreamsCount()); + + // Now make a call with context and call options affinity keys. + ctx.run( + () -> { + AsyncResponseObserver r = new AsyncResponseObserver<>(); + resps.add(r); + stub.withOption(GcpManagedChannel.AFFINITY_KEY, optionsKey) + .executeStreamingSql(executeSqlRequest, r); + }); + // Make sure affinity from call options is prevailing. + assertEquals(2, optionsChannel.getActiveStreamsCount()); + + // Complete the requests. + resps.forEach( + r -> { + try { + r.get(); + } catch (InterruptedException e) { + // Noop if interrupted. + } + }); + } + @Test public void testPartitionQueryAsync() throws Exception { SpannerStub stub = getSpannerStub();