diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Stubs.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Stubs.java index f3749671a..6afaffafe 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Stubs.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Stubs.java @@ -16,13 +16,28 @@ package com.google.cloud.pubsublite; +import com.google.api.gax.core.GaxProperties; +import com.google.api.gax.grpc.GaxGrpcProperties; +import com.google.api.gax.rpc.ApiClientHeaderProvider; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.pubsublite.internal.ChannelCache; import com.google.common.collect.ImmutableList; +import io.grpc.CallOptions; import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ClientInterceptors; +import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; +import io.grpc.Metadata; +import io.grpc.Metadata.Key; +import io.grpc.MethodDescriptor; import io.grpc.auth.MoreCallCredentials; import io.grpc.stub.AbstractStub; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.function.Function; public class Stubs { @@ -31,7 +46,7 @@ public class Stubs { public static > StubT defaultStub( String target, Function stubFactory) throws IOException { return stubFactory - .apply(channels.get(target)) + .apply(ClientInterceptors.intercept(channels.get(target), getClientInterceptors())) .withCallCredentials( MoreCallCredentials.from( GoogleCredentials.getApplicationDefault() @@ -39,5 +54,36 @@ public static > StubT defaultStub( ImmutableList.of("https://www.googleapis.com/auth/cloud-platform")))); } + private static List getClientInterceptors() { + List clientInterceptors = new ArrayList<>(); + Map apiClientHeaders = + ApiClientHeaderProvider.newBuilder() + .setClientLibToken("gccl", GaxProperties.getLibraryVersion(Stubs.class)) + .setTransportToken( + GaxGrpcProperties.getGrpcTokenName(), GaxGrpcProperties.getGrpcVersion()) + .build() + .getHeaders(); + clientInterceptors.add( + new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + ClientCall call = next.newCall(method, callOptions); + return new SimpleForwardingClientCall(call) { + @Override + public void start(ClientCall.Listener responseListener, Metadata headers) { + for (Entry apiClientHeader : apiClientHeaders.entrySet()) { + headers.put( + Key.of(apiClientHeader.getKey(), Metadata.ASCII_STRING_MARSHALLER), + apiClientHeader.getValue()); + } + super.start(responseListener, headers); + } + }; + } + }); + return clientInterceptors; + } + private Stubs() {} }