# Setting up environment and defining helper functions

In [1]:
import warnings
warnings.filterwarnings("ignore")

In [2]:
import os
import subprocess
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [3]:
import sys
print(sys.version)
print(spark.version)
import pandas as pd
pd.set_option('display.max_colwidth', None)
pd.reset_option('display.max_rows')
import numpy as np
import seaborn as sns
from matplotlib import pyplot as plt

from itertools import compress 
import re
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.feature import CountVectorizer,  IDF, CountVectorizerModel, Tokenizer, RegexTokenizer, StopWordsRemover
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Row

%pip install nltk -U
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords

3.8.13 | packaged by conda-forge | (default, Mar 25 2022, 06:04:10) 
[GCC 10.3.0]
3.1.3
[0mNote: you may need to restart the kernel to use updated packages.


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [4]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

In [5]:
from google.cloud import storage

def list_blobs(bucket_name, folder_name):
    gcs_client = storage.Client()
    bucket = gcs_client.bucket(bucket_name)
    blobs = list(bucket.list_blobs(prefix=folder_name))

    for blob in blobs:
        print(blob.name + '\t' + str(blob.size))
        
        
# Reading data from open bucket, avaible to all students
bucket_read = 'msca-bdp-tweets'
folder_read = 'final_project'

# Saving results into individual bucket, students must update to their own bucket `msca-bdp-students-bucket` and use `CNET ID` as a folder prefix
bucket_write = 'msca-bdp-students-bucket'
folder_write = 'shared_data/kaitongh/filtered_dataset'

## Step 1: Loading the data

In [None]:
%%time

path = 'gs://msca-bdp-tweets/final_project/'

tweets_df = spark.read.json(path)

22/12/07 01:57:46 WARN org.apache.spark.sql.execution.datasources.SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.
                                                                                

CPU times: user 3.51 s, sys: 871 ms, total: 4.38 s
Wall time: 20min 55s


22/12/07 02:14:52 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


#### Checking the schema 

In [11]:
# tweets_df.printSchema()

## Step 2: Data Cleansing

In [7]:
#### Filtering out tweets that have no text

tweets_df = tweets_df.filter(tweets_df.text.isNotNull())

#### Selecting relevant columns 

In [8]:
tweets_df.limit(1)

22/12/02 19:58:26 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 35 for reason Container marked as failed: container_1670009274515_0001_01_000037 on host: hub-msca-bdp-dphub-students-kaitongh-sw-rk52.c.msca-bdp-students.internal. Exit status: -100. Diagnostics: Container released on a *lost* node.
22/12/02 19:58:26 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 37 for reason Container marked as failed: container_1670009274515_0001_01_000039 on host: hub-msca-bdp-dphub-students-kaitongh-sw-rk52.c.msca-bdp-students.internal. Exit status: -100. Diagnostics: Container released on a *lost* node.
22/12/02 19:58:26 ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost executor 35 on hub-msca-bdp-dphub-students-kaitongh-sw-rk52.c.msca-bdp-students.internal: Container marked as failed: container_1670009274515_0001_01_000037 on host: hub-msca-

coordinates,created_at,display_text_range,entities,extended_entities,extended_tweet,favorite_count,favorited,filter_level,geo,id,id_str,in_reply_to_screen_name,in_reply_to_status_id,in_reply_to_status_id_str,in_reply_to_user_id,in_reply_to_user_id_str,is_quote_status,lang,place,possibly_sensitive,quote_count,quoted_status,quoted_status_id,quoted_status_id_str,quoted_status_permalink,quoted_text,reply_count,retweet_count,retweeted,retweeted_from,retweeted_status,source,text,timestamp_ms,truncated,tweet_text,user,withheld_copyright,withheld_in_countries
,Tue Oct 04 01:43:...,,"{[], null, [], []...",,,0,False,low,,1577112036168736768,1577112036168736768,,,,,,False,en,,,0,,,,,,0,0,RT,BulletinSport,"{null, Mon Oct 03...","<a href=""http://t...",RT @BulletinSport...,1664847785404,False,The Martinsville ...,"{false, Tue Jan 2...",,


#### EDA

In [None]:
### checking the retweet_count column
### there is only 0 in this column, we can't use this to calculate the total number of retweets
tweets_df.select('retweet_count').distinct().limit(5)

                                                                                

retweet_count
0


In [10]:
### limiting the sample size for faster eda 
sub = tweets_df.limit(100000).toPandas()

                                                                                

In [11]:
sub.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000 entries, 0 to 99999
Data columns (total 40 columns):
 #   Column                     Non-Null Count   Dtype  
---  ------                     --------------   -----  
 0   coordinates                34 non-null      object 
 1   created_at                 100000 non-null  object 
 2   display_text_range         12722 non-null   object 
 3   entities                   100000 non-null  object 
 4   extended_entities          3806 non-null    object 
 5   extended_tweet             15603 non-null   object 
 6   favorite_count             100000 non-null  int64  
 7   favorited                  100000 non-null  bool   
 8   filter_level               100000 non-null  object 
 9   geo                        34 non-null      object 
 10  id                         100000 non-null  int64  
 11  id_str                     100000 non-null  object 
 12  in_reply_to_screen_name    11894 non-null   object 
 13  in_reply_to_status_id      115

In [13]:
### checking the retweeted_status column 
sub['retweeted_status'].head(2)

0    (None, Tue May 24 21:54:24 +0000 2022, [0, 140], ([], None, [], [Row(display_url='twitter.com/i/web/status/1…', expanded_url='https://twitter.com/i/web/status/1529219289005400064', indices=[117, 140], url='https://t.co/7XLz71jM8u')], []), None, ([0, 279], ([], [Row(additional_media_info=Row(description=None, embeddable=None, monetizable=False, title=None), description=None, display_url='pic.twitter.com/9fkJ13vWGd', expanded_url='https://twitter.com/ABC/status/1529219289005400064/video/1', id=1529219222970281984, id_str='1529219222970281984', indices=[280, 303], media_url='http://pbs.twimg.com/amplify_video_thumb/1529219222970281984/img/rr2UoSQsvksDKCP-.jpg', media_url_https='https://pbs.twimg.com/amplify_video_thumb/1529219222970281984/img/rr2UoSQsvksDKCP-.jpg', sizes=Row(large=Row(h=540, resize='fit', w=540), medium=Row(h=540, resize='fit', w=540), small=Row(h=540, resize='fit', w=540), thumb=Row(h=150, resize='crop', w=150)), source_status_id=None, source_status_id_str=None, sou

In [16]:
tweets_df.select('retweeted_status.retweet_count').limit(5)

                                                                                

retweet_count
48.0
5.0
""
36.0
4.0


In [18]:
sub[['text','retweeted_from','retweeted']].head(20)

Unnamed: 0,text,retweeted_from,retweeted
0,"RT @ABC: “Why are you here?!""\n\nA furious Sen. Chris Murphy demands answers from senators following Texas school shooting.\n\n“Why do you spen…",ABC,RT
1,Indiana High School Softball | Live Streaming\nTBA vs Fishers\nEastbrook vs Oak Hill\nTBA vs Wabash\nSouth Newton vs Cl… https://t.co/QvvQQUUOG8,,
2,"RT @jaketapper: Fifteen have been killed in a shooting at Robb elementary school, according to Governor Greg Abbott — 14 students and 1 te…",jaketapper,RT
3,#Uvalde is just another reason why you should home school your kids. Exposing your kids to the degenerate version o… https://t.co/KiEpSzUJtf,,
4,"RT @Josh_Moon: 14 dead elementary school kids. 14. Not from CRT. Or ""woke"" history. Which red states took great pains to ban. But from our…",Josh_Moon,RT
5,RT @Jim_Jordan: Last week we learned:\n\n-Hillary Clinton ordered the dissemination of the Trump/Alfa Bank hoax.\n\n-President Biden thanked NS…,Jim_Jordan,RT
6,RT @meganbang3: My son was not allowed to walk in his high school graduation yesterday because of his beaded cap and eagle feather. They wa…,meganbang3,RT
7,"RT @jewishaction: We're horrified by the news of the shooting at Robb Elementary School in Uvalde, Texas.\n\n14 children and one teacher murd…",jewishaction,RT
8,RT @LRiddickESPN: Uh…question …WTF are those of you “in charge” that can make significant change happen in the name of safety ACTUALLY DOIN…,LRiddickESPN,RT
9,@TheUSASingers My daughter had a lockdown incident today at her school today hours before what happened in Texas😩 M… https://t.co/GdKqjKVjb2,,


In [25]:
### checking the lang column
### only 'en' is in the column, drop this 
sub['lang'].unique()

array(['en'], dtype=object)

In [26]:
### checking possible text columns for the tweet content 
### text and tweet_text are basically the same, except that text also contains the retweeted account. 
### I will keep only the tweet_text column
sub[['text', 'tweet_text']].head(10)

Unnamed: 0,text,tweet_text
0,"RT @ABC: “Why are you here?!""\n\nA furious Sen. Chris Murphy demands answers from senators following Texas school shooting.\n\n“Why do you spen…","“Why are you here?!""\n\nA furious Sen. Chris Murphy demands answers from senators following Texas school shooting.\n\n“Why do you spend all this time running for the United States Senate...if your answer, is as the slaughter increases, as our kids run for their lives—we do nothing?” https://t.co/9fkJ13vWGd"
1,Indiana High School Softball | Live Streaming\nTBA vs Fishers\nEastbrook vs Oak Hill\nTBA vs Wabash\nSouth Newton vs Cl… https://t.co/QvvQQUUOG8,Indiana High School Softball | Live Streaming\nTBA vs Fishers\nEastbrook vs Oak Hill\nTBA vs Wabash\nSouth Newton vs Clinton Central\nTBA vs Muncie Central\nTBA vs Shakamak\nWatch Live Here : https://t.co/WLu5HSoUK6\ntoday @ 7p https://t.co/epcCQgjoSb
2,"RT @jaketapper: Fifteen have been killed in a shooting at Robb elementary school, according to Governor Greg Abbott — 14 students and 1 te…","Fifteen have been killed in a shooting at Robb elementary school, according to Governor Greg Abbott — 14 students and 1 teacher. \n \nAbbott said the shooter is also deceased."
3,#Uvalde is just another reason why you should home school your kids. Exposing your kids to the degenerate version o… https://t.co/KiEpSzUJtf,#Uvalde is just another reason why you should home school your kids. Exposing your kids to the degenerate version of America the left has created is almost child abuse at this point.
4,"RT @Josh_Moon: 14 dead elementary school kids. 14. Not from CRT. Or ""woke"" history. Which red states took great pains to ban. But from our…","14 dead elementary school kids. 14. Not from CRT. Or ""woke"" history. Which red states took great pains to ban. But from our infatuation with goddamn guns. How is it possible that our children dying at school isn't too much?"
5,RT @Jim_Jordan: Last week we learned:\n\n-Hillary Clinton ordered the dissemination of the Trump/Alfa Bank hoax.\n\n-President Biden thanked NS…,Last week we learned:\n\n-Hillary Clinton ordered the dissemination of the Trump/Alfa Bank hoax.\n\n-President Biden thanked NSBA for writing a letter targeting parents at school board meetings.\n\n-And Democrats think men can get pregnant.\n\nCan’t make it up.
6,RT @meganbang3: My son was not allowed to walk in his high school graduation yesterday because of his beaded cap and eagle feather. They wa…,My son was not allowed to walk in his high school graduation yesterday because of his beaded cap and eagle feather. They wanted him to put on a plain one and he said nope not walking then. I’m so proud of him and furious at the same time. https://t.co/wbIy5gjjnX
7,"RT @jewishaction: We're horrified by the news of the shooting at Robb Elementary School in Uvalde, Texas.\n\n14 children and one teacher murd…","We're horrified by the news of the shooting at Robb Elementary School in Uvalde, Texas.\n\n14 children and one teacher murdered. Multiple others injured and traumatized.\n\nWe must act to end gun violence. Our children and our teachers — and all of us — deserve to live and thrive."
8,RT @LRiddickESPN: Uh…question …WTF are those of you “in charge” that can make significant change happen in the name of safety ACTUALLY DOIN…,Uh…question …WTF are those of you “in charge” that can make significant change happen in the name of safety ACTUALLY DOING help make it so our children don’t DIE BY GETTING SHOT in school! WHAT ARE YOU ACTUALLY DOING ABOUT IT?!?!
9,@TheUSASingers My daughter had a lockdown incident today at her school today hours before what happened in Texas😩 M… https://t.co/GdKqjKVjb2,"@TheUSASingers My daughter had a lockdown incident today at her school today hours before what happened in Texas😩 My daughter asked me why in our neighborhood. I sadly had to tell her it’s not about black or white, rich or poor neighborhoods it’s ALL OF Americas problem"


In [28]:
tweets_df.select(['user.name', 'user.screen_name']).limit(10)

                                                                                

name,screen_name
shiaoma,shiaoma
High School Sports,Gabriel50407921
FullyDedicated2Thee,2Short2Sweet
Knowledge And Faith,LBR_TY
✨noelain✨,disneymama0113
Earthangel,Earthangel7558
Salvadorean Poodl...,Lizzy9839
la perra ivette,Guti_0730
Alex Calix,jacalix90
Lorraine L. Hayden,Ms_Raine


In [8]:
df = tweets_df.select([tweets_df.created_at,
                      tweets_df.id.alias('tweet_id'),
                      tweets_df.tweet_text,
                      tweets_df.user['id'].alias('user_id'),
                      tweets_df.user['name'].alias('user_name'),
                      tweets_df.user['screen_name'].alias('user_screen_name'),
                      tweets_df.user['location'].alias('user_location'),
                      tweets_df.user['description'].alias('user_description'),
                      tweets_df.place['country'].alias('tweet_country'),
                      tweets_df.place['full_name'].alias('tweet_location'),
                      tweets_df.retweeted_status['retweet_count'],
                      tweets_df.retweeted_from,
                      tweets_df.retweeted,
                      tweets_df.favorite_count,
                      tweets_df.timestamp_ms])

In [9]:
#### Checking the schema of this filtered dataset

df.printSchema()

root
 |-- created_at: string (nullable = true)
 |-- tweet_id: long (nullable = true)
 |-- tweet_text: string (nullable = true)
 |-- user_id: long (nullable = true)
 |-- user_name: string (nullable = true)
 |-- user_screen_name: string (nullable = true)
 |-- user_location: string (nullable = true)
 |-- user_description: string (nullable = true)
 |-- tweet_country: string (nullable = true)
 |-- tweet_location: string (nullable = true)
 |-- retweeted_status.retweet_count: long (nullable = true)
 |-- retweeted_from: string (nullable = true)
 |-- retweeted: string (nullable = true)
 |-- favorite_count: long (nullable = true)
 |-- timestamp_ms: string (nullable = true)



In [31]:
df.limit(3)

                                                                                

created_at,tweet_id,tweet_text,user_id,user_name,user_screen_name,user_location,user_description,tweet_country,tweet_location,retweeted_status.retweet_count,retweeted_from,retweeted,favorite_count
Wed May 25 00:43:...,1529261786217689090,Get involved. Get...,241666780,Rachel Heine,RachelHeine,"Los Angeles, CA",🧛🏻‍♀️ vp of bra...,,,16,shannonrwatts,RT,0
Wed May 25 00:43:...,1529261786247069697,A westerner will ...,1356163969594097665,jameel Khan,jameelK60094344,,Nothing hurts a g...,,,2901,lapatina_,RT,0
Wed May 25 00:43:...,1529261786087800833,A gun did not gro...,1518737390386524164,Mary Ross,beagle0318,,,,,1138,ChristianWalk1r,RT,0


In [13]:
#### To see how many rows we have
df.count()

                                                                                

99992797

#### Analyzing text data to find the most frequent keywords
##### I will use ~10% of the full dataset in this procedure 

In [38]:
text = df.limit(10000000)

text_rdd = text.rdd.map(lambda x : x['text']).filter(lambda x: x is not None)

#### Removing stopwords in the text field 

StopWords = stopwords.words("english")

tokens = text_rdd\
             .map( lambda document: document.strip().lower())\
             .map( lambda document: re.split(" ", document))\
             .map( lambda word: [x for x in word if x.isalnum()])\
             .map( lambda word: [x for x in word if x not in StopWords])\
             .map( lambda word: [x for x in word if len(x) > 3] ) \
             .map( lambda word: ' '.join(word))

In [15]:
tokens.take(3)

                                                                                

['really believe kids really worry safe school crazy crazy',
 'school shootings worried ridiculous',
 'teacher mireles killed along least students shooting elementary school']

In [39]:
%%time

wordCounts = tokens.flatMap(lambda text: text.split(' ')) \
                    .map(lambda word: (word, 1)) \
                    .reduceByKey(lambda a, b: a+b)


wordCountsSorted = wordCounts.map(lambda x:(x[1],x[0])).sortByKey(ascending=False)

CPU times: user 15.8 ms, sys: 0 ns, total: 15.8 ms
Wall time: 81.9 ms


In [None]:
wordCountsSorted.take(50)

22/12/04 00:55:35 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 42.0 (TID 69744) (hub-msca-bdp-dphub-students-kaitongh-w-1.c.msca-bdp-students.internal executor 73): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1558, in __getitem__
    idx = self.__fields__.index(item)
ValueError: 'text' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 594, in process
    out_iter = func(split_index, iterator)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2916, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2916, in pipeline_func
    return func(split, prev_

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 42.0 failed 10 times, most recent failure: Lost task 0.9 in stage 42.0 (TID 69753) (hub-msca-bdp-dphub-students-kaitongh-sw-z9wt.c.msca-bdp-students.internal executor 71): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1558, in __getitem__
    idx = self.__fields__.index(item)
ValueError: 'text' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 594, in process
    out_iter = func(split_index, iterator)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2916, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2916, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 418, in func
    return f(iterator)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2144, in combineLocally
    merger.mergeValues(iterator)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 240, in mergeValues
    for k, v in iterator:
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_8954/2292434514.py", line 3, in <lambda>
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1563, in __getitem__
    raise ValueError(item)
ValueError: text

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:134)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:505)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:508)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2304)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2252)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2252)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2491)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2433)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2422)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:902)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2204)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2225)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2244)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1558, in __getitem__
    idx = self.__fields__.index(item)
ValueError: 'text' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 594, in process
    out_iter = func(split_index, iterator)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2916, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2916, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 418, in func
    return f(iterator)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2144, in combineLocally
    merger.mergeValues(iterator)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 240, in mergeValues
    for k, v in iterator:
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_8954/2292434514.py", line 3, in <lambda>
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1563, in __getitem__
    raise ValueError(item)
ValueError: text

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:134)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:505)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:508)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In the 50 words above, 12 of them are clearly related to education. Which are:

school /college /university /student /kids /professor /children /public /teacher /education /elementary /class 


#### Discarding irrelevant tweets
Irrelevant tweets include those that do not contain the most frequent keywords identified above 

In [10]:
#### converting the text in the text column to lowercase
df = df.withColumn('tweet_text', lower(col('tweet_text')))

In [11]:
filtered_df = df.filter(df.tweet_text.contains('school')\
                       |df.tweet_text.contains('college')\
                       |df.tweet_text.contains('university')\
                       |df.tweet_text.contains('kids')\
                       |df.tweet_text.contains('professor')\
                       |df.tweet_text.contains('children')\
                       |df.tweet_text.contains('public')\
                       |df.tweet_text.contains('teacher')\
                       |df.tweet_text.contains('education')\
                       |df.tweet_text.contains('elementary')\
                       |df.tweet_text.contains('class')\
                       |df.tweet_text.contains('student'))

In [None]:
filtered_df.count()

                                                                                

99100416

In [None]:
filtered_df.write.format('parquet').\
mode('overwrite').\
save('gs://' + bucket_write + '/' + folder_write)

                                                                                

In [34]:
#list_blobs(bucket_name=bucket_write, folder_name=folder_write)