Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 6 additions & 21 deletions benchmark/benchmark_comparison_non_stream_WWR.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ using CairoMakie
################

function weighted_reservoir_sample(rng, a, ws, n)
return shuffle!(rng, weighted_reservoir_sample_seq(rng, a, ws, n)[1])
return StreamSampling.fshuffle!(rng, weighted_reservoir_sample_seq(rng, a, ws, n)[1])
end

function weighted_reservoir_sample_seq(rng, a, ws, n)
Expand All @@ -25,12 +25,9 @@ function weighted_reservoir_sample_seq(rng, a, ws, n)
w_sum += w_el
if w_sum > w_skip
p = w_el/w_sum
q = 1-p
z = exp((n-4)*log1p(-p))
t = rand(rng, Uniform(z*q*q*q*q,1.0))
k = @inline choose(n, p, q, t, z)
k = StreamSampling.choose(rng, n, p)
@inbounds for j in 1:k
r = rand(rng, j:n)
r = rand(s.rng, Random.Sampler(s.rng, j:n, Val(1)))
reservoir[r], reservoir[j] = reservoir[j], a[i]
end
w_skip = @inline skip(rng, w_sum, n)
Expand All @@ -44,18 +41,6 @@ function skip(rng, w_sum::AbstractFloat, n)
return w_sum/k
end

function choose(n, p, q, t, z)
x = z*q*q*q*(q + n*p)
x > t && return 1
x += n*p*(n-1)*p*z*q*q/2
x > t && return 2
x += n*p*(n-1)*p*(n-2)*p*z*q/6
x > t && return 3
x += n*p*(n-1)*p*(n-2)*p*(n-3)*p*z/24
x > t && return 4
return quantile(Binomial(n, p), t)
end

#####################
## parallel 1 pass ##
#####################
Expand All @@ -75,7 +60,7 @@ function weighted_reservoir_sample_parallel_1_pass(rngs, a, ws, n)
Threads.@threads for i in 1:nt
ss[i] = sample(rngs[i], ss[i], ns[i]; replace = false)
end
return shuffle!(rngs[1], reduce(vcat, ss))
return fshuffle!(rngs[1], reduce(vcat, ss))
end

#####################
Expand All @@ -97,7 +82,7 @@ function weighted_reservoir_sample_parallel_2_pass(rngs, a, ws, n)
s = weighted_reservoir_sample_seq(rngs[i], @view(a[inds]), @view(ws[inds]), ns[i])
ss[i] = s[1]
end
return shuffle!(rngs[1], reduce(vcat, ss))
return fshuffle!(rngs[1], reduce(vcat, ss))
end

function sample_parallel_2_pass(rngs, a, ws, n)
Expand All @@ -115,7 +100,7 @@ function sample_parallel_2_pass(rngs, a, ws, n)
s = sample(rngs[i], @view(a[inds]), Weights(@view(ws[inds])), ns[i]; replace = true)
ss[i] = s
end
return shuffle!(rngs[1], reduce(vcat, ss))
return fshuffle!(rngs[1], reduce(vcat, ss))
end

################
Expand Down
14 changes: 7 additions & 7 deletions benchmark/benchmark_comparison_stream_WWR.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ using Random

struct AlgAExpJWR end

struct SampleMultiAlgAExpJWR{B, R, T} <: AbstractReservoirSampler
struct MultiAlgAExpJSamplerWR{B, R, T} <: AbstractReservoirSampler
n::Int
seen_k::Int
w_sum::Float64
Expand All @@ -19,15 +19,15 @@ struct SampleMultiAlgAExpJWR{B, R, T} <: AbstractReservoirSampler
end

function StreamSampling.ReservoirSampler{T}(rng::AbstractRNG, n::Integer, ::AlgAExpJWR,
::StreamSampling.ImmutSample, ::StreamSampling.Unord) where T
::StreamSampling.ImmutSampler, ::StreamSampling.Unord) where T
value = BinaryHeap(Base.By(first, DataStructures.FasterForward()), Tuple{Float64,T}[])
sizehint!(value, n)
v = Vector{T}(undef, n)
w = Vector{Float64}(undef, n)
return SampleMultiAlgAExpJWR(n, 0, 0.0, rng, value, v, w)
return MultiAlgAExpJSamplerWR(n, 0, 0.0, rng, value, v, w)
end

@inline function OnlineStatsBase._fit!(s::SampleMultiAlgAExpJWR, el, w)
@inline function OnlineStatsBase._fit!(s::MultiAlgAExpJSamplerWR, el, w)
n = s.n
s = @inline update_state!(s, w)
if s.seen_k <= n
Expand All @@ -51,14 +51,14 @@ end

skip_single(rng, n) = n/rand(rng)

function update_state!(s::SampleMultiAlgAExpJWR, w)
function update_state!(s::MultiAlgAExpJSamplerWR, w)
@update s.seen_k += 1
@update s.w_sum += w
return s
end

function OnlineStatsBase.value(s::SampleMultiAlgAExpJWR)
return shuffle!(s.rng, last.(s.value.valtree))
function OnlineStatsBase.value(s::MultiAlgAExpJSamplerWR)
return StreamSampling.fshuffle!(s.rng, last.(s.value.valtree))
end

a = Iterators.filter(x -> x != 1, 1:10^8)
Expand Down
2 changes: 2 additions & 0 deletions docs/Project.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
[deps]
BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf"
Documenter = "e30172f5-a6a5-5a46-863b-614d45cd2de4"
Literate = "98b081ad-f1c9-55d3-8b20-4c87d4299306"
StreamSampling = "ff63dad9-3335-55d8-95ec-f8139d39e468"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"

Expand Down
19 changes: 13 additions & 6 deletions docs/make.jl
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
using Documenter

@info "Loading packages..."
using StreamSampling

println("Documentation Build")
using BenchmarkTools
using Documenter
using Literate

@info "Building Documentation"
makedocs(
modules = [StreamSampling],
sitename = "StreamSampling.jl",
pages = [
format = Documenter.HTML(prettyurls = false, size_threshold = 409600),
pages = [
"StreamSampling.jl" => "index.md",
"Basics" => "basics.md",
"An Illustrative Example" => "example.md",
"API" => "api.md",
"Benchmark Comparison" => "benchmark.md"
"Performance Tips" => "perf_tips.md",
"Benchmarks" => "benchmark.md"
],
warnonly = [:doctest, :missing_docs, :cross_references],
)
Expand All @@ -24,4 +31,4 @@ if CI
devbranch = "main",
)
end
println("Finished boulding and deploying docs.")
println("Finished building and deploying docs.")
55 changes: 55 additions & 0 deletions docs/src/basics.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@

# Overview of the functionalities

The `itsample` function allows to consume all the stream at once and return the sample collected:

```julia
using StreamSampling

st = 1:100;

itsample(st, 5)
```

In some cases, one needs to control the updates the `ReservoirSampler` will be subject to. In this case
you can simply use the `fit!` function to update the reservoir:

```julia
using StreamSampling

st = 1:100;

rs = ReservoirSampler{Int}(5);

for x in st
fit!(rs, x)
end

value(rs)
```

If the total number of elements in the stream is known beforehand and the sampling is unweighted, it is
also possible to iterate over a `StreamSampler` like so

```julia
using StreamSampling

st = 1:100;

ss = StreamSampler{Int}(st, 5, 100);

r = Int[];

for x in ss
push!(r, x)
end

r
```

The advantage of `StreamSampler` iterators in respect to `ReservoirSampler` is that they require `O(1)`
memory if not collected, while reservoir techniques require `O(k)` memory where `k` is the number
of elements in the sample.

Consult the [API page](https://juliadynamics.github.io/StreamSampling.jl/stable/api) for more information
about the package interface.
41 changes: 20 additions & 21 deletions docs/src/example.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

# An Illustrative Example

Suppose to receive data about some process in the form of a stream and you want
Expand All @@ -9,43 +10,41 @@ you want that to be lower than a certain threshold otherwise some malfunctioning
is expected.

```julia
julia> using StreamSampling, Statistics, Random

julia> function monitor(stream, thr)
rng = Xoshiro(42)
# we use a reservoir sample of 10^4 elements
rs = ReservoirSampler{Int}(rng, 10^4)
# we loop over the stream and fit the data in the reservoir
for (i, e) in enumerate(stream)
fit!(rs, e)
# we check the mean value every 1000 iterations
if iszero(mod(i, 1000)) && mean(value(rs)) >= thr
return rs
end
end
end
using StreamSampling, Statistics, Random

function monitor(stream, thr)
rng = Xoshiro(42)
# we use a reservoir sample of 10^4 elements
rs = ReservoirSampler{Int}(rng, 10^4)
# we loop over the stream and fit the data in the reservoir
for (i, e) in enumerate(stream)
fit!(rs, e)
# we check the mean value every 1000 iterations
if iszero(mod(i, 1000)) && mean(value(rs)) >= thr
return rs
end
end
end
```

We use some toy data for illustration

```julia
julia> stream = 1:10^8; # the data stream

julia> thr = 2*10^7; # the threshold for the mean monitoring
stream = 1:10^8; # the data stream
thr = 2*10^7; # the threshold for the mean monitoring
```

Then, we run the monitoring

```julia
julia> rs = monitor(stream, thr);
rs = monitor(stream, thr);
```

The number of observations until the detection is triggered is
given by

```julia
julia> nobs(rs)
40009000
nobs(rs)
```

which is very close to the true value of `4*10^7 - 1` observations.
Expand Down
73 changes: 0 additions & 73 deletions docs/src/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,79 +5,6 @@
StreamSampling
```

# Overview of the functionalities

The `itsample` function allows to consume all the stream at once and return the sample collected:

```julia
julia> using StreamSampling

julia> st = 1:100;

julia> itsample(st, 5)
5-element Vector{Int64}:
9
15
52
96
91
```

In some cases, one needs to control the updates the `ReservoirSampler` will be subject to. In this case
you can simply use the `fit!` function to update the reservoir:

```julia
julia> using StreamSampling

julia> st = 1:100;

julia> rs = ReservoirSampler{Int}(5);

julia> for x in st
fit!(rs, x)
end

julia> value(rs)
5-element Vector{Int64}:
7
9
20
49
74
```

If the total number of elements in the stream is known beforehand and the sampling is unweighted, it is
also possible to iterate over a `StreamSampler` like so

```julia
julia> using StreamSampling

julia> st = 1:100;

julia> ss = StreamSampler{Int}(st, 5, 100);

julia> r = Int[];

julia> for x in ss
push!(r, x)
end

julia> r
5-element Vector{Int64}:
10
22
26
35
75
```

The advantage of `StreamSampler` iterators in respect to `ReservoirSampler` is that they require `O(1)`
memory if not collected, while reservoir techniques require `O(k)` memory where `k` is the number
of elements in the sample.

Consult the [API page](https://juliadynamics.github.io/StreamSampling.jl/stable/api) for more information
about the package interface.

## Reproducibility

```@raw html
Expand Down
Loading
Loading