In [1]:
import os
import warnings

import hdbscan
import numpy as np
import pandas as pd
import plotly.express as px
import seaborn as sns
import umap
from ax.plot.pareto_frontier import plot_pareto_frontier
from ax.plot.pareto_utils import compute_posterior_pareto_frontier
from ax.service.ax_client import AxClient
from ax.service.managed_loop import optimize
from ax.service.utils.instantiation import ObjectiveProperties
from sklearn.metrics import (
    calinski_harabasz_score,
    davies_bouldin_score,
    silhouette_score,
)

from ssl_wafermap.utilities.plotting import create_subplots, mpn65_palette

warnings.filterwarnings("ignore")

In [2]:
# Columns that are not part of the feature matrix handed to UMAP / HDBSCAN
NON_FEATURE_COLUMNS = ["failureType", "failureCode", "waferMap"]

# NOTE: unpickling can execute arbitrary code; only load files from a trusted source.
df = pd.read_pickle("../data/interim/model_preds/SwaV_preds_full.pkl.xz")

# Keep every remaining column and expose it as a plain array
cols = df.columns.difference(NON_FEATURE_COLUMNS)
data = df.loc[:, cols].values

In [3]:
# Settings for the intermediate (50-component) densMAP reduction that HDBSCAN
# is later fitted on; the seed is fixed so the reduction is repeatable.
umap_settings = dict(
    random_state=0,
    n_neighbors=30,
    min_dist=0,
    n_components=50,
    densmap=True,
    dens_lambda=0.1,
)

reducer = umap.UMAP(**umap_settings)
reduced_data = reducer.fit_transform(data)

In [4]:
def hdbscan_evaluation_function(parameterization):
    """Cluster ``reduced_data`` with HDBSCAN and score the resulting partition.

    Parameters
    ----------
    parameterization : mapping
        Hyperparameter values looked up under the keys ``"min_samples"``,
        ``"min_cluster_size"``, ``"cluster_selection_epsilon"`` and ``"metric"``.

    Returns
    -------
    dict
        Maps ``"n_noise"``, ``"n_clusters"``, ``"silhouette"``,
        ``"calinski_harabasz"`` and ``"davies_bouldin"`` to ``(value, sem)``
        tuples; the SEM is always reported as 0.
    """
    hyperparameter_names = (
        "min_samples",
        "min_cluster_size",
        "cluster_selection_epsilon",
        "metric",
    )
    hdbscan_kwargs = {key: parameterization.get(key) for key in hyperparameter_names}

    # Fit on the module-level UMAP-reduced features
    clusterer = hdbscan.HDBSCAN(**hdbscan_kwargs)
    clusterer.fit(reduced_data)
    labels = clusterer.labels_

    # Label -1 marks noise; cluster ids start at 0, so max + 1 is the cluster count
    is_noise = labels == -1
    n_noise = is_noise.sum()
    n_clusters = labels.max() + 1

    # The cluster-quality metrics are computed on the non-noise points only
    kept_points = reduced_data[~is_noise]
    kept_labels = labels[~is_noise]
    scores = {
        "n_noise": n_noise,
        "n_clusters": n_clusters,
        "silhouette": silhouette_score(kept_points, kept_labels),
        "calinski_harabasz": calinski_harabasz_score(kept_points, kept_labels),
        "davies_bouldin": davies_bouldin_score(kept_points, kept_labels),
    }

    # Pair every value with a SEM of 0
    return {metric_name: (value, 0) for metric_name, value in scores.items()}


# Hyperparameter search space: three numeric ranges and one unordered
# categorical choice of distance metric
parameters = [
    dict(name="min_samples", type="range", bounds=[1, 60], value_type="int"),
    dict(name="min_cluster_size", type="range", bounds=[10, 100], value_type="int"),
    dict(
        name="cluster_selection_epsilon",
        type="range",
        bounds=[0.1, 1.5],
        value_type="float",
    ),
    dict(
        name="metric",
        type="choice",
        values=["euclidean", "manhattan", "canberra", "braycurtis"],
        value_type="str",
        is_ordered=False,
    ),
]


# Multi-objective setup: maximize silhouette and Calinski-Harabasz, minimize
# Davies-Bouldin and the number of noise points
objectives = {
    "silhouette": ObjectiveProperties(minimize=False),
    "calinski_harabasz": ObjectiveProperties(minimize=False),
    "davies_bouldin": ObjectiveProperties(minimize=True),
    "n_noise": ObjectiveProperties(minimize=True),
}

# Only solutions with between 25 and 50 clusters are considered feasible
cluster_count_constraints = ["n_clusters <= 50", "n_clusters >= 25"]

# Seeded client so that the sequence of suggested trials is repeatable
ax_client = AxClient(random_seed=0)
ax_client.create_experiment(
    parameters=parameters,
    objectives=objectives,
    outcome_constraints=cluster_count_constraints,
    overwrite_existing_experiment=True,
)

# Run a fixed budget of optimization trials
N_TRIALS = 30
for _ in range(N_TRIALS):
    parameterization, trial_index = ax_client.get_next_trial()
    try:
        raw_data = hdbscan_evaluation_function(parameterization)
    except ValueError as err:
        # scikit-learn's cluster metrics raise ValueError when fewer than two
        # clusters remain after noise is removed. Record the trial as failed so
        # one degenerate parameterization does not abort the whole run or
        # leave the trial stuck in the "running" state.
        print(f"Trial {trial_index} failed and was skipped: {err}")
        ax_client.log_trial_failure(trial_index=trial_index)
        continue
    ax_client.complete_trial(trial_index=trial_index, raw_data=raw_data)

[INFO 05-22 10:58:33] ax.service.ax_client: Starting optimization with verbose logging. To disable logging, set the `verbose_logging` argument to `False`. Note that float values in the logs are rounded to 6 decimal points.
[INFO 05-22 10:58:33] ax.service.utils.instantiation: Due to non-specification, we will use the heuristic for selecting objective thresholds.
[INFO 05-22 10:58:33] ax.service.utils.instantiation: Created search space: SearchSpace(parameters=[RangeParameter(name='min_samples', parameter_type=INT, range=[1, 60]), RangeParameter(name='min_cluster_size', parameter_type=INT, range=[10, 100]), RangeParameter(name='cluster_selection_epsilon', parameter_type=FLOAT, range=[0.1, 1.5]), ChoiceParameter(name='metric', parameter_type=STRING, values=['euclidean', 'manhattan', 'canberra', 'braycurtis'], is_ordered=False, sort_values=False)], parameter_constraints=[]).
[INFO 05-22 10:58:33] ax.modelbridge.dispatch_utils: Using Bayesian optimization with a categorical kernel for impr

In [5]:
# Pareto-optimal trials: trial index -> (parameters, (metric values, second element))
best_param_dict = ax_client.get_pareto_optimal_parameters()

# Column groups used to lay out the summary
int_cols = ["n_clusters", "n_noise"]
param_cols = ["min_samples", "min_cluster_size", "cluster_selection_epsilon", "metric"]
obj_cols = [
    "n_clusters",
    "n_noise",
    "silhouette",
    "calinski_harabasz",
    "davies_bouldin",
]

# One flat record per Pareto-optimal trial, then cast the count columns to int
summary_table = pd.DataFrame(
    [
        {"Experiment": exp_idx, **params, **eval_metrics}
        for exp_idx, (params, (eval_metrics, _)) in best_param_dict.items()
    ]
)
summary_table = summary_table.astype({col: int for col in int_cols})

# Two-level column header: hyperparameters on the left, metric values on the right
df_summary = pd.concat(
    {
        "Hyperparameters": summary_table[param_cols],
        "Objectives": summary_table[obj_cols],
    },
    axis=1,
)
df_summary.index = summary_table["Experiment"]
df_summary

[INFO 05-22 11:31:01] ax.service.utils.best_point: Using inferred objective thresholds: [ObjectiveThreshold(calinski_harabasz >= 106532.31080560196), ObjectiveThreshold(davies_bouldin <= 0.5052049976231597), ObjectiveThreshold(n_noise <= 6668.193151355564), ObjectiveThreshold(silhouette >= 0.6530150630123105)], as objective thresholds were not specified as part of the optimization configuration on the experiment.


Unnamed: 0_level_0,Hyperparameters,Hyperparameters,Hyperparameters,Hyperparameters,Objectives,Objectives,Objectives,Objectives,Objectives
Unnamed: 0_level_1,min_samples,min_cluster_size,cluster_selection_epsilon,metric,n_clusters,n_noise,silhouette,calinski_harabasz,davies_bouldin
Experiment,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2,Unnamed: 8_level_2,Unnamed: 9_level_2
16,47,100,0.152732,euclidean,25,2739,0.662906,107090.742123,0.498947
18,46,100,0.394319,manhattan,26,5784,0.757863,112163.938301,0.455913
19,47,100,0.376523,manhattan,26,6300,0.75749,112675.055295,0.450873
22,51,100,0.221726,manhattan,26,6082,0.752193,109883.162297,0.445613
23,49,100,0.17198,manhattan,28,6310,0.761815,107770.33626,0.455709
25,53,100,0.299149,manhattan,25,5817,0.741067,111933.460386,0.438201
26,51,100,0.383701,manhattan,25,6082,0.746734,112528.755698,0.436363
28,46,100,0.431833,manhattan,25,5784,0.755467,110957.850974,0.454833
29,41,100,0.398654,manhattan,26,5370,0.758244,109881.990856,0.488614


In [6]:
# Separate, seeded UMAP model fitted on the original feature matrix (not the
# 50-component reduction); its output is used for the 2D scatter plots
umap_2d_settings = dict(random_state=0)
reducer_2d = umap.UMAP(**umap_2d_settings)
embedding_2d = reducer_2d.fit_transform(data)

In [None]:
# Trial chosen by hand from the Pareto table
# NOTE(review): this index is only valid while trial 16 is in the Pareto set — re-check after re-running the optimization
best_idx = 16
selected_trial = best_param_dict[best_idx]
best_params = selected_trial[0]

# Refit HDBSCAN on the reduced features using the selected hyperparameters
clusterer = hdbscan.HDBSCAN(**best_params)
clusterer.fit(reduced_data)

In [9]:
# Plotting frame: 2D coordinates plus label, wafer map and cluster id per row,
# ordered by cluster so that groups appear in ascending label order
emb_df = (
    pd.DataFrame(embedding_2d, columns=["x", "y"])
    .assign(
        failureType=df["failureType"].values,
        waferMap=df["waferMap"].values,
        cluster=pd.Series(clusterer.labels_).astype("category").values,
    )
    .sort_values("cluster")
)

In [10]:
# Static scatter of the 2D embedding coloured by HDBSCAN label; the noise
# label (-1) is given an explicit light-gray entry in the colour map
scatter_options = dict(
    x="x",
    y="y",
    color="cluster",
    color_discrete_sequence=mpn65_palette(),
    color_discrete_map={-1: "lightgray"},
    width=800,
    height=600,
    template="simple_white",
)
fig = px.scatter(emb_df, **scatter_options)
fig.show()

In [11]:
import base64
from io import BytesIO

import matplotlib.pyplot as plt
from bokeh.models import CategoricalColorMapper, ColumnDataSource, HoverTool
from bokeh.palettes import Spectral10, Category20_20
from bokeh.plotting import figure, output_notebook, show
from PIL import Image
from sklearn.datasets import load_digits

output_notebook()

In [15]:
def embeddable_image(data):
    """Encode an array as a base64 PNG data URI usable as an HTML ``<img>`` source.

    Parameters
    ----------
    data : array-like
        Pixel values. They are cast to ``uint8`` before the image is built.
        NOTE(review): assumes a 2-D array (grayscale) whose values already fit
        in 0-255; small class codes such as 0/1/2 will render almost black —
        confirm whether the values should be rescaled for display.

    Returns
    -------
    str
        A ``"data:image/png;base64,..."`` string holding a 100x100 PNG
        produced with nearest-neighbour resampling.
    """
    # Pillow does not convert dtypes when a mode is forced on ``fromarray``; it
    # only changes how the raw buffer is read. Casting to uint8 first makes
    # int64 / float inputs produce the intended pixels, and lets Pillow infer
    # the mode from the array itself.
    pixels = np.asarray(data).astype(np.uint8)
    image = Image.fromarray(pixels).resize(
        (100, 100), resample=Image.Resampling.NEAREST
    )

    # Serialize to PNG in memory and release the buffer once encoded
    with BytesIO() as buffer:
        image.save(buffer, format="png")
        encoded = base64.b64encode(buffer.getvalue()).decode()
    return "data:image/png;base64," + encoded


# Pre-render one inline thumbnail per wafer map for the hover tooltips
emb_df["image"] = [embeddable_image(wafer_map) for wafer_map in emb_df.waferMap]

In [None]:
plot_figure = figure(
    title="Interactive UMAP Plot of SwAV Features with HDBSCAN Cluster Labels",
    width=1000,
    height=600,
)
# Hover tooltip: wafer-map thumbnail (from the "image" column) plus the cluster id
plot_figure.add_tools(
    HoverTool(
        tooltips="""
<div>
    <div>
        <img src='@image' style='float: left; margin: 5px 5px 5px 5px'/>
    </div>
    <div>
        <span style='font-size: 16px; color: #224499'>Cluster:</span>
        <span style='font-size: 18px'>@cluster</span>
    </div>
</div>
"""
    )
)

# We have to plot each cluster group separately, because the hover tool doesn't work with a legend otherwise
NOISE_COLOR = "#d4d4d4"
cluster_palette = mpn65_palette()
# Only the columns the glyphs and the tooltip actually reference are sent to
# the browser; the raw wafer-map arrays are not needed once thumbnails exist.
source_columns = ["x", "y", "cluster", "image"]

for label, group in emb_df.groupby("cluster", observed=True):
    # Colour is derived from the label itself rather than from its position in
    # the iteration: gray is reserved for noise (-1) even when no noise group
    # exists, and the palette wraps instead of silently dropping clusters when
    # there are more clusters than colours.
    if label == -1:
        color = NOISE_COLOR
    else:
        color = cluster_palette[int(label) % len(cluster_palette)]

    plot_figure.circle(
        "x",
        "y",
        source=group[source_columns],
        color=color,
        line_alpha=0.8,
        fill_alpha=0.7,
        size=5,
        radius_dimension="x",
        legend_label=f"{label}",
    )

# Move the legend to the right, outside the plot
plot_figure.add_layout(plot_figure.legend[0], "right")

# Title the legend and make it interactive (clicking an entry hides that cluster)
plot_figure.legend.title = "HDBSCAN Cluster"
plot_figure.legend.click_policy = "hide"

show(plot_figure)