# **pipeline2: pretrained cnn features**

In [None]:
import torch
import torchvision.models as models
import torchvision.transforms as transforms

# Method 1: pretrained CNN features -- reused for the kNN, logistic regression,
# SVM, NN and ensemble training steps.
# Build an ImageNet-pretrained ResNet-18 and keep every child module except the
# last one (the classification layer), so a forward pass yields features.
# NOTE(review): newer torchvision releases deprecate `pretrained=True` in
# favour of the `weights=` argument -- confirm against the installed version.
resnet = models.resnet18(pretrained=True)
feature_layers = list(resnet.children())[:-1]
model = torch.nn.Sequential(*feature_layers)
model.eval()  # evaluation mode

# **Preprocessing**

In [None]:
import numpy as np 
import pandas as pd 
import cv2 as cv2
import os as os
from sklearn.preprocessing import LabelEncoder
import torch
import torchvision.models as models
import torchvision.transforms as transforms
from PIL import Image
import matplotlib.pyplot as plt


# Source folders: one per training class, plus the unlabeled test images.
pleasant_path = "/kaggle/input/mldata/train-images/train/pleasant"
unpleasant_path = "/kaggle/input/mldata/train-images/train/unpleasant"
test_path = "/kaggle/input/mldata/TEST_images/TEST_images"

# Collect [filename, label] pairs for every ".jpg", in sorted filename order.
# Labels: 1 = pleasant, 0 = unpleasant. Test images get a placeholder 0.
image_data1 = [
    [name, 1] for name in sorted(os.listdir(pleasant_path)) if name.endswith(".jpg")
]
image_data2 = [
    [name, 0] for name in sorted(os.listdir(unpleasant_path)) if name.endswith(".jpg")
]
test_data = [
    [name, 0] for name in sorted(os.listdir(test_path)) if name.endswith(".jpg")
]

# One DataFrame per source folder, all sharing the same two columns.
label_columns = ["Filename", "Label"]
df1 = pd.DataFrame(image_data1, columns=label_columns)
df2 = pd.DataFrame(image_data2, columns=label_columns)
df3 = pd.DataFrame(test_data, columns=label_columns)

# Summary statistics of the two training label tables.
for frame in (df1, df2):
    print(frame.describe())

# Persist the per-class label tables.
df1.to_csv("pleasant_labels.csv", index=False)
df2.to_csv("unpleasant_labels.csv", index=False)

# Image preprocessing: resize to 128x128, convert to a tensor, then normalize
# each channel with the mean/std constants below.
preprocessing_steps = [
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
transform = transforms.Compose(preprocessing_steps)

# Combined training label table (pleasant rows first, then unpleasant).
labels_df = pd.concat([df1, df2], ignore_index=True)
labels_df.to_csv("test.csv", index=False)

image_features = []
labels = []
pleasant = pleasant_path
print(pleasant)
unpleasant = unpleasant_path
print(unpleasant)

# Run every training image through the feature extractor, one image at a time.
for filename, label in zip(labels_df["Filename"], labels_df["Label"]):
    filename = os.path.basename(filename)

    # Labels are only ever 1 or 0 here, so the class folder follows directly.
    class_dir = pleasant if label == 1 else unpleasant
    img_path = f"{class_dir}/{filename}"

    # Load as RGB, preprocess, and add a leading batch dimension.
    img = transform(Image.open(img_path).convert("RGB")).unsqueeze(0)

    # Forward pass without gradient tracking.
    with torch.no_grad():
        features = model(img)

    # Flatten the output to a 1-D NumPy vector and record it with its label.
    image_features.append(features.view(-1).numpy())
    labels.append(label)

# One row per image: feature columns followed by the target in "Label".
df = pd.DataFrame(image_features)
df["Label"] = labels
df = df.dropna()

# Save the extracted features.
df.to_csv("extracted_features_pytorch.csv", index=False)

# The head() result is discarded here; only describe() is printed.
df.head()
print(df.describe())

In [None]:
# Preview the first rows of the pleasant-image label table (Filename, Label).
df1.head()

In [None]:
# Extract CNN features for the unlabeled test images listed in df3.
image_features2 = []
labels2 = []

# Same preprocessing as for the training images: resize to 128x128, convert to
# a tensor, normalize per channel. Redefined so this cell can run on its own.
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

df3.to_csv("test2.csv", index=False)

# Process each test image; "Label" is only the placeholder stored in df3.
for _, row in df3.iterrows():
    label = row["Label"]
    filename = os.path.basename(row["Filename"])
    img_path = os.path.join(test_path, filename)

    # The previous guard compared the path string against None and could never
    # trigger; check the file system instead.
    # NOTE(review): a skipped file makes dft shorter than df3, so any later
    # step that assumes row alignment between the two must be re-checked.
    if not os.path.isfile(img_path):
        print(f"Warning: {filename} not found in any folder!")
        continue

    # Load as RGB, preprocess, and add a leading batch dimension.
    img = Image.open(img_path).convert("RGB")
    img = transform(img).unsqueeze(0)

    # Forward pass without gradient tracking.
    with torch.no_grad():
        features = model(img)

    # Flatten the output to a 1-D NumPy vector.
    features = features.view(-1).numpy()

    image_features2.append(features)
    labels2.append(label)

# One row per test image: feature columns followed by the placeholder "Label".
dft = pd.DataFrame(image_features2)
dft["Label"] = labels2
dft = dft.dropna()

# Save the extracted test features.
dft.to_csv("extracted_features_test.csv", index=False)

print(dft.describe())

# **train-test set split**

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

# Split the extracted-feature table into inputs and targets: every column
# except the last is a feature, the last column ("Label") is the target.
X = df.iloc[:, :-1]
y = df.iloc[:, -1]

# Hold out 20% of the rows for testing; fixed seed for reproducibility.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Fit the label encoder on the training labels only and reuse that mapping for
# the test labels. Re-fitting on the test split could assign different codes
# if the two splits do not contain the same set of classes.
l = LabelEncoder()
y_train_enc = l.fit_transform(y_train)
y_test_enc = l.transform(y_test)

# Print the first (up to) five encoded training labels; slicing avoids an
# IndexError when fewer than five rows exist.
for encoded_label in y_train_enc[:5]:
    print(encoded_label)

X_train.head()

# **dimension reduction**

In [None]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.decomposition import PCA
from sklearn.manifold import LocallyLinearEmbedding

def plot_2d(X_2d, y=None):
    """Scatter-plot the first two columns of an embedding.

    Args:
        X_2d: Array-like with at least two columns; columns 0 and 1 are plotted.
        y: Optional array-like of per-point values used to color the points.
            The colorbar ticks are fixed at 0 and 1.

    Shows the figure with ``plt.show()`` and returns nothing.
    """
    X_2d = np.array(X_2d)
    if y is not None:
        y = np.array(y)
    plt.figure(figsize=(8, 6))
    # plt.cm.get_cmap was removed in Matplotlib 3.9; pyplot.get_cmap accepts
    # the same (name, lut) arguments and returns the 2-level colormap.
    two_class_cmap = plt.get_cmap('viridis', 2)
    scatter = plt.scatter(X_2d[:, 0], X_2d[:, 1], c=y, cmap=two_class_cmap, alpha=0.7)
    plt.colorbar(scatter, ticks=[0, 1], label='Class')
    plt.xlabel('Dim 1')
    plt.ylabel('Dim 2')
    plt.title('2D Visualization')
    plt.grid(True)
    plt.show()

def plot_3d(X_3d, y=None):
    """Scatter-plot the first three columns of an embedding in 3D.

    Args:
        X_3d: Array-like with at least three columns; columns 0-2 are plotted.
        y: Optional array-like of per-point values used to color the points.

    Shows the figure with ``plt.show()`` and returns nothing.
    """
    points = np.array(X_3d)
    colors = None if y is None else np.array(y)

    fig = plt.figure(figsize=(10, 8))
    ax = fig.add_subplot(111, projection='3d')

    xs, ys, zs = points[:, 0], points[:, 1], points[:, 2]
    scatter = ax.scatter(xs, ys, zs, c=colors, cmap='viridis', alpha=0.7)

    # Axis labels and title.
    ax.set_xlabel('Dim 1')
    ax.set_ylabel('Dim 2')
    ax.set_zlabel('Dim 3')
    ax.set_title('3D Visualization')

    fig.colorbar(scatter, label='Class')
    plt.show()



In [None]:
import numpy as np
from sklearn.decomposition import PCA
from sklearn.manifold import LocallyLinearEmbedding
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.optimizers import Adam

# Standardize the features: learn the scaling on the training split only and
# apply it to both splits. X_train / X_test are overwritten with the scaled
# arrays, so re-running this cell scales already-scaled data again.
scaler = StandardScaler().fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)


def build_autoencoder(input_dim, latent_dim):
    """Build a dense autoencoder and the encoder that shares its layers.

    Layer widths are input_dim -> 128 -> 256 -> latent_dim -> 256 -> 128 ->
    input_dim. Hidden layers use ReLU; the latent and output layers are
    created without an activation argument.

    Args:
        input_dim: Number of input features.
        latent_dim: Size of the bottleneck layer.

    Returns:
        ``(autoencoder, encoder)``. The autoencoder is compiled with Adam
        (learning rate 1e-3) and MSE loss; the encoder maps inputs to the
        bottleneck output and is not compiled.
    """
    inputs = Input(shape=(input_dim,))

    # Encoder: two hidden layers followed by the bottleneck.
    hidden = inputs
    for units in (128, 256):
        hidden = Dense(units, activation='relu')(hidden)
    bottleneck = Dense(latent_dim)(hidden)

    # Decoder: the encoder widths in reverse order, then the reconstruction.
    hidden = bottleneck
    for units in (256, 128):
        hidden = Dense(units, activation='relu')(hidden)
    reconstruction = Dense(input_dim)(hidden)

    autoencoder = Model(inputs, reconstruction)
    encoder = Model(inputs, bottleneck)

    autoencoder.compile(optimizer=Adam(1e-3), loss='mse')
    return autoencoder, encoder

def dim_reduct(model, m, X_train, X_test):
    """Reduce ``X_train`` to ``m`` dimensions with the chosen method.

    Args:
        model: Method name: ``"pca"``, ``"lle"`` or ``"autoenc"``.
        m: Target number of dimensions.
        X_train: Array-like training features; the reducer is fitted on it.
        X_test: Accepted for interface compatibility. Only the reduced
            training data is returned, so it is not used.

    Returns:
        The reduced training data as an array with ``m`` columns.

    Raises:
        ValueError: If ``model`` is not one of the supported method names.

    Side effects:
        When ``m`` is 2 or 3 the returned embedding is plotted with
        ``plot_2d`` / ``plot_3d``, colored by the module-level ``y_train``.
    """
    X_train = np.asarray(X_train)

    if model == "pca":
        X_train_d = PCA(n_components=m).fit_transform(X_train)
    elif model == "lle":
        lle = LocallyLinearEmbedding(n_components=m, n_neighbors=10)
        X_train_d = lle.fit_transform(X_train)
    elif model == "autoenc":
        # Train an autoencoder whose bottleneck has m units, then encode.
        autoencoder, encoder = build_autoencoder(X_train.shape[1], latent_dim=m)
        autoencoder.fit(X_train, X_train, epochs=20, batch_size=64, shuffle=True, verbose=0)
        X_train_d = encoder.predict(X_train)
    else:
        raise ValueError(
            f"Unknown dimensionality reduction method: {model!r} "
            "(expected 'pca', 'lle' or 'autoenc')"
        )

    # Plot exactly the embedding that is returned. The m == 3 autoencoder case
    # used to train a second, independent network just for the plot, so the
    # figure did not show the data handed to the clustering step.
    if m == 2:
        plot_2d(X_train_d, y_train.values)
    elif m == 3:
        plot_3d(X_train_d, y_train.values)
    return X_train_d



# **evaluation method**

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, roc_auc_score
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.metrics import mean_squared_error
from sklearn.metrics import RocCurveDisplay, ConfusionMatrixDisplay
from sklearn.metrics import f1_score
from sklearn.metrics import silhouette_score
from collections import Counter
from scipy.stats import mode

def purity_score_labels(y_true, y_pred):
    """Return the purity of a clustering with respect to the true labels.

    Each cluster contributes the count of its most frequent true label; the
    sum over clusters is divided by the total number of samples.

    Args:
        y_true: Array-like of ground-truth labels.
        y_pred: Array-like of cluster assignments, same length as ``y_true``.

    Returns:
        Purity as a float in [0, 1]; 0.0 when the input is empty.
    """
    # Accept plain lists as well as arrays; boolean masking needs ndarrays.
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    total = len(y_true)  # number of samples
    if total == 0:
        return 0.0

    correct = 0
    for cluster in np.unique(y_pred):
        members = y_true[y_pred == cluster]
        # Size of the majority true label inside this cluster.
        _, counts = np.unique(members, return_counts=True)
        correct += counts.max()
    return correct / total


def compute_metrics_labels(X, y_true, y_pred):
    """Print and return evaluation metrics for predicted cluster labels.

    Prints, in order: the classification report, the silhouette score, the
    purity, the macro-averaged F1 score and the confusion matrix.

    Args:
        X: Feature matrix the silhouette score is computed on.
        y_true: Ground-truth labels.
        y_pred: Predicted cluster labels.

    Returns:
        ``(silhouette, purity, f1)``.

    NOTE(review): the report, F1 and confusion matrix compare cluster ids with
    class labels directly -- presumably intended as a rough indicator only,
    since cluster numbering is arbitrary; confirm.
    """
    # Per-class precision / recall / F1 table.
    report = classification_report(y_true, y_pred)
    print(report)

    # Cluster cohesion / separation measured on X.
    silhouette = silhouette_score(X, y_pred)
    print(f"Silhouette Score: {silhouette:.4f}")

    # Share of samples that carry their cluster's majority label.
    true_arr = np.array(y_true)
    pred_arr = np.array(y_pred)
    cluster_purity = purity_score_labels(true_arr, pred_arr)
    print(f"Purity: {cluster_purity:.4f}")

    # Macro-averaged F1.
    macro_f1 = f1_score(y_true, y_pred, average='macro')
    print(f"F1 Score: {macro_f1:.4f}")

    # Confusion matrix of true vs. predicted labels.
    matrix = confusion_matrix(y_true, y_pred)
    print("Confusion Matrix:\n", matrix)

    return silhouette, cluster_purity, macro_f1

# **model training**

In [None]:
from sklearn.cluster import KMeans, AgglomerativeClustering
from sklearn.metrics import pairwise_distances
from sklearn.base import clone

# Search grid for the clustering experiments.
M = [2, 3, 5, 10, 15, 20]      # target dimensionalities
K = list(range(2, 11))         # cluster counts 2..10
dim_models = ["pca", "lle", "autoenc"]       # dimensionality reduction methods
clust_models = ["kmeucl", "kmcos", "agglo"]  # clustering methods

# Module-level metric placeholders, all starting at zero.
sil_score = purity = f1 = 0

def clustering_pipeline(X, y, k, model):
    """Cluster ``X`` into ``k`` groups and evaluate against the labels ``y``.

    Args:
        X: Feature matrix to cluster.
        y: Ground-truth labels, used for evaluation only.
        k: Number of clusters requested from the clustering model.
        model: ``"kmeucl"`` (KMeans on X), ``"kmcos"`` (KMeans on the pairwise
            cosine-distance matrix of X) or ``"agglo"`` (agglomerative).

    Returns:
        Dict with keys ``"silhouette"``, ``"purity"``, ``"f1"``, ``"k"`` and
        ``"model"``.

    Raises:
        ValueError: If ``model`` is not one of the supported names.
    """
    # Every branch now honors k. Previously "kmcos" and "agglo" always used
    # the number of distinct labels in y, so the sweep over k repeated the
    # same clustering while reporting it under a different k.
    if model == "kmeucl":
        y_pred = KMeans(n_clusters=k, random_state=42).fit_predict(X)
    elif model == "kmcos":
        # NOTE(review): KMeans runs on the rows of the n x n cosine-distance
        # matrix (each sample is described by its distances to all samples),
        # not on X itself -- confirm this is the intended "cosine k-means".
        cosine_dist = pairwise_distances(X, metric='cosine')
        y_pred = KMeans(n_clusters=k, random_state=42).fit_predict(cosine_dist)
    elif model == "agglo":
        y_pred = AgglomerativeClustering(n_clusters=k).fit_predict(X)
    else:
        raise ValueError(
            f"Unknown clustering model: {model!r} "
            "(expected 'kmeucl', 'kmcos' or 'agglo')"
        )

    sil_score, purity, f1 = compute_metrics_labels(X, y, y_pred)
    return {"silhouette": sil_score,
            "purity": purity,
            "f1": f1,
            "k": k,
            "model": model}

# Grid search over (m, dimensionality reduction method, k, clustering method).
results = []        # one result dict per evaluated configuration
best_score = -1     # best silhouette score seen so far
best_model = None   # result dict of the best configuration
best_params = {}    # not updated anywhere in this cell
best_k = -1         # k of the best configuration

for m in M:
    print("---------------------------",m,"---------------------------")
    for model in dim_models:
        print("=======================",model,"=======================")
        # The embedding depends only on (model, m), so compute it once here
        # instead of once per (k, clustering method). This avoids re-fitting
        # and re-plotting the same reduction 27 times, and for the stochastic
        # autoencoder it means every k is scored on the same embedding.
        X_train_red = dim_reduct(model,m,X_train,X_test)
        for k in K:
            for cmodel in clust_models:
                res = clustering_pipeline(X_train_red, y_train, k, cmodel)
                res.update({"dim_red_model": model, "m": m})
                results.append(res)

                # Keep the configuration with the highest silhouette score.
                if res["silhouette"] > best_score:
                    best_score = res["silhouette"]
                    best_model = res
                    best_k = k
                  

In [None]:
M = [15,20] #repeated procedure run due to technical problems on kaggle environment
# NOTE(review): results are appended to the existing `results` list; if the
# earlier full run already covered m = 15 and 20, those configurations will
# appear twice -- confirm which run actually completed.
for m in M:
    print("---------------------------",m,"---------------------------")
    for model in dim_models:
        print("=======================",model,"=======================")
        # The embedding depends only on (model, m): compute it once rather
        # than once per (k, clustering method), so the reduction is not
        # re-fitted for every clustering run and, for the stochastic
        # autoencoder, every k is scored on the same embedding.
        X_train_red = dim_reduct(model,m,X_train,X_test)
        for k in K:
            for cmodel in clust_models:
                res = clustering_pipeline(X_train_red, y_train, k, cmodel)
                res.update({"dim_red_model": model, "m": m})
                results.append(res)

                # Keep the configuration with the highest silhouette score.
                if res["silhouette"] > best_score:
                    best_score = res["silhouette"]
                    best_model = res
                    best_k = k

In [None]:
# Report the winning configuration and its k, each on the line after its heading.
print("Best configuration based on Silhouette Score:", best_model, sep="\n")
print("Best k* based on Silhouette Score:", best_k, sep="\n")

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

import pandas as pd
# Tabulate all grid-search results and save them.
results_df = pd.DataFrame(results)
results_df.to_csv("clustering_sil_results.csv", index=False)

# One figure per (dimensionality reduction, clustering) pair:
# silhouette score against k, with one line per target dimension m.
for model in dim_models:
    uses_dim_model = results_df["dim_red_model"] == model
    for cmodel in clust_models:
        uses_clusterer = results_df["model"] == cmodel
        subset = results_df[uses_dim_model & uses_clusterer]

        plt.figure()
        sns.lineplot(x="k", y="silhouette", hue="m", marker="o", data=subset)
        plt.title(f"{model} + {cmodel} - Silhouette vs k")
        plt.xlabel("Number of Clusters (k)")
        plt.ylabel("Silhouette Score")
        plt.legend(title="Dim m")
        plt.grid(True)
        plt.show()


In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

def plot_scores_vs_k(results_df, model_filter, dim_method_filter):
    """Plot silhouette and purity against k for one method combination.

    Args:
        results_df: Results table with columns ``model``, ``dim_red_model``,
            ``k``, ``silhouette`` and ``purity``.
        model_filter: Clustering method name to select.
        dim_method_filter: Dimensionality reduction method name to select.

    Shows the figure with ``plt.show()`` and returns nothing.
    """
    wanted = (
        (results_df['model'] == model_filter)
        & (results_df['dim_red_model'] == dim_method_filter)
    )
    selection = results_df[wanted]

    plt.figure(figsize=(10, 6))
    # One line per metric: (column, legend label, marker).
    curves = (
        ('silhouette', 'Silhouette Score', 'o'),
        ('purity', 'Purity', 's'),
    )
    for column, legend_label, marker in curves:
        sns.lineplot(x='k', y=column, data=selection, label=legend_label, marker=marker)
    plt.title(f'Scores vs k for {model_filter} + {dim_method_filter}')
    plt.xlabel('Number of Clusters (k)')
    plt.ylabel('Score')
    plt.legend()
    plt.grid(True)
    plt.show()


# Draw the comparison for every (dimensionality reduction, clustering) pair.
for model in dim_models:
    for cmodel in clust_models:
        plot_scores_vs_k(results_df, model_filter=cmodel, dim_method_filter=model)


In [None]:
# Heatmap of silhouette score by k (rows) and clustering method (columns);
# pivot_table combines repeated (k, model) entries with its default aggregation.
pivoted = pd.pivot_table(results_df, index='k', columns='model', values='silhouette')
sns.heatmap(pivoted, annot=True, cmap="viridis")
plt.title('Silhouette Scores by Clustering Model and k')


In [None]:
# For each (clustering, dimensionality reduction) pair, pick the row with the
# highest silhouette score and plot its k.
silhouette_by_pair = results_df.groupby(['model', 'dim_red_model'])['silhouette']
best_k_df = results_df.loc[silhouette_by_pair.idxmax()]
sns.barplot(data=best_k_df, x='dim_red_model', y='k', hue='model')
plt.title('Best k per Clustering + Dim. Reduction Method')


In [None]:
import pandas as pd

# Rebuild the results table from the raw result dicts.
results_df = pd.DataFrame(results)

# Aggregate the three metrics per (dimensionality reduction, clustering, k)
# with pivot_table's default aggregation, flatten the index back into columns,
# and rank by silhouette score, best first.
summary_df = (
    results_df
    .pivot_table(
        index=['dim_red_model', 'model', 'k'],
        values=['silhouette', 'purity', 'f1'],
    )
    .reset_index()
    .sort_values(by='silhouette', ascending=False)
)
summary_df.to_csv("clustering_summary.csv", index=False)

# Show the ten best-ranked rows.
print(summary_df.head(10))


# **pipeline2 final training**

In [None]:
import csv

# Predict labels for the unlabeled test images and write them to a CSV whose
# row i holds the prediction for the i-th filename listed in Test-IDs.csv.
# NOTE(review): `best_model2` is not defined in the visible part of this
# notebook; it must be a fitted object with a `.predict` method created
# elsewhere -- confirm before running.

# All columns except the trailing placeholder "Label". dft was built by
# iterating df3, so its rows follow df3's order as long as no row was dropped.
X_unknown = dft.iloc[:, :-1]

y_pred_u = best_model2.predict(X_unknown)

unknown_pics = pd.read_csv('/kaggle/input/mldata/Test-IDs.csv')
size = y_pred_u.size

pics_unordered = df3["Filename"].tolist()       # order of the predictions
pics_ordered = unknown_pics["Filename"].tolist()  # order required in the output

# filename -> position of that file's prediction in y_pred_u
unordered_map = {filename: index for index, filename in enumerate(pics_unordered)}

# Normalized (stripped, lower-cased) filename lists, kept for manual checks of
# mismatches between the two sources; the lookup below uses exact names.
pics_unordered_norm = [p.strip().lower() for p in pics_unordered]
pics_ordered_norm = [p.strip().lower() for p in pics_ordered]

# Re-order the predictions into Test-IDs order. Output slot idx belongs to
# pics_ordered[idx], and that file's prediction sits at its df3 position.
# (The two indices were previously swapped, which applied the inverse
# permutation and attached predictions to the wrong files.)
# NOTE(review): assumes the output ID is the row position in Test-IDs.csv --
# confirm against the file's own ID column, if it has one.
sorted_labels = [0] * size
for idx, filename in enumerate(pics_ordered):
    sorted_labels[idx] = y_pred_u[unordered_map[filename]]

id_list = list(range(size))

with open('output_labels.csv', mode='w', newline='') as file:
    writer = csv.writer(file, delimiter=',')
    writer.writerow(['ID', 'label'])
    # Any value equal to zero is written as "0"; everything else via str().
    writer.writerows(
        [row_id, "0" if label == 0 else str(label)]
        for row_id, label in zip(id_list, sorted_labels)
    )


In [None]:
import os
# Print the full path of every file under the Kaggle working directory.
for folder, _subdirs, names in os.walk('/kaggle/working'):
    for name in names:
        print(os.path.join(folder, name))