## Exam

In [3]:
import torch 
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt


In [None]:
def kmeans_1d(x, c1, c2):
    """Run two-cluster k-means on 1-D data until the loss stops improving.

    Each iteration assigns every point to the nearer centroid (ties go to
    the second cluster), measures the loss as the sum of absolute distances
    to the assigned centroids, and then moves each centroid to the mean of
    its points. One progress line is printed per iteration.

    Args:
        x: 1-D array-like of sample values.
        c1: Initial position of the first centroid.
        c2: Initial position of the second centroid.

    Returns:
        Tuple ``(c1, c2, c1_idx)``: the updated centroids and a boolean mask
        that is ``True`` for points assigned to the first cluster in the
        last iteration.
    """
    x = np.asarray(x)

    # Start from +inf rather than a fixed number so the first iteration is
    # always accepted, whatever the scale of the data.
    previous_loss = np.inf
    epoch = 1

    while True:
        # distance from each point to each centroid
        dist_c1 = np.abs(x - c1)
        dist_c2 = np.abs(x - c2)

        # assign each point to the closest centroid
        c1_idx = dist_c1 < dist_c2

        # loss of this assignment, measured against the current centroids
        loss = np.sum(dist_c1[c1_idx]) + np.sum(dist_c2[~c1_idx])
        print(f'Epoch: {epoch}, loss: {loss}')

        # Move the centroids; a cluster that received no points keeps its
        # previous centroid instead of becoming NaN.
        if c1_idx.any():
            c1 = np.mean(x[c1_idx])
        if not c1_idx.all():
            c2 = np.mean(x[~c1_idx])

        epoch += 1

        # Converged once the loss no longer strictly decreases.
        if not loss < previous_loss:
            break
        previous_loss = loss

    return c1, c2, c1_idx

In [11]:
def k_means(x, c1, c2):
    """Run two-cluster k-means on row-vector data until the loss stops improving.

    Each iteration assigns every row of ``x`` to the nearer centroid by
    Euclidean distance (ties go to the second cluster), measures the loss as
    the sum of distances to the assigned centroids, and then moves each
    centroid to the mean of its rows. One progress line is printed per
    iteration.

    Args:
        x: 2-D array-like with one sample per row.
        c1: Initial first centroid, broadcastable against a row of ``x``.
        c2: Initial second centroid, broadcastable against a row of ``x``.

    Returns:
        Tuple ``(c1, c2, c1_idx)``: the updated centroids and a boolean mask
        that is ``True`` for rows assigned to the first cluster in the last
        iteration.
    """
    x = np.asarray(x)

    # Start from +inf rather than a fixed number so the first iteration is
    # always accepted, whatever the scale of the data.
    previous_loss = np.inf
    epoch = 1

    while True:
        # distance from each point to each centroid
        dist_c1 = np.linalg.norm(x - c1, axis=1)
        dist_c2 = np.linalg.norm(x - c2, axis=1)

        # assign each point to the closest centroid
        c1_idx = dist_c1 < dist_c2

        # loss of this assignment, measured against the current centroids
        loss = np.sum(dist_c1[c1_idx]) + np.sum(dist_c2[~c1_idx])
        print(f'Epoch: {epoch}, loss: {loss}')

        # Move the centroids; a cluster that received no points keeps its
        # previous centroid instead of becoming NaN.
        if c1_idx.any():
            c1 = np.mean(x[c1_idx], axis=0)
        if not c1_idx.all():
            c2 = np.mean(x[~c1_idx], axis=0)

        epoch += 1

        # Converged once the loss no longer strictly decreases.
        if not loss < previous_loss:
            break
        previous_loss = loss

    return c1, c2, c1_idx

In [9]:
def plot_k_means(x, c1, c2, c1_idx):
    """Draw a two-cluster k-means result on a fresh matplotlib figure.

    The figure shows both centroids, a circle around each centroid whose
    radius is the distance to its farthest member, and the members of each
    cluster as crosses (red for the first cluster, blue for the second).

    Args:
        x: Sample array indexed as ``x[mask]`` and ``[:, 0]`` / ``[:, 1]``.
        c1: First centroid; its first two entries are the plot coordinates.
        c2: Second centroid; its first two entries are the plot coordinates.
        c1_idx: Boolean mask selecting the rows of ``x`` in the first cluster.
    """
    _, axes = plt.subplots()

    # centroid markers
    axes.scatter(c1[0], c1[1], color='r', label='centroid 1')
    axes.scatter(c2[0], c2[1], color='b', label='centroid 2')

    members_1 = x[c1_idx]
    members_2 = x[~c1_idx]

    # radius of each circle = distance from the centroid to its farthest member
    radius_1 = max(np.linalg.norm(members_1 - c1, axis=1))
    radius_2 = max(np.linalg.norm(members_2 - c2, axis=1))
    for centre, radius, colour in ((c1, radius_1, 'r'), (c2, radius_2, 'b')):
        axes.add_patch(plt.Circle(centre, radius, fill=False, color=colour))

    # cluster members
    for members, colour, tag in (
        (members_1, 'r', 'cluster 1'),
        (members_2, 'b', 'cluster 2'),
    ):
        axes.scatter(members[:, 0], members[:, 1], color=colour, label=tag, marker='x')

    axes.legend(loc='best')

In [8]:
class GaussianMixtureModel1D(nn.Module):
    """Two-component Gaussian mixture over scalar samples, fitted with EM.

    Attributes:
        mu1, mu2: Component means.
        sig1, sig2: Component standard deviations (they are passed as
            ``scale`` to ``torch.distributions.Normal``).
        alpha: Mixing weight of the first component; the second component
            has weight ``1 - alpha``. Starts at 0.5.

    Note:
        ``train`` here shadows ``nn.Module.train``. ``nn.Module.eval()`` is
        implemented as ``self.train(False)``, so calling ``eval()`` on this
        class raises a ``TypeError``.
    """

    def __init__(self, mu1, mu2, sig1, sig2):
        super().__init__()
        self.mu1 = mu1
        self.mu2 = mu2
        self.sig1 = sig1
        self.sig2 = sig2
        self.alpha = torch.tensor([0.5])

    def gmm_gamma(self, x):
        """Compute the responsibility of each component for each sample.

        Side effect: ``self.alpha`` is replaced by the mean responsibility
        of the first component, so every call also updates the mixing weight.

        Args:
            x: Tensor of scalar samples.

        Returns:
            Tuple ``(gamma1, gamma2)`` with the same shape as the broadcast
            of ``x`` and the parameters; the two sum to one element-wise.
        """
        alpha = torch.as_tensor(self.alpha)

        # Weighted log-densities. Staying in log space avoids the 0 / 0 that
        # appears when both densities underflow for a far-away sample.
        log_w1 = (torch.distributions.Normal(loc=self.mu1, scale=self.sig1).log_prob(x)
                  + torch.log(alpha))
        log_w2 = (torch.distributions.Normal(loc=self.mu2, scale=self.sig2).log_prob(x)
                  + torch.log1p(-alpha))

        log_norm = torch.logaddexp(log_w1, log_w2)
        gamma1 = torch.exp(log_w1 - log_norm)
        gamma2 = torch.exp(log_w2 - log_norm)

        # Mixing weight = average responsibility over all samples.
        self.alpha = torch.sum(gamma1) / gamma1.numel()

        return gamma1, gamma2

    @staticmethod
    def _weighted_mean_std(x, gamma):
        """Return the responsibility-weighted mean and standard deviation of x."""
        total = torch.sum(gamma)
        mean = torch.sum(gamma * x) / total
        variance = torch.sum(gamma * (x - mean) ** 2) / total
        # Normal's ``scale`` is a standard deviation, not a variance.
        return mean, torch.sqrt(variance)

    def train(self, x, epochs):
        """Fit the mixture parameters with ``epochs`` rounds of EM.

        Args:
            x: Tensor of scalar samples.
            epochs: Number of EM iterations to run.

        Returns:
            Tuple ``(mu1, mu2, sig1, sig2)`` after the last iteration; each
            is a 0-dim tensor once at least one iteration has run.
        """
        for _ in range(epochs):
            # E step (also refreshes self.alpha)
            gamma1, gamma2 = self.gmm_gamma(x)

            # M step
            self.mu1, self.sig1 = self._weighted_mean_std(x, gamma1)
            self.mu2, self.sig2 = self._weighted_mean_std(x, gamma2)

        return self.mu1, self.mu2, self.sig1, self.sig2

In [7]:
class ConvNetOperations(nn.Module):
    """Stack of ``nn.Conv2d`` layers applied one after another.

    Args:
        conv_ops (list): list of tuples (filters, stride, kernel_size, padding),
            one per convolution, in application order.
        channels (int): number of channels in the input.
        bias (bool): whether every convolution gets a bias term.

    Attributes:
        conv_layers: ``nn.ModuleList`` holding the convolutions.
        layer_outputs: Output shape (without the batch dimension) of every
            layer from the most recent forward pass.
        channels: Advanced layer by layer while the model is built, so after
            construction it holds the filter count of the last layer (or the
            input channel count when ``conv_ops`` is empty).
    """

    def __init__(self, conv_ops, channels=3, bias=True):
        super().__init__()
        self.conv_ops = conv_ops
        self.bias = bias
        self.conv_layers = nn.ModuleList()
        self.channels = channels
        self.layer_outputs = []
        self._build_model()

    def _build_model(self):
        """Create one Conv2d per entry of ``conv_ops``, chaining channel counts."""
        for filters, stride, kernel_size, padding in self.conv_ops:
            self.conv_layers.append(nn.Conv2d(in_channels=self.channels,
                                              out_channels=filters,
                                              kernel_size=kernel_size,
                                              stride=stride,
                                              padding=padding,
                                              bias=self.bias))
            # The next layer consumes what this one produces.
            self.channels = filters

    def forward(self, x):
        """Apply every convolution in order and record each output shape."""
        # Start fresh so layer_outputs describes this pass only instead of
        # growing with every call.
        self.layer_outputs = []
        for conv_layer in self.conv_layers:
            x = conv_layer(x)
            self.layer_outputs.append(x.shape[1:])
        return x

    def get_output_size(self, input_size):
        """Return the output shape (without batch dimension) for ``input_size``.

        A random single-sample batch of shape ``(1, *input_size)`` is pushed
        through the network; ``layer_outputs`` is refreshed as a side effect.
        """
        # Only the shape is needed, so skip building an autograd graph.
        with torch.no_grad():
            probe = torch.randn(1, *input_size)
            return self.forward(probe).shape[1:]

    def get_num_params(self):
        """Return the total number of parameter elements in the model."""
        return sum(p.numel() for p in self.parameters())

    def get_num_params_per_layer(self):
        """Return the number of parameter elements of each convolution, in order."""
        # Count per layer rather than assuming (weight, bias) pairs, which
        # breaks when the layers were built with bias=False.
        return [sum(p.numel() for p in layer.parameters())
                for layer in self.conv_layers]

In [6]:
# Gaussian Mixture Models
class GaussianMixtureModel2D(nn.Module):
    """Two-component multivariate Gaussian mixture fitted with EM.

    Attributes:
        mu1, mu2: Component mean vectors.
        sig1, sig2: Component covariance matrices (they are passed as the
            covariance argument of ``MultivariateNormal``).
        alpha: Mixing weight of the first component; the second component
            has weight ``1 - alpha``. Starts at 0.5.

    Note:
        ``train`` here shadows ``nn.Module.train``. ``nn.Module.eval()`` is
        implemented as ``self.train(False)``, so calling ``eval()`` on this
        class raises a ``TypeError``.
    """

    def __init__(self, mu1, mu2, sig1, sig2):
        super().__init__()
        self.mu1 = mu1
        self.mu2 = mu2
        self.sig1 = sig1
        self.sig2 = sig2
        self.alpha = torch.tensor([0.5])

    def gmm_gamma(self, x):
        """Compute the responsibility of each component for each sample.

        Side effect: ``self.alpha`` is replaced by the mean responsibility
        of the first component, so every call also updates the mixing weight.

        Args:
            x: Tensor with one sample per row.

        Returns:
            Tuple ``(gamma1, gamma2)`` with one entry per row of ``x``; the
            two sum to one element-wise.
        """
        mvn = torch.distributions.multivariate_normal.MultivariateNormal
        alpha = torch.as_tensor(self.alpha)

        # Weighted log-densities. Staying in log space avoids the 0 / 0 that
        # appears when both densities underflow for a far-away sample.
        log_w1 = mvn(self.mu1, self.sig1).log_prob(x) + torch.log(alpha)
        log_w2 = mvn(self.mu2, self.sig2).log_prob(x) + torch.log1p(-alpha)

        log_norm = torch.logaddexp(log_w1, log_w2)
        gamma1 = torch.exp(log_w1 - log_norm)
        gamma2 = torch.exp(log_w2 - log_norm)

        # Mixing weight = average responsibility over all samples.
        self.alpha = torch.sum(gamma1) / x.shape[0]

        return gamma1, gamma2

    @staticmethod
    def _weighted_mean_cov(x, gamma):
        """Return the responsibility-weighted mean vector and covariance of x."""
        total = torch.sum(gamma)
        weights = gamma.unsqueeze(-1)
        mean = torch.sum(weights * x, dim=0) / total
        centred = x - mean
        cov = centred.T @ (weights * centred) / total
        # Remove round-off asymmetry so the matrix stays a valid covariance.
        return mean, 0.5 * (cov + cov.T)

    def train(self, x, epochs):
        """Fit the mixture parameters with ``epochs`` rounds of EM.

        Args:
            x: Tensor with one sample per row.
            epochs: Number of EM iterations to run.

        Returns:
            Tuple ``(mu1, mu2, sig1, sig2)`` after the last iteration.
        """
        for _ in range(epochs):
            # E step (also refreshes self.alpha)
            gamma1, gamma2 = self.gmm_gamma(x)

            # M step
            self.mu1, self.sig1 = self._weighted_mean_cov(x, gamma1)
            self.mu2, self.sig2 = self._weighted_mean_cov(x, gamma2)

        return self.mu1, self.mu2, self.sig1, self.sig2

In [5]:
# plot distribution contours of the GMM 
def plot_gmm2d(x, gmm, c1_dist, c2_dist, c1_idx, mu1, mu2):
    """Plot a fitted two-component mixture over the unit square.

    Filled contours show each component's density, scaled by its mixing
    weight (``gmm.alpha`` and ``1 - gmm.alpha``), evaluated on a 100 x 100
    grid covering [0, 1] x [0, 1]. The samples are drawn on top, first all
    in black and then recoloured per cluster, followed by both means.

    Args:
        x: Sample array indexed as ``x[mask]`` and ``[:, 0]`` / ``[:, 1]``.
        gmm: Object exposing the mixing weight as ``alpha``.
        c1_dist: Distribution of the first component; needs ``log_prob``.
        c2_dist: Distribution of the second component; needs ``log_prob``.
        c1_idx: Boolean mask selecting the samples of the first cluster.
        mu1: Mean of the first component (first two entries are plotted).
        mu2: Mean of the second component (first two entries are plotted).
    """
    _, axes = plt.subplots()

    # evaluation grid over the unit square
    ticks_x = np.linspace(0, 1, 100)
    ticks_y = np.linspace(0, 1, 100)
    grid_x, grid_y = np.meshgrid(ticks_x, ticks_y)
    grid_points = torch.tensor(np.c_[grid_x.ravel(), grid_y.ravel()],
                               dtype=torch.float32)

    # component densities scaled by their mixing weights
    weighted_1 = torch.exp(c1_dist.log_prob(grid_points)) * gmm.alpha
    weighted_2 = torch.exp(c2_dist.log_prob(grid_points)) * (1 - gmm.alpha)

    axes.contourf(ticks_x, ticks_y, weighted_1.reshape(100, 100), cmap='Reds', alpha=0.3)
    axes.contourf(ticks_x, ticks_y, weighted_2.reshape(100, 100), cmap='Blues', alpha=0.3)

    # every sample in black, then recoloured by cluster
    axes.scatter(x[:, 0], x[:, 1], color='k', marker='x')

    first = x[c1_idx]
    axes.scatter(first[:, 0], first[:, 1], color='r', marker='x')

    second = x[~c1_idx]
    axes.scatter(second[:, 0], second[:, 1], color='b', marker='x')

    # component means
    axes.scatter(mu1[0], mu1[1], color='r', marker='o')
    axes.scatter(mu2[0], mu2[1], color='b', marker='o')

    # small margin so points on the border stay visible
    axes.set_xlim(0 - 0.01, 1 + 0.01)
    axes.set_ylim(0 - 0.01, 1 + 0.01)

In [12]:
def calculate_node_proportions(class_counts):
    """Return the share of all samples that falls into each child node.

    Args:
        class_counts: Mapping with keys ``'node1'`` and ``'node2'`` whose
            values hold the per-class sample counts of that node.

    Returns:
        Tuple ``(proportion_node1, proportion_node2)``.
    """
    size_1, size_2 = (np.sum(class_counts[key]) for key in ('node1', 'node2'))
    total = size_1 + size_2
    return size_1 / total, size_2 / total

def misclassification_error(class_counts, y):
    """Calculate the misclassification error for the split.

    Each node predicts its majority class, so the samples it gets wrong are
    all samples outside that class. The error of the split is the total
    number of such samples over the total number of samples, i.e. the
    per-node error rates weighted by node size.

    Args:
        class_counts: Mapping with keys ``'node1'`` and ``'node2'`` whose
            values hold the per-class sample counts of that node.
        y: Unused; kept so the signature matches the other split criteria.

    Returns:
        The size-weighted misclassification error of the split.
    """
    node1 = np.asarray(class_counts['node1'])
    node2 = np.asarray(class_counts['node2'])

    # samples outside the majority class of each node
    wrong1 = np.sum(node1) - np.max(node1)
    wrong2 = np.sum(node2) - np.max(node2)

    # Dividing the pooled count once weights each node by its size and lets
    # an empty node contribute zero instead of 0 / 0.
    total = np.sum(node1) + np.sum(node2)

    return (wrong1 + wrong2) / total

def gini_impurity_error(class_counts, y):
    """Calculate the Gini impurity for the split.

    The impurity of a node is one minus the sum of its squared class
    fractions; the result weights both node impurities by node proportion.

    Args:
        class_counts: Mapping with keys ``'node1'`` and ``'node2'`` whose
            values hold the per-class sample counts of that node.
        y: Unused.

    Returns:
        The proportion-weighted Gini impurity of the split.
    """
    impurities = []
    for key in ('node1', 'node2'):
        counts = class_counts[key]
        fractions = counts / np.sum(counts)
        impurities.append(1 - np.sum(fractions ** 2))

    weight_1, weight_2 = calculate_node_proportions(class_counts)

    return weight_1 * impurities[0] + weight_2 * impurities[1]

def cross_entropy_error(class_counts, y):
    """Calculate the cross entropy for the split.

    The entropy of a node is computed with the natural logarithm over the
    classes that actually occur in it; the result weights both node
    entropies by node proportion.

    Args:
        class_counts: Mapping with keys ``'node1'`` and ``'node2'`` whose
            values hold the per-class sample counts of that node.
        y: Unused.

    Returns:
        The proportion-weighted cross entropy of the split.
    """
    entropies = []
    for key in ('node1', 'node2'):
        counts = class_counts[key]
        # Drop absent classes so log(0) never appears.
        present = counts[counts != 0]
        entropies.append(
            -np.sum(present / np.sum(present) * np.log(present / np.sum(present)))
        )

    weight_1, weight_2 = calculate_node_proportions(class_counts)

    return weight_1 * entropies[0] + weight_2 * entropies[1]