## 사전 준비

In [28]:
import os
import numpy as np
import pandas as pd
import pickle
from tqdm import tqdm

import tensorflow as tf
from transformers import TFElectraModel

In [7]:
tf.random.set_seed(42)
np.random.seed(42)
np.set_printoptions(precision=6, suppress=True)

In [19]:
CLASS_NUMBER = 3
NEG_CLASS_NUMBER = 5
MAX_LEN = 40
MAX_WORD = 20
BERT_CKPT = './data_out/'
DATA_IN_PATH = '../metadata/'
DATA_OUT_PATH = './data_out/'

### 토크나이저, 라벨인코더 로드

In [11]:
# Load Electra Tokenizer
with open(DATA_OUT_PATH+'tokenizer.pickle', 'rb') as handle:
    loaded_tokenizer = pickle.load(handle)

In [10]:
# Load 3classes(neg, neut, pos) LabelEncoder
with open(DATA_OUT_PATH+'3classEncoder.pickle', 'rb') as handle:
    le = pickle.load(handle)

In [22]:
# Load 5classes(angry, dislike, fear, sad, surprise) LabelEncoder
with open(DATA_OUT_PATH+'negClassEncoder.pickle', 'rb') as handle:
    neg_le = pickle.load(handle)

In [12]:
# Fuctiong for Tokenizing Sentence
def electra_tokenizer(sent, MAX_LEN):
    encoded_dict = loaded_tokenizer.encode_plus(
        text=sent,
        add_special_tokens=True,
        max_length=MAX_LEN,
        padding='max_length',
        truncation=True,
        return_attention_mask=True
    )

    input_id = encoded_dict['input_ids']
    attention_mask = encoded_dict['attention_mask']
    token_type_id = encoded_dict['token_type_ids']

    return input_id, attention_mask, token_type_id

### 모델 정의

In [6]:
class TFElectraClassifier(tf.keras.Model):
    def __init__(self, model_name, dir_path, num_class):
        super().__init__()

        self.bert = TFElectraModel.from_pretrained(model_name, cache_dir=dir_path, from_pt=True)
        self.dropout = tf.keras.layers.Dropout(self.bert.config.hidden_dropout_prob)
        self.flatten = tf.keras.layers.Flatten()
        self.classifier = tf.keras.layers.Dense(num_class, name='classifier', activation='softmax', kernel_initializer=tf.keras.initializers.TruncatedNormal(self.bert.config.initializer_range))

    def call(self, inputs, attention_mask=None, token_type_ids=None, training=False):
        
        outputs = self.bert(inputs, attention_mask=attention_mask, token_type_ids=token_type_ids)
        last_hidden_state = outputs[0]
        last_hidden_state = self.flatten(last_hidden_state)
        last_hidden_state = self.dropout(last_hidden_state, training=training)
        logits = self.classifier(last_hidden_state)

        return logits

In [14]:
# ElectraClassifier for classifying 3classes(neg, neut, pos)
cls_model = TFElectraClassifier(model_name='monologg/koelectra-base-v3-discriminator', dir_path=os.path.join(BERT_CKPT, 'model'), num_class=CLASS_NUMBER)

Some weights of the PyTorch model were not used when initializing the TF 2.0 model TFElectraModel: ['discriminator_predictions.dense.bias', 'discriminator_predictions.dense_prediction.weight', 'discriminator_predictions.dense.weight', 'electra.embeddings.position_ids', 'discriminator_predictions.dense_prediction.bias']
- This IS expected if you are initializing TFElectraModel from a PyTorch model trained on another task or with another architecture (e.g. initializing a TFBertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFElectraModel from a PyTorch model that you expect to be exactly identical (e.g. initializing a TFBertForSequenceClassification model from a BertForSequenceClassification model).
All the weights of TFElectraModel were initialized from the PyTorch model.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFElectraModel for predictions without further train

In [20]:
# ElectraClassifier for classifying 5classes(angry, dislike, fear, sad, surprise)
neg_model = TFElectraClassifier(model_name='monologg/koelectra-base-v3-discriminator', dir_path=os.path.join(BERT_CKPT, 'model'), num_class=NEG_CLASS_NUMBER)

Some weights of the PyTorch model were not used when initializing the TF 2.0 model TFElectraModel: ['discriminator_predictions.dense.bias', 'discriminator_predictions.dense_prediction.weight', 'discriminator_predictions.dense.weight', 'electra.embeddings.position_ids', 'discriminator_predictions.dense_prediction.bias']
- This IS expected if you are initializing TFElectraModel from a PyTorch model trained on another task or with another architecture (e.g. initializing a TFBertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFElectraModel from a PyTorch model that you expect to be exactly identical (e.g. initializing a TFBertForSequenceClassification model from a BertForSequenceClassification model).
All the weights of TFElectraModel were initialized from the PyTorch model.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFElectraModel for predictions without further train

In [15]:
# For Loading Weights
a, b, c = electra_tokenizer('안녕하세요', MAX_LEN)
cls_model.call((np.array(a).reshape(1,-1), np.array(b).reshape(1,-1), np.array(c).reshape(1,-1)))
cls_model.built = True
cls_model.load_weights(DATA_OUT_PATH+'tf2_electra_plutchik_hs_6.h5')

In [21]:
# For Loading Weights
a, b, c = electra_tokenizer('안녕하세요', MAX_LEN)
neg_model.call((np.array(a).reshape(1,-1), np.array(b).reshape(1,-1), np.array(c).reshape(1,-1)))
neg_model.built = True
neg_model.load_weights(DATA_OUT_PATH+'tf2_electra_plutchik_hs_10.h5')

In [16]:
# Compile
optimizer = tf.keras.optimizers.Adam(3e-6)
loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
metric = tf.keras.metrics.SparseCategoricalAccuracy('accuracy')
cls_model.compile(optimizer=optimizer, loss=loss, metrics=[metric])

### 예측 준비

In [17]:
# Function for Preprocessing 3classes classification(neg, neut, pos)
def preprocessing(x):
    temp = x.split('\n')
    temp = ' '.join(temp)
    return temp

def slicing(x):
    x = x.split()
    res = []
    for i in range(0, len(x)+1, MAX_WORD):
        if len(x)-i-MAX_WORD < MAX_WORD//2:
            temp = x[i:]
            res.append(temp)
            break
        temp = x[i:i+MAX_WORD]
        res.append(temp)
    return res

def predict(lyrics):
    temp = []
    lyrics = preprocessing(lyrics)
    lyrics = slicing(lyrics)
    for lyric in lyrics:
        try:
            txt = ' '.join(lyric)
            temp.append(txt)
        except:
            continue

    input_ids = []
    attention_masks = []
    token_type_ids = []

    for lyric in temp:
        try:
            input_id, attention_mask, token_type_id = electra_tokenizer(lyric, MAX_LEN)
            input_ids.append(input_id)
            attention_masks.append(attention_mask)
            token_type_ids.append(token_type_id)

        except Exception as e:
            print(e)
            pass
    prediction = cls_model.predict(input_ids)

    return prediction

def transform(array):
    result = le.inverse_transform([np.argmax(array)])
    return result

In [23]:
# Function for Preprocessing 5classes classification(angry, dislike, fear, sad, surprise)
def neg_predict(lyrics):
    temp = []
    lyrics = preprocessing(lyrics)
    lyrics = slicing(lyrics)
    for lyric in lyrics:
        try:
            txt = ' '.join(lyric)
            temp.append(txt)
        except:
            continue

    input_ids = []
    attention_masks = []
    token_type_ids = []

    for lyric in temp:
        try:
            input_id, attention_mask, token_type_id = electra_tokenizer(lyric, MAX_LEN)
            input_ids.append(input_id)
            attention_masks.append(attention_mask)
            token_type_ids.append(token_type_id)

        except Exception as e:
            print(e)
            pass
    prediction = neg_model.predict(input_ids)

    return prediction

def neg_transform(array):
    result = neg_le.inverse_transform([np.argmax(array)])
    return result

## 예측

In [24]:
lyrics = pd.read_csv(DATA_IN_PATH+'db에넣을노래들.csv')
data = lyrics[['SONG_ID', 'SONG_TITLE', 'LYRICS']]

In [25]:
song_ids = lyrics['SONG_ID']

In [26]:
pred = dict()

In [27]:
for id in song_ids:
    lyric = data[data['SONG_ID']==id]['LYRICS'].values[0]
    prediction = predict(lyric)
    score = sum(prediction)/len(prediction)
    senti = transform(score)
    pred[id] = (score, senti)



In [29]:
data['score'] = ''
data['senti'] = ''

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  """Entry point for launching an IPython kernel.
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  


In [31]:
for id in song_ids:
    sc, se = pred[id]
    data.loc[data[data['SONG_ID']==id].index, 'score'] = str(sc)
    data.loc[data[data['SONG_ID']==id].index, 'senti'] = se

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self._setitem_single_column(loc, value, pi)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self._setitem_single_column(ilocs[0], value, pi)


In [33]:
neg_data = data[data['senti']=='neg']
pos_data = data[data['senti']=='pos']
neg_ids = neg_data['SONG_ID']
pos_ids = pos_data['SONG_ID']

In [34]:
pos_pred = dict()
neg_pred = dict()

In [35]:
for id in neg_ids:
    lyric = data[data['SONG_ID']==id]['LYRICS'].values[0]
    prediction = neg_predict(lyric)
    score = sum(prediction)/len(prediction)
    senti = neg_transform(score)
    neg_pred[id] = (score, senti)



In [36]:
data['final_score'] = ''
data['final_senti'] = ''
data['total'] = ''

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  """Entry point for launching an IPython kernel.
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  This is separate from the ipykernel package so we can avoid doing imports until


In [37]:
for key in neg_ids:
    sc, se = neg_pred[key]
    data.loc[data[data['SONG_ID']==key].index, 'final_score'] = str(sc)
    data.loc[data[data['SONG_ID']==key].index, 'final_senti'] = se

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self._setitem_single_column(loc, value, pi)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self._setitem_single_column(ilocs[0], value, pi)


In [38]:
for key in neg_ids:
    neg_sc = np.fromstring(data.loc[data[data['SONG_ID']==key].index, 'final_score'].tolist()[0][1:-1], dtype=float, sep=' ')
    pos_sc = np.fromstring(data.loc[data[data['SONG_ID']==key].index, 'score'].tolist()[0][1:-1], dtype=float, sep=' ')
    ratio = np.array(((pos_sc[int(le.transform(['neg'])[0])]+pos_sc[1]*0.5), (pos_sc[int(le.transform(['pos'])[0])]+pos_sc[1]*0.5)))
    rated_sc = np.append(np.max(neg_sc)*(ratio[1]/ratio[0]), neg_sc)
    total_sc = rated_sc/(sum(rated_sc))
    data.loc[data[data['SONG_ID']==key].index, 'total'] = str(total_sc)

In [39]:
for id in pos_ids:
    lyric = data[data['SONG_ID']==id]['LYRICS'].values[0]
    prediction = neg_predict(lyric)
    score = sum(prediction)/len(prediction)
    pos_pred[id] = (score, 'happy')



In [40]:
for key in pos_ids:
    sc, se = pos_pred[key]
    data.loc[data[data['SONG_ID']==key].index, 'final_score'] = str(sc)
    data.loc[data[data['SONG_ID']==key].index, 'final_senti'] = se

In [41]:
for key in pos_ids:
    neg_sc = np.fromstring(data.loc[data[data['SONG_ID']==key].index, 'final_score'].tolist()[0][1:-1], dtype=float, sep=' ')
    pos_sc = np.fromstring(data.loc[data[data['SONG_ID']==key].index, 'score'].tolist()[0][1:-1], dtype=float, sep=' ')
    ratio = np.array(((pos_sc[int(le.transform(['neg'])[0])]+pos_sc[1]*0.5), (pos_sc[int(le.transform(['pos'])[0])]+pos_sc[1]*0.5)))
    rated_sc = np.append(ratio[1], ratio[0]*neg_sc)
    total_sc = rated_sc/(sum(rated_sc))
    data.loc[data[data['SONG_ID']==key].index, 'total'] = str(total_sc)

In [42]:
data = data.drop(data[data['senti']=='neut'].index)

In [44]:
data.to_csv(DATA_IN_PATH+'testLyrics.csv', encoding='utf-8-sig')