# Objective

The purpose of this notebook is to test Pyro with the Asia BN, to get a better understanding of how Pyro works and see if we can use it for our experiments.

In [1]:
import time
import torch
import pyro
import pyro.distributions as dist
from pyro.infer import Importance, EmpiricalMarginal
from pyro import poutine

## 1 - Model

In [2]:

# --- CPTs wrapped into a helper ---
def make_cpts(device):
    asia_probs = torch.tensor([0.01, 0.99], device=device)
    smoke_probs = torch.tensor([0.5, 0.5], device=device)

    tub_probs = torch.tensor([
        [0.05, 0.95],
        [0.01, 0.99],
    ], device=device)

    lung_probs = torch.tensor([
        [0.1, 0.9],
        [0.01, 0.99],
    ], device=device)

    bronc_probs = torch.tensor([
        [0.6, 0.4],
        [0.3, 0.7],
    ], device=device)

    either_probs = torch.tensor([
        [[1.0, 0.0], [1.0, 0.0]],  # lung=yes
        [[1.0, 0.0], [0.0, 1.0]],  # lung=no
    ], device=device)

    xray_probs = torch.tensor([
        [0.98, 0.02],
        [0.05, 0.95],
    ], device=device)

    dysp_probs = torch.tensor([
        [[0.9, 0.1], [0.8, 0.2]],
        [[0.7, 0.3], [0.1, 0.9]],
    ], device=device)

    return asia_probs, smoke_probs, tub_probs, lung_probs, bronc_probs, either_probs, xray_probs, dysp_probs

# --- Model factory (closes over CPTs) ---
def make_model(device):
    asia_probs, smoke_probs, tub_probs, lung_probs, bronc_probs, either_probs, xray_probs, dysp_probs = make_cpts(device)

    def model():
        asia = pyro.sample("asia", dist.Categorical(asia_probs))
        smoke = pyro.sample("smoke", dist.Categorical(smoke_probs))
        tub = pyro.sample("tub", dist.Categorical(tub_probs[asia]))
        lung = pyro.sample("lung", dist.Categorical(lung_probs[smoke]))
        bronc = pyro.sample("bronc", dist.Categorical(bronc_probs[smoke]))
        either = pyro.sample("either", dist.Categorical(either_probs[lung, tub]))
        xray = pyro.sample("xray", dist.Categorical(xray_probs[either]))
        dysp = pyro.sample("dysp", dist.Categorical(dysp_probs[bronc, either]))
        return
    return model

## Inference

In order to test the inference, we are going

### Sequentially

In [3]:
def run_sequential_inference(num_samples=5000, device="cpu", query="dysp", evidence=None):
    if evidence is None:
        evidence = {}

    device = torch.device(device)
    pyro.clear_param_store()

    # Build model with CPTs on device
    model = make_model(device)

    # Convert evidence to tensors on the same device
    evidence_tensors = {k: torch.tensor(v, device=device) for k, v in evidence.items()}

    # Conditioned model
    conditioned = poutine.condition(model, data=evidence_tensors)

    # Importance sampling
    start = time.time()
    importance = Importance(conditioned, num_samples=num_samples)
    posterior = importance.run()
    elapsed = time.time() - start

    # Extract samples
    marginal = EmpiricalMarginal(posterior, sites=query)
    samples = [marginal().item() for _ in range(num_samples)]
    samples = torch.tensor(samples)

    counts = torch.bincount(samples, minlength=2).float()
    probs = counts / counts.sum()

    return probs, elapsed

In [4]:
# --- Example usage ---
probs_cpu, t_cpu_seq = run_sequential_inference(num_samples=10000, device="cpu", query="dysp", evidence={"smoke": 1})

print(f"CPU: {probs_cpu.numpy()} (time {t_cpu_seq:.3f}s)")

probs_gpu, t_gpu_seq = run_sequential_inference(num_samples=10000, device="cuda", query="dysp", evidence={"smoke": 1})

print(f"GPU: {probs_gpu.numpy()} (time {t_gpu_seq:.3f}s)")

CPU: [0.3276 0.6724] (time 10.543s)
GPU: [0.3122 0.6878] (time 30.747s)


### Vectorized (Parallel)

In [5]:
def run_vectorized_inference(num_samples=5000, device="cpu", query="dysp", evidence=None):
    """Vectorized version using pyro.plate for true GPU parallelization"""
    if evidence is None:
        evidence = {}
    
    device = torch.device(device)
    pyro.clear_param_store()
    
    # Get CPTs
    asia_probs, smoke_probs, tub_probs, lung_probs, bronc_probs, either_probs, xray_probs, dysp_probs = make_cpts(device)
    
    # Vectorized model using pyro.plate
    def vectorized_model():
        with pyro.plate("particles", num_samples):
            asia = pyro.sample("asia", dist.Categorical(asia_probs))
            smoke = pyro.sample("smoke", dist.Categorical(smoke_probs))
            tub = pyro.sample("tub", dist.Categorical(tub_probs[asia]))
            lung = pyro.sample("lung", dist.Categorical(lung_probs[smoke]))
            bronc = pyro.sample("bronc", dist.Categorical(bronc_probs[smoke]))
            either = pyro.sample("either", dist.Categorical(either_probs[lung, tub]))
            xray = pyro.sample("xray", dist.Categorical(xray_probs[either]))
            dysp = pyro.sample("dysp", dist.Categorical(dysp_probs[bronc, either]))
            return dysp
    
    # Apply evidence conditioning
    evidence_tensors = {k: torch.tensor(v, device=device).expand(num_samples) for k, v in evidence.items()}
    conditioned = poutine.condition(vectorized_model, data=evidence_tensors)
    
    # Run inference (only need 1 sample since each contains num_samples particles)
    start = time.time()
    importance = Importance(conditioned, num_samples=1)
    posterior = importance.run()
    elapsed = time.time() - start
    
    # Extract results from vectorized sampling
    traces = list(posterior.exec_traces)
    samples = traces[0].nodes[query]["value"]  # All samples in one tensor
    
    counts = torch.bincount(samples, minlength=2).float()
    probs = counts / counts.sum()
    
    return probs, elapsed

In [10]:
# Test vectorized inference - CPU vs GPU comparison
print("=== Vectorized Inference Comparison ===")

n_samples = 10000000

# CPU vectorized
probs_cpu_vec, t_cpu_vec = run_vectorized_inference(
    num_samples=n_samples, device="cpu", query="dysp", evidence={"smoke": 1}
)
print(f"CPU (vectorized) time: {t_cpu_vec:.3f}s")
print(f"CPU (vectorized) probs: {probs_cpu_vec.numpy()}")

# GPU vectorized
probs_gpu_vec, t_gpu_vec = run_vectorized_inference(
    num_samples=n_samples, device="cuda", query="dysp", evidence={"smoke": 1}
)
print(f"GPU (vectorized) time: {t_gpu_vec:.3f}s")
print(f"GPU (vectorized) probs: {probs_gpu_vec.cpu().numpy()}")

# GPU vs CPU comparison
gpu_speedup = t_cpu_vec / t_gpu_vec
print(f"GPU is {gpu_speedup:.2f}x faster than CPU with vectorization!")

# Compare sequential GPU vs vectorized GPU (only if we use the same number of samples)
# print(f"Improvement over sequential GPU: {t_gpu_seq/t_gpu_vec:.2f}x faster\n")
# print(f"Improvement over sequential CPU: {t_cpu_seq/t_cpu_vec:.2f}x faster\n")

=== Vectorized Inference Comparison ===
CPU (vectorized) time: 2.728s
CPU (vectorized) probs: [0.3191492 0.6808508]
GPU (vectorized) time: 0.024s
GPU (vectorized) probs: [0.3190166 0.6809834]
GPU is 114.83x faster than CPU with vectorization!
