### Importing necessary libraries

In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
import plotly.express as px
from random import randint
import pandas as pd
import pickle

from PIL import Image
import torch
from torchvision import transforms
from torchvision.transforms import InterpolationMode
from torchvision.models import VGG19_Weights, vgg19
from sklearn.metrics import silhouette_score, calinski_harabasz_score, davies_bouldin_score
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
# ClearML for experement tracking (optional)
from clearml import Task

### Set device to GPU if available, else CPU

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device.type == "cuda": 
    print("Running on GPU")
else:
    print("Running on CPU")

### Set path to dataset images directory and change working directory to path

In [None]:
path = r"PATH TO IMAAGE DIRECTORY"
os.chdir(path)

### Get list of image samples file paths

In [None]:
samples = []
with os.scandir(path) as files:
    for file in files:
        if file.name.endswith('.png'):
            samples.append(file.path)

### Load VGG19 model with pre-trained weights and remove classifier layers after first layer (change if you want to experement with other models)

In [None]:
model = vgg19(weights=VGG19_Weights.IMAGENET1K_V1)
model.classifier = model.classifier[:1]
model = model.to(device)

### Image transformer initialization

In [None]:
# Set image transformation parameters
resize_size = 256
crop_size = 224
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

# Define image transformation pipeline
transform = transforms.Compose([
    transforms.Resize(resize_size, interpolation=InterpolationMode.BILINEAR),
    transforms.CenterCrop(crop_size),
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std)
])

In [None]:
def extract_features(file, model, transform):
    """
    Extract features from an image using a pre-trained model.
    
    Args:
        file (str): Path to image file.
        model (torch.nn.Module): Pre-trained model for feature extraction.
        transform (torchvision.transforms.Compose): Image transformation pipeline.
    
    Returns:
        torch.Tensor: Extracted features.
    """
    img = Image.open(file)
    img = img.convert('RGB')
    img = transform(img)
    img = img.unsqueeze(0)
    img = img.to(device)
    features = model(img)

    return features

### Extract features from sample images and save feature vectors to a dictionary

In [None]:
# Extract features from sample images and save feature vectors to a dictionary
data = {}
p = r"CHANGE TO A LOCATION TO SAVE FEATURE VECTORS"

for sample in samples:
    try:
        feat = extract_features(sample, model, transform)
        data[sample] = feat.detach().cpu().numpy()
    except:
        with open(p,'wb') as file:
            pickle.dump(data,file)
            
# Convert dictionary of feature vectors to numpy arrays for clustering
filenames = np.array(list(data.keys()))
feat = np.array(list(data.values()))
feat = feat.reshape(-1, 4096) # Change to the number of features (if you changed the model)
# The output tensor should have the shape (number of samples, length of image feature vector)

In [None]:
feat.shape

### initialize random state

In [None]:
random_state = randint(0, 42)

### Experementation (optional)

In [None]:
def experementation(feat, random_state):
    """
    Experiment with different PCA and KMeans parameters to find optimal clustering.
    
    Args:
        feat (np.ndarray): Feature vectors for clustering.
    """
    for n_components_loop in range(1, feat.shape[0]):
        if n_components_loop != 1 and  n_components_loop % 5 != 0:
            continue
        
        pca = PCA(n_components=n_components_loop, random_state=random_state,)
        pca.fit(feat)
        x = pca.transform(feat)
    
        task = Task.init(
            project_name='Deep clustering',
            task_name=f'VGG19 PCA {n_components_loop} randomized',
            tags=['k-mean','VGG19','randomized'],
            output_uri=None
        )

        logger = task.get_logger()
            
        sse = []

        for n_clusters in range(2, feat.shape[0]):

            kmeans = KMeans(n_clusters=n_clusters, init = 'k-means++', n_init='auto', random_state=random_state)
            kmeans.fit(x)
            labels = kmeans.labels_
                
            silhouette = silhouette_score(x, labels)
            calinski_harabasz = calinski_harabasz_score(x, labels)
            davies_bouldin = davies_bouldin_score(x, labels)
                
            logger.report_scalar(title='Silhouette Score', series='Scores', value=silhouette, iteration=n_clusters)
            logger.report_scalar(title='Calinski-Harabasz Index', series='Scores', value=calinski_harabasz, iteration=n_clusters)
            logger.report_scalar(title='Davies-Bouldin Index', series='Scores', value=davies_bouldin, iteration=n_clusters)


            sse.append(kmeans.inertia_)

        fig = px.scatter(x=range(2, feat.shape[0]), y=sse)

        fig.update_traces(mode='lines+markers')

        fig.update_layout(
            title="Sum of Squared Distance vs Number of Clusters",
            xaxis_title="Number of clusters (k)",
            yaxis_title="Sum of squared distance"
        )
            
        logger.report_plotly(title=f'Sum of Squared Distance vs Number of Clusters PCA = {n_components_loop}', series='Scores k-means++', figure=fig)
            
        task.close()


In [None]:
# Uncomment if you want to run the experementation
# experementation(feat, random_state) 
# Check the results and pick optimal hyperparameter

### Set PCA and KMeans parameters and fit models to data

In [None]:
n_components = 200
num_of_clusters = 10

f"Random state: {random_state}"
unique_labels = [i for i in range(num_of_clusters)]

pca = PCA(n_components=n_components, svd_solver='randomized', random_state=random_state)
pca.fit(feat)
x = pca.transform(feat)

kmeans = KMeans(n_clusters=len(unique_labels), init='k-means++', n_init='auto', random_state=random_state)
kmeans.fit(x)

### Plot elbow curve to determine optimal number of clusters for KMeans algorithm

In [None]:
sse = []
list_k = list(range(2, feat.shape[0]))

for k in list_k:
    km = KMeans(n_clusters=k, init='k-means++', n_init='auto')
    km.fit(x)
    sse.append(km.inertia_)

fig = px.line(x=list_k, y=sse)
fig.update_layout(
    title='Elbow Curve',
    xaxis_title=r'Number of clusters *k*',
    yaxis_title='Sum of squared distance',
    width=2000,
    height=800
)
fig.show()

### Visualize clustering results in 1D, 2D or 3D depending on number of PCA components used for clustering 

In [None]:
if n_components == 1:
    pca_1d = PCA(n_components=1, random_state=random_state)
    x_1d = pca_1d.fit_transform(x)

    df = pd.DataFrame(x_1d, columns=['x'])
    df['cluster'] = kmeans.labels_

    fig = px.scatter(df, x='x', y='cluster', title='KMeans Clustering')
    fig.show()

elif n_components == 2:
    pca_2d = PCA(n_components=2, random_state=random_state)
    x_2d = pca_2d.fit_transform(x)

    df = pd.DataFrame(x_2d, columns=['x', 'y'])
    df['cluster'] = kmeans.labels_

    fig = px.scatter(df, x='x', y='y', color='cluster', title='KMeans Clustering')
    fig.show()

elif n_components == 3:
    pca_3d = PCA(n_components=3, random_state=random_state)
    x_3d = pca_3d.fit_transform(x)

    df = pd.DataFrame(x_3d, columns=['x', 'y', 'z'])
    df['cluster'] = kmeans.labels_

    fig = px.scatter_3d(df, x='x', y='y', z='z', color='cluster', title='KMeans Clustering')
    fig.show()
else:
    print("Can't visualize more than 3 dimensions")

### Group sample images by cluster assignment

In [None]:
groups = {}
for file, cluster in zip(filenames, kmeans.labels_):
    if cluster not in groups.keys():
        groups[cluster] = []
        groups[cluster].append(file)
    else:
        groups[cluster].append(file)

### Visualize all clusters

In [None]:
def view_cluster(cluster):
    """
    Visualize a cluster of sample images.
    
    Args:
        cluster (int): Cluster number.
    """
    fig = plt.figure(figsize=(25, 25))
    files = groups[cluster]
    if len(files) > 30:
        print(f"Clipping cluster size from {len(files)} to 30")
        files = files[:29]
    for index, file in enumerate(files):
        plt.subplot(10, 10, index + 1)
        img = Image.open(file).convert('RGB')
        plt.imshow(img)
        plt.axis('off')

In [None]:
for i in range(num_of_clusters):
    view_cluster(i)