
[BEAM-7926] Data-centric Interactive Part2#10346

Merged
pabloem merged 3 commits into apache:master from nika-qubit:BEAM-7926-part2
Jan 28, 2020

Conversation

@nika-qubit
Contributor

@nika-qubit nika-qubit commented Dec 11, 2019

  1. Added pipeline_fragment module to build pipeline fragments including
    only transforms necessary to produce user-desired PCollections and
    implicitly execute the fragment to emit data for user-desired
    PCollections.
  2. Added the notion of cached PCollection completeness. Only cache of
    completely computed PCollections can be read. p.run() of the
    InteractiveRunner by default displays the pipeline graph and doesn't use
    any cached intermediate PCollections but generates new cache for them.
  3. Whenever a pipeline fragment is executed, by default the implicit
    pipeline run doesn't display the pipeline graph but uses available cached
    intermediate PCollections and generates cache for PCollections that
    haven't been computed.
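The fragment-building idea in item 1 can be sketched as a reverse reachability walk over the transform graph (a minimal illustration with hypothetical names, not Beam's actual pipeline_fragment implementation): starting from the user-desired PCollections, keep only the transforms on the paths that produce them.

```python
def required_transforms(producers, targets):
    """Collect the transforms needed to compute the target PCollections.

    producers maps each PCollection name to (producing_transform, input_names).
    Walks backwards from `targets`, so unrelated transforms are pruned.
    """
    needed = set()
    stack = list(targets)
    while stack:
        pcoll = stack.pop()
        if pcoll not in producers:
            continue  # a root input with no producing transform
        transform, inputs = producers[pcoll]
        if transform not in needed:
            needed.add(transform)
            stack.extend(inputs)
    return needed

# Hypothetical pipeline: 'Read' feeds both 'Square' and 'Cube', but the
# user only asked to see 'squares', so 'Cube' is pruned from the fragment.
producers = {
    'numbers': ('Read', []),
    'squares': ('Square', ['numbers']),
    'cubes': ('Cube', ['numbers']),
}
print(sorted(required_transforms(producers, ['squares'])))  # ['Read', 'Square']
```

The real module operates on Pipeline objects via a visitor rather than plain dicts, but the pruning principle is the same.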

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

Post-Commit Tests Status (on master branch)

[Build status badge matrix omitted: per-language (Go, Java, Python, XLang) × per-runner (SDK, Apex, Dataflow, Flink, Gearpump, Samza, Spark) Jenkins badges.]

Pre-Commit Tests Status (on master branch)

[Build status badge matrix omitted: non-portable and portable pre-commit Jenkins badges for Java, Python, Go, and Website.]

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

@nika-qubit
Contributor Author

R: @davidyan74
R: @rohdesamuel
PTAL.

Adding Pablo as the committer.
R: @pabloem

Thank you all!

@rohdesamuel
Contributor

Taking a look

@nika-qubit nika-qubit changed the title [BEAM-7926] Data-centric Interactive Part2 [WIP]Separating to 2 PRs [BEAM-7926] Data-centric Interactive Part2 Dec 11, 2019
@nika-qubit
Contributor Author

Taking a look

Split into 2 PRs.
This adds pipeline_fragment module with necessary dependency changes.
The following PR will implement show and add other minor changes.

@nika-qubit nika-qubit changed the title [WIP]Separating to 2 PRs [BEAM-7926] Data-centric Interactive Part2 [BEAM-7926] Data-centric Interactive Part2 Dec 11, 2019
Contributor

Can we also add a test that the original pipeline is intact? I.e., that p still has the PTransform "Cube".

Contributor Author

Added the test

@rohdesamuel
Contributor

Thank you for splitting it up, currently taking a look.

Contributor

The method already does bounds checking, this is redundant. It's okay to just do data_sample = data.head(25).

Contributor Author

Got it, will remove the bounds checking.
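The reviewer's point is verifiable in plain pandas: `DataFrame.head(n)` already clamps `n` to the number of available rows, so an explicit bounds check before sampling is redundant (an illustrative snippet, not the PR's code):

```python
import pandas as pd

data = pd.DataFrame({'value': range(5)})  # only 5 rows exist
data_sample = data.head(25)               # no error; head() clamps to 5 rows
print(len(data_sample))                   # 5
```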

Contributor

"Displays the first n entries of the PCollection". It might also be helpful to parameterize n for the user, also.

Contributor Author

This is not an exposed API, so users wouldn't be able to invoke it directly. Think of it as the UI displayed in the terminal when the user invokes show(pcoll) in an IPython shell.
We can have a head() API in the interactive_beam module; it doesn't necessarily need to display a data table and could instead return a list, like result.get(pcoll).

Comment on lines 266 to 269
Contributor

You can replace these four lines with self._computed_pcolls.update(pc for pc in pcolls).

Contributor Author

Nice.
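For reference, `set.update` accepts any iterable, so the loop collapses to one call; the generator expression in the suggestion is itself optional, since the collection can be passed directly (a generic illustration, not the actual PR code):

```python
computed_pcolls = set()
pcolls = ['pcoll_a', 'pcoll_b', 'pcoll_a']  # hypothetical cache keys

# One call replaces a for-loop of .add() calls; duplicates are absorbed.
computed_pcolls.update(pcolls)  # update(pc for pc in pcolls) is equivalent
print(sorted(computed_pcolls))  # ['pcoll_a', 'pcoll_b']
```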

Contributor

I know you didn't add it in this PR, but in general it is easier to understand "positive" parameters. enable_display with a default of True is easier to understand than skip_display with default of False (since skip_display with False is a double negative).

Contributor Author

This was added by an internal Googler, and their code in Google3 needs and sets this value. Let's leave it as is for now, since we can't make an atomic change without breaking things in Google3 at some point. I'll contact the author once our changes have settled down.

Contributor

Ah gotcha, np then

Comment on lines 193 to 213
Contributor

I'm curious why you use the PipelineVisitor to manipulate the graph instead of doing something like the PipelineAnalyzer, which manipulates the proto?

Contributor Author

I think the proto approach is less intuitive than traversing the pipeline as a graph directly, at least for the user pipeline.
With PipelineVisitor, all the logic can be applied by traversing the pipeline graph only once.
When traversing the user-defined pipeline instance, you can immediately query metadata such as id(pcoll), which PTransform produces the pcoll, and what the inputs, side inputs, and outputs of a PTransform are.

With the proto, there is a missing link between the protos and whatever the user has defined that lives in memory.
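The single-traversal argument can be illustrated with a stripped-down visitor (hypothetical classes; Beam's real PipelineVisitor has a richer interface with composite-transform handling): one pass over the in-memory graph records all the metadata needed later, with no proto round-trip.

```python
class Transform:
    """A toy stand-in for a PTransform node in the pipeline graph."""
    def __init__(self, name, inputs=()):
        self.name = name
        self.inputs = list(inputs)  # upstream Transform objects

class MetadataVisitor:
    """Collects input metadata for every transform in a single traversal."""
    def __init__(self):
        self.inputs_of = {}
    def visit(self, transform):
        self.inputs_of[transform.name] = [t.name for t in transform.inputs]

def traverse(transforms, visitor):
    for t in transforms:  # one pass; objects are queried directly in memory
        visitor.visit(t)

read = Transform('Read')
square = Transform('Square', [read])
visitor = MetadataVisitor()
traverse([read, square], visitor)
print(visitor.inputs_of['Square'])  # ['Read']
```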

Contributor

I'll leave it be for now, but I still have my reservations. I think this works by some weird trick of the implementation. It's actually more surprising that the API allows modifying the graph while iterating over it.

Contributor Author

Got it. Yeah, I had the same hunch. But consider how we mutate a linked list, and here, a graph (actually a tree). It's very common to move things around in a tree or graph: all the tree-balancing algorithms, for example, happen in place without converting the tree to a serialized representation. It's not a bad practice as long as we are not mutating the pipeline defined by the user.
A proto and a pipeline object are basically two representations of a graph, similar to an adjacency list vs. a graph object. The proto is platform-agnostic and language-agnostic, while the pipeline object is only platform-agnostic (it's represented in Python and would need different implementations on different platforms).
To me, the proto is something that gets passed across systems, and I'd like it to be an immutable medium when passing the representation of a pipeline object around. When a mutation is needed, we deserialize it into a mutable pipeline object.
The pipeline object, on the other hand, is designed to be mutable and used to construct pipelines, so we just mutate it when we see fit.

Contributor

Why do we need to build correlations?

Contributor Author

show(pcoll1, pcoll2, pcoll3) given by the user shows PCollections defined in the user pipeline.
When building a fragment, the pipeline deduced is a runner pipeline (a mutated standalone copy of the user pipeline). Without the correlations, we no longer know which PCollections in the runner pipeline correspond to pcoll1, pcoll2, and pcoll3.
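A toy sketch of why the correlation is needed (hypothetical names, not the actual implementation): after the user pipeline is copied into a runner pipeline, the copies are distinct objects, so a map keyed by the user objects' id() is the way back from pcoll1 to its runner-side counterpart.

```python
import copy

class PColl:
    """Toy stand-in for a PCollection."""
    def __init__(self, tag):
        self.tag = tag

pcoll1, pcoll2 = PColl('a'), PColl('b')  # defined in the "user pipeline"

# The runner pipeline is a standalone copy: same structure, new identities.
correlation = {id(p): copy.deepcopy(p) for p in (pcoll1, pcoll2)}

# show(pcoll1) can now locate the corresponding runner-pipeline PCollection.
runner_pcoll = correlation[id(pcoll1)]
print(runner_pcoll is pcoll1, runner_pcoll.tag)  # False a
```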

Contributor

Can you also please make a test that the PipelineFragment produces the correct output?

Contributor Author

Sure, adding a test to run the pipeline fragment and check the result.

Contributor

Why do we need this map? What does it do?

Contributor Author

@nika-qubit nika-qubit Dec 11, 2019

It tells us the correlation between PCollections from the runner pipeline instance and PCollections from the user pipeline instance.
When querying the values(), it also tells us what PCollections are cached.
You can even build a pipeline fragment or call show() with the values().

Contributor

Can you test this please?

Contributor Author

Added a test that runs a pipeline, verifies the computed PCollections and checks the result of computed PCollections.

Contributor

Why do you need to mark the completeness here?

Contributor Author

Because with our design, there are only two APIs that mark completeness: show and run.
This unit test uses neither, but it tests the execution path where the cache is already available for reading.

We could change it to p.run() and then instrument again, but the current approach is more descriptive.

@nika-qubit
Contributor Author

Run Portable_Python PreCommit

@rohdesamuel
Contributor

LGTM

@nika-qubit
Contributor Author

R: @pabloem

This PR is ready for your review! Thank you very much!

@nika-qubit
Contributor Author

Resolved merge conflicts and force pushed.

@nika-qubit
Contributor Author

Run Python PreCommit

3 similar comments
@nika-qubit
Contributor Author

Run Python PreCommit

@nika-qubit
Contributor Author

nika-qubit commented Jan 2, 2020

Run Python PreCommit

@nika-qubit
Contributor Author

Run Python PreCommit

@nika-qubit
Contributor Author

retest this please

rohdesamuel added a commit to rohdesamuel/beam that referenced this pull request Jan 15, 2020
* Manual merge of apache#10346
* use real coders
* Modify the PipelineInstrument class to add the TestStream for unbounded PCollections
* address comments
* Modify the PipelineInstrument class to add the TestStream for unbounded PCollections
* Implements the StreamingCacheManager

cache_manager.py:
 - Add the 'write' method to write an element to the cache
 - Refactor the 'source/sink' methods to return a PTransform
 - Refactor the 'read' method to return a generator instead of the full
 list
 - Refactor the ReadCache and WriteCache methods to use the source/sink
 methods directly

cache_manager_test.py:
 - Refactor to use the new implementations

streaming_cache.py:
 - Refactor to implement the CacheManager interface
 - Refactor 'Reader' class to take in a list of headers
 - Refactor 'read' method to read the headers from the cache
 - Refactor the 'reader' method into the 'read_multiple' method to read
 multiple PCollections from cache given their headers.

streaming_cache_test.py:
 - Refactor to use the new implementations
 - Create a new class, "InMemoryCache", that is able to write and read
 PCollections directly from memory instead of from a file.

pipeline_instrument.py:
 - Refactor the _read_cache method to only use the ReadCache method

pipeline_instrument_test.py:
 - Create a new class, "InMemoryCache", that is able to write and read
 PCollections directly from memory instead of from a file.
 - Simplify the mock writes and tests to use the InMemoryCache
@nika-qubit
Contributor Author

Rebased to upstream head.
retest this please.

@nika-qubit
Contributor Author

R: @pabloem
Friendly ping :)

@pabloem
Member

pabloem commented Jan 22, 2020

looking

@nika-qubit
Contributor Author

nika-qubit commented Jan 22, 2020

Rebased to upstream head.
Retest this please.

@pabloem
Member

pabloem commented Jan 22, 2020

Retest this please

1 similar comment
@pabloem
Member

pabloem commented Jan 22, 2020

Retest this please

@tvalentyn
Contributor

Run Python PreCommit

2 similar comments
@nika-qubit
Contributor Author

Run Python PreCommit

@nika-qubit
Contributor Author

Run Python PreCommit

@nika-qubit
Contributor Author

Retest this please

@tvalentyn
Contributor

Run Python PreCommit

@tvalentyn
Contributor

retest this please

@tvalentyn
Contributor

Run PythonLint PreCommit

2 similar comments
@tvalentyn
Contributor

Run PythonLint PreCommit

@pabloem
Member

pabloem commented Jan 28, 2020

Run PythonLint PreCommit

Ning Kang added 3 commits January 27, 2020 17:54
1. Added pipeline_fragment module to build pipeline fragments including
only transforms necessary to produce user-desired PCollections and
implicitly execute the fragment to emit data for user-desired
PCollections.
2. Added the notion of cached PCollection completeness. Only cache of
completely computed PCollections can be read. `p.run()` of the
InteractiveRunner by default displays the pipeline graph and doesn't use
any cached intermediate PCollections but generates new cache for them.
3. Whenever a pipeline fragment is executed, by default the implicit
pipeline run doesn't display the pipeline graph but uses available cached
intermediate PCollections and generates cache for PCollections that haven't
been computed.
@nika-qubit
Contributor Author

Rebased to upstream head to pick up a lint change.

@nika-qubit
Contributor Author

Run PythonLint PreCommit

@nika-qubit
Contributor Author

Run Python PreCommit

1 similar comment
@tvalentyn
Contributor

Run Python PreCommit

@nika-qubit
Contributor Author

Run PythonLint PreCommit

3 similar comments
@pabloem
Member

pabloem commented Jan 28, 2020

Run PythonLint PreCommit

@pabloem
Member

pabloem commented Jan 28, 2020

Run PythonLint PreCommit

@nika-qubit
Contributor Author

Run PythonLint PreCommit

@nika-qubit
Contributor Author

Run Python PreCommit

@nika-qubit
Contributor Author

Retest this please.

@nika-qubit
Contributor Author

Retest this please

@nika-qubit
Contributor Author

Run PythonLint PreCommit

@pabloem
Member

pabloem commented Jan 28, 2020

Run Python PreCommit

@pabloem
Member

pabloem commented Jan 28, 2020

Run PythonLint PreCommit

@nika-qubit
Contributor Author

Retest this please.

@pabloem pabloem merged commit 302e4d9 into apache:master Jan 28, 2020
nika-qubit pushed a commit to nika-qubit/beam that referenced this pull request Feb 12, 2020