In [1]:
import sys
print(sys.version)
print(spark.version)

3.8.15 | packaged by conda-forge | (default, Nov 22 2022, 08:46:39) 
[GCC 10.4.0]
3.1.3


In [2]:
import os
import shutil
import io
import re
import warnings
from datetime import datetime
import pytz
import string

In [3]:
import pandas as pd
import numpy as np
pd.set_option('display.max_colwidth', None)
pd.reset_option('display.max_rows')
from itertools import compress 
from pyspark.sql.functions import *
from pyspark.sql.types import *
import seaborn as sns
import matplotlib.pyplot as plt
warnings.filterwarnings(action='ignore')

In [4]:
import nltk
nltk.download('omw-1.4')
nltk.download('stopwords')
nltk.download('wordnet')
from nltk.corpus import stopwords
from nltk.corpus import wordnet

[nltk_data] Downloading package omw-1.4 to /root/nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [5]:
pd.set_option('display.max_colwidth', None)

In [6]:
# Options
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

In [7]:
def list_blobs(bucket_name, folder_name):
    gcs_client = storage.Client()
    bucket = gcs_client.bucket(bucket_name)
    blobs = list(bucket.list_blobs(prefix=folder_name))

    for blob in blobs:
        print(blob.name + '\t' + str(blob.size))

In [8]:
def list_blobs_pd(bucket_name, folder_name):
    gcs_client = storage.Client()
    bucket = gcs_client.bucket(bucket_name)
    blobs = list(bucket.list_blobs(prefix=folder_name))

    blob_name = []
    blob_size = []
    
    for blob in blobs:
        blob_name.append(blob.name)
        blob_size.append(blob.size)

    blobs_df = pd.DataFrame(list(zip(blob_name, blob_size)), columns=['Name','Size'])

    blobs_df.style.format({"Size": "{:,.0f}"}) 
    
    return blobs_df

In [9]:
def delete_folder(bucket_name, folder_name):
    gcs_client = storage.Client()
    bucket = gcs_client.bucket(bucket_name)
    blobs = list(bucket.list_blobs(prefix=folder_name))

    for blob in blobs:
        blob.delete()

In [10]:
!hadoop fs -ls 'gs://msca-bdp-tweets/final_project/' | head

Found 50696 items
-rwx------   3 root root          0 2023-02-08 13:58 gs://msca-bdp-tweets/final_project/_SUCCESS
-rwx------   3 root root    4500466 2023-02-08 13:44 gs://msca-bdp-tweets/final_project/part-00000-aa6d3cb4-7022-4df2-9921-218307589ce2-c000.json
-rwx------   3 root root    4107431 2023-02-08 13:44 gs://msca-bdp-tweets/final_project/part-00001-aa6d3cb4-7022-4df2-9921-218307589ce2-c000.json
-rwx------   3 root root    4672123 2023-02-08 13:44 gs://msca-bdp-tweets/final_project/part-00002-aa6d3cb4-7022-4df2-9921-218307589ce2-c000.json
-rwx------   3 root root    5186684 2023-02-08 13:44 gs://msca-bdp-tweets/final_project/part-00003-aa6d3cb4-7022-4df2-9921-218307589ce2-c000.json
-rwx------   3 root root    4729662 2023-02-08 13:44 gs://msca-bdp-tweets/final_project/part-00004-aa6d3cb4-7022-4df2-9921-218307589ce2-c000.json
-rwx------   3 root root    4605529 2023-02-08 13:44 gs://msca-bdp-tweets/final_project/part-00005-aa6d3cb4-7022-4df2-9921-218307589ce2-c000.json
-rwx-----

In [11]:
# Reading data from open bucket, avaible to all students
bucket_read = 'msca-bdp-tweets'

# Saving results into individual bucket, students must update to their own bucket
bucket_write = 'msca-bdp-data-shared'

### Read Data

In [None]:
tweets_original = spark.read.json('gs://msca-bdp-tweets/final_project/')

23/03/03 19:37:31 WARN org.apache.spark.sql.execution.datasources.SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.
23/03/03 20:07:15 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [13]:
spark = SparkSession.builder.getOrCreate()

##Add "eagerEval.enabled" to beautify the way Spark DF is displayed
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

## To use legacy casting notation for date
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

In [11]:
#Checking for the total number of records
print(tweets_original.count())



99994342


                                                                                

### Considering only English Tweets

In [14]:
tweets_original = tweets_original.filter(tweets_original.lang == 'en')

In [15]:
tweets_filt = tweets_original\
.withColumn('tweet_text', lower('tweet_text'))\
.withColumn('stripped', regexp_replace(col("tweet_text"),"[\$#,&%\".]",""))

### Filtering out education related tweets

In [16]:
edu_words = ['education', 'educator', 'exam', 'classroom', 'learn', 'degree', 'education news', 'student', 'university', 'college', 'teacher', 
          'professor', 'phd', 'masters degree', 'bachelors degree' 'library', 'research', 'graduate school', 'undergraduate', 'teach', 
         'school', 'training', 'public school', 'educational', 'primary school', 'secondary school', 'higher education', 'tuition', 'courses',
        'exam', 'lecturer', 'lecture', 'educate', 'children', 'studying', 'government school', 'curriculum', 'tuition', 'tutor','grades']

edu_words = [word.lower().replace('_', ' ') for word in list(set(edu_words))]
edu_words

['masters degree',
 'government school',
 'undergraduate',
 'school',
 'grades',
 'children',
 'education',
 'degree',
 'higher education',
 'tutor',
 'educate',
 'education news',
 'lecturer',
 'college',
 'educator',
 'curriculum',
 'student',
 'lecture',
 'training',
 'teach',
 'teacher',
 'tuition',
 'courses',
 'phd',
 'educational',
 'university',
 'secondary school',
 'research',
 'exam',
 'classroom',
 'graduate school',
 'primary school',
 'bachelors degreelibrary',
 'studying',
 'professor',
 'learn',
 'public school']

In [18]:
from pyspark.sql import functions as f
@f.udf(returnType=IntegerType())
def count_words(tweet):
    tweet = list(tweet.split())
    count = 0
    for word in edu_words: # Use list created in the global scope
        count += tweet.count(word)
    return count

In [19]:
tweets_filt = tweets_filt.withColumn('edu_count', count_words('tweet_text'))

In [20]:
# Filter for tweets that contain at least 2 education related words
tweets_filt = tweets_filt.filter(f.col('edu_count') >= 2)

In [21]:
tweets_filt.count()

                                                                                

16114741

### Removing non-educational tweets

In [22]:
removal_words = ['guns', 'fashion', 'gaming', 'makeup', 'shooting', 'sports', 'business', 'gun', 'kill', 'food',
                'news', 'travel', 'killed', 'murder', 'uvalde', 'health', 'shoot', 'deceased', 'movie', 'politics', 'beauty', 'horny',
                'shootings', 'gunned', 'fitness', 'music', 'shopping', 'attack']

In [23]:
regex_removal ='|'.join(["(" + c +")" for c in removal_words])

In [24]:
tweets_filt = tweets_filt.where(~tweets_filt['tweet_text'].rlike(regex_removal))

In [None]:
tweets_filt.count()

23/03/03 20:32:33 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 34 for reason Container marked as failed: container_1677865265526_0008_01_000034 on host: hub-msca-bdp-dphub-students-pranavr569-sw-n2qq.c.msca-bdp-students.internal. Exit status: -100. Diagnostics: Container released on a *lost* node.
23/03/03 20:32:33 ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost executor 34 on hub-msca-bdp-dphub-students-pranavr569-sw-n2qq.c.msca-bdp-students.internal: Container marked as failed: container_1677865265526_0008_01_000034 on host: hub-msca-bdp-dphub-students-pranavr569-sw-n2qq.c.msca-bdp-students.internal. Exit status: -100. Diagnostics: Container released on a *lost* node.
23/03/03 20:32:33 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 32 for reason Container marked as failed: container_1677865265526_0008_01_000032 on host: hub

13128895

### Writing the filtered data to disk as a parquet file

In [None]:
tweets_filt.write.format("parquet").\
mode('overwrite').\
save('gs://' + 'msca-bdp-students-bucket/shared_data/pranavr569' + '/tweets_filtered')

                                                                                

In [28]:
!hadoop fs -ls 'gs://msca-bdp-students-bucket/shared_data/pranavr569/tweets_filtered' | head

Found 5742 items
-rwx------   3 root root          0 2023-03-03 21:01 gs://msca-bdp-students-bucket/shared_data/pranavr569/tweets_filtered/_SUCCESS
-rwx------   3 root root    2364455 2023-03-03 20:37 gs://msca-bdp-students-bucket/shared_data/pranavr569/tweets_filtered/part-00000-554fe560-9be0-449a-a51f-ee5129f09842-c000.snappy.parquet
-rwx------   3 root root    2651042 2023-03-03 20:37 gs://msca-bdp-students-bucket/shared_data/pranavr569/tweets_filtered/part-00001-554fe560-9be0-449a-a51f-ee5129f09842-c000.snappy.parquet
-rwx------   3 root root    3018283 2023-03-03 20:37 gs://msca-bdp-students-bucket/shared_data/pranavr569/tweets_filtered/part-00002-554fe560-9be0-449a-a51f-ee5129f09842-c000.snappy.parquet
-rwx------   3 root root    2582534 2023-03-03 20:37 gs://msca-bdp-students-bucket/shared_data/pranavr569/tweets_filtered/part-00003-554fe560-9be0-449a-a51f-ee5129f09842-c000.snappy.parquet
-rwx------   3 root root    2621646 2023-03-03 20:37 gs://msca-bdp-students-bucket/shared_da