From c7729a944bb4cc287a3c537a130503385b8c1e1c Mon Sep 17 00:00:00 2001 From: Vikas Kedigehalli Date: Thu, 11 Aug 2016 23:05:09 -0700 Subject: [PATCH 1/2] DatastoreIO Write PTranform for Bounded/Unbounded PCollections --- .../beam/sdk/io/gcp/datastore/V1Beta3.java | 415 +++++++++--------- .../sdk/io/gcp/datastore/V1Beta3Test.java | 131 ++++-- 2 files changed, 297 insertions(+), 249 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java index 052feb34a847..1dd90349349d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java @@ -24,24 +24,23 @@ import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL; import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING; import static com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NOT_FINISHED; +import static com.google.datastore.v1beta3.client.DatastoreHelper.makeDelete; import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter; import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder; import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert; import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue; import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.io.Sink.WriteOperation; -import org.apache.beam.sdk.io.Sink.Writer; import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import 
org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; @@ -64,6 +63,7 @@ import com.google.datastore.v1beta3.EntityResult; import com.google.datastore.v1beta3.Key; import com.google.datastore.v1beta3.Key.PathElement; +import com.google.datastore.v1beta3.Mutation; import com.google.datastore.v1beta3.PartitionId; import com.google.datastore.v1beta3.Query; import com.google.datastore.v1beta3.QueryResultBatch; @@ -88,7 +88,7 @@ import javax.annotation.Nullable; /** - *

{@link V1Beta3} provides an API to Read and Write {@link PCollection PCollections} of + *

{@link V1Beta3} provides an API to Read, Write and Delete {@link PCollection PCollections} of * Google Cloud Datastore version v1beta3 * {@link Entity} objects. * @@ -133,7 +133,16 @@ * p.run(); * } * - *

{@link Entity Entities} in the {@code PCollection} to be written must have complete + *

To delete a {@link PCollection} of entities from Datastore, use {@link V1Beta3#delete},
 + * specifying the Cloud Datastore project to delete from:
 + *
 + *

 {@code
+ * PCollection entities = ...;
+ * entities.apply(DatastoreIO.v1beta3().delete().withProjectId(projectId));
+ * p.run();
+ * } 
+ * + *

{@link Entity Entities} in the {@code PCollection} to be written or deleted must have complete * {@link Key Keys}. Complete {@code Keys} specify the {@code name} and {@code id} of the * {@code Entity}, where incomplete {@code Keys} do not. A {@code namespace} other than * {@code projectId} default may be used by specifying it in the {@code Entity} {@code Keys}. @@ -143,9 +152,9 @@ * keyBuilder.getPartitionIdBuilder().setNamespace(namespace); * } * - *

{@code Entities} will be committed as upsert (update or insert) mutations. Please read - * Entities, Properties, and - * Keys for more information about {@code Entity} keys. + *

{@code Entities} will be committed as upsert (update or insert) or delete mutations. Please + * read Entities, Properties, + * and Keys for more information about {@code Entity} keys. * *

Permissions

* Permission requirements depend on the {@code PipelineRunner} that is used to execute the @@ -167,7 +176,8 @@ public class V1Beta3 { * Datastore has a limit of 500 mutations per batch operation, so we flush * changes to Datastore every 500 entities. */ - private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500; + @VisibleForTesting + static final int DATASTORE_BATCH_MUTATION_LIMIT = 500; /** * Returns an empty {@link V1Beta3.Read} builder. Configure the source {@code projectId}, @@ -634,42 +644,8 @@ public void processElement(ProcessContext context) throws Exception { } } } - - /** - * A wrapper factory class for Datastore singleton classes {@link DatastoreFactory} and - * {@link QuerySplitter} - * - *

{@link DatastoreFactory} and {@link QuerySplitter} are not java serializable, hence - * wrapping them under this class, which implements {@link Serializable}. - */ - @VisibleForTesting - static class V1Beta3DatastoreFactory implements Serializable { - - /** Builds a Datastore client for the given pipeline options and project. */ - public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) { - DatastoreOptions.Builder builder = - new DatastoreOptions.Builder() - .projectId(projectId) - .initializer( - new RetryHttpRequestInitializer() - ); - - Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential(); - if (credential != null) { - builder.credential(credential); - } - - return DatastoreFactory.get().create(builder.build()); - } - - /** Builds a Datastore {@link QuerySplitter}. */ - public QuerySplitter getQuerySplitter() { - return DatastoreHelper.getQuerySplitter(); - } - } } - /** * Returns an empty {@link V1Beta3.Write} builder. Configure the destination * {@code projectId} using {@link V1Beta3.Write#withProjectId}. @@ -678,21 +654,26 @@ public Write write() { return new Write(null); } + /** + * Returns an empty {@link V1Beta3.Delete} builder. Configure the destination + * {@code projectId} using {@link V1Beta3.Delete#withProjectId}. + */ + public Delete delete() { + return new Delete(null); + } + /** * A {@link PTransform} that writes {@link Entity} objects to Cloud Datastore. * * @see DatastoreIO */ - public static class Write extends PTransform, PDone> { - @Nullable - private final String projectId; - + public static class Write extends DatastoreMutation { /** * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if * it is {@code null} at instantiation time, an error will be thrown. 
*/ public Write(@Nullable String projectId) { - this.projectId = projectId; + super(projectId, new UpsertFn()); } /** @@ -702,57 +683,68 @@ public Write withProjectId(String projectId) { checkNotNull(projectId, "projectId"); return new Write(projectId); } + } - @Override - public PDone apply(PCollection input) { - return input.apply( - org.apache.beam.sdk.io.Write.to(new DatastoreSink(projectId))); + /** + * A {@link PTransform} that deletes {@link Entity} objects from Cloud Datastore. + * + * @see DatastoreIO + */ + public static class Delete extends DatastoreMutation { + /** + * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if + * it is {@code null} at instantiation time, an error will be thrown. + */ + public Delete(@Nullable String projectId) { + super(projectId, new DeleteFn()); } - @Override - public void validate(PCollection input) { + /** + * Returns a new {@link Delete} that deletes from the Cloud Datastore for the specified project. + */ + public Delete withProjectId(String projectId) { checkNotNull(projectId, "projectId"); - } - - @Nullable - public String getProjectId() { - return projectId; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("projectId", projectId) - .toString(); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder - .addIfNotNull(DisplayData.item("projectId", projectId) - .withLabel("Output Project")); + return new Delete(projectId); } } /** - * A {@link org.apache.beam.sdk.io.Sink} that writes data to Datastore. + * A {@link PTransform} that writes mutations of {@link Entity} objects to Cloud Datastore. + * + *

It requires a {@link DoFn} that tranforms an {@code Entity} object to a {@link Mutation} + * Note: Only idempotent Cloud Datastore mutation operations (upsert and delete) should + * be used by the {@code DoFn} provided, as the commits are retried when failures occur. */ - static class DatastoreSink extends org.apache.beam.sdk.io.Sink { - final String projectId; + private static class DatastoreMutation extends PTransform, PDone> { + private final String projectId; + // A function that transforms each entity into a mutation. + private final SimpleFunction mutationFn; - public DatastoreSink(String projectId) { + public DatastoreMutation(String projectId, SimpleFunction mutationFn) { this.projectId = projectId; + this.mutationFn = mutationFn; + } + + @Override + public PDone apply(PCollection input) { + input.apply(MapElements.via(mutationFn)) + .apply(ParDo.of(new DatastoreWriterFn(projectId))); + + return PDone.in(input.getPipeline()); } @Override - public void validate(PipelineOptions options) { + public void validate(PCollection input) { checkNotNull(projectId, "projectId"); + checkNotNull(mutationFn, "mutationFn"); } @Override - public DatastoreWriteOperation createWriteOperation(PipelineOptions options) { - return new DatastoreWriteOperation(this); + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("projectId", projectId) + .add("mutationFn", mutationFn.getClass().getName()) + .toString(); } @Override @@ -760,93 +752,38 @@ public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); builder .addIfNotNull(DisplayData.item("projectId", projectId) - .withLabel("Output Project")); - } - } - - /** - * A {@link WriteOperation} that will manage a parallel write to a Datastore sink. 
- */ - private static class DatastoreWriteOperation - extends WriteOperation { - private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriteOperation.class); - - private final DatastoreSink sink; - - public DatastoreWriteOperation(DatastoreSink sink) { - this.sink = sink; + .withLabel("Output Project")) + .addIfNotNull(DisplayData.item("mutationFn", mutationFn.getClass().getName()) + .withLabel("Datastore Mutation Function")); } - @Override - public Coder getWriterResultCoder() { - return SerializableCoder.of(DatastoreWriteResult.class); - } - - @Override - public void initialize(PipelineOptions options) throws Exception {} - - /** - * Finalizes the write. Logs the number of entities written to the Datastore. - */ - @Override - public void finalize(Iterable writerResults, PipelineOptions options) - throws Exception { - long totalEntities = 0; - for (DatastoreWriteResult result : writerResults) { - totalEntities += result.entitiesWritten; - } - LOG.info("Wrote {} elements.", totalEntities); - } - - @Override - public DatastoreWriter createWriter(PipelineOptions options) throws Exception { - DatastoreOptions.Builder builder = - new DatastoreOptions.Builder() - .projectId(sink.projectId) - .initializer(new RetryHttpRequestInitializer()); - Credential credential = options.as(GcpOptions.class).getGcpCredential(); - if (credential != null) { - builder.credential(credential); - } - Datastore datastore = DatastoreFactory.get().create(builder.build()); - - return new DatastoreWriter(this, datastore); - } - - @Override - public DatastoreSink getSink() { - return sink; + public String getProjectId() { + return projectId; } } /** - * {@link Writer} that writes entities to a Datastore Sink. Entities are written in batches, - * where the maximum batch size is {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}. Entities - * are committed as upsert mutations (either update if the key already exists, or insert if it is - * a new key). 
If an entity does not have a complete key (i.e., it has no name or id), the bundle - * will fail. + * {@link DoFn} that writes {@link Mutation}s to Cloud Datastore. Mutations are written in + * batches, where the maximum batch size is {@link V1Beta3#DATASTORE_BATCH_MUTATION_LIMIT}. * *

See - * Datastore: Entities, Properties, and Keys for information about entity keys and upsert - * mutations. + * href="https://cloud.google.com/datastore/docs/concepts/entities"> + * Datastore: Entities, Properties, and Keys for information about entity keys and mutations. * *

Commits are non-transactional. If a commit fails because of a conflict over an entity - * group, the commit will be retried (up to {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT} - * times). - * - *

Visible for testing purposes. + * group, the commit will be retried (up to {@link V1Beta3#DATASTORE_BATCH_MUTATION_LIMIT} + * times). This means that the mutation operation should be idempotent. Thus, the writer should + * only be used for {code upsert} and {@code delete} mutation operations, as these are the only + * two Cloud Datastore mutations that are idempotent. */ @VisibleForTesting - static class DatastoreWriter extends Writer { - private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriter.class); - private final DatastoreWriteOperation writeOp; - private final Datastore datastore; - private long totalWritten = 0; - - // Visible for testing. - final List entities = new ArrayList<>(); - + static class DatastoreWriterFn extends DoFn { + private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriterFn.class); + private final String projectId; + private transient Datastore datastore; + private final V1Beta3DatastoreFactory datastoreFactory; + // Current batch of mutations to be written. + private final List mutations = new ArrayList<>(); /** * Since a bundle is written in batches, we should retry the commit of a batch in order to * prevent transient errors from causing the bundle to fail. @@ -858,68 +795,41 @@ static class DatastoreWriter extends Writer { */ private static final int INITIAL_BACKOFF_MILLIS = 5000; - /** - * Returns true if a Datastore key is complete. A key is complete if its last element - * has either an id or a name. 
- */ - static boolean isValidKey(Key key) { - List elementList = key.getPathList(); - if (elementList.isEmpty()) { - return false; - } - PathElement lastElement = elementList.get(elementList.size() - 1); - return (lastElement.getId() != 0 || !lastElement.getName().isEmpty()); + public DatastoreWriterFn(String projectId) { + this(projectId, new V1Beta3DatastoreFactory()); } - DatastoreWriter(DatastoreWriteOperation writeOp, Datastore datastore) { - this.writeOp = writeOp; - this.datastore = datastore; + @VisibleForTesting + DatastoreWriterFn(String projectId, V1Beta3DatastoreFactory datastoreFactory) { + this.projectId = checkNotNull(projectId, "projectId"); + this.datastoreFactory = datastoreFactory; } - @Override - public void open(String uId) throws Exception {} - - /** - * Writes an entity to the Datastore. Writes are batched, up to {@link - * V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}. If an entity does not have a complete key, an - * {@link IllegalArgumentException} will be thrown. - */ - @Override - public void write(Entity value) throws Exception { - // Verify that the entity to write has a complete key. - if (!isValidKey(value.getKey())) { - throw new IllegalArgumentException( - "Entities to be written to the Datastore must have complete keys"); - } - - entities.add(value); + @StartBundle + public void startBundle(Context c) { + datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId); + } - if (entities.size() >= V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT) { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + mutations.add(c.element()); + if (mutations.size() >= V1Beta3.DATASTORE_BATCH_MUTATION_LIMIT) { flushBatch(); } } - /** - * Flushes any pending batch writes and returns a DatastoreWriteResult. 
- */ - @Override - public DatastoreWriteResult close() throws Exception { - if (entities.size() > 0) { + @FinishBundle + public void finishBundle(Context c) throws Exception { + if (mutations.size() > 0) { flushBatch(); } - return new DatastoreWriteResult(totalWritten); - } - - @Override - public DatastoreWriteOperation getWriteOperation() { - return writeOp; } /** - * Writes a batch of entities to the Datastore. + * Writes a batch of mutations to Cloud Datastore. * - *

If a commit fails, it will be retried (up to {@link DatastoreWriter#MAX_RETRIES} - * times). All entities in the batch will be committed again, even if the commit was partially + *

If a commit fails, it will be retried (up to {@link DatastoreWriterFn#MAX_RETRIES} + * times). All mutations in the batch will be committed again, even if the commit was partially * successful. If the retry limit is exceeded, the last exception from the Datastore will be * thrown. * @@ -927,7 +837,7 @@ public DatastoreWriteOperation getWriteOperation() { * backing off between retries fails. */ private void flushBatch() throws DatastoreException, IOException, InterruptedException { - LOG.debug("Writing batch of {} entities", entities.size()); + LOG.debug("Writing batch of {} mutations", mutations.size()); Sleeper sleeper = Sleeper.DEFAULT; BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS); @@ -935,9 +845,7 @@ private void flushBatch() throws DatastoreException, IOException, InterruptedExc // Batch upsert entities. try { CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); - for (Entity entity: entities) { - commitRequest.addMutations(makeUpsert(entity)); - } + commitRequest.addAllMutations(mutations); commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL); datastore.commit(commitRequest.build()); // Break if the commit threw no exception. @@ -953,17 +861,94 @@ private void flushBatch() throws DatastoreException, IOException, InterruptedExc } } } - totalWritten += entities.size(); - LOG.debug("Successfully wrote {} entities", entities.size()); - entities.clear(); + LOG.debug("Successfully wrote {} mutations", mutations.size()); + mutations.clear(); + } + + @Override + public void populateDisplayData(Builder builder) { + super.populateDisplayData(builder); + builder + .addIfNotNull(DisplayData.item("projectId", projectId) + .withLabel("Output Project")); + } + } + + /** + * Returns true if a Datastore key is complete. A key is complete if its last element + * has either an id or a name. 
+ */ + static boolean isValidKey(Key key) { + List elementList = key.getPathList(); + if (elementList.isEmpty()) { + return false; + } + PathElement lastElement = elementList.get(elementList.size() - 1); + return (lastElement.getId() != 0 || !lastElement.getName().isEmpty()); + } + + /** + * A function that constructs an upsert {@link Mutation} from an {@link Entity}. + */ + @VisibleForTesting + static class UpsertFn extends SimpleFunction { + @Override + public Mutation apply(Entity entity) { + // Verify that the entity to write has a complete key. + if (!isValidKey(entity.getKey())) { + throw new IllegalArgumentException( + "Entities to be written to the Datastore must have complete keys"); + } + return makeUpsert(entity).build(); } } - private static class DatastoreWriteResult implements Serializable { - final long entitiesWritten; + /** + * A function that constructs a delete {@link Mutation} from an {@link Entity}. + */ + @VisibleForTesting + static class DeleteFn extends SimpleFunction { + @Override + public Mutation apply(Entity entity) { + // Verify that the entity to delete has a complete key. + if (!isValidKey(entity.getKey())) { + throw new IllegalArgumentException( + "Entities to be deleted from the Datastore must have complete keys"); + } + return makeDelete(entity.getKey()).build(); + } + } + + /** + * A wrapper factory class for Datastore singleton classes {@link DatastoreFactory} and + * {@link QuerySplitter} + * + *

{@link DatastoreFactory} and {@link QuerySplitter} are not java serializable, hence + * wrapping them under this class, which implements {@link Serializable}. + */ + @VisibleForTesting + static class V1Beta3DatastoreFactory implements Serializable { + + /** Builds a Datastore client for the given pipeline options and project. */ + public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) { + DatastoreOptions.Builder builder = + new DatastoreOptions.Builder() + .projectId(projectId) + .initializer( + new RetryHttpRequestInitializer() + ); + + Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential(); + if (credential != null) { + builder.credential(credential); + } + + return DatastoreFactory.get().create(builder.build()); + } - public DatastoreWriteResult(long recordsWritten) { - this.entitiesWritten = recordsWritten; + /** Builds a Datastore {@link QuerySplitter}. */ + public QuerySplitter getQuerySplitter() { + return DatastoreHelper.getQuerySplitter(); } } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java index 9947c6087a46..e2542ac71d62 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.gcp.datastore; +import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DATASTORE_BATCH_MUTATION_LIMIT; import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.DEFAULT_BUNDLE_SIZE_BYTES; import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.QUERY_BATCH_LIMIT; import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.getEstimatedSizeBytes; @@ -24,11 +25,12 @@ import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL; import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING; +import static com.google.datastore.v1beta3.client.DatastoreHelper.makeDelete; import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter; import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey; import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder; +import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert; import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue; -import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -44,11 +46,13 @@ import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; -import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DatastoreWriter; +import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DatastoreWriterFn; +import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteFn; import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.ReadFn; import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.SplitQueryFn; -import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.V1Beta3DatastoreFactory; import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.V1Beta3Options; +import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.UpsertFn; +import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.V1Beta3DatastoreFactory; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.transforms.DoFnTester; @@ -61,10 +65,11 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.POutput; -import com.google.common.collect.Lists; +import 
com.google.datastore.v1beta3.CommitRequest; import com.google.datastore.v1beta3.Entity; import com.google.datastore.v1beta3.EntityResult; import com.google.datastore.v1beta3.Key; +import com.google.datastore.v1beta3.Mutation; import com.google.datastore.v1beta3.PartitionId; import com.google.datastore.v1beta3.Query; import com.google.datastore.v1beta3.QueryResultBatch; @@ -87,7 +92,6 @@ import org.mockito.stubbing.Answer; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -199,9 +203,9 @@ public void testReadValidationSucceedsNamespace() throws Exception { @Test public void testReadDisplayData() { V1Beta3.Read read = DatastoreIO.v1beta3().read() - .withProjectId(PROJECT_ID) - .withQuery(QUERY) - .withNamespace(NAMESPACE); + .withProjectId(PROJECT_ID) + .withQuery(QUERY) + .withNamespace(NAMESPACE); DisplayData displayData = DisplayData.from(read); @@ -258,7 +262,7 @@ public void testWriteDisplayData() { @Test @Category(RunnableOnService.class) - public void testSinkPrimitiveDisplayData() { + public void testWritePrimitiveDisplayData() { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); PTransform, ?> write = DatastoreIO.v1beta3().write().withProjectId("myProject"); @@ -285,33 +289,33 @@ public void testHasNameOrId() { Key key; // Complete with name, no ancestor key = makeKey("bird", "finch").build(); - assertTrue(DatastoreWriter.isValidKey(key)); + assertTrue(V1Beta3.isValidKey(key)); // Complete with id, no ancestor key = makeKey("bird", 123).build(); - assertTrue(DatastoreWriter.isValidKey(key)); + assertTrue(V1Beta3.isValidKey(key)); // Incomplete, no ancestor key = makeKey("bird").build(); - assertFalse(DatastoreWriter.isValidKey(key)); + assertFalse(V1Beta3.isValidKey(key)); // Complete with name and ancestor key = makeKey("bird", "owl").build(); key = makeKey(key, "bird", "horned").build(); - assertTrue(DatastoreWriter.isValidKey(key)); + 
assertTrue(V1Beta3.isValidKey(key)); // Complete with id and ancestor key = makeKey("bird", "owl").build(); key = makeKey(key, "bird", 123).build(); - assertTrue(DatastoreWriter.isValidKey(key)); + assertTrue(V1Beta3.isValidKey(key)); // Incomplete with ancestor key = makeKey("bird", "owl").build(); key = makeKey(key, "bird").build(); - assertFalse(DatastoreWriter.isValidKey(key)); + assertFalse(V1Beta3.isValidKey(key)); key = makeKey().build(); - assertFalse(DatastoreWriter.isValidKey(key)); + assertFalse(V1Beta3.isValidKey(key)); } /** @@ -319,37 +323,96 @@ public void testHasNameOrId() { */ @Test public void testAddEntitiesWithIncompleteKeys() throws Exception { + Key key = makeKey("bird").build(); Entity entity = Entity.newBuilder().setKey(key).build(); - DatastoreWriter writer = new DatastoreWriter(null, mockDatastore); + UpsertFn upsertFn = new UpsertFn(); thrown.expect(IllegalArgumentException.class); thrown.expectMessage("Entities to be written to the Datastore must have complete keys"); - writer.write(entity); + upsertFn.apply(entity); + } + + @Test + /** + * Test that entities with valid keys are transformed to upsert mutations. + */ + public void testAddEntities() throws Exception { + Key key = makeKey("bird", "finch").build(); + Entity entity = Entity.newBuilder().setKey(key).build(); + UpsertFn upsertFn = new UpsertFn(); + + Mutation exceptedMutation = makeUpsert(entity).build(); + assertEquals(upsertFn.apply(entity), exceptedMutation); + } + + /** + * Test that entities with incomplete keys cannot be deleted. 
+ */ + @Test + public void testDeleteEntitiesWithIncompleteKeys() throws Exception { + + Key key = makeKey("bird").build(); + Entity entity = Entity.newBuilder().setKey(key).build(); + DeleteFn deleteFn = new DeleteFn(); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Entities to be deleted from the Datastore must have complete keys"); + + deleteFn.apply(entity); } /** - * Test that entities are added to the batch to update. + * Test that entities with valid keys are transformed to delete mutations. */ @Test - public void testAddingEntities() throws Exception { - List expected = Lists.newArrayList( - Entity.newBuilder().setKey(makeKey("bird", "jay").build()).build(), - Entity.newBuilder().setKey(makeKey("bird", "condor").build()).build(), - Entity.newBuilder().setKey(makeKey("bird", "robin").build()).build()); - - List allEntities = Lists.newArrayList(expected); - Collections.shuffle(allEntities); - - DatastoreWriter writer = new DatastoreWriter(null, mockDatastore); - writer.open("test_id"); - for (Entity entity : allEntities) { - writer.write(entity); + public void testDeleteEntities() throws Exception { + Key key = makeKey("bird", "finch").build(); + Entity entity = Entity.newBuilder().setKey(key).build(); + DeleteFn deleteFn = new DeleteFn(); + + Mutation exceptedMutation = makeDelete(entity.getKey()).build(); + assertEquals(deleteFn.apply(entity), exceptedMutation); + } + + @Test + public void testDatatoreWriterFnWithOneBatch() throws Exception { + datastoreWriteFnTest(100); + } + + @Test + public void testDatatoreWriterFnWithMultipleBatches() throws Exception { + datastoreWriteFnTest(DATASTORE_BATCH_MUTATION_LIMIT * 3 + 100); + } + + @Test + public void testDatatoreWriterFnWithBatchesExactMultiple() throws Exception { + datastoreWriteFnTest(DATASTORE_BATCH_MUTATION_LIMIT * 2); + } + + private void datastoreWriteFnTest(int numMutations) throws Exception { + // Create the requested number of mutations. 
+ List mutations = new ArrayList<>(numMutations); + for (int i = 0; i < numMutations; ++i) { + mutations.add( + makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build()); } - assertEquals(expected.size(), writer.entities.size()); - assertThat(writer.entities, containsInAnyOrder(expected.toArray())); + for (int i = 0; i < numMutations; i = i + DATASTORE_BATCH_MUTATION_LIMIT) { + CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); + commitRequest.addAllMutations(mutations); + commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL); + when(mockDatastore.commit(commitRequest.build())).thenReturn(null); + } + + DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, mockDatastoreFactory); + DoFnTester doFnTester = DoFnTester.of(datastoreWriter); + doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); + doFnTester.processBundle(mutations); + + int expectedBatches = (int) Math.ceil((double) numMutations / DATASTORE_BATCH_MUTATION_LIMIT); + verify(mockDatastore, times(expectedBatches)).commit(any(CommitRequest.class)); } /** From 3f8087bd9cb7f3030e83543f630f229ece6b6fcd Mon Sep 17 00:00:00 2001 From: Vikas Kedigehalli Date: Mon, 15 Aug 2016 10:10:46 -0700 Subject: [PATCH 2/2] Add delete example. --- .../examples/cookbook/DatastoreDelete.java | 100 ++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreDelete.java diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreDelete.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreDelete.java new file mode 100644 index 000000000000..890963f4133f --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreDelete.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.cookbook; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO; +import org.apache.beam.sdk.io.gcp.datastore.V1Beta3; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.Validation; +import com.google.datastore.v1beta3.Query; + +import javax.annotation.Nullable; + +/** + * An example that deletes entities from Cloud Datastore using DatastoreIO. + * + *

This example shows how to use DatastoreIO to delete entities from Datastore. Note that this
 + * example will delete data from Datastore, which may incur charges for Cloud Datastore operations.
 + *
 + *

To run this example, users need to use gcloud to obtain credentials for Cloud Datastore:
 *

{@code
+ * $ gcloud auth login
+ * }
+ * + *

To run this pipeline locally, the following options must be provided: + *

{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --kind=YOUR_DATASTORE_KIND_NAME
+ * }
+ * + *

To run this example using Dataflow service, you must additionally
 + * provide either {@literal --stagingLocation} or {@literal --tempLocation}, and
 + * select one of the Dataflow pipeline runners, e.g.
 + * {@literal --runner=BlockingDataflowRunner}.
 + *
 + */
 +public class DatastoreDelete {
 +  /**
 +   * Options supported by {@link DatastoreDelete}.
 +   *
 +   *

Inherits standard configuration options. + */ + public static interface Options extends PipelineOptions { + @Description("Project ID to read from datastore") + @Validation.Required + String getProject(); + void setProject(String value); + + @Description("Datastore Entity kind") + @Validation.Required + String getKind(); + void setKind(String value); + + @Description("Datastore Namespace") + String getNamespace(); + void setNamespace(@Nullable String value); + } + + public static void main(String[] args) { + Options options = PipelineOptionsFactory + .fromArgs(args).withValidation().as(DatastoreDelete.Options.class); + + // A query to read all entities from the given kind. + Query.Builder query = Query.newBuilder(); + query.addKindBuilder().setName(options.getKind()); + + // A transform to read entities + V1Beta3.Read read = DatastoreIO.v1beta3().read() + .withProjectId(options.getProject()) + .withQuery(query.build()) + .withNamespace(options.getNamespace()); + + // A transform to delete entities. + V1Beta3.Delete delete = DatastoreIO.v1beta3().delete() + .withProjectId(options.getProject()); + + Pipeline p = Pipeline.create(options); + p.apply("ReadEntities", read) + .apply("DeleteEntities", delete); + + p.run(); + } +}