In [None]:
import ast
import warnings

import numpy as np
import torch
from torch import nn
from torch.optim import AdamW
from tqdm import tqdm
import pandas as pd
from transformers import AutoTokenizer, BertModel
from tensorflow.keras.utils import to_categorical

from utils.utils import extract_category_by_level, mapping_categories_training


# Suppress every Python warning raised by code run after this cell (no category or
# module filter is given), which also hides deprecation notices.
warnings.filterwarnings("ignore")

In [None]:
# Notebook-wide settings: the text column used as model input and the file-name
# prefix for the label map and checkpoints stored under ../model/.
col_train = "bez"
saved_name = "bert_full_item"

# Raw item table and the reference category taxonomy (the two files use different encodings).
df_data = pd.read_csv(
    '../data/group_cate_item_mapped_mainword.csv',
    encoding='utf-16le',
)
df_standard_categories = pd.read_csv(
    '../data/mindmap.csv',
    encoding='iso-8859-1',
)

In [None]:
# Preview the first two rows of the raw item table.
df_data.head(2)

In [None]:
# Preview the first two rows of the category taxonomy.
df_standard_categories.head(2)

In [None]:
# Lower-case the taxonomy one column at a time so category matching is case-insensitive.
for column in df_standard_categories.columns:
    lowered = df_standard_categories[column].str.lower()
    df_standard_categories[column] = lowered

# Discard rows without training text, then lower-case the item categories as well.
df_data = df_data.dropna(subset=[col_train])
df_data['category'] = df_data['category'].str.lower()

In [None]:
# Query the lower-cased taxonomy for category 'f&b' at level 2.
# NOTE(review): what `categories` holds (list, set, dict, ...) is decided by
# utils.utils.extract_category_by_level, which is not shown here — confirm.
categories = extract_category_by_level(df_standard_categories, category='f&b', level=2)

In [None]:
# Derive the training label of each row by passing its category, together with
# `categories`, through mapping_categories_training.
# NOTE(review): a later cell drops rows labelled 'other', so the helper presumably
# returns 'other' for categories it cannot map — confirm in utils.utils.
df_data['category_training'] = df_data['category'].apply(lambda x: mapping_categories_training(x, categories))

In [None]:
# Keep only rows whose mapped label is not 'other'. The index is not reset here, so
# the remaining rows keep their original (possibly non-contiguous) index labels.
df_data = df_data[df_data['category_training'] != 'other']

In [None]:
# Preview the frame again after label mapping and filtering.
df_data.head(2)

In [None]:
# Targets (mapped category labels) and the raw text column selected by `col_train`.
y = df_data['category_training']
X_text = df_data[col_train]

In [None]:
class Create_Model(nn.Module):
    """Classification head stacked on top of a pretrained text encoder.

    Element 1 of the encoder's tuple output is passed through
    Linear(768 -> 512) -> ReLU -> Dropout(0.1) -> Linear(512 -> n_classes).

    Args:
        pretrained_text_model: Encoder called with ``input_ids`` and
            ``attention_mask``; element 1 of its tuple output must be 768 wide
            (presumably BERT's pooled output — confirm against the encoder used).
        n_classes: Number of target classes, i.e. the width of the returned logits.
    """

    def __init__(self, pretrained_text_model, n_classes):
        super().__init__()
        self.pretrained_text_model = pretrained_text_model

        # Attribute names and creation order are left untouched: whole-model pickles
        # saved by this notebook refer to these names.
        self.dropout = nn.Dropout(0.1)
        self.relu = nn.ReLU()
        self.linear_text = nn.Linear(768, 512)   # projection applied to the encoder output
        self.output = nn.Linear(512, n_classes)  # final classification layer

    def forward(self, input_id, attention_mask):
        """Return unnormalised class logits for a batch of tokenised text."""
        encoder_outputs = self.pretrained_text_model(
            input_ids=input_id,
            attention_mask=attention_mask,
            return_dict=False,
        )
        embedding_text = encoder_outputs[1]

        hidden = self.dropout(self.relu(self.linear_text(embedding_text)))

        # No softmax here: nn.CrossEntropyLoss works on raw logits.
        return self.output(hidden)

## Train from checkpoint

In [None]:
checkpoint_path = '../model/bert_full_item_0.788.pt'

# The label map is stored as the str() of a dict (see the "Train from scratch" cell),
# i.e. a plain Python literal. Parse it with ast.literal_eval instead of eval so that
# nothing in the file can be executed as code.
with open(f'../model/{saved_name}_id2label.txt', 'r') as f:
    id2label = ast.literal_eval(f.read())

# Invert id -> label into label -> id.
label2id = {label: idx for idx, label in id2label.items()}
# One-hot encode the targets; train() turns them back into class indices via argmax.
y_labels = to_categorical([label2id[i] for i in y], dtype="uint8")

# Loads a whole pickled module, so Create_Model must already be defined in the kernel.
# NOTE(review): torch.load unpickles arbitrary objects — only open trusted checkpoints.
# Recent PyTorch releases default to weights_only=True, which rejects full-model
# pickles; pass weights_only=False on those versions if this call fails.
model = torch.load(checkpoint_path)
# Accuracy to beat before a new checkpoint is written (same value as in the file name above).
max_acc = 0.788

## Train from scratch

In [None]:
# id2label = {v: k for v, k in enumerate(y.unique())}
# label2id = {k: v for v, k in enumerate(y.unique())}
# y_labels = to_categorical([label2id[i] for i in y], dtype="uint8")
# 
# with open(f'../model/{saved_name}_id2label.txt', 'w') as f:
#     f.write(str(id2label))
# 
# pretrain_model = BertModel.from_pretrained("bert-base-cased")
# 
# # for param in pretrain_model.parameters():
# #     param.requires_grad = False
# 
# # Freeze everything except the last 6 parameter tensors (individual weight/bias tensors, not 6 transformer layers)
# for param in list(pretrain_model.parameters())[:-6]:
#     param.requires_grad = False
# 
# model = Create_Model(pretrain_model, n_classes=len(label2id))
# max_acc = 0

In [None]:
# Row position of the 80/20 boundary.
length_train_idx = int(len(df_data) * 0.8)

# NOTE(review): training uses every row, so the held-out slice below is also part of
# the training set — confirm this is intended.
X_train = X_text
# Slice by position. df_data was filtered without resetting its index, so a label-based
# .loc slice would select by original row label and no longer line up with the
# positional slice taken from y_labels.
X_test = X_text.iloc[length_train_idx:]

y_train = y_labels
y_test = y_labels[length_train_idx:]

In [None]:
# Show the dimensions of the one-hot label array built with to_categorical.
y_labels.shape

In [None]:
class Dataset(torch.utils.data.Dataset):
    """Map-style dataset pairing each text with its label, tokenised on access.

    Args:
        texts_data: Collection of raw strings; converted to a list via a NumPy array.
        labels: Indexable collection of targets; its length defines the dataset size.
    """

    def __init__(self, texts_data, labels):
        self.tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")
        self.labels = labels  # e.g. the one-hot y_labels array
        self.texts = [text for text in np.array(texts_data)]

    def __len__(self):
        """Number of samples, taken from the label collection."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(tokenised_text, label)`` for position ``idx``."""
        label = self.labels[idx]
        # Every sample is padded/truncated to exactly 512 tokens.
        encoded = self.tokenizer(
            self.texts[idx],
            padding='max_length',
            max_length=512,
            truncation=True,
            return_tensors="pt",
        )
        return encoded, label

In [None]:
def train(model, X_train, X_test, y_train, y_test, max_acc, learning_rate, batch_size, epochs,
          device):
    """Fine-tune ``model`` and checkpoint it whenever the epoch's training accuracy improves.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` that returns class logits.
        X_train, y_train: Training texts and their one-hot labels.
        X_test, y_test: Held-out texts and labels. A loader is built for them, but the
            validation pass is currently disabled, so they do not influence the run.
        max_acc: Accuracy that must be exceeded before the first checkpoint is written.
        learning_rate: Learning rate handed to AdamW.
        batch_size: Mini-batch size of both data loaders.
        epochs: Number of passes over the training data.
        device: Torch device that the model, loss and batches are moved to.

    Side effects:
        Saves the whole model to ``../model/{saved_name}_<acc>.pt`` (``saved_name`` is a
        notebook-level global) each time the mean training accuracy beats ``max_acc``,
        and prints one summary line per epoch. Nothing is returned.
    """
    train_set = Dataset(X_train, y_train)
    val_set = Dataset(X_test, y_test)
    train_dataloader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True)
    # Only consumed by the disabled validation pass further down.
    val_dataloader = torch.utils.data.DataLoader(val_set, batch_size=batch_size)

    criterion = nn.CrossEntropyLoss().to(device)
    optimizer = AdamW(model.parameters(), lr=learning_rate)
    model.to(device)

    for epoch_num in range(epochs):
        model.train()
        total_acc_train = 0
        total_loss_train = 0
        total_batch_train = 0

        for batch_inputs, batch_onehot in tqdm(train_dataloader):
            optimizer.zero_grad()
            total_batch_train += 1

            # One-hot targets -> class indices, as required by CrossEntropyLoss.
            batch_onehot = batch_onehot.to(device)
            target_ids = torch.argmax(batch_onehot, dim=1)

            # NOTE(review): only input_ids is squeezed; attention_mask is forwarded with
            # whatever extra dimension the tokenizer added — confirm this is intended.
            attention_mask = batch_inputs['attention_mask'].to(device)
            input_id = batch_inputs['input_ids'].squeeze(1).to(device)

            logits = model(input_id, attention_mask)

            batch_loss = criterion(logits, target_ids)
            total_loss_train += batch_loss.item()
            batch_loss.backward()

            # Fraction of correct predictions in this batch.
            n_correct = (logits.argmax(-1) == target_ids).sum()
            total_acc_train += (n_correct / len(batch_onehot)).item()
            optimizer.step()

        # --- Disabled validation pass (kept for reference) ---
        # model.eval()
        # total_acc_val = 0
        # total_loss_val = 0
        # total_batch_val = 0
        # for val_input, val_label in tqdm(val_dataloader):
        #     with torch.no_grad():
        #         total_batch_val += 1
        # 
        #         # Move labels to the device and convert from one-hot to class indices
        #         val_label = val_label.to(device)
        #         val_label_indices = torch.argmax(val_label, dim=1)
        # 
        #         attention_mask = val_input['attention_mask'].to(device)
        #         input_id = val_input['input_ids'].squeeze(1).to(device)
        # 
        #         output = model(input_id, attention_mask)
        # 
        #         # Using class indices in CrossEntropyLoss
        #         batch_loss = criterion(output, val_label_indices)
        #         total_loss_val += batch_loss.item()
        # 
        #         # output = torch.nn.functional.softmax(output, dim=-1)
        # 
        #         acc = ((output.argmax(-1) == val_label.argmax(-1)).sum()) / len(val_label)
        #         total_acc_val += acc.item()
        # 
        # if total_acc_val / (total_batch_val) > max_acc:
        #     torch.save(model, f'../model/bert_full_item_model_{total_acc_val / (total_batch_val):.3f}.pt')
        #     max_acc = total_acc_val / total_batch_val
        # 
        # print(
        #     f'Epochs:{epoch_num + 1} | Train Loss:{total_loss_train / (total_batch_train):.3f} | Train Accuracy:{total_acc_train / (total_batch_train):.3f} | Val Loss:{total_loss_val / (total_batch_val):.3f} | Val Accuracy:{total_acc_val / (total_batch_val):.3f}')

        # Per-epoch means over batches (each batch weighs the same, whatever its size).
        mean_acc = total_acc_train / total_batch_train
        mean_loss = total_loss_train / total_batch_train

        # With validation disabled, checkpointing is driven by training accuracy.
        if mean_acc > max_acc:
            torch.save(model, f'../model/{saved_name}_{mean_acc:.3f}.pt')
            max_acc = mean_acc

        print(f'Epochs:{epoch_num + 1} | Train Loss:{mean_loss:.3f} | Train Accuracy:{mean_acc:.3f}')


In [None]:
# Training hyper-parameters.
BATCH_SIZE = 64
EPOCHS = 100
LR = 0.0001
# Prefer the first GPU, but fall back to the CPU when CUDA is unavailable instead of
# failing when the model is moved to the device.
DEVICE = 'cuda:0' if torch.cuda.is_available() else 'cpu'

train(model, X_train, X_test, y_train, y_test, max_acc, LR, BATCH_SIZE, EPOCHS, DEVICE)

# Test

In [None]:
# Rebuild df_data from the raw CSVs, repeating the training-time preprocessing.
# Unlike the training cells, rows mapped to 'other' are kept here.
df_data = pd.read_csv('../data/group_cate_item_mapped_mainword.csv', encoding='utf-16le')
df_standard_categories = pd.read_csv('../data/mindmap.csv', encoding='iso-8859-1')

for col in df_standard_categories.columns:
    df_standard_categories[col] = df_standard_categories[col].str.lower()
# Rebind instead of mutating in place so re-running the cell is predictable.
df_data = df_data.dropna(subset=[col_train])
df_data['category'] = df_data['category'].str.lower()
categories = extract_category_by_level(df_standard_categories, category='f&b', level=2)
df_data['category_training'] = df_data['category'].apply(lambda x: mapping_categories_training(x, categories))
# Shuffle the rows (unseeded, so the order differs between runs) and renumber the index.
df_data = df_data.sample(frac=1).reset_index(drop=True)

# The label map file holds the str() of a dict; ast.literal_eval parses that literal
# without executing anything, unlike eval.
with open(f'../model/{saved_name}_id2label.txt', 'r') as f:
    id2label = ast.literal_eval(f.read())

In [None]:
class Create_Model(nn.Module):
    """Classification head stacked on top of a pretrained text encoder.

    This cell re-declares the class already defined in the training part of the
    notebook; the two definitions must stay behaviourally identical.

    Element 1 of the encoder's tuple output is passed through
    Linear(768 -> 512) -> ReLU -> Dropout(0.1) -> Linear(512 -> n_classes).

    Args:
        pretrained_text_model: Encoder called with ``input_ids`` and
            ``attention_mask``; element 1 of its tuple output must be 768 wide
            (presumably BERT's pooled output — confirm against the encoder used).
        n_classes: Number of target classes, i.e. the width of the returned logits.
    """

    def __init__(self, pretrained_text_model, n_classes):
        super().__init__()
        self.pretrained_text_model = pretrained_text_model

        # Attribute names and creation order are left untouched: whole-model pickles
        # saved by this notebook refer to these names.
        self.dropout = nn.Dropout(0.1)
        self.relu = nn.ReLU()
        self.linear_text = nn.Linear(768, 512)   # projection applied to the encoder output
        self.output = nn.Linear(512, n_classes)  # final classification layer

    def forward(self, input_id, attention_mask):
        """Return unnormalised class logits for a batch of tokenised text."""
        encoder_outputs = self.pretrained_text_model(
            input_ids=input_id,
            attention_mask=attention_mask,
            return_dict=False,
        )
        embedding_text = encoder_outputs[1]

        hidden = self.dropout(self.relu(self.linear_text(embedding_text)))

        # No softmax here: nn.CrossEntropyLoss works on raw logits.
        return self.output(hidden)


In [None]:
# Use the first GPU when available, otherwise run inference on the CPU.
device = "cuda:0" if torch.cuda.is_available() else "cpu"
# map_location puts the checkpoint's tensors on `device`, so a model saved during a
# GPU run can also be opened on a CPU-only machine.
# NOTE(review): train() writes checkpoints as '../model/{saved_name}_<acc>.pt'; this
# '..._model_...' file name only matches the disabled validation branch — confirm the
# file exists. torch.load unpickles arbitrary objects (trusted files only), and recent
# PyTorch releases need weights_only=False to load a full-model pickle.
model = torch.load('../model/bert_full_item_model_0.870.pt', map_location=device)
model.to(device)
# Switch to inference mode: without this the dropout layers stay active and
# predictions vary from call to call.
model.eval()

In [None]:
# Preview two rows of the reloaded, shuffled frame.
df_data.head(2)

In [None]:
# Free-text item description to classify.
bez_test = """abcde 12n 12kg"""

# Tokenise exactly as during training (padded/truncated to 512 tokens).
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")
test_token = tokenizer([bez_test], padding='max_length', max_length=512, truncation=True, return_tensors="pt")
input_id = test_token['input_ids'].squeeze(1).to(device)
attention_mask = test_token['attention_mask'].to(device)

# Inference must run in eval mode (dropout off) and needs no autograd graph.
model.eval()
with torch.no_grad():
    logits = model(input_id, attention_mask=attention_mask)
    output_proba = torch.nn.functional.softmax(logits, dim=-1)

# Index of the most probable class for the single input.
output = output_proba.argmax(-1)[0].item()

# Print the probability of every class by name, then the predicted class.
proba_format = {id2label[i]: proba for i, proba in enumerate(output_proba[0].cpu().tolist())}
print(proba_format)
print("Class:", id2label[output])

## Evaluate

In [None]:
# def evaluate(model, X_text_test, X_num_test, y_test, batch_size, device):
#     val = Dataset(X_text_test, X_num_test, y_test)
#     val_dataloader = torch.utils.data.DataLoader(val, batch_size=batch_size)
#     model = model.to(device)
#     model.eval()
#     prediction = []
#     for val_input, val_label in tqdm(val_dataloader):
#         with torch.no_grad():
#             val_label = val_label.to(device)
#             attention_mask = val_input['attention_mask'].to(device)
#             input_id = val_input['input_ids'].squeeze(1).to(device)
#             output = model(input_id, attention_mask=attention_mask)
#             output = torch.nn.functional.softmax(output, dim=-1)
#             output = output.argmax(-1)[0].cpu().numpy().tolist()
#             prediction.append(output)
#     return prediction
# 
# prediction = evaluate(model, X_test, None, y_test, batch_size=1, device=device)
# from sklearn.metrics import classification_report
# print(classification_report(y_test.argmax(-1), prediction))