In [7]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image
import warnings
import torch
import random
import ast
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from sklearn.neighbors import NearestNeighbors
from mlxtend.frequent_patterns import apriori, association_rules

warnings.filterwarnings('ignore')

In [8]:
fashion_df = pd.read_csv("../data/cleaned_data/fashion_dataset.csv")
fashion_df.reset_index()

Unnamed: 0,index,image_name,x_1,y_1,x_2,y_2,width,height,area,category_id,category_name,category_type,category_type_name,eval_status,positive_attributes,absolute_path,num_attributes
0,0,img/Sheer_Pleated-Front_Blouse/img_00000001.jpg,72,79,232,273,160,194,31040,3,Blouse,1,upper-body,train,"[717, 818]",../data/img/Sheer_Pleated-Front_Blouse/img_000...,2
1,1,img/Sheer_Pleated-Front_Blouse/img_00000002.jpg,67,59,155,161,88,102,8976,3,Blouse,1,upper-body,train,"[717, 818]",../data/img/Sheer_Pleated-Front_Blouse/img_000...,2
2,2,img/Sheer_Pleated-Front_Blouse/img_00000003.jpg,65,65,156,200,91,135,12285,3,Blouse,1,upper-body,val,"[141, 717, 837, 956]",../data/img/Sheer_Pleated-Front_Blouse/img_000...,4
3,3,img/Sheer_Pleated-Front_Blouse/img_00000004.jpg,51,62,167,182,116,120,13920,3,Blouse,1,upper-body,train,[716],../data/img/Sheer_Pleated-Front_Blouse/img_000...,1
4,4,img/Sheer_Pleated-Front_Blouse/img_00000005.jpg,46,88,166,262,120,174,20880,3,Blouse,1,upper-body,test,"[349, 405, 717, 810]",../data/img/Sheer_Pleated-Front_Blouse/img_000...,4
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
289207,289207,img/Paisley_Print_Babydoll_Dress/img_00000050.jpg,1,1,300,300,299,299,89401,41,Dress,3,full-body,train,"[30, 681, 682, 730]",../data/img/Paisley_Print_Babydoll_Dress/img_0...,4
289208,289208,img/Paisley_Print_Babydoll_Dress/img_00000051.jpg,14,58,225,277,211,219,46209,41,Dress,3,full-body,train,"[275, 681]",../data/img/Paisley_Print_Babydoll_Dress/img_0...,2
289209,289209,img/Paisley_Print_Babydoll_Dress/img_00000052.jpg,18,41,149,230,131,189,24759,41,Dress,3,full-body,train,"[30, 681]",../data/img/Paisley_Print_Babydoll_Dress/img_0...,2
289210,289210,img/Paisley_Print_Babydoll_Dress/img_00000053.jpg,75,47,220,300,145,253,36685,41,Dress,3,full-body,train,"[30, 681, 682, 730]",../data/img/Paisley_Print_Babydoll_Dress/img_0...,4


In [9]:
def load_attributes_cloth(file_path):
    with open(file_path, 'r') as f:
        lines = f.readlines()
    
    num_attributes = int(lines[0].strip())
    columns = lines[1].strip().split()
    
    data = []
    for i in range(2, len(lines)):
        line = lines[i].strip()
        parts = line.split()
        
        attr_type = int(parts[-1])
        attr_name = ' '.join(parts[:-1])
        
        attr_id = i - 2
        
        data.append([attr_id, attr_name, attr_type])
    
    attr_df = pd.DataFrame(data, columns=['attr_id', 'attr_name', 'attr_type'])
    attr_df['attr_type_name'] = attr_df['attr_type'].map({
        1: 'texture', 
        2: 'fabric', 
        3: 'shape', 
        4: 'part', 
        5: 'style'
    })
    
    return attr_df
attr_df = load_attributes_cloth("../data/Anno_coarse/list_attr_cloth.txt")
attr_id_to_name = dict(zip(attr_df['attr_id'].astype(str), attr_df['attr_name']))
attr_id_to_type = dict(zip(attr_df['attr_id'].astype(str), attr_df['attr_type_name']))

# 1. CNN Feature Extraction Functions

In [10]:
class FashionDataset(Dataset):
    def __init__(self, df, transform=None, crop=True):
        self.df = df
        self.transform = transform
        self.crop = crop

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]

        img_path = row['absolute_path']
        image = Image.open(img_path).convert('RGB')

        if self.crop and all(col in row.index for col in ['x_1', 'y_1', 'x_2', 'y_2']):
            x1, y1 = max(0, row['x_1']), max(0, row['y_1'])
            x2, y2 = min(image.width, row['x_2']), min(image.height, row['y_2'])

            if x2 > x1 and y2 > y1:
                image = image.crop((x1, y1, x2, y2))

        if self.transform:
            image = self.transform(image)

        sample = {
            'image': image,
            'category_id': row['category_id'] if 'category_id' in row else -1,
            'image_path': img_path,
            'index': idx
        }

        return sample

def set_transformations(mode='train'):
    mean = [0.485, 0.456, 0.406]
    std = [0.229, 0.224, 0.225]

    if mode == 'train':
        # Augment data
        transform = transforms.Compose([
            transforms.Resize((256, 256)),
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(10),
            transforms.ColorJitter(brightness=0.1, contrast=0.1),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(mean, std)
        ])
    else:
        # validation and testing, no augmentation
        transform = transforms.Compose([
            transforms.Resize((256, 256)),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(mean, std)
        ])
    return transform

In [20]:
def extract_features(model, dataloader):
    features = []
    image_paths = []
    category_ids = []
    indices = []
    attribute_lists = []
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    model.eval()

    with torch.no_grad():
        for batch in dataloader:
            image = batch['image'].to(device)

            output = model(images)
            batch_features = output.squeeze().cpu().numpy()

            if len(batch_features.shape) == 1:
                batch_features = batch_features.reshape(1,-1)

            features.append(batch_features)
            image_paths.extend(batch['image_path'])
            category_ids.extend(batch['category_id'].numpy())
            indices.extend(batch['index'].numpy())

            for attrs in batch['positive_attributes']:
                if isinstance(attrs, str):
                    try:
                        attrs_list = ast.literal_eval(attrs)
                        attribute_lists.append(attrs_list if isinstance(attrs_list, list) else [])
                    except:
                        all_attributes.append([])
                else:
                    attribute_lists.append([])

    features = np.vstack(features)
    return features, image_paths, category_ids, indices, attribute_lists

# 2. Association Rule Mining Functions

In [12]:
def create_association_rules(fashion_df, attr_df, min_support=0.01, min_confidence=0.5, min_lift=1.0):
    all_attributes = []
    for attrs in fashion_df['positive_attributes']:
        if isinstance(attrs, str):
            try:
                parsed = ast.literal_eval(attrs)
                all_attributes.append(parsed if isinstance(parsed, list) else [])
            except:
                stripped = attrs.strip('[]')
                if stripped:
                    items = [item.strip() for item in stripped.split(',')]
                    all_attributes.append([int(item) for item in items if item.isdigit()])
                else:
                    all_attributes.append([])
        elif isinstance(attrs, list):
            all_attributes.append(attrs)
        else:
            all_attributes.append([])
            
    attr_counts = {}
    for attr_list in all_attributes:
        for attr in attr_list:
            attr_id = str(attr)
            attr_counts[attr_id] = attr_counts.get(attr_id, 0) + 1

    # only use attributes that appear at least 5 times
    valid_attrs = {attr_id for attr_id, count in attr_counts.items()
                  if count >= 5 and attr_id in attr_df['attr_id'].astype(str).values}

    attribute_matrix = []
    for attr_list in all_attributes:
        item_attrs = {attr_id: 0 for attr_id in valid_attrs}
    
        for attr in attr_list:
            attr_id = str(attr)
            if attr_id in valid_attrs:
                item_attrs[attr_id] = 1
        attribute_matrix.append(item_attrs)
    attribute_matrix = pd.DataFrame(attribute_matrix)

    attr_mapping = {}
    for attr_id in valid_attrs:
        attr_name = attr_id_to_name.get(attr_id, "Unknown")
        attr_type = attr_id_to_type.get(attr_id, "Unknown")
        attr_mapping[attr_id] = f"{attr_name} ({attr_type})"

    frequent_itemsets = apriori(attribute_matrix, min_support=0.01, use_colnames=True, max_len=3)
    rules = association_rules(frequent_itemsets, metric="confidence", min_threshold=0.5)
    rules = rules[rules['lift'] >= 1.0]
    rules = rules.sort_values(['confidence', 'lift'], ascending=False)

    return rules, attr_mapping

In [13]:
def attributes_from_rules(query_attrs, rules, top_n=5):
    compl_attributes = []
    query_attr_set = set(query_attrs)

    # rules where antecedent is a subset of query attrs
    for _, rules in rules.iterrows():
        antecedent = set(rule['antecedents'])

        if antecedent.issubset(query_attr_set):
            # add consequents as complementary attrs
            for item in rule['consequents']:
                if item not in query_attr_set and item not in compl_attributes:
                    compl_attributes.append(item)

                    if len(compl_attributes) >= top_n:
                        return compl_attributes
    return compl_attributes

In [14]:
def attribute_similarity(item_attrs, query_attrs, compl_attrs, rule_weight=0.7):
    item_attrs_set = set(item_attrs)
    query_attrs_set = set(query_attrs)
    compl_attrs_set = set(compl_attrs)

    # Jaccard similarity
    matching_attrs = item_attrs_set.intersection(query_attrs_set)
    query_sim = len(matching_attrs) / max(1, len(query_attrs_set.union(item_attrs_set)))

    # if complementary, give it more weight
    matching_compl = item_attrs_set.intersection(compl_attrs_set)
    compl = len(matching_compl) / max(1, len(compl_attrs_set)) if compl_attrs_set else 0

    # combine score
    score = (rule_weight * query_sim) + ((1 - rule_weight) * compl)
    return score

# 3. Hybrid Recommendation Functions

In [None]:
def hybrid_recommendations(query_idx, fashion_df, features, indices, attribute_lists, rules, 
                           k=5, visual_weight=0.6, attr_weight=0.4):
    # extract features and attributes from query item
    query_feature = features[query_idx].reshape(1,-1)
    query_attrs = attribute_lists[query_idx]

    # get complementary attributes from generated association rules
    compl_attrs = attributes_from_rules(query_attrs, rules)

    # visual similar items
    knn = NearestNeighbors(metric='cosine')
    knn.fit(features)

    distances, neighbor_indices = knn.kneighbors(query_feature, n_neighbors=min(k*3, len(features)))

    distances = distances.flatten.tolist()
    neighbor_indices = neighbor_indices.flatten().tolist()

    # if query is in neighbors, remove (don't want to recommend query)
    if query_idx in neighbor_indices:
        idx = neighbor_indices.index(query_idx)
        neighbhor_indices.pop(idx)
        distances.pop(idx)

    attr_scores = []
    for idx in neighbor_indices:
        neighbor_attrs = attribute_lists[idx]
        attr_score = attribute_similarity(neighbor_attrs, query_attrs, compl_attrs)
        attr_scores.append(attr_score)

    # normalize scores
    max_distance = max(distances) if distances else 1
    norm_distances = [1 - (d / max_distance) for d in distances]

    max_attr_score = max(attr_scores) if attr_scores and max(attr_scores) > 0 else 1
    norm_attr_scores = [s / max_attr_score for s in attr_scores]

    combined_scores = [(visual_weight * dist + attr_weight * attr_score) for dist, attr_score 
                      in zip(norm_distances, norm_attr_scores)]

    # Sort scores and get the top k
    recommendations = sorted(zip(neighbor_indices, combined_scores), key=lambda x: x[1], reverse=True)[:k]

    recommendation_indices = [idx for idx, _ in recommendations]
    recommendation_scores = [score for _, score in recommendations]

    original_indices = [indices[idx] for idx in recommendation_indices]

    return original_indices, recommendation_scores
    

# 4. Run Recommendations

In [17]:
# set up datasets
transform = set_transformations(mode='val')
dataset = FashionDataset(fashion_df, transform=transform)
dataloader = DataLoader(dataset, batch_size=32, shuffle=False, num_workers=4)

In [18]:
# remove classification layer
model = models.resnet50(pretrained=True)
feature_model = nn.Sequential(*list(model.children())[:-1])

In [None]:
# extract features
features, image_paths, category_ids, indices, attribute_lists = extract_features(feature_model, dataloader)

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/opt/anaconda3/lib/python3.12/multiprocessing/spawn.py", line 122, in spawn_main
    exitcode = _main(fd, parent_sentinel)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/anaconda3/lib/python3.12/multiprocessing/spawn.py", line 132, in _main
    self = reduction.pickle.load(from_parent)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't get attribute 'FashionDataset' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>


In [None]:
# create association rules and get mapping for attributes
rules, attr_mapping = create_association_rules(fashion_df, attr_df)

In [None]:
sample_items = random.sample(range(len(indices)), 5)

In [None]:
results = []
for idx in sample_items:
    query_idx = idx
    query_original_idx = indices[query_idx]

    print(f"Item {query_idx}, Original Index: {query_original_idx}")
    print(f"Category: {fashion_df.iloc[query_original_idx]['category_name']}")

    # only get valid attributes
    query_attrs_list = fashion_df.iloc[query_original_idx]['positive_attributes']
    if isinstance(query_attrs_list, str):
        try:
            query_attrs = ast.literal_eval(query_attrs_list)
        except:
            query_attrs = []
    else:
        query_attrs = query_attrs_list if isinstance(query_attrs_list, list) else []

    rec_indices, rec_scores = hybrid_recommendations(query_idx, fashion_df, features, indices, 
                                                     attribute_lists, rules, k=5)
    print(f"Top 5 Recommendations for {query_idx}")
    for i, (rec_idx, score) in enumerate(zip(rec_indices, rec_scores)):
        rec_item = fashion_df.iloc[rec_idx]
        print(f"{i+1}. {rec_item['category_name']} - Score: {score:.4f})")

    results.append({
        'query_index': query_original_idx,
        'recommendations' : rec_indices,
        'scores': rec_scores
    })