In [None]:
import torch
# Show which GPU backs this runtime. get_device_name() raises when no CUDA
# device is present, so fall back to a message on CPU-only runtimes.
torch.cuda.get_device_name() if torch.cuda.is_available() else 'No CUDA device available'

In [None]:
# Mount Google Drive at /content/drive (Colab only); the model checkpoints and
# prediction files written later in this notebook live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

# Download and install the requirements (restart the runtime afterwards if using Google Colab)

In [None]:
! pip install transformers sentencepiece
! pip install polyglot langdetect pyicu pycld2 morfessor pyenchant
! pip install indic-nlp-library
! sudo apt-get install libenchant1c2a

# Importing the standard ML libraries

In [None]:
import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn import metrics
import transformers
import torch
from torch.utils.data import Dataset, DataLoader, RandomSampler, SequentialSampler
from transformers import AutoTokenizer, AutoModelForSequenceClassification, AutoModel

# Setting up the device for GPU usage

In [None]:
from torch import cuda

# Prefer the GPU when PyTorch can see one; otherwise run on the CPU.
if cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
device

# Choose language 

In [None]:
# Per-choice settings: (display name used in the "not-<name>" label, ISO code,
# train file, dev file, test file, whether 'Offensive_Targeted_Insult_Other' exists).
_LANGUAGE_OPTIONS = {
    '1': ('Tamil', 'ta',
          'tamil_offensive_full_train.xlsx',
          'tamil_offensive_full_dev.xlsx',
          'tamil_offensive_full_test_with_labels.xlsx',
          True),
    # Malayalam contains only 5 classes ('Offensive_Targeted_Insult_Other' is not present).
    '2': ('malayalam', 'ml',
          'mal_full_offensive_train.xlsx',
          'mal_full_offensive_dev.xlsx',
          'mal_full_offensive_test_with_labels.xlsx',
          False),
    '3': ('Kannada', 'kn',
          'kannada_offensive_train.xlsx',
          'kannada_offensive_dev.xlsx',
          'kannada_offensive_test_with_labels.xlsx',
          True),
}

_choice = input('Choose language: 1 for tamil, 2 for malayalam, 3 for kannada: ').strip()
if _choice not in _LANGUAGE_OPTIONS:
    # Fail here instead of leaving `lang` / `test_file_name` undefined for later cells.
    raise ValueError(f"Unknown language choice {_choice!r}; expected '1', '2' or '3'.")

(language, lang,
 train_file_name, dev_file_name, test_file_name,
 _has_other_class) = _LANGUAGE_OPTIONS[_choice]

# The position of a label in class_list is the integer id used for training.
class_list = ['Not_offensive',
              'Offensive_Targeted_Insult_Group',
              'Offensive_Targeted_Insult_Individual']
if _has_other_class:
    class_list.append('Offensive_Targeted_Insult_Other')
class_list.append('Offensive_Untargetede')
class_list.append(f'not-{language}')


# Load data into dataframes

In [None]:
# Whether rows labelled "not-<language>" are kept (True) or dropped (False).
choose_not_class = input(f"Do you want to keep the not-{language} class: y or n: ").strip().lower() == 'y'
file_path = input("Enter folder path: ").strip()

# Build the full paths in new variables so that re-running this cell does not
# prepend the folder to the file names a second time.
train_path = f"{file_path}/{train_file_name}"
dev_path = f"{file_path}/{dev_file_name}"
test_path = f"{file_path}/{test_file_name}"

train_df = pd.read_excel(train_path, header=None)
train_df.columns = ['Input', 'Label']
train_df = train_df.dropna().drop_duplicates()

dev_df = pd.read_excel(dev_path, header=None)
dev_df.columns = ['Input', 'Label']

test_df = pd.read_excel(test_path, header=None, engine='openpyxl')
test_df.columns = ['Input', 'Label']

_not_label = f'not-{language}'
if not choose_not_class:
    train_df = train_df[train_df['Label'] != _not_label]
    dev_df = dev_df[dev_df['Label'] != _not_label]
    test_df = test_df[test_df['Label'] != _not_label]

    if _not_label in class_list:
        class_list.remove(_not_label)


def _encode_labels(frame, split_name):
    """Return a copy of ``frame`` with 'Label' replaced by its index in class_list.

    The result has a fresh 0..n-1 index so positional and label-based access
    agree after the filtering above. Raises ValueError when a label (including
    a missing value) is not in class_list.
    """
    label_to_index = {name: position for position, name in enumerate(class_list)}
    unknown = [label for label in frame['Label'].unique() if label not in label_to_index]
    if unknown:
        raise ValueError(f"Unexpected labels in the {split_name} set: {unknown}")
    encoded = frame['Label'].map(label_to_index).astype(int)
    return frame.assign(Label=encoded).reset_index(drop=True)


# Labels mapped to integers
train_df = _encode_labels(train_df, 'train')
dev_df = _encode_labels(dev_df, 'validation')
test_df = _encode_labels(test_df, 'test')


print(f'Number of examples in the train set: {train_df.shape[0]}')
print(f'Number of examples in the validation set: {dev_df.shape[0]}')
print(f'Number of examples in the test set: {test_df.shape[0]}')

# What the sample data looks like

In [None]:
# Preview the first rows of the training frame (columns: Input, integer Label).
train_df.head()

In [None]:
# Show the two-letter language code selected above ('ta', 'ml' or 'kn').
lang

# Create class to index maps for each language

In [None]:
# Name of the "not in this language" label for each supported language code.
_not_language_label = {
    "kn": 'not-Kannada',
    "ta": 'not-Tamil',
    "ml": 'not-malayalam',
}

dict_ = None

if lang in _not_language_label:
    _ordered_labels = [
        'Not_offensive',
        'Offensive_Targeted_Insult_Individual',
        _not_language_label[lang],
        'Offensive_Targeted_Insult_Group',
        'Offensive_Untargetede',
    ]
    # Malayalam has no 'Offensive_Targeted_Insult_Other' class.
    if lang != "ml":
        _ordered_labels.append('Offensive_Targeted_Insult_Other')
    dict_ = {label: position for position, label in enumerate(_ordered_labels)}

# Size of the classifier's output layer.
n_classes = len(dict_.keys())

In [None]:
## used by cmi
from IPython.utils import io
import polyglot
from polyglot.text import Text, Word
import nltk
from nltk.tokenize import word_tokenize
nltk.download('punkt')
import warnings
import enchant
from enchant.checker import SpellChecker
import re
from indicnlp.tokenize.indic_tokenize import trivial_tokenize

chkr = SpellChecker("en_US")
d = enchant.Dict("en_US")

# Inclusive Unicode code-point range of the native script of each language.
_NATIVE_SCRIPT_RANGES = {
    "ml": (0x0D00, 0x0D7F),
    "ta": (0x0B80, 0x0BFF),
    "kn": (0x0C80, 0x0CFF),
}

def parse_input(text, base_lang):
    """Tag every token of ``text`` with a language id.

    The text is split with ``trivial_tokenize`` and each token is suffixed with
    ``/<base_lang>``, ``/en`` or ``/O``. The tagged tokens are joined with
    single spaces.

    Args:
        text: the sentence to tag (str).
        base_lang: 'ml', 'ta' or 'kn'.

    Returns:
        str such as ``"word1/ta word2/en word3/O"``.

    Raises:
        KeyError: if ``base_lang`` is not one of the supported codes.
    """
    first_code_point, last_code_point = _NATIVE_SCRIPT_RANGES[base_lang]

    labelled_input = []

    for word in trivial_tokenize(text):
        # 1) Native script: every character lies in the language's Unicode block.
        #    Code points are compared numerically; the previous regex matched
        #    uppercase hex digits against a lowercase "%04x" rendering and so
        #    rejected most native-script characters.
        if all(first_code_point <= ord(char) <= last_code_point for char in word):
            labelled_input.append(word + f'/{base_lang}')
            continue

        # 2) English: alphabetic token accepted by the en_US dictionary.
        if word.isalpha() and d.check(word):
            labelled_input.append(word + f'/en')
            continue

        # 3) Otherwise fall back to polyglot's language detector.
        detected = Text(word).language.code
        if detected == base_lang:
            labelled_input.append(word + f'/{base_lang}')
        elif detected == 'en':
            labelled_input.append(word + f'/en')
        else:
            labelled_input.append(word + f'/O')

    return ' '.join(labelled_input)

In [None]:
## used by cmi
%%capture
import copy

processed_df = train_df.copy(deep=True)
processed_df['Input'] = processed_df['Input'].apply(str)
processed_df['Input'] = processed_df['Input'].apply(lambda x: parse_input(x, lang))

In [None]:
%%capture
val_processed_df = dev_df.copy(deep=True)
val_processed_df['Input'] = val_processed_df['Input'].apply(str)
val_processed_df['Input'] = val_processed_df['Input'].apply(lambda x: parse_input(x, lang))

In [None]:
%%capture
test_processed_df = test_df.copy(deep=True)
test_processed_df['Input'] = test_processed_df['Input'].apply(str)
test_processed_df['Input'] = test_processed_df['Input'].apply(lambda x: parse_input(x, lang))

In [None]:
# First ten raw training rows, for comparison with the tagged version.
train_df.head(10)

In [None]:
# The same rows after parse_input: every token carries a "/<language>" suffix.
processed_df.head(10)

In [None]:
def to_str(text):
    """Return ``pd.Series([input_as_str, label])`` for a row-like ``text``.

    ``text['Input']`` is converted with ``str`` unless it already is a string;
    ``text['Label']`` is passed through unchanged.
    """
    raw_input = text['Input']
    as_text = raw_input if isinstance(raw_input, str) else str(raw_input)
    return pd.Series([as_text, text['Label']])

# Sections of config - Defining some key variables that will be used later on in the training

In [None]:
MAX_LEN = 200  # handed to CustomDataset, which uses it as the tokenizer's max_length
TRAIN_BATCH_SIZE = 32  # batch size of the training DataLoader
VALID_BATCH_SIZE = 16  # batch size of the validation/test DataLoaders
EPOCHS = 5  # NOTE: the training-loop cell reassigns EPOCHS before it runs
LEARNING_RATE = 1e-05  # learning rate given to the Adam optimizer
# Tokenizer from the same checkpoint that the model cell loads.
tokenizer = AutoTokenizer.from_pretrained('ai4bharat/indic-bert')

In [None]:
class CustomDataset(Dataset):
    """Torch dataset that tokenizes the 'Input' column of a dataframe on the fly.

    Args:
        dataframe: frame with an ``Input`` (text) and a ``Label`` (int) column.
            Its index may have gaps; rows are accessed by position.
        tokenizer: object exposing a Hugging Face style ``encode_plus``.
        max_len: every example is padded/truncated to exactly this many tokens.
    """

    def __init__(self, dataframe, tokenizer, max_len):
        self.tokenizer = tokenizer
        self.data = dataframe
        self.sent = dataframe.Input
        self.targets = dataframe.Label
        self.max_len = max_len

    def __len__(self):
        return len(self.sent)

    def __getitem__(self, index):
        """Return the tensors for the example at *position* ``index``.

        Returns a dict with ``ids``, ``mask``, ``token_type_ids`` (each of
        length ``max_len``) and the scalar ``targets`` label, all torch.long.
        """
        # .iloc: the DataLoader passes positions 0..len-1, while a filtered
        # dataframe can have a non-contiguous index.
        sent = str(self.sent.iloc[index])
        sent = " ".join(sent.split())  # collapse runs of whitespace

        inputs = self.tokenizer.encode_plus(
            sent,
            None,
            add_special_tokens=True,
            max_length=self.max_len,
            # Fixed-length examples so the default collate can stack them:
            # pad short inputs and cut long ones down to max_len.
            padding='max_length',
            truncation=True,
            return_token_type_ids=True
        )

        return {
            'ids': torch.tensor(inputs['input_ids'], dtype=torch.long),
            'mask': torch.tensor(inputs['attention_mask'], dtype=torch.long),
            'token_type_ids': torch.tensor(inputs['token_type_ids'], dtype=torch.long),
            'targets': torch.tensor(self.targets.iloc[index], dtype=torch.long)
        }

In [None]:
# Datasets are built from the raw (untagged) frames.
training_set = CustomDataset(train_df, tokenizer, MAX_LEN)
# NOTE: despite its name, testing_set wraps the validation (dev) split.
testing_set = CustomDataset(dev_df, tokenizer, MAX_LEN)

# The held-out test split.
real_testing_set = CustomDataset(test_df, tokenizer, MAX_LEN)

In [None]:
# Training batches are reshuffled every epoch.
train_params = dict(batch_size=TRAIN_BATCH_SIZE, shuffle=True, num_workers=0)

# Evaluation keeps the dataframe order so predictions line up with the rows.
test_params = dict(batch_size=VALID_BATCH_SIZE, shuffle=False, num_workers=0)

training_loader = DataLoader(training_set, **train_params)
testing_loader = DataLoader(testing_set, **test_params)
real_test_loader = DataLoader(real_testing_set, **test_params)

# Whether to use cosnorm

In [None]:
# True when the answer is 'y' (case-insensitive); any other answer means False.
cosnorm = input('Enter y if cosnorm to be used else n: ').lower() == 'y'

In [None]:
if cosnorm:
    from cosnorm import CosNorm_Classifier

class AlbertClass(torch.nn.Module):
    """IndicBERT encoder followed by a small classification head.

    The pooled encoder output goes through Linear(768, 768) -> ReLU ->
    Dropout(0.3) and then through either a plain linear classifier or, when
    the module-level ``cosnorm`` flag is set, a ``CosNorm_Classifier``.
    """

    def __init__(self):
        super().__init__()
        self.l1 = AutoModel.from_pretrained("ai4bharat/indic-bert")
        self.pre_classifier = torch.nn.Linear(768, 768)
        self.dropout = torch.nn.Dropout(0.3)
        # The linear head is always created, even when the cosnorm head is used.
        self.classifier1 = torch.nn.Linear(768, n_classes)
        if cosnorm:
            self.classifier2 = CosNorm_Classifier(768, n_classes)

    def forward(self, input_ids, attention_mask, token_type_ids):
        """Return the classifier output for a batch of tokenized inputs."""
        encoded = self.l1(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
        )
        hidden = torch.relu(self.pre_classifier(encoded['pooler_output']))
        hidden = self.dropout(hidden)

        head = self.classifier2 if cosnorm is True else self.classifier1
        return head(hidden)

model = AlbertClass()
model.to(device)
print('Loaded!')

In [None]:
# Per-class loss weights: the reciprocal of a per-class count.
# NOTE(review): entry i weights label id i, i.e. class_list[i]. These counts do
# not obviously follow the class_list order -- confirm before relying on them.
if lang == "kn":
    _class_counts = [3382, 486, 1407, 327, 212, 122]
elif lang == "ta":
    _class_counts = [25215, 1447, 2338, 2550, 2894, 454]
else:
    _class_counts = [10382, 171, 882, 106, 154]

# .to(device) instead of .cuda() so the cell also runs on a CPU-only runtime.
weight = torch.tensor([1 / count for count in _class_counts]).to(device)

In [None]:
from cmi_loss import CMILoss

# input() returns a string, so compare against '1' rather than the integer 1.
# use_cmi is stored as a bool; checks written as `use_cmi == 1` still hold
# because True == 1 in Python.
use_cmi = input("Enter 1 for cmi loss, anything else for cross entropy: ").strip() == '1'

if use_cmi:
    loss_function = CMILoss(weight=weight)
else:
    loss_function = torch.nn.CrossEntropyLoss(weight=weight)

optimizer = torch.optim.Adam(params=model.parameters(), lr=LEARNING_RATE)

In [None]:
def calcuate_accuracy(preds, targets):
    """Return how many entries of ``preds`` equal the matching entry of ``targets``.

    Despite the name this is a count of correct predictions, not a ratio.
    """
    matches = preds == targets
    return matches.sum().item()

In [None]:
# Per-epoch training history, appended to by train(): mean loss and accuracy (%).
# Re-running this cell clears the history.
losslsit = []
acclist = []

# Train loop

In [None]:
def train(epoch):
    """Run one training epoch over ``training_loader`` and record its metrics.

    Uses the module-level ``model``, ``optimizer`` and ``loss_function``.
    Appends the mean batch loss to ``losslsit`` and the accuracy (in percent)
    to ``acclist``, and prints both.

    Args:
        epoch: epoch number, used only in the printed messages.
    """
    # Decide how to call the loss from the loss object itself, so the call
    # always matches whatever loss_function was actually constructed.
    uses_cmi = isinstance(loss_function, CMILoss)

    tr_loss = 0
    n_correct = 0
    nb_tr_steps = 0
    nb_tr_examples = 0

    model.train()
    for data in training_loader:
        ids = data['ids'].to(device, dtype=torch.long)
        mask = data['mask'].to(device, dtype=torch.long)
        token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
        targets = data['targets'].to(device, dtype=torch.long)

        outputs = model(ids, mask, token_type_ids)

        if uses_cmi:
            # CMI loss additionally receives the language-tagged training
            # frame and the language code.
            loss = loss_function(outputs, targets, processed_df, lang)
        else:
            # Cross entropy
            loss = loss_function(outputs, targets)

        tr_loss += loss.item()
        _, big_idx = torch.max(outputs.data, dim=1)
        n_correct += calcuate_accuracy(big_idx, targets)
        nb_tr_steps += 1
        nb_tr_examples += targets.size(0)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    epoch_loss = tr_loss / nb_tr_steps
    epoch_accu = (n_correct * 100) / nb_tr_examples
    print(f'The Total Accuracy for Epoch {epoch} : {epoch_accu}')
    losslsit.append(float(epoch_loss))
    acclist.append(float(epoch_accu))
    print(f"Training Loss Epoch: {epoch_loss}")
    print(f"Training Accuracy Epoch: {epoch_accu}")

In [None]:
# model.load_state_dict(torch.load("/content/drive/MyDrive/Datasets/models/kan/CMI_model_18_epochs.pth"))

In [None]:
# Overrides the EPOCHS value from the config cell.
EPOCHS=41

for epoch in range(1,EPOCHS+1):
    train(epoch)
    # Checkpoint every third epoch.
    # NOTE(review): the target folder is hard-coded to ".../models/kan/" whatever
    # language was selected -- confirm this is intended for Tamil/Malayalam runs.
    if (epoch) % 3 == 0:
        torch.save(model.state_dict(),f"/content/drive/MyDrive/Datasets/models/kan/CMI_model_{epoch}_epochs.pth")

In [None]:
# A bare `import matplotlib` does not load the pyplot submodule; importing
# matplotlib.pyplot binds `matplotlib` and makes matplotlib.pyplot usable.
import matplotlib.pyplot

matplotlib.pyplot.plot(range(len(losslsit)), losslsit)
matplotlib.pyplot.xlabel("Epoch (0-based)")
matplotlib.pyplot.ylabel("Mean training loss")
matplotlib.pyplot.title("Training loss per epoch");

In [None]:
# Import pyplot here as well so this cell does not depend on another cell
# having loaded the submodule.
import matplotlib.pyplot

matplotlib.pyplot.plot(range(len(acclist)), acclist)
matplotlib.pyplot.xlabel("Epoch (0-based)")
matplotlib.pyplot.ylabel("Training accuracy (%)")
matplotlib.pyplot.title("Training accuracy per epoch");

In [None]:
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score,confusion_matrix, classification_report

def print_metrices(pred,true):
    """Print the confusion matrix, classification report and weighted scores.

    Args:
        pred: predicted class ids.
        true: ground-truth class ids, aligned with ``pred``.
    """
    # scikit-learn metrics take (y_true, y_pred) in that order; passing them
    # the other way round swaps precision and recall.
    print(confusion_matrix(true,pred))
    print(classification_report(true,pred,))
    print("Accuracy : ",accuracy_score(true,pred))
    print("Precison : ",precision_score(true,pred, average = 'weighted'))
    print("Recall : ",recall_score(true,pred,  average = 'weighted'))
    print("F1 : ",f1_score(true,pred,  average = 'weighted'))

In [None]:
import sklearn
def valid(model, testing_loader, processed_df=None):
    """Evaluate ``model`` on ``testing_loader`` and print loss, accuracy and metrics.

    Args:
        model: module called as ``model(ids, mask, token_type_ids)``.
        testing_loader: DataLoader yielding the dicts produced by CustomDataset.
        processed_df: language-tagged frame handed to the CMI loss. Defaults to
            ``val_processed_df``; ignored when the loss is not a CMILoss.
            NOTE(review): confirm how CMILoss uses this frame.

    Returns:
        Accuracy over the loader, in percent.

    Raises:
        ValueError: if the loader yields no batches.
    """
    uses_cmi = isinstance(loss_function, CMILoss)
    if uses_cmi and processed_df is None:
        processed_df = val_processed_df

    model.eval()
    total_loss = 0.0
    n_correct = 0
    n_batches = 0
    n_examples = 0
    pred_batches = []
    target_batches = []

    with torch.no_grad():
        for data in testing_loader:
            ids = data['ids'].to(device, dtype=torch.long)
            mask = data['mask'].to(device, dtype=torch.long)
            token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
            targets = data['targets'].to(device, dtype=torch.long)

            # No .squeeze(): it would drop the batch dimension of a final batch
            # of size 1 and break the dim=1 reduction below.
            outputs = model(ids, mask, token_type_ids)

            if uses_cmi:
                # Use the selected language code, not a hard-coded one.
                loss = loss_function(outputs, targets, processed_df, lang)
            else:
                loss = loss_function(outputs, targets)
            total_loss += loss.item()

            _, big_idx = torch.max(outputs.data, dim=1)
            n_correct += calcuate_accuracy(big_idx, targets)
            pred_batches.append(big_idx.detach().cpu())
            target_batches.append(targets.detach().cpu())
            n_batches += 1
            n_examples += targets.size(0)

    if n_batches == 0:
        raise ValueError("testing_loader yielded no batches; nothing to evaluate")

    epoch_loss = total_loss / n_batches
    epoch_accu = (n_correct * 100) / n_examples
    print(f"Validation Loss Epoch: {epoch_loss}")
    print(f"Validation Accuracy Epoch: {epoch_accu}")

    # Concatenating once keeps the integer dtype of the class ids.
    y_true = torch.cat(target_batches).numpy()
    y_pred = torch.cat(pred_batches).numpy()

    print_metrices(y_pred, y_true)
    return epoch_accu

In [None]:
# testing_loader wraps the dev split, so despite the message below this is
# the validation-set accuracy.
acc = valid(model, testing_loader)
print(f"Accuracy on test data = {acc}")

In [None]:
# torch.save(model.state_dict(),f"/content/drive/MyDrive/Datasets/Eacl_Kannada_model/Kannada_model_{len(acclist)}_epochs.pth")

In [None]:
def test(model, testing_loader, processed_df=None):
    """Evaluate ``model`` on ``testing_loader`` and collect per-example outputs.

    Args:
        model: module called as ``model(ids, mask, token_type_ids)``.
        testing_loader: non-shuffling DataLoader over a CustomDataset.
        processed_df: language-tagged frame handed to the CMI loss. Defaults to
            ``val_processed_df``; ignored when the loss is not a CMILoss.
            NOTE(review): confirm how CMILoss uses this frame.

    Returns:
        ``(accuracy_percent, save_df)``. ``save_df`` is a dict of equally long
        columns: one raw model-output column per class present in
        ``class_list``, plus "Inputs", "Correct Label" and "Predicted Label".

    Raises:
        ValueError: if the loader yields no batches.
    """
    uses_cmi = isinstance(loss_function, CMILoss)
    if uses_cmi and processed_df is None:
        processed_df = val_processed_df

    model.eval()
    total_loss = 0.0
    n_correct = 0
    n_batches = 0
    n_examples = 0
    output_batches = []
    pred_batches = []
    target_batches = []

    with torch.no_grad():
        for data in testing_loader:
            ids = data['ids'].to(device, dtype=torch.long)
            mask = data['mask'].to(device, dtype=torch.long)
            token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
            targets = data['targets'].to(device, dtype=torch.long)

            # No .squeeze(): it would drop the batch dimension of a final batch
            # of size 1 and break the dim=1 reduction below.
            outputs = model(ids, mask, token_type_ids)

            if uses_cmi:
                # Use the selected language code, not a hard-coded one.
                loss = loss_function(outputs, targets, processed_df, lang)
            else:
                loss = loss_function(outputs, targets)
            total_loss += loss.item()

            _, big_idx = torch.max(outputs.data, dim=1)
            n_correct += calcuate_accuracy(big_idx, targets)
            output_batches.append(outputs.detach().cpu())
            pred_batches.append(big_idx.detach().cpu())
            target_batches.append(targets.detach().cpu())
            n_batches += 1
            n_examples += targets.size(0)

    if n_batches == 0:
        raise ValueError("testing_loader yielded no batches; nothing to evaluate")

    full_op = torch.cat(output_batches, dim=0)
    y_pred = torch.cat(pred_batches).numpy()
    y_true = torch.cat(target_batches).numpy()

    def _label_name(class_id):
        # The model can have more outputs than class_list has entries (when the
        # not-<language> class was dropped); keep such ids printable.
        class_id = int(class_id)
        return class_list[class_id] if class_id < len(class_list) else f"class_{class_id}"

    # One score column per class that exists in class_list, in the original
    # column order; a class that was dropped gets no column.
    score_columns = ["Not_offensive",
                     'Offensive_Targeted_Insult_Group',
                     'Offensive_Targeted_Insult_Individual',
                     'Offensive_Untargetede',
                     f'not-{language}']
    save_df = {name: full_op[:, class_list.index(name)].numpy()
               for name in score_columns if name in class_list}

    # Take the inputs from the frame behind the loader that was passed in,
    # falling back to dev_df when the dataset does not expose one.
    source_frame = getattr(testing_loader.dataset, "data", dev_df)
    save_df["Inputs"] = list(source_frame["Input"])
    save_df["Correct Label"] = [_label_name(i) for i in y_true]
    save_df["Predicted Label"] = [_label_name(i) for i in y_pred]
    if 'Offensive_Targeted_Insult_Other' in class_list:
        save_df['Offensive_Targeted_Insult_Other'] = (
            full_op[:, class_list.index('Offensive_Targeted_Insult_Other')].numpy())

    # Quick look at the first example.
    for column_name, column in save_df.items():
        print(f"{column_name}: {column[0]}")

    epoch_loss = total_loss / n_batches
    epoch_accu = (n_correct * 100) / n_examples
    print(f"Validation Loss Epoch: {epoch_loss}")
    print(f"Validation Accuracy Epoch: {epoch_accu}")

    print_metrices(y_pred, y_true)
    return epoch_accu, save_df

In [None]:
# NOTE: this evaluates testing_loader (the dev split); real_test_loader, built
# from the held-out test split, is not used here.
acc,save_dict = test(model, testing_loader)

In [None]:
# All columns must have the same length before they can form a DataFrame.
for column_name, column_values in save_dict.items():
    print(column_name, " : ", len(column_values))

In [None]:
# Turn the dict of per-example columns returned by test() into a DataFrame.
save_df = pd.DataFrame(save_dict)

In [None]:
# Preview the prediction table before it is written to disk.
save_df.head()

In [None]:
# The `encoding` argument of DataFrame.to_excel was deprecated in pandas 1.5 and
# removed in 2.0; without it the call works on current pandas.
save_df.to_excel(f"/content/drive/MyDrive/Datasets/Predictions/CMI_IndicBert_{language}_predictions.xlsx")