# BDP Final Assignment: Twitter Education, Part 3.a (Processing time and tag organization)

Recall

In [1]:
#Ensure we are using the right kernel
spark.version

'3.1.3'

In [2]:
import pandas as pd
import numpy as np
pd.set_option('display.max_colwidth', None)
pd.reset_option('display.max_rows')

#from itertools import compress 
import seaborn as sns 
import matplotlib.pyplot as plt

#import warnings
#warnings.filterwarnings(action='ignore')
#warnings.simplefilter('ignore')

In [3]:
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql import SparkSession

GCP Tools and Functions

In [4]:
from google.cloud import storage

In [5]:
# Input location of the parquet dataset read below (shared bucket).
# Note: the path points at version 9 of the dataset: .../shared_data/hjiang248/final_sdf_v9
dataPath = 'gs://msca-bdp-students-bucket/shared_data/hjiang248/final_sdf_v9'

In [6]:
# Reuse the Spark session that is already active in this kernel, or create one if none exists.
spark = SparkSession.builder.getOrCreate()
# Eager evaluation: a DataFrame that is the last expression of a cell is rendered as a table.
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

# Read data

In [7]:
%%time

education_DF = spark.read.parquet(dataPath)

                                                                                

CPU times: user 6.61 ms, sys: 4.56 ms, total: 11.2 ms
Wall time: 9.84 s


In [8]:
#education_DF.printSchema()

In [9]:
# Columns dropped on purpose: the three plain counters are replaced further down
# by their `rtstatus_*` counterparts, and `quoted_status` is not used.
excluded_cols = ['retweet_count', 'favorite_count', 'quote_count', 'quoted_status']

# Keep every column whose name does not contain 'qt' and that is not excluded above.
kept_cols = []
for col_name in education_DF.columns:
    if 'qt' in col_name or col_name in excluded_cols:
        continue
    kept_cols.append(col_name)

data = education_DF.select(kept_cols)

# Give the rtstatus_* counters the plain counter names.
for old_name, new_name in [
    ('rtstatus_favorite_count', 'favorite_count'),
    ('rtstatus_retweet_count', 'retweet_count'),
    ('rtstatus_quote_count', 'quote_count'),
]:
    data = data.withColumnRenamed(old_name, new_name)



In [10]:
# Cast the counters to IntegerType, one cast per column
# (favorite_count used to be cast twice; the second cast was a no-op).
# NOTE(review): quote_count is not cast here and keeps the type it was loaded with
# -- confirm whether it should be cast as well.
for count_col in ['favorite_count', 'retweet_count', 'followers_count']:
    data = data.withColumn(count_col, data[count_col].cast(IntegerType()))

In [11]:
data.printSchema()

root
 |-- created_at: string (nullable = true)
 |-- id: long (nullable = true)
 |-- lang: string (nullable = true)
 |-- text: string (nullable = true)
 |-- retweeted: string (nullable = true)
 |-- favorite_count: integer (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- quote_count: long (nullable = true)
 |-- rt_hashtags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- source_rt_usr_id: long (nullable = true)
 |-- source_rt_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- country: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- verified_user: boolean (nullable = true)
 |-- user_id: long (nullable = true)
 |-- user_name: string (nullable = true)
 |-- followers_count: integer (nullable = true)
 |-- user_description: string (nullable = true)



In [12]:
#%time
#data.count()

# Organization

Reference:
- Defining influencers: https://bitrebels.com/social/hot-tips-become-noted-twitter-influencer/
- NGOs: https://www.constantcontact.com/blog/google-keywords-for-nonprofits/

In [13]:
orgDF = data #.limit(30000)

In [14]:
# Keyword lists used to tag accounts by organization type.
# Each entry is a regular-expression fragment (e.g. 'universit(y)?(ies)?'); the tagging cell
# prefixes every fragment with (?i), joins a list with '|' and matches it with `rlike`
# against user_description and user_name.
# NOTE(review): the fragments carry no word boundaries, so short ones such as 'bp', 'post',
# 'times' or 'NGO' presumably also match inside longer words -- consider adding \b anchors.
key_gov = ['president', 'senat', 'congress', 'prime minister', 'minister', 'parliament', 'bp', 'white house', 'mayor', 'democrat', 'republican']
key_universities = ['universit(y)?(ies)?', 'colleg(e)?(es)?', 'academy']
key_schools = ['junior high', 'senior high', 'primary school', 'secondary school', 'high school', 'vocational school', 'kindergarten']
key_news = ['times', 'daily', 'post', 'news', 'newspaper', 'reporter', 'journal(ist)?', 'anchor', 'youtube', 'tiktok', 'instagram']
key_ngos = ['non(-)?profit', 'NGO', 'foundation', 'non(-)?profit organization', 'not(-)?for(-)?profit', 'charity', 'nongovernmental organization']
# NOTE: 'actress' appears twice in this list; the duplicate is harmless in a regex alternation.
key_influencer = ['singer', 'writer', 'musician', 'actress', 'author', 'entrepreneur', 'actor', 'actress', 'director']

def reg_adder(ls):
    """Prefix every regex fragment with the case-insensitive flag ``(?i)``.

    Parameters
    ----------
    ls : list or tuple of str
        Regex fragments (keywords).

    Returns
    -------
    list of str
        One ``'(?i)' + fragment`` string per input fragment, in input order.
        Always a list: it is empty when `ls` is empty or is not a list/tuple,
        so ``'|'.join(reg_adder(...))`` never fails on ``None``.

    Notes
    -----
    Joining an empty result gives the empty pattern ``''``. An empty regex is
    expected to match any string, so callers should not pass an empty keyword
    list to a matcher.
    """
    # Check the type before touching the value: calling len() first raises
    # TypeError on inputs such as None, which defeats the guard.
    if not isinstance(ls, (list, tuple)):
        return []
    return ['(?i)' + word for word in ls]

In [15]:
# Keep only rows that have both a user name and a description: the regex tagging
# below reads both columns.
orgDF = orgDF.filter(orgDF.user_name.isNotNull() & orgDF.user_description.isNotNull())


def _mentions(keywords):
    """Boolean Column: user_description or user_name matches any keyword (case-insensitive)."""
    pattern = '|'.join(reg_adder(keywords))  # built once and reused for both columns
    return F.col('user_description').rlike(pattern) | F.col('user_name').rlike(pattern)


# tagging users -- F.when branches are checked in order, so the first matching rule wins
orgDF = orgDF.withColumn(
    "organization",
    F.when(F.col('verified_user') == 'false', 'Other')
     .when(_mentions(key_gov), 'Government_Entities')
     .when(_mentions(key_schools), 'Schools')
     .when(_mentions(key_universities), 'Universities')
     .when(_mentions(key_news), 'News_Media')
     .when(_mentions(key_ngos), 'NGOs')
     # NOTE(review): this rule used to read
     #   followers_count >= 100000 | (<key_influencer match> & followers_count >= 100000)
     # which is logically the follower threshold alone, so key_influencer never
     # influenced the label. The simplified form keeps the results unchanged.
     # TODO: decide whether a key_influencer match should qualify on its own
     # (i.e. `threshold | _mentions(key_influencer)`) and change the rule if so.
     .when(F.col('followers_count') >= 100000, 'Celebrity_Influencer')
     .otherwise('Other'),
)

Previous Method of Labeling organization

```
org_list = {
            'gov': ['president', 'senat', 'congress', 'prime minister', 'minister', 'parliament', 'bp', 'white house', 'mayor', 'democrat', 'republican'],
            'edu_institution': ['university', 'college', 'school', 'kindergarden', 'jr high', 'sr high', 'lecturer', 'teacher', 'instructor'],
            'scholar': ['phd', 'dr', 'professor', 'researcher', 'doctor'],
            'news_media': ['times', 'daily', 'post', 'news', 'reporter', 'journalist'],
            'celebrity': ['singer', 'writer', 'musician', 'actress', 'author', 'entrepreneur', 'actor', 'actress', 'director']
            }


from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# 1.define a tagger function that tells the user's organization
@F.udf(returnType=StringType())
def orgTagger(verified_user, user_name, description):
    if verified_user == False:
        return 'Other'
    else:
        for org in org_list:
            for keyword in org_list[org]:                
                if (keyword in user_name.lower()) or (keyword in description.lower()):
                    return org
                    continue
                pass
        return 'Unidentified'
                    
```

Implement
```
# check if the user name and description is null
orgDF = orgDF.filter(orgDF.user_name.isNotNull() & orgDF.user_description.isNotNull())

# tagging users
orgDF = orgDF.withColumn("organization",
                                        F.when(orgDF.verified_user == 'false', 'Other') \
                                        .when(orgTagger('verified_user', 'user_name', 'user_description') == 'gov', 'Government_Entities') \
                                        .when(orgTagger('verified_user', 'user_name', 'user_description') == 'edu_institution', 'Educational_Institution') \
                                        .when(orgTagger('verified_user', 'user_name', 'user_description') == 'scholar', 'Scholar') \
                                        .when(orgTagger('verified_user', 'user_name', 'user_description') == 'news_media', 'News_Media') \
                                        .when((orgDF.followers_count >= 100000) | (orgTagger('verified_user', 'user_name', 'user_description') == 'celebrity'), 'Celebrity') \
                                        .when(((orgDF.followers_count >= 10000) & (orgDF.followers_count < 100000)) , 'Influencer')
                                        .otherwise('Other') )
```


In [16]:
%%time

# No null-filling is needed: the tagging expression ends with .otherwise('Other').

# Sanity check: one row per distinct (user_id, user_name, Organization) combination,
# then the number of such rows per organization label.
org_count_DF = orgDF.select('user_id', 'user_name', 'Organization').distinct()

org_count = org_count_DF.groupBy('Organization').count()
org_count

CPU times: user 1.98 ms, sys: 4.46 ms, total: 6.45 ms
Wall time: 78 ms


22/12/07 22:48:06 WARN org.apache.spark.deploy.yarn.YarnAllocator: Cannot find executorId for container: container_1670446778819_0009_01_000010
22/12/07 22:48:52 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 13 for reason Container marked as failed: container_1670446778819_0009_01_000014 on host: hub-msca-bdp-dphub-students-backup-hjiang248-sw-lz5v.c.msca-bdp-students.internal. Exit status: -100. Diagnostics: Container released on a *lost* node.
22/12/07 22:48:52 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 16 for reason Container marked as failed: container_1670446778819_0009_01_000017 on host: hub-msca-bdp-dphub-students-backup-hjiang248-sw-lz5v.c.msca-bdp-students.internal. Exit status: -100. Diagnostics: Container released on a *lost* node.
22/12/07 22:48:52 ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost executor 13 on 

Organization,count
NGOs,767
Other,2758011
Government_Entities,3432
Celebrity_Influencer,3301
Schools,185
Universities,2442
News_Media,14587


# Timeseries

The variable `created_at` is a string of the form `"created_at": "Wed Oct 10 20:19:24 +0000 2018"`. We need to convert it to a date in the format `YYYY-MM-DD`. The process is:
- split `created_at` on spaces
- read each split element of the timestamp
- convert to `DateType`
- save as `YYYY-MM-DD`

In [17]:
from pyspark.sql.functions import split

In [18]:
time = data.select(['id', 'created_at'])

In [19]:
# `created_at` is space-separated: tokenise it once and name each positional piece.
created_parts = split(time['created_at'], ' ')

for position, part_name in enumerate(['week_day', 'month', 'day', 'time', 'offset', 'year']):
    time = time.withColumn(part_name, created_parts.getItem(position))

In [20]:
# Month abbreviation (e.g. 'Oct') -> two-digit month number, via a parse/format round trip.
month_as_epoch = unix_timestamp(col('month'), 'MMM')
time = time.withColumn('month_num', from_unixtime(month_as_epoch, 'MM'))

In [21]:
#time.select(['created_at', 'month', 'month_num']).limit(5)

```
# merge the YYYY-MM-DD
time = time.withColumn('date_merge', concat_ws('-', 'year', 'month_num', 'day')) \
           .withColumn('date', to_date(unix_timestamp('date_merge', 'yyyy-mm-dd').cast('timestamp'))) \
           .drop('date_merge')
```

In [22]:
# Assemble 'year-month_num-day' and parse it straight into a date column named `date`
# (no temporary column is left behind).
iso_date_string = concat_ws('-', 'year', 'month_num', 'day')
time = time.withColumn('date', to_date(iso_date_string, 'yyyy-MM-dd'))

In [23]:
#time.select(['created_at', 'month', 'month_num', 'date']).limit(5)

In [24]:
# Discard the raw string and the intermediate pieces; `id` and `date` are what remain.
removed_col = ['created_at','week_day', 'month', 'day', 'time', 'offset', 'year', 'month_num']

time = time.drop(*removed_col)

In [25]:
# check output
#test = time.limit(10).toPandas()
#test

Now we join the processed `date` back onto the tagged dataframe (`orgDF`), matching rows on `id`.

In [26]:
# Left join: every row of orgDF is kept and receives the parsed `date` of the same `id`.
# NOTE(review): `time` is derived from the same source rows, so if `id` is not unique
# this join multiplies the duplicated rows -- confirm `id` uniqueness, or compute
# `date` directly on orgDF to avoid the join.
data_joined = orgDF.join(time, on='id', how='left')

[Stage 15:>                                                        (0 + 4) / 25]

In [27]:
#data_joined.select(['created_at', 'id', 'date']).limit(10)

In [28]:
# Final column layout of the output dataset (date first, organization tag last).
output_columns = [
    'date', 'id', 'text', 'retweeted', 'favorite_count', 'retweet_count', 'quote_count',
    'source_rt_usr_id', 'source_rt_id', 'location', 'country', 'country_code',
    'verified_user', 'user_id', 'user_name', 'followers_count', 'user_description',
    'organization',
]

data_joined = data_joined.drop('lang', 'created_at').select(output_columns)

In [None]:
%%time

data_joined.limit(3)

# Save the results to parquet

In [30]:
# List all objects under a given prefix of a GCS bucket
def list_blobs(bucket_name, folder_name):
    """Print the name and size of every object whose name starts with `folder_name`.

    One line is printed per object: the object name, a tab, then its size.
    """
    client = storage.Client()
    matching = list(client.bucket(bucket_name).list_blobs(prefix=folder_name))

    for entry in matching:
        print(f"{entry.name}\t{entry.size}")

# List all objects under a given prefix of a GCS bucket, as a formatted table
def list_blobs_pd(bucket_name, folder_name):
    """Return a pandas Styler listing the objects whose names start with `folder_name`.

    The underlying frame has the columns 'Name' and 'Size'; 'Size' is displayed
    with thousands separators and no decimals.
    """
    client = storage.Client()
    matching = list(client.bucket(bucket_name).list_blobs(prefix=folder_name))

    rows = [(entry.name, entry.size) for entry in matching]
    listing = pd.DataFrame(rows, columns=['Name','Size'])

    return listing.style.format({"Size": "{:,.0f}"})

# Delete a folder (all objects under `folder_name/`) from a GCS bucket
def delete_folder(bucket_name, folder_name):
    """Delete every object stored under the folder `folder_name` of a bucket.

    Parameters
    ----------
    bucket_name : str
        Name of the bucket.
    folder_name : str
        Folder path inside the bucket, with or without a trailing '/'.

    Raises
    ------
    ValueError
        If `folder_name` is empty (or only slashes). An empty prefix would match,
        and therefore delete, every object in the bucket.
    """
    # Validate before creating a client so a bad argument can never reach the bucket.
    if not folder_name or not folder_name.rstrip('/'):
        raise ValueError("folder_name must be a non-empty folder path; refusing to delete the whole bucket")

    # Listing is a plain prefix match: without the trailing '/', the folder 'a/b'
    # would also match siblings such as 'a/b_v2/...'.
    prefix = folder_name.rstrip('/') + '/'

    gcs_client = storage.Client()
    bucket = gcs_client.bucket(bucket_name)
    # Materialise the listing first, then delete.
    blobs = list(bucket.list_blobs(prefix=prefix))

    for blob in blobs:
        blob.delete()

In [31]:
# Saving results into individual bucket, students must update to their own bucket `msca-bdp-students-bucket` and use `CNET ID` as a folder prefix
bucket_write = 'msca-bdp-students-bucket'
folder_write = 'shared_data/hjiang248/final_sdf_v9_formatted_2' # v6 does not include quote status and exclude keyword 'public'

In [32]:
#data_joined.count()

In [34]:
#%%time
#data_joined.write.format('parquet').\
#mode('overwrite').\
#save('gs://' + bucket_write + '/' + folder_write)

                                                                                