In [None]:
!pip install pyspark

In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt
import seaborn as sns
import sklearn
import random
import os

from pyspark.sql import SparkSession 
from pyspark.ml  import Pipeline     
from pyspark.sql import SQLContext  
from pyspark.sql.functions import mean,col,split, col, regexp_extract, when, lit
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.sql import functions as F

In [None]:
# Explicit schema for the raw CSV: six nullable string columns, in file order.
DATASET_COLUMNS = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("target", "ids", "date", "flag", "user", "text")
])

# Text encoding name associated with the dataset file.
DATASET_ENCODING = "ISO-8859-1"

In [None]:
# Start (or reuse) the Spark session and lower log verbosity before any job runs,
# so the read below does not flood the cell output with INFO/WARN lines.
spark = SparkSession.builder.appName('Tweets Sentiment').getOrCreate()
spark.sparkContext.setLogLevel('ERROR')

# Load the raw tweets with the explicit schema. The file has no header row, so
# `header` is a real boolean. DATASET_ENCODING is passed through so the reader
# decodes the file with the declared charset instead of the default one.
df = spark.read.csv(
    '../input/sentiment140',
    header=False,
    schema=DATASET_COLUMNS,
    encoding=DATASET_ENCODING,
)

In [None]:
# Print the column names and types of the freshly loaded DataFrame.
df.printSchema()

In [None]:
# Preview the first 25 raw rows.
df.show(25)

In [None]:
# Discard every row that has a null in any column, then report how many rows remain.
df = df.na.drop()
df.count()

In [None]:
def preprocessing(sparkDF,col):
    """Clean a tweet-text column and return the DataFrame with that column replaced.

    The cleaning steps are applied in this order:
      1. remove URLs (``http`` followed by non-whitespace),
      2. remove ``@mentions``,
      3. remove the ``#`` marker (the tag word itself is kept),
      4. remove the retweet marker ``RT`` when it is a standalone token,
      5. remove ``:`` characters,
      6. replace every run of non-alphanumeric characters with one space,
      7. trim leading/trailing spaces.

    Parameters
    ----------
    sparkDF : Spark DataFrame containing the column to clean.
    col : str
        Name of the string column to clean; the column is overwritten in the result.

    Returns
    -------
    A new DataFrame identical to ``sparkDF`` except for the cleaned column.
    """
    # NOTE: the parameter name `col` shadows any imported `col` function inside this
    # body; it is kept for backward compatibility, so the column is built via `F.col`.
    cleaning_steps = (
        (r'http\S+', ''),
        (r'@\w+', ''),
        (r'#', ''),
        # Word boundaries so that "RT" inside a word (e.g. "SMART") is left alone.
        (r'\bRT\b', ''),
        (r':', ''),
        # Collapses every run of non-alphanumerics (spaces included) to one space,
        # so no separate hyphen-removal or space-squeezing step is needed afterwards.
        (r'[^A-Za-z0-9]+', ' '),
    )

    cleaned = F.col(col)
    for pattern, replacement in cleaning_steps:
        cleaned = F.regexp_replace(cleaned, pattern, replacement)

    # One projection instead of one per step keeps the query plan small.
    return sparkDF.withColumn(col, F.trim(cleaned))

In [None]:
# Clean the tweet text column; `df` now refers to the cleaned DataFrame.
df = preprocessing(sparkDF=df, col='text')

In [None]:
# Preview the first 25 rows after text cleaning.
df.show(25)

In [None]:
# Ten users with the most rows, highest count first.
(
    df.groupBy('user')
    .count()
    .orderBy('count', ascending=False)
    .show(10)
)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F

In [None]:
# 98% / 1% / 1% train / validation / test split, seeded so the split is repeatable.
train_set, val_set, test_set = df.randomSplit(weights=[0.98, 0.01, 0.01], seed=99)

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, StringIndexer, Tokenizer

# Feature stages: split text into words, hash words into a 2**16-wide term-frequency
# vector, re-weight with IDF (minDocFreq=5), and index the string target as `label`.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashtf = HashingTF(inputCol="words", outputCol='tf', numFeatures=2**16)
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5)
label_stringIdx = StringIndexer(inputCol="target", outputCol="label")

pipeline = Pipeline().setStages([tokenizer, hashtf, idf, label_stringIdx])

# Fit on the training split only, then apply the fitted stages to train and validation.
pipelineFit = pipeline.fit(train_set)
train_df, val_df = (pipelineFit.transform(part) for part in (train_set, val_set))
train_df.show(5)

In [None]:
# Print the schema of the transformed training set (original columns plus pipeline outputs).
train_df.printSchema()

In [None]:
# Preview the first 5 rows of the transformed validation set.
val_df.show(5)

In [None]:
# Print the schema of the transformed validation set.
val_df.printSchema()

In [None]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression capped at 100 iterations: fit on the training features,
# then score the validation features.
lr = LogisticRegression().setMaxIter(100)
lrModel = lr.fit(train_df)
predictionsLojistic = lrModel.transform(val_df)

In [None]:
# Preview the first 25 validation predictions of the logistic-regression model.
predictionsLojistic.show(25)

In [None]:
# Print the schema of the prediction DataFrame to see the columns the model added.
predictionsLojistic.printSchema()

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Shared evaluator reading the `rawPrediction` column; later cells reuse it and
# report its value as ROC-AUC.
evaluator = BinaryClassificationEvaluator().setRawPredictionCol("rawPrediction")
evaluator.evaluate(predictionsLojistic)

In [None]:
# Validation accuracy: rows whose prediction equals the label, divided by the
# number of validation rows.
accuracy = (
    predictionsLojistic
    .where(F.col("label") == F.col("prediction"))
    .count()
    / float(val_set.count())
)
accuracy

In [None]:
# Apply the fitted feature pipeline and the fitted logistic-regression model to the
# held-out test split.
# NOTE: `pipelineFit` is rebound by a later cell to a pipeline that also contains a
# classifier, so this cell is only valid when the notebook is run top to bottom.
test_df = pipelineFit.transform(test_set)
testPredictionsLogistic = lrModel.transform(test_df)

In [None]:
# Preview the first 25 test predictions of the logistic-regression model.
testPredictionsLogistic.show(25)

In [None]:
# Test-split metrics for the HashingTF + logistic-regression model.
test_roc_auc = evaluator.evaluate(testPredictionsLogistic)
test_accuracy = (
    testPredictionsLogistic
    .where(F.col("label") == F.col("prediction"))
    .count()
    / float(test_set.count())
)
print("Logistic HashingTF Test Accuracy Score: {0:.4f}".format(test_accuracy))
print("Logistic HashingTF Test ROC-AUC: {0:.4f}".format(test_roc_auc))

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest with default hyper-parameters on the same HashingTF/IDF features.
# The seed is fixed (same value as the train/val/test split) so the trained forest,
# and therefore the reported metrics, are repeatable across kernel restarts.
rf = RandomForestClassifier(seed=99)
rfModel = rf.fit(train_df)
predictionsForest = rfModel.transform(val_df)

In [None]:
# Preview the first 25 validation predictions of the random-forest model.
predictionsForest.show(25)

In [None]:
# Score the transformed test split (`test_df`, built in an earlier cell) with the
# fitted random-forest model.
testPredictionsForest = rfModel.transform(test_df)

In [None]:
# Preview the first 5 test predictions of the random-forest model.
testPredictionsForest.show(5)

In [None]:
# Test-split metrics for the HashingTF + random-forest model.
test_roc_auc = evaluator.evaluate(testPredictionsForest)
test_accuracy = (
    testPredictionsForest
    .where(F.col("label") == F.col("prediction"))
    .count()
    / float(test_set.count())
)
print("Random Forest HashingTF Test Accuracy Score: {0:.4f}".format(test_accuracy))
print("Random Forest HashingTF Test ROC-AUC: {0:.4f}".format(test_roc_auc))

In [None]:
from pyspark.ml.feature import CountVectorizer

# Second feature set: a learned vocabulary (CountVectorizer, up to 2**16 terms)
# replaces feature hashing; the IDF and label-indexing stages are configured as before.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
cv = CountVectorizer(inputCol="words", outputCol='cv', vocabSize=2**16)
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5)
label_stringIdx = StringIndexer(inputCol="target", outputCol="label")
lr = LogisticRegression().setMaxIter(100)

# The classifier is the last pipeline stage this time, so fitting trains features
# and model together. This rebinds `pipeline` and `pipelineFit` from the earlier
# feature-only pipeline.
pipeline = Pipeline().setStages([tokenizer, cv, idf, label_stringIdx, lr])
pipelineFit = pipeline.fit(train_set)
predictions2 = pipelineFit.transform(val_set)

In [None]:
# Preview the first 25 validation predictions of the CountVectorizer + logistic-regression pipeline.
predictions2.show(25)

In [None]:
# Validation metrics for the CountVectorizer + logistic-regression pipeline.
roc_auc = evaluator.evaluate(predictions2)
accuracy = (
    predictions2
    .where(F.col("label") == F.col("prediction"))
    .count()
    / float(val_set.count())
)
print("Logistic CountVectorizer Accuracy Score: {0:.4f}".format(accuracy))
print("Logistic CountVectorizer ROC-AUC: {0:.4f}".format(roc_auc))

In [None]:
# Run the raw test split through `pipelineFit`. At this point in the notebook that
# name refers to the CountVectorizer + logistic-regression pipeline, so the result
# already contains predictions.
testPredictions = pipelineFit.transform(test_set)

In [None]:
# Preview the first 25 test predictions of the CountVectorizer + logistic-regression pipeline.
testPredictions.show(25)


In [None]:
# Test-split metrics for the CountVectorizer + logistic-regression pipeline.
test_roc_auc = evaluator.evaluate(testPredictions)
test_accuracy = (
    testPredictions
    .where(F.col("label") == F.col("prediction"))
    .count()
    / float(test_set.count())
)
print("Logistic CountVectorizer Test Accuracy Score: {0:.4f}".format(test_accuracy))
print("Logistic CountVectorizer Test ROC-AUC: {0:.4f}".format(test_roc_auc))

In [None]:
# Same CountVectorizer + IDF features, with a random forest as the final stage.
# The seed is fixed (same value as the data split) so the fitted forest is repeatable.
rf = RandomForestClassifier(seed=99)
pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, rf])
pipelineFitrf = pipeline.fit(train_set)

# Score the validation split with the random-forest pipeline fitted just above.
# The previous code used `pipelineFit` (the logistic-regression pipeline), so the
# "Random Forest" validation metrics were really logistic-regression metrics.
predictions2rf = pipelineFitrf.transform(val_set)

In [None]:
# Preview the first 25 rows of `predictions2rf`.
predictions2rf.show(25)

In [None]:
# Validation metrics computed from `predictions2rf`.
roc_auc = evaluator.evaluate(predictions2rf)
accuracy = (
    predictions2rf
    .where(F.col("label") == F.col("prediction"))
    .count()
    / float(val_set.count())
)
print("Random Forest CountVectorizer Accuracy Score: {0:.4f}".format(accuracy))
print("Random Forest CountVectorizer ROC-AUC: {0:.4f}".format(roc_auc))

In [None]:
# Run the raw test split through the fitted CountVectorizer + random-forest pipeline.
testPredictionsRF = pipelineFitrf.transform(test_set)

In [None]:
# Preview the first 25 test predictions of the CountVectorizer + random-forest pipeline.
testPredictionsRF.show(25)

In [None]:
# Test-split metrics for the CountVectorizer + random-forest pipeline.
test_roc_auc = evaluator.evaluate(testPredictionsRF)
test_accuracy = (
    testPredictionsRF
    .where(F.col("label") == F.col("prediction"))
    .count()
    / float(test_set.count())
)
print("Random Forest CountVectorizer Test Accuracy Score: {0:.4f}".format(test_accuracy))
print("Random Forest CountVectorizer Test ROC-AUC: {0:.4f}".format(test_roc_auc))