Merged
2 changes: 1 addition & 1 deletion README.md
@@ -12,7 +12,7 @@ the number of items contained in the stream is unknown.
This has some advantages over other sampling procedures:

- If the iterable is lazy, the memory required is a small constant or grows in relation to the size of the sample,
-   instead of the all population.
+   instead of all the population.
- With reservoir methods, the sample collected is a random sample of the portion of the stream seen thus far at any
point of the sampling process.
- In some cases, sampling with the techniques implemented in this library can bring considerable performance gains, since
103 changes: 97 additions & 6 deletions docs/src/example.md
@@ -1,15 +1,106 @@

- # An Illustrative Example
+ # Illustrative Examples

## Sampling from Data on Disk

Suppose we want to sample from a large dataset stored on disk. `StreamSampling.jl`
is well suited for this task. Let's simulate it by generating some data in
HDF5 format and sampling it in batches. You will need 10 GB of disk space to run
this example; if that is not available, you can set a smaller `totaltuples`.

We first generate the dataset and store it with

```julia
using StreamSampling, Random, ChunkSplitters, HDF5

const dtype = @NamedTuple{a::Float64, b::Float64, c::Float64, d::Float64}
const totaltuples = 10^10÷32
const chunktuples = 5*10^5
const numchunks = ceil(Int, totaltuples / chunktuples)

function generate_large_hdf5_file(filename)
    h5open(filename, "w") do file
        dset = create_dataset(file, "data", dtype, (totaltuples,), chunk=(chunktuples,))
        Threads.@threads for i in 1:numchunks
            startrow, endrow = (i-1)*chunktuples+1, min(i*chunktuples, totaltuples)
            dset[startrow:endrow] = map(i -> (a=rand(), b=rand(), c=rand(), d=rand()),
                                        1:endrow-startrow+1)
        end
    end
end

!isfile("large_random_data.h5") && generate_large_hdf5_file("large_random_data.h5")
```

Then we can sample it using 1 thread with

```julia
function sample_large_hdf5_file(filename, rng, n, alg)
    rs = ReservoirSampler{dtype}(rng, n, alg)
    h5open(filename, "r") do file
        dset = file["data"]
        for i in 1:numchunks
            startrow, endrow = (i-1)*chunktuples+1, min(i*chunktuples, totaltuples)
            data_chunk = dset[startrow:endrow]
            for d in data_chunk
                fit!(rs, d)
            end
        end
    end
    return rs
end

rng = Xoshiro(42)
@time rs = sample_large_hdf5_file("large_random_data.h5", rng, 10^7, AlgRSWRSKIP())
```
```julia
43.514238 seconds (937.21 M allocations: 42.502 GiB, 2.57% gc time)
```

We can try to improve the performance by using multiple threads. Here, I started Julia
with `julia -t6 --gcthreads=6,1` on my machine.

```julia
function psample_large_hdf5_file(filename, rngs, n, alg)
    rsv = [ReservoirSampler{dtype}(rngs[i], n, alg) for i in 1:Threads.nthreads()]
    h5open(filename, "r") do file
        dset = file["data"]
        for c in chunks(1:numchunks; n=ceil(Int, numchunks/Threads.nthreads()))
            Threads.@threads for (j, i) in collect(enumerate(c))
                startrow, endrow = (i-1)*chunktuples+1, min(i*chunktuples, totaltuples)
                data_chunk, rs = dset[startrow:endrow], rsv[j]
                for d in data_chunk
                    fit!(rs, d)
                end
            end
        end
    end
    return merge(rsv...)
end

rngs = [Xoshiro(i) for i in 1:Threads.nthreads()]
@time rs = psample_large_hdf5_file("large_random_data.h5", rngs, 10^7, AlgRSWRSKIP())
```
```julia
21.545665 seconds (937.21 M allocations: 46.525 GiB, 9.50% gc time, 14913 lock conflicts)
```

As you can see, the speed-up is not linear in the number of threads. This is mainly
because reading the chunks is single-threaded; one would need to use `MPI.jl`, as
explained at https://juliaio.github.io/HDF5.jl/stable/mpi/, to improve the
multi-threaded performance. Still, we are already sampling at about 500 MB/s, which is not bad!
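
The parallel code above relies on `merge` to combine the per-thread reservoirs into
a single sample of the whole stream. The same pattern can be sketched on small
in-memory streams (the stream and reservoir sizes here are arbitrary):

```julia
using StreamSampling, Random

# Two independent samplers, each fit on one half of the stream
rng1, rng2 = Xoshiro(1), Xoshiro(2)
rs1 = ReservoirSampler{Int}(rng1, 100, AlgRSWRSKIP())
rs2 = ReservoirSampler{Int}(rng2, 100, AlgRSWRSKIP())

for x in 1:5*10^4
    fit!(rs1, x)
end
for x in 5*10^4+1:10^5
    fit!(rs2, x)
end

# merge combines the two reservoirs into one sample of the full stream,
# just as merge(rsv...) does in the multi-threaded function above
rs = merge(rs1, rs2)
```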

## Monitoring

Suppose you receive data about some process as a stream and you want
to detect whether anything is going wrong in the data being received. A reservoir
sampling approach can be useful for evaluating properties of the data stream.
This is a demonstration of such a use case using `StreamSampling.jl`. We will
assume that the monitored statistic is the mean of the data, and that
you want it to stay below a certain threshold, otherwise some malfunctioning
- is expected.
+ is expected

- ```@example 1
+ ```julia
using StreamSampling, Statistics, Random

function monitor(stream, thr)
@@ -29,21 +120,21 @@ end

We use some toy data for illustration

- ```@example 1
+ ```julia
stream = 1:10^8; # the data stream
thr = 2*10^7; # the threshold for the mean monitoring
```

Then, we run the monitoring

- ```@example 1
+ ```julia
rs = monitor(stream, thr);
```

The number of observations until the detection is triggered is
given by

- ```@example 1
+ ```julia
nobs(rs)
```
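
The body of `monitor` is collapsed in the diff above, so the following is only a
hypothetical sketch of what such a function could look like. It assumes `value`
returns the sample collected so far, and that checking every `10^4` observations
is an acceptable cadence; both are assumptions, not the PR's actual code:

```julia
using StreamSampling, Statistics, Random

# Hypothetical sketch: fit each observation into a reservoir and stop
# once the mean of the current sample exceeds the threshold.
function monitor(stream, thr)
    rng = Xoshiro(42)
    rs = ReservoirSampler{Int}(rng, 10^4, AlgRSWRSKIP())
    for x in stream
        fit!(rs, x)
        # check the monitored statistic periodically on the current sample
        nobs(rs) % 10^4 == 0 && mean(value(rs)) >= thr && break
    end
    return rs
end
```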

6 changes: 3 additions & 3 deletions docs/src/perf_tips.md
@@ -39,9 +39,9 @@ Running with both versions we get
```

As you can see, the immutable version is 50% faster than
- the mutable one. In general, more the ratio between reservoir
- size and stream size is smaller, more the immutable version
- will be faster than the mutable one. Be careful though, because
+ the mutable one. In general, the smaller the ratio between reservoir
+ size and stream size, the faster the immutable version
+ will be than the mutable one. Be careful though, because
calling `fit!` on an immutable sampler won't modify it in-place,
but only create a new updated instance.
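
This caveat matters in loops: with an immutable sampler, the result of `fit!`
must be rebound on every step. A sketch of the usage difference follows, using
the mutable constructor shown earlier in this PR; the immutable constructor does
not appear in this diff, so its loop is shown only as a comment:

```julia
using StreamSampling, Random

rng = Xoshiro(42)
rs = ReservoirSampler{Int}(rng, 100, AlgRSWRSKIP())

# Mutable sampler: fit! updates rs in place, the binding never changes.
for x in 1:10^4
    fit!(rs, x)
end

# Immutable sampler (constructor not shown in this diff): fit! returns a
# new updated instance, so the result must be rebound on each step:
#
#     for x in 1:10^4
#         rs = fit!(rs, x)
#     end
```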
