### Importing relevent libraries and reading data files

In [313]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [314]:
import numpy as np
import scipy as sp

training_data_and_class = np.loadtxt("/content/drive/MyDrive/2학년 2학기/COSE362-MachineLearning/Hw1/train.txt") #(60290, 14(13 feature + 1 class))
#training_data_and_class = np.loadtxt("/content/drive/MyDrive/2학년 2학기/COSE362-MachineLearning/Hw1/personal_test.txt")

### Self-defined functions

In [315]:
def compute_responsibilities_vectorized(theta, data, K):
  """Vectorized computation of responsibilities."""
  component_dist, means, covariances = theta
  data_size, feat_dim = data.shape

  # Precompute the multivariate normal densities for all components and all data points
  prob_matrix = np.zeros((data_size, K))
  for k in range(K):
      prob_matrix[:, k] = sp.stats.multivariate_normal.pdf(data, means[k], covariances[k], allow_singular=True) * component_dist[k]

  # Calculate responsibilities by normalizing over all components
  responsibility_matrix = prob_matrix / prob_matrix.sum(axis=1, keepdims=True)
  return responsibility_matrix

In [316]:
def GMM_likelihood(theta_list, xt): #calculates likelihood of xt given GMM parameters
  responsibility, mean_list, cov_list = theta_list
  likelihood = 0
  for c in range(len(responsibility)):
    likelihood += sp.stats.multivariate_normal.pdf(xt, mean_list[c], cov_list[c], allow_singular=True) * responsibility[c]

  return likelihood

In [317]:
def MSE_of_model(model_theta, test_data):
  N = np.shape(test_data)[0]
  feature_data = test_data[:, :-1]
  true_class = test_data[:,-1]
  label = true_class[0]
  if label == 0:
    model_class = np.ones(np.shape(true_class))
  else:
    model_class = np.zeros(np.shape(true_class))

  for i, xt in enumerate(feature_data):
    model_class[i] = data_binary_classifier()


  print("model_class example:", model_class[:10])
  print("true_class example:", true_class[:10])
  return np.mean((true_class - model_class) ** 2)

In [318]:
def data_binary_classifier(theta_0, theta_1, data):
  model_class = []
  for xt in data:
    if (GMM_likelihood(theta_0, xt) > GMM_likelihood(theta_1, xt)):
      model_class.append(0)
    else:
      model_class.append(1)
  return np.array(model_class)

### GMM model

In [319]:
def GMM_training(data, K):
  data_size, feat_dim = np.shape(data)

  #initialization

  component_dist = np.zeros((K,))
  mean_k = np.zeros((K, feat_dim))
  cov_k = np.zeros((K, feat_dim, feat_dim))
  component_dist[:] = 1/K #(K,)
  mean_k = data[np.random.choice(data.shape[0], size=K, replace=False)].reshape(K, feat_dim)
  cov_init = np.cov(data, rowvar=False)
  cov_k[:] = cov_init #(K, feat_dim, feat_dim)
  theta = [component_dist, mean_k, cov_k]
  convergence = 0

  #iteration until theta convergence
  while True:
    #Expectation
    responsibility = compute_responsibilities_vectorized(theta, data, K)
    #Mazimization
    new_component_dist = np.zeros(np.shape(component_dist))
    new_mean_k = np.zeros(np.shape(mean_k))
    new_cov_k = np.zeros(np.shape(cov_k))

    for k in range(K):
      new_mean_k_num = np.zeros((feat_dim,))
      new_cov_k_num = np.zeros((feat_dim, feat_dim))
      new_mean_k_denum = 0
      for t in range(data_size):
        new_component_dist[k] += responsibility[t][k]
        new_mean_k_num += responsibility[t][k] * data[t]
        new_cov_k_num += responsibility[t][k] * np.outer(data[t], data[t])
        new_mean_k_denum += responsibility[t][k]

      new_component_dist[k] /= data_size
      new_mean_k[k] = new_mean_k_num / new_mean_k_denum
      new_cov_k[k] = (new_cov_k_num / new_mean_k_denum) - np.outer(new_mean_k[k], new_mean_k[k])

    new_theta = [new_component_dist, new_mean_k, new_cov_k]

    old_component_dist, old_means, old_covariances = theta
    new_component_dist, new_means, new_covariances = new_theta

    # Check for changes in means, covariances, and component distributions
    mean_ratio = np.mean(abs(new_means / old_means))
    cov_ratio = np.mean([np.mean(abs(new_covariances[i] / old_covariances[i])) for i in range(len(old_covariances))])
    comp_dist_ratio = np.mean(abs(new_component_dist / old_component_dist))
    total_diff = abs((sum([mean_ratio, cov_ratio, comp_dist_ratio]) / 3) - 1)
    if (convergence > 3):
      break
    if (total_diff < 10 ** -2):
      convergence += 1
    else:
      convergence = 0
    theta = new_theta

  return new_theta

### Training

In [None]:
true_class = training_data_and_class[:, -1] #(60290,)
data_0 = training_data_and_class[:, :][true_class == 0] #(60290, 13)
data_1 = training_data_and_class[:, :][true_class == 1]

#Preparing for K-fold cross validation(K=5)
use_size_0 = (np.shape(data_0)[0] // 5) * 5
use_size_1 = (np.shape(data_1)[0] // 5) * 5

data_0_split = [row for row in np.reshape(data_0[:use_size_0], (5, -1, 14))]
data_1_split = [row for row in np.reshape(data_1[:use_size_1], (5, -1, 14))]
total_error_2t6 = np.zeros((5,))
for c in range(2,7): #number of mixture components
  print()
  print("mixture component:", c)
  MSE_per_component = 0
  for K in range(5):
    print("data section:", K + 1)
    test_data = np.concatenate((data_0_split[K], data_1_split[K]))

    train_data_0 = np.concatenate(data_0_split[:K] + data_0_split[K+1:])
    train_data_1 = np.concatenate(data_1_split[:K] + data_1_split[K+1:])
    train_data_0_nl = train_data_0[:,:-1]
    train_data_1_nl = train_data_1[:,:-1]

    GMM_0_theta = GMM_training(train_data_0_nl, c)
    print("done training GMM_0")
    GMM_1_theta = GMM_training(train_data_1_nl, c)
    print("done training GMM_1")

    #test model, create a estimate class
    test_data_nl = test_data[:, :-1]
    test_true_class = test_data[:, -1]
    test_model_class = data_binary_classifier(GMM_0_theta, GMM_1_theta, test_data_nl)

    #calculate MSE
    MSE_per_component_part = np.mean((test_true_class - test_model_class) ** 2)
    MSE_per_component += MSE_per_component_part #this is the MSE for one data section of K division
    print("MSE value for one division:", MSE_per_component_part)
  print("Done calculating MSE for c =", c)
  print("MSE value for", c, "is:", MSE_per_component/5)
  total_error_2t6[c - 2] = MSE_per_component / 5

print("total error across 2 ~ 6 mixture components:", total_error_2t6)


mixture component: 2
data section: 1
done training GMM_0
done training GMM_1
MSE value for one division: 0.15493074562494816
data section: 2
done training GMM_0
done training GMM_1
MSE value for one division: 0.1297171767438003
data section: 3
done training GMM_0
done training GMM_1
MSE value for one division: 0.1795637389068591
data section: 4
done training GMM_0
done training GMM_1
MSE value for one division: 0.1598241685328025
data section: 5
