## Hierarchical Clustering
Consider an unlabeled version of the problem where you had data from 3 types of mugs with their height, diameter, weight, and hue (color). (To obtain the unlabeled data, simply assume that the labels are not part of the dataset).

a) Implement hierarchical agglomerative clustering for this dataset. Your implementation should provide the option to use single (minimum) linkage, and complete (maximum) linkage.

b) Apply your hierarchical clustering implementation with single (minimum) linkage to the data. Choosing the case of 2, 4, 6, and 8 clusters, evaluate to what degree the clusters reflect the type of mug (i.e. the material of the mug) by evaluating the purity of the clusters using the original data labels. For this, compute the accuracy for each cluster assuming that the predicted label for each cluster is the most frequently occurring label in this cluster. Once you computed this for each cluster, compute the overall accuracy using the corresponding number of clusters as the weighted sum of the accuracies where the weight is the fraction of data points in the corresponding cluster.

c) Repeat the clustering and evaluation from part 1b) using complete (maximum) linkage. Show the results and discuss any differences you see between single linkage and maximum linkage results (in particular, which one seems to better reflect the material types.

In [4]:
import csv
import numpy as np
import matplotlib.pyplot as plt
from collections import Counter

In [5]:
# Helper function to calculate Euclidean distance
def euclidean_distance(point1, point2):
    return np.sqrt(np.sum((np.array(point1) - np.array(point2)) ** 2))

In [6]:
# Helper function to calculate the distance matrix
def calculate_distance_matrix(data):
    distance_matrix = np.zeros((len(data), len(data)))
    for i in range(len(data)):
        for j in range(i+1, len(data)):
            distance = euclidean_distance(data[i], data[j])
            distance_matrix[i][j] = distance_matrix[j][i] = distance
    return distance_matrix


In [7]:
# Helper function to find closest clusters
def find_closest_clusters(distance_matrix):
    min_val = np.inf
    x = y = 0
    for i in range(len(distance_matrix)):
        for j in range(i+1, len(distance_matrix)):
            if distance_matrix[i][j] < min_val:
                min_val = distance_matrix[i][j]
                x, y = i, j
    return x, y


In [8]:
# Helper function to update distance matrix after merging clusters
def update_distance_matrix(distance_matrix, x, y, linkage):
    row = []
    for i in range(len(distance_matrix)):
        if i != x and i != y:
            if linkage == 'single':
                distance = min(distance_matrix[x][i], distance_matrix[y][i])
            elif linkage == 'complete':
                distance = max(distance_matrix[x][i], distance_matrix[y][i])
            row.append(distance)
    distance_matrix = np.delete(distance_matrix, [x, y], axis=0)
    distance_matrix = np.delete(distance_matrix, [x, y], axis=1)
    distance_matrix = np.vstack([distance_matrix, row])
    row.append(0)
    distance_matrix = np.column_stack([distance_matrix, row])
    return distance_matrix

In [9]:
# Helper function to merge clusters
def merge_clusters(clusters, x, y):
    clusters[x] += clusters[y]
    clusters.pop(y)
    return clusters

In [10]:
# Hierarchical Agglomerative Clustering
def agglomerative_clustering(data, linkage, no_of_clusters):
    distance_matrix = calculate_distance_matrix(data)
    clusters = [[i] for i in range(len(data))]
    while len(clusters) > no_of_clusters:
        x, y = find_closest_clusters(distance_matrix)
        distance_matrix = update_distance_matrix(distance_matrix, x, y, linkage)
        clusters = merge_clusters(clusters, x, y)
    return clusters

In [11]:
# Helper function to calculate cluster purity
def calculate_purity(clusters, labels):
    total_correct_predictions = 0
    for cluster in clusters:
        cluster_labels = [labels[i] for i in cluster]
        most_common = Counter(cluster_labels).most_common(1)[0][0]
        correct_predictions = cluster_labels.count(most_common)
        total_correct_predictions += correct_predictions
    return total_correct_predictions / len(labels)


In [12]:
# Reading the data
with open('dataset.csv', 'r') as f:
    reader = csv.reader(f)
    headers = next(reader)
    data = []
    labels = []
    for row in reader:
        data.append(list(map(float, row[:-1])))
        labels.append(row[-1])

In [13]:
# Normalizing the data
data = (data - np.min(data, axis=0)) / (np.max(data, axis=0) - np.min(data, axis=0))

# Applying the algorithm
for no_of_clusters in [2, 4, 6, 8]:
    for linkage in ['single', 'complete']:
        print(f"Linkage: {linkage}, Number of clusters: {no_of_clusters}")
        clusters = agglomerative_clustering(data, linkage, no_of_clusters)
        purity = calculate_purity(clusters, labels)
        print(f"Purity: {purity}")

Linkage: single, Number of clusters: 2
Purity: 0.43333333333333335
Linkage: complete, Number of clusters: 2
Purity: 0.43333333333333335
Linkage: single, Number of clusters: 4
Purity: 0.43333333333333335
Linkage: complete, Number of clusters: 4
Purity: 0.44166666666666665
Linkage: single, Number of clusters: 6
Purity: 0.4583333333333333
Linkage: complete, Number of clusters: 6
Purity: 0.45
Linkage: single, Number of clusters: 8
Purity: 0.475
Linkage: complete, Number of clusters: 8
Purity: 0.45


It's difficult to definitively say which method is better at reflecting the material types of the mugs. The results depend on the chosen number of clusters. At a lower number of clusters, complete linkage seems to perform slightly better, while at a higher number of clusters, single linkage seems to perform better.

The differences in purity scores are relatively small, and the overall purity scores are not particularly high, which suggests that neither method is able to perfectly capture the underlying classes in the data. This might be due to the complexity of the data, noise, or other factors.

### Self-Training

Take the first 10 data items from each class as the supervised data, Ds, (keeping its labels) and the remaining data items as unsupervised data, Du, (by ignoring its labels).

a) Implement a self-training system using a K-nearest neighbor classifier for this problem (to evaluate the level of certainty for each prediction on unlabeled data you can use the purity of the prediction - i.e. how many more ”votes” the predicted class received than the other class - or you can use distance weighted voting where the number of votes each of the K data points receives is inversely proportional to the distance from the data point). Your algorithm should allow you to change the number of data items that are being added to the labeled data set in each iteration.

b) Learn a classifier using the semi-supervised learning algorithm and compare it agains the K-nearest neighbor classifier learned only from the labeled data Ds by evaluating the fraction of initially unlabeled points it predicts correctly (for the self-learning you can simply use the fraction of the initially unlabeled points that were assigned the correct label during the learning process). Repea the comparison for four different numbers of data points added in each iteration, including adding 1 data point in each iteration, adding all data points in the first iteration, adding 5 points in each iteration, and one setting of your choice. Discuss if you see any performance difference and if so, what it is.

In [18]:
import pandas as pd
import numpy as np
import operator
from collections import Counter

In [19]:
def euclidean_distance(a, b):
    return np.sqrt(np.sum((a-b)**2))

In [20]:
def get_neighbors(X_train, test_row, k):
    distances = []
    for train_row in X_train:
        dist = euclidean_distance(test_row, train_row[:-1])
        distances.append((train_row, dist))
    distances.sort(key=lambda tup: tup[1])
    neighbors = [tup[0] for tup in distances[:k]]
    return neighbors

In [21]:
def predict_classification(X_train, test_row, k, threshold):
    neighbors = get_neighbors(X_train, test_row, k)
    output_values = [row[-1] for row in neighbors]
    prediction, count = Counter(output_values).most_common(1)[0]
    if len(Counter(output_values)) > 1:
        _, second_most_common_count = Counter(output_values).most_common(2)[1]
        if count / second_most_common_count < threshold:
            return None
    return prediction

In [22]:
def self_training(X_train, X_unlabel, k, n, threshold):
    while len(X_unlabel) > 0 and n > 0:
        X_unlabel_new = np.empty((0, 4))
        for i in range(len(X_unlabel)):
            if i == n:
                break
            test_row = X_unlabel[i]
            label = predict_classification(X_train, test_row, k, threshold)
            if label is not None:
                test_row = np.append(test_row, label)
                X_train = np.vstack([X_train, test_row])
            else:
                X_unlabel_new = np.vstack([X_unlabel_new, test_row])
        X_unlabel = np.vstack([X_unlabel_new, X_unlabel[n:]])
        n -= 1
    return X_train

In [23]:
def accuracy(y_true, y_pred):
    correct = 0
    for i in range(len(y_true)):
        if y_true[i] == y_pred[i]:
            correct += 1
    return correct / float(len(y_true))

In [26]:
df = pd.read_csv('dataset.csv')
df['Type'] = pd.Categorical(df['Type'])
df['Type'] = df['Type'].cat.codes

# Normalize the features
for column in df.columns[:-1]:
    df[column] = (df[column] - df[column].mean()) / df[column].std()

data = df.values
np.random.shuffle(data)

# Splitting data into labelled and unlabelled
X_train = data[:20]
X_unlabel = data[20:, :-1]

k = 3
n = [1, len(X_unlabel), 5, 10]
threshold = 2

for i in n:
    X_train_new = self_training(X_train.copy(), X_unlabel.copy(), k, i, threshold)
    y_true = data[20:, -1]
    y_pred = []
    for row in data[20:]:
        prediction = predict_classification(X_train_new, row[:-1], k, threshold)
        if prediction is not None:
            y_pred.append(prediction)
        else:
            y_pred.append(-1)  # mark the data points that couldn't be classified
    print("Number of points added in each iteration:", i)
    print("Accuracy:", accuracy(y_true, y_pred))


Number of points added in each iteration: 1
Accuracy: 0.71
Number of points added in each iteration: 100
Accuracy: 0.75
Number of points added in each iteration: 5
Accuracy: 0.76
Number of points added in each iteration: 10
Accuracy: 0.74


These results show that the accuracy of the classifier improves when more data points are added in each iteration. This is to be expected, as adding more data points in each iteration allows the classifier to learn from a larger portion of the data set at each step, which can help it make better predictions.