Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-354]: Modify DatastoreIO to use the v1beta3 API #499

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 10 additions & 5 deletions examples/java/pom.xml
Expand Up @@ -258,11 +258,6 @@
<artifactId>avro</artifactId>
</dependency>

<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-datastore-protobuf</artifactId>
</dependency>

<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-pubsub</artifactId>
Expand All @@ -278,6 +273,16 @@
<artifactId>jsr305</artifactId>
</dependency>

<dependency>
<groupId>com.google.cloud.datastore</groupId>
<artifactId>datastore-v1beta3-proto-client</artifactId>
</dependency>

<dependency>
<groupId>com.google.cloud.datastore</groupId>
<artifactId>datastore-v1beta3-protos</artifactId>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
Expand Down
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.examples.complete;

import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;

import org.apache.beam.examples.common.DataflowExampleUtils;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
import org.apache.beam.examples.common.ExamplePubsubTopicOptions;
Expand Down Expand Up @@ -55,18 +57,19 @@
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.datastore.DatastoreV1.Entity;
import com.google.api.services.datastore.DatastoreV1.Key;
import com.google.api.services.datastore.DatastoreV1.Value;
import com.google.api.services.datastore.client.DatastoreHelper;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.datastore.v1beta3.Entity;
import com.google.datastore.v1beta3.Key;
import com.google.datastore.v1beta3.Value;
import com.google.datastore.v1beta3.client.DatastoreHelper;

import org.joda.time.Duration;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -396,16 +399,15 @@ public void processElement(ProcessContext c) {

entityBuilder.setKey(key);
List<Value> candidates = new ArrayList<>();
Map<String, Value> properties = new HashMap<>();
for (CompletionCandidate tag : c.element().getValue()) {
Entity.Builder tagEntity = Entity.newBuilder();
tagEntity.addProperty(
DatastoreHelper.makeProperty("tag", DatastoreHelper.makeValue(tag.value)));
tagEntity.addProperty(
DatastoreHelper.makeProperty("count", DatastoreHelper.makeValue(tag.count)));
candidates.add(DatastoreHelper.makeValue(tagEntity).setIndexed(false).build());
properties.put("tag", makeValue(tag.value).build());
properties.put("count", makeValue(tag.count).build());
candidates.add(makeValue(tagEntity).build());
}
entityBuilder.addProperty(
DatastoreHelper.makeProperty("candidates", DatastoreHelper.makeValue(candidates)));
properties.put("candidates", makeValue(candidates).build());
entityBuilder.putAllProperties(properties);
c.output(entityBuilder.build());
}
}
Expand All @@ -426,7 +428,7 @@ private static interface Options extends ExamplePubsubTopicOptions, ExampleBigQu
Boolean getRecursive();
void setRecursive(Boolean value);

@Description("Dataset entity kind")
@Description("Datastore entity kind")
@Default.String("autocomplete-demo")
String getKind();
void setKind(String value);
Expand All @@ -440,10 +442,6 @@ private static interface Options extends ExamplePubsubTopicOptions, ExampleBigQu
@Default.Boolean(false)
Boolean getOutputToDatastore();
void setOutputToDatastore(Boolean value);

@Description("Datastore output dataset ID, defaults to project ID")
String getOutputDataset();
void setOutputDataset(String value);
}

public static void main(String[] args) throws IOException {
Expand Down Expand Up @@ -479,8 +477,7 @@ public static void main(String[] args) throws IOException {
if (options.getOutputToDatastore()) {
toWrite
.apply(ParDo.named("FormatForDatastore").of(new FormatForDatastore(options.getKind())))
.apply(DatastoreIO.writeTo(MoreObjects.firstNonNull(
options.getOutputDataset(), options.getProject())));
.apply(DatastoreIO.writeTo(options.getProject()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please keep the OutputDataset option here. You may rename it to OutputProject if you like.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if OutputProject is different from the current Project? Are dataflow pipelines allowed to work with multiple projects at the same time?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is a common and necessary use case.

On Mon, Jun 20, 2016 at 2:35 PM, Vikas Kedigehalli <notifications@github.com

wrote:

In
examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
#499 (comment):

@@ -479,8 +477,7 @@ public static void main(String[] args) throws IOException {
if (options.getOutputToDatastore()) {
toWrite
.apply(ParDo.named("FormatForDatastore").of(new FormatForDatastore(options.getKind())))

  •  .apply(DatastoreIO.writeTo(MoreObjects.firstNonNull(
    
  •      options.getOutputDataset(), options.getProject())));
    
  •  .apply(DatastoreIO.writeTo(options.getProject()));
    

What if OutputProject is different from the current Project. Are dataflow
pipelines allowed to work with multiple projects at the same time?


You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
https://github.com/apache/incubator-beam/pull/499/files/834b87394f5834f7c6475032cafaed89be977ecb#r67771180,
or mute the thread
https://github.com/notifications/unsubscribe/AAgIT7OFwqMhdKeVuxt6f85WKhzfxWFuks5qNwevgaJpZM4I5_6o
.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.

}
if (options.getOutputToBigQuery()) {
dataflowUtils.setupBigQueryTable();
Expand Down
Expand Up @@ -17,11 +17,10 @@
*/
package org.apache.beam.examples.cookbook;

import static com.google.api.services.datastore.client.DatastoreHelper.getPropertyMap;
import static com.google.api.services.datastore.client.DatastoreHelper.getString;
import static com.google.api.services.datastore.client.DatastoreHelper.makeFilter;
import static com.google.api.services.datastore.client.DatastoreHelper.makeKey;
import static com.google.api.services.datastore.client.DatastoreHelper.makeValue;
import static com.google.datastore.v1beta3.client.DatastoreHelper.getString;
import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;

import org.apache.beam.examples.WordCount;
import org.apache.beam.sdk.Pipeline;
Expand All @@ -37,12 +36,11 @@
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;

import com.google.api.services.datastore.DatastoreV1.Entity;
import com.google.api.services.datastore.DatastoreV1.Key;
import com.google.api.services.datastore.DatastoreV1.Property;
import com.google.api.services.datastore.DatastoreV1.PropertyFilter;
import com.google.api.services.datastore.DatastoreV1.Query;
import com.google.api.services.datastore.DatastoreV1.Value;
import com.google.datastore.v1beta3.Entity;
import com.google.datastore.v1beta3.Key;
import com.google.datastore.v1beta3.PropertyFilter;
import com.google.datastore.v1beta3.Query;
import com.google.datastore.v1beta3.Value;

import java.util.Map;
import java.util.UUID;
Expand All @@ -64,7 +62,6 @@
* <p>To run this pipeline locally, the following options must be provided:
* <pre>{@code
* --project=YOUR_PROJECT_ID
* --dataset=YOUR_DATASET_ID
* --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PATH]
* }</pre>
*
Expand All @@ -89,7 +86,7 @@ public class DatastoreWordCount {
static class GetContentFn extends DoFn<Entity, String> {
@Override
public void processElement(ProcessContext c) {
Map<String, Value> props = getPropertyMap(c.element());
Map<String, Value> props = c.element().getProperties();
Value value = props.get("content");
if (value != null) {
c.output(getString(value));
Expand All @@ -106,7 +103,7 @@ public void processElement(ProcessContext c) {
static Key makeAncestorKey(@Nullable String namespace, String kind) {
Key.Builder keyBuilder = makeKey(kind, "root");
if (namespace != null) {
keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
}
return keyBuilder.build();
}
Expand Down Expand Up @@ -136,12 +133,11 @@ public Entity makeEntity(String content) {
// we must set the namespace on keyBuilder. TODO: Once partitionId inheritance is added,
// we can simplify this code.
if (namespace != null) {
keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
}

entityBuilder.setKey(keyBuilder.build());
entityBuilder.addProperty(Property.newBuilder().setName("content")
.setValue(Value.newBuilder().setStringValue(content)));
entityBuilder.getMutableProperties().put("content", makeValue(content).build());
return entityBuilder.build();
}

Expand All @@ -167,21 +163,21 @@ public static interface Options extends PipelineOptions {
String getOutput();
void setOutput(String value);

@Description("Dataset ID to read from datastore")
@Description("Project ID to read from datastore")
@Validation.Required
String getDataset();
void setDataset(String value);
String getProject();
void setProject(String value);

@Description("Dataset entity kind")
@Description("Datastore Entity kind")
@Default.String("shakespeare-demo")
String getKind();
void setKind(String value);

@Description("Dataset namespace")
@Description("Datastore Namespace")
String getNamespace();
void setNamespace(@Nullable String value);

@Description("Read an existing dataset, do not write first")
@Description("Read an existing project, do not write first")
boolean isReadOnly();
void setReadOnly(boolean value);

Expand All @@ -199,7 +195,7 @@ public static void writeDataToDatastore(Options options) {
Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
.apply(ParDo.of(new CreateEntityFn(options.getNamespace(), options.getKind())))
.apply(DatastoreIO.writeTo(options.getDataset()));
.apply(DatastoreIO.writeTo(options.getProject()));

p.run();
}
Expand Down Expand Up @@ -231,7 +227,7 @@ public static void readDataFromDatastore(Options options) {

// For Datastore sources, the read namespace can be set on the entire query.
DatastoreIO.Source source = DatastoreIO.source()
.withDataset(options.getDataset())
.withProject(options.getProject())
.withQuery(query)
.withNamespace(options.getNamespace());

Expand Down Expand Up @@ -259,7 +255,7 @@ public static void main(String args[]) {
// First example: write data to Datastore for reading later.
//
// NOTE: this write does not delete any existing Entities in the Datastore, so if run
// multiple times with the same output dataset, there may be duplicate entries. The
// multiple times with the same output project, there may be duplicate entries. The
// Datastore Query tool in the Google Developers Console can be used to inspect or erase all
// entries with a particular namespace and/or kind.
DatastoreWordCount.writeDataToDatastore(options);
Expand Down
46 changes: 15 additions & 31 deletions pom.xml
Expand Up @@ -105,7 +105,8 @@
<clouddebugger.version>v2-rev8-1.22.0</clouddebugger.version>
<dataflow.version>v1b3-rev26-1.22.0</dataflow.version>
<dataflow.proto.version>0.5.160222</dataflow.proto.version>
<datastore.version>v1beta2-rev1-4.0.0</datastore.version>
<datastore.client.version>1.0.0-beta.2</datastore.client.version>
<datastore.proto.version>1.0.0-beta</datastore.proto.version>
<google-auto-service.version>1.0-rc2</google-auto-service.version>
<google-auto-value.version>1.1</google-auto-value.version>
<google-clients.version>1.22.0</google-clients.version>
Expand Down Expand Up @@ -437,37 +438,20 @@
</dependency>

<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-datastore-protobuf</artifactId>
<version>${datastore.version}</version>
<groupId>com.google.cloud.datastore</groupId>
<artifactId>datastore-v1beta3-proto-client</artifactId>
<version>${datastore.client.version}</version>
<exclusions>
<!-- Exclude an old version of guava that is being pulled in by a transitive
dependency of google-api-client -->
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava-jdk5</artifactId>
</exclusion>
<!-- Exclude old version of api client dependencies. -->
<exclusion>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.api-client</groupId>
<artifactId>google-api-client</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.oauth-client</groupId>
<artifactId>google-oauth-client</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client-jackson</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client-protobuf</artifactId>
</exclusion>
<!-- TODO -->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

</exclusions>
</dependency>

<dependency>
<groupId>com.google.cloud.datastore</groupId>
<artifactId>datastore-v1beta3-protos</artifactId>
<version>${datastore.proto.version}</version>
<exclusions>
<!-- TODO -->
</exclusions>
</dependency>

Expand Down
11 changes: 6 additions & 5 deletions runners/google-cloud-dataflow-java/pom.xml
Expand Up @@ -311,11 +311,6 @@
<artifactId>google-api-client</artifactId>
</dependency>

<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-datastore-protobuf</artifactId>
</dependency>

<dependency>
<groupId>com.google.oauth-client</groupId>
<artifactId>google-oauth-client</artifactId>
Expand Down Expand Up @@ -453,5 +448,11 @@
<artifactId>google-cloud-dataflow-java-proto-library-all</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.cloud.datastore</groupId>
<artifactId>datastore-v1beta3-protos</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Expand Up @@ -30,7 +30,8 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;

import com.google.api.services.datastore.DatastoreV1;
import com.google.datastore.v1beta3.Entity;
import com.google.datastore.v1beta3.Query;

import org.junit.Test;

Expand All @@ -44,20 +45,20 @@ public class DataflowDatastoreIOTest {
public void testSourcePrimitiveDisplayData() {
DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
PTransform<PInput, ?> read = DatastoreIO.readFrom(
"myDataset", DatastoreV1.Query.newBuilder().build());
"myProject", Query.newBuilder().build());

Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
assertThat("DatastoreIO read should include the dataset in its primitive display data",
displayData, hasItem(hasDisplayItem("dataset")));
assertThat("DatastoreIO read should include the project in its primitive display data",
displayData, hasItem(hasDisplayItem("project")));
}

@Test
public void testSinkPrimitiveDisplayData() {
DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
PTransform<PCollection<DatastoreV1.Entity>, ?> write = DatastoreIO.writeTo("myDataset");
PTransform<PCollection<Entity>, ?> write = DatastoreIO.writeTo("myProject");

Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
assertThat("DatastoreIO write should include the dataset in its primitive display data",
displayData, hasItem(hasDisplayItem("dataset")));
assertThat("DatastoreIO write should include the project in its primitive display data",
displayData, hasItem(hasDisplayItem("project")));
}
}
9 changes: 7 additions & 2 deletions sdks/java/core/pom.xml
Expand Up @@ -393,8 +393,13 @@
</dependency>

<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-datastore-protobuf</artifactId>
<groupId>com.google.cloud.datastore</groupId>
<artifactId>datastore-v1beta3-proto-client</artifactId>
</dependency>

<dependency>
<groupId>com.google.cloud.datastore</groupId>
<artifactId>datastore-v1beta3-protos</artifactId>
</dependency>

<dependency>
Expand Down