Before you turn this problem in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All) to avoid typical problems with Jupyter notebooks. **Unfortunately, this does not work with Chrome right now, you will also need to reload the tab in Chrome afterwards**.

Make sure you fill in any place that says `YOUR CODE HERE` or "YOUR ANSWER HERE". Please put your name here:

In [1]:
NAME = "Aymane Hachcham"

---

# Evaluation

In this (shorter) assignment, we want to compare the quality of different clustering approaches.

In [2]:
import numpy as np, pandas as pd, scipy

In [3]:
# Load the input data
import json, gzip
raw = json.load(gzip.open("/data/simpsonswiki.json.gz", "rt", encoding="utf-8"))
titles, texts, classes = [x["title"] for x in raw], [x["text"] for x in raw], [x["c"] for x in raw]

This is a minimal example implementation of spherical k-means, which we will use in the following.

In [4]:
# Vectorize the text for k-means (minimalistic)
from sklearn.feature_extraction.text import TfidfVectorizer
vect = TfidfVectorizer(stop_words="english", sublinear_tf=True, smooth_idf=False, min_df=5)
vect.fit(texts)
vect.idf_ -= 1
tfidf, idf = vect.transform(texts), vect.idf_
vocabulary = vect.get_feature_names_out()

In [5]:
# Minimalistic implementation for spherical k-means, so we use the same version in this assignment
# This is NOT meant as an example of good code, but to be snort.
def initial_centers(tfidf, k, seed):
    return tfidf[np.random.default_rng(seed=seed).choice(tfidf.shape[0], k, replace=False)]

def sphericalkmeans(tfidf, centers, max_iter=100):
    """REQUIRES the input to be L2 normalized, and does not handle corner cases such as empty clusters!"""
    last_assignment = None
    for iter in range(max_iter):
        assignment = np.asarray((tfidf @ centers.T).argmax(axis=1)).squeeze()
        if last_assignment is not None and all(assignment == last_assignment): break
        last_assignment, centers = assignment, np.zeros(centers.shape)
        for i in range(centers.shape[0]):
            c = tfidf[assignment == i,:].sum(axis=0).A1
            centers[i] = c / np.sqrt((c**2).sum())
    return centers, assignment, iter

## Implement a function to compute a cross-tabulation matrix

Compute the cross-tabulation matrix compares every class to every cluster. Append an additional row and column for the cluster sizes / class totals and the dataset size. Make sure to accept clusters that are, e.g., labeled using text labels and *not* just as integers 0..k.

Write your own code, do not use `pandas.crosstab`.

You do not need to vectorize this, but try to use numpy operations where easily possible - in particular if you end up waiting a lot for results below!

In [8]:
from collections import Counter

In [11]:
def cross_tabulation(clu, cla):
    """Compute the cross-tabulation matrix to compare assignments `clu` and `cla`."""
    # YOUR CODE HERE
    class_freq = Counter(list(zip(cla, clu))).most_common()
    cross_tab_freq = [(x[0][0], x[0][1], x[1]) for x in class_freq]
    sorted_freq = sorted(cross_tab_freq, key=lambda tup: tup[1])

    cross_freq = []
    for name_class in np.unique(cla):
        columns_freq = np.zeros(len(np.unique(clu))).astype(int)
        for x in sorted_freq:
            if x[0] == name_class:
                columns_freq[x[1]] = x[2]

        ## Appending sum for each column
        columns_freq = np.append(columns_freq, columns_freq.sum())
        cross_freq.append(columns_freq)

    cross_freq = np.transpose(np.array(cross_freq))

    ## Appending sum for each row:
    sums_row = [cross_freq[i,:].sum() for i in range(0, cross_freq.shape[0])]
    cross_freq = np.column_stack((cross_freq, sums_row))
    return cross_freq

In [12]:
### Automatic tests
_, _tmp, _ = sphericalkmeans(tfidf, tfidf[:5], 1)
_tmp2 = cross_tabulation(_tmp, np.ones(tfidf.shape[0]))
assert isinstance(_tmp2, np.ndarray), "Must be an array"
assert np.issubdtype(_tmp2.dtype, np.integer), "Must be an integer array"
if _tmp2.shape == (2,6): print("Use first parameter as first index.")
assert _tmp2.shape == (6,2), "Wrong shape"
assert _tmp2[:-1,:-1].sum() == tfidf.shape[0], "Not all elements"
assert (_tmp2[:,:-1].sum(axis=1) == _tmp2[:,-1]).all(), "Sizes are bad"
assert (_tmp2[:-1].sum(axis=0) == _tmp2[-1]).all(), "Sizes are bad"
assert _tmp2.sum() == 4 * tfidf.shape[0], "Not all elements"
from unittest.mock import patch
with patch('pandas.crosstab', side_effect=Exception("Do not use pandas.crosstab")) as mock_p:
    _tmp2 = cross_tabulation(_tmp, _tmp)
assert _tmp2.shape == (6,6), "Wrong shape"
assert _tmp2.sum() == 4 * tfidf.shape[0], "Not all elements"
assert (_tmp2[:,:-1].sum(axis=1) == _tmp2[:,-1]).all(), "Sizes are bad"
assert (_tmp2[:-1].sum(axis=0) == _tmp2[-1]).all(), "Sizes are bad"
del _tmp, _tmp2

In [13]:
### Automatic tests
_, _tmp, _ = sphericalkmeans(tfidf, tfidf[:5], 1)
try:
    _tmp2 = cross_tabulation(_tmp, classes)
except Exception as e:
    raise Exception("Your code probably does not accept textual class labels.")
assert _tmp2.shape == (6,len(np.unique(classes))+1), "Wrong shape"
assert _tmp2.sum() == 4 * tfidf.shape[0], "Not all elements"
assert (_tmp2[:,:-1].sum(axis=1) == _tmp2[:,-1]).all(), "Sizes are bad"
assert (_tmp2[:-1].sum(axis=0) == _tmp2[-1]).all(), "Sizes are bad"
del _tmp, _tmp2

## Implement a function to compute the pair counts from the cross-tabulation matrix

In [None]:
def pair_count(crosstab):
    """Compute the pair count matrix from the cross-tabulation matrix."""
    from scipy.special import comb
    pair_matrix = np.zeros((crosstab.shape[0], crosstab.shape[1])).astype(int)

    # The pair matrix is calculated using the Binomial coefficient on the cross tabulation matrix;
    for i in range(0, pair_matrix.shape[0]):
        for j in range(0, pair_matrix.shape[1]):
            pair_matrix[i][j] = comb(crosstab[i][j], 2)

    return pair_matrix

In [None]:
### Automatic tests
_, _tmp, _ = sphericalkmeans(tfidf, tfidf[:5], 1)
_tmp2 = pair_count(cross_tabulation(_tmp, np.ones(tfidf.shape[0])))
assert isinstance(_tmp2, np.ndarray), "Must be an array"
assert np.issubdtype(_tmp2.dtype, np.integer), "Must be an integer array"
if _tmp2.shape == (2,6): print("Use first parameter as first index.")
assert _tmp2.shape == (6,2), "Wrong shape"
assert (_tmp2[:,:-1].sum(axis=1) == _tmp2[:,-1]).all(), "Wrong result"
assert not (_tmp2[:-1].sum(axis=0) == _tmp2[-1]).all(), "Wrong result"
_tmp2 = cross_tabulation(_tmp, _tmp)
assert _tmp2.shape == (6,6), "Wrong shape"
assert (_tmp2[:,:-1].sum(axis=1) == _tmp2[:,-1]).all(), "Wrong result"
assert (_tmp2[:-1].sum(axis=0) == _tmp2[-1]).all(), "Wrong result"
del _tmp, _tmp2

## Compute the Rand Index

First compute the Rand Index of two assignments. You must use above functions.

In [None]:
def rand_index(clu, cla):
    pair_matrix = pair_count(cross_tabulation(clu, cla))

    # We have to calculate the values:
    # Value a:
    a = np.sum(pair_matrix[:-1,:-1])

    # Value b:
    b = np.sum(pair_matrix[:-1, -1]) - a

    # Value c:
    c = np.sum(pair_matrix[-1, :-1]) - a

    # Value d:
    d = pair_matrix[-1,-1] - a - b - c

    return (a + d) / pair_matrix[-1,-1]

In [None]:
### Automatic tests
_, _tmp, _ = sphericalkmeans(tfidf, tfidf[:5], 1)
from unittest.mock import patch
with patch('__main__.cross_tabulation') as mock_u1, patch('__main__.pair_count') as mock_u2, patch('sklearn.metrics.rand_score') as mock_sk:
    rand_index(_tmp, classes)
assert mock_u1.called, "Use the cross_tabulation function!"
assert mock_u2.called, "Use the pair_count function!"
assert not mock_sk.called, "Use your own code, not sklearn."
ri = rand_index(_tmp, classes)
assert ri <= 1, "RI must be at most 1."
assert ri > 0, "RI must be always larger than 0."

In [None]:
### Automatic tests
import sklearn.metrics
assert abs(ri - sklearn.metrics.rand_score(_tmp, classes)) < 1e-7, "Result should agree with sklearn"
del _tmp

## Compute the Adjusted Rand Index

Write a function to compute the adjusted Rand index of two assignments. You must use above `pair_count` and `cross_tabulation` functions.

Beware of integer overflows when using the equation from the slides. To resolve the integer overflow, transform the equation such that it has the standard form $ARI = \frac{RI-E[RI]}{M-E[RI]}$ where RI is the rand index, $E[RI]$ is the expected value of the rand index (you need to derive this from the ARI equation given on the slides, do *not* attempt to figure out this equation directly; this assignment only needs standad high school math), and \(M\) is the maximum possible value of the Rand index (a constant).

In [None]:
def adjusted_rand_index(clu, cla):
    # YOUR CODE HERE
    pair_matrix = pair_count(cross_tabulation(clu, cla))

    # We have to calculate the values:
    # Value a:
    a = np.sum(pair_matrix[:-1,:-1])

    # Value b:
    b = np.sum(pair_matrix[:-1, -1]) - a

    # Value c:
    c = np.sum(pair_matrix[-1, :-1]) - a

    # Value d:
    d = pair_matrix[-1,-1] - a - b - c


    ri = pair_matrix[-1][-1]*(a + d)
    E_ri = ((a+b)*(a+c) + (c+d)*(b+d))
    m = np.power(pair_matrix[-1][-1], 2)

    return (ri - E_ri)/(m - E_ri)

In [None]:
### Automatic tests
_, _tmp, _ = sphericalkmeans(tfidf, tfidf[:5], 1)
from unittest.mock import patch
with patch('__main__.cross_tabulation') as mock_u1, patch('__main__.pair_count') as mock_u2, patch('sklearn.metrics.adjusted_rand_score') as mock_sk, patch('sklearn.metrics.rand_score') as mock_sk2:
    adjusted_rand_index(_tmp, classes)
assert mock_u1.called, "Use the cross_tabulation function!"
assert mock_u2.called, "Use the pair_count function!"
assert not mock_sk.called, "Use your own code, not sklearn."
assert not mock_sk2.called, "Use your own code, not sklearn."
ari = adjusted_rand_index(_tmp, classes)
assert ari <= 1, "ARI must be at most 1"
assert ari > 0, "This clustering must score better than random."
del _tmp

In [None]:
### Automatic tests
_, _tmp, _ = sphericalkmeans(tfidf, tfidf[:5], 1)
ari = adjusted_rand_index(_tmp, classes)
import sklearn.metrics
assert abs(ari - sklearn.metrics.adjusted_rand_score(_tmp, classes)) < 1e-7, "Result should agree with sklearn"
del _tmp

## Compute the Normalized Mutual Information

Write a function to compute the Normalized Mutual Information (with arithmetic averaging) of two assignments.
You must use above `pair_count` and `cross_tabulation` functions.

In [None]:
def normalized_mutual_information(clu, cla):
    # YOUR CODE HERE
    # We use the pair matrix:
    pair_matrix =  cross_tabulation(clu, cla)

    import math


    nmi_matrix_ck = np.zeros((pair_matrix.shape[0]-1, pair_matrix.shape[1]-1))

    n = pair_matrix[-1, -1]
    ci = pair_matrix[:-1,-1]
    kj = pair_matrix[-1,:-1]

    # i_cc = -sum(list(map(lambda t: t/pair_matrix[-1,-1] * np.log(t/pair_matrix[-1,-1]), pair_matrix[:-1,-1])))
    i_cc = -sum([el/n * math.log(el/n) for el in ci])
    i_kk = -sum([el/n * math.log(el/n) for el in kj])
    # i_kk = -sum(list(map(lambda t: t/pair_matrix[-1,-1] * np.log(t/pair_matrix[-1,-1]), pair_matrix[-1,:-1])))

    for i in range(0, pair_matrix.shape[0]-1):
        for j in range(0, pair_matrix.shape[1]-1):
            nmi_matrix_ck[i][j] = pair_matrix[i][j]/n * math.log2( n*(pair_matrix[i][j]) / ( ci[i] * kj[j] ))

    ck = np.sum(nmi_matrix_ck)

    nmi = (ck * 2) / (i_cc + i_kk)
    return nmi

In [None]:
### Automatic tests
_, _tmp, _ = sphericalkmeans(tfidf, tfidf[:5], 1)
from unittest.mock import patch
with patch('sklearn.metrics.normalized_mutual_info_score', side_effect=Exception("Use your own code, not skelarn")) as mock_sk1,  patch('sklearn.metrics.mutual_info_score', side_effect=Exception("Use your own code, not skelarn")) as mock_sk2:
    normalized_mutual_information(_tmp, classes)
nmi = normalized_mutual_information(_tmp, classes)
assert nmi <= 1, "NMI must be at most 1"
assert nmi > 0, "This clustering must score better than random."
del _tmp

In [None]:
### Automatic tests
_, _tmp, _ = sphericalkmeans(tfidf, tfidf[:5], 1)
nmi = normalized_mutual_information(_tmp, classes)
import sklearn.metrics
assert abs(nmi - sklearn.metrics.normalized_mutual_info_score(_tmp, classes)) < 1e-10, "Result should agree with sklearn"
del _tmp

## Finding the best clustering

for $k=1..15$, and a fixed random seed of 0, find the best spherical k-means clustering by NMI compared to the classes stored in `classes` above (note that this will not generally be possible, as our data usually will not be labeled).

In [None]:
bestk = None # Store best k here
bestnmi = None # Store the best NMI here
bestassignment = None # Store the best assignment here
# YOUR CODE HERE
# raise NotImplementedError()
print("The best k is", bestk, "scoring", bestnmi)
# Hint: it will *not* score very good. The classes are not clusters.

In [None]:
### Automatic tests
assert bestk > 0 and bestk <= 15
assert bestk == len(np.unique(bestassignment)), "Inconsistent result"
assert abs(bestnmi-sklearn.metrics.normalized_mutual_info_score(bestassignment, classes)) < 1e-7, "Inconsistent result"
assert all(bestassignment == sphericalkmeans(tfidf, initial_centers(tfidf, bestk, 0))[1]), "Inconsistent result"
assert bestk > 2, "There should be something better"
assert bestk < 15, "There should be something better"

In [None]:
### Additional Automatic tests

## Explore the result

Explore the clustering result, by comparing it to the original classes.

For each cluster, return the cluster label, the three top classes, and the percentages of the clusters.

In [None]:
def top_classes(clu, cla):
    """For each cluster, give the top three classes and their share of the data each."""
    # For each cluster, call yield label, *top3, *shares to return a 7-tuple.
    # YOUR CODE HERE
#     raise NotImplementedError()

In [None]:
### Automatic tests
_tmp = top_classes(classes, classes)
import types
assert isinstance(_tmp, types.GeneratorType), "You did not use `yield`."
_tmp = list(_tmp)
assert len(_tmp) == len(np.unique(classes)), "Wrong number of results"
for row in _tmp:
    assert len(row) == 7, "Not a 7-tuple"
    assert row[0] in classes, "Not a label"
    assert row[0] == row[1], "Incorrect result"
    assert row[4:] == (1.,0.,0.), "Incorrect result"
del _tmp

In [None]:
### Automatic tests
_tmp = top_classes(bestassignment, classes)
import types
assert isinstance(_tmp, types.GeneratorType), "You did not use `yield`."
_tmp = list(_tmp)
assert len(_tmp) == bestk, "Wrong number of results"
for row in _tmp:
    assert len(row) == 7, "Not a 7-tuple"
    assert row[1] in classes, "Not a label"
# Additional hidden tests
del _tmp

In [None]:
# Explore your best clustering!
# YOUR CODE HERE
raise NotImplementedError()