In [1]:
import getpass
import json
import os
from pathlib import Path

import findspark
import numpy as np
import scipy
import sklearn
import tqdm.notebook as tqdm

In [2]:
# Runs before the `import pyspark` cell below.
# NOTE(review): presumably this puts the local Spark install on sys.path so pyspark can be imported — confirm.
findspark.init()

In [3]:
import pyspark as ps
from pyspark.sql import SparkSession, Row

## Initialize Spark Context and Session

In [4]:
# Launch the Hadoop daemons via the image's helper script; per the log below it
# starts sshd, HDFS (namenode/datanode) and YARN (resourcemanager/nodemanager).
! /home/jovyan/start-hadoop.sh

jovyan
 * Starting OpenBSD Secure Shell server sshd
start-stop-daemon: unable to set gid to 0 (Operation not permitted)
   ...fail!
 * sshd is running
Starting namenodes on [localhost]
localhost: namenode is running as process 158.  Stop it first and ensure /tmp/hadoop-jovyan-namenode.pid file is empty before retry.
Starting datanodes
localhost: datanode is running as process 263.  Stop it first and ensure /tmp/hadoop-jovyan-datanode.pid file is empty before retry.
Starting secondary namenodes [470b66c3c538]
470b66c3c538: secondarynamenode is running as process 431.  Stop it first and ensure /tmp/hadoop-jovyan-secondarynamenode.pid file is empty before retry.
Starting resourcemanager
resourcemanager is running as process 721.  Stop it first and ensure /tmp/hadoop-jovyan-resourcemanager.pid file is empty before retry.
Starting nodemanagers
localhost: nodemanager is running as process 826.  Stop it first and ensure /tmp/hadoop-jovyan-nodemanager.pid file is empty before retry.
721 org.ap

In [4]:
# A bare ps.SparkContext() did not work on this machine, so the context is
# configured explicitly.
#
# The master is declared once, on the conf. Previously the conf said "yarn"
# while the constructor was passed 'local[1]'; the constructor argument takes
# precedence, so the job always ran locally and the "yarn" setting was
# misleading. The effective configuration (local[1], app name
# "spark_advanced_hw") is unchanged.
conf = (
    ps.SparkConf()
    .setMaster("local[1]")
    .setAppName("spark_advanced_hw")
    .set("spark.executor.heartbeatInterval", "60s")
)

# getOrCreate returns the already-running context when the cell is executed
# again, instead of failing because a SparkContext already exists.
sc = ps.SparkContext.getOrCreate(conf=conf)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023-05-07 11:25:01,512 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [5]:
# Wrap the SparkContext in a SparkSession; `se` is the entry point for the
# CSV reads and the SQL query further down.
se = SparkSession(sc)

## Download the data

In [6]:
# Make sure the directory that will hold kaggle.json exists (-p: no error if it already does).
! mkdir -p ~/.kaggle

In [7]:
# Kaggle API credentials are read from the environment (KAGGLE_USERNAME /
# KAGGLE_KEY) or prompted for interactively, so the secret is never stored in
# the notebook itself.
# NOTE(review): the API key that used to be hardcoded in this cell must be
# treated as leaked and regenerated in the Kaggle account settings.
kaggle_username = os.environ.get("KAGGLE_USERNAME") or input("Kaggle username: ")
kaggle_key = os.environ.get("KAGGLE_KEY") or getpass.getpass("Kaggle API key: ")

kaggle_config_path = Path.home() / ".kaggle" / "kaggle.json"
kaggle_config_path.parent.mkdir(parents=True, exist_ok=True)
kaggle_config_path.write_text(
    json.dumps({"username": kaggle_username, "key": kaggle_key})
)
# Owner-only permissions: the file holds a secret.
kaggle_config_path.chmod(0o600)

Overwriting /home/jovyan/.kaggle/kaggle.json


In [8]:
# Restrict the credentials file to owner read/write only, since it contains the API key.
! chmod 600 ~/.kaggle/kaggle.json

In [9]:
# We need only page_views_sample and document_topics
! pip install -U urllib3 kaggle==1.5.3
! kaggle competitions download -c outbrain-click-prediction -f page_views_sample.csv.zip
! kaggle competitions download -c outbrain-click-prediction -f documents_topics.csv.zip 

Collecting urllib3
  Using cached urllib3-2.0.2-py3-none-any.whl (123 kB)
Downloading page_views_sample.csv.zip to /home/jovyan/work
100%|███████████████████████████████████████▊| 148M/149M [00:33<00:00, 4.55MB/s]
100%|████████████████████████████████████████| 149M/149M [00:33<00:00, 4.64MB/s]
Downloading documents_topics.csv.zip to /home/jovyan/work
100%|████████████████████████████████████████| 121M/121M [00:27<00:00, 4.56MB/s]
100%|████████████████████████████████████████| 121M/121M [00:27<00:00, 4.63MB/s]


In [10]:
! unzip '*.zip'
! rm -rf *.zip

Archive:  documents_topics.csv.zip
  inflating: documents_topics.csv    

Archive:  page_views_sample.csv.zip
  inflating: page_views_sample.csv   

2 archives were successfully processed.


In [16]:
! hdfs dfs -put page_views_sample.csv
! hdfs dfs -put documents_topics.csv

In [30]:
# List the default HDFS directory to check that both CSVs were uploaded.
! hdfs dfs -ls 

Found 3 items
drwxr-xr-x   - jovyan supergroup          0 2023-05-07 05:54 .sparkStaging
-rw-r--r--   1 jovyan supergroup  339473038 2023-05-07 11:31 documents_topics.csv
-rw-r--r--   1 jovyan supergroup  454346554 2023-05-07 11:31 page_views_sample.csv


## Read the data to Spark Dataframe

In [31]:
# Load the page-view log with the first line as the header. No schema is given,
# so the columns are read as strings (ids are cast with int() where needed).
page_views = se.read.csv("page_views_sample.csv", header=True)
# createOrReplaceTempView replaces the deprecated registerTempTable and can be
# re-run safely; the view is queried by name in the SQL for task 2.
page_views.createOrReplaceTempView("page_views")
page_views.show(3)

+--------------+-----------+---------+--------+------------+--------------+
|          uuid|document_id|timestamp|platform|geo_location|traffic_source|
+--------------+-----------+---------+--------+------------+--------------+
|1fd5f051fba643|        120| 31905835|       1|          RS|             2|
|8557aa9004be3b|        120| 32053104|       1|       VN>44|             2|
|c351b277a358f0|        120| 54013023|       1|       KR>12|             1|
+--------------+-----------+---------+--------+------------+--------------+
only showing top 3 rows



In [32]:
# Load the document -> topic mapping with the first line as the header; as no
# schema is given, the columns are read as strings.
documents_topics = se.read.csv("documents_topics.csv", header=True)
# createOrReplaceTempView replaces the deprecated registerTempTable and can be
# re-run safely. The view name is kept as "document_topics" (singular), which
# differs from the variable name, so existing references keep working.
documents_topics.createOrReplaceTempView("document_topics")
documents_topics.show(3)

+-----------+--------+------------------+
|document_id|topic_id|  confidence_level|
+-----------+--------+------------------+
|    1595802|     140|0.0731131601068925|
|    1595802|      16|0.0594164867373976|
|    1595802|     143|0.0454207537554526|
+-----------+--------+------------------+
only showing top 3 rows



## Complete the tasks

Data: outbrain click prediction

Tasks:
Using Spark RDD, DataFrame API and Python, calculate:

**1**. Top 10 most visited document_ids in the page_views_sample log

**2**. How many users have at least 2 different traffic_sources in the page_views_sample log (note the value is not a count, it's an encoded enum)

**3***. Top 10 most visited topic_ids in page_views_sample log (use documents_topics table)

The submission is a result.json file with the keys top_10_documents, users and top_10_topics.
For the TOP-10 results, the answer must be a list of ids ordered from TOP-1 to TOP-10.

result.json example:

    {
        "top_10_documents": [
            111,
            222,
            333,
            ...,
            1010
        ],
        "users": 10000,
        "top_10_topics": [
            11,
            22,
            33,
            ...,
            101
        ]
    }

### 1: top-10 documents

In [33]:
# Task 1: count page views per document and keep the ten most viewed,
# ordered from most to least viewed.
views_per_document = page_views.groupBy("document_id").count()
most_viewed_rows = views_per_document.orderBy("count", ascending=False).take(10)
# document_id was read from CSV as a string, hence the int() cast.
top_10_documents = [int(row["document_id"]) for row in most_viewed_rows]
top_10_documents

                                                                                

[1811567,
 234,
 42744,
 1858440,
 1780813,
 60164,
 1790442,
 1877626,
 1821895,
 732651]

### 2: users with more than one traffic source

In [35]:
# Task 2: one result row per uuid seen with more than one distinct
# traffic_source; the answer is the number of such rows.
multi_source_users_query = """
    SELECT uuid, count(DISTINCT traffic_source) as cnt
    FROM page_views
    GROUP BY uuid
    HAVING count(DISTINCT traffic_source) > 1
"""
multi_source_users = se.sql(multi_source_users_query)
users = multi_source_users.count()
users

                                                                                

98080

### 3: top-10 topics

In [36]:
# Task 3: each page view counts once for every topic attached to its document.
#
# An inner join is used instead of the previous left join. With a left join,
# views of documents that have no topic rows form a NULL topic_id group; if
# that group ever reached the top 10, int(None) below would raise a TypeError.
# Such views cannot contribute to any real topic, so dropping them does not
# change the counts of the real topics.
views_with_topics = page_views.join(
    documents_topics,
    page_views.document_id == documents_topics.document_id,
    "inner",
).select(documents_topics.topic_id)

top_topic_rows = (
    views_with_topics
    .groupby("topic_id")
    .count()
    .sort("count", ascending=False)
    .limit(10)
    .collect()
)
# topic_id was read from CSV as a string, hence the int() cast.
top_10_topics = [int(row.topic_id) for row in top_topic_rows]
top_10_topics

                                                                                

[20, 16, 216, 136, 140, 143, 36, 97, 8, 269]

## Prepare result and submit it

In [37]:
import json

# Collect the three answers under the keys required by the submission format.
result = dict(
    top_10_documents=top_10_documents,
    users=users,
    top_10_topics=top_10_topics,
)
# Serialize the answers to result.json in the working directory.
with open("result.json", "w") as result_file:
    result_file.write(json.dumps(result))
result

{'top_10_documents': [1811567,
  234,
  42744,
  1858440,
  1780813,
  60164,
  1790442,
  1877626,
  1821895,
  732651],
 'users': 98080,
 'top_10_topics': [20, 16, 216, 136, 140, 143, 36, 97, 8, 269]}

In [38]:
# POST result.json as the multipart form field "file"; the server's reply is printed below.
! curl -F file=@result.json "51.250.54.133:80/MDS-LSML1/findabalance22/w4/1"

1.0
Well done!
