# Preparing Reddit data using Spark for Machine Learning

# Project Group #27
#### Clara Richter, Elise Rust, Yujia Jin
##### ANLY 502
##### Project Deliverable #3
##### Dec 5, 2022

The original dataset for this notebook is described in [The Pushshift Reddit Dataset](https://arxiv.org/pdf/2001.08435.pdf) paper.

In [0]:
## Load necessary packages
#import findspark
#findspark.init()
import pandas as pd
import numpy as np
import json
import pyspark.sql.functions as f
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StringType,BooleanType,DateType
from pyspark.sql import SparkSession
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler, VectorIndexer
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline, Model



In [0]:
spark = SparkSession.builder.appName("reddit").getOrCreate()

In [0]:
spark

### Load in Data

In [0]:
## Read in sentiment dataframes again
## 1) Comments
comments_sentiment = spark.read.parquet("dbfs:/tmp/out/com_kpis_sent.parquet")

## 2) Submissions
titles_sentiment = spark.read.parquet("dbfs:/tmp/out/sub_kpis_sent.parquet")

### View data and ensure cleanliness

In [0]:
comments_sentiment.show(5)

+-------------+--------------------+-------+-----+---------+-------------------+----+-----+----+-----------+--------------+------------+----------------+-------------+-------------+---------------+-----------+-------+----------+--------+-------------------+---------+
|       author|                body|link_id|score|subreddit|               time|year|month|hour|time_of_day|comment_length|dummy_police|dummy_healthcare|dummy_climate|dummy_economy|dummy_education|dummy_trump|    CPI|Unemp_Rate|     DOW|               date|sentiment|
+-------------+--------------------+-------+-----+---------+-------------------+----+-----+----+-----------+--------------+------------+----------------+-------------+-------------+---------------+-----------+-------+----------+--------+-------------------+---------+
| ThatGuy_Gary|Geez.\n\nIf they ...| mkgo9d|    2|democrats|2021-04-05 18:57:57|2021|    4|  18|  Afternoon|           108|       false|           false|        false|        false|          false

In [0]:
titles_sentiment.show(5)

+---------+--------------------+-----+------------+------+-------------------+----+-----+----+-----------+------------+-------------+------------+----------------+-------------+-------------+---------------+-----------+-----+----------+--------+-------------------+---------+
|subreddit|               title|score|num_comments|    id|               time|year|month|hour|time_of_day|title_length|election_year|dummy_police|dummy_healthcare|dummy_climate|dummy_economy|dummy_education|dummy_trump|  CPI|Unemp_Rate|     DOW|               date|sentiment|
+---------+--------------------+-----+------------+------+-------------------+----+-----+----+-----------+------------+-------------+------------+----------------+-------------+-------------+---------------+-----------+-----+----------+--------+-------------------+---------+
|democrats|Nancy Pelosi's ho...|    1|           1|kopbry|2021-01-02 02:58:53|2021|    1|   2|      Night|         132|        false|       false|           false|        f

In [0]:
# Print schema and get datatypes
# (printSchema() prints directly and returns None, so it is not wrapped in print())
comments_sentiment.printSchema()
titles_sentiment.printSchema()

root
 |-- author: string (nullable = true)
 |-- body: string (nullable = true)
 |-- link_id: string (nullable = true)
 |-- score: long (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- year: long (nullable = true)
 |-- month: long (nullable = true)
 |-- hour: long (nullable = true)
 |-- time_of_day: string (nullable = true)
 |-- comment_length: long (nullable = true)
 |-- dummy_police: boolean (nullable = true)
 |-- dummy_healthcare: boolean (nullable = true)
 |-- dummy_climate: boolean (nullable = true)
 |-- dummy_economy: boolean (nullable = true)
 |-- dummy_education: boolean (nullable = true)
 |-- dummy_trump: boolean (nullable = true)
 |-- CPI: double (nullable = true)
 |-- Unemp_Rate: double (nullable = true)
 |-- DOW: double (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- sentiment: string (nullable = true)

root
 |-- subreddit: string (nullable = true)
 |-- title: string (nullable = true)
 |-- score: long (

#### Subset relevant ML columns

Based on the NLP analysis, we use the comments dataframe for 'subreddit' classification and the titles (submissions) dataframe for regression on the economic KPIs 'CPI' and 'DOW'. The NLP results showed more distinct differences in text and other variables across subreddits in the comments, so classifying subreddits from comments makes more analytical sense. The titles dataframe, on the other hand, consists mostly of news headlines, which may carry signal about economic trends such as inflation and stock market crashes for the regression analysis. For comparison, we also prepare a subreddit classification dataset from the titles.

In [0]:
# select columns for ML regression for KPIs from posts
cols = ['DOW', 'CPI', 'Unemp_Rate', 'title']

titles_KPI = titles_sentiment.select(cols).na.drop() # Drop NAs
titles_KPI.show(5)

+--------+-----+----------+--------------------+
|     DOW|  CPI|Unemp_Rate|               title|
+--------+-----+----------+--------------------+
|29982.62|262.2|       6.4|Nancy Pelosi's ho...|
|29982.62|262.2|       6.4|"This Whole Thing...|
|29982.62|262.2|       6.4|Chaos on Capitol ...|
|29982.62|262.2|       6.4|Right-wing extrem...|
|29982.62|262.2|       6.4|Here are six vide...|
+--------+-----+----------+--------------------+
only showing top 5 rows



In [0]:
# select columns for ML predicting subreddit from comments
cols = ['subreddit', 'body']

com_subred = comments_sentiment.select(cols).na.drop() # Drop NAs
com_subred.show(5)

+---------+--------------------+
|subreddit|                body|
+---------+--------------------+
|democrats|Geez.\n\nIf they ...|
|democrats|Wow! I didn't kno...|
|democrats|We need more than...|
|democrats|The Republican pa...|
|democrats|It is their own u...|
+---------+--------------------+
only showing top 5 rows



In [0]:
com_subred.groupBy("subreddit").count().show()

+----------+------+
| subreddit| count|
+----------+------+
|Republican| 40710|
|  politics|354747|
| democrats| 45409|
+----------+------+



In [0]:
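# Downsample 'politics' and 'democrats' to the size of the smallest class ('Republican', 40,710 rows).
# Note: limit() keeps the first rows Spark returns, not a random sample.
rep_group = com_subred.filter(com_subred.subreddit=="Republican")
pol_group = com_subred.filter(com_subred.subreddit=="politics").limit(40710)
dem_group = com_subred.filter(com_subred.subreddit=="democrats").limit(40710)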

In [0]:
# stack the balanced subsets (a row-wise union, not a join)
com_subred = rep_group.union(pol_group)
com_subred = com_subred.union(dem_group)
com_subred.groupBy("subreddit").count().show()

+----------+-----+
| subreddit|count|
+----------+-----+
| democrats|40710|
|  politics|40710|
|Republican|40710|
+----------+-----+
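Because `limit()` keeps whichever rows Spark happens to return first, the balanced subsets above may over-represent particular partitions or time periods. Below is a minimal sketch of *random* balanced downsampling. It is written in pandas on a small hypothetical toy frame so it runs without a cluster, and the helper name `balance_by_downsampling` and the seed are illustrative. In Spark itself, `df.orderBy(f.rand(seed)).limit(n)` gives an exact-size random subset per class. `DataFrame.sampleBy` instead draws an approximate stratified sample by fraction.

```python
import pandas as pd


def balance_by_downsampling(df, label_col, seed=42):
    """Randomly downsample every class to the size of the smallest class (hypothetical helper)."""
    n_min = df[label_col].value_counts().min()
    # GroupBy.sample (pandas >= 1.1) draws n rows per group without replacement.
    return df.groupby(label_col).sample(n=n_min, random_state=seed).reset_index(drop=True)


# Hypothetical toy data with an imbalance similar in spirit to the real counts.
toy = pd.DataFrame({
    "subreddit": ["politics"] * 5 + ["democrats"] * 3 + ["Republican"] * 2,
    "body": [f"comment {i}" for i in range(10)],
})

balanced = balance_by_downsampling(toy, "subreddit")
counts = {label: int(n) for label, n in balanced["subreddit"].value_counts().items()}
print(sorted(counts.items()))  # → [('Republican', 2), ('democrats', 2), ('politics', 2)]
```

Fixing the seed keeps the sample reproducible across notebook reruns.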



In [0]:
com_subred.show(5)

+----------+--------------------+
| subreddit|                body|
+----------+--------------------+
|Republican|It's hard to be a...|
|Republican|  What a dumb bitch.|
|Republican|This is like an e...|
|Republican|Thank God for our...|
|Republican|    I love my state.|
+----------+--------------------+
only showing top 5 rows



In [0]:
com_subred_pd = com_subred.toPandas()

In [0]:
# select columns for ML predicting subreddit from titles
cols = ['subreddit', 'title']

title_subred = titles_sentiment.select(cols).na.drop() # Drop NAs
title_subred.show(5)

+---------+--------------------+
|subreddit|               title|
+---------+--------------------+
|democrats|Nancy Pelosi's ho...|
|democrats|"This Whole Thing...|
|democrats|Chaos on Capitol ...|
|democrats|Right-wing extrem...|
|democrats|Here are six vide...|
+---------+--------------------+
only showing top 5 rows



In [0]:
# check the number of each subreddit
title_subred.groupBy("subreddit").count().show()

+----------+-----+
| subreddit|count|
+----------+-----+
|Republican|13738|
|  politics|14715|
| democrats|13561|
+----------+-----+



In [0]:
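# Downsample 'Republican' and 'politics' to the size of the smallest class ('democrats', 13,561 rows).
# Note: limit() keeps the first rows Spark returns, not a random sample.
rep_title = title_subred.filter(title_subred.subreddit=="Republican").limit(13561)
pol_title = title_subred.filter(title_subred.subreddit=="politics").limit(13561)
dem_title = title_subred.filter(title_subred.subreddit=="democrats")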

In [0]:
# stack the balanced subsets (a row-wise union, not a join)
title_subred = rep_title.union(pol_title)
title_subred = title_subred.union(dem_title)
title_subred.groupBy("subreddit").count().show()

+----------+-----+
| subreddit|count|
+----------+-----+
| democrats|13561|
|Republican|13561|
|  politics|13561|
+----------+-----+



In [0]:
title_subred_pd = title_subred.toPandas()

### Data Preparation for Modeling

* Remaining feature transformations and additional textual processing steps

##### Contained in Pipeline in ML Notebook
* ML transformations (string indexer and vectorizer)
* Split data into training and testing data
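The string-indexing step deferred to the ML notebook turns each subreddit label into a numeric class index. The pure-Python sketch below illustrates what Spark's `StringIndexer` does under its default `stringOrderType="frequencyDesc"`: the most frequent label gets index 0.0, and per the Spark 3.x documentation, ties are broken alphabetically. `string_index` is a hypothetical helper, not a Spark API. Because the classes were balanced above, every label ties, so the order falls back to alphabetical, in which the capitalized `Republican` sorts first.

```python
from collections import Counter


def string_index(labels):
    """Map labels to float indices by descending frequency, ties broken alphabetically."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    mapping = {lab: float(i) for i, lab in enumerate(ordered)}
    return mapping, [mapping[lab] for lab in labels]


# Hypothetical balanced labels (two of each class).
labels = ["democrats", "politics", "Republican", "Republican", "politics", "democrats"]
mapping, indices = string_index(labels)
print(mapping)  # → {'Republican': 0.0, 'democrats': 1.0, 'politics': 2.0}
print(indices)  # → [1.0, 2.0, 0.0, 0.0, 2.0, 1.0]
```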

##### Remaining Textual Processing: Tokenize text

In [0]:
!pip install nltk
import re
import nltk
from nltk.corpus import stopwords
nltk.download('stopwords')
stop_words = stopwords.words('english')
from nltk.stem.snowball import SnowballStemmer
st = SnowballStemmer('english')

Collecting nltk
  Downloading nltk-3.7-py3-none-any.whl (1.5 MB)
[?25l[K     |▏                               | 10 kB 38.7 MB/s eta 0:00:01[K     |▍                               | 20 kB 12.8 MB/s eta 0:00:01[K     |▋                               | 30 kB 17.9 MB/s eta 0:00:01[K     |▉                               | 40 kB 10.0 MB/s eta 0:00:01[K     |█                               | 51 kB 11.2 MB/s eta 0:00:01[K     |█▎                              | 61 kB 13.2 MB/s eta 0:00:01[K     |█▌                              | 71 kB 11.2 MB/s eta 0:00:01[K     |█▊                              | 81 kB 9.3 MB/s eta 0:00:01[K     |██                              | 92 kB 10.3 MB/s eta 0:00:01[K     |██▏                             | 102 kB 10.9 MB/s eta 0:00:01[K     |██▍                             | 112 kB 10.9 MB/s eta 0:00:01[K     |██▋                             | 122 kB 10.9 MB/s eta 0:00:01[K     |██▉                             | 133 kB 10.9 MB/s eta 0:00:01

In [0]:
### Define function data_clean(text) to clean a pandas Series of raw text
def data_clean(text):
    # set membership is much faster than scanning the stopword list for every token
    stop_set = set(stop_words)

    # change to lower case and remove whitespace on either side
    cleaned_text = text.apply(lambda x: x.lower().strip())

    # replace punctuation, digits and other non-letters with spaces
    cleaned_text = cleaned_text.apply(lambda x: re.sub('[^a-zA-Z]', ' ', x))

    # collapse the repeated spaces created above into single spaces
    cleaned_text = cleaned_text.apply(lambda x: re.sub(' +', ' ', x))

    # remove stopwords and reduce the remaining words to their stems
    cleaned_text = cleaned_text.apply(
        lambda x: ' '.join(st.stem(word) for word in x.split() if word not in stop_set))

    return cleaned_text


# Clean comment bodies and submission titles
com_subred_pd['body'] = data_clean(com_subred_pd['body'])
title_subred_pd['title'] = data_clean(title_subred_pd['title'])
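For a quick sanity check outside Spark and pandas, the sketch below applies the same steps to a single string. It assumes a small hypothetical stopword set in place of NLTK's full English list and omits the Snowball stemming step, so its output is unstemmed. `clean_text` is an illustrative name, not part of the notebook.

```python
import re

# Hypothetical stopword subset for illustration; the notebook uses NLTK's English list.
STOP_WORDS = {"the", "a", "is", "on", "are", "of"}


def clean_text(text, stop_words=STOP_WORDS):
    """Lowercase, replace non-letters with spaces, and drop stopwords (no stemming)."""
    text = text.lower()
    # Replace every non-letter (punctuation, digits, newlines) with a space.
    text = re.sub(r"[^a-z]", " ", text)
    # str.split() with no argument splits on any run of whitespace, so repeated spaces vanish.
    tokens = [tok for tok in text.split() if tok not in stop_words]
    return " ".join(tokens)


print(clean_text("Chaos on Capitol Hill!!  Here are 6 videos."))
# → chaos capitol hill here videos
```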

In [0]:
title_subred_pd

|       | subreddit  | title |
|-------|------------|-------|
| 0     | Republican | biden border crisi grow florida congressman re... |
| 1     | Republican | democrat prevail new mexico special elect |
| 2     | Republican | texa governor sign bill prohibit school teach ... |
| 3     | Republican | judg slap capitol rioter slap blm member wors |
| 4     | Republican | check shirt neighbor made wear march downtown |
| ...   | ...        | ... |
| 40678 | democrats  | passeng tell dc terrorist get plane usernamech... |
| 40679 | democrats  | leader far right proud boy arrest washington |
| 40680 | democrats  | donald trump impeach second time |
| 40681 | democrats  | lunaci made laugh pretti hard parti echo chamb... |


In [0]:
com_subred_pd

|        | subreddit  | body |
|--------|------------|------|
| 0      | Republican | hard centrist parti consid centrist white supr... |
| 1      | Republican | dumb bitch |
| 2      | Republican | like episod black mirror wtf go administr clea... |
| 3      | Republican | thank god governor ron di santi florida bruh g... |
| 4      | Republican | love state |
| ...    | ...        | ... |
| 122125 | democrats  | republican parti doom day nomin trump argument |
| 122126 | democrats  | big divid happen trump allow run republican ow... |
| 122127 | democrats  | take back tomorrow demand fan take capitol build |
| 122128 | democrats  | matter would anyon take serious think bit late |


In [0]:
# https://datascience-enthusiast.com/Python/PySpark_ML_with_Text_part1.html
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol="title", outputCol="words")
words_kpi = tokenizer.transform(titles_KPI)
words_kpi.show(5)

+--------+-----+----------+--------------------+--------------------+
|     DOW|  CPI|Unemp_Rate|               title|               words|
+--------+-----+----------+--------------------+--------------------+
|29982.62|262.2|       6.4|Nancy Pelosi's ho...|[nancy, pelosi's,...|
|29982.62|262.2|       6.4|"This Whole Thing...|["this, whole, th...|
|29982.62|262.2|       6.4|Chaos on Capitol ...|[chaos, on, capit...|
|29982.62|262.2|       6.4|Right-wing extrem...|[right-wing, extr...|
|29982.62|262.2|       6.4|Here are six vide...|[here, are, six, ...|
+--------+-----+----------+--------------------+--------------------+
only showing top 5 rows
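Spark's `Tokenizer` lowercases each string and splits it on whitespace, so punctuation stays attached to words (note `pelosi's` and `"this` in the output above). The KPI titles were not passed through `data_clean`, so this matters here. The dependency-free sketch below compares that behavior with a regex-based split in the spirit of `pyspark.ml.feature.RegexTokenizer` (default `gaps=True`, `toLowercase=True`), using a hypothetical pattern `[^a-z]+`. Both functions are approximations for illustration, not Spark's own code.

```python
import re


def whitespace_tokenize(text):
    # Approximates pyspark.ml.feature.Tokenizer: lowercase, then split on whitespace.
    # Punctuation stays attached to the neighbouring word.
    return text.lower().split()


def regex_tokenize(text, pattern=r"[^a-z]+"):
    # Approximates RegexTokenizer(pattern="[^a-z]+"): lowercase first,
    # split on runs of non-letters, and drop empty tokens.
    return [tok for tok in re.split(pattern, text.lower()) if tok]


sample = 'Senate\'s "historic" vote'
print(whitespace_tokenize(sample))  # → ["senate's", '"historic"', 'vote']
print(regex_tokenize(sample))       # → ['senate', 's', 'historic', 'vote']
```

The regex version yields cleaner vocabulary terms, at the cost of splitting contractions and possessives into fragments such as `s`.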



In [0]:
from pyspark.ml.feature import CountVectorizer

count = CountVectorizer(inputCol="words", outputCol="rawFeatures")
model = count.fit(words_kpi)
featurizedData_kpi = model.transform(words_kpi)
featurizedData_kpi.show(5)

+--------+-----+----------+--------------------+--------------------+--------------------+
|     DOW|  CPI|Unemp_Rate|               title|               words|         rawFeatures|
+--------+-----+----------+--------------------+--------------------+--------------------+
|29982.62|262.2|       6.4|Nancy Pelosi's ho...|[nancy, pelosi's,...|(53194,[1,4,6,11,...|
|29982.62|262.2|       6.4|"This Whole Thing...|["this, whole, th...|(53194,[0,3,4,6,1...|
|29982.62|262.2|       6.4|Chaos on Capitol ...|[chaos, on, capit...|(53194,[4,5,7,29,...|
|29982.62|262.2|       6.4|Right-wing extrem...|[right-wing, extr...|(53194,[0,1,5,12,...|
|29982.62|262.2|       6.4|Here are six vide...|[here, are, six, ...|(53194,[2,5,7,12,...|
+--------+-----+----------+--------------------+--------------------+--------------------+
only showing top 5 rows
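Each `rawFeatures` entry is a sparse vector printed as `(size, [indices], [values])`. Here the fitted vocabulary has 53,194 terms, and each title stores only the indices of the terms it contains and their counts. Spark's `CountVectorizer` orders the vocabulary by term frequency across the corpus, so index 0 is the most frequent token. The pure-Python sketch below illustrates that idea on a hypothetical three-document corpus. Ties are broken alphabetically here for determinism, whereas Spark does not guarantee a tie order. `fit_vocab` and `to_sparse` are illustrative names.

```python
from collections import Counter


def fit_vocab(docs):
    # Count each term across the whole corpus; the most frequent term gets index 0.
    totals = Counter(tok for doc in docs for tok in doc)
    ordered = sorted(totals, key=lambda t: (-totals[t], t))
    return {term: i for i, term in enumerate(ordered)}


def to_sparse(doc, vocab):
    # Return (vocab size, sorted term indices, term counts), like Spark's sparse vector print.
    counts = Counter(vocab[t] for t in doc if t in vocab)
    indices = sorted(counts)
    return (len(vocab), indices, [float(counts[i]) for i in indices])


# Hypothetical tokenized titles.
docs = [["trump", "impeach", "trump"], ["biden", "trump"], ["biden", "border"]]
vocab = fit_vocab(docs)
print(vocab)                       # → {'trump': 0, 'biden': 1, 'border': 2, 'impeach': 3}
print(to_sparse(docs[0], vocab))   # → (4, [0, 3], [2.0, 1.0])
```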



In [0]:
# Apply term frequency–inverse document frequency (TF-IDF)
# (down-weights terms that appear in many documents)
from pyspark.ml.feature import IDF

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData_kpi)
title_data = idfModel.transform(featurizedData_kpi)

title_data = title_data.select("DOW", "CPI", "Unemp_Rate", "features")
title_data.show(5)

+--------+-----+----------+--------------------+
|     DOW|  CPI|Unemp_Rate|            features|
+--------+-----+----------+--------------------+
|29982.62|262.2|       6.4|(53194,[1,4,6,11,...|
|29982.62|262.2|       6.4|(53194,[0,3,4,6,1...|
|29982.62|262.2|       6.4|(53194,[4,5,7,29,...|
|29982.62|262.2|       6.4|(53194,[0,1,5,12,...|
|29982.62|262.2|       6.4|(53194,[2,5,7,12,...|
+--------+-----+----------+--------------------+
only showing top 5 rows
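`IDF` rescales each raw count by how rare the term is across documents. The Spark MLlib documentation gives the smoothed formula `IDF(t) = log((m + 1) / (df(t) + 1))`, where `m` is the number of documents and `df(t)` is the number of documents containing term `t`. A term that appears in every document therefore gets weight 0. The short sketch below uses hypothetical document frequencies and illustrative helper names.

```python
import math


def spark_style_idf(doc_freqs, num_docs):
    # Smoothed IDF as documented for pyspark.ml.feature.IDF: log((m + 1) / (df(t) + 1)).
    return [math.log((num_docs + 1) / (df + 1)) for df in doc_freqs]


def tf_idf(term_counts, idf):
    # TF-IDF weight = raw term count (from CountVectorizer) * IDF of that term.
    return [tf * w for tf, w in zip(term_counts, idf)]


# Hypothetical corpus of 3 documents over a 4-term vocabulary.
doc_freqs = [3, 2, 1, 1]  # term 0 appears in every document
idf = spark_style_idf(doc_freqs, num_docs=3)
weights = tf_idf([2, 0, 0, 1], idf)
print([round(w, 4) for w in idf])      # → [0.0, 0.2877, 0.6931, 0.6931]
print([round(w, 4) for w in weights])  # → [0.0, 0.0, 0.0, 0.6931]
```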



In [0]:
# prep text data for predicting subreddits from comments
com_subred = spark.createDataFrame(com_subred_pd)

# Tokenize text
tokenizer = Tokenizer(inputCol="body", outputCol="words")
words_subred = tokenizer.transform(com_subred)

# Vectorize
count = CountVectorizer(inputCol="words", outputCol="rawFeatures")
model = count.fit(words_subred)
featurizedData_subred = model.transform(words_subred)

# apply TF-IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData_subred)

com_data = idfModel.transform(featurizedData_subred)
com_data = com_data.select("subreddit", "features")
com_data.show(5)

+----------+--------------------+
| subreddit|            features|
+----------+--------------------+
|Republican|(33365,[78,150,31...|
|Republican|(33365,[563,1075]...|
|Republican|(33365,[26,42,96,...|
|Republican|(33365,[125,140,3...|
|Republican|(33365,[69,206],[...|
+----------+--------------------+
only showing top 5 rows



In [0]:
# prep text data for predicting subreddits from titles
title_subred = spark.createDataFrame(title_subred_pd)

# Tokenize text
tokenizer = Tokenizer(inputCol="title", outputCol="words")
words_subred = tokenizer.transform(title_subred)

# Vectorize
count = CountVectorizer(inputCol="words", outputCol="rawFeatures")
model = count.fit(words_subred)
featurizedData_subred = model.transform(words_subred)

# apply TF-IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData_subred)

title_subred_data = idfModel.transform(featurizedData_subred)
title_subred_data = title_subred_data.select("subreddit", "features")
title_subred_data.show(5)

+----------+--------------------+
| subreddit|            features|
+----------+--------------------+
|Republican|(16325,[1,17,62,6...|
|Republican|(16325,[2,6,7,644...|
|Republican|(16325,[17,27,56,...|
|Republican|(16325,[10,133,19...|
|Republican|(16325,[197,258,5...|
+----------+--------------------+
only showing top 5 rows



In [0]:
# export dataset for predicting KPIs from posts
title_data.write.mode("overwrite").parquet("/tmp/out/title_KPI.parquet")

In [0]:
# export dataset for predicting subreddits from comments
com_data.write.mode("overwrite").parquet("/tmp/out/com_subred1.parquet")

In [0]:
# export dataset for predicting subreddits from titles
title_subred_data.write.mode("overwrite").parquet("/tmp/out/title_subred_data.parquet")