In [None]:
from pyspark.sql.functions import *
from pyspark.ml.feature import * 
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import LogisticRegression

In [None]:
# Read the airline-tweets CSV into a Spark DataFrame. The first row supplies the
# column names and Spark infers column types (an extra pass over the file).
# Rows whose tweet `text` is null are dropped so tokenization never sees nulls.
# NOTE(review): if any quoted tweet contains an embedded newline, the reader may
# also need multiLine=True — confirm against the data.
path_to_df = "Tweets.csv"
input_df = (
    spark.read.csv(path_to_df, header=True, inferSchema=True)
    .dropna(subset='text')
)
input_df.show()

+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+---------------+-------------------+-------------+--------------------+-----------+--------------------+--------------------+--------------------+
|          tweet_id|airline_sentiment|airline_sentiment_confidence|negativereason|negativereason_confidence|       airline|airline_sentiment_gold|           name|negativereason_gold|retweet_count|                text|tweet_coord|       tweet_created|      tweet_location|       user_timezone|
+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+---------------+-------------------+-------------+--------------------+-----------+--------------------+--------------------+--------------------+
|570306133677760513|          neutral|                         1.0|          null|                     null|Virgin Americ

In [None]:
# Feature pipeline: tokenize -> remove stop words -> hash term frequencies.
# The string sentiment label is indexed into a numeric column for the classifier.
#
# Tokenization happens inside the pipeline rather than by reassigning input_df.
# That keeps the cell idempotent: re-running it no longer stacks a second
# `text_array` column onto input_df.
# pattern=r"\s+" splits on runs of any whitespace (spaces, tabs, newlines).
# minTokenLength=1 drops the empty tokens that split(text, " ") used to emit for
# consecutive spaces. toLowercase=False keeps the tokens' original casing, as
# before; StopWordsRemover is case-insensitive by default either way.
tokenizer = RegexTokenizer(inputCol="text", outputCol="text_array",
                           pattern=r"\s+", gaps=True, minTokenLength=1,
                           toLowercase=False)
remover = StopWordsRemover(inputCol="text_array", outputCol="filtered_text")
ht = HashingTF(inputCol="filtered_text", outputCol="features")
# Under StringIndexer's default frequencyDesc ordering, index 0.0 is the most
# frequent airline_sentiment value.
indexer = StringIndexer(inputCol="airline_sentiment", outputCol="airline_sentiment_indexed")

pipeline = Pipeline(stages=[tokenizer, remover, ht, indexer])
model = pipeline.fit(input_df)
pipeline_df = model.transform(input_df)
# Drop the raw text, the intermediate tokens and the string label. Keep the
# hashed features and the indexed label for modelling.
cols = ('text', 'text_array', 'airline_sentiment')
df = pipeline_df.drop(*cols)
df.show()

+------------------+----------------------------+--------------+-------------------------+--------------+----------------------+---------------+-------------------+-------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------------+
|          tweet_id|airline_sentiment_confidence|negativereason|negativereason_confidence|       airline|airline_sentiment_gold|           name|negativereason_gold|retweet_count|tweet_coord|       tweet_created|      tweet_location|       user_timezone|       filtered_text|            features|airline_sentiment_indexed|
+------------------+----------------------------+--------------+-------------------------+--------------+----------------------+---------------+-------------------+-------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------------+
|570306133677760513|              

In [None]:
# Hold out ~20% of rows for final evaluation. The fixed seed makes the split
# repeatable for the same input DataFrame.
split_weights = [0.8, 0.2]
train, test = df.randomSplit(split_weights, seed=2000)

In [None]:
# Tune logistic regression with 3-fold cross-validation over:
# regularization strength, L1/L2 mix (elasticNetParam), and iteration budget.
# 4 x 3 x 4 = 48 parameter maps, each fit once per fold.
# maxIter=5 on the estimator is only a default; every grid entry overrides it.
lr = LogisticRegression(featuresCol='features', labelCol='airline_sentiment_indexed', maxIter=5)
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.5, 2.0, 5.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10, 20])
             .build())
# No metricName is given, so model selection uses this evaluator's default
# metric ("f1").
evaluator = MulticlassClassificationEvaluator(labelCol='airline_sentiment_indexed')
# Seed the fold assignment (same value as the train/test split). Without it the
# folds, and therefore the selected model, change from run to run.
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator,
                    numFolds=3, seed=2000)
cvModel = cv.fit(train)
predictions = cvModel.transform(test)

In [None]:
# Score the tuned model on the held-out test split.
# The evaluator was built without a metricName, so its default metric is "f1",
# not AUC. Request it explicitly so the printed label matches the value.
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
acc = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
# truePositiveRateByLabel reports a single class, chosen by metricLabel.
# 0.0 is the evaluator's default label and, under StringIndexer's default
# frequencyDesc ordering, the most frequent sentiment.
tpr_label = 0.0
tprbl = evaluator.evaluate(predictions, {evaluator.metricName: "truePositiveRateByLabel",
                                         evaluator.metricLabel: tpr_label})
hloss = evaluator.evaluate(predictions, {evaluator.metricName: "hammingLoss"})
lloss = evaluator.evaluate(predictions, {evaluator.metricName: "logLoss"})

print("F1: {}".format(f1))
print("Accuracy: {}".format(acc))
print("True Positive Rate (label {}): {}".format(tpr_label, tprbl))
print("Hamming Loss: {}".format(hloss))
print("Log Loss: {}".format(lloss))

AUC: 0.718533985467147
Accuracy: 0.7408835904628331
True Positive Rate By Label: 0.9264947888096544
Hamming Loss: 0.2591164095371669
Hamming Loss: 0.6257079534362642


In [None]:
# Inspect the model chosen by cross-validation and the hyperparameters it was
# refit with on the full training split.
bestModel = cvModel.bestModel
# One row of coefficients per class, one column per hashed feature. The sparse
# matrix's string form lists only its first entries.
coefficientMatrix = bestModel.coefficientMatrix
regParam = bestModel.getRegParam()
maxIter = bestModel.getMaxIter()
elasticNetParam = bestModel.getElasticNetParam()

print("coefficientMatrix: {}".format(coefficientMatrix))
print("Reg Parameter: {}".format(regParam))
print("Max Iteration: {}".format(maxIter))
print("Elastic Net Param: {}".format(elasticNetParam))

coefficientMatrix: 3 X 262144 CSRMatrix
(0,7) 0.51
(0,19) -0.1708
(0,26) 0.2266
(0,48) -0.1789
(0,61) -0.8843
(0,83) 0.4354
(0,108) 0.3819
(0,141) -0.158
(0,144) 0.5899
(0,150) 0.0978
(0,152) 0.192
(0,156) -0.3838
(0,161) 0.2506
(0,184) 0.2703
(0,192) -0.3146
(0,210) -0.1656
..
..
Reg Paramater: 0.1
Max Iteration: 20
Elastic Net Param: 0.0
