In [1]:
import numpy as np

In [3]:
z_dim = 5
x_dim = 4

In [8]:
# init to uniform dist of shape (z_dim,)
p_z_init = np.ones(z_dim) / z_dim

In [4]:
# init to uniform dist of shape (z_dim, z_dim)
p_latent_transition = np.ones((z_dim, z_dim)) / z_dim

In [5]:
# init to uniform dist of shape (z_dim, x_dim)
p_emit = np.ones((z_dim, x_dim)) / x_dim


In [9]:
print(p_z_init, p_latent_transition, p_emit)

[0.2 0.2 0.2 0.2 0.2] [[0.2 0.2 0.2 0.2 0.2]
 [0.2 0.2 0.2 0.2 0.2]
 [0.2 0.2 0.2 0.2 0.2]
 [0.2 0.2 0.2 0.2 0.2]
 [0.2 0.2 0.2 0.2 0.2]] [[0.25 0.25 0.25 0.25]
 [0.25 0.25 0.25 0.25]
 [0.25 0.25 0.25 0.25]
 [0.25 0.25 0.25 0.25]
 [0.25 0.25 0.25 0.25]]


In [11]:
sum(p_emit[z, 0] * p_z_init[z] for z in range(z_dim))

0.25

In [10]:
np.array([sum(p_emit[z, x] * p_z_init[z] for z in range(z_dim)) for x in range(x_dim)])

array([0.25, 0.25, 0.25, 0.25])

In [12]:
# faster way with matrix multiply
# p_z_init is shape (z_dim,)
# p_emit is shape (z_dim, x_dim)
# matrix multiply is the same as dot product. it takes (x, y) x (y, z) and returns (x, z)
# dot product takes (y, ) and (y, z) and returns (z, )
# in this case, it is (z_dim,) x (z_dim, x_dim) = (x_dim,)
np.dot(p_z_init, p_emit) # p_z_init is (z_dim,) and p_emit is (z_dim, x_dim)

array([0.25, 0.25, 0.25, 0.25])

In [13]:
np.array([0, 0]).shape

(2,)

In [14]:
np.array([[0,0]]).shape

(1, 2)

In [15]:
np.dot(p_z_init, p_latent_transition)

array([0.2, 0.2, 0.2, 0.2, 0.2])

In [16]:
x = np.array([1,2,3,1,2,3,1,2,3,1,2,3])

In [17]:
x.shape

(12,)

In [29]:
p_z = np.zeros((x.shape[0], z_dim))
p_z[0] = p_z_init
print(p_z)

[[0.2 0.2 0.2 0.2 0.2]
 [0.  0.  0.  0.  0. ]
 [0.  0.  0.  0.  0. ]
 [0.  0.  0.  0.  0. ]
 [0.  0.  0.  0.  0. ]
 [0.  0.  0.  0.  0. ]
 [0.  0.  0.  0.  0. ]
 [0.  0.  0.  0.  0. ]
 [0.  0.  0.  0.  0. ]
 [0.  0.  0.  0.  0. ]
 [0.  0.  0.  0.  0. ]
 [0.  0.  0.  0.  0. ]]


In [38]:
p_x = np.zeros((x.shape[0], x_dim))
print(p_x)

[[0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]]


In [41]:
# THE FORWARD ALGORITHM
for i in range(x.shape[0]):
    # emission:
    # calculate P(X_i = x) = sum_z P(X_i = x | Z_i = z) P(Z_i = z)
    p_x[i] = np.dot(p_z[i], p_emit) # (x_dim, )

    if i < x.shape[0] - 1:
        # latent transition:
        # calculate P(Z_{i+1} = z) = sum_z P(Z_{i+1} = z | Z_i = z) P(Z_i = z)
        p_z[i+1] = np.dot(p_z[i], p_latent_transition) # (z_dim, )

In [44]:
# first calculate P(Z_i = z) for all i
for i in range(x.shape[0] - 1):
    p_z[i+1] = np.dot(p_z[i], p_latent_transition) # (z_dim, )

In [45]:
# then P(X_i = x) is just one big matrix multiply ish thing:
# p_z is (seq_length, z_dim)
# p_emit is (z_dim, x_dim)
# output is (seq_length, x_dim)
p_x = np.dot(p_z, p_emit)

In [42]:
p_x

array([[0.25, 0.25, 0.25, 0.25],
       [0.25, 0.25, 0.25, 0.25],
       [0.25, 0.25, 0.25, 0.25],
       [0.25, 0.25, 0.25, 0.25],
       [0.25, 0.25, 0.25, 0.25],
       [0.25, 0.25, 0.25, 0.25],
       [0.25, 0.25, 0.25, 0.25],
       [0.25, 0.25, 0.25, 0.25],
       [0.25, 0.25, 0.25, 0.25],
       [0.25, 0.25, 0.25, 0.25],
       [0.25, 0.25, 0.25, 0.25],
       [0.25, 0.25, 0.25, 0.25]])

In [43]:
p_z

array([[0.2, 0.2, 0.2, 0.2, 0.2],
       [0.2, 0.2, 0.2, 0.2, 0.2],
       [0.2, 0.2, 0.2, 0.2, 0.2],
       [0.2, 0.2, 0.2, 0.2, 0.2],
       [0.2, 0.2, 0.2, 0.2, 0.2],
       [0.2, 0.2, 0.2, 0.2, 0.2],
       [0.2, 0.2, 0.2, 0.2, 0.2],
       [0.2, 0.2, 0.2, 0.2, 0.2],
       [0.2, 0.2, 0.2, 0.2, 0.2],
       [0.2, 0.2, 0.2, 0.2, 0.2],
       [0.2, 0.2, 0.2, 0.2, 0.2],
       [0.2, 0.2, 0.2, 0.2, 0.2]])

In [49]:
# x is (seq_len, )
# p_x is (seq_len, x_dim)
[p_x[i, x[i]] for i in range(len(x))]


[0.25,
 0.25000000000000006,
 0.25000000000000006,
 0.25000000000000006,
 0.25000000000000006,
 0.25000000000000006,
 0.25000000000000006,
 0.25000000000000006,
 0.25000000000000006,
 0.25000000000000006,
 0.25000000000000006,
 0.25000000000000006]

In [51]:
p_x[range(len(x)), x]


array([0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25,
       0.25])

In [50]:
x

array([1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3])

In [52]:
np.prod(p_x[range(len(x)), x])

5.960464477539077e-08

In [1]:
import torch
import torch.nn as nn

# Set random seed for reproducibility
torch.manual_seed(42)

# Define dimensions for our toy example
x_dim = 3  # number of possible observations
z_dim = 2  # number of possible latent states
seq_length = 4  # length of sequence

# Create random parameters with requires_grad=True
p_emit = torch.rand((z_dim, x_dim), requires_grad=True)  # P(X|Z)
p_latent_transition = torch.rand((z_dim, z_dim), requires_grad=True)  # P(Z_{t+1}|Z_t)
initial_z = torch.rand(z_dim, requires_grad=True)  # P(Z_0)

# Normalize probabilities using softmax
p_emit = nn.functional.softmax(p_emit, dim=1)
p_latent_transition = nn.functional.softmax(p_latent_transition, dim=1)
initial_z = nn.functional.softmax(initial_z, dim=0)

# Create a random sequence of observations (one-hot encoded)
x = torch.randint(0, x_dim, (seq_length,))
x_one_hot = torch.nn.functional.one_hot(x, x_dim).float()

# Initialize arrays to store probabilities
p_z = torch.zeros((seq_length, z_dim))
p_x = torch.zeros(seq_length)

# Initial state distribution
p_z[0] = initial_z

# Forward algorithm
for i in range(seq_length):
    # Emission probability: P(X_i = x | Z_i)
    # Calculate P(X_i = x) = sum_z P(X_i = x | Z_i = z) P(Z_i = z)
    emission_probs = torch.matmul(p_z[i], p_emit)  # (x_dim,)
    p_x[i] = torch.sum(x_one_hot[i] * emission_probs)  # Probability of current observation

    if i < seq_length - 1:
        # Latent transition: P(Z_{i+1} | Z_i)
        # Calculate P(Z_{i+1} = z) = sum_z P(Z_{i+1} = z | Z_i = z) P(Z_i = z)
        p_z[i + 1] = torch.matmul(p_z[i], p_latent_transition)  # (z_dim,)

# Calculate negative log-likelihood as loss
loss = -torch.sum(torch.log(p_x))

# Backpropagate
loss.backward()

# Print results
print(f"Observation sequence: {x}")
print("\nEmission matrix P(X|Z):")
print(p_emit.detach())
print("\nTransition matrix P(Z_{t+1}|Z_t):")
print(p_latent_transition.detach())
print("\nInitial state distribution P(Z_0):")
print(initial_z.detach())
print("\nForward probabilities P(Z_t):")
print(p_z.detach())
print("\nObservation probabilities P(X_t):")
print(p_x.detach())
print("\nNegative log-likelihood:", loss.item())

# Print gradients
print("\nGradients:")
print("dL/dp_emit:", p_emit.grad)
print("dL/dp_latent_transition:", p_latent_transition.grad)
print("dL/dinitial_z:", initial_z.grad)

RuntimeError: one of the variables needed for gradient computation has been modified by an inplace operation: [torch.FloatTensor [1, 2]], which is output 0 of AsStridedBackward0, is at version 4; expected version 3 instead. Hint: enable anomaly detection to find the operation that failed to compute its gradient, with torch.autograd.set_detect_anomaly(True).

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim

class ToyHMM(nn.Module):
    def __init__(self, hidden_dim, obs_dim):
        super().__init__()
        self.log_init = nn.Parameter(torch.randn(hidden_dim))
        self.log_trans = nn.Parameter(torch.randn(hidden_dim, hidden_dim))
        self.log_emit = nn.Parameter(torch.randn(hidden_dim, obs_dim))

    def forward(self, obs_seq):
        # obs_seq is shape (seq_len,)
        seq_len = obs_seq.size(0)
        # Convert parameters to probability distributions
        init_probs = self.log_init.log_softmax(dim=0)          # (hidden_dim,)
        trans_probs = self.log_trans.log_softmax(dim=1)        # (hidden_dim, hidden_dim)
        emit_probs = self.log_emit.log_softmax(dim=1)          # (hidden_dim, obs_dim)

        alpha = init_probs + emit_probs[:, obs_seq[0]]         # shape (hidden_dim,)
        for t in range(1, seq_len):
            # broadcast alpha to (hidden_dim, hidden_dim) by unsqueezing
            alpha_next = torch.logsumexp(alpha.unsqueeze(1) + trans_probs, dim=0)
            alpha = alpha_next + emit_probs[:, obs_seq[t]]

        return torch.logsumexp(alpha, dim=0)  # log P(X)

# Create a toy dataset (batch of sequences)
# For simplicity, generate random observations in [0, obs_dim-1].
hidden_dim, obs_dim = 2, 3
seq_len, num_seqs = 5, 10
data = torch.randint(low=0, high=obs_dim, size=(num_seqs, seq_len))

model = ToyHMM(hidden_dim, obs_dim)
optimizer = optim.Adam(model.parameters(), lr=0.1)

for epoch in range(50):
    total_loss = 0
    for seq in data:
        optimizer.zero_grad()
        log_prob = model(seq)  # forward algorithm
        loss = -log_prob       # negative log-likelihood
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    if (epoch+1) % 10 == 0:
        print(f"Epoch {epoch+1}, Avg Loss: {total_loss/num_seqs:.4f}")

Epoch 10, Avg Loss: 5.1488
Epoch 20, Avg Loss: 5.1263
Epoch 30, Avg Loss: 5.1225
Epoch 40, Avg Loss: 5.1208
Epoch 50, Avg Loss: 5.1197
