In [None]:
import os
import random
import numpy as np
import matplotlib . pyplot as plt
from mpl_toolkits . mplot3d import Axes3D
from sklearn . decomposition import PCA
from sklearn . preprocessing import LabelEncoder , StandardScaler
from sklearn . cluster import KMeans , AgglomerativeClustering
from sklearn . metrics import confusion_matrix
import seaborn as sns
from astropy .io import fits
from scipy . interpolate import interp1d
from scipy . ndimage import gaussian_filter1d
from scipy . stats import mode
import matplotlib .cm as cm

In [None]:
def load_spectrum(file_path, cutoff_wavelength=8000):
    try:
        with fits.open(file_path) as hdul:
            if len(hdul) < 3:
                raise ValueError("Expected at least 3 HDUs in the FITS file")

            data = hdul[1].data
            if data is None:
                raise ValueError("No data found in HDU 1")

            loglam = data['loglam']
            flux = data['flux']
            wavelength = 10**loglam

            mask = wavelength <= cutoff_wavelength
            wavelength = wavelength[mask]
            flux = flux[mask]

            hdu2_data = hdul[2].data
            if hdu2_data is None:
                raise ValueError("No data found in HDU 2")

            class_label = hdu2_data['CLASS'][0].strip()

            subclass_label = 'UNKNOWN'
            if 'SUBCLASS' in hdu2_data.columns.names:
                subclass_value = hdu2_data['SUBCLASS'][0]
                if isinstance(subclass_value, str) and subclass_value.strip():
                    subclass_label = subclass_value.strip()

        return wavelength, flux, class_label, subclass_label
    except Exception as e:
        print(f"Error loading spectrum from {file_path}: {e}")
        return None, None, None, None

In [None]:
# Normalization, smoothing
def normalize_flux(flux):
    return flux / np.max(flux)

def smooth_flux(flux, sigma=2):
    return gaussian_filter1d(flux, sigma=sigma)

def pre_process_spectrum(wavelength, flux):
    flux = normalize_flux(flux)
    flux = smooth_flux(flux)
    return wavelength, flux

def resample_spectrum(wavelength, flux, common_wavelength):
    interp_flux = interp1d(wavelength, flux, kind='linear', bounds_error=False, fill_value="extrapolate")
    return interp_flux(common_wavelength) 

In [None]:
spectra_folder = 'SELECTED_SPECTRA'
spectra_files = os.listdir(spectra_folder)

all_fluxes = []
all_combined_labels = []

# Define a common wavelength grid
common_wavelength = np.linspace(4000, 8000, 1000)

# Load selected spectra
for file_name in spectra_files:
    file_path = os.path.join(spectra_folder, file_name)
    wavelength, flux, class_label, subclass_label = load_spectrum(file_path)
    
    if wavelength is None or flux is None:
        continue
    
    # Preprocess and resample spectrum
    wavelength, flux = pre_process_spectrum(wavelength, flux)
    flux = resample_spectrum(wavelength, flux, common_wavelength)
    
    all_fluxes.append(flux)
    all_combined_labels.append(f"{class_label}_{subclass_label}")

# Stack all fluxes for PCA
all_fluxes = np.array(all_fluxes)
all_combined_labels = np.array(all_combined_labels)

print(f"Number of spectra after loading: {len(all_fluxes)}")

# Encode the labels to numeric values
label_encoder = LabelEncoder()
encoded_labels = label_encoder.fit_transform(all_combined_labels)

scaler = StandardScaler()
scaled_fluxes = scaler.fit_transform(all_fluxes)

In [None]:
# Perform PCA
pca = PCA(n_components=3)
pca_components = pca.fit_transform(scaled_fluxes) 
# Perform PCA Plotting in 3D
fig = plt.figure(figsize=(14, 10))
ax = fig.add_subplot(111, projection='3d')

# Use a larger colormap with many distinct colors
cmap = cm.get_cmap('tab20', num_clusters)

# Plot the PCA components
scatter = ax.scatter(pca_components[:, 0], pca_components[:, 1], pca_components[:, 2], 
                     c=encoded_labels, cmap=cmap, s=5)

# Set axis limits to zoom in on the dense region
ax.set_xlim(-40, 55)
ax.set_ylim(-40, 20)
ax.set_zlim(-30, 50)

# Set axis labels
ax.set_xlabel('PCA Component 1')
ax.set_ylabel('PCA Component 2')
ax.set_zlabel('PCA Component 3')

plt.title('3D PCA Plot of Spectra')

# Adjust the viewing angle to better see the layers
ax.view_init(elev=20, azim=60)

# Directly use the encoded labels for the legend
unique_labels = np.unique(encoded_labels)
handles = [plt.Line2D([0], [0], marker='o', color='w', markerfacecolor=cmap(i), markersize=5) for i in unique_labels]
legend_labels = label_encoder.inverse_transform(unique_labels)

# Place the legend outside the plot
legend = ax.legend(handles, legend_labels, loc="center left", bbox_to_anchor=(1.05, 0.5), fontsize='small', title="Classes", title_fontsize='medium')

plt.show()

In [None]:
num_clusters = len(np.unique(all_combined_labels))

# Explained Variance Plot
plt.figure(figsize=(6, 4))
plt.plot(np.arange(1, len(pca.explained_variance_ratio_) + 1), np.cumsum(pca.explained_variance_ratio_), marker='o')
plt.xlabel('Number of PCA Components')
plt.ylabel('Cumulative Explained Variance')
plt.title('PCA Explained Variance')
plt.grid(True)
plt.show()

In [None]:
# Define range of K values to test
K = range(1, 55)  # Test cluster sizes from 1 to 14

# Initialize lists to store the metrics
inertia = []
silhouette_scores = []
calinski_harabasz_scores = []
davies_bouldin_scores = []

# Perform K-Means clustering for each K and calculate the metrics
for k in K:
    kmeans = KMeans(n_clusters=k, random_state=42)
    cluster_labels = kmeans.fit_predict(pca_components)
    
    # Calculate metrics
    inertia.append(kmeans.inertia_)
    
    if k > 1:  # Silhouette score is undefined for k=1
        silhouette_scores.append(silhouette_score(pca_components, cluster_labels))
        calinski_harabasz_scores.append(calinski_harabasz_score(pca_components, cluster_labels))
        davies_bouldin_scores.append(davies_bouldin_score(pca_components, cluster_labels))

In [None]:
# Plot Elbow Method (Inertia)
plt.figure(figsize=(12, 8))
plt.subplot(2, 2, 1)
plt.plot(K, inertia, 'bo-')
plt.title('Elbow Method For Optimal K')
plt.xlabel('Number of Clusters (K)')
plt.ylabel('Inertia')
plt.grid(True)

# Plot Silhouette Scores
plt.subplot(2, 2, 2)
plt.plot(K[1:], silhouette_scores, 'bo-')
plt.title('Silhouette Score For Optimal K')
plt.xlabel('Number of Clusters (K)')
plt.ylabel('Silhouette Score')
plt.grid(True)

# Plot Calinski-Harabasz Index
plt.subplot(2, 2, 3)
plt.plot(K[1:], calinski_harabasz_scores, 'bo-')
plt.title('Calinski-Harabasz Index For Optimal K')
plt.xlabel('Number of Clusters (K)')
plt.ylabel('Calinski-Harabasz Index')
plt.grid(True)

# Plot Davies-Bouldin Index
plt.subplot(2, 2, 4)
plt.plot(K[1:], davies_bouldin_scores, 'bo-')
plt.title('Davies-Bouldin Index For Optimal K')
plt.xlabel('Number of Clusters (K)')
plt.ylabel('Davies-Bouldin Index')
plt.grid(True)

plt.tight_layout()
plt.show()

In [None]:
# Function to match clusters to original labels
def match_labels_to_clusters(original_labels, clusters):
    matched_labels = np.zeros_like(clusters)
    for cluster in np.unique(clusters):
        mask = clusters == cluster
        matched_labels[mask] = mode(original_labels[mask])[0]
    return matched_labels


In [None]:
import matplotlib.cm as cm  # Ensure you have imported the colormap module
from scipy.stats import mode
# Perform K-Means Clustering with the chosen number of clusters
optimal_k = 22  
kmeans = KMeans(n_clusters=optimal_k, random_state=42)
kmeans_clusters = kmeans.fit_predict(pca_components)

In [None]:
# Create a dictionary to map each cluster to the most frequent original label
cluster_labels = {}
for cluster in np.unique(kmeans_clusters):
    mask = kmeans_clusters == cluster
    most_frequent_label = mode(encoded_labels[mask])[0][0]
    cluster_labels[cluster] = most_frequent_label

# 3D PCA Plot with Centroids
fig = plt.figure(figsize=(14, 10))
ax = fig.add_subplot(111, projection='3d')

# Plot the PCA components colored by their cluster assignment
scatter = ax.scatter(pca_components[:, 0], pca_components[:, 1], pca_components[:, 2],
                     c=kmeans_clusters, cmap='tab10', s=5)

# Plot the centroids
centroids = kmeans.cluster_centers_
ax.scatter(centroids[:, 0], centroids[:, 1], centroids[:, 2], c='black', s=100, marker='x', label='Centroids')

# Set axis limits to zoom in on the dense region
ax.set_xlim(-40, 55)
ax.set_ylim(-40, 20)
ax.set_zlim(-30, 50)

# Set axis labels
ax.set_xlabel('PCA Component 1')
ax.set_ylabel('PCA Component 2')
ax.set_zlabel('PCA Component 3')
ax.set_title(f'3D PCA Plot with K-Means Clustering (K={optimal_k})')

In [None]:
# Generate unique labels for each cluster in the legend
handles = []
legend_labels = []
for cluster in np.unique(kmeans_clusters):
    cluster_color = cm.tab10(cluster % 10)  # Use the colormap from matplotlib.cm
    handles.append(plt.Line2D([0], [0], marker='o', color='w', markerfacecolor=cluster_color, markersize=5))
    legend_labels.append(f"Cluster {cluster + 1}: {label_encoder.inverse_transform([cluster_labels[cluster]])[0]}")

# Place the legend outside the plot
legend = ax.legend(handles, legend_labels, title="K-Means Clusters", loc="center left", bbox_to_anchor=(1.05, 0.5))

ax.view_init(elev=20, azim=60)

plt.show()
# 2D PCA Projection: PCA Component 1 vs Component 2
fig, ax = plt.subplots(figsize=(8, 6))

# Scatter plot
ax.scatter(pca_components[:, 0], pca_components[:, 1], c=kmeans_clusters, cmap='tab10', s=10, alpha=0.7)

# Set axis labels and title
ax.set_xlabel('PCA Component 1')
ax.set_ylabel('PCA Component 2')
ax.set_title('2D Projection: PCA Component 1 vs 2')

# Show the plot
plt.show()
# 2D PCA Projection: PCA Component 1 vs Component 3
fig, ax = plt.subplots(figsize=(8, 6))

# Scatter plot
ax.scatter(pca_components[:, 0], pca_components[:, 2], c=kmeans_clusters, cmap='tab10', s=10, alpha=0.7)

# Set axis labels and title
ax.set_xlabel('PCA Component 1')
ax.set_ylabel('PCA Component 3')
ax.set_title('2D Projection: PCA Component 1 vs 3')

# Show the plot
plt.show() 
# 2D PCA Projection: PCA Component 1 vs Component 3
fig, ax = plt.subplots(figsize=(8, 6))

# Scatter plot
ax.scatter(pca_components[:, 1], pca_components[:, 2], c=kmeans_clusters, cmap='tab10', s=10, alpha=0.7)

# Set axis labels and title
ax.set_xlabel('PCA Component 2')
ax.set_ylabel('PCA Component 3')
ax.set_title('2D Projection: PCA Component 2 vs 3')

# Show the plot
plt.show()

In [None]:
import pandas as pd
# Create a DataFrame for analysis
cluster_analysis_df = pd.DataFrame({'True_Label': encoded_labels, 'Predicted_Cluster': kmeans_clusters})

# Group by predicted clusters and analyze the distribution of true labels
for cluster_id in range(optimal_k):
    cluster_data = cluster_analysis_df[cluster_analysis_df['Predicted_Cluster'] == cluster_id]
    most_common_label, count = mode(cluster_data['True_Label'])
    print(f"Cluster {cluster_id}: Most common label is {most_common_label[0]} with {count[0]} occurrences.")
    print(f"Label distribution in this cluster:\n{cluster_data['True_Label'].value_counts(normalize=True)}\n")
    # Distribution of cluster sizes for K-Means
kmeans_cluster_sizes = np.bincount(kmeans_clusters)

# Plotting the distribution
plt.figure(figsize=(10, 7))
plt.bar(range(len(kmeans_cluster_sizes)), kmeans_cluster_sizes, color='blue')
plt.title('Distribution of Cluster Sizes for K-Means Clustering')
plt.xlabel('Cluster')
plt.ylabel('Size')
plt.show()

In [None]:
from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score
# Statistical quantities
kmeans_silhouette = silhouette_score(pca_components, kmeans_clusters)
kmeans_davies_bouldin = davies_bouldin_score(pca_components, kmeans_clusters)
kmeans_calinski_harabasz = calinski_harabasz_score(pca_components, kmeans_clusters)
kmeans_ari = adjusted_rand_score(encoded_labels, kmeans_clusters)

# Calculate the Normalized Mutual Information (NMI)
kmeans_nmi = normalized_mutual_info_score(encoded_labels, kmeans_clusters)
# Cluster purity
def cluster_purity(true_labels, cluster_labels):
    contingency_matrix = confusion_matrix(true_labels, cluster_labels)
    return np.sum(np.amax(contingency_matrix, axis=0)) / np.sum(contingency_matrix)

kmeans_purity = cluster_purity(encoded_labels, kmeans_clusters)
# Print statistical quantities
print("K-Means Clustering Statistics:")
print(f"Silhouette Score: {kmeans_silhouette}")
print(f"Davies-Bouldin Index: {kmeans_davies_bouldin}")
print(f"Calinski-Harabasz Index: {kmeans_calinski_harabasz}")
print(f"Adjusted Rand Index (ARI): {kmeans_ari}")
print(f"Normalized Mutual Information (NMI): {kmeans_nmi}")
print(f"Cluster Purity: {kmeans_purity}")

In [None]:
from scipy.cluster.hierarchy import linkage, dendrogram, fcluster

# Generate the linkage matrix using 'ward' method (commonly used in hierarchical clustering)
Z = linkage(pca_components, method='ward')

# Plot the dendrogram
plt.figure(figsize=(14, 10))
dendro = dendrogram(Z, truncate_mode='level', p=6)  # p=6 means show only the last 6 levels
plt.title('Hierarchical Clustering Dendrogram')
plt.xlabel('Sample Index')
plt.ylabel('Distance')
plt.show()
from scipy.stats import mode
import matplotlib.cm as cm
# cmap = cm.get_cmap('tab20', num_clusters)
cutoff_distance = 240

# Form clusters by cutting the dendrogram at the chosen distance
agg_clusters_from_dendrogram = fcluster(Z, cutoff_distance, criterion='distance')

In [None]:
# Number of clusters formed
num_clusters_formed = len(np.unique(agg_clusters_from_dendrogram))
print(f'Number of clusters formed: {num_clusters_formed}')
cmap = cm.get_cmap('tab20', num_clusters_formed)
# Assuming you have the `agg_cluster_labels` that map clusters to CLASS_SUBCLASS names
unique_labels = np.unique(agg_cluster_labels)
num_labels = len(unique_labels)

fig = plt.figure(figsize=(14, 10))
ax = fig.add_subplot(111, projection='3d')

# Plot the PCA components colored by their cluster assignment from the dendrogram
scatter = ax.scatter(pca_components[:, 0], pca_components[:, 1], pca_components[:, 2],
                     c=agg_clusters_from_dendrogram, cmap='tab10', s=5)

# Set axis limits to zoom in on the dense region
ax.set_xlim(-40, 55)
ax.set_ylim(-40, 20)
ax.set_zlim(-30, 50)

# Set axis labels
ax.set_xlabel('PCA Component 1')
ax.set_ylabel('PCA Component 2')
ax.set_zlabel('PCA Component 3')
plt.title(f'3D PCA Plot with Agglomerative Clustering (Cutoff at {cutoff_distance})')

In [None]:
# Create a dictionary to map each cluster to the most frequent CLASS_SUBCLASS label
cluster_to_class_subclass = {}

# For each cluster, find the most frequent CLASS_SUBCLASS label
for cluster in np.unique(agg_clusters_from_dendrogram):
    mask = agg_clusters_from_dendrogram == cluster  # Select data points in the current cluster
    most_frequent_label = mode(encoded_labels[mask])[0][0]  # Find the most frequent label in this cluster
    cluster_to_class_subclass[cluster] = most_frequent_label

# Decode the labels to get the actual CLASS_SUBCLASS names
cluster_to_class_subclass_names = {cluster: label_encoder.inverse_transform([label])[0] 
                                   for cluster, label in cluster_to_class_subclass.items()}
agg_cluster_labels = np.array([
    cluster_to_class_subclass_names[cluster] 
    for cluster in agg_clusters_from_dendrogram
])

In [None]:
# Print out the mapping
for cluster, class_subclass in cluster_to_class_subclass_names.items():
  print(f"Cluster {cluster}: {class_subclass}")

agg_cluster_labels = np.array([cluster_to_class_subclass_names[cluster] 
                               for cluster in agg_clusters_from_dendrogram])
ax.view_init(elev=20, azim=60)

plt.show()
plt.show()

In [None]:
# Generate the legend using cluster numbers and CLASS_SUBCLASS names
handles = []
legend_labels = []
for cluster in np.unique(agg_clusters_from_dendrogram):
    cluster_color = plt.cm.tab20(cluster % 20)  # Use tab20 colormap with cycling colors
    handles.append(plt.Line2D([0], [0], marker='o', color='w', markerfacecolor=cluster_color, markersize=5))
    legend_labels.append(f"Cluster {cluster}: {cluster_to_class_subclass_names[cluster]}")

# Place the legend outside the plot
legend = ax.legend(handles, legend_labels, title="Agglomerative Clusters", loc="center left", bbox_to_anchor=(1.05, 0.5))

# Adjust the viewing angle to better see the clusters
ax.view_init(elev=20, azim=60)
plt.show()

In [None]:
from scipy.stats import mode
import matplotlib.cm as cm
# Perform Agglomerative Clustering with the chosen number of clusters
optimal_k = 22 
agg_clustering = AgglomerativeClustering(n_clusters=optimal_k)
agg_clusters = agg_clustering.fit_predict(pca_components)

# Create a dictionary to map each cluster to the most frequent original label
cluster_labels = {}
for cluster in np.unique(agg_clusters):
    mask = agg_clusters == cluster
    most_frequent_label = mode(encoded_labels[mask])[0][0]
    cluster_labels[cluster] = most_frequent_label

# 3D PCA Plot with Centroids
fig = plt.figure(figsize=(14, 10))
ax = fig.add_subplot(111, projection='3d')

# Plot the PCA components colored by their cluster assignment
scatter = ax.scatter(pca_components[:, 0], pca_components[:, 1], pca_components[:, 2],
                     c=agg_clusters, cmap='tab10', s=5)

# Set axis limits to zoom in on the dense region
ax.set_xlim(-40, 55)
ax.set_ylim(-40, 20)
ax.set_zlim(-30, 50)

# Set axis labels
ax.set_xlabel('PCA Component 1')
ax.set_ylabel('PCA Component 2')
ax.set_zlabel('PCA Component 3')
ax.set_title(f'3D PCA Plot with Agglomerative Clustering (K={optimal_k})')

In [None]:
# Generate unique labels for each cluster in the legend
handles = []
legend_labels = []
for cluster in np.unique(agg_clusters):
    cluster_color = cm.tab10(cluster % 10)  # Use the colormap from matplotlib.cm
    handles.append(plt.Line2D([0], [0], marker='o', color='w', markerfacecolor=cluster_color, markersize=5))
    legend_labels.append(f"Cluster {cluster + 1}: {label_encoder.inverse_transform([cluster_labels[cluster]])[0]}")

# Place the legend outside the plot
legend = ax.legend(handles, legend_labels, title="Agglomerative Clustering Clusters", loc="center left", bbox_to_anchor=(1.05, 0.5))

ax.view_init(elev=20, azim=60)

plt.show()

# 2D PCA Projection: PCA Component 1 vs Component 2
fig, ax = plt.subplots(figsize=(8, 6))

# Scatter plot
ax.scatter(pca_components[:, 0], pca_components[:, 1], c=agg_clusters, cmap='tab10', s=10, alpha=0.7)

# Set axis labels and title
ax.set_xlabel('PCA Component 1')
ax.set_ylabel('PCA Component 2')
ax.set_title('2D Projection: PCA Component 1 vs 2')

# Show the plot
plt.show()
# 2D PCA Projection: PCA Component 1 vs Component 3
fig, ax = plt.subplots(figsize=(8, 6))

# Scatter plot
ax.scatter(pca_components[:, 0], pca_components[:, 2], c=agg_clusters, cmap='tab10', s=10, alpha=0.7)

# Set axis labels and title
ax.set_xlabel('PCA Component 1')
ax.set_ylabel('PCA Component 3')
ax.set_title('2D Projection: PCA Component 1 vs 3')

# Show the plot
plt.show()
# 2D PCA Projection: PCA Component 1 vs Component 3
fig, ax = plt.subplots(figsize=(8, 6))

# Scatter plot
ax.scatter(pca_components[:, 1], pca_components[:, 2], c=agg_clusters, cmap='tab10', s=10, alpha=0.7)

# Set axis labels and title
ax.set_xlabel('PCA Component 2')
ax.set_ylabel('PCA Component 3')
ax.set_title('2D Projection: PCA Component 2 vs 3')

# Show the plot
plt.show()


In [None]:
from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score

agg_silhouette = silhouette_score(pca_components, agg_clusters)
agg_davies_bouldin = davies_bouldin_score(pca_components, agg_clusters)
agg_calinski_harabasz = calinski_harabasz_score(pca_components, agg_clusters)
agg_ari = adjusted_rand_score(encoded_labels, agg_clusters)
agg_nmi = normalized_mutual_info_score(encoded_labels, agg_clusters)
# Cluster purity
def cluster_purity(true_labels, cluster_labels):
    contingency_matrix = confusion_matrix(true_labels, cluster_labels)
    return np.sum(np.amax(contingency_matrix, axis=0)) / np.sum(contingency_matrix)
agg_purity = cluster_purity(encoded_labels, agg_clusters)
print("\nAgglomerative Clustering Statistics:")
print(f"Silhouette Score: {agg_silhouette}")
print(f"Davies-Bouldin Index: {agg_davies_bouldin}")
print(f"Calinski-Harabasz Index: {agg_calinski_harabasz}")
print(f"Adjusted Rand Index (ARI): {agg_ari}")
print(f"Normalized Mutual Information (NMI): {agg_nmi}")
print(f"Cluster Purity: {agg_purity}")
# Distribution of cluster sizes for Agglomerative Clustering
agg_cluster_sizes = np.bincount(agg_clusters)
plt.figure(figsize=(10, 7))
plt.bar(range(len(agg_cluster_sizes)), agg_cluster_sizes, color='green')
plt.title('Distribution of Cluster Sizes for Agglomerative Clustering')
plt.xlabel('Cluster')
plt.ylabel('Size')
plt.show()

In [None]:
import pandas as pd

original_class_names = label_encoder.inverse_transform(encoded_labels)

# Create a DataFrame for analysis
cluster_analysis_df = pd.DataFrame({
    'True_Label': encoded_labels,            # Encoded labels
    'True_Class_Name': original_class_names, # Decoded class names
    'Predicted_Cluster': agg_clusters        # Clusters from agglomerative clustering
})

# Group by predicted clusters and analyze the distribution of true labels
for cluster_id in range(optimal_k):  
    cluster_data = cluster_analysis_df[cluster_analysis_df['Predicted_Cluster'] == cluster_id]
    most_common_label_encoded, count = mode(cluster_data['True_Label'])
    most_common_class_name = label_encoder.inverse_transform([most_common_label_encoded[0]])[0]
    
    print(f"Cluster {cluster_id}: Most common class is '{most_common_class_name}' with {count[0]} occurrences.")
    print(f"Class distribution in this cluster:\n{cluster_data['True_Class_Name'].value_counts(normalize=True)}\n")