-
Notifications
You must be signed in to change notification settings - Fork 319
Unbounded Firebase Source, and associated DoFns #69
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be public final, remove getters and setters.
|
@puf EDIT: Silly typo, but see below for non-superficial error it was covering up. |
|
Need to add tests for |
|
@amygdala (and who do I add from the dataflow team to take a look) So if you run the My hypothesis is this, locally, firebase is firing lots of value changed events, (specifically one for each entry in testdata.json), so So there are a bunch of events missing. (@puf This hypothesis sounds reasonable from a Firebase client perspective) This is the realtime pipeline testing being really hard. I think in the end I'll have to write two pipelines and compare of the result of them with a |
86ebf31 to
a8300b9
Compare
First working commit prepare to add licenses Cleared Checkstyle Moved all logging to testers Added failfast behavior option to FirebaseSource Expand error message for erased type variables in CoderRegistry ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=104912339 Do not overwrite TypeDescriptor of PCollection in PCollectionTuple The previous behavior was added based on an assumption that a TupleTag indexing into a PCollectionTuple would probably have better type information than was already present on a PCollection. This assumption is unwarranted if the PCollection received a particular TypeDescriptor prior to being added to a PCollectionTuple. In particular, MapElements sets a specific type descriptor which is overwritten when the PCollection is put into a PCollectionTuple during evaluation by the DirectPipelineRunner. The assumption is also not needed -- the affected use case is that of a multi-output ParDo, where the outputs are primitives, and these follow a separate code path. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=104930665 FlatMapElements: javadoc fixes - two broken tags missing } - one broken link to refactored java8 function ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=104933857 Use MapElements in WordCount example ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=104998220 Fix comment regarding condition for AfterAll finishing ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=105007693 Test simple Combine with Java 8 lambdas and method references Combine, as written, accepts the functional interface SerializableFunction. This change adds Java 8 tests to verify that this invocation operates correctly when provided an anonymous lambda function or method reference. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=105008164 BigQueryTableRowIterator: make getF/getV work correctly BigQuery's recommended API is that users should use the getF and getV commands to read data from TableRows, but we have been breaking that API and supporting a Map<String, Object> interface instead. Switch to make the code support both interfaces, with the eventual goal of deprecating the Map<String, Object> version. Includes a little bit of cleanup to internal function names and arguments. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=105022905 Hide the CloudDebuggerOptions ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=105068420 Avoid deserializing and calling start/finishBundle on empty bundles Often in streaming mode small bundles arrive and get buffered by the GroupAlsoByWindowsFn. In this case it's wasteful to deserialize and initialize all subsequent DoFns. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=105073190 Write: move from transforms to io package And leave behind a deprecated subclass for backwards compatibility. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=105076144 Fixes a shutdown race in ReadOperation Prevents ReadOperation from calling requestDynamicSplit() on a closed reader, by closing the reader only after the operation is marked finished. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=105078926 Enable dynamic work rebalancing when reading from intermediate data ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=105083061 Add tests using Java 8 method references Filter, FlatMapElements, and MapElements should work with first-class functions however they are created. Previously, they were tested with lambda. Now they are also tested with method references. (They do work.) ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=105084851 Add DataflowProfilingOptions ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=105085000 Merge before processing elements This means that when a trigger fires we don't need to merge, which greatly simplifies the implementation. It also means that the number of intermediate (partially merged) windows is significantly reduced (dependent on the number of calls to processElements rather than the number of individual elements). Persist windows when running tests that involve merging. This is more representative of how actual bundles are processed by GroupAlsoByWindows. Remove the delayed commit of the finished set. With pre-merging this is no longer necessary. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=105085413 Add a pipeline option to GcsOptions for user-specified buffer size The default settings for opening IO Channels to Google Cloud Storage use 64 MB buffers per connection, which can lead to out-of-memory issues when running a pipeline with several GCS connections. See GitHub issue GoogleCloudPlatform#62. Add a GcsOptions pipeline option so that users can override these sizes as needed. Also clean up the tests. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=105087013 Fix typo in error message Observed in GoogleCloudPlatform#65 (scroll right on the text box). ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=105206603 WorkerDiskType option: document and add reference URL The format of the string expected by this option is set by GCE and is non-obvious. Provide an example and a link for more information. Fixes GoogleCloudPlatform#68 ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=105223902 Stop sampling after StateSampler.close() Implemented by serializing all the scheduled task runs and introducing a cancellation flag. Fixes a race condition where a scheduled sampling could still happen after cancellatino. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=105226569 Add size-based file rotation to java logger The logger will now automatically write logs to a new file when the size exceeds a configurable limit (default 1 GB). This mechanism - which is similar to glog - is intended to work in combination with a purger that removes old files. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=105228034 Fix a BigQuery output issue in DirectPipelineRunner getOrCreateTable() was called for each output element, and it introduces unnecessary BigQuery APIs call and duplicated messages. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=105328508 Miscellaneous cleanup ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=105338462 Refactored logging to use deterministic events Fixed DoFn Pipelining Finished Pipeline Fix Moved to two Firebases for test Cleaning a firebase to allow for duplicate events turned out to be too difficult for a number of test cases. It's fine to use two different demo Firebases for test event generation though, as events should be Firebase repo agnostic (Pipeline's can use tags and tranform names to label events with their originating pipeline in the case they want to use multiple Firebases) Updated tests Added a test for Child Changed events. Updated ChildAddedTest to depend on DoFirebasePush and fixed bug this revealed in DoFirebasePush All tests to green * Removed FirebaseErrors after discussion with puf@ firebase errors should not be passed down the pipeline * Tests now share a single demo firebase rather than using disposable firebases. Cleanup has been fixed with the exception of a detected error in DataflowJavaSDK
|
@dhalperi This is probably ready for a little bit more thorough of a review. All the tests should work. I'm working on tests for failure recovery (to make sure the |
contrib/firebaseio/pom.xml
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We recommend [1.0.0,2.0.0) here unless you're requiring on something specific to a minor release, [1.2.0, 2.0.0).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, fine to include inline below if you are only using it once.
| result = auther.authenticate(this.getRef()); | ||
| } catch (FirebaseException|InterruptedException e) { | ||
| result = null; | ||
| e.printStackTrace(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use logging instead of printStackTrace.
re-interrupt currentThread.
why check for non-null (with an empty failure message if the check fails) rather than simply converting the FireBasedException into a RuntimeException and throwing it?
| FirebaseCheckpoint<T> checkpoint) throws NoSuchAlgorithmException{ | ||
| FirebaseCheckpoint<T> checkpoint) throws NoSuchAlgorithmException { | ||
| this.source = source; | ||
| this.digest = MessageDigest.getInstance("MD5"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
per comments elsewhere -- would probably catch here and wrap it as an IOException or a RuntimeException.
|
Pretty close! Let's merge soon. Also, you should consider porting this contrib forward to https://github.com/apache/incubator-beam. That way we will have this great new functionality in Apache Beam + Dataflow 2.0 and beyond. |
|
LGTM. Will merge once passing. |
|
RE: contributing this to Beam. The Firebase 3.0 client is out and changes/simplifies a lot of things, but I am blocked by some authentication features which are high priority for the team. So once those are fixed I will likely update this package to use the 3.0 client and at that point I'll submit that to beam. |
Creating a PR for review, Do Not Merge. Once the review is passed I will squash commits and submit.
@puf
@amygdala