Skip to content

Commit

Permalink
Merge pull request #553 from JuliaFolds/fixed-size-collect
Browse files Browse the repository at this point in the history
Add fast pathway for `copy`, `collect`, `tcollect`, and `tcopy` for size-stable operations
  • Loading branch information
MasonProtter authored May 8, 2023
2 parents f8d0dfe + c616391 commit 2a51f8d
Show file tree
Hide file tree
Showing 13 changed files with 944 additions and 30 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
- julia-version: '1.8'
envname: "main_v2"
- julia-version: 'nightly'
envname: "main_v2"
envname: "main_v3"
fail-fast: false
name: Test Julia ${{ matrix.julia-version }}
steps:
Expand Down
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "Transducers"
uuid = "28d57a85-8fef-5791-bfe6-a80928e7c999"
authors = ["Takafumi Arakaki <aka.tkf@gmail.com>"]
version = "0.4.75"
version = "0.4.76"

[deps]
Adapt = "79e6a3ab-5dfb-504d-930d-738a2a938a0e"
Expand Down
2 changes: 1 addition & 1 deletion src/Transducers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ using InitialValues:
asmonoid,
hasinitialvalue
using Logging: @logmsg, LogLevel
using MicroCollections: UndefVector
using MicroCollections: UndefVector, UndefArray
using Requires
using Setfield: @lens, @set, set, setproperties
using SplittablesBase: SplittablesBase, amount, halve
Expand Down
2 changes: 2 additions & 0 deletions src/core.jl
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,8 @@ identityof(::typeof(right), ::Any) = nothing
abstract type Reducible end
abstract type Foldable <: Reducible end

Base.IteratorSize(::Type{<:Reducible}) = Base.SizeUnknown()

"""
Transducers.asfoldable(x) -> foldable
Expand Down
5 changes: 3 additions & 2 deletions src/dreduce.jl
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,14 @@ function dtransduce(
end
@argcheck basesize > 0
@argcheck threads_basesize > 0
futures = map(firstindex(coll):basesize:lastindex(coll)) do start
chunks = split_into_chunks(coll, basesize)
futures = map(chunks) do chunk
Distributed.remotecall(
_remote_reduce,
pool,
rf,
init,
coll[start:min(end, start - 1 + basesize)],
chunk,
threads_basesize,
)
end
Expand Down
55 changes: 51 additions & 4 deletions src/processes.jl
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,10 @@ julia> collect(Interpose(missing), 1:3)
3
```
"""
function Base.collect(xf::Transducer, coll)
Base.collect(xf::XF, coll) where {XF <: Transducer} = _collect(xf, coll, OutputSize(XF), Base.IteratorSize(coll))
Base.collect(foldable::Foldable) = collect(extract_transducer(foldable)...)

function _collect(xf, coll, ::Any, ::Any)
result = finish!(unreduced(transduce(
Map(SingletonVector) xf,
wheninit(collector, append!!),
Expand All @@ -816,9 +819,17 @@ function Base.collect(xf::Transducer, coll)
end
return result
end
# Base.collect(xf, coll) = append!([], xf, coll)

Base.collect(foldable::Foldable) = collect(extract_transducer(foldable)...)
function _collect(xf, arr::Array, ::SizeStable, ::Union{Base.HasLength, Base.HasShape})
rf(coll, (i, val)) = @inbounds setindex!!(coll, val, i)
dest = UndefArray(size(arr)...)
unreduced(transduce(
Enumerate() xf,
wheninit(() -> dest, rf),
dest,
arr
))
end

"""
copy(xf::Transducer, T, foldable) :: Union{T, Empty{T}}
Expand Down Expand Up @@ -869,7 +880,9 @@ julia> @assert copy(
) == DataFrame(A = [2], B = [3])
```
"""
Base.copy(xf::Transducer, ::Type{T}, foldable) where {T} = append!!(xf, Empty(T), foldable)
function Base.copy(xf::XF, T, foldable::F) where {XF <: Transducer, F}
_copy(xf, T, foldable, OutputSize(XF), Base.IteratorSize(F))
end
Base.copy(xf::Transducer, foldable) = copy(xf, _materializer(foldable), foldable)

function Base.copy(::Type{T}, ed::Foldable) where {T}
Expand All @@ -882,6 +895,40 @@ function Base.copy(ed::Foldable)
return copy(xf, foldable)
end

_copy(xf, ::Type{T}, foldable, ::Any, ::Any) where {T} = append!!(xf, Empty(T), foldable)

function _copy(xf, ::Type{Vector{<:Any}}, arr, ::SizeStable, ::Base.HasLength)
rf(coll, (i, val)) = @inbounds setindex!!(coll, val, i)
dest = UndefVector(length(arr))
unreduced(transduce(
Enumerate() xf,
wheninit(() -> dest, rf),
dest,
arr
))
end

function _copy(xf, ::Type{Array{<:Any, N}}, arr, ::SizeStable, ::Base.HasShape) where {N}
sz_arr = size(arr)
M = length(sz_arr)
if N > M
sz = (sz_arr..., ntuple(_ -> 1, N - ndims(arr))...)
elseif N < M
l, r = sz_arr[1:(N-1)], sz_arr[N:end]
sz = (l..., prod(r))
else
sz = sz_arr
end
rf(coll, (i, val)) = @inbounds setindex!!(coll, val, i)
dest = UndefArray(sz)
unreduced(transduce(
Enumerate() xf,
wheninit(() -> dest, rf),
dest,
arr
))
end

Base.Set(foldable::Foldable) = copy(Set, foldable)
Base.Dict(foldable::Foldable) = copy(Dict, foldable)

Expand Down
15 changes: 13 additions & 2 deletions src/reduce.jl
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,19 @@ julia> @assert foldxt(
) == Table(a = [1, 2])
```
"""
tcopy(xf, T, reducible; kwargs...) =
foldxt(append!!, Map(SingletonVector) xf, reducible; init = Empty(T), kwargs...)
tcopy(xf::XF, T, reducible::R; kwargs...) where {XF, R} = _tcopy(xf, T, reducible, OutputSize(XF), Base.IteratorSize(R); kwargs...)
_tcopy(xf, T, reducible, ::Any, ::Any; kwargs...) = foldxt(append!!, Map(SingletonVector) xf, reducible; init = Empty(T), kwargs...)
function _tcopy(xf, ::Type{T}, reducible, ::SizeStable, ::Union{Base.HasLength, Base.HasShape};
basesize=max(amount(reducible) ÷ Threads.nthreads(), 1), kwargs...) where {T <: Array}
chunks = split_into_chunks(reducible, basesize)
foldxt(append!!, Map(x -> copy(xf, T, x)), chunks; init = Empty(T), kwargs...)
end

# This can't be collect(Partition(sz), col) because of https://github.com/JuliaFolds/Transducers.jl/issues/554
function split_into_chunks(coll, sz)
collect(Iterators.partition(coll, sz))
end

tcopy(xf, reducible; kwargs...) = tcopy(xf, _materializer(reducible), reducible; kwargs...)

function tcopy(::Type{T}, itr; kwargs...) where {T}
Expand Down
Loading

2 comments on commit 2a51f8d

@MasonProtter
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/83126

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v0.4.76 -m "<description of version>" 2a51f8dbd6b4408063e52108c3e36006f63b6b0c
git push origin v0.4.76

Please sign in to comment.