# Spark Activity 3

Working with the **vaccination_tweets.csv** file, which is also saved in the data folder.

Importing all the necessary modules:

In [1]:
import pyspark
import pandas as pd
from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql.types import *
from s3fs.core import S3FileSystem
from pyspark.sql.functions import *

#### Data Loading

In [2]:
sc = SparkContext('local','TweetCleaningApp')
sqlContext = SQLContext(sc)

In [3]:
my_bucket = '686springbucket-emmanuel'
my_key = 'Assignment6/vaccination_tweets.csv'

s3 = S3FileSystem(anon=False)
data = pd.read_csv(s3.open('{}/{}'.format(my_bucket, my_key), mode='rb'))
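`s3.open(..., mode='rb')` returns a binary file-like object, and `pd.read_csv` accepts any such object. As a minimal stdlib-only sketch of the same pattern, the snippet below uses `io.BytesIO` as a stand-in for the S3 file; the sample rows are made up for illustration and are not from the real dataset:

```python
import csv
import io

# Stand-in for s3.open(..., mode='rb'): any binary file-like object behaves the same.
# The rows below are hypothetical sample data, not taken from vaccination_tweets.csv.
fake_s3_file = io.BytesIO(
    b"id,user_name,retweets\n"
    b"1,Rachel Roh,0\n"
    b"2,Someone Else,446\n"
)

# Wrap the binary stream as text so the csv module can parse it.
text_stream = io.TextIOWrapper(fake_s3_file, encoding="utf-8", newline="")
rows = list(csv.DictReader(text_stream))

for row in rows:
    # Every field arrives as a string; typing happens later (see the schema step).
    print(row["user_name"], row["retweets"])
```

Note that every value comes back as text, which is why the next step declares explicit column types.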

I then defined a schema that sets the data type of each column:

In [4]:
myschema = StructType([StructField("id", StringType(),False),\
                       StructField("user_name", StringType(),False),\
                       StructField("user_location", StringType(),False),\
                       StructField("user_description", StringType(),False),\
                       StructField("user_created", StringType(),False),\
                       StructField("user_followers", IntegerType(),False),\
                       StructField("user_friends", IntegerType(),False),\
                       StructField("user_favourites", IntegerType(),False),\
                       StructField("user_verified", BooleanType(),False),\
                       StructField("date", StringType(),False),\
                       StructField("text", StringType(),False),\
                       StructField("hashtags", StringType(),False),\
                       StructField("source", StringType(),False),\
                       StructField("retweets", IntegerType(),False),\
                       StructField("favorites", IntegerType(),False),\
                       StructField("is_retweet", BooleanType(),False)])
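Declaring types up front matters because CSV fields arrive as text. Below is a plain-Python sketch of the idea, using a hypothetical `COLUMN_TYPES` mapping for a few of the columns above (it is an illustration, not how Spark applies a `StructType` internally). A boolean column needs an explicit parser, because `bool("False")` is `True`:

```python
# Hypothetical converters for a subset of the schema columns (illustration only).
def parse_bool(value: str) -> bool:
    # bool("False") would be True, so compare the text explicitly.
    return value.strip().lower() == "true"

COLUMN_TYPES = {
    "id": str,
    "user_followers": int,
    "user_friends": int,
    "user_verified": parse_bool,
    "retweets": int,
}

def coerce_row(raw: dict) -> dict:
    """Convert the string fields of one CSV row to the declared types;
    columns without an entry are left as strings."""
    return {name: COLUMN_TYPES.get(name, str)(value) for name, value in raw.items()}

typed = coerce_row({"id": "1", "user_followers": "405", "user_friends": "1692",
                    "user_verified": "False", "retweets": "0"})
print(typed)
```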


The Spark DataFrame was created, and the first 5 rows were printed to confirm that everything works so far.

In [5]:
df = sqlContext.createDataFrame(data,schema=myschema)
df.show(5)

+-----------+--------------------+--------------------+--------------------+-------------+--------------+------------+---------------+-------------+--------------+--------------------+--------------------+-------------------+--------+---------+----------+
|         id|           user_name|       user_location|    user_description| user_created|user_followers|user_friends|user_favourites|user_verified|          date|                text|            hashtags|             source|retweets|favorites|is_retweet|
+-----------+--------------------+--------------------+--------------------+-------------+--------------+------------+---------------+-------------+--------------+--------------------+--------------------+-------------------+--------+---------+----------+
|1.34054E+18|          Rachel Roh|La Crescenta-Mont...|Aggregator of Asi...| 4/8/09 17:52|           405|        1692|           3247|        false| 12/20/20 6:06|Same folks said d...|  ['PfizerBioNTech']|Twitter for Android|       

Next, the **date** column is converted to a date type. Because the schema cannot coerce a string such as `12/20/20 6:06` into a date, a new column called **N_date** was created with `to_date`; it is used for the subsequent DataFrame filtering.

In [6]:
# The raw strings look like "12/20/20 6:06", so the pattern also covers the time part
df = df.withColumn("N_date", to_date(col("date"), "M/d/yy H:mm"))
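To check what the date pattern has to match, here is a plain-Python sketch with `datetime.strptime`, using strings in the same format as the output above (`12/20/20 6:06`, `4/8/09 17:52`). Python's format codes are not Spark's pattern letters; `%m/%d/%y %H:%M` corresponds roughly to a Spark pattern such as `M/d/yy H:mm`. Returning `None` on failure is a loose analogy to Spark producing null for unparseable input:

```python
from datetime import date, datetime
from typing import Optional

def to_date(value: str) -> Optional[date]:
    """Parse 'M/D/YY H:MM' strings into a date, or return None if parsing fails."""
    try:
        return datetime.strptime(value, "%m/%d/%y %H:%M").date()
    except ValueError:
        return None

print(to_date("12/20/20 6:06"))   # 2020-12-20
print(to_date("4/8/09 17:52"))    # 2009-04-08
print(to_date("not a date"))      # None
```

`strptime` accepts non-zero-padded months, days and hours, so `4/8/09` and `6:06` parse without special handling.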

### Questions 1 - 3

#### 1. The researcher only wants data from 2018 to 2020
#### 2. The researcher only wants data where the user has more than 0 friends
#### 3. The researcher only wants tweets whose text is not marked as None

**NOTE**: The date filter is applied to the new date column **N_date**.

In [7]:
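# Both date bounds go in one filter joined with &; Python's `and` between two DataFrames does not intersect them
ftrd_data = df.filter((col('N_date') >= '2018-01-01') & (col('N_date') <= '2020-12-31'))
ftrd_data = ftrd_data.filter(ftrd_data['user_friends'] > 0)
# A null text also fails this comparison, so null tweets are dropped as well
ftrd_data = ftrd_data.filter(ftrd_data['text'] != 'None')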
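When several conditions must all hold, they belong in a single filter combined with `&` (or `between`). Python's `and` does not intersect two results: it checks whether the left operand is truthy and, if so, returns the right operand unchanged, so only one of the filters takes effect. A plain-Python sketch with hypothetical rows shows both the pitfall and the combined filter:

```python
from datetime import date

# Hypothetical rows: (tweet date, number of friends, tweet text)
rows = [
    (date(2017, 6, 1), 10, "old tweet"),
    (date(2020, 12, 20), 1692, "Same folks said"),
    (date(2020, 12, 21), 0, "no friends"),
    (date(2020, 12, 22), 50, "None"),
    (date(2021, 1, 5), 20, "too new"),
]

start, end = date(2018, 1, 1), date(2020, 12, 31)

after_start = [r for r in rows if r[0] >= start]
before_end = [r for r in rows if r[0] <= end]

# Pitfall: `and` returns the right operand when the left one is truthy,
# so only the upper bound is applied and the 2017 row survives.
wrong = after_start and before_end

# Correct: test every condition on each row.
kept = [
    r for r in rows
    if start <= r[0] <= end and r[1] > 0 and r[2] != "None"
]
print(kept)
```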

#### 4. The researcher wants you to use regular expressions to remove all punctuation in the tweet text except exclamation marks and also wants you to remove all numbers from the tweets

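The regular expression was applied to the **text** column as instructed, and the result was stored in a new column named **clnd_text**. The character class `[^!a-zA-Z\s]` matches any character that is not an ASCII letter, an exclamation mark, or whitespace, so both punctuation and digits are replaced with an empty string. The original **text** column was then dropped, and the first five rows of **clnd_text** were printed.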

In [8]:
ftrd_data = ftrd_data.withColumn("clnd_text",regexp_replace(ftrd_data['text'], r'[^!a-zA-Z\s]','')).drop(ftrd_data["text"])
ftrd_data.select(ftrd_data["clnd_text"]).show(5)

+--------------------+
|           clnd_text|
+--------------------+
|Same folks said d...|
|While the world h...|
|coronavirus Sputn...|
|Facts are immutab...|
|Explain to me aga...|
+--------------------+
only showing top 5 rows
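The same character class can be tried out in plain Python with `re.sub`. Spark's `regexp_replace` uses Java regular expressions, so behavior may differ slightly at the edges (for example, which Unicode characters count as whitespace), but for this ASCII pattern the sketch below should be a close match. It also shows a side effect worth knowing: accented letters and emoji are removed too, since they are not in `a-zA-Z`:

```python
import re

# Same character class as the Spark cell: drop every character that is
# not an ASCII letter, an exclamation mark, or whitespace.
PATTERN = r"[^!a-zA-Z\s]"

def clean_text(text: str) -> str:
    return re.sub(PATTERN, "", text)

print(repr(clean_text("Wow!!! 100% effective, right?")))
print(repr(clean_text("Café 2021 🎉")))
```

Removing a number leaves the surrounding spaces in place, so the cleaned text can contain double spaces.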



#### 5. Using whatever is left after the above filters, the researcher wants you to find the average number of retweets (rounded up). Tweets whose number of retweets equals the average should be marked “AVERAGE”, everything below the average “BELOW AVERAGE”, and everything above the average “ABOVE AVERAGE”. They want this data in another column named “average_rating”.

The average number of retweets was calculated and rounded up with `ceil`, as instructed, and the result was stored in **n** so it could be compared with the **retweets** column:

In [9]:
n=ftrd_data.agg(ceil(mean("retweets"))).collect()[0][0]
n

2
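The "round up" step is a ceiling, not ordinary rounding: an average of 1.2 becomes 2, whereas `round` would give 1. A plain-Python sketch with hypothetical retweet counts; missing values are skipped first, which mirrors how Spark's `mean` ignores nulls:

```python
import math
from statistics import mean

# Hypothetical retweet counts; None stands for a missing value.
retweets = [0, 1, 0, 5, 2, 2, None]

observed = [r for r in retweets if r is not None]
avg = mean(observed)        # 10 / 6 = 1.666...
n = math.ceil(avg)          # rounded up -> 2
print(avg, n)
```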

The new column **average_rating** was created as directed in question 5. I also printed the first 10 records alongside the **retweets** column to confirm that each row received the correct rating:

In [10]:
ftrd_data = ftrd_data.withColumn('average_rating',
                                 when(col("retweets") > n, "ABOVE AVERAGE")
                                 .when(col("retweets") == n, "AVERAGE")
                                 .otherwise("BELOW AVERAGE"))
ftrd_data.select(ftrd_data["average_rating"], ftrd_data["retweets"]).show(10)

+--------------+--------+
|average_rating|retweets|
+--------------+--------+
| BELOW AVERAGE|       0|
| BELOW AVERAGE|       1|
| BELOW AVERAGE|       0|
| ABOVE AVERAGE|     446|
| BELOW AVERAGE|       0|
| BELOW AVERAGE|       0|
| BELOW AVERAGE|       0|
|       AVERAGE|       2|
|       AVERAGE|       2|
| BELOW AVERAGE|       0|
+--------------+--------+
only showing top 10 rows
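The `when`/`otherwise` chain is an ordered if/elif/else. A plain-Python sketch of the same labeling rule, with `n = 2` as computed above:

```python
def average_rating(retweets: int, n: int) -> str:
    """Label a retweet count relative to the rounded-up average n;
    conditions are checked in the same order as the when/otherwise chain."""
    if retweets > n:
        return "ABOVE AVERAGE"
    if retweets == n:
        return "AVERAGE"
    return "BELOW AVERAGE"

n = 2  # the rounded-up average from the previous step
for count in [0, 1, 2, 446]:
    print(count, average_rating(count, n))
```

One difference from the sketch: in Spark, a null **retweets** value makes both `when` conditions null rather than true, so such a row falls through to `otherwise` and is labeled "BELOW AVERAGE".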



#### 6. Using the hashtags column, give a report at the end of your code with the top 5 hashtags.

**Bonus: 5 points if you standardize the hashtags (thinking about capitalization, etc)**


The standardization I applied was capitalization: all values in the **hashtags** column were converted to lower case. Rows with missing hashtags (stored as `nan`) were also removed, so only hashtags actually entered by users remain. Finally, I cleaned the column further so that the hashtags appear as a comma-separated string rather than a list (the output below compares the values before and after cleaning):

In [11]:
DF = ftrd_data.withColumn("H_hashtags", lower(col("hashtags")))
DF = DF.filter(DF['H_hashtags'] != 'nan')
# Keep only lowercase letters, digits and commas (inside [...], ( | ) would be literal characters)
DF = DF.withColumn("N_hashtags", regexp_replace(DF['H_hashtags'], r'[^a-z0-9,]', ''))
DF.select(DF["hashtags"], DF["N_hashtags"]).show(10)

+--------------------+--------------------+
|            hashtags|          N_hashtags|
+--------------------+--------------------+
|  ['PfizerBioNTech']|      pfizerbiontech|
|['coronavirus', '...|coronavirus,sputn...|
|['whereareallthes...|whereareallthesic...|
|     ['vaccination']|         vaccination|
|['BidenHarris', '...|bidenharris,elect...|
|['CovidVaccine', ...|covidvaccine,covi...|
|['CovidVaccine', ...|covidvaccine,covi...|
|['PfizerBioNTech'...|pfizerbiontech,va...|
|['COVID19', 'Covi...|covid19,covidvacc...|
|  ['PfizerBioNTech']|      pfizerbiontech|
+--------------------+--------------------+
only showing top 10 rows
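A plain-Python sketch of the same standardization: lower-case the stringified list, skip the `nan` placeholder, and keep only letters, digits and commas. The sketch uses the class `[^a-z0-9,]` (applied after lower-casing). The last line illustrates why `(`, `|` and `)` should not be written inside square brackets: there they are literal characters, not grouping or alternation:

```python
import re

def standardize_hashtags(raw: str):
    """Lower-case a stringified hashtag list and keep only a-z, 0-9 and commas.
    Returns None for the 'nan' placeholder used for missing values."""
    lowered = raw.lower()
    if lowered == "nan":
        return None
    return re.sub(r"[^a-z0-9,]", "", lowered)

print(standardize_hashtags("['COVID19', 'Covid_19']"))  # covid19,covid19
print(standardize_hashtags("nan"))                       # None

# Inside [...], '(', '|' and ')' are literals, so this class keeps them:
print(re.sub(r"[^(a-z|0-9|,)]", "", "a|b(c)"))           # a|b(c)
```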



**NOTE**: As the output above shows, punctuation was removed from the hashtags, so hashtags such as covid_19 and covid19 are treated as the same.

The original, unstandardized **hashtags** column and the intermediate **H_hashtags** column were then removed from the data:

In [12]:
DF = DF.drop(DF["hashtags"])
DF = DF.drop(DF["H_hashtags"])

Since a tweet may contain more than one hashtag and the goal is to count every hashtag individually, I split the cleaned hashtag column on commas, which turns it into an array column. This was needed for the **explode** function, which only works on array (or map) columns.

In [13]:
# select() keeps only the new array column, so no separate drop is needed
L1 = DF.select(split(col("N_hashtags"), ",").alias("N_hashtagsArray"))
L1.show()

+--------------------+
|     N_hashtagsArray|
+--------------------+
|    [pfizerbiontech]|
|[coronavirus, spu...|
|[whereareallthesi...|
|       [vaccination]|
|[bidenharris, ele...|
|[covidvaccine, co...|
|[covidvaccine, co...|
|[pfizerbiontech, ...|
|[covid19, covidva...|
|    [pfizerbiontech]|
|           [vaccine]|
|[yellowfever, cov...|
|[iran, coronaviru...|
|      [covidvaccine]|
|[covidiots, coron...|
|      [fda, vaccine]|
|    [pfizerbiontech]|
|           [vaccine]|
|[thankyounhs, pfi...|
|[stayhome, stayat...|
+--------------------+
only showing top 20 rows
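In plain Python, `split` followed by `explode` amounts to splitting each string into a list and then flattening the lists, one element per output row. A sketch with a few illustrative, shortened values:

```python
from itertools import chain

# Cleaned hashtag strings, one per tweet (illustrative values).
cleaned = ["pfizerbiontech", "coronavirus,sputnikv,astrazeneca", "fda,vaccine"]

# split(): one string -> one list per tweet
arrays = [s.split(",") for s in cleaned]

# explode(): one row per list element
exploded = list(chain.from_iterable(arrays))
print(exploded)
```

In Spark, `explode` drops rows whose array is null or empty; `explode_outer` keeps them with a null value instead.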



A counter column with every entry set to 1 was then added, and **explode** was used to give each hashtag its own row:

In [14]:
L1 = L1.withColumn("hshtgct", lit(1))
L2 = L1.select(L1.hshtgct,explode(L1.N_hashtagsArray))
L2.show()

+-------+--------------------+
|hshtgct|                 col|
+-------+--------------------+
|      1|      pfizerbiontech|
|      1|         coronavirus|
|      1|            sputnikv|
|      1|         astrazeneca|
|      1|      pfizerbiontech|
|      1|             moderna|
|      1|             covid19|
|      1|whereareallthesic...|
|      1|      pfizerbiontech|
|      1|         vaccination|
|      1|         bidenharris|
|      1|        election2020|
|      1|        covidvaccine|
|      1|             covid19|
|      1|      pfizerbiontech|
|      1|             moderna|
|      1|        covidvaccine|
|      1|      covid19vaccine|
|      1|                  us|
|      1|             pakustv|
+-------+--------------------+
only showing top 20 rows



The result was then converted to an RDD so that the hashtags could be counted with `reduceByKey` and sorted by count in descending order:

In [15]:
hsh_tag = L2.select(L2['col'],L2['hshtgct'])
Rd = hsh_tag.rdd
Rd= Rd.reduceByKey(lambda x, y: x+y)
Rd = Rd.sortBy(lambda word: -word[1])
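`reduceByKey` groups the `(hashtag, 1)` pairs by key and folds each group with the given function, and `sortBy` with a negated count puts the most frequent hashtags first. A plain-Python sketch of those semantics, using a hypothetical `reduce_by_key` helper and illustrative pairs:

```python
from functools import reduce

# (hashtag, 1) pairs, like the rows of hsh_tag above (illustrative values).
pairs = [("pfizerbiontech", 1), ("covid19", 1), ("pfizerbiontech", 1),
         ("vaccine", 1), ("covid19", 1), ("pfizerbiontech", 1)]

def reduce_by_key(pairs, func):
    """Hypothetical helper: group values by key and fold each group with func."""
    groups = {}
    for key, value in pairs:
        groups.setdefault(key, []).append(value)
    return {key: reduce(func, values) for key, values in groups.items()}

counts = reduce_by_key(pairs, lambda x, y: x + y)

# Equivalent of sortBy(lambda word: -word[1]): highest count first
ranked = sorted(counts.items(), key=lambda kv: -kv[1])
print(ranked[:5])
```

In plain Python, `collections.Counter(...).most_common(5)` produces the same ranking in one call. Within Spark, the DataFrame API could also do this without the RDD conversion, for example `L2.groupBy("col").count().orderBy(desc("count"))`; in either Spark approach, the order of hashtags with equal counts is not guaranteed.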

The top 5 hashtags are:

In [16]:
# take(5) returns only the first five elements instead of collecting the whole RDD
print(Rd.take(5))

[('pfizerbiontech', 969), ('covid19', 377), ('vaccine', 233), ('covidvaccine', 181), ('pfizer', 106)]


**NOTE**: The DataFrame exported as **cleaned_data.csv** is **ftrd_data**, not **DF**. For question 6, rows with missing (nan) hashtags were removed to build the hashtag report; doing that on **ftrd_data** would have changed the cleaned data and every result from questions 1-5, so that extra cleaning was kept in the separate DataFrame **DF**. The file is also saved in the data folder.

In [17]:
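# to_csv(None) returns the CSV as a string; index=False keeps the pandas row index out of the file
bytes_to_write = ftrd_data.toPandas().to_csv(None, index=False).encode()
with s3.open('s3://686springbucket-emmanuel/Assignment6/cleaned_data.csv', 'wb') as f:
    f.write(bytes_to_write)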
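The export renders the data as CSV text in memory, encodes it to bytes, and writes those bytes to a file opened in `'wb'` mode. A stdlib-only sketch of that pattern, with hypothetical rows whose column names follow the DataFrame above:

```python
import csv
import io

# Hypothetical cleaned rows; column names follow the DataFrame above.
header = ["clnd_text", "retweets", "average_rating"]
rows = [["Same folks said d", 0, "BELOW AVERAGE"],
        ["Explain to me aga", 2, "AVERAGE"]]

buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerow(header)
writer.writerows(rows)

# Encode the CSV text to bytes, ready for a file opened in 'wb' mode.
bytes_to_write = buffer.getvalue().encode("utf-8")
print(bytes_to_write.splitlines()[0])
```

`toPandas()` collects the whole DataFrame onto the driver, which is fine for a dataset of this size; for much larger data, Spark's own `DataFrame.write.csv(...)` writes the output in parallel instead.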