In [None]:
# Cell 1: Imports
import warnings
warnings.filterwarnings("ignore")

import re, string, random, math
from collections import Counter, defaultdict
import numpy as np
import pandas as pd
from scipy.sparse import hstack, csr_matrix

import matplotlib.pyplot as plt
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.decomposition import TruncatedSVD
from sklearn.cluster import MiniBatchKMeans
from sklearn.preprocessing import normalize
from sklearn.metrics.pairwise import cosine_similarity

from kneed import KneeLocator   # for elbow detection

In [None]:
# Cell 2: Parameters
# Run-wide settings; the cells below read these keys by name.
PARAMS = dict(
    INPUT_FILE="input.csv",
    OUTPUT_FILE="output.csv",
    MATERIAL_TYPE="FIN",              # switch this to process another material type
    RANDOM_STATE=42,
    MAX_K=250,                        # upper bound on k inside a single parent bucket
    ELBOW_SAMPLE=15,                  # number of k values sampled for the elbow curve
    SVD_COMPONENTS=200,               # None turns dimensionality reduction off
    SINGLETON_SIM_THRESHOLD=0.45,     # minimum cosine similarity to attach an outlier
    SINGLETON_SIZE_THRESHOLD=1,       # clusters with <= this many rows are singletons
)

# Keywords that name the parent buckets; extend the list as more material
# families become known.
PARENT_KEYWORDS = [
    "ASSY",
    "ASSEMBLY",
    "DB",
    "MOTOR",
    "BEARING",
    "CABLE",
    "VALVE",
]

In [None]:
# Cell 3: Load Data
# Read the input CSV, keep only the rows of the configured material type,
# renumber them from 0, and replace missing descriptions with empty strings.
df = (
    pd.read_csv(PARAMS["INPUT_FILE"])
    .loc[lambda frame: frame["material_type"] == PARAMS["MATERIAL_TYPE"]]
    .reset_index(drop=True)
    .assign(material_description=lambda frame: frame["material_description"].fillna(""))
)

print(f"Loaded {len(df)} rows for material_type={PARAMS['MATERIAL_TYPE']}")

In [None]:
# Cell 4: Cleaning Function
def clean_text(text):
    """Normalise a description to upper-case alphanumeric words.

    The value is converted with ``str()``, upper-cased and trimmed; every
    character other than A-Z, 0-9 or a space becomes a space; runs of
    whitespace are then collapsed to one space and the result is trimmed.

    Returns the normalised string (possibly empty).
    """
    upper = str(text).upper().strip()
    # Anything that is not a letter, digit or space acts as a word separator.
    alnum_only = re.sub(r"[^A-Z0-9 ]", " ", upper)
    # Squeeze the separators introduced above into single spaces.
    return re.sub(r"\s+", " ", alnum_only).strip()

# Normalise every description, then pool the ones left without usable text
# under the "MISC" placeholder.
df["clean_desc"] = df["material_description"].map(clean_text)
df["clean_desc"] = df["clean_desc"].mask(
    df["clean_desc"].isin(["", ".", "-", "?", " "]), "MISC"
)

In [None]:
# Cell 5: Parent Bucket Assignment
def get_parent(text, keywords=None):
    """Return the parent bucket for a cleaned description.

    Parameters
    ----------
    text : str
        Cleaned description; the literal "MISC" is passed through unchanged.
    keywords : sequence of str, optional
        Candidate bucket keywords, checked in order. Defaults to the
        module-level PARENT_KEYWORDS.

    Returns
    -------
    str
        "MISC" for the placeholder, otherwise the first keyword (in list
        order) that occurs in ``text`` as a whole word, or "OTHER" when no
        keyword matches.
    """
    if text == "MISC":
        return "MISC"
    if keywords is None:
        keywords = PARENT_KEYWORDS
    for kw in keywords:
        # Escape the keyword so it is always matched literally, even if a
        # future entry contains regex metacharacters such as "." or "+".
        if re.search(rf"\b{re.escape(kw)}\b", text):
            return kw
    return "OTHER"

# Tag each row with the parent bucket derived from its cleaned description.
df["parent_bucket"] = df["clean_desc"].map(get_parent)

In [None]:
# Cell 6: Elbow Function
def find_optimal_k(X, max_k=50, sample_points=15, plot=False, bucket_name=""):
    """Choose a cluster count from the elbow of the MiniBatchKMeans inertia curve.

    Parameters
    ----------
    X : array-like with one row per sample
        Feature matrix to cluster.
    max_k : int
        Largest k to evaluate; clamped to the number of rows in X.
    sample_points : int
        How many k values to sample between 2 and max_k.
    plot : bool
        When True, draw the inertia curve (and the elbow, if one is found).
    bucket_name : str
        Label used in the plot title.

    Returns
    -------
    int
        The knee reported by KneeLocator, or the median of the evaluated k
        values when no knee is found. Returns 1 when fewer than two clusters
        are possible.
    """
    # k can never exceed the number of samples.
    upper = min(int(max_k), X.shape[0])
    if upper < 2:
        return 1

    # linspace with dtype=int truncates, so a range narrower than
    # sample_points yields repeated k values. Deduplicate to avoid redundant
    # fits and a degenerate x-axis for the knee detector.
    k_values = np.unique(np.linspace(2, upper, max(int(sample_points), 1), dtype=int))

    inertias = []
    for k in k_values:
        km = MiniBatchKMeans(n_clusters=int(k), random_state=PARAMS["RANDOM_STATE"], batch_size=2048)
        km.fit(X)
        inertias.append(km.inertia_)

    # A knee needs at least three distinct points on the curve.
    knee = None
    if len(k_values) >= 3:
        kn = KneeLocator(k_values, inertias, curve="convex", direction="decreasing")
        knee = kn.knee
    # Return a plain Python int regardless of what the locator hands back.
    best_k = int(knee) if knee else int(np.median(k_values))

    if plot:
        fig, ax = plt.subplots()
        ax.plot(k_values, inertias, "bo-")
        if knee:
            ax.axvline(knee, color="r", linestyle="--", label=f"Elbow={knee}")
            # Only draw a legend when there is a labelled artist to show.
            ax.legend()
        ax.set_title(f"Elbow Curve for {bucket_name}")
        ax.set_xlabel("Number of clusters (k)")
        ax.set_ylabel("Inertia")
        plt.show()

    return best_k

In [None]:
# Cell 7: Clustering + Singleton Handling
# Cluster each parent bucket independently and collect the labelled groups.
final_clusters = []

for bucket, group in df.groupby("parent_bucket"):
    # Work on an independent copy so the column assignments below never act
    # on a slice of df.
    group = group.copy()
    print(f"Processing bucket: {bucket}, size={len(group)}")

    # Trivial buckets (too small to cluster, or the MISC placeholder) keep
    # the bucket name as their key.
    if len(group) < 5 or bucket == "MISC":
        group["proposedkey"] = bucket
        final_clusters.append(group)
        continue

    # TF-IDF over character n-grams (2-4 chars, within word boundaries).
    vectorizer = TfidfVectorizer(ngram_range=(2,4), analyzer="char_wb", min_df=2)
    X = vectorizer.fit_transform(group["clean_desc"])

    # Optional SVD. Seeded so repeated runs give the same projection, and
    # skipped when the vocabulary is too small to leave a valid component count.
    svd_cap = PARAMS["SVD_COMPONENTS"]
    n_components = min(svd_cap, X.shape[1] - 1) if svd_cap else 0
    if n_components >= 1:
        svd = TruncatedSVD(n_components=n_components, random_state=PARAMS["RANDOM_STATE"])
        X = svd.fit_transform(X)
    else:
        X = X.toarray()

    # Elbow search for k, capped so clusters average at least two rows.
    max_k = min(PARAMS["MAX_K"], len(group)//2)
    best_k = int(find_optimal_k(X, max_k=max_k, sample_points=PARAMS["ELBOW_SAMPLE"], plot=False, bucket_name=bucket))
    print(f"  -> Chosen k = {best_k}")

    # Final clustering; cluster_labels[i] belongs to row i of X and of group.
    km = MiniBatchKMeans(n_clusters=best_k, random_state=PARAMS["RANDOM_STATE"], batch_size=2048)
    cluster_labels = km.fit_predict(X)
    group["cluster_id"] = cluster_labels

    # Name each cluster after its most frequent token. Names are not
    # guaranteed unique: clusters sharing a top token share a proposedkey.
    cluster_names = {}
    for cid in np.unique(cluster_labels):
        texts = group.loc[cluster_labels == cid, "clean_desc"]
        tokens = " ".join(texts).split()
        cname = pd.Series(tokens).value_counts().index[0] if len(tokens) > 0 else bucket
        cluster_names[int(cid)] = cname

    group["proposedkey"] = group["cluster_id"].map(cluster_names)

    # Singleton handling: move rows of tiny clusters to the most similar
    # regular cluster when the cosine similarity clears the threshold.
    size_limit = PARAMS["SINGLETON_SIZE_THRESHOLD"]
    sizes = np.bincount(cluster_labels, minlength=best_k)
    singleton_ids = np.flatnonzero((sizes > 0) & (sizes <= size_limit))
    # Only clusters above the singleton size may receive rows. This excludes
    # the row's own cluster, other singletons, and empty clusters (which
    # have a centroid but no name).
    is_target = sizes > size_limit

    if len(singleton_ids) > 0:
        print(f"  -> Handling {len(singleton_ids)} singleton clusters")

    if len(singleton_ids) > 0 and is_target.any():
        # Positional row numbers into X, not DataFrame index labels.
        rows = np.flatnonzero(np.isin(cluster_labels, singleton_ids))
        sims = cosine_similarity(X[rows], km.cluster_centers_)
        sims[:, ~is_target] = -np.inf
        best_target = sims.argmax(axis=1)
        best_sim = sims[np.arange(len(rows)), best_target]
        attach = best_sim >= PARAMS["SINGLETON_SIM_THRESHOLD"]

        merged_labels = cluster_labels.copy()
        merged_labels[rows[attach]] = best_target[attach]
        group["cluster_id"] = merged_labels
        group["proposedkey"] = group["cluster_id"].map(cluster_names)

    final_clusters.append(group)

if final_clusters:
    df_final = pd.concat(final_clusters).reset_index(drop=True)
else:
    # No rows to cluster: keep the schema so the save cell still works.
    df_final = df.assign(proposedkey="")

In [None]:
# Cell 8: Save Final Output
# Reduce to the deliverable columns, order rows by proposed key and then by
# material number, and write the result without the index column.
df_final = (
    df_final.loc[:, ["material_number", "material_type", "material_description", "proposedkey"]]
    .sort_values(by=["proposedkey", "material_number"])
    .reset_index(drop=True)
)
df_final.to_csv(PARAMS["OUTPUT_FILE"], index=False)

print(f"Final output saved to {PARAMS['OUTPUT_FILE']}")