In [1]:
# Show the Spark version of the running session
spark.version

'3.0.1'

In [2]:
import subprocess

from pyspark.sql.functions import *
from pyspark.sql.types import *

from pyspark.sql import SQLContext
# Wrap the existing SparkContext `sc` in a SQLContext.
# NOTE(review): no cell visible in this section uses `sqlContext` (they all go
# through `spark`) -- confirm whether it is still needed.
sqlContext = SQLContext(sc)

In [3]:
# Turn on eager evaluation so a DataFrame left as a cell's last expression is rendered as a table
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

In [4]:
# Root of the raw tweet dump in GCS. The whole directory is read as-is;
# to restrict the read to JSON files only, use directory + '*.json' as the path.
directory = 'gs://msca-bdp-tweets/final_project/'
path = directory

In [5]:
# Report the total size of the input directory with `hadoop fs -du -s -h`.
# The command is an argument list run without a shell, so the path is passed
# to hadoop verbatim instead of being interpreted by a shell.
cmd = ['hadoop', 'fs', '-du', '-s', '-h', directory]

# stderr is merged into stdout so error messages appear in the cell output.
# The context manager closes the pipe once the command has finished.
with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True) as p:
    for line in p.stdout:
        # Each line already ends with a newline; do not print a second one.
        print(line, end='')

    # Exit status of the hadoop command (0 on success).
    retval = p.wait()

156.2 G  156.2 G  gs://msca-bdp-tweets/final_project



In [6]:
%%time

# Load everything under `path` as JSON into a DataFrame (no explicit schema is supplied).
tweets_df = spark.read.format("json").load(path)

CPU times: user 51.1 ms, sys: 11.9 ms, total: 63 ms
Wall time: 4min 43s


In [42]:
# Project the tweet-level, user-level and retweet-level fields used below.
# Nested struct fields are pulled out and given flat column names.
data = tweets_df.select(
    tweets_df["created_at"],
    tweets_df["place"]["country"].alias("country"),
    tweets_df["id"],
    tweets_df["user"],
    tweets_df["user"]["id"].alias("user_id"),
    tweets_df["user"]["name"].alias("user_name"),
    tweets_df["user"]["description"].alias("user_description"),
    tweets_df["user"]["followers_count"].alias("followers_count"),
    tweets_df["retweeted_status"]["quote_count"].alias("quote_count"),
    tweets_df["retweeted_status"],
    tweets_df["retweeted_status"]["retweet_count"].alias("retweet_count"),
    tweets_df["retweeted_status"]["favorite_count"].alias("favorite_count"),
    tweets_df["text"],
    tweets_df["user"]["verified"].alias("verified_user"),
)

In [43]:
import pandas as pd

import pyspark.sql.functions as f

In [44]:
# Lowercase the tweet text so the keyword matching on `text` is case-insensitive
lowered_text = f.lower(data["text"])
data = data.withColumn("text", lowered_text)

In [45]:
# Keep only tweets whose (lowercased) text contains at least one COVID-related term.
covid_terms = [
    "covid",
    "coronavirus",
    "pandemic",
    "vaccin",
    "booster",
    "pfizer",
    "moderna",
    "j&j",
    "johnson and johnson",
]

# OR the substring tests together in the order the terms are listed.
covid_match = data["text"].contains(covid_terms[0])
for term in covid_terms[1:]:
    covid_match = covid_match | data["text"].contains(term)

covid_data = data.filter(covid_match)

In [46]:
# Replace missing values in the three count columns with 0
count_columns = ["retweet_count", "quote_count", "favorite_count"]
data_df = covid_data.fillna(0, subset=count_columns)

In [47]:
# Discard rows where the user description is null
covid_tweets = data_df.dropna(subset=['user_description'])

In [48]:
# Preview the filtered and cleaned COVID tweet DataFrame
covid_tweets

created_at,country,id,user,user_id,user_name,user_description,followers_count,quote_count,retweeted_status,retweet_count,favorite_count,text,verified_user
Sat Oct 30 02:52:...,,1454279983531188226,"[false, Mon May 2...",586248544,Emily Joy,Mostly skating ba...,70,0,,0,0,@garraarghhrumph ...,False
Sat Oct 30 02:52:...,,1454279984017600512,"[false, Sun Mar 2...",1109641163890319360,Judith Becker,Soldier of the Lion,919,0,"[,, Sat Oct 30 02...",1,0,rt @denise_old_la...,False
Sat Oct 30 02:52:...,,1454279984198021126,"[false, Wed Nov 1...",214105439,TheRealFourSeason...,Often tweets abou...,1869,0,,0,0,@pahpcorn @lisa_t...,False
Sat Oct 30 02:52:...,,1454279984466386954,"[false, Sat Nov 1...",954159751,Aaron Bradley,Futures trader. I...,650,0,,0,0,@polan13 @mark_do...,False
Sat Oct 30 02:52:...,,1454279985439522822,"[false, Sat Mar 1...",1107056255158251520,wadaaa | fucking ...,be a crave of pea...,129,29,"[,, Fri Oct 29 23...",810,12952,rt @choi_bts2: it...,False
Sat Oct 30 02:52:...,,1454279985519288323,"[false, Sat Feb 0...",20298096,Akira Morgendorffer,Chill. Spunky. Aw...,133,20,"[,, Sat Oct 30 01...",238,626,rt @lukewearechan...,False
Sat Oct 30 02:52:...,,1454279987012390913,"[false, Thu Feb 2...",117430610,Onkar Gharat,"Works @ #SPMRM, M...",100,0,,0,0,while one can sti...,False
Sat Oct 30 02:52:...,,1454279987289153536,"[false, Sun Jan 2...",19478009,Robert rGyatso,Add appropriate c...,557,10,"[,, Sat Oct 30 01...",34,71,rt @ctvnews: edmo...,False
Sat Oct 30 02:52:...,,1454279992066527234,"[false, Thu Jan 1...",104671559,Juanita✨🦋,Michoacán 🇲🇽❤️ ...,2758,1008,"[,, Thu Oct 28 08...",16026,142579,rt @laurenchloeee...,False
Sat Oct 30 02:52:...,,1454279993085677573,"[false, Sun Apr 2...",2454667705,Dr Debbie Wilson ...,Healthcare sustai...,1098,1,"[,, Fri Oct 29 04...",10,491,rt @hilstace: one...,False


In [16]:
# Number of rows in the cleaned COVID tweet DataFrame
covid_tweets.count()

13672943

In [17]:
# Persist the cleaned tweets as Parquet, replacing any existing output at this location
covid_tweets.write.parquet(
    'gs://msca-bdp-students-bucket/shared_data/abharathsingh/covid_tweets',
    mode='overwrite',
)

print('done')

done


## About the data (slide)

In [30]:
# Keep just the user id and its verified flag
verified = covid_tweets.select('user_id', 'verified_user')

In [31]:
# Preview the (user_id, verified_user) pairs
verified

user_id,verified_user
586248544,False
1109641163890319360,False
214105439,False
954159751,False
1107056255158251520,False
20298096,False
117430610,False
19478009,False
104671559,False
2454667705,False


In [34]:
# One row per distinct (user_id, verified_user) pair; `verified` holds only these two columns
verified2 = verified.distinct()

In [None]:
# Preview the de-duplicated (user_id, verified_user) pairs
verified2

user_id,verified_user
34993842,False
258389409,False
1441419239466881028,False
24245180,False
1454161872849043462,False
29453773,False
1381970541737865218,False
798846092,False
1082600286525501440,False
1258766479039754247,False


In [38]:
# verified2.count()  # result recorded from an earlier run: 2729579

In [None]:
# Count the distinct (user_id, verified_user) pairs for each value of the verified flag
verified2.groupBy('verified_user').count()

verified_user,count
True,58167
False,2671412


In [50]:
# Keep just the user id and the tweet's place country
country = covid_tweets.select(covid_tweets['user_id'], covid_tweets['country'])

In [51]:
# Preview the (user_id, country) pairs
country

user_id,country
586248544,
1109641163890319360,
214105439,
954159751,
1107056255158251520,
20298096,
117430610,
19478009,
104671559,
2454667705,


In [52]:
# One row per distinct (user_id, country) pair; `country` holds only these two columns
country2 = country.distinct()

In [None]:
# Preview the de-duplicated (user_id, country) pairs
country2

user_id,country
1551479125,
1236086143529205761,
1052485525,
736549807901282304,
1881598274,
1286016980613554178,
340481991,
1346584805324988416,
1283228185195421699,
803016158,


In [None]:
# Display the pairs that have a country; the result is shown only, `country2` itself is unchanged
country2.dropna(subset=['country'])

user_id,country
2300779218,Australia
21062787,United States
182906952,United States
45540923,Estonia
48185455,United States
1060284390,Kingdom of Saudi ...
49828582,Australia
1340997913502130178,United States
547944161,United Kingdom
14205312,United States


In [57]:
# Count distinct (user_id, country) pairs per country.
# Rows with a null country are still in `country2`, so they form their own group.
country3 = country2.groupBy('country').count()

In [None]:
# Print the 20 largest country groups, highest count first, without truncating country names
country3.sort("count", ascending=False).show(n=20, truncate=False)

+---------------------------+-------+
|country                    |count  |
+---------------------------+-------+
|null                       |2708673|
|United States              |20583  |
|United Kingdom             |7019   |
|Canada                     |2018   |
|India                      |1789   |
|Australia                  |1231   |
|South Africa               |741    |
|Ireland                    |714    |
|New Zealand                |406    |
|Republic of the Philippines|301    |
|Uganda                     |217    |
|Kenya                      |202    |
|Malaysia                   |191    |
|Spain                      |173    |
|France                     |172    |
|Pakistan                   |156    |
|Germany                    |127    |
|Mexico                     |99     |
|Italy                      |96     |
|Nigeria                    |96     |
+---------------------------+-------+
only showing top 20 rows

