Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-11831] Partially Revert "[BEAM-11805] Replace user-agent for spanner (#13990)" #15591

Merged
merged 1 commit into from
Oct 5, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@
*/
package org.apache.beam.sdk.io.gcp.spanner;

import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.cloud.NoCredentials;
Expand All @@ -31,7 +29,6 @@
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.spi.v1.SpannerInterceptorProvider;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
import com.google.spanner.v1.ExecuteSqlRequest;
Expand All @@ -41,21 +38,11 @@
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.MethodDescriptor;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -88,12 +75,6 @@ public class SpannerAccessor implements AutoCloseable {
private final DatabaseAdminClient databaseAdminClient;
private final SpannerConfig spannerConfig;

private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024;
private static final int MAX_METADATA_SIZE = 32 * 1024; // bytes
private static final int NUM_CHANNELS = 4;
public static final org.threeten.bp.Duration GRPC_KEEP_ALIVE_SECONDS =
org.threeten.bp.Duration.ofSeconds(120);

private SpannerAccessor(
Spanner spanner,
DatabaseClient databaseClient,
Expand Down Expand Up @@ -161,23 +142,6 @@ private static SpannerAccessor createAndConnect(SpannerConfig spannerConfig) {
.setTotalTimeout(org.threeten.bp.Duration.ofMinutes(120))
.build());

ManagedInstantiatingExecutorProvider executorProvider =
new ManagedInstantiatingExecutorProvider(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Cloud-Spanner-TransportChannel-%d")
.build());

InstantiatingGrpcChannelProvider.Builder instantiatingGrpcChannelProvider =
InstantiatingGrpcChannelProvider.newBuilder()
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
.setMaxInboundMetadataSize(MAX_METADATA_SIZE)
.setPoolSize(NUM_CHANNELS)
.setExecutorProvider(executorProvider)
.setKeepAliveTime(GRPC_KEEP_ALIVE_SECONDS)
.setInterceptorProvider(SpannerInterceptorProvider.createDefault())
.setAttemptDirectPath(true);

ValueProvider<String> projectId = spannerConfig.getProjectId();
if (projectId != null) {
builder.setProjectId(projectId.get());
Expand All @@ -189,34 +153,14 @@ private static SpannerAccessor createAndConnect(SpannerConfig spannerConfig) {
ValueProvider<String> host = spannerConfig.getHost();
if (host != null) {
builder.setHost(host.get());
instantiatingGrpcChannelProvider.setEndpoint(getEndpoint(host.get()));
}
ValueProvider<String> emulatorHost = spannerConfig.getEmulatorHost();
if (emulatorHost != null) {
builder.setEmulatorHost(emulatorHost.get());
builder.setCredentials(NoCredentials.getInstance());
} else {
String userAgentString = USER_AGENT_PREFIX + "/" + ReleaseInfo.getReleaseInfo().getVersion();
/* Workaround to setup user-agent string.
* InstantiatingGrpcChannelProvider will override the settings provided.
* The section below and all associated artifacts will be removed once the bug
* that prevents setting user-agent is fixed.
* https://github.com/googleapis/java-spanner/pull/871
*
* Code to be replaced:
* builder.setHeaderProvider(FixedHeaderProvider.create("user-agent", userAgentString));
*/
instantiatingGrpcChannelProvider.setHeaderProvider(
new HeaderProvider() {
@Override
public Map<String, String> getHeaders() {
final Map<String, String> headers = new HashMap<>();
headers.put("user-agent", userAgentString);
return headers;
}
});
builder.setChannelProvider(instantiatingGrpcChannelProvider.build());
}
String userAgentString = USER_AGENT_PREFIX + "/" + ReleaseInfo.getReleaseInfo().getVersion();
builder.setHeaderProvider(FixedHeaderProvider.create("user-agent", userAgentString));
SpannerOptions options = builder.build();

Spanner spanner = options.getService();
Expand All @@ -232,17 +176,6 @@ public Map<String, String> getHeaders() {
spanner, databaseClient, databaseAdminClient, batchClient, spannerConfig);
}

private static String getEndpoint(String host) {
URL url;
try {
url = new URL(host);
} catch (MalformedURLException e) {
throw new IllegalArgumentException("Invalid host: " + host, e);
}
return String.format(
"%s:%s", url.getHost(), url.getPort() < 0 ? url.getDefaultPort() : url.getPort());
}

public DatabaseClient getDatabaseClient() {
return databaseClient;
}
Expand Down Expand Up @@ -291,32 +224,4 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
return next.newCall(method, callOptions);
}
}

private static final class ManagedInstantiatingExecutorProvider implements ExecutorProvider {
// 4 Gapic clients * 4 channels per client.
private static final int DEFAULT_MIN_THREAD_COUNT = 16;
private final List<ScheduledExecutorService> executors = new ArrayList<>();
private final ThreadFactory threadFactory;

private ManagedInstantiatingExecutorProvider(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}

@Override
public boolean shouldAutoClose() {
return false;
}

@Override
public ScheduledExecutorService getExecutor() {
int numCpus = Runtime.getRuntime().availableProcessors();
int numThreads = Math.max(DEFAULT_MIN_THREAD_COUNT, numCpus);
ScheduledExecutorService executor =
new ScheduledThreadPoolExecutor(numThreads, threadFactory);
synchronized (this) {
executors.add(executor);
}
return executor;
}
}
}