<a href="https://colab.research.google.com/github/HannaKi/Finnish_sentiment_model/blob/main/Sentiment_analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip --quiet -nc install transformers datasets

In [8]:
%%bash

wget https://korp.csc.fi/download/finsen/src/finsen-src.zip
unzip finsen-src.zip

Archive:  finsen-src.zip


File ‘finsen-src.zip’ already there; not retrieving.

replace finsen-src/finsen-src/FinnSentiment2020.tsv? [y]es, [n]o, [A]ll, [N]one, [r]ename:  NULL
(EOF or read error, treating as "[N]one" ...)


CalledProcessError: ignored

In [11]:
import seaborn as sns
import pandas as pd

import transformers
import datasets

import sklearn
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer

from time import time


In [10]:
fin_sent=pd.read_csv(
    r'/content/finsen-src/finsen-src/FinnSentiment2020.tsv',
    sep='\t',
    index_col=False, 
    header=None, 
    names=['A_sentiment','B_sentiment','C_sentiment','majority_value','derived_value','pre-annotated_smiley',
    'pre-annotated_review', 'split', 'batch', 'idx', 'text'],
)

In [None]:
mapping={1:'neg', 2:'neg', 3:'neut', 4:'pos', 5:'pos'}

fin_sent['label']=fin_sent.derived_value.map(mapping)
fin_sent.head()

In [None]:
# Explore the label balance
sns.countplot(data=fin_sent, x='label'); # Not such a good balance...! 

In [None]:
# downsize the data and balance classes # TODO: how to deal better with class imbalance!
pos=fin_sent[fin_sent.label=='pos']
neg=fin_sent[fin_sent.label=='neg']
neut=fin_sent[fin_sent.label=='neut']
neg=neg[:len(pos)]
neut=neut[:len(pos)]
test=pd.concat([pos, neut, neg])

sns.countplot(data=test, x='label'); 

# SVM baseline

In [None]:
y=fin_sent['label']
X=fin_sent['text']

X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, test_size=0.25, random_state=123)

vectorizer=TfidfVectorizer() # TODO: Better params?
feature_matrix_train=vectorizer.fit_transform(X_train)
feature_matrix_test=vectorizer.transform(X_test)

cost=[]
acc=[]
t0 = time() # start timer
for C in (0.001,0.01,0.1,1,10,100):
    classifier=sklearn.svm.LinearSVC(C=C, max_iter=5000)
    classifier.fit(feature_matrix_train, y_train)
    # print(f"C={C}\t{classifier.score(feature_matrix_test, y_test):0.2f}")
    cost.append(C)
    acc.append(classifier.score(feature_matrix_test, y_test))
t1 = time() # end timer

print(f"Fitting and evaluating the model took {(t1-t0):0.2f} seconds.")

best_cost=cost[np.argmax(acc)]
svm_classifier=sklearn.svm.LinearSVC(C=best_cost, max_iter=5000)
svm_classifier.fit(feature_matrix_train, y_train)
preds=svm_classifier.predict(feature_matrix_test)

In [None]:
# Columns represent the predicted labels and the rows represent the real labels

from sklearn.metrics import confusion_matrix

labels=list(set(y_train)) # labels for the image, not for the data :)
labels.sort()

cf_mat=confusion_matrix(y_test, preds, labels=labels)

def plot_cf_matrix(mat):
  sns.heatmap(mat, annot=True, fmt="d", xticklabels=labels, yticklabels=labels)#, annot_kws=m)
  plt.title("Confusion matrix for test data", fontsize = 16)
  plt.ylabel("True class", fontsize = 14)
  plt.xlabel("Predicted class", fontsize = 14)

plot_cf_matrix(cf_mat)

print(f"Model mean accuracy {svm_classifier.score(feature_matrix_test, y_test)}") # TODO: precision, recall, F1

# BERT

In [None]:
MODEL_NAME ='TurkuNLP/bert-base-finnish-cased-v1' # name from Hugging Face repository
BATCH_SIZE = 64 # Not optimized.
LEARNING_RATE = 2e-5 # Super important! Try this: 1e-5
TRAIN_EPOCHS = 2

In [None]:
tokenizer = transformers.AutoTokenizer.from_pretrained(MODEL_NAME) 

In [None]:
# Stratifying by column is only supported for ClassLabel column so make it!

class_names = ['pos', 'neut', 'neg']
dataset = datasets.Dataset.from_pandas(
  fin_sent[['text', 'label']], 
  #preserve_index=False, 
  features=datasets.Features( # provide the feature classes to make sure you get what you need
    {'text': datasets.Value('string'),
     'label': datasets.ClassLabel(names=class_names) # values 0 and 1, label names accept and reject
     }
     )
)

In [None]:
# is the data ok? take a look:
for i in range(5):
  print('Label:',dataset['label'][i], '\nFeedback:', dataset['text'][i])
  print()

In [None]:
# Split the data first in two parts. Note that this method shuffles the data and stratifies.

# Returns a dictionary (datasets.DatsetDict) with two random train and test subsets (train and test Dataset splits).
train_test = dataset.train_test_split(test_size=0.15, stratify_by_column='label') # 0.3

# Split the test data again to gain two datasets: one for development and one for testing
test_data = train_test['test'].train_test_split(test_size=0.5) # 0.2

# Store the data in a DatasetDict (so we can use a map function later to tokenize the data)
dataset = datasets.DatasetDict({
    'train': train_test['train'],
    'development': test_data['test'],
    'test': test_data['train']})

print(dataset) # check the splits

In [None]:
# truncation = True: The tokenizer limits the input size to 512 tokens. See QA-code for longer input if needed!

def encode_dataset(d):
  return tokenizer(d['text'], max_length=512, truncation=True)#,return_tensors='pt') 


encoded_dataset = dataset.map(encode_dataset) #tokenize all of the data with map-method

In [None]:
encoded_dataset=encoded_dataset.remove_columns('text')
encoded_dataset

In [None]:
def compute_metrics(pred):
    y_pred = pred.predictions.argmax(axis=1) 
    # we get the probability distribution out and the highest is selected with argmax
    y_true = pred.label_ids
    TP = len([a and b for a, b in zip(y_pred, y_true) if a == 1 and b == 1])
    TN = len([a and b for a, b in zip(y_pred, y_true) if a == 0 and b == 0])
    FN = len([a and b for a, b in zip(y_pred, y_true) if a == 0 and b == 1])
    FP = len([a and b for a, b in zip(y_pred, y_true) if a == 1 and b == 0])

    ACC = (TP+TN)/(TP+FP+FN+TN) # Overall accuracy
    PRE = TP/(TP+FP) # Precision: share of relevant items
    REC = TP/(TP+FN) # Recall: proportion of relevant items found
    F1 = (2*((PRE*REC)/(PRE+REC))) # Balance between precision and recall
    return {'accuracy': ACC,
            'precision': PRE, 
            'recall': REC,
            'F1-score':F1
            }

In [None]:
model = transformers.AutoModelForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=len(set(df['label'])))

# Training arguments how to train, when to save the model weights
train_args = transformers.TrainingArguments(
    '/Users/kittiha/Analyysit/Teksti_analytiikkaa/BERT_model',
    save_strategy='epoch',
    evaluation_strategy='epoch', # look at the performence measures (accuracy, precision, recall, F1-score, what ever defined) after each epoch
    logging_strategy='epoch',
    learning_rate=LEARNING_RATE, 
    per_device_train_batch_size=BATCH_SIZE, 
    num_train_epochs=TRAIN_EPOCHS, 
    metric_for_best_model='F1-score', # use with Early Stopping callback
    load_best_model_at_end=True,      # restore the best model when training finishes
    weight_decay=0.01                # strength of weight decay
)

early_stopping = transformers.EarlyStoppingCallback(early_stopping_patience = 2)

In [None]:
trainer = transformers.Trainer(
    model,
    train_args,
    train_dataset = encoded_dataset['train'],
    eval_dataset = encoded_dataset['development'],
    tokenizer = tokenizer,
    compute_metrics = compute_metrics,
    callbacks=[early_stopping]
)

In [None]:
trainer.train() 
ml_metrics = trainer.evaluate() # check, that the best model was reloaded
print(ml_metrics)