In [None]:

!pip install -q transformers scikit-learn
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import random

from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModel
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import StratifiedKFold
from sklearn.utils.class_weight import compute_class_weight


## Load & Clean Data

In [1]:

# Seed every RNG used in this notebook so fold splits and shuffling repeat.
seed=42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

train=pd.read_csv("data/train_complaints.csv")
test=pd.read_csv("data/test_complaints.csv")

def clean(df):
    """Return `df` with normalised column names.

    Each column name is stripped of surrounding whitespace, lower-cased and
    has its spaces replaced by underscores.
    """
    return df.rename(columns=lambda c:c.strip().lower().replace(" ","_"))

train,test=clean(train),clean(test)

def find(df,k):
    """Return the first column name of `df` that contains the substring `k`.

    Raises:
        IndexError: if no column name contains `k`. The message lists the
            available columns so a schema mismatch is obvious instead of a
            bare "list index out of range".
    """
    hits=[c for c in df.columns if k in c]
    if not hits:
        raise IndexError(
            f"no column containing {k!r}; available columns: {list(df.columns)}")
    return hits[0]

# Column names are discovered by substring so small header variations
# between files still resolve to the right column.
TEXT=find(train,"text")
PRIMARY=find(train,"primary")
SECONDARY=find(train,"secondary")
SEVERITY=find(train,"severity")
ID=find(test,"id")


NameError: name 'random' is not defined

## Label Encoding

In [None]:

def _balanced_weights(labels):
    """Return scikit-learn "balanced" class weights for `labels` as a float tensor.

    The weights are ordered like `np.unique(labels)`.
    """
    weights=compute_class_weight(class_weight="balanced",
                                 classes=np.unique(labels),
                                 y=labels)
    return torch.tensor(weights,dtype=torch.float)


# One encoder per categorical target; both are kept so predictions can be
# mapped back to the original category names later.
ple=LabelEncoder()
sle=LabelEncoder()

train["p"]=ple.fit_transform(train[PRIMARY])
train["s"]=sle.fit_transform(train[SECONDARY])
# Shift severity down by one (presumably 1-based levels -> 0-based class ids;
# confirm against the data).
train["v"]=train[SEVERITY]-1

num_p=train["p"].nunique()
num_s=train["s"].nunique()

# Class weights for the two categorical heads.
pw=_balanced_weights(train["p"])
sw=_balanced_weights(train["s"])


## Dataset

In [None]:

class DS(Dataset):
    """Dataset that tokenizes one text row per access.

    The text is read from the column named by the module-level ``TEXT``.
    When ``train`` is true the ``p``, ``s`` and ``v`` columns of ``df`` are
    kept as label arrays and returned alongside the tokenizer output.
    """

    def __init__(self,df,tokenizer,train=True):
        """Store the texts (missing values become ""), the tokenizer and, if training, the labels."""
        self.tok=tokenizer
        self.train=train
        self.t=df[TEXT].fillna("").tolist()
        if not train:
            return
        self.p,self.s,self.v=(df[col].values for col in ("p","s","v"))

    def __len__(self):
        """Number of text rows."""
        return len(self.t)

    def __getitem__(self,i):
        """Tokenize row `i` and return a dict of tensors (plus labels when training)."""
        encoded=self.tok(
            self.t[i],
            truncation=True,
            padding="max_length",
            max_length=256,
            return_tensors="pt",
        )

        # The tokenizer is asked for "pt" tensors for a single text; squeeze
        # removes the size-1 dimensions so default batching can stack items.
        item={}
        for key,tensor in encoded.items():
            item[key]=tensor.squeeze()

        if self.train:
            for name in ("p","s","v"):
                item[name]=torch.tensor(getattr(self,name)[i])
        return item


## Model

In [None]:

class Model(nn.Module):
    """Shared transformer encoder with three linear classification heads.

    The heads predict the primary category (`num_p` classes), the secondary
    category (`num_s` classes) and the severity level (`n_severity` classes).
    `num_p` and `num_s` are read from module scope at construction time.

    Args:
        name: checkpoint name or path passed to `AutoModel.from_pretrained`.
        n_severity: number of severity classes (default 5).
        dropout: dropout probability applied to the pooled embedding
            (default 0.2).
    """

    def __init__(self,name,n_severity=5,dropout=0.2):
        super().__init__()
        self.enc=AutoModel.from_pretrained(name)
        h=self.enc.config.hidden_size
        self.drop=nn.Dropout(dropout)
        self.ph=nn.Linear(h,num_p)
        self.sh=nn.Linear(h,num_s)
        self.vh=nn.Linear(h,n_severity)

    def mean_pool(self,last,mask):
        """Average `last` over dimension 1, counting only positions where `mask` is set.

        Args:
            last: hidden states; expected (batch, seq, hidden).
            mask: attention mask of zeros and ones; expected (batch, seq).

        Returns:
            One pooled vector per row. A row whose mask is entirely zero
            yields a zero vector instead of NaN.
        """
        # Cast once so the multiply and the division stay in the hidden-state dtype.
        mask=mask.unsqueeze(-1).to(last.dtype)
        summed=(last*mask).sum(1)
        # Token counts are whole numbers, so clamping at 1 only changes the
        # all-zero case, where it prevents a 0/0 division.
        counts=mask.sum(1).clamp(min=1)
        return summed/counts

    def forward(self,ids,mask):
        """Encode the batch and return (primary, secondary, severity) logits."""
        o=self.enc(ids,attention_mask=mask)
        pooled=self.mean_pool(o.last_hidden_state,mask)
        pooled=self.drop(pooled)
        return self.ph(pooled),self.sh(pooled),self.vh(pooled)


## Training + Prediction Function

In [None]:

def _predict_probs(model,loader,device):
    """Run `model` over `loader` without gradients and return softmax probabilities.

    Returns a tuple of three numpy arrays (primary, secondary, severity),
    each with one row per sample in loader order.
    """
    model.eval()
    chunks=([],[],[])
    with torch.no_grad():
        for b in loader:
            ids=b["input_ids"].to(device)
            m=b["attention_mask"].to(device)
            for store,logits in zip(chunks,model(ids,m)):
                store.append(torch.softmax(logits,1).cpu().numpy())
    return tuple(np.vstack(c) for c in chunks)


def get_probs(model_name,n_splits=5,epochs=6,lr=2e-5,
              train_batch_size=16,test_batch_size=32,
              loss_weights=(0.3,0.4,0.3)):
    """Train one model per stratified fold and return fold-averaged test probabilities.

    For every fold a fresh `Model(model_name)` is fine-tuned on that fold's
    training rows of the module-level `train` frame and then used to predict
    the module-level `test` frame. Folds are stratified on `train.s`. The
    held-out rows of each fold are not used: no validation is performed.

    Args:
        model_name: checkpoint name for both the tokenizer and the encoder.
        n_splits: number of stratified folds.
        epochs: training epochs per fold.
        lr: AdamW learning rate.
        train_batch_size: batch size of the shuffled training loader.
        test_batch_size: batch size of the test loader.
        loss_weights: weights of the (primary, secondary, severity) losses.

    Returns:
        Three numpy arrays of class probabilities for the test set
        (primary, secondary, severity), averaged over the folds.
    """
    tok=AutoTokenizer.from_pretrained(model_name)
    device="cuda" if torch.cuda.is_available() else "cpu"

    test_ds=DS(test,tok,False)
    test_loader=DataLoader(test_ds,batch_size=test_batch_size)

    skf=StratifiedKFold(n_splits=n_splits,shuffle=True,random_state=seed)

    # The class weights are computed once for the whole training frame, so the
    # loss modules are identical for every fold and can be built up front.
    ce_p=nn.CrossEntropyLoss(weight=pw.to(device))
    ce_s=nn.CrossEntropyLoss(weight=sw.to(device))
    ce_v=nn.CrossEntropyLoss()
    w_p,w_s,w_v=loss_weights

    TP,TS,TV=[],[],[]

    # The second element of each split (validation indices) is intentionally unused.
    for fold,(tr,_) in enumerate(skf.split(train,train.s)):
        print(model_name,"Fold",fold+1)

        tr_loader=DataLoader(DS(train.iloc[tr],tok),
                             batch_size=train_batch_size,shuffle=True)

        model=Model(model_name).to(device)
        opt=torch.optim.AdamW(model.parameters(),lr=lr)

        for _ in range(epochs):
            model.train()
            for b in tr_loader:
                opt.zero_grad()

                ids=b["input_ids"].to(device)
                m=b["attention_mask"].to(device)
                p=b["p"].to(device)
                s=b["s"].to(device)
                v=b["v"].to(device)

                po,so,vo=model(ids,m)
                loss=(w_p*ce_p(po,p)+w_s*ce_s(so,s)+w_v*ce_v(vo,v))

                loss.backward()
                opt.step()

        tp,ts,tv=_predict_probs(model,test_loader,device)
        TP.append(tp)
        TS.append(ts)
        TV.append(tv)

        # Drop this fold's model and optimizer state before the next fold
        # builds its own; otherwise both sets of weights are alive on the
        # device at the same time.
        del model,opt
        if device=="cuda":
            torch.cuda.empty_cache()

    return np.mean(TP,0),np.mean(TS,0),np.mean(TV,0)


## Run Ensemble

In [None]:

# Fold-averaged test probabilities from each backbone.
P1,S1,V1=get_probs("microsoft/deberta-v3-base")
P2,S2,V2=get_probs("roberta-base")

# Equal-weight blend of the two models, one array per target.
P,S,V=((a+b)/2 for a,b in zip((P1,S1,V1),(P2,S2,V2)))

# Most probable class per row; severity is shifted back up by one to undo
# the "-1" applied when the training labels were built.
pf=np.argmax(P,axis=1)
sf=np.argmax(S,axis=1)
vf=np.argmax(V,axis=1)+1

columns={
    "complaint_id":test[ID],
    "primary_category":ple.inverse_transform(pf),
    "secondary_category":sle.inverse_transform(sf),
    "severity":vf,
}
sub=pd.DataFrame(columns)

sub.to_csv("submission.csv",index=False)
sub.head()
