# LIBRARIES AND DATA LOADS

In [None]:
%pip install contextily
%pip install plotly
%pip install geopandas
%pip install fastkml fiona shapely lxml
%pip install hdbscan
%pip install --upgrade hdbscan scikit-learn
# !pip install hdbscan

In [2]:
import os
import warnings
import tempfile
from zipfile import ZipFile
from io import BytesIO

import numpy as np
import pandas as pd
import geopandas as gpd
import fiona
import requests
import plotly.express as px
import plotly.graph_objects as go

from sklearn.cluster import KMeans, DBSCAN
from sklearn.model_selection import ParameterGrid
from sklearn.metrics import (
    silhouette_score,
    silhouette_samples,
    adjusted_rand_score,
    davies_bouldin_score,
    calinski_harabasz_score
)
from sklearn.preprocessing import Normalizer

import hdbscan

# Suppress future warnings
warnings.filterwarnings("ignore", category=FutureWarning)


# FUNCTIONS TO PLOT RESULTS

The function to plot points from dataframe and it allows to select the style of the map of plotting:


| **Style Name**                  | **Description** |
|---------------------------------|---------------------------------------------------------------|
| `"basic"`                        | Simple, clean map with standard colors and labels. |
| `"carto-darkmatter"`              | Dark-themed map with high contrast, designed for nighttime use. |
| `"carto-darkmatter-nolabels"`     | Dark map without labels, useful for minimalistic designs. |
| `"carto-positron"`                | Light-themed, minimalistic map with soft colors. |
| `"carto-positron-nolabels"`       | Light-colored map without labels, good for data overlays. |
| `"carto-voyager"`                 | General-purpose map with a balance of detail and simplicity. |
| `"carto-voyager-nolabels"`        | Same as Voyager but without labels. |
| `"dark"`                          | Dark mode map similar to `"carto-darkmatter"`, with more vibrant colors. |
| `"light"`                         | Light-themed map similar to `"carto-positron"`, with more contrast. |
| `"open-street-map"`               | Standard OpenStreetMap tiles, free to use. |
| `"outdoors"`                      | Outdoor-style map, includes terrain details, parks, and hiking trails. |
| `"satellite"`                     | Pure satellite imagery without any additional overlays. |
| `"satellite-streets"`             | Satellite imagery with streets, roads, and place names overlaid. |
| `"streets"`                       | Standard street map with clear roads and points of interest. |
| `"white-bg"`                      | Blank white canvas (no map). Useful for custom layers and no external HTTP requests. |


In [3]:
def plot_data(df, faults_df, style="carto-positron"):
    # Scale 'z' values to max 15
    max_size = 15
    z_scaled = df['z'] / df['z'].max() * max_size
    z_scaled = z_scaled.clip(1, max_size)  # Optional: prevent too-small points

    # Create earthquake scatter map
    fig = go.Figure(go.Scattermap(
        lon=df['x'], lat=df['y'],
        marker=dict(size=z_scaled, color="red", opacity=0.6),
        text=df['z'],
        name="Earthquakes"
    ))

    # Add faults
    fig.add_trace(go.Scattermap(
        lat=faults_df["Latitude"],
        lon=faults_df["Longitude"],
        mode='markers',
        marker=dict(size=2, color='#3776ab'),
        name="Faults"
    ))

    # Map layout
    fig.update_layout(
        map=dict(
            style=style,
            center=dict(lon=df['x'].median(), lat=df['y'].median()),
            zoom=5.5,
        ),
        width=950,
        height=800,
        showlegend=True
    )

    fig.show()


## GET THE PARAMETERS SET FROM RESULTS_DF

In [4]:
def get_param_key(mode, result_df, line_id):
    """
    Generates a tuple of parameters from a row ID in result_df.

    Args:
        result_df (pd.DataFrame): DataFrame containing clustering results.
        line_id (int): Index of the row to extract parameters from.

    Returns:
        tuple: Formatted parameter key as a tuple of (key, value) pairs.
    """
    if line_id not in result_df.index:
        raise ValueError(f"Row ID {line_id} not found in DataFrame.")

    if mode == 'kmeans':
        row = result_df.loc[line_id, ['init', 'max_iter', 'n_clusters', 'n_init']].to_dict()
        param_key = tuple(row.items())

    if mode == 'dbscan':
        row = result_df.loc[line_id, ['eps',  'metric', 'min_samples']].to_dict()
        param_key = tuple(row.items())

    if mode == 'hdbscan':
        row = result_df.loc[line_id, ['metric', 'min_cluster_size']].to_dict()
        param_key = tuple((k, None if pd.isna(v) else v) for k, v in row.items())

    return param_key

## PLOT CLUSTERS

In [5]:
def plot_data_clusters(results_df, clusters_dict,  faults_df, id, mode, style="carto-positron"):
    """
    Plots clustered data on a map using Plotly (go.Scattermap) with a proper legend.

    Args:
        clusters_dict (dict): Dictionary containing clustering results.
        param_key (tuple): The specific parameter set as a tuple (same format as dictionary keys).
        style (str): Map style (default: "carto-positron").

    Returns:
        None
    """

    param_key = get_param_key(mode, results_df, id)
    if 'data' not in clusters_dict:
        raise KeyError("The dictionary must contain an entry with key 'data' containing the original dataset.")

    if param_key not in clusters_dict:
        raise KeyError(f"Parameters {param_key} not found in clusters_dict.")

    # Retrieve original data
    data = clusters_dict['data'].copy()

    # Retrieve cluster labels
    data['cluster'] = clusters_dict[param_key]

    # Create a scatter map figure
    fig = go.Figure()

    fig.add_trace(go.Scattermap(
            lat=faults_df["Latitude"],
            lon=faults_df["Longitude"],
            mode='markers',
            marker=go.scattermap.Marker(
                size=2, color='#3776ab'
            ),
            name="Faults"
        ))

    # Get unique clusters
    unique_clusters = sorted(data['cluster'].unique())

    # Define a color map for clusters
    color_map = px.colors.qualitative.Set1  # Change color scheme if needed

    # Plot each cluster separately to show legend
    for i, cluster in enumerate(unique_clusters):
        cluster_data = data[data['cluster'] == cluster]

        fig.add_trace(go.Scattermap(
            lon=cluster_data['x'], lat=cluster_data['y'],
            mode="markers",
            marker=dict(
                size=4,
                color=color_map[i % len(color_map)],  # Assign color from color_map
                opacity=0.7
            ),
            text=[f"Cluster {cluster}"] * len(cluster_data),  # Hover text
            name=f"Cluster {cluster}"  # Important: This ensures a legend entry
        ))

    # Format param_key into a more readable title
    formatted_params = ', '.join(f"{key} = {value}" for key, value in param_key)
    # Convert the row into a list of dictionaries, then extract the first one
    param_stat_list = results_df.loc[results_df['index'] == id, ['silhouette_score', 'dbi', 'chs']].to_dict(orient='records')
    param_stat = param_stat_list[0] if param_stat_list else {}
    formatted_stat = ', '.join(f"{key} = {value:0.3f}" for key, value in param_stat.items())
    # Update layout with map style
    fig.update_layout(
        map={
            'style': style,
            'center': {'lon': data['x'].median(), 'lat': data['y'].median()},
            'zoom': 5.5
        },
        width=950,
        height=800,
        showlegend=True,  # Ensures legend is displayed
        title=f"Cluster Visualization for parameters: {formatted_params} <br> with {formatted_stat}"
    )

    fig.show()


## PLOT 2-DIM STATS

In [6]:

def plot_2d_stat(results_df, x_variable, y_variable, color_variable, title=''):

    hover_data = [col for col in results_df.columns]

    fig = px.scatter(
        results_df,
        x=x_variable,
        y=y_variable,
        color=color_variable,
        color_continuous_scale='plasma',
        size_max=32,
        hover_data=hover_data,  # Include the index in hover data
        title=title
    )
    fig.show()



## ARI

In [7]:
def calculate_ari(method, results_df,  cluster_dict, num_clusters ,silhouette_score_level ):
  # Calculate Adjusted Rand Index for each pair of label sets
  ari_data = {}

  labels = results_df[(results_df['silhouette_score']> silhouette_score_level) &(results_df['n_clusters']==num_clusters)]['index'].unique()
  for label1 in labels:
      ari_data[label1] = {}
      key_param_1 = get_param_key(method, results_df, label1)
      for label2 in labels:
          key_param_2 = get_param_key(method, results_df, label2)
          ari_data[label1][label2] = adjusted_rand_score(cluster_dict[key_param_1], cluster_dict[key_param_2])

  # Create a DataFrame from the nested dictionary
  ari_df = pd.DataFrame(ari_data)
  return ari_df

In [8]:
def plot_ari(ari_df):

    fig = px.imshow(
        ari_df.values,
        text_auto='.2f',
        color_continuous_scale='RdBu_r',
        aspect='auto',
        zmin=0,
        zmax=1
    )

    # Custom axis labels
    fig.update_xaxes(
        tickmode='array',
        tickvals=list(range(len(ari_df.columns))),
        ticktext=ari_df.columns
    )

    fig.update_yaxes(
        tickmode='array',
        tickvals=list(range(len(ari_df.index))),
        ticktext=ari_df.index
    )

    fig.update_layout(
        title="ARI (0 to 1 Scale)",
        xaxis_title="",
        yaxis_title=""
    )

    fig.show()


In [9]:
def calculate_ari_all(method, results_df, clusters_dict, silhouette_score_level = 0.0):

    num_clusters = results_df[results_df['silhouette_score']> silhouette_score_level]['n_clusters'].unique()

    for num_cluster in num_clusters:
        n = results_df[(results_df['n_clusters']==num_cluster) & (results_df['silhouette_score']> silhouette_score_level)]['index'].nunique()
        uni = results_df[(results_df['n_clusters']==num_cluster) & (results_df['silhouette_score']> silhouette_score_level)]['index'].unique()
        print(f"{n} ids with {num_cluster} number of clusters: {uni}")
        ari_df = calculate_ari(method, results_df,clusters_dict, num_cluster, silhouette_score_level)
        plot_ari(ari_df)

# DATA LOAD

In [10]:
def read_github_kmz_faults(github_folder_url, kmz_filenames):
    """
    Reads KMZ files stored in a GitHub repository, extracts KML data, and returns a DataFrame of fault coordinates.

    Args:
        github_folder_url (str): Base URL of the GitHub repository's folder (without filenames).
        kmz_filenames (list): List of KMZ filenames to fetch from GitHub.

    Returns:
        pd.DataFrame: A DataFrame containing fault coordinates from KMZ files.
    """

    all_fault_points = []  # Store extracted points

    # Temporary folder for extracted KML files
    temp_dir = tempfile.mkdtemp()

    for kmz_file in kmz_filenames:
        kmz_url = f"{github_folder_url}/{kmz_file}"  # Construct the GitHub raw file URL

        try:
            # Download the KMZ file from GitHub
            response = requests.get(kmz_url)
            response.raise_for_status()  # Raise error for failed requests

            # Extract KMZ in memory
            with ZipFile(BytesIO(response.content), 'r') as zip_ref:
                zip_ref.extractall(temp_dir)  # Extract to temp directory

            # Locate the extracted KML file (most KMZs contain "doc.kml")
            kml_file = os.path.join(temp_dir, "doc.kml")

            # Get available layers
            layers = fiona.listlayers(kml_file)

            # Loop through each layer
            for layer in layers:
                gdf = gpd.read_file(kml_file, driver="KML", layer=layer)

                if gdf.empty:
                    continue

                gdf = gdf.explode(index_parts=True, ignore_index=True)  # Ensure correct format
                gdf["coords"] = gdf["geometry"].apply(lambda geom: list(geom.coords) if geom else None)

                # Store extracted points
                for _, row in gdf.iterrows():
                    if row["coords"]:
                        for coord in row["coords"]:
                            # Handle both (lon, lat) and (lon, lat, alt) cases
                            lon, lat = coord[:2]  # Ignore altitude if present
                            all_fault_points.append({"Longitude": lon, "Latitude": lat, "File": kmz_file})

        except Exception as e:
            print(f"Error processing file {kmz_file}: {e}")
            continue

    # Convert to DataFrame
    faults_df = pd.DataFrame(all_fault_points)

    return faults_df


In [None]:
github_kmz_folder = "https://raw.githubusercontent.com/SkurativskaKateryna/IJ_NLM_Earthquake_clusterization/refs/heads/main/DATA/"
faults_df = read_github_kmz_faults(github_kmz_folder, kmz_filenames=["AFEAD_J38.kmz","AFEAD_J39.kmz","AFEAD_K38.kmz","AFEAD_K39.kmz"])
print(faults_df.head())

In [12]:
# Define URL and file name
base_url = "https://raw.githubusercontent.com/SkurativskaKateryna/IJ_NLM_Earthquake_clusterization/refs/heads/main/DATA/"
file_name = "Earthquake_depth6000ClearNew.txt"
path = base_url + file_name

data = pd.read_csv(path,delimiter="\t", header=None, names=["y", "x", "z"])[["x", "y", "z"]]
data['x'] = pd.to_numeric(data['x'], errors='coerce')
data = data.dropna()
data = data[data['y'] > 0].reset_index(drop=True)
scaler = Normalizer()
data['scaled_z'] = scaler.fit_transform(data[['z']])
print(data.shape)

(6196, 4)


In [None]:
data.head()

# DATA OVERVIEW

In [None]:
# Print basic statistics
print("Shape:", data.shape)
print("\nBasic Statistics:")
print(data.describe())

In [None]:
plot_data(data, faults_df, style="carto-positron")

# CLUSTERIZATION
## KMEAN

In [None]:
def KMEAN_gridsearch(df):
    # Define parameter grid
    param_grid = {
        'n_clusters': list(range(2, int(np.sqrt(len(df))))),  # Range of clusters
        'init': ['k-means++', 'random'],  # Initialization methods
        'n_init': [ 30],  # Number of random restarts
        'max_iter': [300] #, 500]  # Max iterations
    }


    # Store results
    results = []
    clusters_dict = {}  # Dictionary to store cluster labels
    clusters_dict['data'] = df[['x', 'y', 'z']] # Store original data

    # Total number of iterations
    total_combinations = len(list(ParameterGrid(param_grid)))
    current_iteration = 0

    # Grid Search over parameters
    for params in ParameterGrid(param_grid):
        current_iteration += 1

        # Print progress
        print(f"Iteration {current_iteration}/{total_combinations}: Testing parameters {params}")

        # Apply KMeans clustering
        kmeans = KMeans(
            n_clusters=params['n_clusters'],
            init=params['init'],
            n_init=params['n_init'],
            max_iter=params['max_iter'],
            random_state=42
        )

        cluster_labels = kmeans.fit_predict(df[['x', 'y', 'scaled_z']])

        # Compute metrics
        inertia = kmeans.inertia_
        silhouette_avg = silhouette_score(df[['x', 'y', 'scaled_z']], cluster_labels)
        dbi = davies_bouldin_score(df[['x', 'y', 'scaled_z']], cluster_labels)
        chs = calinski_harabasz_score(df[['x', 'y', 'scaled_z']], cluster_labels)

        # Compute silhouette scores for each sample
        silhouette_values = silhouette_samples(df[['x', 'y', 'scaled_z']], cluster_labels)
        silhouette_per_cluster = {label: [] for label in cluster_labels if label != -1}  # Exclude noise (-1)
        for label, score in zip(cluster_labels, silhouette_values):
            if label != -1:  # Exclude noise points from per-cluster silhouette scores
                silhouette_per_cluster[label].append(score)

        # Print results for each iteration
        print(f"  - Silhouette Score: {silhouette_avg:.4f}, Inertia: {inertia:.2f}, DBI: {dbi:.4f}, CHS: {chs:.4f}")

        # Store results in DataFrame format
        results.append({
            'n_clusters': params['n_clusters'],
            'init': params['init'],
            'n_init': params['n_init'],
            'max_iter': params['max_iter'],
            'inertia': inertia,
            'silhouette_score': silhouette_avg,
            'dbi': dbi,
            'chs': chs,
            'silhouette_per_cluster': silhouette_per_cluster
        })

        # Store cluster labels in dictionary (using tuple of params as key)
        clusters_dict[tuple(params.items())] = cluster_labels

    # Convert results to a DataFrame and sort by best silhouette score
    results_df = pd.DataFrame(results).sort_values(by='silhouette_score', ascending=False).reset_index(drop=True)

    print("\n✅ Grid Search Completed! Returning results.")
    results_df = results_df.reset_index()

    return results_df, clusters_dict


In [None]:
results_df_kmean, clusters_dict_kmean = KMEAN_gridsearch(data)

In [None]:
results_df_kmean

In [None]:
plot_data_clusters(results_df_kmean, clusters_dict_kmean, faults_df, id=0, mode='kmeans', style="carto-positron")

In [None]:
plot_2d_stat(results_df=results_df_kmean.drop(columns=['silhouette_per_cluster']), x_variable='n_clusters', y_variable='silhouette_score', color_variable='chs', title='KMEAN parameter analysis')
plot_2d_stat(results_df=results_df_kmean.drop(columns=['silhouette_per_cluster']), x_variable='n_clusters', y_variable='chs', color_variable='silhouette_score', title='KMEAN parameter analysis')

## DBSCAN

In [None]:
def DBSCAN_gridsearch(df):
    # Define parameter grid
    param_grid = {
        'eps': np.round(np.linspace(0.05, 2.15, num=int((2.15 - 0.05) / 0.05) + 1), 2).tolist(),  # from 0.2 to 1.0 with the step 0.1
        'min_samples': np.linspace(15, 365, num=int((365 - 15) / 50) + 1, dtype=int).tolist(),  # from 25 to 375 with the step 50
        'metric': ['euclidean', 'manhattan']  # Distance metrics
    }

    # Store results
    results = []
    clusters_dict = {}
    clusters_dict['data'] = df[['x', 'y', 'z']]  # Store original data

    # Total number of iterations
    total_combinations = len(list(ParameterGrid(param_grid)))
    current_iteration = 0

    # Grid Search over parameters
    for params in ParameterGrid(param_grid):
        current_iteration += 1
        print(f"Iteration {current_iteration}/{total_combinations}: Testing parameters {params}")

        # Apply DBSCAN clustering
        dbscan = DBSCAN(
            eps=params['eps'],
            min_samples=params['min_samples'],
            metric=params['metric']
        )
        cluster_labels = dbscan.fit_predict(df[['x', 'y', 'scaled_z']] )

        # Ignore models where all points are labeled as noise (-1)
        unique_labels = set(cluster_labels)
        if len(unique_labels) <= 1:
            print("  - Skipping due to all points being noise.")
            continue

        # Compute clustering metrics
        silhouette_avg = silhouette_score(df[['x', 'y', 'scaled_z']], cluster_labels) if len(unique_labels) > 1 else -1
        dbi = davies_bouldin_score(df[['x', 'y', 'scaled_z']], cluster_labels)
        chs = calinski_harabasz_score(df[['x', 'y', 'scaled_z']], cluster_labels)

        # Compute silhouette scores for each sample
        silhouette_values = silhouette_samples(df, cluster_labels)
        silhouette_per_cluster = {label: [] for label in unique_labels if label != -1}  # Exclude noise (-1)
        for label, score in zip(cluster_labels, silhouette_values):
            if label != -1:  # Exclude noise points from per-cluster silhouette scores
                silhouette_per_cluster[label].append(score)

        print(f"  - Number of clusters: {len(unique_labels) - 1}, Silhouette Score: {silhouette_avg:.4f}, DBI: {dbi:.4f}, CHS: {chs:.4f}")

        # Store results
        results.append({
            'eps': params['eps'],
            'min_samples': params['min_samples'],
            'metric': params['metric'],
            'n_clusters': len(unique_labels) - 1,
            'silhouette_score': silhouette_avg,
            'dbi': dbi,
            'chs': chs,
            'silhouette_per_cluster': silhouette_per_cluster
        })

        # Store cluster labels in dictionary
        clusters_dict[tuple(params.items())] = cluster_labels

    # Convert results to a DataFrame and sort by best silhouette score
    results_df = pd.DataFrame(results).sort_values(by='silhouette_score', ascending=False).reset_index(drop=True)
    results_df = results_df.reset_index()

    print("\n✅ Grid Search Completed! Returning results.")

    return results_df, clusters_dict


In [None]:
results_df_dbscan, clusters_dict_dbscan = DBSCAN_gridsearch(data)

In [None]:
results_df_dbscan

In [None]:
plot_2d_stat(results_df=results_df_dbscan.drop(columns=['silhouette_per_cluster']), x_variable='n_clusters', y_variable='silhouette_score', color_variable='eps', title='DBSCAN parameter analysis')
plot_2d_stat(results_df=results_df_dbscan.drop(columns=['silhouette_per_cluster']), x_variable='n_clusters', y_variable='chs', color_variable='silhouette_score', title='DBSCAN parameter analysis')
plot_2d_stat(results_df=results_df_dbscan.drop(columns=['silhouette_per_cluster']), x_variable='n_clusters', y_variable='dbi', color_variable='silhouette_score', title='DBSCAN parameter analysis')

In [None]:
plot_data_clusters(results_df_dbscan, clusters_dict_dbscan, faults_df, id=370, mode='dbscan', style="carto-positron")

In [None]:
silhouette_score_level = 0.0
num_clusters = results_df_dbscan[results_df_dbscan['silhouette_score']> silhouette_score_level]['n_clusters'].unique()
for num_cluster in num_clusters:
  n = results_df_dbscan[(results_df_dbscan['n_clusters']==num_cluster) & (results_df_dbscan['silhouette_score']> silhouette_score_level)]['index'].nunique()
  print(f"{n} clusters with {num_cluster} number of clusters")

In [None]:
calculate_ari_all(method='dbscan', results_df=results_df_dbscan, clusters_dict=clusters_dict_dbscan, silhouette_score_level = 0.0)

## HDBSCAN algorithm

In [None]:
def HDBSCAN_gridsearch(df):
    # Define parameter grid
    param_grid = {
        'min_cluster_size': np.linspace(20, 301, num=int((300 - 20) / 20), dtype=int).tolist(),
        'metric': ['euclidean', 'manhattan']
    }

    # Store results
    results = []
    clusters_dict = {}
    clusters_dict['data'] = df[['x', 'y', 'z']]  # Store original data

    # Total number of iterations
    total_combinations = len(list(ParameterGrid(param_grid)))
    current_iteration = 0

    # Grid Search over parameters
    for params in ParameterGrid(param_grid):
        current_iteration += 1
        print(f"Iteration {current_iteration}/{total_combinations}: Testing parameters {params}")

        # Apply HDBSCAN clustering
        hdb = hdbscan.HDBSCAN(
            min_cluster_size=params['min_cluster_size'],
            metric=params['metric']
        )
        cluster_labels = hdb.fit_predict(df[['x', 'y', 'scaled_z']])

        # Ignore models where all points are labeled as noise (-1)
        unique_labels = set(cluster_labels)
        if len(unique_labels) <= 1:
            print("  - Skipping due to all points being noise.")
            continue

        # Compute clustering metrics
        silhouette_avg = silhouette_score(df[['x', 'y', 'scaled_z']], cluster_labels) if len(unique_labels) > 1 else -1
        dbi = davies_bouldin_score(df[['x', 'y', 'scaled_z']], cluster_labels)
        chs = calinski_harabasz_score(df[['x', 'y', 'scaled_z']], cluster_labels)

        # Compute silhouette scores for each sample
        silhouette_values = silhouette_samples(df[['x', 'y', 'scaled_z']], cluster_labels)
        silhouette_per_cluster = {label: [] for label in unique_labels if label != -1}  # Exclude noise (-1)
        for label, score in zip(cluster_labels, silhouette_values):
            if label != -1:  # Exclude noise points from per-cluster silhouette scores
                silhouette_per_cluster[label].append(score)

        print(f"  - Number of clusters {len(unique_labels) - 1}, Silhouette Score: {silhouette_avg:.4f}, DBI: {dbi:.4f}, CHS: {chs:.4f}")

        # Store results
        results.append({
            'min_cluster_size': params['min_cluster_size'],
            'metric': params['metric'],
            'n_clusters': len(unique_labels) - 1,
            'silhouette_score': silhouette_avg,
            'dbi': dbi,
            'chs': chs,
            'silhouette_per_cluster': silhouette_per_cluster
        })

        # Store cluster labels in dictionary
        clusters_dict[tuple(params.items())] = cluster_labels

    # Convert results to a DataFrame and sort by best silhouette score
    results_df = pd.DataFrame(results).sort_values(by='silhouette_score', ascending=False).reset_index(drop=True)
    results_df = results_df.reset_index()
    print("\n✅ Grid Search Completed! Returning results.")

    return results_df, clusters_dict


In [None]:
results_df_hdbscan, clusters_dict_hdbscan = HDBSCAN_gridsearch(data)

In [None]:
results_df_hdbscan

In [None]:
plot_2d_stat(results_df=results_df_hdbscan.drop(columns=['silhouette_per_cluster']), x_variable='n_clusters', y_variable='silhouette_score', color_variable='min_cluster_size', title='HDBSCAN parameter analysis')
plot_2d_stat(results_df=results_df_hdbscan.drop(columns=['silhouette_per_cluster']), x_variable='n_clusters', y_variable='chs', color_variable='min_cluster_size', title='HDBSCAN parameter analysis')
plot_2d_stat(results_df=results_df_hdbscan.drop(columns=['silhouette_per_cluster']), x_variable='n_clusters', y_variable='dbi', color_variable='min_cluster_size', title='HDBSCAN parameter analysis')

In [None]:
plot_data_clusters(results_df_hdbscan, clusters_dict_hdbscan, faults_df, id=0, mode='hdbscan', style="carto-positron")

In [None]:
silhouette_score_level = 0.1
num_clusters = results_df_hdbscan[results_df_hdbscan['silhouette_score']> silhouette_score_level]['n_clusters'].unique()
for num_cluster in num_clusters:
  n = results_df_hdbscan[(results_df_hdbscan['n_clusters']==num_cluster) & (results_df_hdbscan['silhouette_score']> silhouette_score_level)]['index'].nunique()
  print(f"{n} clusters with {num_cluster} number of clusters")

In [None]:
calculate_ari_all('hdbscan',results_df_hdbscan, clusters_dict_hdbscan, silhouette_score_level = 0.0)

# DOWNLOAD RESULTED DATA AS TXT

In [None]:
import os

def save_cluster_result(method, results_df, cluster_dic, id):
    param_key = get_param_key(method, results_df, id)

    data_result = cluster_dic['data'].copy()
    data_result['cluster'] = cluster_dic[param_key]

    # Define and create the output folder
    folder_name = f'Cluster_result_{method}_id_{id}'
    os.makedirs(folder_name, exist_ok=True)

    for cluster_label in data_result['cluster'].unique():
        # Filter the DataFrame for the current cluster
        cluster_data = data_result[data_result['cluster'] == cluster_label][['y', 'x', 'z']]

        # Define the full file path
        filename = f'Cluster_result_{method}_id_{id}_cluster_{cluster_label}.dat'
        file_path = os.path.join(folder_name, filename)

        # Save the filtered data to a .dat file
        cluster_data.to_csv(file_path, sep=' ', index=False, header=False)

        print(f'Saved {filename} to {folder_name}')


In [None]:
save_cluster_result('hdbscan', results_df_hdbscan, clusters_dict_hdbscan, id=0)