In [1]:
from itertools import product

from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt
import numpy as np

%matplotlib inline

# sample from model

In [61]:
n_units = 3
data = np.random.binomial(n=1, p=[1, 0, 1], size=(100, n_units))
alpha = .25

biases = np.random.randn(n_units)
weights = np.random.randn(n_units, n_units)

In [62]:
def inv_logit(z):
    return 1 / (1 + np.exp(-z))


def gibbs_sampler(weights, biases, init_sample=None, n_samples=100, burn_in=25, every_n=10) -> np.array:
    
    if burn_in > n_samples:
        raise("Can't burn in for more samples than there are in the chain")
        
    init_sample = init_sample or [0 for _ in biases]
    samples = [init_sample]
    
    def _gibbs_step(sample, i):
        z = sum([weights[i, j] * sample[j] for j in range(len(sample)) if j != i]) + biases[i]
        p = inv_logit(z)
        return np.random.binomial(n=1, p=p)
    
    for _ in range(n_samples):
        sample = list(samples[-1])  # make copy
        for i, _ in enumerate(sample):
            sample[i] = _gibbs_step(sample=sample, i=i)
        samples.append( sample )
        
    return np.array([sample for i, sample in enumerate(samples[burn_in:]) if i % every_n == 0])

In [63]:
def plot_n_samples(n, weights=weights, biases=biases):
    fig = plt.figure(figsize=(12, 9))
    ax = fig.add_subplot(111, projection='3d')
    
    samples = gibbs_sampler(n_samples=n, weights=weights, biases=biases)
    x, y, z = zip(*np.array(samples))
    
    x += np.random.randn(len(x)) * .05
    y += np.random.randn(len(y)) * .05
    z += np.random.randn(len(z)) * .05
    
    ax.scatter(x, y, z)

# print(f'sum(x): {sum(x)} | sum(y): {sum(y)} | sum(z): {sum(z)}')

In [64]:
from itertools import combinations


weight_combinations = list(combinations(range(n_units), 2))


def update_parameters(weights=weights, biases=biases):
    model_samples = gibbs_sampler(weights=weights, biases=biases, n_samples=1000)

    for i, j in weight_combinations:
        # positive phase
        positive_phase = (data[:, i] * data[:, j]).mean()

        # negative phase
        negative_phase = (model_samples[:, i] * model_samples[:, j]).mean()

        # update weights
        weights[i, j] += alpha * (positive_phase - negative_phase)
        
    for i, _ in enumerate(biases):
        # positive phase
        positive_phase = data[:, i].mean()
        
        # negative phase
        negative_phase = model_samples[:, i].mean()
        
        # update biases
        biases[i] += alpha * (positive_phase - negative_phase)
        
    return np.array(weights), np.array(biases)

# train model

In [74]:
model_samples = gibbs_sampler(weights=weights, biases=biases, n_samples=1000)

In [75]:
model_samples.mean(0)

array([1.        , 0.01020408, 0.98979592])

In [73]:
for _ in range(100):
    weights, biases = update_parameters(weights=weights, biases=biases)
# plot_n_samples(1000)

# ask questions

In [10]:
from itertools import product

In [30]:
likelihood(data.ravel())

3.6890563397915714e-06

In [28]:
def H(x):
    return x.T @ weights @ x - biases.T @ x


# def H(x):
#     vals = {
#         (0, 0, 0): -1,
#         (0, 0, 1): -3,
#         (0, 1, 0): -3,
#         (0, 1, 1): 5,
#         (1, 0, 0): 3,
#         (1, 0, 1): -3,
#         (1, 1, 0): -1,
#         (1, 1, 1): 1,
#     }
#     return vals[tuple(x)]


def unnormalized_likelihood(x):
    is_marginal_lik = any([el == ... for el in x])
    if is_marginal_lik:
        unnormalized_lik = 0
        for config in product(*[[0, 1] if el == ... else [el] for el in x]):
            config = np.array(config)
            unnormalized_lik += np.exp(H(config))
    else:
        unnormalized_lik = np.exp(H(x))
    return unnormalized_lik


def likelihood(x):
    """
    Must have the dimensionality of the data observations. To marginalize, put ellipses (...)
    in the elements over which you wish to marginalize.
    """
    numerator = unnormalized_likelihood(x)
    
    denominator = 0
    for config in product([0, 1], repeat=len(x)):
        config = np.array(config)
        denominator += np.exp(H(config))
    return numerator / denominator


def conditional_likelihood(x, cond: dict):
    joint = np.array(x)
    for index, val in cond.items():
        if isinstance(joint[index], int):
            raise
        joint[index] = val
        
    evidence = [cond.get(i, ...) for i in range(len(x))]
    
    return unnormalized_likelihood(joint) / unnormalized_likelihood(evidence)

In [250]:
# P(x_1 = 1 | x_2 = 2) = P(x_1 = 1, x_2 = 2) / P(x_2 = 2)

In [251]:
x = np.array([1, ..., ...])

likelihood(x)

0.13492854264366042

In [252]:
conditional_likelihood(x, cond={2: 0})

0.9799882683585315

In [None]:
P(x_1 = 1) = P(x_1)

In [46]:
# lets generate data

n_outputs = 10
n_points = 100
data = np.random.binomial(n=1, p=.6, size=(n_points, n_outputs))
weights = np.random.randn(n_points, n_points)
np.fill_diagonal(weights, 0)
bias = np.random.randn(n_outputs)

In [None]:
# now, derivation of gradient updates