In [None]:
import numpy as np
import pandas as pd
import sys

# Switch for the inline unit tests: the `if RUN_TESTS:` blocks further down only run when this is True
RUN_TESTS = True

# Function implementations
## Non-negative Matrix Factorisation implementation
### Provided design specification
Implement the `nmf()` subroutine in the provided code base. This function takes as input a matrix `X`, the number of required components `n_components` (“number of features” from the lecture), a maximum number of iterations `max_iter`, and an error tolerance threshold `tol`. It returns two matrices `W` (with `n_components` columns) and `H` (with `n_components` rows) such that `WH` approximates `X`.

Use the algorithm from the lecture slides to compute `W` and `H`. For more information, you can read about it here.

If at a certain point in the algorithm the reconstruction error of each consecutive iteration is less than `tol`, then you can stop early.

`Hint: if at some place of the algorithm it's possible for a division by 0 to happen, add 1e-9 to the denominator.`


In [None]:
# Non-negative matrix factorisation implementation and tests

# Same flag as in the imports cell, assigned again here; the `if RUN_TESTS:` block in this cell reads it
RUN_TESTS = True

def nmf(X: pd.DataFrame, n_components: int, max_iter: int=1000, tol: float=1e-3):
  """
  Decomposes the original sparse matrix X into two non-negative matrices W and H
  such that the product W @ H approximates X.

  W and H start from random non-negative values (drawn from NumPy's global
  random state, so seed it beforehand for repeatable results) and are refined
  with multiplicative update rules, which keep every entry non-negative and
  never increase the Frobenius reconstruction error.

  :param X: Non-negative matrix to factorise (DataFrame or 2-D array), shape (rows, cols)
  :param n_components: Number of components (columns of W / rows of H)
  :param max_iter: Maximum number of update iterations
  :param tol: Stop early once the reconstruction error changes by less than
              this amount between two consecutive iterations
  :return: Tuple (W, H) with shapes (rows, n_components) and (n_components, cols)
  :raises ValueError: If X contains negative values
  """
  # Initialize W and H with random non-negative values
  W = np.random.rand(X.shape[0], n_components)
  H = np.random.rand(n_components, X.shape[1])

  # START ANSWER
  # Work on a plain float array so DataFrame labels do not interfere with the algebra
  V = np.asarray(X, dtype=float)
  if (V < 0).any():
    raise ValueError("nmf() requires a matrix without negative values")

  # Added to every denominator to avoid division by zero
  eps = 1e-9
  previous_error = np.inf

  for _ in range(max_iter):
    # Multiplicative updates: H first, then W using the refreshed H
    H *= (W.T @ V) / (W.T @ W @ H + eps)
    W *= (V @ H.T) / (W @ H @ H.T + eps)

    # Frobenius norm of the residual is the reconstruction error
    error = np.linalg.norm(V - W @ H)
    if abs(previous_error - error) < tol:
      break
    previous_error = error
  # END ANSWER

  return W, H

if RUN_TESTS:
    import unittest

    class TestSolution(unittest.TestCase):
        """Checks that nmf() reconstructs small binary matrices."""

        def setUp(self):
            # Reseed before every test so nmf()'s random start is repeatable
            np.random.seed(42)

        def _assert_reconstructs(self, columns, n_components, max_iter, **tolerance):
            """
            Factorise the matrix built from `columns` and compare W.H against it.
            Extra keyword arguments (e.g. atol) are passed to assert_frame_equal.
            """
            original = pd.DataFrame(list(zip(*columns)))
            w, h = nmf(original, n_components, max_iter)
            rebuilt = pd.DataFrame(data=np.dot(w, h),
                                   index=original.index,
                                   columns=original.columns)
            pd.testing.assert_frame_equal(original, rebuilt, check_dtype=False, **tolerance)

        def test_2_by_2(self):
            # No atol here: this case is compared with the default tolerances
            self._assert_reconstructs([[1, 1],
                                       [0, 0]], 4, 10)

        def test_3_by_3(self):
            self._assert_reconstructs([[1, 1, 0],
                                       [0, 0, 0],
                                       [0, 1, 0]], 5, 50, atol=0.05)

        def test_3_by_2(self):
            self._assert_reconstructs([[0, 1, 0],
                                       [0, 0, 1]], 5, 50, atol=0.05)

        def test_5_by_5(self):
            self._assert_reconstructs([[0, 1, 0, 0, 0],
                                       [0, 0, 1, 1, 0],
                                       [0, 0, 0, 0, 0],
                                       [0, 1, 0, 0, 0],
                                       [1, 0, 0, 0, 0]], 5, 50, atol=0.05)

    unittest.main(argv=[''], verbosity=2, exit=False)

## MinHashing implementation
### Provided design specification
Implement the `compute_signature()` subroutine in the provided code base. This function takes as input a list of `k` `HashFunction` objects and a list of `n` sets of integers, representing which `ids` each user has liked.

Have a look in the library to see how `HashFunction` is defined.

It should return the minhash signature for the given input, when applying the provided hash functions. The signature should be of size `k x n`, where each column of the signature matrix represents the index of the user’s liked ids, and the rows represent the index of each hash function.

The goal is for similar sets of liked `ids` to have similar columns in the signature matrix. See the tests for an example of what’s expected.

In [80]:
# Scratch data: the liked-id sets used in the MinHash example
ids = [{1, 2, 3, 4}, {1}, {4, 5}, {1, 2, 3}, {1}]

# Every id that occurs in at least one set, then the same ids in ascending order
space = set.union(set(), *ids)
a = list(space)
a.sort()

# A (0, 0)-shaped array holds no elements, so this prints an empty array
print(np.full(shape=(0, 0), fill_value=sys.maxsize))

[]


In [79]:
# Minhashing implementation and tests
class HashFunction:
    """
    Library class HashFunction. Do not change
    Linear hash function of the form (alpha * x + beta) % n, where alpha and
    beta are fixed at construction time and x and n are supplied per call.
    """
    def __init__(self, alpha, beta):
        # Multiplier (alpha) and offset (beta) applied by hashf
        self.alpha = alpha
        self.beta = beta

    def hashf(self, x: float, n: int):
        """
        Returns a hash given integers x and n.
        :param x: The value to be hashed (annotated as float; the callers in this file pass integers)
        :param n: The number of unique ids of all sets (modulo)
        :return: The hashed value (alpha * x + beta) % n
        """
        
        # The initial 0 is immediately overwritten by the real hash on the next line
        hash_value = 0
        hash_value =  (self.alpha * x + self.beta) % n
        return hash_value

def compute_signature(hashes: "list[HashFunction]", ids: list[set[int]]):
    """
    This function will calculate the MinHash signature matrix from our sets of ids
    using the list of hash functions (hashes).

    Every distinct id is identified by its rank in the ascending order of all
    ids that occur in any set (so ids do not have to be consecutive). That rank
    is what gets hashed, with the number of distinct ids as the modulo. Entry
    (h, s) of the result is the smallest value hash function h produces over
    the ids contained in set s.

    :param hashes: The list of hash functions of arbitrary length; each needs a
                   `hashf(x, n)` method
    :param ids: The list of sets of ids
    :return: The MinHash signature matrix for the given sets of ids, of shape
             (len(hashes), len(ids)). Columns of empty sets keep the initial
             value sys.maxsize.
    """
    
    result = np.full((len(hashes), len(ids)), sys.maxsize)
    space = set().union(*ids)
    sorted_space = sorted(space)
    
    # START ANSWER
    number_distinct_ids = len(sorted_space)

    # Nothing to hash: no hash functions, no sets, or only empty sets
    if len(hashes) == 0 or number_distinct_ids == 0:
        return result

    for row_index, element in enumerate(sorted_space):
        # Hash the row index once per hash function, then reuse it for every set
        row_hashes = np.array([hash_function.hashf(row_index, number_distinct_ids)
                               for hash_function in hashes])

        for column_index, liked_ids in enumerate(ids):
            if element in liked_ids:
                # Keep, per hash function, the smallest hash seen so far for this set
                result[:, column_index] = np.minimum(result[:, column_index], row_hashes)
    # END ANSWER
    return result

if RUN_TESTS:
    import unittest

    class TestSolution(unittest.TestCase):
        """Compares compute_signature() output with hand-computed signature matrices."""

        def _assert_signature(self, coefficients, test_sets, expected_rows):
            """
            Build one HashFunction per (alpha, beta) pair, compute the signature
            of `test_sets` and require an exact match with `expected_rows`.
            """
            test_hashes = [HashFunction(alpha, beta) for alpha, beta in coefficients]
            result = compute_signature(test_hashes, test_sets)
            np.testing.assert_array_equal(result, np.array(expected_rows))

        def test_multiple_sets(self):
            self._assert_signature(
                [(2, 3), (4, 2), (1, 3), (3, 1)],
                [{1, 2, 3, 4}, {1}, {4, 5}, {1, 2, 3}, {1}],
                [[0, 3, 1, 0, 3],
                 [0, 2, 3, 0, 2],
                 [0, 3, 1, 0, 3],
                 [0, 1, 0, 1, 1]])

        def test_identical_sets(self):
            self._assert_signature(
                [(2, 3), (4, 2), (1, 3), (3, 1)],
                [{2, 3}, {2, 3}, {2, 3}],
                [[1, 1, 1],
                 [0, 0, 0],
                 [0, 0, 0],
                 [0, 0, 0]])

        def test_mutually_exclusive_sets(self):
            self._assert_signature(
                [(2, 3), (4, 2), (1, 3)],
                [{1, 2}, {3, 4}, {5, 6}],
                [[3, 1, 1],
                 [0, 2, 0],
                 [3, 0, 1]])

        def test_non_consecutive_set(self):
            # Only the first two hash functions take part in this case
            self._assert_signature(
                [(2, 3), (4, 2)],
                [{2, 3, 6}, {2, 6}, {2, 3}, {3, 6}],
                [[0, 0, 0, 1],
                 [0, 1, 0, 0]])

    unittest.main(argv=[''], verbosity=2, exit=False)

test_identical_sets (__main__.TestSolution.test_identical_sets) ... ok
test_multiple_sets (__main__.TestSolution.test_multiple_sets) ... ok
test_mutually_exclusive_sets (__main__.TestSolution.test_mutually_exclusive_sets) ... ok
test_non_consecutive_set (__main__.TestSolution.test_non_consecutive_set) ... ok

----------------------------------------------------------------------
Ran 4 tests in 0.007s

OK


# Start of Report

# CSE2525 Data Mining: Lab 2 - Matrix Decomposition