
Conversation

@krynju (Member) commented Aug 14, 2021

  1. Made the map and reduce functions behave more like their TableOperations and Base equivalents instead of trying to mimic DataFrames.
  2. map now returns a DTable.
  3. reduce returns a NamedTuple with the results of per-column reduction. It works nicely with init from Base, and you can also select which columns to reduce. See the usage sketch below.
  4. Added a documentation page about the DTable - it still needs better usage examples.
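
A rough usage sketch of the new interface (column names and the exact result shape here are illustrative, not taken from the merged code):

    using Dagger

    d = DTable((a = 1:4, b = 5:8), 2)     # table split into chunks of 2 rows

    m = map(row -> (a2 = row.a ^ 2,), d)  # returns a DTable, like Base.map
    fetch(m)                              # materialize the mapped table

    r = reduce(+, d; init = 0)            # per-column reduction
    fetch(r)                              # e.g. (a = 10, b = 26)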

@krynju mentioned this pull request Aug 14, 2021
@krynju (Member, Author) commented Aug 14, 2021

note to self: adjust indexing for the Julia 1.3 AppVeyor CI, as v[begin] is not supported on 1.3
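
(For reference, a minimal illustration; begin inside indexing expressions was only added in Julia 1.4:)

    v = [10, 20, 30]
    v[begin]           # errors on Julia 1.3; requires Julia >= 1.4
    first(v)           # 1.3-compatible equivalent
    v[firstindex(v)]   # also 1.3-compatible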

@codecov-commenter commented Aug 14, 2021

Codecov Report

Merging #265 (8cf06fb) into master (0b72b77) will not change coverage.
The diff coverage is 0.00%.

@@          Coverage Diff           @@
##           master    #265   +/-   ##
======================================
  Coverage    0.00%   0.00%           
======================================
  Files          35      34    -1     
  Lines        2724    2748   +24     
======================================
- Misses       2724    2748   +24     
Impacted Files              Coverage Δ
src/Dagger.jl               0.00% <ø> (ø)
src/table/dtable.jl         0.00% <0.00%> (ø)
src/table/operations.jl     0.00% <0.00%> (ø)

Last update 0b72b77...8cf06fb.

@krynju marked this pull request as ready for review August 15, 2021 10:26
mapped = chunk |> TableOperations.map(x -> (result = f_row(x),))
reduce(f_reduce, mapped; init=init)
chunk_reduce = (_f, _chunk, _cols, _init) -> begin
    values = [reduce(_f, Tables.getcolumn(_chunk, c); init=deepcopy(_init)) for c in _cols]
@krynju (Member, Author) commented:

@jpsamaroo Do you think the per-column-within-chunk reduction should be done in separate tasks as well?
I think this could be a potential performance improvement for bigger chunks.

@jpsamaroo (Member) commented:

We should definitely parallelize more rather than less, and parallelizing per-column within a chunk might give the scheduler better information about compute costs for reducing that column, since columns can have different types, and thus take more or less time to compute.

@krynju (Member, Author) commented:

I've added the spawns per column inside, and:

  1. Performance-wise it's probably only an upgrade for many columns (I need to look for a threshold sometime); for 2-4 columns it was usually a downgrade.
  2. Stability-wise it causes the "Eager scheduler error when @spawn inside a thunk" issue #267, so for now I'll keep this commented out:
    # reduce a single column of a chunk
    col_in_chunk_reduce = (_f, _c, _init, _chunk) -> reduce(_f, Tables.getcolumn(_chunk, _c); init=deepcopy(_init))

    chunk_reduce = (_f, _chunk, _cols, _init) -> begin
        if length(_cols) <= 1
            # a single column: not worth spawning a task
            v = [col_in_chunk_reduce(_f, c, _init, _chunk) for c in _cols]
        else
            # one Dagger task per column, then fetch all partial results
            values = [Dagger.spawn(col_in_chunk_reduce, _f, c, _init, _chunk) for c in _cols]
            v = fetch.(values)
        end
        (; zip(_cols, v)...)  # NamedTuple mapping column name => reduced value
    end

Comment on lines +84 to +88
construct_single_column = (_col, _chunk_results...) -> getindex.(_chunk_results, _col)
result_columns = [Dagger.@spawn construct_single_column(c, chunk_reduce_results...) for c in columns]

reduce_result_column = (_f, _c, _init) -> reduce(_f, _c; init=_init)
reduce_chunks = [Dagger.@spawn reduce_result_column(f, c, deepcopy(init)) for c in result_columns]
@krynju (Member, Author) commented Aug 18, 2021:

So this part first takes the per-chunk results, builds columns out of them, and then reduces each column.
I tried treereduce instead and it was noticeably slower.
For the DTable, where we know there won't be more than a reasonable number of chunks, this could potentially always be faster than treereduce.
Is there any case for using treereduce instead? Multi-machine distributed setups, maybe?
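
For contrast, a minimal sketch of the two strategies on plain arrays (illustrative only; this is not Dagger's actual treereduce):

    # flat: build one column out of the per-chunk results, reduce it once
    flat_col_reduce(f, chunk_results, col) =
        reduce(f, getindex.(chunk_results, col))

    # tree: pairwise reduction; halves the number of partial results per level,
    # but turns into many more small thunks when each call is spawned as a task
    function tree_reduce(f, xs)
        length(xs) == 1 && return xs[1]
        mid = length(xs) ÷ 2
        f(tree_reduce(f, xs[1:mid]), tree_reduce(f, xs[mid+1:end]))
    end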

@jpsamaroo (Member) commented:

Generating too many thunks is definitely detrimental to the scheduler right now, which I assume is what treereduce is doing. In the future I'll add support for lazy representations of operations directly in the scheduler, which will let us tell the scheduler, "Here's all the possible ways you can parallelize this operation, do what you think is most efficient".

@jpsamaroo (Member) left a review:
Awesome work!

krynju and others added 2 commits August 23, 2021 17:08
Co-authored-by: Julian Samaroo <jpsamaroo@jpsamaroo.me>
@jpsamaroo merged commit 389ab9a into JuliaParallel:master Aug 23, 2021
@jpsamaroo (Member) commented:
Thanks again!
