In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

import matplotlib.pyplot as plt
%matplotlib inline

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer,StringIndexer, RegexTokenizer,StopWordsRemover
from pyspark.sql.functions import col, udf,regexp_replace,isnull
from pyspark.sql.types import StringType,IntegerType
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier, LogisticRegression, DecisionTreeClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

In [2]:
# Spark NLP: start (or attach to) a session that pulls in the Spark NLP package.
spark = (
    SparkSession.builder
    .appName("Spark NLP")
    .master("local[4]")
    .config("spark.driver.memory", "16G")
    .config("spark.driver.maxResultSize", "0")
    .config("spark.kryoserializer.buffer.max", "2000M")
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:3.3.2")
    .getOrCreate()
)

In [3]:
# Create (or attach to) the Spark session.
spark = SparkSession.builder.appName('RedditComments').getOrCreate()

# Change configuration settings on Spark.
# A SparkConf is cloned when its context starts and cannot be modified
# afterwards, so editing the private `_conf` of a live context only changes
# what getConf() reports. Instead: copy the current settings, apply the
# overrides, stop the session and rebuild it from the updated conf.
conf = spark.sparkContext.getConf().setAll([
    ('spark.executor.memory', '5g'),
    ('spark.app.name', 'Spark Updated Conf'),
    ('spark.executor.cores', '4'),
    ('spark.cores.max', '4'),
    # NOTE(review): in client mode the driver JVM is already running at this
    # point, so this value may still not take effect from here - confirm, and
    # set it at launch time if it matters.
    ('spark.driver.memory', '8g'),
])
spark.stop()
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Print spark configuration settings.
spark.sparkContext.getConf().getAll()

[('spark.eventLog.enabled', 'true'),
 ('spark.dynamicAllocation.minExecutors', '1'),
 ('spark.sql.warehouse.dir', 'file:/spark-warehouse'),
 ('spark.history.fs.logDirectory',
  'gs://dataproc-temp-us-central1-84427460872-fixxspuh/97cd0fe0-90c7-4b68-ba8a-fcc718886ab3/spark-job-history'),
 ('spark.executor.memory', '5g'),
 ('spark.driver.host',
  'cluster-e4d0-m.us-central1-b.c.big-data-platforms-329618.internal'),
 ('spark.yarn.am.memory', '640m'),
 ('spark.cores.max', '4'),
 ('spark.executor.cores', '4'),
 ('spark.app.startTime', '1638388665963'),
 ('spark.executor.instances', '2'),
 ('spark.driver.memory', '8g'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.yarn.unmanagedAM.enabled', 'true'),
 ('spark.sql.autoBroadcastJoinThreshold', '43m'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.port', '37961'),
 ('spark.ui.filters',
  'org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter'),
 ('spark.driver.appUIAddress',
  'http://cluster-e4d0-m.us-central1-b.c.big

In [4]:
# Load the comment CSV. Fields may span several lines (multiLine) and use a
# doubled quote character as the escape for embedded quotes.
df = (
    spark.read
    .option("delimiter", ",")
    .option("multiLine", "true")
    .option("quote", "\"")
    .option("escape", "\"")
    .option("ignoreLeadingWhiteSpace", True)
    .csv(
        "gs://reddit-data-team-1/data_cleaned.csv",
        inferSchema=True,
        header=True,
    )
)

21/12/01 19:57:55 WARN org.apache.hadoop.util.concurrent.ExecutorHelper: Thread (Thread[GetFileInfo #1,5,main]) interrupted: 
java.lang.InterruptedException
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:510)
	at com.google.common.util.concurrent.FluentFuture$TrustedFuture.get(FluentFuture.java:88)
	at org.apache.hadoop.util.concurrent.ExecutorHelper.logThrowableFromAfterExecute(ExecutorHelper.java:48)
	at org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor.afterExecute(HadoopThreadPoolExecutor.java:90)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1157)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
                                                                                

In [6]:
# Show the column names and the types inferred while reading the CSV.
df.printSchema()

root
 |-- body: string (nullable = true)
 |-- clean_comment: string (nullable = true)
 |-- category: integer (nullable = true)



In [7]:
# In this run, display() on the Spark DataFrame rendered only its column
# names/types (see the output below), not any rows.
display(df)

DataFrame[body: string, clean_comment: string, category: int]

In [8]:
# Summary statistics for every column. The two text columns are included, so
# their mean/stddev/percentile rows are not meaningful (see Infinity/NaN below).
df.summary().show()

                                                                                

+-------+--------------------+--------------------+-------------------+
|summary|                body|       clean_comment|           category|
+-------+--------------------+--------------------+-------------------+
|  count|             4864687|             4849322|            4864688|
|   mean|            Infinity|                 NaN|0.18691332311548037|
| stddev|                 NaN|                 NaN| 0.7967911112011219|
|    min|	"Mother's Little...|	now playing
mumf...|                 -1|
|    25%|              3820.0|              1999.0|                  0|
|    50%|              6790.0|              5670.0|                  0|
|    75%|              9837.0|              9164.0|                  1|
|    max|                  🛃|����������������...|                  1|
+-------+--------------------+--------------------+-------------------+



In [9]:
# (column name, type) pairs of the loaded frame.
df.dtypes

[('body', 'string'), ('clean_comment', 'string'), ('category', 'int')]

### Preprocessing

In [10]:
# Working name for the preprocessing steps; this binds a second name to the
# same DataFrame object, it does not copy anything.
data = df

In [11]:
from pyspark.sql.functions import col

# Class balance: number of rows per `category`, largest class first.
data.groupBy("category").count().orderBy(col("count").desc()).show()

[Stage 5:>                                                          (0 + 1) / 1]

+--------+-------+
|category|  count|
+--------+-------+
|       1|2083852|
|       0|1606259|
|      -1|1174577|
+--------+-------+



                                                                                

In [13]:
from pyspark.sql.functions import col

# Class balance again (same statement as the earlier cell; `data` has not
# changed in between, so the counts are identical).
data.groupBy("category").count().orderBy(col("count").desc()).show()

[Stage 8:>                                                          (0 + 1) / 1]

+--------+-------+
|category|  count|
+--------+-------+
|       1|2083852|
|       0|1606259|
|      -1|1174577|
+--------+-------+



                                                                                

In [14]:
# Remove every run of digits from the raw comment text.
# NOTE: this overwrites the `clean_comment` column that was loaded from the
# CSV with a value derived from `body`.
# The pattern is a raw string: '\d' in a normal literal is an invalid escape
# sequence that newer Python versions warn about.
data = data.withColumn("clean_comment", regexp_replace(col('body'), r'\d+', ''))
data.show(5)

+--------------------+--------------------+--------+
|                body|       clean_comment|category|
+--------------------+--------------------+--------+
|gg this one's ove...|gg this one's ove...|       0|
|No one has a Euro...|No one has a Euro...|       0|
|That the kid "..r...|That the kid "..r...|      -1|
|                NSFL|                NSFL|       0|
|Get back to your ...|Get back to your ...|       0|
+--------------------+--------------------+--------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import regexp_replace, trim, col, lower
def removePunctuation(column):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces.

    Note:
        Only whitespace, ASCII letters, and digits are retained. Every other
        character is deleted rather than replaced by a space (e.g. it's becomes
        its). Leading and trailing spaces are removed after punctuation is
        removed.

    Args:
        column (Column): A Column containing a sentence.

    Returns:
        Column: The cleaned Column, aliased as 'sentence'.
    """
    # Raw string: '\s' in a normal literal is an invalid escape sequence.
    disallowed = r'[^\sa-zA-Z0-9]'
    without_punctuation = regexp_replace(column, disallowed, '')
    return trim(lower(without_punctuation)).alias('sentence')


In [16]:
# Add the normalised text as column `cleaned` (lower-cased, punctuation
# removed, trimmed), computed from the digit-stripped `clean_comment`.
data = data.withColumn("cleaned",removePunctuation(col('clean_comment')))


In [17]:
# Quick look at the first two rows to check the new `cleaned` column.
data.show(2)

+--------------------+--------------------+--------+--------------------+
|                body|       clean_comment|category|             cleaned|
+--------------------+--------------------+--------+--------------------+
|gg this one's ove...|gg this one's ove...|       0|gg this ones over...|
|No one has a Euro...|No one has a Euro...|       0|no one has a euro...|
+--------------------+--------------------+--------+--------------------+
only showing top 2 rows



In [18]:
# Row count before dropping rows that contain nulls.
data.count()

                                                                                

4864688

In [19]:
# Discard every row that has a null in any column, then count what is left.
data = data.dropna(how="any")
data.count()

                                                                                

4864687

### Model Pipeline

In [20]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# Tokenizer: split the `cleaned` text on non-word characters.
regexTokenizer = RegexTokenizer(
    inputCol="cleaned",
    outputCol="words",
    pattern="\\W",
)

# Custom stop-word list. Supplying it replaces the remover's default list
# rather than extending it, so only these seven tokens are filtered out.
add_stopwords = ["http","https","amp","rt","t","c","the"]
# Earlier, larger candidate list kept for reference:
#add_stopwords = ["http","https","amp","rt","t","c","the",'narendra','modi','...','“','”','’','…','modi’']

stopwordsRemover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered",
    stopWords=add_stopwords,
)

# Bag-of-words term counts. Alternative with a capped vocabulary:
#countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=15000, minDF=5)
countVectors = CountVectorizer(inputCol="filtered", outputCol="features")

# Index the integer `category` column into the `label` column used for training.
label_stringIdx = StringIndexer(inputCol="category", outputCol="label")

In [21]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# Feature pipeline: tokenize -> remove stop words -> count vectors -> index labels.
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])


In [22]:
# Fit the estimator stages of the pipeline on the full cleaned data set.
pipelineFit = pipeline.fit(data)

                                                                                

In [23]:
# `pipelineFit` was already fitted on `data` in the previous cell. Fitting the
# same stages on the same data again would only repeat that full pass, so
# reuse the fitted model and just apply the transformation here.
dataset = pipelineFit.transform(data)
dataset.show(5)


21/12/01 20:15:57 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.4 MiB
[Stage 37:>                                                         (0 + 1) / 1]

+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+-----+
|                body|       clean_comment|category|             cleaned|               words|            filtered|            features|label|
+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+-----+
|gg this one's ove...|gg this one's ove...|       0|gg this ones over...|[gg, this, ones, ...|[gg, this, ones, ...|(262144,[0,2,14,1...|  1.0|
|No one has a Euro...|No one has a Euro...|       0|no one has a euro...|[no, one, has, a,...|[no, one, has, a,...|(262144,[1,7,12,1...|  1.0|
|That the kid "..r...|That the kid "..r...|      -1|that the kid remi...|[that, the, kid, ...|[that, kid, remin...|(262144,[4,5,28,3...|  2.0|
|                NSFL|                NSFL|       0|                nsfl|              [nsfl]|              [nsfl]|(262144,[13710],[...|  1.0|

                                                                                

### Partition Training & Test sets

In [None]:
# Set seed for reproducibility.
trainingData, testData = dataset.randomSplit([0.8, 0.2], seed=100)

# Mark both splits for caching. Without this, every action on them (the two
# counts below, each model fit, each evaluation) re-runs the whole lineage:
# CSV read, text cleaning, tokenizing and vectorizing. The first count
# materialises the cache; DataFrame.cache() spills to disk when memory is short.
trainingData.cache()
testData.cache()

print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))


21/12/01 20:16:00 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.4 MiB
                                                                                

Training Dataset Count: 3891654


21/12/01 20:25:48 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.4 MiB
[Stage 41:>                                                         (0 + 1) / 1]

Test Dataset Count: 973033


                                                                                

### Model Training and Evaluation

### Logistic Regression using Count Vector Features

The model makes predictions on the test set and is scored against it; we then look at the 10 predictions with the highest probability.

In [None]:
# Logistic regression with an L2 penalty only (elasticNetParam=0), fitted on
# the count-vector training split.
lr = LogisticRegression(
    maxIter=20,
    regParam=0.3,
    elasticNetParam=0,
)
lrModel = lr.fit(trainingData)

# Score the held-out split.
predictions = lrModel.transform(testData)

# Optional inspection of the rows predicted as class 0 (the text column in
# this notebook is `cleaned`; there is no `clean_text` column):
# predictions.filter(predictions['prediction'] == 0).select("cleaned","category","probability","label","prediction")\
# .orderBy("probability", ascending=False).show(n = 10, truncate = 30)

21/12/01 20:35:31 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.4 MiB
21/12/01 20:45:15 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.4 MiB
21/12/01 20:55:36 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
21/12/01 20:55:37 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
21/12/01 20:55:37 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.4 MiB
21/12/01 20:55:48 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.4 MiB
21/12/01 20:56:00 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.4 MiB
21/12/01 20:56:12 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.4 MiB
21/12/01 20:56:24 WARN org.apache.spark.scheduler.DAGSchedule

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Score the count-vector model on the test split. "f1" is this evaluator's
# default metric; it is spelled out so the reported number is unambiguous.
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    metricName="f1",
)
evaluator.evaluate(predictions)


21/12/01 20:59:43 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 9.4 MiB
                                                                                

0.7704587422225653

### Logistic Regression using TF-IDF Features

In [None]:
from pyspark.ml.feature import HashingTF, IDF

# Term frequencies via feature hashing into 10000 buckets.
# NOTE(review): the HashingTF docs advise a power of two for numFeatures -
# confirm whether 10000 is intentional.
hashingTF = HashingTF(
    inputCol="filtered",
    outputCol="rawFeatures",
    numFeatures=10000,
)

# Inverse document frequency; minDocFreq=5 removes sparse terms.
idf = IDF(
    inputCol="rawFeatures",
    outputCol="features",
    minDocFreq=5,
)

# Same tokenizer / stop-word / label stages as before, with TF-IDF features.
# This rebinds `pipeline`, `pipelineFit` and `dataset` to the TF-IDF versions.
pipeline = Pipeline(stages=[
    regexTokenizer,
    stopwordsRemover,
    hashingTF,
    idf,
    label_stringIdx,
])

pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)


                                                                                

In [None]:
# Re-split the TF-IDF data set with the same proportions and seed as before.
trainingData, testData = dataset.randomSplit([0.8, 0.2], seed=100)

# These splits feed every model fitted below (logistic regression, naive
# Bayes, decision tree, random forest, one-vs-rest). Cache them so each fit
# and evaluation does not recompute the whole pipeline from the CSV.
trainingData.cache()
testData.cache()

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)



                                                                                

In [None]:
# Score the test split with the logistic-regression model fitted in the previous cell.
predictions = lrModel.transform(testData)

# Optional inspection of the rows predicted as class 0.
# NOTE(review): the snippet below selects "clean_text", which is not a column
# of this data set (the text columns are body / clean_comment / cleaned).
# predictions.filter(predictions['prediction'] == 0) \
#     .select("clean_text","category","probability","label","prediction") \
#     .orderBy("probability", ascending=False) \
#     .show(n = 10, truncate = 30)

In [None]:
# Store the score: the comparison chart at the end of the notebook reads
# `lrAccuracy`, which was otherwise never assigned (NameError).
# Note that the evaluator's default metric is "f1", so despite its name this
# variable holds an F1 score; the metric is spelled out to make that visible.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="f1")
lrAccuracy = evaluator.evaluate(predictions)
print(lrAccuracy)

                                                                                

0.7288555668318899

### Naive Bayes

In [None]:
from pyspark.ml.classification import NaiveBayes
# Naive Bayes with smoothing parameter 1, fitted on the current training split.
nb = NaiveBayes(
    smoothing=1,
)
nbModel = nb.fit(trainingData)

# Score the held-out split.
predictions = nbModel.transform(testData)

# Optional inspection of the rows predicted as class 0 (text column: `cleaned`):
# predictions.filter(predictions['prediction'] == 0) \
#     .select("cleaned","category","probability","label","prediction") \
#     .orderBy("probability", ascending=False) \
#     .show(n = 10, truncate = 30)

                                                                                

In [None]:
# Test-split score for naive Bayes. "f1" is the evaluator's default metric,
# so despite its name `nbAccuracy` holds an F1 score.
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    metricName="f1",
)
nbAccuracy = evaluator.evaluate(predictions)
print(nbAccuracy)

[Stage 103:>                                                        (0 + 1) / 1]

0.7347902005300311


                                                                                

### DecisionTreeClassifier

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
# Shallow decision tree (maximum depth 3), fitted on the current training split.
dt = DecisionTreeClassifier(
    labelCol='label',
    featuresCol='features',
    maxDepth=3,
)
dtModel = dt.fit(trainingData)

# Score the held-out split.
predictions = dtModel.transform(testData)

# Optional inspection of the rows predicted as class 0 (text column: `cleaned`):
# predictions.filter(predictions['prediction'] == 0) \
#     .select("cleaned","category","probability","label","prediction") \
#     .orderBy("probability", ascending=False) \
#     .show(n = 10, truncate = 30)

                                                                                

In [None]:
# Test-split score for the decision tree. "f1" is the evaluator's default
# metric, so despite its name `dtAccuracy` holds an F1 score.
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    metricName="f1",
)
dtAccuracy = evaluator.evaluate(predictions)
print(dtAccuracy)

[Stage 115:>                                                        (0 + 1) / 1]

0.4305849554692257


                                                                                

### Random Forest

In [None]:
from pyspark.ml.classification import RandomForestClassifier
# Random forest: 100 trees of depth <= 4.
# A fixed seed (the same value used for the train/test split) makes the
# forest's random sampling repeatable between runs; without it the result
# can differ every time the notebook is executed.
rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    numTrees=100,
    maxDepth=4,
    maxBins=32,
    seed=100,
)

# Train model with Training Data
rfModel = rf.fit(trainingData)
predictions = rfModel.transform(testData)

# Optional inspection of the rows predicted as class 0 (text column: `cleaned`):
# predictions.filter(predictions['prediction'] == 0) \
#     .select("cleaned","category","probability","label","prediction") \
#     .orderBy("probability", ascending=False) \
#     .show(n = 10, truncate = 30)

21/12/02 01:31:08 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 1059.5 KiB
                                                                                

In [None]:
# Test-split score for the random forest. "f1" is the evaluator's default
# metric, so despite its name `rfAccuracy` holds an F1 score.
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    metricName="f1",
)
rfAccuracy = evaluator.evaluate(predictions)
print(rfAccuracy)

[Stage 129:>                                                        (0 + 1) / 1]

0.2570274066280338


                                                                                

### OneVsRest classifier

In [None]:
from pyspark.ml.classification import LogisticRegression, OneVsRest

# Base binary learner; OneVsRest fits one copy of it per class.
lr = LogisticRegression(
    maxIter=20,
    regParam=0.3,
    elasticNetParam=0,
)
ovr = OneVsRest(classifier=lr)
ovrModel = ovr.fit(trainingData)

# Score the model on test data.
predictions = ovrModel.transform(testData)

# Optional inspection of the rows predicted as class 0 (text column: `cleaned`):
# predictions.filter(predictions['prediction'] == 0) \
#     .select("cleaned","category","label","prediction") \
#     .show(n = 10, truncate = 30)

[Stage 159:>                                                        (0 + 1) / 1]

In [None]:
# Test-split score for the one-vs-rest model. "f1" is the evaluator's default
# metric, so despite its name `ovrAccuracy` holds an F1 score.
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    metricName="f1",
)
ovrAccuracy = evaluator.evaluate(predictions)
print(ovrAccuracy)

### Visualization

In [None]:
import matplotlib.pyplot as plt
import numpy as np
# Labels and scores for the comparison chart, in matching order.
# The decision tree was evaluated above (`dtAccuracy`) but was missing from
# the comparison; it is included here.
model = ['Logistic regression', 'Naive Bayes', 'Decision Tree', 'Random Forest', 'OneVsRest']
accuracy = [lrAccuracy, nbAccuracy, dtAccuracy, rfAccuracy, ovrAccuracy]

In [None]:
def plot_bar_x(labels=None, scores=None):
    """Draw a bar chart with one bar per model score.

    The scores come from MulticlassClassificationEvaluator objects created
    without a metric name, whose default metric is F1, so the axis and title
    say "F1" rather than "accuracy".

    Args:
        labels: Model names for the x axis. Defaults to the notebook-level
            `model` list.
        scores: One score per label, in the same order. Defaults to the
            notebook-level `accuracy` list.
    """
    # Fall back to the notebook globals so the original no-argument call works.
    labels = model if labels is None else labels
    scores = accuracy if scores is None else scores

    # One bar position per model.
    index = np.arange(len(labels))
    plt.bar(index, scores)
    plt.xlabel('models', fontsize=10)
    plt.ylabel('weighted F1 score', fontsize=10)
    plt.xticks(index, labels, fontsize=10, rotation=30)
    plt.title('Weighted F1 score of each model')
    plt.show()

plot_bar_x()