# 2: Small Worlds and Large Worlds

In [None]:
#black blackcellmagic proplot

In [None]:
import numpy as np
import scipy.stats as st
import matplotlib.pyplot as plt

import torch
from torch import tensor
import pyro
from pyro.distributions import (
    Bernoulli,
    Beta,
    Binomial,
    Categorical,
    Normal,
    Uniform,
)

### Code 2.1

Normalize a list of counts to produce a probability distribution

In [None]:
ways = np.array([0, 3, 8, 9, 0])
print(ways/sum(ways))

### Code 2.2

Compute the likelihood of getting six successes in nine trials with a success probability of 50% using a binomial distribution.

In [None]:
# Using scipy's stats module
st.binom(n=9, p=0.5).pmf(6)

In [None]:
# Using pyro's distributions
#tt(6.)
#torch.tensor(3.14159)
Binomial(total_count=9, probs=0.5).log_prob(tensor(6.)).exp().item()

### Code 2.3

Using the "grid approximation" to condition models

In [None]:
def calc_posterior(n_points=20):
    # Define the grid
    p_grid = np.linspace(0, 1, n_points)
    # Define uniform prior (un-normalized)
    prior = np.ones(n_points)
    # Compute likelihood at each point
    likelihood = st.binom(n=9, p=p_grid).pmf(6)
    # Compute product of likelihood and prior
    posterior = likelihood * prior
    # Normalize the posterior
    posterior /= sum(posterior)
    return p_grid, posterior

### Code 2.4
Display the results of the above computation

In [None]:
fig, axes = plt.subplots(ncols=2, figsize=(10, 4))
for n, ax in zip((5, 20), axes):
    p_grid, posterior = calc_posterior(n)
    ax.plot(p_grid, posterior, marker="o")
    ax.set_xlabel("probability of water")
    ax.set_ylabel("posterior probability")
    ax.set_title(f"{n} points")
plt.subplots_adjust(wspace=0.3)
plt.show()

### Code 2.5
Do the same thing as above, but for some different priors

In [None]:
def calc_posterior(prior):
    # Define the grid
    n_points = len(prior)
    p_grid = np.linspace(0, 1, n_points)
    # Compute likelihood at each point
    likelihood = st.binom(n=9, p=p_grid).pmf(6)
    # Compute product of likelihood and prior
    posterior = likelihood * prior
    # Normalize the posterior
    posterior /= sum(posterior)
    return p_grid, posterior

In [None]:
fig, axes = plt.subplots(ncols=2, figsize=(10, 4))
n_points = 100
p_grid = np.linspace(0, 1, n_points)
priors = np.array([
    [0 if p < 0.5 else 1 for p in p_grid], # step function
    np.exp(-5*np.abs(p_grid - 0.5)), # Laplace prior
])
for prior, ax, label in zip(priors, axes, ("step", "Laplace")):
    p_grid, posterior = calc_posterior(prior)
    ax.plot(p_grid, posterior)
    ax.set_xlabel("probability of water")
    ax.set_ylabel("posterior probability")
    ax.set_title(f"{label} prior")
plt.subplots_adjust(wspace=0.3)
plt.show()

### Code 2.6

Compute the quadratic approximation (the posterior is approximated as a Gaussian centered on the MAP estimate) for the globe model:
$$
p \sim \text{Uniform}(0, 1) \\
W \sim \text{Binomial}(W+L, p) \\
\text{data: } W=6, L=3
$$
McElreath uses some black box function `quap` to perform the calculation. Since no equivalent function to `quap` exists in `pyro` (that I know of), we're just gonna have to roll our own. First we compute the MAP estimate for $p$ using vanilla gradient descent.

In [None]:
# First, we compute the MAP estimate
pyro.clear_param_store()
# Specify the model
def model(W, L):
    N = tensor(W+L)
    # p is an optimizable parameter instead of a sampled value
    p = pyro.param("p", Uniform(0., 1.).sample())
    # Condition on the observed value of W
    pyro.sample("W", Binomial(N, p), obs=tensor(W))

# Trace the model so that we can inspect it when it runs
traced_model = pyro.poutine.trace(model)
vals = []
# Use vanilla gradient descent to maximize the likelihood
for _ in range(1000):
    tr = traced_model.get_trace(6., 3.)
    p = tr.nodes["p"]["value"]
    log_L = tr.log_prob_sum() # log likelihood
    log_L.backward() # Differentiate likelihood wrt parameters (p)
    p.data += 1e-3*p.grad # gradient update
    p.grad.zero_()
    vals.append(p.item())
print(f"MAP estimate: p={vals[-1]:.4f}")

In [None]:
plt.plot(vals)
plt.xscale('log')
plt.title("MAP estimate for $p$")
plt.xlabel("Steps")
plt.ylabel("$p$")
plt.show()

Then we compute the "curvature" at the peak in order to estimate the standard deviation. The reason why becomes apparent if you look at the log of the normal distribution:
$$
N(x;\mu,\sigma) \propto \exp\left(-(x-\mu)^2/2\sigma^2\right) \\
\log N(x; \mu,\sigma) = f(x) = -(x-\mu)^2/2\sigma^2 + C
$$
we can see that if we expand $f(x)$ around $x=\mu$ to second order, we recover the equation itself, because $f(x)$ is a parabola with
$$
f(\mu) = C,\quad f^\prime(\mu) = 0,\quad f^{\prime\prime}(\mu) = -1/2\sigma^2
$$
so that the relationship between the second derivative and $\sigma$ is
$$
\sigma = 1/\sqrt{-f^{\prime\prime}(\mu)}
$$

In [None]:
# Compute the "curvature" (second derivative) of the log
# likelihood wrt p
tr = traced_model.get_trace(6., 3.)
p = tr.nodes["p"]["value"]
log_L = tr.log_prob_sum()
# Compute the second derivative of the log likelihood wrt p
g = torch.autograd.grad(log_L, p, create_graph=True)[0]
g2 = torch.autograd.grad(g, p)[0]
print(f"2nd derivative is {g2.item():.3f}")
stddev = 1/torch.sqrt(-g2)
print(f"Which means that the stddev is {stddev.item():.3f}")

### Code 2.7
Do the same thing, but analytically, using a beta distribution for $p$ (beta is the conjugate prior to the binomial distribution, and generalizes the uniform distribution)

In [None]:
x = torch.linspace(0.001, 0.999, 100)
y1 = Beta(6.+1, 3.+1).log_prob(x).exp()
y2 = Normal(p, stddev).log_prob(x).exp().detach()
plt.plot(x, y1, label="exact")
plt.plot(x, y2, label="Gaussian approx.")
plt.legend()
plt.show()

### Code 2.8
Use vanilla MCMC to infer the posterior distribution of $p$ for the globe model

In [None]:
n_samples = 10000
p = list()
p.append(tensor(0.5))
W = tensor(6.)
L = tensor(3.)
for i in range(1, n_samples):
    # Take a small step away from the previous sample
    p_new = Normal(p[i-1], 0.1).sample()
    # If the step is outside [0, 1], squish it back in there
    if p_new < 0:
        p_new = abs(p_new)
    if p_new > 1:
        p_new = 2 - p_new
    # Get the likelihood of the previous/current step
    q0 = Binomial(W+L, p[i-1]).log_prob(W).exp()
    q1 = Binomial(W+L, p_new).log_prob(W).exp()
    # Reject the new sample if the likelihood ratio isn't high enough
    if Uniform(0, 1).sample() < q1/q0:
        p.append(p_new)
    else:
        p.append(p[i-1])

### Code 2.9
Plot the results of the MCMC simulation and compare with the analytical posterior

In [None]:
plt.hist(p, bins=100, label="MCMC", density=True)
x = torch.linspace(0, 1, 100)
y = Beta(W+1, L+1).log_prob(x).exp()
plt.plot(x, y, label="analytic")
plt.legend()
plt.show()

## Practice problems

### 2E1
Which of the expressions below correspond to the statement "the probability of rain on Monday"?
1. Pr(rain)
2. Pr(rain|Monday)
3. Pr(Monday|rain)
4. Pr(rain,Monday)/Pr(Monday)

Answer: the probability of rain on Monday can be rephrased as the probability of rain, given that it is Monday, so the answer is (2). However, because of Bayes rule, (4) is also mathematically equivalent to (2), so they are both right.

### 2E2
Which of the following statements corresponds to the expression: Pr(Monday|rain)?
1. The probability of rain on Monday
2. The probability of rain, given that it is Monday
3. The probability that it is Monday, given that it is raining
4. The probability that it is Monday and that it is raining

Answer: That expression literally translates into (3). It is not (4) because that's the _joint_ probability Pr(Monday, rain)

### 2E3
Which of the expressions below correspond to the statement "the probability that it is Monday, given that it is raining"?

1. Pr(Monday|rain)
2. Pr(rain|Monday)
3. Pr(rain|Monday) Pr(Monday)
4. Pr(rain|Monday) Pr(Monday) / Pr(rain)
5. Pr(Monday|rain) P(rain) / Pr(Monday)

Answer: So first off, (3) is equivalent to the joint distribution Pr(rain, Monday), and (4)\~(1) and (5)\~(2) by Bayes' rule. The statement of interest is equivalent to Pr(Monday|rain), which is then equal to choices (1) and (4)

### 2E4
The "probability of water is 0.7" in the context of the globe tossing model means that the probability of your thumb landing on water after a toss is 0.7. Assuming that your thumb is equally likely to land anywhere on the surface, this means that the fraction of the surface of the globe that is covered in water is then 0.7.

### 2M1
Compute and plot the grid approximate posterior distribution for each of the following sets of observations. In each case, assume a uniform prior for $p$:
1. W, W, W (W=3, L=0)
2. W, W, W, L (W=3, L=1)
3. L, W, W, L, W, W, W (W=5, L=2)
4. L, W, L, L, W, W, W, L, W, W (W=6, L=4) 

In [None]:
def calc_posterior(W, L):
    p_grid = np.linspace(0, 1, 100)
    prior = np.ones(p_grid.shape)
    L = st.binom(n=W+L, p=p_grid).pmf(W)
    posterior = L * prior
    posterior /= sum(posterior)
    return p_grid, posterior

In [None]:
for W, L in ((3, 0), (3, 1), (5, 2)):
    x, y = calc_posterior(W, L)
    plt.plot(x, y, label=f"W={W}, L={L}")
plt.xlabel("$p$")
plt.ylabel("pdf")
plt.title("Posterior probability")
plt.legend()
plt.show()

In [1]:
import math
import scipy.stats as st
import numpy as np
import matplotlib
matplotlib.use('ps')
from matplotlib import rc

rc('text',usetex=True)
rc('text.latex', preamble=r'\usepackage{color}')

import matplotlib.pyplot as plt

from matplotlib.animation import FuncAnimation

def calc_posterior(W, L):
    p_grid = np.linspace(0, 1, 100)
    #prior = np.ones(p_grid.shape)
    L = st.binom(n=W+L, p=p_grid).pmf(W)
    posterior = L #* prior
    posterior /= sum(posterior)
    return p_grid, posterior

obs = ['L', 'W', 'L', 'L', 'W', 'W', 'W', 'L', 'W', 'W']
data = []
for i in range(len(obs)):
    curr_obs = obs[0:i+1]
    W = curr_obs.count('W')
    L = curr_obs.count('L')
    data.append((W,L))
    #print(data)

class UpdateDist:
    def __init__(self, ax, prob=0.5):
        self.idx = 0
        self.prob = prob
        self.ax = ax
        #self.line, = ax.plot([], [], 'k-')
        self.lines = [self.ax.plot([], [], 'k--')[0], self.ax.plot([], [], 'r-')[0] ]    
        self.text = self.ax.text(0.9, 0.05, '', ha='right', va='top', fontsize=18)
        self.x = np.linspace(0, 1, 100)
        self.num_frames = 50
        # Set up plot parameters
        self.ax.set_xlim(0, 1)
        self.ax.set_ylim(0, 0.06)
        self.ax.grid(True)

        # This vertical line represents the theoretical value, to
        # which the plotted distribution should converge.
        self.ax.axvline(prob, linestyle='--', color='black')

    def __call__(self, i):
        global data, obs
        # This way the plot can continuously run and we just keep
        # watching new realizations of the process
        print(self.idx, i/(i%self.num_frames))
        if i == 0:
            for line in self.lines:
                line.set_data([], [])
            self.x_prev = np.linspace(0, 1, 100)
            self.y_prev = np.ones(self.x_prev.shape)
            self.y_prev = self.y_prev / np.linalg.norm(self.y_prev, 1)
        
        
        W, L = data[self.idx]
        x, y = calc_posterior(W, L)
        
        #self.lines[0].set_data(self.x_prev, self.y_prev)
        alpha = (i%self.num_frames) / self.num_frames
        y_alpha = (alpha*y) + (1-alpha)*self.y_prev
        self.lines[1].set_data(x, y_alpha)
        self.text.set_text(f"{obs[self.idx]} W={W} L={L} N={self.idx +1}")
        if (i%self.num_frames) == 0 and i!=0:
            self.x_prev, self.y_prev = x, y_alpha
            self.idx = self.idx  + 1
        if (i+1)%(self.num_frames) == 0:
            self.x_prev, self.y_prev = x, y_alpha
        return self.lines

# Fixing random state for reproducibility
np.random.seed(19680801)


fig, ax = plt.subplots()
ud = UpdateDist(ax, prob=0.7)
anim = FuncAnimation(fig, ud, frames=len(data)*ud.num_frames, interval=100, blit=True)
anim.save('test.gif', fps=10)
#plt.show()

MovieWriter ffmpeg unavailable; using Pillow instead.


IndexError: list index out of range

In [45]:
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image

# Function 1
def function1(x):
    return np.sin(x)

# Function 2
def function2(x):
    return np.cos(x)

# Intermediary functions for visualization
def intermediate_function(alpha, x):
    return (1 - alpha) * function1(x) + alpha * function2(x)

def plot_transition(alpha, x, y1, y2, i):
    y_intermediate = intermediate_function(alpha, x)
    plt.figure(figsize=(8, 5))
    plt.plot(x, y1, label="Function 1 (sin(x))", color='blue')
    plt.plot(x, y2, label="Function 2 (cos(x))", color='red')
    plt.plot(x, y_intermediate, label=f"Intermediate (alpha = {alpha:.2f})", color='green', linestyle='dashed')
    plt.legend()
    plt.xlabel('x')
    plt.ylabel('y')
    plt.title(f"Transition between Function 1 and Function 2 (alpha = {alpha:.2f})")
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(f'./frame/frame_{int(i)}.png')
    plt.close()

# Generate GIF
def generate_gif():
    x = np.linspace(0, 2 * np.pi, 100)
    y1 = function1(x)
    y2 = function2(x)

    num_frames = 50
    for i in range(num_frames):
        alpha = i / (num_frames)
        plot_transition(alpha, x, y1, y2, i)

    images = [Image.open(f'./frame/frame_{i}.png') for i in range(num_frames)]
    images[0].save('function_transition.gif', save_all=True, append_images=images[1:], duration=100, loop=0)

if __name__ == "__main__":
    generate_gif()


### 2M2
Same as 2M1, but assume a step prior that is zero for $p < 0.5$ and constant otherwise.

In [None]:
def calc_posterior(W, L):
    p_grid = np.linspace(0, 1, 100)
    prior = np.ones(p_grid.shape)
    prior[p_grid<0.5] = 0
    L = st.binom(n=W+L, p=p_grid).pmf(W)
    posterior = L * prior
    posterior /= sum(posterior)
    return p_grid, posterior

In [None]:
for W, L in ((3, 0), (3, 1), (5, 2)):
    x, y = calc_posterior(W, L)
    plt.plot(x, y, label=f"W={W}, L={L}")
plt.xlabel("$p$")
plt.ylabel("pdf")
plt.title("Posterior probability")
plt.legend()
plt.show()

### 2M3
Suppose you have two globes, one for Earth and one for Mars. Earth has fraction 0.7 water, but Mars is 100% land. Show that the posterior probability that the globe is Earth, conditional on seeing land, is 0.23.

Ok, I'm going to do this a couple ways. First, the easy numerical simulation method. We compute Pr(Earth|land) by simulating a bunch tosses, tossing out any that don't result in land, and see what fraction of them are Earth

In [None]:
def simulation(n_samples):
    # sample a planet with equal probability
    planets = np.random.choice(["earth", "mars"], n_samples)
    # toss it and see where it lands
    p = np.random.rand(n_samples)
    terrain = np.empty(n_samples, dtype=str)
    for planet, w_prob in zip(("earth", "mars"), (0.7, 0)):
        mask = (planets == planet)
        terrain[mask & (p < w_prob)] = "W"
        terrain[mask & (p >= w_prob)] = "L"
    return planets, terrain

def calc_conditional_prob():
    planets, terrain = simulation(int(1e6))
    mask = (terrain == "L")
    planets = planets[mask]
    return sum(planets == "earth")/len(planets)

In [None]:
calc_conditional_prob()

Then, we'll do it analytically. We assume that the Earth/Mars choice is a Bernoulli RV with $p=0.5$, so that Pr(Earth)=Pr(Mars)=1/2. The conditional probabilities Pr(land|Earth) = 0.3 and Pr(land|Mars) = 1, so from Bayes' rule we have:
$$
\begin{align}
Pr(\text{Earth}|\text{land}) & = Pr(\text{Earth}, \text{land})/Pr(\text{land}) \\
& = Pr(\text{land} | \text{Earth}) Pr(\text{Earth})/ Pr(\text{land}) \\
& = 0.3 * 0.5/ Pr(\text{land}) \\
& = 0.15/ \left[Pr(\text{land}|\text{Earth}) Pr(\text{Earth}) + Pr(\text{land}|\text{Mars}) Pr(\text{Mars})\right] \\
&= 0.15/\left[ 0.3*0.5 + 1*0.5 \right] \\
&\approx 0.2307
\end{align}
$$

In [None]:
0.15/(0.3*0.5 + 1*0.5)

Next we'll use `pyro` to figure it out by sampling the model, then analyzing the samples

In [None]:
# Specify model
def model():
    planet = pyro.sample("planet", Bernoulli(tensor(0.5))) # 0=earth, 1=mars
    if planet == 0:
        terrain = pyro.sample("terrain", Bernoulli(tensor(0.3))) # 0=water, 1=land
    else:
        terrain = pyro.sample("terrain", Bernoulli(tensor(1.))) # 0=water, 1=land

In [None]:
# Get model traces
planets = list()
terrains = list()
traced_model = pyro.poutine.trace(model)
for _ in range(10000):
    tr = traced_model.get_trace()
    planets.append(tr.nodes["planet"]["value"])
    terrains.append(tr.nodes["terrain"]["value"])
planets = np.array(planets)
terrains = np.array(terrains)

In [None]:
# Calculate probability
mask = (terrains == 1)
planets = planets[mask]
sum(planets == 0)/len(planets)