### Part -2
#### 1. Apply the filtering and feature reduction methods explored on initially sampled data.
#### 2. Save the filtered and feature reduced data into new parquet files.
#### 3. Make a new sample of 10k records from the filtered data for first hand analysis.

In [1]:
#Ensure we are using the right kernel
import sys
print(sys.version)
print(spark.version)

3.8.15 | packaged by conda-forge | (default, Nov 22 2022, 08:46:39) 
[GCC 10.4.0]
3.1.3


In [20]:
import time
import pyspark
import subprocess

#### Tuning Spark to increase the memory

In [3]:
sc = spark.sparkContext
print('Original spark.driver.maxResultSize: ' + sc._conf.get('spark.driver.maxResultSize'))

# Stop existing Spark environment
sc.stop()

# Waiting for the environment to stop
sleep_time = 10
print(f'Waiting for {sleep_time} seconds for the enviroment to stop...')
time.sleep(sleep_time)

# Applying new configuration and restarting Spark
conf = pyspark.SparkConf().setAll([('spark.driver.maxResultSize', '8g')])
sc = pyspark.SparkContext(conf=conf)

print('New spark.driver.maxResultSize: ' + sc._conf.get('spark.driver.maxResultSize'))

# Starting  Spark session with configs applied
spark = SparkSession(sc).builder.getOrCreate()

Original spark.driver.maxResultSize: 1920m
Waiting for 10 seconds for the enviroment to stop...


23/03/03 20:10:24 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
23/03/03 20:10:24 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
23/03/03 20:10:24 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
23/03/03 20:10:24 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


New spark.driver.maxResultSize: 8g


In [4]:
import pandas as pd
import numpy as np
pd.set_option('display.max_colwidth', None)
pd.reset_option('display.max_rows')
from itertools import compress 
from pyspark.sql.functions import *
from pyspark.sql.types import *
import seaborn as sns
import matplotlib.pyplot as plt
warnings.filterwarnings(action='ignore')
import os
import shutil
# import sh

In [5]:
# !pip uninstall -y nltk
# !pip install nltk --upgrade --no-cache-dir
# %pip install nltk -U

In [6]:
import nltk
# nltk.download('popular', halt_on_error=False)

In [7]:
import re
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.feature import CountVectorizer,  IDF, CountVectorizerModel, Tokenizer, RegexTokenizer, StopWordsRemover
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Row
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [8]:
# Display the spark DF in a beautified way
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

## To use legacy casting notation for date
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

### Read the JSON files

In [9]:
%time twitter_raw = spark.read.json('gs://msca-bdp-tweets/final_project')

23/03/03 20:13:43 WARN org.apache.spark.sql.execution.datasources.SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.
                                                                                

CPU times: user 3.45 s, sys: 659 ms, total: 4.11 s
Wall time: 22min 56s


23/03/03 20:34:04 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


### Selecting the K-12/Education related tweets

In [11]:
## Remove all special characters such as hastags, mentions, etc. 

twit_filt = twitter_raw\
            .filter('user.followers_count > 0')\
            .filter('possibly_sensitive == FALSE or possibly_sensitive is NULL')\
            .filter('withheld_in_countries is NULL')\
            .filter('truncated == "False"')\
            .filter('lang == "en"')\
            .withColumn('text', lower('text'))\
            .withColumn('stripped_text', regexp_replace(col("text"),"[\$#,&%\".]",""))

In [12]:
# Count of the dataframe with just textual filtering before filtering based on any keywords
twit_filt.count()

                                                                                

76837462

In [None]:
twit_filt.describe()

In [13]:
## Dictionary for words similar to 'education/K-12' 

edu_keywords = ['accountability', 'achievement', 'books', 'classes', 'colleges', 'commonly', 'community', 'curriculum', 'degree',
                 'disabilities', 'district', 'districts', 'education', 'educational', 'elementary_school', 'found', 'grade', 'grades',
                 'high_school', 'improve', 'include', 'k-12', 'k12', 'kindergarten', 'learning', 'level', 'local', 'math', 
                 'middle_school', 'parents', 'pre_school', 'primary_school', 'private', 'progress', 'public', 'requirements', 
                 'schools', 'science', 'scores', 'standardized', 'standards', 'state', 'states', 'students', 'systems', 'teachers', 
                 'tuition', 'united', 'years', 'Nursery', 'PhD', 'academic', 'academic conference', 'academic institution', 
                 'academic scholar', 'academic success', 'academic training', 'academics', 'bachelors', 'e learning', 'e-learning',
                 'educational conference', 'educational research', 'educational stress', 'educationalist','grad', 'graduate', 
                 'home work', 'home-work', 'junior college', 'masters', 'masters degree', 'merit scholarships', 'nobel prize',
                 'online education', 'p-12', 'p12','physical education', 'physics', 'post-doctorate', 'post-grad', 'post-graduate', 
                 'postdoctorate', 'postgrad', 'postgraduate', 'professional education', 'professor','research', 'research associate', 
                 'scholarships','study', 'study load', 'studying','syllabus', 'teaching', 'textbook','undergrad', 'undergraduate']


removal_words = ['guns', 'fashion', 'gaming', 'makeup', 'shooting', 'gun', 'kill', 'porn', 'sex', 'die', 'protest', 'tragedy', 
                 'killed', 'murder', 'uvalde', 'health', 'shoot', 'deceased', 'beauty', 'horny', 'shootings', 'gunned',
                 'violen', 'ukraine', 'attack', 'dead', 'slaughter', 'crush', 'victim', 'massacre', 'trans', 'lgbt', 'threat', 'gay']


regex_edu ='|'.join(["(" + c +")" for c in edu_keywords])

regex_removal ='|'.join(["(" + c +")" for c in removal_words])

twit_filt = twit_filt.where(twit_filt['stripped_text'].rlike(regex_edu)).\
where(~twit_filt['stripped_text'].rlike(regex_removal))

In [None]:
# Checking the words which had the most effect on filtering the tweet_text

res = []
for word in edu_keywords:
    count = twit_filt.filter('tweet_text like "%' + word + '%"').count()
    res.append([word, count])

res = sorted(res, key = lambda x:x[1], reverse = True)
res[:20]

In [None]:
#count of the records after filtering based on keywords
twit_filt.count()

                                                                                

24550187

In [16]:
@udf
def get_importance(text):
    global edu_keywords
    words = text.split()
    total_count = 0
    for i in edu_keywords:
        occurance_count = words.count(i)
        total_count += occurance_count

    if(total_count > 1):
        return 1
    else:
        return 0

#On original 24.4 Million
twit_filt_filt = twit_filt.withColumn("important", get_importance("stripped_text"))

In [17]:
%%time
twit_filt_filt = twit_filt_filt.filter("important == 1")
twit_filt_filt.count()

[Stage 10:>                                                         (0 + 1) / 1]

CPU times: user 2.75 s, sys: 849 ms, total: 3.6 s
Wall time: 11min 55s


                                                                                

4814661

In [None]:
#save the filtered file into a parquet
twit_filt_filt.write.format("parquet").\
mode('overwrite').\
save('gs://msca-bdp-students-bucket/shared_data/saikrishnaj/keyword_filtered_data')

                                                                                

In [21]:
path = 'gs://msca-bdp-students-bucket/shared_data/saikrishnaj/keyword_filtered_data/'
cmd = 'hadoop fs -du -s -h ' + path

p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)
for line in p.stdout.readlines():
    print (line)
    
retval = p.wait()

8.9 G  8.9 G  gs://msca-bdp-students-bucket/shared_data/saikrishnaj/keyword_filtered_data

