# Single pendulum


This notebook is an implementation of a controller for a single pendulum with REactiveMP.



In [1]:
using RxInfer, PyPlot, Compose
import ReactiveMP: getrecent, messageout

In [2]:
# Dynamical parameters
m = 0.65 # grams
l = 0.8 # cm
b = 0.6 # friction
G = 9.81 # gravity
O = [1.0; 0.0] # observation matrix

# Time step size
Δt = 0.1

# Transition matrix
A = [1 Δt; -G/l*Δt -b/(m*l^2)*Δt+1];

# Control matrix
B = reshape([0; 1/(m*l^2)*Δt], 2, 1);


In [3]:
# Internal dynamic model
function dzdt(s_t_min::AbstractVector, u::AbstractVector) # Transition function modeling transition due to gravity, friction and engine control
    s_t = A*s_t_min + B* u

    return s_t
end

dzdt (generic function with 1 method)

In [4]:
function initializeWorld()
    z_0 = [3.0, 0.0] # Initial position
    z_t_min = z_0
    function execute(a_t::Float64)
        z_t = dzdt(z_t, [a_t])
    
        z_t_min = z_t # Reset state
                
        return z_t
    end

    z_t = [1/3 *π, 0.0] # Predefine outcome variable
    observe() = z_t  # Report current position

    return (execute, observe)
end

initializeWorld (generic function with 1 method)

In [5]:
@model function pendulum(T)
    # Internal model perameters
    Gamma = 1e4*diageye(2) # Transition precision
    Theta = 1e-4*diageye(2) # Observation variance
    cA = constvar(A) # Transition matrix
    cB = constvar(B) # Control matrix
    cO = constvar(O) # Observation matrix
    
    m_s_t_min = datavar(Vector{Float64})
    V_s_t_min = datavar(Matrix{Float64})

    s_t_min ~ MvNormal(mean = m_s_t_min, cov = V_s_t_min)
    s_k_min = s_t_min
    
    m_u = datavar(Vector{Float64}, T)
    V_u = datavar(Matrix{Float64}, T)
    
    m_x = datavar(Vector{Float64}, T)
    V_x = datavar(Matrix{Float64}, T)
    
    u = randomvar(T)
    s = randomvar(T)
    x = randomvar(T)
    
    u_s = randomvar(T)
    
    for k in 1:T
        u[k] ~ MvNormal(mean = m_u[k], cov = V_u[k])
        u_s[k] ~ dzdt(s_k_min, u[k])
        s[k] ~ MvNormal(mean = u_s[k], precision = Gamma)
        x[k] ~ MvNormal(mean = dot(cO, s[k]), cov = Theta)
        x[k] ~ MvNormal(mean = m_x[k], cov = V_x[k]) # goal
        s_k_min = s[k]
    end
    
    return (s, )
end

@meta function pendulum_meta()
    dzdt() -> DeltaMeta(method = Linearization())
end


pendulum_meta (generic function with 1 method)

In [6]:
# Because states of the agent are unknown to the world, we wrap them in a comprehension.
# The comprehension only returns functions for interacting with the agent.
# Internal beliefs cannot be directly observed, and interaction is only allowed through the Markov blanket
function initializeAgent(; T = 5)
    Epsilon = fill(huge, 1, 1) # Control prior variance
    m_u = Vector{Float64}[ [ 0.0] for k=1:T ] # Set control priors
    V_u = Matrix{Float64}[ Epsilon for k=1:T ]

    x_target = [π, 0.0] # Goal state
    Sigma = 1e-4*diageye(2) # Goal prior variance
    m_x = [zeros(2) for k=1:T]
    m_x[end] = x_target
    V_x = [huge*diageye(2) for k=1:T]
    V_x[end] = Sigma # Set prior to reach goal at t=T

    m_s_t_min = [1/3 *π, 0.0] # Set initial brain state prior
    V_s_t_min = tiny*diageye(2)
    
    result = nothing

    # Initialize messages and marginals dictionary

    function infer(upsilon_t::Float64, y_hat_t::Vector{Float64})
        m_u[1] = [ upsilon_t ] # Register action with the generative model
        V_u[1] = fill(tiny, 1, 1) # Clamp control prior to performed action

        m_x[1] = y_hat_t # Register observation with the generative model
        V_x[1] = tiny*diageye(2) # Clamp goal prior to observation

        data = Dict(:m_u       => m_u, 
                    :V_u       => V_u, 
                    :m_x       => m_x, 
                    :V_x       => V_x,
                    :m_s_t_min => m_s_t_min,
                    :V_s_t_min => V_s_t_min)

        result = inference(
            model = pendulum(T),
            meta = pendulum_meta(),
            data = data,
        )
    end
    
    function act() 
        if result !== nothing
            return mode(result.posteriors[:u][2])[1]
        else
            return 0.0
        end
    end

    function slide(slide_msg_idx = 3)
        (s, ) = result.returnval
        (m_s_t_min, V_s_t_min) = mean_cov(getrecent(messageout(s[2], slide_msg_idx))) # Reset prior state statistics;

        m_u = circshift(m_u, -1)
        m_u[end] = [0.0]
        V_u = circshift(V_u, -1)
        V_u[end] = Epsilon

        m_x = circshift(m_x, -1)
        m_x[end] = x_target
        V_x = circshift(V_x, -1)
        V_x[end] = Sigma
    end

    return (infer, act, slide)    
end

initializeAgent (generic function with 1 method)

In [7]:
N = 80 # Total simulation time

(execute, observe)  = initializeWorld() # Let there be a world
(infer, act, slide) = initializeAgent(T = 3) # Let there be an agent

# Step through experimental protocol
a = Vector{Float64}(undef, N) # Actions
y = Vector{Vector{Float64}}(undef, N) # Observations
for t=1:N
    a[t] = act() # Evoke an action from the agent
           execute(a[t]) # The action influences hidden external states
    y[t] = observe() # Observe the current environmental outcome (update p)
           infer(a[t], y[t]) # Infer beliefs from current model state (update q)
           slide() # Prepare for next iteration
end
;

LoadError: RuleMethodError: no method matching rule for the given arguments

Possible fix, define:

@rule typeof(dot)(:in2, Marginalisation) (m_out::MvNormalMeanCovariance, m_in1::PointMass, meta::TinyCorrection) = begin 
    return ...
end



In [8]:
# Ploting the motion of the pendulum
function draw_pendulum(t)
    IJulia.clear_output(true)
    sleep(0.05)
    x = 0.5 +l*sin(t)
    y = l*cos(t)
    display(compose(context(units=UnitBox(-1,-1, 3,3)),
            (context(), line([(0,0), (1,0)]), stroke("black"),linewidth(1mm)),
            (context(),line([(0.5,0.0),(x,y)]), stroke("black"), linewidth(0.5mm)),
            (context(),circle(x, y, m* 0.08),fill("blue")))
        )
        
end
precompile(draw_pendulum, (Float64, ))
positions = mapreduce(permutedims, vcat, y)
source = from(positions[:,1])
subscription = subscribe!(source, draw_pendulum)

LoadError: UndefRefError: access to undefined reference