From d536ef18152ef074d56031644ee5802dbf8d46a9 Mon Sep 17 00:00:00 2001 From: Daniel Kulp Date: Wed, 28 Sep 2016 22:44:37 -0400 Subject: [PATCH 01/14] Split BoundedSource into a BoundedSource and a DoFn<...> --- .../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 306 +++++++----------- .../sdk/io/mongodb/MongoDBGridFSIOTest.java | 42 +-- 2 files changed, 131 insertions(+), 217 deletions(-) diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java index 337e5f555b8f..5d2a0a75c009 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.mongodb; +import com.google.common.base.Preconditions; import com.mongodb.DB; import com.mongodb.DBCursor; import com.mongodb.DBObject; @@ -30,6 +31,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.Serializable; +import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -42,7 +44,9 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -80,105 +84,66 @@ public class MongoDbGridFSIO { /** - * Function for parsing the GridFSDBFile into objects for the PCollection. + * * @param */ - public interface ParseCallback extends Serializable { - /** - * Each value parsed from the file should be output as an - * Iterable of Line<T>. If timestamp is omitted, it will - * use the uploadDate of the GridFSDBFile. - */ - public static class Line { - final Instant timestamp; - final T value; - - public Line(T value, Instant timestamp) { - this.value = value; - this.timestamp = timestamp; - } - public Line(T value) { - this.value = value; - this.timestamp = null; - } - }; - public Iterator> parse(GridFSDBFile input) throws IOException; + public interface Parser extends Serializable { + public void parse(GridFSDBFile input, DoFn.ProcessContext result) throws IOException; } /** - * Default implementation for parsing the InputStream to collection of - * strings splitting on the cr/lf. + * */ - private static class StringsParseCallback implements ParseCallback { - static final StringsParseCallback INSTANCE = new StringsParseCallback(); + public static class StringParser implements Parser { + static final StringParser INSTANCE = new StringParser(); @Override - public Iterator> parse(final GridFSDBFile input) throws IOException { - final BufferedReader reader = - new BufferedReader(new InputStreamReader(input.getInputStream())); - return new Iterator>() { - String val = reader.readLine(); - @Override - public boolean hasNext() { - return val != null; + public void parse(GridFSDBFile input, DoFn.ProcessContext result) + throws IOException { + final Instant time = new Instant(input.getUploadDate().getTime()); + try (BufferedReader reader = + new BufferedReader(new InputStreamReader(input.getInputStream()))) { + String line = reader.readLine(); + while (line != null) { + result.outputWithTimestamp(line, time); + line = reader.readLine(); } - - @Override - public Line next() { - Line l = new Line(val); - try { - val = reader.readLine(); - } catch (IOException e) { - val = null; - } - return l; - } - - @Override - public void remove() { - throw new UnsupportedOperationException("Remove not supported"); - } - }; + } } } /** Read data from GridFS. */ public static Read read() { return new Read(new Read.BoundedGridFSSource(null, null, null, null, - StringsParseCallback.INSTANCE, StringUtf8Coder.of())); + StringParser.INSTANCE, null)); } static class Read extends PTransform> { public Read withUri(String uri) { return new Read(new BoundedGridFSSource(uri, options.database, options.bucket, options.filterJson, - options.parser, options.coder)); + options.parser, null)); } public Read withDatabase(String database) { return new Read(new BoundedGridFSSource(options.uri, database, options.bucket, options.filterJson, - options.parser, options.coder)); + options.parser, null)); } public Read withBucket(String bucket) { return new Read(new BoundedGridFSSource(options.uri, options.database, bucket, - options.filterJson, options.parser, options.coder)); + options.filterJson, options.parser, null)); } - public Read withParsingFn(ParseCallback f) { + public Read withParser(Parser f) { return new Read(new BoundedGridFSSource(options.uri, options.database, options.bucket, options.filterJson, f, null)); } - public Read withCoder(Coder coder) { - return new Read(new BoundedGridFSSource(options.uri, options.database, - options.bucket, options.filterJson, options.parser, coder)); - } - public Read withQueryFilter(String filterJson) { return new Read(new BoundedGridFSSource(options.uri, options.database, - options.bucket, filterJson, options.parser, options.coder)); + options.bucket, filterJson, options.parser, null)); } private final BoundedGridFSSource options; @@ -187,18 +152,38 @@ public Read withQueryFilter(String filterJson) { this.options = options; } + @SuppressWarnings("unchecked") @Override public PCollection apply(PBegin input) { - org.apache.beam.sdk.io.Read.Bounded unbounded = + org.apache.beam.sdk.io.Read.Bounded unbounded = org.apache.beam.sdk.io.Read.from(options); - PCollection output = input.getPipeline().apply(unbounded); - if (options.coder != null) { - output.setCoder(options.coder); + PCollection output = input.getPipeline().apply(unbounded) + .apply(ParDo.of(new DoFn() { + Mongo mongo; + GridFS gridfs; + @org.apache.beam.sdk.transforms.DoFn.Setup + public void setup() { + mongo = options.setupMongo(); + gridfs = options.setupGridFS(mongo); + } + @org.apache.beam.sdk.transforms.DoFn.Teardown + public void teardown() { + mongo.close(); + } + @ProcessElement + public void processElement(ProcessContext c) throws IOException { + ObjectId oid = c.element(); + GridFSDBFile file = gridfs.find(oid); + options.parser.parse(file, c); + } + })); + if (options.parser == StringParser.INSTANCE) { + output.setCoder((Coder) StringUtf8Coder.of()); } return output; } - static class BoundedGridFSSource extends BoundedSource { + static class BoundedGridFSSource extends BoundedSource { @Nullable private final String uri; @Nullable @@ -208,69 +193,52 @@ static class BoundedGridFSSource extends BoundedSource { @Nullable private final String filterJson; @Nullable - private final ParseCallback parser; - @Nullable - private final Coder coder; - @Nullable private List objectIds; - private transient Mongo mongo; - private transient GridFS gridfs; - BoundedGridFSSource(String uri, String database, String bucket, String filterJson, - ParseCallback parser, Coder coder) { + private Parser parser; + + BoundedGridFSSource(String uri, String database, + String bucket, String filterJson, + Parser parser, + List objectIds) { this.uri = uri; this.database = database; this.bucket = bucket; + this.objectIds = objectIds; this.parser = parser; - this.coder = coder; this.filterJson = filterJson; } - BoundedGridFSSource(String uri, String database, String bucket, String filterJson, - ParseCallback parser, Coder coder, List objectIds) { - this.uri = uri; - this.database = database; - this.bucket = bucket; - this.parser = parser; - this.coder = coder; - this.objectIds = objectIds; - this.filterJson = filterJson; + private Mongo setupMongo() { + return uri == null ? new Mongo() : new Mongo(new MongoURI(uri)); } - private synchronized void setupGridFS() { - if (gridfs == null) { - mongo = uri == null ? new Mongo() : new Mongo(new MongoURI(uri)); - DB db = database == null ? mongo.getDB("gridfs") : mongo.getDB(database); - gridfs = bucket == null ? new GridFS(db) : new GridFS(db, bucket); - } + private GridFS setupGridFS(Mongo mongo) { + DB db = database == null ? mongo.getDB("gridfs") : mongo.getDB(database); + return bucket == null ? new GridFS(db) : new GridFS(db, bucket); } - private synchronized void closeGridFS() { - if (gridfs != null) { - gridfs = null; - mongo.close(); - mongo = null; + private DBCursor createCursor(GridFS gridfs) { + if (filterJson != null) { + DBObject query = (DBObject) JSON.parse(filterJson); + return gridfs.getFileList(query).sort(null); } + return gridfs.getFileList().sort(null); } - @Override - public List> splitIntoBundles(long desiredBundleSizeBytes, + public List> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options) throws Exception { + Mongo mongo = setupMongo(); try { - setupGridFS(); - DBCursor cursor; - if (filterJson != null) { - DBObject query = (DBObject) JSON.parse(filterJson); - cursor = gridfs.getFileList(query).sort(null); - } else { - cursor = gridfs.getFileList().sort(null); - } - long size = 0; + GridFS gridfs = setupGridFS(mongo); + DBCursor cursor = createCursor(gridfs); + long size = 0; List> list = new LinkedList<>(); List objects = new LinkedList<>(); while (cursor.hasNext()) { GridFSDBFile file = (GridFSDBFile) cursor.next(); long len = file.getLength(); if ((size + len) > desiredBundleSizeBytes && !objects.isEmpty()) { - list.add(new BoundedGridFSSource(uri, database, bucket, filterJson, - parser, coder, objects)); + list.add(new BoundedGridFSSource(uri, database, bucket, + filterJson, parser, + objects)); size = 0; objects = new LinkedList<>(); } @@ -278,26 +246,22 @@ public List> splitIntoBundles(long desiredBundleSizeB size += len; } if (!objects.isEmpty() || list.isEmpty()) { - list.add(new BoundedGridFSSource(uri, database, bucket, filterJson, - parser, coder, objects)); + list.add(new BoundedGridFSSource(uri, database, bucket, + filterJson, parser, + objects)); } return list; } finally { - closeGridFS(); + mongo.close(); } } @Override public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + Mongo mongo = setupMongo(); try { - setupGridFS(); - DBCursor cursor; - if (filterJson != null) { - DBObject query = (DBObject) JSON.parse(filterJson); - cursor = gridfs.getFileList(query).sort(null); - } else { - cursor = gridfs.getFileList().sort(null); - } + GridFS gridfs = setupGridFS(mongo); + DBCursor cursor = createCursor(gridfs); long size = 0; while (cursor.hasNext()) { GridFSDBFile file = (GridFSDBFile) cursor.next(); @@ -305,23 +269,40 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { } return size; } finally { - closeGridFS(); + mongo.close(); } } + @Override public boolean producesSortedKeys(PipelineOptions options) throws Exception { return false; } @Override - public org.apache.beam.sdk.io.BoundedSource.BoundedReader createReader( + public org.apache.beam.sdk.io.BoundedSource.BoundedReader createReader( PipelineOptions options) throws IOException { - return new GridFSReader(this); + List objs = objectIds; + if (objs == null) { + objs = new ArrayList<>(); + Mongo mongo = setupMongo(); + try { + GridFS gridfs = setupGridFS(mongo); + DBCursor cursor = createCursor(gridfs); + while (cursor.hasNext()) { + GridFSDBFile file = (GridFSDBFile) cursor.next(); + objs.add((ObjectId) file.getId()); + } + } finally { + mongo.close(); + } + } + return new GridFSReader(this, objs); } @Override public void validate() { + Preconditions.checkNotNull(parser, "Parser cannot be null"); } @Override @@ -333,93 +314,48 @@ public void populateDisplayData(DisplayData.Builder builder) { builder.addIfNotNull(DisplayData.item("filterJson", filterJson)); } - @SuppressWarnings("unchecked") @Override - public Coder getDefaultOutputCoder() { - if (coder != null) { - return coder; - } - return (Coder) SerializableCoder.of(Serializable.class); + public Coder getDefaultOutputCoder() { + return SerializableCoder.of(ObjectId.class); } - - class GridFSReader extends org.apache.beam.sdk.io.BoundedSource.BoundedReader { + static class GridFSReader extends BoundedSource.BoundedReader { final BoundedGridFSSource source; + final List objects; - Instant timestamp = Instant.now(); - Iterator> currentIterator; - ParseCallback.Line currentLine; + Iterator iterator; + ObjectId current; + GridFSReader(BoundedGridFSSource s, List objects) { + source = s; + this.objects = objects; + } - GridFSReader(BoundedGridFSSource source) { - this.source = source; + @Override + public BoundedSource getCurrentSource() { + return source; } @Override public boolean start() throws IOException { - setupGridFS(); - if (objectIds == null) { - objectIds = new LinkedList<>(); - DBCursor cursor = gridfs.getFileList().sort(null); - while (cursor.hasNext()) { - DBObject ob = cursor.next(); - objectIds.add((ObjectId) ob.get("_id")); - } - } + iterator = objects.iterator(); return advance(); } @Override public boolean advance() throws IOException { - if (currentIterator != null && !currentIterator.hasNext()) { - objectIds.remove(0); - currentIterator = null; - } - if (currentIterator == null) { - if (objectIds.isEmpty()) { - return false; - } - ObjectId oid = objectIds.get(0); - GridFSDBFile file = gridfs.find(oid); - if (file == null) { - return false; - } - timestamp = new Instant(file.getUploadDate().getTime()); - currentIterator = parser.parse(file); - } - - if (currentIterator.hasNext()) { - currentLine = currentIterator.next(); + if (iterator.hasNext()) { + current = iterator.next(); return true; } return false; } @Override - public BoundedSource getCurrentSource() { - return source; - } - - @Override - public T getCurrent() throws NoSuchElementException { - if (currentLine != null) { - return currentLine.value; - } - throw new NoSuchElementException(); - } - - @Override - public Instant getCurrentTimestamp() throws NoSuchElementException { - if (currentLine != null) { - if (currentLine.timestamp != null) { - return currentLine.timestamp; - } - return timestamp; - } - throw new NoSuchElementException(); + public ObjectId getCurrent() throws NoSuchElementException { + return current; } @Override public void close() throws IOException { - closeGridFS(); } } } diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java index f8e5f779af1b..fd08a9a92e89 100644 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java @@ -45,18 +45,17 @@ import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.Serializable; -import java.util.Iterator; import java.util.Random; import java.util.Scanner; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.io.mongodb.MongoDbGridFSIO.ParseCallback; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Max; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.KV; @@ -196,45 +195,24 @@ public void testReadWithParser() throws Exception { .withUri("mongodb://localhost:" + PORT) .withDatabase(DATABASE) .withBucket("mapBucket") - .withParsingFn(new ParseCallback>() { + .withParser(new MongoDbGridFSIO.Parser>() { @Override - public Iterator>> parse( - GridFSDBFile input) throws IOException { - final BufferedReader reader = - new BufferedReader(new InputStreamReader(input.getInputStream())); - return new Iterator>>() { + public void parse(GridFSDBFile input, + DoFn>.ProcessContext result) throws IOException { + try (final BufferedReader reader = + new BufferedReader(new InputStreamReader(input.getInputStream()))) { String line = reader.readLine(); - @Override - public boolean hasNext() { - return line != null; - } - @Override - public MongoDbGridFSIO.ParseCallback.Line> next() { + while (line != null) { try (Scanner scanner = new Scanner(line.trim())) { scanner.useDelimiter("\\t"); long timestamp = scanner.nextLong(); String name = scanner.next(); int score = scanner.nextInt(); - - try { - line = reader.readLine(); - } catch (IOException e) { - line = null; - } - if (line == null) { - try { - reader.close(); - } catch (IOException e) { - //ignore - } - } - return new Line<>(KV.of(name, score), new Instant(timestamp)); + result.outputWithTimestamp(KV.of(name, score), new Instant(timestamp)); } + line = reader.readLine(); } - @Override - public void remove() { - } - }; + } } })).setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())); From 9e82cd2776c3926b929d5144db4aaa9be30089be Mon Sep 17 00:00:00 2001 From: Daniel Kulp Date: Thu, 29 Sep 2016 09:03:31 -0400 Subject: [PATCH 02/14] Optimize reading for non-split case --- .../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 77 +++++++++++-------- 1 file changed, 45 insertions(+), 32 deletions(-) diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java index 5d2a0a75c009..0d899a9b41c3 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java @@ -33,7 +33,6 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.NoSuchElementException; @@ -61,8 +60,10 @@ *

*

MongoDbGridFSIO source returns a bounded collection of String as {@code PCollection}. *

- *

To configure the MongoDB source, you have to provide the connection URI, the database name - * and the bucket name. The following example illustrates various options for configuring the + *

To configure the MongoDB source, you can provide the connection URI, the database name + * and the bucket name. If unspecified, the default values from the GridFS driver are used. + * + * The following example illustrates various options for configuring the * source:

*

*

{@code
@@ -77,14 +78,15 @@
  * 

The source also accepts an optional configuration: {@code withQueryFilter()} allows you to * define a JSON filter to get subset of files in the database.

* - *

There is also an optional {@code ParseCallback} that can be specified that can be used to + *

There is also an optional {@code Parser} that can be specified that can be used to * parse the InputStream into objects usable with Beam. By default, MongoDbGridFSIO will parse * into Strings, splitting on line breaks and using the uploadDate of the file as the timestamp. */ public class MongoDbGridFSIO { /** - * + * Interface for the parser that is used to parse the GridFSDBFile into + * the appropriate types. * @param */ public interface Parser extends Serializable { @@ -92,9 +94,11 @@ public interface Parser extends Serializable { } /** - * + * For the default Read case, this is the parser that is used to + * split the input file into Strings. It uses the timestamp of the file + * for the event timestamp. */ - public static class StringParser implements Parser { + private static class StringParser implements Parser { static final StringParser INSTANCE = new StringParser(); @Override @@ -161,12 +165,12 @@ public PCollection apply(PBegin input) { .apply(ParDo.of(new DoFn() { Mongo mongo; GridFS gridfs; - @org.apache.beam.sdk.transforms.DoFn.Setup + @Setup public void setup() { mongo = options.setupMongo(); gridfs = options.setupGridFS(mongo); } - @org.apache.beam.sdk.transforms.DoFn.Teardown + @Teardown public void teardown() { mongo.close(); } @@ -194,7 +198,6 @@ static class BoundedGridFSSource extends BoundedSource { private final String filterJson; @Nullable private List objectIds; - private Parser parser; BoundedGridFSSource(String uri, String database, @@ -230,8 +233,8 @@ public List> splitIntoBundles(long desiredBund GridFS gridfs = setupGridFS(mongo); DBCursor cursor = createCursor(gridfs); long size = 0; - List> list = new LinkedList<>(); - List objects = new LinkedList<>(); + List> list = new ArrayList<>(); + List objects = new ArrayList<>(); while (cursor.hasNext()) { GridFSDBFile file = (GridFSDBFile) cursor.next(); long len = file.getLength(); @@ -240,7 +243,7 @@ public List> splitIntoBundles(long desiredBund filterJson, parser, objects)); size = 0; - objects = new LinkedList<>(); + objects = new ArrayList<>(); } objects.add((ObjectId) file.getId()); size += len; @@ -280,24 +283,9 @@ public boolean producesSortedKeys(PipelineOptions options) throws Exception { } @Override - public org.apache.beam.sdk.io.BoundedSource.BoundedReader createReader( + public BoundedSource.BoundedReader createReader( PipelineOptions options) throws IOException { - List objs = objectIds; - if (objs == null) { - objs = new ArrayList<>(); - Mongo mongo = setupMongo(); - try { - GridFS gridfs = setupGridFS(mongo); - DBCursor cursor = createCursor(gridfs); - while (cursor.hasNext()) { - GridFSDBFile file = (GridFSDBFile) cursor.next(); - objs.add((ObjectId) file.getId()); - } - } finally { - mongo.close(); - } - } - return new GridFSReader(this, objs); + return new GridFSReader(this, objectIds); } @Override @@ -322,6 +310,8 @@ static class GridFSReader extends BoundedSource.BoundedReader { final BoundedGridFSSource source; final List objects; + Mongo mongo; + DBCursor cursor; Iterator iterator; ObjectId current; GridFSReader(BoundedGridFSSource s, List objects) { @@ -336,26 +326,49 @@ public BoundedSource getCurrentSource() { @Override public boolean start() throws IOException { - iterator = objects.iterator(); + if (objects == null) { + mongo = source.setupMongo(); + GridFS gridfs = source.setupGridFS(mongo); + cursor = source.createCursor(gridfs); + } else { + iterator = objects.iterator(); + } return advance(); } @Override public boolean advance() throws IOException { - if (iterator.hasNext()) { + if (iterator != null && iterator.hasNext()) { current = iterator.next(); return true; + } else if (cursor != null && cursor.hasNext()) { + GridFSDBFile file = (GridFSDBFile) cursor.next(); + current = (ObjectId) file.getId(); + return true; } + current = null; return false; } @Override public ObjectId getCurrent() throws NoSuchElementException { + if (current == null) { + throw new NoSuchElementException(); + } return current; } + public Instant getCurrentTimestamp() throws NoSuchElementException { + if (current == null) { + throw new NoSuchElementException(); + } + return Instant.now(); + } @Override public void close() throws IOException { + if (mongo != null) { + mongo.close(); + } } } } From 947e7fbe08a60e66591f4b82cab1ddf9d796b46c Mon Sep 17 00:00:00 2001 From: Daniel Kulp Date: Thu, 29 Sep 2016 09:18:17 -0400 Subject: [PATCH 03/14] Use objectId timestamp --- .../apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java index 0d899a9b41c3..6b2d0c8a55cb 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java @@ -50,6 +50,7 @@ import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.bson.types.ObjectId; +import org.joda.time.Duration; import org.joda.time.Instant; @@ -180,6 +181,12 @@ public void processElement(ProcessContext c) throws IOException { GridFSDBFile file = gridfs.find(oid); options.parser.parse(file, c); } + /* + @Override + public Duration getAllowedTimestampSkew() { + return Duration.millis(Long.MAX_VALUE); + } + */ })); if (options.parser == StringParser.INSTANCE) { output.setCoder((Coder) StringUtf8Coder.of()); @@ -361,7 +368,9 @@ public Instant getCurrentTimestamp() throws NoSuchElementException { if (current == null) { throw new NoSuchElementException(); } - return Instant.now(); + long time = current.getTimestamp(); + time *= 1000L; + return new Instant(time); } @Override From 24d64c5154445cab6c21f13e99936d1e7fd1af85 Mon Sep 17 00:00:00 2001 From: Daniel Kulp Date: Thu, 29 Sep 2016 09:57:44 -0400 Subject: [PATCH 04/14] Pull parser out of BoundedSource, add maxSkew --- .../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 44 ++++++++++--------- .../sdk/io/mongodb/MongoDBGridFSIOTest.java | 3 ++ 2 files changed, 27 insertions(+), 20 deletions(-) diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java index 6b2d0c8a55cb..7408fcaaacff 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java @@ -120,49 +120,58 @@ public void parse(GridFSDBFile input, DoFn.ProcessContext result) /** Read data from GridFS. */ public static Read read() { return new Read(new Read.BoundedGridFSSource(null, null, null, null, - StringParser.INSTANCE, null)); + null), StringParser.INSTANCE, Duration.ZERO); } static class Read extends PTransform> { public Read withUri(String uri) { return new Read(new BoundedGridFSSource(uri, options.database, options.bucket, options.filterJson, - options.parser, null)); + null), parser, maxSkew); } public Read withDatabase(String database) { return new Read(new BoundedGridFSSource(options.uri, database, options.bucket, options.filterJson, - options.parser, null)); + null), parser, maxSkew); } public Read withBucket(String bucket) { return new Read(new BoundedGridFSSource(options.uri, options.database, bucket, - options.filterJson, options.parser, null)); + options.filterJson, null), parser, maxSkew); } public Read withParser(Parser f) { + Preconditions.checkNotNull(f, "Parser cannot be null"); return new Read(new BoundedGridFSSource(options.uri, options.database, - options.bucket, options.filterJson, f, null)); + options.bucket, options.filterJson, null), f, maxSkew); + } + public Read maxSkew(Duration skew) { + return new Read(new BoundedGridFSSource(options.uri, options.database, + options.bucket, options.filterJson, null), parser, skew == null ? Duration.ZERO : skew); } public Read withQueryFilter(String filterJson) { return new Read(new BoundedGridFSSource(options.uri, options.database, - options.bucket, filterJson, options.parser, null)); + options.bucket, filterJson, null), parser, maxSkew); } private final BoundedGridFSSource options; + private final Parser parser; + private final Duration maxSkew; - Read(BoundedGridFSSource options) { + Read(BoundedGridFSSource options, Parser parser, Duration maxSkew) { this.options = options; + this.parser = parser; + this.maxSkew = maxSkew; } @SuppressWarnings("unchecked") @Override public PCollection apply(PBegin input) { - org.apache.beam.sdk.io.Read.Bounded unbounded = + org.apache.beam.sdk.io.Read.Bounded bounded = org.apache.beam.sdk.io.Read.from(options); - PCollection output = input.getPipeline().apply(unbounded) + PCollection output = input.getPipeline().apply(bounded) .apply(ParDo.of(new DoFn() { Mongo mongo; GridFS gridfs; @@ -179,16 +188,15 @@ public void teardown() { public void processElement(ProcessContext c) throws IOException { ObjectId oid = c.element(); GridFSDBFile file = gridfs.find(oid); - options.parser.parse(file, c); + parser.parse(file, c); } - /* + @Override public Duration getAllowedTimestampSkew() { - return Duration.millis(Long.MAX_VALUE); + return maxSkew; } - */ })); - if (options.parser == StringParser.INSTANCE) { + if (parser == StringParser.INSTANCE) { output.setCoder((Coder) StringUtf8Coder.of()); } return output; @@ -205,17 +213,14 @@ static class BoundedGridFSSource extends BoundedSource { private final String filterJson; @Nullable private List objectIds; - private Parser parser; BoundedGridFSSource(String uri, String database, String bucket, String filterJson, - Parser parser, List objectIds) { this.uri = uri; this.database = database; this.bucket = bucket; this.objectIds = objectIds; - this.parser = parser; this.filterJson = filterJson; } private Mongo setupMongo() { @@ -247,7 +252,7 @@ public List> splitIntoBundles(long desiredBund long len = file.getLength(); if ((size + len) > desiredBundleSizeBytes && !objects.isEmpty()) { list.add(new BoundedGridFSSource(uri, database, bucket, - filterJson, parser, + filterJson, objects)); size = 0; objects = new ArrayList<>(); @@ -257,7 +262,7 @@ public List> splitIntoBundles(long desiredBund } if (!objects.isEmpty() || list.isEmpty()) { list.add(new BoundedGridFSSource(uri, database, bucket, - filterJson, parser, + filterJson, objects)); } return list; @@ -297,7 +302,6 @@ public BoundedSource.BoundedReader createReader( @Override public void validate() { - Preconditions.checkNotNull(parser, "Parser cannot be null"); } @Override diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java index fd08a9a92e89..da156bc1c924 100644 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java @@ -60,6 +60,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.After; import org.junit.Before; @@ -195,6 +196,7 @@ public void testReadWithParser() throws Exception { .withUri("mongodb://localhost:" + PORT) .withDatabase(DATABASE) .withBucket("mapBucket") + .maxSkew(new Duration(Long.MAX_VALUE)) .withParser(new MongoDbGridFSIO.Parser>() { @Override public void parse(GridFSDBFile input, @@ -229,6 +231,7 @@ public Void apply(Iterable> input) { return null; } }); + pipeline.run(); } From 24c8ab1942fa88f5f196da4f94229b7a9b342753 Mon Sep 17 00:00:00 2001 From: Daniel Kulp Date: Thu, 29 Sep 2016 10:48:42 -0400 Subject: [PATCH 05/14] Add test case for the split --- .../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 4 ++ .../sdk/io/mongodb/MongoDBGridFSIOTest.java | 52 ++++++++++++++++--- 2 files changed, 49 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java index 7408fcaaacff..8bb389c92c8b 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java @@ -166,6 +166,10 @@ public Read withQueryFilter(String filterJson) { this.maxSkew = maxSkew; } + public BoundedGridFSSource getSource() { + return options; + } + @SuppressWarnings("unchecked") @Override public PCollection apply(PBegin input) { diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java index da156bc1c924..1fe102279113 100644 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java @@ -45,14 +45,20 @@ import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.Serializable; +import java.util.List; import java.util.Random; import java.util.Scanner; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.mongodb.MongoDbGridFSIO.Read.BoundedGridFSSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; @@ -60,10 +66,11 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.bson.types.ObjectId; import org.joda.time.Duration; import org.joda.time.Instant; -import org.junit.After; -import org.junit.Before; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; import org.slf4j.Logger; @@ -80,10 +87,10 @@ public class MongoDBGridFSIOTest implements Serializable { private static final int PORT = 27017; private static final String DATABASE = "gridfs"; - private transient MongodExecutable mongodExecutable; + private static transient MongodExecutable mongodExecutable; - @Before - public void setup() throws Exception { + @BeforeClass + public static void setup() throws Exception { LOGGER.info("Starting MongoDB embedded instance"); try { Files.forceDelete(new File(MONGODB_LOCATION)); @@ -150,8 +157,8 @@ public void setup() throws Exception { } } - @After - public void stop() throws Exception { + @AfterClass + public static void stop() throws Exception { LOGGER.info("Stopping MongoDB instance"); mongodExecutable.stop(); } @@ -235,4 +242,35 @@ public Void apply(Iterable> input) { pipeline.run(); } + @Test + public void testSplit() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + MongoDbGridFSIO.Read read = MongoDbGridFSIO.read() + .withUri("mongodb://localhost:" + PORT) + .withDatabase(DATABASE); + + BoundedGridFSSource src = read.getSource(); + + // make sure 2 files can fit in + long desiredBundleSizeBytes = src.getEstimatedSizeBytes(options) * 2L / 5L + 1000; + List> splits = src.splitIntoBundles( + desiredBundleSizeBytes, options); + + int expectedNbSplits = 3; + assertEquals(expectedNbSplits, splits.size()); + SourceTestUtils. + assertSourcesEqualReferenceSource(src, splits, options); + int nonEmptySplits = 0; + int count = 0; + for (BoundedSource subSource : splits) { + List result = SourceTestUtils.readFromSource(subSource, options); + if (result.size() > 0) { + nonEmptySplits += 1; + } + count += result.size(); + } + assertEquals(expectedNbSplits, nonEmptySplits); + assertEquals(5, count); + } + } From 3966a430c6e8231f8671f5fe125c4e70a5864406 Mon Sep 17 00:00:00 2001 From: Daniel Kulp Date: Thu, 29 Sep 2016 11:00:44 -0400 Subject: [PATCH 06/14] Don't need the generic on the Source and Reader --- .../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 42 ++++++++++--------- .../sdk/io/mongodb/MongoDBGridFSIOTest.java | 2 +- 2 files changed, 23 insertions(+), 21 deletions(-) diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java index 8bb389c92c8b..a40b7eff2ce1 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java @@ -82,6 +82,8 @@ *

There is also an optional {@code Parser} that can be specified that can be used to * parse the InputStream into objects usable with Beam. By default, MongoDbGridFSIO will parse * into Strings, splitting on line breaks and using the uploadDate of the file as the timestamp. + * When using a parser that outputs via outputWithTimestamp, you may also need to specify + * the maxSkew option. */ public class MongoDbGridFSIO { @@ -119,54 +121,54 @@ public void parse(GridFSDBFile input, DoFn.ProcessContext result) /** Read data from GridFS. */ public static Read read() { - return new Read(new Read.BoundedGridFSSource(null, null, null, null, + return new Read(new Read.BoundedGridFSSource(null, null, null, null, null), StringParser.INSTANCE, Duration.ZERO); } static class Read extends PTransform> { public Read withUri(String uri) { - return new Read(new BoundedGridFSSource(uri, options.database, + return new Read(new BoundedGridFSSource(uri, options.database, options.bucket, options.filterJson, null), parser, maxSkew); } public Read withDatabase(String database) { - return new Read(new BoundedGridFSSource(options.uri, database, + return new Read(new BoundedGridFSSource(options.uri, database, options.bucket, options.filterJson, null), parser, maxSkew); } public Read withBucket(String bucket) { - return new Read(new BoundedGridFSSource(options.uri, options.database, bucket, + return new Read(new BoundedGridFSSource(options.uri, options.database, bucket, options.filterJson, null), parser, maxSkew); } public Read withParser(Parser f) { Preconditions.checkNotNull(f, "Parser cannot be null"); - return new Read(new BoundedGridFSSource(options.uri, options.database, + return new Read(new BoundedGridFSSource(options.uri, options.database, options.bucket, options.filterJson, null), f, maxSkew); } public Read maxSkew(Duration skew) { - return new Read(new BoundedGridFSSource(options.uri, options.database, + return new Read(new BoundedGridFSSource(options.uri, options.database, options.bucket, options.filterJson, null), parser, skew == null ? Duration.ZERO : skew); } public Read withQueryFilter(String filterJson) { - return new Read(new BoundedGridFSSource(options.uri, options.database, + return new Read(new BoundedGridFSSource(options.uri, options.database, options.bucket, filterJson, null), parser, maxSkew); } - private final BoundedGridFSSource options; + private final BoundedGridFSSource options; private final Parser parser; private final Duration maxSkew; - Read(BoundedGridFSSource options, Parser parser, Duration maxSkew) { + Read(BoundedGridFSSource options, Parser parser, Duration maxSkew) { this.options = options; this.parser = parser; this.maxSkew = maxSkew; } - public BoundedGridFSSource getSource() { + public BoundedGridFSSource getSource() { return options; } @@ -206,7 +208,7 @@ public Duration getAllowedTimestampSkew() { return output; } - static class BoundedGridFSSource extends BoundedSource { + static class BoundedGridFSSource extends BoundedSource { @Nullable private final String uri; @Nullable @@ -249,13 +251,13 @@ public List> splitIntoBundles(long desiredBund GridFS gridfs = setupGridFS(mongo); DBCursor cursor = createCursor(gridfs); long size = 0; - List> list = new ArrayList<>(); + List list = new ArrayList<>(); List objects = new ArrayList<>(); while (cursor.hasNext()) { GridFSDBFile file = (GridFSDBFile) cursor.next(); long len = file.getLength(); if ((size + len) > desiredBundleSizeBytes && !objects.isEmpty()) { - list.add(new BoundedGridFSSource(uri, database, bucket, + list.add(new BoundedGridFSSource(uri, database, bucket, filterJson, objects)); size = 0; @@ -265,9 +267,9 @@ public List> splitIntoBundles(long desiredBund size += len; } if (!objects.isEmpty() || list.isEmpty()) { - list.add(new BoundedGridFSSource(uri, database, bucket, - filterJson, - objects)); + list.add(new BoundedGridFSSource(uri, database, bucket, + filterJson, + objects)); } return list; } finally { @@ -301,7 +303,7 @@ public boolean producesSortedKeys(PipelineOptions options) throws Exception { @Override public BoundedSource.BoundedReader createReader( PipelineOptions options) throws IOException { - return new GridFSReader(this, objectIds); + return new GridFSReader(this, objectIds); } @Override @@ -321,15 +323,15 @@ public void populateDisplayData(DisplayData.Builder builder) { public Coder getDefaultOutputCoder() { return SerializableCoder.of(ObjectId.class); } - static class GridFSReader extends BoundedSource.BoundedReader { - final BoundedGridFSSource source; + static class GridFSReader extends BoundedSource.BoundedReader { + final BoundedGridFSSource source; final List objects; Mongo mongo; DBCursor cursor; Iterator iterator; ObjectId current; - GridFSReader(BoundedGridFSSource s, List objects) { + GridFSReader(BoundedGridFSSource s, List objects) { source = s; this.objects = objects; } diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java index 1fe102279113..f8aa1bccf890 100644 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java @@ -249,7 +249,7 @@ public void testSplit() throws Exception { .withUri("mongodb://localhost:" + PORT) .withDatabase(DATABASE); - BoundedGridFSSource src = read.getSource(); + BoundedGridFSSource src = read.getSource(); // make sure 2 files can fit in long desiredBundleSizeBytes = src.getEstimatedSizeBytes(options) * 2L / 5L + 1000; From 35e34a4960c435c26223e93383c1898af73409ab Mon Sep 17 00:00:00 2001 From: Daniel Kulp Date: Thu, 29 Sep 2016 12:29:51 -0400 Subject: [PATCH 07/14] Rename maxSkew to allowedTimestampSkew to match other DoFn's --- .../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 20 +++++++++---------- .../sdk/io/mongodb/MongoDBGridFSIOTest.java | 6 ++++-- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java index a40b7eff2ce1..24c13be31567 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java @@ -129,43 +129,43 @@ static class Read extends PTransform> { public Read withUri(String uri) { return new Read(new BoundedGridFSSource(uri, options.database, options.bucket, options.filterJson, - null), parser, maxSkew); + null), parser, allowedTimestampSkew); } public Read withDatabase(String database) { return new Read(new BoundedGridFSSource(options.uri, database, options.bucket, options.filterJson, - null), parser, maxSkew); + null), parser, allowedTimestampSkew); } public Read withBucket(String bucket) { return new Read(new BoundedGridFSSource(options.uri, options.database, bucket, - options.filterJson, null), parser, maxSkew); + options.filterJson, null), parser, allowedTimestampSkew); } public Read withParser(Parser f) { Preconditions.checkNotNull(f, "Parser cannot be null"); return new Read(new BoundedGridFSSource(options.uri, options.database, - options.bucket, options.filterJson, null), f, maxSkew); + options.bucket, options.filterJson, null), f, allowedTimestampSkew); } - public Read maxSkew(Duration skew) { + public Read allowedTimestampSkew(Duration skew) { return new Read(new BoundedGridFSSource(options.uri, options.database, options.bucket, options.filterJson, null), parser, skew == null ? Duration.ZERO : skew); } public Read withQueryFilter(String filterJson) { return new Read(new BoundedGridFSSource(options.uri, options.database, - options.bucket, filterJson, null), parser, maxSkew); + options.bucket, filterJson, null), parser, allowedTimestampSkew); } private final BoundedGridFSSource options; private final Parser parser; - private final Duration maxSkew; + private final Duration allowedTimestampSkew; - Read(BoundedGridFSSource options, Parser parser, Duration maxSkew) { + Read(BoundedGridFSSource options, Parser parser, Duration allowedTimestampSkew) { this.options = options; this.parser = parser; - this.maxSkew = maxSkew; + this.allowedTimestampSkew = allowedTimestampSkew; } public BoundedGridFSSource getSource() { @@ -199,7 +199,7 @@ public void processElement(ProcessContext c) throws IOException { @Override public Duration getAllowedTimestampSkew() { - return maxSkew; + return allowedTimestampSkew; } })); if (parser == StringParser.INSTANCE) { diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java index f8aa1bccf890..b001babbba67 100644 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java @@ -155,6 +155,7 @@ public static void setup() throws Exception { writer.flush(); writer.close(); } + client.close(); } @AfterClass @@ -203,7 +204,6 @@ public void testReadWithParser() throws Exception { .withUri("mongodb://localhost:" + PORT) .withDatabase(DATABASE) .withBucket("mapBucket") - .maxSkew(new Duration(Long.MAX_VALUE)) .withParser(new MongoDbGridFSIO.Parser>() { @Override public void parse(GridFSDBFile input, @@ -223,7 +223,9 @@ public void parse(GridFSDBFile input, } } } - })).setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())); + }) + .allowedTimestampSkew(new Duration(3601000L))) + .setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())); PAssert.thatSingleton(output.apply("Count All", Count.>globally())) .isEqualTo(50100L); From 7ac1e8ab847160454df4d4cd0795ee038015f670 Mon Sep 17 00:00:00 2001 From: Daniel Kulp Date: Thu, 29 Sep 2016 13:45:39 -0400 Subject: [PATCH 08/14] Address some of the review issues --- .../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 38 +++++++++++-------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java index 24c13be31567..ecd076108299 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java @@ -110,10 +110,8 @@ public void parse(GridFSDBFile input, DoFn.ProcessContext result) final Instant time = new Instant(input.getUploadDate().getTime()); try (BufferedReader reader = new BufferedReader(new InputStreamReader(input.getInputStream()))) { - String line = reader.readLine(); - while (line != null) { + for (String line = reader.readLine(); line != null; line = reader.readLine()) { result.outputWithTimestamp(line, time); - line = reader.readLine(); } } } @@ -122,57 +120,64 @@ public void parse(GridFSDBFile input, DoFn.ProcessContext result) /** Read data from GridFS. */ public static Read read() { return new Read(new Read.BoundedGridFSSource(null, null, null, null, - null), StringParser.INSTANCE, Duration.ZERO); + null), StringParser.INSTANCE, StringUtf8Coder.of(), Duration.ZERO); } static class Read extends PTransform> { public Read withUri(String uri) { return new Read(new BoundedGridFSSource(uri, options.database, options.bucket, options.filterJson, - null), parser, allowedTimestampSkew); + null), parser, coder, allowedTimestampSkew); } public Read withDatabase(String database) { return new Read(new BoundedGridFSSource(options.uri, database, options.bucket, options.filterJson, - null), parser, allowedTimestampSkew); + null), parser, coder, allowedTimestampSkew); } public Read withBucket(String bucket) { return new Read(new BoundedGridFSSource(options.uri, options.database, bucket, - options.filterJson, null), parser, allowedTimestampSkew); + options.filterJson, null), parser, coder, allowedTimestampSkew); } public Read withParser(Parser f) { + return withParser(f, null); + } + public Read withParser(Parser f, Coder coder) { Preconditions.checkNotNull(f, "Parser cannot be null"); + //coder can be null as it can be set on the output directly return new Read(new BoundedGridFSSource(options.uri, options.database, - options.bucket, options.filterJson, null), f, allowedTimestampSkew); + options.bucket, options.filterJson, null), f, coder, allowedTimestampSkew); } public Read allowedTimestampSkew(Duration skew) { return new Read(new BoundedGridFSSource(options.uri, options.database, - options.bucket, options.filterJson, null), parser, skew == null ? Duration.ZERO : skew); + options.bucket, options.filterJson, null), + parser, coder, skew == null ? Duration.ZERO : skew); } public Read withQueryFilter(String filterJson) { return new Read(new BoundedGridFSSource(options.uri, options.database, - options.bucket, filterJson, null), parser, allowedTimestampSkew); + options.bucket, filterJson, null), parser, coder, allowedTimestampSkew); } private final BoundedGridFSSource options; private final Parser parser; + private final Coder coder; private final Duration allowedTimestampSkew; - Read(BoundedGridFSSource options, Parser parser, Duration allowedTimestampSkew) { + Read(BoundedGridFSSource options, Parser parser, + Coder coder, Duration allowedTimestampSkew) { this.options = options; this.parser = parser; this.allowedTimestampSkew = allowedTimestampSkew; + this.coder = coder; } - public BoundedGridFSSource getSource() { + BoundedGridFSSource getSource() { return options; } - @SuppressWarnings("unchecked") @Override public PCollection apply(PBegin input) { org.apache.beam.sdk.io.Read.Bounded bounded = @@ -181,15 +186,18 @@ public PCollection apply(PBegin input) { .apply(ParDo.of(new DoFn() { Mongo mongo; GridFS gridfs; + @Setup public void setup() { mongo = options.setupMongo(); gridfs = options.setupGridFS(mongo); } + @Teardown public void teardown() { mongo.close(); } + @ProcessElement public void processElement(ProcessContext c) throws IOException { ObjectId oid = c.element(); @@ -202,8 +210,8 @@ public Duration getAllowedTimestampSkew() { return allowedTimestampSkew; } })); - if (parser == StringParser.INSTANCE) { - output.setCoder((Coder) StringUtf8Coder.of()); + if (coder != null) { + output.setCoder(coder); } return output; } From b758eb1cba1333294efdd38f7718aebf01a396ec Mon Sep 17 00:00:00 2001 From: Daniel Kulp Date: Thu, 29 Sep 2016 14:01:19 -0400 Subject: [PATCH 09/14] Fix javadoc failure --- .../java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java index ecd076108299..ac9665260b82 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java @@ -97,7 +97,7 @@ public interface Parser extends Serializable { } /** - * For the default Read case, this is the parser that is used to + * For the default Read<String> case, this is the parser that is used to * split the input file into Strings. It uses the timestamp of the file * for the event timestamp. */ From b2a1ebe965be247fbae04020d95142f1be1cbd56 Mon Sep 17 00:00:00 2001 From: Daniel Kulp Date: Thu, 29 Sep 2016 14:11:04 -0400 Subject: [PATCH 10/14] Use a callback function for output --- .../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 26 +++++++++++++++---- .../sdk/io/mongodb/MongoDBGridFSIOTest.java | 5 ++-- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java index ac9665260b82..5355c5201ac4 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java @@ -87,13 +87,20 @@ */ public class MongoDbGridFSIO { + /** + * Callback for the parser to use to submit data. + */ + public interface ParserCallback extends Serializable { + public void output(T output, @Nullable Instant timestamp); + } + /** * Interface for the parser that is used to parse the GridFSDBFile into * the appropriate types. * @param */ public interface Parser extends Serializable { - public void parse(GridFSDBFile input, DoFn.ProcessContext result) throws IOException; + public void parse(GridFSDBFile input, ParserCallback callback) throws IOException; } /** @@ -105,13 +112,13 @@ private static class StringParser implements Parser { static final StringParser INSTANCE = new StringParser(); @Override - public void parse(GridFSDBFile input, DoFn.ProcessContext result) + public void parse(GridFSDBFile input, ParserCallback callback) throws IOException { final Instant time = new Instant(input.getUploadDate().getTime()); try (BufferedReader reader = new BufferedReader(new InputStreamReader(input.getInputStream()))) { for (String line = reader.readLine(); line != null; line = reader.readLine()) { - result.outputWithTimestamp(line, time); + callback.output(line, time); } } } @@ -199,10 +206,19 @@ public void teardown() { } @ProcessElement - public void processElement(ProcessContext c) throws IOException { + public void processElement(final ProcessContext c) throws IOException { ObjectId oid = c.element(); GridFSDBFile file = gridfs.find(oid); - parser.parse(file, c); + parser.parse(file, new ParserCallback() { + @Override + public void output(T output, Instant timestamp) { + if (timestamp == null) { + c.output(output); + } else { + c.outputWithTimestamp(output, timestamp); + } + } + }); } @Override diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java index b001babbba67..866b032bada8 100644 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java @@ -61,7 +61,6 @@ import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Max; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.KV; @@ -207,7 +206,7 @@ public void testReadWithParser() throws Exception { .withParser(new MongoDbGridFSIO.Parser>() { @Override public void parse(GridFSDBFile input, - DoFn>.ProcessContext result) throws IOException { + MongoDbGridFSIO.ParserCallback> callback) throws IOException { try (final BufferedReader reader = new BufferedReader(new InputStreamReader(input.getInputStream()))) { String line = reader.readLine(); @@ -217,7 +216,7 @@ public void parse(GridFSDBFile input, long timestamp = scanner.nextLong(); String name = scanner.next(); int score = scanner.nextInt(); - result.outputWithTimestamp(KV.of(name, score), new Instant(timestamp)); + callback.output(KV.of(name, score), new Instant(timestamp)); } line = reader.readLine(); } From cec85eafad741a011b960a20cacde6fa3682da71 Mon Sep 17 00:00:00 2001 From: Daniel Kulp Date: Thu, 29 Sep 2016 15:22:26 -0400 Subject: [PATCH 11/14] Split the output callback into two methods for convienience --- .../apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java index 5355c5201ac4..58aafbe5c9a4 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java @@ -82,8 +82,8 @@ *

There is also an optional {@code Parser} that can be specified that can be used to * parse the InputStream into objects usable with Beam. By default, MongoDbGridFSIO will parse * into Strings, splitting on line breaks and using the uploadDate of the file as the timestamp. - * When using a parser that outputs via outputWithTimestamp, you may also need to specify - * the maxSkew option. + * When using a parser that outputs with custom timestamps, you may also need to specify + * the allowedTimestampSkew option. */ public class MongoDbGridFSIO { @@ -91,6 +91,8 @@ public class MongoDbGridFSIO { * Callback for the parser to use to submit data. */ public interface ParserCallback extends Serializable { + public void output(T output); + public void output(T output, @Nullable Instant timestamp); } @@ -218,6 +220,11 @@ public void output(T output, Instant timestamp) { c.outputWithTimestamp(output, timestamp); } } + + @Override + public void output(T output) { + c.output(output); + } }); } From ceb2c743393e4f2ed9d6314eee73a0b3dd4750f2 Mon Sep 17 00:00:00 2001 From: Daniel Kulp Date: Fri, 30 Sep 2016 17:11:47 -0400 Subject: [PATCH 12/14] Updates based on review --- .../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 49 ++++++++++++------- .../sdk/io/mongodb/MongoDBGridFSIOTest.java | 2 +- 2 files changed, 31 insertions(+), 20 deletions(-) diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java index 58aafbe5c9a4..8060c9f06345 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java @@ -59,9 +59,9 @@ *

*

Reading from MongoDB via GridFS

*

- *

MongoDbGridFSIO source returns a bounded collection of String as {@code PCollection}. + *

MongoDbGridFSIO source returns a bounded collection of Objects as {@code PCollection}. *

- *

To configure the MongoDB source, you can provide the connection URI, the database name + *

To configure the MongoDB GridFS source, you can provide the connection URI, the database name * and the bucket name. If unspecified, the default values from the GridFS driver are used. * * The following example illustrates various options for configuring the @@ -91,9 +91,19 @@ public class MongoDbGridFSIO { * Callback for the parser to use to submit data. */ public interface ParserCallback extends Serializable { + /** + * Output the object. The default timestamp will be the GridFSDBFile + * creation timestamp. + * @param output + */ public void output(T output); - public void output(T output, @Nullable Instant timestamp); + /** + * Output the object using the specified timestamp. + * @param output + * @param timestamp + */ + public void output(T output, Instant timestamp); } /** @@ -106,13 +116,11 @@ public interface Parser extends Serializable { } /** - * For the default Read<String> case, this is the parser that is used to + * For the default {@code Read} case, this is the parser that is used to * split the input file into Strings. It uses the timestamp of the file * for the event timestamp. */ - private static class StringParser implements Parser { - static final StringParser INSTANCE = new StringParser(); - + private static final Parser TEXT_PARSER = new Parser() { @Override public void parse(GridFSDBFile input, ParserCallback callback) throws IOException { @@ -124,12 +132,12 @@ public void parse(GridFSDBFile input, ParserCallback callback) } } } - } + }; /** Read data from GridFS. */ public static Read read() { return new Read(new Read.BoundedGridFSSource(null, null, null, null, - null), StringParser.INSTANCE, StringUtf8Coder.of(), Duration.ZERO); + null), TEXT_PARSER, StringUtf8Coder.of(), Duration.ZERO); } static class Read extends PTransform> { @@ -189,9 +197,9 @@ BoundedGridFSSource getSource() { @Override public PCollection apply(PBegin input) { - org.apache.beam.sdk.io.Read.Bounded bounded = + org.apache.beam.sdk.io.Read.Bounded objectIds = org.apache.beam.sdk.io.Read.from(options); - PCollection output = input.getPipeline().apply(bounded) + PCollection output = input.getPipeline().apply(objectIds) .apply(ParDo.of(new DoFn() { Mongo mongo; GridFS gridfs; @@ -214,11 +222,8 @@ public void processElement(final ProcessContext c) throws IOException { parser.parse(file, new ParserCallback() { @Override public void output(T output, Instant timestamp) { - if (timestamp == null) { - c.output(output); - } else { - c.outputWithTimestamp(output, timestamp); - } + Preconditions.checkNotNull(timestamp); + c.outputWithTimestamp(output, timestamp); } @Override @@ -260,13 +265,16 @@ static class BoundedGridFSSource extends BoundedSource { this.objectIds = objectIds; this.filterJson = filterJson; } + private Mongo setupMongo() { return uri == null ? new Mongo() : new Mongo(new MongoURI(uri)); } + private GridFS setupGridFS(Mongo mongo) { DB db = database == null ? mongo.getDB("gridfs") : mongo.getDB(database); return bucket == null ? new GridFS(db) : new GridFS(db, bucket); } + private DBCursor createCursor(GridFS gridfs) { if (filterJson != null) { DBObject query = (DBObject) JSON.parse(filterJson); @@ -274,6 +282,7 @@ private DBCursor createCursor(GridFS gridfs) { } return gridfs.getFileList().sort(null); } + @Override public List> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options) throws Exception { @@ -325,7 +334,6 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { } } - @Override public boolean producesSortedKeys(PipelineOptions options) throws Exception { return false; @@ -354,16 +362,19 @@ public void populateDisplayData(DisplayData.Builder builder) { public Coder getDefaultOutputCoder() { return SerializableCoder.of(ObjectId.class); } + static class GridFSReader extends BoundedSource.BoundedReader { final BoundedGridFSSource source; + @Nullable final List objects; Mongo mongo; DBCursor cursor; Iterator iterator; ObjectId current; - GridFSReader(BoundedGridFSSource s, List objects) { - source = s; + + GridFSReader(BoundedGridFSSource source, List objects) { + this.source = source; this.objects = objects; } diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java index 866b032bada8..e5e5c8f1a286 100644 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java @@ -253,7 +253,7 @@ public void testSplit() throws Exception { BoundedGridFSSource src = read.getSource(); // make sure 2 files can fit in - long desiredBundleSizeBytes = src.getEstimatedSizeBytes(options) * 2L / 5L + 1000; + long desiredBundleSizeBytes = (src.getEstimatedSizeBytes(options) * 2L) / 5L + 1000; List> splits = src.splitIntoBundles( desiredBundleSizeBytes, options); From 5dd248f9a3e464a27a66dc873bba48e0b0e3d016 Mon Sep 17 00:00:00 2001 From: Daniel Kulp Date: Fri, 30 Sep 2016 17:21:08 -0400 Subject: [PATCH 13/14] Document a field --- .../org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java index 8060c9f06345..2937fecedb15 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java @@ -365,6 +365,12 @@ public Coder getDefaultOutputCoder() { static class GridFSReader extends BoundedSource.BoundedReader { final BoundedGridFSSource source; + + /* When split into bundles, this records the ObjectId's of the files for + * this bundle. Otherwise, this is null. When null, a DBCursor of the + * files is used directly to avoid having the ObjectId's queried and + * loaded ahead of time saving time and memory. + */ @Nullable final List objects; @@ -416,6 +422,7 @@ public ObjectId getCurrent() throws NoSuchElementException { } return current; } + public Instant getCurrentTimestamp() throws NoSuchElementException { if (current == null) { throw new NoSuchElementException(); From ba06fe9e4e20f8a3cd51bc02b2c6c002cfedda00 Mon Sep 17 00:00:00 2001 From: Daniel Kulp Date: Sat, 1 Oct 2016 09:32:25 -0400 Subject: [PATCH 14/14] Fix checkstyle --- .../org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java index 2937fecedb15..cebda64bf48a 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.io.mongodb; -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkNotNull; import com.mongodb.DB; import com.mongodb.DBCursor; import com.mongodb.DBObject; @@ -162,7 +162,7 @@ public Read withParser(Parser f) { return withParser(f, null); } public Read withParser(Parser f, Coder coder) { - Preconditions.checkNotNull(f, "Parser cannot be null"); + checkNotNull(f, "Parser cannot be null"); //coder can be null as it can be set on the output directly return new Read(new BoundedGridFSSource(options.uri, options.database, options.bucket, options.filterJson, null), f, coder, allowedTimestampSkew); @@ -222,7 +222,7 @@ public void processElement(final ProcessContext c) throws IOException { parser.parse(file, new ParserCallback() { @Override public void output(T output, Instant timestamp) { - Preconditions.checkNotNull(timestamp); + checkNotNull(timestamp); c.outputWithTimestamp(output, timestamp); }