# Exercise 2
- BFR Clustering on FMA Dataset using PySpark
- This notebook implements the BFR clustering algorithm on the full FMA dataset

In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, FloatType
from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import KMeans
import numpy as np
import json
from collections import defaultdict

# 1. Initialize Spark Session
spark = SparkSession.builder \
    .appName("BFR Clustering on FMA") \
    .master("local[*]") \
    .config("spark.driver.memory", "6g") \
    .getOrCreate()

## 2. Load CSV 
Remove First 3 Header Rows

In [10]:
raw_df = spark.read.csv("./fma_metadata/features.csv", header=True)
data_df = raw_df.subtract(raw_df.limit(3))

## 3. Convert Columns to Float

In [11]:
columns = data_df.columns
num_features = len(columns) - 1

data_df = data_df.select(
    col(columns[0]).cast(IntegerType()).alias("track_id"),
    *[col(c).cast(FloatType()).alias(c) for c in columns[1:]]
)

## 4. Compute Mean/Std for Normalization

In [12]:
def compute_stats(rows):
    sums = np.zeros(num_features)
    sumsqs = np.zeros(num_features)
    count = 0
    for row in rows:
        x = np.array(row[1:]).astype(float)
        sums += x
        sumsqs += x**2
        count += 1
    return [(sums, sumsqs, count)]

partial = data_df.rdd.mapPartitions(compute_stats)
sums, sumsqs, count = partial.reduce(lambda a, b: (a[0]+b[0], a[1]+b[1], a[2]+b[2]))
means = sums / count
stds = np.sqrt((sumsqs / count) - (means ** 2))
means_b = spark.sparkContext.broadcast(means)
stds_b = spark.sparkContext.broadcast(stds)

25/05/07 22:45:42 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: feature, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, chroma_cens, c

## 5. Normalize and Prepare Features

In [13]:
def normalize(row):
    x = np.array(row[1:]).astype(float)
    x = (x - means_b.value) / (stds_b.value + 1e-9)
    return (row[0], Vectors.dense(x))

norm_rdd = data_df.rdd.map(normalize)
norm_df = spark.createDataFrame(norm_rdd, ["track_id", "features"]).repartition(8).cache()

25/05/07 22:46:10 WARN DAGScheduler: Broadcasting large task binary with size 1072.0 KiB
                                                                                

## BFR Helper Functions

In [14]:
class ClusterSet:
    def __init__(self, k, threshold, num_features):
        self.k = k
        self.threshold = threshold
        self.num_features = num_features
        self.DS = {}  # Discard Set
        self.CS = {}  # Compression Set
        self.RS = []  # Retained Set
        self.track_to_cluster = {}
        self.cid_counter = 0

    def mahalanobis(self, x, stats):
        N, SUM, SUMSQ = stats["N"], stats["SUM"], stats["SUMSQ"]
        centroid = SUM / N
        var = (SUMSQ / N) - centroid ** 2
        var[var == 0] = 1e-9
        return np.sqrt(np.sum((x - centroid)**2 / var))

    def initialize(self, df):
        km = KMeans(k=self.k, seed=42, featuresCol="features").fit(df)
        preds = km.transform(df).select("track_id", "features", "prediction").collect()
        for row in preds:
            cid = int(row["prediction"])
            x = np.array(row["features"])
            tid = row["track_id"]
            if cid not in self.DS:
                self.DS[cid] = {"N": 0, "SUM": np.zeros_like(x), "SUMSQ": np.zeros_like(x)}
            self.DS[cid]["N"] += 1
            self.DS[cid]["SUM"] += x
            self.DS[cid]["SUMSQ"] += x ** 2
            self.track_to_cluster[tid] = cid

    def assign(self, tid, x):
        for cid, stats in self.DS.items():
            if self.mahalanobis(x, stats) < self.threshold:
                stats["N"] += 1
                stats["SUM"] += x
                stats["SUMSQ"] += x**2
                self.track_to_cluster[tid] = cid
                return True
        for cid, stats in self.CS.items():
            if self.mahalanobis(x, stats) < self.threshold:
                stats["N"] += 1
                stats["SUM"] += x
                stats["SUMSQ"] += x**2
                self.track_to_cluster[tid] = f"CS_{cid}"
                return True
        return False

    def recluster_RS(self):
        if len(self.RS) < 10:
            return
        k_rs = min(10, len(self.RS) // 2)
        rs_df = spark.createDataFrame([(i, Vectors.dense(x)) for i, x in self.RS], ["track_id", "features"])
        model = KMeans(k=k_rs, seed=42, featuresCol="features").fit(rs_df)
        preds = model.transform(rs_df).collect()

        new_RS = []
        grouped = defaultdict(list)
        for row in preds:
            grouped[row["prediction"]].append((row["track_id"], np.array(row["features"])))

        for group in grouped.values():
            if len(group) > 1:
                cid = self.cid_counter
                self.cid_counter += 1
                self.CS[cid] = {"N": 0, "SUM": np.zeros(self.num_features), "SUMSQ": np.zeros(self.num_features)}
                for tid, x in group:
                    self.CS[cid]["N"] += 1
                    self.CS[cid]["SUM"] += x
                    self.CS[cid]["SUMSQ"] += x**2
                    self.track_to_cluster[tid] = f"CS_{cid}"
            else:
                new_RS.extend(group)
        self.RS = new_RS

    def merge_CS_to_DS(self):
        for cs_id in list(self.CS.keys()):
            stats = self.CS[cs_id]
            centroid = stats["SUM"] / stats["N"]
            for ds_id, ds_stats in self.DS.items():
                dist = self.mahalanobis(centroid, ds_stats)
                if dist < self.threshold:
                    self.DS[ds_id]["N"] += stats["N"]
                    self.DS[ds_id]["SUM"] += stats["SUM"]
                    self.DS[ds_id]["SUMSQ"] += stats["SUMSQ"]
                    for tid, cid in list(self.track_to_cluster.items()):
                        if cid == f"CS_{cs_id}":
                            self.track_to_cluster[tid] = ds_id
                    del self.CS[cs_id]
                    break

## 6. BFR Loop

In [15]:
bfr = ClusterSet(k=8, threshold=2 * np.sqrt(num_features), num_features=num_features)

first_chunk = norm_df.filter(col("track_id") < 15000)
bfr.initialize(first_chunk)

step = 15000
max_id = norm_df.agg({"track_id": "max"}).collect()[0][0]

for start in range(15000, max_id + 1, step):
    chunk = norm_df.filter((col("track_id") >= start) & (col("track_id") < start + step))
    for row in chunk.collect():
        tid = row["track_id"]
        x = np.array(row["features"])
        if not bfr.assign(tid, x):
            bfr.RS.append((tid, x))

    if len(bfr.RS) >= 100:
        bfr.recluster_RS()

bfr.merge_CS_to_DS()

print(f"DS clusters: {len(bfr.DS)} | CS clusters: {len(bfr.CS)} | RS points: {len(bfr.RS)}")

25/05/07 22:46:11 WARN DAGScheduler: Broadcasting large task binary with size 1082.3 KiB
  return np.sqrt(np.sum((x - centroid)**2 / var))


DS clusters: 8 | CS clusters: 4 | RS points: 22


## 7. Save Results

In [16]:
final_clusters = {"DS": defaultdict(list), "CS": defaultdict(list), "RS": {}}
for tid, cid in bfr.track_to_cluster.items():
    if str(cid).startswith("CS_"):
        final_clusters["CS"][cid].append(tid)
    else:
        final_clusters["DS"][cid].append(tid)
for idx, (tid, _) in enumerate(bfr.RS):
    final_clusters["RS"][str(idx)] = tid

with open("./output/C2/bfr_results.json", "w") as f:
    json.dump(final_clusters, f, indent=4)