In [76]:
import numpy as np
from typing import Any, Sequence
from functools import partial


1. Zaimplementuj przynajmniej 3 "metryki" spośród wymienionych: cosinusowa, LCS,
   DICE, euklidesowa, Levenshteina.

In [100]:
def preprocess(text: str) -> str:
    """Normalize a document for comparison.

    The text is lower-cased and every character listed in
    ``string.punctuation`` (ASCII punctuation) is dropped. Whitespace,
    including newlines, is left untouched.

    Args:
        text: Raw document text.

    Returns:
        The lower-cased text without ASCII punctuation characters.
    """
    from string import punctuation

    removable = set(punctuation)
    kept_chars = [ch for ch in text.lower() if ch not in removable]

    return "".join(kept_chars)


def text_to_vec(docs: list[str]) -> list[np.ndarray]:
    """Convert documents into word-frequency vectors over a shared vocabulary.

    Every document is normalized with ``preprocess()`` and split on
    whitespace. The vocabulary is the set of all words seen in ``docs``,
    ordered by first occurrence; component ``k`` of each returned vector is
    the number of times vocabulary word ``k`` occurs in that document.

    Args:
        docs: Documents to vectorize. May be empty.

    Returns:
        One numpy array per document, all of the same length (the vocabulary
        size). An empty ``docs`` list yields an empty list.
    """
    tokenized = [preprocess(text).split() for text in docs]

    # dicts keep insertion order, so each word gets the column index of its
    # first occurrence across all documents.
    column_of: dict[str, int] = {}
    for tokens in tokenized:
        for word in tokens:
            column_of.setdefault(word, len(column_of))

    freq_vecs = []
    for tokens in tokenized:
        counts = [0] * len(column_of)
        for word in tokens:
            counts[column_of[word]] += 1
        freq_vecs.append(np.array(counts))

    return freq_vecs


def levenshtein(seq_a: Sequence[Any], seq_b: Sequence[Any]) -> int:
    """Compute the Levenshtein (edit) distance between two sequences.

    The distance is the minimum number of single-element insertions,
    deletions and substitutions needed to turn ``seq_a`` into ``seq_b``.
    Elements are compared with ``==``, so any indexable sequences work
    (strings, lists of words, tuples, ...).

    Args:
        seq_a: First sequence; may be empty.
        seq_b: Second sequence; may be empty.

    Returns:
        The edit distance as a non-negative integer.
    """
    m, n = len(seq_a), len(seq_b)

    # Row 0 of the DP table: turning the empty prefix of seq_a into the first
    # j elements of seq_b takes j insertions. The table has (m + 1) x (n + 1)
    # cells because index 0 stands for the empty prefix, not the first
    # element; only the previous row has to be kept.
    previous = list(range(n + 1))

    for i in range(1, m + 1):
        # Column 0: dropping the first i elements of seq_a takes i deletions.
        current = [i] + [0] * n
        for j in range(1, n + 1):
            cost = 0 if seq_a[i - 1] == seq_b[j - 1] else 1
            current[j] = min(
                previous[j] + 1,  # deletion
                current[j - 1] + 1,  # insertion
                previous[j - 1] + cost,  # substitution (or match)
            )
        previous = current

    return previous[n]


def metric(x, y, type="euclidean"):
    """Distance between two raw text documents under the selected metric.

    Args:
        x: First document.
        y: Second document.
        type: Metric name. ``"euclidean"`` takes the norm of the difference
            of the two word-frequency vectors built by ``text_to_vec``;
            ``"levenshtein"`` is the edit distance between the preprocessed
            word sequences.

    Returns:
        The distance value produced by the chosen metric.

    Raises:
        Exception: If ``type`` is not one of the supported metric names.
    """
    if type == "euclidean":
        first_vec, second_vec = text_to_vec([x, y])
        difference = first_vec - second_vec
        return np.linalg.norm(difference)

    if type == "levenshtein":
        words_x = preprocess(x).split()
        words_y = preprocess(y).split()
        return levenshtein(words_x, words_y)

    raise Exception(f"Unimplemented metric of type {type}")


2. Zaimplementuj przynajmniej 1 sposób oceny jakości klasteryzacji (np. indeks
   Daviesa-Bouldina).

In [101]:
def DB_index(clusters, d):
    """Compute the Davies-Bouldin index of a clustering of text documents.

    Each cluster is represented by one of its own documents: the one whose
    word-frequency vector (built by ``text_to_vec`` from that cluster's
    documents only) lies closest to the arithmetic mean of the cluster's
    vectors. A cluster's scatter is the mean ``d`` distance from that
    representative to every document of the cluster. The index is the
    average, over clusters, of the largest
    ``(scatter_i + scatter_j) / d(rep_i, rep_j)`` among the other clusters.

    Args:
        clusters (dict): maps a cluster name to the list of its documents
        d (function: str, str -> float): distance between strings

    Returns:
        float: DB index

    Raises:
        ZeroDivisionError: If ``clusters`` contains no clusters.
    """
    from numpy.linalg import norm

    n_clusters = len(clusters)

    def pick_representative(docs):
        # Document nearest to the mean vector; ties go to the earliest one.
        vecs = text_to_vec(docs)
        mean_vec = sum(vecs) / len(vecs)
        nearest = min(range(len(vecs)), key=lambda k: norm(mean_vec - vecs[k]))
        return docs[nearest]

    representatives = {
        name: pick_representative(docs) for name, docs in clusters.items()
    }

    scatter = {
        name: sum(d(representatives[name], text) for text in docs) / len(docs)
        for name, docs in clusters.items()
    }

    names = list(clusters)
    worst_ratios = []
    for pos_i, name_i in enumerate(names):
        ratios = [
            (scatter[name_i] + scatter[name_j])
            / d(representatives[name_i], representatives[name_j])
            for pos_j, name_j in enumerate(names)
            if pos_j != pos_i
        ]
        # Leading 0 is the floor used when there is no other cluster.
        worst_ratios.append(max([0, *ratios]))

    return sum(worst_ratios) / n_clusters
    


4. Wykonaj klasteryzację zawartości załączonego pliku (lines.txt) przy użyciu
   metryk zaimplementowanych w pkt. 1. Każda linia to adres pocztowy firmy,
   różne sposoby zapisu tego samego adresu powinny się znaleźć w jednym
   klastrze.

In [102]:
# Read the raw address lines (trailing newlines are kept) and normalize each
# one with preprocess().
with open("lines.txt", "r") as f:
    vectors = list(f)

preprocessed_vectors = [preprocess(raw_line) for raw_line in vectors]


Porównaj jakość wyników sposobami zaimplementowanymi w pkt. 2.

In [106]:
# Parse the clustering result file and score it with the Davies-Bouldin index.
# In clusters.txt each cluster is a run of non-empty lines terminated by a
# separator line.
CLUSTER_SEPARATOR = "##########"
MIN_CLUSTER_SIZE = 5  # clusters with fewer documents are left out of the score

# Use a context manager so the file handle is closed after reading.
with open("clusters.txt", "r") as clusters_file:
    lines = [text for text in clusters_file.read().splitlines() if text != ""]

n_clusters = lines.count(CLUSTER_SEPARATOR)
d = partial(metric, type="euclidean")
model_clusters = {i: [] for i in range(n_clusters)}

# NOTE: every cluster, including the last one, must be terminated by a
# separator line; documents after the final separator would hit a missing key.
i = 0
for text in lines:
    if text == CLUSTER_SEPARATOR:
        if len(model_clusters[i]) < MIN_CLUSTER_SIZE:
            del model_clusters[i]
        i += 1
    else:
        model_clusters[i].append(text)

DB_index(model_clusters, d)


1.877585893366325