In [None]:
import pandas as pd
import torch
import torch.nn as nn
import torch.utils.data as data

In [None]:
# Hyperparameters and label tables shared by the cells below.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
batch_size = 100        # examples per DataLoader batch
embed_size = 300        # width of the token embedding
learningrate = 0.0001   # learning rate handed to the optimizer
# A label's position in this list is its class index.
emotionlist = ['sadness', 'joy', 'love', 'anger', 'fear', 'surprise']
emotiondict = {name: position for position, name in enumerate(emotionlist)}
def idx2emotion(idx):
    """Map a class index, or a (nested) list/tuple of indices, to emotion names.

    A list or tuple yields a list with the same nesting; anything else is used
    directly as an index into ``emotionlist``.
    """
    if isinstance(idx, (tuple, list)):
        return [idx2emotion(item) for item in idx]
    return emotionlist[idx]
def emotion2idx(emotion):
    """Map an emotion name, or a (nested) list/tuple of names, to class indices.

    A list or tuple yields a list with the same nesting; anything else is looked
    up in ``emotiondict`` (an unknown name raises ``KeyError``).
    """
    if isinstance(emotion, (tuple, list)):
        return [emotion2idx(name) for name in emotion]
    return emotiondict[emotion]
# Quick check of both directions of the label mapping.
demo_indices = [1, 4, 3, 2]
demo_names = ['sadness', 'joy']
print(idx2emotion(demo_indices))
print(emotion2idx(demo_names))

In [None]:
# NOTE(review): unpickling can execute arbitrary code; only load this file if it
# comes from a trusted source.
# The index is rebuilt as 0..n-1 so later cells can address rows by position.
df = pd.read_pickle("merged_training.pkl").reset_index(drop=True)

In [None]:
# Show the first few texts and the number of rows.
print(df["text"].head())
print(df.shape[0])

In [None]:
class Vocab():
    """Token <-> index mapping built from whitespace-tokenised sentences.

    Index 0 is reserved for ``<pad>`` and index 1 for ``<unk>``. The remaining
    tokens follow in order of decreasing frequency (ties keep first-seen order).

    Attributes:
        token_freq: dict mapping every token seen to its occurrence count.
        itos: list of tokens, position == index.
        stoi: dict mapping token -> index.
    """
    def __init__(self,tokens,min_freq=0):
        """Count tokens and index those seen at least ``min_freq`` times.

        Args:
            tokens: iterable of sentence strings; each is split on whitespace.
            min_freq: minimum occurrence count for a token to receive an index.
        """
        specials = ["<pad>","<unk>"]
        self.token_freq = {}
        for sentence in tokens:
            for token in sentence.split():
                self.token_freq[token] = self.token_freq.get(token,0) + 1
        # A corpus token spelled like a reserved token must not be appended a
        # second time: the duplicate would overwrite the reserved index in stoi.
        kept = [token for token,freq in self.token_freq.items()
                if freq >= min_freq and token not in specials]
        # list.sort is stable, so equally frequent tokens keep first-seen order.
        kept.sort(key=self.token_freq.__getitem__,reverse=True)
        self.itos = specials + kept
        self.stoi = {token:idx for idx,token in enumerate(self.itos)}
    def __len__(self):
        """Return the number of indexed tokens, including the two reserved ones."""
        return len(self.itos)
    def __getitem__(self,idx):
        """Map an index, or a (nested) list/tuple of indices, back to tokens.

        Index 0 (padding) is rendered as the empty string.
        """
        if isinstance(idx,(list,tuple)):
            return [self[i] for i in idx]
        return '' if idx == 0 else self.itos[idx]
    def tokens2idx(self,tokens):
        """Map a token, a sentence string, or a (nested) list/tuple of them to indices.

        A string containing any whitespace is split the same way ``__init__``
        tokenises (``str.split()``) and yields a list; a single token yields one
        int. Tokens without an index map to the ``<unk>`` index.
        """
        if isinstance(tokens,(list,tuple)):
            return [self.tokens2idx(item) for item in tokens]
        # Check for any whitespace, not just ' ', so tabs/newlines are split
        # exactly as they were when the vocabulary was counted.
        if any(ch.isspace() for ch in tokens):
            return self.tokens2idx(tokens.split())
        return self.stoi.get(tokens,self.stoi['<unk>'])
# Build the vocabulary from the text column, keeping tokens seen at least 10 times.
vocab = Vocab(df.text, min_freq=10)
# First ten entries, one direct lookup, and the vocabulary size.
print(vocab[tuple(range(10))])
print(vocab.stoi['i'], vocab.tokens2idx(['i', 'feel']))
print(len(vocab))

In [None]:
# Pull out the rows labelled 'anger' and stack them on top of the full frame.
# NOTE(review): df2 is not used by any later cell visible here.
dfanger = df.loc[df["emotions"] == 'anger']
print(dfanger.head())
print(df.head())
df2 = pd.concat([dfanger, df], ignore_index=True)
print(df2)

In [None]:
class dataset(data.Dataset):
    """Dataset yielding ``(token_ids, emotion_label)`` pairs from a DataFrame.

    Args:
        datasource: DataFrame with a ``text`` and an ``emotions`` column; its
            index is reset so items are addressed 0..len-1.
        vocab: object exposing ``tokens2idx(text)`` that returns an index or a
            list of indices.
        max_len: every id sequence is truncated or zero-padded to this length.
    """
    def __init__(self,datasource,vocab,max_len=50):
        self.set=datasource.reset_index(drop=True)
        self.vocab=vocab
        self.max_len=max_len
    def __getitem__(self, index):
        """Return ``(LongTensor of length max_len, label stored in 'emotions')``."""
        # Use the vocabulary given to the constructor, not a module-level name.
        ids=self.vocab.tokens2idx(self.set.text[index])
        if not isinstance(ids,list):
            # A single-token text comes back as one bare index.
            ids=[ids]
        ids=ids[:self.max_len]
        # Index 0 is the padding id.
        ids=ids+[0]*(self.max_len-len(ids))
        return (torch.tensor(ids,dtype=torch.long),self.set.emotions[index])
    def __len__(self):
        """Return the number of rows."""
        return len(self.set)
# The first 350000 rows form the training split; the remaining rows the test split.
trainset = dataset(df[:350000], vocab)
testset = dataset(df[350000:], vocab)
# Both loaders shuffle and drop the final incomplete batch.
loader_options = dict(batch_size=batch_size, shuffle=True, drop_last=True)
load_trainset = data.DataLoader(trainset, **loader_options)
load_testset = data.DataLoader(testset, **loader_options)
# Compare one encoded sample with its decoded tokens and the raw row.
print(trainset[3])
print(vocab[list(trainset[3][0])])
print(df.text[3], df.emotions[3])

In [None]:
class emotion(nn.Module):
    """Sentence classifier: embedding -> 3 transformer encoder layers -> masked mean -> linear.

    Token id 0 is treated as padding: it is hidden from attention and excluded
    from the mean pooling.

    Args:
        vocab_size: number of rows in the embedding table.
        embed_size: embedding / model width; must be divisible by ``num_heads``.
        num_classes: number of output classes.
        dropout: dropout rate used inside each encoder layer.
        num_heads: attention heads per encoder layer.
    """
    def __init__(self,vocab_size,embed_size,num_classes,dropout=0.5,num_heads=5):
        super(emotion,self).__init__()
        self.embedding=nn.Embedding(vocab_size,embed_size)
        self.multiheadattention1=nn.TransformerEncoderLayer(embed_size,num_heads,batch_first=True,dropout=dropout)
        self.multiheadattention2=nn.TransformerEncoderLayer(embed_size,num_heads,batch_first=True,dropout=dropout)
        self.multiheadattention3=nn.TransformerEncoderLayer(embed_size,num_heads,batch_first=True,dropout=dropout)
        self.fc=nn.Linear(embed_size,num_classes)
        # Normalise over the class dimension; only used by predict_proba.
        self.softmax=nn.Softmax(dim=1)
    def forward(self,x):
        """Return raw class scores (logits) of shape ``(batch, num_classes)``.

        Args:
            x: integer tensor of token ids, shape ``(batch, seq_len)``; 0 = padding.

        Logits are returned, not probabilities, because ``nn.CrossEntropyLoss``
        applies log-softmax itself. ``argmax`` over logits selects the same
        class as over probabilities.
        """
        # src_key_padding_mask expects True at positions to IGNORE (padding).
        pad_mask=(x==0)
        x=self.embedding(x)
        x=self.multiheadattention1(x,src_key_padding_mask=pad_mask)
        x=self.multiheadattention2(x,src_key_padding_mask=pad_mask)
        x=self.multiheadattention3(x,src_key_padding_mask=pad_mask)
        # Zero out padded positions so they do not contribute to the mean.
        # masked_fill also discards NaNs produced for rows that are all padding.
        x=x.masked_fill(pad_mask.unsqueeze(dim=-1),0.0)
        # Count of real tokens per row; clamp avoids 0/0 for all-padding rows.
        lengths=(~pad_mask).sum(dim=1,keepdim=True).clamp(min=1)
        x=x.sum(dim=1)/lengths
        #avg pool->attention pool
        return self.fc(x)
    def predict_proba(self,x):
        """Return class probabilities (softmax over the logits from ``forward``)."""
        return self.softmax(self(x))
# Create the classifier on the selected device (nn.Module.to returns the module itself).
model = emotion(len(vocab), embed_size, len(emotionlist)).to(device=device)
# Smoke test: one batch without padding and one batch with trailing padding ids.
all_ones_batch = torch.ones(3, 5, device=device).long()
padded_batch = torch.tensor([[1, 1, 1, 0, 0], [3, 6, 5, 3, 0], [3, 8, 6, 5, 0]], device=device).long()
print(model(all_ones_batch))
print(model(padded_batch))

In [None]:
def predict1sentence(model,sentence,vocab,max_len=50):
    """Predict the emotion name for one whitespace-tokenised sentence.

    Args:
        model: classifier called on a ``(1, max_len)`` id tensor; must return
            per-class scores of shape ``(1, num_classes)``.
        sentence: input text; split on whitespace before lookup.
        vocab: object exposing ``tokens2idx(list_of_tokens)``.
        max_len: ids are truncated or zero-padded to this length.

    Returns:
        The emotion name for the highest-scoring class.

    Side effect: leaves ``model`` in eval mode.
    """
    model.eval()
    ids=list(vocab.tokens2idx(sentence.split()))[:max_len]
    # Index 0 is the padding id.
    ids=ids+[0]*(max_len-len(ids))
    # Build the input on the model's own device; fall back to the notebook-wide
    # device only for a model without parameters.
    first_param=next(model.parameters(),None)
    target=first_param.device if first_param is not None else device
    batch=torch.tensor(ids,dtype=torch.long,device=target).unsqueeze(dim=0)
    # Inference only: do not record an autograd graph.
    with torch.no_grad():
        prediction=model(batch)
    # .item() yields a plain int instead of indexing a list with a tensor.
    return idx2emotion(torch.argmax(prediction,dim=1).item())
# Run the classifier on one sample sentence.
sample_sentence = 'i take every day as it comes i m just focussing on eating better at the moment i m not aiming for unrealistic targets or setting myself deadlines because then i feel pressured i m just improving my diet'
print(predict1sentence(model, sample_sentence, vocab))

In [None]:
def get_acc(model,load_testset):
    """Evaluate ``model`` on every batch of ``load_testset``.

    Args:
        model: classifier returning per-class scores of shape ``(batch, classes)``.
        load_testset: iterable of ``(token_id_tensor, emotion_names)`` batches.

    Returns:
        ``(correct, accuracy, countclass)``: a 0-dim tensor with the number of
        correct predictions, a 0-dim tensor with ``correct / examples_seen``,
        and a list with one 0-dim tensor per emotion counting how often that
        class was predicted.

    The model is put in eval mode for the duration and restored to its previous
    mode afterwards; no gradients are recorded.
    """
    num_classes=len(emotionlist)
    # Evaluate on the model's own device; fall back to the notebook-wide device
    # only for a model without parameters.
    first_param=next(model.parameters(),None)
    target=first_param.device if first_param is not None else device
    class_ids=torch.arange(num_classes,device=target)
    correct=torch.zeros((),dtype=torch.long,device=target)
    countclass=torch.zeros(num_classes,dtype=torch.long,device=target)
    total=0
    was_training=model.training
    model.eval()
    try:
        with torch.no_grad():
            for text,label in load_testset:
                text=text.to(target)
                label=torch.tensor(emotion2idx(list(label)),device=target)
                answer=torch.argmax(model(text),dim=1)
                # (batch,1)==(classes,) -> (batch,classes); column sums = per-class counts.
                countclass+=(answer.unsqueeze(dim=1)==class_ids).sum(dim=0)
                correct+=torch.count_nonzero(answer==label)
                # Count the examples actually seen; the last batch may be smaller.
                total+=label.size(0)
    finally:
        model.train(was_training)
    # An empty loader gives accuracy 0 instead of a division by zero.
    accuracy=correct/total if total else torch.zeros((),device=target)
    return correct,accuracy,list(countclass.unbind())
# Accuracy on the test loader before the training cell below has run.
baseline_result = get_acc(model, load_testset)
print(baseline_result)

In [None]:
#train
optimizer=torch.optim.Adam(model.parameters(),lr=learningrate)
# NOTE(review): CrossEntropyLoss applies log-softmax internally, so it expects
# the model to output raw, unnormalised class scores.
loss=nn.CrossEntropyLoss()
loss.to(device=device)
for epoch in range(5):
    # Report test and train accuracy before this epoch's updates.
    model.eval()
    with torch.no_grad():
        acc=get_acc(model,load_testset=load_testset)
        print(acc)
        acc=get_acc(model,load_trainset)
        print(acc)
    model.train()
    totalloss=0
    for number,datapair in enumerate(load_trainset):
        text,label=datapair
        text=text.to(device)
        label=torch.tensor(emotion2idx(label))
        label=label.to(device)
        optimizer.zero_grad()
        result=model(text)
        trainloss=loss(result,label)
        trainloss.backward()
        optimizer.step()
        # detach() so the running sum does not keep every batch's autograd history.
        totalloss+=trainloss.detach()
        if number%1000==0:
            print(number)
            # number is 0-based, so number+1 batches have been accumulated.
            print(totalloss/(number+1))
# The loop only evaluates before each epoch; report the result of the last one too.
model.eval()
with torch.no_grad():
    acc=get_acc(model,load_testset=load_testset)
    print(acc)
    
