In [0]:
# Imports.
from pyspark.sql.functions import lit
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import random

random.seed(42)
spark.conf.set("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation","true")

In [0]:
from functools import partial
from pyspark.sql import Row

def flatten_table(column_names, column_values):
    row = zip(column_names, column_values)
    return [
        Row(ColumnValue=value)
        for column, value in row
    ]

negative_tweets = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "false") \
  .load("/FileStore/tables/processedNegative.csv")
negative_tweets = negative_tweets.rdd.flatMap(partial(flatten_table, negative_tweets.columns)).toDF()
negative_tweets = negative_tweets.selectExpr("ColumnValue as tweets")
negative_tweets = negative_tweets.withColumn("sentiment", lit(0))
# display(negative_tweets)

positive_tweets = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "false") \
  .load("/FileStore/tables/processedPositive.csv")
positive_tweets = positive_tweets.rdd.flatMap(partial(flatten_table, positive_tweets.columns)).toDF()
positive_tweets = positive_tweets.selectExpr("ColumnValue as tweets")
positive_tweets = positive_tweets.withColumn("sentiment", lit(1))
# display(positive_tweets)

neutral_tweets = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "false") \
  .load("/FileStore/tables/processedNeutral.csv")
neutral_tweets = neutral_tweets.rdd.flatMap(partial(flatten_table, neutral_tweets.columns)).toDF()
neutral_tweets = neutral_tweets.selectExpr("ColumnValue as tweets")
neutral_tweets = neutral_tweets.withColumn("sentiment", lit(2))
# display(neutral_tweets)

In [0]:
import functools
  
# explicit function
def unionAll(dfs):
    return functools.reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dfs)

tweets_combined = unionAll([negative_tweets, neutral_tweets, positive_tweets])
print((tweets_combined.count(), len(tweets_combined.columns)))
tweets_combined = tweets_combined.dropna()
print((tweets_combined.count(), len(tweets_combined.columns)))
display(tweets_combined)

(3873, 2)
(3868, 2)


tweets,sentiment
How unhappy some dogs like it though,0
talking to my over driver about where I'm goinghe said he'd love to go to New York too but since Trump it's probably not,0
Does anybody know if the Rand's likely to fall against the dollar? I got some money I need to change into R but it keeps getting stronger unhappy,0
I miss going to gigs in Liverpool unhappy,0
There isnt a new Riverdale tonight ? unhappy,0
it's that A*dy guy from pop Asia and then the translator so they'll probs go with them around Aus unhappy,0
Who's that chair you're sitting in? Is this how I find out. Everyone knows now. You've shamed me in pu,0
don't like how jittery caffeine makes me sad,0
My area's not on the list unhappy think I'll go LibDems anyway,0
I want fun plans this weekend unhappy,0


In [0]:
# Creating a permanent tweets_table.
# tweets_combined = spark.read.format("csv") \
#   .option("inferSchema", "true") \
#   .option("header", "true") \
#   .load("/FileStore/tables/tweets-3.csv")

permanent_table_name = "tweets_table"
tweets_combined.write.mode("overwrite").format("parquet").saveAsTable(permanent_table_name)

In [0]:
%sql
select * From tweets_table

tweets,sentiment
Pak PM survives removal scare,2
but court orders further probe into corruption charge.,2
Supreme Court quashes criminal complaint against cricketer for allegedly depicting himself as on magazine cover.,2
Art of Living's fights back over Yamuna floodplain damage,2
livid.,2
FCRA slap on NGO for lobbying...But was it doing so as part of govt campaign?,2
Why doctors,2
pharma companies are opposing names on,2
Why a bicycle and not a CM asked. His officer learnt ground reality -- and a dip in a river.,2
It's 2017. making law to ban And MHA is sitting on draft.,2


In [0]:
tweets_df = spark.table('tweets_table')
display(tweets_df)

tweets,sentiment
Pak PM survives removal scare,2
but court orders further probe into corruption charge.,2
Supreme Court quashes criminal complaint against cricketer for allegedly depicting himself as on magazine cover.,2
Art of Living's fights back over Yamuna floodplain damage,2
livid.,2
FCRA slap on NGO for lobbying...But was it doing so as part of govt campaign?,2
Why doctors,2
pharma companies are opposing names on,2
Why a bicycle and not a CM asked. His officer learnt ground reality -- and a dip in a river.,2
It's 2017. making law to ban And MHA is sitting on draft.,2


In [0]:
# Train and Test split.
(tweets_train, tweets_test) = tweets_combined.randomSplit([0.8, 0.2])

In [0]:
display(tweets_train.groupBy('sentiment').count())

sentiment,count
0,917
2,1244
1,982


In [0]:
tweets_train.show()

+--------------------+---------+
|              tweets|sentiment|
+--------------------+---------+
|  3 3 3 please un...|        0|
|  i was going to ...|        0|
|        2010 Week 11|        0|
|    2017 at 08:45AM)|        0|
|      24-20 unhappy |        0|
| Another ATBB on ...|        0|
| Hi Tharakaram! T...|        0|
| Hitches Ride For...|        0|
| I always know wh...|        0|
| I bet The media'...|        0|
| I have a cold un...|        0|
| I just hope ever...|        0|
| I know I'm not r...|        0|
| I really need to...|        0|
| I sincerely hope...|        0|
| I was a mess. I ...|        0|
| I'm a joke unhappy |        0|
| I'm aborting mis...|        0|
| I'm so glad I ha...|        0|
| I'm so sorry to ...|        0|
+--------------------+---------+
only showing top 20 rows



In [0]:
# Data cleaning and pre-processing (Used in ML Pipeline).
tokenizer = Tokenizer(inputCol="tweets", outputCol="tokenized_words")

stop_words_remover = StopWordsRemover(inputCol="tokenized_words", outputCol="stop_words_removed")

hashing_tf = HashingTF(inputCol="stop_words_removed", outputCol="tf_features")

idf = IDF(inputCol="tf_features", outputCol="features", minDocFreq=3)

In [0]:
# Naive Bayes Classification Model.
nbc = NaiveBayes(labelCol="sentiment", featuresCol="features")

In [0]:
pipeline_nbc = Pipeline(stages=[tokenizer, stop_words_remover, hashing_tf, idf, nbc])

# Fit the pipeline to training data.
model_nbc = pipeline_nbc.fit(tweets_train)

# Tranform for the test data.
prediction_nbc = model_nbc.transform(tweets_test)

In [0]:
# Test accuracy Evaluation (Naive Bayes Classifier).
evaluator_nbc = MulticlassClassificationEvaluator(
    labelCol="sentiment", 
    predictionCol="prediction", 
    metricName="accuracy"
)
accuracy_nbc = evaluator_nbc.evaluate(prediction_nbc)
print("Test Accuracy (Naive Bayes Classifier) = %g " % (accuracy_nbc * 100))

Test Accuracy (Naive Bayes Classifier) = 83.3103 


In [0]:
dtc = DecisionTreeClassifier(labelCol="sentiment", featuresCol="features")

In [0]:
pipeline_dtc = Pipeline(stages=[tokenizer, stop_words_remover, hashing_tf, idf, dtc])

# Fit the pipeline to training data.
model_dtc = pipeline_dtc.fit(tweets_train)

# Tranform for the test data.
prediction_dtc = model_dtc.transform(tweets_test)

In [0]:
# Test accuracy Evaluation (Decision Tree Classifier).
evaluator_dtc = MulticlassClassificationEvaluator(
    labelCol="sentiment", 
    predictionCol="prediction", 
    metricName="accuracy"
)
accuracy_dtc = evaluator_dtc.evaluate(prediction_dtc)
print("Test Accuracy (Decision Tree Classifier) = %g " % (accuracy_dtc * 100))

Test Accuracy (Decision Tree Classifier) = 87.4483 
