Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: manual affinity via call options or context #175

Merged
merged 1 commit into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public class GcpManagedChannel extends ManagedChannel {
Context.keyWithDefault("DisableAffinity", false);
public static final CallOptions.Key<Boolean> DISABLE_AFFINITY_KEY =
CallOptions.Key.createWithDefault("DisableAffinity", false);
public static final Context.Key<String> AFFINITY_CTX_KEY = Context.key("AffinityKey");
public static final CallOptions.Key<String> AFFINITY_KEY = CallOptions.Key.create("AffinityKey");

@GuardedBy("this")
private Integer bindingIndex = -1;
Expand Down Expand Up @@ -1269,14 +1271,30 @@ public String authority() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions) {
AffinityConfig affinity = methodToAffinity.get(methodDescriptor.getFullMethodName());
if (affinity == null
|| callOptions.getOption(DISABLE_AFFINITY_KEY)
if (callOptions.getOption(DISABLE_AFFINITY_KEY)
|| DISABLE_AFFINITY_CTX_KEY.get(Context.current())) {
return new GcpClientCall.SimpleGcpClientCall<>(
getChannelRef(null), methodDescriptor, callOptions);
}
return new GcpClientCall<>(this, methodDescriptor, callOptions, affinity);

AffinityConfig affinity = methodToAffinity.get(methodDescriptor.getFullMethodName());
String key = keyFromOptsCtx(callOptions);
if (affinity != null && key == null) {
return new GcpClientCall<>(this, methodDescriptor, callOptions, affinity);
}

return new GcpClientCall.SimpleGcpClientCall<>(
getChannelRef(key), methodDescriptor, callOptions);
}

@Nullable
private String keyFromOptsCtx(CallOptions callOptions) {
String key = callOptions.getOption(AFFINITY_KEY);
if (key != null) {
return key;
}

return AFFINITY_CTX_KEY.get(Context.current());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1419,6 +1419,190 @@ public void testExecuteStreamingSqlWithAffinityDisabledViaCallOptions() throws E
}
}

@Test
public void testExecuteStreamingSqlWithAffinityViaContext() throws Exception {
SpannerStub stub = getSpannerStub();
String sessionName = createAsyncSessions(stub).get(0);
ExecuteSqlRequest executeSqlRequest =
ExecuteSqlRequest.newBuilder()
.setSession(sessionName)
.setSql("select * FROM Users")
.build();
List<AsyncResponseObserver<PartialResultSet>> resps = new ArrayList<>();
AsyncResponseObserver<PartialResultSet> resp = new AsyncResponseObserver<>();
resps.add(resp);
stub.executeStreamingSql(executeSqlRequest, resp);
// The ChannelRef which is bound with the current affinity key.
ChannelRef currentChannel = gcpChannel.affinityKeyToChannelRef.get(sessionName);
// Verify the channel is in use.
assertEquals(1, currentChannel.getActiveStreamsCount());

final String key = "overriden-key";

// Override affinity key via context.
Context ctx = Context.current().withValue(GcpManagedChannel.AFFINITY_CTX_KEY, key);
ctx.run(
() -> {
AsyncResponseObserver<PartialResultSet> r = new AsyncResponseObserver<>();
resps.add(r);
stub.executeStreamingSql(executeSqlRequest, r);
});

ChannelRef newChannel = gcpChannel.affinityKeyToChannelRef.get(key);
// Make sure it is mapped to a different channel, because current channel is the busiest.
assertThat(currentChannel.getId()).isNotEqualTo(newChannel.getId());
assertEquals(1, newChannel.getActiveStreamsCount());

// Make another call.
ctx.run(
() -> {
AsyncResponseObserver<PartialResultSet> r = new AsyncResponseObserver<>();
resps.add(r);
stub.executeStreamingSql(executeSqlRequest, r);
});
assertEquals(2, newChannel.getActiveStreamsCount());

// Make sure non-overriden affinty still works.
resp = new AsyncResponseObserver<>();
resps.add(resp);
stub.executeStreamingSql(executeSqlRequest, resp);
assertEquals(2, currentChannel.getActiveStreamsCount());

// Complete the requests.
resps.forEach(
r -> {
try {
r.get();
} catch (InterruptedException e) {
// Noop if interrupted.
}
});
}

@Test
public void testExecuteStreamingSqlWithAffinityViaCallOptions() throws Exception {
SpannerStub stub = getSpannerStub();
String sessionName = createAsyncSessions(stub).get(0);
ExecuteSqlRequest executeSqlRequest =
ExecuteSqlRequest.newBuilder()
.setSession(sessionName)
.setSql("select * FROM Users")
.build();
List<AsyncResponseObserver<PartialResultSet>> resps = new ArrayList<>();
AsyncResponseObserver<PartialResultSet> resp = new AsyncResponseObserver<>();
resps.add(resp);
stub.executeStreamingSql(executeSqlRequest, resp);
// The ChannelRef which is bound with the current affinity key.
ChannelRef currentChannel = gcpChannel.affinityKeyToChannelRef.get(sessionName);
// Verify the channel is in use.
assertEquals(1, currentChannel.getActiveStreamsCount());

final String key = "overriden-key";

// Override affinity key via call options.
resp = new AsyncResponseObserver<>();
resps.add(resp);
stub.withOption(GcpManagedChannel.AFFINITY_KEY, key)
.executeStreamingSql(executeSqlRequest, resp);

ChannelRef newChannel = gcpChannel.affinityKeyToChannelRef.get(key);
// Make sure it is mapped to a different channel, because current channel is the busiest.
assertThat(currentChannel.getId()).isNotEqualTo(newChannel.getId());
assertEquals(1, newChannel.getActiveStreamsCount());

// Make another call.
resp = new AsyncResponseObserver<>();
resps.add(resp);
stub.withOption(GcpManagedChannel.AFFINITY_KEY, key)
.executeStreamingSql(executeSqlRequest, resp);
assertEquals(2, newChannel.getActiveStreamsCount());

// Make sure non-overriden affinty still works.
resp = new AsyncResponseObserver<>();
resps.add(resp);
stub.executeStreamingSql(executeSqlRequest, resp);
assertEquals(2, currentChannel.getActiveStreamsCount());

// Complete the requests.
resps.forEach(
r -> {
try {
r.get();
} catch (InterruptedException e) {
// Noop if interrupted.
}
});
}

@Test
public void testExecuteStreamingSqlWithAffinityViaContextAndCallOptions() throws Exception {
SpannerStub stub = getSpannerStub();
String sessionName = createAsyncSessions(stub).get(0);
ExecuteSqlRequest executeSqlRequest =
ExecuteSqlRequest.newBuilder()
.setSession(sessionName)
.setSql("select * FROM Users")
.build();
List<AsyncResponseObserver<PartialResultSet>> resps = new ArrayList<>();
AsyncResponseObserver<PartialResultSet> resp = new AsyncResponseObserver<>();
resps.add(resp);
stub.executeStreamingSql(executeSqlRequest, resp);
// The ChannelRef which is bound with the current affinity key.
ChannelRef currentChannel = gcpChannel.affinityKeyToChannelRef.get(sessionName);
// Verify the channel is in use.
assertEquals(1, currentChannel.getActiveStreamsCount());

final String contextKey = "context-key";
final String optionsKey = "options-key";

// Override affinity key via context.
Context ctx = Context.current().withValue(GcpManagedChannel.AFFINITY_CTX_KEY, contextKey);
ctx.run(
() -> {
AsyncResponseObserver<PartialResultSet> r = new AsyncResponseObserver<>();
resps.add(r);
stub.executeStreamingSql(executeSqlRequest, r);
});

ChannelRef contextChannel = gcpChannel.affinityKeyToChannelRef.get(contextKey);
// Make sure it is mapped to a different channel, because current channel is the busiest.
assertThat(currentChannel.getId()).isNotEqualTo(contextChannel.getId());
assertEquals(1, contextChannel.getActiveStreamsCount());

// Make another call overriding affinity with call options.
resp = new AsyncResponseObserver<>();
resps.add(resp);
stub.withOption(GcpManagedChannel.AFFINITY_KEY, optionsKey)
.executeStreamingSql(executeSqlRequest, resp);
// Make sure it is mapped to a different channel, because the current channel and "context"
// channel are the busiest.
ChannelRef optionsChannel = gcpChannel.affinityKeyToChannelRef.get(optionsKey);
assertThat(currentChannel.getId()).isNotEqualTo(optionsChannel.getId());
assertThat(optionsChannel.getId()).isNotEqualTo(contextChannel.getId());
assertEquals(1, optionsChannel.getActiveStreamsCount());

// Now make a call with context and call options affinity keys.
ctx.run(
() -> {
AsyncResponseObserver<PartialResultSet> r = new AsyncResponseObserver<>();
resps.add(r);
stub.withOption(GcpManagedChannel.AFFINITY_KEY, optionsKey)
.executeStreamingSql(executeSqlRequest, r);
});
// Make sure affinity from call options is prevailing.
assertEquals(2, optionsChannel.getActiveStreamsCount());

// Complete the requests.
resps.forEach(
r -> {
try {
r.get();
} catch (InterruptedException e) {
// Noop if interrupted.
}
});
}

@Test
public void testPartitionQueryAsync() throws Exception {
SpannerStub stub = getSpannerStub();
Expand Down