In [3]:
try:
    spark.stop()
except:
    pass

# Using findspark to find automatically the spark folder
import findspark
findspark.init()

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

In [4]:
import string

# Remove punctuations from the sentence
def remove_punctuation(sentence):
    punctuations = list(string.punctuation)
    extra_punctuations = ['.', '``', '...', '\'s', '--', '-', 'n\'t', '_', '–']
    punctuations += extra_punctuations
    filtered = [w for w in sentence.lower() if w not in punctuations]
    return ("".join(filtered)).split()


In [5]:
from pyspark.ml.feature import HashingTF, IDF

# Calculate term frequency–inverse document frequency for reflecting importance of words in Tweet.
# :param data_rdd: input data rdd
# :return: transformed dataframe

def tf_idf(data_rdd):
    data_rdd_df = data_rdd.toDF()
    hashing_tf = HashingTF(inputCol = 'words', outputCol = 'tf_features')
    tf_data = hashing_tf.transform(data_rdd_df)

    idf_data = IDF(inputCol = 'tf_features', outputCol = 'features').fit(tf_data)
    tf_idf_data = idf_data.transform(tf_data)
    return tf_idf_data.select(['label', 'words', 'features'])


In [6]:
import numpy as np
import pandas as pd

from pyspark.ml.classification import NaiveBayes, LogisticRegression

# Apply Naive Bayes Classifier to test data for predicting sentiment of Tweets.
# :param training_df: Trained labelled data
# :param testing_df: Test data
# :return: transformed dataframe of predicted labels for tweets

def naive_bayes_classifier(training_df, testing_df):
    nb = NaiveBayes()
    model = nb.fit(training_df)

    return model.transform(testing_df).select(['label', 'words', 'prediction'])



# Apply Logistic Regression Classifier to test data for predicting sentiment of Tweets.
# :param training_df: Trained labelled data
# :param testing_df: Test data
# :return: transformed dataframe of predicted labels for tweets  
def logistic_regression_classifier(training_df, testing_df):
    lor = LogisticRegression(regParam = 0.01)
    model = lor.fit(training_df)
    return model.transform(testing_df).select(['label', 'words', 'prediction'])


# Calculate accuracy of model against actual data
def calculate_accuracy(result_df):
    return 1.0 * result_df.filter(result_df.label == result_df.prediction).count() / result_df.count()


# Generate Confusion Matrix for showing the performance of algorithm.
# :param result_df: Dataframe returned from the model
# :return: pandas dataframe
def confusion_matrix(result_df):
    true_positives = result_df.filter((result_df.label == 1.0) & (result_df.prediction == 1.0)).count()
    true_negatives = result_df.filter((result_df.label == 0.0) & (result_df.prediction == 0.0)).count()
    false_positives = result_df.filter((result_df.label == 0.0) & (result_df.prediction == 1.0)).count()
    false_negatives = result_df.filter((result_df.label == 1.0) & (result_df.prediction == 0.0)).count()
 
    print('true_positives', true_positives)
    print('true_negatives', true_negatives)
    print('false_positives', false_positives)
    print('false_negatives', false_negatives)


In [7]:
from nltk.corpus import stopwords
from pyspark.ml.feature import StopWordsRemover
from stop_words import get_stop_words


def get_rdd_form_text(file_path):
    data = sc.textFile(file_path)
    col_rdd = data.map(lambda x: (x.split('\t')[0], x[-1]))
    punctuation_removed_rdd = col_rdd.map(lambda x: (remove_punctuation(x[0]), float(x[1])))
    return punctuation_removed_rdd


def stop_words_remover(stopWords, data_df):
    remover = StopWordsRemover(inputCol = 'text', outputCol = 'words', stopWords = stopWords)
    return remover.transform(data_df).select(['label', 'words'])


def get_en_mixed_data():
    punctuation_removed_rdd = get_rdd_form_text('data/data.txt')
    stopWords = stopwords.words('english')
    data_df = sqlContext.createDataFrame(punctuation_removed_rdd, ['text', 'label'])
    data_df = stop_words_remover(stopWords = stopWords, data_df = data_df)
    return data_df


def get_uk_mixed_data():
    sqlContext = SQLContext(sc)
    path = 'data/data.json'
    tweetsDF = sqlContext.read.json(path)
    data = tweetsDF.select('isPositive', 'message')
    filteredData = data.rdd.filter(lambda s: s.message)
    mappedData = filteredData.map(lambda s: (float(1 if s.isPositive == True else 0), remove_punctuation(s.message)))

    data_df = sqlContext.createDataFrame(mappedData, ['label', 'text'])
    customWords = ['погано', 'поганий', 'добре', 'добрий']
    stopWords = get_stop_words('ukrainian') + customWords
    data_df = stop_words_remover(stopWords = stopWords, data_df = data_df)
    return data_df


def get_uk_twitter_data():
    sqlContext = SQLContext(sc)
    path = 'data/twitter-data-3.json'
    tweetsDF = sqlContext.read.json(path)
    data = tweetsDF.select('isPositive', 'text')
    filteredData = data.rdd.filter(lambda s: s.text)
    mappedData = filteredData.map(lambda s: (float(1 if s.isPositive == True else 0), remove_punctuation(s.text)))

    data_df = sqlContext.createDataFrame(mappedData, ['label', 'text'])
    customWords = ['добре', 'добрий', '😊', '😘', 'погано', 'поганий',  '😭', '😡']
    stopWords = get_stop_words('ukrainian') + customWords
    data_df = stop_words_remover(stopWords = stopWords, data_df = data_df)
    return data_df

# def test_data():
#     data = sc.textFile("data/test.txt")
#     col_rdd = data.map(lambda x: (x.split('\t')[0], x[-1]))
#     punctuation_removed_rdd = col_rdd.map(lambda x: (float(x[1]), remove_punctuation(x[0])))
#     data_df = sqlContext.createDataFrame(punctuation_removed_rdd, ["label", 'words'])
#     return data_df
# test_data()


# Display accuracy and confusion matrix for the models
# :param predicted_df: predicted dataframe of test data
def show_stats(predicted_df):
    predicted_df.show(5)
    accuracy = calculate_accuracy(predicted_df)
    confusion_table = confusion_matrix(predicted_df)
    print('Accuracy of the model:', round(accuracy * 100, 2))
    print('Confusion Matrix:', confusion_table)


In [8]:
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

locale = sc._jvm.java.util.Locale
locale.setDefault(locale.forLanguageTag('en-US'))

In [9]:
# filtered_data_df = get_en_mixed_data()
filtered_data_df = get_uk_mixed_data()
# filtered_data_df = get_uk_twitter_data()

splitParams = [0.7, 0.3]
training, test = filtered_data_df.rdd.randomSplit(splitParams, seed = 0)

print('count training rdd:', training.count())
print('count test rdd:', test.count())

train_df = tf_idf(training)
test_df = tf_idf(test)

print('naive_bayes_classifier:')
nb_predicted_df = naive_bayes_classifier(train_df, test_df)
show_stats(nb_predicted_df)

print('logistic_regression_classifier:')
nb_predicted_df = logistic_regression_classifier(train_df, test_df)
show_stats(nb_predicted_df)


count training rdd: 2089
count test rdd: 903
naive_bayes_classifier:
+-----+--------------------+----------+
|label|               words|prediction|
+-----+--------------------+----------+
|  1.0|[sound, quality, ...|       1.0|
|  1.0|[good, quality, t...|       1.0|
|  0.0|[design, odd, ear...|       1.0|
|  1.0|[highly, recommen...|       1.0|
|  0.0|[advise, everyone...|       1.0|
+-----+--------------------+----------+
only showing top 5 rows

true_positives 325
true_negatives 351
false_positives 101
false_negatives 126
Accuracy of the model: 74.86
Confusion Matrix: None
logistic_regression_classifier:
+-----+--------------------+----------+
|label|               words|prediction|
+-----+--------------------+----------+
|  1.0|[sound, quality, ...|       1.0|
|  1.0|[good, quality, t...|       1.0|
|  0.0|[design, odd, ear...|       1.0|
|  1.0|[highly, recommen...|       1.0|
|  0.0|[advise, everyone...|       1.0|
+-----+--------------------+----------+
only showing top 5 rows
