# 01 – Model Equations & Core Functions

We implement the mathematical core of the **Mixed Hidden Markov Model (mHMM)**
described in the paper:

> *Handling underlying discrete variables with bivariate mixed hidden Markov models in NONMEM.*

We’ll implement:

1. **Emission models** – the observed data (FEV1 i.e Forced Expiratory Volume and PRO i.e Patient-reported outcomes) conditional on hidden states.  
2. **Transition probabilities** – parameterized state-switching dynamics.  
3. **Forward algorithm** – recursive computation of the likelihood.  
4. **Viterbi algorithm** – decoding most probable state sequences.

Later notebooks (`02_simulator.ipynb` and `03_estimation.ipynb`) will import these functions.


In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from math import log, exp
from scipy.stats import multivariate_normal 

In [2]:
#Logistic and Logit transformations

def logisitic(x):
    """Logisitic transformation"""
    return 1 / (1 + np.exp(-x))

def logit(p):  #logit is the inverse of logistic
    """Logit transformation"""
    return np.log(p / (1 - p))

def logsumexp(log_probs):  
    """Stable computation of log-sum-exp"""
    a_max = np.max(log_probs)
    return a_max + np.log(np.sum(np.exp(log_probs - a_max))) 

We then build a modular bivariate Gaussian emission model for FEV1 and PRO given the hidden state:

In [3]:
EPS = 1e-12  #small constant to avoid log(0)

def var_to_std(var):
    """Convert variance to standard deviation"""
    return np.sqrt(np.maximum(var, 0.0))

class EmissionModel:
    """Implement the emission probability P(Y_t | S_t = s)
    where Y_t = (FEV1_t, PRO_t) are the observed data at time t
    and S_t = s is the hidden state at time t.

    Modes (h*) used to build individual-level FEV1/PRO values.
    IIV random effects g ~ Normal(0, x2_*)
    Residual variances r2FEV1, r2PRO used directly in covariance
    correlation q (state-specific) used to form covariance
    """

    def __init__(self,
                 hFEV1R, hFEV1E,
                 x2_FEV1R=0.03, x2_FEV1E=0.03,
                 hPROR=2.5, hPROE=0.5,
                 x2_PROR=0.09, x2_PROE=0.09,
                 r2_FEV1=0.015, r2_PRO=0.05,
                 qR=-0.33, qE=-0.33,
                 PE=0.2, PHL=10.0):

        # population mode params
        self.hFEV1R = float(hFEV1R)
        self.hFEV1E = float(hFEV1E)
        self.hPROR = float(hPROR)
        self.hPROE = float(hPROE)

        # IIV variances
        self.x2_FEV1R = float(x2_FEV1R)
        self.x2_FEV1E = float(x2_FEV1E)
        self.x2_PROR = float(x2_PROR)
        self.x2_PROE = float(x2_PROE)

        # residual variances
        self.r2_FEV1 = float(r2_FEV1)
        self.r2_PRO = float(r2_PRO)

        # correlations per state
        self.qR = float(qR)
        self.qE = float(qE)

        # placebo effect params for PRO
        self.PE = float(PE)
        self.PHL = float(PHL)  # halflife taken to be 10 weeks

    def sample_individual_effects(self, rng=None):
        """Sample one set of individual random effects g* (mean 0, var = x2_*)"""
        rng = np.random.default_rng(rng)
        g = {
            "gFEV1R": rng.normal(0.0, np.sqrt(self.x2_FEV1R)),
            "gFEV1E": rng.normal(0.0, np.sqrt(self.x2_FEV1E)),
            "gPROR": rng.normal(0.0, np.sqrt(self.x2_PROR)),
            "gPROE": rng.normal(0.0, np.sqrt(self.x2_PROE)),
        }
        return g

    def individual_fev1(self, g, state):
        """
        Compute individuals's latent FEV1 for given state as in Eq. 1 & 2
        """
        FEV1_R = self.hFEV1R * np.exp(g["gFEV1R"])
        if state == 0:
            return FEV1_R
        else:
            #rem for eq 2: FEV1_E = FEV1_R - hFEV1E * exp(g["gFEV1E"])
            return FEV1_R - self.hFEV1E * np.exp(g["gFEV1E"]) 
        
    def individual_pro(self, g, time, state):
        """
        Compute individual's latent PRO for given state and time as in Eq. 3 & 4, including placebo effect half-life(PHL)
        """
       
        tfactor = 1.0 - self.PE * (1.0 - np.exp(-np.log(2.0) * time / self.PHL)) #time dependent placebo effect
        PRO_R = (self.hPROR + g["gPROR"]) * tfactor

        if state == 0:
            return PRO_R 
        else:
            #rem for eq 4: PRO_E = PRO_R + hPROE + g["gPROE"]
            return PRO_R + self.hPROE + g["gPROE"]
        
    
    def emission_cov(self, state):
        """
        Compute the emission covariance matrix (2x2) for given state
        """
        q = self.qR if state ==0 else self.qE
        covxy = q * np.sqrt(self.r2_FEV1 * self.r2_PRO)
        cov = np.array([[self.r2_FEV1, covxy],
                        [covxy, self.r2_PRO]])  #2x2 covariance matrix
        return cov
    

    def logpdf(self, y, g, time, state):
        """
        Compute the log probability density function (pdf) of observing y=(FEV1, PRO)
        given individual effects g, time, and state.
        """
        mu_FEV1 = self.individual_fev1(g, state)
        mu_PRO = self.individual_pro(g, time, state)
        mu = np.array([mu_FEV1, mu_PRO])  #mean vector

        cov = self.emission_cov(state)  #covariance matrix

        logpdf = multivariate_normal.logpdf(y, mean=mu, cov=cov + np.eye(2)*EPS)
        return logpdf
        

In [4]:
# Instantiate emission model (example reference scenario)
em = EmissionModel(hFEV1R=3.0, hFEV1E=0.5)

# Draw one subject's random effects
g = em.sample_individual_effects()

# Simulate example observation
time = 5  # weeks
state = 0  # remission
y_obs = np.array([3.2, 2.1])

log_prob = em.logpdf(y_obs, g, time, state)
print(f"Example log-probability (state={state}): {log_prob:.3f}")


Example log-probability (state=0): -7.247


Transition Probabilities (Eqs 7-10)

In [9]:
class TransitionModel:

    """
    Implement the transition probabilities between Reference and Exarcebation states."""
    
    def __init__(self, hpRE, gpRE, hpER, gpER, trt=0, slp=0):

        """
        Initialize transition model with parameters:
        hpRE: baseline logit prob of R->E
        gpRE: IIV random effect/covariate coefficient for R->E
        hPER: baseline logit prob of E->R
        gpER: IIV random effect variance for E->R
        trt, slp: treatment/slope effect covariates
        """
        self.hpRE = float(hpRE)
        self.gpRE = float(gpRE)
        self.hpER = float(hpER)
        self.gpER = float(gpER)
        self.trt = float(trt)
        self.slp = float(slp)

    def transition_matrix(self):
        """
        Compute the 2x2 transition probability matrix:
        P = [[P(R->R), P(R->E)],
             [P(E->R), P(E->E)]]
        using logistic transformations.
        """
        logit_pRE = self.hpRE + self.gpRE - (self.trt * self.slp) #remission to exarcebation
        logit_pER = self.hpER + self.gpER + (self.slp * self.trt) #exarcebation to remission increases with drug effect

        pRE = logisitic(logit_pRE)
        pER = logisitic(logit_pER)

        P = np.array([[1 - pRE, pRE],
                      [pER, 1 - pER]])
        return P

In [10]:
#test cell
tm = TransitionModel(hpRE=0.1, gpRE=0.2, hpER=0.3, gpER=-0.1, trt=1, slp=0.5)
print("Transition matrix:\n", tm.transition_matrix()) 

Transition matrix:
 [[0.549834   0.450166  ]
 [0.66818777 0.33181223]]


Implement Forward Algorithm to calculate total Likelihood (Lj) by summing all the probabilities of each state at each position

=> α0​(i)=P(S0​=i)×P(O0​∣S0​=i) (initialize)

=> α^0​(i)=∑j​α0​(j)α0​(i)​  (scale)

In [11]:
def forward_algorithm(obs_seq, times, init_probs, trans_mat, emission_model, g):
    """
    Paramters:
    obs_seq: array of observed data (FEV1, PRO) at each time point
    times: array of time points corresponding to obs_seq (for placebo-time effect)
    init_probs: initial state probabilities (array of length 2)
    trans_mat: 2x2 transition probability matrix
    emission_model: instance of EmissionModel to compute emission logpdf
    g: dictionary of individual random effects
    """
    T = len(obs_seq)
    n_states = len(init_probs)
    alpha = np.zeros((T, n_states))
    logL = 0.0

    #Initialization
    for i in range(n_states):  #compute initial forward proba for each possible hidden state i 
        alpha[0, i] = init_probs[i] * np.exp(emission_model.logpdf(obs_seq[0], g, times[0], i)) 
    scale = np.sum(alpha[0, :]) #
    alpha[0, :] /= scale #scaling to prevent underflow
    logL += np.log(scale + EPS)  #add log scale to total log likelihood

    #Recursion
    for t in range(1, T):
        for j in range(n_states):
            emiss = np.exp(emission_model.logpdf(obs_seq[t], g, times[t], j))
            alpha[t, j] = emiss * np.sum(alpha[t-1, :] * trans_mat[:, j])
        scale = np.sum(alpha[t, :])
        alpha[t, :] /= scale
        logL += np.log(scale + EPS)

    return logL #return total log likelihood 

Viterbi Algorithm

In [16]:
def viterbi(obs_seq, init_probs, trans_mat, emission_model, g, times):
    """
    COmpute most probable hidden state sequence. The most probable
    sequence is obtained when the likelihood of a sequence ceases to increase
    """
    T = len(obs_seq)
    n_states = len(init_probs)
    delta = np.zeros((T, n_states))
    psi = np.zeros((T, n_states), dtype=int)

    # Initialization
    for i in range(n_states):
        delta[0, i] = np.log(init_probs[i] + EPS) + emission_model.logpdf(obs_seq[0], g, times[0], i)
        

    # Recursion
    for t in range(1, T):
        for j in range(n_states):
            seq_probs = delta[t-1, :] + np.log(trans_mat[:, j] + EPS)
            psi[t, j] = np.argmax(seq_probs)
            delta[t, j] = np.max(seq_probs) + emission_model.logpdf(obs_seq[t], g, times[t], j)
    
    #Backtracking to find most probable state sequence
    states = np.zeros(T, dtype=int)
    states[-1] = np.argmax(delta[-1, :])
    for t in reversed(range(T-1)):
        states[t] = psi[t+1, states[t+1]]
    return states

In [17]:
#SANITY CHECK

T = 20
obs_seq = np.random.normal(size=(T, 2))  #random observations
init_probs = np.array ([0.7, 0.3]) #initial state probabilities
trans_mat = tm.transition_matrix() 

g = em.sample_individual_effects()
times = np.arange(len(obs_seq)) 
v_states = viterbi(obs_seq, init_probs, trans_mat, em, g, times)
print("Decoded Viterbi states:\n", v_states)

TypeError: EmissionModel.logpdf() missing 2 required positional arguments: 'time' and 'state'

## Summary of our notebook:

We have implemented:
- Logistic / logit helpers  
- Emission model (Eqs. 1–6)  
- Transition probabilities (Eqs. 7–10)  
- Forward algorithm (Eq. 11)  
- Viterbi decoding  

Next:  
we will use **`02_simulator.ipynb`** generate synthetic FEV1–PRO trajectories using these components.
