In [1]:
# Sanity check: show the Spark version of the session this kernel is attached to.
spark.version

'3.0.1'

In [2]:
import os
import shutil
import pandas as pd
# import sh
#from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [3]:
# Allow up to 100 characters per cell in pandas output so long strings
# (e.g. GCS object names) are less truncated. Use the fully-qualified option key
# rather than relying on pandas' partial-name matching.
pd.set_option("display.max_colwidth", 100)

In [4]:
from google.cloud import storage

In [5]:
# List all files in a given GCS folder (object-name prefix)
def list_blobs(bucket_name, folder_name):
    """Print every Google Cloud Storage object whose name starts with ``folder_name``.

    Args:
        bucket_name: Name of the GCS bucket to list.
        folder_name: Object-name prefix to filter on (e.g. ``"final_project/"``).

    Prints one ``name<TAB>size`` line per object; returns nothing.
    """
    bucket = storage.Client().bucket(bucket_name)
    # Fetch the complete listing before printing anything.
    matching = list(bucket.list_blobs(prefix=folder_name))
    for item in matching:
        print('\t'.join((item.name, str(item.size))))

In [6]:
# List all files in a given GCS folder as a pandas DataFrame
def list_blobs_pd(bucket_name, folder_name):
    """Return a DataFrame with one row per GCS object whose name starts with ``folder_name``.

    Args:
        bucket_name: Name of the GCS bucket to list.
        folder_name: Object-name prefix to filter on (e.g. ``"final_project/"``).

    Returns:
        pandas.DataFrame with columns ``Name`` (``blob.name``) and ``Size``
        (``blob.size``), in listing order. When nothing matches, the frame is
        empty but still has both columns.

    For a thousands-separated display, style the result where it is shown, e.g.
    ``list_blobs_pd(bucket, folder).style.format({"Size": "{:,.0f}"})``.
    Calling ``.style.format`` inside this function and discarding the result has
    no effect, because a Styler never modifies the underlying DataFrame.
    """
    gcs_client = storage.Client()
    bucket = gcs_client.bucket(bucket_name)
    # Build all rows in one pass instead of two parallel lists zipped back together.
    rows = [(blob.name, blob.size) for blob in bucket.list_blobs(prefix=folder_name)]
    return pd.DataFrame(rows, columns=['Name', 'Size'])

In [7]:
# Delete a folder (every object under a prefix) from a GCS bucket
def delete_folder(bucket_name, folder_name):
    """Delete every object stored under the folder ``folder_name`` in a GCS bucket.

    GCS has no real directories: a "folder" is only a shared object-name prefix.
    The prefix is normalised to end with ``/`` so that deleting ``"tweets"`` does
    not also delete objects under ``"tweets3/"`` or ``"tweets_backup/"``.

    Args:
        bucket_name: Name of the GCS bucket.
        folder_name: Folder path inside the bucket, with or without a trailing ``/``.

    Raises:
        ValueError: if ``folder_name`` is empty (or only slashes). An empty prefix
            would match, and therefore delete, every object in the bucket.
    """
    if not folder_name or not folder_name.strip('/'):
        raise ValueError("folder_name must be a non-empty folder path; "
                         "an empty prefix would delete the whole bucket")
    prefix = folder_name if folder_name.endswith('/') else folder_name + '/'

    gcs_client = storage.Client()
    bucket = gcs_client.bucket(bucket_name)
    # Collect the complete listing first, then delete.
    blobs = list(bucket.list_blobs(prefix=prefix))
    for blob in blobs:
        blob.delete()

In [8]:
#!hadoop fs -ls "gs://msca-bdp-tweets/final_project"

In [9]:
import os
import subprocess
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [10]:
# Turn on Spark's REPL eager evaluation so a DataFrame that is the last expression
# of a cell is rendered as a table of rows, not just its schema string.
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

In [11]:
# Point Spark at the whole final_project folder so all files in it are read.
# `directory` is also used by the size check; `path` is what spark.read.json loads.
path = directory = 'gs://msca-bdp-tweets/final_project/'

In [12]:
# Show the total size of the input folder before loading it.
# Passing an argument list (no shell=True) avoids shell quoting/injection issues with
# the path. subprocess.run waits for the command and closes its output pipe.
# stderr is merged into stdout so hadoop error messages are shown as well.
cmd = ['hadoop', 'fs', '-du', '-s', '-h', directory]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                        universal_newlines=True)
# The captured lines already end in '\n'; end='' avoids print() adding blank lines.
print(result.stdout, end='')
retval = result.returncode

156.2 G  156.2 G  gs://msca-bdp-tweets/final_project



In [15]:
%%time

# Load every JSON file in `path` into a Spark DataFrame (the cell is timed with %%time).
# NOTE(review): with no explicit schema, spark.read.json infers one by scanning the input
# (about 156 GB per the size check above). Supplying a schema or a samplingRatio would
# make this load much cheaper; confirm the inferred schema is what the analysis needs.
tweets_spark = spark.read.json(path)

In [14]:
# Print the nested schema Spark inferred for the raw tweet JSON.
tweets_spark.printSchema()

root
 |-- contributors: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- display_text_range: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- additional_media_info: struct (nullable = true)
 |    |    |    |    |-- description: string (nullable = true)
 |    |    |    |    |-- embeddable: boolean (nullable = true)
 |    |    |    |    |-- monetizable: bo

In [20]:
# Peek at the nested retweeted_status struct. In the saved output it appears
# populated for "RT @..." tweets and empty for the others.
tweets_spark.select('retweeted_status')

retweeted_status
"[,, Sat Oct 30 02..."
""
""
"[,, Sat Oct 30 02..."
"[,, Sat Oct 30 02..."
""
""
"[,, Fri Oct 29 17..."
"[,, Fri Oct 29 16..."
""


In [13]:
# Preview the raw tweets. With eager evaluation enabled, only a limited number of rows
# is rendered, and long or nested values are shown truncated ("...").
display(tweets_spark)

contributors,coordinates,created_at,display_text_range,entities,extended_entities,extended_tweet,favorite_count,favorited,filter_level,geo,id,id_str,in_reply_to_screen_name,in_reply_to_status_id,in_reply_to_status_id_str,in_reply_to_user_id,in_reply_to_user_id_str,is_quote_status,lang,place,possibly_sensitive,quote_count,quoted_status,quoted_status_id,quoted_status_id_str,quoted_status_permalink,reply_count,retweet_count,retweeted,retweeted_status,source,text,timestamp_ms,truncated,user,withheld_in_countries
,,Sat Oct 30 02:52:...,,"[[],, [], [], [[7...",,,0,False,low,,1454279981312495619,1454279981312495619,,,,,,True,en,,,0,"[,, Fri Oct 29 16...",1.4541302164738785e+18,1.4541302164738785e+18,[twitter.com/disc...,0,0,False,"[,, Sat Oct 30 02...","<a href=""https://...",RT @LilNasX: y’al...,1635562341417,False,"[false, Sun May 2...",
,,Sat Oct 30 02:52:...,,"[[],, [], [[twitt...",,"[[0, 238], [[[[16...",0,False,low,,1454279982096789505,1454279982096789505,,,,,,False,en,,False,0,,,,,0,0,False,,"<a href=""http://t...",“At a political f...,1635562341604,True,"[false, Mon Feb 2...",
,,Sat Oct 30 02:52:...,"[17, 77]","[[],, [], [], [[2...",,,0,False,low,,1454279983531188226,1454279983531188226,GarrAarghHrumph,1.4542797138584576e+18,1.4542797138584576e+18,266929217.0,266929217.0,False,en,,,0,,,,,0,0,False,,"<a href=""http://t...",@GarrAarghHrumph ...,1635562341946,False,"[false, Mon May 2...",
,,Sat Oct 30 02:52:...,,"[[],, [], [], [[1...",,,0,False,low,,1454279984017600512,1454279984017600512,,,,,,True,en,,,0,"[,, Fri Oct 29 21...",1.4541916525458432e+18,1.4541916525458432e+18,[twitter.com/jjxt...,0,0,False,"[,, Sat Oct 30 02...","<a href=""https://...",RT @Denise_Old_La...,1635562342062,False,"[false, Sun Mar 2...",
,,Sat Oct 30 02:52:...,,"[[],, [], [], [[7...",,,0,False,low,,1454279983929626624,1454279983929626624,,,,,,True,en,,,0,"[,, Fri Oct 29 16...",1.4541302164738785e+18,1.4541302164738785e+18,[twitter.com/disc...,0,0,False,"[,, Sat Oct 30 02...","<a href=""http://t...",RT @LilNasX: y’al...,1635562342041,False,"[false, Fri May 0...",
,,Sat Oct 30 02:52:...,"[26, 140]","[[],, [], [[twitt...",,"[[26, 162], [[],,...",0,False,low,,1454279984198021126,1454279984198021126,pahpcorn,1.454279515476091e+18,1.454279515476091e+18,3150308066.0,3150308066.0,False,en,,,0,,,,,0,0,False,,"<a href=""http://t...",@pahpcorn @Lisa_T...,1635562342105,True,"[false, Wed Nov 1...",
,,Sat Oct 30 02:52:...,"[19, 81]","[[],, [], [], [[3...",,,0,False,low,,1454279984466386954,1454279984466386954,polan13,1.45427880883234e+18,1.45427880883234e+18,38108853.0,38108853.0,False,en,,,0,,,,,0,0,False,,"<a href=""http://t...",@polan13 @mark_do...,1635562342169,False,"[false, Sat Nov 1...",
,,Sat Oct 30 02:52:...,,"[[],, [], [], [[2...",,,0,False,low,,1454279984722419712,1454279984722419712,,,,,,False,en,,,0,,,,,0,0,False,"[,, Fri Oct 29 17...","<a href=""http://t...",RT @LegendaryEner...,1635562342230,False,"[false, Sat Feb 0...",
,,Sat Oct 30 02:52:...,,"[[],, [], [[gript...",,,0,False,low,,1454279985292840962,1454279985292840962,,,,,,False,en,,False,0,,,,,0,0,False,"[,, Fri Oct 29 16...","<a href=""http://t...",RT @RWMaloneMD: H...,1635562342366,False,"[false, Mon Nov 0...",
,,Sat Oct 30 02:52:...,,"[[],, [], [], []]",,,0,False,low,,1454279985284489219,1454279985284489219,,,,,,True,en,,,0,"[,, Fri Oct 29 16...",1.4541302164738785e+18,1.4541302164738785e+18,[twitter.com/disc...,0,0,False,,"<a href=""http://t...",Clown ass.,1635562342364,False,"[false, Tue Feb 2...",


In [58]:
#tweets_spark.describe(['contributors']).show()

In [35]:
# Summary statistics for retweet_count.
# NOTE(review): the saved output shows min = max = 0 across ~25.2M rows, so this field
# carries no signal in this dataset (presumably counts captured at ingest time; confirm).
tweets_spark[['retweet_count']].describe()

summary,retweet_count
count,25191000.0
mean,0.0
stddev,0.0
min,0.0
max,0.0


In [37]:
from pyspark.sql.functions import isnan, when, count, col
# Per-column NULL counts: when(cond, value) without otherwise() is NULL whenever cond
# is false, and count() skips NULLs, so each aggregate counts the rows where that
# column is NULL.
tweets_spark.select([
    count(when(col(name).isNull(), name)).alias(name)
    for name in tweets_spark.columns
]).show()

+------------+-----------+----------+------------------+--------+-----------------+--------------+--------------+---------+------------+--------+---+------+-----------------------+---------------------+-------------------------+-------------------+-----------------------+---------------+----+--------+------------------+-----------+-------------+----------------+--------------------+-----------------------+-----------+-------------+---------+----------------+------+----+------------+---------+----+---------------------+
|contributors|coordinates|created_at|display_text_range|entities|extended_entities|extended_tweet|favorite_count|favorited|filter_level|     geo| id|id_str|in_reply_to_screen_name|in_reply_to_status_id|in_reply_to_status_id_str|in_reply_to_user_id|in_reply_to_user_id_str|is_quote_status|lang|   place|possibly_sensitive|quote_count|quoted_status|quoted_status_id|quoted_status_id_str|quoted_status_permalink|reply_count|retweet_count|retweeted|retweeted_status|source|text|t

contributors			display_text_range	entities	extended_entities	extended_tweet		favorited	filter_level	geo	id		in_reply_to_screen_name		in_reply_to_status_id_str	in_reply_to_user_id	in_reply_to_user_id_str	is_quote_status			possibly_sensitive		quoted_status		quoted_status_id_str	quoted_status_permalink			retweeted	retweeted_status	source	text	timestamp_ms	truncated		withheld_in_countries

Columns to keep: created_at, id_str, in_reply_to_user_id, user, coordinates, place, reply_count, retweet_count, favorite_count, quote_count, lang

In [None]:
#drop_vars = ['coordinates', 'created_at', 'entities', 'favorite_count', 'id_str', in_reply_to_status_id	in_reply_to_status_id_str	in_reply_to_user_id	in_reply_to_user_id_str	is_quote_status	lang	place	possibly_sensitive	quote_count	quoted_status	quoted_status_id	quoted_status_id_str	quoted_status_permalink	reply_count	retweet_count	retweeted	retweeted_status	source	text	timestamp_ms	truncated	user	withheld_in_countries]

In [29]:
# Columns to keep for the analysis. 'user' and 'coordinates' are separate fields.
select_columns = ['created_at', 'id_str', 'in_reply_to_user_id', 'user', 'coordinates',
                  'place', 'reply_count', 'retweet_count', 'favorite_count', 'quote_count', 'lang']

In [60]:
#tweets_selected = tweets_spark.select(['created_at', 'id_str', 'in_reply_to_user_id', 'user, coordinates', \
#                  'place', 'reply_count', 'retweet_count', 'favorite_count', 'quote_count', 'lang'])

In [None]:
# Raw tweet fields considered for removal. The brackets give implicit line
# continuation, so no backslashes are needed.
# NOTE(review): drop_cols is not referenced by any visible cell. It also lists fields
# (entities, geo, in_reply_to_user_id, retweeted_status) that the column selection
# building tweets_df keeps; reconcile before using it.
drop_cols = ['contributors', 'display_text_range', 'entities', 'extended_entities', 'extended_tweet', 'favorited',
             'filter_level', 'geo', 'id', 'in_reply_to_screen_name', 'in_reply_to_status_id', 'in_reply_to_user_id',
             'is_quote_status', 'possibly_sensitive', 'quoted_status', 'quoted_status_id', 'quoted_status_id_str',
             'quoted_status_permalink', 'retweeted', 'retweeted_status', 'source', 'timestamp_ms', 'truncated',
             'withheld_in_countries']

Columns to keep: created_at, id_str, in_reply_to_user_id, user, coordinates, place, reply_count, retweet_count, favorite_count, quote_count, lang

In [29]:
# Keep only the fields used downstream: ids and text, author, reply/retweet links,
# location fields and engagement counts.
tweets_df = tweets_spark.select([
    'created_at', 'id_str', 'text', 'entities', 'in_reply_to_user_id', 'user',
    'retweeted_status', 'coordinates', 'place',
    'reply_count', 'retweet_count', 'favorite_count', 'quote_count', 'lang', 'geo',
])

In [30]:
# Preview the reduced-column tweets DataFrame.
display(tweets_df)

created_at,id_str,text,entities,in_reply_to_user_id,user,retweeted_status,coordinates,place,reply_count,retweet_count,favorite_count,quote_count,lang,geo
Sat Oct 30 02:52:...,1454279981312495619,RT @LilNasX: y’al...,"[[],, [], [], [[7...",,"[false, Sun May 2...","[,, Sat Oct 30 02...",,,0,0,0,0,en,
Sat Oct 30 02:52:...,1454279982096789505,“At a political f...,"[[],, [], [[twitt...",,"[false, Mon Feb 2...",,,,0,0,0,0,en,
Sat Oct 30 02:52:...,1454279983531188226,@GarrAarghHrumph ...,"[[],, [], [], [[2...",266929217.0,"[false, Mon May 2...",,,,0,0,0,0,en,
Sat Oct 30 02:52:...,1454279984017600512,RT @Denise_Old_La...,"[[],, [], [], [[1...",,"[false, Sun Mar 2...","[,, Sat Oct 30 02...",,,0,0,0,0,en,
Sat Oct 30 02:52:...,1454279983929626624,RT @LilNasX: y’al...,"[[],, [], [], [[7...",,"[false, Fri May 0...","[,, Sat Oct 30 02...",,,0,0,0,0,en,
Sat Oct 30 02:52:...,1454279984198021126,@pahpcorn @Lisa_T...,"[[],, [], [[twitt...",3150308066.0,"[false, Wed Nov 1...",,,,0,0,0,0,en,
Sat Oct 30 02:52:...,1454279984466386954,@polan13 @mark_do...,"[[],, [], [], [[3...",38108853.0,"[false, Sat Nov 1...",,,,0,0,0,0,en,
Sat Oct 30 02:52:...,1454279984722419712,RT @LegendaryEner...,"[[],, [], [], [[2...",,"[false, Sat Feb 0...","[,, Fri Oct 29 17...",,,0,0,0,0,en,
Sat Oct 30 02:52:...,1454279985292840962,RT @RWMaloneMD: H...,"[[],, [], [[gript...",,"[false, Mon Nov 0...","[,, Fri Oct 29 16...",,,0,0,0,0,en,
Sat Oct 30 02:52:...,1454279985284489219,Clown ass.,"[[],, [], [], []]",,"[false, Tue Feb 2...",,,,0,0,0,0,en,


In [16]:
#list_str = ['COVID', 'Corona', 'vaccine', 'Vaccine', 'CDC', 'shot', 'Shot', 'Wuhan']
#tweets_df2 = tweets_df.rdd.filter(lambda r: any(s in r[0] for s in list_str))

In [None]:
#tweets_df2.count()

0

In [62]:
#tweets_df2 = tweets_df.limit(10000).filter(lambda line: 'Covid' in line)

In [31]:
# Add a lower-cased copy of the tweet text so the keyword filter is case-insensitive.
tweets_df = tweets_df.withColumn('text2', F.lower(F.col('text')))

In [32]:
# Keep tweets whose lower-cased text mentions at least one COVID-related keyword.
# Spark's rlike() performs an unanchored regex search (find), so joining the keywords
# with "|" selects the same rows as OR-ing text2.contains(k) for every keyword.
# Keywords must stay plain letters/digits, because they are interpolated into a regex.
# NOTE(review): this is substring matching, so it also hits unrelated words
# ('shot' in 'screenshot', 'rona' in 'ronald', 'spread' in 'spreadsheet').
# Consider word-boundary patterns if precision matters.
covid_keywords = [
    'covid', 'vaccine', 'quarantine', 'shot', 'outbreak', 'isolation', 'spread',
    'pfizer', 'moderna', 'cdc', 'distancing', 'mask', 'sars', 'rona', 'pandemic',
]
tweets_df2 = tweets_df.filter(tweets_df.text2.rlike('|'.join(covid_keywords)))

In [33]:
# Preview the tweets that matched at least one COVID keyword.
display(tweets_df2)

created_at,id_str,text,entities,in_reply_to_user_id,user,retweeted_status,coordinates,place,reply_count,retweet_count,favorite_count,quote_count,lang,geo,text2
Sat Oct 30 02:52:...,1454279983531188226,@GarrAarghHrumph ...,"[[],, [], [], [[2...",266929217.0,"[false, Mon May 2...",,,,0,0,0,0,en,,@garraarghhrumph ...
Sat Oct 30 02:52:...,1454279984017600512,RT @Denise_Old_La...,"[[],, [], [], [[1...",,"[false, Sun Mar 2...","[,, Sat Oct 30 02...",,,0,0,0,0,en,,rt @denise_old_la...
Sat Oct 30 02:52:...,1454279984198021126,@pahpcorn @Lisa_T...,"[[],, [], [[twitt...",3150308066.0,"[false, Wed Nov 1...",,,,0,0,0,0,en,,@pahpcorn @lisa_t...
Sat Oct 30 02:52:...,1454279984466386954,@polan13 @mark_do...,"[[],, [], [], [[3...",38108853.0,"[false, Sat Nov 1...",,,,0,0,0,0,en,,@polan13 @mark_do...
Sat Oct 30 02:52:...,1454279984722419712,RT @LegendaryEner...,"[[],, [], [], [[2...",,"[false, Sat Feb 0...","[,, Fri Oct 29 17...",,,0,0,0,0,en,,rt @legendaryener...
Sat Oct 30 02:52:...,1454279985292840962,RT @RWMaloneMD: H...,"[[],, [], [[gript...",,"[false, Mon Nov 0...","[,, Fri Oct 29 16...",,,0,0,0,0,en,,rt @rwmalonemd: h...
Sat Oct 30 02:52:...,1454279985439522822,RT @choi_bts2: It...,"[[],, [], [], [[9...",,"[false, Sat Mar 1...","[,, Fri Oct 29 23...",,,0,0,0,0,en,,rt @choi_bts2: it...
Sat Oct 30 02:52:...,1454279985519288323,RT @Lukewearechan...,"[[],, [], [], [[9...",,"[false, Sat Feb 0...","[,, Sat Oct 30 01...",,,0,0,0,0,en,,rt @lukewearechan...
Sat Oct 30 02:52:...,1454279987012390913,While one can sti...,"[[],, [], [[twitt...",117430610.0,"[false, Thu Feb 2...",,,,0,0,0,0,en,,while one can sti...
Sat Oct 30 02:52:...,1454279987289153536,RT @CTVNews: Edmo...,"[[], [[,, pic.twi...",,"[false, Sun Jan 2...","[,, Sat Oct 30 01...",,,0,0,0,0,en,,rt @ctvnews: edmo...


In [26]:
# Number of keyword-matching tweets (16,847,876 in the saved output).
# NOTE(review): that is roughly two thirds of the ~25.2M input rows, which is suspiciously
# high for a topic filter; broad substring keywords may be inflating it.
# No cache()/persist() is visible, so this action likely recomputes from the source JSON.
tweets_df2.count()

16847876

In [None]:
from pyspark.sql.functions import isnan, when, count, col
# Per-column NULL counts for the filtered tweets. when() without otherwise() is NULL
# when the condition is false, and count() skips NULLs.
# NOTE(review): the saved output below has no retweeted_status/geo columns, although
# tweets_df2 selects them; the output looks stale, so re-run from a fresh kernel.
tweets_df2.select([
    count(when(col(name).isNull(), name)).alias(name)
    for name in tweets_df2.columns
]).show()

+----------+------+----+--------+-------------------+----+-----------+--------+-----------+-------------+--------------+-----------+----+-----+
|created_at|id_str|text|entities|in_reply_to_user_id|user|coordinates|   place|reply_count|retweet_count|favorite_count|quote_count|lang|text2|
+----------+------+----+--------+-------------------+----+-----------+--------+-----------+-------------+--------------+-----------+----+-----+
|         0|     0|   0|       0|           14725107|   0|   16846145|16770337|          0|            0|             0|          0|   0|    0|
+----------+------+----+--------+-------------------+----+-----------+--------+-----------+-------------+--------------+-----------+----+-----+



In [34]:
# Point coordinates taken from the tweet's geo struct.
# NOTE(review): Twitter v1.1 documents geo.coordinates as [lat, long] but
# coordinates.coordinates as [long, lat]; confirm the order before combining them.
tweets_df2 = tweets_df2.withColumn('coordinates2', tweets_df2['geo']['coordinates'])

In [35]:
# Bounding-box polygon of the tagged place (place.bounding_box.coordinates).
tweets_df2 = tweets_df2.withColumn('coordinates3', tweets_df2['place']['bounding_box']['coordinates'])

In [38]:
# Country of the tagged place (place.country).
tweets_df2 = tweets_df2.withColumn('country1', tweets_df2['place']['country'])

In [41]:
# Full display name of the tagged place (place.full_name).
tweets_df2 = tweets_df2.withColumn('full_loc', tweets_df2['place']['full_name'])

In [43]:
#tweets_df2 = tweets_df2.withColumn('city1', tweets_df2.place.name)

In [44]:
# Show tweets_df2 with the derived location columns.
# NOTE(review): the saved output includes a city1 column even though the cell that adds
# it is commented out. The output reflects stale kernel state; re-run from a fresh kernel.
tweets_df2

created_at,id_str,text,entities,in_reply_to_user_id,user,retweeted_status,coordinates,place,reply_count,retweet_count,favorite_count,quote_count,lang,geo,text2,coordinates2,coordinates3,country1,city1,full_loc
Sat Oct 30 02:52:...,1454279983531188226,@GarrAarghHrumph ...,"[[],, [], [], [[2...",266929217.0,"[false, Mon May 2...",,,,0,0,0,0,en,,@garraarghhrumph ...,,,,,
Sat Oct 30 02:52:...,1454279984017600512,RT @Denise_Old_La...,"[[],, [], [], [[1...",,"[false, Sun Mar 2...","[,, Sat Oct 30 02...",,,0,0,0,0,en,,rt @denise_old_la...,,,,,
Sat Oct 30 02:52:...,1454279984198021126,@pahpcorn @Lisa_T...,"[[],, [], [[twitt...",3150308066.0,"[false, Wed Nov 1...",,,,0,0,0,0,en,,@pahpcorn @lisa_t...,,,,,
Sat Oct 30 02:52:...,1454279984466386954,@polan13 @mark_do...,"[[],, [], [], [[3...",38108853.0,"[false, Sat Nov 1...",,,,0,0,0,0,en,,@polan13 @mark_do...,,,,,
Sat Oct 30 02:52:...,1454279984722419712,RT @LegendaryEner...,"[[],, [], [], [[2...",,"[false, Sat Feb 0...","[,, Fri Oct 29 17...",,,0,0,0,0,en,,rt @legendaryener...,,,,,
Sat Oct 30 02:52:...,1454279985292840962,RT @RWMaloneMD: H...,"[[],, [], [[gript...",,"[false, Mon Nov 0...","[,, Fri Oct 29 16...",,,0,0,0,0,en,,rt @rwmalonemd: h...,,,,,
Sat Oct 30 02:52:...,1454279985439522822,RT @choi_bts2: It...,"[[],, [], [], [[9...",,"[false, Sat Mar 1...","[,, Fri Oct 29 23...",,,0,0,0,0,en,,rt @choi_bts2: it...,,,,,
Sat Oct 30 02:52:...,1454279985519288323,RT @Lukewearechan...,"[[],, [], [], [[9...",,"[false, Sat Feb 0...","[,, Sat Oct 30 01...",,,0,0,0,0,en,,rt @lukewearechan...,,,,,
Sat Oct 30 02:52:...,1454279987012390913,While one can sti...,"[[],, [], [[twitt...",117430610.0,"[false, Thu Feb 2...",,,,0,0,0,0,en,,while one can sti...,,,,,
Sat Oct 30 02:52:...,1454279987289153536,RT @CTVNews: Edmo...,"[[], [[,, pic.twi...",,"[false, Sun Jan 2...","[,, Sat Oct 30 01...",,,0,0,0,0,en,,rt @ctvnews: edmo...,,,,,


In [19]:
#tweets_df3 = tweets_df2.drop('in_reply_to_user_id', 'coordinates', 'place')

In [26]:
#display(tweets_df3)

In [None]:
%%time

tweets_df2.write.format("parquet").\
mode('overwrite').\
save('gs://msca-bdp-students-bucket/shared_data/nukagvilia/tweets3')

CPU times: user 76.8 ms, sys: 33.2 ms, total: 110 ms
Wall time: 6min 48s


In [67]:
# tweets_df3 is not created by any live cell (its definition above is commented out),
# so load it from the Parquet output written by the previous cell before using it.
tweets_df3 = spark.read.parquet('gs://msca-bdp-students-bucket/shared_data/nukagvilia/tweets3')
tweets_df3.printSchema()

root
 |-- created_at: string (nullable = true)
 |-- id_str: string (nullable = true)
 |-- text: string (nullable = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- additional_media_info: struct (nullable = true)
 |    |    |    |    |-- description: string (nullable = true)
 |    |    |    |    |-- embeddable: boolean (nullable = true)
 |    |    |    |    |-- monetizable: boolean (nullable = true)
 |    |    |    |    |-- title: string (nullable = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- display_url: string (nullable = true)
 |    |    |    |-- expanded_url: string (nullable =

In [21]:
#tweets_df2.groupBy('place').count()

In [31]:
# Nested struct fields can be selected with dot notation, e.g.:
#df.select("body.Sw1", "body.Sw2")

# Display names of the tweet authors (nested user.name field).
# NOTE(review): tweets_df3 must already be defined (e.g. read back from the saved Parquet) for this to run.
# NOTE(review): the saved output shows real users' display names (PII); clear it before sharing.
tweets_df3.select('user.name')

name
Emily Joy
Aaron Bradley
Bright Starlight
Akira Morgendorffer
Juanita✨🦋
Dr Debbie Wilson ...
Chuck W
Pal-Mac Athletics
Gracie DG
Steve Bottari
