In [30]:
using Distributions
using ReactiveMP
using Rx
using BenchmarkTools

import Base: show

In [31]:
mutable struct InferenceActor <: Actor{AbstractMessage}
    index       :: Int
    size        :: Int
    data        :: Vector{Float64}
    means       :: Vector{Float64}
    complete    :: Channel{Bool}
    communicate :: Channel{Tuple{Float64, Float64}}
    
    y          :: ObservedVariable
    e_mean     :: EstimatedVariable
    e_variance :: EstimatedVariable
    
    InferenceActor(data::Vector{Float64}, y::ObservedVariable, e_mean::EstimatedVariable, e_variance::EstimatedVariable) = begin
        size  = length(data)
        means = Vector{Float64}(undef, size)
        
        actor = new(1, size, data, means, Channel{Bool}(Inf), Channel{Tuple{Float64, Float64}}(Inf), y, e_mean, e_variance)
        
        task = @async begin
            while true
                u = take!(actor.communicate)
                update!(actor, u[1], u[2])
            end
        end
        
        bind(actor.communicate, task)
        
        return actor
    end
end

function update!(actor::InferenceActor, mean::Float64, variance::Float64)
    next!(actor.y.values, actor.data[actor.index])
    next!(actor.e_mean.values, mean)
    next!(actor.e_variance.values, variance)
end

function stop!(actor::InferenceActor)
    complete!(actor.y.values)
    complete!(actor.e_mean.values)
    complete!(actor.e_variance.values)
end

function Rx.on_next!(actor::InferenceActor, data::AbstractMessage)
    m = mean(data.distribution)
    v = var(data.distribution)
    
    actor.means[actor.index] = m
    
    actor.index += 1
    
    if actor.index < actor.size + 1
        put!(actor.communicate, (m, v))
    else
        stop!(actor)
    end
end

Rx.on_error!(actor::InferenceActor, err) = error(err)

Rx.on_complete!(actor::InferenceActor)   = begin 
    put!(actor.complete, true)
    close(actor.complete)
    close(actor.communicate)
end

Base.show(io::IO, actor::InferenceActor) = print(io, "InferenceActor")

In [39]:
@btime begin
    N = 10000
    data = collect(1:N) + rand(Normal(0.0, 1.0), N);
    
    x_prev_add   = addition_node("x_prev_add", StochasticMessage{Normal{Float64}}, DeterministicMessage, StochasticMessage{Normal{Float64}});
    add_1        = constant_variable("add_1", 1.0, x_prev_add.in2);

    x_prev_prior = gaussian_mean_variance("x_prev_prior");
    x_prev_m     = estimated_variable("x_prev_m", x_prev_prior.mean);
    x_prev_v     = estimated_variable("x_prev_v", x_prev_prior.variance);

    x_prev = random_variable("x_prev", x_prev_prior.value, x_prev_add.in1);

    noise_node     = gaussian_mean_variance("noise_node");
    noise_mean     = constant_variable("noise_mean", 0.0, noise_node.mean);
    noise_variance = constant_variable("noise_variance", 1.0, noise_node.variance);

    add_x_and_noise = addition_node("add_x_and_noise", StochasticMessage{Normal{Float64}}, StochasticMessage{Normal{Float64}}, DeterministicMessage);

    x = random_variable("x", x_prev_add.out, add_x_and_noise.in1);
    n = random_variable("n", noise_node.value, add_x_and_noise.in2);
    y = observed_variable("y", add_x_and_noise.out);
    
    actor = InferenceActor(data, y, x_prev_m, x_prev_v);
    
    @async begin
        subscribe!(inference(x), actor)   
        update!(actor, 0.0, 1000.0) 
    end
    
    yield()
    
    take!(actor.complete)
    
    return actor.means
end

10000-element Array{Float64,1}:
    1.385932431197308 
    2.064519588051916 
    2.872479762147068 
    3.9126619784963816
    4.911628850423554 
    5.911624122610374 
    6.91162412260434  
    7.91162412260434  
    8.91162412260434  
    9.91162412260434  
   10.91162412260434  
   11.91162412260434  
   12.91162412260434  
    ⋮                 
 9988.911624122604    
 9989.911624122604    
 9990.911624122604    
 9991.911624122604    
 9992.911624122604    
 9993.911624122604    
 9994.911624122604    
 9995.911624122604    
 9996.911624122604    
 9997.911624122604    
 9998.911624122604    
 9999.911624122604    

In [24]:
@CreateMapOperator(Identity, Int, Int, (d) -> d ^ 2)

In [25]:
of(2) |> IdentityMapOperator()

SingleObservable{Int64}(4)

In [19]:
l1 = LazyObservable{Int}("l1")
l2 = LazyObservable{Int}("l2")

LazyObservable{Int64}("l2", #undef)

In [20]:
lc = combineLatest(l1, l2)

ReactiveMP.LazyCombined{Int64,Int64}(LazyObservable{Int64}("l1", #undef), LazyObservable{Int64}("l2", #undef))

In [28]:
l1.observable = of(2) |> IdentityMapOperator()
l2.observable = of(2)

SingleObservable{Int64}(2)

In [29]:
subscribe!(lc, LoggerActor{Tuple{Int, Int}}())

[LogActor] Data: (4, 2)
[LogActor] Completed


VoidTeardown()