From 35f49ca9ac648fc315fb8b1864c44d7a90bdcf2b Mon Sep 17 00:00:00 2001
From: Adriano Meligrana <68152031+Tortar@users.noreply.github.com>
Date: Fri, 15 Aug 2025 01:49:21 +0200
Subject: [PATCH 1/2] Add arrow version to on-disk sampling

---
 docs/src/example.md | 131 +++++++++++++++++++++++++++++---------------
 1 file changed, 88 insertions(+), 43 deletions(-)

diff --git a/docs/src/example.md b/docs/src/example.md
index 81ffeed..a2c6c05 100644
--- a/docs/src/example.md
+++ b/docs/src/example.md
@@ -1,94 +1,139 @@
-# Illustrative Examples
+# Some Applications
 
 ## Sampling from Data on Disk
 
 Suppose we want to sample from large datasets stored on disk. `StreamSampling.jl`
 is very suited for this task. Let's simulate this task by generating some data in
-HDF5 format and batch sampling them. You will need 10GB of space on disk for running
-this example. If not available you can set a smaller size for `totaltuples`.
+HDF5 and Arrow formats and batch-sampling them. You will need 20GB of space on disk
+for running this example. If that is not available, you can set a smaller size for `totaltpl`.
+
+We first generate the datasets and store them with
 
-We first generate the dataset and store it with
 ```julia
-using StreamSampling, Random, ChunkSplitters, HDF5
+using StreamSampling, Random, ChunkSplitters
+using HDF5, Arrow
 
 const dtype = @NamedTuple{a::Float64, b::Float64, c::Float64, d::Float64}
-const totaltuples = 10^10÷32
-const chunktuples = 5*10^5
-const numchunks = ceil(Int, totaltuples / chunktuples)
-
-function generate_large_hdf5_file(filename)
-    h5open(filename, "w") do file
-        dset = create_dataset(file, "data", dtype, (totaltuples,), chunk=(chunktuples,))
-        Threads.@threads for i in 1:numchunks
-            startrow, endrow = (i-1)*chunktuples+1, min(i*chunktuples, totaltuples)
-            dset[startrow:endrow] = map(i -> (a=rand(), b=rand(), c=rand(), d=rand()),
-                                        1:endrow-startrow+1)
+const totaltpl = 10^10÷32
+const chunktpl = 5*10^5
+const numchunks = ceil(Int, totaltpl / chunktpl)
+
+function generate_file(filename, format)
+    if format == :hdf5
+        h5open(filename, "w") do file
+            dset = create_dataset(file, "data", dtype, (totaltpl,), chunk=(chunktpl,))
+            Threads.@threads for i in 1:numchunks
+                starttpl, endtpl = (i-1)*chunktpl+1, min(i*chunktpl, totaltpl)
+                dset[starttpl:endtpl] = map(i -> (a=rand(), b=rand(), c=rand(), d=rand()),
+                                            1:endtpl-starttpl+1)
+            end
+        end
+    elseif format == :arrow
+        open(Arrow.Writer, filename) do writer
+            for i in 1:numchunks
+                starttpl, endtpl = (i-1)*chunktpl+1, min(i*chunktpl, totaltpl)
+                Arrow.write(writer, (data=map(i -> (a=rand(), b=rand(), c=rand(), d=rand()),
+                                              1:endtpl-starttpl+1),))
+            end
         end
     end
 end
-!isfile("large_random_data.h5") && generate_large_hdf5_file("large_random_data.h5")
+!isfile("random_data.h5") && generate_file("random_data.h5", :hdf5)
+!isfile("random_data.arrow") && generate_file("random_data.arrow", :arrow)
 ```
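+
+As an optional sanity check (not required for the rest of the example), we can read
+back a few tuples from each file using the same `HDF5.jl` and `Arrow.jl` APIs as above:
+
+```julia
+h5open("random_data.h5", "r") do file
+    file["data"][1:3]  # first three tuples of the HDF5 dataset
+end
+Arrow.Table("random_data.arrow").data[1:3]  # first three entries of the `data` column
+```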
 
-Then we can sample it using 1 thread with
+Then we can sample them using 1 thread with
 ```julia
-function sample_large_hdf5_file(filename, rng, n, alg)
+function sample_file(filename, rng, n, alg, format)
     rs = ReservoirSampler{dtype}(rng, n, alg)
-    h5open(filename, "r") do file
-        dset = file["data"]
-        for i in 1:numchunks
-            startrow, endrow = (i-1)*chunktuples+1, min(i*chunktuples, totaltuples)
-            data_chunk = dset[startrow:endrow]
-            for d in data_chunk
-                fit!(rs, d)
+    if format == :hdf5
+        h5open(filename, "r") do file
+            dset = file["data"]
+            for i in 1:numchunks
+                starttpl, endtpl = (i-1)*chunktpl+1, min(i*chunktpl, totaltpl)
+                data_chunk = dset[starttpl:endtpl]
+                for d in data_chunk
+                    fit!(rs, d)
+                end
             end
         end
+    elseif format == :arrow
+        data = Arrow.Table(filename).data
+        @inbounds for i in 1:length(data)
+            fit!(rs, data[i])
+        end
     end
     return rs
 end
 
 rng = Xoshiro(42)
-@time rs = sample_large_hdf5_file("large_random_data.h5", rng, 10^7, AlgRSWRSKIP())
+@time rs = sample_file("random_data.h5", rng, 10^7, AlgRSWRSKIP(), :hdf5)
 ```
 ```julia
  43.514238 seconds (937.21 M allocations: 42.502 GiB, 2.57% gc time)
 ```
+```julia
+@time rs = sample_file("random_data.arrow", rng, 10^7, AlgRSWRSKIP(), :arrow)
+```
+```julia
+ 38.635389 seconds (1.25 G allocations: 33.500 GiB, 3.52% gc time, 75763 lock conflicts)
+```
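+
+The collected sample can be inspected at any point. A minimal sketch, assuming the
+`OnlineStatsBase`-style `value` accessor provided by `StreamSampling.jl`:
+
+```julia
+sample = value(rs)  # vector holding the 10^7 sampled tuples
+length(sample)
+```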
 
 We can try to improve the performance by using multiple threads. Here, I started Julia
-with `julia -t6 --gcthreads=6,1` on my machine
+with `julia -t4 --gcthreads=4,1` on my machine
 ```julia
-function psample_large_hdf5_file(filename, rngs, n, alg)
+function psample_file(filename, rngs, n, alg, format)
     rsv = [ReservoirSampler{dtype}(rngs[i], n, alg) for i in 1:Threads.nthreads()]
-    h5open(filename, "r") do file
-        dset = file["data"]
-        for c in chunks(1:numchunks; n=ceil(Int, numchunks/Threads.nthreads()))
-            Threads.@threads for (j, i) in collect(enumerate(c))
-                startrow, endrow = (i-1)*chunktuples+1, min(i*chunktuples, totaltuples)
-                data_chunk, rs = dset[startrow:endrow], rsv[j]
-                for d in data_chunk
-                    fit!(rs, d)
+    if format == :hdf5
+        h5open(filename, "r") do file
+            dset = file["data"]
+            for c in chunks(1:numchunks; n=ceil(Int, numchunks/Threads.nthreads()))
+                Threads.@threads for (j, i) in collect(enumerate(c))
+                    starttpl, endtpl = (i-1)*chunktpl+1, min(i*chunktpl, totaltpl)
+                    data_chunk, rs = dset[starttpl:endtpl], rsv[j]
+                    for d in data_chunk
+                        fit!(rs, d)
+                    end
                 end
             end
         end
+    elseif format == :arrow
+        data = Arrow.Table(filename).data
+        Threads.@threads for (i, c) in collect(enumerate(chunks(1:length(data); n=Threads.nthreads())))
+            @inbounds for j in c
+                fit!(rsv[i], data[j])
+            end
+        end
     end
     return merge(rsv...)
 end
 
 rngs = [Xoshiro(i) for i in 1:Threads.nthreads()]
-@time rs = psample_large_hdf5_file("large_random_data.h5", rngs, 10^7, AlgRSWRSKIP())
+@time rs = psample_file("random_data.h5", rngs, 10^7, AlgRSWRSKIP(), :hdf5)
 ```
 ```julia
-  21.545665 seconds (937.21 M allocations: 46.525 GiB, 9.50% gc time, 14913 lock conflicts)
+  23.240628 seconds (937.23 M allocations: 45.185 GiB, 9.52% gc time, 9375 lock conflicts)
 ```
+```julia
+@time rs = psample_file("random_data.arrow", rngs, 10^7, AlgRSWRSKIP(), :arrow)
+```
+```julia
+  5.868995 seconds (175.91 k allocations: 3.288 GiB, 6.44% gc time, 64714 lock conflicts)
+```
+
+As you can see, the speed-up is not linear in the number of threads for the HDF5 file.
+This is mainly because accessing the chunks is single-threaded; to improve the
+multi-threaded performance one would need to use `MPI.jl`, as explained at
+https://juliaio.github.io/HDF5.jl/stable/mpi/. Still, we are already sampling at
+500MB/s, which is not bad!
+
-As you can see, the speed-up is not linear in the number of threads. This is mainly due to
-the fact that accessing the chunks is single-threaded, so one would need to use `MPI.jl` as
-explained at [https://juliaio.github.io/HDF5.jl/stable/mpi/](https://juliaio.github.io/HDF5.jl/stable/mpi/)
-to improve the multi-threading performance. Though, we are already sampling at 500MB/S, which is not bad!
+Using `Arrow.jl` gives even better performance and, somewhat surprisingly,
+better-than-linear scaling, reaching a sampling speed of 2GB/s!
 
 ## Monitoring

From 6602ed9379560a4651abf2ad726cc77acb717fc3 Mon Sep 17 00:00:00 2001
From: Adriano Meligrana <68152031+Tortar@users.noreply.github.com>
Date: Fri, 15 Aug 2025 01:50:10 +0200
Subject: [PATCH 2/2] Update WeightedSamplingMulti.jl

---
 src/WeightedSamplingMulti.jl | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/WeightedSamplingMulti.jl b/src/WeightedSamplingMulti.jl
index a136030..c1bbf31 100644
--- a/src/WeightedSamplingMulti.jl
+++ b/src/WeightedSamplingMulti.jl
@@ -211,7 +211,7 @@ end
 function Base.merge!(s1::MultiAlgAResSampler, ss::MultiAlgAResSampler...)
     length(typeof(s1.value.valtree).parameters) == 3 && error("Merging ordered reservoirs is not possible")
     s1.n > minimum(s.n for s in ss) && error("The size of the mutated reservoir should be the minimum size between all merged reservoir")
-    empty!(s1.value)
+    empty!(s1.value.valtree)
     newvalue = reduce_samples(TypeS(), s1, ss...)
     for e in newvalue
         push!(s1.value, e[1] => e[2])
@@ -222,7 +222,7 @@
 function Base.merge!(s1::MultiAlgAExpJSampler, ss::MultiAlgAExpJSampler...)
     length(typeof(s1.value.valtree).parameters) == 3 && error("Merging ordered reservoirs is not possible")
     s1.n > minimum(s.n for s in ss) && error("The size of the mutated reservoir should be the minimum size between all merged reservoir")
-    empty!(s1.value)
+    empty!(s1.value.valtree)
     newvalue = reduce_samples(TypeS(), s1, ss...)
     for e in newvalue
         push!(s1.value, e[1] => e[2])
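
For reference, a minimal sketch of how the `merge!` methods touched by this patch are
used. It assumes the `ReservoirSampler` constructor from the docs above together with
the weighted `fit!(rs, x, w)` update and the `AlgAExpJ` weighted algorithm; the stream
and weights are illustrative only:

```julia
using StreamSampling, Random

# Two weighted reservoirs of equal size, fed from two halves of a stream.
rs1 = ReservoirSampler{Int}(Xoshiro(1), 5, AlgAExpJ())
rs2 = ReservoirSampler{Int}(Xoshiro(2), 5, AlgAExpJ())
for x in 1:10^4
    fit!(rs1, x, rand())         # element, weight
    fit!(rs2, x + 10^4, rand())
end
merge!(rs1, rs2)  # rs1 now holds a weighted sample of the combined stream
```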