# **2 Fetching and Downloading Images**

## **2.1 Fetching my Google Photos**

In [None]:
!pip install --upgrade google-api-python-client google-auth google-auth-oauthlib requests

Collecting google-api-python-client
  Using cached google_api_python_client-2.154.0-py2.py3-none-any.whl.metadata (6.7 kB)
Collecting google-auth
  Using cached google_auth-2.36.0-py2.py3-none-any.whl.metadata (4.7 kB)
Using cached google_api_python_client-2.154.0-py2.py3-none-any.whl (12.6 MB)
Using cached google_auth-2.36.0-py2.py3-none-any.whl (209 kB)
Installing collected packages: google-auth, google-api-python-client
  Attempting uninstall: google-auth
    Found existing installation: google-auth 2.27.0
    Uninstalling google-auth-2.27.0:
      Successfully uninstalled google-auth-2.27.0
  Attempting uninstall: google-api-python-client
    Found existing installation: google-api-python-client 2.151.0
    Uninstalling google-api-python-client-2.151.0:
      Successfully uninstalled google-api-python-client-2.151.0
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency

In [None]:
import requests
import pickle
import os
from google_auth_oauthlib.flow import InstalledAppFlow

In [None]:
# Step 1: Authentication

# Define the scope for the Google Photos API
SCOPES = ['https://www.googleapis.com/auth/photoslibrary.readonly']

# Authenticate and obtain credentials
def authenticate_google_photos():
    """
    Return Google Photos OAuth credentials, reusing a cached token when possible.

    Looks for 'token.pickle' in the working directory. If the cached credentials
    are present and report themselves as valid they are returned as-is; otherwise
    the installed-app OAuth flow is run against 'credentials.json' (local redirect
    server on port 8080) and the new credentials are written back to 'token.pickle'.

    Returns:
        The credentials object (either unpickled or produced by the OAuth flow).
    """
    cached = None
    if os.path.exists('token.pickle'):
        # NOTE(review): pickle.load can execute arbitrary code; only keep this if
        # token.pickle is always written by this notebook.
        with open('token.pickle', 'rb') as fh:
            cached = pickle.load(fh)

    # Guard clause: a usable cached token short-circuits the interactive flow.
    if cached and cached.valid:
        return cached

    fresh = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES).run_local_server(port=8080)
    with open('token.pickle', 'wb') as fh:
        pickle.dump(fresh, fh)
    return fresh

creds = authenticate_google_photos()

In [None]:
# Step 2: Download Images

def download_images_with_pagination(creds, output_folder, page_size=10, timeout=30):
    """
    Download every media item listed by the Google Photos Library API.

    Args:
        creds: OAuth credentials; only the ``token`` attribute is read.
        output_folder (str): Destination directory (created if missing).
        page_size (int): Number of items requested per listing call (default 10).
        timeout (float): Per-request timeout in seconds, so a stalled connection
            cannot hang the cell forever.

    Notes:
        * The listing stops (with a printed message) on the first non-200 response.
        * Items are saved under their API ``filename``; items sharing a filename
          overwrite each other.
    """
    os.makedirs(output_folder, exist_ok=True)

    headers = {'Authorization': f'Bearer {creds.token}'}
    list_url = 'https://photoslibrary.googleapis.com/v1/mediaItems'
    page_token = None

    while True:
        # Let requests build and URL-encode the query string; page tokens are
        # opaque and must not be spliced into the URL by hand.
        params = {'pageSize': page_size}
        if page_token:
            params['pageToken'] = page_token

        listing = requests.get(list_url, headers=headers, params=params, timeout=timeout)
        if listing.status_code != 200:
            print(f"Error fetching photos: {listing.status_code} - {listing.text}")
            break

        data = listing.json()
        items = data.get('mediaItems', [])
        print(f"Fetched {len(items)} photos. Starting download...")

        for item in items:
            # The Library API documents that a base URL must be suffixed with a
            # parameter; '=d' requests the downloadable image bytes.
            image_url = f"{item['baseUrl']}=d"
            # basename() keeps an unexpected path separator from escaping output_folder.
            filename = os.path.basename(item['filename'])
            try:
                download = requests.get(image_url, timeout=timeout)
                if download.status_code == 200:
                    file_path = os.path.join(output_folder, filename)
                    with open(file_path, 'wb') as f:
                        f.write(download.content)
                    print(f"Downloaded: {filename}")
                else:
                    print(f"Failed to download {filename}: {download.status_code}")
            except Exception as e:
                # Best effort: one bad item must not abort the whole library download.
                print(f"Error downloading {filename}: {e}")

        # Continue with the next page, if the API reported one.
        page_token = data.get('nextPageToken')
        if not page_token:
            break


In [None]:
# Step 3: Saving under "my_photos" folder in Drive
download_images_with_pagination(creds, output_folder='/content/drive/MyDrive/Pipeline_2/my_photos')


## **2.2 Fetching Van Gogh's Paintings**

In [None]:
# Step 1: Fetch Metadata
def fetch_paintings(artist_name, api_key):
    """
    Fetch painting metadata from the WikiArt API.

    Args:
        artist_name (str): URL-friendly name of the artist.
        api_key (str): API key for WikiArt.

    Returns:
        list: Parsed JSON response on HTTP 200; an empty list when the request
        fails (transport error or non-200 status), after printing the reason.
    """
    # Pass the query as params so requests URL-encodes each value.
    params = {"artistUrl": artist_name, "json": 2, "key": api_key}
    try:
        response = requests.get(API_BASE_URL, params=params, timeout=30)
    except requests.RequestException as e:
        # Same failure contract as a bad status code: report and return [].
        print(f"Failed to fetch data: {e}")
        return []

    if response.status_code != 200:
        print(f"Failed to fetch data: {response.status_code}")
        return []
    return response.json()

# Step 2: Download all images
def download_all_paintings(paintings, output_folder):
    """
    Download all paintings from the metadata list without limiting the count.

    Each painting is saved as ``<sanitized title>.jpg``. When several entries
    share a title, later ones get a numeric suffix (``_2``, ``_3``, ...) so they
    are no longer mistaken for an already-downloaded file and skipped. The
    suffixes depend only on the order of ``paintings``, so re-running with the
    same list still skips everything already on disk.

    Args:
        paintings (list): List of painting metadata dicts with "title" and "image" keys.
        output_folder (str): Folder to save the downloaded images (created if missing).
    """
    os.makedirs(output_folder, exist_ok=True)

    downloaded_count = 0
    used_names = set()  # file stems already assigned during this call

    for painting in paintings:
        base_title = painting["title"].replace(" ", "_").replace("/", "_")  # Sanitize file name
        image_url = painting["image"]

        # Disambiguate repeated titles instead of treating them as duplicates.
        title = base_title
        suffix = 1
        while title in used_names:
            suffix += 1
            title = f"{base_title}_{suffix}"
        used_names.add(title)

        file_path = os.path.join(output_folder, f"{title}.jpg")

        # Skip if the image already exists (allows resuming an interrupted run)
        if os.path.exists(file_path):
            print(f"Image already exists: {title}")
            continue

        # Download and save the image
        try:
            response = requests.get(image_url, timeout=30)
            if response.status_code == 200:
                with open(file_path, "wb") as f:
                    f.write(response.content)
                downloaded_count += 1
                print(f"Downloaded: {title} (Total downloaded: {downloaded_count})")
            else:
                print(f"Failed to download {title}")
        except Exception as e:
            # Best effort: keep going with the remaining paintings.
            print(f"Error downloading {title}: {e}")

    print(f"Total images downloaded: {downloaded_count}")

In [None]:
# WikiArt API key: read from the environment so the secret is not stored in the notebook.
# NOTE(review): the key that used to be hardcoded here should be treated as leaked and rotated.
API_KEY = os.environ.get("WIKIART_API_KEY")
if not API_KEY:
    raise RuntimeError("Set the WIKIART_API_KEY environment variable before running this cell.")

# API endpoint and artist's name
API_BASE_URL = "https://www.wikiart.org/en/App/Painting/PaintingsByArtist"
ARTIST_NAME = "vincent-van-gogh"  # URL-friendly name of Van Gogh
output_folder = "/content/drive/MyDrive/Pipeline_2/van_gogh_paintings"

# Create a folder for downloaded images
os.makedirs(output_folder, exist_ok=True)

if __name__ == "__main__":
    # Fetch painting data, then download only if the metadata request returned something
    paintings_data = fetch_paintings(ARTIST_NAME, API_KEY)
    if paintings_data:
        download_all_paintings(paintings_data, output_folder)

In [None]:
# Count the total number of images in each Domain
def count_images_in_directory(directory, extensions=("jpg", "png", "jpeg")):
    """
    Count directory entries whose lower-cased name ends with one of `extensions`.

    Args:
        directory (str): Folder to scan (non-recursive).
        extensions (tuple): Suffixes to match; compared against the lower-cased name.

    Returns:
        int: Number of matching entries.
    """
    return sum(1 for entry in os.listdir(directory) if entry.lower().endswith(extensions))


# Dataset locations on the mounted Drive (one folder per domain)
van_path = "/content/drive/MyDrive/Pipeline_2/van_gogh_paintings"
img_path = "/content/drive/MyDrive/Pipeline_2/my_photos"
# Count the image files in each domain and report both totals
van_count = count_images_in_directory(van_path)
img_count = count_images_in_directory(img_path)
print(f"The number of Google Photos Images is: {img_count}")
print(f"The number of Van Gogh Images is: {van_count}")


The number of Google Photos Images is: 5108
The number of Van Gogh Images is: 1725


# **3 Data PreProcessing and Exploratory Analysis**


## **Imports**

In [None]:
!pip install umap-learn dominate visdom torchsummary



In [None]:
# Imports
import itertools
import os
import shutil
from collections import Counter
from itertools import cycle

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import umap.umap_ as umap
from PIL import Image
from sklearn.cluster import DBSCAN, KMeans
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.metrics import silhouette_score, calinski_harabasz_score, davies_bouldin_score
from sklearn.model_selection import ParameterGrid
from torch.utils.data import DataLoader, Dataset
from torchsummary import summary
from torchvision import transforms
from torchvision.models import resnet50
from torchvision.transforms.functional import to_pil_image
from torchvision.utils import save_image

## **3.1 Exploratory Analysis**

### **3.1.1 Image Modes**

In [None]:
# Check Image Modes
def analyze_image_modes(image_dir):
    """
    Count the PIL color modes of the images in a folder.

    Args:
        image_dir (str): Folder to scan (non-recursive); only .jpg/.jpeg/.png
            names (case-insensitive) are considered.

    Returns:
        collections.Counter: Mapping of mode string (e.g. 'RGB', 'RGBA', 'L') to
        the number of images with that mode. Files that fail to open are
        reported on stdout and left out of the count.
    """
    modes = []
    for file_name in os.listdir(image_dir):
        if file_name.lower().endswith(('.jpg', '.png', '.jpeg')):
            image_path = os.path.join(image_dir, file_name)
            try:
                # Context manager closes the file handle; without it thousands
                # of handles stay open until garbage collection.
                with Image.open(image_path) as image:
                    modes.append(image.mode)  # Get the mode (e.g., 'RGB', 'RGBA', 'L')
            except Exception as e:
                print(f"Error processing {file_name}: {e}")
    return Counter(modes)

In [None]:
# Tally color modes for both domains (relies on van_path / img_path set in an earlier cell)
van_mode_counts = analyze_image_modes(van_path)
img_mode_counts = analyze_image_modes(img_path)

# One "mode: count" line per mode found in the Van Gogh folder
print("Van Gogh Image Modes Count:")
for mode, count in van_mode_counts.items():
    print(f"{mode}: {count}")

# Same report for the Google Photos folder
print("Google Photos Image Modes Count:")
for mode, count in img_mode_counts.items():
    print(f"{mode}: {count}")

Van Gogh Image Modes Count:
RGB: 1610
RGBA: 115
Google Photos Image Modes Count:
RGB: 5107
RGBA: 1


### **3.1.2 Image Size Distribution**

In [None]:
# Distribution of image sizes
def analyze_image_sizes(image_dir):
    """
    Collect the pixel dimensions of the images in a folder.

    Args:
        image_dir (str): Folder to scan (non-recursive); only .jpg/.jpeg/.png
            names (case-insensitive) are considered.

    Returns:
        tuple[list, list]: (widths, heights), index-aligned, one entry per image.
    """
    widths, heights = [], []
    for file_name in os.listdir(image_dir):
        if file_name.lower().endswith(('.jpg', '.png', '.jpeg')):
            image_path = os.path.join(image_dir, file_name)
            # Context manager closes the file handle as soon as the size is read.
            with Image.open(image_path) as image:
                widths.append(image.width)
                heights.append(image.height)
    return widths, heights

# Google Photos: one point per image, width on x and height on y
widths, heights = analyze_image_sizes(img_path)
plt.scatter(widths, heights, alpha=0.5)
plt.title("Google Photos Image Size Distribution")
plt.xlabel("Width")
plt.ylabel("Height")
plt.show()

# Van Gogh: same plot for the paintings folder (kept in separate variables)
widths2, heights2 = analyze_image_sizes(van_path)
plt.scatter(widths2, heights2, alpha=0.5)
plt.title("Van Gogh Image Size Distribution")
plt.xlabel("Width")
plt.ylabel("Height")
plt.show()


## **3.2 Preprocessing Images**

In [None]:
# Global save path on Google Drive; the caching helpers below join their
# relative `save_path` / `output_dir` arguments onto this folder.
DRIVE_SAVE_PATH = "/content/drive/MyDrive/Pipeline_2/"

# Ensure the save path exists
os.makedirs(DRIVE_SAVE_PATH, exist_ok=True)

# Define the image paths (one folder per domain)
van_path = "/content/drive/MyDrive/Pipeline_2/van_gogh_paintings"
img_path = "/content/drive/MyDrive/Pipeline_2/my_photos"

In [None]:
# Preprocessing
def preprocess_images(image_dir, resize_dim=(512, 512), save_path="preprocessed_images.pt"):
    """
    Preprocess images by resizing, normalizing, and converting to tensors.

    The result is cached: if ``save_path`` already exists under DRIVE_SAVE_PATH
    it is loaded and returned without touching ``image_dir``. The cache is
    keyed by file name only, so delete the file (or pick a new ``save_path``)
    when ``image_dir`` or ``resize_dim`` changes.

    Args:
        image_dir (str): Path to the directory containing images.
        resize_dim (tuple): Dimensions to resize the images to (default: 512x512).
        save_path (str): Path to save the preprocessed images and paths (relative to DRIVE_SAVE_PATH).

    Returns:
        torch.Tensor: Preprocessed image tensors stacked along dim 0.
        list: List of valid image paths, index-aligned with the tensor.

    Raises:
        ValueError: If no image in ``image_dir`` could be loaded.
    """
    full_save_path = os.path.join(DRIVE_SAVE_PATH, save_path)
    if os.path.exists(full_save_path):  # Load if preprocessed data already exists
        print(f"Loading preprocessed images from {full_save_path}...")
        # The cache holds only a tensor and a list of strings, so the restricted
        # unpickler is sufficient (and avoids torch.load's FutureWarning).
        images, image_paths = torch.load(full_save_path, weights_only=True)
        return images, image_paths

    transform = transforms.Compose([
        transforms.Resize(resize_dim),  # Resize to resize_dim
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    valid_extensions = (".jpg", ".jpeg", ".png")
    images, image_paths = [], []

    # sorted() makes the image order reproducible across runs and machines.
    for fname in sorted(os.listdir(image_dir)):
        if fname.lower().endswith(valid_extensions):
            try:
                path = os.path.join(image_dir, fname)
                # Close the file handle as soon as the pixels are converted.
                with Image.open(path) as raw:
                    img = raw.convert("RGB")  # Convert images to RGB
                images.append(transform(img))
                image_paths.append(path)
            except Exception as e:
                print(f"Error loading image {fname}: {e}")

    if not images:
        raise ValueError(f"No loadable images found in {image_dir}")

    images = torch.stack(images)  # Combine into a tensor
    torch.save((images, image_paths), full_save_path)  # Save for reuse
    print(f"Preprocessed {len(images)} images and saved to {full_save_path}.")
    return images, image_paths

## **3.3 Feature Extraction**

### 3.3.1 **Feature Extraction with ResNet50**

In [None]:
# Load ResNet-50 with ImageNet weights. `weights=` replaces the deprecated
# `pretrained=True` flag; "IMAGENET1K_V1" is the checkpoint that flag selected.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
resnet = resnet50(weights="IMAGENET1K_V1").to(device)
resnet.eval()

# Remove the final classification layer so the model outputs pooled features
feature_extractor = torch.nn.Sequential(*list(resnet.children())[:-1])
feature_extractor.eval()  # keep the wrapper itself in inference mode too



In [None]:
# Feature Extraction
def extract_features_in_batches(images, model, batch_size=16, save_path="extracted_features.npy"):
    """
    Run `model` over `images` in mini-batches and return the flattened outputs.

    Results are cached as a .npy file under DRIVE_SAVE_PATH; when that file
    already exists it is loaded and returned without running the model.

    Args:
        images (torch.Tensor): Preprocessed images, sliced along dim 0.
        model (nn.Module): Feature extractor (moved to the global `device`).
        batch_size (int): Number of images per forward pass.
        save_path (str): Cache file name (relative to DRIVE_SAVE_PATH).

    Returns:
        np.ndarray: One row of flattened features per image.
    """
    full_save_path = os.path.join(DRIVE_SAVE_PATH, save_path)

    # Cached result wins: skip the forward passes entirely.
    if os.path.exists(full_save_path):
        print(f"Loading extracted features from {full_save_path}...")
        return np.load(full_save_path)

    model = model.to(device)
    per_batch = []

    with torch.no_grad():  # inference only, no autograd bookkeeping
        for i in range(0, len(images), batch_size):
            chunk = images[i:i + batch_size].to(device)
            flattened = model(chunk).view(chunk.size(0), -1)
            per_batch.append(flattened.cpu().numpy())

            # Single-line progress indicator (carriage return overwrites it)
            print(f"Processed batch {i // batch_size + 1}/{(len(images) + batch_size - 1) // batch_size}", end="\r")

    features = np.concatenate(per_batch, axis=0)
    np.save(full_save_path, features)
    print(f"\nExtracted features saved to {full_save_path}.")
    return features

### **3.3.2 t-SNE for Visualization**

In [None]:
# Function to create t-SNE visualization with images
def visualize_tsne_with_images(features, image_paths, sample_size, random_state=42):
    """
    Visualize t-SNE embeddings with image thumbnails.

    Args:
        features (np.ndarray): High-dimensional features (N x D).
        image_paths (list): List of corresponding image paths (N elements).
        sample_size (int): Number of images to sample for visualization; clamped
            to N when larger than the dataset.
        random_state (int): Random seed for reproducibility.

    Raises:
        ValueError: If fewer than 2 samples are available (t-SNE cannot run).
    """
    # Sampling without replacement fails if more samples are requested than exist.
    n_available = len(features)
    sample_size = min(sample_size, n_available)
    if sample_size < 2:
        raise ValueError("t-SNE visualization needs at least 2 samples.")

    # Local legacy generator: draws the same sample as seeding the global
    # NumPy RNG, but without resetting global random state as a side effect.
    rng = np.random.RandomState(random_state)
    indices = rng.choice(n_available, size=sample_size, replace=False)
    sampled_features = features[indices]
    sampled_paths = [image_paths[i] for i in indices]

    # Apply t-SNE to the sampled features. Perplexity must stay below the sample
    # count; the iteration count is left at the library default (1000), which
    # avoids the renamed n_iter/max_iter keyword.
    perplexity = min(30, sample_size - 1)
    tsne = TSNE(n_components=2, random_state=random_state, perplexity=perplexity)
    tsne_embeddings = tsne.fit_transform(sampled_features)

    # Normalize t-SNE embeddings to [0, 1]; guard against a zero-width axis.
    x_min, x_max = tsne_embeddings.min(0), tsne_embeddings.max(0)
    span = np.where(x_max > x_min, x_max - x_min, 1.0)
    tsne_embeddings = (tsne_embeddings - x_min) / span

    # Visualize with image thumbnails
    fig, ax = plt.subplots(figsize=(12, 12))
    for i, (x, y) in enumerate(tsne_embeddings):
        # Load and resize the image, closing the file handle right away
        with Image.open(sampled_paths[i]) as raw:
            img = raw.convert("RGB").resize((30, 30))
        ax.imshow(img, extent=(x - 0.015, x + 0.015, y - 0.015, y + 0.015), aspect="auto")

    ax.scatter(tsne_embeddings[:, 0], tsne_embeddings[:, 1], alpha=0.5, s=10, c='gray')  # Scatter plot for reference
    ax.set_title("t-SNE Visualization with Image Thumbnails")
    ax.axis("off")
    plt.show()

In [None]:
# Ensure features and image paths are extracted (both helpers load their
# cached files from DRIVE_SAVE_PATH when those already exist)
images, image_paths = preprocess_images(img_path, save_path="preprocessed.pt")
features = extract_features_in_batches(images, feature_extractor, save_path="features.npy")

# Testing it out on the raw ResNet features for My Photos
visualize_tsne_with_images(features=features, image_paths=image_paths, sample_size=1000)

In [None]:
# Testing it out on the raw ResNet features for Van Gogh's Paintings
# (note: this rebinds images / image_paths / features to the Van Gogh data)
images, image_paths = preprocess_images(van_path, save_path="van_preprocessed.pt")
features = extract_features_in_batches(images, feature_extractor, save_path="van_features.npy")

# Visualize Van Gogh's Paintings
visualize_tsne_with_images(features=features, image_paths=image_paths, sample_size=1000)

## **3.4 Dimensionality Reduction**

In [None]:
# Dimensionality Reduction (PCA + UMAP)
def reduce_dimensions(features, save_path="reduced_features.npy", n_pca_components=300, n_umap_components=2, n_neighbors=15, min_dist=0.1):
    """
    Compress feature vectors with PCA, then embed the PCA output with UMAP.

    The embedding is cached as a .npy file under DRIVE_SAVE_PATH; when that file
    exists it is returned directly and the other arguments are not consulted.

    Args:
        features (np.ndarray): High-dimensional feature vectors.
        save_path (str): Cache file name (relative to DRIVE_SAVE_PATH).
        n_pca_components (int): Number of PCA components kept before UMAP.
        n_umap_components (int): Dimensionality of the UMAP output.
        n_neighbors (int): UMAP `n_neighbors` setting.
        min_dist (float): UMAP `min_dist` setting.

    Returns:
        np.ndarray: UMAP-reduced features.
    """
    full_save_path = os.path.join(DRIVE_SAVE_PATH, save_path)

    cache_hit = os.path.exists(full_save_path)
    if cache_hit:
        print(f"Loading reduced features from {full_save_path}...")
        return np.load(full_save_path)

    # Stage 1: linear compression (fixed seed for repeatability)
    pca = PCA(n_components=n_pca_components, random_state=42)
    pca_features = pca.fit_transform(features)
    print(f"PCA retained {np.sum(pca.explained_variance_ratio_) * 100:.2f}% variance.")

    # Stage 2: non-linear embedding of the PCA output
    umap_settings = dict(
        n_components=n_umap_components,
        n_neighbors=n_neighbors,
        min_dist=min_dist,
        random_state=42,
    )
    reduced_features = umap.UMAP(**umap_settings).fit_transform(pca_features)

    np.save(full_save_path, reduced_features)  # persist for later runs
    print(f"Reduced features saved to {full_save_path}.")
    return reduced_features

### **3.4.4 Visualize UMAP embeddings with image thumbnails**

In [None]:
# Function to create UMAP visualization with image thumbnails
def visualize_umap_with_images(umap_embeddings, image_paths, sample_size=200, random_state=42):
    """
    Visualize UMAP embeddings with image thumbnails.

    Args:
        umap_embeddings (np.ndarray): 2D UMAP embeddings (N x 2).
        image_paths (list): List of corresponding image paths (N elements).
        sample_size (int): Number of images to sample for visualization; clamped
            to N when larger than the dataset.
        random_state (int): Random seed for reproducibility.
    """
    # Sampling without replacement fails if more samples are requested than exist.
    sample_size = min(sample_size, len(umap_embeddings))

    # Local legacy generator: draws the same sample as seeding the global
    # NumPy RNG, but without resetting global random state as a side effect.
    rng = np.random.RandomState(random_state)
    indices = rng.choice(len(umap_embeddings), size=sample_size, replace=False)
    sampled_embeddings = umap_embeddings[indices]
    sampled_paths = [image_paths[i] for i in indices]

    # Normalize the sampled embeddings to [0, 1]; guard against a zero-width axis.
    x_min, x_max = sampled_embeddings.min(0), sampled_embeddings.max(0)
    span = np.where(x_max > x_min, x_max - x_min, 1.0)
    normalized_embeddings = (sampled_embeddings - x_min) / span

    # Visualize with image thumbnails
    fig, ax = plt.subplots(figsize=(12, 12))
    for i, (x, y) in enumerate(normalized_embeddings):
        # Load and resize the image, closing the file handle right away
        with Image.open(sampled_paths[i]) as raw:
            img = raw.convert("RGB").resize((30, 30))
        ax.imshow(img, extent=(x - 0.015, x + 0.015, y - 0.015, y + 0.015), aspect="auto")

    ax.scatter(normalized_embeddings[:, 0], normalized_embeddings[:, 1], alpha=0.5, s=10, c='gray')  # Scatter plot for reference
    ax.set_title("UMAP Visualization with Image Thumbnails")
    ax.axis("off")
    plt.show()

In [None]:
# Applying Dimensionality Reduction to My Photos
# (each helper returns its cached file from DRIVE_SAVE_PATH when one exists)
images, image_paths = preprocess_images(img_path, save_path="preprocessed.pt")
features = extract_features_in_batches(images, feature_extractor, save_path="features.npy")
reduced_features = reduce_dimensions(features, save_path="reduced.npy")

visualize_umap_with_images(umap_embeddings=reduced_features, image_paths=image_paths, sample_size=1000)

In [None]:
# Applying Dimensionality Reduction to Van Gogh's paintings
# (rebinds images / image_paths / features / reduced_features to the Van Gogh data)
images, image_paths = preprocess_images(van_path, save_path="van_preprocessed.pt")
features = extract_features_in_batches(images, feature_extractor, save_path="van_features.npy")
reduced_features = reduce_dimensions(features, save_path="van_reduced.npy")

visualize_umap_with_images(umap_embeddings=reduced_features, image_paths=image_paths, sample_size=500)

## **3.5 Clustering**

### **3.5.1 Evaluation Metrics**

In [None]:
# Evaluate Clusters
def evaluate_clustering(data, labels, algorithm_name):
    """
    Print and return three internal clustering quality scores.

    Args:
        data: Samples that were clustered.
        labels: Cluster label per sample.
        algorithm_name (str): Name used as the prefix of the printed report.

    Returns:
        tuple: (silhouette, calinski_harabasz, davies_bouldin), or (-1, -1, -1)
        when `labels` contains at most one distinct value.
    """
    # Guard clause: the scores are undefined with fewer than two distinct labels.
    if len(set(labels)) <= 1:
        print(f"{algorithm_name} Metrics: No valid clusters formed.")
        return -1, -1, -1

    silhouette = silhouette_score(data, labels)
    calinski_harabasz = calinski_harabasz_score(data, labels)
    davies_bouldin = davies_bouldin_score(data, labels)

    report_lines = (
        f"{algorithm_name} Metrics:",
        f"  Silhouette Score: {silhouette:.4f}",
        f"  Calinski-Harabasz Index: {calinski_harabasz:.4f}",
        f"  Davies-Bouldin Index: {davies_bouldin:.4f}",
    )
    for line in report_lines:
        print(line)
    return silhouette, calinski_harabasz, davies_bouldin


In [None]:
# Rebuild (or load from cache) the My Photos pipeline outputs so that the
# clustering cells below operate on the Google Photos embedding.
images, image_paths = preprocess_images(img_path, save_path="preprocessed.pt")
features = extract_features_in_batches(images, feature_extractor, save_path="features.npy")
reduced_features = reduce_dimensions(features, save_path="reduced.npy")

# `data` is the array every clustering experiment below is fitted on
data = reduced_features



Loading preprocessed images from /content/drive/MyDrive/Pipeline_2/preprocessed.pt...


  images, image_paths = torch.load(full_save_path)


Loading extracted features from /content/drive/MyDrive/Pipeline_2/features.npy...
Loading reduced features from /content/drive/MyDrive/Pipeline_2/reduced.npy...


In [None]:
# Apply DBSCAN (label -1 marks points DBSCAN treats as noise)
dbscan = DBSCAN(eps=0.1, min_samples=5)
dbscan_labels = dbscan.fit_predict(data)

# Apply K-Means with a fixed seed so the partition is repeatable
kmeans = KMeans(n_clusters=3, random_state=42)
kmeans_labels = kmeans.fit_predict(data)

# Report the metrics for both algorithms; the last call's return value is
# also shown as the cell output
evaluate_clustering(data, dbscan_labels, "DBSCAN")
evaluate_clustering(data, kmeans_labels, "K-Means")

DBSCAN Metrics:
  Silhouette Score: 0.2605
  Calinski-Harabasz Index: 229.7848
  Davies-Bouldin Index: 1.3661
K-Means Metrics:
  Silhouette Score: 0.4670
  Calinski-Harabasz Index: 4245.8340
  Davies-Bouldin Index: 0.8088


(0.46696675, 4245.834033398844, 0.8087504754742091)

In [None]:
# Loop through different numbers of clusters.
# The per-candidate model and labels use their own names so that `kmeans` and
# `kmeans_labels` from the baseline cell are not overwritten; otherwise the
# later "K-Means Clustering" plot would silently show the last candidate (k=5).
results = []
for n_clusters in [3, 4, 5]:
    candidate_kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    candidate_labels = candidate_kmeans.fit_predict(data)
    silhouette, calinski_harabasz, davies_bouldin = evaluate_clustering(data, candidate_labels, "K-Means")
    results.append({
        "n_clusters": n_clusters,
        "silhouette": silhouette,
        "calinski_harabasz": calinski_harabasz,
        "davies_bouldin": davies_bouldin
    })

# Convert results to a DataFrame and display (pandas is imported in the imports cell)
results_df = pd.DataFrame(results)
results_df

K-Means Metrics:
  Silhouette Score: 0.4670
  Calinski-Harabasz Index: 4245.8340
  Davies-Bouldin Index: 0.8088
K-Means Metrics:
  Silhouette Score: 0.3854
  Calinski-Harabasz Index: 3732.3416
  Davies-Bouldin Index: 0.9129
K-Means Metrics:
  Silhouette Score: 0.4036
  Calinski-Harabasz Index: 3268.2861
  Davies-Bouldin Index: 0.7773


Unnamed: 0,n_clusters,silhouette,calinski_harabasz,davies_bouldin
0,3,0.466967,4245.834033,0.80875
1,4,0.385355,3732.341605,0.912872
2,5,0.403565,3268.286053,0.777264


### **3.5.3 Visualization**

In [None]:
# Updated to integrate the reduced features directly and fit within the pipeline

def visualize_clusters(reduced_features, labels, title="Clustering Visualization"):
    """
    Visualize clusters in the reduced feature space.

    Args:
        reduced_features (np.ndarray): 2D reduced feature space (e.g., UMAP output).
        labels (np.ndarray): Cluster labels for the data points; -1 is drawn as noise.
        title (str): Title of the plot.
    """
    labels = np.asarray(labels)  # boolean masking below needs an array
    # Sorted so colors and legend order are stable from run to run.
    unique_labels = sorted(set(labels.tolist()))
    # With a single label the original `len - 1` denominator was zero.
    color_steps = max(len(unique_labels) - 1, 1)
    colors = [plt.cm.tab10(i / float(color_steps)) for i in range(len(unique_labels))]

    plt.figure(figsize=(10, 8))
    for k, col in zip(unique_labels, colors):
        if k == -1:  # Noise points
            col = [0, 0, 0, 1]  # Black for noise

        class_member_mask = (labels == k)
        xy = reduced_features[class_member_mask]
        plt.scatter(xy[:, 0], xy[:, 1], s=50, c=[col], label=f"Cluster {k}" if k != -1 else "Noise")

    plt.title(title)
    plt.legend(bbox_to_anchor=(1.05, 1), loc="upper left")
    plt.tight_layout()
    plt.show()


In [None]:
visualize_clusters(data, dbscan_labels, "DBSCAN Clustering")
visualize_clusters(data, kmeans_labels, "K-Means Clustering")

### **3.5.2 Grid Search for DBSCAN Parameters**

In [None]:
# Define the parameter grid
param_grid = {
    'eps': np.linspace(0.1, 2.0, 20),  # 20 values for eps between 0.1 and 2.0
    'min_samples': range(2, 10)  # Values for min_samples between 2 and 9
}

# Generate all combinations of parameters
grid = ParameterGrid(param_grid)

# Best score tracking
best_score = -1
best_params = {}

# Function to evaluate clustering.
# This cell rebinds `evaluate_clustering`, so it accepts the earlier
# three-argument call form as well: cells that pass an algorithm name still
# work (and still print) after this cell has run, while the grid search below
# stays silent by omitting the name.
def evaluate_clustering(data, labels, algorithm_name=None):
    """
    Compute silhouette, Calinski-Harabasz and Davies-Bouldin scores.

    Args:
        data: Samples that were clustered.
        labels: Cluster label per sample.
        algorithm_name (str | None): When given, the scores are also printed
            under this name; when None nothing is printed.

    Returns:
        tuple: (silhouette, calinski_harabasz, davies_bouldin), or (-1, -1, -1)
        when `labels` contains at most one distinct value.
    """
    if len(set(labels)) > 1:  # Ensure more than one cluster is formed
        silhouette = silhouette_score(data, labels)
        calinski_harabasz = calinski_harabasz_score(data, labels)
        davies_bouldin = davies_bouldin_score(data, labels)
        if algorithm_name is not None:
            print(f"{algorithm_name} Metrics:")
            print(f"  Silhouette Score: {silhouette:.4f}")
            print(f"  Calinski-Harabasz Index: {calinski_harabasz:.4f}")
            print(f"  Davies-Bouldin Index: {davies_bouldin:.4f}")
        return silhouette, calinski_harabasz, davies_bouldin
    else:
        if algorithm_name is not None:
            print(f"{algorithm_name} Metrics: No valid clusters formed.")
        return -1, -1, -1  # Invalid clustering

# Perform grid search
results = []

for params in grid:
    dbscan = DBSCAN(eps=params['eps'], min_samples=params['min_samples'])
    labels = dbscan.fit_predict(data)
    silhouette, calinski_harabasz, davies_bouldin = evaluate_clustering(data, labels)

    results.append({
        'eps': params['eps'],
        'min_samples': params['min_samples'],
        'silhouette': silhouette,
        'calinski_harabasz': calinski_harabasz,
        'davies_bouldin': davies_bouldin
    })

    # Track the best silhouette seen so far and the parameters that produced it
    if silhouette > best_score:
        best_score = silhouette
        best_params = params

# Display the best parameters (pandas is imported in the imports cell)
results_df = pd.DataFrame(results)
best_result = results_df.loc[results_df['silhouette'].idxmax()]
print("Best Parameters:")
print(best_result)

Best Parameters:
eps                    0.100000
min_samples            5.000000
silhouette             0.260460
calinski_harabasz    229.784779
davies_bouldin         1.366059
Name: 3, dtype: float64


In [None]:
# Add a column to rank by Silhouette Score
results_df = results_df.sort_values(by="silhouette", ascending=False).reset_index(drop=True)

# Print the best results df
results_df

Unnamed: 0,eps,min_samples,silhouette,calinski_harabasz,davies_bouldin
0,0.1,5,0.260460,229.784779,1.366059
1,0.1,4,0.257073,307.200105,1.395067
2,0.1,3,0.252895,452.349077,1.491732
3,0.1,2,0.244771,544.411119,1.566227
4,1.9,5,0.244717,217.226379,0.350656
...,...,...,...,...,...
155,0.4,5,-0.364013,119.118189,1.041764
156,0.4,6,-0.364013,119.118189,1.041764
157,0.4,4,-0.364013,119.118189,1.041764
158,0.4,3,-0.365997,115.676516,0.962927


### **3.5.4 Clustering and Saving Classification**

In [None]:
# Step 4: Clustering
def cluster_features(features, save_path="cluster_labels.npy", method="dbscan", n_clusters=5, eps=0.5, min_samples=5):
    """
    Assign a cluster label to each feature row with DBSCAN or K-Means.

    Labels are cached as a .npy file under DRIVE_SAVE_PATH; when that file
    exists it is returned directly, before `method` or any other argument is
    looked at.

    Args:
        features (np.ndarray): Reduced features for clustering.
        save_path (str): Cache file name (relative to DRIVE_SAVE_PATH).
        method (str): 'dbscan' or 'kmeans'.
        n_clusters (int): Number of clusters (K-Means only).
        eps (float): Neighborhood radius (DBSCAN only).
        min_samples (int): Minimum neighborhood size (DBSCAN only).

    Returns:
        np.ndarray: Cluster labels.

    Raises:
        ValueError: If `method` is not supported (only reached on a cache miss).
    """
    full_save_path = os.path.join(DRIVE_SAVE_PATH, save_path)

    already_cached = os.path.exists(full_save_path)
    if already_cached:
        print(f"Loading cluster labels from {full_save_path}...")
        return np.load(full_save_path)

    # Pick the estimator for the requested method.
    if method == "kmeans":
        clusterer = KMeans(n_clusters=n_clusters, random_state=42)
    elif method == "dbscan":
        clusterer = DBSCAN(eps=eps, min_samples=min_samples)
    else:
        raise ValueError("Unsupported clustering method. Use 'dbscan' or 'kmeans'.")

    labels = clusterer.fit_predict(features)
    np.save(full_save_path, labels)  # persist for later runs
    print(f"Cluster labels saved to {full_save_path}.")
    return labels

In [None]:
# Step 5: Save Clustered Images
def save_clustered_images(image_paths, labels, output_dir="clustered_images"):
    """
    Copy each image into a per-cluster subfolder named ``cluster_<label>``.

    Args:
        image_paths (list): Source image paths, index-aligned with `labels`.
        labels: Cluster label per image; images labelled -1 (DBSCAN noise) are skipped.
        output_dir (str): Destination folder (relative to DRIVE_SAVE_PATH).
    """
    full_output_dir = os.path.join(DRIVE_SAVE_PATH, output_dir)
    os.makedirs(full_output_dir, exist_ok=True)

    created_dirs = {}  # label -> folder path, so each folder is created once
    for img_path, label in zip(image_paths, labels):
        if label == -1:  # Skip noise points (for DBSCAN)
            continue
        # Resolve the folder for THIS image's label. The earlier version reused
        # the variable left over from a directory-creation loop, which sent
        # every image to whichever cluster folder was created last.
        cluster_dir = created_dirs.get(label)
        if cluster_dir is None:
            cluster_dir = os.path.join(full_output_dir, f"cluster_{label}")
            os.makedirs(cluster_dir, exist_ok=True)
            created_dirs[label] = cluster_dir
        shutil.copy(img_path, os.path.join(cluster_dir, os.path.basename(img_path)))
    print(f"Clustered images saved to '{full_output_dir}'.")

## **Run the Full PreProcessing Pipeline**

In [None]:
# Full Pipeline Function
def full_pipeline(image_dir, output_prefix="pipeline", clustering_method="dbscan"):
    """
    Full pipeline for preprocessing, feature extraction, dimensionality reduction, and clustering.

    Every stage caches its output under DRIVE_SAVE_PATH using `output_prefix`,
    and the images are finally copied into per-cluster folders.

    Args:
        image_dir (str): Path to the directory containing images.
        output_prefix (str): Prefix for saved files (relative to DRIVE_SAVE_PATH).
        clustering_method (str): Clustering method ('dbscan' or 'kmeans').

    Returns:
        np.ndarray: Cluster label per image.
    """
    images, image_paths = preprocess_images(image_dir, save_path=f"{output_prefix}_preprocessed.pt")
    # The batched extractor is the only feature-extraction helper this notebook
    # defines; the previous call to `extract_features` raised NameError.
    features = extract_features_in_batches(images, feature_extractor, save_path=f"{output_prefix}_features.npy")
    reduced_features = reduce_dimensions(features, save_path=f"{output_prefix}_reduced.npy")
    labels = cluster_features(reduced_features, save_path=f"{output_prefix}_labels.npy", method=clustering_method)
    save_clustered_images(image_paths, labels, output_dir=f"{output_prefix}_clustered")
    return labels

# Apply Pipeline
full_pipeline(img_path, output_prefix="your_images", clustering_method="dbscan")
full_pipeline(van_path, output_prefix="vangogh_images", clustering_method="dbscan")


---
# **4 CycleGAN**



## **4.2 Model Architecture**

### **Paper-Based Discriminator Architecture**

In [None]:
# Create a block
class Block(nn.Module):
    """
    Discriminator building block: 4x4 reflect-padded convolution, instance
    normalization, then LeakyReLU(0.2).

    Args:
        in_channels (int): Channels of the input feature map.
        out_channels (int): Channels produced by the convolution.
        stride (int): Stride of the convolution.
    """

    def __init__(self, in_channels, out_channels, stride):
        super().__init__()
        stages = [
            nn.Conv2d(
                in_channels,
                out_channels,
                kernel_size=4,
                stride=stride,
                padding=1,
                bias=True,
                padding_mode="reflect",
            ),
            nn.InstanceNorm2d(out_channels),
            nn.LeakyReLU(0.2),
        ]
        # Attribute name and layer order are kept so state_dict keys do not change.
        self.conv = nn.Sequential(*stages)

    def forward(self, x):
        """Apply conv -> instance norm -> LeakyReLU to `x`."""
        return self.conv(x)

class Discriminator(nn.Module):
    """
    Patch-style discriminator: an initial strided convolution, a stack of
    `Block`s, and a final 1-channel convolution followed by a sigmoid.

    Args:
        in_channels (int): Channels of the input image.
        features (sequence of int): Output channels of the initial convolution
            followed by each `Block`. All blocks use stride 2 except the last
            one, which uses stride 1.
    """

    def __init__(self, in_channels=3, features=(64, 128, 256, 512)):
        super().__init__()
        features = list(features)  # accept any sequence; default is immutable

        # Initial layer (no normalization)
        self.initial = nn.Sequential(
            nn.Conv2d(
                in_channels,
                features[0],
                kernel_size=4,
                stride=2,
                padding=1,
                padding_mode="reflect",
            ),
            nn.LeakyReLU(0.2),
        )

        # Building the layers dynamically. The stride-1 block is chosen by
        # position: comparing channel values would also give stride 1 to any
        # earlier block that happens to have the same width as the last one.
        layers = []
        in_channels = features[0]
        last_index = len(features) - 1
        for index, feature in enumerate(features[1:], start=1):
            layers.append(Block(in_channels, feature, stride=1 if index == last_index else 2))
            in_channels = feature

        # Final layer: map to a single-channel score map
        layers.append(
            nn.Conv2d(in_channels, 1, kernel_size=4, stride=1, padding=1, padding_mode="reflect")
        )

        # Convert the layers list to a Sequential module
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Return the per-patch scores for `x`, squashed to (0, 1) by a sigmoid."""
        x = self.initial(x)
        x = self.model(x)
        return torch.sigmoid(x)


In [None]:
# Provide a summary of the model architecture.
# Use the notebook-wide `device` instead of an unconditional .cuda() so the
# cell also runs on CPU-only runtimes; torchsummary needs the matching device name.
model = Discriminator().to(device)
summary(model, input_size=(3, 512, 512), device=device.type)

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 64, 256, 256]           3,136
         LeakyReLU-2         [-1, 64, 256, 256]               0
            Conv2d-3        [-1, 128, 128, 128]         131,200
    InstanceNorm2d-4        [-1, 128, 128, 128]               0
         LeakyReLU-5        [-1, 128, 128, 128]               0
             Block-6        [-1, 128, 128, 128]               0
            Conv2d-7          [-1, 256, 64, 64]         524,544
    InstanceNorm2d-8          [-1, 256, 64, 64]               0
         LeakyReLU-9          [-1, 256, 64, 64]               0
            Block-10          [-1, 256, 64, 64]               0
           Conv2d-11          [-1, 512, 63, 63]       2,097,664
   InstanceNorm2d-12          [-1, 512, 63, 63]               0
        LeakyReLU-13          [-1, 512, 63, 63]               0
            Block-14          [-1, 512,

### **Paper-Based Generator Architecture**

In [None]:
# Generator
class ConvBlock(nn.Module):
    """Convolution (or transposed convolution) -> InstanceNorm2d -> optional ReLU.

    Args:
        in_channels: Number of channels in the incoming feature map.
        out_channels: Number of channels produced by the convolution.
        down: If truthy, use ``nn.Conv2d`` with reflect padding; otherwise use
            ``nn.ConvTranspose2d``.
        use_act: If truthy, finish with an in-place ReLU; otherwise finish with
            ``nn.Identity``.
        **kwargs: Forwarded unchanged to the convolution constructor
            (kernel size, stride, padding, ...).
    """

    def __init__(self, in_channels, out_channels, down=True, use_act=True, **kwargs):
        super().__init__()

        # Pick the convolution flavour; only the regular convolution gets reflect padding.
        if down:
            conv_layer = nn.Conv2d(in_channels, out_channels, padding_mode="reflect", **kwargs)
        else:
            conv_layer = nn.ConvTranspose2d(in_channels, out_channels, **kwargs)

        if use_act:
            activation = nn.ReLU(inplace=True)
        else:
            activation = nn.Identity()

        # Keep the three sub-modules at indices 0/1/2 of ``self.conv``.
        self.conv = nn.Sequential(conv_layer, nn.InstanceNorm2d(out_channels), activation)

    def forward(self, x):
        """Run ``x`` through the conv / norm / activation stack."""
        return self.conv(x)

class ResidualBlock(nn.Module):
    """Two 3x3 ``ConvBlock`` layers whose output is added back onto the input.

    Args:
        channels: Channel count used for both the input and the output of
            each ``ConvBlock``.
    """

    def __init__(self, channels):
        super().__init__()
        conv_settings = dict(kernel_size=3, padding=1)
        with_activation = ConvBlock(channels, channels, **conv_settings)
        # The second block skips the ReLU (use_act=False).
        without_activation = ConvBlock(channels, channels, use_act=False, **conv_settings)
        self.block = nn.Sequential(with_activation, without_activation)

    def forward(self, x):
        """Return ``x`` plus the output of the two stacked ConvBlocks."""
        residual = self.block(x)
        return x + residual

class Generator(nn.Module):
    """Generator made of a 7x7 stem, two stride-2 down blocks, residual blocks,
    two stride-2 transposed-conv up blocks and a 7x7 output convolution.

    Args:
        img_channels: Channel count of both the input and the output image.
        num_features: Width of the stem; the down blocks widen it to 2x and 4x.
        num_residuals: Number of ``ResidualBlock`` modules applied at 4x width.
    """

    def __init__(self, img_channels, num_features=64, num_residuals=9):
        super().__init__()
        base = num_features * 1
        double = num_features * 2
        quad = num_features * 4

        stem_conv = nn.Conv2d(
            img_channels, num_features,
            kernel_size=7, stride=1, padding=3, padding_mode="reflect",
        )
        self.initial = nn.Sequential(stem_conv, nn.ReLU(inplace=True))

        # Encoder: each block uses stride 2 and doubles the channel count.
        self.down_blocks = nn.ModuleList([
            ConvBlock(c_in, c_out, kernel_size=3, stride=2, padding=1)
            for c_in, c_out in ((num_features, double), (double, quad))
        ])

        self.residual_blocks = nn.Sequential(
            *[ResidualBlock(quad) for _ in range(num_residuals)]
        )

        # Decoder: transposed convolutions (down=False) that halve the channel count.
        self.up_blocks = nn.ModuleList([
            ConvBlock(c_in, c_out, down=False, kernel_size=3, stride=2, padding=1, output_padding=1)
            for c_in, c_out in ((quad, double), (double, base))
        ])

        self.last = nn.Conv2d(
            base, img_channels,
            kernel_size=7, stride=1, padding=3, padding_mode="reflect",
        )

    def forward(self, x):
        """Map an image batch through the network; tanh bounds the output to [-1, 1]."""
        out = self.initial(x)
        for down_block in self.down_blocks:
            out = down_block(out)
        out = self.residual_blocks(out)
        for up_block in self.up_blocks:
            out = up_block(out)
        out = self.last(out)
        return torch.tanh(out)



In [None]:
# Print a per-layer summary of the generator architecture for a 3x512x512 input.
# NOTE: .cuda() moves the model to the GPU, so this cell needs a CUDA-capable device.
model = Generator(img_channels=3).cuda()
summary(model, input_size=(3, 512, 512))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 64, 512, 512]           9,472
              ReLU-2         [-1, 64, 512, 512]               0
            Conv2d-3        [-1, 128, 256, 256]          73,856
    InstanceNorm2d-4        [-1, 128, 256, 256]               0
              ReLU-5        [-1, 128, 256, 256]               0
         ConvBlock-6        [-1, 128, 256, 256]               0
            Conv2d-7        [-1, 256, 128, 128]         295,168
    InstanceNorm2d-8        [-1, 256, 128, 128]               0
              ReLU-9        [-1, 256, 128, 128]               0
        ConvBlock-10        [-1, 256, 128, 128]               0
           Conv2d-11        [-1, 256, 128, 128]         590,080
   InstanceNorm2d-12        [-1, 256, 128, 128]               0
             ReLU-13        [-1, 256, 128, 128]               0
        ConvBlock-14        [-1, 256, 1

## **Simpler Model Architecture**

### **Discriminator Architecture**

In [None]:
# Define Discriminator
class Discriminator(nn.Module):
    """Small convolutional discriminator: three 4x4 stride-2 convolutions
    followed by a sigmoid.

    Args:
        in_channels: Number of channels of the images being scored.
    """

    def __init__(self, in_channels):
        super().__init__()

        def strided_conv(c_in, c_out):
            # Every convolution in this model shares the same 4x4 / stride 2 / pad 1 geometry.
            return nn.Conv2d(c_in, c_out, kernel_size=4, stride=2, padding=1)

        layers = [
            strided_conv(in_channels, 64),
            nn.LeakyReLU(0.2, inplace=True),
            strided_conv(64, 128),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2, inplace=True),
            strided_conv(128, 1),
            nn.Sigmoid(),
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Return the single-channel map of sigmoid scores for ``x``."""
        return self.model(x)

### **Generator Architecture**

In [None]:
# Define Generator
class Generator(nn.Module):
    """Small encoder-decoder generator: two 4x4 stride-2 convolutions down,
    two 4x4 stride-2 transposed convolutions up, and a tanh output.

    Args:
        in_channels: Number of channels of the input image.
        out_channels: Number of channels of the generated image.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        # Shared geometry for every (transposed) convolution in the model.
        conv_kwargs = dict(kernel_size=4, stride=2, padding=1)

        encoder = [
            nn.Conv2d(in_channels, 64, **conv_kwargs),
            nn.ReLU(True),
            nn.Conv2d(64, 128, **conv_kwargs),
            nn.BatchNorm2d(128),
            nn.ReLU(True),
        ]
        decoder = [
            nn.ConvTranspose2d(128, 64, **conv_kwargs),
            nn.BatchNorm2d(64),
            nn.ReLU(True),
            nn.ConvTranspose2d(64, out_channels, **conv_kwargs),
            nn.Tanh(),
        ]
        self.model = nn.Sequential(*encoder, *decoder)

    def forward(self, x):
        """Translate ``x``; the final tanh bounds the output to [-1, 1]."""
        return self.model(x)

## **4.3.1 Loss Functions**

In [None]:
# Loss
class CycleGANLosses:
    """Bundle of the loss terms used for CycleGAN training.

    The adversarial term is a mean-squared error against all-ones / all-zeros
    targets; the cycle and identity terms are L1 distances scaled by
    ``lambda_cycle`` and ``lambda_identity``.

    Args:
        lambda_cycle: Weight applied to the cycle-consistency L1 loss.
        lambda_identity: Weight applied to the identity L1 loss.
    """

    def __init__(self, lambda_cycle=10.0, lambda_identity=5.0):
        self.lambda_cycle = lambda_cycle
        self.lambda_identity = lambda_identity
        self.adversarial_loss = nn.MSELoss()
        self.cycle_loss = nn.L1Loss()
        self.identity_loss = nn.L1Loss()

    def generator_loss(self, fake_output, real_images, reconstructed_images, identity_images=None):
        """Return adversarial + weighted cycle (+ weighted identity) loss.

        Args:
            fake_output: Discriminator output for generated images; pushed towards 1.
            real_images: Target for BOTH the cycle term and the identity term.
            reconstructed_images: Round-trip reconstruction compared to ``real_images``.
            identity_images: Optional generator output compared to ``real_images``;
                when ``None`` the identity term contributes ``0.0``.
        """
        all_real = torch.ones_like(fake_output)
        adversarial_term = self.adversarial_loss(fake_output, all_real)
        cycle_term = self.cycle_loss(reconstructed_images, real_images) * self.lambda_cycle

        if identity_images is None:
            identity_term = 0.0
        else:
            identity_term = self.identity_loss(identity_images, real_images) * self.lambda_identity

        return adversarial_term + cycle_term + identity_term

    def discriminator_loss(self, real_output, fake_output):
        """Return the mean of the real-vs-1 and fake-vs-0 squared-error losses."""
        loss_on_real = self.adversarial_loss(real_output, torch.ones_like(real_output))
        loss_on_fake = self.adversarial_loss(fake_output, torch.zeros_like(fake_output))
        combined = loss_on_real + loss_on_fake
        return combined / 2

In [None]:
# Select the compute device: the GPU when CUDA is available, the CPU otherwise
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

## **Loading the Data**

In [None]:
# Custom Dataset Loader for Unlabeled Images
class UnpairedImageDataset(Dataset):
    """Dataset over the image files found directly inside one directory.

    Files are selected by extension (``.png``, ``.jpg``, ``.jpeg``), matched
    case-insensitively so that names such as ``IMG_0001.JPG`` are included,
    and are served in sorted path order.

    Args:
        image_dir: Directory that is listed (non-recursively) for images.
        transform: Optional callable applied to each RGB PIL image before it
            is returned.
    """

    # Extensions are compared against the lower-cased file name.
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

    def __init__(self, image_dir, transform=None):
        self.image_dir = image_dir
        self.image_paths = sorted(
            os.path.join(image_dir, file_name)
            for file_name in os.listdir(image_dir)
            if file_name.lower().endswith(self.IMAGE_EXTENSIONS)
        )
        self.transform = transform

    def __len__(self):
        """Return the number of image files found at construction time."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load image ``idx`` as RGB and apply ``transform`` if one was given."""
        img_path = self.image_paths[idx]
        # convert() returns a new, fully loaded image, so the file handle can be
        # released immediately instead of waiting for garbage collection.
        with Image.open(img_path) as source:
            image = source.convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image

# Load Dataset
def load_data(data_dir, batch_size, image_size=256):
    """Build a shuffled DataLoader over the images in ``data_dir``.

    Every image is resized, converted to a tensor and normalized with
    mean 0.5 / std 0.5 per channel.

    Args:
        data_dir: Directory passed to ``UnpairedImageDataset``.
        batch_size: Number of images per batch.
        image_size: Target resolution. An int ``n`` resizes to ``n x n``
            (e.g. 256 or 512); a ``(height, width)`` pair is used as given.
            Defaults to 256, the previously hard-coded value.

    Returns:
        DataLoader: Shuffled loader that drops the last incomplete batch, so a
        directory with fewer than ``batch_size`` images yields no batches.
    """
    # Resize(int) would only scale the shorter edge, so always pass an explicit (h, w).
    if isinstance(image_size, int):
        target_size = (image_size, image_size)
    else:
        target_size = tuple(image_size)

    transform = transforms.Compose([
        transforms.Resize(target_size),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ])

    dataset = UnpairedImageDataset(data_dir, transform=transform)
    return DataLoader(dataset, batch_size=batch_size, shuffle=True, drop_last=True)

## **Defining the Training Loop**

In [None]:
# Training function
def train_cycle_gan(generator_A2B, generator_B2A, discriminator_A, discriminator_B,
                    dataloader_A, dataloader_B, optimizer_G, optimizer_D,
                    loss_fn, epochs, device, checkpoint_dir="checkpoints"):
    """Train a CycleGAN, checkpointing after every epoch and resuming if possible.

    Each step first updates both generators on the sum of their adversarial,
    cycle and identity losses, then updates both discriminators on real
    batches and on detached fakes produced in the same step.

    Args:
        generator_A2B: Module translating domain-A batches to domain B.
        generator_B2A: Module translating domain-B batches to domain A.
        discriminator_A: Module scoring domain-A images.
        discriminator_B: Module scoring domain-B images.
        dataloader_A: Iterable of domain-A batches; one pass over it is one epoch.
        dataloader_B: Iterable of domain-B batches; wrapped in ``cycle`` so it
            repeats when it is shorter than ``dataloader_A``.
        optimizer_G: Optimizer stepped after the generator backward pass
            (expected to hold the parameters of both generators).
        optimizer_D: Optimizer stepped after the discriminator backward pass
            (expected to hold the parameters of both discriminators).
        loss_fn: Object exposing ``generator_loss(fake_output, real_images,
            reconstructed_images, identity_images)`` and
            ``discriminator_loss(real_output, fake_output)``.
        epochs: Total number of epochs, counted from zero; epochs already
            recorded in a resumed checkpoint are not repeated.
        device: Device the batches are moved to and checkpoints are mapped onto.
        checkpoint_dir: Directory for ``last_checkpoint.pth`` and the
            every-10-epochs model snapshots; created if missing.

    Returns:
        tuple[list, list]: Per-epoch average generator and discriminator
        losses, including any epochs restored from a checkpoint.
    """
    # Ensure the checkpoint directory exists
    os.makedirs(checkpoint_dir, exist_ok=True)
    last_checkpoint_path = os.path.join(checkpoint_dir, "last_checkpoint.pth")

    # Initialize lists to store losses
    generator_losses = []
    discriminator_losses = []

    # Load from checkpoint if exists
    start_epoch = 0
    if os.path.exists(last_checkpoint_path):
        print("Loading from checkpoint...")
        # map_location lets a checkpoint written on one device (e.g. a GPU)
        # be resumed on another (e.g. a CPU-only machine).
        checkpoint = torch.load(last_checkpoint_path, map_location=device)
        generator_A2B.load_state_dict(checkpoint["generator_A2B"])
        generator_B2A.load_state_dict(checkpoint["generator_B2A"])
        discriminator_A.load_state_dict(checkpoint["discriminator_A"])
        discriminator_B.load_state_dict(checkpoint["discriminator_B"])
        optimizer_G.load_state_dict(checkpoint["optimizer_G"])
        optimizer_D.load_state_dict(checkpoint["optimizer_D"])
        generator_losses = checkpoint["generator_losses"]
        discriminator_losses = checkpoint["discriminator_losses"]
        start_epoch = checkpoint["epoch"] + 1
        print(f"Resuming training from epoch {start_epoch}...")

    # Training loop
    for epoch in range(start_epoch, epochs):
        epoch_generator_loss = 0.0
        epoch_discriminator_loss = 0.0
        num_batches = 0

        # zip stops when dataloader_A is exhausted; cycle (presumably
        # itertools.cycle) keeps supplying B batches until then.
        for real_A, real_B in zip(dataloader_A, cycle(dataloader_B)):
            num_batches += 1
            real_A = real_A.to(device)
            real_B = real_B.to(device)

            # ------------------------
            # Train Generators
            # ------------------------
            optimizer_G.zero_grad()

            fake_B = generator_A2B(real_A)
            reconstructed_A = generator_B2A(fake_B)
            fake_A = generator_B2A(real_B)
            reconstructed_B = generator_A2B(fake_A)

            # Identity mapping: a generator fed an image that is already in its
            # output domain should return it unchanged.
            identity_A = generator_B2A(real_A)  # should stay close to real_A
            identity_B = generator_A2B(real_B)  # should stay close to real_B

            # generator_loss compares both the reconstruction and the identity
            # output against `real_images`, so each call must receive the
            # identity output whose target is that same domain. Passing
            # identity_B together with real_A (as before) penalised the
            # distance between two unrelated images. Only the sum of the two
            # calls is optimised, so grouping the terms by target domain
            # leaves the rest of the objective unchanged.
            loss_G_A2B = loss_fn.generator_loss(discriminator_B(fake_B), real_A, reconstructed_A, identity_A)
            loss_G_B2A = loss_fn.generator_loss(discriminator_A(fake_A), real_B, reconstructed_B, identity_B)
            loss_G = loss_G_A2B + loss_G_B2A
            loss_G.backward()
            optimizer_G.step()

            # ------------------------
            # Train Discriminators
            # ------------------------
            optimizer_D.zero_grad()

            # Detach so the discriminator update does not backpropagate into the generators.
            fake_A_detached = fake_A.detach()
            fake_B_detached = fake_B.detach()

            loss_D_A = loss_fn.discriminator_loss(discriminator_A(real_A), discriminator_A(fake_A_detached))
            loss_D_B = loss_fn.discriminator_loss(discriminator_B(real_B), discriminator_B(fake_B_detached))

            loss_D = (loss_D_A + loss_D_B) / 2
            loss_D.backward()
            optimizer_D.step()

            # Accumulate epoch losses
            epoch_generator_loss += loss_G.item()
            epoch_discriminator_loss += loss_D.item()

        # Compute average losses for the epoch
        avg_generator_loss = epoch_generator_loss / num_batches
        avg_discriminator_loss = epoch_discriminator_loss / num_batches
        generator_losses.append(avg_generator_loss)
        discriminator_losses.append(avg_discriminator_loss)

        print(f"Epoch [{epoch+1}/{epochs}], Generator Loss: {avg_generator_loss:.4f}, Discriminator Loss: {avg_discriminator_loss:.4f}")

        # Save checkpoint after every epoch
        checkpoint = {
            "generator_A2B": generator_A2B.state_dict(),
            "generator_B2A": generator_B2A.state_dict(),
            "discriminator_A": discriminator_A.state_dict(),
            "discriminator_B": discriminator_B.state_dict(),
            "optimizer_G": optimizer_G.state_dict(),
            "optimizer_D": optimizer_D.state_dict(),
            "epoch": epoch,
            "generator_losses": generator_losses,
            "discriminator_losses": discriminator_losses,
        }
        torch.save(checkpoint, last_checkpoint_path)

        # Save models every 10 epochs
        if (epoch + 1) % 10 == 0:
            torch.save(generator_A2B.state_dict(), os.path.join(checkpoint_dir, f"generator_A2B_epoch{epoch+1}.pth"))
            torch.save(generator_B2A.state_dict(), os.path.join(checkpoint_dir, f"generator_B2A_epoch{epoch+1}.pth"))
            torch.save(discriminator_A.state_dict(), os.path.join(checkpoint_dir, f"discriminator_A_epoch{epoch+1}.pth"))
            torch.save(discriminator_B.state_dict(), os.path.join(checkpoint_dir, f"discriminator_B_epoch{epoch+1}.pth"))

    return generator_losses, discriminator_losses

### **Visualization**

In [None]:
# Visualization function
def plot_losses(generator_losses, discriminator_losses):
    """Plot per-epoch generator and discriminator losses side by side.

    Args:
        generator_losses: Sequence of generator loss values, one per epoch.
        discriminator_losses: Sequence of discriminator loss values, one per epoch.
    """
    plt.figure(figsize=(12, 6))

    # (title / legend label, values, extra line options) for the left and right panel.
    panels = (
        ("Generator Loss", generator_losses, {}),
        ("Discriminator Loss", discriminator_losses, {"color": "orange"}),
    )

    for position, (title, losses, line_options) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        # Epochs are numbered from 1 on the x axis.
        epoch_numbers = range(1, len(losses) + 1)
        plt.plot(epoch_numbers, losses, label=title, **line_options)
        plt.xlabel("Epochs")
        plt.ylabel("Loss")
        plt.title(title)
        plt.legend()

    plt.tight_layout()
    plt.show()

### **Run the Training**

In [None]:
def train_cycle_gan_model(
    data_A_dir,
    data_B_dir,
    batch_size=16,
    epochs=40,
    generator_lr=0.0002,
    discriminator_lr=0.0001,
    lambda_cycle=10.0,
    lambda_identity=5.0,
    checkpoint_dir="checkpoints",
):
    """
    Build the data loaders, models, losses and optimizers, train a CycleGAN
    and plot the resulting loss curves.

    Models are created for 3-channel images and placed on the module-level
    ``device``.

    Args:
        data_A_dir (str): Path to dataset A.
        data_B_dir (str): Path to dataset B.
        batch_size (int): Batch size for training.
        epochs (int): Number of training epochs.
        generator_lr (float): Learning rate for the generators.
        discriminator_lr (float): Learning rate for the discriminators.
        lambda_cycle (float): Weight for cycle consistency loss.
        lambda_identity (float): Weight for identity loss.
        checkpoint_dir (str): Directory to save model checkpoints.

    Returns:
        None
    """
    # Data loaders for the two unpaired domains
    loader_A = load_data(data_A_dir, batch_size=batch_size)
    loader_B = load_data(data_B_dir, batch_size=batch_size)

    # Networks (created in a fixed order so seeded initialisation is reproducible)
    gen_A2B = Generator(3, 3).to(device)
    gen_B2A = Generator(3, 3).to(device)
    disc_A = Discriminator(3).to(device)
    disc_B = Discriminator(3).to(device)

    # Loss bundle
    criterion = CycleGANLosses(lambda_cycle=lambda_cycle, lambda_identity=lambda_identity)

    # One Adam optimizer per role, each covering both networks of that role
    adam_betas = (0.5, 0.999)
    generator_params = itertools.chain(gen_A2B.parameters(), gen_B2A.parameters())
    discriminator_params = itertools.chain(disc_A.parameters(), disc_B.parameters())
    optimizer_G = optim.Adam(generator_params, lr=generator_lr, betas=adam_betas)
    optimizer_D = optim.Adam(discriminator_params, lr=discriminator_lr, betas=adam_betas)

    # Train, then visualise the per-epoch losses
    loss_history = train_cycle_gan(
        gen_A2B, gen_B2A, disc_A, disc_B,
        loader_A, loader_B, optimizer_G, optimizer_D,
        criterion, epochs, device, checkpoint_dir
    )
    generator_losses, discriminator_losses = loss_history
    plot_losses(generator_losses, discriminator_losses)

