In [1]:
import tqdm.notebook as tqdm
import numpy as np
import scipy
import sklearn

# Spark

In [2]:
! /home/jovyan/start-hadoop.sh

jovyan
 * Starting OpenBSD Secure Shell server sshd
start-stop-daemon: unable to set gid to 0 (Operation not permitted)
   ...fail!
 * sshd is running
Starting namenodes on [localhost]
localhost: namenode is running as process 163.  Stop it first and ensure /tmp/hadoop-jovyan-namenode.pid file is empty before retry.
Starting datanodes
localhost: datanode is running as process 307.  Stop it first and ensure /tmp/hadoop-jovyan-datanode.pid file is empty before retry.
Starting secondary namenodes [615c73bbc755]
615c73bbc755: secondarynamenode is running as process 497.  Stop it first and ensure /tmp/hadoop-jovyan-secondarynamenode.pid file is empty before retry.
Starting resourcemanager
resourcemanager is running as process 724.  Stop it first and ensure /tmp/hadoop-jovyan-resourcemanager.pid file is empty before retry.
Starting nodemanagers
localhost: nodemanager is running as process 838.  Stop it first and ensure /tmp/hadoop-jovyan-nodemanager.pid file is empty before retry.
2656 sun.t

In [3]:
# connect, context, session
#
# findspark must run before pyspark is imported so that the Spark installation
# is put on sys.path.
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession, Row

# getOrCreate() reuses an already running session/context, so this cell can be
# re-executed on a live kernel instead of failing because a SparkContext
# already exists.
se = SparkSession.builder.appName('jupyter').getOrCreate()
# The rest of the notebook uses `sc` for the RDD API and `se` for DataFrames.
sc = se.sparkContext


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023-05-09 10:37:05,493 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


## HDFS

In [4]:
! hdfs dfs -df -h

Filesystem                Size     Used  Available  Use%
hdfs://localhost:9000  467.9 G  291.6 M    125.1 G    0%


In [5]:
! hdfs dfs -ls /

Found 2 items
drwxrwx---   - root   supergroup          0 2023-05-09 10:37 /tmp
drwxr-xr-x   - jovyan supergroup          0 2023-05-09 10:37 /user


## Broadcast and accumulator

In [6]:
bc = sc.broadcast({"this": 0, "is": 1, "text": 2})
errors = sc.accumulator(0)

def mapper(x):
    """Yield ``(word_id, 1)`` for each known word of the text line ``x``.

    Word ids come from the broadcast dictionary ``bc``. A word that is not in
    that dictionary produces no pair and increments the ``errors`` accumulator.
    """
    vocab = bc.value
    for token in x.split():
        if token not in vocab:
            # add() updates the accumulator object in place; the name is never
            # rebound, so no `global` declaration is required.
            errors.add(1)
            continue
        yield (vocab[token], 1)

rdd = (
    sc
   .parallelize(["this is text", "text too"])
   .flatMap(mapper)
   .reduceByKey(lambda a, b: a + b)
)
print(rdd)
print(rdd.collect())
print("errors:", errors.value)

PythonRDD[5] at RDD at PythonRDD.scala:53




[(0, 1), (1, 1), (2, 2)]
errors: 1


                                                                                

## DataFrame API

RDDs are much more convenient and useful than plain MapReduce, but Spark can do even more!
A Spark DataFrame is a table structure built on top of RDDs and can be thought of as pandas on steroids.

It allows us to perform structured queries and benefit from that structure. One way is to write SQL-style queries (we will discuss them in the next lesson), and another is the DataFrame API.

Documentation: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html

In [7]:
rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("b", 4)])
rdd.collect()

[('a', 1), ('a', 2), ('b', 3), ('b', 4)]

In [8]:
df = se.createDataFrame(rdd)
df.printSchema()
df.show()

                                                                                

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)



                                                                                

+---+---+
| _1| _2|
+---+---+
|  a|  1|
|  a|  2|
|  b|  3|
|  b|  4|
+---+---+



In [9]:
from pyspark.sql import Row

df = se.createDataFrame(
    rdd.map(lambda x: Row(col_one=x[0], col_two=x[1]))
)
df.printSchema()
df.show()

root
 |-- col_one: string (nullable = true)
 |-- col_two: long (nullable = true)

+-------+-------+
|col_one|col_two|
+-------+-------+
|      a|      1|
|      a|      2|
|      b|      3|
|      b|      4|
+-------+-------+



In [10]:
df.select(['col_one']).limit(2).show()

+-------+
|col_one|
+-------+
|      a|
|      a|
+-------+



In [11]:
df.select(['col_one']).distinct().rdd.map(lambda x: x.col_one).collect()

                                                                                

['a', 'b']

Docs: https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#module-pyspark.sql.functions

In [12]:
from pyspark.sql import functions as F

(
  df.select(['col_one', 'col_two'])
    .where(F.col('col_one') == 'a')
    .limit(2)
    .show()
)



+-------+-------+
|col_one|col_two|
+-------+-------+
|      a|      1|
|      a|      2|
+-------+-------+



                                                                                

In [13]:
# Add a float-typed copy of col_two. withColumn replaces an existing column of
# the same name, so re-running this cell does not append a second
# col_two_float column the way select('*', ...) would.
df = df.withColumn('col_two_float', df['col_two'].cast('float'))
df.show()

+-------+-------+-------------+
|col_one|col_two|col_two_float|
+-------+-------+-------------+
|      a|      1|          1.0|
|      a|      2|          2.0|
|      b|      3|          3.0|
|      b|      4|          4.0|
+-------+-------+-------------+



In [14]:
square_df = df.select('col_one', (df['col_two_float'] * df['col_two_float']).alias('col_two_square'))
square_df.orderBy('col_two_square', ascending=False).show(5)



+-------+--------------+
|col_one|col_two_square|
+-------+--------------+
|      b|          16.0|
|      b|           9.0|
|      a|           4.0|
|      a|           1.0|
+-------+--------------+



                                                                                

In [15]:
from pyspark.sql import functions as F

(
    df
      .groupby('col_one')
      .agg(F.collect_list("col_two").alias("col_two_list"))
      .select(['col_one', 'col_two_list'])
      .limit(10)
      .show()
)



+-------+------------+
|col_one|col_two_list|
+-------+------------+
|      a|      [2, 1]|
|      b|      [3, 4]|
+-------+------------+



                                                                                

In [16]:
list(df.toLocalIterator())

[Row(col_one='a', col_two=1, col_two_float=1.0),
 Row(col_one='a', col_two=2, col_two_float=2.0),
 Row(col_one='b', col_two=3, col_two_float=3.0),
 Row(col_one='b', col_two=4, col_two_float=4.0)]

In [17]:
df = se.createDataFrame(df.toLocalIterator())

In [18]:
df.printSchema()
df.show()

root
 |-- col_one: string (nullable = true)
 |-- col_two: long (nullable = true)
 |-- col_two_float: double (nullable = true)

+-------+-------+-------------+
|col_one|col_two|col_two_float|
+-------+-------+-------------+
|      a|      1|          1.0|
|      a|      2|          2.0|
|      b|      3|          3.0|
|      b|      4|          4.0|
+-------+-------+-------------+



In [19]:
# also can get RDD from DF
df.rdd.collect()

                                                                                

[Row(col_one='a', col_two=1, col_two_float=1.0),
 Row(col_one='a', col_two=2, col_two_float=2.0),
 Row(col_one='b', col_two=3, col_two_float=3.0),
 Row(col_one='b', col_two=4, col_two_float=4.0)]

## Data formats

In [20]:
# We may want to work with more than plain text files.
# Parquet, for example, can be useful for huge datasets because it allows faster calculations.
# mode("overwrite") lets this cell be re-run when data.parquet already exists,
# and .parquet() states the output format explicitly instead of relying on the
# configured default data source.
df.write.mode("overwrite").parquet("data.parquet")

                                                                                

In [21]:
data = se.read.parquet("data.parquet")
data.rdd.collect()

[Row(col_one='a', col_two=1, col_two_float=1.0),
 Row(col_one='a', col_two=2, col_two_float=2.0),
 Row(col_one='b', col_two=3, col_two_float=3.0),
 Row(col_one='b', col_two=4, col_two_float=4.0)]

## Outbrain click prediction dataset

https://www.kaggle.com/c/outbrain-click-prediction/data - you need to register here

In [None]:
# https://github.com/Kaggle/kaggle-api
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# instruction where to get ~/.kaggle/kaggle.json file

Collecting kaggle
  Using cached kaggle-1.5.12-py3-none-any.whl
Installing collected packages: kaggle
  Attempting uninstall: kaggle
    Found existing installation: kaggle 1.5.12
    Uninstalling kaggle-1.5.12:
      Successfully uninstalled kaggle-1.5.12
Successfully installed kaggle-1.5.12


In [22]:
! mkdir -p ~/.kaggle

In [27]:
%%writefile ~/.kaggle/kaggle.json
{"username":"ashishparmar","key":"###"}

Overwriting /home/jovyan/.kaggle/kaggle.json


In [28]:
! chmod 600 ~/.kaggle/kaggle.json

In [34]:
! pip install -U urllib3 kaggle==1.5.3
! kaggle competitions download -c outbrain-click-prediction -f page_views_sample.csv.zip
! kaggle competitions download -c outbrain-click-prediction -f events.csv.zip 
! kaggle competitions download -c outbrain-click-prediction -f documents_topics.csv.zip 
! kaggle competitions download -c outbrain-click-prediction -f documents_entities.csv.zip 
! kaggle competitions download -c outbrain-click-prediction -f documents_meta.csv.zip 
! kaggle competitions download -c outbrain-click-prediction -f clicks_test.csv.zip
! kaggle competitions download -c outbrain-click-prediction -f clicks_train.csv.zip
! kaggle competitions download -c outbrain-click-prediction -f documents_categories.csv.zip
! kaggle competitions download -c outbrain-click-prediction -f promoted_content.csv.zip

Collecting urllib3
  Using cached urllib3-2.0.2-py3-none-any.whl (123 kB)
403 - Forbidden
403 - Forbidden
403 - Forbidden
403 - Forbidden
403 - Forbidden
403 - Forbidden
403 - Forbidden
403 - Forbidden
403 - Forbidden


In [14]:
! unzip '*.zip'
! rm -rf *.zip

Archive:  documents_topics.csv.zip
  inflating: documents_topics.csv    

Archive:  documents_categories.csv.zip
  inflating: documents_categories.csv  

Archive:  events.csv.zip
  inflating: events.csv              

Archive:  clicks_train.csv.zip
  inflating: clicks_train.csv        

Archive:  documents_entities.csv.zip
  inflating: documents_entities.csv  

Archive:  documents_meta.csv.zip
  inflating: documents_meta.csv      

Archive:  page_views_sample.csv.zip
  inflating: page_views_sample.csv   

Archive:  clicks_test.csv.zip
  inflating: clicks_test.csv         

8 archives were successfully processed.


In [36]:
! hdfs dfs -put page_views_sample.csv
! hdfs dfs -put documents_topics.csv

put: `page_views_sample.csv': File exists
put: `documents_topics.csv': File exists


In [40]:
# load data
pv = se.read.csv("page_views_sample.csv", header=True)
pv.show(5)

+--------------+-----------+---------+--------+------------+--------------+
|          uuid|document_id|timestamp|platform|geo_location|traffic_source|
+--------------+-----------+---------+--------+------------+--------------+
|1fd5f051fba643|        120| 31905835|       1|          RS|             2|
|8557aa9004be3b|        120| 32053104|       1|       VN>44|             2|
|c351b277a358f0|        120| 54013023|       1|       KR>12|             1|
|8205775c5387f9|        120| 44196592|       1|       IN>16|             2|
|9cb0ccd8458371|        120| 65817371|       1|   US>CA>807|             2|
+--------------+-----------+---------+--------+------------+--------------+
only showing top 5 rows



In [42]:
dt = se.read.csv("documents_topics.csv", header=True)
dt.show(5)

+-----------+--------+------------------+
|document_id|topic_id|  confidence_level|
+-----------+--------+------------------+
|    1595802|     140|0.0731131601068925|
|    1595802|      16|0.0594164867373976|
|    1595802|     143|0.0454207537554526|
|    1595802|     170|0.0388674285182961|
|    1524246|     113| 0.196450402209685|
+-----------+--------+------------------+
only showing top 5 rows



## Evaluation Assignment

Data: outbrain click prediction

Tasks:
Using Spark RDD, DataFrame API and Python, calculate:

**1**. Top 10 most visited document_ids in the page_views_sample log

**2**. How many users have at least 2 different traffic_sources in the page_views_sample log (note the value is not a count, it's an encoded enum)

**3***. Top 10 most visited topic_ids in page_views_sample log (use documents_topics table)

### 1. Top 10 most visited document ids in the page_views_sample log

In [44]:
# Count page-view rows per document, then keep the ten largest counts.
doc_view_counts = pv.groupBy("document_id").count()
top_doc_rows = (
    doc_view_counts
    .orderBy("count", ascending=False)
    .limit(10)
    .collect()
)
# Convert the ids to plain ints while keeping the ranked order.
top_10 = [int(r.document_id) for r in top_doc_rows]
top_10

                                                                                

[1811567,
 234,
 42744,
 1858440,
 1780813,
 60164,
 1790442,
 1877626,
 1821895,
 732651]

### 2. Number of users who have at least 2 different traffic sources in the page_views_sample log

In [46]:
from pyspark.sql.functions import countDistinct

# traffic_source is an encoded enum, not a count, so the number of distinct
# values per user is what matters.
sources_per_user = pv.groupBy("uuid").agg(
    countDistinct("traffic_source").alias("distinct_traffic_sources")
)
# Keep only the users seen with two or more different sources and count them.
multi_source_users = sources_per_user.where("distinct_traffic_sources >= 2")
users = multi_source_users.count()
users

                                                                                

98080

### 3. Top 10 most visited topic ids in the page_views_sample log

In [48]:
# Attach topics to page views; the inner join keeps only document_ids that
# appear in both tables, one output row per (page view, topic) pair.
joined_df = pv.join(dt, "document_id", "inner")

# Count joined rows per topic and keep the ten largest counts.
topic_view_counts = joined_df.groupBy("topic_id").count()
top_topic_rows = (
    topic_view_counts
    .orderBy("count", ascending=False)
    .limit(10)
    .collect()
)
# Convert the ids to plain ints while keeping the ranked order.
top_10_ = [int(r.topic_id) for r in top_topic_rows]
top_10_

                                                                                

[20, 16, 216, 136, 140, 143, 36, 97, 8, 269]

The submission format is a result.json file with the keys top_10_documents, users and top_10_topics.
For the TOP-10 results, the answer must be written as a list of ids ordered from TOP-1 to TOP-10.

result.json example:

    {
        "top_10_documents": [
            111,
            222,
            333,
            ...,
            1010
        ],
        "users": 10000,
        "top_10_topics": [
            11,
            22,
            33,
            ...,
            101
        ]
    }

In [49]:
import json

# Collect the three answers under the keys required by the submission format.
result = dict(
    top_10_documents=top_10,
    users=users,
    top_10_topics=top_10_,
)

# Serialise to a JSON string first, then write it to result.json in one call.
payload = json.dumps(result)
with open("result.json", "w") as out_file:
    out_file.write(payload)

In [50]:
! cat result.json

{"top_10_documents": [1811567, 234, 42744, 1858440, 1780813, 60164, 1790442, 1877626, 1821895, 732651], "users": 98080, "top_10_topics": [20, 16, 216, 136, 140, 143, 36, 97, 8, 269]}

In [51]:
! curl -F file=@result.json "51.250.54.133:80/MDS-LSML1/unknown_14/w4/1"

1.0
Well done!
