In [1]:
import tqdm.notebook as tqdm
import numpy as np
import scipy
import sklearn

# Spark

In [2]:
# Start the local Hadoop daemons (HDFS namenode/datanode, YARN resourcemanager/nodemanager).
# On a re-run the script reports each daemon as already running, which is harmless.
! /home/jovyan/start-hadoop.sh

jovyan
 * Starting OpenBSD Secure Shell server sshd
start-stop-daemon: unable to set gid to 0 (Operation not permitted)
   ...fail!
 * sshd is running
Starting namenodes on [localhost]
localhost: namenode is running as process 165.  Stop it first and ensure /tmp/hadoop-jovyan-namenode.pid file is empty before retry.
Starting datanodes
localhost: datanode is running as process 280.  Stop it first and ensure /tmp/hadoop-jovyan-datanode.pid file is empty before retry.
Starting secondary namenodes [507e1180e956]
507e1180e956: secondarynamenode is running as process 505.  Stop it first and ensure /tmp/hadoop-jovyan-secondarynamenode.pid file is empty before retry.
Starting resourcemanager
resourcemanager is running as process 740.  Stop it first and ensure /tmp/hadoop-jovyan-resourcemanager.pid file is empty before retry.
Starting nodemanagers
localhost: nodemanager is running as process 847.  Stop it first and ensure /tmp/hadoop-jovyan-nodemanager.pid file is empty before retry.
16256 sun.

In [3]:
# connect, context, session
# getOrCreate() makes this cell safe to re-run: constructing a second
# SparkContext directly raises "Cannot run multiple SparkContexts at once".

import findspark
findspark.init()  # make the local Spark installation importable as `pyspark`

import pyspark
from pyspark.sql import SparkSession, Row

# Returns the already-running context on re-run (the conf is then ignored).
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setAppName('jupyter'))

# Wraps the active SparkContext created above instead of building a new one.
se = SparkSession.builder.getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023-05-10 12:51:49,924 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


# HDFS

In [4]:
# Show total / used / available HDFS capacity in human-readable units.
! hdfs dfs -df -h

Filesystem                 Size   Used  Available  Use%
hdfs://localhost:9000  1006.9 G  1.0 G    943.6 G    0%


In [5]:
# List the top-level HDFS directories.
! hdfs dfs -ls /

Found 2 items
drwxrwx---   - root   supergroup          0 2023-05-09 20:43 /tmp
drwxr-xr-x   - jovyan supergroup          0 2023-05-09 20:43 /user


## Outbrain click prediction dataset

In [6]:
!pip install kaggle --upgrade

Collecting kaggle
  Using cached kaggle-1.5.13-py3-none-any.whl
Installing collected packages: kaggle
  Attempting uninstall: kaggle
    Found existing installation: kaggle 1.5.3
    Uninstalling kaggle-1.5.3:
      Successfully uninstalled kaggle-1.5.3
Successfully installed kaggle-1.5.13


In [7]:
# Create the directory that holds the Kaggle CLI credentials file (kaggle.json).
! mkdir -p ~/.kaggle

In [8]:
# Write the Kaggle API credentials without hardcoding them in the notebook.
# Values come from the KAGGLE_USERNAME / KAGGLE_KEY environment variables;
# anything missing is prompted for (the key is read without echo), so the
# secret never ends up in the notebook source or its outputs.
# NOTE(review): the key that was previously committed in this cell is exposed
# and should be revoked/regenerated on kaggle.com.
import getpass
import json
import os
from pathlib import Path

kaggle_json = Path.home() / '.kaggle' / 'kaggle.json'
kaggle_json.parent.mkdir(parents=True, exist_ok=True)

kaggle_credentials = {
    'username': os.environ.get('KAGGLE_USERNAME') or input('Kaggle username: '),
    'key': os.environ.get('KAGGLE_KEY') or getpass.getpass('Kaggle API key: '),
}

# Create the file owner-only from the start (no world-readable window), and
# also tighten the permissions of a pre-existing file.
_fd = os.open(kaggle_json, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
os.fchmod(_fd, 0o600)
with os.fdopen(_fd, 'w') as _fh:
    json.dump(kaggle_credentials, _fh)

Overwriting /home/jovyan/.kaggle/kaggle.json


In [9]:
# Restrict the credentials file to its owner; the Kaggle CLI expects kaggle.json not to be readable by others.
! chmod 600 ~/.kaggle/kaggle.json

In [10]:
! pip install -U urllib3 kaggle==1.5.3
! kaggle competitions download -c outbrain-click-prediction -f page_views_sample.csv.zip
! kaggle competitions download -c outbrain-click-prediction -f documents_topics.csv.zip 

Collecting urllib3
  Using cached urllib3-2.0.2-py3-none-any.whl (123 kB)
Collecting kaggle==1.5.3
  Using cached kaggle-1.5.3-py3-none-any.whl
Installing collected packages: kaggle
  Attempting uninstall: kaggle
    Found existing installation: kaggle 1.5.13
    Uninstalling kaggle-1.5.13:
      Successfully uninstalled kaggle-1.5.13
Successfully installed kaggle-1.5.3
403 - Forbidden
403 - Forbidden


In [11]:
! unzip '*.zip'
! rm -rf *.zip

unzip:  cannot find or open *.zip, *.zip.zip or *.zip.ZIP.

No zipfiles found.


In [12]:
! hdfs dfs -put page_views_sample.csv
! hdfs dfs -put documents_topics.csv

put: `page_views_sample.csv': File exists
put: `documents_topics.csv': File exists


In [14]:
# Load both tables from HDFS (relative paths, same names as uploaded above).
# The first CSV line is the header; no schema is inferred, so every column is a string.
page_views_sample_df = se.read.option('header', True).csv('page_views_sample.csv')
documents_topics_df = se.read.option('header', True).csv('documents_topics.csv')

## Evaluation Assignment

Data: outbrain click prediction

Tasks:
Using the Spark RDD and DataFrame APIs and Python, calculate:

**1**. Top 10 most visited document_ids in the page_views_sample log

**2**. How many users have at least 2 different traffic_sources in the page_views_sample log (note the value is not a count, it's an encoded enum)

**3***. Top 10 most visited topic_ids in page_views_sample log (use documents_topics table)

The submission is a `result.json` file with the keys `top_10_documents`, `users` and `top_10_topics`.
Each TOP-10 answer must be a list of ids ordered from TOP-1 to TOP-10.

result.json example:

    {
        "top_10_documents": [
            111,
            222,
            333,
            ...,
            1010
        ],
        "users": 10000,
        "top_10_topics": [
            11,
            22,
            33,
            ...,
            101
        ]
    }

### 1. Top 10 most visited document_ids in the page_views_sample log

In [16]:
# Count page views per document and keep the ten most-viewed documents (lazy; nothing runs yet).
# NOTE(review): documents tied on `count` at the cut-off are ordered arbitrarily by Spark.
top_10_visited_docs = (
    page_views_sample_df
    .groupBy('document_id')
    .count()
    .sort('count', ascending=False)
    .limit(10)
)

In [25]:
# Bring the ten rows to the driver and keep only the ids, converted to int
# (document_id was read as a string column).
top_10_visited_docs_list = []
for visited_row in top_10_visited_docs.collect():
    top_10_visited_docs_list.append(int(visited_row.document_id))

                                                                                

In [26]:
# TOP-1 .. TOP-10 document ids, most visited first.
top_10_visited_docs_list

[1811567,
 234,
 42744,
 1858440,
 1780813,
 60164,
 1790442,
 1877626,
 1821895,
 732651]

### **2**. How many users have at least 2 different traffic_sources in the page_views_sample log (note the value is not a count, it's an encoded enum)

In [20]:
from pyspark.sql.functions import countDistinct, col

# For each user (uuid), count the distinct traffic_source codes. Then count the
# users that have two or more of them. traffic_source is an enum code, so the
# relevant quantity is the number of distinct codes, not their values.
traffic_sources_per_user = (
    page_views_sample_df
    .groupBy('uuid')
    .agg(countDistinct('traffic_source').alias('distinct_traffic_sources'))
)
multi_traffic_sources_users = (
    traffic_sources_per_user
    .where(col('distinct_traffic_sources') > 1)
    .count()
)

                                                                                

In [21]:
# Number of users (uuid) seen with more than one distinct traffic_source.
multi_traffic_sources_users

98080

### **3***. Top 10 most visited topic_ids in page_views_sample log (use documents_topics table)

In [22]:
# Attach topics to every page view with an inner join on document_id.
# Views of documents without a topic are dropped. A document with several
# topics yields one row per topic, so each topic is credited with that view.
joined_tables_df = page_views_sample_df.join(documents_topics_df, 'document_id', 'inner')

In [23]:
# Views per topic, ten most-viewed topics. The ids are extracted through the
# RDD API and converted to int (topic_id was read as a string column).
topic_visit_counts = joined_tables_df.groupBy('topic_id').count()
top_10_topic_ids = (
    topic_visit_counts
    .sort('count', ascending=False)
    .limit(10)
    .rdd
    .map(lambda topic_row: int(topic_row['topic_id']))
    .collect()
)

                                                                                

In [24]:
# TOP-1 .. TOP-10 topic ids, most visited first.
top_10_topic_ids

[20, 16, 216, 136, 140, 143, 36, 97, 8, 269]

## Results

In [31]:
import json

# Assemble the submission payload with the keys of the result.json example and
# write it to result.json in the current working directory.
result = dict(
    top_10_documents=top_10_visited_docs_list,
    users=multi_traffic_sources_users,
    top_10_topics=top_10_topic_ids,
)

with open('result.json', 'w') as result_file:
    json.dump(result, result_file)

In [32]:
# Show the payload that was written to result.json.
result

{'top_10_documents': [1811567,
  234,
  42744,
  1858440,
  1780813,
  60164,
  1790442,
  1877626,
  1821895,
  732651],
 'users': 98080,
 'top_10_topics': [20, 16, 216, 136, 140, 143, 36, 97, 8, 269]}

In [33]:
# Upload result.json as a multipart form field for grading; the endpoint replies with a score.
!curl -F file=@result.json "51.250.54.133:80/MDS-LSML1/nsmakhova/w4/1"

1.0
Well done!
