**libraries**

In [19]:
import cv2
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
import math
from sklearn.preprocessing import StandardScaler
import shutil
from sklearn.cluster import KMeans, DBSCAN, MeanShift, estimate_bandwidth
import seaborn as sns
from sklearn.neighbors import NearestNeighbors
from sklearn.decomposition import PCA
from sklearn.metrics import pairwise_distances

**PHASE 1**

In [None]:
# Folder holding the images, and the CSV with one row per image.
image_folder = 'dataset'
df = pd.read_csv('features.csv')

# Start the score columns as floats: the scores written into them are floats,
# and writing a float into an integer column triggers pandas'
# incompatible-dtype warning.
df['eye_color_score'] = 0.0
df['skin_color_score'] = 0.0

# Haar cascade detectors shipped with OpenCV.
face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
eye_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_eye.xml')

# Face boxes detected so far; their mean is the fallback box for images
# where face detection finds nothing.
face_locations = []

#function to calculate the color score using HSV color space
def calculate_color_score_hsv(region):
    """Map the mean HSV colour of a BGR image region to a single score.

    The score is a per-hue-band base value plus up to 50 points each for
    mean saturation and mean value (brightness).

    Args:
        region: BGR image patch (as produced by ``cv2.imread`` slicing).

    Returns:
        The colour score as a float.
    """
    hsv_region = cv2.cvtColor(region, cv2.COLOR_BGR2HSV)

    # Mean over both pixel axes at once -> [H, S, V]
    avg_hsv = np.mean(hsv_region, axis=(0, 1))

    # NOTE(review): this is an arithmetic mean of hue, which does not account
    # for the wrap-around at red (hues just below 360 and just above 0).
    hue = avg_hsv[0] * 2  #Scaling back to [0, 360] degrees
    saturation = avg_hsv[1] / 255.0  #Normalize to [0, 1]
    value = avg_hsv[2] / 255.0  #Normalize to [0, 1]

    # Base score per hue band. The red band uses ">= 330" so that a hue of
    # exactly 330 no longer drops through to the fallback.
    if hue < 30 or hue >= 330:  #red
        base_score = 100
    elif hue < 90:  #yellow-green
        base_score = 120
    elif hue < 150:  #green
        base_score = 140
    elif hue < 210:  #blue
        base_score = 160
    elif hue < 270:  #purple
        base_score = 180
    elif hue < 330:  #pink-red
        base_score = 190
    else:
        #fallback color score (only reachable when hue is not comparable, e.g. NaN)
        base_score = 150

    return base_score + saturation * 50 + value * 50


# Compute skin and eye colour scores for every image listed in the table.
for index, row in df.iterrows():
    image_id = row['image_id']
    image_path = os.path.join(image_folder, f"{image_id}")
    image = cv2.imread(image_path)

    if image is None:
        print(f"Image not found: {image_path}")
        # 404 is the sentinel stored for rows whose image could not be read.
        # NOTE(review): it stays in the CSV as an ordinary numeric value.
        df.loc[index, 'skin_color_score'] = 404
        df.loc[index, 'eye_color_score'] = 404
        continue

    img_h, img_w = image.shape[:2]
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    faces = face_cascade.detectMultiScale(gray, scaleFactor=1.05, minNeighbors=7)

    if len(faces) > 0:
        (x, y, w, h) = faces[0]
        face_locations.append((x, y, w, h))
    elif len(face_locations) > 0:
        # No detection: reuse the average of the boxes found so far.
        (x, y, w, h) = np.mean(face_locations, axis=0).astype(int)
    else:
        # No detection and no history yet (x, y, w, h used to be undefined
        # here): fall back to the whole frame.
        (x, y, w, h) = (0, 0, img_w, img_h)

    # Keep the box inside this image; an averaged box may not fit it.
    x = min(max(int(x), 0), img_w - 1)
    y = min(max(int(y), 0), img_h - 1)
    w = min(int(w), img_w - x)
    h = min(int(h), img_h - y)
    if w < 8 or h < 5:
        # Below this size the eye crops (w/8 wide, h/5 tall) can be empty.
        (x, y, w, h) = (0, 0, img_w, img_h)

    face_region = image[y:y+h, x:x+w]

    #skin color extraction
    skin_region = face_region[int(h/4):int(3*h/4), int(w/4):int(3*w/4)]
    skin_color_score = calculate_color_score_hsv(skin_region)

    #manual Eye color extraction
    left_eye_region = face_region[int(h/5):int(h/2.5), int(w/8):int(w/4)]
    right_eye_region = face_region[int(h/5):int(h/2.5), int(5*w/8):int(3*w/4)]

    left_eye_color_score = calculate_color_score_hsv(left_eye_region)
    right_eye_color_score = calculate_color_score_hsv(right_eye_region)

    eye_color_score = (left_eye_color_score + right_eye_color_score) / 2

    df.loc[index, 'skin_color_score'] = skin_color_score
    df.loc[index, 'eye_color_score'] = eye_color_score

# Persist the two new score columns back into the same CSV.
df.to_csv('features.csv', index=False)

print("Features updated!")

#function to display the image with bounding boxes for face and manual eyes
def display_image_with_bounding_boxes(image, face_rect, left_eye_rect, right_eye_rect, opencv_eye_rects):
    """Draw face and eye rectangles on ``image`` (in place) and show it.

    Args:
        image: BGR image; the rectangles are drawn directly onto it.
        face_rect: (x, y, w, h) of the face, drawn in blue.
        left_eye_rect: (x, y, w, h) of the manual left-eye box, drawn in green.
        right_eye_rect: (x, y, w, h) of the manual right-eye box, drawn in green.
        opencv_eye_rects: iterable of (x, y, w, h) eye boxes, drawn in yellow.
    """
    blue, green, yellow = (255, 0, 0), (0, 255, 0), (0, 255, 255)

    # Same drawing order as before: face, manual eyes, then detected eyes.
    boxes = [(face_rect, blue), (left_eye_rect, green), (right_eye_rect, green)]
    boxes.extend((eye_rect, yellow) for eye_rect in opencv_eye_rects)

    for (left, top, width, height), colour in boxes:
        cv2.rectangle(image, (left, top), (left + width, top + height), colour, 2)

    # matplotlib expects RGB while the image is BGR.
    rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    plt.imshow(rgb_image)
    plt.axis('off')
    plt.show()

#process 10 random images to show
# Capped at the table size, since df.sample(10) raises on fewer than 10 rows.
sample_images = df.sample(min(10, len(df)))

for index, row in sample_images.iterrows():
    image_id = row['image_id']
    image_path = os.path.join(image_folder, f"{image_id}")
    image = cv2.imread(image_path)

    if image is None:
        print(f"Image not found: {image_path}")
        continue

    # Detect the face in THIS image. Previously x, y, w, h and gray were
    # leftovers from the last image of the feature-extraction loop.
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    faces = face_cascade.detectMultiScale(gray, scaleFactor=1.05, minNeighbors=7)
    if len(faces) > 0:
        (x, y, w, h) = (int(v) for v in faces[0])
    elif len(face_locations) > 0:
        # Same fallback as feature extraction: mean of the known face boxes.
        (x, y, w, h) = (int(v) for v in np.mean(face_locations, axis=0))
    else:
        print(f"No face found: {image_path}")
        continue

    #manual Eye Detection
    # NOTE(review): these boxes are w/4 x h/4, while the colour extraction
    # samples a w/8-wide, h/5-tall patch - confirm which is intended.
    left_eye_rect = (x + int(w / 8), y + int(h / 5), int(w / 4), int(h / 4))
    right_eye_rect = (x + int(5 * w / 8), y + int(h / 5), int(w / 4), int(h / 4))

    #detect eyes using OpenCV's eye detector
    face_region_gray = gray[y:y+h, x:x+w]
    opencv_eye_rects = eye_cascade.detectMultiScale(face_region_gray, scaleFactor=1.05, minNeighbors=5)

    #adjust OpenCV eye rectangles to be in the context of the whole image
    opencv_eye_rects_global = [
        (x + int(ex), y + int(ey), int(ew), int(eh))
        for (ex, ey, ew, eh) in opencv_eye_rects
    ]

    display_image_with_bounding_boxes(
        image.copy(),
        face_rect=(x, y, w, h),
        left_eye_rect=left_eye_rect,
        right_eye_rect=right_eye_rect,
        opencv_eye_rects=opencv_eye_rects_global
    )

**PHASE 2**

In [None]:
# Load the feature table (the same file the earlier phase writes its scores to).
df = pd.read_csv('features.csv')

#first column is image_id
# Every remaining column is treated as a numeric feature.
features = df.iloc[:, 1:].values  
feature_names = df.columns[1:]

# Standardise the feature columns; the correlations below are computed on these.
scaler = StandardScaler()
features_scaled = scaler.fit_transform(features)

def mean(feature):
    """Return the arithmetic mean of the values in ``feature``."""
    count = len(feature)
    total = sum(feature)
    return total / count

# Function to calculate Pearson correlation between two features
def pearson_correlation(feature_x, feature_y):
    """Return the Pearson correlation coefficient of two equal-length samples.

    Args:
        feature_x: 1-D sequence or array of numbers.
        feature_y: 1-D sequence or array of numbers, same length as feature_x.

    Returns:
        The correlation coefficient, or 0 when either input has zero
        variance (the coefficient is undefined there).

    Raises:
        ValueError: if the inputs are empty or differ in length.
    """
    x = np.asarray(feature_x, dtype=float).ravel()
    y = np.asarray(feature_y, dtype=float).ravel()

    # The index loop used before either truncated silently or failed with an
    # IndexError on mismatched lengths; reject them up front instead.
    if x.size != y.size:
        raise ValueError(
            f"Features must have the same length, got {x.size} and {y.size}"
        )
    if x.size == 0:
        raise ValueError("Cannot correlate empty features")

    centered_x = x - x.mean()
    centered_y = y - y.mean()

    numerator = float(np.dot(centered_x, centered_y))
    # Product of square roots (not root of the product) to limit overflow.
    denominator = (
        math.sqrt(float(np.dot(centered_x, centered_x)))
        * math.sqrt(float(np.dot(centered_y, centered_y)))
    )

    if denominator == 0:
        return 0  #avoid division by zero
    return numerator / denominator

# Pairwise Pearson correlation between all standardised feature columns.
num_features = features_scaled.shape[1]
correlation_matrix = np.zeros((num_features, num_features))

# The matrix is symmetric, so only the upper triangle (diagonal included)
# is computed and each value is mirrored into the lower triangle.
for i in range(num_features):
    column_i = features_scaled[:, i]
    for j in range(i, num_features):
        coefficient = pearson_correlation(column_i, features_scaled[:, j])
        correlation_matrix[i, j] = coefficient
        correlation_matrix[j, i] = coefficient

# Function to select features based on correlation and variance
def select_features(features, correlation_matrix, threshold=0.8, max_features=6):
    """Greedily pick high-variance features that are not strongly correlated.

    Columns are visited in order of decreasing variance. A column is kept
    unless its absolute correlation with an already kept column exceeds
    ``threshold``.

    Args:
        features: 2-D array, one column per feature.
        correlation_matrix: square matrix of pairwise feature correlations.
        threshold: absolute correlation above which a candidate is rejected.
        max_features: maximum number of features to return; ``None`` means
            no limit. Defaults to 6, the previously hard-coded limit.

    Returns:
        List of selected column indices, in selection order.
    """
    variances = np.var(features, axis=0)

    # Sort feature indices by variance in descending order; the stable sort
    # makes ties resolve deterministically by column position.
    sorted_by_variance = np.argsort(-variances, kind='stable')

    selected_features = []
    for i in sorted_by_variance:
        if max_features is not None and len(selected_features) >= max_features:
            break
        correlated = any(
            abs(correlation_matrix[i][j]) > threshold for j in selected_features
        )
        if not correlated:
            selected_features.append(i)

    return selected_features

# Correlation cut-off handed to select_features.
threshold = 0.8
# NOTE(review): select_features ranks columns by np.var of the array it is
# given, and features_scaled is StandardScaler output - presumably every
# non-constant column has variance close to 1 there, which would make the
# variance ranking nearly arbitrary. Confirm whether the unscaled `features`
# array was intended here.
selected_feature_indices = select_features(features_scaled, correlation_matrix, threshold)

# Translate the selected column positions back into column names.
selected_feature_names = feature_names[selected_feature_indices]

print(f"Selected Feature Names: {selected_feature_names}")


**PHASE 3**

In [None]:
# Reload the feature table for the clustering phase.
df = pd.read_csv('features.csv')

# Columns used for clustering; the list is written out by hand here.
selected_columns = ['Wearing_Lipstick', 'Big_Nose', 'Blurry', 'Bangs', 'Chubby','Brown_Hair']
# Alternative column set that uses the two colour scores instead:
# selected_columns = ['skin_color_score', 'eye_color_score', 'Blurry', 'Bangs', 'Chubby','Brown_Hair']
X = df[selected_columns].values

# Standardise the selected columns before clustering.
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

#Elbow method for KMeans
def plot_elbow_method(X_scaled, max_k=10):
    """Fit KMeans for k = 1..max_k and plot the inertia of each fit against k.

    Args:
        X_scaled: data matrix to cluster.
        max_k: largest number of clusters to try (inclusive).
    """
    cluster_counts = range(1, max_k + 1)

    # One fit per candidate k; inertia_ is read from each fitted model.
    inertias = [
        KMeans(n_clusters=k, random_state=42).fit(X_scaled).inertia_
        for k in cluster_counts
    ]

    plt.figure(figsize=(10, 6))
    plt.plot(cluster_counts, inertias, 'bo-', markersize=8)
    plt.title('Elbow Method For Optimal k')
    plt.xlabel('Number of Clusters (k)')
    plt.ylabel('Inertia')
    plt.xticks(cluster_counts)
    plt.grid(True)
    plt.show()

# Show the elbow plot for k = 1..10 on the standardised data.
plot_elbow_method(X_scaled, max_k=10)

#k-Distance plot for finding eps in DBSCAN
def plot_k_distance(X_scaled, min_samples=10):
    """Plot the sorted distance from each point to its last returned neighbour.

    A NearestNeighbors model with ``min_samples`` neighbours is fitted on and
    queried with the same data; the last distance column is sorted and plotted.

    Args:
        X_scaled: data matrix.
        min_samples: number of neighbours requested per point.
    """
    knn = NearestNeighbors(n_neighbors=min_samples).fit(X_scaled)
    neighbor_distances, _ = knn.kneighbors(X_scaled)

    # NOTE(review): the query points are the fitted points themselves, so the
    # first returned neighbour is presumably each point itself - confirm
    # against the scikit-learn documentation.
    # Sort distances to find the "elbow" point
    kth_distances = np.sort(neighbor_distances[:, min_samples - 1], axis=0)

    plt.figure(figsize=(10, 6))
    plt.plot(kth_distances)
    plt.title(f'k-Distance Plot for DBSCAN (k={min_samples})')
    plt.xlabel('Data Points sorted by Distance to kth Neighbor')
    plt.ylabel(f'{min_samples}-th Nearest Neighbor Distance')
    plt.grid(True)
    plt.show()

# Add small Gaussian jitter before drawing the k-distance plot (presumably to
# separate duplicate rows). A seeded generator keeps the plot reproducible
# across runs, in line with random_state=42 used for KMeans.
noise = np.random.default_rng(42).normal(0, 0.1, X_scaled.shape)
X_noisy = X_scaled + noise
plot_k_distance(X_noisy, min_samples=10)

#Estimate initial bandwidth for MeanShift
estimated_bandwidth = estimate_bandwidth(X_scaled, quantile=0.3)
print(f"Estimated Bandwidth for MeanShift: {estimated_bandwidth}")

def save_cluster_images(df, labels, num_clusters, image_column='image_id', save_dir='clusters',
                        image_dir='dataset', samples_per_cluster=10):
    """Copy a few example images of each cluster into per-cluster folders.

    ``save_dir`` is deleted and recreated, then one ``cluster_<n>`` folder is
    written for every n in ``range(num_clusters)``. Rows are assigned to
    clusters through ``labels`` (previously the argument was ignored and a
    pre-set ``df['cluster']`` column was required).

    Args:
        df: table with one row per image.
        labels: cluster label of each row of ``df``, in row order.
        num_clusters: number of cluster folders to create (labels 0..n-1).
        image_column: column of ``df`` holding the image file names.
        save_dir: output folder; any existing content is removed.
        image_dir: folder the source images are read from.
        samples_per_cluster: maximum number of images copied per cluster.

    Raises:
        ValueError: if ``labels`` does not have one entry per row of ``df``.
    """
    labels = np.asarray(labels)
    image_names = df[image_column]
    if len(labels) != len(image_names):
        raise ValueError(
            f"Got {len(labels)} labels for {len(image_names)} rows"
        )

    if os.path.exists(save_dir):
        shutil.rmtree(save_dir)
    os.makedirs(save_dir)

    for cluster_num in range(num_clusters):
        cluster_folder = os.path.join(save_dir, f'cluster_{cluster_num}')
        os.makedirs(cluster_folder)

        cluster_images = image_names[labels == cluster_num]

        # Small clusters are exported whole; larger ones are sampled.
        if len(cluster_images) > samples_per_cluster:
            cluster_images = cluster_images.sample(samples_per_cluster)

        for image_name in cluster_images:
            image_path = os.path.join(image_dir, image_name)
            if os.path.exists(image_path):
                shutil.copy(image_path, cluster_folder)
            else:
                print(f"Image {image_name} not found")


def cluster_size_score(labels):
    """Return the variance of the cluster sizes, ignoring the noise label -1.

    Lower values mean the clusters are more evenly sized.

    Args:
        labels: cluster label of each point; -1 marks noise.

    Returns:
        Variance of the number of points per (non-noise) cluster.
    """
    cluster_ids, sizes = np.unique(labels, return_counts=True)

    # Keep only real clusters; the mask is all-True when no noise is present.
    real_cluster_sizes = sizes[cluster_ids != -1]

    return np.var(real_cluster_sizes)


# Hyperparameter tuning and clustering
def run_clustering_and_evaluate(X_scaled, algorithm, params):
    """Cluster ``X_scaled`` with the named algorithm and print its size score.

    Args:
        X_scaled: data matrix to cluster.
        algorithm: one of 'KMeans', 'DBSCAN' or 'MeanShift'.
        params: dict of hyperparameters - 'n_clusters' for KMeans, 'eps' and
            'min_samples' for DBSCAN, 'bandwidth' for MeanShift.

    Returns:
        The cluster label of every row of ``X_scaled``.

    Raises:
        ValueError: if ``algorithm`` is not one of the supported names
            (previously this surfaced as an unbound-variable error).
    """
    if algorithm == 'KMeans':
        model = KMeans(n_clusters=params['n_clusters'], random_state=42)
    elif algorithm == 'DBSCAN':
        model = DBSCAN(eps=params['eps'], min_samples=params['min_samples'])
    elif algorithm == 'MeanShift':
        model = MeanShift(bandwidth=params['bandwidth'])
    else:
        raise ValueError(f"Unknown clustering algorithm: {algorithm!r}")

    labels = model.fit_predict(X_scaled)

    score = cluster_size_score(labels)
    print(f"{algorithm} Cluster Size Score: {score}")
    return labels

# --- KMeans with a fixed number of clusters ---
kmeans_params = {'n_clusters': 7}
kmeans_labels = run_clustering_and_evaluate(X_scaled, 'KMeans', kmeans_params)
# Save cluster images for kmeans
# The labels are stored on the frame before the example images are exported.
df['cluster'] = kmeans_labels 
save_cluster_images(df, kmeans_labels, kmeans_params['n_clusters'], image_column='image_id', save_dir='clusters_kmeans')

# --- DBSCAN ---
dbscan_params = {'eps': 0.6, 'min_samples': 10}  
dbscan_labels = run_clustering_and_evaluate(X_scaled, 'DBSCAN', dbscan_params)

num_dbscan_clusters = len(set(dbscan_labels)) - (1 if -1 in dbscan_labels else 0)  # Ignore noise (-1) cluster

# Save cluster images for DBSCAN
df['cluster'] = dbscan_labels
save_cluster_images(df, dbscan_labels, num_dbscan_clusters, image_column='image_id', save_dir='clusters_dbscan')

# --- MeanShift, using the bandwidth estimated earlier in this phase ---
meanshift_params = {'bandwidth': estimated_bandwidth}
meanshift_labels = run_clustering_and_evaluate(X_scaled, 'MeanShift', meanshift_params)

# Number of distinct MeanShift labels (no noise label is subtracted here).
num_meanshift_clusters = len(set(meanshift_labels))

# Save cluster images for MeanShift
df['cluster'] = meanshift_labels  
save_cluster_images(df, meanshift_labels, num_meanshift_clusters, image_column='image_id', save_dir='clusters_meanshift')

# Plot heatmap of the features for each cluster
def plot_heatmap(df, labels, selected_columns, name):
    """Show a heatmap of the per-cluster mean of each selected column.

    Note that ``df`` is modified: its 'cluster' column is overwritten with
    ``labels``.

    Args:
        df: feature table, one row per sample.
        labels: cluster label for every row of ``df``.
        selected_columns: columns whose per-cluster means are plotted.
        name: algorithm name used as the title prefix.
    """
    df['cluster'] = labels
    per_cluster = df.groupby('cluster')
    cluster_means = per_cluster[selected_columns].mean()

    heatmap_title = name + " Cluster Heatmap of Selected Features"

    plt.figure(figsize=(10, 8))
    sns.heatmap(cluster_means, annot=True, cmap='coolwarm')
    plt.title(heatmap_title)
    plt.show()

# One heatmap per algorithm; each call overwrites df['cluster'] with that algorithm's labels.
plot_heatmap(df, kmeans_labels, selected_columns, 'KMeans')
plot_heatmap(df, dbscan_labels, selected_columns, 'DBSCAN')
plot_heatmap(df, meanshift_labels, selected_columns, 'MeanShift')


**PHASE 4**

In [None]:
# Reload the feature table for the visualisation phase.
df = pd.read_csv('features.csv')

# Columns used for clustering, written out by hand.
selected_columns = ['Wearing_Lipstick', 'Big_Nose', 'Blurry', 'Bangs', 'Chubby','Brown_Hair']
X = df[selected_columns].values

# Standardise the selected columns before clustering and PCA.
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

def run_clustering_and_evaluate(X_scaled, algorithm, params):
    """Cluster ``X_scaled`` with the named algorithm and return the labels.

    Args:
        X_scaled: data matrix to cluster.
        algorithm: one of 'KMeans', 'DBSCAN' or 'MeanShift'.
        params: dict of hyperparameters - 'n_clusters' for KMeans, 'eps' and
            'min_samples' for DBSCAN, 'bandwidth' for MeanShift.

    Returns:
        The cluster label of every row of ``X_scaled``.

    Raises:
        ValueError: if ``algorithm`` is not one of the supported names
            (previously this surfaced as an unbound-variable error).
    """
    if algorithm == 'KMeans':
        model = KMeans(n_clusters=params['n_clusters'], random_state=42)
    elif algorithm == 'DBSCAN':
        model = DBSCAN(eps=params['eps'], min_samples=params['min_samples'])
    elif algorithm == 'MeanShift':
        model = MeanShift(bandwidth=params['bandwidth'])
    else:
        raise ValueError(f"Unknown clustering algorithm: {algorithm!r}")

    return model.fit_predict(X_scaled)

# Cluster the standardised data with each algorithm using fixed hyperparameters.
kmeans_params = {'n_clusters': 7}
kmeans_labels = run_clustering_and_evaluate(X_scaled, 'KMeans', kmeans_params)

dbscan_params = {'eps': 0.6, 'min_samples': 10}
dbscan_labels = run_clustering_and_evaluate(X_scaled, 'DBSCAN', dbscan_params)

# The bandwidth is a literal here rather than a value computed in this cell.
meanshift_params = {'bandwidth': 2.63}
meanshift_labels = run_clustering_and_evaluate(X_scaled, 'MeanShift', meanshift_params)

def reduce_dimensions(X, method='PCA'):
    """Project ``X`` onto two dimensions.

    Args:
        X: data matrix.
        method: reduction method; only 'PCA' is supported.

    Returns:
        The 2-component PCA projection of ``X``.

    Raises:
        ValueError: if ``method`` is not 'PCA' (previously this surfaced as
            an unbound-variable error at the return statement).
    """
    if method != 'PCA':
        raise ValueError(f"Unsupported dimensionality reduction method: {method!r}")

    pca = PCA(n_components=2)
    X_reduced = pca.fit_transform(X)
    print(f'Explained variance by PCA: {pca.explained_variance_ratio_}')
    return X_reduced

# Define colors for clusters and noise
def get_custom_palette(labels):
    """Build a colour list with one colour per distinct label.

    Real clusters get colours from seaborn's "Set1" palette. Black is put in
    front only when the noise label -1 is present; previously it was always
    prepended, so label sets without noise received one colour too many and
    black ended up first in the list.

    Args:
        labels: cluster label of each point; -1 marks noise.

    Returns:
        List of RGB tuples, black first when noise is present.
    """
    unique_labels = set(labels)
    has_noise = -1 in unique_labels
    n_clusters = len(unique_labels) - (1 if has_noise else 0)

    palette = list(sns.color_palette("Set1", n_colors=n_clusters))
    if has_noise:
        # NOTE(review): relies on the plotting call assigning list colours to
        # labels in ascending order so that -1 receives black - confirm.
        palette = [(0.0, 0.0, 0.0)] + palette  #black for noise
    return palette

#function for visualizing clustering results
def visualize_clusters(X_reduced, labels, title='Cluster Visualization'):
    """Scatter-plot the first two columns of ``X_reduced``, coloured by label.

    Args:
        X_reduced: 2-D array; column 0 is plotted on x and column 1 on y.
        labels: cluster label of each row, used as the hue.
        title: plot title.
    """
    cluster_palette = get_custom_palette(labels)
    first_component = X_reduced[:, 0]
    second_component = X_reduced[:, 1]

    plt.figure(figsize=(10, 8))
    sns.scatterplot(
        x=first_component,
        y=second_component,
        hue=labels,
        palette=cluster_palette,
        s=50,
        alpha=0.7,
        edgecolor='k',
    )
    plt.title(title)
    plt.xlabel('Principal Component 1')
    plt.ylabel('Principal Component 2')
    plt.legend(title='Cluster')
    plt.show()

# Project once with PCA and reuse the same 2-D coordinates for all three plots.
X_reduced_pca = reduce_dimensions(X_scaled, method='PCA')

visualize_clusters(X_reduced_pca, kmeans_labels, title='KMeans Clustering Visualization (PCA)')

visualize_clusters(X_reduced_pca, dbscan_labels, title='DBSCAN Clustering Visualization (PCA)')

visualize_clusters(X_reduced_pca, meanshift_labels, title='MeanShift Clustering Visualization (PCA)')


**PHASE 5**

In [None]:
# Reload the feature table for the cluster-membership analysis.
df = pd.read_csv('features.csv')

selected_columns = ['Wearing_Lipstick', 'Big_Nose', 'Blurry', 'Bangs', 'Chubby','Brown_Hair']
X = df[selected_columns].values


# NOTE(review): KMeans is fitted on the raw column values here, whereas the
# earlier clustering cells fit on StandardScaler output - confirm this is intended.
kmeans = KMeans(n_clusters=7, random_state=42)
kmeans.fit(X)

#get the cluster centers
cluster_centers = kmeans.cluster_centers_

# Keep each row's assigned cluster next to its features.
df['kmeans_cluster'] = kmeans.labels_

#use KNN to find the closest points to each cluster center
def find_nearest_points(X, cluster_centers, num_points=50):
    """Return, for each cluster centre, the indices of its nearest data points.

    Args:
        X: data matrix the neighbours are drawn from.
        cluster_centers: one row per cluster centre, same columns as ``X``.
        num_points: number of neighbours per centre. Capped at ``len(X)``,
            because requesting more neighbours than there are samples makes
            the neighbour search fail.

    Returns:
        List with one 1-D index array per cluster centre.
    """
    n_neighbors = min(num_points, len(X))
    nearest_neighbors = NearestNeighbors(n_neighbors=n_neighbors)
    nearest_neighbors.fit(X)

    #for each cluster center, find the nearest points
    closest_points_indices = []
    for center in cluster_centers:
        _, indices = nearest_neighbors.kneighbors([center])
        closest_points_indices.append(indices.flatten())
    return closest_points_indices

#find 50 and 3000 closest points for each cluster center
# Each call fits its own neighbour search on X.
closest_50_indices = find_nearest_points(X, cluster_centers, num_points=50)
closest_3000_indices = find_nearest_points(X, cluster_centers, num_points=3000)

#analyze if the nearest points belong to the same cluster
def analyze_cluster_membership(closest_indices, original_labels):
    mismatches = []
    for cluster_num, indices in enumerate(closest_indices):
        cluster_label = cluster_num  #the label of the cluster center
        mismatch_points = []
        
        for index in indices:
            if original_labels[index] != cluster_label:
                mismatch_points.append(index)
        
        mismatches.append(mismatch_points)
    return mismatches

def analyze_mismatch_details(mismatch_points, X, cluster_centers, original_labels):
    """Count mismatching points per category.

    A point is 'Overlapping' when its two smallest centre distances differ by
    less than 0.1. Otherwise it is 'Noise' when its smallest distance exceeds
    the mean of its distances to all centres, and 'Misclassified' if not.

    Args:
        mismatch_points: indices (into ``X``) of the points to categorise.
        X: data matrix.
        cluster_centers: one row per cluster centre.
        original_labels: not used by this function.

    Returns:
        Dict with the counts for 'Overlapping', 'Noise' and 'Misclassified'.
    """
    overlapping = noise = misclassified = 0

    for index in mismatch_points:
        #distances to all centers
        distances = pairwise_distances([X[index]], cluster_centers).flatten()
        nearest_distance = distances[np.argmin(distances)]
        second_nearest_distance = np.partition(distances, 1)[1]

        if abs(nearest_distance - second_nearest_distance) < 0.1:  #overlapping clusters
            overlapping += 1
        elif nearest_distance > np.mean(distances):
            # NOTE(review): the smallest distance can hardly exceed the mean of
            # all distances, so this branch appears unreachable - confirm the
            # intended 'Noise' criterion.
            noise += 1
        else:
            misclassified += 1

    return {'Overlapping': overlapping, 'Noise': noise, 'Misclassified': misclassified}

def report_mismatches(mismatches, num_points, X, cluster_centers, df):
    """Print the mismatch count of every cluster and, if any, its breakdown.

    Args:
        mismatches: one list of mismatching point indices per cluster.
        num_points: number of neighbours that were inspected (printed only).
        X: data matrix, forwarded to analyze_mismatch_details.
        cluster_centers: cluster centres, forwarded to analyze_mismatch_details.
        df: table whose 'kmeans_cluster' column holds the assigned labels.
    """
    for cluster_num, mismatch_points in enumerate(mismatches):
        print(f"Cluster {cluster_num}:")
        print(f"Total mismatches for {num_points} points: {len(mismatch_points)}")

        # Nothing to break down when every neighbour matched.
        if not mismatch_points:
            continue

        assigned_labels = df['kmeans_cluster'].values
        categories = analyze_mismatch_details(mismatch_points, X, cluster_centers, assigned_labels)

        print(f"Overlapping: {categories['Overlapping']} points")
        print(f"Noise: {categories['Noise']} points")
        print(f"Misclassified: {categories['Misclassified']} points")

# Membership check for the 50 nearest points of every cluster centre.
print("Analysis for 50 closest points to each cluster center:")
mismatches_50 = analyze_cluster_membership(closest_50_indices, df['kmeans_cluster'].values)
report_mismatches(mismatches_50, 50, X, cluster_centers, df)

# Same check for the 3000 nearest points of every cluster centre.
print("\nAnalysis for 3000 closest points to each cluster center:")
mismatches_3000 = analyze_cluster_membership(closest_3000_indices, df['kmeans_cluster'].values)
report_mismatches(mismatches_3000, 3000, X, cluster_centers, df)


**PHASE 6**

In [None]:
# Training and test feature tables.
df_train = pd.read_csv('features.csv')
df_test = pd.read_csv('test_features.csv')  

selected_columns = ['Wearing_Lipstick', 'Big_Nose', 'Blurry', 'Bangs', 'Chubby','Brown_Hair']
X_train = df_train[selected_columns].values
X_test = df_test[selected_columns].values

# Fit on the training rows only (raw column values, no scaling in this cell).
kmeans = KMeans(n_clusters=7, random_state=42)
kmeans.fit(X_train)

#Predict clusters for the test data
df_test['predicted_cluster'] = kmeans.predict(X_test)

# The test CSV is overwritten in place with the added 'predicted_cluster' column.
df_test.to_csv('test_features.csv', index=False)

# Start from an empty output folder on every run.
save_dir = 'test_cluster_results'
if os.path.exists(save_dir):
    shutil.rmtree(save_dir)  
os.makedirs(save_dir)

def save_images_for_test_data(df_test, X_train, df_train, kmeans, save_dir, image_column='image_id',
                              test_image_dir='test', train_image_dir='dataset', num_cluster_samples=5):
    """Export one test image per predicted cluster next to training examples.

    For every value of ``df_test['predicted_cluster']`` one test row is
    sampled (seeded), a folder is created for it under ``save_dir``, and the
    test image plus a few random training images of the same cluster are
    copied into that folder. Images missing on disk are skipped.

    Args:
        df_test: test table with ``image_column`` and 'predicted_cluster'.
        X_train: unused; kept so existing calls keep working.
        df_train: training table, row-aligned with ``kmeans.labels_``.
        kmeans: fitted model exposing ``labels_`` for the training rows.
        save_dir: output folder.
        image_column: column holding the image file names.
        test_image_dir: folder containing the test images.
        train_image_dir: folder containing the training images.
        num_cluster_samples: maximum number of training images per folder.
    """
    # Iterating the groups keeps the one-row-per-cluster sampling without
    # relying on groupby.apply handing the grouping column to the callback.
    for predicted_cluster, cluster_rows in df_test.groupby('predicted_cluster'):
        row = cluster_rows.sample(1, random_state=42).iloc[0]  # 1 test point per cluster
        test_image_id = row[image_column]

        test_image_folder = os.path.join(save_dir, f'test_image_{test_image_id}_cluster_{predicted_cluster}')
        os.makedirs(test_image_folder, exist_ok=True)

        test_image_path = os.path.join(test_image_dir, test_image_id)
        if os.path.exists(test_image_path):
            shutil.copy(test_image_path, test_image_folder)

        cluster_indices = np.where(kmeans.labels_ == predicted_cluster)[0]
        # Sampling without replacement fails when more items are requested
        # than exist, so small clusters are exported whole.
        sample_size = min(num_cluster_samples, len(cluster_indices))
        if sample_size == 0:
            continue
        cluster_sample_indices = np.random.choice(cluster_indices, sample_size, replace=False)

        for cluster_image_id in df_train.iloc[cluster_sample_indices][image_column]:
            cluster_image_path = os.path.join(train_image_dir, cluster_image_id)
            if os.path.exists(cluster_image_path):
                shutil.copy(cluster_image_path, test_image_folder)

# Export one sampled test image per predicted cluster together with training examples.
save_images_for_test_data(df_test, X_train, df_train, kmeans, save_dir)

print(f"Results saved in the folder: {save_dir}")
