# **Fashion Editorial Classification and Recoloring**
Deep Unsupervised Learning - Final Project

Yasaman Badeli. Catalina López B. Joao Vitor Peluzo Cardoso.

Professor: Ali Moridnejad.

Montreal, 2025.

**References**

*Data*
- https://www.kaggle.com/code/prites/starter-fashion-product-images-dataset-e4a4b7fb-7/notebook
- https://www.kaggle.com/code/selvasubramanians/download-images-from-url/notebook
- https://www.kaggle.com/datasets/vishalbsadanand/deepfashion-1?select=datasets
- https://github.com/yousefiparsa/apparel-masking-color-detection/tree/master/examples

*Models*
- https://huggingface.co/prithivMLmods/Fashion-Product-articleType
- https://www.kaggle.com/code/vipulsharma2000/imaterialist-fashion-eda-object-detection-colors/notebook
- https://osieardi.medium.com/no-cap-just-rgb-pythons-image-color-detection-3e06218d8531

In [1]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import torchvision.utils as vutils
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import pandas as pd
import requests
from io import BytesIO

In [None]:
# Report the runtime so that saved outputs record which torch build / GPU was used.
print(torch.__version__)
print("CUDA available:", torch.cuda.is_available())
print("GPU name:", torch.cuda.get_device_name(0) if torch.cuda.is_available() else "No GPU found")

# Root folder of the fashion dataset. The path is assembled with os.path.join
# instead of a single literal that mixed "/" and "\": on Windows this produces
# the same string as before, while on POSIX systems the components are now
# real directories rather than one directory name containing backslashes.
base_path = os.path.join(
    "/kaggle/input/dataset-col",
    "Deep-Unsupervised-Learning",
    "fashion-dataset",
    "fashion-dataset",
)

images_file = os.path.join(base_path, "images.csv")
styles_file = os.path.join(base_path, "styles.csv")

# images.csv provides `filename` and `link`; styles.csv provides per-`id`
# metadata. Malformed rows in styles.csv are skipped rather than aborting.
images_df = pd.read_csv(images_file)
styles_df = pd.read_csv(styles_file, on_bad_lines='skip')

2.5.1+cu121
CUDA available: True
GPU name: NVIDIA GeForce RTX 4060 Laptop GPU


In [None]:
# Derive the numeric image id from each filename ("1234.jpg" -> 1234) so the
# image table can be joined with the style metadata on `id`.
images_df['id'] = images_df['filename'].apply(lambda x: x.replace('.jpg', '')).astype(int)

df = pd.merge(images_df, styles_df, on='id')

image_download_dir = os.path.join(base_path, "downloaded_images")
os.makedirs(image_download_dir, exist_ok=True)

# Best-effort local cache of every image: files already on disk are skipped,
# and a failure for one image is reported without stopping the loop.
for img_id, img_url in zip(df['id'], df['link']):
    img_path = os.path.join(image_download_dir, f"{img_id}.jpg")
    if os.path.exists(img_path):
        continue

    # Write to a temporary name first so that an interrupted write never
    # leaves a truncated "<id>.jpg" that later runs would treat as complete.
    partial_path = img_path + ".part"
    try:
        response = requests.get(img_url, timeout=5)
        # Without this check an HTTP error page would be saved as a .jpg.
        response.raise_for_status()
        with open(partial_path, 'wb') as f:
            f.write(response.content)
        os.replace(partial_path, img_path)
        print(f"Downloaded image {img_id}")
    except Exception as e:
        print(f"Failed to download image {img_id}: {e}")
        if os.path.exists(partial_path):
            os.remove(partial_path)

Downloaded image 15970
Downloaded image 39386
Downloaded image 59263
Downloaded image 21379
Downloaded image 53759
Downloaded image 1855
Downloaded image 30805
Downloaded image 26960
Downloaded image 29114
Downloaded image 30039
Downloaded image 9204
Downloaded image 48123
Downloaded image 18653
Downloaded image 47957
Downloaded image 46885
Downloaded image 12369
Downloaded image 29928
Downloaded image 42419
Downloaded image 51832
Downloaded image 47359
Downloaded image 17429
Downloaded image 12967
Downloaded image 6842
Downloaded image 13089
Downloaded image 18461
Downloaded image 9036
Downloaded image 48311
Downloaded image 7990
Downloaded image 56019
Downloaded image 21977
Downloaded image 37812
Downloaded image 4729
Downloaded image 56825
Downloaded image 39988
Downloaded image 59051
Downloaded image 20099
Downloaded image 58183
Downloaded image 29742
Downloaded image 51658
Downloaded image 3954
Downloaded image 18839
Downloaded image 2886
Downloaded image 23278
Downloaded image 28

In [None]:
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

In [None]:
class FashionKaggleDataset(Dataset):
    """Image-only dataset backed by a DataFrame with `id` and `link` columns.

    Each item is the RGB image for one row. The image is read from
    `image_dir` when a file named "<id>.jpg" or "<id>.png" was found there at
    construction time; otherwise it is downloaded from the row's `link`.
    If loading fails, a black 64x64 placeholder is returned instead of
    raising, so that iteration is never interrupted.

    Args:
        df: DataFrame providing the `id` and `link` columns.
        image_dir: Optional directory of already-downloaded images.
        transform: Optional callable applied to the PIL image before return.
    """

    def __init__(self, df, image_dir=None, transform=None):
        self.df = df
        self.image_dir = image_dir
        self.transform = transform
        # Maps integer image id -> path of the local file for that id.
        self.local_images = {}

        if image_dir:
            for filename in os.listdir(image_dir):
                if not filename.endswith(('.png', '.jpg')):
                    continue
                try:
                    img_id = int(filename.split('.')[0])
                except ValueError:
                    # File stem is not a numeric id; it is not one of ours.
                    continue
                self.local_images[img_id] = os.path.join(image_dir, filename)

    def __len__(self):
        """Return the number of rows in the backing DataFrame."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return the (optionally transformed) image for row position `idx`."""
        # Fetch the row once; each `iloc[idx]` call builds a new Series.
        row = self.df.iloc[idx]
        img_id = row['id']
        img_url = row['link']

        if self.image_dir and img_id in self.local_images:
            try:
                image = Image.open(self.local_images[img_id]).convert('RGB')
            except Exception as e:
                print(f"Failed to load local image {img_id}: {e}")
                image = Image.new('RGB', (64, 64), (0, 0, 0))
        else:
            # No local copy: fall back to fetching the image over HTTP.
            try:
                response = requests.get(img_url, timeout=5)
                # Surface HTTP errors here instead of trying to decode an
                # error page as an image.
                response.raise_for_status()
                image = Image.open(BytesIO(response.content)).convert('RGB')
            except Exception as e:
                print(f"Failed to load image from URL {img_url}: {e}")
                image = Image.new('RGB', (64, 64), (0, 0, 0))

        if self.transform:
            image = self.transform(image)

        return image

In [None]:
# Preprocessing for every image: resize to 64x64, convert to a tensor, then
# normalise each channel with mean 0.5 and std 0.5.
transform = transforms.Compose([
    transforms.Resize(size=(64, 64)),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
])

# Reuse the download folder as the dataset's local image cache.
image_dir = image_download_dir

# Full dataset over the merged frame, served in shuffled batches of 32 from
# the main process (no worker subprocesses).
dataset = FashionKaggleDataset(df=df, image_dir=image_dir, transform=transform)
dataloader = DataLoader(dataset=dataset, batch_size=32, num_workers=0, shuffle=True)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

In [None]:
class ConvVAE(nn.Module):
    """Convolutional variational autoencoder for 3x64x64 images.

    The encoder halves the spatial size three times (64 -> 32 -> 16 -> 8) and
    projects the flattened 128x8x8 feature map to a 256-d vector, from which
    the mean and log-variance of the latent Gaussian are predicted. The
    decoder mirrors this with three transposed convolutions and applies no
    activation to its output.

    Args:
        latent_dim: Dimensionality of the latent vector.
    """

    def __init__(self, latent_dim=32):
        super().__init__()
        self.latent_dim = latent_dim

        # Every (transposed) convolution uses the same geometry, which halves
        # (or doubles) the spatial resolution.
        geometry = dict(kernel_size=4, stride=2, padding=1)
        flat_features = 128 * 8 * 8

        # Encoder: image -> 256-d feature vector.
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 32, **geometry),
            nn.ReLU(),
            nn.Conv2d(32, 64, **geometry),
            nn.ReLU(),
            nn.Conv2d(64, 128, **geometry),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(flat_features, 256),
            nn.ReLU(),
        )
        # Heads producing the parameters of the approximate posterior.
        self.fc_mu = nn.Linear(256, latent_dim)
        self.fc_logvar = nn.Linear(256, latent_dim)

        # Decoder: latent vector -> 128x8x8 feature map -> image.
        self.decoder_input = nn.Linear(latent_dim, flat_features)
        self.decoder = nn.Sequential(
            nn.Unflatten(1, (128, 8, 8)),
            nn.ConvTranspose2d(128, 64, **geometry),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, **geometry),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 3, **geometry),
        )

    def encode(self, x):
        """Return `(mu, logvar)` of the latent distribution for batch `x`."""
        features = self.encoder(x)
        return self.fc_mu(features), self.fc_logvar(features)

    def reparameterize(self, mu, logvar):
        """Draw `z = mu + eps * std` with `eps ~ N(0, I)`."""
        std = torch.exp(0.5 * logvar)
        noise = torch.randn_like(std)
        return mu + noise * std

    def decode(self, z):
        """Map latent vectors `z` back to image space."""
        return self.decoder(self.decoder_input(z))

    def forward(self, x):
        """Return `(reconstruction, mu, logvar)` for batch `x`."""
        mu, logvar = self.encode(x)
        reconstruction = self.decode(self.reparameterize(mu, logvar))
        return reconstruction, mu, logvar

latent_dim = 32
# `device` is used by this and the following cells, but no earlier cell
# defines it; create it here so the notebook runs from a fresh kernel.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
vae = ConvVAE(latent_dim=latent_dim).to(device)

In [None]:
def vae_loss(recon_x, x, mu, logvar, beta=1.0):
    """Return the VAE objective summed over the whole batch.

    The loss is the summed squared reconstruction error plus `beta` times the
    KL divergence between N(mu, exp(logvar)) and the standard normal prior.

    Args:
        recon_x: Reconstructed batch.
        x: Target batch, same shape as `recon_x`.
        mu: Latent means.
        logvar: Latent log-variances.
        beta: Weight applied to the KL term.

    Returns:
        A scalar tensor (a sum, not a mean, over all elements).
    """
    reconstruction = F.mse_loss(recon_x, x, reduction='sum')
    kl_terms = 1 + logvar - mu.pow(2) - logvar.exp()
    divergence = -0.5 * torch.sum(kl_terms)
    return reconstruction + beta * divergence

In [None]:
# Train the VAE with Adam for a fixed number of epochs, then save its weights.
optimizer = optim.Adam(vae.parameters(), lr=1e-3)

num_epochs = 10
vae.train()
for epoch in range(num_epochs):
    # Running sum of the batch losses for this epoch. vae_loss returns a sum
    # over the batch, so this is a sum over every sample seen.
    total_loss = 0
    for batch_idx, data in enumerate(dataloader):
        data = data.to(device)
        
        # Standard step: clear old gradients, forward pass, loss, backward pass, update.
        optimizer.zero_grad()
        recon_batch, mu, logvar = vae(data)
        loss = vae_loss(recon_batch, data, mu, logvar)
        
        loss.backward()
        total_loss += loss.item()
        optimizer.step()
        
        # Progress report every 100 batches; dividing by the batch size turns
        # the summed loss into a per-sample value.
        if batch_idx % 100 == 0:
            print(f'Epoch {epoch+1}/{num_epochs}, Batch {batch_idx}/{len(dataloader)}, Loss: {loss.item()/len(data):.4f}')
    
    # Per-sample average over the epoch (summed loss / number of dataset items).
    print(f'Epoch {epoch+1}/{num_epochs}, Average Loss: {total_loss/len(dataloader.dataset):.4f}')


# Save only the learned parameters (state_dict) next to the dataset files.
torch.save(vae.state_dict(), os.path.join(base_path, "vae_fashion.pth"))

In [None]:
def visualize_vae_results(vae, dataloader, num_samples=8):
    """Plot real vs. reconstructed images and samples decoded from the prior.

    The first figure shows up to `num_samples` images from the first batch of
    `dataloader` (top row) above their reconstructions (bottom row). The
    second figure shows `num_samples` images decoded from latent vectors
    drawn from a standard normal distribution.

    Args:
        vae: Model exposing `__call__`, `decode` and a `latent_dim` attribute.
        dataloader: Iterable yielding image batches normalised with mean 0.5
            and std 0.5 per channel.
        num_samples: Number of images per row.

    Side effects:
        Switches `vae` to eval mode and displays two matplotlib figures.
    """
    vae.eval()
    # Use the device the model lives on rather than a notebook-level global.
    model_device = next(vae.parameters()).device

    def _to_displayable(batch):
        # Undo the (0.5, 0.5) normalisation. The decoder output is unbounded,
        # so clamp to the [0, 1] range that imshow expects for float RGB.
        return (batch * 0.5 + 0.5).clamp(0, 1)

    def _show(image):
        plt.imshow(image.permute(1, 2, 0).cpu().numpy())
        plt.axis('off')

    with torch.no_grad():
        data = next(iter(dataloader)).to(model_device)
        recon, _, _ = vae(data)

        data = _to_displayable(data)
        recon = _to_displayable(recon)

        # The batch may hold fewer images than requested; never index past it.
        n_shown = min(num_samples, data.size(0))

        plt.figure(figsize=(15, 4))
        for i in range(n_shown):
            plt.subplot(2, n_shown, i + 1)
            _show(data[i])
            if i == 0:
                plt.title('Real Images')

            plt.subplot(2, n_shown, n_shown + i + 1)
            _show(recon[i])
            if i == 0:
                plt.title('Reconstructed Images')
        plt.show()

        # Sample the prior using the model's own latent size so the function
        # stays correct for models built with a different `latent_dim`.
        z = torch.randn(num_samples, vae.latent_dim).to(model_device)
        samples = _to_displayable(vae.decode(z))

        plt.figure(figsize=(15, 2))
        for i in range(num_samples):
            plt.subplot(1, num_samples, i + 1)
            _show(samples[i])
            if i == 0:
                plt.title('Generated Samples')
        plt.show()

visualize_vae_results(vae, dataloader)

In [None]:
# Lookup table for naming a pixel color.
# Each entry maps a color name to [lower_rgb, upper_rgb]; a pixel matches when
# every channel lies within the inclusive per-channel bounds.
# The lookup in get_primary_color_name returns the FIRST matching entry in
# insertion order, so the order of this dict is significant.
# NOTE(review): several ranges overlap. "light grey" lies entirely inside
# "grey", which is listed earlier, so it can never be returned; and pure white
# (255, 255, 255) matches "light yellow" before it reaches "white". Confirm
# whether this precedence is intended.
primary_colors = {
    "red": [(200, 0, 0), (255, 100, 100)],
    "green": [(0, 200, 0), (100, 255, 100)],
    "blue": [(0, 0, 200), (100, 100, 255)],
    "yellow": [(200, 200, 0), (255, 255, 205)],
    "light yellow": [(200, 236, 205), (255, 255, 255)],
    "black": [(0, 0, 0), (50, 50, 50)],
    "white": [(200, 200, 200), (255, 255, 255)],
    "grey": [(100, 100, 100), (200, 200, 200)],
    "purple": [(100, 0, 100), (200, 100, 200)],
    "orange": [(200, 100, 0), (255, 150, 50)],
    "light red": [(200, 100, 100), (255, 180, 180)],
    "dark red": [(100, 0, 0), (200, 100, 100)],
    "light green": [(100, 255, 100), (150, 255, 150)],
    "dark green": [(0, 100, 0), (100, 200, 100)],
    "light blue": [(100, 100, 255), (150, 150, 255)],
    "dark blue": [(0, 0, 100), (100, 100, 200)],
    "light grey": [(150, 150, 150), (200, 200, 200)],
    "dark grey": [(50, 50, 50), (100, 100, 100)],
}

In [None]:
def is_color_in_range(color, color_range):
    """Return True when every channel of `color` is within `color_range`.

    Args:
        color: Sequence of channel values, e.g. an (r, g, b) tuple.
        color_range: Pair `[lower, upper]` of per-channel bounds; both bounds
            are inclusive.
    """
    lower_bounds, upper_bounds = color_range[0], color_range[1]
    for channel, low, high in zip(color, lower_bounds, upper_bounds):
        if not (low <= channel <= high):
            return False
    return True

def get_primary_color_name(detected_color):
    """Return the name of the first `primary_colors` range containing the color.

    Entries are tried in the dict's insertion order; "Multi" is returned when
    the color falls inside none of them.
    """
    matching_names = (
        name
        for name, bounds in primary_colors.items()
        if is_color_in_range(detected_color, bounds)
    )
    return next(matching_names, "Multi")

In [None]:
def grid_sample(image, grid_size=15):
    """Sample the non-black pixels of `image` on a regular grid.

    Pixels are read every `width // grid_size` columns and every
    `height // grid_size` rows. For images smaller than `grid_size` in a
    dimension the stride is clamped to 1 (every pixel is visited) instead of
    becoming 0, which previously made `range()` raise ValueError.

    Args:
        image: PIL-style RGB image exposing `.size` and `.load()`.
        grid_size: Approximate number of samples per axis; must be >= 1.

    Returns:
        numpy array of the sampled (r, g, b) values, one row per sample.
        Pure black pixels (0, 0, 0) are skipped; the array is empty when
        nothing was kept.

    Raises:
        ValueError: If `grid_size` is smaller than 1.
    """
    if grid_size < 1:
        raise ValueError(f"grid_size must be >= 1, got {grid_size}")

    width, height = image.size
    pixels = image.load()
    x_step = max(1, width // grid_size)
    y_step = max(1, height // grid_size)

    sampled_pixels = []
    for x in range(0, width, x_step):
        for y in range(0, height, y_step):
            r, g, b = pixels[x, y]
            # Pure black is treated as background and ignored.
            if (r, g, b) != (0, 0, 0):
                sampled_pixels.append((r, g, b))
    return np.array(sampled_pixels)

def resize_image_from_center(image, scale_factor=1/3):
    """Crop a window whose sides are `scale_factor` times the image's sides.

    The window is centred horizontally. Vertically its top edge sits at one
    quarter of the leftover height, so the crop lies above the true centre.

    Args:
        image: PIL-style image exposing `.size` and `.crop(box)`.
        scale_factor: Fraction of the width and height to keep.

    Returns:
        Whatever `image.crop` returns for the computed box.
    """
    full_w, full_h = image.size
    crop_w, crop_h = int(full_w * scale_factor), int(full_h * scale_factor)
    x0 = (full_w - crop_w) // 2
    y0 = (full_h - crop_h) // 4
    return image.crop((x0, y0, x0 + crop_w, y0 + crop_h))

In [None]:
def process_image_and_detect_colors(image):
    """Return the dominant named color of an image and a representative RGB.

    The image is cropped with `resize_image_from_center`, sampled on a grid
    with `grid_sample`, and each sampled pixel is named with
    `get_primary_color_name`. The most frequent name wins; on a tie the name
    that was encountered first is kept.

    The representative RGB is the rounded per-channel mean of the sampled
    pixels that received the winning name. (The previous implementation
    indexed the sample array with `count - 1`, i.e. it used a pixel count as
    a pixel position and returned an unrelated pixel.)

    Args:
        image: PIL RGB image.

    Returns:
        `(color_name, (r, g, b))` with integer channel values, or
        `("Unknown", (0, 0, 0))` when no non-black pixel was sampled.
    """
    image = resize_image_from_center(image)
    grid_pixels = grid_sample(image, grid_size=15)
    if len(grid_pixels) == 0:
        return "Unknown", (0, 0, 0)

    # Group the sampled pixels by the color name they were assigned.
    pixels_by_name = {}
    for pixel in grid_pixels:
        color_name = get_primary_color_name(tuple(pixel))
        pixels_by_name.setdefault(color_name, []).append(pixel)

    # max() keeps the first key among equals, i.e. the first name encountered.
    primary_color_name = max(pixels_by_name, key=lambda name: len(pixels_by_name[name]))

    mean_rgb = np.mean(pixels_by_name[primary_color_name], axis=0)
    primary_color_value = tuple(int(round(float(channel))) for channel in mean_rgb)
    return primary_color_name, primary_color_value

In [None]:
def detect_color_from_file(row, image_dir):
    """Detect the dominant color of the image described by one DataFrame row.

    The local file "<id>.jpg" in `image_dir` is tried first. If it does not
    exist, or processing it fails, the image is fetched from the row's `link`
    instead. Failures are printed rather than raised.

    Args:
        row: Mapping with `id` and `link` entries.
        image_dir: Directory holding downloaded images.

    Returns:
        pandas Series `[color_name, color_rgb]`; `["Unknown", (0, 0, 0)]` when
        the remote attempt fails.
    """
    img_id, img_url = row['id'], row['link']
    img_path = os.path.join(image_dir, f"{img_id}.jpg")

    def _color_series(source):
        # Shared by both branches: open as RGB, classify, pack into a Series.
        picture = Image.open(source).convert('RGB')
        color_name, color_rgb = process_image_and_detect_colors(picture)
        return pd.Series([color_name, color_rgb])

    # Preferred source: the cached file on disk.
    if os.path.exists(img_path):
        try:
            return _color_series(img_path)
        except Exception as e:
            print(f"Failed to process local image {img_id}: {e}")

    # Fallback: download the image and classify it from memory.
    try:
        response = requests.get(img_url, timeout=5)
        return _color_series(BytesIO(response.content))
    except Exception as e:
        print(f"Failed to process {img_url}: {e}")
        return pd.Series(["Unknown", (0, 0, 0)])

In [None]:
from tqdm import tqdm

# Register `progress_apply` on pandas objects so the row-wise pass below
# shows a progress bar.
tqdm.pandas()


def _detect_for_row(row):
    """Run color detection for one row against the download folder."""
    return detect_color_from_file(row, image_download_dir)


# One detection per row; the two values of each returned Series become the two
# new columns of `df`.
detected_colors = df.progress_apply(_detect_for_row, axis=1)
df[['detected_color_name', 'detected_color_rgb']] = detected_colors

# Persist the enriched table next to the dataset files.
df.to_csv(os.path.join(base_path, "fashion_with_colors.csv"), index=False)

In [None]:
import numpy as np
from sklearn.decomposition import PCA

In [None]:
def analyze_latent_space(vae, dataloader, df, num_samples=1000):
    """Scatter-plot a 2-D PCA of latent means, colored by detected color name.

    Images are encoded in dataset order (NOT in the possibly shuffled order
    of `dataloader`), so that the i-th latent vector lines up with the i-th
    row of `df`. The previous implementation iterated the shuffled loader
    while labelling batches with sequential rows, which paired each latent
    vector with the color of an unrelated image.

    Args:
        vae: Model exposing `encode(x) -> (mu, logvar)`.
        dataloader: DataLoader whose `.dataset`, `.batch_size`,
            `.num_workers` and `.collate_fn` are reused for an unshuffled pass.
        df: DataFrame with a `detected_color_name` column; assumed to be the
            frame the dataset was built from, in the same row order.
        num_samples: Encoding stops once at least this many images are seen.

    Side effects:
        Switches `vae` to eval mode and shows a matplotlib figure. Prints a
        message and returns without plotting when fewer than two samples are
        available (PCA with two components needs at least two).
    """
    vae.eval()
    # Use the device the model lives on rather than a notebook-level global.
    model_device = next(vae.parameters()).device

    # Deterministic, in-order pass over the same dataset.
    ordered_loader = DataLoader(
        dataloader.dataset,
        batch_size=dataloader.batch_size,
        shuffle=False,
        num_workers=dataloader.num_workers,
        collate_fn=dataloader.collate_fn,
    )

    latent_vectors = []
    colors = []
    seen = 0  # number of dataset rows consumed so far

    with torch.no_grad():
        for data in ordered_loader:
            if seen >= num_samples or seen >= len(df):
                break
            mu, _ = vae.encode(data.to(model_device))

            batch_colors = df.iloc[seen:seen + len(data)]['detected_color_name'].tolist()
            # Keep latents and labels the same length even if `df` runs out
            # in the middle of a batch.
            latent_vectors.append(mu[:len(batch_colors)].cpu().numpy())
            colors.extend(batch_colors)
            seen += len(data)

    if sum(len(chunk) for chunk in latent_vectors) < 2:
        print("Not enough samples to analyze the latent space.")
        return

    latent_vectors = np.concatenate(latent_vectors, axis=0)

    pca = PCA(n_components=2)
    latent_2d = pca.fit_transform(latent_vectors)

    # Object dtype keeps non-string entries (e.g. NaN) from being stringified.
    color_labels = np.array(colors, dtype=object)
    # Sorted so the legend order is the same on every run.
    plotted_names = sorted(
        {c for c in colors if isinstance(c, str) and c and c != "Unknown"}
    )

    plt.figure(figsize=(10, 8))
    for color in plotted_names:
        mask = color_labels == color
        plt.scatter(latent_2d[mask, 0], latent_2d[mask, 1], label=color, alpha=0.6)
    plt.legend()
    plt.title('Latent Space Colored by Detected Color')
    plt.show()

analyze_latent_space(vae, dataloader, df)

In [None]:
# Rebuild the dataset and loader on only the first 1000 rows of the merged
# frame, with a smaller batch size.
df_subset = df.iloc[:1000]
dataset = FashionKaggleDataset(df=df_subset, image_dir=image_dir, transform=transform)
dataloader = DataLoader(dataset=dataset, batch_size=16, num_workers=0, shuffle=True)

In [None]:
# Rebuild the dataset and loader on at most 1000 rows whose article type is
# "Tshirts" or "Jeans". This reassigns `df_subset`, `dataset` and `dataloader`.
is_selected_type = df['articleType'].isin(['Tshirts', 'Jeans'])
df_subset = df.loc[is_selected_type].head(1000)
dataset = FashionKaggleDataset(df=df_subset, image_dir=image_dir, transform=transform)
dataloader = DataLoader(dataset=dataset, batch_size=16, num_workers=0, shuffle=True)