### Imports and spark initialization

In [1]:
import sys
import json
import pyspark
from pyspark.sql.functions import col, collect_list, array_join
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import length

# Obtain the (possibly already running) Spark session, then expose its SparkContext as `sc`.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

### Load tedx_dataset

In [2]:
tedx_dataset_path = "./tedx_dataset.csv"

## READ TEDX DATASET
# quote and escape are both '"', so a doubled quote ("") inside a quoted field is read as a literal quote.
tedx_dataset = spark.read \
    .option("header","true") \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .csv(tedx_dataset_path)

tedx_dataset.printSchema()

#### COUNT ITEMS WITH A NOT NULL POSTING KEY (idx)
# This only counts rows; nothing is removed from tedx_dataset here.
count_items = tedx_dataset.count()
count_items_not_null = tedx_dataset.filter(col("idx").isNotNull()).count()

print(f"Number of items from RAW DATA {count_items}")
print(f"Number of items from RAW DATA with NOT NULL KEY {count_items_not_null}")

root
 |-- idx: string (nullable = true)
 |-- main_speaker: string (nullable = true)
 |-- title: string (nullable = true)
 |-- details: string (nullable = true)
 |-- posted: string (nullable = true)
 |-- url: string (nullable = true)
 |-- num_views: string (nullable = true)

Number of items from RAW DATA 4494
Number of items from RAW DATA with NOT NULL KEY 4494


### Load tags dataset and join it with the tedx dataset

In [3]:
## READ TAGS DATASET
tags_dataset_path = "./tags_dataset.csv"
# NOTE(review): unlike tedx_dataset, no quote/escape options are set here — confirm tags_dataset.csv needs none.
tags_dataset = spark.read.csv(tags_dataset_path, header=True)

# CREATE THE AGGREGATE MODEL, ADD TAGS TO TEDX_DATASET
# One array of tags per talk; the key is renamed to idx_ref so it does not clash with tedx_dataset.idx.
tags_dataset_agg = (
    tags_dataset
    .groupBy(col("idx").alias("idx_ref"))
    .agg(collect_list("tag").alias("tags"))
)

# Left join: talks with no tags are kept (their `tags` value is null); the helper key is dropped afterwards.
tedx_dataset_agg = (
    tedx_dataset
    .join(tags_dataset_agg, tedx_dataset["idx"] == tags_dataset_agg["idx_ref"], how="left")
    .drop("idx_ref")
)

tedx_dataset_agg.printSchema()
# Preview of the raw (un-joined) talks.
tedx_dataset.select("idx", "main_speaker", "title").show()

root
 |-- idx: string (nullable = true)
 |-- main_speaker: string (nullable = true)
 |-- title: string (nullable = true)
 |-- details: string (nullable = true)
 |-- posted: string (nullable = true)
 |-- url: string (nullable = true)
 |-- num_views: string (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------------------+--------------------+--------------------+
|                 idx|        main_speaker|               title|
+--------------------+--------------------+--------------------+
|8d2005ec35280deb6...|      Alexandra Auer|The intangible ef...|
|b3072cd11f40eb57f...|   Elizabeth Gilbert|It's OK to feel o...|
|4adc9fee977fa04c3...|        Butterscotch|   "Accept Who I Am"|
|59c641a72b495d522...|          Ethan Lisi|What it's really ...|
|d227f2faf6ec185e5...|       Daniel Finkel|Can you solve the...|
|fe612cc9179e038f9...|         Matt Walker|Why sleep matters...|
|6e67b6ad4cc6b420f...|          Sonia Shah|How to make pa

We noticed a problem with some rows:
```
...
|Elisabeth Pierre ...|                null|                null|
|Elisabeth est zyt...| une des quelques...| chercheurs et cu...|
...
```
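After inspecting the original dataset, we concluded that some rows of the CSV files are malformed: their values end up shifted into the wrong columns (e.g. a fragment of a speaker description appears in the 'idx' column, as shown above). We filter these rows out by keeping only those whose 'idx' value is exactly 32 characters long, the length of a valid id.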

In [4]:
# Valid talk ids are exactly IDX_LENGTH characters long; malformed CSV rows leave text fragments in idx instead.
IDX_LENGTH = 32
# A null idx is treated as invalid too. filter() drops those rows either way, and including them here
# makes the "Pre filtering" count report every row that the filter actually removes.
valid_idx = col("idx").isNotNull() & (length(col("idx")) == IDX_LENGTH)

print(f"Pre filtering: {tedx_dataset_agg.where(~valid_idx).count()}")
tedx_dataset_agg = tedx_dataset_agg.filter(valid_idx)
print(f"After filtering: {tedx_dataset_agg.where(~valid_idx).count()}")

Pre filtering: 27
After filtering: 0


### Load watch_next dataset

In [5]:
## READ WATCH NEXT DATASET
# Presumably each row pairs a talk (idx) with one talk suggested to watch next (watch_next_idx) — TODO confirm.
# NOTE(review): unlike tedx_dataset, no quote/escape options are set here — confirm the CSV needs none.
watch_next_path = "./watch_next_dataset.csv"
wn_dataset = spark.read.option("header","true").csv(watch_next_path)

wn_dataset.printSchema()

root
 |-- idx: string (nullable = true)
 |-- url: string (nullable = true)
 |-- watch_next_idx: string (nullable = true)



The watch_next dataset seems to contain many duplicate rows. We use `.dropDuplicates()` to remove them.

In [6]:
# Remove exact duplicate rows, reporting the row count before and after.
wn_rows_before = wn_dataset.count()
print(f"Pre drop duplicated: {wn_rows_before}")

wn_dataset = wn_dataset.dropDuplicates()
wn_rows_after = wn_dataset.count()
print(f"After drop duplicated: {wn_rows_after}")

Pre drop duplicated: 77364
After drop duplicated: 30254


Add watch_next ids to speech items

In [7]:
# CREATE THE AGGREGATE MODEL, ADD WATCH NEXT TO TEDX_DATASET
# One list of watch_next ids per talk; the key is renamed to idx_wn to avoid clashing with tedx idx.
wn_dataset_agg = (
    wn_dataset
    .groupBy(col("idx").alias("idx_wn"))
    .agg(collect_list("watch_next_idx").alias("watch_next_ids"))
)
wn_dataset_agg.printSchema()

# Left join: talks with no watch_next entries are kept (their watch_next_ids value is null).
# The select takes idx and renames it to _id; all the other columns are carried over unchanged,
# and the original idx column is then dropped.
tedx_dataset_aggf = (
    tedx_dataset_agg
    .join(wn_dataset_agg, tedx_dataset_agg["idx"] == wn_dataset_agg["idx_wn"], how="left")
    .drop("idx_wn")
    .select(col("idx").alias("_id"), col("*"))
    .drop("idx")
)
tedx_dataset_aggf.printSchema()
tedx_dataset_aggf.select("_id", "watch_next_ids").show()

root
 |-- idx_wn: string (nullable = true)
 |-- watch_next_ids: array (nullable = true)
 |    |-- element: string (containsNull = true)

root
 |-- _id: string (nullable = true)
 |-- main_speaker: string (nullable = true)
 |-- title: string (nullable = true)
 |-- details: string (nullable = true)
 |-- posted: string (nullable = true)
 |-- url: string (nullable = true)
 |-- num_views: string (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- watch_next_ids: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------------------+--------------------+
|                 _id|      watch_next_ids|
+--------------------+--------------------+
|8d2005ec35280deb6...|[fe35edd737282ab3...|
|b3072cd11f40eb57f...|[46a2254c4eda643c...|
|4adc9fee977fa04c3...|[9f7b1654e792011b...|
|59c641a72b495d522...|[9f7b1654e792011b...|
|d227f2faf6ec185e5...|[b8cdfc4eb7713a40...|
|fe612cc9179e038f9...|[9f7b1654e792011b...|
|6e67b6ad4cc6b42