### Project description

The project predicts the number of recommendations a comment receives, using the New York Times Comments
dataset available at https://www.kaggle.com/datasets/aashita/nyt-comments.
The main challenges of this project are:
* handling a large data volume
* feature engineering
* etc.

### Imports

In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import DateType
import pandas as pd

In [2]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setMaster("local[8]").setAppName("big_data")
sc = SparkContext.getOrCreate(conf=conf)

In [3]:
spark = SparkSession.builder.getOrCreate()

### Reading data

In [4]:
def read_spark_df(path):
    return spark.read.option("multiline",True).option('lineSep','\n').option("header", True).option("delimiter", ",").option("inferSchema",True).csv(path)

In [5]:
articles_df = read_spark_df('data/nyt-articles-2020.csv')
comments_df = read_spark_df('data/nyt-comments-2020.csv')

In [6]:
comments_df = comments_df.limit(10000)

In [7]:
df = comments_df.withColumn("updateDate", comments_df['updateDate'].cast(DateType()))

In [8]:
articles_df.printSchema()

root
 |-- newsdesk: string (nullable = true)
 |-- section: string (nullable = true)
 |-- subsection: string (nullable = true)
 |-- material: string (nullable = true)
 |-- headline: string (nullable = true)
 |-- abstract: string (nullable = true)
 |-- keywords: string (nullable = true)
 |-- word_count: string (nullable = true)
 |-- pub_date: string (nullable = true)
 |-- n_comments: string (nullable = true)
 |-- uniqueID\r: string (nullable = true)



In [9]:
comments_df.printSchema()

root
 |-- commentID: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- commentSequence: integer (nullable = true)
 |-- userID: integer (nullable = true)
 |-- userDisplayName: string (nullable = true)
 |-- userLocation: string (nullable = true)
 |-- userTitle: string (nullable = true)
 |-- commentBody: string (nullable = true)
 |-- createDate: string (nullable = true)
 |-- updateDate: string (nullable = true)
 |-- approveDate: string (nullable = true)
 |-- recommendations: string (nullable = true)
 |-- replyCount: string (nullable = true)
 |-- editorsSelection: string (nullable = true)
 |-- parentID: string (nullable = true)
 |-- parentUserDisplayName: string (nullable = true)
 |-- depth: string (nullable = true)
 |-- commentType: string (nullable = true)
 |-- trusted: string (nullable = true)
 |-- recommendedFlag: integer (nullable = true)
 |-- permID: string (nullable = true)
 |-- isAnonymous: string (nullable = true)
 |-- articleID: string (nullable = true)



Although we used `inferSchema`, several columns that hold numbers or dates were read as strings. Let's cast them to more suitable types.

In [10]:
comments_df=comments_df.withColumn('recommendations',comments_df['recommendations'].cast("float"))\
                        .withColumn("createDate", comments_df['createDate'].cast(DateType()))\
                        .withColumn("updateDate", comments_df['updateDate'].cast(DateType()))\
                        .withColumn("approveDate", comments_df['approveDate'].cast(DateType()))\
                        .withColumn('replyCount',comments_df['replyCount'].cast("int"))\
                        .withColumn('depth',comments_df['depth'].cast("int"))\
                        .withColumn('isAnonymous',comments_df['isAnonymous'].cast("int"))\
                        .withColumn('editorsSelection',comments_df['editorsSelection'].cast("int"))
# Some of the columns above are really boolean. PySpark does provide BooleanType,
# but they are cast to int here so they can be used as numeric values.

In [11]:
articles_df = articles_df.withColumn('word_count',articles_df['word_count'].cast("int"))\
                         .withColumn("pub_date", articles_df['pub_date'].cast(DateType()))\
                         .withColumn('n_comments',articles_df['n_comments'].cast("int"))

In [12]:
comments_df.printSchema()

root
 |-- commentID: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- commentSequence: integer (nullable = true)
 |-- userID: integer (nullable = true)
 |-- userDisplayName: string (nullable = true)
 |-- userLocation: string (nullable = true)
 |-- userTitle: string (nullable = true)
 |-- commentBody: string (nullable = true)
 |-- createDate: date (nullable = true)
 |-- updateDate: date (nullable = true)
 |-- approveDate: date (nullable = true)
 |-- recommendations: float (nullable = true)
 |-- replyCount: integer (nullable = true)
 |-- editorsSelection: integer (nullable = true)
 |-- parentID: string (nullable = true)
 |-- parentUserDisplayName: string (nullable = true)
 |-- depth: integer (nullable = true)
 |-- commentType: string (nullable = true)
 |-- trusted: string (nullable = true)
 |-- recommendedFlag: integer (nullable = true)
 |-- permID: string (nullable = true)
 |-- isAnonymous: integer (nullable = true)
 |-- articleID: string (nullable = true)



In [13]:
articles_df.printSchema()

root
 |-- newsdesk: string (nullable = true)
 |-- section: string (nullable = true)
 |-- subsection: string (nullable = true)
 |-- material: string (nullable = true)
 |-- headline: string (nullable = true)
 |-- abstract: string (nullable = true)
 |-- keywords: string (nullable = true)
 |-- word_count: integer (nullable = true)
 |-- pub_date: date (nullable = true)
 |-- n_comments: integer (nullable = true)
 |-- uniqueID\r: string (nullable = true)



Let's take a look at our target variable, `recommendations`.

In [14]:
comments_df.describe(['recommendations']).show()

+-------+-----------------+
|summary|  recommendations|
+-------+-----------------+
|  count|            10000|
|   mean|          20.9887|
| stddev|97.33120007767721|
|    min|              0.0|
|    max|           3816.0|
+-------+-----------------+



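The maximum (3816) is far above the mean (about 21), so the data contains outliers. We remove them by keeping only the comments below the approximate 90th percentile of `recommendations`.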

In [15]:
upper_limit = comments_df.approxQuantile('recommendations', [ 0.9], 0.05)[0]

In [16]:
comments_df = comments_df.filter((col('recommendations')<upper_limit))

In [17]:
comments_df.describe(['recommendations']).show()

+-------+-----------------+
|summary|  recommendations|
+-------+-----------------+
|  count|             8512|
|   mean|5.095277255639098|
| stddev|5.478552516337315|
|    min|              0.0|
|    max|             23.0|
+-------+-----------------+
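
The effect of the quantile cut can be illustrated without Spark. The following is a minimal sketch in plain Python, assuming an exact nearest-rank quantile instead of `approxQuantile`; the helper names `nearest_rank_quantile` and `drop_upper_tail` and the sample values are hypothetical. With a relative error of 0.05, `approxQuantile` may return a value whose rank lies anywhere between roughly the 85th and the 95th percentile, which is consistent with 8512 of the 10000 rows surviving the filter.

```python
import math


def nearest_rank_quantile(values, q):
    """Exact nearest-rank quantile: the value at rank ceil(q * N) in sorted order."""
    ordered = sorted(values)
    rank = max(1, math.ceil(q * len(ordered)))
    return ordered[rank - 1]


def drop_upper_tail(values, q=0.9):
    """Keep the values strictly below the q-quantile, mirroring the `<` filter."""
    limit = nearest_rank_quantile(values, q)
    return limit, [v for v in values if v < limit]


# Made-up recommendation counts with one extreme value.
sample = [0, 1, 1, 2, 3, 4, 5, 8, 13, 3816]
limit, kept = drop_upper_tail(sample)
print(limit, kept)
```

Because the comparison is strict, rows equal to the limit are dropped together with the extreme values.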



In [18]:
gre_histogram = comments_df.select('recommendations').rdd.flatMap(lambda x: x).histogram(11)

# Loading the Computed Histogram into a Pandas Dataframe for plotting
pd.DataFrame(
    list(zip(*gre_histogram)), 
    columns=['bin', 'frequency']
).set_index(
    'bin'
).plot(kind='bar');

Transform text to vectors

In [19]:
# from pyspark.mllib.feature import Word2Vec

In [20]:
from pyspark.ml.feature import StopWordsRemover,Word2Vec
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window
import pyspark.sql.functions as F

In [55]:
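# Comment creation date as seconds since the epoch.
comments_df=comments_df.withColumn("createDateInt", F.unix_timestamp(comments_df['createDate']))

# Keep only the article columns we need; the ID column name carries a trailing "\r" from the CSV header.
data = articles_df.select(col("abstract"),col("pub_date"),
                          col("keywords"),
                          col("uniqueID\r").alias('articleID'))
# Drop the last character of each article ID (presumably the same trailing "\r") before joining.
data = data.withColumn("articleID",expr("substring(articleID, 1, length(articleID)-1)"))
full_df = comments_df.join(data, on=["articleID"])
full_df = full_df.withColumn("pubDateInt", F.unix_timestamp(full_df['pub_date']))

# Absolute time between article publication and comment creation, in seconds.
full_df = full_df.withColumn('time_passed', F.abs(full_df['pubDateInt'] - full_df['createDateInt']))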

# a = comments_df.select('articleID').distinct().take(1)
# b = data.select(col('articleID')).take(1)
# print(a, b)
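
One detail worth checking: `createDate` and `pub_date` were cast to `DateType` earlier. If the raw columns carry a time of day, it is gone before `unix_timestamp` is applied, and `time_passed` can then only change in whole-day steps. A minimal plain-Python sketch of that effect, using made-up timestamps (the function names are hypothetical, and UTC is assumed to keep the arithmetic free of daylight-saving shifts):

```python
from datetime import datetime, timezone


def seconds_between(published, commented):
    """Absolute gap in seconds between two timezone-aware datetimes."""
    return abs(int((commented - published).total_seconds()))


def truncate_to_date(ts):
    """Drop the time of day, as a cast to a date type does."""
    return ts.replace(hour=0, minute=0, second=0, microsecond=0)


# Made-up example: a comment written 8 hours 15 minutes after publication.
published = datetime(2020, 1, 1, 9, 30, tzinfo=timezone.utc)
commented = datetime(2020, 1, 1, 17, 45, tzinfo=timezone.utc)

full_gap = seconds_between(published, commented)
date_gap = seconds_between(truncate_to_date(published), truncate_to_date(commented))
print(full_gap, date_gap)
```

If finer resolution matters, casting to a timestamp type instead of a date type before taking the difference would preserve it.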

In [56]:
full_df.printSchema()

root
 |-- articleID: string (nullable = true)
 |-- commentID: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- commentSequence: integer (nullable = true)
 |-- userID: integer (nullable = true)
 |-- userDisplayName: string (nullable = true)
 |-- userLocation: string (nullable = true)
 |-- userTitle: string (nullable = true)
 |-- commentBody: string (nullable = true)
 |-- createDate: date (nullable = true)
 |-- updateDate: date (nullable = true)
 |-- approveDate: date (nullable = true)
 |-- recommendations: float (nullable = true)
 |-- replyCount: integer (nullable = true)
 |-- editorsSelection: integer (nullable = true)
 |-- parentID: string (nullable = true)
 |-- parentUserDisplayName: string (nullable = true)
 |-- depth: integer (nullable = true)
 |-- commentType: string (nullable = true)
 |-- trusted: string (nullable = true)
 |-- recommendedFlag: integer (nullable = true)
 |-- permID: string (nullable = true)
 |-- isAnonymous: integer (nullable = true)
 |-- creat

In [57]:
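def get_vector(df,column='commentBody'):
    # Remove @mentions and every character that is not a letter or whitespace.
    df = df.withColumn(column, trim(regexp_replace(column, r'(@\w+)|[^a-zA-Z\s]', '')))
    # Tokenise on single spaces; only the token column is kept from here on.
    df = df.select(split(col(column)," ").alias(column))
    remover = StopWordsRemover(inputCol=column, outputCol="filtered")
    filtered = remover.transform(df)
    # Fit Word2Vec on the filtered tokens; each row gets one document vector.
    word2vec = Word2Vec(inputCol="filtered", outputCol="vector")
    model = word2vec.fit(filtered)
    return model.transform(filtered)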
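
The cleaning and tokenising steps inside `get_vector` can be tried out without Spark. The sketch below is plain Python and only approximates the Spark behaviour: the regular expression is the one used above, while the stop-word set, the function names and the sample comment are made up for illustration (Spark's `StopWordsRemover` ships a much longer default list).

```python
import re

# Same pattern as in get_vector: drop @mentions and every non-letter, non-space character.
CLEAN_PATTERN = re.compile(r"(@\w+)|[^a-zA-Z\s]")
# Tiny illustrative stop-word set (an assumption, not Spark's default list).
STOP_WORDS = {"a", "is", "it", "the", "this"}


def clean(text):
    """Apply the pattern and trim leading/trailing whitespace."""
    return CLEAN_PATTERN.sub("", text).strip()


def tokenize_on_single_space(text):
    """Split on a single space: consecutive spaces leave empty tokens behind."""
    return clean(text).split(" ")


def tokenize_on_whitespace(text):
    """Split on any run of whitespace, which avoids the empty tokens."""
    return clean(text).split()


def remove_stop_words(tokens):
    """Case-insensitive stop-word removal."""
    return [t for t in tokens if t.lower() not in STOP_WORDS]


comment = "@Bob This is a for-profit idea - isn't it?"
single = tokenize_on_single_space(comment)
whitespace = tokenize_on_whitespace(comment)
filtered = remove_stop_words(whitespace)
print(single)
print(whitespace)
print(filtered)
```

The first list contains an empty string where the removed dash left two adjacent spaces, so splitting on runs of whitespace may give cleaner input for the embedding step.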

In [58]:
df = get_vector(full_df)
df2 = get_vector(full_df, column='abstract')

In [59]:
df.show()
df2.show()

+--------------------+--------------------+--------------------+
|         commentBody|            filtered|              vector|
+--------------------+--------------------+--------------------+
|[Nothing, here, i...|[Nothing, truthfu...|[0.05439707271399...|
|[The, legal, thri...|[legal, thriller,...|[0.02881309679812...|
|[This, is, a, gre...|[great, case, stu...|[0.04588875124696...|
|[Add, forprofit, ...|[Add, forprofit, ...|[-0.0375565004069...|
|[S, I, think, the...|[think, point, hu...|[0.01957566989585...|
|[S, Thats, agreed...|[Thats, agreed, J...|[0.07047333251684...|
|[Firstly, everyon...|[Firstly, everyon...|[-0.0113774585701...|
|[Protect, veteran...|[Protect, veteran...|[0.06493437384842...|
|[Massive, subsidi...|[Massive, subsidi...|[-0.0162929578117...|
|[This, veteran, w...|[veteran, widowed...|[-0.0080836191225...|
|[Since, the, pers...|[Since, person, s...|[-0.0032923898543...|
|[Sorry, about, yo...|[Sorry, treatment...|[0.09255785489920...|
|[I, am, a, vetera...|[ve

In [60]:
# comments_df=comments_df.withColumn("createDateInt",comments_df.createDateInt.cast('double'))
# # df=df.withColumn("vector",df.vector.cast('array<bigint>'))
# df.withColumn(
#     "vector", 
#     F.array_union(df.vector, F.array(comments_df.createDateInt))
# ).show()



In [61]:
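# Pair each vector row with its source row by position. This assumes both DataFrames
# come back in the same order, which Spark does not guarantee in general.
df = df.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
full_df = full_df.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
full_df = full_df.join(df, on=["row_index"]).drop("row_index")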

In [62]:
# Attach the abstract vectors by row position; this relies on both DataFrames keeping the same row order.
df2 = df2.select(col("vector").alias('abstract_vector'))
df2 = df2.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
full_df = full_df.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
full_df = full_df.join(df2, on=["row_index"]).drop("row_index")

In [64]:
from pyspark.ml.feature import VectorAssembler

numericCols = ['vector', 'abstract_vector', 'time_passed']
assembler = VectorAssembler(inputCols=numericCols, outputCol="features")
full_df = assembler.transform(full_df)

In [65]:
full_df = full_df.drop(*numericCols)

In [66]:
from pyspark.ml.feature import Bucketizer
bucketizer = Bucketizer(splits=[ 0, 6, 12, 18, float('Inf') ],inputCol="recommendations", outputCol="buckets")
full_df = bucketizer.setHandleInvalid("keep").transform(full_df)
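
To make the bucket boundaries concrete, here is a minimal plain-Python sketch of how the splits `[0, 6, 12, 18, inf]` map a recommendation count to a bucket index. It mimics the documented `Bucketizer` rules (each bucket is half-open, `[x, y)`, the last one also includes its upper bound, and with `handleInvalid="keep"` NaN values go to one extra bucket); the function name `bucket_index` is hypothetical.

```python
import math
from bisect import bisect_right

SPLITS = [0, 6, 12, 18, float("inf")]


def bucket_index(value, splits=SPLITS):
    """Index of the interval [splits[i], splits[i + 1]) that contains value."""
    if math.isnan(value):
        # handleInvalid="keep": NaN lands in one extra bucket after the regular ones.
        return len(splits) - 1
    if value < splits[0] or value > splits[-1]:
        # Values outside the splits are treated as errors.
        raise ValueError(f"{value} lies outside the splits")
    # The last regular bucket also includes its upper bound, hence the min().
    return min(bisect_right(splits, value) - 1, len(splits) - 2)


indices = [bucket_index(v) for v in [0, 5, 6, 11.5, 18, 23]]
print(indices)
```

So the four classes are 0 to 5, 6 to 11, 12 to 17, and 18 or more recommendations; Spark stores the index as a double (`0.0`, `1.0`, ...).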

In [68]:
full_df.printSchema()

root
 |-- articleID: string (nullable = true)
 |-- commentID: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- commentSequence: integer (nullable = true)
 |-- userID: integer (nullable = true)
 |-- userDisplayName: string (nullable = true)
 |-- userLocation: string (nullable = true)
 |-- userTitle: string (nullable = true)
 |-- commentBody: string (nullable = true)
 |-- createDate: date (nullable = true)
 |-- updateDate: date (nullable = true)
 |-- approveDate: date (nullable = true)
 |-- recommendations: float (nullable = true)
 |-- replyCount: integer (nullable = true)
 |-- editorsSelection: integer (nullable = true)
 |-- parentID: string (nullable = true)
 |-- parentUserDisplayName: string (nullable = true)
 |-- depth: integer (nullable = true)
 |-- commentType: string (nullable = true)
 |-- trusted: string (nullable = true)
 |-- recommendedFlag: integer (nullable = true)
 |-- permID: string (nullable = true)
 |-- isAnonymous: integer (nullable = true)
 |-- creat

In [69]:
#### TODO ####

# from pyspark.mllib.linalg.distributed import RowMatrix
# from pyspark.mllib.linalg import Vectors, VectorUDT
# from pyspark.sql.functions import udf, array

# mat = RowMatrix(full_df.select('features'))
# pc = mat.computePrincipalComponents(10)
# projected = mat.multiply(pc)

IllegalArgumentException: requirement failed: DataFrame must have a single vector type column

In [43]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Use the assembled DataFrame (feature vector plus bucketed label) as the modelling data.
data = full_df

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="buckets", outputCol="indexedLabel").fit(data)

# Automatically identify categorical features, and index them.
# maxCategories is left at its default (20), so features with > 20 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures").fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model on the indexed label, so that predictions are label
# indices that IndexToString can map back with labelIndexer.labels.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "buckets", "features").show(15)

# Compare predictions with the indexed true label and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[2]
print(rfModel)  # summary only

+--------------+-------+--------------------+
|predictedLabel|buckets|            features|
+--------------+-------+--------------------+
|           0.0|    3.0|[-0.0213082146025...|
|           0.0|    2.0|[0.00879926614080...|
|           0.0|    0.0|[-0.0183993745595...|
|           0.0|    0.0|(201,[100,101,102...|
|           0.0|    0.0|[0.01398731380322...|
|           0.0|    0.0|[0.03632894159718...|
|           0.0|    0.0|[0.03271815881181...|
|           0.0|    0.0|[0.05916589977568...|
|           0.0|    0.0|[0.07192069188588...|
|           0.0|    0.0|[0.08958277180499...|
|           0.0|    0.0|[0.02133534631866...|
|           0.0|    0.0|[0.02119789324014...|
|           0.0|    0.0|[-0.0079683041221...|
|           0.0|    0.0|[0.00568002799179...|
|           0.0|    0.0|[0.01157510414682...|
+--------------+-------+--------------------+
only showing top 15 rows

Test Error = 0.335054
RandomForestClassificationModel: uid=RandomForestClassifier_3e3fb794058f, numT
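
All 15 rows shown above are predicted as bucket `0.0`, so it may be worth comparing the reported test error with a trivial baseline that always predicts the most frequent bucket. The following is a minimal plain-Python sketch of that comparison; the label lists are made up and the function names are hypothetical, so the printed number says nothing about the actual dataset.

```python
from collections import Counter


def accuracy(labels, predictions):
    """Fraction of positions where the prediction equals the label."""
    matches = sum(1 for y, p in zip(labels, predictions) if y == p)
    return matches / len(labels)


def majority_baseline_accuracy(train_labels, test_labels):
    """Accuracy of always predicting the most frequent training label."""
    majority_label, _ = Counter(train_labels).most_common(1)[0]
    return accuracy(test_labels, [majority_label] * len(test_labels))


# Made-up bucket labels, only for illustration.
train = [0.0, 0.0, 0.0, 1.0, 0.0, 2.0, 0.0, 3.0]
test = [0.0, 0.0, 1.0, 0.0, 3.0]
baseline = majority_baseline_accuracy(train, test)
print("Baseline test error = %g" % (1.0 - baseline))
```

If the model's test error is close to this baseline error on the real split, the classifier has learned little beyond the class frequencies.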