In [3]:
!pip install torch torchvision opencv-python pillow numpy
!pip install psycopg2-binary pgvector




In [4]:
# Run as a code cell
!pip install -q opencv-python-headless pillow tqdm scikit-learn pandas


In [None]:
# Import libraries
import os, zipfile
import numpy as np
from PIL import Image
import cv2
import torch
from torchvision import models, transforms
from tqdm import tqdm

# Set device
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print("Using device:", device)

# Upload your dataset zip
from google.colab import files
uploaded = files.upload()  # select DeepFashion2-master.zip

# Unzip dataset
zip_path = next(iter(uploaded.keys()))
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall('./DeepFashion2')
dataset_dir = './DeepFashion2'
print("Dataset extracted to:", dataset_dir)

Using device: cpu


Saving DeepFashion2-master.zip to DeepFashion2-master (1).zip
Dataset extracted to: ./DeepFashion2


In [None]:
# Load ResNet50 backbone (remove fc)
model = models.resnet50(pretrained=True)
model.fc = torch.nn.Identity()
model.to(device).eval()

# Preprocessing pipeline
preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225])
])

# Function: compute visual embedding
def image_to_embedding(path_or_pil):
    if isinstance(path_or_pil, str):
        img = Image.open(path_or_pil).convert("RGB")
    else:
        img = path_or_pil.convert("RGB")
    x = preprocess(img).unsqueeze(0).to(device)
    with torch.no_grad():
        emb = model(x).cpu().numpy().reshape(-1).astype(np.float32)
    emb /= (np.linalg.norm(emb) + 1e-9)
    return emb  # 2048-d




Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth


100%|██████████| 97.8M/97.8M [00:00<00:00, 113MB/s]


In [None]:
def color_histogram_hsv_bgr(bgr_img, bins=(8,8,8)):
    hsv = cv2.cvtColor(bgr_img, cv2.COLOR_BGR2HSV)
    hist = cv2.calcHist([hsv],[0,1,2],None,bins,[0,180,0,256,0,256])
    hist = hist.flatten().astype(np.float32)
    hist /= (np.linalg.norm(hist)+1e-9)
    return hist  # 512-d if bins=(8,8,8)


In [None]:
# Example one-hot encoders (adjust as per dataset)
categories = ['top','bottom','dress','outer','shoes','bag','accessory','suit','skirt','jumpsuit']
colors = ['red','green','blue','yellow','black','white','pink','brown','purple','grey']

cat_to_idx = {c:i for i,c in enumerate(categories)}
color_to_idx = {c:i for i,c in enumerate(colors)}

def encode_metadata(category, color):
    cat_vec = np.zeros(len(categories), dtype=np.float32)
    col_vec = np.zeros(len(colors), dtype=np.float32)
    if category in cat_to_idx:
        cat_vec[cat_to_idx[category]] = 1.0
    if color in color_to_idx:
        col_vec[color_to_idx[color]] = 1.0
    return np.concatenate([cat_vec, col_vec])  # 20-d metadata vector


In [None]:
# Walk dataset and process images
all_vectors = []
all_filenames = []

# Adjust path according to your dataset structure
for root, dirs, files in tqdm(os.walk(dataset_dir)):
    for file in files:
        if file.lower().endswith(('.jpg','.png')):
            path = os.path.join(root, file)

            # Visual embedding
            try:
                pil_img = Image.open(path).convert("RGB")
            except:
                continue
            visual_emb = image_to_embedding(pil_img)

            # Color histogram
            bgr_img = cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR)
            color_hist = color_histogram_hsv_bgr(bgr_img)

            # Metadata vector
            # Example: folder names as category/color
            parts = path.split(os.sep)
            # Adjust indices according to your folder depth
            category = parts[-3] if len(parts)>=3 else 'top'
            color = parts[-2] if len(parts)>=2 else 'red'
            metadata_vec = encode_metadata(category, color)

            # Combine
            final_vec = np.concatenate([visual_emb, color_hist, metadata_vec])
            final_vec /= (np.linalg.norm(final_vec)+1e-9)

            all_vectors.append(final_vec)
            all_filenames.append(path)

print("Processed", len(all_vectors), "images")
all_vectors = np.stack(all_vectors)
print("Feature matrix shape:", all_vectors.shape)  # (N_images, 2048+512+20)


10it [00:07,  1.25it/s]

Processed 8 images
Feature matrix shape: (8, 2580)





Metadata Encoding (category, occasion, colors)


In [None]:
# Sizes, occasions, colors
sizes = ['XS','S','M','L','XL']
occasions = ['casual','formal','party','sports','beach']
colors = ['red','green','blue','yellow','black','white','pink','brown','purple','grey']

size_to_idx = {s:i for i,s in enumerate(sizes)}
occ_to_idx = {o:i for i,o in enumerate(occasions)}
color_to_idx = {c:i for i,c in enumerate(colors)}

def encode_user_preferences(size=None, occasion=None, preferred_colors=[]):
    """
    size: str
    occasion: str
    preferred_colors: list of str
    Returns normalized vector of size (len(sizes)+len(occasions)+len(colors))
    """
    size_vec = np.zeros(len(sizes), dtype=np.float32)
    occ_vec = np.zeros(len(occasions), dtype=np.float32)
    color_vec = np.zeros(len(colors), dtype=np.float32)

    if size in size_to_idx:
        size_vec[size_to_idx[size]] = 1.0
    if occasion in occ_to_idx:
        occ_vec[occ_to_idx[occasion]] = 1.0
    for c in preferred_colors:
        if c in color_to_idx:
            color_vec[color_to_idx[c]] = 1.0

    user_vec = np.concatenate([size_vec, occ_vec, color_vec])
    user_vec /= (np.linalg.norm(user_vec)+1e-9)
    return user_vec  # e.g., 5+5+10=20-d vector


In [None]:
# Example user
size = 'M'
occasion = 'party'
preferred_colors = ['red','black','blue']

user_vec = encode_user_preferences(size=size, occasion=occasion, preferred_colors=preferred_colors)
print("User vector shape:", user_vec.shape)
print(user_vec)
print("L2 norm:", np.linalg.norm(user_vec))


User vector shape: (20,)
[0.        0.        0.4472136 0.        0.        0.        0.
 0.4472136 0.        0.        0.4472136 0.        0.4472136 0.
 0.4472136 0.        0.        0.        0.        0.       ]
L2 norm: 0.99999994


In [None]:
def build_hybrid_user_vector(user_pref_vec, liked_item_vecs, alpha=0.5):
    """
    Combine explicit preferences with liked item embeddings
    alpha: weight of explicit preferences
    liked_item_vecs: np.array of shape (num_items, item_vec_dim)
    """
    # Calculate the expected dimension of the item vectors
    item_vec_dim = liked_item_vecs.shape[1] if liked_item_vecs is not None and len(liked_item_vecs) > 0 else len(user_pref_vec)

    # Pad user_pref_vec with zeros to match the dimension of item vectors
    padded_user_pref_vec = np.zeros(item_vec_dim, dtype=np.float32)
    # Assuming the metadata is the last part of the item vector
    padded_user_pref_vec[-len(user_pref_vec):] = user_pref_vec

    if liked_item_vecs is not None and len(liked_item_vecs) > 0:
        mean_liked = liked_item_vecs.mean(axis=0)
        # Ensure dimensions match before adding
        if padded_user_pref_vec.shape[0] != mean_liked.shape[0]:
            raise ValueError("Dimension mismatch between padded user preferences and mean liked items.")
        hybrid = alpha * padded_user_pref_vec + (1 - alpha) * mean_liked
    else:
        hybrid = padded_user_pref_vec

    hybrid /= (np.linalg.norm(hybrid) + 1e-9)
    return hybrid

Content-based Recommender (Cosine Similarity / FAISS)


In [None]:
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

def recommend_content(user_vec, item_vectors, filenames, top_k=10,
                      user_size=None, user_occasion=None, item_metadata=None):
    """
    user_vec: np.array, hybrid user vector
    item_vectors: np.array, all item feature vectors
    filenames: list of corresponding image paths
    top_k: number of recommendations
    user_size / user_occasion: optional filters
    item_metadata: dict {filename: {'size':..., 'occasion':...}}
    """
    # Compute cosine similarity
    sims = cosine_similarity(user_vec.reshape(1,-1), item_vectors)[0]

    # Sort indices by similarity
    idx_sorted = sims.argsort()[::-1]

    # Apply business rules overlay
    recommendations = []
    for i in idx_sorted:
        fname = filenames[i]
        if item_metadata:
            meta = item_metadata.get(fname, {})
            if user_size and meta.get('size') != user_size:
                continue
            if user_occasion and meta.get('occasion') != user_occasion:
                continue
        recommendations.append((fname, sims[i]))
        if len(recommendations) >= top_k:
            break

    return recommendations


Prepare item metadata for filtering


In [None]:
# Example: for testing, create dummy metadata
item_metadata = {}
for f in all_filenames:
    # randomly assign size & occasion for testing
    item_metadata[f] = {
        'size': np.random.choice(['XS','S','M','L','XL']),
        'occasion': np.random.choice(['casual','formal','party','sports','beach'])
    }


Test content-based recommendation

In [None]:
# Example user
size = 'M'
occasion = 'party'
preferred_colors = ['red','black','blue']
user_pref_vec = encode_user_preferences(size, occasion, preferred_colors)

# Suppose the user liked first 3 items
liked_item_vecs = all_vectors[:3]
hybrid_vec = build_hybrid_user_vector(user_pref_vec, liked_item_vecs, alpha=0.6)

# Get top 5 recommendations
recs = recommend_content(hybrid_vec, all_vectors, all_filenames, top_k=5,
                         user_size=size, user_occasion=occasion,
                         item_metadata=item_metadata)

print("Top-5 content-based recommendations:")
for f, score in recs:
    print(f, "| similarity:", score)


Top-5 content-based recommendations:
./DeepFashion2/DeepFashion2-master/images/annotation.jpg | similarity: 0.4752463


In [None]:
from scipy.sparse import csr_matrix

# Example dummy data: user_id, item_id, interaction_value
data = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
row_indices = [0, 0, 1, 1, 2, 2, 3, 3, 4, 4]  # User IDs (0-indexed)
col_indices = [0, 1, 0, 2, 1, 3, 2, 4, 3, 4]  # Item IDs (0-indexed)
num_users = 5
num_items = 5

# Create CSR sparse matrix
interaction_matrix = csr_matrix((data, (row_indices, col_indices)), shape=(num_users, num_items))

print("Placeholder interaction_matrix created (sparse format):")
print(interaction_matrix)

# Optional: view as dense array for clarity
print("\nDense format:")
print(interaction_matrix.toarray())


Placeholder interaction_matrix created (sparse format):
<Compressed Sparse Row sparse matrix of dtype 'int64'
	with 10 stored elements and shape (5, 5)>
  Coords	Values
  (0, 0)	1
  (0, 1)	1
  (1, 0)	1
  (1, 2)	1
  (2, 1)	1
  (2, 3)	1
  (3, 2)	1
  (3, 4)	1
  (4, 3)	1
  (4, 4)	1

Dense format:
[[1 1 0 0 0]
 [1 0 1 0 0]
 [0 1 0 1 0]
 [0 0 1 0 1]
 [0 0 0 1 1]]


Train SVD on interaction matrix

In [None]:
from sklearn.decomposition import TruncatedSVD

# Convert to dense or keep as sparse (TruncatedSVD works with sparse CSR)
interaction_csr = interaction_matrix  # already csr

n_factors = 3  # number of latent factors
svd = TruncatedSVD(n_components=n_factors, random_state=42)
svd.fit(interaction_csr)

# Predicted user-item scores
predicted_scores = svd.transform(interaction_csr) @ svd.components_

print("Predicted scores shape:", predicted_scores.shape)


Predicted scores shape: (5, 5)


Recommend top-K items for a user

In [None]:
import numpy as np

def recommend_svd(user_id, predicted_scores, filenames, top_k=3):
    user_scores = predicted_scores[user_id]
    top_items = np.argsort(-user_scores)[:top_k]  # descending order
    recommended_files = [filenames[i] for i in top_items]
    return recommended_files, user_scores[top_items]

# Example: user 0
all_filenames = [f"item_{i}" for i in range(num_items)]  # dummy filenames
top_items, top_scores = recommend_svd(user_id=0, predicted_scores=predicted_scores, filenames=all_filenames, top_k=3)

print("Top-3 CF recommendations for user 0:")
for f, s in zip(top_items, top_scores):
    print(f, "| score:", s)


Top-3 CF recommendations for user 0:
item_1 | score: 0.9236067977499787
item_0 | score: 0.9236067977499784
item_3 | score: 0.20000000000000012


Normalize *scores*

In [None]:
def normalize_scores(scores):
    scores = np.array(scores, dtype=np.float32)
    min_s, max_s = scores.min(), scores.max()
    if max_s - min_s == 0:
        return np.zeros_like(scores)
    return (scores - min_s) / (max_s - min_s)


Hybrid recommendation function


In [None]:
def recommend_hybrid(user_id, hybrid_vec, content_vectors, all_filenames,
                     interaction_pred_scores, top_k=5, alpha=0.6,
                     user_size=None, user_occasion=None, item_metadata=None):
    """
    Combines content-based and collaborative filtering recommendations
    """
    # --- Content-based scores ---
    sims = cosine_similarity(hybrid_vec.reshape(1,-1), content_vectors)[0]

    # Apply business rules overlay
    content_scores = []
    for i, fname in enumerate(all_filenames):
        if item_metadata:
            meta = item_metadata.get(fname, {})
            if user_size and meta.get('size') != user_size:
                content_scores.append(0)
                continue
            if user_occasion and meta.get('occasion') != user_occasion:
                content_scores.append(0)
                continue
        content_scores.append(sims[i])

    # Normalize content and CF scores
    content_scores = normalize_scores(content_scores)
    cf_scores = normalize_scores(interaction_pred_scores[user_id])

    # Weighted hybrid
    hybrid_scores = alpha * content_scores + (1-alpha) * cf_scores

    # Top-K
    top_indices = np.argsort(-hybrid_scores)[:top_k]
    top_files = [all_filenames[i] for i in top_indices]
    top_scores = hybrid_scores[top_indices]

    return list(zip(top_files, top_scores))


Test hybrid recommendation


In [None]:
# Example: user 0
size = 'M'
occasion = 'party'
preferred_colors = ['red','black','blue']
user_pref_vec = encode_user_preferences(size, occasion, preferred_colors)

# Suppose user liked first 2 items
liked_item_vecs = all_vectors[:2]
hybrid_vec_user = build_hybrid_user_vector(user_pref_vec, liked_item_vecs, alpha=0.6)

# Dummy item metadata for business rules
item_metadata = {}
for f in all_filenames:
    item_metadata[f] = {'size': np.random.choice(['S','M','L']),
                        'occasion': np.random.choice(['casual','party','formal'])}

# Get top-5 hybrid recommendations
top_hybrid = recommend_hybrid(user_id=0, hybrid_vec=hybrid_vec_user,
                              content_vectors=all_vectors, all_filenames=all_filenames,
                              interaction_pred_scores=predicted_scores,
                              top_k=5, alpha=0.6,
                              user_size=size, user_occasion=occasion,
                              item_metadata=item_metadata)

print("Top-5 Hybrid Recommendations for user 0:")
for f, score in top_hybrid:
    print(f, "| hybrid score:", score)


Top-5 Hybrid Recommendations for user 0:
item_0 | hybrid score: 0.4
item_1 | hybrid score: 0.4
item_2 | hybrid score: 0.1527864
item_3 | hybrid score: 0.1527864
item_4 | hybrid score: 0.0


Prepare “ground truth” data


In [None]:
# Example: simulate ground truth liked items for users
ground_truth = {
    0: [0, 2],   # user 0 likes item 0 and 2
    1: [1, 3],
    2: [0, 1, 3],
    3: [2, 4],
    4: [3, 4]
}


Precision@K

In [None]:
def precision_at_k(recommended, ground_truth_items, k=5):
    recommended = recommended[:k]
    return len(set(recommended) & set(ground_truth_items)) / k
# Suppose we have hybrid recommendations (item indices) for user 0
recommended_items_indices = [int(f.split("_")[1]) for f, s in top_hybrid]  # convert filenames to indices

precision = precision_at_k(recommended_items_indices, ground_truth[0], k=5)
print(f"Precision@5 for user 0: {precision:.2f}")


Precision@5 for user 0: 0.40


Recall@K

In [None]:
def recall_at_k(recommended, ground_truth_items, k=5):
    recommended = recommended[:k]
    return len(set(recommended) & set(ground_truth_items)) / len(ground_truth_items)

recall = recall_at_k(recommended_items_indices, ground_truth[0], k=5)
print(f"Recall@5 for user 0: {recall:.2f}")


Recall@5 for user 0: 1.00


NDCG@K (Normalized Discounted Cumulative Gain)

In [None]:
import numpy as np

def ndcg_at_k(recommended, ground_truth_items, k=5):
    recommended = recommended[:k]
    dcg = 0
    for i, item in enumerate(recommended):
        if item in ground_truth_items:
            dcg += 1 / np.log2(i + 2)  # +2 because index starts at 0
    # Ideal DCG
    idcg = sum(1 / np.log2(i + 2) for i in range(min(len(ground_truth_items), k)))
    return dcg / idcg if idcg > 0 else 0

ndcg = ndcg_at_k(recommended_items_indices, ground_truth[0], k=5)
print(f"NDCG@5 for user 0: {ndcg:.2f}")


NDCG@5 for user 0: 0.92


Evaluate for all *users*

In [None]:
k = 5
precisions, recalls, ndcgs = [], [], []

for user_id in range(num_users):
    recommended_indices = [int(f.split("_")[1]) for f, s in top_hybrid]  # reuse hybrid recommendation
    precisions.append(precision_at_k(recommended_indices, ground_truth[user_id], k))
    recalls.append(recall_at_k(recommended_indices, ground_truth[user_id], k))
    ndcgs.append(ndcg_at_k(recommended_indices, ground_truth[user_id], k))

print(f"Average Precision@{k}: {np.mean(precisions):.2f}")
print(f"Average Recall@{k}: {np.mean(recalls):.2f}")
print(f"Average NDCG@{k}: {np.mean(ndcgs):.2f}")


Average Precision@5: 0.44
Average Recall@5: 1.00
Average NDCG@5: 0.72
