In [None]:
import pandas as pd
import numpy as np
from tqdm import tqdm
import re 
from collections import Counter
from sklearn.metrics import classification_report, make_scorer
from sklearn.model_selection import train_test_split
from sklearn.calibration import CalibratedClassifierCV
from sklearn.metrics import accuracy_score
from sklearn.svm import SVC
from sklearn.svm import LinearSVC
from sklearn.preprocessing import PolynomialFeatures
from sklearn.ensemble import RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import GridSearchCV, cross_val_predict, StratifiedShuffleSplit, train_test_split
import os.path as op
import sys
import joblib

## Transform labels into vectors

In [None]:
# Load the labelled dataset and normalise its column names.
data=pd.read_csv(r"") #Import datasets with labels according to emotions
data=data.rename(columns={"text": "Text", "emotion": "Emotion"})
data=data.drop(["Unnamed: 0","Clean_Text"],axis=1)

# One-hot label strings, one slot per emotion (up to 7 emotions are supported).
# The slot order here must match the `emotions` list used when training.
EMOTION_VECTORS = {
    "joy":     "[ 1.  0.  0.  0.  0.  0.  0.]",
    "fear":    "[ 0.  1.  0.  0.  0.  0.  0.]",
    "anger":   "[ 0.  0.  1.  0.  0.  0.  0.]",
    "sadness": "[ 0.  0.  0.  1.  0.  0.  0.]",
    "disgust": "[ 0.  0.  0.  0.  1.  0.  0.]",
    "shame":   "[ 0.  0.  0.  0.  0.  1.  0.]",
    "guilt":   "[ 0.  0.  0.  0.  0.  0.  1.]",
}

# Vectorised lookup instead of a per-row chained assignment
# (data["vec_emo"][i] = ...), which triggers SettingWithCopyWarning and does
# not modify the frame when pandas Copy-on-Write is active.
# Emotions that are not in the table keep an empty label string.
# NOTE(review): rows with an empty label are still exported below - confirm
# whether they should be dropped instead.
data["vec_emo"] = data["Emotion"].map(EMOTION_VECTORS).fillna("")

# Keep only the label vector and the text, in that column order.
data=data[["vec_emo","Text"]]

# Tab-separated export without header or index.
# NOTE(review): mode='a' appends, so re-running this cell adds the same rows
# again to an existing file - confirm this is intended.
data.to_csv(r'export_data_in_txt_file', header=None, index=None, sep='\t', mode='a')

## Train classifier

In [None]:
# Make the project's helper scripts importable and build the text preprocessor.
# sys_utils, tweet_utils and preprocess are imported after "<basename>/utils"
# has been put on the import path, so `basename` must be filled in first.

basename = r"" # placeholder: folder that contains the "utils" directory
path_utils = op.join(basename , "utils")
sys.path.insert(0, path_utils) # searched before every other import location

from sys_utils import load_library
from tweet_utils import * # NOTE(review): wildcard import - the names actually needed are not visible in this notebook

from preprocess import Preprocess
prep = Preprocess() # shared instance used by preprocess_tweet below

def preprocess_tweet(tweet):
    """Clean one raw tweet with the module-level ``prep`` preprocessor.

    Applies, in order: replace_contractions, replace_hashtags_URL_USER
    (URLs and mentions set to "delete"), remove_repeating_characters,
    remove_repeating_words, tokenize, to_lowercase, remove_non_ascii and
    replace_numbers.

    Returns:
        The items produced by the last step, joined with single spaces.
    """
    expanded = prep.replace_contractions(tweet)
    without_links = prep.replace_hashtags_URL_USER(
        expanded, mode_URL="delete", mode_Mentions="delete"
    )
    single_chars = prep.remove_repeating_characters(without_links)
    single_words = prep.remove_repeating_words(single_chars)

    # From here on the value is whatever prep.tokenize returns
    # (presumably a list of tokens - defined in the external preprocess module).
    tokens = prep.tokenize(single_words)
    tokens = prep.to_lowercase(tokens)
    tokens = prep.remove_non_ascii(tokens)
    tokens = prep.replace_numbers(tokens)
    return " ".join(tokens)

def read_data(file):
    """Read a labelled text file into a list of ``[label, text]`` pairs.

    Each non-blank line is expected to look like ``[<label vector>] <text>``.
    The label is everything between the first character and the first ``]``,
    with runs of whitespace collapsed to single spaces; the text is the
    stripped remainder of the line.

    Args:
        file: Path of a UTF-8 encoded text file.

    Returns:
        A list of two-element lists ``[label, text]``, in file order.

    Raises:
        ValueError: If a non-blank line contains no ``]``, because the label
            cannot be separated from the text.
    """
    data = []
    with open(file, 'r', encoding="utf8") as f:
        for line_no, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                # Blank lines used to become an empty ['', ''] sample.
                continue
            close = line.find("]")
            if close == -1:
                # find() returning -1 used to be used as a slice index, which
                # silently produced a garbage label and the whole line as text.
                raise ValueError(
                    "line {} of {!r} has no ']' closing the label: {!r}".format(
                        line_no, file, line
                    )
                )
            label = ' '.join(line[1:close].split())
            text = line[close + 1:].strip()
            data.append([label, text])
    return data

file = '' # placeholder: path of the labelled text file
df = read_data(file)  # list of [label, text] pairs (not a DataFrame)
print("Number of instances: {}".format(len(df)))

# Clean the text of every pair in place; the label is left untouched.
for pair in tqdm(df):
    pair[1] = preprocess_tweet(pair[1])

In [None]:
def ngram(token, n):
    """Return every run of ``n`` consecutive items of ``token`` as one string.

    Args:
        token: Sequence of string tokens.
        n: Number of tokens per n-gram.

    Returns:
        A list of space-joined n-grams in left-to-right order; empty when
        ``token`` holds fewer than ``n`` items.
    """
    window_starts = range(len(token) - n + 1)
    return [' '.join(token[start:start + n]) for start in window_starts]

def create_feature(text, nrange=(1, 1)):
    """Build a bag of n-gram counts for one text.

    Two token streams are taken from the lower-cased text:
      * word tokens: every character other than a-z, 0-9 and '#' is replaced
        by a space before splitting;
      * symbol tokens: every a-z / 0-9 character is replaced by a space
        before splitting, so only the remaining characters survive.

    Args:
        text: Input string.
        nrange: Inclusive ``(min_n, max_n)`` range of word n-gram sizes.

    Returns:
        A ``Counter`` with the word n-grams of every size in ``nrange`` plus
        the symbol tokens as unigrams.
    """
    lowered = text.lower()
    words = re.sub('[^a-z0-9#]', ' ', lowered).split()
    symbols = re.sub('[a-z0-9]', ' ', lowered).split()

    counts = Counter()
    for size in range(nrange[0], nrange[1] + 1):
        counts.update(ngram(words, size))
    counts.update(ngram(symbols, 1))
    return counts

def convert_label(item, name):
    """Translate a whitespace-separated indicator vector into class names.

    Args:
        item: String of numbers separated by whitespace, e.g. "1. 0. 0.".
        name: Sequence of class names, one per position in ``item``.

    Returns:
        The names whose position holds the value 1, joined by single spaces;
        an empty string when no position equals 1.
    """
    flags = list(map(float, item.split()))
    selected = [name[pos] for pos, flag in enumerate(flags) if flag == 1]
    return " ".join(selected).strip()

# Class names, in the same order as the positions of the label vectors.
emotions = ["joy", 'fear', "anger", "sadness","disgust", "shame", "guilt"]


# Targets are emotion names; features are 1- to 4-gram count bags per text.
y_all = [convert_label(label, emotions) for label, _ in df]
X_all = [create_feature(text, nrange=(1, 4)) for _, text in df]


# Hold out 20% of the samples for testing, with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X_all,
    y_all,
    test_size = 0.2,
    random_state = 150,
)


def train_test(clf, X_train, X_test, y_train, y_test):
    """Fit ``clf`` on the training split and score it on both splits.

    Args:
        clf: Estimator exposing ``fit`` and ``predict``.
        X_train, X_test: Feature matrices of the two splits.
        y_train, y_test: Matching target labels.

    Returns:
        A ``(train_accuracy, test_accuracy)`` tuple.
    """
    clf.fit(X_train, y_train)
    splits = ((X_train, y_train), (X_test, y_test))
    return tuple(
        accuracy_score(targets, clf.predict(features))
        for features, targets in splits
    )

from sklearn.feature_extraction import DictVectorizer

# Turn the per-text n-gram Counters into a sparse matrix. The vocabulary is
# fitted on the training split only; the test split is mapped onto it.
vectorizer = DictVectorizer(sparse = True)
X_train = vectorizer.fit_transform(X_train)
X_test = vectorizer.transform(X_test)

# Candidate classifiers. LinearSVC is wrapped in CalibratedClassifierCV so the
# saved model exposes predict_proba, which is called when the model is applied.
svc = SVC(random_state=32)
svm = LinearSVC(tol=1e-1,random_state=32,max_iter=10000)
lsvc = CalibratedClassifierCV(svm)
rforest = RandomForestClassifier(random_state=32)
dtree = DecisionTreeClassifier()

clifs = [lsvc] #svm, svc, rforest, dtree

# Train every selected classifier and print a table of accuracies.
print("| {:25} | {} | {} |".format("Classifier", "Training Accuracy", "Test Accuracy"))
print("| {} | {} | {} |".format("-"*25, "-"*17, "-"*13))
for clf in clifs: 
    clf_name = clf.__class__.__name__
    train_acc, test_acc = train_test(clf, X_train, X_test, y_train, y_test)
    print("| {:25} | {:17.7f} | {:13.7f} |".format(clf_name, train_acc, test_acc))

print("-----Saving model-----")
# `clf` is the last classifier trained in the loop above. The `with` block
# closes the file even if dumping fails (joblib is imported in the first cell).
with open("file to save the model, save as .pkl","wb") as pipeline_file:
    joblib.dump(clf,pipeline_file)

# The classifier can only score features produced by this fitted vectorizer,
# so persist it as well; otherwise the saved model is unusable in a fresh
# kernel, where `vectorizer` no longer exists.
with open("file to save the vectorizer, save as .pkl","wb") as vectorizer_file:
    joblib.dump(vectorizer,vectorizer_file)

## Apply model to data

In [None]:
# Load the trained model, make the helper scripts importable and read the data
# to classify. All three paths below are placeholders to fill in.
# NOTE: joblib.load unpickles the file - only load model files you trust.
loaded_model = joblib.load(r"") # placeholder: path of the saved model file

basename = r"" # placeholder: folder that contains the "utils" directory
path_utils = op.join(basename , "utils")
sys.path.insert(0, path_utils) # searched before every other import location

from sys_utils import load_library
from tweet_utils import * # NOTE(review): wildcard import - the names actually needed are not visible in this notebook

from preprocess import Preprocess
prep = Preprocess() # shared instance used by preprocess_tweet below

data=pd.read_csv(r"",usecols=["id","text"]) # placeholder path; only the "id" and "text" columns are read

In [None]:
def preprocess_tweet(tweet):
    """Run one tweet through the cleaning steps of the shared ``prep`` object.

    Step order: replace_contractions, replace_hashtags_URL_USER (URLs and
    mentions set to "delete"), remove_repeating_characters,
    remove_repeating_words, tokenize, to_lowercase, remove_non_ascii,
    replace_numbers.

    Returns:
        The result of the final step joined into one space-separated string.
    """
    cleaned = prep.replace_contractions(tweet)
    cleaned = prep.replace_hashtags_URL_USER(
        cleaned,
        mode_URL="delete",
        mode_Mentions="delete",
    )
    cleaned = prep.remove_repeating_characters(cleaned)
    cleaned = prep.remove_repeating_words(cleaned)

    # The remaining steps work on the output of prep.tokenize
    # (presumably a token list - defined in the external preprocess module).
    pieces = prep.tokenize(cleaned)
    pieces = prep.to_lowercase(pieces)
    pieces = prep.remove_non_ascii(pieces)
    pieces = prep.replace_numbers(pieces)
    return " ".join(pieces)

# Add a cleaned copy of every tweet next to the raw text.
data["clean_text"] = data["text"].apply(preprocess_tweet)

def ngram(token, n):
    """Slide a window of ``n`` items over ``token`` and join each window.

    Args:
        token: Sequence of string tokens.
        n: Window size.

    Returns:
        The space-joined windows in order of appearance; an empty list when
        ``token`` has fewer than ``n`` items.
    """
    last_start = len(token) - n
    return [' '.join(token[begin:begin + n]) for begin in range(last_start + 1)]

def create_feature(text, nrange=(1, 1)):
    """Count word n-grams and leftover symbol tokens of one text.

    The text is lower-cased first. Word tokens come from replacing every
    character other than a-z, 0-9 and '#' with a space; symbol tokens come
    from replacing every a-z / 0-9 character with a space. Both are then
    split on whitespace.

    Args:
        text: Input string.
        nrange: Inclusive ``(min_n, max_n)`` sizes of the word n-grams.

    Returns:
        A ``Counter`` over all word n-grams of the requested sizes and the
        symbol tokens taken as unigrams.
    """
    lowered = text.lower()
    word_tokens = re.sub('[^a-z0-9#]', ' ', lowered).split()
    symbol_tokens = re.sub('[a-z0-9]', ' ', lowered).split()

    bag = Counter()
    smallest, largest = nrange[0], nrange[1]
    for width in range(smallest, largest + 1):
        bag.update(ngram(word_tokens, width))
    bag.update(ngram(symbol_tokens, 1))
    return bag

def convert_label(item, name):
    """Map an indicator-vector string to the names of its active positions.

    Args:
        item: Whitespace-separated numbers, e.g. "0. 1. 0.".
        name: Class names aligned with the positions of ``item``.

    Returns:
        Space-separated names of all positions whose value equals 1, or an
        empty string if there is none.
    """
    indicator = list(map(float, item.split()))
    hits = [name[slot] for slot, value in enumerate(indicator) if value == 1]
    return " ".join(hits).strip()

emotions = ["joy", 'fear', "anger", "sadness", "disgust" , "shame", "guilt"]


# Same feature extraction as in training: 1- to 4-gram count bags.
X_all = [create_feature(text, nrange=(1,4)) for text in data["clean_text"].to_list()]
y_all = []

# NOTE(review): `vectorizer` is not created anywhere in this section; it must
# be the DictVectorizer that was fitted on the training data of the loaded
# model, otherwise the feature columns will not line up with the model.
X_class = vectorizer.transform(X_all)

#predict classes
probas_predicted=loaded_model.predict_proba(X_class)
class_predicted=loaded_model.predict(X_class)

# Assign whole columns at once. The former per-row chained assignment
# (data["emotion"][i] = ...) raises SettingWithCopyWarning, leaves the frame
# unchanged under pandas Copy-on-Write and assumes a default 0..n-1 index.
# Each cell of "proba_emotion" holds the probabilities of one row as a list of
# plain Python floats (presumably ordered like loaded_model.classes_ - verify).
data["proba_emotion"] = pd.Series(probas_predicted.tolist(), index=data.index)
data["emotion"] = class_predicted

#export dataset with emotions
data.to_csv(r"path")