Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
vikkyrk committed Dec 14, 2016
1 parent 613f53a commit 48f2876
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 113 deletions.
Expand Up @@ -69,7 +69,6 @@
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
Expand Down Expand Up @@ -205,8 +204,7 @@ public class DatastoreV1 {
* {@link DatastoreV1.Read#withNamespace}, {@link DatastoreV1.Read#withNumQuerySplits}.
*/
public DatastoreV1.Read read() {
return new AutoValue_DatastoreV1_Read.Builder().setNumQuerySplits(
StaticValueProvider.of(0)).build();
return new AutoValue_DatastoreV1_Read.Builder().setNumQuerySplits(0).build();
}

/**
Expand Down Expand Up @@ -236,10 +234,10 @@ public abstract static class Read extends PTransform<PBegin, PCollection<Entity>
static final int QUERY_BATCH_LIMIT = 500;

@Nullable public abstract ValueProvider<String> getProjectId();
@Nullable public abstract ValueProvider<Query> getQuery();
@Nullable public abstract Query getQuery();
@Nullable public abstract ValueProvider<String> getGqlQuery();
@Nullable public abstract ValueProvider<String> getNamespace();
public abstract ValueProvider<Integer> getNumQuerySplits();
public abstract int getNumQuerySplits();

@Override
public abstract String toString();
Expand All @@ -249,10 +247,10 @@ public abstract static class Read extends PTransform<PBegin, PCollection<Entity>
@AutoValue.Builder
abstract static class Builder {
abstract Builder setProjectId(ValueProvider<String> projectId);
abstract Builder setQuery(ValueProvider<Query> query);
abstract Builder setQuery(Query query);
abstract Builder setGqlQuery(ValueProvider<String> gqlQuery);
abstract Builder setNamespace(ValueProvider<String> namespace);
abstract Builder setNumQuerySplits(ValueProvider<Integer> numQuerySplits);
abstract Builder setNumQuerySplits(int numQuerySplits);
abstract Read build();
}

Expand Down Expand Up @@ -283,6 +281,8 @@ static int getEstimatedNumSplits(Datastore datastore, Query query, @Nullable Str
private static long queryLatestStatisticsTimestamp(Datastore datastore,
@Nullable String namespace) throws DatastoreException {
Query.Builder query = Query.newBuilder();
// Note: namespace either being null or empty represents the default namespace, in which
// case we treat it as not provided by the user.
if (namespace == null || namespace.isEmpty()) {
query.addKindBuilder().setName("__Stat_Total__");
} else {
Expand Down Expand Up @@ -346,6 +346,9 @@ static long getEstimatedSizeBytes(Datastore datastore, Query query, @Nullable St
/** Builds a {@link RunQueryRequest} from the {@code query} and {@code namespace}. */
static RunQueryRequest makeRequest(Query query, @Nullable String namespace) {
RunQueryRequest.Builder requestBuilder = RunQueryRequest.newBuilder().setQuery(query);
// Note: namespace either being null or empty represents the default namespace.
// Datastore Client libraries expect users to not set the namespace proto field in
// either of these cases.
if (namespace != null && !namespace.isEmpty()) {
requestBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
}
Expand Down Expand Up @@ -405,14 +408,6 @@ public DatastoreV1.Read withQuery(Query query) {
checkNotNull(query, "query");
checkArgument(!query.hasLimit() || query.getLimit().getValue() > 0,
"Invalid query limit %s: must be positive", query.getLimit().getValue());
return toBuilder().setQuery(StaticValueProvider.of(query)).build();
}

/**
* Same as {@link Read#withQuery(Query)} but with a {@link ValueProvider}.
*/
public DatastoreV1.Read withQuery(ValueProvider<Query> query) {
checkNotNull(query, "query");
return toBuilder().setQuery(query).build();
}

Expand Down Expand Up @@ -464,13 +459,7 @@ public DatastoreV1.Read withNamespace(ValueProvider<String> namespace) {
*/
public DatastoreV1.Read withNumQuerySplits(int numQuerySplits) {
return toBuilder()
.setNumQuerySplits(StaticValueProvider.of(Math.min(Math.max(numQuerySplits, 0),
NUM_QUERY_SPLITS_MAX)))
.build();
}

public DatastoreV1.Read withNumQuerySplits(ValueProvider<Integer> numQuerySplits) {
return toBuilder().setNumQuerySplits(numQuerySplits).build();
.setNumQuerySplits(Math.min(Math.max(numQuerySplits, 0), NUM_QUERY_SPLITS_MAX)).build();
}

@Override
Expand All @@ -479,9 +468,8 @@ public PCollection<Entity> expand(PBegin input) {

/*
* This composite transform involves the following steps:
* 1. Create a singleton of the user provided {@code query} or {@code gqlQuery} and
* apply a {@link ParDo} that translates the {@code gqlQuery} into a {@query}, if a
* gqlQuery is provided, else the query is bypassed.
* 1. Create a singleton of the user provided {@code query} or if {@code gqlQuery} is
* provided apply a {@link ParDo} that translates the {@code gqlQuery} into a {@query}.
*
* 2. A {@link ParDo} splits the resulting query into {@code numQuerySplits} and
* assign each split query a unique {@code Integer} as the key. The resulting output is
Expand All @@ -498,14 +486,19 @@ public PCollection<Entity> expand(PBegin input) {
* a {@code PCollection<Entity>}.
*/

Coder<KV<ValueProvider<String>, ValueProvider<Query>>> valueProviderCoder =
SerializableCoder.of(
new TypeDescriptor<KV<ValueProvider<String>, ValueProvider<Query>>>() {});
PCollection<KV<Integer, Query>> queries;
if (getQuery() != null) {
queries = input
.apply(Create.of(getQuery()))
.apply(ParDo.of(new SplitQueryFn(v1Options, getNumQuerySplits())));

PCollection<KV<Integer, Query>> queries = input
.apply(Create.of(KV.of(getGqlQuery(), getQuery())).withCoder(valueProviderCoder))
.apply(ParDo.of(new GqlQueryTranslatorFn(v1Options)))
.apply(ParDo.of(new SplitQueryFn(v1Options, getNumQuerySplits())));
} else {
queries = input
.apply(Create.of(getGqlQuery()).withCoder(
SerializableCoder.of(new TypeDescriptor<ValueProvider<String>>() {})))
.apply(ParDo.of(new GqlQueryTranslatorFn(v1Options)))
.apply(ParDo.of(new SplitQueryFn(v1Options, getNumQuerySplits())));
}

PCollection<Query> shardedQueries = queries
.apply(GroupByKey.<Integer, Query>create())
Expand Down Expand Up @@ -536,10 +529,6 @@ public void validate(PBegin input) {
"Only one of query or gql query ValueProvider should be provided");
}

if (getQuery() != null && getQuery().isAccessible()) {
checkNotNull(getQuery().get(), "query");
}

if (getGqlQuery() != null && getGqlQuery().isAccessible()) {
checkNotNull(getGqlQuery().get(), "gql query");
}
Expand All @@ -548,11 +537,7 @@ public void validate(PBegin input) {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
String query = null;
if (getQuery() != null) {
query = getQuery().isAccessible() ? getQuery().get().toString() : null;
}

String query = getQuery() == null ? null : getQuery().toString();
builder
.addIfNotNull(DisplayData.item("projectId", getProjectId())
.withLabel("ProjectId"))
Expand Down Expand Up @@ -606,7 +591,7 @@ public ValueProvider<String> getNamespaceValueProvider() {
* A DoFn that translates a Cloud Datastore gql query string to {@code Query}.
*/
static class GqlQueryTranslatorFn
extends DoFn<KV<ValueProvider<String>, ValueProvider<Query>>, Query> {
extends DoFn<ValueProvider<String>, Query> {
private final V1Options options;
private transient Datastore datastore;
private final V1DatastoreFactory datastoreFactory;
Expand All @@ -628,36 +613,27 @@ public void startBundle(Context c) throws Exception {

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
ValueProvider<String> gqlQuery = c.element().getKey();
ValueProvider<Query> query = c.element().getValue();

// If query is provided, just use it. This will most likely be a StaticValueProvider
if (query != null && query.get() != null) {
LOG.info("User query: {}", query.get());
c.output(query.get());
} else if (gqlQuery != null && gqlQuery.get() != null) {
LOG.info("User query: '{}'", gqlQuery.get());
// Gql query is provided, convert it to query
// Note: If user's query already has a limit, then that limit could take precedence
// over limit 0. resulting in actually reading entities but should not be a large
// number since service has a cap on the number of entities returned in a single request.
String gqlQueryWithZeroLimit = gqlQuery.get() + " limit 0";
GqlQuery gql = GqlQuery.newBuilder().setQueryString(gqlQueryWithZeroLimit)
.setAllowLiterals(true).build();

RunQueryRequest req = makeRequest(gql, options.getNamespace());
RunQueryResponse resp = datastore.runQuery(req);
Query translatedQuery = resp.getQuery();

if (translatedQuery.getLimit().getValue() == 0) {
// Clear the limit if we set it to 0
translatedQuery = translatedQuery.toBuilder().clearLimit().build();
}
LOG.info("User gql query translated to Query({})", translatedQuery);
c.output(translatedQuery);
} else {
throw new RuntimeException("Either query or gql query should be provided");
ValueProvider<String> gqlQuery = c.element();

LOG.info("User query: '{}'", gqlQuery.get());
// Gql query is provided, convert it to query
// Note: If user's query already has a limit, then that limit could take precedence
// over limit 0. resulting in actually reading entities but should not be a large
// number since service has a cap on the number of entities returned in a single request.
String gqlQueryWithZeroLimit = gqlQuery.get() + " limit 0";
GqlQuery gql = GqlQuery.newBuilder().setQueryString(gqlQueryWithZeroLimit)
.setAllowLiterals(true).build();

RunQueryRequest req = makeRequest(gql, options.getNamespace());
RunQueryResponse resp = datastore.runQuery(req);
Query translatedQuery = resp.getQuery();

if (translatedQuery.getLimit().getValue() == 0) {
// Clear the limit if we set it to 0
translatedQuery = translatedQuery.toBuilder().clearLimit().build();
}
LOG.info("User gql query translated to Query({})", translatedQuery);
c.output(translatedQuery);
}

@Override
Expand All @@ -679,20 +655,20 @@ public void populateDisplayData(DisplayData.Builder builder) {
static class SplitQueryFn extends DoFn<Query, KV<Integer, Query>> {
private final V1Options options;
// number of splits to make for a given query
private final ValueProvider<Integer> numSplits;
private final int numSplits;

private final V1DatastoreFactory datastoreFactory;
// Datastore client
private transient Datastore datastore;
// Query splitter
private transient QuerySplitter querySplitter;

public SplitQueryFn(V1Options options, ValueProvider<Integer> numSplits) {
public SplitQueryFn(V1Options options, int numSplits) {
this(options, numSplits, new V1DatastoreFactory());
}

@VisibleForTesting
SplitQueryFn(V1Options options, ValueProvider<Integer> numSplits,
SplitQueryFn(V1Options options, int numSplits,
V1DatastoreFactory datastoreFactory) {
this.options = options;
this.numSplits = numSplits;
Expand All @@ -718,10 +694,10 @@ public void processElement(ProcessContext c) throws Exception {

int estimatedNumSplits;
// Compute the estimated numSplits if numSplits is not specified by the user.
if (numSplits.get() <= 0) {
if (numSplits <= 0) {
estimatedNumSplits = getEstimatedNumSplits(datastore, query, options.getNamespace());
} else {
estimatedNumSplits = numSplits.get();
estimatedNumSplits = numSplits;
}

LOG.info("Splitting the query into {} splits", estimatedNumSplits);
Expand Down
Expand Up @@ -155,7 +155,7 @@ public void setUp() {
public void testBuildRead() throws Exception {
DatastoreV1.Read read = DatastoreIO.v1().read()
.withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
assertEquals(QUERY, read.getQuery().get());
assertEquals(QUERY, read.getQuery());
assertEquals(PROJECT_ID, read.getProjectId().get());
assertEquals(NAMESPACE, read.getNamespace().get());
}
Expand All @@ -176,7 +176,7 @@ public void testBuildReadWithGqlQuery() throws Exception {
public void testBuildReadAlt() throws Exception {
DatastoreV1.Read read = DatastoreIO.v1().read()
.withProjectId(PROJECT_ID).withNamespace(NAMESPACE).withQuery(QUERY);
assertEquals(QUERY, read.getQuery().get());
assertEquals(QUERY, read.getQuery());
assertEquals(PROJECT_ID, read.getProjectId().get());
assertEquals(NAMESPACE, read.getNamespace().get());
}
Expand Down Expand Up @@ -626,8 +626,7 @@ public void testSplitQueryFnWithNumSplits() throws Exception {
eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class)))
.thenReturn(splitQuery(QUERY, numSplits));

SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, StaticValueProvider.of(numSplits),
mockDatastoreFactory);
SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, numSplits, mockDatastoreFactory);
DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
/**
* Although Datastore client is marked transient in {@link SplitQueryFn}, when injected through
Expand Down Expand Up @@ -673,8 +672,7 @@ public void testSplitQueryFnWithoutNumSplits() throws Exception {
eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class)))
.thenReturn(splitQuery(QUERY, expectedNumSplits));

SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, StaticValueProvider.of(numSplits),
mockDatastoreFactory);
SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, numSplits, mockDatastoreFactory);
DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
List<KV<Integer, Query>> queries = doFnTester.processBundle(QUERY);
Expand All @@ -696,8 +694,7 @@ public void testSplitQueryFnWithQueryLimit() throws Exception {
.setLimit(Int32Value.newBuilder().setValue(1))
.build();

SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, StaticValueProvider.of(10),
mockDatastoreFactory);
SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, 10, mockDatastoreFactory);
DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
List<KV<Integer, Query>> queries = doFnTester.processBundle(queryWithLimit);
Expand Down Expand Up @@ -726,25 +723,10 @@ public void testReadFnWithBatchesExactMultiple() throws Exception {
readFnTest(5 * QUERY_BATCH_LIMIT);
}

@Test
public void testGqlTranslatorFnWithQuery() throws Exception {
GqlQueryTranslatorFn translatorFn = new GqlQueryTranslatorFn(V_1_OPTIONS, mockDatastoreFactory);
DoFnTester<KV<ValueProvider<String>, ValueProvider<Query>>, Query> doFnTester =
DoFnTester.of(translatorFn);
doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
ValueProvider<Query> query = StaticValueProvider.of(QUERY);
List<Query> queries = doFnTester.processBundle(KV.of((ValueProvider<String>) null, query));

assertEquals(queries.size(), 1);
assertEquals(queries.get(0), QUERY);
verifyNoMoreInteractions(mockDatastore);
}

@Test
public void testGqlTranslatorFnWithGqlQuery() throws Exception {
GqlQueryTranslatorFn translatorFn = new GqlQueryTranslatorFn(V_1_OPTIONS, mockDatastoreFactory);
DoFnTester<KV<ValueProvider<String>, ValueProvider<Query>>, Query> doFnTester =
DoFnTester.of(translatorFn);
DoFnTester<ValueProvider<String>, Query> doFnTester = DoFnTester.of(translatorFn);
doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
ValueProvider<String> gqlQuery = StaticValueProvider.of(GQL_QUERY);
String gqlQueryWithZeroLimit = gqlQuery.get() + " limit 0";
Expand All @@ -754,7 +736,7 @@ public void testGqlTranslatorFnWithGqlQuery() throws Exception {
when(mockDatastore.runQuery(gqlRequest))
.thenReturn(RunQueryResponse.newBuilder().setQuery(QUERY).build());

List<Query> queries = doFnTester.processBundle(KV.of(gqlQuery, (ValueProvider<Query>) null));
List<Query> queries = doFnTester.processBundle(gqlQuery);

assertEquals(queries.size(), 1);
assertEquals(queries.get(0), QUERY);
Expand All @@ -764,8 +746,7 @@ public void testGqlTranslatorFnWithGqlQuery() throws Exception {
@Test
public void testGqlTranslatorFnWithGqlQueryWithLimit() throws Exception {
GqlQueryTranslatorFn translatorFn = new GqlQueryTranslatorFn(V_1_OPTIONS, mockDatastoreFactory);
DoFnTester<KV<ValueProvider<String>, ValueProvider<Query>>, Query> doFnTester =
DoFnTester.of(translatorFn);
DoFnTester<ValueProvider<String>, Query> doFnTester = DoFnTester.of(translatorFn);
doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
int limit = 100;
Query queryWithLimit = QUERY.toBuilder().setLimit(
Expand All @@ -781,7 +762,7 @@ public void testGqlTranslatorFnWithGqlQueryWithLimit() throws Exception {
when(mockDatastore.runQuery(gqlRequest))
.thenReturn(RunQueryResponse.newBuilder().setQuery(queryWithLimit).build());

List<Query> queries = doFnTester.processBundle(KV.of(gqlQuery, (ValueProvider<Query>) null));
List<Query> queries = doFnTester.processBundle(gqlQuery);

assertEquals(queries.size(), 1);
assertEquals(queries.get(0), queryWithLimit);
Expand All @@ -793,9 +774,6 @@ public interface RuntimeTestOptions extends PipelineOptions {
ValueProvider<String> getDatastoreProject();
void setDatastoreProject(ValueProvider<String> value);

ValueProvider<Query> getQuery();
void setQuery(ValueProvider<Query> value);

ValueProvider<String> getGqlQuery();
void setGqlQuery(ValueProvider<String> value);

Expand All @@ -810,7 +788,7 @@ public void testRuntimeOptionsNotCalledInApplyQuery() {
pipeline
.apply(DatastoreIO.v1().read()
.withProjectId(options.getDatastoreProject())
.withQuery(options.getQuery()))
.withQuery(QUERY))
.apply(DatastoreIO.v1().write().withProjectId(options.getDatastoreProject()));
}

Expand All @@ -832,7 +810,7 @@ public void testRuntimeOptionsNotCalledInApplyNamespace() {
pipeline
.apply(DatastoreIO.v1().read()
.withProjectId(options.getDatastoreProject())
.withQuery(options.getQuery())
.withGqlQuery(options.getGqlQuery())
.withNamespace(options.getNamespace()))
.apply(DatastoreIO.v1().write().withProjectId(options.getDatastoreProject()));
}
Expand Down

0 comments on commit 48f2876

Please sign in to comment.