In [27]:
using Rocket
using ReactiveMP
using GraphPPL
using BenchmarkTools
using Distributions
using MacroTools

In [28]:
@model [ outbound_message_portal = EmptyPortal() ] function smoothing(n, k, x0, P)
    
    x_prior ~ NormalMeanVariance(mean(x0), cov(x0)) 

    x = randomvar(n)
    y = datavar(Float64, n)

    x_prev = x_prior

    for i in 1:n
        x[i] ~ (x_prev + 1.0) where { portal = rem(i, k) === 0 ? AsyncPortal() : EmptyPortal() }
        y[i] ~ NormalMeanVariance(x[i], P)
        
        x_prev = x[i]
    end

    return x, y
end

smoothing (generic function with 1 method)

In [29]:
P = 1.0

n = 100_000
k = 715
data = collect(1:n) + rand(Normal(0.0, sqrt(P)), n);

In [35]:
function inference(; data, k, x0, P)
    n = length(data)
    
    _, (x, y) = smoothing(n, k, x0, P);

    buffer    = Vector{Marginal}(undef, n)
    marginals = collectLatest(Marginal, getmarginals(x))
    
    subscription = subscribe!(marginals, (ms) -> copyto!(buffer, ms))
    
    yield()
    
    update!(y, data)
    
    for i in 1:(div(n, k) + 1)
        yield()
    end
    
    unsubscribe!(subscription)
    
    return buffer
end

inference (generic function with 1 method)

In [37]:
@time res = inference(
    data = data,
    k = k,
    x0 = NormalMeanVariance(0.0, 10000.0),
    P = P
)

 12.011898 seconds (68.41 M allocations: 4.111 GiB, 56.25% gc time)


100000-element Array{Marginal,1}:
 Marginal{NormalMeanVariance{Float64}}(NormalMeanVariance{Float64}(μ=0.9984581685522635, v=9.999999990000086e-6))
 Marginal{NormalMeanVariance{Float64}}(NormalMeanVariance{Float64}(μ=1.9984581685522635, v=9.999999990000086e-6))
 Marginal{NormalMeanVariance{Float64}}(NormalMeanVariance{Float64}(μ=2.998458168552263, v=9.999999990000084e-6))
 Marginal{NormalMeanVariance{Float64}}(NormalMeanVariance{Float64}(μ=3.9984581685522635, v=9.999999990000084e-6))
 Marginal{NormalMeanVariance{Float64}}(NormalMeanVariance{Float64}(μ=4.998458168552264, v=9.999999990000084e-6))
 Marginal{NormalMeanVariance{Float64}}(NormalMeanVariance{Float64}(μ=5.998458168552264, v=9.999999990000086e-6))
 Marginal{NormalMeanVariance{Float64}}(NormalMeanVariance{Float64}(μ=6.998458168552264, v=9.999999990000084e-6))
 Marginal{NormalMeanVariance{Float64}}(NormalMeanVariance{Float64}(μ=7.998458168552263, v=9.999999990000084e-6))
 Marginal{NormalMeanVariance{Float64}}(NormalMeanVariance{F