In [11]:
import numpy as np
from typing import Tuple, Optional

def sample_poisson_mixture(
    n: int,
    lam: Tuple[float, float] = (2.0, 7.0),
    pi: float = 0.4,
    return_labels: bool = True,
    rng: Optional[np.random.Generator] = None
):
    """
    Draw n i.i.d. observations from a 2-component Poisson mixture:
        Z_i ~ Bernoulli(pi)           (component 1 w.p. pi, component 2 w.p. 1-pi)
        X_i | Z_i=1 ~ Poisson(lam[0])
        X_i | Z_i=0 ~ Poisson(lam[1])

    Parameters
    ----------
    n : int
        Number of observations to generate.
    lam : (lambda1, lambda2)
        Poisson rates (>0) for the two components.
    pi : float in [0,1]
        Mixing weight for component 1.
    return_labels : bool
        If True, also return the latent component indicators (0/1).
    rng : np.random.Generator
        Optional NumPy Generator for reproducibility.

    Returns
    -------
    x : np.ndarray of shape (n,)
        Generated observations.
    z : np.ndarray of shape (n,) with values in {0,1}  [only if return_labels=True]
    """
    if rng is None:
        rng = np.random.default_rng()

    lam1, lam2 = lam
    if lam1 <= 0 or lam2 <= 0:
        raise ValueError("Poisson rates must be > 0.")
    if not (0.0 <= pi <= 1.0):
        raise ValueError("pi must be in [0, 1].")

    # Sample component labels: 1 -> uses lam1, 0 -> uses lam2
    z = rng.binomial(1, pi, size = n)

    # Vectorized Poisson sampling by component
    x = np.empty(n, dtype=int)
    idx1 = (z == 1)
    idx2 = ~idx1
    x[idx1] = rng.poisson(lam1, size=idx1.sum())
    x[idx2] = rng.poisson(lam2, size=idx2.sum())

    return (x, z) if return_labels else x

if __name__ == "__main__":
    rng = np.random.default_rng(2124)
    x, z = sample_poisson_mixture(n=5, lam=(3, 8), pi=0.35, rng=rng)
    print("observations:", x)
    print("labels      :", z)


observations: [ 6 11  1 11  5]
labels      : [0 0 1 0 1]


In [6]:
import numpy as np
from typing import Optional, Tuple

def sample_poisson_hmm(
    n: int,
    lambdas: Tuple[float, ...] = (3.0, 8.0),             # emission rates for each state
    A: Optional[np.ndarray] = None,                      # KxK transition matrix
    pi0: Optional[np.ndarray] = None,                    # length-K initial distribution
    m: int = 1,                                          # number of independent sequences
    return_states: bool = True,
    rng: Optional[np.random.Generator] = None,
):
    """
    Sample observations from a K-state HMM with Poisson emissions.

    States: Z_t in {0,...,K-1}
      Z_0 ~ Categorical(pi0)
      Z_t | Z_{t-1}=i ~ Categorical(A[i])
      X_t | Z_t=k ~ Poisson(lambdas[k])

    Parameters
    ----------
    n : int
        Length of each sequence.
    lambdas : tuple of float
        Poisson rates (>0) for each state (defines K=len(lambdas)).
    A : (K,K) ndarray
        Row-stochastic transition matrix. If None, uses a sticky default.
    pi0 : (K,) ndarray
        Initial distribution. If None, uses the stationary dist of A (or uniform if A=None).
    m : int
        Number of independent sequences to generate (returned stacked on axis 0).
    return_states : bool
        If True, also return the latent states.
    rng : np.random.Generator
        Optional NumPy Generator for reproducibility.

    Returns
    -------
    X : ndarray, shape (m, n)
        Observations (integers).
    Z : ndarray, shape (m, n)
        Hidden states (only if return_states=True).
    """
    if rng is None:
        rng = np.random.default_rng()

    lambdas = np.asarray(lambdas, dtype=float)
    if np.any(lambdas <= 0):
        raise ValueError("All Poisson rates must be > 0.")
    K = lambdas.size

    if A is None:
        # Slightly sticky chain by default
        A = np.full((K, K), 1.0 / (K + 9))
        np.fill_diagonal(A, 10.0 / (K + 9))
    A = np.asarray(A, dtype=float)
    if A.shape != (K, K):
        raise ValueError("A must be KxK where K=len(lambdas).")
    if not np.allclose(A.sum(axis=1), 1.0):
        raise ValueError("Each row of A must sum to 1.")

    if pi0 is None:
        # Try stationary distribution; fall back to uniform if singular
        try:
            w, v = np.linalg.eig(A.T)
            i = np.argmin(np.abs(w - 1.0))
            pi0 = np.real(v[:, i])
            pi0 = pi0 / pi0.sum()
            pi0 = np.clip(pi0, 0, None)
            pi0 = pi0 / pi0.sum()
            if not np.isfinite(pi0).all() or np.any(pi0 <= 0):
                raise RuntimeError
        except Exception:
            pi0 = np.full(K, 1.0 / K)
    else:
        pi0 = np.asarray(pi0, dtype=float)
        if pi0.shape != (K,) or not np.isclose(pi0.sum(), 1.0):
            raise ValueError("pi0 must be length-K and sum to 1.")

    X = np.empty((m, n), dtype=int)
    Z = np.empty((m, n), dtype=int)

    for s in range(m):
        # initial state
        Z[s, 0] = rng.choice(K, p=pi0)
        X[s, 0] = rng.poisson(lambdas[Z[s, 0]])
        # forward simulate
        for t in range(1, n):
            Z[s, t] = rng.choice(K, p=A[Z[s, t-1]])
            X[s, t] = rng.poisson(lambdas[Z[s, t]])

    return (X, Z) if return_states else X

if __name__ == "__main__":
    rng = np.random.default_rng(123)
    # 2-state example
    A = np.array([[0.92, 0.08],
                  [0.06, 0.94]])
    X, Z = sample_poisson_hmm(
        n=5, lambdas=(2.5, 9.0), A=A, pi0=np.array([0.5, 0.5]), rng=rng
    )
    print("obs:", X[0])
    print("st :", Z[0])


obs: [ 6 12  9 10  3]
st : [1 1 1 1 1]


In [12]:
import numpy as np
from scipy.stats import poisson

# ---- Mixture model (reusing previous function) ----
def sample_poisson_mixture(n, lam=(2, 7), pi=0.4, rng=None):
    if rng is None:
        rng = np.random.default_rng()
    lam1, lam2 = lam
    z = rng.binomial(1, pi, size=n)
    x = np.where(z == 1, rng.poisson(lam1, size=n), rng.poisson(lam2, size=n))
    return x, z

# ---- Likelihood function ----
def mixture_likelihood(x, lam=(2, 7), pi=0.4):
    """
    Computes the likelihood of observed data under a 2-component Poisson mixture.
    L = ∏ [ π·Pois(x_i | λ1) + (1−π)·Pois(x_i | λ2) ]
    """
    lam1, lam2 = lam
    pois1 = poisson.pmf(x, lam1)
    pois2 = poisson.pmf(x, lam2)
    mixture_pdf = pi * pois1 + (1 - pi) * pois2
    L = np.prod(mixture_pdf)
    logL = np.sum(np.log(mixture_pdf))
    return L, logL

# ---- Step 3 ----
rng = np.random.default_rng(1025)
x, z = sample_poisson_mixture(n=3, lam=(2, 7), pi=0.4, rng=rng)

print("Observed data (x):", x)
print("Hidden labels (z):", z)

L, logL = mixture_likelihood(x, lam=(2, 7), pi=0.4)
print(f"Likelihood: {L:.6e}")
print(f"Log-likelihood: {logL:.3f}")


Observed data (x): [1 2 7]
Hidden labels (z): [1 1 0]
Likelihood: 1.238128e-03
Log-likelihood: -6.694


In [14]:
import numpy as np

# ---------- data generator (same as before) ----------
def sample_poisson_mixture(n, lam=(3.0, 8.0), pi=0.35, rng=None):
    if rng is None: rng = np.random.default_rng()
    z = rng.binomial(1, pi, size=n)            # 1 -> uses lam[0], 0 -> lam[1]
    x = np.where(z==1, rng.poisson(lam[0], n), rng.poisson(lam[1], n))
    return x, z

# ---------- EM for 2-Poisson mixture ----------
def _log_pois(x, lam):
    # log Poisson pmf up to constant: x*log(lam) - lam - log(x!)
    # we need the full log pmf for responsibilities, so use gammaln.
    from math import log
    from scipy.special import gammaln
    x = np.asarray(x, dtype=float)
    lam = float(lam)
    return x*np.log(lam) - lam - gammaln(x+1)

def fit_poisson_mixture_EM(
    x, n_init=10, tol=1e-6, max_iter=500, rng=None, min_rate=1e-6
):
    """
    MLE for a 2-component Poisson mixture using EM.
    Returns a dict with params, loglik, responsibilities, etc.
    """
    if rng is None: rng = np.random.default_rng()
    x = np.asarray(x, dtype=int)
    n = x.size

    def em_once(pi, lam1, lam2):
        eps = 1e-12
        lam1 = max(lam1, min_rate)
        lam2 = max(lam2, min_rate)
        pi = np.clip(pi, eps, 1-eps)

        loglik_prev = -np.inf
        for it in range(1, max_iter+1):
            # E-step: responsibilities gamma_i = P(Z=1 | x_i)
            log_num1 = np.log(pi)   + _log_pois(x, lam1)
            log_num2 = np.log(1-pi) + _log_pois(x, lam2)
            # log-sum-exp
            m = np.maximum(log_num1, log_num2)
            log_den = m + np.log(np.exp(log_num1-m) + np.exp(log_num2-m))
            gamma = np.exp(log_num1 - log_den)

            # M-step
            sum_g  = gamma.sum()
            sum_gx = (gamma * x).sum()
            sum_ng  = n - sum_g
            sum_ng_x = x.sum() - sum_gx

            pi  = np.clip(sum_g / n, eps, 1-eps)
            lam1 = max(sum_gx / max(sum_g, eps), min_rate)
            lam2 = max(sum_ng_x / max(sum_ng, eps), min_rate)

            loglik = log_den.sum()
            if abs(loglik - loglik_prev) < tol * (1 + abs(loglik_prev)):
                break
            loglik_prev = loglik

        # sort components by increasing lambda for identifiability
        if lam1 > lam2:
            lam1, lam2 = lam2, lam1
            gamma = 1 - gamma
            pi = 1 - pi
        return {
            "pi": float(pi),
            "lam": (float(lam1), float(lam2)),
            "loglik": float(loglik),
            "responsibilities": gamma,
            "iterations": it
        }

    # Multiple random initializations to avoid local optima
    best = None
    xm = x.mean()
    for _ in range(n_init):
        # random but sensible starts
        lam1_0 = max(min_rate, rng.uniform(0.3, 0.9) * max(0.5, xm))
        lam2_0 = max(min_rate, rng.uniform(1.1, 2.0) * max(1.0, xm))
        if lam1_0 > lam2_0: lam1_0, lam2_0 = lam2_0, lam1_0
        pi_0   = rng.uniform(0.2, 0.8)
        cand = em_once(pi_0, lam1_0, lam2_0)
        if (best is None) or (cand["loglik"] > best["loglik"]):
            best = cand

    # add hard labels for convenience
    best["z_hat"] = (best["responsibilities"] >= 0.5).astype(int)
    return best

# ---------- example: generate n=1000 then fit ----------
if __name__ == "__main__":
    rng = np.random.default_rng(1025)
    # true params (choose your own)
    lam_true = (3.0, 8.0)
    pi_true  = 0.35
    x, _ = sample_poisson_mixture(n=1000, lam=lam_true, pi=pi_true, rng=rng)

    fit = fit_poisson_mixture_EM(x, n_init=15, rng=rng)
    print("Estimated pi      :", round(fit["pi"], 4))
    print("Estimated lambdas :", tuple(round(v, 4) for v in fit["lam"]))
    print("Log-likelihood    :", round(fit["loglik"], 2))
    print("EM iterations     :", fit["iterations"])


Estimated pi      : 0.3829
Estimated lambdas : (3.0644, 8.0286)
Log-likelihood    : -2602.45
EM iterations     : 28
