In [None]:
# Standard imports
import numpy as np
import matplotlib.pyplot as plt
import wandb

In [None]:
# Parameters
prog_set = 'homework_4'

# Sampling mode
sampling_method = 'graph'

# Inference method
inference_method = 'VI'

# Weights & biases
use_wandb = False
%env WANDB_NOTEBOOK_NAME='homework_4.ipynb'

In [None]:
# Calculations
if use_wandb: 
    wandb.init(project='test_homework4', entity='cs532-2022')

In [None]:
# Definitions
def triangle_plots(dicts_of_samples, params, labels,
    truths = None,
    fig_size = 3.,
    hist_bins = 'auto',
    hist_density = True,
    hist_alpha = 0.7,
    scatter_alpha = 0.1,
    scatter_size = 5.,
    use_wandb = False, 
    wandb_name = None,
    ):
    '''
    Makes a triangle plot
    params:
    dicts_of_samples: List of dictionaries of samples (e.g., dict['x'] = [1., 1.1, 1.3, ...])
    params: List of names of parameters to plot (dictionary keys)
    labels: List of axis labels corresponding to parameters
    truths: List of true values of the parameters TODO: Option for None
    '''
    n = len(params)
    fig, _ = plt.subplots(figsize=(n*fig_size, n*fig_size))
    iplot = 0
    samples = len(list(dicts_of_samples.values())[0][params[0]])
    for ir, (param_r, label_r) in enumerate(zip(params, labels)):
        for ic, (param_c, label_c) in enumerate(zip(params, labels)):
            iplot += 1
            if ir == ic:
                plt.subplot(n, n, iplot)
                if truths is not None:
                    plt.axvline(truths[ir], color='black', ls='--', alpha=0.7, label='Truth')
                for name, dict_of_samples in dicts_of_samples.items():
                    label = name if (ir == 0) and (ic == 0) else None
                    plt.hist(dict_of_samples[param_r], 
                        bins=hist_bins, density=hist_density, alpha=hist_alpha, label=label
                    )
                plt.xlabel(label_r) if ic==n-1 else plt.gca().set_xticklabels([])
                plt.yticks([])
                mean = dict_of_samples[param_r].mean(); std = dict_of_samples[param_r].std()
                plt.axvline(mean, color='k', ls='--', label='Mean: %1.2f'%mean)
                plt.axvline(mean-std, color='k', ls=':', label='Std: %1.2f'%std)
                plt.axvline(mean+std, color='k', ls=':')
                #if and iplot == 1: plt.legend(loc='upper left', bbox_to_anchor=(1., 1.))
                plt.legend()
            elif ir > ic:
                plt.subplot(n, n, iplot)
                if truths is not None:
                    plt.plot([truths[ic]], [truths[ir]], color='black', marker='x', alpha=0.7, label='Truth')
                for dict_of_samples in dicts_of_samples.values():
                    plt.scatter(dict_of_samples[param_c], dict_of_samples[param_r], 
                            alpha=scatter_alpha, s=scatter_size,
                    )
                plt.xlabel(label_c) if ir==n-1 else plt.gca().set_xticklabels([])
                plt.ylabel(label_r) if ic==0 else plt.gca().set_yticklabels([])
    plt.suptitle('Samples: {:,}'.format(samples))
    plt.tight_layout()
    plt.show()
    if use_wandb: wandb.log({wandb_name: wandb.Image(fig)})


def plot_traces(data_samples, data_resamples, nr, nc, names=None, panel_size=5., verbose=False, use_wandb=False, wandb_name=None):
    samples = data_samples.shape[0]
    #n = data_samples.shape[1]
    n = nr*nc
    fig, _ = plt.subplots(figsize=(nc*panel_size, nr*panel_size))
    for i in range(n):
        plt.subplot(nr, nc, 1+i)
        mean = data_samples[:, i].mean()
        std = data_samples[:, i].std()
        if verbose:
            print('Mean:', mean)
            print('Std:', std)
            plt.plot(data_resamples[:, i], color='C1', alpha=0.3)
        plt.plot(data_samples[:, i], color='C0', alpha=0.3)
        plt.scatter(list(range(samples)), data_samples[:, i], color='C0', marker='.', alpha=0.1, label='Unweighted')
        plt.scatter(list(range(samples)), data_resamples[:, i], color='C1', marker='.', alpha=0.1, label='Resampled')
        plt.axhline(mean, color='black', ls='--', label='Mean: %1.2f'%mean)
        plt.axhline(mean-std, color='black', ls=':', label='Std: %1.2f'%std)
        plt.axhline(mean+std, color='black', ls=':')
        if names is not None: plt.ylabel(names[i])
        leg = plt.legend()
        for lh in leg.legendHandles: 
            lh.set_alpha(1)
        plt.xlabel('samples')
        plt.xlim(left=0.)
    plt.suptitle('Samples: {:,}'.format(samples))
    plt.tight_layout()
    plt.show()
    if use_wandb: wandb.log({wandb_name: wandb.Image(fig)})


def plot_dist(data, nr, nc, names=None, panel_size=5., verbose=True, use_wandb=False, wandb_name=None):
    samples = data.shape[0]
    n = data.shape[1]
    fig, _ = plt.subplots(figsize=(nc*panel_size, nr*panel_size))
    for i in range(n):
        plt.subplot(nr, nc, 1+i)
        mean = data[:, i].mean()
        std = data[:, i].std()
        if verbose:
            print('Mean:', mean)
            print('Std:', std)
        plt.hist(data[:, i], bins='auto', color='C%d'%i)
        plt.axvline(mean, color='black', ls='--', label='Mean: %1.2f'%mean)
        plt.axvline(mean-std, color='black', ls=':', label='Std: %1.2f'%std)
        plt.axvline(mean+std, color='black', ls=':')
        if names is not None: plt.xlabel(names[i])
        plt.yticks([])
        plt.legend()
    plt.suptitle('Samples: %d'%samples)
    plt.tight_layout()
    plt.show()
    if use_wandb: wandb.log({wandb_name: wandb.Image(fig)})


def plot_loss(data, ylim=None, log=False, **kwargs):
    if log:
        plt.semilogy(-data, **kwargs)
    else:
        plt.plot(data, **kwargs)
    plt.xlabel('Step')
    ylab = r'$-$ELBO' if log else 'ELBO'
    plt.ylabel(ylab)
    plt.ylim(ylim)
    plt.show()

In [None]:
# Program 1
variables = [r'$\mu$']
file_resamples = './../data/homework_4/1_%s_%s.dat'%(sampling_method, inference_method)
file_samples = './../data/homework_4/1_%s_samples.dat'%(sampling_method)
file_params = './../data/homework_4/1_%s_params.dat'%(sampling_method)
file_loss = './../data/homework_4/1_%s_loss.dat'%(sampling_method)
print('Files:', file_resamples, file_samples, file_params, file_loss)
data_resamples, data_samples, data_loss = np.loadtxt(file_resamples), np.loadtxt(file_samples), np.loadtxt(file_loss)
data_resamples, data_samples, data_loss = np.atleast_2d(data_resamples).T, np.atleast_2d(data_samples).T, np.atleast_2d(data_loss).T

# Distribution parameters
data_params = np.loadtxt(file_params)
plt.title('Distribution parameters')
plt.plot(data_params[:, 0], label='loc')
plt.plot(data_params[:, 1], label='scale')
plt.xlabel('Training step')
plt.legend()
plt.show()

# Plots
print('Maximum loss:', max(data_loss))
plot_loss(data_loss, color='k')
samples_dicts = {'Unweighted': {'mu': data_samples[:, 0]}, 'Resampled': {'mu': data_resamples[:, 0]}}
triangle_plots(samples_dicts, params=['mu'], labels=variables, fig_size=4., use_wandb=use_wandb, wandb_name='Program: 1')
plot_traces(data_samples, data_resamples, nr=1, nc=data_resamples.shape[1], names=variables, use_wandb=use_wandb, wandb_name='Samples: 1')

In [None]:
# Program 2
variables = ['slope', 'bias']
file_resamples = './../data/homework_4/2_%s_%s.dat'%(sampling_method, inference_method)
file_samples = './../data/homework_4/2_%s_samples.dat'%(sampling_method)
file_loss = './../data/homework_4/2_%s_loss.dat'%(sampling_method)
file_params = './../data/homework_4/2_%s_params.dat'%(sampling_method)
print('Files:', file_resamples, file_samples, file_loss)
data_resamples, data_samples, data_loss = np.loadtxt(file_resamples), np.loadtxt(file_samples), np.loadtxt(file_loss)

# Distribution parameters
data_params = np.loadtxt(file_params)
plt.title('Distribution parameters')
params = ['slope: loc', 'slope: scale', 'bias: loc', 'bias: scale']
for i, param in enumerate(params):
    plt.plot(data_params[:, i], label=param)
plt.xlabel('Training step')
plt.legend()
plt.show()

# Plots
print('Maximum loss:', max(data_loss))
plot_loss(data_loss, color='k')
samples_dicts = {
    'Unweighted': {'slope': data_samples[:, 0], 'bias': data_samples[:, 1]}, 
    'Resamples': {'slope': data_resamples[:, 0], 'bias': data_resamples[:, 1]},
    }
triangle_plots(samples_dicts, params=variables, labels=variables, fig_size=3., use_wandb=use_wandb, wandb_name='Program: 2')
plot_traces(data_samples, data_resamples, nr=1, nc=2, names=variables, use_wandb=use_wandb, wandb_name='Samples: 2')
print('Covariance matrix:\n', np.cov(data_resamples[:2, :2], bias=False, rowvar=False))

# Posterior predictive
plt.hist(data_resamples[:, 2], bins='auto', alpha=0.7, density=True)
plt.title(r'Posterior predictive at $x = 10$')
plt.ylabel('p(y|x=10)')
plt.xlabel('y')
plt.yticks([])
plt.show()

In [None]:
# Program 3

# File
variables = ['Are the points from the same cluster?']
file = './../data/homework_4/3_%s_%s.dat'%(sampling_method, inference_method)
file_loss = './../data/homework_4/3_%s_loss.dat'%(sampling_method)
print('File:', file)

# Load data
data, data_loss = np.loadtxt(file), np.loadtxt(file_loss)
data = np.atleast_2d(data).T
samples_dict = {'Results': {'x': data[:, 0]}}

# Plots
plot_loss(data_loss, color='black', ylim=(-10000,3000))
plot_loss(data_loss, log=True, color='black')#, ylim=(-300,300))
triangle_plots(samples_dict, params=['x'], labels=variables, fig_size=5., use_wandb=use_wandb, wandb_name='Program: 3')
print('Probability:', variables[0], data.mean())

In [None]:
# Program 4
samples_file = './../data/homework_4/4_%s_samples.dat'%(sampling_method)
resamples_file = './../data/homework_4/4_%s_%s.dat'%(sampling_method, inference_method)
loss_file = './../data/homework_4/4_%s_loss.dat'%(sampling_method)

# Loss
loss_data = np.loadtxt(loss_file)
plot_loss(loss_data, ylim=None, color='k')

# Parameters
num_params = 130
num_bins = 100

# Tensor heat maps
for file in [samples_file, resamples_file]:

    # Load data
    print(file)
    data = np.loadtxt(file)
    print('Data shape:', data.shape)
    print('First entry:', data[0, 0])

    # Setup for heat maps
    x = []; y = []
    for i in range(data.shape[0]):
        for j in range(num_params):
            x.append(j+0.5)
            y.append(data[i, j])

    # Plot heat maps
    plt.figure(figsize=(12,3))
    plt.hist2d(x, y, bins=(num_params, num_bins), density=True, cmin=0., cmap='binary')
    plt.xlabel('Position of tensor')
    plt.xlim((0., num_params))
    for val in [10, 110, 120]:
        plt.axvline(val, color='black', ls='--')
    plt.ylabel('Distribution')
    plt.show()

# Posterior predictive
plt.hist(data[:, num_params], density=True, alpha=0.7)
plt.xlabel(r'Posterior predictive at $x=6$')
plt.yticks([])
plt.show()

In [None]:
# Program 5
variables = [r'$s$']
file_resamples = './../data/homework_4/5_%s_%s.dat'%(sampling_method, inference_method)
file_samples = './../data/homework_4/5_%s_samples.dat'%(sampling_method)
file_loss = './../data/homework_4/5_%s_loss.dat'%(sampling_method)
file_params = './../data/homework_4/5_%s_params.dat'%(sampling_method)

# Loss
data_loss = np.loadtxt(file_loss)
print('Maximum loss:', max(data_loss))
plot_loss(data_loss, color='black')
plot_loss(data_loss, log=True, color='black')

# Distribution parameters
data_params = np.loadtxt(file_params)
plt.title('Distribution parameters')
params = ['loc', 'scale', 'conc', 'rate']
for i, param in enumerate(params):
    plt.plot(data_params[:, i], label=param)
plt.xlabel('Training step')
plt.legend()
plt.show()

# Distribution and traces
print('Files:', file_resamples, file_samples)
data_resamples, data_samples = np.loadtxt(file_resamples), np.loadtxt(file_samples)
data_resamples, data_samples = np.atleast_2d(data_resamples).T, np.atleast_2d(data_samples).T
print('Data shapes:', np.squeeze(data_resamples.shape), np.squeeze(data_samples.shape))
samples_dict = {'Unweighted': {'s': data_samples[:, 0]}, 'Resampled:': {'s': data_resamples[:, 0]}}
triangle_plots(samples_dict, params=['s'], labels=variables, fig_size=5., use_wandb=use_wandb, wandb_name='Program: 5')
plot_traces(data_samples, data_resamples, nr=1, nc=data_resamples.shape[1], names=variables, use_wandb=use_wandb, wandb_name='Samples: 5')

In [None]:
# Finish W&B
if use_wandb:
    wandb.finish()