In [18]:
from datasets import load_dataset

dataset = load_dataset("ag_news")
train_data = dataset['train']

import pandas as pd
df = pd.DataFrame(train_data)


In [38]:
from sklearn.model_selection import train_test_split

df = df.groupby('label').apply(lambda x: x.sample(1250)).reset_index(drop=True)
X = df['text']
y = df['label']





In [33]:
from sklearn.feature_extraction.text import TfidfVectorizer

vectorizer = TfidfVectorizer(max_features=1000)
X_tfidf = vectorizer.fit_transform(X)


In [None]:
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('all-MiniLM-L12-v2')
X_sbert = model.encode(X, show_progress_bar=True)


Batches:   0%|          | 0/157 [00:00<?, ?it/s]

In [23]:
import numpy as np
#np.save("sbert_embeddings.npy", X_sbert)

In [None]:
import pandas as pd
import plotly.express as px
import plotly.io as pio

from numpy.typing import NDArray
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE

from umap import UMAP

In [30]:
def project_vectors(data: NDArray, technique: str = "tsne", **options) -> NDArray:
    if technique == "pca":
        transformer = PCA(**options)
    elif technique == "tsne":
        transformer = TSNE(**options)
    elif technique == "umap":
        transformer = UMAP(**options)
    else:
        raise ValueError(
            f"Invalid technique: {technique}. Choose from 'pca', 'tsne', or 'umap'."
        )

    transformed_data = transformer.fit_transform(data)
    return transformed_data

In [166]:
def plot_embeddings(embeddings: NDArray,
                    clustering_results: NDArray,
                    symbol: str = "class",
                    color: str = "clustering_results",
                    reduction_techniques: str = "tsne",
                    classes: list[str] = y,
                    text: list[str] = X,
                    plot_3d: bool = False):
    if embeddings.shape[1] > 2:
        reduced_embeddings = project_vectors(embeddings, technique=reduction_techniques,
                                             n_components=3 if plot_3d else 2, random_state=6)
        print(f"Reduced embeddings from {embeddings.shape[1]} to {reduced_embeddings.shape[1]}")
    else:
        reduced_embeddings = embeddings

    df = pd.DataFrame({
        "x": reduced_embeddings[:, 0],
        "y": reduced_embeddings[:, 1],
        "class": classes,
        "text": list(map(str, text)),
        "clustering_results": clustering_results
    })

    if plot_3d:
        df["z"] = reduced_embeddings[:, 2]

    df = df.astype({
        "class": "category",
        "clustering_results": "category"
    })
    if plot_3d:
        fig = px.scatter_3d(df, x="x", y="y", z="z", color=color, symbol=symbol,
                            title=f"{reduction_techniques} reduction technique. Visualization of Image Embeddings")
    else:
        fig = px.scatter(df, x="x", y="y", color=color, symbol=symbol,
                         title=f"{reduction_techniques} reduction technique. Visualization of Image Embeddings")

    fig.update_traces(textfont_size=25, marker=dict(size=5))
    fig.update_layout(template="plotly")
    fig.show()


In [None]:
plot_embeddings(X_tfidf.toarray(), 0, symbol="class", reduction_techniques="pca", color="class")
plot_embeddings(X_sbert, 0, symbol="class", reduction_techniques="pca", color="class")


Reduced embeddings from 1000 to 3


Reduced embeddings from 384 to 2


In [168]:
plot_embeddings(X_tfidf.toarray(), 0, symbol="class", reduction_techniques="tsne", color="class")
plot_embeddings(X_sbert, 0, symbol="class", reduction_techniques="tsne", color="class")

Reduced embeddings from 1000 to 2


Reduced embeddings from 384 to 2


In [244]:
X_umap_tfidf = project_vectors(X_tfidf.toarray(), technique="umap", n_components=2)
X_umap_sbert = project_vectors(X_sbert, technique="umap", n_components=2)

plot_embeddings(X_umap_tfidf, 0, symbol="class", reduction_techniques="umap", color="class")
plot_embeddings(X_umap_sbert, 0, symbol="class", reduction_techniques="umap", color="class")


'force_all_finite' was renamed to 'ensure_all_finite' in 1.6 and will be removed in 1.8.


'force_all_finite' was renamed to 'ensure_all_finite' in 1.6 and will be removed in 1.8.



In [62]:
from typing import Tuple
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score

def classify_knn(X_train, X_test, y_train, y_test) -> Tuple[float, float]:
    knn_classifier = KNeighborsClassifier(n_neighbors=5, metric='cosine')
    knn_classifier.fit(X_train, y_train)
    y_pred = knn_classifier.predict(X_test)
    return f1_score(y_test, y_pred, average="weighted"), accuracy_score(y_test, y_pred)


In [None]:
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
X_sbert_scaled = scaler.fit_transform(X_sbert)
X_tfidf_scaled = scaler.fit_transform(X_tfidf.toarray())
X_umap_sbert_scaled = scaler.fit_transform(X_umap_sbert)
X_umap_tfidf_scaled = scaler.fit_transform(X_umap_tfidf)


In [127]:
X_full_sbert_train, X_full_sbert_test, y_full_sbert_train, y_full_sbert_test = train_test_split(X_sbert_scaled,y,test_size=0.2, random_state=42)
X_full_tfidf_train, X_full_tfidf_test, y_full_tfidf_train, y_full_tfidf_test = train_test_split(X_tfidf_scaled,y,test_size=0.2, random_state=42)
X_umap_sbert_train, X_umap_sbert_test, y_umap_sbert_train, y_umap_sbert_test = train_test_split(X_umap_sbert_scaled, y, test_size=0.2, random_state=42)
X_umap_tfidf_train, X_umap_tfidf_test, y_umap_tfidf_train, y_umap_tfidf_test = train_test_split(X_umap_tfidf_scaled, y, test_size=0.2, random_state=42)

In [None]:
import xgboost as xgb

def classify_xgboost(X_train, X_test, y_train, y_test):
    xgb_classifier = xgb.XGBRFClassifier(max_depth=5, random_state=42)
    xgb_classifier.fit(X_train, y_train)
    y_pred = xgb_classifier.predict(X_test)
    return f1_score(y_test, y_pred, average="weighted"), accuracy_score(y_test, y_pred)


In [172]:
results = []

# knn
f1, acc = classify_knn(X_full_sbert_train, X_full_sbert_test, y_full_sbert_train, y_full_sbert_test)
results.append(("KNN", "Full SBERT", f1, acc))

f1, acc = classify_knn(X_full_tfidf_train, X_full_tfidf_test, y_full_tfidf_train, y_full_tfidf_test)
results.append(("KNN", "Full TF-IDF", f1, acc))

f1, acc = classify_knn(X_umap_sbert_train, X_umap_sbert_test, y_umap_sbert_train, y_umap_sbert_test)
results.append(("KNN", "UMAP + SBERT", f1, acc))

f1, acc = classify_knn(X_umap_tfidf_train, X_umap_tfidf_test, y_umap_tfidf_train, y_umap_tfidf_test)
results.append(("KNN", "UMAP + TF-IDF", f1, acc))

# xgb
f1, acc = classify_xgboost(X_full_sbert_train, X_full_sbert_test, y_full_sbert_train, y_full_sbert_test)
results.append(("XGBoost", "Full SBERT", f1, acc))

f1, acc = classify_xgboost(X_full_tfidf_train, X_full_tfidf_test, y_full_tfidf_train, y_full_tfidf_test)
results.append(("XGBoost", "Full TF-IDF", f1, acc))

f1, acc = classify_xgboost(X_umap_sbert_train, X_umap_sbert_test, y_umap_sbert_train, y_umap_sbert_test)
results.append(("XGBoost", "UMAP + SBERT", f1, acc))

f1, acc = classify_xgboost(X_umap_tfidf_train, X_umap_tfidf_test, y_umap_tfidf_train, y_umap_tfidf_test)
results.append(("XGBoost", "UMAP + TF-IDF", f1, acc))






In [173]:
df = pd.DataFrame(results, columns=["Model", "Data", "F1 Score", "Accuracy"])

print(df.sort_values(by="F1 Score", ascending=False).to_string(index=False))

  Model          Data  F1 Score  Accuracy
    KNN    Full SBERT  0.905419     0.906
XGBoost  UMAP + SBERT  0.886184     0.887
    KNN  UMAP + SBERT  0.838998     0.841
XGBoost    Full SBERT  0.795331     0.796
XGBoost UMAP + TF-IDF  0.767923     0.769
    KNN   Full TF-IDF  0.734220     0.736
    KNN UMAP + TF-IDF  0.710603     0.713
XGBoost   Full TF-IDF  0.647179     0.635


In [None]:
from sklearn.mixture import GaussianMixture
from sklearn import cluster
from sklearn.neighbors import NearestNeighbors

from sklearn.metrics.cluster import adjusted_rand_score


In [79]:


def estimate_dbscan_eps(embeddings: NDArray, n_samples: int = 1000, k: int = 5, quantile: float = 0.1) -> float:
    """
    Estimates a suitable eps parameter for DBSCAN based on k-distance graph.
    
    Parameters:
    -----------
    embeddings : numpy.ndarray
        The embeddings to analyze, shape (n_samples, n_features)
    n_samples : int
        Number of samples to use for estimation (to speed up computation)
    k : int
        Number of neighbors to consider
    quantile : float
        Quantile to use for selecting the eps value (lower means tighter clusters)
    plot : bool
        Whether to generate and display a k-distance plot
        
    Returns:
    --------
    eps : float
        Estimated eps value for DBSCAN
    """
    if embeddings.shape[0] > n_samples:
        indices = np.random.choice(embeddings.shape[0], n_samples, replace=False)
        sample_data = embeddings[indices]
    else:
        sample_data = embeddings

    nbrs = NearestNeighbors(n_neighbors=k + 1).fit(sample_data)
    distances, _ = nbrs.kneighbors(sample_data)

    kdistances = np.sort(distances[:, k])

    eps = np.quantile(kdistances, quantile)

    print(f"Estimated eps value: {eps}")
    return eps

def cluster_embeddings(embeddings: NDArray, algorithm_name: str = 'KMeans', **kwargs):
    """
    Clusters embeddings using a specified clustering algorithm from sklearn.
    
    Parameters:
    -----------
    embeddings : numpy.ndarray
        The embeddings to cluster, shape (n_samples, n_features)
    algorithm_name : str
        Name of the clustering algorithm to use (must be available in sklearn.cluster
        or be GaussianMixture)
    **kwargs : 
        Additional parameters to pass to the clustering algorithm
        
    Returns:
    --------
    labels : numpy.ndarray
        Cluster labels for each embedding, shape (n_samples,)
    model : object
        The fitted clustering model
    """
    if not isinstance(embeddings, np.ndarray):
        raise TypeError("Embeddings must be a numpy array")

    if len(embeddings.shape) != 2:
        raise ValueError(f"Embeddings must be 2D array, got shape {embeddings.shape}")

    if algorithm_name == 'GaussianMixture':
        algorithm_class = GaussianMixture
    else:
        try:
            algorithm_class = getattr(cluster, algorithm_name)
        except AttributeError:
            raise ValueError(f"Algorithm '{algorithm_name}' not found in sklearn.cluster or is not GaussianMixture")

    model = algorithm_class(**kwargs)

    if hasattr(model, 'fit_predict'):
        labels = model.fit_predict(embeddings)
    elif hasattr(model, 'fit') and hasattr(model, 'predict'):

        model.fit(embeddings)
        labels = model.predict(embeddings)
    else:
        raise ValueError(f"Algorithm '{algorithm_name}' does not support required methods")

    return labels, model


In [359]:
kmeans_clusters_sbert, _ = cluster_embeddings(X_umap_sbert_scaled, 'KMeans', n_clusters=4)
kmeans_clusters_tfidf, _ = cluster_embeddings(X_umap_tfidf_scaled, 'KMeans', n_clusters=4)
dbscan_clusters_sbert, _ = cluster_embeddings(X_umap_sbert_scaled, 'DBSCAN',eps=estimate_dbscan_eps(X_umap_sbert_scaled)+0.1, min_samples=5)
dbscan_clusters_tfidf, _ = cluster_embeddings(X_umap_tfidf_scaled, 'DBSCAN', eps=estimate_dbscan_eps(X_umap_tfidf_scaled), min_samples=5)


Estimated eps value: 0.0368782888056369
Estimated eps value: 0.05758562119803827


In [324]:
#kmeans for sbert
plot_embeddings(X_umap_sbert_scaled, kmeans_clusters_sbert, symbol="class", color="clustering_results", reduction_techniques="pca")


In [323]:
#kmeans for tfidf
plot_embeddings(X_umap_tfidf_scaled, kmeans_clusters_tfidf, symbol="class", color="clustering_results", reduction_techniques="pca")

In [343]:
plot_embeddings(X_umap_sbert_scaled, dbscan_clusters_sbert, symbol="class", color="clustering_results", reduction_techniques="pca")


In [360]:
#dbscan for tfidf
plot_embeddings(X_umap_tfidf_scaled, dbscan_clusters_tfidf, symbol="class", color="clustering_results", reduction_techniques="pca")


In [361]:

results = []

# knn
results.append(("KMeans", "SBERT", adjusted_rand_score(y, kmeans_clusters_sbert)))
results.append(("KMeans", "TFIDF", adjusted_rand_score(y, kmeans_clusters_tfidf)))
results.append(("DBScan", "SBERT", adjusted_rand_score(y, dbscan_clusters_sbert)))
results.append(("DBScan", "TFIDF", adjusted_rand_score(y, dbscan_clusters_tfidf)))

df = pd.DataFrame(results, columns=["Method", "Data", "Score"])

print(df.to_string(index=False))

Method  Data    Score
KMeans SBERT 0.586064
KMeans TFIDF 0.422503
DBScan SBERT 0.309668
DBScan TFIDF 0.082515
