<a href="https://colab.research.google.com/github/MuhammadEgaRam/skripsi/blob/main/Skripsi1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install scikit-fuzzy

Collecting scikit-fuzzy
  Downloading scikit_fuzzy-0.5.0-py2.py3-none-any.whl.metadata (2.6 kB)
Downloading scikit_fuzzy-0.5.0-py2.py3-none-any.whl (920 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m920.8/920.8 kB[0m [31m8.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: scikit-fuzzy
Successfully installed scikit-fuzzy-0.5.0


In [None]:
# First, let's import all necessary libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os
import cv2
import time
from PIL import Image
import requests
from io import BytesIO
from tqdm.notebook import tqdm
import skfuzzy as fuzz
from scipy import stats
import tensorflow as tf
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.preprocessing import image
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import GlobalAveragePooling2D
import warnings
# Feature Extraction using ResNet-50 with option to use saved metadata
from tensorflow.keras.layers import Dense, Flatten, GlobalAveragePooling2D, Input
from google.colab import files
from tensorflow.keras.utils import plot_model
import tensorflow.keras.backend as K
warnings.filterwarnings('ignore')
# Feature Silhouette_Score
from sklearn.metrics import silhouette_score
from sklearn.metrics.pairwise import cosine_similarity
from matplotlib import gridspec
import matplotlib.image as mpimg

# Global matplotlib defaults: figure size and resolution first, then the grid style.
plt.rcParams.update({
    'figure.figsize': (12, 8),
    'figure.dpi': 100,
    'savefig.dpi': 150,
})
plt.style.use('seaborn-v0_8-whitegrid')

# Both the dataset and the results live on Google Drive, so it must be mounted
# before any of the paths below are used.
from google.colab import drive
drive.mount('/content/drive')

# STEP 0: locations of the input dataset and of the results folder.
# Adjust both to match your own Google Drive layout.
DATA_PATH = '/content/drive/MyDrive/Dokumen/Data Primer Batik'  # dataset root
OUTPUT_PATH = '/content/drive/MyDrive/batik_output'  # where results are written

# Make sure the results folder exists (no error if it is already there).
os.makedirs(OUTPUT_PATH, exist_ok=True)
print(f"Data path: {DATA_PATH}\nOutput path: {OUTPUT_PATH}")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Data path: /content/drive/MyDrive/Dokumen/Data Primer Batik
Output path: /content/drive/MyDrive/batik_output


Load Data

In [None]:
# Data Loading and Preprocessing
def load_images(data_path, target_size=(224, 224), output_path=None):
    """
    Discover, load and preprocess every image of the dataset.

    The dataset is expected to be laid out as
    ``data_path/<category>/<subcategory>/**/<image>``; files ending in
    .png/.jpg/.jpeg (case-insensitive) anywhere below a subcategory folder
    are picked up. Directory listings are sorted so that the image order (and
    therefore the row order of the metadata CSV) is the same on every run.

    Parameters:
    - data_path: Root folder of the dataset
    - target_size: (height, width) every image is resized to while loading
    - output_path: Folder that receives 'batik_metadata.csv'. Defaults to the
      notebook-level OUTPUT_PATH when omitted.

    Returns:
    - NumPy array with the preprocessed images (one entry per loaded image)
    - List of the image paths that were loaded successfully
    - DataFrame with columns 'image_path', 'category', 'subcategory'

    Note: the subcategory is just the folder name, so two categories may
    contribute images to the same subcategory label.
    """
    if output_path is None:
        output_path = OUTPUT_PATH

    image_extensions = ('.png', '.jpg', '.jpeg')

    # Identify main categories (Batik Blimbing Fix, Batik Soendari, etc.)
    main_categories = sorted(
        d for d in os.listdir(data_path)
        if os.path.isdir(os.path.join(data_path, d))
    )

    print(f"Found {len(main_categories)} main categories: {main_categories}")

    # One (path, category, subcategory) record per discovered image file
    records = []

    for category in main_categories:
        category_path = os.path.join(data_path, category)

        # Get subcategories for each main category
        subcats = sorted(
            d for d in os.listdir(category_path)
            if os.path.isdir(os.path.join(category_path, d))
        )

        print(f"Category '{category}' has {len(subcats)} subcategories")

        for subcat in subcats:
            subcat_path = os.path.join(category_path, subcat)

            # Walk the subcategory folder recursively. Sorting dirnames in place
            # makes os.walk descend in a deterministic order; the loop variable
            # is not called `files` so it cannot be confused with the imported
            # google.colab `files` module.
            for root, dirnames, filenames in os.walk(subcat_path):
                dirnames.sort()
                for filename in sorted(filenames):
                    if filename.lower().endswith(image_extensions):
                        records.append((os.path.join(root, filename), category, subcat))

    n_categories = len({category for _, category, _ in records})
    n_subcategories = len({subcat for _, _, subcat in records})
    print(f"Found {len(records)} images across {n_categories} categories and {n_subcategories} subcategories")

    # Load and preprocess images; unreadable files are reported and skipped
    images = []
    loaded_records = []

    for path, category, subcat in tqdm(records):
        try:
            img = image.load_img(path, target_size=target_size)
            img_array = preprocess_input(image.img_to_array(img))
        except Exception as e:
            print(f"Error loading image {path}: {e}")
            continue
        images.append(img_array)
        loaded_records.append((path, category, subcat))

    # Create metadata dataframe (only for images that actually loaded)
    metadata = pd.DataFrame(loaded_records, columns=['image_path', 'category', 'subcategory'])
    valid_paths = metadata['image_path'].tolist()

    # Display some statistics
    print("\nData Loading Summary:")
    print(f"Successfully loaded {len(images)} images")
    print(f"Categories distribution:")
    print(metadata['category'].value_counts().head())
    print("\nSample subcategories:")
    print(metadata['subcategory'].value_counts().head())

    # Save metadata
    os.makedirs(output_path, exist_ok=True)
    metadata_file = os.path.join(output_path, 'batik_metadata.csv')
    metadata.to_csv(metadata_file, index=False)
    print(f"Metadata saved to {metadata_file}")

    return np.array(images), valid_paths, metadata

# Load the whole dataset once. Besides returning the preprocessed images, this call
# writes batik_metadata.csv, which the feature-extraction step reads its image paths from.
images, image_paths, metadata = load_images(DATA_PATH)

Found 3 main categories: ['Rumah Seni Budaya Singhasari', 'Batik Soendari', 'Batik Blimbing']
Category 'Rumah Seni Budaya Singhasari' has 3 subcategories
Category 'Batik Soendari' has 11 subcategories
Category 'Batik Blimbing' has 10 subcategories
Found 834 images across 3 categories and 22 subcategories


  0%|          | 0/834 [00:00<?, ?it/s]


Data Loading Summary:
Successfully loaded 834 images
Categories distribution:
category
Batik Soendari                  511
Rumah Seni Budaya Singhasari    283
Batik Blimbing                   40
Name: count, dtype: int64

Sample subcategories:
subcategory
adiluhung               239
Batik Topeng Malang     123
Batik Cap Teratai        82
Batik Cap Biota Laut     63
Nonik AD                 63
Name: count, dtype: int64
Metadata saved to /content/drive/MyDrive/batik_output/batik_metadata.csv


Ekstraksi Fitur

In [None]:
def extract_features_from_metadata(metadata_path, output_path, save_csv=True, print_architecture=True, seed=42):
    """
    Extract features from images using ResNet-50 without batching,
    loading image paths from previously saved metadata.

    The network is ImageNet ResNet-50 (no top) -> GlobalAveragePooling2D ->
    Dense(1000, relu) -> Flatten. The Dense layer is never trained in this
    function, so it acts as a fixed random projection of the pooled features;
    its initializer is seeded so repeated runs produce the same features.

    Parameters:
    - metadata_path: Path to the CSV file with image paths ('image_path' column)
    - output_path: Path to save the output files
    - save_csv: Whether to save features as CSV (default: True)
    - print_architecture: Whether to print the model architecture (default: True)
    - seed: Seed for the Dense layer's weight initializer (default: 42).
      Pass None to get a different random initialization on every run.

    Returns:
    - Features extracted from the model, one row per successfully processed image
    - Image paths of the successfully processed images (same order as the features)
    - Metadata DataFrame restricted to those images (same order as the features)

    Raises:
    - FileNotFoundError: if metadata_path does not exist
    """
    # Load metadata
    print(f"Loading metadata from {metadata_path}...")
    if not os.path.exists(metadata_path):
        raise FileNotFoundError(f"Metadata file not found at {metadata_path}")

    metadata = pd.read_csv(metadata_path)
    image_paths = metadata['image_path'].tolist()
    print(f"Loaded metadata for {len(image_paths)} images")

    # Verify images exist. The metadata ROW POSITION is kept next to each path so
    # that features can be matched to the right metadata row even when some files
    # are missing (positions in the filtered list would no longer line up).
    existing = []
    for row_pos, path in enumerate(image_paths):
        if os.path.exists(path):
            existing.append((row_pos, path))
        else:
            print(f"Warning: Image not found at {path}")

    if len(existing) < len(image_paths):
        print(f"Warning: {len(image_paths) - len(existing)} images not found")

    print(f"Proceeding with {len(existing)} valid images")

    # Initialize ResNet-50 model
    print("Initializing ResNet-50 model...")
    base_model = ResNet50(weights='imagenet', include_top=False)

    # Add a global spatial average pooling layer
    x = base_model.output
    x = GlobalAveragePooling2D()(x)

    # Add a fully connected layer with 1000 units. GlorotUniform is the Dense
    # default initializer; it is spelled out only to be able to seed it.
    x = Dense(
        1000,
        activation='relu',
        kernel_initializer=tf.keras.initializers.GlorotUniform(seed=seed),
    )(x)

    # Add a flatten layer to make output 1-dimensional
    x = Flatten()(x)

    # Create the feature extraction model
    model = Model(inputs=base_model.input, outputs=x)
    print("Model loaded successfully")

    # Print the model architecture if requested
    if print_architecture:
        print("\n=== ResNet-50 Model Architecture ===")
        # Print summary of the base model
        print("\nBase ResNet-50 Model:")
        base_model.summary()

        # Print summary of our feature extraction model
        print("\nFeature Extraction Model (with FC layer):")
        model.summary()

        # Show the layers we're using for feature extraction
        print("\nLayers used for feature extraction:")
        print(f"1. Base ResNet-50 (output shape before pooling): {base_model.output_shape}")
        print(f"2. GlobalAveragePooling2D (output shape): (None, {base_model.output_shape[-1]})")
        print(f"3. Dense/Fully Connected (output shape): (None, 1000)")
        print(f"4. Flatten (output shape): (None, 1000)")

        # Print some additional details about the model
        print(f"\nTotal params in the model: {model.count_params():,}")
        print(f"Trainable params: {sum([K.count_params(w) for w in model.trainable_weights]):,}")
        print(f"Non-trainable params: {sum([K.count_params(w) for w in model.non_trainable_weights]):,}")

    # Extract features for each image
    print(f"\nExtracting features from {len(existing)} images...")
    features = []
    processed_rows = []   # metadata row positions of the images that worked
    processed_paths = []  # their paths, aligned with `features`

    for row_pos, path in tqdm(existing):
        try:
            # Load and preprocess the image
            img = image.load_img(path, target_size=(224, 224))
            img_array = image.img_to_array(img)
            img_array = preprocess_input(np.expand_dims(img_array, axis=0))

            # Extract features
            feature = model.predict(img_array, verbose=0)
        except Exception as e:
            print(f"Error processing image {path}: {e}")
            continue

        features.append(feature[0])
        processed_rows.append(row_pos)
        processed_paths.append(path)

    features_array = np.array(features)
    print(f"Feature extraction completed. Features shape: {features_array.shape}")

    # Filter metadata to include only successfully processed images
    valid_metadata = metadata.iloc[processed_rows].reset_index(drop=True)

    # Show sample feature vector
    if len(features_array) > 0:
        print("\nSample feature vector (first 10 values):")
        print(features_array[0][:10])

    os.makedirs(output_path, exist_ok=True)

    # Save features as CSV if requested
    if save_csv and len(features_array) > 0:
        save_features_as_csv(features_array, valid_metadata, output_path)

    # Also save the features as NumPy array for faster loading in the future
    if len(features_array) > 0:
        np.save(os.path.join(output_path, 'batik_features.npy'), features_array)
        print(f"Features saved as NumPy array to {os.path.join(output_path, 'batik_features.npy')}")

        # Save updated metadata with only valid images
        valid_metadata.to_csv(os.path.join(output_path, 'valid_batik_metadata.csv'), index=False)
        print(f"Updated metadata saved to {os.path.join(output_path, 'valid_batik_metadata.csv')}")

    # Save the final model
    model_path = os.path.join(output_path, 'resnet50_batik_model.h5')
    model.save(model_path)
    print(f"Model saved to: {model_path}")

    return features_array, processed_paths, valid_metadata

In [None]:
def save_features_as_csv(features, metadata, output_path, filename='batik_features.csv', download=True):
    """
    Save extracted features to a CSV file with image metadata

    The CSV has one 'Feature_<i>' column per feature dimension, followed by the
    metadata columns when the metadata has exactly one row per feature vector.
    If the row counts differ, the features are still written but the metadata
    columns are left out and a warning is printed.

    Parameters:
    - features: 2-D feature array to save (rows = images)
    - metadata: DataFrame with image metadata
    - output_path: Folder to save the CSV file in (created if missing)
    - filename: Name of the CSV file
    - download: Whether to also trigger a browser download through
      google.colab `files.download` (default: True). A failing download is
      reported but never raises.

    Returns:
    - Path of the written CSV file
    """
    print(f"Saving {features.shape[0]} feature vectors to CSV...")

    # Create a DataFrame with feature columns
    df_features = pd.DataFrame(features)
    df_features.columns = [f'Feature_{i}' for i in range(features.shape[1])]

    # Add metadata columns (image path, category, subcategory).
    # `.values` is used on purpose: rows are matched by position, not by index label.
    if len(metadata) == len(df_features):
        for col in metadata.columns:
            df_features[col] = metadata[col].values
    else:
        print(f"Warning: metadata has {len(metadata)} rows but there are {len(df_features)} "
              f"feature vectors; metadata columns were not added to the CSV")

    # Save to CSV
    os.makedirs(output_path, exist_ok=True)
    csv_path = os.path.join(output_path, filename)
    df_features.to_csv(csv_path, index=False)
    print(f"Features saved to: {csv_path}")

    # For Google Colab: Provide download link
    if download:
        try:
            # Make file downloadable directly from Colab
            files.download(csv_path)
            print("Download initiated. Check your browser's download folder.")
        except Exception as e:
            print(f"Automatic download failed: {e}")
            print(f"You can manually download the file from: {csv_path}")

    return csv_path


# Example usage: run feature extraction on the metadata CSV written by load_images().
# Both locations are derived from OUTPUT_PATH so the results folder is configured in one place.
metadata_path = os.path.join(OUTPUT_PATH, 'batik_metadata.csv')
output_path = OUTPUT_PATH

# Run feature extraction using previously saved metadata
features, valid_paths, valid_metadata = extract_features_from_metadata(
    metadata_path=metadata_path,
    output_path=output_path,
    save_csv=True,
    print_architecture=True  # Set to True to see the model architecture
)

Loading metadata from /content/drive/MyDrive/batik_output/batik_metadata.csv...
Loaded metadata for 834 images
Proceeding with 834 valid images
Initializing ResNet-50 model...
Model loaded successfully

=== ResNet-50 Model Architecture ===

Base ResNet-50 Model:



Feature Extraction Model (with FC layer):



Layers used for feature extraction:
1. Base ResNet-50 (output shape before pooling): (None, None, None, 2048)
2. GlobalAveragePooling2D (output shape): (None, 2048)
3. Dense/Fully Connected (output shape): (None, 1000)
4. Flatten (output shape): (None, 1000)

Total params in the model: 25,636,712
Trainable params: 25,583,592
Non-trainable params: 53,120

Extracting features from 834 images...


  0%|          | 0/834 [00:00<?, ?it/s]

Feature extraction completed. Features shape: (834, 1000)

Sample feature vector (first 10 values):
[1.1192669 0.        0.        1.1387758 0.        2.7318704 0.6614157
 0.        0.7596138 0.       ]
Saving 834 feature vectors to CSV...
Features saved to: /content/drive/MyDrive/batik_output/batik_features.csv


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Download initiated. Check your browser's download folder.
Features saved as NumPy array to /content/drive/MyDrive/batik_output/batik_features.npy




Updated metadata saved to /content/drive/MyDrive/batik_output/valid_batik_metadata.csv
Model saved to: /content/drive/MyDrive/batik_output/resnet50_batik_model.h5


Cluster

In [None]:
def cluster_subcategories(features, metadata, n_clusters=5, max_iter=100, error=0.005, m=2, output_path=None, seed=None):
    """
    Cluster features by subcategory to ensure each subcategory appears in only one cluster

    One mean feature vector (centroid) is computed per subcategory, Fuzzy C-Means
    is run on those centroids, each subcategory is hard-assigned to its
    highest-membership cluster, and every image inherits the cluster of its
    subcategory.

    Parameters:
    - features: Extracted image features, one row per metadata row (same order)
    - metadata: DataFrame containing image metadata (paths, subcategories)
    - n_clusters: Number of clusters (reduced to the number of subcategories if larger)
    - max_iter: Maximum number of iterations
    - error: Error threshold
    - m: Fuzziness coefficient
    - output_path: Path to save visualization results (no visualization when None)
    - seed: Optional seed forwarded to Fuzzy C-Means so the random initial
      partition, and therefore the clustering, is reproducible

    Returns:
    - cluster_membership: Cluster assignments for all images
    - subcategory_clusters: Mapping of subcategories to clusters
    - subcategory_centroids: Average feature vector for each subcategory

    Raises:
    - ValueError: if 'subcategory' is missing from metadata, if features and
      metadata have different lengths, or if no subcategory has any image
    """
    print(f"Clustering subcategories (ensuring one subcategory per cluster)...")

    if 'subcategory' not in metadata.columns:
        raise ValueError("Metadata must contain 'subcategory' column")

    features = np.asarray(features)
    if len(features) != len(metadata):
        raise ValueError(
            f"features has {len(features)} rows but metadata has {len(metadata)} rows"
        )

    # Rows of `features` are addressed by position. Working on a copy with a
    # fresh 0..n-1 index keeps label-based and position-based lookups identical,
    # here and in the visualization helpers, even if the caller passed a
    # filtered DataFrame.
    metadata = metadata.reset_index(drop=True)
    subcategory_column = metadata['subcategory']

    # Step 1: Compute centroids for each subcategory
    subcategories = subcategory_column.unique()
    print(f"Found {len(subcategories)} unique subcategories")

    subcategory_centroids = []
    valid_subcategories = []

    for subcategory in subcategories:
        # Row positions of this subcategory (empty for e.g. a NaN label)
        positions = np.flatnonzero((subcategory_column == subcategory).to_numpy())

        if positions.size == 0:
            print(f"Warning: No images found for subcategory '{subcategory}'")
            continue

        # Compute average feature vector (centroid) for this subcategory
        subcategory_centroids.append(features[positions].mean(axis=0))
        valid_subcategories.append(subcategory)

    if not valid_subcategories:
        raise ValueError("No subcategory with at least one image was found")

    subcategory_centroids = np.array(subcategory_centroids)
    print(f"Computed centroids for {len(valid_subcategories)} subcategories with shape {subcategory_centroids.shape}")

    # Step 2: Apply clustering on subcategory centroids
    if len(valid_subcategories) < n_clusters:
        print(f"Warning: Number of subcategories ({len(valid_subcategories)}) is less than requested clusters ({n_clusters})")
        n_clusters = len(valid_subcategories)
        print(f"Adjusted number of clusters to {n_clusters}")

    print(f"Applying Fuzzy C-Means clustering with {n_clusters} clusters on subcategory centroids...")

    # skfuzzy expects data as (n_features, n_samples), hence the transpose.
    # It returns the centers `cntr` as (n_clusters, n_features) and the
    # membership matrix `u` as (n_clusters, n_samples).
    cntr, u, u0, d, jm, p, fpc = fuzz.cluster.cmeans(
        subcategory_centroids.T, n_clusters, m, error=error, maxiter=max_iter, init=None, seed=seed
    )

    # Get cluster assignments for subcategories (highest membership wins)
    subcategory_cluster_assignments = np.argmax(u, axis=0)

    # Create mapping from subcategory to cluster
    subcategory_to_cluster = dict(zip(valid_subcategories, subcategory_cluster_assignments))

    # Step 3: Assign all images of a subcategory to the same cluster and build the
    # per-image membership matrix used by the visualizations. That matrix is
    # synthetic (0.95 for the assigned cluster, the remaining 0.05 split evenly);
    # it does not contain the Fuzzy C-Means memberships.
    image_cluster_assignments = np.zeros(len(metadata), dtype=int)
    u_images = np.zeros((n_clusters, len(metadata)))
    other_membership = 0.05 / (n_clusters - 1) if n_clusters > 1 else 0.0

    for pos, subcategory in enumerate(subcategory_column):
        if subcategory in subcategory_to_cluster:
            cluster_idx = subcategory_to_cluster[subcategory]
            image_cluster_assignments[pos] = cluster_idx
            u_images[:, pos] = other_membership
            u_images[cluster_idx, pos] = 0.95
        else:
            # Subcategory was not clustered: fall back to the nearest cluster
            # center. Each ROW of `cntr` is one center.
            distances = np.linalg.norm(cntr - features[pos], axis=1)
            image_cluster_assignments[pos] = np.argmin(distances)

    # Print clustering results
    print(f"Clustering completed with {n_clusters} clusters")
    print(f"Fuzzy Partition Coefficient: {fpc}")

    # Group subcategories by the cluster they were assigned to
    cluster_subcategory_counts = {}
    for subcategory, cluster in subcategory_to_cluster.items():
        cluster_subcategory_counts.setdefault(cluster, []).append(subcategory)

    print("\nSubcategory distribution across clusters:")
    for cluster in range(n_clusters):
        subcategories_in_cluster = cluster_subcategory_counts.get(cluster, [])
        print(f"Cluster {cluster}: {len(subcategories_in_cluster)} subcategories")
        for subcategory in subcategories_in_cluster:
            count = int((subcategory_column == subcategory).sum())
            print(f"  - {subcategory}: {count} images")

    # Count items in each cluster
    unique_clusters, counts = np.unique(image_cluster_assignments, return_counts=True)
    print("\nCluster size distribution:")
    for cluster, count in zip(unique_clusters, counts):
        print(f"Cluster {cluster}: {count} images ({count/len(image_cluster_assignments)*100:.2f}%)")

    # Display results with subcategory information
    if output_path is not None:
        visualize_subcategory_clusters(features, image_cluster_assignments, u_images, metadata,
                                       subcategory_to_cluster, output_path, n_clusters)

    return image_cluster_assignments, subcategory_to_cluster, subcategory_centroids

def visualize_subcategory_clusters(features, cluster_membership, membership_matrix, metadata,
                                 subcategory_to_cluster, output_path, n_clusters):
    """
    Write all clustering reports (CSV, text mapping, figures) for a
    subcategory-level clustering into
    ``output_path/subcategory_cluster_visualization``.

    Parameters:
    - features: Extracted image features (not used by this function)
    - cluster_membership: Cluster assignments for all images
    - membership_matrix: Fuzzy membership matrix, forwarded to the per-cluster sample plots
    - metadata: DataFrame containing image metadata ('image_path', 'subcategory')
    - subcategory_to_cluster: Mapping of subcategories to clusters
    - output_path: Path to save visualization results
    - n_clusters: Number of clusters
    """
    print("\nGenerating cluster visualization with subcategory information...")

    # Everything produced here goes into one dedicated sub-folder
    viz_dir = os.path.join(output_path, 'subcategory_cluster_visualization')
    os.makedirs(viz_dir, exist_ok=True)

    # Per-image table: path, subcategory and assigned cluster
    assignments = pd.DataFrame({
        'image_path': metadata['image_path'],
        'subcategory': metadata['subcategory'],
        'cluster': cluster_membership
    })
    assignments.to_csv(os.path.join(viz_dir, 'subcategory_clustering_results.csv'), index=False)

    # Invert the mapping: cluster -> list of its subcategories
    members_by_cluster = {}
    for name, assigned in subcategory_to_cluster.items():
        members_by_cluster.setdefault(assigned, []).append(name)

    # Human-readable summary of which subcategories ended up in which cluster
    mapping_file = os.path.join(viz_dir, 'cluster_subcategory_mapping.txt')
    with open(mapping_file, 'w') as handle:
        handle.write("Cluster to Subcategory Mapping:\n")
        handle.write("=============================\n\n")

        for cluster_id in range(n_clusters):
            members = members_by_cluster.get(cluster_id, [])
            joined = ", ".join(members)
            handle.write(f"Cluster {cluster_id}: {joined}\n")

            # Image count of every subcategory in this cluster
            handle.write("  Images per subcategory:\n")
            for name in members:
                n_images = len(metadata[metadata['subcategory'] == name])
                handle.write(f"  - {name}: {n_images} images\n")
            handle.write("\n")

    print("\nAnalyzing clusters by subcategory:")

    # Per cluster: item CSV plus a figure with sample images
    for cluster_id in range(n_clusters):
        members = members_by_cluster.get(cluster_id, [])

        if not members:
            print(f"  Cluster {cluster_id}: No subcategories assigned")
            continue

        print(f"  Cluster {cluster_id}: {len(members)} subcategories")

        in_cluster = assignments[assignments['cluster'] == cluster_id]
        in_cluster.to_csv(os.path.join(viz_dir, f'cluster_{cluster_id}_items.csv'), index=False)

        visualize_cluster_subcategories(cluster_id, members, metadata, membership_matrix, viz_dir)

    # Overview figures covering all clusters at once
    create_subcategory_cluster_visualization(subcategory_to_cluster, metadata, viz_dir, n_clusters)

    print(f"\nClustering visualization completed. Results saved to {viz_dir}")

def visualize_cluster_subcategories(cluster_id, subcategories, metadata, membership_matrix, output_path, samples_per_subcategory=2):
    """
    Save a figure with a few example images for every subcategory of one cluster.

    The figure is written to
    ``output_path/cluster_<cluster_id>_subcategory_samples.png``; nothing is
    produced when `subcategories` is empty.

    Parameters:
    - cluster_id: Cluster ID
    - subcategories: List of subcategories in this cluster
    - metadata: DataFrame containing image metadata; its index labels are used
      as column positions into membership_matrix
    - membership_matrix: Fuzzy membership matrix (clusters x images)
    - output_path: Path to save visualizations
    - samples_per_subcategory: Number of sample images to display per subcategory
    """
    if not subcategories:
        return

    # One subplot row is reserved per requested sample; 3 inches of height each
    n_rows = len(subcategories) * samples_per_subcategory
    plt.figure(figsize=(15, 3 * n_rows))
    plt.suptitle(f"Cluster {cluster_id}: Subcategory Samples", fontsize=16)

    next_slot = 1  # 1-based subplot position of the next image

    for subcategory in subcategories:
        rows = metadata[metadata['subcategory'] == subcategory]
        if not len(rows):
            continue

        print(f"    {subcategory}: {len(rows)} images")

        # Rank this subcategory's images by membership in the cluster (highest first)
        row_labels = rows.index.tolist()
        ranking = np.argsort(-membership_matrix[cluster_id, row_labels])
        n_keep = min(samples_per_subcategory, len(ranking))
        chosen = [row_labels[k] for k in ranking[:n_keep]]

        for label in chosen:
            try:
                img_path = metadata.loc[label, 'image_path']
                if not os.path.exists(img_path):
                    print(f"      Warning: Image not found at {img_path}")
                    continue

                picture = mpimg.imread(img_path)

                plt.subplot(n_rows, 1, next_slot)
                plt.imshow(picture)
                plt.title(f"Subcategory: {subcategory}")
                plt.axis('off')

                # Only advance once the image was actually drawn
                next_slot += 1
            except Exception as e:
                print(f"      Error displaying image: {e}")

    plt.tight_layout()
    plt.subplots_adjust(top=0.95)

    # Write the figure to disk and release it
    target = os.path.join(output_path, f'cluster_{cluster_id}_subcategory_samples.png')
    plt.savefig(target)
    plt.close()

def create_subcategory_cluster_visualization(subcategory_to_cluster, metadata, output_path, n_clusters):
    """
    Create visual representation of cluster-subcategory mapping

    Two figures are written to `output_path`:
    - 'subcategory_cluster_distribution.png': horizontal bar chart of the image
      count per subcategory, coloured by cluster
    - 'subcategory_cluster_heatmap.png': subcategory x cluster matrix whose only
      non-zero cell per row holds the subcategory's image count

    Parameters:
    - subcategory_to_cluster: Mapping of subcategories to clusters
    - metadata: DataFrame containing image metadata
    - output_path: Path to save visualization
    - n_clusters: Number of clusters
    """
    if not subcategory_to_cluster:
        # Nothing to plot; an empty table would fail when sorting by column
        print("No subcategory-to-cluster assignments to visualize")
        return

    # Count images per subcategory
    subcategory_counts = metadata['subcategory'].value_counts()

    # One row per subcategory. `cluster_id` keeps the numeric cluster so sorting
    # is numeric ("Cluster 10" after "Cluster 2"), while `cluster` is the label
    # shown in the legend.
    df_viz = pd.DataFrame([
        {
            'subcategory': subcategory,
            'cluster_id': int(cluster),
            'cluster': f'Cluster {cluster}',
            'image_count': subcategory_counts.get(subcategory, 0),
        }
        for subcategory, cluster in subcategory_to_cluster.items()
    ])

    # Sort by cluster and then by image count within cluster
    df_viz = df_viz.sort_values(['cluster_id', 'image_count'], ascending=[True, False])

    # Create cluster bar chart
    fig, ax = plt.subplots(figsize=(14, max(8, len(df_viz) * 0.3)))

    # Use different colors for each cluster
    palette = sns.color_palette("husl", n_clusters)
    cluster_colors = {f'Cluster {i}': palette[i] for i in range(n_clusters)}

    sns.barplot(x='image_count', y='subcategory', hue='cluster',
                data=df_viz, palette=cluster_colors, ax=ax)

    ax.set_title('Subcategory Distribution Across Clusters', fontsize=14)
    ax.set_xlabel('Number of Images', fontsize=12)
    ax.set_ylabel('Subcategory', fontsize=12)

    # Add image count labels. Patches without a real bar (NaN or zero width,
    # e.g. placeholders for absent subcategory/cluster combinations) are skipped
    # so they neither raise in int() nor produce stray "0" labels.
    for patch in ax.patches:
        width = patch.get_width()
        if not np.isfinite(width) or width <= 0:
            continue
        ax.text(width + 1, patch.get_y() + patch.get_height() / 2, f'{int(width)}',
                ha='left', va='center')

    fig.tight_layout()
    fig.savefig(os.path.join(output_path, 'subcategory_cluster_distribution.png'))
    plt.close(fig)

    # Create heatmap showing subcategory to cluster mapping
    subcategory_list = list(subcategory_to_cluster.keys())
    cluster_subcategory_matrix = np.zeros((len(subcategory_list), n_clusters))

    for i, subcategory in enumerate(subcategory_list):
        cluster = subcategory_to_cluster[subcategory]
        cluster_subcategory_matrix[i, cluster] = subcategory_counts.get(subcategory, 0)

    fig, ax = plt.subplots(figsize=(n_clusters + 2, max(8, len(subcategory_list) * 0.4)))
    ax.set_title('Subcategory to Cluster Assignment', fontsize=14)

    # Create heatmap
    sns.heatmap(cluster_subcategory_matrix, cmap='viridis',
                xticklabels=[f'Cluster {i}' for i in range(n_clusters)],
                yticklabels=subcategory_list,
                cbar_kws={'label': 'Number of Images'},
                ax=ax)

    fig.tight_layout()
    fig.savefig(os.path.join(output_path, 'subcategory_cluster_heatmap.png'))
    plt.close(fig)

# Evaluate clustering quality
def evaluate_subcategory_clustering(features, cluster_membership, metadata, subcategory_to_cluster, output_path=None):
    """
    Evaluate subcategory-based clustering quality

    Computes a cosine silhouette score together with centroid-based Euclidean
    statistics: the mean distance of images to their own cluster centroid,
    the mean distance between every pair of cluster centroids, and the ratio
    of the two. Clusters are identified by the labels that actually occur in
    ``cluster_membership``, so labels do not have to be the contiguous range
    0..k-1 (for instance when a cluster ended up empty).

    Parameters:
    - features: Extracted features, one row per image (array-like, 2-D)
    - cluster_membership: Cluster label of every image (array-like, same length as features)
    - metadata: DataFrame containing image metadata; its 'subcategory' column is
      only read when output_path is given
    - subcategory_to_cluster: Mapping of subcategories to clusters
    - output_path: Path to save evaluation results (nothing is written when falsy)

    Returns:
    - evaluation_metrics: Dictionary with the keys 'silhouette_score',
      'within_cluster_distance', 'between_cluster_distance', 'separation_ratio',
      'cluster_balance' and 'subcategory_purity'
    """
    print("\nEvaluating subcategory-based clustering quality...")

    # Convert once so plain Python lists work with the boolean masks below
    features = np.asarray(features)
    labels = np.asarray(cluster_membership)

    # Create evaluation directory
    if output_path:
        eval_path = os.path.join(output_path, 'subcategory_clustering_evaluation')
        os.makedirs(eval_path, exist_ok=True)

    # Calculate standard silhouette score (not ideal for this approach but provides a baseline).
    # Deliberately best-effort: any failure is reported and replaced by the sentinel -1
    # so the remaining metrics are still produced.
    try:
        silhouette_avg = silhouette_score(features, labels, metric='cosine')
        print(f"Silhouette Score: {silhouette_avg:.4f}")
    except Exception as e:
        silhouette_avg = -1
        print(f"Error calculating silhouette score: {e}")

    # Use the labels that are really present instead of assuming 0..k-1
    cluster_ids = np.unique(labels).tolist()
    n_clusters = len(cluster_ids)
    n_subcategories = len(subcategory_to_cluster)

    # Per-cluster size, centroid and mean distance of members to the centroid
    cluster_sizes = []
    centroids = {}
    within_by_cluster = {}  # only clusters with at least two members get an entry

    for cluster in cluster_ids:
        members = features[labels == cluster]
        cluster_sizes.append(len(members))

        centroid = np.mean(members, axis=0)
        centroids[cluster] = centroid

        if len(members) > 1:
            within_by_cluster[cluster] = np.mean(np.linalg.norm(members - centroid, axis=1))

    within_cluster_distances = list(within_by_cluster.values())

    # Distance between every unordered pair of centroids; single-member clusters
    # take part as well, their centroid is simply their only image
    between_cluster_distances = [
        np.linalg.norm(centroids[first] - centroids[second])
        for position, first in enumerate(cluster_ids)
        for second in cluster_ids[position + 1:]
    ]

    # Calculate overall metrics
    avg_within_cluster_dist = np.mean(within_cluster_distances) if within_cluster_distances else float('inf')
    avg_between_cluster_dist = np.mean(between_cluster_distances) if between_cluster_distances else 0
    separation_ratio = avg_between_cluster_dist / avg_within_cluster_dist if avg_within_cluster_dist > 0 else 0

    print(f"Average Within-Cluster Distance: {avg_within_cluster_dist:.4f}")
    print(f"Average Between-Cluster Distance: {avg_between_cluster_dist:.4f}")
    print(f"Cluster Separation Ratio: {separation_ratio:.4f}")

    # Calculate cluster balance: 1 minus the coefficient of variation of the sizes
    mean_cluster_size = np.mean(cluster_sizes) if cluster_sizes else 0
    cluster_balance = 1 - np.std(cluster_sizes) / mean_cluster_size if mean_cluster_size > 0 else 0
    print(f"Cluster Balance (1 is perfectly balanced): {cluster_balance:.4f}")

    # Subcategory purity is fixed at 1.0 (each subcategory is mapped to exactly one cluster)
    subcategory_purity = 1.0  # By design of our algorithm
    print(f"Subcategory Purity: {subcategory_purity:.4f}")

    # Create reverse mapping from cluster to subcategories
    cluster_to_subcategories = {}
    for subcategory, cluster in subcategory_to_cluster.items():
        cluster_to_subcategories.setdefault(cluster, []).append(subcategory)

    # Save evaluation results
    if output_path:
        # Image count per subcategory, looked up once instead of filtering per subcategory
        subcategory_image_counts = metadata['subcategory'].value_counts()

        # Save metrics to text file
        with open(os.path.join(eval_path, 'subcategory_clustering_metrics.txt'), 'w') as f:
            f.write("Subcategory Clustering Evaluation\n")
            f.write("================================\n\n")
            f.write(f"Number of clusters: {n_clusters}\n")
            f.write(f"Number of subcategories: {n_subcategories}\n")
            f.write(f"Silhouette Score: {silhouette_avg:.4f}\n")
            f.write(f"Average Within-Cluster Distance: {avg_within_cluster_dist:.4f}\n")
            f.write(f"Average Between-Cluster Distance: {avg_between_cluster_dist:.4f}\n")
            f.write(f"Cluster Separation Ratio: {separation_ratio:.4f}\n")
            f.write(f"Cluster Balance: {cluster_balance:.4f}\n")
            f.write(f"Subcategory Purity: {subcategory_purity:.4f}\n\n")

            # Write cluster details
            f.write("Cluster Details:\n")
            for cluster in cluster_ids:
                subcategories = cluster_to_subcategories.get(cluster, [])
                f.write(f"  Cluster {cluster} ({len(subcategories)} subcategories):\n")

                for subcategory in subcategories:
                    count = subcategory_image_counts.get(subcategory, 0)
                    f.write(f"    - {subcategory}: {count} images\n")

                # Reuse the per-cluster distance computed above
                if cluster in within_by_cluster:
                    f.write(f"    Within-cluster distance: {within_by_cluster[cluster]:.4f}\n")
                f.write("\n")

        # Create visualization of metrics
        plt.figure(figsize=(15, 10))

        # Create 2x2 subplot layout
        plt.subplot(2, 2, 1)
        within_positions = range(len(within_cluster_distances))
        plt.bar(within_positions, within_cluster_distances, color='skyblue')
        plt.axhline(y=avg_within_cluster_dist, color='r', linestyle='--')
        plt.title('Within-Cluster Distances')
        plt.xlabel('Cluster')
        plt.ylabel('Average Distance')
        # Tick labels are the real cluster ids, so skipped clusters do not shift the labels
        plt.xticks(within_positions, list(within_by_cluster.keys()))

        plt.subplot(2, 2, 2)
        cluster_positions = range(n_clusters)
        plt.bar(cluster_positions, cluster_sizes, color='lightgreen')
        plt.axhline(y=np.mean(cluster_sizes), color='r', linestyle='--')
        plt.title('Cluster Sizes')
        plt.xlabel('Cluster')
        plt.ylabel('Number of Images')
        plt.xticks(cluster_positions, cluster_ids)

        plt.subplot(2, 2, 3)
        metric_names = ['Silhouette', 'Separation', 'Balance', 'Purity']
        # A negative silhouette (including the -1 failure sentinel) is drawn as 0
        values = [max(0, silhouette_avg), separation_ratio, cluster_balance, subcategory_purity]
        plt.bar(metric_names, values, color=['blue', 'green', 'orange', 'red'])
        plt.title('Clustering Quality Metrics')
        plt.ylabel('Score (higher is better)')

        plt.subplot(2, 2, 4)
        subcategories_per_cluster = [len(cluster_to_subcategories.get(cluster, [])) for cluster in cluster_ids]
        plt.bar(cluster_positions, subcategories_per_cluster, color='purple')
        plt.title('Subcategories per Cluster')
        plt.xlabel('Cluster')
        plt.ylabel('Number of Subcategories')
        plt.xticks(cluster_positions, cluster_ids)

        plt.tight_layout()
        plt.savefig(os.path.join(eval_path, 'subcategory_clustering_metrics.png'))
        plt.close()

    # Return evaluation metrics
    metrics = {
        'silhouette_score': silhouette_avg,
        'within_cluster_distance': avg_within_cluster_dist,
        'between_cluster_distance': avg_between_cluster_dist,
        'separation_ratio': separation_ratio,
        'cluster_balance': cluster_balance,
        'subcategory_purity': subcategory_purity
    }

    return metrics

# Find optimal number of clusters for subcategory clustering
def find_optimal_subcategory_clusters(features, metadata, max_clusters=10, output_path=None):
    """
    Find the optimal number of clusters for subcategory-based clustering

    Every cluster count from 2 up to ``max_clusters`` (capped at the number of
    unique subcategories) is clustered with ``cluster_subcategories`` and
    scored with ``evaluate_subcategory_clustering``. The silhouette score,
    separation ratio and balance score are each min-max normalized and
    combined with weights 0.4 / 0.4 / 0.2; the count with the highest
    combined score wins.

    Parameters:
    - features: Extracted image features
    - metadata: DataFrame containing image metadata (must have a 'subcategory' column)
    - max_clusters: Maximum number of clusters to try
    - output_path: Path to save results (plots and CSV are skipped when falsy)

    Returns:
    - optimal_clusters: Optimal number of clusters (int)

    Raises:
    - ValueError: if metadata has no 'subcategory' column, or if fewer than two
      cluster counts can be tested
    """
    print(f"Searching for optimal number of clusters for subcategory-based clustering (2-{max_clusters})...")

    # Get number of subcategories
    if 'subcategory' not in metadata.columns:
        raise ValueError("Metadata must contain 'subcategory' column")

    n_subcategories = metadata['subcategory'].nunique()
    max_clusters = min(max_clusters, n_subcategories)

    print(f"Found {n_subcategories} unique subcategories")

    # Without this guard the search range is empty and the score normalization
    # fails later with an unhelpful "zero-size array" error
    if max_clusters < 2:
        raise ValueError(
            f"Need at least 2 candidate clusters to search, but the upper bound is {max_clusters} "
            f"(max_clusters is capped by the {n_subcategories} unique subcategories)"
        )

    print(f"Testing cluster counts from 2 to {max_clusters}")

    # Single source of truth for the tested counts (reused by plots, CSV and report)
    cluster_counts = list(range(2, max_clusters + 1))

    # Metrics to track
    silhouette_scores = []
    separation_ratios = []
    balance_scores = []

    # Create a progress bar
    progress_bar = tqdm(cluster_counts)

    for n_clusters in progress_bar:
        progress_bar.set_description(f"Testing {n_clusters} clusters")

        # Apply subcategory clustering
        cluster_membership, subcategory_to_cluster, _ = cluster_subcategories(
            features, metadata, n_clusters=n_clusters
        )

        # Evaluate clustering
        metrics = evaluate_subcategory_clustering(
            features, cluster_membership, metadata, subcategory_to_cluster
        )

        # Store metrics
        silhouette_scores.append(metrics['silhouette_score'])
        separation_ratios.append(metrics['separation_ratio'])
        balance_scores.append(metrics['cluster_balance'])

        progress_bar.set_postfix(
            silhouette=f"{metrics['silhouette_score']:.4f}",
            separation=f"{metrics['separation_ratio']:.4f}"
        )

    # Combine metrics to find optimal clusters: each metric is normalized to
    # [0, 1]; silhouette and separation count 40% each, balance 20%
    silhouette_weight, separation_weight, balance_weight = 0.4, 0.4, 0.2
    combined_scores = (
        silhouette_weight * normalize_scores(silhouette_scores)
        + separation_weight * normalize_scores(separation_ratios)
        + balance_weight * normalize_scores(balance_scores)
    )

    # Find optimal number of clusters (plain int so it serializes and prints cleanly)
    optimal_idx = int(np.argmax(combined_scores))
    optimal_clusters = cluster_counts[optimal_idx]

    # Plot results
    if output_path:
        opt_path = os.path.join(output_path, 'optimal_subcategory_clusters')
        os.makedirs(opt_path, exist_ok=True)

        def _label_points(values):
            # Write each score just above its marker on the current axes
            for count, value in zip(cluster_counts, values):
                plt.annotate(f"{value:.3f}",
                             (count, value),
                             textcoords="offset points",
                             xytext=(0, 10),
                             ha='center')

        # Plot individual metrics
        plt.figure(figsize=(15, 12))

        # Plot silhouette scores
        plt.subplot(3, 1, 1)
        plt.plot(cluster_counts, silhouette_scores, marker='o', linestyle='-', linewidth=2)
        plt.axvline(x=optimal_clusters, color='r', linestyle='--')
        plt.title('Silhouette Score for Different Number of Clusters', fontsize=14)
        plt.xlabel('Number of Clusters', fontsize=12)
        plt.ylabel('Silhouette Score', fontsize=12)
        plt.grid(True, alpha=0.3)
        _label_points(silhouette_scores)

        # Plot separation ratios
        plt.subplot(3, 1, 2)
        plt.plot(cluster_counts, separation_ratios, marker='o', linestyle='-', linewidth=2, color='green')
        plt.axvline(x=optimal_clusters, color='r', linestyle='--')
        plt.title('Cluster Separation Ratio for Different Number of Clusters', fontsize=14)
        plt.xlabel('Number of Clusters', fontsize=12)
        plt.ylabel('Separation Ratio', fontsize=12)
        plt.grid(True, alpha=0.3)
        _label_points(separation_ratios)

        # Plot combined scores
        plt.subplot(3, 1, 3)
        plt.plot(cluster_counts, combined_scores, marker='o', linestyle='-', linewidth=2, color='purple')
        plt.scatter(optimal_clusters, combined_scores[optimal_idx], color='red', s=200,
                    label=f'Optimal: {optimal_clusters} clusters')
        plt.title('Combined Score for Different Number of Clusters', fontsize=14)
        plt.xlabel('Number of Clusters', fontsize=12)
        plt.ylabel('Combined Score', fontsize=12)
        plt.grid(True, alpha=0.3)
        plt.legend(fontsize=12)
        _label_points(combined_scores)

        plt.tight_layout()
        plt.savefig(os.path.join(opt_path, 'optimal_subcategory_clusters.png'))
        plt.close()

        # Save metrics to CSV
        metrics_df = pd.DataFrame({
            'n_clusters': cluster_counts,
            'silhouette_score': silhouette_scores,
            'separation_ratio': separation_ratios,
            'balance_score': balance_scores,
            'combined_score': combined_scores
        })
        metrics_df.to_csv(os.path.join(opt_path, 'optimal_cluster_metrics.csv'), index=False)

    # Print results
    print("\nScores for different numbers of clusters:")
    for i, n_clusters in enumerate(cluster_counts):
        print(f"  Clusters: {n_clusters}, Silhouette: {silhouette_scores[i]:.4f}, "
              f"Separation: {separation_ratios[i]:.4f}, Balance: {balance_scores[i]:.4f}, "
              f"Combined: {combined_scores[i]:.4f}" +
              (" (optimal)" if n_clusters == optimal_clusters else ""))

    return optimal_clusters

def normalize_scores(scores):
    """
    Min-max rescale a sequence of scores onto [0, 1]

    Parameters:
    - scores: List of scores to normalize

    Returns:
    - normalized_scores: Array where the smallest score maps to 0 and the
      largest to 1; when all scores are equal an array of ones (same shape
      and dtype as the input array) is returned instead
    """
    values = np.array(scores)
    lowest = values.min()
    highest = values.max()

    # Regular case: there is a non-zero range to scale by
    if highest != lowest:
        return (values - lowest) / (highest - lowest)

    # Constant input: scaling would divide by zero
    return np.ones_like(values)

# Example usage: run the subcategory-clustering pipeline on the saved features
# Folder that holds the saved features/metadata and is passed on as the output location
OUTPUT_PATH = '/content/drive/MyDrive/batik_output'

# Read back the feature matrix and the metadata table
features = np.load(os.path.join(OUTPUT_PATH, 'batik_features.npy'))
metadata = pd.read_csv(os.path.join(OUTPUT_PATH, 'valid_batik_metadata.csv'))

# Search cluster counts from 2 up to 10 and keep the best-scoring one
optimal_clusters = find_optimal_subcategory_clusters(
    features=features, metadata=metadata, max_clusters=10, output_path=OUTPUT_PATH)

# Cluster once more with the selected count, this time passing the output folder
cluster_membership, subcategory_to_cluster, subcategory_centroids = cluster_subcategories(
    features, metadata, n_clusters=optimal_clusters, output_path=OUTPUT_PATH)

# Final evaluation; kept as the last bare expression so the metrics dict is displayed
evaluate_subcategory_clustering(
    features=features,
    cluster_membership=cluster_membership,
    metadata=metadata,
    subcategory_to_cluster=subcategory_to_cluster,
    output_path=OUTPUT_PATH)

Searching for optimal number of clusters for subcategory-based clustering (2-10)...
Found 22 unique subcategories
Testing cluster counts from 2 to 10


  0%|          | 0/9 [00:00<?, ?it/s]

Clustering subcategories (ensuring one subcategory per cluster)...
Found 22 unique subcategories
Computed centroids for 22 subcategories with shape (22, 1000)
Applying Fuzzy C-Means clustering with 2 clusters on subcategory centroids...
Clustering completed with 2 clusters
Fuzzy Partition Coefficient: 0.5000008903356306

Subcategory distribution across clusters:
Cluster 0: 13 subcategories
  - adiluhung: 239 images
  - Pendeng Jarit Pewarna Alami: 10 images
  - Paris Kusumahadi: 18 images
  - Singa Parang Hitam Putih: 17 images
  - Batik Cap Naga Merah: 34 images
  - Batik Cap Parang Tugu Singa: 24 images
  - Nonik AD: 63 images
  - Batik Cap Teratai: 82 images
  - Batik Topeng Malang: 123 images
  - Batik Cap Biota Laut: 63 images
  - Kastara Cakra Gama: 9 images
  - Angkucamala Puspa Padwa: 6 images
  - Daniswara Jiwatrisna Patibrata: 6 images
Cluster 1: 9 subcategories
  - Cap dan Tulis Leres Pending: 34 images
  - Batik Cap 3 Warna Bunga Orange: 18 images
  - Batik Cap Apel: 39 ima

{'silhouette_score': np.float32(0.14273629),
 'within_cluster_distance': np.float32(20.568932),
 'between_cluster_distance': np.float32(11.559022),
 'separation_ratio': np.float32(0.5619651),
 'cluster_balance': np.float64(0.2541966426858513),
 'subcategory_purity': 1.0}

## Cluster evaluation and image similarity

In [None]:
# Evaluate clustering using Silhouette Coefficient
def evaluate_clustering(features, cluster_membership, metadata=None, output_path=None):
    """
    Score a clustering with the cosine-metric Silhouette Coefficient

    Parameters:
    - features: Extracted features
    - cluster_membership: Cluster assignments
    - metadata: DataFrame containing image metadata (optional)
    - output_path: Path to save evaluation results (optional)

    Returns:
    - score: Silhouette score, or 0 when fewer than two distinct clusters exist

    The subcategory-based evaluation is run in addition only when both
    metadata and output_path are supplied.
    """
    distinct_clusters = np.unique(cluster_membership)

    # The silhouette needs at least two clusters to compare against each other
    if distinct_clusters.size < 2:
        print("Error: Only one cluster found, cannot calculate Silhouette Score")
        return 0

    score = silhouette_score(features, cluster_membership, metric='cosine')
    print(f"Silhouette Score: {score:.4f}")

    # Extra subcategory report requires both the metadata and a place to write to
    skip_details = metadata is None or output_path is None
    if not skip_details:
        perform_subcategory_evaluation(features, cluster_membership, metadata, score, output_path)

    return score

def perform_subcategory_evaluation(features, cluster_membership, metadata, silhouette_score, output_path):
    """
    Perform additional evaluation based on subcategory information

    For every cluster this computes the purity (share of its dominant
    subcategory), the full subcategory distribution and the Shannon entropy
    (base 2) of that distribution. Results are written as CSV files, a text
    summary and two figures into ``<output_path>/cluster_evaluation``.

    Parameters:
    - features: Extracted features (not used by this function)
    - cluster_membership: Cluster assignments, one per metadata row
    - metadata: DataFrame containing image metadata; needs an 'image_path'
      column, and a 'subcategory' column if available (otherwise every image
      is labelled 'Unknown')
    - silhouette_score: Overall silhouette score (a number; note that inside
      this function the name shadows the imported scoring function)
    - output_path: Path to save evaluation results

    Returns:
    - None. Nothing is written when there are no images to evaluate.
    """
    print("\nPerforming subcategory-based evaluation...")

    # Combine clustering results with metadata
    df_results = pd.DataFrame({
        'image_path': metadata['image_path'],
        'subcategory': metadata['subcategory'] if 'subcategory' in metadata.columns else ['Unknown'] * len(metadata),
        'cluster': cluster_membership
    })

    # Nothing to summarize; the purity average below would divide by zero
    total_images = len(df_results)
    if total_images == 0:
        print("No images to evaluate; skipping subcategory-based evaluation")
        return

    # Create output directory for evaluation results
    eval_path = os.path.join(output_path, 'cluster_evaluation')
    os.makedirs(eval_path, exist_ok=True)

    # Per-cluster purity, subcategory distribution and entropy in a single pass
    clusters = np.unique(cluster_membership)
    cluster_purity = []
    subcategory_distribution = []
    entropies = []

    for cluster in clusters:
        # Get items in this cluster
        cluster_items = df_results[df_results['cluster'] == cluster]
        cluster_size = len(cluster_items)

        # value_counts is sorted descending, so the first entry is the dominant subcategory
        subcategory_counts = cluster_items['subcategory'].value_counts()
        dominant_subcategory = subcategory_counts.index[0]
        dominant_count = subcategory_counts.iloc[0]

        cluster_purity.append({
            'cluster': cluster,
            'size': cluster_size,
            'dominant_subcategory': dominant_subcategory,
            'dominant_count': dominant_count,
            'purity': dominant_count / cluster_size
        })

        # Store subcategory distribution for this cluster
        for subcategory, count in subcategory_counts.items():
            subcategory_distribution.append({
                'cluster': cluster,
                'subcategory': subcategory,
                'count': count,
                'percentage': (count / cluster_size) * 100
            })

        # Entropy of the subcategory mix (lower is better). Adding 0.0 turns the
        # negative zero of a single-subcategory cluster into a plain 0.0, which
        # would otherwise be rendered as "-0.000" in labels and files.
        subcategory_probs = subcategory_counts / subcategory_counts.sum()
        entropy = -sum(p * np.log2(p) for p in subcategory_probs) + 0.0
        entropies.append({
            'cluster': cluster,
            'entropy': entropy
        })

    # Create DataFrames
    df_purity = pd.DataFrame(cluster_purity)
    df_distribution = pd.DataFrame(subcategory_distribution)
    df_entropy = pd.DataFrame(entropies)

    # Overall purity weighted by cluster size, and the plain mean entropy
    weighted_purity = sum(entry['purity'] * entry['size'] for entry in cluster_purity) / total_images
    avg_entropy = df_entropy['entropy'].mean()

    # Save evaluation results
    df_purity.to_csv(os.path.join(eval_path, 'cluster_purity.csv'), index=False)
    df_distribution.to_csv(os.path.join(eval_path, 'subcategory_distribution.csv'), index=False)
    df_entropy.to_csv(os.path.join(eval_path, 'cluster_entropy.csv'), index=False)

    # Create summary file
    with open(os.path.join(eval_path, 'clustering_evaluation_summary.txt'), 'w') as f:
        f.write("Clustering Evaluation Summary\n")
        f.write("===========================\n\n")
        f.write(f"Number of clusters: {len(clusters)}\n")
        f.write(f"Number of images: {total_images}\n")
        f.write(f"Silhouette score: {silhouette_score:.4f}\n")
        f.write(f"Weighted cluster purity: {weighted_purity:.4f}\n")
        f.write(f"Average cluster entropy: {avg_entropy:.4f}\n\n")

        f.write("Cluster purity summary:\n")
        for entry in cluster_purity:
            f.write(f"  Cluster {entry['cluster']}: {entry['purity']:.4f} purity ")
            f.write(f"({entry['dominant_subcategory']}: {entry['dominant_count']}/{entry['size']} images)\n")

    def _label_bars(ax, bars, offset):
        # Print every bar's height slightly above the bar
        for bar in bars:
            height = bar.get_height()
            ax.text(bar.get_x() + bar.get_width()/2., height + offset,
                    f'{height:.3f}', ha='center', va='bottom')

    # Visualize purity and entropy
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

    # Plot cluster purity
    bars = ax1.bar(df_purity['cluster'], df_purity['purity'], color='skyblue')
    ax1.set_xlabel('Cluster')
    ax1.set_ylabel('Purity')
    ax1.set_title('Cluster Purity (higher is better)')
    ax1.set_ylim(0, 1)
    ax1.set_xticks(df_purity['cluster'])
    ax1.axhline(y=weighted_purity, color='r', linestyle='--', label=f'Weighted Avg: {weighted_purity:.4f}')
    ax1.legend()
    _label_bars(ax1, bars, 0.02)

    # Plot cluster entropy
    bars = ax2.bar(df_entropy['cluster'], df_entropy['entropy'], color='salmon')
    ax2.set_xlabel('Cluster')
    ax2.set_ylabel('Entropy')
    ax2.set_title('Cluster Entropy (lower is better)')
    ax2.set_xticks(df_entropy['cluster'])
    ax2.axhline(y=avg_entropy, color='r', linestyle='--', label=f'Average: {avg_entropy:.4f}')
    ax2.legend()
    _label_bars(ax2, bars, 0.05)

    plt.tight_layout()
    plt.savefig(os.path.join(eval_path, 'cluster_quality_metrics.png'))
    plt.close(fig)

    # Create subcategory distribution heatmap
    create_subcategory_heatmap(df_distribution, clusters, eval_path)

    print(f"Subcategory-based evaluation completed and saved to {eval_path}")
    print(f"Weighted cluster purity: {weighted_purity:.4f}")
    print(f"Average cluster entropy: {avg_entropy:.4f}")

def create_subcategory_heatmap(df_distribution, clusters, output_path):
    """
    Draw and save a subcategory-by-cluster heatmap of percentage shares

    Parameters:
    - df_distribution: DataFrame with 'subcategory', 'cluster' and 'percentage' columns
    - clusters: List of cluster IDs (accepted for the caller's convenience; not read here)
    - output_path: Folder in which 'subcategory_cluster_heatmap.png' is saved
    """
    # Rows = subcategories, columns = clusters, cells = percentage (0 where absent)
    pivot = df_distribution.pivot_table(
        index='subcategory', columns='cluster', values='percentage', fill_value=0
    )

    # Put the subcategories with the largest summed share on top
    row_order = pivot.sum(axis=1).sort_values(ascending=False).index
    pivot = pivot.loc[row_order]

    # Grow the figure with the number of rows, but never below 8 inches tall
    figure_height = max(8, len(pivot) * 0.4)
    plt.figure(figsize=(12, figure_height))
    plt.title("Subcategory Distribution Across Clusters (%)", fontsize=14)

    sns.heatmap(pivot, annot=True, cmap="viridis", fmt=".1f", linewidths=.5)

    plt.tight_layout()
    target_file = os.path.join(output_path, 'subcategory_cluster_heatmap.png')
    plt.savefig(target_file)
    plt.close()

# Calculate similarity between images and visualize with subcategory information
def calculate_similarity(features, metadata=None, output_path=None):
    """
    Build the pairwise cosine-similarity matrix of all images and preview it

    Parameters:
    - features: Extracted features
    - metadata: DataFrame containing image metadata (optional)
    - output_path: Path to save similarity results (optional)

    Returns:
    - similarity_matrix: Matrix of pairwise similarities

    A heatmap of the top-left 50x50 corner is always shown and, when
    output_path is given, also saved. The subcategory analysis runs only
    when both metadata and output_path are supplied.
    """
    print("Calculating pairwise cosine similarity between images...")
    similarity_matrix = cosine_similarity(features)
    print(f"Similarity matrix shape: {similarity_matrix.shape}")

    # Preview only the first 50 rows/columns of the matrix
    preview = similarity_matrix[:50, :50]
    plt.figure(figsize=(10, 8))
    plt.title("Sample of Similarity Matrix (first 50x50)")
    sns.heatmap(preview, cmap="viridis")
    plt.xlabel("Image Index")
    plt.ylabel("Image Index")

    # Save before show()
    if output_path:
        preview_file = os.path.join(output_path, 'similarity_matrix_sample.png')
        plt.savefig(preview_file)
    plt.show()

    # Subcategory breakdown needs both the labels and somewhere to write
    skip_analysis = metadata is None or output_path is None
    if not skip_analysis:
        analyze_subcategory_similarity(similarity_matrix, metadata, output_path)

    return similarity_matrix

def analyze_subcategory_similarity(similarity_matrix, metadata, output_path):
    """
    Analyze similarity within and between subcategories

    Writes two CSV files (average similarity inside each subcategory and
    between each pair of subcategories), a bar chart of the within-subcategory
    averages and a heatmap of the pairwise averages into
    ``<output_path>/similarity_analysis``.

    Parameters:
    - similarity_matrix: Matrix of pairwise similarities; row/column k must
      correspond to the k-th row of metadata (by position, not index label)
    - metadata: DataFrame containing image metadata
    - output_path: Path to save analysis results

    Returns:
    - None. Returns early (after printing a notice) when metadata has no
      'subcategory' column.
    """
    print("\nAnalyzing similarity within and between subcategories...")

    # Create output directory
    sim_path = os.path.join(output_path, 'similarity_analysis')
    os.makedirs(sim_path, exist_ok=True)

    # Get subcategories
    if 'subcategory' not in metadata.columns:
        print("No subcategory information available in metadata")
        return

    # Missing labels can never match an equality test, so they are left out
    # instead of producing NaN rows
    subcategories = metadata['subcategory'].dropna().unique()

    # Row positions of every subcategory, computed once. Positions (not index
    # labels) are required because the similarity matrix is addressed by
    # position; the two only coincide for a default 0..n-1 index.
    positions = {
        subcategory: np.flatnonzero((metadata['subcategory'] == subcategory).to_numpy())
        for subcategory in subcategories
    }

    # Calculate average similarity within each subcategory
    within_similarity = []
    for subcategory in subcategories:
        indices = positions[subcategory]

        if len(indices) < 2:  # Need at least 2 images to calculate similarity
            continue

        # Extract similarity submatrix for this subcategory
        submatrix = similarity_matrix[np.ix_(indices, indices)]

        # Exclude self-similarity (diagonal)
        off_diagonal = ~np.eye(submatrix.shape[0], dtype=bool)

        within_similarity.append({
            'subcategory': subcategory,
            'image_count': len(indices),
            'avg_similarity': np.mean(submatrix[off_diagonal])
        })

    # Calculate average similarity between different subcategories (each unordered pair once)
    between_similarity = []
    for i, subcategory1 in enumerate(subcategories):
        for subcategory2 in subcategories[i+1:]:
            # Extract cross-similarity between subcategories
            cross_sim = similarity_matrix[np.ix_(positions[subcategory1], positions[subcategory2])]

            between_similarity.append({
                'subcategory1': subcategory1,
                'subcategory2': subcategory2,
                'avg_similarity': np.mean(cross_sim)
            })

    # Create DataFrames; explicit columns keep them well-formed even when empty
    df_within = pd.DataFrame(within_similarity, columns=['subcategory', 'image_count', 'avg_similarity'])
    df_between = pd.DataFrame(between_similarity, columns=['subcategory1', 'subcategory2', 'avg_similarity'])

    # Save results
    df_within.to_csv(os.path.join(sim_path, 'within_subcategory_similarity.csv'), index=False)
    df_between.to_csv(os.path.join(sim_path, 'between_subcategory_similarity.csv'), index=False)

    # Visualize within-subcategory similarity
    if df_within.empty:
        print("No subcategory has at least 2 images; skipping within-subcategory similarity plot")
    else:
        mean_within = df_within['avg_similarity'].mean()

        plt.figure(figsize=(12, 6))
        bars = plt.bar(df_within['subcategory'], df_within['avg_similarity'], color='skyblue')
        plt.axhline(y=mean_within, color='r', linestyle='--',
                    label=f'Average: {mean_within:.4f}')
        plt.xlabel('Subcategory')
        plt.ylabel('Average Similarity')
        plt.title('Within-Subcategory Similarity')
        plt.xticks(rotation=45, ha='right')
        plt.legend()

        # Add value labels
        for bar in bars:
            height = bar.get_height()
            plt.text(bar.get_x() + bar.get_width()/2., height + 0.02,
                     f'{height:.3f}', ha='center', va='bottom')

        plt.tight_layout()
        plt.savefig(os.path.join(sim_path, 'within_subcategory_similarity.png'))
        plt.close()

    # Create a heatmap for between-subcategory similarity
    if len(between_similarity) > 0:
        # Create a matrix for the heatmap
        subcategory_list = list(subcategories)
        slot = {subcategory: k for k, subcategory in enumerate(subcategory_list)}
        sim_matrix = np.zeros((len(subcategory_list), len(subcategory_list)))

        # Fill the matrix with the similarity values
        for entry in between_similarity:
            i = slot[entry['subcategory1']]
            j = slot[entry['subcategory2']]
            sim_matrix[i, j] = entry['avg_similarity']
            sim_matrix[j, i] = entry['avg_similarity']  # Matrix is symmetric

        # Fill diagonal with within-subcategory similarity; subcategories with a
        # single image have no within value and keep the initial 0
        for entry in within_similarity:
            i = slot[entry['subcategory']]
            sim_matrix[i, i] = entry['avg_similarity']

        # Create the heatmap
        plt.figure(figsize=(12, 10))
        plt.title("Similarity Between Subcategories", fontsize=14)
        sns.heatmap(sim_matrix, xticklabels=subcategory_list, yticklabels=subcategory_list,
                    annot=True, cmap="viridis", fmt=".3f", linewidths=.5)
        plt.tight_layout()
        plt.savefig(os.path.join(sim_path, 'between_subcategory_similarity_heatmap.png'))
        plt.close()

    print(f"Subcategory similarity analysis completed and saved to {sim_path}")

# Example usage:
# NOTE(review): the recorded output of this cell is
#   NameError: name 'find_optimal_clusters' is not defined
# so the cell does not run as saved. `find_optimal_clusters` and `cluster_features`
# are not defined in the part of the notebook visible around this cell; presumably
# they live in a cell that was removed or not executed (TODO: confirm, then define
# or import them before this point).
# Define paths
# (same Drive folder as in the previous cell; inputs are read from it and it is
# passed as output_path to the calls below)
OUTPUT_PATH = '/content/drive/MyDrive/batik_output'

# Load features and metadata
# (re-loads the same two files the previous cell already loaded into these names)
features = np.load(os.path.join(OUTPUT_PATH, 'batik_features.npy'))
metadata = pd.read_csv(os.path.join(OUTPUT_PATH, 'valid_batik_metadata.csv'))

# Find optimal number of clusters
# (this is the call that raises the NameError shown in the cell output)
optimal_clusters = find_optimal_clusters(features, metadata=metadata, max_clusters=10, output_path=OUTPUT_PATH)

# Apply clustering with the optimal number of clusters
# NOTE(review): overwrites the `cluster_membership` produced by the subcategory
# clustering cell above; `u` and `cntr` are presumably the fuzzy membership matrix
# and the cluster centers; verify against the definition of cluster_features
cluster_membership, u, cntr = cluster_features(
    features,
    metadata=metadata,
    n_clusters=optimal_clusters,
    output_path=OUTPUT_PATH
)

NameError: name 'find_optimal_clusters' is not defined