From da8467b721ac0e896055d991e782e5bfab36bb76 Mon Sep 17 00:00:00 2001 From: Vikas Kedigehalli Date: Fri, 28 Oct 2016 15:31:38 -0700 Subject: [PATCH 1/4] Add localhost option for DatastoreIO --- .../sdk/io/gcp/datastore/DatastoreV1.java | 147 +++++++++++------- .../sdk/io/gcp/datastore/DatastoreV1Test.java | 15 +- .../sdk/io/gcp/datastore/SplitQueryFnIT.java | 2 +- .../beam/sdk/io/gcp/datastore/V1TestUtil.java | 4 +- 4 files changed, 105 insertions(+), 63 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java index 1e8271caf887..ea77e9ed5e20 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java @@ -93,7 +93,8 @@ /** * {@link DatastoreV1} provides an API to Read, Write and Delete {@link PCollection PCollections} * of Google Cloud Datastore version v1 - * {@link Entity} objects. + * {@link Entity} objects. Read is only supported for Bounded PCollections while Write and Delete + * are supported for both Bounded and Unbounded PCollections. * *

This API currently requires an authentication workaround. To use {@link DatastoreV1}, users * must use the {@code gcloud} command line tool to get credentials for Cloud Datastore: @@ -124,8 +125,10 @@ * *

Note: Normally, a Cloud Dataflow job will read from Cloud Datastore in parallel across * many workers. However, when the {@link Query} is configured with a limit using - * {@link com.google.datastore.v1.Query.Builder#setLimit(Int32Value)}, then - * all returned results will be read by a single Dataflow worker in order to ensure correct data. + * {@link com.google.datastore.v1.Query.Builder#setLimit(Int32Value)} or if the Query contains + * inequality filters like {@code GREATER_THAN, LESS_THAN} etc., then all returned results + * will be read by a single Dataflow worker in order to ensure correct data. Since data is read from + * a single worker, this could have a significant impact on the performance of the job. * *

To write a {@link PCollection} to a Cloud Datastore, use {@link DatastoreV1#write}, * specifying the Cloud Datastore project to write to: @@ -176,6 +179,10 @@ *

Please see Cloud Datastore Sign Up * for security and permission related information specific to Cloud Datastore. * + *

Optionally, Cloud Datastore V1 Emulator, running locally, could be used for testing purposes + * by providing the host port information through {@code withLocalhost("host:port")} for all the + * above transforms. In such a case, all the Cloud Datastore API calls are directed to the Emulator. + * * @see org.apache.beam.sdk.runners.PipelineRunner */ @Experimental(Experimental.Kind.SOURCE_SINK) @@ -231,6 +238,7 @@ public abstract static class Read extends PTransform @Nullable public abstract Query getQuery(); @Nullable public abstract String getNamespace(); public abstract int getNumQuerySplits(); + @Nullable public abstract String getLocalhost(); @Override public abstract String toString(); @@ -243,6 +251,7 @@ abstract static class Builder { abstract Builder setQuery(Query query); abstract Builder setNamespace(String namespace); abstract Builder setNumQuerySplits(int numQuerySplits); + abstract Builder setLocalhost(String hostPort); abstract Read build(); } @@ -410,10 +419,15 @@ public DatastoreV1.Read withNumQuerySplits(int numQuerySplits) { .build(); } - @Override + public DatastoreV1.Read withLocalhost(String localhost) { + return toBuilder() + .setLocalhost(localhost) + .build(); + } + public PCollection expand(PBegin input) { V1Options v1Options = V1Options.from(getProjectId(), getQuery(), - getNamespace()); + getNamespace(), getLocalhost()); /* * This composite transform involves the following steps: @@ -469,34 +483,17 @@ public void populateDisplayData(DisplayData.Builder builder) { * A class for v1 Cloud Datastore related options. 
*/ @VisibleForTesting - static class V1Options implements Serializable { - private final Query query; - private final String projectId; - @Nullable - private final String namespace; - - private V1Options(String projectId, Query query, @Nullable String namespace) { - this.projectId = checkNotNull(projectId, "projectId"); - this.query = checkNotNull(query, "query"); - this.namespace = namespace; - } - - public static V1Options from(String projectId, Query query, @Nullable String namespace) { - return new V1Options(projectId, query, namespace); - } - - public Query getQuery() { - return query; + @AutoValue + abstract static class V1Options implements Serializable { + public static V1Options from(String projectId, Query query, @Nullable String namespace, + @Nullable String localhost) { + return new AutoValue_DatastoreV1_Read_V1Options(projectId, query, namespace, localhost); } - public String getProjectId() { - return projectId; - } - - @Nullable - public String getNamespace() { - return namespace; - } + public abstract String getProjectId(); + public abstract Query getQuery(); + @Nullable public abstract String getNamespace(); + @Nullable public abstract String getLocalhost(); } /** @@ -529,7 +526,8 @@ public SplitQueryFn(V1Options options, int numSplits) { @StartBundle public void startBundle(Context c) throws Exception { - datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.projectId); + datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId(), + options.getLocalhost()); querySplitter = datastoreFactory.getQuerySplitter(); } @@ -603,7 +601,8 @@ public ReadFn(V1Options options) { @StartBundle public void startBundle(Context c) throws Exception { - datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId()); + datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId(), + options.getLocalhost()); } /** Read and output entities for the given query. 
*/ @@ -664,7 +663,7 @@ public void processElement(ProcessContext context) throws Exception { * {@code projectId} using {@link DatastoreV1.Write#withProjectId}. */ public Write write() { - return new Write(null); + return new Write(null, null); } /** @@ -672,7 +671,7 @@ public Write write() { * {@code projectId} using {@link DeleteEntity#withProjectId}. */ public DeleteEntity deleteEntity() { - return new DeleteEntity(null); + return new DeleteEntity(null, null); } /** @@ -680,7 +679,7 @@ public DeleteEntity deleteEntity() { * {@code projectId} using {@link DeleteKey#withProjectId}. */ public DeleteKey deleteKey() { - return new DeleteKey(null); + return new DeleteKey(null, null); } /** @@ -693,8 +692,8 @@ public static class Write extends Mutate { * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if * it is {@code null} at instantiation time, an error will be thrown. */ - Write(@Nullable String projectId) { - super(projectId, new UpsertFn()); + Write(@Nullable String projectId, @Nullable String localhost) { + super(projectId, localhost, new UpsertFn()); } /** @@ -702,7 +701,16 @@ public static class Write extends Mutate { */ public Write withProjectId(String projectId) { checkNotNull(projectId, "projectId"); - return new Write(projectId); + return new Write(projectId, null); + } + + /** + * Returns a new {@link Write} that writes to the Cloud Datastore Emulator running locally on + * the specified host port. + */ + public Write withLocalhost(String localhost) { + checkNotNull(localhost, "localhost"); + return new Write(null, localhost); } } @@ -716,8 +724,8 @@ public static class DeleteEntity extends Mutate { * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if * it is {@code null} at instantiation time, an error will be thrown. 
*/ - DeleteEntity(@Nullable String projectId) { - super(projectId, new DeleteEntityFn()); + DeleteEntity(@Nullable String projectId, @Nullable String localhost) { + super(projectId, localhost, new DeleteEntityFn()); } /** @@ -726,7 +734,16 @@ public static class DeleteEntity extends Mutate { */ public DeleteEntity withProjectId(String projectId) { checkNotNull(projectId, "projectId"); - return new DeleteEntity(projectId); + return new DeleteEntity(projectId, null); + } + + /** + * Returns a new {@link DeleteEntity} that deletes entities from the Cloud Datastore Emulator + * running locally on the specified host port. + */ + public DeleteEntity withLocalhost(String localhost) { + checkNotNull(localhost, "localhost"); + return new DeleteEntity(null, localhost); } } @@ -741,8 +758,8 @@ public static class DeleteKey extends Mutate { * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if * it is {@code null} at instantiation time, an error will be thrown. */ - DeleteKey(@Nullable String projectId) { - super(projectId, new DeleteKeyFn()); + DeleteKey(@Nullable String projectId, @Nullable String localhost) { + super(projectId, localhost, new DeleteKeyFn()); } /** @@ -751,7 +768,16 @@ public static class DeleteKey extends Mutate { */ public DeleteKey withProjectId(String projectId) { checkNotNull(projectId, "projectId"); - return new DeleteKey(projectId); + return new DeleteKey(projectId, null); + } + + /** + * Returns a new {@link DeleteKey} that deletes entities from the Cloud Datastore Emulator + * running locally on the specified host port. 
+ */ + public DeleteKey withLocalhost(String localhost) { + checkNotNull(localhost, "localhost"); + return new DeleteKey(null, localhost); } } @@ -766,6 +792,8 @@ public DeleteKey withProjectId(String projectId) { private abstract static class Mutate extends PTransform, PDone> { @Nullable private final String projectId; + @Nullable + private final String localhost; /** A function that transforms each {@code T} into a mutation. */ private final SimpleFunction mutationFn; @@ -773,15 +801,18 @@ private abstract static class Mutate extends PTransform, PDone * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if * it is {@code null} at instantiation time, an error will be thrown. */ - Mutate(@Nullable String projectId, SimpleFunction mutationFn) { + Mutate(@Nullable String projectId, @Nullable String localhost, + SimpleFunction mutationFn) { this.projectId = projectId; + this.localhost = localhost; this.mutationFn = checkNotNull(mutationFn); } @Override public PDone expand(PCollection input) { input.apply("Convert to Mutation", MapElements.via(mutationFn)) - .apply("Write Mutation to Datastore", ParDo.of(new DatastoreWriterFn(projectId))); + .apply("Write Mutation to Datastore", ParDo.of( + new DatastoreWriterFn(projectId, localhost))); return PDone.in(input.getPipeline()); } @@ -832,6 +863,8 @@ public String getProjectId() { static class DatastoreWriterFn extends DoFn { private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriterFn.class); private final String projectId; + @Nullable + private final String localhost; private transient Datastore datastore; private final V1DatastoreFactory datastoreFactory; // Current batch of mutations to be written. 
@@ -842,19 +875,21 @@ static class DatastoreWriterFn extends DoFn { FluentBackoff.DEFAULT .withMaxRetries(MAX_RETRIES).withInitialBackoff(Duration.standardSeconds(5)); - DatastoreWriterFn(String projectId) { - this(projectId, new V1DatastoreFactory()); + DatastoreWriterFn(String projectId, @Nullable String localhost) { + this(projectId, localhost, new V1DatastoreFactory()); } @VisibleForTesting - DatastoreWriterFn(String projectId, V1DatastoreFactory datastoreFactory) { + DatastoreWriterFn(String projectId, @Nullable String localhost, + V1DatastoreFactory datastoreFactory) { this.projectId = checkNotNull(projectId, "projectId"); + this.localhost = localhost; this.datastoreFactory = datastoreFactory; } @StartBundle public void startBundle(Context c) { - datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId); + datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId, localhost); } @ProcessElement @@ -1008,7 +1043,8 @@ public void populateDisplayData(Builder builder) { static class V1DatastoreFactory implements Serializable { /** Builds a Cloud Datastore client for the given pipeline options and project. 
*/ - public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) { + public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId, + @Nullable String localhost) { Credentials credential = pipelineOptions.as(GcpOptions.class).getGcpCredential(); HttpRequestInitializer initializer; if (credential != null) { @@ -1019,10 +1055,9 @@ public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) initializer = new RetryHttpRequestInitializer(); } - DatastoreOptions.Builder builder = - new DatastoreOptions.Builder() - .projectId(projectId) - .initializer(initializer); + if (localhost != null) { + builder.localHost(localhost); + } return DatastoreFactory.get().create(builder.build()); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java index dd1904ae48a7..c2bc8d2ba728 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java @@ -110,12 +110,13 @@ public class DatastoreV1Test { private static final String NAMESPACE = "testNamespace"; private static final String KIND = "testKind"; private static final Query QUERY; + private static final String LOCALHOST = "localhost:9955"; private static final V1Options V_1_OPTIONS; static { Query.Builder q = Query.newBuilder(); q.addKindBuilder().setName(KIND); QUERY = q.build(); - V_1_OPTIONS = V1Options.from(PROJECT_ID, QUERY, NAMESPACE); + V_1_OPTIONS = V1Options.from(PROJECT_ID, QUERY, NAMESPACE, null); } private DatastoreV1.Read initialRead; @@ -136,7 +137,8 @@ public void setUp() { initialRead = DatastoreIO.v1().read() .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE); - 
when(mockDatastoreFactory.getDatastore(any(PipelineOptions.class), any(String.class))) + when(mockDatastoreFactory.getDatastore(any(PipelineOptions.class), any(String.class), + any(String.class))) .thenReturn(mockDatastore); when(mockDatastoreFactory.getQuerySplitter()) .thenReturn(mockQuerySplitter); @@ -157,10 +159,12 @@ public void testBuildRead() throws Exception { @Test public void testBuildReadAlt() throws Exception { DatastoreV1.Read read = DatastoreIO.v1().read() - .withProjectId(PROJECT_ID).withNamespace(NAMESPACE).withQuery(QUERY); + .withProjectId(PROJECT_ID).withNamespace(NAMESPACE).withQuery(QUERY) + .withLocalhost(LOCALHOST); assertEquals(QUERY, read.getQuery()); assertEquals(PROJECT_ID, read.getProjectId()); assertEquals(NAMESPACE, read.getNamespace()); + assertEquals(LOCALHOST, read.getLocalhost()); } @Test @@ -504,7 +508,7 @@ public void testDeleteKeys() throws Exception { @Test public void testDatastoreWriteFnDisplayData() { - DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID); + DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, null); DisplayData displayData = DisplayData.from(datastoreWriter); assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); } @@ -539,7 +543,8 @@ private void datastoreWriterFnTest(int numMutations) throws Exception { makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build()); } - DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, mockDatastoreFactory); + DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, null, + mockDatastoreFactory); DoFnTester doFnTester = DoFnTester.of(datastoreWriter); doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); doFnTester.processBundle(mutations); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java 
index 4dd1608e7abe..49a60c646e05 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java @@ -86,7 +86,7 @@ private void testSplitQueryFn(String projectId, String kind, @Nullable String na query.addKindBuilder().setName(kind); SplitQueryFn splitQueryFn = new SplitQueryFn( - V1Options.from(projectId, query.build(), namespace), 0); + V1Options.from(projectId, query.build(), namespace, null), 0); DoFnTester> doFnTester = DoFnTester.of(splitQueryFn); List> queries = doFnTester.processBundle(query.build()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java index 526f035d6093..56302f2e84f5 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java @@ -133,7 +133,8 @@ public void processElement(ProcessContext c) throws Exception { /** * Build a new datastore client. 
*/ - static Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) { + static Datastore getDatastore(PipelineOptions pipelineOptions, String projectId, + String localhost) { Credentials credential = pipelineOptions.as(GcpOptions.class).getGcpCredential(); HttpRequestInitializer initializer; if (credential != null) { @@ -148,6 +149,7 @@ static Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) new DatastoreOptions.Builder() .projectId(projectId) .initializer(initializer); + .localhost(localhost) return DatastoreFactory.get().create(builder.build()); } From 6902ff434dd250d69685354b747e7e0b10bbe60b Mon Sep 17 00:00:00 2001 From: Vikas Kedigehalli Date: Mon, 7 Nov 2016 16:12:33 -0800 Subject: [PATCH 2/4] Fix merge conflicts --- .../org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java | 6 ++++++ .../org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java | 4 ++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java index ea77e9ed5e20..d688cadb20b9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java @@ -425,6 +425,7 @@ public DatastoreV1.Read withLocalhost(String localhost) { .build(); } + @Override public PCollection expand(PBegin input) { V1Options v1Options = V1Options.from(getProjectId(), getQuery(), getNamespace(), getLocalhost()); @@ -1055,6 +1056,11 @@ public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId, initializer = new RetryHttpRequestInitializer(); } + DatastoreOptions.Builder builder = + new DatastoreOptions.Builder() + .projectId(projectId) + .initializer(initializer); + if (localhost != null) { 
builder.localHost(localhost); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java index 56302f2e84f5..93c3783cd2a5 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java @@ -148,8 +148,8 @@ static Datastore getDatastore(PipelineOptions pipelineOptions, String projectId, DatastoreOptions.Builder builder = new DatastoreOptions.Builder() .projectId(projectId) - .initializer(initializer); - .localhost(localhost) + .initializer(initializer) + .localhost(localhost); return DatastoreFactory.get().create(builder.build()); } From 17413b1e5bc98f6bfa59f435f6a380e4f0e070b7 Mon Sep 17 00:00:00 2001 From: Vikas Kedigehalli Date: Wed, 16 Nov 2016 13:48:28 -0800 Subject: [PATCH 3/4] Address comments --- .../org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java index d688cadb20b9..4a219aa2dd5a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java @@ -127,7 +127,7 @@ * many workers. However, when the {@link Query} is configured with a limit using * {@link com.google.datastore.v1.Query.Builder#setLimit(Int32Value)} or if the Query contains * inequality filters like {@code GREATER_THAN, LESS_THAN} etc., then all returned results - * will be read by a single Dataflow worker in order to ensure correct data. 
Since data is read from + * will be read by a single worker in order to ensure correct data. Since data is read from * a single worker, this could have a significant impact on the performance of the job. * *

To write a {@link PCollection} to a Cloud Datastore, use {@link DatastoreV1#write}, @@ -251,7 +251,7 @@ abstract static class Builder { abstract Builder setQuery(Query query); abstract Builder setNamespace(String namespace); abstract Builder setNumQuerySplits(int numQuerySplits); - abstract Builder setLocalhost(String hostPort); + abstract Builder setLocalhost(String localhost); abstract Read build(); } @@ -419,6 +419,10 @@ public DatastoreV1.Read withNumQuerySplits(int numQuerySplits) { .build(); } + /** + * Returns a new {@link DatastoreV1.Read} that reads from a Datastore Emulator running at the + * given localhost address. + */ public DatastoreV1.Read withLocalhost(String localhost) { return toBuilder() .setLocalhost(localhost) From 1f3647c83576551e7eb47a38f44c8a5760883bec Mon Sep 17 00:00:00 2001 From: Vikas Kedigehalli Date: Thu, 17 Nov 2016 14:40:55 -0800 Subject: [PATCH 4/4] Fix build break --- .../org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java index 93c3783cd2a5..526f035d6093 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java @@ -133,8 +133,7 @@ public void processElement(ProcessContext c) throws Exception { /** * Build a new datastore client. 
*/ - static Datastore getDatastore(PipelineOptions pipelineOptions, String projectId, - String localhost) { + static Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) { Credentials credential = pipelineOptions.as(GcpOptions.class).getGcpCredential(); HttpRequestInitializer initializer; if (credential != null) { @@ -148,8 +147,7 @@ static Datastore getDatastore(PipelineOptions pipelineOptions, String projectId, DatastoreOptions.Builder builder = new DatastoreOptions.Builder() .projectId(projectId) - .initializer(initializer) - .localhost(localhost); + .initializer(initializer); return DatastoreFactory.get().create(builder.build()); }