In [1]:
import findspark

# Must run before the pyspark imports in the next cell: findspark.init()
# makes the locally installed Spark importable from this kernel.
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.functions import udf


# Local Spark session using every available core. The LEGACY time parser
# policy is set so that the "EEE MMM dd HH:mm:ss zzz yyyy" pattern used when
# loading the CSV is parsed with the pre-Spark-3 date parser.
spark: SparkSession = (
    SparkSession.builder
    .master("local[*]")
    .appName("data_exploration")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

# Directory used by Spark for any DataFrame/RDD checkpoints.
spark.sparkContext.setCheckpointDir("../checkpoints/")

##### Set schema and timeStampFormat

In [3]:
# DDL-style schema for the raw tweet CSV, one "name TYPE" entry per column,
# in file order.
schema = ", ".join(
    [
        "polarity FLOAT",
        "id LONG",
        "date_time TIMESTAMP",
        "query STRING",
        "user STRING",
        "text STRING",
    ]
)

# Timestamp pattern of the date_time column:
# day-of-week, month name, day, time, time zone, year.
timestampformat = "EEE MMM dd HH:mm:ss zzz yyyy"

# Reader pre-configured with the explicit schema, so column types are not inferred.
spark_reader = spark.read.schema(schema)

##### Load dataframe

In [4]:
# Options for the headerless raw CSV: single-quote as the quote character,
# UTF-8 text, and the custom timestamp pattern for the date_time column.
csv_options = {
    "quote": "'",
    "header": False,
    "encoding": "utf-8",
    "timestampFormat": timestampformat,
}

dataframe = spark_reader.csv(
    "../data/raw_data/training.1600000.processed.noemoticon.csv",
    **csv_options,
)

In [5]:
# Sanity-check the load: preview ten rows, print the schema, and count all rows.
dataframe.limit(10).show()
dataframe.printSchema()
dataframe.count()

+--------+----------+-------------------+--------+---------------+--------------------+
|polarity|        id|          date_time|   query|           user|                text|
+--------+----------+-------------------+--------+---------------+--------------------+
|     0.0|1467810369|2009-04-07 08:49:45|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|     0.0|1467810672|2009-04-07 08:49:49|NO_QUERY|  scotthamilton|is upset that he ...|
|     0.0|1467810917|2009-04-07 08:49:53|NO_QUERY|       mattycus|@Kenichan I dived...|
|     0.0|1467811184|2009-04-07 08:49:57|NO_QUERY|        ElleCTF|my whole body fee...|
|     0.0|1467811193|2009-04-07 08:49:57|NO_QUERY|         Karoli|@nationwideclass ...|
|     0.0|1467811372|2009-04-07 08:50:00|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|     0.0|1467811592|2009-04-07 08:50:03|NO_QUERY|        mybirch|         Need a hug |
|     0.0|1467811594|2009-04-07 08:50:03|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|     0.0|1467811795|2009-04-07 

1600000

##### Sample a small subset of the dataframe to simulate a streaming source, <br>and subtract it from the main dataframe to create the static dataframe.

In [8]:
# Stratified sample: 18.75% of each polarity class (0 = negative, 4 = positive)
# is held out to play the role of the streaming data.
# A fixed seed keeps the streaming/static split the same from run to run;
# without it every run draws a different sample and the counts below change.
streaming_dataframe = dataframe.sampleBy(
    "polarity", fractions={0: 0.1875, 4: 0.1875}, seed=42
).cache()

# Everything that was not sampled becomes the static (training) data.
# NOTE: subtract() has EXCEPT DISTINCT semantics, so fully identical rows in
# `dataframe` would be collapsed into one.
static_dataframe = dataframe.subtract(streaming_dataframe).cache()

In [25]:
# Sort first, then limit, so the preview shows the 10 earliest tweets of the
# sample. Limiting first would only sort an arbitrary 10 rows.
streaming_dataframe.orderBy('date_time', ascending=True).limit(10).show()

+--------+----------+-------------------+--------+--------------+--------------------+
|polarity|        id|          date_time|   query|          user|                text|
+--------+----------+-------------------+--------+--------------+--------------------+
|     0.0|1467811594|2009-04-07 08:50:03|NO_QUERY|          coZZ|@LOLTrish hey  lo...|
|     0.0|1467812025|2009-04-07 08:50:09|NO_QUERY|       mimismo|@twittera que me ...|
|     0.0|1467812964|2009-04-07 08:50:22|NO_QUERY|lovesongwriter|Hollis' death sce...|
|     0.0|1467813579|2009-04-07 08:50:31|NO_QUERY|    starkissed|@LettyA ahh ive a...|
|     0.0|1467815923|2009-04-07 08:51:07|NO_QUERY|     fatkat309|some1 hacked my a...|
|     0.0|1467817502|2009-04-07 08:51:32|NO_QUERY|       Tmttq86|@fleurylis I don'...|
|     0.0|1467818603|2009-04-07 08:51:49|NO_QUERY|     kennypham|Sad, sad, sad. I ...|
|     0.0|1467819022|2009-04-07 08:51:56|NO_QUERY|   hpfangirl94|Falling asleep. J...|
|     0.0|1467819712|2009-04-07 08:52:06|NO

##### Details of sampled dataframe

In [16]:
# Number of rows held out for the streaming simulation.
streaming_dataframe.count()

299520

In [14]:
# Class balance of the streaming sample (rows per polarity label).
streaming_dataframe.groupby("polarity").count().show()

+--------+------+
|polarity| count|
+--------+------+
|     0.0|149582|
|     4.0|149938|
+--------+------+



##### Save the sampled dataframe ordered by date and time. <br>It is written as a single partition so that the file can be read without Spark.

In [26]:
# Write the streaming sample as one CSV part file, sorted chronologically,
# replacing any previous export at the same location.
(
    streaming_dataframe
    .orderBy(f.col("date_time").asc())
    .coalesce(1)
    .write.mode("overwrite")
    .csv("../data/streaming_data")
)

##### Details of static dataframe

In [11]:
# Number of rows remaining in the static dataframe after removing the sample.
static_dataframe.count()

1300480

In [13]:
# Class balance of the static dataframe (rows per polarity label).
static_dataframe.groupby("polarity").count().show()

+--------+------+
|polarity| count|
+--------+------+
|     4.0|650062|
|     0.0|650418|
+--------+------+



##### Analyze text column to see potential problems

In [18]:
# Flag tweets containing hashtags, http links, mentions, or "&" (the start of
# an HTML entity) to get a feel for what the cleaning step must handle.
text_col = f.col("text")
has_markup = (
    text_col.contains("#")
    | text_col.contains("http:/")
    | text_col.contains("@")
    | text_col.contains("&")
)

text_problems = static_dataframe.filter(has_markup).limit(30).select("text")

text_problems.show(30, truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------+
|text                                                                                                                                      |
+------------------------------------------------------------------------------------------------------------------------------------------+
|@Toddly00 lucky, I was up at 7:30am                                                                                                       |
|@omgcorrine im sorry honey  are you guys fighting?                                                                                        |
|@mitchelmusso i wanna go but i can't                                                                                                      |
|hate to say this cause i love @marthastewart 1 bowl chocolate but those coconut cupcakes kinda suck (p29). a waste of time and resources. |
|@uncommon_se

##### Check whether the static dataframe has any null values; the two counts match, so there are none.

In [19]:
# Compare the total row count with the count after dropping rows that contain
# any null; equal numbers mean no column has nulls.
static_dataframe.count(), static_dataframe.na.drop().count()

(1300480, 1300480)

##### Prune and clean text column, and change polarity label of 4 to 1.<br> Also, empty texts have been removed.

In [20]:
import html

# Patterns consumed by Spark's regexp_replace when cleaning the tweet text.

# Mention: "@" followed by 1-15 word characters.
user_regex = r"(@\w{1,15})"

# URL: either scheme-prefixed (http/https/ftp/file) or starting with a literal
# "www.". The dot after "www" is escaped on purpose: an unescaped "." matches
# any character, which made the pattern eat parts of ordinary words such as
# "Awww," or "awwww".
url_regex = r"((https?|ftp|file):\/{2,3})+([-\w+&@#/%=~|$?!:,.]*)|(www\.)+([-\w+&@#/%=~|$?!:,.]*)"

# E-mail address: local part, "@", domain, and an alphabetic top-level domain.
email_regex = r"[\w.-]+@[\w.-]+\.[a-zA-Z]{1,}"


@udf
def html_unescape(s: str):
    """Spark UDF that decodes HTML entities (e.g. ``&amp;`` -> ``&``) in a string.

    Non-string inputs (such as ``None`` for null cells) are returned unchanged.
    """
    return html.unescape(s) if isinstance(s, str) else s

In [21]:
# Text-cleaning pipeline for the static tweets. Step order matters:
#   1. Decode HTML entities FIRST. Previously "#" was replaced before decoding,
#      which broke numeric entities ("&#39;" became "& 39;") so they were never
#      decoded, and URL removal stopped at the ";" of an embedded "&amp;".
#   2. Remove URLs, then e-mail addresses, then @mentions (e-mails before
#      mentions so the "@domain" part is not treated as a user handle).
#   3. Replace "#" with a space, keeping the hashtag word itself.
#   4. Keep only ASCII letters and apostrophes, collapse runs of spaces, trim.
#   5. Drop tweets that end up empty.
#   6. Relabel polarity: 4.0 (positive) -> 1.0, everything else -> 0.0.
clean_static_dataframe = (
    static_dataframe.repartition(12)
    .withColumn("text", html_unescape(f.col("text")))
    .withColumn("text", f.regexp_replace("text", url_regex, ""))
    .withColumn("text", f.regexp_replace("text", email_regex, ""))
    .withColumn("text", f.regexp_replace("text", user_regex, ""))
    .withColumn("text", f.regexp_replace("text", "#", " "))
    .withColumn("text", f.regexp_replace("text", "[^a-zA-Z']", " "))
    .withColumn("text", f.regexp_replace("text", " +", " "))
    .withColumn("text", f.trim("text"))
    .filter("text != ''")
    .withColumn("polarity", f.when(f.col("polarity") == 4.0, 1.0).otherwise(0.0))
).cache()

##### Details of clean dataframe

In [22]:
# Rows left after cleaning (tweets whose text became empty were dropped).
clean_static_dataframe.count()

1300461

In [31]:
# Class balance after cleaning; polarity is now 0.0 / 1.0.
clean_static_dataframe.groupby("polarity").count().show()

+--------+------+
|polarity| count|
+--------+------+
|     0.0|650413|
|     1.0|650048|
+--------+------+



##### Check if any errors are left

In [30]:
# Re-run the same markers used on the raw data against the cleaned text;
# a count of zero means none of them survived the cleaning step.
cleaned_text = f.col("text")
still_dirty = (
    cleaned_text.contains("#")
    | cleaned_text.contains("http:/")
    | cleaned_text.contains("@")
    | cleaned_text.contains("&")
)

text_problems = clean_static_dataframe.filter(still_dirty).select("text")

text_problems.count()

0

##### Save static dataframe 

In [24]:
# Persist the cleaned data as CSV, one sub-directory per polarity value,
# replacing any previous export at the same location.
(
    clean_static_dataframe.write
    .mode("overwrite")
    .partitionBy("polarity")
    .csv("../data/clean_static_data")
)

In [32]:
# Shut down the Spark session and release its resources.
spark.stop()