# Big Data Project

#### Dataset mount preparation

###### Define schema

In [None]:
sc = spark.sparkContext

# cancer_file = '/mnt/project/cancer/2022/11/23/00'
rdd = sc.textFile(cancer_file)
rdd.count()

Out[6]: 4694

In [None]:
# cache the inflation since we will use it frequently
cancer.cache()

Out[7]: DataFrame[tweetId: bigint, name: string, username: string, tweet: string, public_metrics: string, location: string, geo: string, created_at: string]

In [None]:
cancer.rdd.getNumPartitions()

Out[8]: 17

In [None]:
cancer.count()

Out[9]: 201401

In [None]:
cancer.show(5)

+-------------------+-----------------+--------------+--------------------+--------------+------------------+----+--------------------+
|            tweetId|             name|      username|               tweet|public_metrics|          location| geo|          created_at|
+-------------------+-----------------+--------------+--------------------+--------------+------------------+----+--------------------+
|1595417110309920768|          Britney| Paris_Dreams_|You probably beli...|           139| LA ✈ NY ✈ HEAVEN |None|Wed Nov 23 14:00:...|
|1595417110393794561|~DiamondPrincess~| AprilCarter07|You probably beli...|            11|              None|None|Wed Nov 23 14:00:...|
|1595417110750330881|           Jayson|CallMeCheez012|You probably beli...|            77|          Bear, DE|None|Wed Nov 23 14:00:...|
|1595417110788079617|  Lauren Hornback|      Lolo_717|You probably beli...|           123|    Louisville, KY|None|Wed Nov 23 14:00:...|
|1595417113258516481| G P Y S Y 🧞‍♀️✨|magicalGyp

#### Data Cleaning

In [None]:
import pyspark.sql.functions as F

In [None]:
# drop duplicates
data_cleaned = cancer.distinct()
data_cleaned.count()

Out[12]: 201217

In [None]:
# drop null/missing values
data_cleaned = data_cleaned.na.drop()
data_cleaned.count()

Out[13]: 200804

In [None]:
# check each column
for i in data_cleaned.columns:
    print(i,data_cleaned.filter(F.col(i).isNull()).count())

tweetId 0
name 0
username 0
tweet 0
public_metrics 0
location 0
geo 0
created_at 0


In [None]:
# Remove URL
# Remove special characters
# Remove usename start with @
# Remove hashtags starting with "#" 
# Substituting multiple spaces with single space
# Lowercase all text
# Trim the leading/trailing whitespaces
data_cleaned = data_cleaned.withColumn('tweet', F.regexp_replace('tweet', r"http\S+", "")) \
                    .withColumn('tweet', F.regexp_replace('tweet', r"[^a-zA-Z]", " ")) \
                    .withColumn('tweet',F.regexp_replace('tweet',r"@\w+"," ")) \
                    .withColumn('tweet',F.regexp_replace('tweet',r"#\w+"," ")) \
                    .withColumn('tweet', F.regexp_replace('tweet', r"\s+", " ")) \
                    .withColumn('tweet', F.lower('tweet')) \
                    .withColumn('tweet', F.trim('tweet')) 
display(data_cleaned)

tweetId,name,username,tweet,public_metrics,location,geo,created_at
1595417147836346370,BrookeG.,buhrooke33,you probably believe you know what pursuits bring you the most more for cancer,367,,,Wed Nov 23 14:01:04 +0000 2022
1595417151934201859,MommyAtHeart,BadAssBitch321,you probably believe you know what pursuits bring you the most more for cancer,68,Melissa,,Wed Nov 23 14:01:05 +0000 2022
1595417216585191426,Christi Lynn,bobbyssister,you probably believe you know what pursuits bring you the most more for cancer,282,Everywhere,,Wed Nov 23 14:01:20 +0000 2022
1595417264941330432,Frank,nobuenochis,you probably believe you know what pursuits bring you the most more for cancer,358,"New Jersey, USA",,Wed Nov 23 14:01:32 +0000 2022
1595417324240408576,Lv_2clutch,Lv_so300,you probably believe you know what pursuits bring you the most more for cancer,498,,,Wed Nov 23 14:01:46 +0000 2022
1595417346843496448,Lala JULY 14 #cancer,That_Poeticgirl,you probably believe you know what pursuits bring you the most more for cancer,189,,,Wed Nov 23 14:01:51 +0000 2022
1595417488158003200,Raaeeee 🥀,QueenRaaaeee_,you probably believe you know what pursuits bring you the most more for cancer,754,,,Wed Nov 23 14:02:25 +0000 2022
1595417606215069696,amy,rejectsamy,you probably believe you know what pursuits bring you the most more for cancer,10,,,Wed Nov 23 14:02:53 +0000 2022
1595417917348368387,Tom Gulitti,TomGulittiNHL,rt amaliebenjamin four year old gordie white was diagnosed with an almost always fatal cancer dipg in february here s my piece on gord,36245,,,Wed Nov 23 14:04:07 +0000 2022
1595418010579513344,Nicole,Nicole_Wienges,you probably believe you know what pursuits bring you the most more for cancer,171,,,Wed Nov 23 14:04:29 +0000 2022


In [None]:
# seams the geo column doesn't have any useful information, could be dropped
display(data_cleaned.filter("geo != 'None'"))

tweetId,name,username,tweet,public_metrics,location,geo,created_at
1595726687270211584,STIGMABASE | EDU,stigmabase,tw emirates news agency a gulf conference discusses plans to improve healthcare for cancer patients barr,984,Worldwide,"{'type': 'Point', 'coordinates': [45.51950705, -73.6197588]}",Thu Nov 24 10:31:04 +0000 2022
1595477715888619528,Moriah Rae Knight,mrsknight2021,on our way to the hospital to get me tested for breast cancer yippee wausau wisconsin,424,"Wausau, WI","{'type': 'Point', 'coordinates': [44.9591, -89.6303]}",Wed Nov 23 18:01:44 +0000 2022
1595794439582404609,"L.Ford, RN~BSN",aSweet_a10tion,you might be moments from discovering your aptitude for a task more for cancer,506,Somewhere,Between MS & LA,
1595432052585037824,"L.Ford, RN~BSN",aSweet_a10tion,you probably believe you know what pursuits bring you the most more for cancer,505,Somewhere,Between MS & LA,
1595522460828372992,TMJ-BUF IT Jobs,tmj_buf_it,our vision is to free our world from the fear pain and loss of cancer one breakthrough discovery at a time join,319,"Buffalo, NY","{'type': 'Point', 'coordinates': [42.8982751, -78.8687846]}",Wed Nov 23 20:59:32 +0000 2022
1595518087792427008,TMJ - BNA IT Jobs,tmj_bna_it,at hca healthcare we are ready to help you take the next step in your career with tuition reimbursement student,396,"Nashville, TN","{'type': 'Point', 'coordinates': [36.1523805, -86.7893889]}",Wed Nov 23 20:42:10 +0000 2022
1595542294798532609,TMJ-BUF IT Adm. Jobs,tmj_BUF_adm,roswell park comprehensive cancer center is hiring in buffalo ny read about our latest it job opening via the l,196,"Buffalo, NY","{'type': 'Point', 'coordinates': [42.8984058, -78.8670861]}",Wed Nov 23 22:18:21 +0000 2022
1595496772784295938,Jordan Bond,j007,checkin the nodes and things sarah cannon cancer institute in overland park ks,858,"Kansas City, MO","{'type': 'Point', 'coordinates': [38.9095189, -94.652077]}",Wed Nov 23 19:17:28 +0000 2022
1595419626627702784,TMJ-BUF IT Adm. Jobs,tmj_BUF_adm,join the roswell park comprehensive cancer center team see our latest it job openings including statistical pro,196,"Buffalo, NY","{'type': 'Point', 'coordinates': [42.8984058, -78.8670861]}",Wed Nov 23 14:10:55 +0000 2022
1595484126726873088,TMJ - BNA IT Jobs,tmj_bna_it,interested in a job in nashville tn this could be a great fit click the link in our bio to apply cancer registr,396,"Nashville, TN","{'type': 'Point', 'coordinates': [36.1523805, -86.7893889]}",Wed Nov 23 18:27:13 +0000 2022


#### Data Preprocessing

In [None]:
!pip install textblob
# !pip install --upgrade pip

Collecting textblob
  Downloading textblob-0.17.1-py2.py3-none-any.whl (636 kB)
[?25l[K     |▌                               | 10 kB 19.7 MB/s eta 0:00:01[K     |█                               | 20 kB 8.9 MB/s eta 0:00:01[K     |█▌                              | 30 kB 11.8 MB/s eta 0:00:01[K     |██                              | 40 kB 5.7 MB/s eta 0:00:01[K     |██▋                             | 51 kB 5.5 MB/s eta 0:00:01[K     |███                             | 61 kB 6.3 MB/s eta 0:00:01[K     |███▋                            | 71 kB 6.7 MB/s eta 0:00:01[K     |████▏                           | 81 kB 6.9 MB/s eta 0:00:01[K     |████▋                           | 92 kB 7.6 MB/s eta 0:00:01[K     |█████▏                          | 102 kB 6.5 MB/s eta 0:00:01[K     |█████▋                          | 112 kB 6.5 MB/s eta 0:00:01[K     |██████▏                         | 122 kB 6.5 MB/s eta 0:00:01[K     |██████▊                         | 133 kB 6.5 MB/s eta 0:0

In [None]:
# create sentiment label

from textblob import TextBlob

# define a function to get sentiment text
def get_sentiment(text):
    '''
    based on different return values, stands for neutral,negative,positive separately
    0 'neutral'
    1 'negative'
    2 'positive'
    '''
    blob = TextBlob(text)
    sentiment = blob.sentiment.polarity
    if sentiment > 0:
        return 2  # 'positive'
    elif sentiment < 0:
        return 1  # 'negative'
    else:
        return 0  # 'neutral'
    
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

#convert function to UDF
get_sentiment_udf = udf(get_sentiment,StringType()) 

In [None]:
# get sentiment label
data_cleaned = data_cleaned.withColumn('label',get_sentiment_udf(F.col('tweet')))

In [None]:
# convert data type
data_cleaned = data_cleaned.withColumn('label',F.col('label').cast('int'))

In [None]:
# clean created_at column, to extract the first part
data_cleaned = data_cleaned.withColumn('time',F.trim(F.split(F.col('created_at'), "\+")[0]))

In [None]:
# drop unneccessary columns
data_cleaned = data_cleaned.drop('tweetId','name','username','location','geo','created_at')

In [None]:
data_cleaned = data_cleaned.withColumn('public_metrics',F.col('public_metrics').cast('int'))

In [None]:
data_cleaned.printSchema()

root
 |-- tweet: string (nullable = true)
 |-- public_metrics: integer (nullable = true)
 |-- label: integer (nullable = true)
 |-- time: string (nullable = true)



In [None]:
data_cleaned.rdd.getNumPartitions()

Out[25]: 8

In [None]:
display(data_cleaned)

tweet,public_metrics,label,time
you probably believe you know what pursuits bring you the most more for cancer,367,2,Wed Nov 23 14:01:04
you probably believe you know what pursuits bring you the most more for cancer,68,2,Wed Nov 23 14:01:05
you probably believe you know what pursuits bring you the most more for cancer,282,2,Wed Nov 23 14:01:20
you probably believe you know what pursuits bring you the most more for cancer,358,2,Wed Nov 23 14:01:32
you probably believe you know what pursuits bring you the most more for cancer,498,2,Wed Nov 23 14:01:46
you probably believe you know what pursuits bring you the most more for cancer,189,2,Wed Nov 23 14:01:51
you probably believe you know what pursuits bring you the most more for cancer,754,2,Wed Nov 23 14:02:25
you probably believe you know what pursuits bring you the most more for cancer,10,2,Wed Nov 23 14:02:53
rt amaliebenjamin four year old gordie white was diagnosed with an almost always fatal cancer dipg in february here s my piece on gord,36245,2,Wed Nov 23 14:04:07
you probably believe you know what pursuits bring you the most more for cancer,171,2,Wed Nov 23 14:04:29


#### Feature Engineering

In [None]:
# since all data are from year of 2022 and December, so we will ignore year and month
# clean created_at column to extract date/day of week
# since data are collected from two days, so date will not impact result
# only extract hour
data_ml = data_cleaned.withColumn('hours',F.substring(F.split(F.col('time'),' ')[3],0,2))

In [None]:
data_ml = data_ml.na.fill({'hours': 'unknown'})

In [None]:
data_ml.rdd.getNumPartitions()

Out[29]: 8

In [None]:
# drop time column
data_model = data_ml.select('tweet','label','public_metrics','hours')

In [None]:
# data_ml.printSchema()
data_model.rdd.getNumPartitions()

Out[31]: 8

In [None]:
from pyspark.ml.feature import StringIndexer

#### Train the model

In [None]:
# Using pipline to combine all steps
from pyspark.ml.feature import NGram, VectorAssembler, StopWordsRemover, HashingTF, IDF, Tokenizer, StringIndexer, NGram, ChiSqSelector, VectorAssembler,MinMaxScaler,CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator,MulticlassClassificationEvaluator

# Use 70% cases for training, 30% cases for testing
train, test = data_model.randomSplit([0.7, 0.3], seed=42)

# categorical features
cat_indexers = StringIndexer(inputCol='hours', outputCol= 'hours_index')

# Create transformers for the ML pipeline
tokenizer = Tokenizer(inputCol="tweet", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="1gram_idf", minDocFreq=5) #minDocFreq: remove sparse terms

# set words group
ngram = NGram(n=2, inputCol="filtered", outputCol="2gram")
ngram_hashingtf = HashingTF(inputCol="2gram", outputCol="2gram_tf", numFeatures=20000)
ngram_idf = IDF(inputCol='2gram_tf', outputCol="2gram_idf", minDocFreq=5) 

# Assemble all text features
assemberInputs = ['hours_index','1gram_idf', '2gram_idf']
assembler = VectorAssembler(inputCols=assemberInputs, outputCol="rawFeatures")

# Chi-square variable selection
selector = ChiSqSelector(numTopFeatures=2**14,featuresCol='rawFeatures', outputCol="features")

# Regression model estimator
lr = LogisticRegression(maxIter=100)

# Build the pipeline
pipeline = Pipeline(stages=[cat_indexers,tokenizer,stopword_remover,cv,idf,ngram,ngram_hashingtf,ngram_idf,assembler,selector,lr])

In [None]:
# Pipeline model fitting
pipeline_model = pipeline.fit(train)
predictions = pipeline_model.transform(test)

##### Try using different models

In [None]:
# DecesionTree
from pyspark.ml.classification import DecisionTreeClassifier

# Use 70% cases for training, 30% cases for testing
train, test = data_model.randomSplit([0.7, 0.3], seed=42)

# categorical features
cat_indexers = StringIndexer(inputCol='hours', outputCol= 'hours_index')

# Create transformers for the ML pipeline
tokenizer = Tokenizer(inputCol="tweet", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="1gram_idf", minDocFreq=5) #minDocFreq: remove sparse terms

# set words group
ngram = NGram(n=2, inputCol="filtered", outputCol="2gram")
ngram_hashingtf = HashingTF(inputCol="2gram", outputCol="2gram_tf", numFeatures=20000)
ngram_idf = IDF(inputCol='2gram_tf', outputCol="2gram_idf", minDocFreq=5) 

# Assemble all text features
assemberInputs = ['hours_index','1gram_idf', '2gram_idf']
assembler = VectorAssembler(inputCols=assemberInputs, outputCol="rawFeatures")

# Chi-square variable selection
selector = ChiSqSelector(numTopFeatures=2**14,featuresCol='rawFeatures', outputCol="features")

# Regression model estimator
dt = DecisionTreeClassifier()

# Build the pipeline
pipeline_dt = Pipeline(stages=[cat_indexers,tokenizer,stopword_remover,cv,idf,ngram,ngram_hashingtf,ngram_idf,assembler,selector,dt])

In [None]:
# Pipeline model fitting
pipeline_model_dt = pipeline_dt.fit(train)
predictions_dt = pipeline_model_dt.transform(test)

In [None]:
# Random Forest
from pyspark.ml.classification import RandomForestClassifier

# Use 70% cases for training, 30% cases for testing
train, test = data_model.randomSplit([0.7, 0.3], seed=42)

# categorical features
cat_indexers = StringIndexer(inputCol='hours', outputCol= 'hours_index')

# Create transformers for the ML pipeline
tokenizer = Tokenizer(inputCol="tweet", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="1gram_idf", minDocFreq=5) #minDocFreq: remove sparse terms

# set words group
ngram = NGram(n=2, inputCol="filtered", outputCol="2gram")
ngram_hashingtf = HashingTF(inputCol="2gram", outputCol="2gram_tf", numFeatures=20000)
ngram_idf = IDF(inputCol='2gram_tf', outputCol="2gram_idf", minDocFreq=5) 

# Assemble all text features
assemberInputs = ['hours_index','1gram_idf', '2gram_idf']
assembler = VectorAssembler(inputCols=assemberInputs, outputCol="rawFeatures")

# Chi-square variable selection
selector = ChiSqSelector(numTopFeatures=2**14,featuresCol='rawFeatures', outputCol="features")

# Regression model estimator
rf = RandomForestClassifier()

# Build the pipeline
pipeline_rf = Pipeline(stages=[cat_indexers,tokenizer,stopword_remover,cv,idf,ngram,ngram_hashingtf,ngram_idf,assembler,selector,rf])

In [None]:
# Pipeline model fitting
pipeline_model_rf = pipeline_rf.fit(train)
predictions_rf = pipeline_model_rf.transform(test)

#### model evaluation

In [None]:
aglo_name = ['LogisticRegression','DecisionTreeClassifier','RandomForestClassifier']
predictions_name = [predictions,predictions_dt,predictions_rf]
combine = zip(aglo_name,predictions_name)
for i,j in combine:
    acc = j.filter(j.label == j.prediction).count() / float(test.count())
    ra_curve = evaluator.evaluate(j)
    print('{} Accuracy: {:.2f}'.format(i,acc))
    print('{} ROC_AUC: {:.2f}'.format(i,ra_curve))
    print('*****************************')

LogisticRegression Accuracy: 0.93
LogisticRegression ROC_AUC: 0.93
*****************************
DecisionTreeClassifier Accuracy: 0.62
DecisionTreeClassifier ROC_AUC: 0.53
*****************************
RandomForestClassifier Accuracy: 0.62
RandomForestClassifier ROC_AUC: 0.53
*****************************


In [None]:
# select necessary columns for athena & visulazation
predictions_athena_lr = predictions.select('tweet','label','public_metrics','hours','tokens','filtered','2gram','prediction')
predictions_athena_dt = predictions_dt.select('tweet','label','public_metrics','hours','tokens','filtered','2gram','prediction')
predictions_athena_rf = predictions_rf.select('tweet','label','public_metrics','hours','tokens','filtered','2gram','prediction')

In [None]:
predictions.printSchema()

root
 |-- tweet: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- public_metrics: integer (nullable = true)
 |-- hours: string (nullable = false)
 |-- hours_index: double (nullable = false)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- cv: vector (nullable = true)
 |-- 1gram_idf: vector (nullable = true)
 |-- 2gram: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- 2gram_tf: vector (nullable = true)
 |-- 2gram_idf: vector (nullable = true)
 |-- rawFeatures: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [None]:
# drop vectors and array data types
lr_pre = predictions.drop('tokens','filtered','cv','1gram_idf','2gram','2gram_tf','2gram_idf','rawFeatures','features','rawPrediction','probability')
dt_pre = predictions_dt.drop('tokens','filtered','cv','1gram_idf','2gram','2gram_tf','2gram_idf','rawFeatures','features','rawPrediction','probability')
rf_pre = predictions_rf.drop('tokens','filtered','cv','1gram_idf','2gram','2gram_tf','2gram_idf','rawFeatures','features','rawPrediction','probability')

In [None]:
rf_pre.printSchema()

root
 |-- tweet: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- public_metrics: integer (nullable = true)
 |-- hours: string (nullable = false)
 |-- hours_index: double (nullable = false)
 |-- prediction: double (nullable = false)



#### save dataset to s3 bucket

In [None]:
# save raw data to bucket
# cancer.write.mode('overwrite').option('header','false').option('delimiter','\t').csv('/mnt/my_bucket/raw_data/')

In [None]:
# save cleaned data to bucket
# data_cleaned.write.mode('overwrite').option('header','false').option('delimiter','\t').csv('/mnt/my_bucket/cleaned_data/')

In [None]:
# save raw_prediction to bucket
# since have vector and array type, save to parquet type
predictions.write.mode('overwrite').option('header','false').option('delimiter','\t').parquet('/mnt/my_bucket/predict_data/ls/')
predictions_dt.write.mode('overwrite').option('header','false').option('delimiter','\t').parquet('/mnt/my_bucket/predict_data/dt/')
predictions_rf.write.mode('overwrite').option('header','false').option('delimiter','\t').parquet('/mnt/my_bucket/predict_data/rf/')

In [None]:
# save filtered prediction to bucket
predictions_athena_lr.write.option('header','false').option('delimiter','\t').parquet('/mnt/my_bucket/predict_data/athena/lr/')
predictions_athena_dt.write.option('header','false').option('delimiter','\t').parquet('/mnt/my_bucket/predict_data/athena/dt/')
predictions_athena_rf.write.option('header','false').option('delimiter','\t').parquet('/mnt/my_bucket/predict_data/athena/rf/')

In [None]:
# save cleaned dataframe to bucket (no vector and array data types)
# save to csv type
lr_pre.write.option('header','false').option('delimiter','\t').csv('/mnt/my_bucket/predict_data/athena/lr_pre/')
dt_pre.write.option('header','false').option('delimiter','\t').csv('/mnt/my_bucket/predict_data/athena/dt_pre/')
rf_pre.write.option('header','false').option('delimiter','\t').csv('/mnt/my_bucket/predict_data/athena/rf_pre/')