### Mariam Adeyemo
BDP Final Project

## Part 1

**EDA and Features Selection**

In [1]:
#Ensure we are using the right kernel
spark.version

'3.1.3'

In [2]:
import os
import subprocess
import shutil
import pandas as pd
from pyspark.sql.functions import *
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

In [3]:
warnings.filterwarnings(action='ignore')
spark = SparkSession.builder.getOrCreate()

In [5]:
#Add "eagerEval.enabled" to beautify the way Spark DF is displayed
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

### Loading the data

In [6]:
# Saving results into individual bucket
bucket_write = 'msca-bdp-data-shared'

In [7]:
directory = 'gs://msca-bdp-tweets/final_project/'
path = directory

In [8]:
cmd = 'hadoop fs -du -s -h ' + directory

p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)
for line in p.stdout.readlines():
    print (line)
    
retval = p.wait()

498.7 G  498.7 G  gs://msca-bdp-tweets/final_project



In [9]:
#checking source data
!hadoop fs -ls 'gs://msca-bdp-tweets/final_project/' | head

Found 50696 items
-rwx------   3 root root          0 2023-02-08 13:58 gs://msca-bdp-tweets/final_project/_SUCCESS
-rwx------   3 root root    4500466 2023-02-08 13:44 gs://msca-bdp-tweets/final_project/part-00000-aa6d3cb4-7022-4df2-9921-218307589ce2-c000.json
-rwx------   3 root root    4107431 2023-02-08 13:44 gs://msca-bdp-tweets/final_project/part-00001-aa6d3cb4-7022-4df2-9921-218307589ce2-c000.json
-rwx------   3 root root    4672123 2023-02-08 13:44 gs://msca-bdp-tweets/final_project/part-00002-aa6d3cb4-7022-4df2-9921-218307589ce2-c000.json
-rwx------   3 root root    5186684 2023-02-08 13:44 gs://msca-bdp-tweets/final_project/part-00003-aa6d3cb4-7022-4df2-9921-218307589ce2-c000.json
-rwx------   3 root root    4729662 2023-02-08 13:44 gs://msca-bdp-tweets/final_project/part-00004-aa6d3cb4-7022-4df2-9921-218307589ce2-c000.json
-rwx------   3 root root    4605529 2023-02-08 13:44 gs://msca-bdp-tweets/final_project/part-00005-aa6d3cb4-7022-4df2-9921-218307589ce2-c000.json
-rwx-----

### Reading the data into spark dataframe

In [10]:
%%time

tweets_raw = spark.read.json(path)

23/03/07 05:31:37 WARN org.apache.spark.sql.execution.datasources.SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.
                                                                                

CPU times: user 1.43 s, sys: 274 ms, total: 1.7 s
Wall time: 7min 38s


23/03/07 05:36:39 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [None]:
%%time

tweets_raw.select(count('*').alias('count_raw_tweets')).show()

[Stage 4:>                                                          (0 + 1) / 1]

+----------------+
|count_raw_tweets|
+----------------+
|        99994342|
+----------------+

CPU times: user 570 ms, sys: 104 ms, total: 674 ms
Wall time: 2min 59s


                                                                                

In [12]:
#checking the schema of the data
tweets_raw.printSchema()

root
 |-- coordinates: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- display_text_range: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- additional_media_info: struct (nullable = true)
 |    |    |    |    |-- description: string (nullable = true)
 |    |    |    |    |-- embeddable: boolean (nullable = true)
 |    |    |    |    |-- monetizable: boolean (nullable = true)
 |    |    |    |   

### Preprocessing and Cleaning the tweet_text column in the twitter data

In [13]:
from pyspark.ml.feature import RegexTokenizer

unwanted_expression = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")

lower_case = udf(lambda x: x.lower())

tweets_raw = tweets_raw.withColumn("text", lower(regexp_replace("text", "[\$#,&%\".]", "")))
tweets_raw = unwanted_expression.transform(tweets_raw)
tweets_raw = tweets_raw.withColumn("text", concat_ws(" ", "words"))
tweets_raw = tweets_raw.drop("words")

### Filter tweets relating to either primary, secondary, or higher education

In [14]:
tweets_eng = tweets_raw.filter(tweets_raw.lang == 'en')

In [15]:
#creating a list of filter words for education
education_related_words = ['elementary school', 'middle school', 'high school', 'higher education', 'k-12', 'preschool',
                           'college', 'kindergarten', 'students', 'tuition', 'university', 'education', 'classroom', 'game based learning',
                          'teach', 'teacher', 'edu', 'digital education', 'education system', 'steam based learning']

exclude_words = ["died", "shoot", "kill", "killed", "deceased", "murder", "attack", "sex", "threesome", "horny",
           "shooting", "porn", "pornography", "shot", "gunned", "shootings", "gun", "guns", "uvalde", "football"]

In [16]:
pattern1 = "|".join(education_related_words)
pattern2 = "|".join(exclude_words)

In [18]:
#Discard irrelevant tweets using the filter words identified above
filtered_tweets = tweets_eng.filter(
    col("text").rlike(pattern1)
).filter(
    ~col("text").rlike(pattern2)
)
                    

In [19]:
#checking the count of the filtered tweets data
filtered_tweets.select(count('*').alias('count_filtered_tweets')).show()

[Stage 7:>                                                          (0 + 1) / 1]

+---------------------+
|count_filtered_tweets|
+---------------------+
|             36600074|
+---------------------+



                                                                                

In [None]:
#save the filtered tweet into my bucket in a parquet format for easier loading
filtered_tweets.write.format("parquet").\
mode('overwrite').\
save('gs://msca-bdp-students-bucket/shared_data/mariamoluwatobi/filtered_tweets')

                                                                                

### Exploratory data analysis

In [21]:
path_read = 'gs://msca-bdp-students-bucket/shared_data/mariamoluwatobi/filtered_tweets'

In [22]:
!hadoop fs -ls 'gs://msca-bdp-students-bucket/shared_data/mariamoluwatobi/filtered_tweets' | head

Found 5742 items
-rwx------   3 root root          0 2023-03-07 06:51 gs://msca-bdp-students-bucket/shared_data/mariamoluwatobi/filtered_tweets/_SUCCESS
-rwx------   3 root root    4347964 2023-03-07 06:21 gs://msca-bdp-students-bucket/shared_data/mariamoluwatobi/filtered_tweets/part-00000-38b88faf-7440-44dc-99b6-990c22eaa327-c000.snappy.parquet
-rwx------   3 root root    5373670 2023-03-07 06:21 gs://msca-bdp-students-bucket/shared_data/mariamoluwatobi/filtered_tweets/part-00001-38b88faf-7440-44dc-99b6-990c22eaa327-c000.snappy.parquet
-rwx------   3 root root    6005552 2023-03-07 06:21 gs://msca-bdp-students-bucket/shared_data/mariamoluwatobi/filtered_tweets/part-00002-38b88faf-7440-44dc-99b6-990c22eaa327-c000.snappy.parquet
-rwx------   3 root root    5188696 2023-03-07 06:21 gs://msca-bdp-students-bucket/shared_data/mariamoluwatobi/filtered_tweets/part-00003-38b88faf-7440-44dc-99b6-990c22eaa327-c000.snappy.parquet
-rwx------   3 root root    6335042 2023-03-07 06:21 gs://msca-bdp-

In [23]:
%%time

tweets_filt = spark.read.parquet(path_read)

CPU times: user 0 ns, sys: 5.64 ms, total: 5.64 ms
Wall time: 1.43 s


In [None]:
tweets_filt.limit(10)

                                                                                

coordinates,created_at,display_text_range,entities,extended_entities,extended_tweet,favorite_count,favorited,filter_level,geo,id,id_str,in_reply_to_screen_name,in_reply_to_status_id,in_reply_to_status_id_str,in_reply_to_user_id,in_reply_to_user_id_str,is_quote_status,lang,place,possibly_sensitive,quote_count,quoted_status,quoted_status_id,quoted_status_id_str,quoted_status_permalink,quoted_text,reply_count,retweet_count,retweeted,retweeted_from,retweeted_status,source,text,timestamp_ms,truncated,tweet_text,user,withheld_in_countries
,Wed Jun 15 15:16:...,,"{[], [{{null, nul...","{[{{null, null, f...",,0,False,low,,1537091559996399617,1537091559996399617,,,,,,False,en,,True,0,,,,,,0,0,RT,icyjaayyy,"{null, Wed Jun 15...","<a href=""http://t...",rt icyjaayyy craz...,1655306160340,False,Crazy in high sch...,"{false, Sat Aug 2...",
,Wed Jun 15 15:16:...,,"{[], null, [], []...",,,0,False,low,,1537091566447337479,1537091566447337479,,,,,,False,en,,,0,,,,,,0,0,RT,davidpaleologos,"{null, Wed Jun 15...","<a href=""http://t...",rt davidpaleologo...,1655306161878,False,Suffolk Universit...,"{false, Wed Feb 2...",
,Wed Jun 15 15:16:...,,"{[], null, [], []...",,,0,False,low,,1537091567927889920,1537091567927889920,,,,,,False,en,,,0,,,,,,0,0,RT,mrscollins_AVID,"{null, Tue Jun 14...","<a href=""http://t...",rt mrscollins_avi...,1655306162231,False,The power of refl...,"{false, Tue Apr 0...",
,Wed Jun 15 15:16:...,,"{[{[98, 103], eBa...",,,0,False,low,,1537091568481538049,1537091568481538049,,,,,,False,en,,False,0,,,,,,0,0,,eBay,,"<a href=""https://...",americas spirit g...,1655306162363,False,Americas Spirit G...,"{false, Sat Mar 2...",
,Wed Jun 15 15:16:...,,"{[], null, [], []...",,,0,False,low,,1537091577138618368,1537091577138618368,,,,,,False,en,,,0,,,,,,0,0,RT,michaelharriot,"{null, Tue Jun 14...","<a href=""http://t...",rt michaelharriot...,1655306164427,False,I don’t care what...,"{false, Thu Jul 1...",
,Wed Jun 15 15:16:...,,"{[{[111, 125], Op...",,,0,False,low,,1537091577700618240,1537091577700618240,,,,,,False,en,,,0,,,,,,0,0,RT,OpenLibraryON,"{null, Wed Jun 15...","<a href=""https://...",rt openlibraryon ...,1655306164561,False,Breaking news! Th...,"{false, Tue May 0...",
,Wed Jun 15 15:16:...,,"{[], null, [], [{...",,,0,False,low,,1537091581429284868,1537091581429284868,,,,,,False,en,,False,0,,,,,,0,0,RT,ForeignMalawi,"{null, Wed Jun 15...","<a href=""http://t...",rt foreignmalawi ...,1655306165450,False,Master's Scholars...,"{false, Thu Feb 1...",
,Wed Jun 15 15:16:...,,"{[], null, [], []...",,,0,False,low,,1537091582473777154,1537091582473777154,,,,,,False,en,,,0,,,,,,0,0,RT,itsJeffTiedrich,"{null, Tue Jun 14...","<a href=""https://...",rt itsjefftiedric...,1655306165699,False,are you telling m...,"{false, Mon Oct 0...",
,Wed Jun 15 15:16:...,,"{[], null, [], [{...",,"{[0, 206], {[], n...",0,False,low,,1537091583652290562,1537091583652290562,,,,,,True,en,,,0,"{null, Tue Jun 14...",1.5367934906250732e+18,1.5367934906250732e+18,{twitter.com/aoc/...,Sadly defunding s...,0,0,,,,"<a href=""http://t...",hey dummy did you...,1655306165980,True,Hey Dummy: Did yo...,"{false, Fri Dec 0...",
,Wed Jun 15 15:16:...,"[0, 140]","{[], null, [], [{...",,"{[0, 275], {[{[24...",0,False,low,,1537091583207686144,1537091583207686144,,,,,,False,en,,False,0,,,,,,0,0,,,,"<a href=""https://...",free webinar book...,1655306165874,True,"FREE WEBINAR, BOO...","{false, Wed Jun 2...",


In [25]:
#identifying poorly populated columns
missing_values = tweets_filt.agg(*[
    (count(when(col(c).isNull(), c)) / count('*')).alias(c)
    for c in tweets_filt.columns
])

missing_values.show(truncate=False)



+------------------+----------+------------------+--------+------------------+------------------+--------------+---------+------------+------------------+---+------+-----------------------+---------------------+-------------------------+-------------------+-----------------------+---------------+----+------------------+------------------+-----------+------------------+------------------+--------------------+-----------------------+------------------+-----------+-------------+---------+----------------+------------------+------+----+------------+---------+----------+----+---------------------+
|coordinates       |created_at|display_text_range|entities|extended_entities |extended_tweet    |favorite_count|favorited|filter_level|geo               |id |id_str|in_reply_to_screen_name|in_reply_to_status_id|in_reply_to_status_id_str|in_reply_to_user_id|in_reply_to_user_id_str|is_quote_status|lang|place             |possibly_sensitive|quote_count|quoted_status     |quoted_status_id  |quoted_sta

                                                                                

### Feature selection

In [26]:
tweets_filt.printSchema()

root
 |-- coordinates: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- display_text_range: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- additional_media_info: struct (nullable = true)
 |    |    |    |    |-- description: string (nullable = true)
 |    |    |    |    |-- embeddable: boolean (nullable = true)
 |    |    |    |    |-- monetizable: boolean (nullable = true)
 |    |    |    |   

In [27]:
#selecting the well populated features to use for further analysis based on the result from the missing values analysis
tweets_feats = tweets_filt.select(
            col('user.id_str').alias('user_id'),
            col('user.name').alias('user_name'),
            col('user.screen_name').alias('user_screen_name'),
            col('user.created_at').alias('user_created_at'),
            col('user.location').alias('user_location'),
            col('user.description').alias('user_description'),
            col('user.verified').alias('user_verified'),
            col('user.followers_count').alias('user_followers_count'),
            col('user.friends_count').alias('user_friends_count'),
            col('user.favourites_count').alias('user_favourites_count'),
            col('user.statuses_count').alias('user_statuses_count'),
            col('id_str').alias('tweet_id_str'),
            col('created_at').alias('tweet_created_at'),
            col('tweet_text'),
            col('text'),
            col('quote_count').alias('tweet_quote_count'),
            col('is_quote_status').alias('tweet_is_quote_status'),
            col('timestamp_ms').alias('tweet_timestamp_ms'),
            col('retweeted_from').alias('tweet_retweeted_from'),
            col('reply_count').alias('tweet_reply_count'),
            col('retweeted_status').alias('tweet_retweeted_status'),
            col('possibly_sensitive').alias('tweet_possibly_sensitive'),
            col('favorite_count').alias('tweet_favorite_count'),
            col('place.country').alias('tweet_location'),
            col('place.full_name').alias('tweet_city'),
            col('coordinates.coordinates').alias('tweet_coordinates')
)           
    

#### Spark DF Variable Casting

In [28]:
#enable the legacy timestamp parser for date
spark.conf.set('spark.sql.legacy.timeParserPolicy', 'LEGACY')

tweets_feats = tweets_feats.withColumn('user_created_at', to_timestamp(col('user_created_at'), 'EEE MMM dd HH:mm:ss zzzzz yyyy'))
tweets_feats = tweets_feats.withColumn('tweet_created_at', to_timestamp(col('tweet_created_at'), 'EEE MMM dd HH:mm:ss zzzzz yyyy'))

In [29]:
type(tweets_feats)

pyspark.sql.dataframe.DataFrame

In [30]:
tweets_feats.limit(10)

                                                                                

user_id,user_name,user_screen_name,user_created_at,user_location,user_description,user_verified,user_followers_count,user_friends_count,user_favourites_count,user_statuses_count,tweet_id_str,tweet_created_at,tweet_text,text,tweet_quote_count,tweet_is_quote_status,tweet_timestamp_ms,tweet_retweeted_from,tweet_reply_count,tweet_retweeted_status,tweet_possibly_sensitive,tweet_favorite_count,tweet_location,tweet_city,tweet_coordinates
1049560369,lisa witham,cheezwitham,2012-12-31 05:14:32,,,False,7632,3742,1182212,1134557,1590664416101425152,2022-11-10 11:15:24,Parma CSD not mov...,parma csd not mov...,0,False,1668078924571,,0,,False,0,,,
79316343,V. GUERIN-RACINE ...,Veroniqueguerin,2009-10-02 23:27:34,"Caen & Sens, France",Psychotherapist,False,120,710,1595,3081,1590664417330749442,2022-11-10 11:15:24,A clip today from...,rt benwoodfinden ...,0,False,1668078924864,BenWoodfinden,0,"{null, Fri Oct 07...",,0,,,
1532630963934711808,SAMUEL NDERITU,samnmKE,2022-06-03 07:51:30,Kenya,CHRISTIAN. FAMILY...,False,521,781,11148,11609,1590664419159076864,2022-11-10 11:15:25,A man who went to...,rt mwendiajnr a m...,0,False,1668078925300,MwendiaJnr,0,"{null, Thu Nov 10...",,0,,,
2887542734,Nayan Gadhiya,ngadhiya97,2014-11-02 14:55:11,,Medico| Humor| Po...,False,786,832,8299,6286,1590664422598791171,2022-11-10 11:15:26,Gujarat started 5...,rt brendanmirror ...,0,False,1668078926120,BrendanMIRROR,0,"{null, Tue Nov 08...",,0,,,
921974038045474816,Rajiv 🇮🇳,Rajiv25479263,2017-10-22 05:38:34,"Bhopal, India",Right Time to und...,False,1313,2054,324090,65522,1590664425065054211,2022-11-10 11:15:26,STUDENTS: tomorro...,rt sbabones stude...,0,False,1668078926708,sbabones,0,"{null, Wed Nov 09...",,0,,,
499036642,Dorothea Kleine @...,dorotheakleine,2012-02-21 18:10:57,"Sheffield, England",Prof @Sheffieldun...,False,2556,788,3556,6423,1590664427619377152,2022-11-10 11:15:27,Lecturer in Respo...,rt profwagner_rhu...,0,False,1668078927317,ProfWagner_RHUL,0,"{null, Thu Nov 10...",False,0,,,
1461841183416672256,Unc,yamsandearmuffs,2021-11-19 23:38:14,"Dallas, TX",Just here for a g...,False,157,609,685,3009,1590664436830064645,2022-11-10 11:15:29,Colorado just vot...,rt davenewworld_2...,0,False,1668078929513,davenewworld_2,0,"{null, Thu Nov 10...",,0,,,
1005248964,Jasmine Bell 🦋🌹❤️,JasBell_IsFedUp,2012-12-12 01:10:13,xxxx,,False,3088,4925,307245,172730,1590664442018410497,2022-11-10 11:15:30,Thankfully we got...,rt ragesheen than...,0,False,1668078930750,RageSheen,0,"{null, Thu Nov 10...",,0,,,
1457101285275029504,SETH🥷🏾🦇,SethRaver1,2021-11-06 21:43:56,🦇🎄🕸,#CHELSEAFC💙 #RAV...,False,690,283,54854,17442,1590664442135851008,2022-11-10 11:15:30,Today I had an ex...,rt sethraver1 tod...,0,False,1668078930778,SethRaver1,0,"{null, Thu Nov 10...",,0,,,
1012379409444229122,Sustainable Bus,BusSustainable,2018-06-28 16:57:15,,News on electric ...,False,2840,1029,1759,2501,1590664444933439488,2022-11-10 11:15:31,The Ohio State Un...,the ohio state un...,0,False,1668078931445,,0,,False,0,,,


In [31]:
tweets_feats.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- user_screen_name: string (nullable = true)
 |-- user_created_at: timestamp (nullable = true)
 |-- user_location: string (nullable = true)
 |-- user_description: string (nullable = true)
 |-- user_verified: boolean (nullable = true)
 |-- user_followers_count: long (nullable = true)
 |-- user_friends_count: long (nullable = true)
 |-- user_favourites_count: long (nullable = true)
 |-- user_statuses_count: long (nullable = true)
 |-- tweet_id_str: string (nullable = true)
 |-- tweet_created_at: timestamp (nullable = true)
 |-- tweet_text: string (nullable = true)
 |-- text: string (nullable = true)
 |-- tweet_quote_count: long (nullable = true)
 |-- tweet_is_quote_status: boolean (nullable = true)
 |-- tweet_timestamp_ms: string (nullable = true)
 |-- tweet_retweeted_from: string (nullable = true)
 |-- tweet_reply_count: long (nullable = true)
 |-- tweet_retweeted_status: struct (nullable = true)
 | 

In [32]:
#save the processed tweet data into my bucket in a parquet format for easier loading
tweets_feats.write.format("parquet").\
mode('overwrite').\
save('gs://msca-bdp-students-bucket/shared_data/mariamoluwatobi/final_tweet_data')

                                                                                