In [1]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# PySpark function for replacing characters using a regex. We'll use this to remove newline characters.
from pyspark.sql.functions import regexp_replace, col

from pyspark.sql.types import StructField, StructType, StringType, IntegerType, TimestampType

# This will help catch some PySpark errors
from py4j.protocol import Py4JJavaError

# Create a SparkSession under the name "twitter-sentiment". Viewable via the Spark UI
spark = SparkSession.builder.appName("twitter-sentiment").getOrCreate()


In [2]:
TRAIN_DATA_PATH = "/home/haitien/Desktop/TwitterSentimentAnalysis_BigData20191/data/training.1600000.processed.noemoticon.csv"
TEST_DATA_PATH = "/home/haitien/Desktop/TwitterSentimentAnalysis_BigData20191/data/testdata.manual.2009.06.14.csv"

In [3]:
# Tạo schema để mô tả các trường của dữ liệu

fields = [StructField("label", IntegerType(), True),
          StructField("tweet_id", StringType(), True),
          StructField("date", TimestampType(), True),
          StructField("query_string", StringType(), True),
          StructField("user", StringType(), True),
          StructField("text", StringType(), True)]

schema = StructType(fields)


In [4]:
df_train = spark.read.format("csv").schema(schema).option("header", "false").load(TRAIN_DATA_PATH)
df_test = spark.read.format("csv").schema(schema).option("header", "false").load(TEST_DATA_PATH)

In [5]:
df_train.printSchema()

root
 |-- label: integer (nullable = true)
 |-- tweet_id: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- query_string: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)



In [6]:
df_train.select(['text', 'label']).show()

+--------------------+-----+
|                text|label|
+--------------------+-----+
|@switchfoot http:...|    0|
|is upset that he ...|    0|
|@Kenichan I dived...|    0|
|my whole body fee...|    0|
|@nationwideclass ...|    0|
|@Kwesidei not the...|    0|
|         Need a hug |    0|
|@LOLTrish hey  lo...|    0|
|@Tatiana_K nope t...|    0|
|@twittera que me ...|    0|
|spring break in p...|    0|
|I just re-pierced...|    0|
|@caregiving I cou...|    0|
|@octolinz16 It it...|    0|
|@smarrison i woul...|    0|
|@iamjazzyfizzle I...|    0|
|Hollis' death sce...|    0|
|about to file taxes |    0|
|@LettyA ahh ive a...|    0|
|@FakerPattyPattz ...|    0|
+--------------------+-----+
only showing top 20 rows



In [7]:
df_test.select(['text', 'label']).show()

+--------------------+-----+
|                text|label|
+--------------------+-----+
|@stellargirl I lo...|    4|
|Reading my kindle...|    4|
|Ok, first assesme...|    4|
|@kenburbary You'l...|    4|
|@mikefish  Fair e...|    4|
|@richardebaker no...|    4|
|Fuck this economy...|    0|
|Jquery is my new ...|    4|
|       Loves twitter|    4|
|how can you not l...|    4|
|Check this video ...|    2|
|@Karoli I firmly ...|    0|
|House Corresponde...|    4|
|Watchin Espn..Jus...|    4|
|dear nike, stop w...|    0|
|#lebron best athl...|    4|
|I was talking to ...|    0|
|i love lebron. ht...|    4|
|@ludajuice Lebron...|    0|
|@Pmillzz lebron I...|    4|
+--------------------+-----+
only showing top 20 rows



In [8]:
print("Number rows on train data = {}".format(df_train.count()))
print("Number rows on test data = {}".format(df_test.count()))

Number rows on train data = 1600000
Number rows on test data = 498


In [9]:
df_train.groupBy('label').count().show()

+-----+------+
|label| count|
+-----+------+
|    4|800000|
|    0|800000|
+-----+------+



In [10]:
df_test.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|    4|  182|
|    2|  139|
|    0|  177|
+-----+-----+



In [None]:
df_train.select('text').show()

In [12]:
from pyspark.sql.functions import trim, lower
# Emails
emailsRegex=r'[\w\.-]+@[\w\.-]+'

# Mentions
userMentionsRegex=r'(?<=^|(?<=[^a-zA-Z0-9-_\.]))@([A-Za-z]+[A-Za-z0-9]+)'

#Urls
urlsRegex=r'(f|ht)(tp)(s?)(://)(.*)[.|/][^ ]+'

#Numerics
numsRegex=r"\b\d+\b"

punctuationNotEmoticonsRegex=r'(?<=\w)[^\s\w](?![^\s\w])'


def clean_tweet(row):
    row = lower(row)
    row = regexp_replace(row, "n't", " not")
    row = regexp_replace(row, emailsRegex, " ")
    row = regexp_replace(row, userMentionsRegex, " ")
    row = regexp_replace(row, urlsRegex, " ")
    row = regexp_replace(row, numsRegex, " ")
    row = regexp_replace(row, punctuationNotEmoticonsRegex, " ")
    row = regexp_replace(row, r'(.)\1{2,}', r'\1\1')
    row = trim(row)
    return row

df_train_cleaned = df_train.select(['label', clean_tweet(col("text")).alias("text")])

In [13]:
df_train_cleaned.where(df_train_cleaned.text.isNull()).count()

0

In [None]:
df_train_cleaned.coalesce(1).write.format("csv").save("/home/haitien/Desktop/TwitterSentimentAnalysis_BigData20191/data/train_cleaned.csv")

In [14]:
df_train = spark.read.csv("/home/haitien/Desktop/TwitterSentimentAnalysis_BigData20191/data/train_cleaned.csv")

In [15]:
df_train.limit(10).toPandas()

Unnamed: 0,_c0,_c1
0,0,11- a11 that s a bummer11you shoulda got davi...
1,0,is upset that he ca not update his facebook by...
2,0,i dived many times for the ball managed to sa...
3,0,my whole body feels itchy and like its on fire
4,0,no it s not behaving at all i m mad why am ...
5,0,not the whole crew
6,0,need a hug
7,0,"hey long time no see yes.. rains a bit ,only..."
8,0,_k nope they did not have it
9,0,que me muera ?


In [21]:
df_train.select(['_c1']).limit(2).toPandas()

Unnamed: 0,_c1
0,11- a11 that s a bummer11you shoulda got davi...
1,is upset that he ca not update his facebook by...


In [22]:
df_train = df_train.withColumnRenamed("_c0", "label")
df_train = df_train.withColumnRenamed("_c1", "text")

In [23]:
df_train.show()

+-----+--------------------+
|label|                text|
+-----+--------------------+
|    0|11- a11  that s a...|
|    0|is upset that he ...|
|    0|i dived many time...|
|    0|my whole body fee...|
|    0|no  it s not beha...|
|    0|  not the whole crew|
|    0|          need a hug|
|    0|hey  long time no...|
|    0|_k nope they did ...|
|    0|      que me muera ?|
|    0|spring break in p...|
|    0|i just re pierced...|
|    0|i could not bear ...|
|    0|it it counts  idk...|
|    0|i would ve been t...|
|    0|i wish i got to w...|
|    0|hollis  death sce...|
|    0| about to file taxes|
|    0|ahh ive always wa...|
|    0|oh dear  were you...|
+-----+--------------------+
only showing top 20 rows



In [24]:
from pyspark.sql.functions import rand, when
df_train = df_train.orderBy(rand()) \
                   .limit(100000) \
                   .withColumn("label", when(col("label") > 0, 1).otherwise(0)) \
                   .select(["label", "text"])

In [27]:
df_train.where(df_train.text.isNull()).count()

0

In [26]:
df_train = df_train.na.drop(subset=["text"])

In [28]:
(train_set, val_set) = df_train.randomSplit([0.8, 0.2], seed = 2000)

In [30]:
train_set.show(2)

+-----+--------------------+
|label|                text|
+-----+--------------------+
|    0|!  i missed the j...|
|    0|!  i wish i could...|
+-----+--------------------+
only showing top 2 rows



In [31]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

#  class_k probability: 1/(1 + exp(-rawPrediction_k))


tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])


pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(train_set)
val_df = pipelineFit.transform(val_set)
train_df.show(5)

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|label|                text|               words|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|    0|!  i missed the j...|[!, , i, missed, ...|(262144,[9639,139...|[0.99675013921577...|[0.73041913761973...|       0.0|
|    0|!  i wish i could...|[!, , i, wish, i,...|(262144,[17893,20...|[6.51532457790884...|[0.99852161360287...|       0.0|
|    0|! yay  shame abou...|[!, yay, , shame,...|(262144,[28990,59...|[2.15566982092368...|[0.89619741286587...|       0.0|
|    0|!! i love it so m...|[!!, i, love, it,...|(262144,[2437,963...|[3.43072941531246...|[0.96865122477192...|       0.0|
|    0|   !?  what happened|[!?, , what, happ...|(262144,[29066,81...|[2.37413192974730...|[0.91483334512949...|       0.0|
+-----+-

In [32]:
df_test = df_test.filter(col("label") != 2.0) \
                 .withColumn("label", when(col("label") > 0, 1.0).otherwise(0.0)) \
                 .select(["label", "text"])

In [33]:
predictions = pipelineFit.transform(df_test)
predictions.limit(10).toPandas()

Unnamed: 0,label,text,words,features,rawPrediction,probability,prediction
0,1.0,@stellargirl I loooooooovvvvvveee my Kindle2. ...,"[@stellargirl, i, loooooooovvvvvveee, my, kind...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[-5.65563212942664, 5.65563212942664]","[0.00348556965911595, 0.9965144303408839]",1.0
1,1.0,Reading my kindle2... Love it... Lee childs i...,"[reading, my, kindle2..., , love, it..., lee, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[-10.686011238582468, 10.686011238582468]","[2.2862007167460163e-05, 0.9999771379928325]",1.0
2,1.0,"Ok, first assesment of the #kindle2 ...it fuck...","[ok,, first, assesment, of, the, #kindle2, ......","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[6.991131513516975, -6.991131513516975]","[0.9990808406866479, 0.0009191593133521278]",0.0
3,1.0,@kenburbary You'll love your Kindle2. I've had...,"[@kenburbary, you'll, love, your, kindle2., i'...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[2.4066804538353437, -2.4066804538353437]","[0.91733530503887, 0.08266469496113008]",0.0
4,1.0,@mikefish Fair enough. But i have the Kindle2...,"[@mikefish, , fair, enough., but, i, have, the...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[-2.727225034688343, 2.727225034688343]","[0.06138585527978845, 0.9386141447202115]",1.0
5,1.0,@richardebaker no. it is too big. I'm quite ha...,"[@richardebaker, no., it, is, too, big., i'm, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[-3.0108893823478153, 3.0108893823478153]","[0.04693634461432041, 0.9530636553856795]",1.0
6,0.0,Fuck this economy. I hate aig and their non lo...,"[fuck, this, economy., i, hate, aig, and, thei...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[1.778076832017175, -1.778076832017175]","[0.8554592311434474, 0.14454076885655268]",0.0
7,1.0,Jquery is my new best friend.,"[jquery, is, my, new, best, friend.]","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[-8.514938837682429, 8.514938837682429]","[0.00020041120663714295, 0.999799588793363]",1.0
8,1.0,Loves twitter,"[loves, twitter]","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[-3.0673450538560987, 3.0673450538560987]","[0.044474517247150444, 0.9555254827528495]",1.0
9,1.0,how can you not love Obama? he makes jokes abo...,"[how, can, you, not, love, obama?, he, makes, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[0.9191020297840933, -0.9191020297840933]","[0.7148591027541621, 0.28514089724583785]",0.0


In [34]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

0.6934873036567951

In [36]:
pipelineFit.write().save("saved_model/model3")

In [37]:
from pyspark.ml import PipelineModel
model1 = PipelineModel.read().load("saved_model/model")
model2 = PipelineModel.read().load("saved_model/model2")
model3 = PipelineModel.read().load("saved_model/model3")

In [None]:
predictions = model3.transform(df_test)

In [38]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

In [None]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

In [None]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
import matplotlib.pyplot as plt
%matplotlib inline

# Utility class for plotting ROC curve (https://stackoverflow.com/questions/52847408/pyspark-extract-roc-curve)
class CurveMetrics(BinaryClassificationMetrics):
    def __init__(self, *args):
        super(CurveMetrics, self).__init__(*args)

    def get_curve(self, method):
        rdd = getattr(self._java_model, method)().toJavaRDD()
        points = []
        for row in rdd.collect():
            points += [(float(row._1()), float(row._2()))]
        return points

preds = predictions.select("label", "probability") \
                   .rdd.map(lambda row: (float(row["probability"][1]), float(row["label"])))
roc_points = CurveMetrics(preds).get_curve("roc")

# Plot ROC curve
fig = plt.figure()
x_val = [x[0] for x in roc_points]
y_val = [x[1] for x in roc_points]
plt.title("ROC curve on test set")
plt.xlabel("False positive rate")
plt.ylabel("True positive rate")
plt.plot(x_val, y_val)
plt.show()

In [None]:
from pyspark.sql.functions import rand, when
df_train = df_train.orderBy(rand()) \
                   .limit(100000) \
                   .withColumn("label", when(col("label") > 0, 1).otherwise(1)) \
                   .select(["label", "text"])

In [None]:
df_train.where(df_train.text.isNull()).count()

In [None]:
df_train = df_train.na.drop(subset=["text"])

In [None]:
(train_set, val_set) = df_train.randomSplit([0.8, 0.2], seed = 2000)

In [None]:
train_set.show()

In [None]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

#  class_k probability: 1/(1 + exp(-rawPrediction_k))


tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])


pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(train_set)
val_df = pipelineFit.transform(val_set)
train_df.show(5)

In [None]:
df_test = df_test.filter(col("label") != 2.0) \
                 .withColumn("label", when(col("label") > 0, 1.0).otherwise(0.0)) \
                 .select(["label", "text"])

In [None]:
predictions = pipelineFit.transform(df_test)
predictions.limit(10).toPandas()

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

In [None]:
pipelineFit.write().save("saved_model/model2")

In [None]:
from pyspark.ml import PipelineModel
model1 = PipelineModel.read().load("saved_model/model")
model2 = PipelineModel.read().load("saved_model/model2")

In [None]:
predictions = model1.transform(df_test)

In [None]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

In [None]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
import matplotlib.pyplot as plt
%matplotlib inline

# Utility class for plotting ROC curve (https://stackoverflow.com/questions/52847408/pyspark-extract-roc-curve)
class CurveMetrics(BinaryClassificationMetrics):
    def __init__(self, *args):
        super(CurveMetrics, self).__init__(*args)

    def get_curve(self, method):
        rdd = getattr(self._java_model, method)().toJavaRDD()
        points = []
        for row in rdd.collect():
            points += [(float(row._1()), float(row._2()))]
        return points

preds = predictions.select("label", "probability") \
                   .rdd.map(lambda row: (float(row["probability"][1]), float(row["label"])))
roc_points = CurveMetrics(preds).get_curve("roc")

# Plot ROC curve
fig = plt.figure()
x_val = [x[0] for x in roc_points]
y_val = [x[1] for x in roc_points]
plt.title("ROC curve on test set")
plt.xlabel("False positive rate")
plt.ylabel("True positive rate")
plt.plot(x_val, y_val)
plt.show()