In [3]:
import numpy as np
import random
from sklearn.mixture import BayesianGaussianMixture as DPGMM
from sklearn.mixture import GaussianMixture as GMM
from scipy.stats import multivariate_normal
import matplotlib as mpl
import matplotlib.pyplot as plt
from matplotlib import colors, animation
from IPython.display import HTML

In [4]:
def initialize_figure():
    fig, axes = plt.subplots(2,2)
    axes = axes.flatten()
    fig.set_size_inches(15,15)
    for ax in axes:
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
    #plt.axis('equal')
    axes[0].set_title("Samples")
    return fig, axes

def get_data(num_dim_pts=20, neg_start=0, neg_end=6, xoffset=6, yoffset=6):
    pos = None; neg = None
    for x in range(num_dim_pts):
        for y in range(num_dim_pts):
            inst = np.array([x,y]).reshape(1,-1)
            if (x >= xoffset+neg_start and x < xoffset+neg_end and
                y >= yoffset+neg_start and y < yoffset+neg_end):
                neg = inst if neg is None else np.vstack((neg, inst))
            else:
                pos = inst if pos is None else np.vstack((pos, inst))
    # Prepend labels
    pos = np.insert(pos, 0, 1, axis=1)
    neg = np.insert(neg, 0, 0, axis=1)
    return np.vstack((pos, neg))

def get_grid_data(data, num_dim_pts=20):
    grid = np.zeros((num_dim_pts, num_dim_pts))
    for point, value in data:
        point = point.flatten()
        grid[point[1], point[2]] = value
    return grid.T

def plot_scatter_data(data, ax, cname, large=False):
    converter = colors.ColorConverter()
    color = converter.to_rgba(colors.cnames[cname])
    if large:
        scatter = ax.scatter(data[1], data[2], color=color, s=300.0, edgecolor='black', linewidth='3')
    else:
        scatter = ax.scatter(data[:,0], data[:,1], color=color, s=100.0)
    return scatter
    
def plot_gmm(samples, gmm, ax, cname):
    preds = gmm.predict(samples)
    means = gmm.means_
    covs  = gmm.covariances_
    color = colors.cnames[cname]
    for i, (mean, cov) in enumerate(zip(means, covs)):
        v, w = np.linalg.eigh(cov)
        v = 2. * np.sqrt(2.) * np.sqrt(v)
        u = w[0] / np.linalg.norm(w[0])
        # Not plotting redundant components
        if not np.any(preds == i):
            continue
        # Plot an ellipse to show the Gaussian component
        angle = np.arctan(u[1] / u[0])
        angle = 180. * angle / np.pi  # convert to degrees
        ell = mpl.patches.Ellipse(mean, v[0], v[1], 180. + angle, color=color)
        ell.set_clip_box(ax.bbox)
        ell.set_alpha(0.5)
        ax.add_artist(ell)
        
def plot_grid_img(data, ax, title=""):
    ax.set_title(title)
    num_dim_pts = 20
    grid = np.zeros((num_dim_pts, num_dim_pts))
    for point, value in data:
        point = point.flatten()
        grid[point[1], point[2]] = value
    img = ax.imshow(grid.T, origin="lower", vmin=0.0, vmax=1.0, cmap="hot")
    return img

def plot_max_measures(max_measures, ax, title="Max Measure"):
    ax.plot(max_measures)
    ax.set_ylim(0,0.7)
    ax.set_xlim(0,100)
    ax.set_title(title)
        
def pdf(x, mu, sigma):
    try:
        mvn = multivariate_normal(mean=mu, cov=sigma)
        return mvn.pdf(x)
    except np.linalg.LinAlgError as e:
        print "SINGULAR IN TRAJ PROB"
        return 0.0
    
def gmm_pdf(x, gmm, zero_lim=1e-3):
    p = 0.0
    for i in range(gmm.n_components):
        p += gmm.weights_[i] * pdf(x, gmm.means_[i], gmm.covariances_[i])
    return max(p, zero_lim)

def get_probs(data, pos_gmm, neg_gmm):
    pos_probs = []
    neg_probs = []
    for i in range(data.shape[0]):
        x = data[i,1:] # Leave out pre-pended label
        pos_pdf  = gmm_pdf(x, pos_gmm)
        neg_pdf  = gmm_pdf(x, neg_gmm)
        pos_prob = pos_pdf / (pos_pdf + neg_pdf)
        neg_prob = neg_pdf / (pos_pdf + neg_pdf)
        pos_probs.append((data[i,:], pos_prob))
        neg_probs.append((data[i,:], neg_prob))
    return pos_probs, neg_probs

def entropy(pos_prob, neg_prob):
    pos_entropy = -(pos_prob * np.log(pos_prob))
    neg_entropy = -(neg_prob * np.log(neg_prob))
    return pos_entropy + neg_entropy

def get_entropies(pos_probs, neg_probs):
    measures = []
    for i in range(len(pos_probs)):
        m = entropy(pos_probs[i][1], neg_probs[i][1])
        measures.append((pos_probs[i][0], m))
    return measures

SyntaxError: Missing parentheses in call to 'print'. Did you mean print("SINGULAR IN TRAJ PROB")? (2982520046.py, line 84)

In [None]:
dp_prior   = True
num_frames = 20
us_method  = "Mutual Information"  # ["Entropy", "Mutual Information", Random"]

# ===================================================================================

if dp_prior:
    pos_gmm = DPGMM()
    neg_gmm = DPGMM()
else:
    pos_gmm = GMM()
    neg_gmm = GMM()
    
data = get_data()
full_data = np.copy(data)
pos_data = data[np.where(data[:,0] == 1)][:,1:]
neg_data = data[np.where(data[:,0] == 0)][:,1:]
fig, axes = initialize_figure()

# Get two samples of each type to initiate learning
pos_samples = pos_data[np.random.randint(pos_data.shape[0], size=3),:]
neg_samples = neg_data[np.random.randint(neg_data.shape[0], size=3),:]

# Fit the GMMs
pos_gmm.fit(pos_samples)
neg_gmm.fit(neg_samples)

# Get initial probabilities and measures
pos_probs, neg_probs = get_probs(data, pos_gmm, neg_gmm)
entropies = get_entropies(pos_probs, neg_probs)

# Setup the plots
scatter = None
max_entropies = []
plot_grid_img(entropies, axes[1], "Entropy")
plot_max_measures(max_entropies, axes[2])


def iterate(itr):
    print "ITERATION %d" % itr
    global pos_samples, neg_samples, data, max_entropies, scatter
    
    # Clear drawn GMMs
    axes[0].artists = []
    axes[0].set_ylim(-1,20)
    axes[0].set_xlim(-1,20)
    
    if scatter is not None:
        scatter.remove()
    
    # Visualize GMMs before updating since probs in this iteration are computed based on these ones
    plot_gmm(pos_samples, pos_gmm, axes[0], "cornflowerblue")
    plot_gmm(neg_samples, neg_gmm, axes[0], "firebrick")
    
    # Compute probs/measures over all data for visualization
    full_pos_probs, full_neg_probs = get_probs(full_data, pos_gmm, neg_gmm)
    full_entropies = get_entropies(full_pos_probs, full_neg_probs)
    
    pos_probs, neg_probs = get_probs(data, pos_gmm, neg_gmm)
    
    if us_method == "Random":
        entropies = get_entropies(pos_probs, neg_probs) # For tracking values
        idx = np.random.randint(0, len(entropies))
        _, entropy = max(entropies, key=lambda p: p[1]) # Still want to max measure over all points
        next_sample = entropies[idx][0]
    elif us_method == "Entropy":
        entropies = get_entropies(pos_probs, neg_probs)
        # Find idx of max value, collect set of all max vals (ties), pick a random one of those
        entropy = 0.0  
        for j in range(len(entropies)):  
            if entropies[j][1] > entropy:
                entropy = entropies[j][1]
        max_idxs = [k for k, value in enumerate(entropies) if value[1] == entropy]
        idx = np.random.choice(max_idxs)
        next_sample, entropy = entropies[idx]
    elif us_method == "Mutual Information":
        # Find the sample that, if added, results in the greatest reduction uncertainty over full space
        max_diff = 0.0
        idx = None
        for i in range(data.shape[0]):
            proxy_sample = data[i,:]
            if proxy_sample[0] == 1:
                pos_gmm.fit(np.vstack((pos_samples, proxy_sample[1:])))
            else:
                neg_gmm.fit(np.vstack((neg_samples, proxy_sample[1:])))
            proxy_pos_probs, proxy_neg_probs = get_probs(full_data, pos_gmm, neg_gmm)
            proxy_entropies = get_entropies(proxy_pos_probs, proxy_neg_probs)
            diffs = 0.0
            for j in range(len(proxy_entropies)):
                diffs += full_entropies[j][1] - proxy_entropies[j][1]
            if diffs > max_diff:
                idx = i
                max_diff = diffs
        print "    MAX DIFF: %f" % max_diff
        next_sample = data[idx,:]
        measure = 0.0 # TODO fix it
    else:
        print "Unknown uncertainty sampling method: %s" % us_method
        return
            
    max_entropies.append(measure)
    data = np.delete(data, idx, axis=0)
    if next_sample[0] == 1:
        pos_samples = np.vstack((pos_samples, next_sample[1:]))
    else:
        neg_samples = np.vstack((neg_samples, next_sample[1:]))
    pos_gmm.n_components = min(10, pos_samples.shape[0])
    neg_gmm.n_components = min(10, neg_samples.shape[0])
    pos_gmm.fit(pos_samples)
    neg_gmm.fit(neg_samples)

    # Visualize scatter plot
    plot_scatter_data(pos_samples, axes[0], "cornflowerblue")
    plot_scatter_data(neg_samples, axes[0], "firebrick")
    scatter = plot_scatter_data(next_sample, axes[0], "gold", large=True)
    
    # Visualize uncertainty sampling measure
    plot_grid_img(full_entropies, axes[1], us_method)
    
#     # Visualize max measure over learning iterations
#     axes[2].plot(max_measures, lw=5.0, color='g')
#     axes[2].set_ylim(0,0.7)
#     axes[2].set_xlim(0,num_frames)
    
    # Visualize positive probability
    plot_grid_img(full_pos_probs, axes[3], "Prob(Positive)")
    
ani = animation.FuncAnimation(fig, iterate, frames=num_frames)
Writer = animation.writers['ffmpeg']
writer = Writer(fps=5, bitrate=1800)
ani.save('video.mp4', writer=writer)

#HTML(ani.to_jshtml())

: 