# Initialize Libraries and Models

In [1]:
from transformers import BertTokenizer, BertForSequenceClassification
from transformers import RobertaTokenizer, RobertaForSequenceClassification
from transformers import pipeline

from sklearn.metrics import f1_score, confusion_matrix

from typing import Union
from tqdm import tqdm

import pandas as pd
import os

In [11]:
class SentimentPredictor:
    """Run several Indonesian sentiment models behind a single ``predict`` call.

    Each model is loaded once in ``__init__`` and stored under a short name,
    which is the part of the Hugging Face model id before the ``/``.
    Predicted labels are normalised to ``positive`` / ``neutral`` / ``negative``.
    """

    def __init__(self) -> None:
        """Load every configured model into a sentiment-analysis pipeline."""
        # Hugging Face model ids, grouped by the architecture used to load them
        model_names_by_arch = {
            'bert': [
                'mdhugol/indonesia-bert-sentiment-classification',
                'poerwiyanto/bert-base-indonesian-522M-finetuned-sentiment',
                'hilmansw/indobert-finetuned-sentiment-happiness-index',
            ],
            'roberta': [
                'ayameRushia/roberta-base-indonesian-1.5G-sentiment-analysis-smsa',
            ],
        }

        # short name -> pipeline, and short name -> architecture
        self.sentiment_pipelines = {}
        self.model_names = {}

        for arch, hub_names in model_names_by_arch.items():
            for hub_name in hub_names:
                # NOTE: the short name is the owner part of the model id, so two
                # models from the same owner would overwrite each other here.
                name = hub_name.split('/')[0]
                self.sentiment_pipelines[name] = self.load_sentiment_pipeline(hub_name, arch)
                self.model_names[name] = arch

    def predict(self, text:Union[str,list], model_name:str):
        """Predict the sentiment of ``text`` with the model called ``model_name``.

        Args:
            text: a sentence or a list of sentences, passed to the pipeline as is.
            model_name: short model name, one of ``self.sentiment_pipelines``.

        Returns:
            The pipeline output (a list of dicts with ``label`` and ``score``)
            with normalised labels, or ``None`` after printing the valid names
            when ``model_name`` is unknown.
        """
        if model_name not in self.sentiment_pipelines:
            print('Please select model name from this list {}'.format(list(self.sentiment_pipelines.keys())))
            return None

        model_pipeline = self.sentiment_pipelines[model_name]
        return self.postprocess_label(
            predictions=model_pipeline(text),
            model_name=model_name,
            model_arch=self.model_names.get(model_name),
        )

    @staticmethod
    def load_sentiment_pipeline(model_name:str, model_arch:str):
        """Build a ``sentiment-analysis`` pipeline for a Hugging Face model.

        Args:
            model_name: full Hugging Face model id.
            model_arch: ``'bert'`` or ``'roberta'``; selects the tokenizer and
                model classes used for loading.

        Returns:
            The pipeline created from the loaded model and tokenizer.

        Raises:
            ValueError: if ``model_arch`` is not a supported architecture.
        """
        if model_arch == 'bert':
            tokenizer = BertTokenizer.from_pretrained(model_name)
            model = BertForSequenceClassification.from_pretrained(model_name)
        elif model_arch == 'roberta':
            tokenizer = RobertaTokenizer.from_pretrained(model_name)
            model = RobertaForSequenceClassification.from_pretrained(model_name)
        else:
            # fail with a clear message instead of an UnboundLocalError below
            raise ValueError(
                "Unsupported model architecture {!r}; expected 'bert' or 'roberta'".format(model_arch)
            )

        return pipeline("sentiment-analysis", model=model, tokenizer=tokenizer)

    @staticmethod
    def postprocess_label(predictions:list, model_name:str, model_arch:str):
        """Normalise the ``label`` of every prediction, in place.

        Args:
            predictions: list of dicts, each holding at least a ``label`` key.
            model_name: short model name; selects the label mapping for BERT models.
            model_arch: ``'bert'`` or ``'roberta'``.

        Returns:
            The same ``predictions`` list. BERT labels missing from the
            selected mapping become ``None``; RoBERTa labels are lower-cased;
            any other architecture/name combination is returned untouched.
        """
        bert_labels_1 = {'LABEL_0' : 'positive', 'LABEL_1': 'neutral', 'LABEL_2': 'negative'}
        bert_labels_2 = {'positif' : 'positive', 'netral': 'neutral', 'negatif': 'negative'}

        # the conversion depends only on the model, so choose it once up front
        if model_arch == 'bert' and model_name in ('mdhugol', 'poerwiyanto'):
            normalize = bert_labels_1.get
        elif model_arch == 'bert' and model_name in ('hilmansw',):
            normalize = bert_labels_2.get
        elif model_arch == 'roberta':
            normalize = str.lower
        else:
            return predictions

        for prediction in predictions:
            prediction['label'] = normalize(prediction['label'])

        return predictions

# build the predictor once; constructing it loads every configured model pipeline
sentiment_predictor = SentimentPredictor()



# Create and Save Predictions

In [3]:
# get model names
model_names = list(sentiment_predictor.model_names.keys())

# load data
csv_path = './dataset/single-word.csv'
# The results are written back to this same file with ';' as separator (see
# the save step at the end), while the default separator is ','. Pick the
# separator from the header line so the cell can be re-run on its own output.
with open(csv_path, encoding='utf-8') as csv_file:
    csv_sep = ';' if ';' in csv_file.readline() else ','
df = pd.read_csv(csv_path, sep=csv_sep)

# create column data
for model_name in model_names:
    pred_col = model_name+'_pred'
    conf_col = model_name+'_conf'
    if pred_col not in df.columns:
        df[pred_col] = ''
    else:
        # empty cells come back from the CSV as NaN; restore the '' marker that
        # the prediction loop uses to recognise rows still to be predicted
        df[pred_col] = df[pred_col].astype(object).fillna('')
    if conf_col not in df.columns:
        df[conf_col] = 0.

# predict sentiment
# iterate over a copy because df itself is updated inside the loop
for i, row in tqdm(df.copy().iterrows(), total=len(df), desc='Sentiment Analysis', ncols=100):
    sentence = row['sentence']
    for model_name in model_names:
        # skip rows that already hold a prediction for this model
        if row[model_name+'_pred'] != '':
            continue
        result = sentiment_predictor.predict(sentence, model_name)
        df.at[i, model_name+'_pred'] = result[0]['label']
        df.at[i, model_name+'_conf'] = result[0]['score']

# save dataframe to csv
df.to_csv(csv_path, index=False, sep=';')

Sentiment Analysis: 116it [00:33,  3.44it/s]


# Test and Evaluate Models

In [77]:
# locations of the human-labelled sheet and of the model prediction file
label_csv_path = './dataset/g-sheet-single-word.csv'
pred_csv_path = './dataset/single-word.csv'

# the labelled sheet is read with the default separator, the predictions with ';'
label_df = pd.read_csv(label_csv_path)
pred_df = pd.read_csv(pred_csv_path, sep=';')

# start from the labels and attach every prediction column;
# 'sentence' and 'case' are not taken from the prediction file
final_df = label_df.copy()
for col in pred_df.columns:
    if col not in ('sentence', 'case'):
        final_df[col] = pred_df[col].copy()

# clean label side of data
def clean_label_side(final_df):
    """Clean the annotator labels and add a majority-vote ``label_final`` column.

    The frame is modified in place and also returned. Steps:

    1. drop the ``no`` and ``case`` columns;
    2. drop rows where any annotator label (``label_adam``, ``label_anthony``,
       ``label_yoshua``) contains ``'-'``, i.e. the sentence was skipped;
    3. drop rows where any annotator label is missing;
    4. set ``label_final`` to the most frequent value among the ``label_*``
       columns and drop rows whose two most frequent values are tied.

    An existing ``label_final`` column is not counted as a vote; it is
    overwritten with the newly computed value.

    Args:
        final_df: DataFrame holding ``no``, ``case`` and the three annotator
            label columns.

    Returns:
        The same DataFrame object, cleaned and re-indexed from 0.

    Raises:
        KeyError: if one of the required columns is missing.
    """
    # local import: Counter is only needed by this function
    from collections import Counter

    annotator_cols = ['label_adam', 'label_anthony', 'label_yoshua']

    # clean data: drop 'no' and 'case' column
    final_df.drop(columns=['no', 'case'], inplace=True)

    # clean data: drop skipped gt label
    # na=False keeps missing labels out of this mask; they are handled next
    skipped = final_df[annotator_cols].apply(
        lambda labels: labels.str.contains('-', regex=False, na=False)
    ).any(axis=1)
    final_df.drop(final_df.loc[skipped].index, inplace=True)
    final_df.reset_index(drop=True, inplace=True)

    # clean data: drop null gt label
    missing = final_df[annotator_cols].isna().any(axis=1)
    final_df.drop(final_df.loc[missing].index, inplace=True)
    final_df.reset_index(drop=True, inplace=True)

    # columns that take part in the vote
    vote_cols = [
        col for col in final_df.columns
        if 'label_' in col and col != 'label_final'
    ]

    # aggregate data & drop draw label votes
    def aggregate_sentiment(row):
        """Return (most voted label, whether the top two labels are tied)."""
        # most_common() keeps first-seen order between equal counts
        ranked = Counter(row[col] for col in vote_cols).most_common()
        sentiment, top_votes = ranked[0]
        draw = len(ranked) > 1 and ranked[1][1] == top_votes
        return sentiment, draw

    label_final = []
    draw_index = []
    for i, row in final_df.iterrows():
        sentiment, draw = aggregate_sentiment(row)
        label_final.append(sentiment)
        if draw:
            draw_index.append(i)
    final_df['label_final'] = label_final
    final_df.drop(draw_index, inplace=True)
    final_df.reset_index(drop=True, inplace=True)

    return final_df

# Note: the single-word dataset is evaluated without label cleaning
# final_df = clean_label_side(final_df) 
# display(final_df)

# analyze accuracy per model
# minimum prediction confidence; it is passed to analyze_model, which leaves
# out predictions scoring below it
CONFIDENCE_THRESHOLD = 0.8

def analyze_model(final_df, confidence_threshold):
    """Print the weighted F1 score of every model found in ``final_df``.

    A model is any column whose name contains ``_pred``; its confidence is
    read from the matching ``<model>_conf`` column and the ground truth from
    ``label_final``. Rows whose confidence is below ``confidence_threshold``
    are left out before scoring. One score is printed per model, in column
    order; nothing is returned.

    Per-class F1 and confusion matrices are not reported here; they can be
    computed from the same ``labels`` / ``preds`` pair if needed.
    """
    for column in final_df.columns:
        if '_pred' not in column:
            continue
        model_name = column.replace('_pred', '')
        pred_col = model_name+'_pred'
        conf_col = model_name+'_conf'

        # work on a private copy restricted to this model's columns
        scored = final_df[['label_final', pred_col, conf_col]].copy()

        # remove the rows the model was not confident enough about
        low_confidence = scored.loc[scored[conf_col] < confidence_threshold].index
        scored = scored.drop(index=low_confidence)

        labels = scored['label_final'].to_list()
        preds = scored[pred_col]
        print(f1_score(labels, preds, average='weighted'))
# print one weighted F1 score per model, using the confidence threshold above
analyze_model(final_df, CONFIDENCE_THRESHOLD)

0.36685386685386684
0.023798899300907336
0.9554341133004925
0.002415458937198068
