# PySpark SetUp

In [1]:
!apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
!tar xf spark-3.1.1-bin-hadoop2.7.tgz
!pip install -q findspark

[33m0% [Working][0m            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
[33m0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (91.18[0m[33m0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (91.18[0m                                                                               Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
[33m0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (91.18[0m[33m0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com] [Connecting to[0m                                                                               Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
[33m0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com] [Connecting to[0m                                                                          

In [3]:
!pip install afinn



In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"

In [5]:
import findspark
import string
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType
import string
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, StopWordsRemover, Tokenizer, IDF
from pyspark.ml.classification import  NaiveBayes, LogisticRegression, LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
import re
from afinn import Afinn
import pandas



spark = SparkSession.builder.appName("MyRedditScrape").getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)

# Loading the Data


In [6]:
df = spark.read.option("header", "false").option("quote", "\"").option("escape", "\"").option("inferSchema", "true").csv("/content/drive/MyDrive/490/Data/cleanData.csv").toDF("selftext", "subreddit")

In [7]:
# Take look and make sure everything is ok
df.show()

+--------------------+----------+
|            selftext| subreddit|
+--------------------+----------+
|ive had depressio...|depression|
|i just need to ve...|depression|
|i see the world s...|depression|
|hey reddit i hope...|depression|
|ive sought advice...|depression|
|does anyone else ...|depression|
|so a while ago i ...|depression|
|i get so anxious ...|depression|
|i recently  came ...|depression|
|im a   36 male fr...|depression|
|well im here to a...|depression|
|ive always felt l...|depression|
|is it bad that if...|depression|
|let me preface th...|depression|
|i just cant i tri...|depression|
|ive been done for...|depression|
|im 26 and just we...|depression|
|a lot of the time...|depression|
|it feels like no ...|depression|
|so i wake up in a...|depression|
+--------------------+----------+
only showing top 20 rows



# Labeling (AFINN)

In [8]:
# Create out Afinn object
afin = Afinn(language='en')

In [9]:
afin.score("I want to die")

-2.0

In [10]:
afin.score("I love the world")

3.0

In [11]:
type(afin.score("I want to die"))

float

In [12]:
# This Function will return 1 if the text is negative and 0 if the text is positive.
# This is based on the scoring from the Afinn object
udfNew = F.udf(lambda x: 1 if afin.score(x) < 0 else 0)

In [17]:
data = df.select(F.col('selftext'), udfNew(F.col('selftext')).alias('label'))
data = data.withColumn("label", F.col("label").cast("int"))

In [18]:
data.show()

+--------------------+-----+
|            selftext|label|
+--------------------+-----+
|ive had depressio...|    1|
|i just need to ve...|    1|
|i see the world s...|    1|
|hey reddit i hope...|    0|
|ive sought advice...|    1|
|does anyone else ...|    1|
|so a while ago i ...|    1|
|i get so anxious ...|    1|
|i recently  came ...|    1|
|im a   36 male fr...|    1|
|well im here to a...|    1|
|ive always felt l...|    1|
|is it bad that if...|    1|
|let me preface th...|    1|
|i just cant i tri...|    0|
|ive been done for...|    1|
|im 26 and just we...|    1|
|a lot of the time...|    0|
|it feels like no ...|    1|
|so i wake up in a...|    1|
+--------------------+-----+
only showing top 20 rows



In [19]:
data.registerTempTable("dataWithLabel")

In [None]:
sqlContext.sql("SELECT label, COUNT(*) as count from dataWithLabel GROUP BY label").show()

# Even Out the Data

In [21]:
temp1 = sqlContext.sql("SELECT * from dataWithLabel WHERE label = 1 LIMIT 50000")

In [22]:
temp2 = sqlContext.sql("SELECT * from dataWithLabel WHERE label = 0 LIMIT 50000")

In [23]:
data2 = temp1.union(temp2)

In [None]:
data2 = data2.withColumn("label", data2["label"].cast(IntegerType()))
data2 = data2.withColumn("selftext", data2["selftext"].cast(StringType()))

# Sample our Data

In [28]:
# We take a 10% Sample of our data
data = data2.sample(False, 0.01)

# Set Up Elements in Pipeline

In [29]:
tokenizer = Tokenizer(inputCol="selftext", outputCol="words")

In [30]:
remover = StopWordsRemover(inputCol="words", outputCol="filtered", caseSensitive=False)

In [31]:
hashingTF = HashingTF(inputCol="filtered", outputCol="rawfeatures", numFeatures= 4096)

In [32]:
idf = IDF(inputCol="rawfeatures", outputCol="features", minDocFreq= 0)

In [33]:
lr = LogisticRegression(regParam=0.01, threshold=0.5)

In [34]:
nb = NaiveBayes()

In [35]:
lsvc = LinearSVC(regParam= 0.01, threshold=0.5)

In [36]:
pipeline1 = Pipeline(stages=[tokenizer, remover, hashingTF, idf, lr])

In [37]:
pipeline2 = Pipeline(stages=[tokenizer, remover, hashingTF, idf, nb])

In [38]:
 pipeline3 = Pipeline(stages=[tokenizer, remover, hashingTF, idf, lsvc])

# Test Train Split

In [39]:
# splits[0] is my training set, splits[1] is my testing set
splits = data.randomSplit([0.9, 0.1], 1234)

#Modeling and Predicting

In [40]:
# Logistic Regression Model
model1 = pipeline1.fit(splits[0])

PythonException: ignored

In [None]:
# Naive Bayes Model
model2 = pipeline2.fit(splits[0])

In [None]:
# Linear Support Vector Classification Model
model3 = pipeline3.fit(splits[0])

# Logistic Regression Analysis

In [None]:
predictions1 = model1.transform(splits[1])

In [None]:
predictions1.show()

In [None]:
# Binary Classification Evaluator


eval1 = BinaryClassificationEvaluator(metricName="areaUnderROC")
print("Area Under the ROC Curve: {}".format(eval1.evaluate(predictions1)))

In [None]:
# Multiclass Classification Evaluator


eval2 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
print("Accuracy: " + str(eval2.evaluate(predictions1)))

eval3 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")

print("Precision: " + str(eval2.evaluate(predictions1)))

# Naives Bayes Analysis

In [None]:
predictions2 = model2.transform(splits[1])

In [None]:
predictions2.show()

+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|            selftext|label|               words|            filtered|         rawfeatures|            features|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|17 days now shes ...|    0|[17, days, now, s...|[17, days, shes, ...|(4096,[122,125,63...|(4096,[122,125,63...|[-400.69183906900...|[0.99681312466661...|       0.0|
|175 high school s...|    0|[175, high, schoo...|[175, high, schoo...|(4096,[78,131,163...|(4096,[78,131,163...|[-2901.4526493752...|[0.99439078012153...|       0.0|
|21 year old male ...|    1|[21, year, old, m...|[21, year, old, m...|(4096,[32,109,120...|(4096,[32,109,120...|[-1905.1010826250...|[0.02692982929418...|       1.0|
|22y

In [None]:
# Binary Classification Evaluator


eval4 = BinaryClassificationEvaluator(metricName="areaUnderROC")
print("Area Under the ROC Curve: {}".format(eval1.evaluate(predictions2)))

In [None]:
# Multiclass Classification Evaluator


eval5 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
print("Accuracy: " + str(eval5.evaluate(predictions2)))

eval6 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")

print("Precision: " + str(eval6.evaluate(predictions2)))

Accuracy: 0.7768343909196532
Precision: 0.7818824488930595


# Linear Support Vector Classification Analysis

In [None]:
predictions3 = model3.transform(splits[1])

In [None]:
predictions3.show()

+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|            selftext|label|               words|            filtered|         rawfeatures|            features|       rawPrediction|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|17 days now shes ...|    0|[17, days, now, s...|[17, days, shes, ...|(4096,[122,125,63...|(4096,[122,125,63...|[0.52678055893078...|       0.0|
|175 high school s...|    0|[175, high, schoo...|[175, high, schoo...|(4096,[78,131,163...|(4096,[78,131,163...|[1.57196235216494...|       0.0|
|21 year old male ...|    1|[21, year, old, m...|[21, year, old, m...|(4096,[32,109,120...|(4096,[32,109,120...|[-0.3517689099538...|       0.0|
|22yo male and pre...|    1|[22yo, male, and,...|[22yo, male, pret...|(4096,[32,131,148...|(4096,[32,131,148...|[-2.9120081207204.

In [None]:
# Binary Classification Evaluator


eval7 = BinaryClassificationEvaluator(metricName="areaUnderROC")
print("Area Under the ROC Curve: {}".format(eval7.evaluate(predictions3)))

Area Under the ROC Curve: 0.9283388771784026


In [None]:
# Multiclass Classification Evaluator


eval8 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
print("Accuracy: " + str(eval8.evaluate(predictions3)))

eval9 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")

print("Precision: " + str(eval9.evaluate(predictions3)))

Accuracy: 0.8428814013763518
Precision: 0.8619162777668492


# Saving the Logistic Regression Model

In [None]:
# Save our Logistic Regression Model
model1.save('/content/drive/MyDrive/490/Model')

#Resources

https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression

https://spark.apache.org/docs/1.5.1/mllib-naive-bayes.html

https://stackoverflow.com/questions/44779002/pyspark-to-pmml-field-label-does-not-exist-error

https://stackoverflow.com/questions/47707405/spark-logistic-regression-for-binary-classification-apply-new-threshold-for-pre

https://stackoverflow.com/questions/57716806/split-spark-dataframe-in-half-without-overlapping-data

https://hyukjin-spark.readthedocs.io/en/latest/reference/api/pyspark.sql.DataFrame.randomSplit.html

https://stackoverflow.com/questions/40163106/cannot-find-col-function-in-pyspark

https://spark.apache.org/docs/1.6.1/ml-guide.html

https://machinelearningmastery.com/roc-curves-and-precision-recall-curves-for-classification-in-python/

