In [0]:
import numpy as np
import pandas as pd
import pyspark.pandas as ps

In [0]:
# Import data from S3
from pyspark import SparkFiles

dbutils.fs.ls("s3://project-4-ag-2022/data.gzip")

df = spark.read.format("parquet").load("s3://project-4-ag-2022/data.gzip", header=True, inferSchema=True)

In [0]:
df.show()

+-----------------+--------------------+--------------------+-------------------+------------+-------+--------------------+--------------------+
|           author|                body|      normalizedBody|          subreddit|subreddit_id|     id|             content|             summary|
+-----------------+--------------------+--------------------+-------------------+------------+-------+--------------------+--------------------+
|splagaticusxoxo97|FALSE. Evidence: ...|FALSE. Evidence: ...|            atheism|    t5_2qh2p|c6bacqq|FALSE. Evidence: ...|dont fuck with re...|
|         phyzishy|Yeah, but most fo...|Yeah, but most fo...|          AskReddit|    t5_2qh1i|c6b52m8|Yeah, but most fo...|       stupid stuff.|
|       Perservere|Didn't they lose ...|Didn't they lose ...|    leagueoflegends|    t5_2rfxx|c6bftvc|Didn't they lose ...|just because you'...|
|      fallsuspect|You probably won'...|You probably won'...|          AskReddit|    t5_2qh1i|c6bncqn|You probably won'...|just ge

In [0]:
import sys

from operator import add
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
import pyspark.sql.functions as f

In [0]:
from pyspark.sql.functions import length
# Create a length column to be used as a future feature 
data_df = df.withColumn('length', length(df['body']))
data_df.show()

+-----------------+--------------------+--------------------+-------------------+------------+-------+--------------------+--------------------+------+
|           author|                body|      normalizedBody|          subreddit|subreddit_id|     id|             content|             summary|length|
+-----------------+--------------------+--------------------+-------------------+------------+-------+--------------------+--------------------+------+
|splagaticusxoxo97|FALSE. Evidence: ...|FALSE. Evidence: ...|            atheism|    t5_2qh2p|c6bacqq|FALSE. Evidence: ...|dont fuck with re...|   770|
|         phyzishy|Yeah, but most fo...|Yeah, but most fo...|          AskReddit|    t5_2qh1i|c6b52m8|Yeah, but most fo...|       stupid stuff.|   446|
|       Perservere|Didn't they lose ...|Didn't they lose ...|    leagueoflegends|    t5_2rfxx|c6bftvc|Didn't they lose ...|just because you'...|   569|
|      fallsuspect|You probably won'...|You probably won'...|          AskReddit|    t5_

In [0]:
data_df = data_df.filter(data_df['body'] != '')

In [0]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
# Create all the features to the data set
pos_neg_to_num = StringIndexer(inputCol='subreddit',outputCol='label')
tokenizer = Tokenizer(inputCol="body", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token', numFeatures=10000)
idf = IDF(inputCol='hash_token', outputCol='idf_token', minDocFreq=5)

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [0]:
# Create a and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

In [0]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data_df)
cleaned = cleaner.transform(data_df)

In [0]:
indexer = StringIndexer(inputCol="label", outputCol="target") 
indexed = indexer.fit(cleaned).transform(cleaned) 

In [0]:
# Show label and resulting features
indexed.select(['target', 'features']).show()

+------+--------------------+
|target|            features|
+------+--------------------+
|   7.0|(10001,[52,157,20...|
|   0.0|(10001,[316,317,3...|
|   2.0|(10001,[208,468,5...|
|   0.0|(10001,[157,391,8...|
|   5.0|(10001,[384,1058,...|
|   0.0|(10001,[6,15,20,1...|
|   0.0|(10001,[55,92,198...|
|   0.0|(10001,[39,281,30...|
|   0.0|(10001,[37,143,19...|
|   0.0|(10001,[24,55,266...|
|   0.0|(10001,[5,166,237...|
|   0.0|(10001,[170,266,2...|
|   0.0|(10001,[55,80,196...|
|   7.0|(10001,[94,201,30...|
|   4.0|(10001,[44,199,23...|
|   4.0|(10001,[4,70,91,9...|
|   8.0|(10001,[15,100,15...|
|   0.0|(10001,[52,84,283...|
|   0.0|(10001,[91,209,36...|
|   0.0|(10001,[42,100,20...|
+------+--------------------+
only showing top 20 rows



In [0]:
final_df = indexed.select(['features', 'label'])
final_df = final_df.withColumnRenamed('target', 'label')

In [0]:
from pyspark.sql.types import IntegerType
final_df = final_df.withColumn("label", final_df["label"].cast(IntegerType()))

In [0]:
final_df.dtypes

Out[15]: [('features', 'vector'), ('label', 'int')]

In [0]:
final_df.count()

Out[16]: 1365644

In [0]:
final_df1 = final_df.dropna()

In [0]:
from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
training, testing = final_df1.randomSplit([0.7, 0.3])

# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)
print(predictor)

NaiveBayesModel: uid=NaiveBayes_7ca1fc93f74f, modelType=multinomial, numClasses=10, numFeatures=10001


In [0]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_eval = MulticlassClassificationEvaluator()
test_results = predictor.transform(testing)
acc = acc_eval.evaluate(test_results)
print(acc)

0.5493849785704983


In [0]:
# Tranform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(5)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(10001,[0,1,2,4,7...|    1|[-23208.210849828...|[7.33122198560089...|       4.0|
|(10001,[0,1,2,7,4...|    1|[-16638.443838013...|[6.92990804445417...|       4.0|
|(10001,[0,1,2,44,...|    4|[-18455.074968456...|[4.65480567283902...|       4.0|
|(10001,[0,1,2,55,...|    0|[-8843.8483376284...|[5.12465528274189...|       4.0|
|(10001,[0,1,3,10,...|    0|[-28819.863634976...|[1.69095196882339...|       7.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 5 rows

