In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import StringIndexer

In [0]:
input_url = "https://raw.githubusercontent.com/hvreddy654/big-data/main/Tweets.csv"
from pyspark import SparkFiles
spark.sparkContext.addFile(input_url)
data = spark.read.csv("file://"+SparkFiles.get("Tweets.csv"), header=True, inferSchema= True)
data.show()


+--------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+---------------+-------------------+-------------+--------------------+-----------+--------------------+--------------------+--------------------+
|            tweet_id|airline_sentiment|airline_sentiment_confidence|negativereason|negativereason_confidence|       airline|airline_sentiment_gold|           name|negativereason_gold|retweet_count|                text|tweet_coord|       tweet_created|      tweet_location|       user_timezone|
+--------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+---------------+-------------------+-------------+--------------------+-----------+--------------------+--------------------+--------------------+
|  570306133677760513|          neutral|                         1.0|          null|                     null|Virgi

In [0]:
df = data.select("text", "airline_sentiment")
df.show()

+--------------------+-----------------+
|                text|airline_sentiment|
+--------------------+-----------------+
|@VirginAmerica Wh...|          neutral|
|@VirginAmerica pl...|         positive|
|@VirginAmerica I ...|          neutral|
|"@VirginAmerica i...|         negative|
|@VirginAmerica an...|         negative|
|@VirginAmerica se...|         negative|
|                null|             null|
|@VirginAmerica ye...|         positive|
|@VirginAmerica Re...|          neutral|
|@virginamerica We...|         positive|
|@VirginAmerica it...|         positive|
|@VirginAmerica di...|          neutral|
|@VirginAmerica I ...|         positive|
|@VirginAmerica Th...|         positive|
|@VirginAmerica @v...|         positive|
|@VirginAmerica Th...|         positive|
|@VirginAmerica SF...|         negative|
|@VirginAmerica So...|         positive|
|@VirginAmerica  I...|         negative|
|I ❤️ flying @Virg...|         positive|
+--------------------+-----------------+
only showing top

In [0]:
df = df.na.drop(subset=["text"])
df.show()

+--------------------+-----------------+
|                text|airline_sentiment|
+--------------------+-----------------+
|@VirginAmerica Wh...|          neutral|
|@VirginAmerica pl...|         positive|
|@VirginAmerica I ...|          neutral|
|"@VirginAmerica i...|         negative|
|@VirginAmerica an...|         negative|
|@VirginAmerica se...|         negative|
|@VirginAmerica ye...|         positive|
|@VirginAmerica Re...|          neutral|
|@virginamerica We...|         positive|
|@VirginAmerica it...|         positive|
|@VirginAmerica di...|          neutral|
|@VirginAmerica I ...|         positive|
|@VirginAmerica Th...|         positive|
|@VirginAmerica @v...|         positive|
|@VirginAmerica Th...|         positive|
|@VirginAmerica SF...|         negative|
|@VirginAmerica So...|         positive|
|@VirginAmerica  I...|         negative|
|I ❤️ flying @Virg...|         positive|
|@VirginAmerica yo...|         positive|
+--------------------+-----------------+
only showing top

In [0]:
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
stopword = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="filtered_text")
hashingTF = HashingTF(inputCol=stopword.getOutputCol(), outputCol="features")
indexer = StringIndexer(inputCol = "airline_sentiment", outputCol="label")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[indexer,tokenizer,stopword, hashingTF, lr])


In [0]:
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100]) \
    .addGrid(lr.regParam, [0.1, 0.01, 0.05,0.25,0.5]) \
    .addGrid(lr.maxIter,[10,50,100,200])\
    .build()

In [0]:
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=4)  

In [0]:
training, test = df.randomSplit([0.8, 0.2])
training.show(40)

+--------------------+-----------------+
|                text|airline_sentiment|
+--------------------+-----------------+
|"""LOL you guys a...|         positive|
|".@JetBlue ooooka...|         negative|
|".@united It's wo...|         negative|
|".@united You may...|         negative|
|"@AmericanAir  so...|         negative|
|"@AmericanAir  up...|         negative|
|"@AmericanAir ""A...|         positive|
|"@AmericanAir ""A...|         positive|
|"@AmericanAir ""T...|          neutral|
|"@AmericanAir ""W...|         negative|
|"@AmericanAir ""b...|         negative|
|"@AmericanAir ""s...|         negative|
|"@AmericanAir $60...|         negative|
|"@AmericanAir -- ...|         negative|
|"@AmericanAir @US...|         negative|
|"@AmericanAir @ma...|         negative|
|"@AmericanAir @un...|         negative|
|"@AmericanAir AA1...|         negative|
|"@AmericanAir Aft...|         negative|
|"@AmericanAir By ...|         negative|
|"@AmericanAir Cha...|         negative|
|"@AmericanAir D

In [0]:
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)



In [0]:
prediction = cvModel.transform(test)
selected = prediction.select("text", "airline_sentiment", "probability", "prediction")
for row in selected.collect():
    print(row)

Row(text='".@AmericanAir @USAirways Add insult to injury you guys ""misplaced"" my bag. ', airline_sentiment='negative', probability=DenseVector([0.5121, 0.1212, 0.3668]), prediction=0.0)
Row(text='"@AmericanAir ""overweight"" flight = you sold more tickets than you had seats. We all know that. Let\'s call it what it is."', airline_sentiment='negative', probability=DenseVector([0.6147, 0.3421, 0.0432]), prediction=0.0)
Row(text='"@AmericanAir @USAirways ""ma\'am if you have a complaint you should visit our customer service desk"" {sees line ~45 people deep}"', airline_sentiment='negative', probability=DenseVector([0.7082, 0.1202, 0.1716]), prediction=0.0)
Row(text='"@AmericanAir @emxlyy ""The wheel was broken when we got it. We swear."""', airline_sentiment='negative', probability=DenseVector([0.2074, 0.4683, 0.3243]), prediction=1.0)
Row(text='"@AmericanAir @united and a complete lack of faith in your companies. It\'s really a shame that you think telling me ""it\'s been a challenging

In [0]:
prediction.show()

+--------------------+-----------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|                text|airline_sentiment|label|               words|       filtered_text|            features|       rawPrediction|         probability|prediction|
+--------------------+-----------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|".@AmericanAir @U...|         negative|  0.0|[".@americanair, ...|[".@americanair, ...|(100,[16,36,64,68...|[0.59165281430165...|[0.51205791300919...|       0.0|
|"@AmericanAir ""o...|         negative|  0.0|["@americanair, "...|["@americanair, "...|(100,[1,7,8,29,38...|[1.08063825345850...|[0.61472846122900...|       0.0|
|"@AmericanAir @US...|         negative|  0.0|["@americanair, @...|["@americanair, @...|(100,[3,9,12,13,3...|[1.06365648951445...|[0.70818852836917...|       0.0|
|"@AmericanAir @em...|

In [0]:
label_prediction = prediction.select("label","prediction")
label_prediction.show()
label_prediction = label_prediction.rdd
label_prediction.collect()

+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       1.0|
|  0.0|       0.0|
|  0.0|       2.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       0.0|
+-----+----------+
only showing top 20 rows

Out[14]: [Row(label=0.0, prediction=0.0),
 Row(label=0.0, prediction=0.0),
 Row(label=0.0, prediction=0.0),
 Row(label=0.0, prediction=1.0),
 Row(label=0.0, prediction=0.0),
 Row(label=0.0, prediction=2.0),
 Row(label=0.0, prediction=0.0),
 Row(label=0.0, prediction=0.0),
 Row(label=0.0, prediction=0.0),
 Row(label=0.0, prediction=0.0),
 Row(label=0.0, prediction=0.0),
 Row(label=0.0, prediction=0.0),
 Row(label=0.0, prediction=0.0),
 Row(label=0.0, prediction=0.0),
 Row(label=0.0, prediction=0.0),
 Row(label=0.0

In [0]:
metrics = MulticlassMetrics(label_prediction)

# overall statistics
confusion = metrics.confusionMatrix()
precision = metrics.precision(1.0)
recall = metrics.recall(1.0)
f1Score = metrics.fMeasure(1.0)
accur = metrics.accuracy
# result = ({
print("Summary Stats") 
print("Confusion matrix:")
print(confusion)
print("Accuracy = %s" % accur)
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)

# Weighted stats
print("Weighted recall = %s" % metrics.weightedRecall)
print("Weighted precision = %s" % metrics.weightedPrecision)
print("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)
# })

# result.saveAsTextFile(outputFile)

# df1 = pd.DataFrame(result)
# print(df
# rdd2=spark.sparkContext.parallelize(result)

# rdd2.take(10)

Summary Stats
Confusion matrix:
DenseMatrix([[1677.,  412.,  271.],
             [  85.,  142.,   71.],
             [  58.,   50.,  129.]])
Accuracy = 0.6728842832469776
Precision = 0.23509933774834438
Recall = 0.47651006711409394
F1 Score = 0.3148558758314856
Weighted recall = 0.6728842832469776
Weighted precision = 0.7977692087205226
Weighted F(1) Score = 0.7163514303253187
Weighted F(0.5) Score = 0.76089471653981
Weighted false positive rate = 0.24673974855469705


In [0]:
display("Summary Stats")
display("Accuracy = %s" % accur)
display("Precision = %s" % precision)
display("Recall = %s" % recall)
display("F1 Score = %s" % f1Score)

# Weighted stats
display("Weighted recall = %s" % metrics.weightedRecall)
display("Weighted precision = %s" % metrics.weightedPrecision)
display("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
display("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
display("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)

'Summary Stats''Accuracy = 0.6728842832469776''Precision = 0.23509933774834438''Recall = 0.47651006711409394''F1 Score = 0.3148558758314856''Weighted recall = 0.6728842832469776''Weighted precision = 0.7977692087205226''Weighted F(1) Score = 0.7163514303253187''Weighted F(0.5) Score = 0.76089471653981''Weighted false positive rate = 0.24673974855469705'

In [0]:
columns = ["accuracy","precision","recall","fMeasure","truePositiveRate","falsePositiveRate","weightedRecall","weightedPrecision","weightedFMeasure","weightedFalsePositiveRate"]
tempList = [metrics.accuracy,metrics.precision(float(i)),metrics.recall(float(i)),metrics.fMeasure(float(i)),metrics.truePositiveRate(float(i)),metrics.falsePositiveRate(float(i)),metrics.weightedRecall,metrics.weightedPrecision,metrics.weightedFMeasure(),metrics.weightedFalsePositiveRate] 
tempDf = spark.createDataFrame(data = [tempList], schema=columns)
tempDf.display()

accuracy,precision,recall,fMeasure,truePositiveRate,falsePositiveRate,weightedRecall,weightedPrecision,weightedFMeasure,weightedFalsePositiveRate
0.6728842832469776,0.2738853503184713,0.5443037974683544,0.364406779661017,0.5443037974683544,0.128668171557562,0.6728842832469776,0.7977692087205226,0.7163514303253187,0.246739748554697
