From 48cdb57831f45236327ebb7fb26b5a771ec0e7d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Thu, 11 May 2017 22:09:50 +0200 Subject: [PATCH] [BEAM-2256] Add the last previous range filter --- .../apache/beam/sdk/io/mongodb/MongoDbIO.java | 42 ++++++++++++++----- .../beam/sdk/io/mongodb/MongoDbIOTest.java | 18 ++++++++ 2 files changed, 49 insertions(+), 11 deletions(-) diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java index 7236a50655e5..5accc8afa719 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; +import com.google.common.annotations.VisibleForTesting; import com.mongodb.BasicDBObject; import com.mongodb.MongoClient; import com.mongodb.MongoClientURI; @@ -184,7 +185,11 @@ public void populateDisplayData(DisplayData.Builder builder) { } } - private static class BoundedMongoDbSource extends BoundedSource { + /** + * A MongoDB {@link BoundedSource} reading {@link Document} from a given instance. + */ + @VisibleForTesting + protected static class BoundedMongoDbSource extends BoundedSource { private Read spec; private BoundedMongoDbSource(Read spec) { @@ -294,7 +299,8 @@ public List> split(long desiredBundleSizeBytes, * @param additionalFilter A custom (user) additional filter to append to the range filters. * @return A list of filters containing the ranges. 
*/ - private static List splitKeysToFilters(List splitKeys, String + @VisibleForTesting + protected static List splitKeysToFilters(List splitKeys, String additionalFilter) { ArrayList filters = new ArrayList<>(); String lowestBound = null; // lower boundary (previous split in the iteration) @@ -306,30 +312,44 @@ private static List splitKeysToFilters(List splitKeys, String // the range from the beginning up to this split rangeFilter = String.format("{ $and: [ {\"_id\":{$lte:ObjectId(\"%s\")}}", splitKey); + filters.add(formatFilter(rangeFilter, additionalFilter)); } else if (i == splitKeys.size() - 1) { // this is the last split in the list, the filter defines // the range from the split up to the end + rangeFilter = String.format("{ $and: [ {\"_id\":{$gt:ObjectId(\"%s\")," + + "$lte:ObjectId(\"%s\")}}", lowestBound, splitKey); + filters.add(formatFilter(rangeFilter, additionalFilter)); rangeFilter = String.format("{ $and: [ {\"_id\":{$gt:ObjectId(\"%s\")}}", splitKey); + filters.add(formatFilter(rangeFilter, additionalFilter)); } else { // we are between two splits rangeFilter = String.format("{ $and: [ {\"_id\":{$gt:ObjectId(\"%s\")," + "$lte:ObjectId(\"%s\")}}", lowestBound, splitKey); + filters.add(formatFilter(rangeFilter, additionalFilter)); } - if (additionalFilter != null && !additionalFilter.isEmpty()) { - // user provided a filter, we append the user filter to the range filter - rangeFilter = String.format("%s,%s ]}", rangeFilter, additionalFilter); - } else { - // user didn't provide a filter, just cleany close the range filter - rangeFilter = String.format("%s ]}", rangeFilter); - } - - filters.add(rangeFilter); lowestBound = splitKey; } return filters; } + + /** + * Cleanly format the range filter, optionally appending a user-provided additional filter. + * + * @param filter The range filter. + * @param additionalFilter An optional user-provided additional filter. + * @return The cleanly formatted range filter.
+ */ + private static String formatFilter(String filter, @Nullable String additionalFilter) { + if (additionalFilter != null && !additionalFilter.isEmpty()) { + // user provided a filter, we append the user filter to the range filter + return String.format("%s,%s ]}", filter, additionalFilter); + } else { + // user didn't provide a filter, just cleanly close the range filter + return String.format("%s ]}", filter); + } + } } private static class BoundedMongoDbReader extends BoundedSource.BoundedReader { diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java index 454c6ba1ea7b..cd26b483cda9 100644 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java @@ -38,6 +38,8 @@ import java.io.Serializable; import java.net.ServerSocket; import java.util.ArrayList; +import java.util.List; + import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; @@ -138,6 +140,22 @@ public void stop() throws Exception { mongodExecutable.stop(); } + @Test + public void testSplitIntoFilters() throws Exception { + ArrayList documents = new ArrayList<>(); + documents.add(new Document("_id", 56)); + documents.add(new Document("_id", 109)); + documents.add(new Document("_id", 256)); + List filters = MongoDbIO.BoundedMongoDbSource.splitKeysToFilters(documents, null); + assertEquals(4, filters.size()); + assertEquals("{ $and: [ {\"_id\":{$lte:ObjectId(\"56\")}} ]}", filters.get(0)); + assertEquals("{ $and: [ {\"_id\":{$gt:ObjectId(\"56\"),$lte:ObjectId(\"109\")}} ]}", + filters.get(1)); + assertEquals("{ $and: [ {\"_id\":{$gt:ObjectId(\"109\"),$lte:ObjectId(\"256\")}} ]}", + filters.get(2)); + assertEquals("{ $and: [ {\"_id\":{$gt:ObjectId(\"256\")}} ]}", 
filters.get(3)); + } + @Test public void testFullRead() throws Exception {