In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType 
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, HashingTF, IDF,Tokenizer, VectorIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from preprocessing import TransformText
import pandas as pd

# Import Data

In [2]:
spark = SparkSession.builder.master("local[1]") \
        .appName("Sentiment Spark Training") \
        .getOrCreate()
        
schema = StructType() \
        .add("tweet_id", IntegerType(), False) \
        .add("entity", StringType(), True) \
        .add("sentiment", StringType(), True)  \
        .add("content", StringType(), True)
        
tweets = spark.read.option("header", True) \
        .schema(schema) \
        .csv("./data/twitter_training.csv")
cleaned_tweets = tweets.withColumn('content', when(isnull(col('content')), "") \
                .otherwise(col('content')))

In [3]:
cleaned_tweets.printSchema()
cleaned_tweets.show(5)

root
 |-- tweet_id: integer (nullable = true)
 |-- entity: string (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- content: string (nullable = true)

+--------+-----------+---------+--------------------+
|tweet_id|     entity|sentiment|             content|
+--------+-----------+---------+--------------------+
|    2401|Borderlands| Positive|I am coming to th...|
|    2401|Borderlands| Positive|im getting on bor...|
|    2401|Borderlands| Positive|im coming on bord...|
|    2401|Borderlands| Positive|im getting on bor...|
|    2401|Borderlands| Positive|im getting into b...|
+--------+-----------+---------+--------------------+
only showing top 5 rows



In [4]:
print("Total of tweets:  ", cleaned_tweets.count())
print("Total of unique entities:  ", cleaned_tweets.select('entity').distinct().count())
print("Total of unique sentiments:  ", cleaned_tweets.select('sentiment').distinct().count())
print(cleaned_tweets.select('sentiment').distinct().show())

Total of tweets:   74681
Total of unique entities:   32
Total of unique sentiments:   4
+----------+
| sentiment|
+----------+
|Irrelevant|
|  Positive|
|   Neutral|
|  Negative|
+----------+

None


# Transform Data

In [5]:
encoder = StringIndexer(inputCol='sentiment', outputCol='sentiment_label').fit(cleaned_tweets)
tweets = encoder.transform(cleaned_tweets)

print(tweets.groupBy('sentiment_label').count().show())

+---------------+-----+
|sentiment_label|count|
+---------------+-----+
|            0.0|22542|
|            1.0|20831|
|            3.0|12990|
|            2.0|18318|
+---------------+-----+

None


In [6]:
tokenizer = Tokenizer(inputCol='content', outputCol='words')
tokenized_tweets = tokenizer.transform(tweets)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurized_data = hashingTF.transform(tokenized_tweets)

idf = IDF(inputCol="rawFeatures", outputCol="features")
tfidf_tweets = idf.fit(featurized_data).transform(featurized_data)

In [7]:
tweets = tfidf_tweets.select(['sentiment_label', 'features'])

featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(tweets)
tweets_train = featureIndexer.transform(tweets)
tweets_train = tweets_train.select(['sentiment_label','indexedFeatures'])

In [8]:
print(tweets_train.printSchema())
print(tweets_train.show(5))

root
 |-- sentiment_label: double (nullable = false)
 |-- indexedFeatures: vector (nullable = true)

None
+---------------+--------------------+
|sentiment_label|     indexedFeatures|
+---------------+--------------------+
|            1.0|(20,[0,1,5,6,8,9,...|
|            1.0|(20,[0,1,3,6,11,1...|
|            1.0|(20,[0,1,3,9,11,1...|
|            1.0|(20,[0,1,3,9,11,1...|
|            1.0|(20,[0,1,3,11,12,...|
+---------------+--------------------+
only showing top 5 rows

None


# Train model

In [9]:
dt = DecisionTreeClassifier(labelCol="sentiment_label", featuresCol="indexedFeatures")
model = dt.fit(tweets_train)

# Predict and Evaluate

In [24]:
tweets_test = spark.read.option("header", True) \
        .schema(schema) \
        .csv("./data/twitter_validation.csv")
cleaned_tweets_test = tweets_test.withColumn('content', when(isnull(col('content')), "").otherwise(col('content')))

In [25]:
cleaned_tweets_test.show(5)

+--------+---------------+----------+--------------------+
|tweet_id|         entity| sentiment|             content|
+--------+---------------+----------+--------------------+
|     921| AssassinsCreed|   Neutral|Get ready for the...|
|    4367|          CS-GO|Irrelevant|friendship ended ...|
|    7446|LeagueOfLegends|  Negative|My League of Lege...|
|    5417|    Hearthstone|  Positive|Hey guys, I just ...|
|    4650|         Google|  Positive|It's great that b...|
+--------+---------------+----------+--------------------+
only showing top 5 rows



In [26]:
# encoder = StringIndexer(inputCol='sentiment', outputCol='sentiment_label').fit(cleaned_tweets_test)
encoded_tweets_test = encoder.transform(cleaned_tweets_test)
print(encoded_tweets_test.groupBy('sentiment').count().show())

+----------+-----+
| sentiment|count|
+----------+-----+
|Irrelevant|  110|
|   Neutral|  188|
|  Positive|  172|
|  Negative|  168|
+----------+-----+

None


In [27]:
transformer = TransformText(spark, "./data/twitter_validation.csv")
tweets_test = transformer.fit()

+---------------+-----+
|sentiment_label|count|
+---------------+-----+
|            0.0|  188|
|            1.0|  172|
|            3.0|  110|
|            2.0|  168|
+---------------+-----+

None


In [35]:
tweets_test = tweets_test.select(['sentiment_label','indexedFeatures'])
tweets_test.show(5)

+---------------+--------------------+
|sentiment_label|     indexedFeatures|
+---------------+--------------------+
|            0.0|(20,[0,4,5,6,7,8,...|
|            3.0|(20,[2,3,4,5,8,9,...|
|            2.0|(20,[1,3,4,5,7,8,...|
|            1.0|(20,[0,1,2,3,4,5,...|
|            1.0|(20,[0,1,2,3,5,6,...|
+---------------+--------------------+
only showing top 5 rows



In [37]:
predictions = model.transform(tweets_train)
predictions = predictions.select(['sentiment_label','prediction'])

In [38]:
predictions.show(10)

+---------------+----------+
|sentiment_label|prediction|
+---------------+----------+
|            1.0|       0.0|
|            1.0|       1.0|
|            1.0|       0.0|
|            1.0|       0.0|
|            1.0|       1.0|
|            1.0|       0.0|
|            1.0|       0.0|
|            1.0|       0.0|
|            1.0|       0.0|
|            1.0|       0.0|
+---------------+----------+
only showing top 10 rows



In [39]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="sentiment_label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " %(1.0 - accuracy))
print("Test Accuracy = %g " %accuracy)

Test Error = 0.66539 
Test Accuracy = 0.33461 
