In [1]:
import tqdm.notebook as tqdm
import numpy as np
import scipy
import sklearn

# Spark

In [2]:
! /home/jovyan/start-hadoop.sh

jovyan
 * Starting OpenBSD Secure Shell server sshd
start-stop-daemon: unable to set gid to 0 (Operation not permitted)
   ...fail!
 * sshd is running
Starting namenodes on [localhost]
localhost: namenode is running as process 166.  Stop it first and ensure /tmp/hadoop-jovyan-namenode.pid file is empty before retry.
Starting datanodes
localhost: datanode is running as process 272.  Stop it first and ensure /tmp/hadoop-jovyan-datanode.pid file is empty before retry.
Starting secondary namenodes [c5cc60baf499]
c5cc60baf499: secondarynamenode is running as process 502.  Stop it first and ensure /tmp/hadoop-jovyan-secondarynamenode.pid file is empty before retry.
Starting resourcemanager
resourcemanager is running as process 767.  Stop it first and ensure /tmp/hadoop-jovyan-resourcemanager.pid file is empty before retry.
Starting nodemanagers
localhost: nodemanager is running as process 885.  Stop it first and ensure /tmp/hadoop-jovyan-nodemanager.pid file is empty before retry.
272 org.ap

In [3]:
# connect, context, session

import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext(appName='jupyter')

from pyspark.sql import SparkSession, Row
se = SparkSession(sc)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023-04-11 07:54:55,670 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


## HDFS

In [4]:
! hdfs dfs -df -h

Filesystem                Size   Used  Available  Use%
hdfs://localhost:9000  251.0 G  4.7 G    208.6 G    2%


In [5]:
! hdfs dfs -ls /

Found 4 items
drwxr-xr-x   - jovyan supergroup          0 2023-04-11 03:22 /actors.jsonl
drwxrwx---   - root   supergroup          0 2023-04-10 18:19 /tmp
drwxr-xr-x   - jovyan supergroup          0 2023-04-10 18:19 /user
drwxr-xr-x   - jovyan supergroup          0 2023-04-11 03:21 /wiki


## Broadcast and accumulator

In [6]:
bc = sc.broadcast({"this": 0, "is": 1, "text": 2})
errors = sc.accumulator(0)

def mapper(x):
    global errors
    for w in x.split():
        if w in bc.value:
            yield (bc.value[w], 1)
        else:
            errors += 1

rdd = (
    sc
   .parallelize(["this is text", "text too"])
   .flatMap(mapper)
   .reduceByKey(lambda a, b: a + b)
)
print(rdd)
print(rdd.collect())
print("errors:", errors.value)

PythonRDD[5] at RDD at PythonRDD.scala:53




[(0, 1), (1, 1), (2, 2)]
errors: 1


                                                                                

## DataFrame API

RDD is much better and useful than plain MapReduce, but Spark can do even more!
Spark DataFrame is table structure over RDDs and can be treated as pandas on steroids.

It allows us to perform structured queries and benefit from it. One way is to perform SQL-styled queries (will discuss on next lesson) and another is DataFrame API.

Documentation: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html

In [7]:
rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("b", 4)])
rdd.collect()

[('a', 1), ('a', 2), ('b', 3), ('b', 4)]

In [8]:
df = se.createDataFrame(rdd)
df.printSchema()
df.show()

                                                                                

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)



                                                                                

+---+---+
| _1| _2|
+---+---+
|  a|  1|
|  a|  2|
|  b|  3|
|  b|  4|
+---+---+



                                                                                

In [9]:
from pyspark.sql import Row

df = se.createDataFrame(
    rdd.map(lambda x: Row(col_one=x[0], col_two=x[1]))
)
df.printSchema()
df.show()

                                                                                

root
 |-- col_one: string (nullable = true)
 |-- col_two: long (nullable = true)



                                                                                

+-------+-------+
|col_one|col_two|
+-------+-------+
|      a|      1|
|      a|      2|
|      b|      3|
|      b|      4|
+-------+-------+



In [10]:
df.select(['col_one']).limit(2).show()



+-------+
|col_one|
+-------+
|      a|
|      a|
+-------+



                                                                                

In [11]:
df.select(['col_one']).distinct().rdd.map(lambda x: x.col_one).collect()

                                                                                

['a', 'b']

Docs: https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#module-pyspark.sql.functions

In [12]:
from pyspark.sql import functions as F

(
  df.select(['col_one', 'col_two'])
    .where(F.col('col_one') == 'a')
    .limit(2)
    .show()
)



+-------+-------+
|col_one|col_two|
+-------+-------+
|      a|      1|
|      a|      2|
+-------+-------+



                                                                                

In [13]:
df = df.select('*', df['col_two'].cast('float').alias('col_two_float'))
df.show()

                                                                                

+-------+-------+-------------+
|col_one|col_two|col_two_float|
+-------+-------+-------------+
|      a|      1|          1.0|
|      a|      2|          2.0|
|      b|      3|          3.0|
|      b|      4|          4.0|
+-------+-------+-------------+



In [14]:
square_df = df.select('col_one', (df['col_two_float'] * df['col_two_float']).alias('col_two_square'))
square_df.orderBy('col_two_square', ascending=False).show(5)

                                                                                

+-------+--------------+
|col_one|col_two_square|
+-------+--------------+
|      b|          16.0|
|      b|           9.0|
|      a|           4.0|
|      a|           1.0|
+-------+--------------+



In [15]:
from pyspark.sql import functions as F

(
    df
      .groupby('col_one')
      .agg(F.collect_list("col_two").alias("col_two_list"))
      .select(['col_one', 'col_two_list'])
      .limit(10)
      .show()
)



+-------+------------+
|col_one|col_two_list|
+-------+------------+
|      b|      [4, 3]|
|      a|      [1, 2]|
+-------+------------+



                                                                                

In [16]:
list(df.toLocalIterator())

[Row(col_one='a', col_two=1, col_two_float=1.0),
 Row(col_one='a', col_two=2, col_two_float=2.0),
 Row(col_one='b', col_two=3, col_two_float=3.0),
 Row(col_one='b', col_two=4, col_two_float=4.0)]

In [17]:
df = se.createDataFrame(df.toLocalIterator())

In [18]:
df.printSchema()
df.show()

root
 |-- col_one: string (nullable = true)
 |-- col_two: long (nullable = true)
 |-- col_two_float: double (nullable = true)



                                                                                

+-------+-------+-------------+
|col_one|col_two|col_two_float|
+-------+-------+-------------+
|      a|      1|          1.0|
|      a|      2|          2.0|
|      b|      3|          3.0|
|      b|      4|          4.0|
+-------+-------+-------------+



In [19]:
# also can get RDD from DF
df.rdd.collect()

                                                                                

[Row(col_one='a', col_two=1, col_two_float=1.0),
 Row(col_one='a', col_two=2, col_two_float=2.0),
 Row(col_one='b', col_two=3, col_two_float=3.0),
 Row(col_one='b', col_two=4, col_two_float=4.0)]

## Data formats

In [21]:
# We may want to operate with not just plain text, but something more complex
# For example, Parquet - it can be useful for huge datasets for faster calcs
#df.write.save("data.parquet")

In [22]:
#data = se.read.parquet("data.parquet")
#data.rdd.collect()

NameError: name 'data' is not defined

## Outbrain click prediction dataseet

https://www.kaggle.com/c/outbrain-click-prediction/data - you need to register here

In [None]:
# https://github.com/Kaggle/kaggle-api
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# instruction where to get ~/.kaggle/kaggle.json file

In [30]:
! mkdir -p ~/.kaggle

In [31]:
%%writefile ~/.kaggle/kaggle.json
{"username":"olegarnaut", "key":"b9a74d5ee3e1b8bf97b22d4b2d881f5e"}

Overwriting /home/jovyan/.kaggle/kaggle.json


In [32]:
! chmod 600 ~/.kaggle/kaggle.json

In [33]:
! pip install -U urllib3 kaggle==1.5.3
! kaggle competitions download -c outbrain-click-prediction -f page_views_sample.csv.zip
! kaggle competitions download -c outbrain-click-prediction -f events.csv.zip 
! kaggle competitions download -c outbrain-click-prediction -f documents_topics.csv.zip 
! kaggle competitions download -c outbrain-click-prediction -f documents_entities.csv.zip 
! kaggle competitions download -c outbrain-click-prediction -f documents_meta.csv.zip 
! kaggle competitions download -c outbrain-click-prediction -f clicks_test.csv.zip
! kaggle competitions download -c outbrain-click-prediction -f clicks_train.csv.zip
! kaggle competitions download -c outbrain-click-prediction -f documents_categories.csv.zip
! kaggle competitions download -c outbrain-click-prediction -f promoted_content.csv.zip

Collecting urllib3
  Using cached urllib3-1.26.15-py2.py3-none-any.whl (140 kB)
Downloading page_views_sample.csv.zip to /home/jovyan/work
100%|████████████████████████████████████████| 149M/149M [00:23<00:00, 4.92MB/s]
100%|████████████████████████████████████████| 149M/149M [00:23<00:00, 6.65MB/s]
Downloading events.csv.zip to /home/jovyan/work
100%|████████████████████████████████████████| 478M/478M [01:28<00:00, 6.11MB/s]
100%|████████████████████████████████████████| 478M/478M [01:28<00:00, 5.68MB/s]
Downloading documents_topics.csv.zip to /home/jovyan/work
100%|████████████████████████████████████████| 121M/121M [00:20<00:00, 6.31MB/s]
100%|████████████████████████████████████████| 121M/121M [00:20<00:00, 6.06MB/s]
Downloading documents_entities.csv.zip to /home/jovyan/work
100%|████████████████████████████████████████| 126M/126M [00:21<00:00, 6.08MB/s]
100%|████████████████████████████████████████| 126M/126M [00:21<00:00, 6.12MB/s]
Downloading documents_meta.csv.zip to /home/jov

In [36]:
! unzip '*.zip'
! rm -rf *.zip

unzip:  cannot find or open *.zip, *.zip.zip or *.zip.ZIP.

No zipfiles found.


In [35]:
! hdfs dfs -put page_views_sample.csv
! hdfs dfs -put events.csv
! hdfs dfs -put documents_topics.csv
! hdfs dfs -put documents_entities.csv
! hdfs dfs -put documents_meta.csv
! hdfs dfs -put clicks_test.csv
! hdfs dfs -put clicks_train.csv
! hdfs dfs -put documents_categories.csv
! hdfs dfs -put promoted_content.csv

put: `page_views_sample.csv': File exists
put: `documents_topics.csv': File exists


In [85]:
# load data
df = se.read.csv("page_views_sample.csv", header=True)
df.show(5)

+--------------+-----------+---------+--------+------------+--------------+
|          uuid|document_id|timestamp|platform|geo_location|traffic_source|
+--------------+-----------+---------+--------+------------+--------------+
|1fd5f051fba643|        120| 31905835|       1|          RS|             2|
|8557aa9004be3b|        120| 32053104|       1|       VN>44|             2|
|c351b277a358f0|        120| 54013023|       1|       KR>12|             1|
|8205775c5387f9|        120| 44196592|       1|       IN>16|             2|
|9cb0ccd8458371|        120| 65817371|       1|   US>CA>807|             2|
+--------------+-----------+---------+--------+------------+--------------+
only showing top 5 rows



### Data manipulations

In [38]:
from IPython.display import display
tables = ["clicks_test", "clicks_train", 
          "documents_categories", "documents_entities", "documents_meta", "documents_topics", 
          "events", "page_views_sample", "promoted_content"]
for name in tqdm.tqdm(tables):
    df = se.read.csv("{}.csv".format(name), header=True)
    df.registerTempTable(name)
    print(name)
    df.limit(3).show()

  0%|          | 0/9 [00:00<?, ?it/s]

clicks_test
+----------+------+
|display_id| ad_id|
+----------+------+
|  16874594| 66758|
|  16874594|150083|
|  16874594|162754|
+----------+------+

clicks_train
+----------+------+-------+
|display_id| ad_id|clicked|
+----------+------+-------+
|         1| 42337|      0|
|         1|139684|      0|
|         1|144739|      1|
+----------+------+-------+

documents_categories
+-----------+-----------+----------------+
|document_id|category_id|confidence_level|
+-----------+-----------+----------------+
|    1595802|       1611|            0.92|
|    1595802|       1610|            0.07|
|    1524246|       1807|            0.92|
+-----------+-----------+----------------+

documents_entities
+-----------+--------------------+-----------------+
|document_id|           entity_id| confidence_level|
+-----------+--------------------+-----------------+
|    1524246|f9eec25663db4cd83...|0.672865314504701|
|    1524246|55ebcfbdaff1d6f60...|0.399113728441297|
|    1524246|839907a972930b17b

In [39]:
page_views = se.table("page_views_sample")
print(page_views)

DataFrame[uuid: string, document_id: string, timestamp: string, platform: string, geo_location: string, traffic_source: string]


In [40]:
page_views.select('*').show()

+--------------+-----------+---------+--------+------------+--------------+
|          uuid|document_id|timestamp|platform|geo_location|traffic_source|
+--------------+-----------+---------+--------+------------+--------------+
|1fd5f051fba643|        120| 31905835|       1|          RS|             2|
|8557aa9004be3b|        120| 32053104|       1|       VN>44|             2|
|c351b277a358f0|        120| 54013023|       1|       KR>12|             1|
|8205775c5387f9|        120| 44196592|       1|       IN>16|             2|
|9cb0ccd8458371|        120| 65817371|       1|   US>CA>807|             2|
|2aa611f32875c7|        120| 71495491|       1|       CA>ON|             2|
|f55a6eaf2b34ab|        120| 73309199|       1|       BR>27|             2|
|cc01b582c8cbff|        120| 50033577|       1|       CA>BC|             2|
|6c802978b8dd4d|        120| 66590306|       1|       CA>ON|             2|
|f4e423314303ff|        120| 48314254|       1|   US>LA>622|             1|
|3937372ca27

In [41]:
page_views_sql = se.sql("SELECT * from page_views_sample")
page_views_sql.show()

+--------------+-----------+---------+--------+------------+--------------+
|          uuid|document_id|timestamp|platform|geo_location|traffic_source|
+--------------+-----------+---------+--------+------------+--------------+
|1fd5f051fba643|        120| 31905835|       1|          RS|             2|
|8557aa9004be3b|        120| 32053104|       1|       VN>44|             2|
|c351b277a358f0|        120| 54013023|       1|       KR>12|             1|
|8205775c5387f9|        120| 44196592|       1|       IN>16|             2|
|9cb0ccd8458371|        120| 65817371|       1|   US>CA>807|             2|
|2aa611f32875c7|        120| 71495491|       1|       CA>ON|             2|
|f55a6eaf2b34ab|        120| 73309199|       1|       BR>27|             2|
|cc01b582c8cbff|        120| 50033577|       1|       CA>BC|             2|
|6c802978b8dd4d|        120| 66590306|       1|       CA>ON|             2|
|f4e423314303ff|        120| 48314254|       1|   US>LA>622|             1|
|3937372ca27

In [42]:
documents_topics = se.table("documents_topics")
print(documents_topics)

DataFrame[document_id: string, topic_id: string, confidence_level: string]


In [43]:
from pyspark.sql import functions as F
from pyspark.sql.functions import desc

(
    page_views
      .join(documents_topics, page_views.document_id == documents_topics.document_id, 'outer')
      .select(page_views.document_id, documents_topics.topic_id)
      .sort(desc("document_id"))
      .limit(50)
      .show()
)



+-----------+--------+
|document_id|topic_id|
+-----------+--------+
|     999924|     184|
|     999924|     184|
|     999924|     223|
|     999924|     223|
|     999924|     223|
|     999924|     223|
|     999924|     184|
|     999924|     223|
|     999924|     184|
|     999924|     223|
|     999924|     184|
|     999924|     223|
|     999924|     184|
|     999924|     184|
|     999924|     223|
|     999924|     184|
|     999924|     184|
|     999924|     184|
|     999924|     223|
|     999924|     184|
+-----------+--------+
only showing top 20 rows



                                                                                

In [44]:
(
  documents_topics
    .groupby('document_id')
    .agg(F.collect_list("topic_id").alias("topics"))
    .limit(10)
    .show()
)



+-----------+--------------------+
|document_id|              topics|
+-----------+--------------------+
|    1000015|          [183, 283]|
|    1000067|[199, 183, 229, 1...|
|    1000073|           [183, 35]|
|    1000096|      [173, 254, 71]|
|     100010|[16, 254, 192, 25...|
|    1000113|      [184, 183, 35]|
|    1000128|           [183, 35]|
|    1000131|[183, 199, 235, 1...|
|    1000146|      [183, 35, 202]|
|    1000167|      [183, 35, 202]|
+-----------+--------------------+



                                                                                

In [45]:
(
  documents_topics
    .groupby('document_id')
    .agg(F.count("topic_id").alias("topics_cnt"))
    .orderBy("topics_cnt")
    .limit(10)
    .show()
)



+-----------+----------+
|document_id|topics_cnt|
+-----------+----------+
|    1451809|         1|
|    2256862|         1|
|    1388193|         1|
|    2284422|         1|
|    1801253|         1|
|    2114730|         1|
|    1877485|         1|
|    2885936|         1|
|     196385|         1|
|      79893|         1|
+-----------+----------+



                                                                                

In [46]:
(
  documents_topics
    .groupby('document_id')
    .agg(F.countDistinct("topic_id").alias("topics_cnt"))
    .orderBy("topics_cnt")
    .limit(10)
    .show()
)



+-----------+----------+
|document_id|topics_cnt|
+-----------+----------+
|    2643193|         1|
|    2637627|         1|
|    1827717|         1|
|    1775420|         1|
|    2896006|         1|
|    2977112|         1|
|    2221038|         1|
|    2784051|         1|
|    2208732|         1|
|    2030781|         1|
+-----------+----------+



                                                                                

In [47]:
page_views.select("traffic_source").distinct().show()



+--------------+
|traffic_source|
+--------------+
|             3|
|             1|
|             2|
+--------------+



                                                                                

In [48]:
page_views.select("traffic_source").distinct().explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[traffic_source#486], functions=[])
   +- Exchange hashpartitioning(traffic_source#486, 200), ENSURE_REQUIREMENTS, [plan_id=987]
      +- HashAggregate(keys=[traffic_source#486], functions=[])
         +- FileScan csv [traffic_source#486] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/user/jovyan/page_views_sample.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<traffic_source:string>




In [49]:
from pyspark.sql.window import Window

In [50]:
w = Window.partitionBy('topic_id')\
          .orderBy('confidence_level')
(
  documents_topics
    .withColumn('row_number', F.row_number().over(w) - 1)
    .orderBy("row_number")
    .take(100)
)

                                                                                

[Row(document_id='1373067', topic_id='120', confidence_level='0.00800022845571354', row_number=0),
 Row(document_id='1772411', topic_id='224', confidence_level='0.00800066703451342', row_number=0),
 Row(document_id='16439', topic_id='126', confidence_level='0.0080008012801282', row_number=0),
 Row(document_id='1744048', topic_id='98', confidence_level='0.00800159041703942', row_number=0),
 Row(document_id='1721729', topic_id='100', confidence_level='0.00800006482315529', row_number=0),
 Row(document_id='1126279', topic_id='27', confidence_level='0.00800065149060317', row_number=0),
 Row(document_id='1367225', topic_id='140', confidence_level='0.00800012204207135', row_number=0),
 Row(document_id='1543668', topic_id='171', confidence_level='0.00800380736393708', row_number=0),
 Row(document_id='935225', topic_id='71', confidence_level='0.00800009549206651', row_number=0),
 Row(document_id='654417', topic_id='284', confidence_level='0.00800013440001791', row_number=0),
 Row(document_id='

## Evaluation Assignment

Data: outbrain click prediction

Tasks:
Using Spark RDD, DataFrame API and Python, calculate:

**1**. Top 10 most visited document_ids in the page_views_sample log

**2**. How many users have at least 2 different traffic_sources in the page_views_sample log (note the value is not a count, it's an encoded enum)

**3***. Top 10 most visited topic_ids in page_views_sample log (use documents_topics table)

The submission format is the result.json json file with top_10_documents, users and top_10_topics keys.
For TOP-10 results, the answer must be written in the form of a sheet ordered from TOP-1 to TOP-10 with an id.

result.json example:

    {
        "top_10_documents": [
            111,
            222,
            333,
            ...,
            1010
        ],
        "users": 10000,
        "top_10_topics": [
            11,
            22,
            33,
            ...,
            101
        ]
    }

In [88]:
df = se.read.csv("page_views_sample.csv", header=True)
df2 = se.read.csv("documents_topics.csv", header=True)

**1. Top 10 most visited document_ids in the page_views_sample log**

In [102]:
from pyspark.sql.functions import desc

# group by document_id, count the rows, and select the top 10 rows
top_10_documents = (
    df.groupBy("document_id")
      .agg({"*": "count"})
      .withColumnRenamed("count(1)", "count")
      .orderBy(desc("count"))
      .limit(10)
)

# convert the result into a list of integers
top_10_documents = [int(row["document_id"]) for row in top_10_documents.collect()]


                                                                                

In [103]:
top_10_documents


[1811567,
 234,
 42744,
 1858440,
 1780813,
 60164,
 1790442,
 1877626,
 1821895,
 732651]

**2. How many users have at least 2 different traffic_sources in the page_views_sample log (note the value is not a count, it's an encoded enum)**

In [92]:
from pyspark.sql.functions import countDistinct

# count the number of distinct traffic sources for each uuid
users = (
      df.select("uuid", "traffic_source")
        .distinct()
        .groupBy("uuid")
        .agg(countDistinct("traffic_source").alias("distinct_traffic_sources"))
        .filter(col("distinct_traffic_sources") >= 2)
        .count()
)


                                                                                

In [93]:
users

98080

**3. Top 10 most visited topic_ids in page_views_sample log (use documents_topics table)**

In [104]:
from pyspark.sql.functions import desc

# join the two DataFrames on "document_id"
df_join = df2.join(df, "document_id", "inner")

# group by "topic_id", count the rows, and select the top 10 rows
top_10_topics = (
    df_join.groupBy("topic_id")
        .agg({"*": "count"})
        .withColumnRenamed("count(1)", "count")
        .orderBy(desc("count"))
        .limit(10)
)

# convert the result into a list of integers
top_10_topics = [int(row["topic_id"]) for row in top_10_topics.collect()]


                                                                                

In [105]:
top_10_topics

[20, 16, 216, 136, 140, 143, 36, 97, 8, 269]

In [107]:
import json

result = {
    "top_10_documents": top_10_documents,
    "users": users,
    "top_10_topics": top_10_topics,
}

with open("result.json", "w") as f:
    json.dump(result, f)

In [108]:
#result

{'top_10_documents': [1811567,
  234,
  42744,
  1858440,
  1780813,
  60164,
  1790442,
  1877626,
  1821895,
  732651],
 'users': 98080,
 'top_10_topics': [20, 16, 216, 136, 140, 143, 36, 97, 8, 269]}

In [109]:
!curl -F file=@result.json "51.250.54.133:80/MDS-LSML1/arnautoleg1/w4/1"

1.0
Well done!
