In [None]:
import re
from collections import OrderedDict, defaultdict, Counter

import nltk
import numpy as np
import pandas as pd
from nltk.corpus import stopwords
from nltk.tokenize import TweetTokenizer
from sklearn import metrics
from sklearn.metrics import classification_report
from sklearn.model_selection import GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

In [None]:
# Resources shared by every remove_stopwords call. They are built lazily on the
# first call so the stopword corpus is read (and the tokenizer constructed)
# once, instead of once per tweet when the function is used with `.apply`.
_STOPWORD_RESOURCES = {}


def _stopword_resources():
    """Return the cached (tokenizer, English stopword set) pair."""
    if not _STOPWORD_RESOURCES:
        _STOPWORD_RESOURCES['tokenizer'] = TweetTokenizer()
        # frozenset gives constant-time membership tests (the corpus is a list).
        _STOPWORD_RESOURCES['stopwords'] = frozenset(nltk.corpus.stopwords.words('english'))
    return _STOPWORD_RESOURCES['tokenizer'], _STOPWORD_RESOURCES['stopwords']


#removing the stopwords
def remove_stopwords(text, is_lower_case=False):
    """Strip English stopwords from a piece of text.

    Parameters
    ----------
    text : str
        Raw text; it is split with nltk's ``TweetTokenizer``.
    is_lower_case : bool, default False
        When True, tokens are compared against the stopword list as they are.
        When False, each token is lowercased for the comparison only.

    Returns
    -------
    str
        The surviving tokens (original casing preserved) joined by single spaces.
    """
    tokenizer, stopword_set = _stopword_resources()
    tokens = [token.strip() for token in tokenizer.tokenize(text)]
    if is_lower_case:
        filtered_tokens = [token for token in tokens if token not in stopword_set]
    else:
        filtered_tokens = [token for token in tokens if token.lower() not in stopword_set]
    return ' '.join(filtered_tokens)

In [None]:
def clean_df(file):
    """Load a tab-separated tweet file and normalise its text column.

    Parameters
    ----------
    file : str or path-like
        Header-less TSV whose three columns are read as ``id``, ``polarity``
        and ``tweet``.

    Returns
    -------
    pandas.DataFrame
        De-duplicated rows with a fresh 0..n-1 index. ``tweet`` has English
        stopwords removed (via ``remove_stopwords``) and is lowercased.
    """
    data = pd.read_csv(file, sep='\t', names=["id", "polarity", "tweet"])
    data = data.drop_duplicates()
    data['tweet'] = data['tweet'].apply(remove_stopwords)
    # Write the lowercased text back to "tweet". A misspelt column name here
    # used to create a stray extra column and leave "tweet" in mixed case.
    data["tweet"] = data["tweet"].str.lower()
    data = data.reset_index(drop=True)
    return data

In [None]:
def generate_mpqa_count(tweet, tokenizer):
    """Count positive lexicon hits in a tweet.

    The tweet is tokenized with ``tokenizer.tokenize`` and each token is
    lowercased. For every token present in the module-level ``word_dict``,
    each value stored in its entry that equals the string ``'positive'``
    adds one to the count.

    Returns
    -------
    dict
        ``{'positive': <count>}``.
    """
    lowered_tokens = (raw.lower() for raw in tokenizer.tokenize(tweet))
    positive_hits = sum(
        1
        for tok in lowered_tokens
        if tok in word_dict
        for value in word_dict[tok]
        if value == 'positive'
    )
    return {'positive': positive_hits}

In [None]:
def count_total_score(tweet, tokenizer):
    """Sum the lexicon polarity scores of a tweet's tokens.

    Each token (lowercased) is looked up in the module-level ``word_dict``.
    Entries with at least three elements carry a numeric score in position 2:
    a score of 1 contributes +1, any other score contributes -1. Tokens
    without such an entry contribute nothing.

    Returns
    -------
    dict
        ``{'total_sum': <int>}``.
    """
    total = 0
    for token in tokenizer.tokenize(tweet):
        # .get() instead of word_dict[token]: word_dict is a defaultdict, so
        # indexing would insert an empty entry for every unknown token and
        # silently grow the shared lexicon.
        entry = word_dict.get(token.lower(), ())
        if len(entry) >= 3:
            total += 1 if entry[2] == 1 else -1
    return {'total_sum': total}

In [None]:
def max_token(tweet, tokenizer):
    """Report the highest positive lexicon score found in a tweet.

    Because scored lexicon entries only ever contribute 1 on the positive
    side, the feature is effectively binary: 1 if any token (lowercased) has
    a ``word_dict`` entry of length >= 3 whose score (position 2) equals 1,
    otherwise 0. Only the positive side is part of the returned feature.

    Returns
    -------
    dict
        ``{'max': 1}`` or ``{'max': 0}``.
    """
    for token in tokenizer.tokenize(tweet):
        # .get() avoids the defaultdict side effect of inserting an empty
        # entry for every token that is not in the lexicon.
        entry = word_dict.get(token.lower(), ())
        if len(entry) >= 3 and entry[2] == 1:
            return {'max': 1}
    return {'max': 0}

In [None]:
def last_token(tweet, tokenizer):
    """Return the polarity of the last scored lexicon token in a tweet.

    Tokens are scanned from the end of the tweet towards the start. The first
    token (lowercased) whose ``word_dict`` entry has at least three elements
    and a score (position 2) of 1 or -1 decides the result.

    Returns
    -------
    dict
        ``{'last_polarity': 1}``, ``{'last_polarity': -1}``, or
        ``{'last_polarity': 0}`` when no token carries a score.
    """
    for token in reversed(tokenizer.tokenize(tweet)):
        # .get() keeps the lookup read-only; indexing the defaultdict would
        # add an empty entry for every out-of-lexicon token.
        entry = word_dict.get(token.lower(), ())
        if len(entry) >= 3 and entry[2] in (1, -1):
            return {'last_polarity': entry[2]}
    return {'last_polarity': 0}

In [None]:
def all_feats(tweet, tokenizer):
    """Build the full lexicon feature dict for one tweet.

    Runs the four feature extractors in a fixed order and merges their
    single-key dicts, so the result has the keys ``positive``, ``total_sum``,
    ``max`` and ``last_polarity`` (in that insertion order).
    """
    extractors = (generate_mpqa_count, count_total_score, max_token, last_token)
    merged = {}
    for extract in extractors:
        merged.update(extract(tweet, tokenizer))
    return merged

In [None]:
def get_feature(only_tweet_data, tokenizer):
    """Turn a Series of tweets into a 2-D NumPy feature matrix.

    One row per tweet, one column per key returned by ``all_feats``; missing
    values are replaced with 0 before conversion.
    """
    rows = []
    for tweet in only_tweet_data:
        rows.append(all_feats(tweet, tokenizer))
    feature_frame = pd.DataFrame(rows, index=only_tweet_data.index).fillna(0)
    return feature_frame.to_numpy()

In [None]:
# Load the subjectivity lexicon. Each line is read whole into the "values"
# column and consists of whitespace-separated key=value fields.
data = pd.read_csv('./data/subjectivity_clues_hltemnlp05/subjclueslen1-HLTEMNLP05.tff', sep='\t',names=["values"])

# result: one list per lexicon line holding the first six field VALUES in file
# order; positions 0, 2 and 5 are used below as type, word and prior polarity.
result = []
for line in data["values"]:
    # Keep only well-formed key=value fields so a stray token without "="
    # cannot shift the positions or crash the parse.
    fields = [field.split("=", 1)[1] for field in line.split() if "=" in field]
    if len(fields) < 6:
        raise ValueError("malformed lexicon line (expected 6 key=value fields): {!r}".format(line))
    result.append(fields[:6])

# word -> [type, polarity] plus a numeric score (+1 / -1) when the polarity is
# "positive" / "negative". Other polarities get no third element.
# A defaultdict(list) is used so lookups of unknown words yield an empty list.
# If a word occurs on several lines, the last line wins.
polarity_score = {"negative": -1, "positive": 1}
word_dict = defaultdict(list)

for word_line in result:
    entry = [word_line[0], word_line[5]]
    if word_line[5] in polarity_score:
        entry.append(polarity_score[word_line[5]])
    word_dict[word_line[2]] = entry

In [None]:
# Training split: lexicon features, integer labels, standardisation.
tweet_tokenizer = TweetTokenizer()
train_data = clean_df("./data/dataset/twitter-2013train-A.txt")
only_tweet_train_data = train_data['tweet']
train_mpqa_feature = get_feature(only_tweet_train_data, tweet_tokenizer)

# Integer class codes: 0 = neutral, 1 = negative, 2 = positive.
label_codes = {"neutral": 0, "negative": 1, "positive": 2}
unknown_labels = set(train_data.polarity.unique()) - set(label_codes)
if unknown_labels:
    # Skipping such rows silently would leave fewer labels than feature rows,
    # so fail loudly instead.
    raise ValueError("unexpected polarity labels in training data: {!r}".format(unknown_labels))
train_labels = train_data.polarity.map(label_codes).to_numpy()

# The scaler is fitted on the training features.
scaler = StandardScaler()
train_mpqa_feature = scaler.fit_transform(train_mpqa_feature)
train_features = np.array(train_mpqa_feature)

print("train labels: ", train_labels)
print("train features:", train_features)
print("train labels shape: ", train_labels.shape)
print("train features shape:", train_features.shape)

In [None]:
# Development split: lexicon features, integer labels, standardisation.
tweet_tokenizer = TweetTokenizer()
dev_data = clean_df("./data/dataset/twitter-2013dev-A.txt")
only_tweet_dev_data = dev_data['tweet']
dev_mpqa_feature = get_feature(only_tweet_dev_data, tweet_tokenizer)

# Integer class codes: 0 = neutral, 1 = negative, 2 = positive.
label_codes = {"neutral": 0, "negative": 1, "positive": 2}
unknown_labels = set(dev_data.polarity.unique()) - set(label_codes)
if unknown_labels:
    # Skipping such rows silently would leave fewer labels than feature rows.
    raise ValueError("unexpected polarity labels in dev data: {!r}".format(unknown_labels))
dev_labels = dev_data.polarity.map(label_codes).to_numpy()

# Standardise with statistics learned from the TRAINING features only, so the
# dev features live on the same scale the classifier was trained on. Fitting a
# fresh scaler on the dev data would shift/scale it differently. The raw
# training features are recomputed here because the training cell overwrites
# them with their scaled version.
scaler = StandardScaler().fit(get_feature(only_tweet_train_data, tweet_tokenizer))
dev_mpqa_feature = scaler.transform(dev_mpqa_feature)
dev_features = np.array(dev_mpqa_feature)

print("dev labels: ", dev_labels)
print("dev features:", dev_features)
print("dev_labels shape: ", dev_labels.shape)
print("dev_features shape:", dev_features.shape)

In [None]:

# Linear SVM, C=0.005. Samples labelled 1 (negative) get weight 3.14, all
# other samples weight 1.
weight_by_label = {1: 3.14}
sample_weight = np.array([weight_by_label.get(label, 1) for label in train_labels])

clf = SVC(kernel='linear', C=0.005, probability=True)
clf.fit(train_features, train_labels, sample_weight=sample_weight)

predictions = clf.predict(dev_features)

report = metrics.classification_report(
    dev_labels,
    predictions,
    target_names=['neutral', 'negative', 'positive'],
)
print("A Classification Report showing the per-class Precision, Recall and F1-score \n\n", report)

In [None]:

# Linear SVM, C=1. Samples labelled 1 (negative) get weight 3.14, all other
# samples weight 1.
weight_by_label = {1: 3.14}
sample_weight = np.array([weight_by_label.get(label, 1) for label in train_labels])

clf = SVC(kernel='linear', C=1, probability=True)
clf.fit(train_features, train_labels, sample_weight=sample_weight)

predictions = clf.predict(dev_features)

report = metrics.classification_report(
    dev_labels,
    predictions,
    target_names=['neutral', 'negative', 'positive'],
)
print("A Classification Report showing the per-class Precision, Recall and F1-score \n\n", report)

In [None]:

# Linear SVM, C=0.005, with three-level sample weights:
# label 1 (negative) -> 3.14, label 2 (positive) -> 1.25, anything else -> 1.
clf = SVC(kernel='linear', C=0.005, probability=True)

weight_by_label = {1: 3.14, 2: 1.25}
arr = [weight_by_label.get(label, 1) for label in train_labels]
sample_weight = np.array(arr)

clf.fit(train_features, train_labels, sample_weight=sample_weight)
predictions = clf.predict(dev_features)

report = metrics.classification_report(
    dev_labels,
    predictions,
    target_names=['neutral', 'negative', 'positive'],
)
print("A Classification Report showing the per-class Precision, Recall and F1-score \n\n", report)

In [None]:

# Linear SVM, C=1, with three-level sample weights:
# label 1 (negative) -> 3.14, label 2 (positive) -> 1.25, anything else -> 1.
clf = SVC(kernel='linear', C=1, probability=True)

weight_by_label = {1: 3.14, 2: 1.25}
arr = [weight_by_label.get(label, 1) for label in train_labels]
sample_weight = np.array(arr)

clf.fit(train_features, train_labels, sample_weight=sample_weight)
predictions = clf.predict(dev_features)

report = metrics.classification_report(
    dev_labels,
    predictions,
    target_names=['neutral', 'negative', 'positive'],
)
print("A Classification Report showing the per-class Precision, Recall and F1-score \n\n", report)

In [None]:
# Grid search over C and kernel for an SVC (GridSearchCV is imported in the
# notebook's top import cell). No sample weights are passed here, unlike the
# hand-tuned models in the other cells.
param_grid = {'C': [0.005, 0.1, 0.5, 1],
              'kernel': ['linear', 'rbf']}

# refit=True retrains the best configuration on all of the training data so
# that grid.predict() can be used directly afterwards.
grid = GridSearchCV(SVC(), param_grid, refit=True, verbose=3)

grid.fit(train_features, train_labels)

print(grid.best_params_)

print(grid.best_estimator_)

grid_predictions = grid.predict(dev_features)

print(classification_report(dev_labels, grid_predictions))

In [None]:
# Test split. NOTE: this cell deliberately keeps the dev_* variable names
# because the evaluation cells that follow read dev_features / dev_labels;
# from here on those names hold the 2013 TEST data, not the dev data.
tweet_tokenizer = TweetTokenizer()
dev_data = clean_df("./data/dataset/twitter-2013test-A.txt")
only_tweet_dev_data = dev_data['tweet']
dev_mpqa_feature = get_feature(only_tweet_dev_data, tweet_tokenizer)

# Integer class codes: 0 = neutral, 1 = negative, 2 = positive.
label_codes = {"neutral": 0, "negative": 1, "positive": 2}
unknown_labels = set(dev_data.polarity.unique()) - set(label_codes)
if unknown_labels:
    # Skipping such rows silently would leave fewer labels than feature rows.
    raise ValueError("unexpected polarity labels in test data: {!r}".format(unknown_labels))
dev_labels = dev_data.polarity.map(label_codes).to_numpy()

# Standardise with statistics learned from the TRAINING features only, so the
# test features live on the same scale the classifier was trained on. Fitting
# a fresh scaler on the test data would shift/scale it differently. The raw
# training features are recomputed here because the training cell overwrites
# them with their scaled version.
scaler = StandardScaler().fit(get_feature(only_tweet_train_data, tweet_tokenizer))
dev_mpqa_feature = scaler.transform(dev_mpqa_feature)
dev_features = np.array(dev_mpqa_feature)

print("dev labels: ", dev_labels)
print("dev features:", dev_features)
print("dev_labels shape: ", dev_labels.shape)
print("dev_features shape:", dev_features.shape)

In [None]:

# Dump the current feature rows to a text file. Each row is written as its
# str() form, back-to-back with no separator between rows.
# The with-block guarantees the handle is closed even if a write fails.
with open("mpaq_lexicon_less_features_paper_vector.txt", "w+") as file:
    for feature_row in dev_mpqa_feature:
        file.write(str(feature_row))

In [None]:

# Linear SVM, C=0.005; label 1 (negative) weighted 3.14, everything else 1.
# Evaluated on whatever dev_features / dev_labels currently hold (the test
# split, when the notebook is run top to bottom).
weight_by_label = {1: 3.14}
sample_weight = np.array([weight_by_label.get(label, 1) for label in train_labels])

clf = SVC(kernel='linear', C=0.005, probability=True)
clf.fit(train_features, train_labels, sample_weight=sample_weight)

predictions = clf.predict(dev_features)

report = metrics.classification_report(
    dev_labels,
    predictions,
    target_names=['neutral', 'negative', 'positive'],
)
print("A Classification Report showing the per-class Precision, Recall and F1-score \n\n", report)

In [None]:

# Linear SVM, C=1; label 1 (negative) weighted 3.14, everything else 1.
# Evaluated on whatever dev_features / dev_labels currently hold (the test
# split, when the notebook is run top to bottom).
weight_by_label = {1: 3.14}
sample_weight = np.array([weight_by_label.get(label, 1) for label in train_labels])

clf = SVC(kernel='linear', C=1, probability=True)
clf.fit(train_features, train_labels, sample_weight=sample_weight)

predictions = clf.predict(dev_features)

report = metrics.classification_report(
    dev_labels,
    predictions,
    target_names=['neutral', 'negative', 'positive'],
)
print("A Classification Report showing the per-class Precision, Recall and F1-score \n\n", report)

In [None]:

# Linear SVM, C=0.005, with three-level sample weights:
# label 1 (negative) -> 3.14, label 2 (positive) -> 1.25, anything else -> 1.
# Evaluated on whatever dev_features / dev_labels currently hold (the test
# split, when the notebook is run top to bottom).
clf = SVC(kernel='linear', C=0.005, probability=True)

weight_by_label = {1: 3.14, 2: 1.25}
arr = [weight_by_label.get(label, 1) for label in train_labels]
sample_weight = np.array(arr)

clf.fit(train_features, train_labels, sample_weight=sample_weight)
predictions = clf.predict(dev_features)

report = metrics.classification_report(
    dev_labels,
    predictions,
    target_names=['neutral', 'negative', 'positive'],
)
print("A Classification Report showing the per-class Precision, Recall and F1-score \n\n", report)

In [None]:

# Linear SVM, C=1, with three-level sample weights:
# label 1 (negative) -> 3.14, label 2 (positive) -> 1.25, anything else -> 1.
# Evaluated on whatever dev_features / dev_labels currently hold (the test
# split, when the notebook is run top to bottom).
clf = SVC(kernel='linear', C=1, probability=True)

weight_by_label = {1: 3.14, 2: 1.25}
arr = [weight_by_label.get(label, 1) for label in train_labels]
sample_weight = np.array(arr)

clf.fit(train_features, train_labels, sample_weight=sample_weight)
predictions = clf.predict(dev_features)

report = metrics.classification_report(
    dev_labels,
    predictions,
    target_names=['neutral', 'negative', 'positive'],
)
print("A Classification Report showing the per-class Precision, Recall and F1-score \n\n", report)

In [None]:
# Grid search over C and kernel for an SVC (GridSearchCV is imported in the
# notebook's top import cell). Predictions are made on whatever dev_features
# currently holds (the test split, when the notebook is run top to bottom).
# No sample weights are passed here.
param_grid = {'C': [0.005, 0.1, 0.5, 1],
              'kernel': ['linear', 'rbf']}

# refit=True retrains the best configuration on all of the training data so
# that grid.predict() can be used directly afterwards.
grid = GridSearchCV(SVC(), param_grid, refit=True, verbose=3)

grid.fit(train_features, train_labels)

print(grid.best_params_)

print(grid.best_estimator_)

grid_predictions = grid.predict(dev_features)

print(classification_report(dev_labels, grid_predictions))