In [37]:
from pyspark.sql import SparkSession
import os
import numpy as np

In [38]:
spark = SparkSession.builder.appName('DataProcessing.com').getOrCreate()

In [39]:
os.getcwd()

'/Users/archanasrisubramanian/code/ArchanasriHarrisburgAssignments/CISC525/CISC-525-Final-Project'

In [40]:
df = spark.read.csv("covid19_tweets.csv", header=True, inferSchema=True)

                                                                                

In [41]:
df.show()

+--------------------+--------------------+--------------------+-------------------+--------------+------------+-------------------+--------------------+-------------------+--------------------+--------------------+-------------------+----------+
|           user_name|       user_location|    user_description|       user_created|user_followers|user_friends|    user_favourites|       user_verified|               date|                text|            hashtags|             source|is_retweet|
+--------------------+--------------------+--------------------+-------------------+--------------+------------+-------------------+--------------------+-------------------+--------------------+--------------------+-------------------+----------+
|             ᏉᎥ☻լꂅϮ|          astroworld|wednesday addams ...|2017-05-26 05:46:42|           624|         950|              18775|               False|2020-07-25 12:27:21|If I smelled the ...|                null| Twitter for iPhone|     False|
|     Tom Bas

In [42]:
df.select("text").show()

+--------------------+
|                text|
+--------------------+
|If I smelled the ...|
|Hey @Yankees @Yan...|
|@diane3443 @wdunl...|
|@brookbanktv The ...|
|25 July : Media B...|
|                null|
|#coronavirus #cov...|
|How #COVID19 Will...|
|You now have to w...|
|Praying for good ...|
|                null|
|                null|
|POPE AS GOD - Pro...|
|                null|
|49K+ Covid19 case...|
|                null|
|                null|
|                null|
|                null|
|                null|
+--------------------+
only showing top 20 rows



In [43]:
'''Data cleaning: Remove unwanted characters, punctuation marks, URLs, and other noise from the text.'''
from pyspark.sql.functions import regexp_replace

clean_df = df.withColumn("text", regexp_replace(df.text, r"http\S+", ""))


In [44]:
clean_df.select("text").show()

+--------------------+
|                text|
+--------------------+
|If I smelled the ...|
|Hey @Yankees @Yan...|
|@diane3443 @wdunl...|
|@brookbanktv The ...|
|25 July : Media B...|
|                null|
|#coronavirus #cov...|
|How #COVID19 Will...|
|You now have to w...|
|Praying for good ...|
|                null|
|                null|
|POPE AS GOD - Pro...|
|                null|
|49K+ Covid19 case...|
|                null|
|                null|
|                null|
|                null|
|                null|
+--------------------+
only showing top 20 rows



In [47]:
from pyspark.sql.functions import lower

lower_df = clean_df.withColumn("text", lower(clean_df.text))

In [49]:
lower_df.select("text").show()

+--------------------+
|                text|
+--------------------+
|if i smelled the ...|
|hey @yankees @yan...|
|@diane3443 @wdunl...|
|@brookbanktv the ...|
|25 july : media b...|
|                null|
|#coronavirus #cov...|
|how #covid19 will...|
|you now have to w...|
|praying for good ...|
|                null|
|                null|
|pope as god - pro...|
|                null|
|49k+ covid19 case...|
|                null|
|                null|
|                null|
|                null|
|                null|
+--------------------+
only showing top 20 rows



In [50]:
from pyspark.sql.functions import regexp_replace

no_nums_df = lower_df.withColumn("text", regexp_replace(lower_df.text, r"\d+", ""))

no_nums_df.select("text").show()

+--------------------+
|                text|
+--------------------+
|if i smelled the ...|
|hey @yankees @yan...|
|@diane @wdunlap @...|
|@brookbanktv the ...|
| july : media bul...|
|                null|
|#coronavirus #cov...|
|how #covid will c...|
|you now have to w...|
|praying for good ...|
|                null|
|                null|
|pope as god - pro...|
|                null|
|k+ covid cases st...|
|                null|
|                null|
|                null|
|                null|
|                null|
+--------------------+
only showing top 20 rows



In [51]:
no_hashtags_df = no_nums_df.withColumn("text", regexp_replace(no_nums_df.text, r"#\w+", ""))
no_mentions_df = no_hashtags_df.withColumn("text", regexp_replace(no_hashtags_df.text, r"@\w+", ""))
no_mentions_df.select("text").show()


+--------------------+
|                text|
+--------------------+
|if i smelled the ...|
|hey   and  - woul...|
|   trump never on...|
| the one gift  ha...|
| july : media bul...|
|                null|
|  deaths continue...|
|how  will change ...|
|you now have to w...|
|praying for good ...|
|                null|
|                null|
|pope as god - pro...|
|                null|
|k+ covid cases st...|
|                null|
|                null|
|                null|
|                null|
|                null|
+--------------------+
only showing top 20 rows



In [54]:
cleaned_df = no_mentions_df.dropna(subset=["text"])
cleaned_df.select("text").show()


+--------------------+
|                text|
+--------------------+
|if i smelled the ...|
|hey   and  - woul...|
|   trump never on...|
| the one gift  ha...|
| july : media bul...|
|  deaths continue...|
|how  will change ...|
|you now have to w...|
|praying for good ...|
|pope as god - pro...|
|k+ covid cases st...|
|     twitter web app|
|let's all protect...|
|rajasthan governm...|
|       july   update|
|second wave of  i...|
|it is during our ...|
|covid update: the...|
|      good patriots!|
|coronavirus - sou...|
+--------------------+
only showing top 20 rows



In [55]:
no_punc_df = cleaned_df.withColumn("text", regexp_replace(cleaned_df.text, r'[^\w\s]', ''))


In [56]:
no_punc_df.select("text").show()

+--------------------+
|                text|
+--------------------+
|if i smelled the ...|
|hey   and   would...|
|   trump never on...|
| the one gift  ha...|
| july  media bull...|
|  deaths continue...|
|how  will change ...|
|you now have to w...|
|praying for good ...|
|pope as god  prop...|
|k covid cases sti...|
|     twitter web app|
|lets all protect ...|
|rajasthan governm...|
|       july   update|
|second wave of  i...|
|it is during our ...|
|covid update the ...|
|       good patriots|
|coronavirus  sout...|
+--------------------+
only showing top 20 rows



In [57]:
from pyspark.sql.functions import split

tokenized_df = no_punc_df.withColumn("words", split(no_punc_df.text, "\s+"))
tokenized_df.select("text", "words").show()

+--------------------+--------------------+
|                text|               words|
+--------------------+--------------------+
|if i smelled the ...|[if, i, smelled, ...|
|hey   and   would...|[hey, and, wouldn...|
|   trump never on...|[, trump, never, ...|
| the one gift  ha...|[, the, one, gift...|
| july  media bull...|[, july, media, b...|
|  deaths continue...|[, deaths, contin...|
|how  will change ...|[how, will, chang...|
|you now have to w...|[you, now, have, ...|
|praying for good ...|[praying, for, go...|
|pope as god  prop...|[pope, as, god, p...|
|k covid cases sti...|[k, covid, cases,...|
|     twitter web app| [twitter, web, app]|
|lets all protect ...|[lets, all, prote...|
|rajasthan governm...|[rajasthan, gover...|
|       july   update|      [july, update]|
|second wave of  i...|[second, wave, of...|
|it is during our ...|[it, is, during, ...|
|covid update the ...|[covid, update, t...|
|       good patriots|  [, good, patriots]|
|coronavirus  sout...|[coronavir

In [63]:
from pyspark.sql.functions import regexp_replace

no_emoji_df = tokenized_df.withColumn("text", regexp_replace(tokenized_df.text, u'[\U0001F600-\U0001F64F\U0001F300-\U0001F5FF\U0001F680-\U0001F6FF\U0001F1E0-\U0001F1FF\U00002702-\U000027B0\U000024C2-\U0001F251]+', ''))

In [66]:
no_emoji_df.show()

+--------------------+--------------------+--------------------+-------------------+--------------+------------+-------------------+--------------------+-------------------+--------------------+--------------------+-------------------+----------+--------------------+
|           user_name|       user_location|    user_description|       user_created|user_followers|user_friends|    user_favourites|       user_verified|               date|                text|            hashtags|             source|is_retweet|               words|
+--------------------+--------------------+--------------------+-------------------+--------------+------------+-------------------+--------------------+-------------------+--------------------+--------------------+-------------------+----------+--------------------+
|             ᏉᎥ☻լꂅϮ|          astroworld|wednesday addams ...|2017-05-26 05:46:42|           624|         950|              18775|               False|2020-07-25 12:27:21|if i smelled the ...|   

In [69]:
output_file_path = "covid_19_clean.csv"
final_df = no_emoji_df.drop("words")

# Write the DataFrame to the CSV file
final_df.write.format("csv").option("header", "true").mode("overwrite").save(output_file_path)

                                                                                