In [1]:
#### import and spark init

import sys
import json
import pyspark
from pyspark.sql.functions import col, collect_list, array_join
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import length

# getOrCreate() returns the already-running SparkContext / SparkSession when the
# kernel has one, so re-running this cell does not try to start a second context.
# NOTE(review): `sc` is not referenced by any of the visible cells; `spark` is.
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()

In [2]:
tedx_dataset_path = "./tedx_dataset.csv"

## READ TEDX DATASET
# Setting `escape` to the quote character makes Spark read a doubled quote ("")
# inside a quoted field as a literal quote character.
tedx_dataset = (
    spark.read
    .option("header", "true")
    .option("quote", "\"")
    .option("escape", "\"")
    .csv(tedx_dataset_path)
)

tedx_dataset.printSchema()

#### CHECK HOW MANY ITEMS HAVE A USABLE URL KEY
# A row counts as valid only if `url` starts with "http". This drops rows whose
# url is null (NULL LIKE ... is not true) as well as rows whose url column holds
# something that is not a link (possibly mis-parsed CSV rows - TODO confirm).
VALID_URL_FILTER = "url like 'http%'"

count_items = tedx_dataset.count()
# Name kept for compatibility with the printed labels: this counts the VALID rows.
count_items_null = tedx_dataset.filter(VALID_URL_FILTER).count()

print(f"Number of items from RAW DATA {count_items}")
print(f"Number of items from RAW DATA with NOT NULL KEY {count_items_null}")
print(f"Number of items filtered: {count_items-count_items_null}")

root
 |-- idx: string (nullable = true)
 |-- main_speaker: string (nullable = true)
 |-- title: string (nullable = true)
 |-- details: string (nullable = true)
 |-- posted: string (nullable = true)
 |-- url: string (nullable = true)
 |-- num_views: string (nullable = true)

Number of items from RAW DATA 4494
Number of items from RAW DATA with NOT NULL KEY 4444
Number of items filtered: 50


In [3]:
#### KEEP ONLY TALKS WHOSE URL LOOKS LIKE A LINK
# Column-API form of the SQL predicate "url like 'http%'": rows with a null url
# or a url not starting with "http" are dropped.
url_is_link = col("url").like("http%")
tedx_dataset_filtered = tedx_dataset.where(url_is_link)
tedx_dataset_filtered.printSchema()

root
 |-- idx: string (nullable = true)
 |-- main_speaker: string (nullable = true)
 |-- title: string (nullable = true)
 |-- details: string (nullable = true)
 |-- posted: string (nullable = true)
 |-- url: string (nullable = true)
 |-- num_views: string (nullable = true)



In [4]:
#### READ TAGS DATASET
tags_dataset_path = "./tags_dataset.csv"
tags_dataset = spark.read.option("header", "true").csv(tags_dataset_path)

#### CREATE THE AGGREGATE MODEL, ADD TAGS TO TEDX_DATASET
# One row per talk id with all of its tags collected into an array. The key is
# aliased to idx_ref so it does not clash with the talks' own `idx` column.
tags_dataset_agg = (
    tags_dataset
    .groupBy(col("idx").alias("idx_ref"))
    .agg(collect_list("tag").alias("tags"))
)

# Left join: talks without any tag are kept, with a null `tags` column.
# The condition uses the joined frame's own column (tedx_dataset_filtered.idx).
# Referencing the unfiltered parent `tedx_dataset.idx` only resolves through
# Spark's lineage tracking, which is fragile.
tedx_dataset_agg = (
    tedx_dataset_filtered
    .join(tags_dataset_agg, tedx_dataset_filtered.idx == tags_dataset_agg.idx_ref, "left")
    .drop("idx_ref")
)

tedx_dataset_agg.printSchema()
tedx_dataset_agg.select(col('idx'), col('main_speaker'), col('title'), col('url')).show()

root
 |-- idx: string (nullable = true)
 |-- main_speaker: string (nullable = true)
 |-- title: string (nullable = true)
 |-- details: string (nullable = true)
 |-- posted: string (nullable = true)
 |-- url: string (nullable = true)
 |-- num_views: string (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------------------+--------------------+--------------------+--------------------+
|                 idx|        main_speaker|               title|                 url|
+--------------------+--------------------+--------------------+--------------------+
|8d2005ec35280deb6...|      Alexandra Auer|The intangible ef...|https://www.ted.c...|
|b3072cd11f40eb57f...|   Elizabeth Gilbert|It's OK to feel o...|https://www.ted.c...|
|4adc9fee977fa04c3...|        Butterscotch|   "Accept Who I Am"|https://www.ted.c...|
|59c641a72b495d522...|          Ethan Lisi|What it's really ...|https://www.ted.c...|
|d227f2faf6ec185e5...|       Daniel Fink

In [5]:
#### READ WATCH NEXT DATASET
# The first line of the file is used as the header; without schema inference
# every column is read as a string.
watch_next_path = "./watch_next_dataset.csv"
wn_dataset = spark.read.csv(watch_next_path, header=True)

wn_dataset.printSchema()

root
 |-- idx: string (nullable = true)
 |-- url: string (nullable = true)
 |-- watch_next_idx: string (nullable = true)



In [6]:
#### DROPPING DUPLICATE
# dropDuplicates() with no arguments removes rows that are identical in every column.
# NOTE: this rebinds `wn_dataset`. Re-running the cell without re-running the read
# cell reports the deduplicated size as "Pre" and 0 dropped.
wn_count = wn_dataset.count()
print(f"Pre drop duplicated: {wn_count}")
wn_dataset = wn_dataset.dropDuplicates()

# Every count() launches a full Spark job, so compute the deduplicated size once.
wn_count_dedup = wn_dataset.count()
print(f"After drop duplicated: {wn_count_dedup}")
print(f"Dropped: {wn_count-wn_count_dedup}")

Pre drop duplicated: 77364
After drop duplicated: 30254
Dropped: 47110


In [7]:
#### FILTER BAD URL
# Rows whose url points at "https://www.ted.com/session/new?..." (presumably a
# sign-in redirect - TODO confirm) do not carry a real talk link.
# In LIKE only % and _ are wildcards, so the "?" in the pattern is matched literally.
BAD_URL_PATTERN = "https://www.ted.com/session/new?%"
bad_url_filter = f"url like '{BAD_URL_PATTERN}'"
good_url_filter = f"url not like '{BAD_URL_PATTERN}'"

count_items = wn_dataset.count()
count_items_good = wn_dataset.filter(good_url_filter).count()

print(f"Number of items PRE {count_items}")
print(f"Number of items AFTER {count_items_good}")
print(f"Number of items filtered: {count_items-count_items_good}")

#### bad url row has a watch_next_id that's not in talk id list - can be safely removed, no info lost
bad_id = wn_dataset.filter(bad_url_filter).select("watch_next_idx").dropDuplicates()
print(f"different bad id: {bad_id.count()}")
print("bad watch_next_id: ")
# Show every distinct bad id (up to show()'s default row limit), not only the first.
bad_id.show(truncate=False)

#### search bad_id in talks
# The left-semi join keeps only the talks whose idx appears among the bad ids.
# Every computed bad id is checked, rather than one hard-coded value, and an
# empty table means none of them is a real talk.
print("search for bad_id in talks: ")
tedx_dataset_agg.join(bad_id, tedx_dataset_agg.idx == bad_id.watch_next_idx, "left_semi").show()

#### filter bad id
wn_dataset = wn_dataset.filter(good_url_filter)

Number of items PRE 30254
Number of items AFTER 25788
Number of items filtered: 4466
different bad id: 1
bad watch_next_id: 
+--------------------------------+
|watch_next_idx                  |
+--------------------------------+
|9f7b1654e792011b7e1c6f4288520226|
+--------------------------------+

search for bad_id in talks: 
+---+------------+-----+-------+------+---+---------+----+
|idx|main_speaker|title|details|posted|url|num_views|tags|
+---+------------+-----+-------+------+---+---------+----+
+---+------------+-----+-------+------+---+---------+----+



In [8]:
# CREATE THE AGGREGATE MODEL, ADD WATCH NEXT TO TEDX_DATASET

# One row per talk id with all of its watch_next_idx values collected into an
# array. collect_list does not guarantee element order.
wn_dataset_agg = (
    wn_dataset
    .groupBy(col("idx").alias("idx_wn"))
    .agg(collect_list("watch_next_idx").alias("watch_next_ids"))
)
wn_dataset_agg.printSchema()

# Left join: talks with no watch-next rows are kept, with a null watch_next_ids.
# Then `idx` is exposed as `_id` in the first position and the original `idx`
# column (still carried by col("*")) is dropped.
# Parentheses replace the original trailing backslash, which continued into a
# blank line and would silently absorb the next statement if that line were removed.
tedx_dataset_aggf = (
    tedx_dataset_agg
    .join(wn_dataset_agg, tedx_dataset_agg.idx == wn_dataset_agg.idx_wn, "left")
    .drop("idx_wn")
    .select(col("idx").alias("_id"), col("*"))
    .drop("idx")
)

tedx_dataset_aggf.printSchema()
tedx_dataset_aggf.select(col('_id'), col('watch_next_ids')).show()

root
 |-- idx_wn: string (nullable = true)
 |-- watch_next_ids: array (nullable = true)
 |    |-- element: string (containsNull = true)

root
 |-- _id: string (nullable = true)
 |-- main_speaker: string (nullable = true)
 |-- title: string (nullable = true)
 |-- details: string (nullable = true)
 |-- posted: string (nullable = true)
 |-- url: string (nullable = true)
 |-- num_views: string (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- watch_next_ids: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------------------+--------------------+
|                 _id|      watch_next_ids|
+--------------------+--------------------+
|8d2005ec35280deb6...|[fe35edd737282ab3...|
|b3072cd11f40eb57f...|[46a2254c4eda643c...|
|4adc9fee977fa04c3...|[edb909effab18969...|
|59c641a72b495d522...|[267958f897f12551...|
|d227f2faf6ec185e5...|[b8cdfc4eb7713a40...|
|fe612cc9179e038f9...|[cf6c51cfc4d748c6...|
|6e67b6ad4cc6b42