# Install Spark for Python and Load the dataset


## Install Python, Import libraries and Create session for Spark

In [2]:
!pip install --force-reinstall pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m16.3 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=93424f7a4839ec1530723ce8faab5cb379c7a400ed5574cfccbbcda597110d31
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9.7
  

In [3]:
# Do all imports and installs here
import pandas as pd
import datetime as dt
from functools import reduce
from operator import add
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType, TimestampType, StructType, StructField, IntegerType, StringType, FloatType, DoubleType, ArrayType
from pyspark.sql.functions import col, sqrt, desc, asc, split, explode, from_json, get_json_object, inline
from pyspark.sql.functions import from_unixtime, unix_timestamp, array, monotonically_increasing_id, lit, min, max, to_date
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import MultilayerPerceptronClassifier

spark = SparkSession.builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .master("local[*]") \
    .config("spark.executor.memory", "12g") \
    .config("spark.driver.memory", "12g") \
    .config("spark.memory.offHeap.enabled", True) \
    .config('spark.sql.parquet.int96RebaseModeInRead', 'LEGACY') \
    .config('spark.sql.parquet.int96RebaseModeInWrite', 'LEGACY') \
    .config("spark.memory.offHeap.size","12g") \
    .config("spark.sql.shuffle.partitions",64) \
    .getOrCreate()

## Load the Dataset

In [4]:
# Connet to my drive
from google.colab import drive
drive.mount('/content/drive/')
dir = '/content/drive/MyDrive/BTL Big Data & Business Intelligence'

Mounted at /content/drive/


### Load train set

In [5]:
behaviors_train_df = spark.read.csv('/content/drive/MyDrive/BTL Big Data & Business Intelligence/MIND Small/MINDsmall_train/behaviors.tsv', sep=r'\t', header=False) \
    .selectExpr('_c0 AS impressionID',
                '_c1 AS userID',
                '_c2 AS time',
                '_c3 AS history',
                '_c4 AS impressions')

news_train_df = spark.read.csv('/content/drive/MyDrive/BTL Big Data & Business Intelligence/MIND Small/MINDsmall_train/news.tsv', sep=r'\t', header=False) \
    .selectExpr('_c0 AS newsID',
                '_c1 AS category',
                '_c2 AS subcategory',
                '_c3 AS title',
                '_c4 AS abstract',
                '_c5 AS url',
                '_c6 AS titleEntities',
                '_c7 AS abstractEntities')



### Load test (dev) set

In [6]:
behaviors_dev_df = spark.read.csv('/content/drive/MyDrive/BTL Big Data & Business Intelligence/MIND Small/MINDsmall_dev/behaviors.tsv', sep=r'\t', header=False) \
    .selectExpr('_c0 AS impressionID',
                '_c1 AS userID',
                '_c2 AS time',
                '_c3 AS history',
                '_c4 AS impressions')

news_dev_df = spark.read.csv('/content/drive/MyDrive/BTL Big Data & Business Intelligence/MIND Small/MINDsmall_dev/news.tsv', sep=r'\t', header=False) \
    .selectExpr('_c0 AS newsID',
                '_c1 AS category',
                '_c2 AS subcategory',
                '_c3 AS title',
                '_c4 AS abstract',
                '_c5 AS url',
                '_c6 AS titleEntities',
                '_c7 AS abstractEntities')

# Data Extraction and Preprocessing

## Remove duplicate rows if exists

### Remove duplicate row for train set

In [7]:
behaviors_train_df = behaviors_train_df.dropDuplicates()
behaviors_train_df = behaviors_train_df.withColumn("time", to_date(col("time"), "MM/dd/yyyy"))
news_train_df = news_train_df.dropDuplicates()

### Remove duplicate rows for dev test

In [8]:
behaviors_dev_df = behaviors_dev_df.dropDuplicates()
behaviors_dev_df = behaviors_dev_df.withColumn("time", to_date(col("time"), "MM/dd/yyyy"))
news_dev_df = news_dev_df.dropDuplicates()

## Create table for relationship between users and articles appeared on impressions

The table will have three columns:
* **userID**: ID of users
* **newsID**: ID of article's news.
* **click**: Value 1 if the article is clicked by user, 0 otherwise.

### Create table for train set

In [9]:
history_click_df = behaviors_train_df.select("userID", "time",
                                       explode(split("history", " ").alias("newsID")).alias("newsID")) \
                                       .withColumn("click", lit(1.0).cast(DoubleType()))

current_click_df = behaviors_train_df.select("userID", "time",
                                    explode(split("impressions", " ").alias("click")).alias("click"))
current_click_df = current_click_df.select("userID", "time",
                                     split("click", "-").alias("click"))
current_click_df = current_click_df.select("userID", "time",
                                           current_click_df.click.getItem(0).alias("newsID"),
                                           current_click_df.click.getItem(1).alias("click").cast(DoubleType()))

user_click_train_df = history_click_df.union(current_click_df).dropDuplicates()
user_click_train_df = user_click_train_df.groupBy("userID", "newsID", "click").agg(min("time").alias("time"))
user_click_train_df = user_click_train_df.join(user_click_train_df.groupBy("userID", "newsID").agg(max("click").alias("click")),
                             on=["userID", "newsID", "click"], how="inner")


user_click_train_df = user_click_train_df.select("userID", "newsID", "click")

### Create table for test set

In [10]:
history_click_df = behaviors_dev_df.select("userID", "time",
                                       explode(split("history", " ").alias("newsID")).alias("newsID")) \
                                       .withColumn("click", lit(1.0).cast(DoubleType()))

current_click_df = behaviors_dev_df.select("userID", "time",
                                    explode(split("impressions", " ").alias("click")).alias("click"))
current_click_df = current_click_df.select("userID", "time",
                                     split("click", "-").alias("click"))
current_click_df = current_click_df.select("userID", "time",
                                           current_click_df.click.getItem(0).alias("newsID"),
                                           current_click_df.click.getItem(1).alias("click").cast(DoubleType()))

user_click_test_df = history_click_df.union(current_click_df).dropDuplicates()
user_click_test_df = user_click_test_df.groupBy("userID", "newsID", "click").agg(min("time").alias("time"))
user_click_test_df = user_click_test_df.join(user_click_test_df.groupBy("userID", "newsID").agg(max("click").alias("click")),
                             on=["userID", "newsID", "click"], how="inner")


user_click_test_df = user_click_test_df.select("userID", "newsID", "click")

## Create train set and test set from user_click table

### Recreate the ID for users and articles

In [23]:
# Get list of news' ID
news_ids = news_train_df.select('newsID').dropDuplicates()
user_ids = behaviors_train_df.select('userID').dropDuplicates()

news_ids_map = news_ids.withColumn("new_newsid", monotonically_increasing_id())
user_ids_map = user_ids.withColumn("new_userid", monotonically_increasing_id())

In [24]:
news_ids_map = news_ids_map.join(news_train_df,on=["newsID"], how="inner").select("newsID", "new_newsid", "title")

In [26]:
news_ids_map.toPandas().to_csv("news_ids_map.csv")

In [27]:

user_ids_map.toPandas().to_csv("user_ids_map.csv")

### Create train set and save it

In [None]:
user_click_train_df = user_click_train_df.join(news_ids_map, on=["newsID"], how="inner")

In [None]:
user_click_train_df = user_click_train_df.join(user_ids_map, on=["userID"], how="inner")

In [None]:
training_set = user_click_train_df.select("new_userid","new_newsid", "click")

In [None]:
training_set.show()

+----------+----------+-----+
|new_userid|new_newsid|click|
+----------+----------+-----+
|     36154|     40328|  0.0|
|     36154|     50182|  0.0|
|     36154|     50294|  0.0|
|     36154|     29301|  0.0|
|     36154|     47084|  0.0|
|     36154|     48407|  0.0|
|     36154|     23097|  1.0|
|     36154|     41905|  0.0|
|     36154|     26529|  1.0|
|     36154|     45665|  0.0|
|     36154|     48154|  0.0|
|     36154|     32971|  0.0|
|     36154|       918|  0.0|
|     36154|     37375|  0.0|
|     36154|     40259|  0.0|
|     36154|     46132|  0.0|
|     36154|     50355|  0.0|
|     36154|     42537|  0.0|
|     36154|     28324|  0.0|
|     36154|     37446|  0.0|
+----------+----------+-----+
only showing top 20 rows



In [None]:
training_set.toPandas().to_csv("train_set.csv")

### Create test set and save it

In [None]:
user_click_test_df = user_click_test_df.join(news_ids_map, on=["newsID"], how="inner")
user_click_test_df = user_click_test_df.join(user_ids_map, on=["userID"], how="inner")
test_set = user_click_test_df.select("new_userid","new_newsid", "click")
test_set.toPandas().to_csv("test_set.csv")