Merge streamed individual rows from source striped tables #1

Open · wants to merge 1 commit into master
Conversation

@devgrok devgrok commented Mar 12, 2020

Two changes that I started a while ago and just got around to finishing off. It would be good to get some feedback on them.

  1. Stream each source striped table as individual logical rows.
    I think it greatly simplifies the merging logic and seems to create less overhead than doing the merges as blocks of entities.
    This was also a precursor to various attempts to run stages of it concurrently, as the merge command is single threaded, and an attempt to use memory more efficiently.

  2. Split the source striped table when necessary, to stop memory being swamped with huge objects (large logical tables) when the source file wasn't properly chunked.
    This was the case for the output of the scatter job.
    This simple change alone sped up the merge task a fair amount with my dummy test data (by reducing GC time by roughly 20%).
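The second change, splitting oversized chunks before they reach the consumer, can be sketched in plain Haskell. This is an illustrative stand-in, not the PR's code: lists stand in for striped tables, and splitAtMost is a hypothetical name.

```haskell
-- Hypothetical sketch: split an oversized chunk into pieces of at most
-- maxRows rows, so the downstream consumer never sees one huge object.
-- Lists stand in for striped tables; the real change slices tables.
splitAtMost :: Int -> [a] -> [[a]]
splitAtMost maxRows rows
  | maxRows <= 0 = [rows]            -- guard against a nonsensical size
  | null rows    = []
  | otherwise    =
      let (piece, rest) = splitAt maxRows rows
      in piece : splitAtMost maxRows rest

main :: IO ()
main = print (splitAtMost 3 [1 .. 8 :: Int])
-- prints [[1,2,3],[4,5,6],[7,8]]
```

The point is only that each piece is bounded, so a badly chunked input file no longer forces one giant allocation per logical table.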

…max internal chunk size to stop the stream consumer from being flooded with huge objects.
@HuwCampbell (Owner) left a comment

In general, I think getting rid of the chunks is a very good idea. When there are lots of files being merged, I think taking the minimum of the maximums each time a new block was added was probably yielding only a few rows per iteration, and we just ended up slicing the vectors hundreds of times.

So I like the approach.

I'll have to cherry pick the changes onto my icicle-lang repo and go through it properly to review the code well.

There's probably a bit too much going back and forth between lists, streams, vectors, and cons vectors; the compiler should elide most of it, but it's hard to know.

fromJust $ Cons.index 0 kvss

2 -> do
let mergeS s1 s2 = remapStreamEnd $ Stream.merge s1 s2
@HuwCampbell (Owner):

The remap can just be void, can't it?
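If remapStreamEnd only discards the merged stream's return value, void from Data.Functor (which is just fmap (const ())) would do the same job. A base-only illustration, with Either standing in for the stream functor:

```haskell
import Data.Functor (void)

-- void replaces a functor's result with (), i.e. fmap (const ()).
-- Applied to a streaming Stream (Of a) m r it would discard the
-- end-of-stream value r while leaving the yielded elements alone.
main :: IO ()
main = do
  print (void (Right 5 :: Either String Int))            -- Right ()
  print (fmap (const ()) (Right 5 :: Either String Int)) -- Right ()
```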

@HuwCampbell (Owner):

Also on this line, should it be mergeBy rowKey?


Stream.mapMaybeM (hoistEither . mergeRows msize schema) $
Stream.filter (/= []) $
Stream.mapped Stream.toList $
@HuwCampbell (Owner):

There's a toList followed almost immediately by a Boxed.fromList. You might be able to just map the stream with rowValues and use FoldL.impurely S.foldM FoldL.vectorM to build the boxed vector directly.

@HuwCampbell (Owner):

I'm not sure if list stream fusion will work through the call (though it might?)

@HuwCampbell (Owner):

Sorry I only noticed this recently.

Stream.mapMaybeM (hoistEither . mergeRows msize schema) $
Stream.filter (/= []) $
Stream.mapped Stream.toList $
Stream.groupBy compKey $
@HuwCampbell (Owner):

I think what's happening here is that you're merging the streams using a binary tree, so that the final stream is ordered on the row key; then using groupBy to gather all the rows that share a key into a stream (or list) of vectors; then turning that back into a binary tree using the function mergeValues.

So I think this is a bit roundabout. It should be possible to merge the streams themselves in a binary fashion (and I believe that would also make doing it in parallel easier).
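The binary merge being suggested can be sketched with sorted lists standing in for streams; mergeBy and treeMerge are illustrative names, not the PR's API.

```haskell
-- Sketch of merging sorted inputs pairwise in a binary tree, with
-- lists standing in for streams.  mergeBy keeps the combined output
-- ordered; treeMerge halves the number of inputs each round, so each
-- element passes through O(log n) merges instead of an n-way fold.
mergeBy :: (a -> a -> Ordering) -> [a] -> [a] -> [a]
mergeBy _   xs     []     = xs
mergeBy _   []     ys     = ys
mergeBy cmp (x:xs) (y:ys)
  | cmp x y == GT = y : mergeBy cmp (x:xs) ys
  | otherwise     = x : mergeBy cmp xs (y:ys)

treeMerge :: (a -> a -> Ordering) -> [[a]] -> [a]
treeMerge _   []   = []
treeMerge _   [xs] = xs
treeMerge cmp xss  = treeMerge cmp (pairUp xss)
  where
    pairUp (a:b:rest) = mergeBy cmp a b : pairUp rest
    pairUp rest       = rest

main :: IO ()
main = print (treeMerge compare [[1,4], [2,5], [3,6 :: Int]])
-- prints [1,2,3,4,5,6]
```

Because each pairwise merge is independent until the final round, the internal nodes of the tree are natural units to run in parallel.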

@HuwCampbell (Owner):

It's cool. I've left a few comments.

unionInputGroupBy schema msize inputs0 = do
  let
    compKey :: Row -> Row -> Bool
    compKey a b = (rowKey a) == (rowKey b)
@HuwCampbell (Owner):

This is

(==) `on` rowKey

by the way. With on from Data.Function.
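The two spellings are interchangeable, which a small base-only example can show; pairs stand in for Row here, with fst playing the role of rowKey.

```haskell
import Data.Function (on)
import Data.List (groupBy)

-- The suggested rewrite: compare rows on their key with `on` from
-- Data.Function.  (==) `on` fst means \a b -> fst a == fst b.
compKey :: (Int, String) -> (Int, String) -> Bool
compKey = (==) `on` fst

main :: IO ()
main = print (groupBy compKey [(1, "a"), (1, "b"), (2, "c")])
-- prints [[(1,"a"),(1,"b")],[(2,"c")]]
```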
