## Lab 03: KMeans, Clustering, and Distance Metrics

In this lab, you will be learning how to use the KMeans algorithm to cluster data and how to use distance metrics to measure the similarity between data points. You will also implement the KMeans algorithm with the provided methods and distance metric methods.

_You will also be able to test your methods running the cells provided (you'll see what I mean)_

In [1]:
#Run this cell! 
#This read method is needed to parse the csv file:

import csv

def read_csv(csv_file_path):
    """
        Given a path to a csv file, return a matrix (list of lists)
        in row major.
    """
    with open(csv_file_path, 'r') as f:
        return [[int(cell) if cell.isdigit() else cell for cell in row] for row in csv.reader(f)]

### Complete the methods below

#### `kmeans`

In [4]:
from collections import defaultdict
from math import inf
import random
import csv

def get_centroid(points):
    """
    Accepts a list of points, each with the same number of dimensions.
    (points can have more dimensions than 2)
    
    Returns a new point which is the center of all the points.
    """
    return NotImplementedError("Implement this function")

def get_centroids(dataset, assignments):
    """
    Accepts a dataset and a list of assignments; the indexes 
    of both lists correspond to each other.
    Compute the centroid for each of the assigned groups.
    Return `k` centroids in a list
    """
    return NotImplementedError("Implement this function")

def assign_points(data_points, centers):
    """
    This method assigns each point in data_points to the closest center
    and returns a list of assignments.
    """
    #NOTE: i did this method for u guys so you can see an example of how 
    #these variables are used. You can change it if you want.
    return [min(range(len(centers)), key=lambda i: distance(point, centers[i])) for point in data_points]


def distance(a, b):
    """
    Returns the Euclidean distance between a and b
    """
    return NotImplementedError("Implement this function")

def distance_squared(a, b):
    return NotImplementedError("Implement this function")

def cost_function(clustering):
    """
    Accepts a clustering as a dictionary of lists of points.
    Returns the cost of the clustering.
    """
    return NotImplementedError("Implement this function")

def generate_k(dataset, k):
    """
    Given `data_set`, which is an array of arrays,
    return a random set of k points from the data_set
    """
    return NotImplementedError("Implement this function")

def generate_k_pp(dataset, k):
    """
    Given `data_set`, which is an array of arrays,
    return a random set of k points from the data_set
    where points are picked with a probability proportional
    to their distance as per kmeans pp
    """
    return NotImplementedError("Implement this function")

def _do_lloyds_algo(dataset, k_points):
    assignments = assign_points(dataset, k_points)
    old_assignments = None
    while assignments != old_assignments:
        new_centers = get_centroids(dataset, assignments)
        old_assignments = assignments
        assignments = assign_points(dataset, new_centers)
    clustering = defaultdict(list)
    for assignment, point in zip(assignments, dataset):
        clustering[assignment].append(point)
    return clustering


def k_means(dataset, k):
    if k not in range(1, len(dataset)+1):
        raise ValueError("lengths must be in [1, len(dataset)]")
    
    k_points = generate_k(dataset, k)
    return _do_lloyds_algo(dataset, k_points)


def k_means_pp(dataset, k):
    if k not in range(1, len(dataset)+1):
        raise ValueError("lengths must be in [1, len(dataset)]")

    k_points = generate_k_pp(dataset, k)
    return _do_lloyds_algo(dataset, k_points)


SyntaxError: invalid character '】' (U+3011) (2843276491.py, line 31)

Run the methods provided to see if your `kmeans` works

In [3]:
#DO NOT MODIFY THIS CELL!!!!

import random
import csv
#import read

def clustered_all_points(clustering, dataset):
    points = []
    for assignment in clustering:
        points += clustering[assignment]
    for point in points:
        if point not in dataset:
            return False
    return True


def test_kmeans_when_k_is_1():
    datasetPath = "tests/test_files/dataset_1.csv"

    random.seed(1)
    dataset = read_csv(datasetPath)
    expected_clustering = dataset
    clustering = k_means(dataset=dataset, k=1)

    assert len(clustering.keys()) == 1
    assert clustered_all_points(clustering, dataset) is True

    clustered = []
    for assignment in clustering:
        clustered.append(clustering[assignment])
    assert clustered == [expected_clustering]

    print("test_kmeans_when_k_is_1 passed!")


def test_kmeans_when_k_is_2():
    datasetPath = "tests/test_files/dataset_1.csv"
    expected1 = "tests/test_files/dataset_1_k_is_2_0.csv"
    expected2 = "tests/test_files/dataset_1_k_is_2_1.csv"

    random.seed(1)
    dataset = read_csv(datasetPath)
    expected_clustering1 = read_csv(expected1)
    expected_clustering2 = read_csv(expected2)
    clustering = k_means(dataset=dataset, k=2)
    cost = cost_function(clustering)

    for _ in range(10):
        new_clustering = k_means(dataset=dataset, k=2)
        new_cost = cost_function(clustering)
        if new_cost < cost:
            clustering = new_clustering
            cost = new_cost


    assert len(clustering.keys()) == 2
    assert clustered_all_points(clustering, dataset) is True
    clustered = []
    for assignment in clustering:
        clustered.append(clustering[assignment])
    assert clustered == [expected_clustering1, expected_clustering2]

    print("test_kmeans_when_k_is_2 passed!")



def test_kmeans_when_k_is_3():
    datasetPath = "tests/test_files/dataset_1.csv"
    expected1 = "tests/test_files/dataset_1_k_is_3_0.csv"
    expected2 = "tests/test_files/dataset_1_k_is_3_1.csv"
    expected3 = "tests/test_files/dataset_1_k_is_3_2.csv"
    
    random.seed(1)
    dataset = read_csv(datasetPath)
    expected_clustering1 = read_csv(expected1)
    expected_clustering2 = read_csv(expected2)
    expected_clustering3 = read_csv(expected3)
    clustering = k_means(dataset=dataset, k=3)
    cost = cost_function(clustering)

    for _ in range(10):
        new_clustering = k_means(dataset=dataset, k=3)
        new_cost = cost_function(clustering)
        if new_cost < cost:
            clustering = new_clustering
            cost = new_cost

    assert len(clustering.keys()) == 3
    assert clustered_all_points(clustering, dataset) is True
    
    print("test_kmeans_when_k_is_3 passed!")

    clustered = []
    for assignment in clustering:
        clustered.append(clustering[assignment])
    assert clustered == [expected_clustering1, expected_clustering2, expected_clustering3]


def test_kmeans_pp_when_k_is_2():
    # read file named dataset_1.csv 
    datasetPath = "tests/test_files/dataset_1.csv"
    expected1 = "tests/test_files/dataset_1_k_is_2_0.csv"
    expected2 = "tests/test_files/dataset_1_k_is_2_1.csv"

    dataset = read_csv(datasetPath)
    expected_clustering1 = read_csv(expected1)
    expected_clustering2 = read_csv(expected2)
    clustering = k_means_pp(dataset=dataset, k=2)
    cost = cost_function(clustering)

    for _ in range(10):
        new_clustering = k_means_pp(dataset=dataset, k=2)
        new_cost = cost_function(clustering)
        if new_cost < cost:
            clustering = new_clustering
            cost = new_cost


    assert len(clustering.keys()) == 2
    assert clustered_all_points(clustering, dataset) is True
    clustered = []
    for assignment in clustering:
        clustered.append(clustering[assignment])
    assert clustered.sort() == [expected_clustering1, expected_clustering2].sort()

    print("test_kmeans_pp_when_k_is_2 passed!")

def test_kmeans_pp_when_k_is_3():
    datasetPath = "tests/test_files/dataset_1.csv"
    expected1 = "tests/test_files/dataset_1_k_is_3_0.csv"
    expected2 = "tests/test_files/dataset_1_k_is_3_1.csv"
    expected3 = "tests/test_files/dataset_1_k_is_3_2.csv"
    
    dataset = read_csv(datasetPath)
    expected_clustering1 = read_csv(expected1)
    expected_clustering2 = read_csv(expected2)
    expected_clustering3 = read_csv(expected3)
    clustering = k_means_pp(dataset=dataset, k=3)
    cost = cost_function(clustering)

    for _ in range(10):
        new_clustering = k_means_pp(dataset=dataset, k=3)
        new_cost = cost_function(clustering)
        if new_cost < cost:
            clustering = new_clustering
            cost = new_cost

    assert len(clustering.keys()) == 3
    assert clustered_all_points(clustering, dataset) is True
    
    clustered = []
    for assignment in clustering:
        clustered.append(clustering[assignment])
    assert clustered.sort() == [expected_clustering1, expected_clustering2, expected_clustering3].sort()

    print("test_kmeans_pp_when_k_is_3 passed!")

#testing methods
test_kmeans_when_k_is_1()
test_kmeans_when_k_is_2()
test_kmeans_when_k_is_3()
test_kmeans_pp_when_k_is_2()
test_kmeans_pp_when_k_is_3()

TypeError: object of type 'NotImplementedError' has no len()

#### `sim`

This is where you will write your distance metric methods. Sim is short for similarity. You will be implementing the following distance metrics:

- Euclidean Distance
- Manhattan Distance
- Jaccard Distance
- Cosine Similarity

Note: x and y are arrays, but keep in mind you may be given invalid inputs. You will need to handle invalid inputs.

In [25]:
def euclidean_dist(x, y):
    return NotImplementedError("Implement this method")

def manhattan_dist(x, y):
    return NotImplementedError("Implement this method")

def jaccard_dist(x, y):
    return NotImplementedError("Implement this method")

def cosine_sim(x, y):
    return NotImplementedError("Implement this method")

#### testing `sim`

Run the methods provided to see if your `sim` works

In [None]:
def _generate_rand_point(dimension):
    return [random.randrange(1, 1000, 1) for i in range(dimension)]


def test_euclidean():
    # sanity checks
    try:
        euclidean_dist([], [])
    except ValueError as e:
        assert str(e) == "lengths must not be zero"
    try:
        euclidean_dist([0], [0,0])
    except ValueError as e:
        assert str(e) == "lengths must be equal"
    
    assert euclidean_dist([0,0], [1,0]) == 1
    assert euclidean_dist([0,0,0], [1,0,0]) == 1
    assert euclidean_dist([0,0,0], [0,0,0]) == 0
    assert euclidean_dist([0,0,0], [1,0,0]) == euclidean_dist([1,0,0], [0,0,0])
    dimension = random.randint(1, 100)
    x = _generate_rand_point(dimension)
    # distance from a pt to itself is 0
    assert euclidean_dist(x, x) == 0
    # distance is always positive
    y = _generate_rand_point(dimension)
    assert euclidean_dist(x, y) >= 0
    # distance is symmetric
    assert euclidean_dist(x, y) == euclidean_dist(y, x)
    # triangle inequality
    z = _generate_rand_point(dimension)
    assert euclidean_dist(x, y) <= euclidean_dist(x, z) + euclidean_dist(z, y)

    print("euclidean_dist is right!")

def test_manhattan():
    # sanity checks
    try:
        manhattan_dist([], [])
    except ValueError as e:
        assert str(e) == "lengths must not be zero"
    try:
        manhattan_dist([0], [0,0])
    except ValueError as e:
        assert str(e) == "lengths must be equal"
    
    assert manhattan_dist([0,0], [1,1]) == 2
    assert manhattan_dist([0,0,0], [1,1,1]) == 3
    dimension = random.randint(1, 100)
    x = _generate_rand_point(dimension)
    # distance from a pt to itself is 0
    assert manhattan_dist(x, x) == 0
    # distance is always positive
    y = _generate_rand_point(dimension)
    assert manhattan_dist(x, y) >= 0
    # distance is symmetric
    assert manhattan_dist(x, y) == manhattan_dist(y, x)
    # triangle inequality
    z = _generate_rand_point(dimension)
    assert manhattan_dist(x, y) <= manhattan_dist(x, z) + manhattan_dist(z, y)

    print("manhattan_dist is right!")

def test_cosine():
    try:
        cosine_sim([], [])
    except ValueError as e:
        assert str(e) == "lengths must not be zero"
    try:
        cosine_sim([0], [0,0])
    except ValueError as e:
        assert str(e) == "lengths must be equal"
    
    assert cosine_sim([1,0], [1,0]) == 1
    assert cosine_sim([0,1,0], [1,0,0]) == 0

    print("cosine_sim is right!")


def test_jaccard():
    # sanity checks
    try:
        jaccard_dist([], [])
    except ValueError as e:
        assert str(e) == "lengths must not be zero"
    
    assert jaccard_dist([0,0], [1,0]) == .5
    assert jaccard_dist([0,0,0], [1,1,1]) == 1
    dimension = random.randint(1, 100)
    x = _generate_rand_point(dimension)
    # distance from a pt to itself is 0
    assert jaccard_dist(x, x) == 0
    # distance is always positive
    y = _generate_rand_point(dimension)
    assert jaccard_dist(x, y) >= 0
    # distance is symmetric
    assert jaccard_dist(x, y) == jaccard_dist(y, x)
    # triangle inequality
    z = _generate_rand_point(dimension)
    assert jaccard_dist(x, y) <= jaccard_dist(x, z) + jaccard_dist(z, y)

    print("jaccard_dist is right!")

#testing methods
test_euclidean()
test_manhattan()
test_cosine()
test_jaccard()