Skip to content

Commit

Permalink
[BEAM-11805] Replace user-agent for spanner (apache#13990)
Browse files Browse the repository at this point in the history
* [BEAM-11805] Replace user-agent for spanner

* [BEAM-11805] Fix checkstyle and emulator issues

* [BEAM-11805] Fix spanner test issues

* update versions

* update autovalue version

Co-authored-by: Reuven Lax <relax@google.com>
  • Loading branch information
allenpradeep and reuvenlax committed Feb 17, 2021
1 parent dd2e739 commit 4f78ab3
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 14 deletions.
Expand Up @@ -422,7 +422,7 @@ class BeamModulePlugin implements Plugin<Project> {
// a dependency version which should match across multiple
// Maven artifacts.
def activemq_version = "5.14.5"
def autovalue_version = "1.7.2"
def autovalue_version = "1.7.4"
def aws_java_sdk_version = "1.11.718"
def aws_java_sdk2_version = "2.13.54"
def cassandra_driver_version = "3.10.2"
Expand All @@ -435,7 +435,7 @@ class BeamModulePlugin implements Plugin<Project> {
def google_code_gson_version = "2.8.6"
def google_oauth_clients_version = "1.31.0"
// Try to keep grpc_version consistent with gRPC version in google_cloud_platform_libraries_bom
def grpc_version = "1.32.2"
def grpc_version = "1.35.0"
def guava_version = "30.1-jre"
def hadoop_version = "2.10.1"
def hamcrest_version = "2.1"
Expand All @@ -447,7 +447,7 @@ class BeamModulePlugin implements Plugin<Project> {
def jsr305_version = "3.0.2"
def kafka_version = "2.4.1"
def nemo_version = "0.1"
def netty_version = "4.1.51.Final"
def netty_version = "4.1.52.Final"
def postgres_version = "42.2.16"
def powermock_version = "2.0.9"
def protobuf_version = "3.12.0"
Expand Down Expand Up @@ -509,7 +509,7 @@ class BeamModulePlugin implements Plugin<Project> {
google_api_client_jackson2 : "com.google.api-client:google-api-client-jackson2:$google_clients_version",
google_api_client_java6 : "com.google.api-client:google-api-client-java6:$google_clients_version",
google_api_common : "com.google.api:api-common", // google_cloud_platform_libraries_bom sets version
google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20200719-$google_clients_version",
google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20201030-$google_clients_version",
google_api_services_clouddebugger : "com.google.apis:google-api-services-clouddebugger:v2-rev20200501-$google_clients_version",
google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20200720-$google_clients_version",
google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20200713-$google_clients_version",
Expand All @@ -519,7 +519,7 @@ class BeamModulePlugin implements Plugin<Project> {
google_auth_library_credentials : "com.google.auth:google-auth-library-credentials", // google_cloud_platform_libraries_bom sets version
google_auth_library_oauth2_http : "com.google.auth:google-auth-library-oauth2-http", // google_cloud_platform_libraries_bom sets version
google_cloud_bigquery : "com.google.cloud:google-cloud-bigquery", // google_cloud_platform_libraries_bom sets version
google_cloud_bigquery_storage : "com.google.cloud:google-cloud-bigquerystorage", // google_cloud_platform_libraries_bom sets version
google_cloud_bigquery_storage : "com.google.cloud:google-cloud-bigquerystorage:1.8.5",
google_cloud_bigtable_client_core : "com.google.cloud.bigtable:bigtable-client-core:1.16.0",
google_cloud_bigtable_emulator : "com.google.cloud:google-cloud-bigtable-emulator:0.125.2",
google_cloud_core : "com.google.cloud:google-cloud-core", // google_cloud_platform_libraries_bom sets version
Expand All @@ -530,9 +530,9 @@ class BeamModulePlugin implements Plugin<Project> {
google_cloud_pubsub : "com.google.cloud:google-cloud-pubsub:$google_cloud_pubsub_version",
google_cloud_pubsublite : "com.google.cloud:google-cloud-pubsublite:$google_cloud_pubsublite_version",
// The GCP Libraries BOM dashboard shows the versions set by the BOM:
// https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/13.2.0/artifact_details.html
// https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/16.3.0/artifact_details.html
// Update libraries-bom version on sdks/java/container/license_scripts/dep_urls_java.yaml
google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:13.2.0",
google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:16.3.0",
google_cloud_spanner : "com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom sets version
google_code_gson : "com.google.code.gson:gson:$google_code_gson_version",
// google-http-client's version is explicitly declared for sdks/java/maven-archetypes/examples
Expand Down Expand Up @@ -601,7 +601,7 @@ class BeamModulePlugin implements Plugin<Project> {
powermock_mockito : "org.powermock:powermock-api-mockito2:$powermock_version",
protobuf_java : "com.google.protobuf:protobuf-java:$protobuf_version",
protobuf_java_util : "com.google.protobuf:protobuf-java-util:$protobuf_version",
proto_google_cloud_bigquery_storage_v1beta1 : "com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta1", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_bigquerybeta2_storage_v1 : "com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta2", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_bigtable_v2 : "com.google.api.grpc:proto-google-cloud-bigtable-v2", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_datastore_v1 : "com.google.api.grpc:proto-google-cloud-datastore-v1", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_pubsub_v1 : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version
Expand Down
2 changes: 1 addition & 1 deletion sdks/java/container/license_scripts/dep_urls_java.yaml
Expand Up @@ -41,7 +41,7 @@ jaxen:
'1.1.6':
type: "3-Clause BSD"
libraries-bom:
'13.2.0':
'16.3.0':
license: "https://raw.githubusercontent.com/GoogleCloudPlatform/cloud-opensource-java/master/LICENSE"
type: "Apache License 2.0"
paranamer:
Expand Down
3 changes: 2 additions & 1 deletion sdks/java/io/google-cloud-platform/build.gradle
Expand Up @@ -61,6 +61,7 @@ dependencies {
compile library.java.google_http_client
compile library.java.google_http_client_jackson2
compile library.java.grpc_alts
compile library.java.grpc_api
compile library.java.grpc_auth
compile library.java.grpc_core
compile library.java.grpc_context
Expand All @@ -77,7 +78,7 @@ dependencies {
compile library.java.junit
compile library.java.netty_handler
compile library.java.netty_tcnative_boringssl_static
compile library.java.proto_google_cloud_bigquery_storage_v1beta1
compile library.java.proto_google_cloud_bigquerybeta2_storage_v1
compile library.java.proto_google_cloud_bigtable_v2
compile library.java.proto_google_cloud_datastore_v1
compile library.java.proto_google_cloud_pubsub_v1
Expand Down
Expand Up @@ -17,8 +17,10 @@
*/
package org.apache.beam.sdk.io.gcp.spanner;

import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.cloud.NoCredentials;
Expand All @@ -29,6 +31,7 @@
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.spi.v1.SpannerInterceptorProvider;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
import com.google.spanner.v1.ExecuteSqlRequest;
Expand All @@ -38,11 +41,21 @@
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.MethodDescriptor;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -54,7 +67,10 @@
class SpannerAccessor implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(SpannerAccessor.class);

// A common user agent token that indicates that this request was originated from Apache Beam.
/* A common user agent token that indicates that this request was originated from
* Apache Beam. Setting the user-agent allows Cloud Spanner to detect that the
* workload is coming from Dataflow and to potentially apply performance optimizations
*/
private static final String USER_AGENT_PREFIX = "Apache_Beam_Java";

// Only create one SpannerAccessor for each different SpannerConfig.
Expand All @@ -72,6 +88,12 @@ class SpannerAccessor implements AutoCloseable {
private final DatabaseAdminClient databaseAdminClient;
private final SpannerConfig spannerConfig;

private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024;
private static final int MAX_METADATA_SIZE = 32 * 1024; // bytes
private static final int NUM_CHANNELS = 4;
public static final org.threeten.bp.Duration GRPC_KEEP_ALIVE_SECONDS =
org.threeten.bp.Duration.ofSeconds(120);

private SpannerAccessor(
Spanner spanner,
DatabaseClient databaseClient,
Expand Down Expand Up @@ -139,6 +161,23 @@ private static SpannerAccessor createAndConnect(SpannerConfig spannerConfig) {
.setTotalTimeout(org.threeten.bp.Duration.ofMinutes(120))
.build());

ManagedInstantiatingExecutorProvider executorProvider =
new ManagedInstantiatingExecutorProvider(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Cloud-Spanner-TransportChannel-%d")
.build());

InstantiatingGrpcChannelProvider.Builder instantiatingGrpcChannelProvider =
InstantiatingGrpcChannelProvider.newBuilder()
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
.setMaxInboundMetadataSize(MAX_METADATA_SIZE)
.setPoolSize(NUM_CHANNELS)
.setExecutorProvider(executorProvider)
.setKeepAliveTime(GRPC_KEEP_ALIVE_SECONDS)
.setInterceptorProvider(SpannerInterceptorProvider.createDefault())
.setAttemptDirectPath(true);

ValueProvider<String> projectId = spannerConfig.getProjectId();
if (projectId != null) {
builder.setProjectId(projectId.get());
Expand All @@ -150,14 +189,34 @@ private static SpannerAccessor createAndConnect(SpannerConfig spannerConfig) {
ValueProvider<String> host = spannerConfig.getHost();
if (host != null) {
builder.setHost(host.get());
instantiatingGrpcChannelProvider.setEndpoint(getEndpoint(host.get()));
}
ValueProvider<String> emulatorHost = spannerConfig.getEmulatorHost();
if (emulatorHost != null) {
builder.setEmulatorHost(emulatorHost.get());
builder.setCredentials(NoCredentials.getInstance());
} else {
String userAgentString = USER_AGENT_PREFIX + "/" + ReleaseInfo.getReleaseInfo().getVersion();
/* Workaround to setup user-agent string.
* InstantiatingGrpcChannelProvider will override the settings provided.
* The section below and all associated artifacts will be removed once the bug
* that prevents setting user-agent is fixed.
* https://github.com/googleapis/java-spanner/pull/871
*
* Code to be replaced:
* builder.setHeaderProvider(FixedHeaderProvider.create("user-agent", userAgentString));
*/
instantiatingGrpcChannelProvider.setHeaderProvider(
new HeaderProvider() {
@Override
public Map<String, String> getHeaders() {
final Map<String, String> headers = new HashMap<>();
headers.put("user-agent", userAgentString);
return headers;
}
});
builder.setChannelProvider(instantiatingGrpcChannelProvider.build());
}
String userAgentString = USER_AGENT_PREFIX + "/" + ReleaseInfo.getReleaseInfo().getVersion();
builder.setHeaderProvider(FixedHeaderProvider.create("user-agent", userAgentString));
SpannerOptions options = builder.build();

Spanner spanner = options.getService();
Expand All @@ -173,6 +232,17 @@ private static SpannerAccessor createAndConnect(SpannerConfig spannerConfig) {
spanner, databaseClient, databaseAdminClient, batchClient, spannerConfig);
}

private static String getEndpoint(String host) {
URL url;
try {
url = new URL(host);
} catch (MalformedURLException e) {
throw new IllegalArgumentException("Invalid host: " + host, e);
}
return String.format(
"%s:%s", url.getHost(), url.getPort() < 0 ? url.getDefaultPort() : url.getPort());
}

public DatabaseClient getDatabaseClient() {
return databaseClient;
}
Expand Down Expand Up @@ -221,4 +291,32 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
return next.newCall(method, callOptions);
}
}

private static final class ManagedInstantiatingExecutorProvider implements ExecutorProvider {
// 4 Gapic clients * 4 channels per client.
private static final int DEFAULT_MIN_THREAD_COUNT = 16;
private final List<ScheduledExecutorService> executors = new ArrayList<>();
private final ThreadFactory threadFactory;

private ManagedInstantiatingExecutorProvider(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}

@Override
public boolean shouldAutoClose() {
return false;
}

@Override
public ScheduledExecutorService getExecutor() {
int numCpus = Runtime.getRuntime().availableProcessors();
int numThreads = Math.max(DEFAULT_MIN_THREAD_COUNT, numCpus);
ScheduledExecutorService executor =
new ScheduledThreadPoolExecutor(numThreads, threadFactory);
synchronized (this) {
executors.add(executor);
}
return executor;
}
}
}

0 comments on commit 4f78ab3

Please sign in to comment.