In [1]:
# (1) Import the required Python dependencies
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import LongType, DoubleType, IntegerType, StringType, BooleanType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

from sparknlp.base import *
from sparknlp.annotator import Tokenizer as NLPTokenizer
from sparknlp.annotator import Stemmer, Normalizer

In [2]:
# (2) Create the Spark Context (and the SQL Context built on top of it) for this notebook
# The configuration is assembled one setting at a time: the master URL of the cluster,
# the spark-nlp JAR registered through "spark.jars", and the application name
conf = SparkConf()
conf.setMaster("spark://192.168.56.10:7077")
conf.set("spark.jars", '/opt/anaconda3/lib/python3.6/site-packages/sparknlp/lib/sparknlp.jar')
conf.setAppName("Natural Language Processing - Sentiment Analysis")

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# Display every (key, value) pair of the configuration held by the Spark Context
sc.getConf().getAll()

[('spark.executor.memory', '2g'),
 ('spark.driver.port', '37750'),
 ('spark.driver.host', '192.168.56.10'),
 ('spark.executor.id', 'driver'),
 ('spark.executor.cores', '2'),
 ('spark.jars',
  '/opt/anaconda3/lib/python3.6/site-packages/sparknlp/lib/sparknlp.jar'),
 ('spark.repl.local.jars',
  'file:///opt/anaconda3/lib/python3.6/site-packages/sparknlp/lib/sparknlp.jar'),
 ('spark.rdd.compress', 'true'),
 ('spark.master', 'spark://192.168.56.10:7077'),
 ('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'),
 ('spark.kryoserializer.buffer.max', '128m'),
 ('spark.driver.memory', '2g'),
 ('spark.driver.maxResultSize', '0'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.name', 'Natural Language Processing - Sentiment Analysis'),
 ('spark.app.id', 'app-20181019093755-0000'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.cores', '1')]

In [3]:
# (3) Load the labelled Airline Tweet Corpus
# The explicit schema is declared as (column name, column type) pairs and then
# turned into a StructType with one StructField per pair, in the order listed here
airline_tweet_columns = [
    ("unit_id", LongType()),
    ("golden", BooleanType()),
    ("unit_state", StringType()),
    ("trusted_judgments", IntegerType()),
    ("last_judgment_at", StringType()),
    ("airline_sentiment", StringType()),
    ("airline_sentiment_confidence", DoubleType()),
    ("negative_reason", StringType()),
    ("negative_reason_confidence", DoubleType()),
    ("airline", StringType()),
    ("airline_sentiment_gold", StringType()),
    ("name", StringType()),
    ("negative_reason_gold", StringType()),
    ("retweet_count", IntegerType()),
    ("text", StringType()),
    ("tweet_coordinates", StringType()),
    ("tweet_created", StringType()),
    ("tweet_id", StringType()),
    ("tweet_location", StringType()),
    ("user_timezone", StringType()),
]
schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in airline_tweet_columns
])

# Read the CSV with the schema above: the file has a header row and schema inference is switched off
airline_tweets_path = '/data/workspaces/jillur.quddus/jupyter/notebooks/Machine-Learning-with-Apache-Spark-QuickStart-Guide/chapter06/data/twitter-data/airline-tweets-labelled-corpus.csv'
airline_tweets_reader = sqlContext.read.format('com.databricks.spark.csv') \
    .schema(schema) \
    .options(header = 'true', inferschema = 'false')
airline_tweets_df = airline_tweets_reader.load(airline_tweets_path)
airline_tweets_df.show(2)

+---------+------+----------+-----------------+----------------+-----------------+----------------------------+---------------+--------------------------+--------------+----------------------+--------+--------------------+-------------+--------------------+-----------------+-------------+-----------+--------------+--------------------+
|  unit_id|golden|unit_state|trusted_judgments|last_judgment_at|airline_sentiment|airline_sentiment_confidence|negative_reason|negative_reason_confidence|       airline|airline_sentiment_gold|    name|negative_reason_gold|retweet_count|                text|tweet_coordinates|tweet_created|   tweet_id|tweet_location|       user_timezone|
+---------+------+----------+-----------------+----------------+-----------------+----------------------------+---------------+--------------------------+--------------+----------------------+--------+--------------------+-------------+--------------------+-----------------+-------------+-----------+--------------+--------

In [4]:
# (4) We only care about detecting tweets with negative sentiment, so derive a new string label:
# "true" (the Positive Outcome) when airline_sentiment equals "negative", and "false" otherwise
negative_sentiment_label = when(col("airline_sentiment") == "negative", lit("true")) \
    .otherwise(lit("false"))

# Keep only the tweet identifier, its text and the newly derived label
airline_tweets_with_labels_df = airline_tweets_df \
    .withColumn("negative_sentiment_label", negative_sentiment_label) \
    .select("unit_id", "text", "negative_sentiment_label")
airline_tweets_with_labels_df.show(n=10, truncate=False)

+---------+----------------------------------------------------------------------------------------------------------------------------------+------------------------+
|unit_id  |text                                                                                                                              |negative_sentiment_label|
+---------+----------------------------------------------------------------------------------------------------------------------------------+------------------------+
|681448150|@VirginAmerica What @dhepburn said.                                                                                               |false                   |
|681448153|@VirginAmerica plus you've added commercials to the experience... tacky.                                                          |false                   |
|681448156|@VirginAmerica I didn't today... Must mean I need to take another trip!                                                           |false             

In [5]:
# (5) First pre-processing pass, using the Feature Transformers NATIVE to Spark MLlib
# (5.1) Drop any tweets whose textual content is null
# (5.2) Split the text into tokens with the Tokenizer Feature Transformer
# (5.3) Strip Stop Words from the tokens with the StopWordsRemover Feature Transformer
# (5.4) Join the remaining tokens back into one space-separated string, ready for
#       3rd party pre-processing (-> spark-nlp)

# (5.1)
filtered_df = airline_tweets_with_labels_df.filter("text is not null")

# (5.2)
tokenizer = Tokenizer(inputCol="text", outputCol="tokens_1")
tokenized_df = tokenizer.transform(filtered_df)

# (5.3) and (5.4)
remover = StopWordsRemover(inputCol="tokens_1", outputCol="filtered_tokens")
preprocessed_part_1_df = remover.transform(tokenized_df) \
    .withColumn("concatenated_filtered_tokens", concat_ws(" ", col("filtered_tokens")))
preprocessed_part_1_df.show(n=3, truncate=False)

+---------+------------------------------------------------------------------------+------------------------+------------------------------------------------------------------------------------+------------------------------------------------------------------+----------------------------------------------------------+
|unit_id  |text                                                                    |negative_sentiment_label|tokens_1                                                                            |filtered_tokens                                                   |concatenated_filtered_tokens                              |
+---------+------------------------------------------------------------------------+------------------------+------------------------------------------------------------------------------------+------------------------------------------------------------------+----------------------------------------------------------+
|681448150|@VirginAmerica What @dhepb

In [6]:
# (6) Second pre-processing pass: an NLP pipeline built from the spark-nlp 3rd party library
# (6.1) DocumentAssembler annotates the string of concatenated filtered tokens
# (6.2) The spark-nlp Tokenizer Annotator re-tokenizes the resulting document
# (6.3) The Stemmer Annotator applies stemming to those tokens
# (6.4) The Normalizer Annotator cleans and lowercases the stems

# (6.1)
document_assembler = DocumentAssembler() \
    .setInputCol("concatenated_filtered_tokens")

# (6.2) Note: this rebinds the name `tokenizer`, previously the MLlib Tokenizer from step (5)
tokenizer = NLPTokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("tokens_2")

# (6.3)
stemmer = Stemmer() \
    .setInputCols(["tokens_2"]) \
    .setOutputCol("stems")

# (6.4)
normalizer = Normalizer() \
    .setInputCols(["stems"]) \
    .setOutputCol("normalised_stems")

# Chain the four stages, fit the pipeline and apply it to the output of step (5)
nlp_stages = [document_assembler, tokenizer, stemmer, normalizer]
pipeline = Pipeline(stages=nlp_stages)
pipeline_model = pipeline.fit(preprocessed_part_1_df)
preprocessed_df = pipeline_model.transform(preprocessed_part_1_df)

preprocessed_df \
    .select("unit_id", "text", "negative_sentiment_label", "normalised_stems") \
    .show(n=3, truncate=False)
preprocessed_df.dtypes

+---------+------------------------------------------------------------------------+------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|unit_id  |text                                                                    |negative_sentiment_label|normalised_stems                                                                                                                                                                                                                                                                                                                        |
+---------+------------------------------------------------------------------------+------------------------+-------------

[('unit_id', 'bigint'),
 ('text', 'string'),
 ('negative_sentiment_label', 'string'),
 ('tokens_1', 'array<string>'),
 ('filtered_tokens', 'array<string>'),
 ('concatenated_filtered_tokens', 'string'),
 ('document',
  'array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>>>'),
 ('tokens_2',
  'array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>>>'),
 ('stems',
  'array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>>>'),
 ('normalised_stems',
  'array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>>>')]

In [7]:
# (7) The 3rd party annotators available in spark-nlp (such as SentimentDetector and
# ViveknSentimentDetector) could be used from here to train a sentiment model.
# In this case study, however, the feature vectors for the subsequent machine learning model
# are generated with the Feature Extractors native to Spark MLlib - specifically its
# TF-IDF Feature Extractor.

# (7.1) Pull the normalised stems out of the spark-nlp Annotator Array Structure:
# one output row per annotation, keeping only the "result" field of each annotation
exploded_df = preprocessed_df \
    .withColumn("stems", explode("normalised_stems")) \
    .withColumn("stems", col("stems").getItem("result")) \
    .select("unit_id", "negative_sentiment_label", "text", "stems")
exploded_df.show(n=10, truncate=False)

# (7.2) Group by Unit ID and aggregate the normalised stems into a sequence of tokens.
# The stems of each tweet are joined into one space-separated string, which is then
# split on spaces again to give an array<string> column called "tokens"
stems_as_string = concat_ws(" ", collect_list(col("stems")))
aggregated_df = exploded_df \
    .groupBy("unit_id") \
    .agg(stems_as_string, first("text"), first("negative_sentiment_label")) \
    .toDF("unit_id", "tokens", "text", "negative_sentiment_label") \
    .withColumn("tokens", split(col("tokens"), " ").cast("array<string>"))
aggregated_df.show(n=10, truncate=False)

+---------+------------------------+------------------------------------------------------------------------+-------------+
|unit_id  |negative_sentiment_label|text                                                                    |stems        |
+---------+------------------------+------------------------------------------------------------------------+-------------+
|681448150|false                   |@VirginAmerica What @dhepburn said.                                     |virginamerica|
|681448150|false                   |@VirginAmerica What @dhepburn said.                                     |dhepburn     |
|681448150|false                   |@VirginAmerica What @dhepburn said.                                     |said         |
|681448153|false                   |@VirginAmerica plus you've added commercials to the experience... tacky.|virginamerica|
|681448153|false                   |@VirginAmerica plus you've added commercials to the experience... tacky.|plu          |
|6814481

In [8]:
# (8) Build TF-IDF feature vectors from the sequence of tokens
# (8.1) The HashingTF Transformer turns each token sequence into a Term Frequency vector
# (8.2) An IDF Estimator is fitted to the featurized dataset, producing the IDFModel
# (8.3) The IDFModel rescales the TF vectors based on frequency across the corpus

# (8.1)
hashingTF = HashingTF(numFeatures=4096, inputCol="tokens", outputCol="raw_features")
features_df = hashingTF.transform(aggregated_df)

# (8.2)
idf = IDF(inputCol="raw_features", outputCol="features")
idf_model = idf.fit(features_df)

# (8.3) The scaled DataFrame is marked for caching as soon as it is defined
scaled_features_df = idf_model.transform(features_df).cache()
scaled_features_df.show(n=8, truncate=False)

+---------+----------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------+------------------------+-------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|unit_id  |tokens                                                                                                    |text                                                                                                                                            |negative

In [9]:
# (9) Index the label column using StringIndexer
# By default StringIndexer assigns indices by descending label frequency, so which class
# receives 0.0 would depend on the class balance of the data it is fitted to. Ordering the
# labels alphabetically (descending) pins the mapping regardless of the data:
#   "true"  (Negative Sentiment)     -> label 0.0
#   "false" (Not Negative Sentiment) -> label 1.0
# NOTE: stringOrderType requires Spark 2.3.0 or later
indexer = StringIndexer(inputCol = "negative_sentiment_label", outputCol = "label",
    stringOrderType = "alphabetDesc").fit(scaled_features_df)
scaled_features_indexed_label_df = indexer.transform(scaled_features_df)

In [10]:
# (10) Randomly partition the index-labelled Scaled Feature Vectors into a Training and a
# Test DataFrame, using weights of 0.9 and 0.1 and a fixed seed
train_test_splits = scaled_features_indexed_label_df.randomSplit([0.9, 0.1], seed=12345)
train_df, test_df = train_test_splits

# Display the number of rows in each split
(train_df.count(), test_df.count())

(13129, 1503)

In [11]:
# (11) Fit a Classification Tree to the Training DataFrame
# The estimator reads its input vectors from "features" and its target from "label"
decision_tree = DecisionTreeClassifier(
    labelCol='label',
    featuresCol='features',
)
decision_tree_model = decision_tree.fit(train_df)

In [12]:
# (12) Score the Test DataFrame with the Trained Classification Tree Model
test_decision_tree_predictions_df = decision_tree_model.transform(test_df)

# Show the first predictions side by side with the actual label and the tweet text
print("TEST DATASET PREDICTIONS AGAINST ACTUAL LABEL: ")
predictions_preview_df = test_decision_tree_predictions_df.select("prediction", "label", "text")
predictions_preview_df.show(n=10, truncate=False)

TEST DATASET PREDICTIONS AGAINST ACTUAL LABEL: 
+----------+-----+---------------------------------------------------------------------------------------------------------------------------------------------------+
|prediction|label|text                                                                                                                                               |
+----------+-----+---------------------------------------------------------------------------------------------------------------------------------------------------+
|0.0       |0.0  |@united @UCtraveladvisor - I would have loved to respond to your website until I saw the really long form. In business the new seats are bad       |
|0.0       |0.0  |@united resolved and im sick and tired of waiting on you. I want my refund and I'd like to speak to someone about it.                              |
|0.0       |0.0  |@USAirways why now just announce delay of 4478 from PVD when you knew captain was already on delaye

In [13]:
# (13) Compute the Confusion Matrix for our Decision Tree Classifier on the Test DataFrame
# MulticlassMetrics is given the (prediction, label) pairs of the scored Test DataFrame
predictions_and_label = test_decision_tree_predictions_df.select("prediction", "label").rdd
metrics = MulticlassMetrics(predictions_and_label)

# The row count is an integer, so it is formatted with %d. The previous %g switches to
# scientific notation from 1,000,000 rows upwards (e.g. "1.23457e+06") and loses digits
print("N = %d" % test_decision_tree_predictions_df.count())

# Per the Spark MLlib documentation, rows are the actual classes and columns the predicted
# classes, both ordered by ascending class label
print(metrics.confusionMatrix())

N = 1503
DenseMatrix([[735., 199.],
             [216., 353.]])


In [14]:
# (14) Persist the trained Decision Tree Classifier to disk for later use
# A plain save() raises an error when the target path already exists, which makes this cell
# fail whenever the notebook is re-run. Going through the writer with overwrite() replaces
# any previously saved model at that path instead.
decision_tree_model.write().overwrite().save('/data/workspaces/jillur.quddus/jupyter/notebooks/Machine-Learning-with-Apache-Spark-QuickStart-Guide/chapter06/models/airline-sentiment-analysis-decision-tree-classifier')

In [15]:
# (15) Stop the Spark Context
# `sc` is the Spark Context created in step (2); this is the last step of the notebook
sc.stop()