In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.feature import StopWordsRemover

In [3]:
from pyspark.sql.types import StructField, StructType
from pyspark.sql.types import ShortType, StringType, IntegerType, LongType
import pandas as pd
from pyspark.sql.functions import udf, col, lower, monotonically_increasing_id
import re
import string

pd.set_option('max_colwidth', None)

In [4]:
spark = SparkSession\
        .builder\
        .appName('TweetsDataPrep')\
        .getOrCreate()

In [5]:
csv_schema = StructType([
    StructField('__corrupted', StringType(), True),
    StructField('target', ShortType(), False), # target cannot be null
    StructField('ids', LongType(), False), # ids cannot be null
    StructField('date', StringType(), True),
    StructField('flag', StringType(), True),
    StructField('user', StringType(), True),
    StructField('text', StringType(), False) # text cannot be null
])
tweets_df = spark.read.csv(path="tweets-sentiment140-data", 
                           schema=csv_schema,
                           mode="PERMISSIVE", # corrupted records will be stored in column "__corrupted"
                           columnNameOfCorruptRecord='__corrupted').cache()
print("Total read records: {}".format(tweets_df.count()))
print("Total corrupted records: {}".format(tweets_df.filter("__corrupted is NOT NULL").count()))
print("Total valid records: {}".format(tweets_df.filter("__corrupted is NULL").count()))

Total read records: 1600000
Total corrupted records: 0
Total valid records: 1600000


In [6]:
tweets_df.printSchema()

root
 |-- __corrupted: string (nullable = true)
 |-- target: short (nullable = true)
 |-- ids: long (nullable = true)
 |-- date: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)



In [7]:
tweets_df.limit(5).toPandas()

Unnamed: 0,__corrupted,target,ids,date,flag,user,text
0,,0,1467810369,Mon Apr 06 22:19:45 PDT 2009,NO_QUERY,_TheSpecialOne_,"@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer. You shoulda got David Carr of Third Day to do it. ;D"
1,,0,1467810672,Mon Apr 06 22:19:49 PDT 2009,NO_QUERY,scotthamilton,is upset that he can't update his Facebook by texting it... and might cry as a result School today also. Blah!
2,,0,1467810917,Mon Apr 06 22:19:53 PDT 2009,NO_QUERY,mattycus,@Kenichan I dived many times for the ball. Managed to save 50% The rest go out of bounds
3,,0,1467811184,Mon Apr 06 22:19:57 PDT 2009,NO_QUERY,ElleCTF,my whole body feels itchy and like its on fire
4,,0,1467811193,Mon Apr 06 22:19:57 PDT 2009,NO_QUERY,Karoli,"@nationwideclass no, it's not behaving at all. i'm mad. why am i here? because I can't see you all over there."


In [8]:
replace_target = udf(lambda x: x if x == 0 else 1)
tweets_df = tweets_df.withColumn("target", replace_target(tweets_df["target"])) # convert positive label: 4 => 1
tweets_df = tweets_df.select(tweets_df["text"], tweets_df["target"]) # only used text and target label
tweets_df = tweets_df.withColumn("text", lower(col("text"))) # convert text to lower case
tweets_df.show(5)

+--------------------+------+
|                text|target|
+--------------------+------+
|@switchfoot http:...|     0|
|is upset that he ...|     0|
|@kenichan i dived...|     0|
|my whole body fee...|     0|
|@nationwideclass ...|     0|
+--------------------+------+
only showing top 5 rows



In [9]:
# remove string patterns: "@xxx", "http:***", "https:***", non-meaningful-punctuation, digits
@udf
def re_text(text):
    rep_pattern = r"@\S+|https?:\S+|[^A-Za-z!'?]+" # keep ! ? '
    res = re.sub(rep_pattern, ' ', text).strip()
    if res is not None and len(res) > 20: # only keep text with > 20 chars
        return res
    else:
        return None
tweets_df = tweets_df.withColumn("clean_text", re_text(col("text")))
tweets_df.limit(10).toPandas()

Unnamed: 0,text,target,clean_text
0,"@switchfoot http://twitpic.com/2y1zl - awww, that's a bummer. you shoulda got david carr of third day to do it. ;d",0,awww that's a bummer you shoulda got david carr of third day to do it d
1,is upset that he can't update his facebook by texting it... and might cry as a result school today also. blah!,0,is upset that he can't update his facebook by texting it and might cry as a result school today also blah!
2,@kenichan i dived many times for the ball. managed to save 50% the rest go out of bounds,0,i dived many times for the ball managed to save the rest go out of bounds
3,my whole body feels itchy and like its on fire,0,my whole body feels itchy and like its on fire
4,"@nationwideclass no, it's not behaving at all. i'm mad. why am i here? because i can't see you all over there.",0,no it's not behaving at all i'm mad why am i here? because i can't see you all over there
5,@kwesidei not the whole crew,0,
6,need a hug,0,
7,"@loltrish hey long time no see! yes.. rains a bit ,only a bit lol , i'm fine thanks , how's you ?",0,hey long time no see! yes rains a bit only a bit lol i'm fine thanks how's you ?
8,@tatiana_k nope they didn't have it,0,nope they didn't have it
9,@twittera que me muera ?,0,


In [10]:
tweets_df_clean = tweets_df.select(col("clean_text"), col("target")).filter("clean_text is NOT NULL")
print("Number of cleaned records: {}".format(tweets_df_clean.count()))
tweets_df_clean.limit(10).toPandas()

Number of cleaned records: 1442394


Unnamed: 0,clean_text,target
0,awww that's a bummer you shoulda got david carr of third day to do it d,0
1,is upset that he can't update his facebook by texting it and might cry as a result school today also blah!,0
2,i dived many times for the ball managed to save the rest go out of bounds,0
3,my whole body feels itchy and like its on fire,0
4,no it's not behaving at all i'm mad why am i here? because i can't see you all over there,0
5,hey long time no see! yes rains a bit only a bit lol i'm fine thanks how's you ?,0
6,nope they didn't have it,0
7,spring break in plain city it's snowing,0
8,i just re pierced my ears,0
9,i couldn't bear to watch it and i thought the ua loss was embarrassing,0


In [11]:
print("Positive records: {}".format(tweets_df_clean.filter("target = 1").count()))
print("Negative records: {}".format(tweets_df_clean.filter("target = 0").count()))

Positive records: 714005
Negative records: 728389


In [12]:
tweets_df_clean = tweets_df_clean.select(monotonically_increasing_id().alias("tweet_id"), 
                                        col("clean_text").alias("text"),
                                        col("target"))
tweets_df_clean.limit(10).toPandas()

Unnamed: 0,tweet_id,text,target
0,0,awww that's a bummer you shoulda got david carr of third day to do it d,0
1,1,is upset that he can't update his facebook by texting it and might cry as a result school today also blah!,0
2,2,i dived many times for the ball managed to save the rest go out of bounds,0
3,3,my whole body feels itchy and like its on fire,0
4,4,no it's not behaving at all i'm mad why am i here? because i can't see you all over there,0
5,5,hey long time no see! yes rains a bit only a bit lol i'm fine thanks how's you ?,0
6,6,nope they didn't have it,0
7,7,spring break in plain city it's snowing,0
8,8,i just re pierced my ears,0
9,9,i couldn't bear to watch it and i thought the ua loss was embarrassing,0


In [16]:
tweets_df_clean.write.csv(path="tweets-sentiment140-data/tweets_df_clean", 
                          compression="gzip", 
                          header=True,
                         mode="overwrite")

In [17]:
tweets_df = spark.read.csv(path="tweets-sentiment140-data/tweets_df_clean",
                          inferSchema=True,
                          header=True)
tweets_df.printSchema()

root
 |-- tweet_id: long (nullable = true)
 |-- text: string (nullable = true)
 |-- target: integer (nullable = true)



In [19]:
tweets_df.orderBy(col("tweet_id")).limit(10).toPandas()

Unnamed: 0,tweet_id,text,target
0,0,awww that's a bummer you shoulda got david carr of third day to do it d,0
1,1,is upset that he can't update his facebook by texting it and might cry as a result school today also blah!,0
2,2,i dived many times for the ball managed to save the rest go out of bounds,0
3,3,my whole body feels itchy and like its on fire,0
4,4,no it's not behaving at all i'm mad why am i here? because i can't see you all over there,0
5,5,hey long time no see! yes rains a bit only a bit lol i'm fine thanks how's you ?,0
6,6,nope they didn't have it,0
7,7,spring break in plain city it's snowing,0
8,8,i just re pierced my ears,0
9,9,i couldn't bear to watch it and i thought the ua loss was embarrassing,0
