# 1. First steps

## 1.1. Install PySpark for the project

In [None]:
!pip install --force-reinstall pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m13.7 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=6d8de0bcecad73c2ed1ed6c9685dd6d53a59ecd0dc1ca7564a7412e719843b1a
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9.7
  

## 1.2. Mount Google Drive to access the MINDsmall dataset

In [None]:
from google.colab import drive

# Mount Google Drive; the MINDsmall files are stored under MyDrive/Datasets.
drive.mount('/content/drive/')

# Dataset root used by the loading cells below. It is an absolute path under the mount
# point, so it does not depend on the kernel's current working directory.
# NOTE: the name `dir` shadows the Python builtin; it is kept because later cells use it.
dir = '/content/drive/MyDrive/Datasets'

Mounted at /content/drive/


## 1.3. Import the required utilities and create a Spark session

In [None]:
# Spark utilities for data processing.
# NOTE: `min` and `max` come from pyspark.sql.functions and shadow the Python builtins
# for the rest of the notebook; later cells use them as Spark aggregates.
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType, TimestampType, StructType, StructField, IntegerType, StringType, FloatType, DoubleType, ArrayType
from pyspark.sql.functions import col, sqrt, desc, asc, split, explode, from_json, get_json_object, inline
from pyspark.sql.functions import from_unixtime, unix_timestamp, array, monotonically_increasing_id, lit, min, max, to_date

# Spark ML utilities for the multilayer-perceptron classifier.
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Local Spark session using every core of the machine.
# No hadoop-aws package is requested. Nothing in this notebook reads from S3, and
# hadoop-aws must exactly match the Hadoop version bundled with PySpark (3.x for
# PySpark 3.5). A 2.x artifact would only add a Maven download and a classpath mismatch.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.executor.memory", "12g")
    .config("spark.driver.memory", "12g")
    .config("spark.memory.offHeap.enabled", True)
    .config("spark.memory.offHeap.size", "12g")
    .config('spark.sql.parquet.int96RebaseModeInRead', 'LEGACY')
    .config('spark.sql.parquet.int96RebaseModeInWrite', 'LEGACY')
    .config("spark.sql.shuffle.partitions", 64)
    .getOrCreate()
)

# 2. Data processing

## 2.1. Combine data from train and dev files

In [None]:
# The `to_date(..., "MM/dd/yyyy")` conversion below relies on the legacy
# (SimpleDateFormat-based) time parser, so switch the session to it.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Column names of the header-less MIND TSV files, in file order.
BEHAVIORS_COLUMNS = ['impressionID', 'userID', 'time', 'history', 'impressions']
NEWS_COLUMNS = ['newsID', 'category', 'subcategory', 'title', 'abstract', 'url',
                'titleEntities', 'abstractEntities']


def read_mind_tsv(path, column_names):
    """Read a header-less, tab-separated MIND file and name its columns.

    Spark labels header-less columns _c0, _c1, ...; the first len(column_names) of
    them are renamed to `column_names` in order and any further columns are dropped.
    """
    renames = ['_c{} AS {}'.format(idx, name) for idx, name in enumerate(column_names)]
    return spark.read.csv(path, sep='\t', header=False).selectExpr(*renames)


# Load behaviors.tsv and news.tsv from the train and dev splits
behaviors_train_df = read_mind_tsv(dir + '/MINDsmall/MINDsmall_train/behaviors.tsv', BEHAVIORS_COLUMNS)
news_train_df = read_mind_tsv(dir + '/MINDsmall/MINDsmall_train/news.tsv', NEWS_COLUMNS)
behaviors_dev_df = read_mind_tsv(dir + '/MINDsmall/MINDsmall_dev/behaviors.tsv', BEHAVIORS_COLUMNS)
news_dev_df = read_mind_tsv(dir + '/MINDsmall/MINDsmall_dev/news.tsv', NEWS_COLUMNS)

# Combine train and dev, drop exact duplicate rows, and convert `time` to a DateType day.
behaviors_df = (
    behaviors_train_df.union(behaviors_dev_df)
    .dropDuplicates()
    .withColumn("time", to_date(col("time"), "MM/dd/yyyy"))
)
news_df = news_train_df.union(news_dev_df).dropDuplicates()

In [None]:
from pyspark.sql import functions as F

# NULL count for every behaviors_df column, computed in a single aggregation
# (one Spark job) instead of one filter-and-count job per column.
behaviors_null_counts = behaviors_df.select(
    [F.count(F.when(col(c).isNull(), 1)).alias(c) for c in behaviors_df.columns]
).first()

for column_name in behaviors_df.columns:
    print(column_name + " NULL value count: ", behaviors_null_counts[column_name])

impressionID NULL value count:  0
userID NULL value count:  0
time NULL value count:  0
history NULL value count:  5452
impressions NULL value count:  0


In [None]:
from pyspark.sql import functions as F

# NULL count for every news_df column, computed in a single aggregation
# (one Spark job) instead of one filter-and-count job per column.
news_null_counts = news_df.select(
    [F.count(F.when(col(c).isNull(), 1)).alias(c) for c in news_df.columns]
).first()

for column_name in news_df.columns:
    print(column_name + " NULL value count: ", news_null_counts[column_name])

newsID NULL value count:  0
category NULL value count:  0
subcategory NULL value count:  0
title NULL value count:  0
abstract NULL value count:  3415
url NULL value count:  0
titleEntities NULL value count:  3
abstractEntities NULL value count:  4


In [None]:
# Occurrence counts per ID, most frequent first. behaviors_df has one row per impression,
# so userCount is the number of impressions per user; news_count_df reveals whether any
# newsID is duplicated after the train/dev merge.
user_count_df = (
    behaviors_df
    .groupBy('userID')
    .count()
    .withColumnRenamed("count", "userCount")
    .orderBy(desc('userCount'))
)
news_count_df = (
    news_df
    .groupBy('newsID')
    .count()
    .withColumnRenamed("count", "newsCount")
    .orderBy(desc('newsCount'))
)

for count_df in (user_count_df, news_count_df):
    count_df.show(n=5)

+------+---------+
|userID|userCount|
+------+---------+
|U32146|       69|
|U44201|       50|
|U57047|       45|
|U15740|       45|
|U20833|       41|
+------+---------+
only showing top 5 rows

+------+---------+
|newsID|newsCount|
+------+---------+
|N48865|        1|
|N56379|        1|
|N56442|        1|
|N11904|        1|
|N48051|        1|
+------+---------+
only showing top 5 rows



## 2.2. Build a table of users' click activity

In [None]:
# Clicks implied by each impression's `history`: every listed news item counts as
# clicked, so it gets label 1.0 (lit(1.0) is already a double).
history_click_df = (
    behaviors_df
    .select("userID", "time", explode(split("history", " ")).alias("newsID"))
    .withColumn("click", lit(1.0))
)

# Entries of `impressions` have the form "<newsID>-<label>"; split each one into the
# newsID and a numeric click label.
current_click_df = (
    behaviors_df
    .select("userID", "time", explode(split("impressions", " ")).alias("impression"))
    .select("userID", "time", split("impression", "-").alias("parts"))
    .select(
        "userID",
        "time",
        col("parts").getItem(0).alias("newsID"),
        col("parts").getItem(1).cast(DoubleType()).alias("click"),
    )
)

# Keep one row per (userID, newsID): the highest label observed (1.0 beats 0.0), dated by
# the earliest day that label was seen. The groupBy already collapses duplicate rows, so
# no separate dropDuplicates pass is needed. union is positional; both frames are
# (userID, time, newsID, click).
earliest_click_df = (
    history_click_df.union(current_click_df)
    .groupBy("userID", "newsID", "click")
    .agg(min("time").alias("time"))
)
best_click_df = earliest_click_df.groupBy("userID", "newsID").agg(max("click").alias("click"))

# Cached because this table feeds many actions below (counts, NULL checks, summaries, dataset).
user_click_df = earliest_click_df.join(
    best_click_df, on=["userID", "newsID", "click"], how="inner"
).cache()

user_click_df.show(n=5)
print("Total interacts: ", user_click_df.count())
print("Total clicked interacts: ", user_click_df.where("click==1.0").count())

+------+------+-----+----------+
|userID|newsID|click|      time|
+------+------+-----+----------+
|    U1|N10646|  1.0|2019-11-15|
|    U1|N13374|  1.0|2019-11-15|
|    U1|N14637|  0.0|2019-11-15|
|    U1|N62058|  1.0|2019-11-15|
|   U10|N11784|  0.0|2019-11-15|
+------+------+-----+----------+
only showing top 5 rows

Total interacts:  9440772
Total clicked interacts:  2356550


In [None]:
from pyspark.sql import functions as F

# NULL count for every user_click_df column, computed in a single aggregation
# (one Spark job) instead of one filter-and-count job per column.
user_click_null_counts = user_click_df.select(
    [F.count(F.when(col(c).isNull(), 1)).alias(c) for c in user_click_df.columns]
).first()

for column_name in user_click_df.columns:
    print(column_name + " NULL value count: ", user_click_null_counts[column_name])

userID NULL value count:  0
newsID NULL value count:  0
click NULL value count:  0
time NULL value count:  0


In [None]:
# Users ranked by how many news items they clicked, next to the total number of
# (clicked or not) news items they interacted with.
temp_df_1 = (
    user_click_df
    .where(col("click") == 1.0)
    .groupBy("userID")
    .count()
    .withColumnRenamed("count", "clickCount")
    .orderBy(desc("clickCount"))
)
temp_df_2 = (
    user_click_df
    .groupBy("userID")
    .count()
    .withColumnRenamed("count", "totalAppearance")
    .orderBy(desc("totalAppearance"))
)
temp_df_3 = (
    temp_df_1
    .join(temp_df_2, on=["userID"], how="left")
    .orderBy(desc("clickCount"))
)

temp_df_3.show(n=5)

+------+----------+---------------+
|userID|clickCount|totalAppearance|
+------+----------+---------------+
|U63482|       658|           1445|
|U59594|       504|            982|
|U84756|       499|           1261|
| U2784|       467|            956|
|U72489|       463|           1064|
+------+----------+---------------+
only showing top 5 rows



## 2.3. Build a news-embedding PySpark DataFrame from the entity embeddings

In [None]:
def _read_entity_embeddings(embedding_path):
    """Read an entity_embedding.csv file; its first column ("0") holds the entity ID."""
    return spark.read.csv(embedding_path, header=True).withColumnRenamed("0", "entityID")


embedding_train_df = _read_entity_embeddings(dir + '/MINDsmall/MINDsmall_train/entity_embedding.csv')
embedding_dev_df = _read_entity_embeddings(dir + '/MINDsmall/MINDsmall_dev/entity_embedding.csv')

# Merge both splits (dropping exact duplicate rows) and cast every vector component
# after the entityID column to double.
merged_embedding_df = embedding_train_df.union(embedding_dev_df).dropDuplicates()
embedding_df = merged_embedding_df.select(
    "entityID",
    *[col(component).cast(DoubleType()).alias(component)
      for component in merged_embedding_df.columns[1:]]
)

embedding_df.show(n=5)

+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------+---------+---------+--------+---------+---------+---------+---------+---------+---------+---------+---------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------+--------+---------+---------+---------+---------+--------+---------+---------+---------+---------+---------+---------+---------+---------+--------+---------+---------+---------+---------+---------+---------+---------+---------+--------+---------+---------+---------+

In [None]:
# Schema of the JSON entity lists stored in titleEntities / abstractEntities. Both
# columns share the same layout, so one schema is defined and reused for both.
entitySchema = ArrayType(
    StructType([
        StructField("Label", StringType(), True),
        StructField("Type", StringType(), True),
        StructField("WikidataId", StringType(), True),
        StructField("Confidence", FloatType(), True),
        StructField("OccurrenceOffsets", StringType(), True),
        StructField("SurfaceForms", ArrayType(StringType()), True)
    ])
)
titleSchema = entitySchema
abstractSchema = entitySchema


def _news_entity_ids(entities_column, schema):
    """Return (newsID, WikidataId) rows for every entity listed in `entities_column` of news_df."""
    parsed_df = news_df.withColumn("parsedEntities", from_json(entities_column, schema))
    return parsed_df.selectExpr("inline(parsedEntities)", "newsID").select("newsID", "WikidataId")


news_title_embedding_df = _news_entity_ids("titleEntities", titleSchema)
news_abstract_embedding_df = _news_entity_ids("abstractEntities", abstractSchema)

# Distinct (newsID, entityID) pairs drawn from both title and abstract entities.
news_entity_df = (
    news_title_embedding_df.union(news_abstract_embedding_df)
    .dropDuplicates()
    .select("newsID", col("WikidataId").alias("entityID"))
)

# Attach each entity's vector. Entities missing from embedding_df are dropped by the
# inner join. embedding_columns lists the vector components in embedding_df order.
embedding_columns = [c for c in embedding_df.columns if c != "entityID"]
news_vectors_df = news_entity_df.join(embedding_df, on=["entityID"], how="inner").drop("entityID")

# Sum the entity vectors of each news item. toDF renames the generated "sum(<col>)"
# columns back to the component names in one projection; a per-column
# withColumnRenamed loop would add one projection per column.
summed_df = news_vectors_df.groupBy("newsID").sum().toDF("newsID", *embedding_columns)

# L2-normalize each summed vector (builtin sum folds the Column products into one expression).
with_norm_df = summed_df.withColumn(
    "normalization", sqrt(sum(col(c) * col(c) for c in embedding_columns))
)
normalized_df = with_norm_df.select(
    "newsID", *[(col(c) / col("normalization")).alias(c) for c in embedding_columns]
)

# Pack the normalized components into the single "features" vector column. The inputs
# follow the actual embedding columns rather than a hard-coded range.
assembler = VectorAssembler(inputCols=embedding_columns, outputCol="features")
news_embedding_df = assembler.transform(normalized_df).select("newsID", "features")

In [None]:
# Sanity check: the final news embedding table should contain no NULL IDs or vectors.
for column_name in ("newsID", "features"):
    null_count = news_embedding_df.where(col(column_name).isNull()).count()
    print(column_name + " NULL value count: ", null_count)

newsID NULL value count:  0
features NULL value count:  0


# 3. Build a content-based recommender system on the MINDsmall dataset

## 3.1. Prepare the dataset

In [None]:
# Interactions of one user (U63482 tops the click ranking shown above), restricted to
# news items that have an embedding vector, rows with NULLs removed, oldest first.
TARGET_USER_ID = "U63482"

user_interactions_df = user_click_df.where(col("userID") == TARGET_USER_ID)
dataset = (
    user_interactions_df
    .join(news_embedding_df, on=["newsID"], how="inner")
    .na.drop("any")
    .orderBy(asc("time"))
)

dataset.show(n=5)

+------+------+-----+----------+--------------------+
|newsID|userID|click|      time|            features|
+------+------+-----+----------+--------------------+
| N4530|U63482|  1.0|2019-11-09|[-0.0777836659275...|
| N2272|U63482|  1.0|2019-11-09|[-0.1711606694736...|
|N36889|U63482|  1.0|2019-11-09|[-0.0296129682490...|
|N15198|U63482|  1.0|2019-11-09|[-0.1378691761883...|
| N8756|U63482|  0.0|2019-11-09|[-0.0626723382417...|
+------+------+-----+----------+--------------------+
only showing top 5 rows



In [None]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Chronological 90/10 split of the selected user's interactions.
# `time` holds only a date, so many rows share the same value. A `limit` over such a
# sort picks an arbitrary subset of the tied rows, and `subtract` would recompute that
# limit independently, so train and test could overlap or miss rows. Instead, order by
# (time, newsID); newsID is unique per user in user_click_df, so the order is total.
# Each row gets its 1-based position, and the first `train_count` positions form the
# training set. The split is therefore deterministic, disjoint, and complete.
# The window is unpartitioned (all rows go to one partition), which is fine for a single
# user's interactions.
TRAIN_FRACTION = 0.9

dataset_count = dataset.count()
train_count = int(TRAIN_FRACTION * dataset_count)

chronological_order = Window.orderBy(asc("time"), asc("newsID"))
indexed_dataset = dataset.withColumn("rowPosition", row_number().over(chronological_order))
train_dataset = indexed_dataset.where(col("rowPosition") <= train_count).drop("rowPosition")
test_dataset = indexed_dataset.where(col("rowPosition") > train_count).drop("rowPosition")

train_dataset.orderBy(asc("time")).show(n=5)
test_dataset.orderBy(asc("time")).show(n=5)

+------+------+-----+----------+--------------------+
|newsID|userID|click|      time|            features|
+------+------+-----+----------+--------------------+
|N62006|U63482|  1.0|2019-11-09|[0.09825049387673...|
|N16209|U63482|  0.0|2019-11-09|[0.04475929334630...|
|N21351|U63482|  0.0|2019-11-09|[-0.0177218875516...|
|N45331|U63482|  1.0|2019-11-09|[0.07647303509497...|
|N15582|U63482|  0.0|2019-11-09|[-0.0467451200857...|
+------+------+-----+----------+--------------------+
only showing top 5 rows

+------+------+-----+----------+--------------------+
|newsID|userID|click|      time|            features|
+------+------+-----+----------+--------------------+
| N1539|U63482|  0.0|2019-11-14|[-0.0405669473213...|
|N42515|U63482|  0.0|2019-11-14|[0.08227742112814...|
|N61829|U63482|  0.0|2019-11-14|[0.24295307281900...|
|N34869|U63482|  0.0|2019-11-14|[-0.1178017185116...|
|N48613|U63482|  0.0|2019-11-14|[-0.0256017508484...|
+------+------+-----+----------+--------------------+
only showing top 5 rows

## 3.2. Training and evaluation

In [None]:
# Rename `click` to `label`; no labelCol is set on the classifier or the evaluator,
# so both use that column name.
train_df = train_dataset.select(col("click").alias("label"), "features")
test_df = test_dataset.select(col("click").alias("label"), "features")

# MLP layer sizes. The input layer must equal the feature-vector length, so it is read
# from the data and always matches the embedding width built earlier. It is followed by
# three hidden layers and an output layer with one unit per class (click / no click).
HIDDEN_LAYER_SIZES = [50, 20, 5]
NUM_CLASSES = 2
num_features = train_df.first()["features"].size

mpl = MultilayerPerceptronClassifier(
    layers=[num_features] + HIDDEN_LAYER_SIZES + [NUM_CLASSES],
    maxIter=100,
    blockSize=64,
    seed=1,
)
model = mpl.fit(train_df)

In [None]:
# Score both splits with the trained model, keeping only the columns the evaluator reads.
train_result = model.transform(train_df)
train_output_df = train_result.select("prediction", "label")

test_result = model.transform(test_df)
test_output_df = test_result.select("prediction", "label")

# Report prefixes per metric, as (train line, test line).
report_prefixes = {
    'weightedPrecision': ('Weighted precision value on train dataset = ',
                          'Weighted precision value on test dataset = '),
    'weightedRecall': ('Weighted recall value on train dataset = ',
                       'Weighted recall value on test dataset = '),
    'accuracy': ('Accuracy value on train dataset = ',
                 'Accuracy value on test dataset = '),
}
metrics = list(report_prefixes)

for metric in metrics:
    evaluator = MulticlassClassificationEvaluator(metricName=metric)
    train_prefix, test_prefix = report_prefixes[metric]
    print(train_prefix + str(evaluator.evaluate(train_output_df)))
    print(test_prefix + str(evaluator.evaluate(test_output_df)))
    print('\n--------------------------------------------------------\n')

Weighted precision value on train dataset = 0.6936783586549082
Weighted precision value on test dataset = 0.8064386317907446

--------------------------------------------------------

Weighted recall value on train dataset = 0.6912156166814551
Weighted recall value on test dataset = 0.6428571428571429

--------------------------------------------------------

Accuracy value on train dataset = 0.6912156166814551
Accuracy value on test dataset = 0.6428571428571429

--------------------------------------------------------

