### Set Up Spark

In [None]:
!scala -version

Scala code runner version 2.12.12 -- Copyright 2002-2020, LAMP/EPFL and Lightbend, Inc.


In [None]:
from pyspark.sql import SparkSession

In [None]:
# Create the Spark session (or reuse one that is already running) with the
# BigQuery connector jar attached.
spark = (
    SparkSession.builder
    .appName('1.2. BigQuery Storage & Spark SQL - Python')
    .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar')
    .getOrCreate()
)

### Import modules

In [None]:
import time
from itertools import islice
from itertools import compress 

In [None]:
import os
import shutil
import pandas as pd
import numpy as np
# import sh
from pyspark.sql.functions import *
#from pyspark.sql import functions as F
from pyspark.sql.types import *
import seaborn as sns
import matplotlib.pyplot as plt
# warnings.filterwarnings(action='ignore')

In [None]:
import sys
# Record the Python interpreter and Spark versions used for this run.
print(sys.version)
print(spark.version)

3.8.5 | packaged by conda-forge | (default, Aug 29 2020, 01:22:49) 
[GCC 7.5.0]
3.0.1


In [None]:
import pandas as pd
import numpy as np
# Do not truncate long string cells when pandas renders a DataFrame.
pd.set_option('display.max_colwidth', None)
# Put the row-display limit back to pandas' default.
pd.reset_option('display.max_rows')
from itertools import compress 
from pyspark.sql.functions import *
from pyspark.sql.types import *
import seaborn as sns
import matplotlib.pyplot as plt
# warnings.filterwarnings(action='ignore')

In [None]:
import re
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.feature import CountVectorizer,  IDF, CountVectorizerModel, Tokenizer, RegexTokenizer, StopWordsRemover
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Row
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


In [None]:
# Google Cloud Storage (GCS) client library used by the helper functions below
from google.cloud import storage

### Define Helper Functions

In [None]:
# Print every object stored under a prefix ("folder") of a GCS bucket
def list_blobs(bucket_name, folder_name):
    """Print the name and size of each blob whose name starts with ``folder_name``.

    Args:
        bucket_name: Name of the bucket to look in.
        folder_name: Name prefix passed to ``Bucket.list_blobs``.

    Returns:
        None. One line per blob is written to stdout as ``<name>\\t<size>``.
    """
    client = storage.Client()
    # Materialise the listing before printing anything.
    matching = list(client.bucket(bucket_name).list_blobs(prefix=folder_name))

    for item in matching:
        print('\t'.join((item.name, str(item.size))))

In [None]:
# Same listing as list_blobs, but returned as a formatted pandas table
def list_blobs_pd(bucket_name, folder_name):
    """Return the blobs under ``folder_name`` as a styled two-column table.

    Args:
        bucket_name: Name of the bucket to look in.
        folder_name: Name prefix passed to ``Bucket.list_blobs``.

    Returns:
        A pandas ``Styler`` (not a plain DataFrame) over the columns
        ``Name`` and ``Size``, with ``Size`` rendered with thousands
        separators and no decimals.
    """
    client = storage.Client()
    found = list(client.bucket(bucket_name).list_blobs(prefix=folder_name))

    # One (name, size) row per blob, in listing order.
    rows = [(item.name, item.size) for item in found]
    listing = pd.DataFrame(rows, columns=['Name','Size'])

    return listing.style.format({"Size": "{:,.0f}"})

In [None]:
# Delete every object under a prefix ("folder") of a GCS bucket
def delete_folder(bucket_name, folder_name):
    """Delete each blob whose name starts with ``folder_name``.

    Blobs are selected by name prefix, so the selection is whatever
    ``Bucket.list_blobs(prefix=folder_name)`` returns.

    Args:
        bucket_name: Name of the bucket to delete from.
        folder_name: Name prefix selecting the blobs to delete.

    Returns:
        None.
    """
    client = storage.Client()
    # Take the full listing first, then delete one blob at a time.
    targets = list(client.bucket(bucket_name).list_blobs(prefix=folder_name))

    for target in targets:
        target.delete()

#### Enable "spark.sql.repl.eagerEval.enabled" so Spark DataFrames are displayed as formatted tables

In [None]:
# Turn on eager evaluation so a DataFrame left as a cell's last expression is displayed as a table.
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

### Load Data

In [None]:
# Bucket the source tweets are read from (open, shared bucket)
bucket_read = 'msca-bdp-tweets'

# Bucket the processed results are written to
bucket_write = 'msca-bdp-students-bucket'

In [None]:
# Load the raw tweet JSON from the read bucket. The path is built from
# bucket_read so the bucket name is defined in one place only
# (TODO: adjust the folder if the source location changes).
tweets_spark = spark.read.json('gs://' + bucket_read + '/final_project')

In [None]:
# Total number of tweet records loaded
tweets_spark.count()

25191000

In [None]:
# Preview 10 rows of the raw data
tweets_spark.limit(10)

contributors,coordinates,created_at,display_text_range,entities,extended_entities,extended_tweet,favorite_count,favorited,filter_level,geo,id,id_str,in_reply_to_screen_name,in_reply_to_status_id,in_reply_to_status_id_str,in_reply_to_user_id,in_reply_to_user_id_str,is_quote_status,lang,place,possibly_sensitive,quote_count,quoted_status,quoted_status_id,quoted_status_id_str,quoted_status_permalink,reply_count,retweet_count,retweeted,retweeted_status,source,text,timestamp_ms,truncated,user,withheld_in_countries
,,Fri Oct 22 09:59:...,,"[[],, [], [[twitt...",,"[[0, 231], [[],, ...",0,False,low,,1451488292013105161,1451488292013105161,,,,,,False,en,,,0,,,,,0,0,False,,"<a href=""https://...",The last time I a...,1634896750821,True,"[false, Sat Apr 0...",
,,Fri Oct 22 09:59:...,,"[[], [[,, pic.twi...","[[[,, pic.twitter...",,0,False,low,,1451488292520726530,1451488292520726530,,,,,,False,en,,False,0,,,,,0,0,False,"[,, Fri Oct 22 09...","<a href=""https://...",RT @Claudia_Sahm:...,1634896750942,False,"[false, Wed Sep 2...",
,,Fri Oct 22 09:59:...,,"[[], [[,, pic.twi...","[[[,, pic.twitter...",,0,False,low,,1451488292495560706,1451488292495560706,,,,,,True,en,,False,0,"[,, Wed Oct 20 01...",1.450633096311812e+18,1.450633096311812e+18,[twitter.com/skyn...,0,0,False,"[,, Thu Oct 21 16...","<a href=""http://t...",RT @harry_pinderr...,1634896750936,False,"[false, Fri Dec 0...",
,,Fri Oct 22 09:59:...,"[0, 43]","[[], [[, Óculos E...","[[[, Óculos Escur...",,0,False,low,,1451488292776591361,1451488292776591361,,,,,,True,en,,False,0,"[,, Thu Oct 21 16...",1.4512252332732252e+18,1.4512252332732252e+18,[twitter.com/Mich...,0,0,False,,"<a href=""http://t...",It's just your he...,1634896751003,False,"[false, Fri May 0...",
,,Fri Oct 22 09:59:...,,"[[],, [], [], [[3...",,,0,False,low,,1451488293858816003,1451488293858816003,,,,,,True,en,,,0,"[,, Wed Oct 20 17...",1.450872210529337e+18,1.450872210529337e+18,[twitter.com/pete...,0,0,False,"[,, Thu Oct 21 16...","<a href=""http://t...",RT @doctor_oxford...,1634896751261,False,"[false, Sat May 2...",
,,Fri Oct 22 09:59:...,,"[[],, [], [], [[8...",,,0,False,low,,1451488294244585488,1451488294244585488,,,,,,False,en,,,0,,,,,0,0,False,"[,, Fri Oct 22 00...","<a href=""http://t...",RT @OccupyDemocra...,1634896751353,False,"[false, Wed Sep 1...",
,,Fri Oct 22 09:59:...,,"[[],, [], [], [[5...",,,0,False,low,,1451488292877348879,1451488292877348879,,,,,,False,en,,,0,,,,,0,0,False,"[,, Thu Oct 21 06...","<a href=""http://t...",RT @MaxBlumenthal...,1634896751027,False,"[false, Sat Aug 1...",
,,Fri Oct 22 09:59:...,"[15, 140]","[[],, [], [[twitt...",,"[[15, 170], [[],,...",0,False,low,,1451488294487744512,1451488294487744512,SimeonBrownMP,1.451362314532262e+18,1.451362314532262e+18,8.473695914832773e+17,8.473695914832773e+17,False,en,,,0,,,,,0,0,False,,"<a href=""http://t...",@SimeonBrownMP Bu...,1634896751411,True,"[false, Tue Dec 3...",
,,Fri Oct 22 09:59:...,,"[[],, [], [[reut....",,,0,False,low,,1451488296329154567,1451488296329154567,,,,,,False,en,,False,0,,,,,0,0,False,"[,, Fri Oct 22 09...","<a href=""https://...",RT @Reuters: U.S....,1634896751850,False,"[false, Sat Jul 0...",
,,Fri Oct 22 09:59:...,,"[[[[107, 115], CO...",,,0,False,low,,1451488296597610497,1451488296597610497,,,,,,True,en,,,0,"[,, Thu Oct 21 20...",1.451284862485373e+18,1.451284862485373e+18,[twitter.com/Neur...,0,0,False,"[,, Fri Oct 22 00...","<a href=""http://t...",RT @dysclinic: A ...,1634896751914,False,"[false, Sat Nov 2...",


In [None]:
# tweets_spark.printSchema()

## Step 1. Discard Irrelevant Tweets



In [None]:
# Lower-case substrings that mark a tweet as COVID-19 related.
# Entries such as 'antibod' and 'social distanc' appear to be stems meant to
# match several word endings — TODO confirm.
# NOTE(review): 'segreation' looks like a misspelling (of 'segregation'?) — TODO confirm.
related_words = ['covid','segreation','fauci','death rate','health insurance','cure','self-isolate','positive','transmission','incubation period','asymptomatic','superspreader','super-spreader','n95','tested','prevention','immunization','anosmia','antibod','variant','vaccine','vaccination','vaccinated','mask','infected','isolation','injection','swab','confirmed cases','dose','social distanc','epidemic','pandemic','community spread','trace','tracing','quarantine','coronavirus','pfizer','moderna','booster']

In [None]:
from pyspark.sql.functions import lower,col

In [None]:
# Add a lower-cased copy of the tweet text; the keyword filter below matches against this column.
tweets_spark = tweets_spark.withColumn('low_text',lower(col('text')))

In [None]:
import functools
import operator

# Keep a tweet when its lower-cased text contains at least one COVID-19 keyword.
#
# The keywords come from related_words rather than a second hand-written copy.
# The previous hard-coded SQL string had drifted from that list: it tested
# 'anosmia' three times and never tested 'super-spreader'. The spelling
# 'super spreader' was only present in the old SQL string, so it is added here
# to keep every tweet that matched before.
# dict.fromkeys drops duplicate keywords while preserving order.
covid_keywords = list(dict.fromkeys(related_words + ['super spreader']))

# OR together one literal substring test per keyword. A row whose low_text is
# null yields a null predicate and is dropped, as with the old LIKE clauses.
covid_condition = functools.reduce(
    operator.or_,
    (col('low_text').contains(keyword) for keyword in covid_keywords),
)

tweets_spark_covid = tweets_spark.filter(covid_condition)

In [None]:
# Number of tweets that passed the COVID-19 keyword filter
total_covid_tweets = tweets_spark_covid.count()
total_covid_tweets

17439954

## Step 2. Complete thorough EDA to identify which variables can be used to profile the Twitterers 

### 2.1. Identify organization of users

In [None]:
# Flatten the nested user / place / retweeted_status / quoted_status structs
# into top-level columns.
#
# Each entry is (new column name, expression). Entries are applied in order
# with withColumn, so a name that already exists on the frame
# (is_quote_status, coordinates) replaces that column in place.
#
# The two *_status flags are the strings 'true' / 'false', not booleans.
# They are built with col(...) so they resolve against the frame being
# transformed, not against the separate tweets_spark DataFrame.
flattened_columns = [
    # Author of this tweet
    ('user_id', col('user.id')),
    ('user_id_str', col('user.id_str')),
    ('user_name', col('user.name')),
    ('user_location', col('user.location')),
    ('user_description', col('user.description')),
    ('user_verified', col('user.verified')),
    ('user_followers', col('user.followers_count')),
    # Place attached to this tweet
    ('country', col('place.country')),
    ('country_code', col('place.country_code')),
    ('place_type', col('place.place_type')),
    ('place_name', col('place.name')),
    ('place_full_name', col('place.full_name')),
    # Retweet / quote markers and counters taken from the embedded original
    ('is_retweet_status', when(col('retweeted_status').isNotNull(), 'true').otherwise('false')),
    ('original_retweet_count', col('retweeted_status.retweet_count')),
    ('is_quote_status', when(col('quoted_status').isNotNull(), 'true').otherwise('false')),
    ('original_quote_count', col('quoted_status.quote_count')),
    ('original_ret_id_str', col('retweeted_status.id_str')),
    ('original_quo_id_str', col('quoted_status.id_str')),
    # Account that posted the retweeted original
    ('original_retaccount_id_str', col('retweeted_status.user.id_str')),
    ('original_retaccount_name', col('retweeted_status.user.name')),
    ('original_retaccount_description', col('retweeted_status.user.description')),
    ('original_retaccount_location', col('retweeted_status.user.location')),
    ('original_retaccount_verified', col('retweeted_status.user.verified')),
    ('original_retaccount_followers', col('retweeted_status.user.followers_count')),
    # Place attached to the retweeted original
    ('original_retaccount_country', col('retweeted_status.place.country')),
    ('original_retaccount_country_code', col('retweeted_status.place.country_code')),
    ('original_retaccount_place_type', col('retweeted_status.place.place_type')),
    ('original_retaccount_place_name', col('retweeted_status.place.name')),
    ('original_retaccount_place_full_name', col('retweeted_status.place.full_name')),
    # Replace the coordinates struct with its inner coordinates field
    ('coordinates', col('coordinates.coordinates')),
]

tweets_extended = tweets_spark_covid
for column_name, column_expr in flattened_columns:
    tweets_extended = tweets_extended.withColumn(column_name, column_expr)

In [None]:
# Define the critical keywords used to recognise each group of accounts
# from the account description (health, news, government).
health_related_words = ['health','disease','covid','vaccine','medical','hospital','clinic','cancer']
news_related_words = ['news','press','broadcast','journalism','report']
government_related_words = ['government','office','president','secretary','minister','representative','embassy','ambassador','department']

In [None]:
import functools
import operator

# Label the account that posted each tweet with a coarse category.
# Rules are tried in order and the first match wins:
#   1. verified and description mentions a health keyword      -> "health"
#   2. verified and description mentions a news keyword        -> "news"
#   3. verified and description mentions a government keyword  -> "government"
#   4. more than 100,000 followers (verified or not)           -> "social_media_influencer"
#   5. anything else                                           -> "other"
# Keywords are read from health_related_words / news_related_words /
# government_related_words instead of being repeated inline.
# NOTE(review): contains() is case-sensitive and user_description is not
# lower-cased, so "Health" does not match "health" — confirm this is intended.
user_desc = col('user_description')
user_is_verified = col('user_verified') == True

user_mentions_health = functools.reduce(
    operator.or_, [user_desc.contains(word) for word in health_related_words])
user_mentions_news = functools.reduce(
    operator.or_, [user_desc.contains(word) for word in news_related_words])
user_mentions_government = functools.reduce(
    operator.or_, [user_desc.contains(word) for word in government_related_words])

# A null condition (e.g. missing description) is treated as "no match" and
# falls through to the next rule.
tweets_extended = tweets_extended.withColumn(
    'user_identification',
    when(user_is_verified & user_mentions_health, "health")
    .when(user_is_verified & user_mentions_news, "news")
    .when(user_is_verified & user_mentions_government, "government")
    .when(col('user_followers') > 100000, "social_media_influencer")
    .otherwise("other"),
)

In [None]:
# Spot-check five accounts labelled 'health' next to their descriptions
tweets_extended.filter(tweets_extended.user_identification=='health').select('user_description','user_identification').limit(5)

user_description,user_identification
Greater Than COVI...,health
A worldwide movem...,health
Largest independe...,health
BBC health corres...,health
Farm girl from Ne...,health


In [None]:
import functools
import operator

# Label the account that posted the retweeted original with a coarse category.
# Rules are tried in order and the first match wins:
#   1. verified and description mentions a health keyword      -> "health"
#   2. verified and description mentions a news keyword        -> "news"
#   3. verified and description mentions a government keyword  -> "government"
#   4. more than 100,000 followers (verified or not)           -> "social_media_influencer"
#   5. anything else, including tweets that are not retweets   -> "other"
# Keywords are read from health_related_words / news_related_words /
# government_related_words instead of being repeated inline.
# NOTE(review): contains() is case-sensitive and the description is not
# lower-cased, so "Health" does not match "health" — confirm this is intended.
ori_desc = col('original_retaccount_description')
ori_is_verified = col('original_retaccount_verified') == True

ori_mentions_health = functools.reduce(
    operator.or_, [ori_desc.contains(word) for word in health_related_words])
ori_mentions_news = functools.reduce(
    operator.or_, [ori_desc.contains(word) for word in news_related_words])
ori_mentions_government = functools.reduce(
    operator.or_, [ori_desc.contains(word) for word in government_related_words])

# A null condition (e.g. no retweeted account) is treated as "no match" and
# falls through to the next rule.
tweets_extended = tweets_extended.withColumn(
    'ori_identification',
    when(ori_is_verified & ori_mentions_health, "health")
    .when(ori_is_verified & ori_mentions_news, "news")
    .when(ori_is_verified & ori_mentions_government, "government")
    .when(col('original_retaccount_followers') > 100000, "social_media_influencer")
    .otherwise("other"),
)

In [None]:
# Narrow the wide frame to the tweet, author, category and retweet-source columns.
selected_columns = [
    # tweet
    'id_str', 'text', 'created_at',
    'country', 'country_code', 'place_name', 'coordinates',
    'original_retweet_count',
    # author and the two account categories
    'user_id_str', 'user_name', 'user_location', 'user_verified', 'user_followers',
    'user_identification', 'ori_identification',
    # retweeted / quoted originals
    'original_ret_id_str', 'original_quo_id_str',
    'original_retaccount_id_str', 'original_retaccount_name',
]
tweets = tweets_extended.select(*selected_columns)

In [None]:
# Preview 10 rows of the trimmed dataset
tweets.limit(10)

id_str,text,created_at,country,country_code,place_name,coordinates,original_retweet_count,user_id_str,user_name,user_location,user_verified,user_followers,user_identification,ori_identification,original_ret_id_str,original_quo_id_str,original_retaccount_id_str,original_retaccount_name
1456350860808704010,RT @miles_commodo...,Thu Nov 04 20:01:...,,,,,1.0,389675356,m t cartier,south carolina,False,371,other,other,1.4563495451814216e+18,1.4563476832501268e+18,8.244720449139834e+17,Miles Commodore
1456350860871675908,RT @dallasnews: ‘...,Thu Nov 04 20:01:...,,,,,7.0,33189500,RUTH ELLA OWENS,"Dallas, TX",False,2530,other,social_media_infl...,1.4560352111589868e+18,,15679641.0,Dallas Morning News
1456350861005934596,RT @trishgreenhal...,Thu Nov 04 20:01:...,,,,,245.0,1319937637,Mike Farrington 💙,,False,634,other,social_media_infl...,1.456299658402312e+18,1.4562843076648223e+18,462021312.0,Trisha Greenhalgh
1456350863186972673,Well sweet I have...,Thu Nov 04 20:01:...,,,,,,1332503413079351296,BrainStreamers(Fl...,,False,251,other,other,,,,
1456350863841271814,RT @JoshMandelOhi...,Thu Nov 04 20:01:...,,,,,63.0,788929601309384704,Deborah schoonmaker,,False,157,other,other,1.4563400397812613e+18,,35664186.0,Josh Mandel
1456350863656734721,Tomorrow! 10:30 A...,Thu Nov 04 20:01:...,,,,,,31195359,Pitt Social Work,"Pittsburgh, PA",False,3635,other,other,,,,
1456350864143228930,RT @WonderlandNew...,Thu Nov 04 20:01:...,,,,,15.0,1372931358054551558,Pedro Gonzalez,,False,222,other,other,1.4562662823915889e+18,,33518217.0,Canada's Wonderland
1456350865619505152,RT @ProLifeAll: J...,Thu Nov 04 20:01:...,,,,,3.0,1330150464588029959,Christina Aspires...,,False,1188,other,other,1.4562662922566e+18,,9.191988990337923e+17,ConsistentLifeEthic
1456350866294804487,RT @RandPaul: I g...,Thu Nov 04 20:01:...,,,,,1160.0,1347999703426682880,Chewy,,False,998,other,social_media_infl...,1.4563443266032558e+18,,216881337.0,Senator Rand Paul
1456350866232025090,RT @KamalaHarris:...,Thu Nov 04 20:01:...,,,,,145.0,871399252815204354,Terry Egan,A BUCKEYE living ...,False,485,other,social_media_infl...,1.4563475190138593e+18,,30354991.0,Kamala Harris


In [None]:
# Save the trimmed dataset as JSON in the write bucket, replacing any
# previous output at the same path.
original_tweets_path = 'gs://' + bucket_write + '/shared/hedan/original_tweets'
(
    tweets.write
    .format("json")
    .mode("overwrite")
    .save(original_tweets_path)
)

In [None]:
# tweets = spark.read.json('gs://' + bucket_write + '/shared/hedan/original_tweets')