In [None]:
# Install libraries
!pip install tweet-preprocessor
!pip install greek_stemmer
!pip install pyyaml==5.4.1

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting tweet-preprocessor
  Downloading tweet_preprocessor-0.6.0-py3-none-any.whl (27 kB)
Installing collected packages: tweet-preprocessor
Successfully installed tweet-preprocessor-0.6.0
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting greek_stemmer
  Downloading greek_stemmer-0.1.1.tar.gz (6.9 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: greek_stemmer
  Building wheel for greek_stemmer (setup.py) ... [?25l[?25hdone
  Created wheel for greek_stemmer: filename=greek_stemmer-0.1.1-py3-none-any.whl size=6721 sha256=1af49deddb51f2b0c036269c95580f6199e2e04f1782d18fda966dfd5acad95a
  Stored in directory: /root/.cache/pip/wheels/cd/5f/74/41c1d13e787f8aa958796c4fdc1738bb11afac2df1d4c6d815
Successfully built greek_stemmer
Installing collected packages: greek_stemmer
Successfully 

In [None]:
# Load general libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from time import time

# Load preprocessing libraries
import pandas as pd
import preprocessor as p
import unicodedata as ud
from sklearn.model_selection import train_test_split, PredefinedSplit
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.utils.class_weight import compute_sample_weight
import nltk
nltk.download('stopwords')
nltk.download('punkt')
from nltk.tokenize import word_tokenize
from greek_stemmer import GreekStemmer

# Load classifier libraries
from sklearn.svm import SVC
from sklearn.naive_bayes import GaussianNB
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.model_selection import GridSearchCV

# Load evaluation libraries
from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score, precision_recall_fscore_support, classification_report

# Load imbalance libraries
from imblearn.over_sampling import RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler

# saving/loading sklearn models
from joblib import dump, load

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


In [None]:
# Preprocess tweets
def preprocess_tweets(X):
    """Run the full text pipeline on a collection of tweets.

    The tweets are first passed through ``clean_text`` and the cleaned result
    is then passed through ``use_stemming``; the output of the stemming step
    is returned as is.
    """
    cleaned = clean_text(X)
    return use_stemming(cleaned)

def clean_text(X):
    """Lower-case tweets, strip URLs and mentions, and drop acute accents.

    Returns a new list holding one cleaned string per element of ``X``.
    The strings are left in NFD (decomposed) form; only the combining acute
    accent is removed, other combining marks are kept.
    """
    lowered = [tweet.lower() for tweet in X]

    # Limit tweet-preprocessor to removing URLs and @mentions.
    p.set_options(p.OPT.URL, p.OPT.MENTION)

    # Translation table that deletes the combining acute accent (Greek tonos).
    drop_acute = {ord('\N{COMBINING ACUTE ACCENT}'): None}

    cleaned = []
    for tweet in lowered:
        stripped = p.clean(tweet)
        # NFD splits an accented letter into base letter + combining mark,
        # which lets the table above remove just the mark.
        decomposed = ud.normalize('NFD', stripped)
        cleaned.append(decomposed.translate(drop_acute))
    return cleaned

def use_stemming(X):
    """Tokenise every tweet and replace each token by its Greek stem.

    Each token is upper-cased before it is handed to ``GreekStemmer.stem``;
    the stems of one tweet are joined back together with single spaces.
    Returns a list with one stemmed string per tweet.
    """
    stemmer = GreekStemmer()

    def stem_tweet(tweet):
        words = word_tokenize(tweet, language="greek")
        return " ".join([stemmer.stem(token.upper()) for token in words])

    return [stem_tweet(tweet) for tweet in X]

def resampling(X_train, y_train, strategy='auto'):
  """Rebalance the training data with imbalanced-learn random samplers.

  Args:
    X_train: training features, handed to ``fit_resample``.
    y_train: training labels, handed to ``fit_resample``.
    strategy: ``'auto'`` over-samples with
      ``RandomOverSampler(sampling_strategy='auto')``; ``'combination'``
      over-samples the minority class first and then under-samples with
      ``RandomUnderSampler(sampling_strategy='auto')``.

  Returns:
    The resampled ``(X_res, y_res)`` pair.

  Raises:
    ValueError: if ``strategy`` is neither ``'auto'`` nor ``'combination'``.
  """
  # Reject unknown strategies up front: they used to fall through both
  # branches and fail at the return statement with an UnboundLocalError.
  if strategy not in ('auto', 'combination'):
    raise ValueError(
        "Unknown resampling strategy %r; expected 'auto' or 'combination'" % (strategy,))

  if strategy == 'auto':
    # Oversample minority classes (positive, negative) to the size of the
    # majority class (neutral).
    ros = RandomOverSampler(sampling_strategy=strategy, random_state=42)
    X_res, y_res = ros.fit_resample(X_train, y_train)
  else:
    # Oversample the minority class (positive) to the size of the majority
    # class (neutral), then undersample positive and neutral down to the size
    # of the negative class.
    ros = RandomOverSampler(sampling_strategy='minority', random_state=42)
    X_ros, y_ros = ros.fit_resample(X_train, y_train)
    rus = RandomUnderSampler(sampling_strategy='auto', random_state=42)
    X_res, y_res = rus.fit_resample(X_ros, y_ros)

  return X_res, y_res


In [None]:
# Train a standard (sklearn) classifier using grid search with selected validation split
def train_standard_classifier(classifier, X_model, y_model, X_test, y_test, val_split, weight):
    """Grid-search one scikit-learn classifier and score the result on the test set.

    Args:
        classifier: short model name - "RF", "DT", "GB", "NB" or "SVM".
        X_model, y_model: data the grid search is fitted on.
        X_test, y_test: data the fitted search is evaluated on.
        val_split: passed as ``test_fold`` to ``PredefinedSplit``.
        weight: passed as ``class_weight`` to the RF, DT and SVM estimators;
            the GB and NB estimators are built without it.

    Returns:
        ``None`` for an unrecognised ``classifier``. Otherwise the tuple
        ``(best_estimator, train_time, scores)`` where ``scores`` is
        ``[per-class precision, per-class recall, per-class f1,
        [macro precision, macro recall, macro f1, accuracy]]``.
    """
    # Lazy recipes: only the requested estimator / grid pair is ever built.
    recipes = {
        "RF": lambda: (
            RandomForestClassifier(random_state=42, class_weight=weight),
            {'max_depth': [10, 50, 100, None],
             'max_features': ['sqrt'],
             'min_samples_leaf': [1, 2, 4],
             'min_samples_split': [2, 5, 10],
             'n_estimators': [10, 100, 200]},
        ),
        "DT": lambda: (
            DecisionTreeClassifier(random_state=42, class_weight=weight),
            {'criterion': ["gini", "entropy", "log_loss"],
             'max_depth': [10, 40, 70, 100, 130, 160, 190, 220, 250, None],
             'max_features': ['sqrt'],
             'min_samples_leaf': [1, 2, 4],
             'min_samples_split': [2, 5, 10]},
        ),
        "GB": lambda: (
            GradientBoostingClassifier(random_state=42),
            {'max_depth': [10, 50, 100, None],
             'max_features': ['sqrt'],
             'min_samples_leaf': [1, 2, 5],
             'min_samples_split': [2, 10]},
        ),
        "NB": lambda: (
            GaussianNB(),
            {'var_smoothing': [1e-9, 1e-8, 1e-7, 1e-6, 1e-5]},
        ),
        "SVM": lambda: (
            SVC(random_state=42, class_weight=weight),
            {'C': [0.1, 1, 10, 100],
             'gamma': [1, 0.1, 0.01],
             'kernel': ["poly", "rbf"],
             'decision_function_shape': ["ovr"]},
        ),
    }
    build_recipe = recipes.get(classifier)
    if build_recipe is None:
        return None
    estimator, search_space = build_recipe()

    print("fitting model", classifier)
    fixed_split = PredefinedSplit(test_fold=val_split)
    search = GridSearchCV(estimator=estimator, scoring="f1_macro",
                          param_grid=search_space, cv=fixed_split)
    started = time()
    search.fit(X_model, y_model)
    train_time = time() - started
    print("Fitting ended, time required:", train_time, "seconds.")
    print("Best method params:", search.best_params_)

    # Evaluate the fitted search object on the held-out test data.
    y_pred = search.predict(X_test)
    per_class = precision_recall_fscore_support(y_test, y_pred)
    class_precision, class_recall, class_f1 = per_class[0], per_class[1], per_class[2]
    acc = accuracy_score(y_test, y_pred)
    print("Precision =", class_precision)
    print("Recall =", class_recall)
    print("f1 =", class_f1)
    print("Accuracy =", acc)
    print()

    macro_summary = [precision_score(y_test, y_pred, average='macro'),
                     recall_score(y_test, y_pred, average='macro'),
                     f1_score(y_test, y_pred, average='macro'),
                     acc]
    return search.best_estimator_, train_time, [class_precision, class_recall, class_f1, macro_summary]

# Selecting Classifier type among all major categories
def select_classifier(classifier, X_model, y_model, X_test, y_test, split_index, weight):
    """Dispatch a model name to the routine that trains it.

    "LSTM" and "BERT" are routed to ``train_LSTM()`` / ``train_BERT()`` (called
    without arguments); every other name is forwarded, together with the data
    arguments, to ``train_standard_classifier``. The callee's return value is
    returned unchanged.
    """
    # NOTE(review): train_LSTM / train_BERT are not defined in the visible part
    # of this notebook - confirm they exist before using these two names.
    if classifier == "LSTM":
        return train_LSTM()
    if classifier == "BERT":
        return train_BERT()
    return train_standard_classifier(classifier, X_model, y_model, X_test, y_test, split_index, weight)

In [None]:
# Retrieve the 2015 dataset from GitHub.
# TODO: the download link should probably be changed at some point.
# NOTE(review): pd.read_pickle can execute arbitrary code embedded in the file;
# only load this URL while its source is trusted.

url = "https://raw.githubusercontent.com/dimosbele/sentiment_analysis_greek/master/modeling/data/df_final.pkl"
dataset = pd.read_pickle(url)
# Shift every sentiment label up by one in a single vectorised step
# (presumably so the class labels start at 0 - confirm against the dataset).
y = np.array(dataset["Sentiment"]) + 1

In [None]:
# vectorizing data for standard classifier

def get_split_indexer(X_model, X_train):
    """Build a fold marker for every row of ``X_model``.

    A row gets ``-1`` when its index label also occurs in ``X_train.index``
    and ``0`` otherwise. The markers are returned as a list in the row order
    of ``X_model``.
    """
    train_labels = X_train.index
    fold_markers = []
    for label in X_model.index:
        if label in train_labels:
            fold_markers.append(-1)
        else:
            fold_markers.append(0)
    return fold_markers

def vectorize(data, vectorizer):
    """Transform ``data`` with a fitted vectorizer and return a dense DataFrame.

    The result keeps the index of ``data`` and uses the vectorizer's output
    feature names as column labels.
    """
    transformed = vectorizer.transform(data)
    feature_names = vectorizer.get_feature_names_out()
    dense = transformed.toarray()
    return pd.DataFrame(dense, index=data.index, columns=feature_names)

def create_sets(dataset, has_text=True, has_features=True, resampling_method=None):
  """Split the dataset into model (train + validation) and test sets and build features.

  The ``Text`` column is preprocessed, the rows are split 80/20 into model and
  test parts, and the model part is split again with ``test_size=0.125`` into
  train and validation (both splits stratified, ``random_state=42``). A
  ``TfidfVectorizer`` is fitted on the training text only and applied to every
  split.

  NOTE: the labels are read from the notebook-level array ``y``, not from
  ``dataset``; ``y`` must exist before this function is called.

  Args:
    dataset: DataFrame containing ``Text`` plus the columns dropped below
      (``Text_only``, ``Tokens_stem``, ``Tokens``, ``Sentiment``, ``Bigrams2``,
      ``Match_Terms``); all remaining columns are treated as metadata features.
    has_text: include the TF-IDF text features.
    has_features: include the metadata features.
    resampling_method: strategy forwarded to ``resampling``. It is only applied
      when ``has_text`` and ``has_features`` are both true; otherwise a warning
      is issued and the data is returned without resampling.

  Returns:
    The 7-tuple ``(X_model, y_model, X_test, y_test, X_train, y_train,
    train_text)``. ``train_text`` is the preprocessed training text before
    vectorisation. When both flags are false, ``"error on inputs"`` is printed
    and a 7-tuple of ``None`` is returned.
  """
  import warnings  # local import keeps this block self-contained

  data_X = dataset.drop(["Text_only", "Tokens_stem", "Tokens", "Sentiment", "Bigrams2", "Match_Terms"], axis=1)
  data_X["Text"] = preprocess_tweets(data_X["Text"])
  # splitting data in Model (Train and Validation) including split, as well as Test
  X_model, X_test, y_model, y_test = train_test_split(data_X, y, test_size = 0.2, random_state=42, stratify=y)
  X_train, X_val , y_train, y_val = train_test_split(X_model, y_model,test_size = 0.125, random_state=42, stratify=y_model)

  # Make the limitation visible instead of silently dropping the request.
  if resampling_method is not None and not (has_text and has_features):
    warnings.warn(
        "resampling_method=%r is ignored: resampling is only applied when "
        "has_text and has_features are both True" % (resampling_method,))

  # splitting X data to vectorizables and non-vectorizables
  X_model_text = X_model["Text"]
  X_test_text = X_test["Text"]
  # train_text_domain keeps the un-vectorised training text for the caller.
  X_train_text = train_text_domain = X_train["Text"]
  X_val_text = X_val["Text"]

  X_train_metadata = X_train.drop(columns=['Text'])
  X_val_metadata = X_val.drop(columns=['Text'])
  X_model_metadata = X_model.drop(columns=['Text'])
  X_test_metadata = X_test.drop(columns=['Text'])

  # vectorizing model from text dataframes (vocabulary comes from the training text only)
  vectorizer = TfidfVectorizer()
  vectorizer_fit = vectorizer.fit(X_train_text)
  X_model_text = vectorize(X_model_text, vectorizer_fit)
  X_train_text = vectorize(X_train_text, vectorizer_fit)
  X_val_text = vectorize(X_val_text, vectorizer_fit)
  X_test_text = vectorize(X_test_text, vectorizer_fit)

  if(has_text and has_features):
    # merging subdataframes
    X_train = X_train_text.join(X_train_metadata)
    X_val = X_val_text.join(X_val_metadata)
    X_test = X_test_text.join(X_test_metadata)
    if(resampling_method is not None):
      # resample X_train/y_train, then merge with val data into new X_model and y_model
      X_res, y_res = resampling(X_train, y_train, resampling_method)
      # get_split_indexer() treats a row as training data when its index label
      # occurs in X_train.index. The sampler is expected to hand the rows back
      # with a fresh positional index (TODO confirm), which would let resampled
      # rows be mistaken for validation rows. Relabel them with training labels;
      # the labels only act as membership markers, not as row provenance.
      train_labels = list(X_train.index)
      X_res.index = [train_labels[pos % len(train_labels)] for pos in range(len(X_res))]
      y_res = pd.DataFrame(y_res)
      y_val = pd.DataFrame(y_val)
      # Validation rows first, resampled training rows second - same order for X and y.
      X_model = pd.concat([X_val, X_res])
      y_model = pd.concat([y_val, y_res]).to_numpy().ravel()
    else:
      X_model = X_model_text.join(X_model_metadata)
    return X_model, y_model, X_test, y_test, X_train, y_train, train_text_domain
  elif(has_text):
    return X_model_text, y_model, X_test_text, y_test, X_train_text, y_train, train_text_domain
  elif(has_features):
    return X_model_metadata, y_model, X_test_metadata, y_test, X_train_metadata, y_train, train_text_domain
  else:
    print("error on inputs")
    # Seven placeholders, matching the arity of every other return path.
    return None, None, None, None, None, None, None

# Fixed model order shared by save_models and load_models.
_MODEL_NAMES = ("DT", "GB", "RF", "NB", "SVM")


def _model_filename(model, has_text, has_imb):
  """Return the joblib file name for one model under the given settings."""
  text_tag = "_Features_and_Text" if has_text else "_Features"
  if has_imb == 'auto':
    imb_tag = "_IMB_auto"
  elif has_imb == 'combination':
    imb_tag = "_IMB_combination"
  else:
    imb_tag = ""
  return "best_" + model + text_tag + imb_tag + ".joblib"


def save_models(best_estimators, has_text=False, has_imb=""):
  """Dump the estimators to joblib files in the current working directory.

  Args:
    best_estimators: estimators in DT, GB, RF, NB, SVM order; they are matched
      to the model names by position, and surplus items on either side are
      ignored.
    has_text: selects the ``_Features_and_Text`` tag instead of ``_Features``.
    has_imb: ``'auto'`` or ``'combination'`` adds the matching ``_IMB_*`` tag;
      any other value adds none.
  """
  for estimator, model in zip(best_estimators, _MODEL_NAMES):
    dump(estimator, _model_filename(model, has_text, has_imb))


def load_models(has_text=False, has_imb=""):
  """Load the five estimators written by ``save_models`` with the same flags.

  Returns:
    A list of estimators in DT, GB, RF, NB, SVM order.
  """
  return [load(_model_filename(model, has_text, has_imb)) for model in _MODEL_NAMES]


In [None]:
# Build the sets from TF-IDF text features plus metadata features, without resampling.
(X_model, y_model, X_test, y_test,
 X_train, y_train, train_text) = create_sets(dataset, has_text=True, has_features=True, resampling_method=None)
# Class weighting handed to the estimators that accept class_weight.
weight = "balanced"
# One fold marker per row of X_model, for the predefined validation split.
split_index = get_split_indexer(X_model, X_train)
print("Train/Val shape:", X_model.shape)
print("Train shape:", X_train.shape)
print("Test Shape:", X_test.shape)
print("Split Index length:", len(split_index))


  custom_rules = yaml.load(f.read())


Train/Val shape: (1312, 2733)
Train shape: (1148, 2733)
Test Shape: (328, 2733)
Split Index length: 1312


In [None]:
# Build the sets from TF-IDF text features only, requesting "auto" resampling.
# NOTE(review): with has_features=False, create_sets returns from its text-only
# branch, which does not call resampling() - the shapes printed below are the
# un-resampled ones.
(X_model, y_model, X_test, y_test,
 X_train, y_train, train_text) = create_sets(dataset, has_text=True, has_features=False, resampling_method="auto")
# No class weighting for this run.
weight = None
# One fold marker per row of X_model, for the predefined validation split.
split_index = get_split_indexer(X_model, X_train)
print("Train/Val shape:", X_model.shape, y_model.shape)
print("Train shape:", X_train.shape)
print("Test Shape:", X_test.shape)
print("Split Index length:", len(split_index))


  custom_rules = yaml.load(f.read())


Train/Val shape: (1312, 2704) (1312,)
Train shape: (1148, 2704)
Test Shape: (328, 2704)
Split Index length: 1312


In [None]:
# Train every standard classifier on the current sets and collect the results.
# The three result lists are aligned with `models` by position.
models = ["DT", "GB", "RF", "NB", "SVM"]
best_val_models, best_scores, train_times = [], [], []
print("weight is ", weight)
for model in models:
    classifier = model
    good_model, train_time, scores = select_classifier(
        classifier, X_model, y_model, X_test, y_test, split_index, weight)
    best_val_models.append(good_model)
    best_scores.append(scores)
    train_times.append(train_time)

weight is  None
fitting model DT
Fitting ended, time required: 12.088324785232544 seconds.
Best method params: {'criterion': 'gini', 'max_depth': 70, 'max_features': 'sqrt', 'min_samples_leaf': 1, 'min_samples_split': 2}
Precision = [0.73267327 0.77619048 0.23529412]
Recall = [0.63793103 0.83163265 0.25      ]
f1 = [0.68202765 0.80295567 0.24242424]
Accuracy = 0.7347560975609756

fitting model GB
Fitting ended, time required: 115.36433172225952 seconds.
Best method params: {'max_depth': 100, 'max_features': 'sqrt', 'min_samples_leaf': 2, 'min_samples_split': 2}
Precision = [0.80392157 0.81363636 1.        ]
Recall = [0.70689655 0.91326531 0.375     ]
f1 = [0.75229358 0.86057692 0.54545455]
Accuracy = 0.8140243902439024

fitting model RF
Fitting ended, time required: 59.56161308288574 seconds.
Best method params: {'max_depth': 100, 'max_features': 'sqrt', 'min_samples_leaf': 1, 'min_samples_split': 2, 'n_estimators': 10}
Precision = [0.8        0.81081081 0.83333333]
Recall = [0.6896551

In [None]:
# Persist the trained estimators under the "_Features_and_Text" + "_IMB_auto" file names.
save_models(best_val_models, has_text=True, has_imb="auto")

In [None]:
# Print a classification report on the test set for every freshly trained model.
# (Previously saved estimators can be obtained with load_models() instead.)
for model in best_val_models:
    print(model)
    y_pred = model.predict(X_test)
    print(classification_report(y_test, y_pred))

DecisionTreeClassifier(criterion='entropy', max_depth=40, max_features='sqrt',
                       random_state=42)
              precision    recall  f1-score   support

           0       0.61      0.62      0.62       116
           1       0.77      0.77      0.77       196
           2       0.31      0.31      0.31        16

    accuracy                           0.69       328
   macro avg       0.57      0.57      0.57       328
weighted avg       0.69      0.69      0.69       328

GradientBoostingClassifier(max_depth=10, max_features='sqrt',
                           min_samples_leaf=2, random_state=42)
              precision    recall  f1-score   support

           0       0.83      0.75      0.79       116
           1       0.84      0.91      0.88       196
           2       0.70      0.44      0.54        16

    accuracy                           0.83       328
   macro avg       0.79      0.70      0.73       328
weighted avg       0.83      0.83      0.83     

In [None]:
# Reload the estimators saved under the "_Features_and_Text" + "_IMB_auto" file names.
loaded_models = load_models(has_text=True, has_imb="auto")

In [None]:
# load_models() returns the estimators in DT, GB, RF, NB, SVM order, so index 1
# is the gradient-boosting model.
GB_model = loaded_models[1]

# Fit a TF-IDF vectorizer on the training text returned by create_sets.
# NOTE(review): the features only line up with GB_model if train_text comes
# from the same create_sets run the model was trained on - confirm.
vectorizer = TfidfVectorizer()
vectorizer.fit(train_text)


def featurize_and_predict(csv_path, vectorizer, model):
    """Read one tweet CSV and return ``(features, predictions)``.

    The tweets are taken from the column named "0", run through
    ``preprocess_tweets``, transformed with the fitted ``vectorizer`` and
    classified with ``model``.
    """
    tweets = pd.read_csv(csv_path)
    features = vectorizer.transform(preprocess_tweets(tweets["0"]))
    return features, model.predict(features)


# One call per collected topic; each X_* holds the TF-IDF matrix and each y_*
# the predicted labels.
X_elec, y_elec = featurize_and_predict("/content/sample_data/Εκλογές_2023_final.csv", vectorizer, GB_model)
X_mits, y_mits = featurize_and_predict("/content/sample_data/Μητσοτακης_final.csv", vectorizer, GB_model)
X_nd, y_nd = featurize_and_predict("/content/sample_data/ΝΔ_final.csv", vectorizer, GB_model)
X_sir, y_sir = featurize_and_predict("/content/sample_data/ΣΥΡΙΖΑ_final.csv", vectorizer, GB_model)
X_tsip, y_tsip = featurize_and_predict("/content/sample_data/Τσιπρας_final.csv", vectorizer, GB_model)

  custom_rules = yaml.load(f.read())


In [None]:
# Write each prediction vector to its own CSV file.
prediction_files = [
    ("/content/sample_data/GB_elections.csv", y_elec),
    ("/content/sample_data/GB_Mitsotakis.csv", y_mits),
    ("/content/sample_data/GB_ND.csv", y_nd),
    ("/content/sample_data/GB_Siriza.csv", y_sir),
    ("/content/sample_data/GB_Tsipras.csv", y_tsip),
]
for out_path, predicted_labels in prediction_files:
    pd.DataFrame(predicted_labels).to_csv(out_path)