## Data Cleaning

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [2]:
# Create the Spark session used by this notebook, or reuse one that already exists
spark = (
    SparkSession.builder
    .appName("Time Series Sentiment Analysis")
    .getOrCreate()
)

In [3]:
# Read ProjectTweets.csv from the 'user1' folder (presumably on the Hadoop file system).
# The file is read without a header line, so the columns get Spark's default names _c0 ... _c5.
# The column types are declared explicitly instead of using inferSchema=True, which would
# make Spark scan the whole file an extra time just to work out the types.
raw_tweets_schema = "_c0 INT, _c1 BIGINT, _c2 STRING, _c3 STRING, _c4 STRING, _c5 STRING"
df = spark.read.csv('/user1/ProjectTweets.csv', header=False, schema=raw_tweets_schema)

                                                                                

In [4]:
# Print each column's name, data type and nullability to check how the CSV was parsed
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: long (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)



In [5]:
from pyspark.sql import Row

# The CSV was read with header=False and its first record is an ordinary tweet
# (index 0), so there is no header row to extract and re-attach: the columns
# only need meaningful names.
new_column_names = ["index", "user_id", "timestamp", "query", "username", "tweet_text"]

# Columns are renamed by position, so fail loudly if the layout is not the expected
# one (for example when this cell is re-run after "query" was already dropped)
# instead of silently attaching the wrong names.
assert len(df.columns) == len(new_column_names), (
    f"Expected {len(new_column_names)} columns, found {len(df.columns)}: {df.columns}"
)

# Rename every column in a single step
# NOTE(review): "index" keeps the integer type read from the CSV; the earlier
# first-row/union round trip may have widened it to bigint - confirm nothing
# downstream relies on that.
df = df.toDF(*new_column_names)

# Drop the "query" column
df = df.drop("query")

# Show the DataFrame to verify
df.show()

+-----+----------+--------------------+---------------+--------------------+
|index|   user_id|           timestamp|       username|          tweet_text|
+-----+----------+--------------------+---------------+--------------------+
|    0|1467810369|Mon Apr 06 22:19:...|_TheSpecialOne_|@switchfoot http:...|
|    1|1467810672|Mon Apr 06 22:19:...|  scotthamilton|is upset that he ...|
|    2|1467810917|Mon Apr 06 22:19:...|       mattycus|@Kenichan I dived...|
|    3|1467811184|Mon Apr 06 22:19:...|        ElleCTF|my whole body fee...|
|    4|1467811193|Mon Apr 06 22:19:...|         Karoli|@nationwideclass ...|
|    5|1467811372|Mon Apr 06 22:20:...|       joy_wolf|@Kwesidei not the...|
|    6|1467811592|Mon Apr 06 22:20:...|        mybirch|         Need a hug |
|    7|1467811594|Mon Apr 06 22:20:...|           coZZ|@LOLTrish hey  lo...|
|    8|1467811795|Mon Apr 06 22:20:...|2Hood4Hollywood|@Tatiana_K nope t...|
|    9|1467812025|Mon Apr 06 22:20:...|        mimismo|@twittera que me ...|

In [6]:
from pyspark.sql.functions import when, count, col

# Build one aggregate per column that counts its NULL entries, then evaluate
# them all in a single pass and bring the one-row result back to the driver
null_counts = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in df.columns
]
missing_data_count = df.select(null_counts).collect()
missing_data_count

                                                                                

[Row(index=0, user_id=0, timestamp=0, username=0, tweet_text=0)]

In [7]:
from pyspark.sql.functions import regexp_replace, col

# Initial data cleanup. Each (pattern, replacement) pair is applied in order,
# every step working on the output of the previous one.
# '!' and '?' are deliberately kept: they were removed in an earlier version, but
# stripping them would probably hurt the sentiment analysis.
cleanup_steps = [
    ("(http://[^\\s]+|https://[^\\s]+)", ""),  # URLs
    ("(@[\\w]+)", ""),                          # mentions
    ("(#[\\w]+)", ""),                          # hashtags
    ("[&*%$#@]+", ""),                          # other special characters (&, *, %, $, #, @)
    ("\\s+", " "),                              # collapse whitespace runs left by the removals
    ("^\\s+|\\s+$", ""),                        # trim leading and trailing whitespace
]

cleaned_expr = col("tweet_text")
for pattern, replacement in cleanup_steps:
    cleaned_expr = regexp_replace(cleaned_expr, pattern, replacement)

df = df.withColumn("cleaned_text", cleaned_expr)

# Count the rows whose text differs from the original after all steps above.
# This includes rows where only whitespace was normalised, not just rows that
# lost URLs, mentions, hashtags or special characters.
changed_rows = df.where(df["tweet_text"] != df["cleaned_text"])
affected_count = changed_rows.count()

print(f"Number of rows affected by the cleanup: {affected_count}")



Number of rows affected by the cleanup: 1599999


                                                                                

In [8]:
# Preview the DataFrame (first 20 rows by default) to compare tweet_text with cleaned_text
df.show()

+-----+----------+--------------------+---------------+--------------------+--------------------+
|index|   user_id|           timestamp|       username|          tweet_text|        cleaned_text|
+-----+----------+--------------------+---------------+--------------------+--------------------+
|    0|1467810369|Mon Apr 06 22:19:...|_TheSpecialOne_|@switchfoot http:...|- Awww, that's a ...|
|    1|1467810672|Mon Apr 06 22:19:...|  scotthamilton|is upset that he ...|is upset that he ...|
|    2|1467810917|Mon Apr 06 22:19:...|       mattycus|@Kenichan I dived...|I dived many time...|
|    3|1467811184|Mon Apr 06 22:19:...|        ElleCTF|my whole body fee...|my whole body fee...|
|    4|1467811193|Mon Apr 06 22:19:...|         Karoli|@nationwideclass ...|no, it's not beha...|
|    5|1467811372|Mon Apr 06 22:20:...|       joy_wolf|@Kwesidei not the...|  not the whole crew|
|    6|1467811592|Mon Apr 06 22:20:...|        mybirch|         Need a hug |          Need a hug|
|    7|1467811594|Mo

In [9]:
# Preview the first few raw values of the 'timestamp' column
df.select("timestamp").show(5)

# Look up the column's declared type in the DataFrame schema and report it
timestamp_type = df.schema["timestamp"].dataType
print(f"The datatype of the timestamp column is: {timestamp_type}")

+--------------------+
|           timestamp|
+--------------------+
|Mon Apr 06 22:19:...|
|Mon Apr 06 22:19:...|
|Mon Apr 06 22:19:...|
|Mon Apr 06 22:19:...|
|Mon Apr 06 22:19:...|
+--------------------+
only showing top 5 rows

The datatype of the timestamp column is: StringType


In [10]:
# The timestamp is in the format typically seen with Twitter data ("Mon Apr 06 22:19:...")
# and is stored as a StringType. For time series analysis it is converted to a
# TimestampType.
#
# Spark 3.0+ rejects this kind of pattern with its new parser and reports:
#   "You may get a different result due to the upgrading of Spark 3.0:
#    Fail to recognize 'EEE MMM dd HH:mm:ss +SSSS yyyy' pattern in the DateTimeFormatter."
# It offers two options:
#   1) set spark.sql.legacy.timeParserPolicy to LEGACY to restore the pre-3.0 behaviour, or
#   2) build a valid pattern following
#      https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
# Option 1 is used here.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

from pyspark.sql.functions import unix_timestamp, from_unixtime, to_timestamp

# Pattern matching the raw values: day name, month name, day, time, zone name, year
timestamp_format = "EEE MMM dd HH:mm:ss zzz yyyy"

# Parse the string straight into a timestamp. This avoids the detour of
# unix_timestamp -> from_unixtime -> cast, which formats the value as a local-time
# string and parses it again (a step that can be ambiguous for local times that
# occur twice at a daylight-saving change).
df = df.withColumn("timestamp", to_timestamp(df["timestamp"], timestamp_format))

# Show the converted timestamps.
# NOTE: the displayed hour differs from the raw text (22:19 -> 06:19 in the recorded
# output) - presumably because values are rendered in the Spark session time zone.
df.select("timestamp").show(5)

+-------------------+
|          timestamp|
+-------------------+
|2009-04-07 06:19:45|
|2009-04-07 06:19:49|
|2009-04-07 06:19:53|
|2009-04-07 06:19:57|
|2009-04-07 06:19:57|
+-------------------+
only showing top 5 rows



In [11]:
# Preview the first 20 values of the cleaned_text column
df.select("cleaned_text").show(20)

+--------------------+
|        cleaned_text|
+--------------------+
|- Awww, that's a ...|
|is upset that he ...|
|I dived many time...|
|my whole body fee...|
|no, it's not beha...|
|  not the whole crew|
|          Need a hug|
|hey long time no ...|
|nope they didn't ...|
|      que me muera ?|
|spring break in p...|
|I just re-pierced...|
|I couldn't bear t...|
|It it counts, idk...|
|i would've been t...|
|I wish I got to w...|
|Hollis' death sce...|
| about to file taxes|
|ahh ive always wa...|
|Oh dear. Were you...|
+--------------------+
only showing top 20 rows



In [12]:
# Convert all upper-case letters to lower case so the text data is consistent
# and the dimensionality of the data is reduced

from pyspark.sql.functions import lower

lowercased_text = lower(df["cleaned_text"])
df = df.withColumn("cleaned_text", lowercased_text)

# Preview the first 20 lower-cased tweets
df.select("cleaned_text").show(20)

+--------------------+
|        cleaned_text|
+--------------------+
|- awww, that's a ...|
|is upset that he ...|
|i dived many time...|
|my whole body fee...|
|no, it's not beha...|
|  not the whole crew|
|          need a hug|
|hey long time no ...|
|nope they didn't ...|
|      que me muera ?|
|spring break in p...|
|i just re-pierced...|
|i couldn't bear t...|
|it it counts, idk...|
|i would've been t...|
|i wish i got to w...|
|hollis' death sce...|
| about to file taxes|
|ahh ive always wa...|
|oh dear. were you...|
+--------------------+
only showing top 20 rows



In [13]:
# Keep only the cleaned text: give cleaned_text the name "tweets" and
# discard the original tweet_text column

df = df.withColumnRenamed("cleaned_text", "tweets")
df = df.drop("tweet_text")

In [14]:
# Preview the first 5 rows of the DataFrame
df.show(5)

+-----+----------+-------------------+---------------+--------------------+
|index|   user_id|          timestamp|       username|              tweets|
+-----+----------+-------------------+---------------+--------------------+
|    0|1467810369|2009-04-07 06:19:45|_TheSpecialOne_|- awww, that's a ...|
|    1|1467810672|2009-04-07 06:19:49|  scotthamilton|is upset that he ...|
|    2|1467810917|2009-04-07 06:19:53|       mattycus|i dived many time...|
|    3|1467811184|2009-04-07 06:19:57|        ElleCTF|my whole body fee...|
|    4|1467811193|2009-04-07 06:19:57|         Karoli|no, it's not beha...|
+-----+----------+-------------------+---------------+--------------------+
only showing top 5 rows



In [15]:
# Rename the "user_id" column to "userid"
df = df.withColumnRenamed("user_id", "userid")

In [16]:
# Preview the first 5 rows to check the column names
df.show(5)

+-----+----------+-------------------+---------------+--------------------+
|index|    userid|          timestamp|       username|              tweets|
+-----+----------+-------------------+---------------+--------------------+
|    0|1467810369|2009-04-07 06:19:45|_TheSpecialOne_|- awww, that's a ...|
|    1|1467810672|2009-04-07 06:19:49|  scotthamilton|is upset that he ...|
|    2|1467810917|2009-04-07 06:19:53|       mattycus|i dived many time...|
|    3|1467811184|2009-04-07 06:19:57|        ElleCTF|my whole body fee...|
|    4|1467811193|2009-04-07 06:19:57|         Karoli|no, it's not beha...|
+-----+----------+-------------------+---------------+--------------------+
only showing top 5 rows

