# Unsupervised Sentiment Classifier using K-Means for Twitter Political Data 
This notebook explains in detail the steps implemented for an unsupervised sentiment classifier using K-Means. The steps are grouped into stages, each titled `<stage number> - <stage name>`.

# 1 - Getting Started
In this stage, we:

a. Install and import all dependencies required for the challenge.

b. Initialize the Spark session.

c. Read the dataset, split it into training and test sets (roughly 70/30), and cache both.

In [18]:
%matplotlib inline 
from src.jobs.spark_etl import extract
from src.utils.dataset import getPolarity
from src.utils.spark import initSparkSession
from src.transforms.Preprocessing import CleanTweet, polarityCalculator
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF, MinMaxScaler, SQLTransformer, StringIndexer
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import numpy as np

In [19]:
# Spark Session for ETL job
sparkSession = initSparkSession(appName='k-Means')
sparkSession



In [20]:
df = extract(sparkSession)
df_train, df_test = df.randomSplit([0.70, 0.30], seed=123456) 
df_train.cache()
df_test.cache()

DataFrame[created_at: timestamp, tweet_id: string, tweet: string, likes: float, retweet_count: float, source: string, user_id: string, user_name: string, user_screen_name: string, user_description: string, user_join_date: timestamp, user_followers_count: float, user_location: string, lat: double, long: double, city: string, country: string, continent: string, state: string, state_code: string, collected_at: timestamp, candidate: string]

# 2 - Spark User Defined Function (UDF)
In this stage, we register a UDF that labels the dataset using a rule-based algorithm (`getPolarity`).

In [21]:
# Register getPolarity as a Spark SQL function named 'Polarity' (default return type: string)
polarity = sparkSession.udf.register('Polarity', getPolarity)
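`getPolarity` (from `src.utils.dataset`) is not shown in this notebook, so its exact rules are unknown. As a rough illustration only, a rule-based labeler can count hits against small positive and negative word lists and map the net score to a label. The notebook later filters on `'Positive'` and `'Negative'`; the `'Neutral'` fallback, the word lists and the name `toy_polarity` below are all hypothetical.

```python
import re

# Hypothetical lexicons; the real getPolarity may use a different lexicon or library.
POSITIVE_WORDS = {"good", "great", "win", "love", "fine", "happy"}
NEGATIVE_WORDS = {"bad", "lie", "hate", "corrupt", "devastation", "fake"}


def toy_polarity(tweet: str) -> str:
    """Label a tweet by net lexicon hits (a sketch, not the notebook's getPolarity)."""
    words = re.findall(r"[a-z']+", tweet.lower())
    score = sum(w in POSITIVE_WORDS for w in words) - sum(w in NEGATIVE_WORDS for w in words)
    if score > 0:
        return "Positive"
    if score < 0:
        return "Negative"
    return "Neutral"  # assumed label when there are no hits or they cancel out


print(toy_polarity("Actually a fine last question"))  # Positive
print(toy_polarity("Lie lie lie Lie."))  # Negative
```

A plain Python function like this could be registered the same way as `getPolarity` in the cell above.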

# 3 - Pipelines
In this stage, we build a pipeline that cleans and tokenizes the tweets, derives polarity labels, and converts the text into TF-IDF feature vectors.
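To make the text stages concrete, the pure-Python sketch below mimics on a toy corpus what `RegexTokenizer` (lowercasing, then splitting on `[^a-zA-Z0-9_#]`), `StopWordsRemover`, `CountVectorizer` and `IDF` do. The IDF weight uses Spark MLlib's documented smoothed formula, idf(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) the number of documents containing term t. The tiny stop-word list and the helper names are assumptions (Spark's default English list is much longer), `CleanTweet` is skipped, and the vocabulary is sorted alphabetically for readability whereas Spark's `CountVectorizer` orders it by corpus frequency.

```python
import math
import re
from collections import Counter

STOP_WORDS = {"a", "the", "is", "to", "of", "and", "this"}  # tiny stand-in list


def tokenize(text):
    # Like RegexTokenizer(pattern="[^a-zA-Z0-9_#]") with default gaps=True and
    # toLowercase=True: lowercase, split on the pattern, drop empty tokens.
    return [t for t in re.split(r"[^a-zA-Z0-9_#]", text.lower()) if t]


def remove_stop_words(tokens):
    return [t for t in tokens if t not in STOP_WORDS]


def tf_idf(docs):
    token_lists = [remove_stop_words(tokenize(d)) for d in docs]
    vocab = sorted({t for tokens in token_lists for t in tokens})
    m = len(docs)
    # Document frequency: number of documents that contain each term.
    df = Counter(t for tokens in token_lists for t in set(tokens))
    idf = {t: math.log((m + 1) / (df[t] + 1)) for t in vocab}
    vectors = []
    for tokens in token_lists:
        counts = Counter(tokens)  # raw term counts, as CountVectorizer produces
        vectors.append([counts[t] * idf[t] for t in vocab])
    return vocab, vectors


docs = [
    "#Trump is on the stage tonight",
    "This is true! The stage matters",
    "Lie lie lie. #Trump",
]
vocab, vectors = tf_idf(docs)
print(vocab)  # ['#trump', 'lie', 'matters', 'on', 'stage', 'tonight', 'true']
```

With this formula, a term that occurs in every document gets idf = log(1) = 0 and so contributes nothing to the features.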

In [22]:
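# define pipeline stages
# custom transformer from src.transforms.Preprocessing; the tokenizer reads its 'cleaned_tweet' column
cleanTweet = CleanTweet()
# lowercase and split on every character outside [a-zA-Z0-9_#], keeping hashtags intact
regexTokenizer = RegexTokenizer(inputCol="cleaned_tweet", outputCol="cleaned_tweet_words", pattern="[^a-zA-Z0-9_#]")
# drop Spark's default English stop words
stopWordsRemover = StopWordsRemover(inputCol='cleaned_tweet_words', outputCol='cleaned_tweet_nostop')
# custom transformer; provides the 'polarity' column indexed below
polarityCalc = polarityCalculator()
# term counts over a vocabulary learned from the training data, then rescaled to TF-IDF
countVectorizer = CountVectorizer(inputCol='cleaned_tweet_nostop', outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features")
# map polarity strings to numeric labels
sentimentStringIdx = StringIndexer(inputCol="polarity", outputCol="label")
stages = [cleanTweet, regexTokenizer, stopWordsRemover, polarityCalc, countVectorizer, idf, sentimentStringIdx]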

pipelineModel = Pipeline(stages=stages).fit(df_train)

df_train = pipelineModel.transform(df_train)


In [23]:
df_train.printSchema()

root
 |-- tweet_id: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- tweet: string (nullable = true)
 |-- likes: float (nullable = true)
 |-- retweet_count: float (nullable = true)
 |-- source: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_join_date: timestamp (nullable = true)
 |-- user_followers_count: float (nullable = true)
 |-- user_location: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- tweet: string (nullable = true)
 |-- candidate: string (nullable = false)
 |-- cleaned_tweet_nostop: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- polarity: string (nullable = true)
 |-- cv: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)
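The `label` column comes from `StringIndexer`, which by default (`stringOrderType='frequencyDesc'`) gives index 0.0 to the most frequent `polarity` value, 1.0 to the next, and so on; Spark's documentation states that ties are then broken alphabetically. The plain-Python sketch below reproduces that ordering on made-up labels (the `'Neutral'` value and the function name are assumptions).

```python
from collections import Counter


def frequency_desc_index(values):
    """Map each distinct string to an index: most frequent first, ties alphabetical."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {s: float(i) for i, s in enumerate(ordered)}


polarities = ["Positive", "Neutral", "Positive", "Negative", "Positive", "Neutral"]
print(frequency_desc_index(polarities))
# {'Positive': 0.0, 'Neutral': 1.0, 'Negative': 2.0}
```

So a given `label` value says nothing by itself about which sentiment it stands for; the fitted mapping can be read from `pipelineModel.stages[-1].labels`.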



# 4 - Implementing K-Means
## Experimenting with 3 clusters

In [24]:
kmeans = KMeans().setMaxIter(400).setK(3).setSeed(1).setDistanceMeasure('cosine')  
kMeansModel = kmeans.fit(df_train)
predictions = kMeansModel.transform(pipelineModel.transform(df_test))
predictions.select('tweet_id', 'tweet', 'label', 'prediction').limit(100).toPandas()


|     | tweet_id | tweet | label | prediction |
|----:|---|---|---:|---:|
| 0 | 1.3165296068962304e+18 | Ice Cube is teaming up to work with President ... | 0.0 | 0 |
| 1 | 1.316532492795605e+18 | @CNN @HarrietD428 This is true!\n\nIn a relate... | 1.0 | 1 |
| 2 | 1.316546586227376e+18 | "First the corporate hack-media refused to cov... | 1.0 | 1 |
| 3 | 1.3165466968648172e+18 | #Iowa experienced severe devastation from the ... | 1.0 | 1 |
| 4 | 1.3165493286773432e+18 | If @CBS is willing, I'd be happy to do a #Town... | 1.0 | 1 |
| ... | ... | ... | ... | ... |
| 95 | 1.3169061333664932e+18 | The stage set up matters a lot in the tone of ... | 1.0 | 1 |
| 96 | 1.3169063036425462e+18 | Lie lie lie Lie. Right now #trump is trying to... | 1.0 | 1 |
| 97 | 1.3169063288839086e+18 | @MichaelJGwin @TVietor08 #Trump’s claim that h... | 1.0 | 1 |
| 98 | 1.3169079052501975e+18 | Actually a fine last question from #SavannahGu... | 1.0 | 1 |


In [25]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="weightedPrecision")
evaluator.evaluate(predictions.withColumn("prediction", predictions.prediction.cast("double")))


0.3820198419032508
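K-Means cluster indices are arbitrary: cluster 0 is not guaranteed to correspond to label 0.0, so scoring `prediction` directly against `label`, as the cell above does, depends on an accidental alignment. One common workaround, sketched below in plain Python on made-up toy lists (not the notebook's data), is to map each cluster to the majority label of its members before scoring. `weighted_precision` follows the usual definition that `weightedPrecision` reports: per-label precision weighted by each label's share of the true labels. The helper names are hypothetical.

```python
from collections import Counter, defaultdict


def majority_label_mapping(clusters, labels):
    """Map each cluster id to the most frequent true label among its members."""
    by_cluster = defaultdict(Counter)
    for c, y in zip(clusters, labels):
        by_cluster[c][y] += 1
    return {c: counts.most_common(1)[0][0] for c, counts in by_cluster.items()}


def weighted_precision(labels, predictions):
    """Per-label precision, weighted by how often each label occurs."""
    total = len(labels)
    score = 0.0
    for label, count in Counter(labels).items():
        # True labels of the rows that were predicted as `label`.
        predicted = [y for y, p in zip(labels, predictions) if p == label]
        precision = predicted.count(label) / len(predicted) if predicted else 0.0
        score += precision * count / total
    return score


# Toy data: cluster 1 mostly holds label 0.0, cluster 0 mostly label 1.0.
labels = [0.0, 0.0, 0.0, 1.0, 1.0, 1.0]
clusters = [1, 1, 0, 0, 0, 1]

raw = weighted_precision(labels, [float(c) for c in clusters])
mapping = majority_label_mapping(clusters, labels)
mapped = weighted_precision(labels, [mapping[c] for c in clusters])
print(round(raw, 3), round(mapped, 3))  # 0.333 0.667
```

In Spark, the counts needed for such a mapping are available from `predictions.groupBy('prediction', 'label').count()`; ideally the mapping would be learned on the training split and then applied to the test split.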

In [26]:
# display centroids
np.array(kMeansModel.clusterCenters()).squeeze()

array([[3.07033307e-02, 4.56174286e-02, 6.05974062e-01, ...,
        0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
       [2.91576596e-01, 3.13148481e-01, 3.99225404e-02, ...,
        4.60213920e-04, 0.00000000e+00, 3.41958380e-04],
       [6.65875964e-02, 6.37044901e-02, 4.86834817e-02, ...,
        0.00000000e+00, 7.16349504e-04, 0.00000000e+00]])
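With `setDistanceMeasure('cosine')`, points are compared by cosine distance (1 - cosine similarity) instead of Euclidean distance, so a vector's direction matters but its magnitude does not, which suits sparse TF-IDF vectors of tweets with different lengths. The minimal spherical k-means sketch below, in plain Python with hypothetical names and toy 2-D points, alternates the two usual steps: assign each point to the nearest centre by cosine distance, then recompute each centre as the mean of its unit-normalized members rescaled to unit length. It illustrates the idea only; Spark's implementation (k-means|| initialization, convergence tolerance, distributed updates) differs in detail.

```python
import math


def _normalize(v):
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]  # assumes non-zero vectors


def cosine_distance(a, b):
    return 1.0 - sum(x * y for x, y in zip(_normalize(a), _normalize(b)))


def spherical_kmeans(points, initial_centers, max_iter=20):
    centers = [_normalize(c) for c in initial_centers]
    assignment = []
    for _ in range(max_iter):
        # Assignment step: nearest centre by cosine distance.
        new_assignment = [
            min(range(len(centers)), key=lambda k: cosine_distance(p, centers[k]))
            for p in points
        ]
        if new_assignment == assignment:
            break  # converged: no point changed cluster
        assignment = new_assignment
        # Update step: average the unit-normalized members, then rescale to unit length.
        for k in range(len(centers)):
            members = [_normalize(p) for p, a in zip(points, assignment) if a == k]
            if members:
                mean = [sum(col) / len(members) for col in zip(*members)]
                centers[k] = _normalize(mean)
    return assignment, centers


points = [[1, 0.1], [2, 0.3], [0.1, 1], [0.2, 3]]
assignment, centers = spherical_kmeans(points, initial_centers=[[1, 0], [0, 1]])
print(assignment)  # [0, 0, 1, 1]
```

Note that `[1, 0.1]` and `[10, 1]` are at cosine distance 0 even though they are far apart in Euclidean terms.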

## Experimenting with 2 clusters

In [27]:
# Try just 2 clusters to see whether weighted precision improves; the data is re-read because df_train was overwritten with transformed rows above
df = extract(sparkSession)
df_train, df_test = df.randomSplit([0.70, 0.30], seed=123456) 
df_train.cache()
df_test.cache()


DataFrame[created_at: timestamp, tweet_id: string, tweet: string, likes: float, retweet_count: float, source: string, user_id: string, user_name: string, user_screen_name: string, user_description: string, user_join_date: timestamp, user_followers_count: float, user_location: string, lat: double, long: double, city: string, country: string, continent: string, state: string, state_code: string, collected_at: timestamp, candidate: string]

In [28]:
pipelineModel = Pipeline(stages=stages).fit(df_train)
df_train = pipelineModel.transform(df_train)


In [29]:
df_train.show(truncate=False)


In [30]:
# Keep only Positive and Negative tweets for the 2-cluster experiment
df_train = df_train.where("polarity = 'Negative' or polarity = 'Positive'")
df_train.show(truncate=False)


In [31]:
kmeans = KMeans().setMaxIter(400).setK(2).setSeed(1).setDistanceMeasure('cosine')  
kMeansModel = kmeans.fit(df_train)
predictions = kMeansModel.transform(pipelineModel.transform(df_test))
predictions.select('tweet_id', 'tweet', 'label', 'prediction').limit(100).toPandas()



|     | tweet_id | tweet | label | prediction |
|----:|---|---|---:|---:|
| 0 | 1.3165296068962304e+18 | Ice Cube is teaming up to work with President ... | 0.0 | 1 |
| 1 | 1.316532492795605e+18 | @CNN @HarrietD428 This is true!\n\nIn a relate... | 1.0 | 0 |
| 2 | 1.316546586227376e+18 | "First the corporate hack-media refused to cov... | 1.0 | 1 |
| 3 | 1.3165466968648172e+18 | #Iowa experienced severe devastation from the ... | 1.0 | 0 |
| 4 | 1.3165493286773432e+18 | If @CBS is willing, I'd be happy to do a #Town... | 1.0 | 0 |
| ... | ... | ... | ... | ... |
| 95 | 1.3169061333664932e+18 | The stage set up matters a lot in the tone of ... | 1.0 | 0 |
| 96 | 1.3169063036425462e+18 | Lie lie lie Lie. Right now #trump is trying to... | 1.0 | 0 |
| 97 | 1.3169063288839086e+18 | @MichaelJGwin @TVietor08 #Trump’s claim that h... | 1.0 | 0 |
| 98 | 1.3169079052501975e+18 | Actually a fine last question from #SavannahGu... | 1.0 | 0 |


In [32]:
evaluator.evaluate(predictions.withColumn("prediction", predictions.prediction.cast("double")))


0.35775810109695777

In [33]:
# display centroids
np.array(kMeansModel.clusterCenters()).squeeze()

array([[0.18115224, 0.19479874, 0.14591879, ..., 0.00068508, 0.        ,
        0.00050905],
       [0.12571003, 0.13938571, 0.26261519, ..., 0.        , 0.        ,
        0.        ]])
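The raw centroid arrays are hard to read because each coordinate is the TF-IDF weight of one vocabulary term. A small helper can list the highest-weighted terms per centroid instead. In the notebook, the vocabulary would come from the fitted `CountVectorizerModel` (`pipelineModel.stages[4].vocabulary`, since `countVectorizer` is the fifth entry in `stages`) and the centroids from `kMeansModel.clusterCenters()`. The sketch below uses a made-up vocabulary and made-up centroids so it runs without Spark, and `top_terms` is a hypothetical name.

```python
def top_terms(centers, vocabulary, n=3):
    """Return the n highest-weighted vocabulary terms for each centroid."""
    result = []
    for center in centers:
        # Indices sorted by descending weight; index i matches vocabulary[i].
        ranked = sorted(range(len(center)), key=lambda i: center[i], reverse=True)
        result.append([vocabulary[i] for i in ranked[:n]])
    return result


vocabulary = ["#trump", "#biden", "debate", "vote", "lie"]  # made-up
centers = [
    [0.61, 0.05, 0.30, 0.02, 0.00],  # made-up centroid weights
    [0.04, 0.58, 0.10, 0.40, 0.01],
]
print(top_terms(centers, vocabulary, n=2))
# [['#trump', 'debate'], ['#biden', 'vote']]
```

The same function accepts the NumPy rows returned by `clusterCenters()`, since it only indexes into each centroid.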

In [34]:
sparkSession.stop()