In [1]:
from utils import *
from config_file import DATASETS_PATH, DOWNLOAD_ROOT, DOWNLOAD_URL, TWEETS_PATH, TRAINING_DATA_URL

# To support both python 2 and python 3
from __future__ import division, print_function, unicode_literals

# Common imports
import numpy as np
import os

# to make this notebook's output stable across runs
np.random.seed(42)

# To plot pretty figures
%matplotlib inline

import matplotlib
import matplotlib.pyplot as plt
from mpl_toolkits.basemap import Basemap
plt.rcParams['axes.labelsize'] = 14
plt.rcParams['xtick.labelsize'] = 12
plt.rcParams['ytick.labelsize'] = 12

# Where to save the figures
PROJECT_ROOT_DIR = "."
PROJECT_ID = "happiness_over_countries"
IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, "images", PROJECT_ID)

def save_fig(fig_id, tight_layout=True, fig_extension="png", resolution=300):
    """Save the current matplotlib figure as IMAGES_PATH/<fig_id>.<fig_extension>.

    Parameters
    ----------
    fig_id : str
        Base file name (without extension) of the saved figure.
    tight_layout : bool
        Call ``plt.tight_layout()`` before saving.
    fig_extension : str
        File extension, also passed as the ``format`` to ``plt.savefig``.
    resolution : int
        DPI passed to ``plt.savefig``.
    """
    # plt.savefig does not create missing directories, and nothing else in the
    # notebook creates IMAGES_PATH, so make sure it exists first.
    if not os.path.isdir(IMAGES_PATH):
        os.makedirs(IMAGES_PATH)
    path = os.path.join(IMAGES_PATH, fig_id + "." + fig_extension)
    print("Saving figure", fig_id)
    if tight_layout:
        plt.tight_layout()
    plt.savefig(path, format=fig_extension, dpi=resolution)

# Ignore useless warnings (see SciPy issue #5998)
import warnings
warnings.filterwarnings(action="ignore", message="^internal gelsd")

In [2]:
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import string
import unicodedata
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import Tokenizer, RegexTokenizer, CountVectorizer, Word2Vec, IDF
from pyspark.ml.feature import  IndexToString
from pyspark.ml.feature import StringIndexer, StopWordsRemover, VectorAssembler, VectorIndexer
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, GBTClassifier, LinearSVC
from pyspark.ml.classification import NaiveBayes, MultilayerPerceptronClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vector
import re
from pyspark.conf import SparkConf
from pyspark.sql import functions as F
import string
import csv
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.types import ArrayType

In [3]:
# Local Spark session on 3 cores with large on-heap and off-heap memory settings.
# NOTE(review): with a local[...] master the executor-memory setting likely has
# no effect; confirm the intended sizing for the target machine.
_spark_settings = [
    ("spark.executor.memory", "128g"),
    ("spark.driver.memory", "128g"),
    ("spark.memory.offHeap.enabled", True),
    ("spark.memory.offHeap.size", "64g"),
    ("spark.debug.maxToStringFields", "256"),
]
_builder = SparkSession.builder.master("local[3]").appName("nlp")
for _key, _value in _spark_settings:
    _builder = _builder.config(_key, _value)
spark = _builder.getOrCreate()

## Training the model 

In [5]:
import requests, zipfile, io

# Download the zipped training data and unpack it into the current working directory.
# NOTE(review): the next cell reads from DATASETS_PATH/training_data/; confirm the
# archive layout matches, or extract into that directory instead.
r = requests.get(TRAINING_DATA_URL)
# Fail loudly on an HTTP error instead of handing an error page to ZipFile.
r.raise_for_status()
# Context manager closes the archive once it has been extracted.
with zipfile.ZipFile(io.BytesIO(r.content)) as z:
    z.extractall()

KeyboardInterrupt: 

In [None]:
csv_path = os.path.join(DATASETS_PATH, 'training_data/training_text_classification.csv')
# Headerless CSV read with schema inference. Only column 0 (kept as `label`)
# and column 5 (kept as `text`) are used.
# .csv() always reads CSV, so no separate .format(...) call is needed
# (a preceding format("libsvm") would simply be overridden).
training_df = (
    spark.read
    .csv(csv_path, inferSchema=True, encoding='ISO-8859-1', header=False)
    .selectExpr('_c0 as label', '_c5 as text')
)

In [None]:
# String -> string UDFs wrapping the cleaning helpers (remove_https and
# normalizeData are not defined in this notebook; presumably they come from utils).
udfhttps = F.udf(lambda s: remove_https(s), returnType=StringType())
udfNormalizeData = F.udf(lambda s: normalizeData(s), returnType=StringType())

# (pattern, replacement) pairs. Each pattern is a textual byte-escape sequence
# such as ' xe2 x80 x99'. These appear to be the UTF-8 bytes of punctuation
# characters; for example, e2 80 99 is U+2019 RIGHT SINGLE QUOTATION MARK. Each
# is mapped to a plain-ASCII stand-in.
# Earlier versions listed ' xe2 x80 x94' and ' xe2 x80 x9c' twice. The second
# copy was a no-op, because the first replacement already removed every match.
# NOTE(review): this tuple is not referenced by the visible cells, and
# clean_latin1 is imported from elsewhere (presumably utils). Confirm which
# table it actually uses.
LATIN_1_CHARS = (
    (' xe2 x80 x99', "'"),
    (' xc3 xa9', 'e'),
    (' xe2 x80 x90', '-'),
    (' xe2 x80 x91', '-'),
    (' xe2 x80 x92', '-'),
    (' xe2 x80 x93', '-'),
    (' xe2 x80 x94', '-'),
    (' xe2 x80 x98', "'"),
    (' xe2 x80 x9b', "'"),
    (' xe2 x80 x9c', '"'),
    (' xe2 x80 x9d', '"'),
    (' xe2 x80 x9e', '"'),
    (' xe2 x80 x9f', '"'),
    # The ellipsis (' xe2 x80 xa6') is dropped rather than mapped to '...'.
    (' xe2 x80 xa6', ''),
    (' xe2 x80 xb2', "'"),
    (' xe2 x80 xb3', "'"),
    (' xe2 x80 xb4', "'"),
    (' xe2 x80 xb5', "'"),
    (' xe2 x80 xb6', "'"),
    (' xe2 x80 xb7', "'"),
    (' xe2 x81 xba', "+"),
    (' xe2 x81 xbb', "-"),
    (' xe2 x81 xbc', "="),
    (' xe2 x81 xbd', "("),
    (' xe2 x81 xbe', ")"),
    (' xe2 x80 xa7', "."),
    # Last entry: every remaining '.' becomes a space.
    ('.', " "),
)

# String -> string UDF around clean_latin1 (not defined in this notebook; presumably from utils).
udfDecoding = F.udf(lambda s: clean_latin1(s), returnType=StringType())

In [None]:
# Run the cleaning UDFs over `text` in order: udfhttps, udfNormalizeData,
# udfDecoding. Then apply lower_words to the whole frame.
# NOTE(review): not idempotent; re-running this cell cleans the text twice.
for text_udf in (udfhttps, udfNormalizeData, udfDecoding):
    training_df = training_df.withColumn("text", text_udf(training_df["text"]))
training_df = lower_words(training_df)

In [None]:
# Character count of the cleaned text.
training_df = training_df.withColumn('length', F.length(F.col('text')))

In [None]:
# Remap label 4 to 1 (other labels untouched), so the classes counted below are 0 and 1.
training_df = training_df.withColumn(
    "label",
    F.when(F.col("label") == 4, 1).otherwise(F.col("label")),
)

In [None]:
# Number of training rows with label 0.
training_df.filter("label = 0.0").count()

In [None]:
# Number of training rows with label 1.
training_df.filter("label = 1.0").count()

In [None]:
# The evaluator's default metricName is "f1", but every evaluation cell reports
# the value as accuracy, so request accuracy explicitly.
acc_eval = MulticlassClassificationEvaluator(metricName="accuracy")

# Preparing data for Logistic Regression and Linear SVM

In [None]:
# Bag-of-words features for the linear models: tokenize `text`, drop stop
# words, then count term occurrences into `features`.
# NOTE(review): the vocabulary is fitted on all rows before the train/test
# split further down, so test rows influence the feature space.
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol="token_text", outputCol="stop_tokens")
count_vec = CountVectorizer(inputCol="stop_tokens", outputCol="features")

data_prep_pipe = Pipeline(stages=[tokenizer, stopremove, count_vec])
cleaner_lr = data_prep_pipe.fit(training_df)
data_lr = cleaner_lr.transform(training_df).select("label", "features")

# Logistic Regression

In [None]:
# Split the data into training and test sets (10% held out for testing).
# A fixed seed makes the split, and so the reported metric, reproducible. It is
# the same seed the Naive Bayes split uses.
(trainingData, testData) = data_lr.randomSplit([0.9, 0.1], seed=42)

# The multinomial family also works for binary classification.
mlr = LogisticRegression(family="multinomial")

# Train the model (features come from cleaner_lr; there are no indexer stages).
model_lr = mlr.fit(trainingData)

In [None]:
# Score the held-out rows with the logistic-regression model and report the
# metric configured on acc_eval.
predictions_lr = model_lr.transform(testData)
acc = acc_eval.evaluate(dataset=predictions_lr)
print("Accuracy of model at predicting sentiment was: {}".format(acc))

# Linear SVM

In [None]:
# Split the data into training and test sets (10% held out for testing).
# `clean_data` was never defined (NameError). The SVM uses the same
# bag-of-words frame as logistic regression, which matches how model_svm is
# applied later (to features produced by cleaner_lr). The same seed gives
# both linear models the same split.
(trainingData, testData) = data_lr.randomSplit([0.9, 0.1], seed=42)
# Train model.
svm = LinearSVC(maxIter=10)
model_svm = svm.fit(trainingData)


In [None]:
# Score the held-out rows with the linear SVM and report the metric configured on acc_eval.
predictions_svm = model_svm.transform(testData)
acc = acc_eval.evaluate(dataset=predictions_svm)
print("Accuracy of model at predicting sentiment was: {}".format(acc))

# Naive Bayes

## Preparing data for Naive Bayes

In [None]:
# Naive Bayes features: term counts re-weighted by IDF. The single-column
# VectorAssembler places tf_idf into the `features` column.
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol="token_text", outputCol="stop_tokens")
count_vec = CountVectorizer(inputCol="stop_tokens", outputCol="c_vec")
idf = IDF(inputCol="c_vec", outputCol="tf_idf")
clean_up = VectorAssembler(inputCols=["tf_idf"], outputCol="features")

data_prep_pipe = Pipeline(stages=[tokenizer, stopremove, count_vec, idf, clean_up])
cleaner_nb = data_prep_pipe.fit(training_df)
data_np = cleaner_nb.transform(training_df).select("label", "features")

In [None]:
# Hold out 10% of the rows for testing (seeded for reproducibility), then fit Naive Bayes.
trainingData, testData = data_np.randomSplit(weights=[0.9, 0.1], seed=42)

nb = NaiveBayes()
model_nb = nb.fit(trainingData)

In [None]:
# Score the held-out rows with Naive Bayes and report the metric configured on acc_eval.
predictions_nb = model_nb.transform(testData)
acc = acc_eval.evaluate(dataset=predictions_nb)
print("Accuracy of model at predicting sentiment was: {}".format(acc))

# Preprocessed Twitter Data

In [None]:
# fetch_data is not defined in this notebook (presumably from utils). It is
# given the archive name below. The spelling is kept exactly, presumably to
# match the remote file.
PREPROCESSED_TWEETS_TGZ = "preproccessed_tweets_location.tar.gz"
fetch_data(tgz_name=PREPROCESSED_TWEETS_TGZ)

In [None]:
# Per-tweet happiness words and per-tweet average happiness, read as two frames.
h_words, h_avg = (
    spark.read.json(json_path)
    for json_path in ('./datasets/sentiment_analysis/tweets_classification.json',
                      './datasets/sentiment_analysis/tweets_avg_happiness.json')
)

**Some extra cleaning**

In [None]:
def _normalize_date(df):
    """Return ``df`` with ``date`` rewritten as '<first piece> <second piece>'.

    Every '.', 't' and 'T' is first replaced by a space. Only the first two
    space-separated pieces are then kept, joined by a single space.
    NOTE(review): for an ISO-8601-like value such as '2020-01-01T12:34:56.789Z'
    this gives '2020-01-01 12:34:56'; confirm the JSON files use that format.
    """
    df = df.withColumn('date', F.regexp_replace('date', r"[\.tT]", ' '))
    date_parts = F.split(df['date'], ' ')
    return df.withColumn('date', F.concat(date_parts.getItem(0), F.lit(" "), date_parts.getItem(1)))


h_words = _normalize_date(h_words)
h_avg = _normalize_date(h_avg)

# Number the rows of each frame; the join on `num` below pairs rows by this number.
# NOTE(review): by default Spark SQL treats "date" in double quotes as a string
# literal, so this orders by a constant rather than by the date column, and the
# numbering follows whatever row order Spark produces. Kept as-is because the
# positional join depends on it; confirm the intended alignment.
# createOrReplaceTempView returns None, so its result is not assigned back.
h_words.createOrReplaceTempView('h_words')
h_words = spark.sql('select row_number() over (order by "date") as num, * from h_words')

h_avg.createOrReplaceTempView('h_avg')
h_avg = spark.sql('select row_number() over (order by "date") as num, * from h_avg')

In [None]:
# Peek at the first two numbered rows.
h_words.show(n=2)

In [None]:
# Peek at the first two numbered rows.
h_avg.show(n=2)

In [None]:
# Join the two DataFrames (h_words, h_avg) on the row number `num`.
# Both inputs have a `date` column. Explicit DataFrame aliases make the
# 'h_words.date' reference unambiguous instead of relying on the qualifier the
# temp view happens to attach. `num` is not selected, so no separate drop is needed.
h_avg_words = (
    h_words.alias('h_words')
    .join(h_avg.alias('h_avg'), on='num', how='inner')
    .select(col('h_words.date').alias('date'),
            col('happiness_words').alias('happiness_words'),
            col('happiness_avg').alias('happiness_avg'))
)

In [None]:
# Group the data by its (cleaned) timestamp string.

def group_data_over_second(df, groupby_col, happiness_words, happiness_avg):
    """Aggregate the rows of ``df`` that share the same ``groupby_col`` value.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
    groupby_col : str
        Column to group on; it is renamed to ``date`` in the output.
    happiness_words : str
        Column whose values are collected into a list per group (nulls skipped).
    happiness_avg : str
        Numeric column averaged per group.

    Returns
    -------
    DataFrame with columns ``date``, ``happiness_words`` (list of collected
    values) and ``happiness_avg`` (group mean, cast to a 32-bit float). Rows
    with any null are dropped, which includes groups with a null key or with no
    non-null ``happiness_avg`` value.
    """
    grouped = df.groupBy(groupby_col).agg(
        # Alias explicitly instead of depending on the generated
        # "collect_list(<name>)" column name, which changes with the argument.
        F.collect_list(happiness_words).alias("happiness_words"),
        # Native mean: no Python UDF round-trip. It also avoids the previous
        # ZeroDivisionError when all of a group's values are null: avg yields
        # null and dropna removes the row.
        F.avg(happiness_avg).cast("float").alias("happiness_avg"),
    )
    return grouped.select(
        F.col(groupby_col).alias("date"), "happiness_words", "happiness_avg"
    ).dropna()

h_avg_words = group_data_over_second(h_avg_words, 'date', 'happiness_words', 'happiness_avg')

In [None]:
# Flatten the array of arrays: after grouping, happiness_words holds one word
# list per original row. Combine them into a single word list per timestamp.
# As with the previous explode/collect_list/join version, null words are
# dropped and rows left without any word are removed. This version needs no
# shuffles, join or sort, and keeps the original word order.
# flatten() and the filter() higher-order function require Spark >= 2.4.
h_avg_words = (
    h_avg_words
    .withColumn("happiness_words",
                F.expr("filter(flatten(happiness_words), w -> w IS NOT NULL)"))
    .where(F.size("happiness_words") > 0)
    .select("date", "happiness_avg", "happiness_words")
)

In [None]:
# Keep only rows with at least four words, then join the words into one
# space-separated `text` column for the bag-of-words pipeline.
h_avg_words = h_avg_words.filter(F.size(F.col("happiness_words")) >= 4)
h_avg_words = h_avg_words.withColumn("text", F.concat_ws(" ", F.col("happiness_words")))
h_avg_words.show(2)

In [None]:
# Featurize the tweet text with the pipeline fitted for the linear models, so
# the vocabulary matches, then score it with the linear SVM.
data_happiness = cleaner_lr.transform(h_avg_words)
data_happiness_features = data_happiness.select('date', 'happiness_avg', 'features')
happiness_prediction = model_svm.transform(data_happiness_features)
happiness_prediction.show(2)

In [None]:
# Rows with average happiness above 6 that the model labels 1.
happiness_prediction.filter("happiness_avg > 6.0 AND prediction = 1").count()

In [None]:
# Rows with average happiness below 4 that the model labels 0.
happiness_prediction.filter("happiness_avg < 4.0 AND prediction = 0").count()

In [None]:
# Rows with average happiness above 6 that the model labels 0.
happiness_prediction.filter("happiness_avg > 6.0 AND prediction = 0").count()

In [None]:
# Rows with average happiness below 4 that the model labels 1.
happiness_prediction.filter("happiness_avg < 4.0 AND prediction = 1").count()

In [None]:
# Rows whose average happiness lies in the inclusive range [4, 6].
happiness_prediction.filter("happiness_avg BETWEEN 4.0 AND 6.0").count()

In [None]:
# Total number of scored rows.
happiness_prediction.count()

In [None]:
# NOTE(review): hard-coded figures, presumably copied from the count outputs
# above; they go stale whenever the data changes. Compute them from those counts instead.
sum((67139, 14654))

In [None]:
# NOTE(review): ratio of hard-coded figures (presumably copied from earlier outputs).
67139 / 81793

In [None]:
# NOTE(review): ratio of hard-coded figures (presumably copied from earlier outputs).
14654 / 81793

In [None]:
# NOTE(review): hard-coded figures, presumably copied from the count outputs
# above; they go stale whenever the data changes. Compute them from those counts instead.
sum((126, 181))

In [None]:
# NOTE(review): ratio of hard-coded figures (presumably copied from earlier outputs).
126 / 307

In [None]:
# NOTE(review): ratio of hard-coded figures (presumably copied from earlier outputs).
181 / 307