In [1]:
!pip install emoji==0.6.0
!pip install transformers
!pip install soynlp
!pip install pytorch_lightning
!pip install sentence_transformers

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
import transformers
import emoji
import soynlp
import pytorch_lightning

In [3]:
import os
import pandas as pd
import numpy as np

from pprint import pprint

import torch
from torch.utils.data import Dataset, DataLoader, TensorDataset
from torch.optim.lr_scheduler import ExponentialLR

from pytorch_lightning import LightningModule, Trainer, seed_everything

from transformers import AutoModelForSequenceClassification, AutoTokenizer, AdamW

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

import re
import emoji
from soynlp.normalizer import repeat_normalize

In [4]:
import pandas as pd
import torch
import re
import emoji

from torch.utils.data import Dataset, DataLoader, TensorDataset
from torch.optim.lr_scheduler import ExponentialLR
from pytorch_lightning import LightningModule, Trainer, seed_everything
from transformers import AutoModelForSequenceClassification, AutoTokenizer, AdamW
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

from soynlp.normalizer import repeat_normalize


from pytorch_lightning.utilities.types import EVAL_DATALOADERS

class Model(LightningModule):

    def __init__(self, **kwargs):
        super().__init__()
        self.save_hyperparameters()

        self.training_step_outputs = []
        self.validation_step_outputs = []

        self.training_outputs = []
        self.validation_outputs = []
        self.training_loss = []
        self.validation_loss = []

        self.clsfier = AutoModelForSequenceClassification.from_pretrained(self.hparams.pretrained_model, num_labels=4)
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.hparams.pretrained_tokenizer
            if self.hparams.pretrained_tokenizer
            else self.hparams.pretrained_model
        )

    def forward(self, **kwargs):
        return self.clsfier(**kwargs)
    
    def step(self, batch, batch_idx):
        data, labels = batch
        output = self(input_ids=data, labels=labels)

        loss = output.loss
        logits = output.logits

        preds = logits.argmax(dim=-1)

        y_true = list(labels.cpu().numpy())
        y_pred = list(preds.cpu().numpy())

        return {
            'loss': loss,
            'y_true': y_true,
            'y_pred': y_pred
        }
    
    def training_step(self, batch, batch_idx):
        
        ret = self.step(batch, batch_idx)
        self.training_step_outputs.append(ret)
        return ret
    
    def validation_step(self, batch, batch_idx):
        
        ret = self.step(batch, batch_idx)
        self.validation_step_outputs.append(ret)
        return ret

    def epoch_end(self, outputs, state='train'):
        loss = torch.tensor(0, dtype=torch.float)
        if state=='val': print(outputs)
        for i in outputs:
            loss += i['loss'].cpu().detach()
        loss = loss / len(outputs)

        y_true = []
        y_pred = []
        for i in outputs:
            y_true += i['y_true']
            y_pred += i['y_pred']

        acc = accuracy_score(y_true, y_pred)
        prec = precision_score(y_true, y_pred, average='macro')
        rec = recall_score(y_true, y_pred, average='macro')
        f1 = f1_score(y_true, y_pred, average='macro')

        if state =='train':
          self.training_outputs.append({'acc': acc, 'prec': prec, 'rec': rec, 'f1': f1})
          self.training_loss.append(loss)
        elif state == 'val':
          self.validation_outputs.append({'acc': acc, 'prec': prec, 'rec': rec, 'f1': f1})
          self.validation_loss.append(loss)

        self.log(state+'_loss', float(loss), on_epoch=True, prog_bar=True)
        self.log(state+'_acc', acc, on_epoch=True, prog_bar=True)
        self.log(state+'_precision', prec, on_epoch=True, prog_bar=True)
        self.log(state+'_recall', rec, on_epoch=True, prog_bar=True)
        self.log(state+'_f1', f1, on_epoch=True, prog_bar=True)
        print(f'[Epoch {self.trainer.current_epoch} {state.upper()}] Loss: {loss}, Acc: {acc}, Prec: {prec}, Rec: {rec}, F1: {f1}')
        return {'loss': loss}     

    def on_train_epoch_end(self): # fix
        self.epoch_end(self.training_step_outputs, state='train')
        self.training_step_outputs.clear()

    def on_validation_epoch_end(self): # fix
        self.epoch_end(self.validation_step_outputs, state='val')   
        self.validation_step_outputs.clear()

    def configure_optimizers(self):
        if self.hparams.optimizer == 'AdamW':
            optimizer = AdamW(self.parameters(), lr=self.hparams.lr)
        elif self.hparams.optimizer == 'AdamP':
            from adamp import AdamP
            optimizer = AdamP(self.parameters(), lr=self.hparams.lr)
        else:
            raise NotImplementedError('Only AdamW and AdamP is Supported!')
        if self.hparams.lr_scheduler == 'cos':
            scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=1, T_mult=2)
        elif self.hparams.lr_scheduler == 'exp':
            scheduler = ExponentialLR(optimizer, gamma=0.5)
        else:
            raise NotImplementedError('Only cos and exp lr scheduler is Supported!')
        return {
            'optimizer': optimizer,
            'scheduler': scheduler,
        }
    
    def read_data(self, path):
        if path.endswith('xlsx'):
            return pd.read_excel(path)
        elif path.endswith('csv'):
            return pd.read_csv(path)
        elif path.endswith('tsv') or path.endswith('txt'):
            return pd.read_csv(path, sep='\t')
        else:
            raise NotImplementedError('Only Excel(xlsx)/Csv/Tsv(txt) are Supported')
        
    def clean(self, x):
        emojis = ''.join(emoji.UNICODE_EMOJI.keys())
        pattern = re.compile(f'[^ .,?!/@$%~％·∼()\x00-\x7Fㄱ-힣{emojis}]+')
        url_pattern = re.compile(
            r'https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)')
        x = pattern.sub(' ', x)
        x = url_pattern.sub('', x)
        x = x.strip()
        x = repeat_normalize(x, num_repeats=2)
        return x

    def encode(self, x, **kwargs):
        return self.tokenizer.encode(
            self.clean(str(x)),
            padding='max_length',
            max_length = self.hparams.max_length,
            truncation=True,
            **kwargs,
        )
    
    def preprocess_dataframe(self, df):
        df['sentence'] = df['sentence'].map(self.encode)
        return df
    
    def dataloader(self, path, shuffle=False):
        df = self.read_data(path)
        df = self.preprocess_dataframe(df)

        dataset = TensorDataset(
            torch.tensor(df['sentence'].to_list(), dtype=torch.long),
            torch.tensor(df['label'].to_list(), dtype=torch.long)
        )
        return DataLoader(
            dataset,
            batch_size=self.hparams.batch_size * 1 if not self.hparams.tpu_cores else self.hparams.tpu_cores,
            shuffle=shuffle,
            num_workers=self.hparams.cpu_workers
        )
    
    def train_dataloader(self):
        return self.dataloader(self.hparams.train_data_path, shuffle=True)
    
    def val_dataloader(self):
        return self.dataloader(self.hparams.val_data_path, shuffle=True)

    def test_dataloader(self):
        return self.dataloader(self.hparams.test_data_path, shuffle=True)

In [5]:
import torch

def infer(model, x):

    x_tokens = model.tokenizer(x, return_tensors='pt')
    x_tokens = x_tokens.to('cuda')

    return torch.softmax(
        model(**x_tokens
    ).logits, dim=-1)

In [6]:
import pandas as pd
import numpy as np

def load_dataset(path):

    df = pd.read_csv(path, sep='\t')

    train_sentence = df['sentence']
    train_emotion = df['emotion']
    train_label = df['label']

    return train_sentence.to_numpy(), train_emotion.to_numpy(), train_label.to_numpy()


def idx2emoticon(path):
    newdic = dict()
    with open(path,'r') as f:
        while True:
            line = f.readline()
            if not line : break
            newdic[int(line.split(':')[0])] = line.split(':')[1][:-1]
    return newdic


def emoticon2idx(path):
    newdic = dict()
    with open(path,'r') as f:
        while True:
            line = f.readline()
            if not line : break
            newdic[line.split(':')[0]] = line.split(':')[1][:-1]
    return newdic


def convert_sentence_emotion(model, emotion, sentence):

    sentence_vector = []

    sentence = model.encode(sentence)

    for idx in range(len(sentence)):

        emotion_label = emotion[idx]
        distance = np.sqrt(np.mean(sentence[idx]**2))

        tmp_vector = np.array([0, 0, 0, 0])
        tmp_vector = np.append(tmp_vector, sentence[idx])
        tmp_vector[emotion_label] = distance
        sentence_vector.append(tmp_vector)

    return sentence_vector


def add_emotion(model, emotion, sentence):

    sentence = model.encode(sentence)

    distance = np.sqrt(np.mean(sentence**2))

    emo_added = np.array([0, 0, 0, 0])
    emo_added = np.append(emo_added, sentence)

    emo_added[emotion] = distance

    return emo_added

In [7]:
from sentence_transformers import SentenceTransformer, util

from sklearn.datasets import make_classification
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.neighbors import KNeighborsClassifier

In [8]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [9]:
emotion_model_path = '/content/drive/MyDrive/Colab Notebooks/ai project/epoch5-val_acc0.7347.ckpt'
sentence_model_path = 'snunlp/KR-SBERT-V40K-klueNLI-augSTS'
emoticon_sentence_path = '/content/drive/MyDrive/Colab Notebooks/ai project/myticon/dataset/sentence_emoticon.txt'
idx2emoticon_path = '/content/drive/MyDrive/Colab Notebooks/ai project/myticon/dataset/idx2emoticon.txt'
emoticon2idx_path = '/content/drive/MyDrive/Colab Notebooks/ai project/myticon/dataset/emoticon2idx.txt'

In [10]:
idx2emoticon = idx2emoticon(idx2emoticon_path)
emoticon2idx = emoticon2idx(emoticon2idx_path)

In [11]:
emotion_model = Model.load_from_checkpoint(emotion_model_path)

Some weights of the model checkpoint at beomi/KcELECTRA-base were not used when initializing ElectraForSequenceClassification: ['discriminator_predictions.dense_prediction.weight', 'discriminator_predictions.dense_prediction.bias', 'discriminator_predictions.dense.bias', 'discriminator_predictions.dense.weight']
- This IS expected if you are initializing ElectraForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing ElectraForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of ElectraForSequenceClassification were not initialized from the model checkpoint at beomi/KcELECTRA-base and are newly initialized: ['classifier.out_proj.bias', 'classifier

In [12]:
sentence_model = SentenceTransformer(sentence_model_path)

In [13]:
train_sentences, train_emotions, train_labels = load_dataset(emoticon_sentence_path)
train_sentences = convert_sentence_emotion(sentence_model, train_emotions, train_sentences)

In [14]:
knn = KNeighborsClassifier(n_neighbors=3, algorithm='brute', metric='cosine')
knn.fit(train_sentences, train_labels)

In [15]:
input_sentence = ['항상 밝은누나의모습이 보기좋아요']

In [16]:
sentence_emotion = infer(emotion_model, input_sentence)
sentence_emotion
sentence_emotion = np.argmax(sentence_emotion.cpu().detach())
sentence_emotion

tensor(1)

In [17]:
sentence_vector = add_emotion(sentence_model, int(sentence_emotion), input_sentence)

In [18]:
emoticon_idx = knn.predict([sentence_vector])

In [19]:
print('Recommended emoticon: ' + idx2emoticon[int(emoticon_idx)])

Recommended emoticon: ✧*.◟(ˊᗨˋ)◞.*✧


#metric
#{'슬픔':0, '기쁨':1, '분노':2, '당황':3}

In [20]:
from collections import Counter


In [21]:
num = int(emoticon2idx['⸜(*ˊᗜˋ*)⸝'])

In [22]:
train_sentences, train_emotions, train_labels = load_dataset(emoticon_sentence_path)

In [23]:
idx = np.where(train_labels ==num)

In [24]:
train_sentences[idx]

array(['두팔 벌려 볼을 밝히며 눈을 접으며 입을 벌려 웃는다', '해피데이', '너가 다시 돌아와서 기뻐!',
       '오늘 아침 너무 상쾌하다', '웃으면 팔 벌리고 웃는다'], dtype=object)

In [25]:
emotion_sentence = list(train_sentences[idx])
result=[]
for i in range(5):
  sentence_emotion = infer(emotion_model, emotion_sentence[i])
  sentence_emotion = np.argmax(sentence_emotion.cpu().detach())
  result.append(int(sentence_emotion))
result

[3, 3, 1, 1, 0]

In [26]:
def freq(x):
  cnt = Counter(x)
  order = cnt.most_common()
  maximum = order[0][1]

  modes = []
  for num in order:
    if num[1] == maximum:
      modes.append(num[0])
  return modes

In [27]:
freq(result)

[3, 1]

In [28]:
test = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/ai project/dataset/cus_test.csv')

In [29]:
test['label'].value_counts()

3    3084
2    2997
0    2872
1    2620
Name: label, dtype: int64

In [30]:
sen_list = list(test['sentence'].head(200))

In [31]:

final =[]
final2=[]
final3=[]
for sen in sen_list:
  try:
    input_emot = infer(emotion_model, sen)
    input_emot = np.argmax(input_emot.cpu().detach())
    sentence_vector = add_emotion(sentence_model, int(input_emot), sen)
    emoticon_idx = knn.predict([sentence_vector])
    pred_emot = idx2emoticon[int(emoticon_idx)]
    pred_idx = int(emoticon2idx[pred_emot])
    idx = np.where(train_labels ==pred_idx)
    emotion_sentence = list(train_sentences[idx])
    result=0
    result_list=[]
    for i in range(5):
      sentence_emotion = infer(emotion_model, emotion_sentence[i])
      sentence_emotion = np.argmax(sentence_emotion.cpu().detach())
      result_list.append(int(sentence_emotion))
      if int(sentence_emotion) == input_emot:
        result = result + 1
      
    if input_emot in freq(result_list):
      final2.append(1)
    else:
      final2.append(0)
    
    if input_emot in result_list:
      final3.append(1)
    else:
      final3.append(0)
    
    final.append(result/5)
  except:
    pass

  

In [32]:
mean1 = sum(final) / len(final)
mean2  = sum(final2) / len(final2)
mean3  = sum(final3) / len(final3)

In [33]:
print(mean1,mean2,mean3)

0.47738693467336707 0.592964824120603 0.8241206030150754
