Issue with data being materialized after cross and before subsequent map #1905

Open
ttim opened this issue Mar 25, 2019 · 5 comments

Comments

@ttim
Collaborator

ttim commented Mar 25, 2019

At Twitter we found a case where data gets materialized after a cross and before the subsequent map. This can be critical, since the result of a cross is usually very large and the subsequent operations shrink it a lot. I created a test for this: ttim@85ddcff . It seems to be reproducible only when .toPipe is used and two branches follow .cross.map.

Any ideas @johnynek ?

@ttim
Collaborator Author

ttim commented Mar 25, 2019

Removing .orElse(composeSame) from the default map-reduce optimization rules fixes the test.

@ttim
Collaborator Author

ttim commented Mar 25, 2019

I think I know what's wrong.

The test I wrote does the following:

    val crossedMapped = TypedPipe.from(source)
      .cross(mapPipe)
      .map { case (value, map) => map(value) }

    crossedMapped.toPipe('value).write(sink1)
    crossedMapped.map(identity).toPipe('value).write(sink2)

During the optimization phase we compose the two consecutive maps in the second fork, and therefore the only common part of these two graphs is the cross join. Forcing this common part to disk is (my guess) done by Cascading itself.

I think it works with typed writes because during typed-write planning we see both writes at the same time and are able to extract the common part so it is computed only once.
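
For reference, a minimal sketch of that typed-write variant. The sink types are assumptions (TypedTsv[String] for illustration, assuming the mapped values are Strings); the point is that both writes go through the typed planner together:

    val crossedMapped = TypedPipe.from(source)
      .cross(mapPipe)
      .map { case (value, map) => map(value) }

    // Both writes are planned at once, so the common crossed part is shared
    // without an extra forced materialization after the cross.
    crossedMapped.write(TypedTsv[String]("sink1"))
    crossedMapped.map(identity).write(TypedTsv[String]("sink2"))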

@johnynek
Collaborator

Seems like a good guess that the interaction with toPipe is causing the issue.

That is really hiding what is going on from the optimizer.

We could add another rule so that we always avoid materializing a cross.

It's hard to say in general when we should materialize vs. not, looking only at the static graph.

We could imagine adding a new node to the graph as a hint: even if we fan this out, don't checkpoint it, just recompute it.

Maybe .ephemeral or something: just a hint to the optimizer to ignore it when adding explicit Forks. I don't know how usable that would be, though.
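
Purely as an illustration (.ephemeral does not exist on TypedPipe today, it's a hypothetical hint), usage might look like:

    // Hypothetical: .ephemeral would mark the crossed pipe so the planner
    // never inserts an explicit Fork/checkpoint for it, even when it fans out.
    val crossed = TypedPipe.from(source).cross(mapPipe).ephemeral

    // Each branch would recompute the cheap cross rather than reading a
    // materialized copy back from disk.
    crossed.map { case (value, map) => map(value) }.write(sink1)
    crossed.map { case (value, map) => map(identity(value)) }.write(sink2)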

I would probably rather improve the optimization rules...

@ttim
Collaborator Author

ttim commented Mar 26, 2019

So basically what happens is:

  1. The two pipe writes get optimized separately.
  2. One of them has two consecutive maps, which get composed by the optimizer.
  3. The only common part of these two branches the Pipe compiler sees is the cross result, so it creates a fork in Cascading at that point.

Knowing this, I can also make a similar test without using toPipe:

    val crossed = TypedPipe.from(source).cross(mapPipe)
    crossed.map { case (value, map) => map(value) }.write(sink1)
    crossed.map { case (value, map) => map(identity(value)) }.write(sink2)

For the initial issue, a few options:

  1. Ignore the Pipes cache when we compile typed pipes in toPipe, but this seems like a very weird solution.
  2. Forbid squashing of maps etc. after cross & hashJoin.
  3. Avoid materialization of hashJoin & cross at all costs (this might mean we never want to reuse an existing Cascading Pipe corresponding to a hashJoin from the compiler's cache?).
  4. Do something around .ephemeral.

Also, I do think hashJoins (and cross in particular) should never be materialized by default. It's really hard for me to imagine a case where materializing them is needed, and we already have a tweak for that: .forceToDisk.
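
For comparison, a sketch of the existing opt-in (forceToDisk is a real TypedPipe method; the source and sinks are the same assumptions as in the test above):

    // If a user actually wants the crossed result checkpointed, they can ask
    // for it explicitly instead of the planner forcing it:
    val crossed = TypedPipe.from(source)
      .cross(mapPipe)
      .forceToDisk

    crossed.map { case (value, map) => map(value) }.write(sink1)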

@johnynek
Collaborator

I agree that we shouldn't materialize immediately after a cross. A hashJoin is maybe a bit different, since it can be used as a filter, and in the filter case maybe we do want to checkpoint?

But as you say, the user can add forceToDisk if they want to insist on it in that case.

We could also add a configuration option to change the plan in that case, with the default being not to force after the cross/hashJoin.
