In [None]:
# ---------------------------------------------------------------------------
# Notebook configuration: every tunable value used by the cells below.
# ---------------------------------------------------------------------------

# Number of samples per batch for every DataLoader built in this notebook.
# NOTE(review): the memory figures in the trailing note refer to batch sizes of
# 8 and 6, not to the current value of 128 — presumably left over from an
# earlier setup; confirm before relying on them.
BATCH_SIZE=128#  8 for 13.8 gb usage, 6 for less than 12 gb usage
# Output size of the classification heads.
NUM_CLASSES=27
# Seed of the generator used for the train/validation/test split.
SEED=42

# Width of the text-model feature layer; the fusion head takes twice this size
# as input (image features concatenated with text features).
CLIP_FEATURE_SIZE=768

# Tiny split fractions for quick smoke tests (uncomment to use):
# train_percentage=0.001
# valid_percentage=0.001

# Fractions of the dataset assigned to the training and validation splits.
train_percentage=0.8
valid_percentage=0.1
# (the test split takes whatever remains)

# True -> use the cropped images, False -> use the original images.
use_cropped=True

# True -> read the data from a mounted Google Drive, False -> download it locally.
google_colab=False
# True -> run on the CPU even when CUDA is available.
force_cpu=False


# Embedding width of the text model. It cannot be changed without retraining
# the text model.
TEXT_EMBEDDING_DIM=300 #cannot be changed or retrain the text model
# Number of rows of the text model's embedding table.
VOCAB_SIZE=67465


In [None]:
# Base name (without extension) of the image archive to extract.
if use_cropped:
    images_name = "cropped_images"
else:
    images_name = "images"


In [None]:
# Install third-party packages into the kernel's environment (%pip targets the
# running kernel). No versions are pinned here.
# NOTE(review): torch, torchvision and Pillow are imported by this notebook but
# are not in this list — presumably expected to be pre-installed; confirm.
%pip install transformers pandas tqdm scikit-learn imageio matplotlib wget plotly dash unidecode tensorflow

In [None]:
from transformers import CLIPProcessor, CLIPModel,CLIPFeatureExtractor
import pandas as pd
import re
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim

from torch.utils.data import Dataset, DataLoader
from torch.optim import lr_scheduler
import torchvision

import imageio

from tqdm.notebook import tqdm
from sklearn.preprocessing import OneHotEncoder

from sklearn.metrics import f1_score

import zipfile
import os
import copy

import matplotlib.pyplot as plt
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px

from datetime import datetime
import unidecode
from PIL import Image

from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences




In [None]:
# Make the dataset available locally and define the three path variables used
# by the rest of the notebook:
#   filepath       - directory the CSV files are read from
#   datasets_path  - directory containing the extracted image folders
#   save_directory - directory for model checkpoints
if google_colab:
    # Mount the Google Drive where the dataset archive is stored.
    from google.colab import drive
    drive.mount('/content/drive')
    filepath='/content/drive/MyDrive/datasets/multimodal_product_classification/' # Drive folder holding the dataset; replace with your own path

    # Extract the image archive once; an existing 'datasets' directory is
    # taken to mean the extraction has already been done.
    if not os.path.exists('datasets'):
        os.makedirs('datasets')
        with zipfile.ZipFile(filepath+images_name+'.zip', 'r') as zip_ref:
            zip_ref.extractall('datasets')

    datasets_path="/content/datasets/"
    save_directory="/content/drive/MyDrive/Lessons/Models/multimodal_classification/"

else:
    import wget
    # Download and extract the archives once.
    # NOTE(review): the 'datasets' directory is created before the downloads
    # start, so an interrupted download is not retried on the next run (the
    # directory already exists); delete the directory to force a fresh download.
    if not os.path.exists('datasets'):
        os.makedirs('datasets')
        output_directory="datasets"
        csv_zip = wget.download("https://nextcloud.its-tps.fr/s/BTpB4SC93NreZxg/download/csv_data.zip",out=output_directory)
        # Only the image archive matching `use_cropped` is downloaded.
        if use_cropped:
            images_zip=wget.download("https://nextcloud.its-tps.fr/s/8dZMpfpDNnpaZ5P/download/cropped_images.zip",out=output_directory)
        else:
            images_zip=wget.download("https://nextcloud.its-tps.fr/s/fgBxQczEAZ7ws8J/download/images.zip",out=output_directory)
        with zipfile.ZipFile(output_directory+'/csv_data.zip', 'r') as zip_ref:
            zip_ref.extractall('datasets')
        with zipfile.ZipFile(output_directory+'/'+images_name+'.zip', 'r') as zip_ref:
            zip_ref.extractall('datasets')
    # CSV files and images share the same local directory in this branch.
    filepath=os.getcwd()+'/datasets/'
    save_directory='../models/'
    datasets_path=filepath


In [None]:
# Use the first CUDA device unless the CPU is forced or CUDA is unavailable.
use_gpu = (not force_cpu) and torch.cuda.is_available()
device = torch.device("cuda:0" if use_gpu else "cpu")


In [None]:
# Read the training features and targets; missing values become empty strings
# and the "Unnamed: 0" column of each CSV file is discarded.
X_train = pd.read_csv(filepath+'X_train.csv').fillna("").drop(columns="Unnamed: 0")
y_train = pd.read_csv(filepath+'Y_train.csv').fillna("").drop(columns="Unnamed: 0")


# Cleaning and Preprocessing Text
CLEANR = re.compile('<.*?>')  # shortest possible '<...>' span, i.e. an HTML tag
def clean_html(raw_html):
    """Return ``raw_html`` with every ``<...>`` span removed (replaced by nothing)."""
    return CLEANR.sub('', raw_html)

def clean_text(text):
    """Normalise a raw product text.

    Steps, in order: strip HTML tags, transliterate with ``unidecode``, drop
    every character that is not matched by the letter/space pattern below, and
    lowercase the result.
    """
    transliterated = unidecode.unidecode(clean_html(text))
    # Everything outside the allowed character class is removed, not replaced
    # by a space.
    letters_only = re.sub(r'[^a-zA-ZäöüßÄÖÜ ]', '', transliterated)
    return letters_only.lower()
    
# Clean both free-text columns of the training frame in place.
for text_column in ('designation', 'description'):
    X_train[text_column] = X_train[text_column].fillna('').apply(clean_text)



In [None]:
# Word-level tokenizer fitted on the cleaned product titles only.
tokenizer = Tokenizer()
tokenizer.fit_on_texts(X_train['designation'])

# One more than the number of distinct words seen while fitting.
vocab_size = 1 + len(tokenizer.word_index)
print(vocab_size)  # nearly 70 000 entries, which seems like a lot

In [None]:
# One-hot encode the product type codes: one column per distinct code.
encoded_df = pd.get_dummies(y_train, columns=['prdtypecode'])
y_train_categorical = encoded_df.values.tolist()
length = len(y_train_categorical)

# Keep each one-hot row as a single list-valued cell in a "labels" column.
# Building the object Series in one go replaces the former row-by-row `.loc`
# assignment, which was slow and relied on pandas accepting a list as a
# scalar cell value.
y_train_one_hot = pd.DataFrame({"labels": pd.Series(y_train_categorical, dtype=object)})
train_df = pd.concat([X_train, y_train_one_hot], axis=1)

# Integer class index of every sample (position of the 1 in its one-hot row).
y_train_out = np.argmax(y_train_categorical, 1)

# Map each class index to its original product type code. The first
# occurrence wins, and the loop simply stops at the end of the data instead of
# assuming that exactly 27 distinct classes are present.
label_dict = {}
for class_index, product_code in zip(y_train_out, y_train['prdtypecode'].to_numpy()):
    label_dict.setdefault(class_index, product_code)




In [None]:
class ImageTextDataLoader(Dataset):
    """Dataset of product title, description, image and label.

    Despite its name this class is a ``Dataset`` (it is meant to be wrapped in
    a ``DataLoader``); the name is kept so existing callers keep working.

    It relies on two module-level objects: ``tokenizer`` (a fitted tokenizer)
    and ``device`` (where the text and label tensors are created).
    """

    def __init__(self, dataframe, image_dir, max_len=34, image_size=(224, 224)):
        """
        Arguments:
            dataframe (pandas.DataFrame): one row per product with the columns
                ``imageid``, ``productid``, ``designation``, ``description``
                and ``labels``.
            image_dir (string): Directory with all the images.
            max_len (int): length every token sequence is padded/truncated to.
            image_size (tuple): (width, height) every image is resized to.
        """
        self.df = dataframe
        self.image_dir = image_dir
        self.max_len = max_len
        self.image_size = image_size

    def __len__(self):
        """Return the number of rows of the wrapped dataframe."""
        return len(self.df)

    def _encode_text(self, text):
        """Tokenise ``text`` and return a (1, max_len) integer tensor on ``device``."""
        sequences = tokenizer.texts_to_sequences([text])
        padded = pad_sequences(sequences, maxlen=self.max_len)
        return torch.from_numpy(padded).to(device, dtype=int)

    def _load_image(self, row):
        """Load the image belonging to ``row`` as a (height, width, 3) array."""
        image_name = "image_" + str(row["imageid"]) + "_product_" + str(row["productid"]) + ".jpg"
        image_path = os.path.join(self.image_dir, image_name)
        # The context manager closes the file handle deterministically.
        with Image.open(image_path) as raw_image:
            # Force three channels: grayscale, CMYK or palette images would
            # otherwise yield arrays of a different shape, which cannot be
            # stacked into a batch with the RGB ones.
            resized = raw_image.convert("RGB").resize(self.image_size)
            return np.array(resized)

    def __getitem__(self, idx):
        """Return ``[designation, description, image_arr, label]`` for row ``idx``.

        ``designation`` and ``description`` are (1, max_len) integer tensors,
        ``image_arr`` is a numpy array and ``label`` is a float tensor built
        from the row's ``labels`` value.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        selected_df = self.df.iloc[idx]
        image_arr = self._load_image(selected_df)

        description_text = selected_df['description']
        if len(description_text) > 10:
            description = self._encode_text(description_text)
        else:
            # Descriptions of 10 characters or fewer are treated as missing and
            # encoded as all zeros, which downstream code uses to skip them.
            description = torch.zeros((1, self.max_len), dtype=torch.long, device=device)

        designation = self._encode_text(selected_df['designation'])
        label = torch.tensor(selected_df['labels'], dtype=torch.float, device=device)

        return [designation, description, image_arr, label]


In [None]:
# Training images live in a different sub-folder depending on `use_cropped`.
train_image_subdir = "/cropped_train" if use_cropped==True else "/images/image_train"
dataset = ImageTextDataLoader(train_df, datasets_path + train_image_subdir)

In [None]:
# Split sizes: the test split takes whatever the other two leave over.
train_size = int(train_percentage * len(dataset))
valid_size = int(valid_percentage * len(dataset))
test_size = len(dataset) - train_size - valid_size

# Seeded generator so the same samples land in the same split on every run.
generator = torch.Generator().manual_seed(SEED)

train_dataset, valid_dataset, test_dataset = torch.utils.data.random_split(
    dataset, [train_size, valid_size, test_size], generator=generator
)

# Only the training loader shuffles. Validation and test loaders keep a fixed
# order so their predictions are reproducible and can be mapped back to samples.
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
valid_dataloader = DataLoader(valid_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)


dataloaders={
    'train':train_dataloader,
    'val':valid_dataloader,
    'test':test_dataloader
    }


In [None]:

# Define the model
class Text_model(nn.Module):
    """Convolutional text classifier over token-id sequences.

    Token ids are embedded, passed through six parallel convolutions (kernel
    heights 1 to 6, each spanning the full embedding width), max-pooled over
    the sequence, concatenated, and projected first to ``CLIP_FEATURE_SIZE``
    features and then to ``num_classes`` outputs.
    """

    def __init__(self, vocab_size, embedding_dim, num_classes):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        convolutions = []
        for kernel_height in range(1, 7):
            convolutions.append(
                nn.Conv2d(1, 512, (kernel_height, embedding_dim), padding=(0, 0))
            )
        self.conv_blocks = nn.ModuleList(convolutions)

        self.dropout = nn.Dropout(0.5)
        self.fc = nn.Linear(512 * 6, CLIP_FEATURE_SIZE)
        self.classif = nn.Linear(CLIP_FEATURE_SIZE, num_classes)

    def forward(self, x):
        # Embed, then add a channel dimension so Conv2d accepts the input.
        embedded = self.embedding(x).unsqueeze(1)

        pooled = []
        for conv_block in self.conv_blocks:
            activated = nn.functional.relu(conv_block(embedded))
            # max() returns (values, indices); keep only the values.
            pooled.append(activated.max(dim=3)[0].max(dim=2)[0])
        features = torch.cat(pooled, dim=1)

        # Flatten, apply dropout, then the two linear layers.
        features = features.view(features.size(0), -1)
        features = self.dropout(features)
        features = self.fc(features)
        return self.classif(features)


In [None]:
# Rebuild the text model with the hyper-parameters it was trained with, then
# restore its weights from the checkpoint.
text_model = Text_model(VOCAB_SIZE, TEXT_EMBEDDING_DIM, NUM_CLASSES)
# map_location remaps the stored tensors onto the selected device, so a
# checkpoint saved on a GPU still loads on a CPU-only machine or with
# `force_cpu`.
saved_state_dict = torch.load(
    "/home/onyxia/work/multimodal_product_data_classification/models/Text_model_val_f1_0.747_epoch6.ckpt",
    map_location=device,
)
text_model.load_state_dict(saved_state_dict)

In [None]:
# Replace the classification head by an empty Sequential so the model outputs
# the features of its `fc` layer instead of class scores.
text_model.classif = nn.Sequential()
# Move to the target device and switch to inference mode.
text_model.to(device).eval()

In [None]:
# Model and processor are loaded from the same pretrained checkpoint.
clip_checkpoint = "openai/clip-vit-large-patch14-336"
Clip_model = CLIPModel.from_pretrained(clip_checkpoint).to(device)
Clip_processor = CLIPProcessor.from_pretrained(clip_checkpoint)

In [None]:
def get_images_features(images):
    """Return the CLIP image features of ``images`` as a tensor on ``device``.

    ``images`` is run through the module-level ``Clip_processor`` and the
    result is fed to ``Clip_model.get_image_features``.
    """
    clip_inputs = Clip_processor(images=images, return_tensors="pt").to(device)
    return Clip_model.get_image_features(**clip_inputs).to(device)

In [None]:
class ClassificationHead(nn.Module):
    """Small MLP head: dropout, a 128-unit hidden layer with ReLU, then the
    output layer with ``num_classes`` units."""

    def __init__(self, input_dim, num_classes):
        super().__init__()
        # Layer order is kept as-is: the parameter names (head.1.*, head.3.*)
        # are derived from the position of each layer inside the Sequential.
        layers = [
            nn.Dropout(p=0.5),
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, num_classes),
        ]
        self.head = nn.Sequential(*layers)

    def forward(self, x):
        return self.head(x)


In [None]:
# Fusion head: its input is the image features concatenated with the text
# features, hence twice CLIP_FEATURE_SIZE.
model=ClassificationHead(CLIP_FEATURE_SIZE*2,NUM_CLASSES).to(device)
# map_location remaps the stored tensors onto the selected device, so a
# checkpoint saved on a GPU still loads on a CPU-only machine or with
# `force_cpu`.
model.load_state_dict(
    torch.load(
        "/home/onyxia/work/multimodal_product_data_classification/models/CLIP_model_val_f1_0.747_epoch6.ckpt",
        map_location=device,
    )
)
criterion = torch.nn.CrossEntropyLoss()

# All parameters are being optimized
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Decay LR by a factor of 0.1 every 7 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)


In [None]:
def calculate_weighted_f1_score(y_true, y_pred):
    """Return the F1 score of ``y_pred`` against ``y_true`` computed with
    ``average='weighted'``."""
    weighted_f1 = f1_score(y_true, y_pred, average='weighted')
    return weighted_f1

In [None]:
@torch.no_grad()
def eval_model(model,eval_dataloader):
    """Run ``model`` over ``eval_dataloader`` and collect its raw outputs.

    For every batch the image features (``get_images_features``) are
    concatenated with the text features produced by the module-level
    ``text_model``. The text features are those of the designation, averaged
    with those of the description when the description is not all zeros.

    Arguments:
        model: classification head applied to the fused features.
        eval_dataloader: yields ``(designation, description, image_arr, labels)``
            batches; the labels are ignored.

    Returns:
        list: one prediction tensor per batch, in dataloader order.
    """
    preds_list=[]
    model.eval()
    # Iterate over data.
    loop_on_eval_dataloader=tqdm(eval_dataloader,position=1,leave=False,ncols=800)
    for designation,description,image_arr,_labels in loop_on_eval_dataloader:
        images_features=get_images_features(image_arr)
        designation_features=text_model(designation.squeeze(1))

        # Build the per-sample text features in a list and stack once, instead
        # of growing a tensor with torch.cat on every iteration.
        text_features=[]
        for i in range(len(description)):
            if torch.count_nonzero(description[i])>0:
                # description[i] holds a single sequence, so the text model
                # returns a single row; drop that leading dimension.
                description_feature=text_model(description[i]).squeeze(0)
                text_features.append((designation_features[i]+description_feature)/2)
            else:
                # An all-zero description means "no description": use the
                # designation features alone.
                text_features.append(designation_features[i])
        text_features=torch.stack(text_features)

        # Fusion: image features followed by text features, one row per sample.
        input_features=torch.cat((images_features,text_features),dim=1)

        preds_list.append(model(input_features))

    return preds_list

In [None]:
# Read the evaluation features, replace missing values by empty strings and
# discard the "Unnamed: 0" column of the CSV file.
X_eval = pd.read_csv(filepath+'X_test.csv').fillna("").drop(columns="Unnamed: 0")

# Same text cleaning as for the training data.
for text_column in ('designation', 'description'):
    X_eval[text_column] = X_eval[text_column].fillna('').apply(clean_text)

# The evaluation file has no targets; add an all-zero "labels" column so the
# frame offers the column the dataset class reads.
X_eval["labels"]=np.zeros((len(X_eval["designation"]),1))

In [None]:
# NOTE(review): this always reads the original (non-cropped) test images, even
# when `use_cropped` is True — confirm this is intended.
eval_image_dir = datasets_path+"/images/image_test"
eval_dataset = ImageTextDataLoader(X_eval, eval_image_dir)
print(len(eval_dataset))
# Fixed order, so predictions line up with the rows of X_eval.
eval_dataloader = DataLoader(dataset=eval_dataset, batch_size=BATCH_SIZE, shuffle=False)
