In [0]:

from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


In [0]:
# Load the tweets CSV into a Spark DataFrame.
# The first row is read as the header and column types are inferred.
data = (
    sqlContext.read.format("csv")
    .option("header", "true")
    .option("delimiter", ",")
    .option("inferSchema", "true")
    .load("/FileStore/tables/Tweets.csv")
)

# Preview the first three rows.
data.show(3)

+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+----------+-------------------+-------------+--------------------+-----------+--------------------+--------------+--------------------+
|          tweet_id|airline_sentiment|airline_sentiment_confidence|negativereason|negativereason_confidence|       airline|airline_sentiment_gold|      name|negativereason_gold|retweet_count|                text|tweet_coord|       tweet_created|tweet_location|       user_timezone|
+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+----------+-------------------+-------------+--------------------+-----------+--------------------+--------------+--------------------+
|570306133677760513|          neutral|                         1.0|          null|                     null|Virgin America|                  null|   caird

In [0]:
# List the column names of the loaded DataFrame.
data.columns

Out[69]: ['tweet_id',
 'airline_sentiment',
 'airline_sentiment_confidence',
 'negativereason',
 'negativereason_confidence',
 'airline',
 'airline_sentiment_gold',
 'name',
 'negativereason_gold',
 'retweet_count',
 'text',
 'tweet_coord',
 'tweet_created',
 'tweet_location',
 'user_timezone']

In [0]:
# Data preprocessing: feature stages, declared in the order the pipeline
# chains them (text -> words -> filtered_txt -> rawFeatures).

# 1. Tokenize the tweet text into words using the "\W" (non-word) pattern.
regexTokenizer = RegexTokenizer(
    inputCol="text",
    outputCol="words",
    pattern="\\W",
)

# 2. Remove stop words from the token list.
remover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered_txt",
)

# 3. Convert the remaining words to term-frequency vectors.
hashingTF = HashingTF(
    inputCol="filtered_txt",
    outputCol="rawFeatures",
)

# Convert the string sentiment label into a numeric format.
indexer = StringIndexer(
    inputCol="airline_sentiment",
    outputCol="label",
)


In [0]:
# Keep only rows whose tweet text is present and non-empty.
# Both conditions are combined into a single predicate: applying them as two
# separate `data.filter(...)` assignments would let the second assignment
# overwrite the first, silently dropping the empty-string check.
dataFilter_DF = data.filter(data.text.isNotNull() & (data.text != ""))

# Fit the StringIndexer on the filtered rows and append the numeric `label`
# column derived from `airline_sentiment`.
indexed = indexer.fit(dataFilter_DF).transform(dataFilter_DF)
indexed.show(5)

+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+----------+-------------------+-------------+--------------------+-----------+--------------------+--------------+--------------------+-----+
|          tweet_id|airline_sentiment|airline_sentiment_confidence|negativereason|negativereason_confidence|       airline|airline_sentiment_gold|      name|negativereason_gold|retweet_count|                text|tweet_coord|       tweet_created|tweet_location|       user_timezone|label|
+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+----------+-------------------+-------------+--------------------+-----------+--------------------+--------------+--------------------+-----+
|570306133677760513|          neutral|                         1.0|          null|                     null|Virgin America|             

In [0]:
# Logistic regression over the hashed term-frequency features.
# The regParam given here is only an initial value: the parameter grid below
# overrides it for every candidate model.
lr = LogisticRegression(
    maxIter=10,
    regParam=0.02,
    elasticNetParam=0.8,
    featuresCol="rawFeatures",
    labelCol="label",
)

# Hyper-parameter grid: 3 feature-vector sizes x 2 regularisation strengths,
# i.e. 6 candidate parameter combinations.
paramGrid = (
    ParamGridBuilder()
    .addGrid(hashingTF.numFeatures, [10, 100, 1000])
    .addGrid(lr.regParam, [0.1, 0.01])
    .build()
)

# Pipeline for preprocessing and logistic regression:
# tokenize -> remove stop words -> hash to TF vectors -> classify.
pipeline = Pipeline(stages=[regexTokenizer, remover, hashingTF, lr])

# 3-fold cross-validation over the grid. The evaluator is left at its default
# metric. A fixed seed makes the fold assignment, and therefore the selected
# model and every metric reported from it, reproducible across runs.
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=MulticlassClassificationEvaluator(),
    numFolds=3,
    seed=42,
)

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(indexed)


In [0]:
# Score the data with the best model selected by cross-validation.
# Label indices: 0=Negative 1=Neutral 2=Positive
# NOTE(review): `indexed` is the same DataFrame that was passed to
# crossval.fit, so these are predictions on the data the model was fitted on,
# not on held-out data.
predictons = cvModel.transform(indexed)

# Keep just the predicted and actual label indices and show the first 20 rows.
predictionAndLabels = predictons.select(["prediction", "label"])
predictionAndLabels.show(20)

+----------+-----+
|prediction|label|
+----------+-----+
|       0.0|  1.0|
|       0.0|  2.0|
|       0.0|  1.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  2.0|
|       0.0|  1.0|
|       0.0|  2.0|
|       0.0|  2.0|
|       1.0|  1.0|
|       0.0|  2.0|
|       0.0|  2.0|
|       1.0|  2.0|
|       2.0|  2.0|
|       0.0|  0.0|
|       2.0|  2.0|
|       0.0|  0.0|
|       0.0|  2.0|
|       0.0|  2.0|
+----------+-----+
only showing top 20 rows



In [0]:
# Build the evaluator with its label column and metric configured up front,
# then compute accuracy over the predictions.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictons)

# Show predicted vs. actual label indices for the first 20 rows.
predictons.select(["prediction", "label"]).show(20)

+----------+-----+
|prediction|label|
+----------+-----+
|       0.0|  1.0|
|       0.0|  2.0|
|       0.0|  1.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  2.0|
|       0.0|  1.0|
|       0.0|  2.0|
|       0.0|  2.0|
|       1.0|  1.0|
|       0.0|  2.0|
|       0.0|  2.0|
|       1.0|  2.0|
|       2.0|  2.0|
|       0.0|  0.0|
|       2.0|  2.0|
|       0.0|  0.0|
|       0.0|  2.0|
|       0.0|  2.0|
+----------+-----+
only showing top 20 rows



In [0]:
# Accuracy metric: print the value computed by the evaluator.

print("Accuracy:", accuracy)

Accuracy: 0.7344177145981411


In [0]:
# Other metrics from the same evaluator. setMetricName returns the evaluator
# itself, so switching the metric and evaluating are chained. The evaluator is
# left configured for "weightedPrecision" afterwards.
F1_score = evaluator.setMetricName("f1").evaluate(predictons)
weightedRecall = evaluator.setMetricName("weightedRecall").evaluate(predictons)
weightedPrecision = evaluator.setMetricName("weightedPrecision").evaluate(predictons)

# Evaluation metrics summary (accuracy was computed in an earlier cell).
for metric_title, metric_value in (
    ("Weighted precision: ", weightedPrecision),
    ("Weighted recall: ", weightedRecall),
    ("F1 score: ", F1_score),
    ("Accuracy: ", accuracy),
):
    print(metric_title, metric_value)

Weighted precision:  0.7212643735683939
Weighted recall:  0.734417714598141
F1 score:  0.7017836311764857
Accuracy:  0.7344177145981411
