In [1]:
# Dana Rozenblum & Efrat Magidov

# Checklist:
# AWS emr-5.29.0
# MASTER r5d.2xlarge 1x, no EBS
# CORE r5d.2xlarge 2x, no EBS
# Custom bootstrap action: s3://ydatazian/bootstrap.sh
# Allow ssh in master node security group

In [2]:
import tqdm.notebook as tqdm
import numpy as np
import scipy
import sklearn

# Spark

In [3]:
# connect, context, session

import findspark
findspark.init()

import spark_utils
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
sc = SparkContext("yarn", "My App", conf=spark_utils.get_spark_conf())

spark_utils.print_ui_links()


NameNode: http://ec2-44-192-126-69.compute-1.amazonaws.com:50070
YARN: http://ec2-44-192-126-69.compute-1.amazonaws.com:8088
Spark UI: http://ec2-44-192-126-69.compute-1.amazonaws.com:20888/proxy/application_1622111155906_0002


In [4]:
se = SparkSession(sc)

## Outbrain click prediction dataseet

https://www.kaggle.com/c/outbrain-click-prediction/data

### AWS S3

In [5]:
! aws s3 ls s3://ydatazian

                           PRE week1/
2021-05-21 04:09:44       1874 bootstrap.sh
2021-05-13 21:20:22  176843889 clicks_test.parquet
2021-05-13 21:20:22  495815517 clicks_train.parquet
2021-05-13 21:21:58   34267065 documents_categories.parquet
2021-05-13 21:21:58  206455957 documents_entities.parquet
2021-05-13 21:21:58   23859965 documents_meta.parquet
2021-05-13 21:21:58  187410196 documents_topics.parquet
2021-05-13 21:21:58  734643471 events.parquet
2021-05-13 21:56:44 50764611872 page_views.parquet
2021-05-13 21:21:58  248421413 page_views_sample.parquet
2021-05-13 21:21:59    5116927 promoted_content.parquet
2021-05-13 21:21:58  273136709 sample_submission.csv


In [6]:
from IPython.display import display
tables = ["clicks_test",
          "clicks_train", 
          "documents_categories",
          "documents_entities",
          "documents_meta",
          "documents_topics", 
          "events",
          "page_views",
          "page_views_sample",
          "promoted_content"]
for name in tqdm.tqdm(tables):
    df = se.read.parquet("s3://ydatazian/{}.parquet".format(name))
    df.registerTempTable(name)
    print(name)
    df.limit(3).show()

  0%|          | 0/10 [00:00<?, ?it/s]

clicks_test
+----------+------+
|display_id| ad_id|
+----------+------+
|  16874594| 66758|
|  16874594|150083|
|  16874594|162754|
+----------+------+

clicks_train
+----------+------+-------+
|display_id| ad_id|clicked|
+----------+------+-------+
|         1| 42337|      0|
|         1|139684|      0|
|         1|144739|      1|
+----------+------+-------+

documents_categories
+-----------+-----------+----------------+
|document_id|category_id|confidence_level|
+-----------+-----------+----------------+
|    1595802|       1611|            0.92|
|    1595802|       1610|            0.07|
|    1524246|       1807|            0.92|
+-----------+-----------+----------------+

documents_entities
+-----------+--------------------+-----------------+
|document_id|           entity_id| confidence_level|
+-----------+--------------------+-----------------+
|    1524246|f9eec25663db4cd83...|0.672865314504701|
|    1524246|55ebcfbdaff1d6f60...|0.399113728441297|
|    1524246|839907a972930b17b

In [7]:
page_views = se.table("page_views")
print(page_views)
page_views.show(5)

DataFrame[uuid: string, document_id: string, timestamp: string, platform: string, geo_location: string, traffic_source: string]
+--------------+-----------+---------+--------+------------+--------------+
|          uuid|document_id|timestamp|platform|geo_location|traffic_source|
+--------------+-----------+---------+--------+------------+--------------+
|1fd5f051fba643|        120| 31905835|       1|          RS|             2|
|8557aa9004be3b|        120| 32053104|       1|       VN>44|             2|
|c351b277a358f0|        120| 54013023|       1|       KR>12|             1|
|8205775c5387f9|        120| 44196592|       1|       IN>16|             2|
|9cb0ccd8458371|        120| 65817371|       1|   US>CA>807|             2|
+--------------+-----------+---------+--------+------------+--------------+
only showing top 5 rows



## HW

Dataset: outbrain click prediction

Tasks:
using Spark RDD, DataFrame API and Python, complete following tasks:

**1 (1 point)**. Find 10 most visited document_ids in page_views log

**2 (1 point)**. Find out how many users have at least two different traffic_sources in their page_views log

**3* (1 additional point)**. Find 10 most visited topic_ids in page_views log (use documents_topics table)

In [8]:
# 1 (1 point). Find 10 most visited document_ids in page_views log

page_views_df = se.read.parquet("s3://ydatazian/{}.parquet".format("page_views"))

(
    page_views_df
      .groupby('document_id')
      .count()
      .orderBy('count', ascending=False)
      .select(['document_id', 'count'])
      .limit(10)
      .show()
)

+-----------+--------+
|document_id|   count|
+-----------+--------+
|    1179111|26688981|
|     394689|10454691|
|       2191| 5044391|
|       7054| 4996916|
|      38922| 4930261|
|    1154100| 4610780|
|     357569| 4046253|
|    1827718| 3046274|
|        234| 2359994|
|     467462| 2328065|
+-----------+--------+



In [11]:
# 2 (1 point). Find out how many users have at least two different traffic_sources in their page_views log

from pyspark.sql import functions as F

different = (
    page_views_df
      .groupby('uuid')
      .agg(F.countDistinct("traffic_source").alias('count'))
      .filter('count > 1')
)
different.count()

93342305

In [12]:
# 3* (1 additional point). Find 10 most visited topic_ids in page_views log (use documents_topics table)

documents_topics_df = se.read.parquet("s3://ydatazian/{}.parquet".format("documents_topics"))

(
    page_views_df
      .join(documents_topics_df, page_views_df.document_id == documents_topics_df.document_id)
      .groupby(documents_topics_df.topic_id)
      .count()
      .orderBy('count', ascending=False)
      .select(['topic_id', 'count'])
      .limit(10)
      .show()
)

+--------+---------+
|topic_id|    count|
+--------+---------+
|      16|226580657|
|     140|175020525|
|      20|171010359|
|     143|153101583|
|     136|137553975|
|     216|109567943|
|       8|105683622|
|     160| 99645579|
|     181| 98421838|
|      97| 90551102|
+--------+---------+

