131 changes: 88 additions & 43 deletions docs/src/example.md
@@ -1,94 +1,139 @@

# Some Applications

## Sampling from Data on Disk

Suppose we want to sample from large datasets stored on disk. `StreamSampling.jl`
is well suited for this task. Let's simulate it by generating some data in HDF5 and
Arrow formats and batch sampling them. Each tuple of four `Float64`s occupies 32 bytes,
so the `totaltpl = 10^10 ÷ 32` tuples amount to roughly 10GB per file, and you will need
20GB of free disk space to run this example. If that much is not available, you can set
a smaller value for `totaltpl`.
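
Before going to disk, here is the core streaming interface in a minimal, self-contained
form. It uses only names that appear later in this example (`ReservoirSampler`, `fit!`,
`AlgRSWRSKIP`), plus `value`, which is an assumption here, following the
OnlineStatsBase-style interface that `fit!` comes from:

```julia
using StreamSampling, Random

rng = Xoshiro(1)
rs = ReservoirSampler{Int}(rng, 5, AlgRSWRSKIP())
for x in 1:10^6
    fit!(rs, x)  # observe each element once, retaining a fixed-size sample
end
value(rs)        # the 5 sampled elements (assumed accessor, see above)
```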

We first generate the datasets and store them with

```julia
using StreamSampling, Random, ChunkSplitters
using HDF5, Arrow

const dtype = @NamedTuple{a::Float64, b::Float64, c::Float64, d::Float64}
const totaltpl = 10^10÷32
const chunktpl = 5*10^5
const numchunks = ceil(Int, totaltpl / chunktpl)

function generate_file(filename, format)
    if format == :hdf5
        h5open(filename, "w") do file
            dset = create_dataset(file, "data", dtype, (totaltpl,), chunk=(chunktpl,))
            Threads.@threads for i in 1:numchunks
                starttpl, endtpl = (i-1)*chunktpl+1, min(i*chunktpl, totaltpl)
                dset[starttpl:endtpl] = map(i -> (a=rand(), b=rand(), c=rand(), d=rand()),
                                            1:endtpl-starttpl+1)
            end
        end
    elseif format == :arrow
        open(Arrow.Writer, filename) do writer
            for i in 1:numchunks
                starttpl, endtpl = (i-1)*chunktpl+1, min(i*chunktpl, totaltpl)
                Arrow.write(writer, (data=map(i -> (a=rand(), b=rand(), c=rand(), d=rand()),
                                              1:endtpl-starttpl+1),))
            end
        end
    end
end

!isfile("large_random_data.h5") && generate_large_hdf5_file("large_random_data.h5")
!isfile("random_data.h5") && generate_file("random_data.h5", :hdf5)
!isfile("random_data.arrow") && generate_file("random_data.arrow", :arrow)
```
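
As an optional sanity check, each generated file should come out at roughly 10GB
(the HDF5 file slightly larger, due to format metadata):

```julia
filesize("random_data.h5") / 1e9     # ≈ 10, in GB
filesize("random_data.arrow") / 1e9  # ≈ 10, in GB
```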

Then we can sample them using a single thread with

```julia
function sample_file(filename, rng, n, alg, format)
    rs = ReservoirSampler{dtype}(rng, n, alg)
    if format == :hdf5
        h5open(filename, "r") do file
            dset = file["data"]
            for i in 1:numchunks
                starttpl, endtpl = (i-1)*chunktpl+1, min(i*chunktpl, totaltpl)
                data_chunk = dset[starttpl:endtpl]
                for d in data_chunk
                    fit!(rs, d)
                end
            end
        end
    elseif format == :arrow
        data = Arrow.Table(filename).data
        @inbounds for i in 1:length(data)
            fit!(rs, data[i])
        end
    end
    return rs
end

rng = Xoshiro(42)
@time rs = sample_file("random_data.h5", rng, 10^7, AlgRSWRSKIP(), :hdf5)
```
```julia
43.514238 seconds (937.21 M allocations: 42.502 GiB, 2.57% gc time)
```
```julia
@time rs = sample_file("random_data.arrow", rng, 10^7, AlgRSWRSKIP(), :arrow)
```
```julia
38.635389 seconds (1.25 G allocations: 33.500 GiB, 3.52% gc time, 75763 lock conflicts)
```
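
Since each run reads about 10GB, the serial throughput can be estimated directly
from these timings:

```julia
filebytes = totaltpl * 32       # 32 bytes per tuple of four Float64s
filebytes / 43.514238 / 1e6     # ≈ 230 MB/s for the HDF5 file
filebytes / 38.635389 / 1e6     # ≈ 259 MB/s for the Arrow file
```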

We can try to improve the performance by using multiple threads. Here, I started Julia
with `julia -t4 --gcthreads=4,1` on my machine

```julia
function psample_file(filename, rngs, n, alg, format)
    rsv = [ReservoirSampler{dtype}(rngs[i], n, alg) for i in 1:Threads.nthreads()]
    if format == :hdf5
        h5open(filename, "r") do file
            dset = file["data"]
            for c in chunks(1:numchunks; n=ceil(Int, numchunks/Threads.nthreads()))
                Threads.@threads for (j, i) in collect(enumerate(c))
                    starttpl, endtpl = (i-1)*chunktpl+1, min(i*chunktpl, totaltpl)
                    data_chunk, rs = dset[starttpl:endtpl], rsv[j]
                    for d in data_chunk
                        fit!(rs, d)
                    end
                end
            end
        end
    elseif format == :arrow
        data = Arrow.Table(filename).data
        Threads.@threads for (i, c) in collect(enumerate(chunks(1:length(data); n=Threads.nthreads())))
            @inbounds for j in c
                fit!(rsv[i], data[j])
            end
        end
    end
    return merge(rsv...)
end

rngs = [Xoshiro(i) for i in 1:Threads.nthreads()]
@time rs = psample_file("random_data.h5", rngs, 10^7, AlgRSWRSKIP(), :hdf5)
```
```julia
23.240628 seconds (937.23 M allocations: 45.185 GiB, 9.52% gc time, 9375 lock conflicts)
```
```julia
@time rs = psample_file("random_data.arrow", rngs, 10^7, AlgRSWRSKIP(), :arrow)
```
```julia
5.868995 seconds (175.91 k allocations: 3.288 GiB, 6.44% gc time, 64714 lock conflicts)
```

As you can see, the speed-up is not linear in the number of threads for the HDF5 file. This is
mainly because access to the HDF5 chunks is single-threaded; to improve the multi-threaded
performance one would need to use `MPI.jl`, as explained at
[https://juliaio.github.io/HDF5.jl/stable/mpi/](https://juliaio.github.io/HDF5.jl/stable/mpi/).
Still, we are already sampling at about 500 MB/s, which is not bad! Using `Arrow.jl` performs
even better, scaling more than linearly here and reaching a sampling speed of around 2 GB/s!
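
The merged reservoir can then be queried like the single-threaded one. A minimal
sketch, with the same assumed `value` accessor as in the first example:

```julia
using Statistics

sample = value(rs)           # assumed accessor for the retained sample
length(sample)               # 10^7 named tuples of type dtype
mean(t.a for t in sample)    # ≈ 0.5, since each field is uniform on [0, 1)
```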

## Monitoring

4 changes: 2 additions & 2 deletions src/WeightedSamplingMulti.jl
@@ -211,7 +211,7 @@ end
function Base.merge!(s1::MultiAlgAResSampler, ss::MultiAlgAResSampler...)
    length(typeof(s1.value.valtree).parameters) == 3 && error("Merging ordered reservoirs is not possible")
    s1.n > minimum(s.n for s in ss) && error("The size of the mutated reservoir should be the minimum size between all merged reservoir")
    empty!(s1.value.valtree)
    newvalue = reduce_samples(TypeS(), s1, ss...)
    for e in newvalue
        push!(s1.value, e[1] => e[2])
@@ -222,7 +222,7 @@ end
function Base.merge!(s1::MultiAlgAExpJSampler, ss::MultiAlgAExpJSampler...)
    length(typeof(s1.value.valtree).parameters) == 3 && error("Merging ordered reservoirs is not possible")
    s1.n > minimum(s.n for s in ss) && error("The size of the mutated reservoir should be the minimum size between all merged reservoir")
    empty!(s1.value.valtree)
    newvalue = reduce_samples(TypeS(), s1, ss...)
    for e in newvalue
        push!(s1.value, e[1] => e[2])