In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torch.nn.functional as F
import torchvision.models as models
import torchvision
import os 
import pandas as pd
from skimage import io
from torch.utils.data import (
    Dataset,
    DataLoader,
)

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# get the images
import zipfile
root_path = './'
with zipfile.ZipFile("/content/drive/MyDrive/data.zip","r") as zip_ref:
    zip_ref.extractall(root_path)

In [None]:
transform = transforms.Compose([ transforms.ToPILImage(),
                                  transforms.Resize(256),
                                  transforms.CenterCrop(224),
                                  transforms.ToTensor(),
                                  transforms.Normalize([0.485, 0.456, 0.406], 
                                                       [0.229, 0.224, 0.225])])

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
class FashionDataset(Dataset):
  def __init__(self,csv_file,root_dir,transform):
    self.annotations = pd.read_csv(csv_file, header = 0)
    self.root_dir = root_dir
    self.transform = transform
    
    self.graphic_id_to_name = dict()
    self.graphic_name_to_id = dict()

    self.productGroup_id_to_name = dict()
    self.productGroup_name_to_id = dict()
    
    self.productType_id_to_name = dict()
    self.productType_name_to_id = dict()

  def __len__(self):
    return len(self.annotations)

  def __getitem__(self,index):
    image_path = os.path.join(self.root_dir, '0' + str(self.annotations.iloc[index,0]) + '.jpg')
    image = io.imread(image_path)

    self.annotations['graphical_appearance_name'] = self.annotations['graphical_appearance_name'].astype('category')
    self.annotations['product_group_name'] = self.annotations['product_group_name'].astype('category')
    self.annotations['product_type_name'] = self.annotations['product_type_name'].astype('category')

    self.graphic_id_to_name = dict( enumerate(self.annotations['graphical_appearance_name'].cat.categories ) )
    self.graphic_name_to_id = dict((v,k) for k,v in self.graphic_id_to_name.items())

    self.productGroup_id_to_name = dict( enumerate(self.annotations['product_group_name'].cat.categories ) )
    self.productGroup_name_to_id = dict((v,k) for k,v in self.productGroup_id_to_name.items())

    self.productType_id_to_name = dict( enumerate(self.annotations['product_type_name'].cat.categories ) )
    self.productType_name_to_id = dict((v,k) for k,v in self.productType_id_to_name.items())

    # To do changes
    self.productGroup_label = torch.tensor(self.productGroup_name_to_id[self.annotations.iloc[index,1]])
    self.graphic_label = torch.tensor(self.graphic_name_to_id[self.annotations.iloc[index,2]])
    self.productType_label = torch.tensor(self.productType_name_to_id[self.annotations.iloc[index,3]])

    if self.transform:
      image = self.transform(image)

    # return the image and all the associated labels
    dict_data = {
        'img': image,
        'labels': {
            'label_productGroup': self.productGroup_label,
            'label_graphic': self.graphic_label,
            'label_productType': self.productType_label
        }
    }
    return dict_data

In [None]:
#Hyperparameters
learning_rate = 1e-3
batch_size = 32
num_epochs = 10 

In [None]:
#Load dataset
dataset = FashionDataset(csv_file="articles_with_attributes_new.csv",root_dir="data", transform= transform)

train_size = int(0.1 * len(dataset))
test_size = int(0.1 * len(dataset))
validation_size = len(dataset) - (train_size + test_size)

train_set,test_set, validation_set = torch.utils.data.random_split(dataset,[train_size,test_size,validation_size])

train_loader = DataLoader(train_set,batch_size,shuffle=True,drop_last=True)
test_loader = DataLoader(test_set,batch_size,shuffle=True,drop_last=True)
validate_loader = DataLoader(validation_set,batch_size,shuffle=True,drop_last=True)

In [None]:
train_loader

In [None]:
class_to_idx = []
class_to_idx.append(dataset.productGroup_name_to_id)
class_to_idx.append(dataset.graphic_name_to_id)
class_to_idx.append(dataset.productType_name_to_id)

In [None]:
class_to_idx

In [None]:
class MultiOutputModel(nn.Module):
    def __init__(self, n_product_group_classes, n_graphic_classes, n_product_type_classes):
        super().__init__()
        self.resnet = models.resnet34(pretrained=True)
        self.model_wo_fc = nn.Sequential(*(list(self.resnet.children())[:-1]))

        # create separate classifiers for our outputs
        self.productGroup = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(in_features=512, out_features=n_product_group_classes)
        )
        self.graphic = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(in_features=512, out_features=n_graphic_classes)
        )
        self.productType = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(in_features=512, out_features=n_product_type_classes)
        )

    def forward(self, x):
        x = self.model_wo_fc(x)
        x = torch.flatten(x, 1)

        return {
            'productGroup': self.productGroup(x),
            'graphic': self.graphic(x),
            'productType': self.productType(x)
        }

In [None]:
col_names = ["article_id", "product_group_name", "graphical_appearance_name", "product_type_name"]
df = pd.read_csv('articles_with_attributes_new.csv',names = col_names, header= 0 )

In [None]:
df

In [None]:
df['product_group_name'].unique()

In [None]:
model = MultiOutputModel(n_product_group_classes=len(df['product_group_name'].unique()),
                             n_graphic_classes=len(df['graphical_appearance_name'].unique()),
                             n_product_type_classes=len(df['product_type_name'].unique())).to(device)

In [None]:
model

In [None]:
def criterion(loss_func,outputs,pictures):
  losses = 0
  for i, key in enumerate(outputs):
    losses += loss_func(outputs[key], pictures['labels'][f'label_{key}'].to(device))
  return losses

def training(model,device,lr_rate,epochs,train_loader):
  num_epochs = epochs
  losses = []
  checkpoint_losses = []

  optimizer = torch.optim.Adam(model.parameters(), lr=lr_rate)
  n_total_steps = len(train_loader)

  loss_func = nn.CrossEntropyLoss()

  for epoch in range(num_epochs):
     for i, pictures in enumerate(train_loader):
        images = pictures['img'].to(device)
        pictures = pictures

        outputs = model(images)

        loss = criterion(loss_func,outputs, pictures)
        losses.append(loss.item())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if (i+1) % (int(n_total_steps/1)) == 0:
            checkpoint_loss = torch.tensor(losses).mean().item()
            checkpoint_losses.append(checkpoint_loss)
            print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{n_total_steps}], Loss: {checkpoint_loss:.4f}')
  return checkpoint_losses

checkpoint_losses = training(model,device,0.0001,10,train_loader)

In [None]:
# Saving the model
torch.save(model.state_dict(), "PyTorch-MultiLabelClassification-resnet34.pth")

In [None]:
model = MultiOutputModel(n_product_group_classes=len(df['product_group_name'].unique()),
                             n_graphic_classes=len(df['graphical_appearance_name'].unique()),
                             n_product_type_classes=len(df['product_type_name'].unique())).to(device)

model.load_state_dict(torch.load("PyTorch-MultiLabelClassification-resnet34.pth"))
model.eval()

In [None]:
class color:
   RED = '\033[91m'
   BOLD = '\033[1m'
   END = '\033[0m'

In [None]:
def validation(model, dataloader, *args):

  all_predictions = torch.tensor([]).to(device)
  all_true_labels = torch.tensor([]).to(device)

  with torch.no_grad():
    n_correct = []
    n_class_correct = []
    n_class_samples = []
    n_samples = 0

    for arg in args:
      n_correct.append(len(arg))
      n_class_correct.append([0 for i in range(len(arg))])
      n_class_samples.append([0 for i in range(len(arg))])

    print("Length Of Dataloader",len(dataloader))
    
    for pictures in dataloader:
      images = pictures['img'].to(device)

      outputs = model(images)
      labels = [pictures['labels'][picture].to(device) for picture in pictures['labels']]

      for i,out in enumerate(outputs):

        _, predicted = torch.max(outputs[out],1)
        n_correct[i] += (predicted == labels[i]).sum().item()

        if i == 0:
          n_samples += labels[i].size(0)

        for k in range(32):
          # print("label",labels[i][k])
          # print("label[i]",labels[i])
          label = labels[i][k]
          pred = predicted[k]
          if (label == pred):
              n_class_correct[i][label] += 1
          n_class_samples[i][label] += 1
          
  return n_correct,n_samples,n_class_correct,n_class_samples

def class_acc(n_correct,n_samples,n_class_correct,n_class_samples,class_list):
    for i in range(len(class_list)):
      print("-------------------------------------------------")
      acc = 100.0 * n_correct[i] / n_samples
      print(color.BOLD + color.RED + f'Overall class performance: {round(acc,1)} %' + color.END)
      for k in range(len(class_list[i])):
          if int(n_class_samples[i][k]) == 0:
            acc = 100.0
          else:
            acc = 100.0 * n_class_correct[i][k] / n_class_samples[i][k]
            print(f'Accuracy of {class_list[i][k]}: {round(acc,1)} %')
    print("-------------------------------------------------")


classes_productGroup = list(dataset.productGroup_id_to_name.values())
classes_graphic = list(dataset.graphic_id_to_name.values())
classes_productType = list(dataset.productType_id_to_name.values())

class_list = [classes_productGroup,classes_graphic,classes_productType]

n_correct,n_samples,n_class_correct,n_class_samples = validation(model,test_loader,classes_productGroup,classes_graphic,classes_productType)

class_acc(n_correct,n_samples,n_class_correct,n_class_samples,class_list)

In [None]:
model

In [None]:
model._modules['model_wo_fc']._modules['8']

In [None]:
from PIL import Image

# Use the model object to select the desired layer
layer = model._modules['model_wo_fc']._modules['8']
print(layer)

# Set model to evaluation mode
model.eval()

transform = transforms.Compose([
                                  transforms.Resize(256),
                                  transforms.CenterCrop(224),
                                  transforms.ToTensor(),
                                  transforms.Normalize([0.485, 0.456, 0.406], 
                                                       [0.229, 0.224, 0.225])])

def get_vector(image):
    # Create a PyTorch tensor with the transformed image
    t_img = transform(image)
    # Create a vector of zeros that will hold our feature vector
    # The 'avgpool' layer has an output size of 512
    my_embedding = torch.zeros(512)

    # Define a function that will copy the output of a layer
    def copy_data(m, i, o):
        my_embedding.copy_(o.flatten())                 # <-- flatten

    # Attach that function to our selected layer
    h = layer.register_forward_hook(copy_data)
    # Run the model on our transformed image
    with torch.no_grad():                               # <-- no_grad context
        model(t_img.unsqueeze(0))                       # <-- unsqueeze
    # Detach our copy function from the layer
    h.remove()
    # Return the feature vector
    return my_embedding


In [None]:
def getEmbeddings(img_path):

  img = Image.open(img_path)
  embeddings = get_vector(img)

  return embeddings

In [None]:
model.cpu()

In [None]:

from multiprocessing import  Pool
import numpy as np

def parallelize_dataframe(df, func, n_cores=4):
    df_split = np.array_split(df, n_cores)
    pool = Pool(n_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df

def getEmbeddings(img_path):

  image_path = os.path.join('data',img_path)
  img = Image.open(image_path)
  embeddings = get_vector(img)

  return embeddings

def getEmbeddingsOnDataset(df):
  df['Embeddings'] = df['image_path'].apply(lambda x: getEmbeddings(x))
  return df

train_df = pd.read_csv('articles_with_attributes_new.csv')
train_df['image_path'] = train_df['article_id'].apply(lambda x: '0' + str(x) + '.jpg')
train = parallelize_dataframe(train_df, getEmbeddingsOnDataset)

In [None]:
import pandas as pd
train.to_pickle('multiattribute_with_embeddings.pkl')    #to save the dataframe, df to 123.pkl
# train = pd.read_pickle('multiattribute_with_embeddings.pkl') #to load 123.pkl back to the dataframe df

In [None]:
train.to_csv('attribute_articles_with_embeddings.csv',index=False)

!cp -r 'attribute_articles_with_embeddings.csv' '/content/drive/MyDrive/'

In [None]:
!cp -r '/content/drive/MyDrive/attribute_articles_with_embeddings.csv' './'

In [None]:
train = pd.read_csv(r'attribute_articles_with_embeddings.csv')

In [None]:
!pip install annoy

In [None]:
from annoy import AnnoyIndex
f = len(train['Embeddings'][0])
t = AnnoyIndex(f, metric='euclidean')

ntree = 1000 # hyper-parameter, the more the number of trees better the prediction
for i, vector in enumerate(train['Embeddings']):
    t.add_item(i, vector)
_  = t.build(ntree)

In [None]:
import time
def get_similar_images_annoy(img_index):
    start = time.time()
    base_img_id, base_vector, productGroup_label, graphic_label, productType_label  = train.iloc[img_index, [0,5,1,2,3]]
    similar_img_ids = t.get_nns_by_item(img_index, 8)
    end = time.time()
    print(f'{(end - start) * 1000} ms')
    return base_img_id, productGroup_label, graphic_label, productType_label, train.iloc[similar_img_ids]

In [None]:
base_img_id, productGroup_label, graphic_label, productType_label, similar_images_df = get_similar_images_annoy(29187)

In [None]:
train[train['article_id'] == 693678001]

In [None]:
similar_images_df

In [None]:
!cp -r '/content/drive/MyDrive/multiattribute_with_embeddings.pkl' './'

In [None]:
import pandas as pd

train = pd.read_pickle('multiattribute_with_embeddings.pkl')

In [None]:
import time
def get_similar_images_annoy(img_index):
    start = time.time()
    base_img_id, base_vector, productGroup_label, graphic_label, productType_label  = train.iloc[img_index, [0,5,1,2,3]]
    similar_img_ids = t.get_nns_by_item(img_index, 8)
    end = time.time()
    print(f'{(end - start) * 1000} ms')
    return base_img_id, productGroup_label, graphic_label, productType_label, train.iloc[similar_img_ids]

In [None]:
from PIL import Image
import numpy as np

def process_image(image_path):
    ''' Scales, crops, and normalizes a PIL image for a PyTorch model,
        returns an Numpy array
    '''
    
    # Process a PIL image for use in a PyTorch model
    
    pil_image = Image.open(image_path)
    
    # Resize
    if pil_image.size[0] > pil_image.size[1]:
        pil_image.thumbnail((5000, 256))
    else:
        pil_image.thumbnail((256, 5000))
        
    # Crop 
    left_margin = (pil_image.width-224)/2
    bottom_margin = (pil_image.height-224)/2
    right_margin = left_margin + 224
    top_margin = bottom_margin + 224
    
    pil_image = pil_image.crop((left_margin, bottom_margin, right_margin, top_margin))
    
    # Normalize
    np_image = np.array(pil_image)/255
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    np_image = (np_image - mean) / std

    np_image = np_image.transpose((2, 0, 1))
    
    return np_image

In [None]:
# Implement the code to predict the class from an image file

def predict(image_path, model, topk=1):
    ''' Predict the class (or classes) of an image using a trained deep learning model.
    '''
    
    image = process_image(image_path)
    
    # Convert image to PyTorch tensor first
    image = torch.from_numpy(image).type(torch.cuda.FloatTensor)
    #print(image.shape)
    #print(type(image))
    
    # Returns a new tensor with a dimension of size one inserted at the specified position.
    image = image.unsqueeze(0)
    
    output = model.forward(image)
    
    attributes = []

    for i,label in enumerate(output):
      output_tensor = output.get(label)

      print("output_tensor", output_tensor)
      
      _,pred = torch.max(output_tensor, dim = 1)

      print("pred", pred)
    
      # # Probabilities and the indices of those probabilities corresponding to the classes
      # top_probabilities, top_indices = probabilities.topk(topk)
    
      # # Convert to lists
      # top_probabilities = top_probabilities.detach().type(torch.FloatTensor).numpy().tolist()[0] 
      # top_indices = top_indices.detach().type(torch.FloatTensor).numpy().tolist()[0] 
    
      # Convert topk_indices to the actual class labels using class_to_idx
      # Invert the dictionary so you get a mapping from index to class.
      
      idx_to_class = {value: key for key, value in class_to_idx[i].items()}
      
      # top_classes = [idx_to_class[index] for index in top_indices]

      class_label = idx_to_class[pred.detach().type(torch.FloatTensor).numpy()[0]]
      print("class_label", class_label)
      
      attributes.append(class_label)
    return attributes
    
classes = predict('0305304008.jpg', model)   
print(classes)