## 1. Import libraries

In [2]:
import os
import json
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models, transforms
from torch.utils.data import DataLoader, Dataset
from PIL import Image
from sklearn.metrics.pairwise import cosine_similarity, euclidean_distances
from sklearn.neighbors import NearestNeighbors
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt

## 2. Prepare data

In [3]:
# Dataset locations, relative to the notebook's working directory:
# the dataset root, its image folder and the folder holding the split files.
data_dir = "Data/fashionIQ_dataset/fashionIQ_dataset"
image_dir, json_dir = (
    os.path.join(data_dir, subfolder) for subfolder in ("images", "image_splits")
)

### 2.1 Helper functions to prepare and pre-process data

In [8]:
# function to read data files

# read the content of a JSON file
def read_json(file_path):
    """Parse the JSON document stored at ``file_path`` and return the decoded object.

    The file is opened explicitly as UTF-8 (the standard JSON encoding) so the
    result does not depend on the platform's default locale encoding.

    Args:
        file_path: path of the JSON file to read.

    Returns:
        Whatever ``json.load`` decodes from the file (list, dict, ...).

    Raises:
        OSError: if the file cannot be opened (e.g. ``FileNotFoundError``).
        json.JSONDecodeError: if the content is not valid JSON.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        return json.load(f)

# build the image paths of one category/split from its JSON split file
def load_image_list(category, split):
    """Return the ``.jpg`` path of every image named in a split file.

    The entries are read from ``split.<category>.<split>.json`` inside
    ``json_dir``; each entry is turned into ``<image_dir>/<entry>.jpg`` and the
    order of the file is kept.
    """
    split_file = os.path.join(json_dir, f'split.{category}.{split}.json')
    image_paths = []
    for image_name in read_json(split_file):
        image_paths.append(os.path.join(image_dir, image_name + '.jpg'))
    return image_paths

In [7]:
# function to preprocess data

# Preprocessing applied to every image: resize to 224x224, convert to a tensor,
# then normalise each channel with the mean/std below
# (presumably the usual ImageNet statistics -- TODO confirm).
transform = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

# Dataset pairing each image path with its class label
class FashionIQDataset(Dataset):
    """Map-style dataset that loads images from disk on demand.

    Each item is ``(image, label)``: the image at ``image_paths[idx]`` opened as
    RGB (and passed through ``transform`` when one is set) together with
    ``labels[idx]``.
    """

    def __init__(self, image_paths, labels, transform=None):
        # The three inputs are stored as given; nothing is read until indexing.
        self.transform = transform
        self.labels = labels
        self.image_paths = image_paths

    def __len__(self):
        """Number of samples, i.e. the number of image paths."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return it as ``(image, label)``."""
        rgb_image = Image.open(self.image_paths[idx]).convert('RGB')
        target = self.labels[idx]

        if not self.transform:
            return rgb_image, target
        return self.transform(rgb_image), target

### 2.2 Load and preprocess data

In [6]:
# category - dress
train_images_dress, val_images_dress, test_images_dress = (
    load_image_list('dress', split_name) for split_name in ('train', 'val', 'test')
)

# category - shirt
train_images_shirt, val_images_shirt, test_images_shirt = (
    load_image_list('shirt', split_name) for split_name in ('train', 'val', 'test')
)

# category - top&tee
train_images_toptee, val_images_toptee, test_images_toptee = (
    load_image_list('toptee', split_name) for split_name in ('train', 'val', 'test')
)

# gather every split of a category into one list (train, then val, then test)
dress = [*train_images_dress, *val_images_dress, *test_images_dress]
shirt = [*train_images_shirt, *val_images_shirt, *test_images_shirt]
toptee = [*train_images_toptee, *val_images_toptee, *test_images_toptee]



In [8]:
import cv2 as cv

def saveImage(category, new_path):
    """Re-encode every readable image in ``category`` as ``<new_path>_<index>.jpg``.

    Args:
        category: iterable of source image paths.
        new_path: output prefix (folder plus file-name stem); the 1-based
            position of the image in ``category`` and ``.jpg`` are appended.

    Source files that OpenCV cannot read are reported and skipped; their index
    is still consumed so output numbers keep matching positions in ``category``.

    Raises:
        OSError: if OpenCV reports that an output file could not be written.
    """
    # cv.imwrite does not create missing folders, so make sure the target exists.
    out_dir = os.path.dirname(new_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    for index, path in enumerate(category, start=1):
        image = cv.imread(path)
        if image is None:
            # imread signals a missing/corrupt file by returning None.
            print(f"Skipping unreadable image: {path}")
            continue

        target = new_path + "_" + str(index) + ".jpg"
        if not cv.imwrite(target, image):
            raise OSError(f"Could not write image to {target}")
        
# Export the dress images as Data/fashionIQ/dress/dress_<index>.jpg.
saveImage(dress, "Data/fashionIQ/dress/dress")
        
# Exports for the other two categories are currently disabled.
# saveImage(shirt, "Data/fashionIQ/shirt/shirt")
# saveImage(toptee, "Data/fashionIQ/top-tee/topntee")  # NOTE(review): this commented line originally passed `shirt`, presumably a copy-paste slip

In [5]:
# Load the image paths of every category for the train, val and test splits,
# then assemble one combined list (and matching label list) per split.

# category - dress
train_images_dress, val_images_dress, test_images_dress = [
    load_image_list('dress', split_name) for split_name in ('train', 'val', 'test')
]

# category - shirt
train_images_shirt, val_images_shirt, test_images_shirt = [
    load_image_list('shirt', split_name) for split_name in ('train', 'val', 'test')
]

# category - top&tee
train_images_toptee, val_images_toptee, test_images_toptee = [
    load_image_list('toptee', split_name) for split_name in ('train', 'val', 'test')
]

# one dataset per split, categories always in the order dress, shirt, toptee
train_images = [*train_images_dress, *train_images_shirt, *train_images_toptee]
val_images = [*val_images_dress, *val_images_shirt, *val_images_toptee]
test_images = [*test_images_dress, *test_images_shirt, *test_images_toptee]

# one label per image, in the same order (0: dress, 1: shirt, 2: toptee)
train_labels = [
    class_id
    for class_id, paths in enumerate((train_images_dress, train_images_shirt, train_images_toptee))
    for _ in paths
]
val_labels = [
    class_id
    for class_id, paths in enumerate((val_images_dress, val_images_shirt, val_images_toptee))
    for _ in paths
]
test_labels = [
    class_id
    for class_id, paths in enumerate((test_images_dress, test_images_shirt, test_images_toptee))
    for _ in paths
]

In [6]:
# Wrap each split in a dataset (preprocessing happens lazily, per item) and
# expose it through a loader with batches of 32.
# Only the training loader shuffles; validation and test keep list order.
train_dataset = FashionIQDataset(image_paths=train_images, labels=train_labels, transform=transform)
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)

val_dataset = FashionIQDataset(image_paths=val_images, labels=val_labels, transform=transform)
val_loader = DataLoader(dataset=val_dataset, batch_size=32, shuffle=False)

test_dataset = FashionIQDataset(image_paths=test_images, labels=test_labels, transform=transform)
test_loader = DataLoader(dataset=test_dataset, batch_size=32, shuffle=False)


## 3. ResNet50 Model

In [None]:
# define the training configuration and the ResNet50 model
model_dir = 'models'  # folder that receives the best checkpoint
num_classes = 3       # dress, shirt, toptee
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# ResNet50 initialised with the ImageNet-1k V1 weights. These are the weights
# the deprecated `pretrained=True` flag resolved to; naming them explicitly
# keeps the behaviour and avoids the torchvision deprecation warning.
model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
# replace the last layer so it outputs one score per class of this dataset
model.fc = nn.Linear(model.fc.in_features, num_classes)
# move the model to the selected device (GPU when available)
model = model.to(device)

# loss and optimizer; every parameter of the network is optimised
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

### 3.1 Train model

In [None]:
# Training configuration and model for this cell (rebuilds the model from the
# pretrained weights, so training always starts from the same state).
model_dir = 'models'  # folder that receives the best checkpoint
num_classes = 3       # dress, shirt, toptee
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# ResNet50 initialised with the ImageNet-1k V1 weights. These are the weights
# the deprecated `pretrained=True` flag resolved to; naming them explicitly
# keeps the behaviour and avoids the torchvision deprecation warning.
model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
# replace the last layer so it outputs one score per class of this dataset
model.fc = nn.Linear(model.fc.in_features, num_classes)
# move the model to the selected device (GPU when available)
model = model.to(device)

# loss and optimizer; every parameter of the network is optimised
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Per-epoch history filled in by train(): the summed training loss of the epoch
# and the validation accuracy (in percent).
total_loss, total_val_acc = [], []


def train(model, train_loader, val_loader, criterion, optimizer, device, num_epochs=10):
    """Train ``model`` and keep the checkpoint with the best validation accuracy.

    Args:
        model: network to optimise (put into train mode at every epoch).
        train_loader: iterable of ``(images, labels)`` training batches.
        val_loader: loader handed to ``evaluate`` after each epoch.
        criterion: loss called as ``criterion(outputs, labels)``.
        optimizer: optimizer stepping the model parameters.
        device: device the batches are moved to.
        num_epochs: number of passes over ``train_loader``.

    Side effects:
        Appends one entry per epoch to the module-level ``total_loss`` and
        ``total_val_acc`` lists, and writes ``best_model.pth`` into the
        module-level ``model_dir`` whenever the validation accuracy improves.
    """
    best_val_accuracy = 0.0
    for epoch in range(num_epochs):
        print(f"training on epoch {epoch+1} ...")
        model.train()
        running_loss = 0.0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)

            # clear the gradients accumulated by the previous step
            optimizer.zero_grad()

            # forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # backward pass and parameter update
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        # evaluate on the validation set after every epoch
        val_accuracy = evaluate(model, val_loader, device)
        # guard the mean against an empty loader (0 batches)
        num_batches = max(len(train_loader), 1)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss / num_batches:.4f}, Val Accuracy: {val_accuracy:.2f}%")

        total_loss.append(running_loss)
        total_val_acc.append(val_accuracy)

        # keep the best model seen so far
        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            # torch.save does not create missing folders
            os.makedirs(model_dir, exist_ok=True)
            model_save_path = os.path.join(model_dir, 'best_model.pth')
            torch.save(model.state_dict(), model_save_path)
            print("Saved best model to", model_save_path)

# Model evaluation helper
def evaluate(model, data_loader, device):
    """Return the classification accuracy of ``model`` on ``data_loader`` in percent.

    Args:
        model: network to evaluate; it is switched to eval mode and left there.
        data_loader: iterable of ``(images, labels)`` batches.
        device: device the batches are moved to before the forward pass.

    Returns:
        ``100 * correct / total`` as a float, or ``0.0`` when the loader
        yields no samples (instead of dividing by zero).
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():  # no gradients are needed for scoring
        for images, labels in data_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            # predicted class = index of the largest score along dim 1
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    if total == 0:
        return 0.0
    return 100 * correct / total

In [None]:
# Fine-tune the network with the default number of epochs (10); train() writes
# the best checkpoint by validation accuracy as it goes.
train(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    device=device,
)

### 3.2 Load the trained weights into the model

In [None]:
# Load the fine-tuned checkpoint on the GPU when one is available (CPU otherwise)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# NOTE: despite its name this is the checkpoint *file*; train() reads the same
# global as a folder, so re-run the training cell's setup before training again.
model_dir = 'models/best_model.pth'
num_classes = 3

# same architecture as during training; weights=None (the replacement for the
# deprecated pretrained=False) because our own weights are loaded right below
model = models.resnet50(weights=None)
model.fc = nn.Linear(model.fc.in_features, num_classes)

# map_location lets a checkpoint saved on GPU load on a CPU-only machine;
# weights_only=True restricts unpickling to tensors, which is all a state_dict needs
model.load_state_dict(torch.load(model_dir, map_location=device, weights_only=True))

model.to(device)
# switch the model to evaluation mode
model.eval()

In [3]:
# if device is CPU
# `device` is set here as well so the later cells that take it (feature
# extraction, query accuracy) work when only this cell was run, and stay
# consistent with a model that lives on the CPU.
device = torch.device('cpu')
num_classes = 3
# NOTE: despite its name this is the checkpoint *file*, not a folder
model_dir = 'models/best_model.pth'

# build the ResNet-50 architecture; weights=None (the replacement for the
# deprecated pretrained=False) because our own weights are loaded right below
model = models.resnet50(weights=None)
model.fc = nn.Linear(model.fc.in_features, num_classes)

# load the weights onto the CPU; weights_only=True restricts unpickling to
# tensors, which is all a state_dict needs
model.load_state_dict(torch.load(model_dir, map_location=device, weights_only=True))

# switch the model to evaluation mode
model.eval()

  model.load_state_dict(torch.load(model_dir, map_location=torch.device('cpu')))


ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

## 4. Extract features

In [21]:
def _clear_old_chunks(prefix):
    """Delete ``<prefix>_batch_<n>.npy`` / ``<prefix>_final.npy`` left by an earlier run."""
    import glob  # local import: only this helper needs it

    stem = os.path.basename(prefix)
    escaped = glob.escape(prefix)
    leftovers = glob.glob(f'{escaped}_batch_*.npy') + glob.glob(f'{escaped}_final.npy')
    for leftover in leftovers:
        # part between "<stem>_" and ".npy": "batch_<n>" or "final"
        chunk_id = os.path.basename(leftover)[len(stem) + 1:-len('.npy')]
        if chunk_id == 'final' or chunk_id[len('batch_'):].isdigit():
            os.remove(leftover)


def _save_chunk(features_list, labels_list, save_path_features, save_path_labels, suffix):
    """Concatenate the buffered batches and write them as ``<prefix>_<suffix>.npy``."""
    np.save(f'{save_path_features}_{suffix}.npy', np.concatenate(features_list, axis=0), allow_pickle=False)
    np.save(f'{save_path_labels}_{suffix}.npy', np.concatenate(labels_list, axis=0), allow_pickle=False)


def extract_features_batchwise(loader, model, device, save_path_features, save_path_labels):
    """Run ``model`` over ``loader`` and store its outputs and the labels in chunks.

    Each batch output is flattened to ``(batch, -1)`` and buffered. The buffer
    is written to ``<prefix>_batch_<idx>.npy`` whenever ``idx % 10 == 0`` and
    whatever remains after the last batch goes to ``<prefix>_final.npy``.

    Args:
        loader: iterable of ``(images, labels)`` batches (labels as tensors).
        model: module applied to each image batch; switched to eval mode.
        device: device the images are moved to.
        save_path_features: output prefix (folder + stem) for feature chunks.
        save_path_labels: output prefix (folder + stem) for label chunks.

    Chunk files from a previous run under the same prefixes are removed first,
    so a later merge never mixes stale chunks (e.g. an old ``_final.npy``) with
    the fresh ones.
    """
    model.eval()

    for prefix in (save_path_features, save_path_labels):
        out_dir = os.path.dirname(prefix)
        if out_dir:  # a bare stem has no folder to create
            os.makedirs(out_dir, exist_ok=True)
        _clear_old_chunks(prefix)

    features_list, labels_list = [], []
    with torch.no_grad():
        for batch_idx, (images, labels) in enumerate(loader):
            images = images.to(device)
            output = model(images)
            output = output.view(output.size(0), -1)
            features_list.append(output.cpu().numpy())
            labels_list.append(labels.numpy())

            # flush the buffer regularly so memory use stays bounded
            if batch_idx % 10 == 0:
                _save_chunk(features_list, labels_list, save_path_features, save_path_labels, f'batch_{batch_idx}')
                features_list, labels_list = [], []

    # batches seen since the last flush
    if features_list:
        _save_chunk(features_list, labels_list, save_path_features, save_path_labels, 'final')
        
        
def merge_npy_files(folder_path, file_pattern, output_file):
    """Concatenate the chunk files of one extraction run into a single ``.npy``.

    Args:
        folder_path: folder containing the chunk files.
        file_pattern: file-name stem; chunks are ``<stem>_batch_<n>.npy`` plus
            an optional ``<stem>_final.npy``.
        output_file: path of the merged array.

    Chunks are merged in increasing ``<n>`` order, followed by the final chunk
    when it exists (it is absent when the run ended on a flush boundary).

    Raises:
        FileNotFoundError: if the folder holds no chunk for ``file_pattern``.
    """
    batch_prefix = file_pattern + '_batch_'
    extension = '.npy'

    def chunk_number(name):
        # text between "<stem>_batch_" and ".npy"
        return name[len(batch_prefix):-len(extension)]

    chunk_files = sorted(
        (
            name for name in os.listdir(folder_path)
            if name.startswith(batch_prefix) and name.endswith(extension) and chunk_number(name).isdigit()
        ),
        key=lambda name: int(chunk_number(name)),  # numeric order: 0, 10, 20, ..., 100
    )

    final_name = file_pattern + '_final.npy'
    if os.path.isfile(os.path.join(folder_path, final_name)):
        chunk_files.append(final_name)

    if not chunk_files:
        raise FileNotFoundError(f"No '{file_pattern}' chunk files found in {folder_path}")

    arrays = [np.load(os.path.join(folder_path, name)) for name in chunk_files]
    all_array = np.concatenate(arrays, axis=0)
    np.save(output_file, all_array, allow_pickle=False)
    print(f"Saved combined features to {output_file}")

### 4.1 Extract features for all images in the dataset

In [14]:
# output prefixes (folder + file stem) for the feature and label chunks
save_path_features = 'features/features/features'
save_path_labels = 'features/labels/labels'

# One dataset over train + val + test, read in order so features and labels stay aligned.
all_dataset = FashionIQDataset(
    [*train_images, *val_images, *test_images],
    [*train_labels, *val_labels, *test_labels],
    transform=transform,
)
all_loader = DataLoader(dataset=all_dataset, batch_size=32, shuffle=False)

# NOTE(review): `model` still carries the 3-way `fc` head set earlier in the
# notebook, so these "features" are presumably 3 class scores per image rather
# than backbone embeddings -- TODO confirm this is intended.
extract_features_batchwise(
    loader=all_loader,
    model=model,
    device=device,
    save_path_features=save_path_features,
    save_path_labels=save_path_labels,
)

# stitch the chunk files back together (labels first, then features)
merge_npy_files(folder_path='features/labels', file_pattern='labels', output_file='features/all_labels.npy')
merge_npy_files(folder_path='features/features', file_pattern='features', output_file='features/all_features.npy')

# load the merged arrays back into memory
all_labels = np.load('features/all_labels.npy')
all_features = np.load('features/all_features.npy')

### 4.2 Extract features for the test dataset and evaluate query accuracy

In [22]:
def calculate_query_accuracy(query_loader, feature_extractor, all_features, all_labels, transform, device, k=5, metric='cosine'):
    """Score k-NN retrieval: does the majority label of the k nearest gallery items match?

    Args:
        query_loader: iterable of ``(images, labels)`` query batches.
        feature_extractor: module mapping an image batch to features; its
            output is flattened to ``(batch, -1)``.
        all_features: gallery feature matrix the neighbours are searched in.
        all_labels: label of every gallery row (non-negative integers, as
            required by ``np.bincount``).
        transform: unused; kept so existing calls keep working.
        device: device the query images are moved to.
        k: number of neighbours that vote.
        metric: ``'cosine'`` or ``'euclidean'``.

    Returns:
        The ``accuracy_score`` of the voted labels against the true labels.

    Raises:
        ValueError: if ``metric`` is neither ``'cosine'`` nor ``'euclidean'``.
    """
    if metric not in ('cosine', 'euclidean'):
        raise ValueError("Metric must be either 'cosine' or 'euclidean'")

    feature_extractor.eval()
    gallery_labels = np.asarray(all_labels)

    # The gallery does not change between batches, so the index is built once.
    # The neighbour search computes the distances it needs itself, so no
    # separate full query-vs-gallery distance matrix is built.
    knn = NearestNeighbors(n_neighbors=k, metric=metric)
    knn.fit(all_features)

    predictions = []
    true_labels = []
    with torch.no_grad():
        for images, labels in query_loader:
            images = images.to(device)
            output_features = feature_extractor(images)
            output_features = output_features.view(output_features.size(0), -1).cpu().numpy()

            # indices of the k nearest gallery rows for every query in the batch
            _, indices = knn.kneighbors(output_features)

            for neighbour_ids, true_label in zip(indices, labels.numpy()):
                # majority vote among the neighbours' labels
                predictions.append(np.bincount(gallery_labels[neighbour_ids]).argmax())
                true_labels.append(true_label)

    return accuracy_score(true_labels, predictions)

In [None]:
# DataLoader over the test split, used as the query set
test_dataset = FashionIQDataset(image_paths=test_images, labels=test_labels, transform=transform)
test_loader = DataLoader(dataset=test_dataset, batch_size=32, shuffle=False)

# Accuracy of the k-NN query model (5 neighbours, cosine metric).
# NOTE(review): all_features was built from train + val + test, so every test
# query presumably finds its own entry in the gallery -- TODO confirm whether
# the score should exclude self-matches.
print("accuracy")
query_accuracy = calculate_query_accuracy(
    query_loader=test_loader,
    feature_extractor=model,
    all_features=all_features,
    all_labels=all_labels,
    transform=transform,
    device=device,
    k=5,
    metric='cosine',
)
print(f"Query Accuracy: {query_accuracy:.2f}")