In [9]:
using Rocket
using ReactiveMP
using GraphPPL
using BenchmarkTools
using Distributions
using MacroTools

In [10]:
# Chain-structured model over `n` time steps: each latent state is the previous
# state plus 1.0, and each state has one noisy observation with variance `P`.
#
# Arguments:
#   n  - number of latent states / observations
#   k  - every k-th state transition is attached to the shared scheduler
#   x0 - prior for the initial state; only its `mean` and `cov` are read
#   P  - variance of the observation likelihood
#
# Returns the scheduler together with the latent (`x`) and data (`y`) variables.
@model function smoothing(n, k, x0, P)
    
    # Prior on the state preceding the first time step, built from x0's moments.
    x_prior ~ NormalMeanVariance(mean(x0), cov(x0)) 

    # n latent states and n Float64 observation placeholders.
    x = randomvar(n)
    y = datavar(Float64, n)

    x_prev = x_prior
    
    # A single scheduler instance shared by all scheduled portals below.
    # NOTE(review): presumably this postpones updates until the scheduler is
    # explicitly released by the caller -- confirm against the Rocket/ReactiveMP docs.
    sync = PostponeScheduler()

    for i in 1:n
        # Deterministic transition x[i] = x_prev + 1.0. When i is a multiple of k the
        # node gets a ScheduleOnPortal(sync) portal, otherwise an EmptyPortal().
        x[i] ~ (x_prev + 1.0) where { portal = rem(i, k) === 0 ? ScheduleOnPortal(sync) : EmptyPortal() }
        # Observation of the i-th state with variance P.
        y[i] ~ NormalMeanVariance(x[i], P)
        
        x_prev = x[i]
    end

    return sync, x, y
end

smoothing (generic function with 1 method)

In [14]:
# Observation noise variance, shared by the data generator and the model.
P = 1.0

# Number of observations, and the spacing (in time steps) between scheduled portals.
n = 10_000
k = 700

# Noisy ramp: the i-th observation is i plus zero-mean Gaussian noise with variance P.
data = (1:n) .+ rand(Normal(0.0, sqrt(P)), n);

In [15]:
"""
    inference(; data, k, x0, P)

Build a `smoothing` model sized to `data`, feed it the observations, and return the
most recently collected marginals of the latent states.

# Keywords
- `data`: observations passed to `update!` for the model's data variables; its length
  determines the number of time steps.
- `k`: forwarded to `smoothing`.
- `x0`: forwarded to `smoothing`.
- `P`: forwarded to `smoothing`.

# Returns
A `Vector{Marginal}` of length `length(data)`, filled by the subscription callback.
Entries stay unassigned if the callback never fires.
"""
function inference(; data, k, x0, P)
    n = length(data)

    _, (sync, x, y) = smoothing(n, k, x0, P)

    buffer = Vector{Marginal}(undef, n)
    marginals = collectLatest(getmarginals(x))

    # Every emission of the combined marginals overwrites the whole buffer.
    subscription = subscribe!(marginals, (ms) -> copyto!(buffer, ms))

    # The subscription must be released even when scheduling or the data update
    # throws, so the active phase runs under try/finally.
    try
        # NOTE(review): presumably flushes work postponed on the scheduler, once
        # before and once after the observations are supplied -- confirm semantics.
        wait(sync)

        update!(y, data)

        wait(sync)
    finally
        unsubscribe!(subscription)
    end

    return buffer
end

inference (generic function with 1 method)

In [17]:
# `k` is a non-constant global, so it is interpolated with `$` just like `data` and
# `P`; otherwise the benchmark also measures untyped global-variable access.
@benchmark res = inference(
    data = $data,
    k = $k,
    x0 = NormalMeanVariance(0.0, 10000.0),
    P = $P
)

BenchmarkTools.Trial: 
  memory estimate:  423.34 MiB
  allocs estimate:  6850545
  --------------
  minimum time:     802.275 ms (37.12% GC)
  median time:      867.184 ms (43.89% GC)
  mean time:        960.906 ms (48.33% GC)
  maximum time:     1.201 s (58.15% GC)
  --------------
  samples:          6
  evals/sample:     1