In [44]:
import numpy as np
import torch as th
import pyro
from pyro import poutine
import pyro.distributions as dist
from pyro.infer import SVI, Trace_ELBO
from pyro.optim import Adam
import pandas as pd

In [43]:
def model(sequences, lengths, args, batch_size=None, include_prior=True):
    assert not th._C._get_tracing_state()
    num_sequences, max_length, data_dim = sequences.shape
    with poutine.mask(mask=include_prior):
        # Our prior on transition probabilities will be:
        # stay in the same state with 90% probability; uniformly jump to another
        # state with 10% probability.
        probs_z = pyro.sample(
            "probs_z",
            dist.Dirichlet(0.9 * th.eye(args.hidden_dim) + 0.1).to_event(1),
        )
        # # We put a weak prior on the conditional probability of a tone sounding.
        # # We know that on average about 4 of 88 tones are active, so we'll set a
        # # rough weak prior of 10% of the notes being active at any one time.
        # probs_y = pyro.sample(
        #     "probs_y",
        #     dist.Beta(0.1, 0.9).expand([args.hidden_dim, data_dim]).to_event(2),
        # )
        probs_alpha = pyro.sample(
            "probs_alpha",
            dist.Gamma(1.0, 1.0).expand([args.hidden_dim, data_dim]).to_event(2)
        )

        probs_beta = pyro.sample(
            "probs_beta",
            dist.Gamma(1.0, 1.0).expand([args.hidden_dim, data_dim]).to_event(2)
        )
    # In this first model we'll sequentially iterate over sequences in a
    # minibatch; this will make it easy to reason about tensor shapes.
    tones_plate = pyro.plate("tones", data_dim, dim=-1)
    for i in pyro.plate("sequences", len(sequences), batch_size):
        length = lengths[i]
        sequence = sequences[i, :length]
        z = 0
        for t in pyro.markov(range(length)):
            # On the next line, we'll overwrite the value of x with an updated
            # value. If we wanted to record all x values, we could instead
            # write x[t] = pyro.sample(...x[t-1]...).
            z = pyro.sample(
                "z_{}_{}".format(i, t),
                dist.Categorical(probs_z[z]),
                infer={"enumerate": "parallel"},
            )
            with tones_plate:
                pyro.sample(
                    "y_{}_{}".format(i, t),
                    dist.Gamma(probs_alpha[z], probs_beta[z]),
                    obs=sequence[t]
                )

In [42]:
import torch
import pyro
import pyro.distributions as dist
import pyro.poutine as poutine
from pyro.infer import TraceEnum_ELBO, SVI
from pyro.optim import Adam

# Define the args structure with hidden_dim
class Args:
    def __init__(self, hidden_dim):
        self.hidden_dim = hidden_dim

# Example single sequence setup
max_length = 10  # Length of the sequence
hidden_dim = 3  # Example: 3 hidden states

# Randomly generate a single synthetic sequence of areas
sequence = (torch.rand(max_length) > 0.6).float()
length = torch.tensor(max_length)

args = Args(hidden_dim=hidden_dim)

def model_0(sequence, hidden_dim, args, include_prior=True):
    length = len(sequence)
    with poutine.mask(mask=include_prior):
        # Transition probabilities
        probs_x = pyro.sample(
            "probs_x",
            dist.Dirichlet(0.9 * torch.eye(hidden_dim) + 0.1).to_event(),
        )
        # Emission probabilities (1-dimensional for the area)
        probs_y = pyro.sample(
            "probs_y",
            dist.Beta(0.1, 0.9).expand([hidden_dim]).to_event(1),
        )
    
    x = 0  # Initial hidden state
    for t in pyro.markov(range(length)):
        x = pyro.sample(
            f"x_{t}",
            dist.Categorical(probs_x[x]),
            infer={"enumerate": "parallel"},
        )
        pyro.sample(
            f"y_{t}",
            dist.Bernoulli(probs_y[x]),
            obs=sequence[t],
        )

# Define the guide (variational distribution)
def guide_0(sequence, hidden_dim, args, include_prior=True):
    probs_x = pyro.param(
        "probs_x_q", dist.Dirichlet(0.9 * torch.eye(hidden_dim) + 0.1).sample(), constraint=dist.constraints.simplex
    )
    probs_y = pyro.param(
        "probs_y_q", dist.Beta(0.1, 0.9).expand([hidden_dim]).sample(), constraint = dist.constraints.unit_interval
    )
    with poutine.mask(mask=include_prior):
        pyro.sample("probs_x", dist.Delta(probs_x).to_event(2))
        pyro.sample("probs_y", dist.Delta(probs_y).to_event(1))

# Optimizer
optimizer = Adam({"lr": 0.01})

# Inference algorithm
elbo = TraceEnum_ELBO(max_plate_nesting=1)
svi = SVI(model_0, guide_0, optimizer, loss=elbo)

# Training
num_steps = 1000
for step in range(num_steps):
    loss = svi.step(sequence, hidden_dim, args)
    if step % 100 == 0:
        print(f"Step {step} : Loss = {loss}")

# Extract learned parameters
probs_x_posterior = pyro.param("probs_x_q").detach().cpu().numpy()
probs_y_posterior = pyro.param("probs_y_q").detach().cpu().numpy()

print("Learned transition probabilities (probs_x):")
print(probs_x_posterior)

print("Learned emission probabilities (probs_y):")
print(probs_y_posterior)


ValueError: Expected parameter probs (Tensor of shape (3, 1)) of distribution Bernoulli(probs: torch.Size([3, 1])) to satisfy the constraint Interval(lower_bound=0.0, upper_bound=1.0), but found invalid values:
tensor([[-0.0100],
        [ 0.2034],
        [-0.0044]], grad_fn=<IndexBackward0>)
Trace Shapes:          
 Param Sites:          
Sample Sites:          
 probs_x dist     | 3 3
        value     | 3 3
 probs_y dist     | 3  
        value     | 3  
     x_0 dist     |    
        value 3 1 |    

In [37]:
sequence, length

(tensor([1., 1., 0., 1., 0., 1., 0., 0., 0., 0.]), tensor(10))

In [47]:
data = pd.read_csv("data/hulls_df.csv")
data = data.dropna()
data.head()

Unnamed: 0,Period,Frame,Time [s],HomeHull,AwayHull
0,1.0,49.0,2.0,545.066253,782.19482
1,1.0,50.0,2.04,580.876874,782.477901
2,1.0,51.0,2.08,581.11677,782.804969
3,1.0,52.0,2.12,581.348315,783.177048
4,1.0,53.0,2.16,581.702889,783.361684


In [56]:
sequence = th.tensor(data["HomeHull"].values)
hidden_dim = 2
sequence


tensor([545.0663, 580.8769, 581.1168,  ..., 693.3607, 693.8751, 694.2381],
       dtype=torch.float64)

In [57]:
def model(sequence, hidden_dim, include_prior=True):
    length = len(sequence)
    with poutine.mask(mask=include_prior):
        # Transition probabilities
        probs_x = pyro.sample(
            "probs_x",
            dist.Dirichlet(0.9 * torch.eye(hidden_dim) + 0.1).to_event(),
        )
        # Emission probabilities (1-dimensional for the area)
        probs_alpha = pyro.sample(
            "probs_alpha",
            dist.Gamma(1.0, 1.0).expand([hidden_dim]).to_event(1)
        )

        probs_beta = pyro.sample(
            "probs_beta",
            dist.Gamma(1.0, 1.0).expand([hidden_dim]).to_event(1)
        )
    
    x = 0  # Initial hidden state
    for t in pyro.markov(range(length)):
        x = pyro.sample(
            f"x_{t}",
            dist.Categorical(probs_x[x]),
            infer={"enumerate": "parallel"},
        )
        pyro.sample(
            f"y_{t}",
            dist.Gamma(probs_alpha[x], probs_beta[x]),
            obs=sequence[t],
        )

from pyro.infer.autoguide import AutoDelta

# Define the guide (variational distribution)
guide = AutoDelta(poutine.block(model, expose=["probs_x", "probs_alpha", "probs_beta"]))

In [58]:
pyro.clear_param_store()

# Optimizer
optimizer = Adam({"lr": 0.01})

# Inference algorithm
elbo = TraceEnum_ELBO(max_plate_nesting=1)
svi = SVI(model, guide, optimizer, loss=elbo)

# Training
num_steps = 1000
for step in range(num_steps):
    loss = svi.step(sequence, hidden_dim)
    if step % 100 == 0:
        print(f"Step {step} : Loss = {loss}")

KeyboardInterrupt: 