In [1]:
import tqdm.notebook as tqdm
import numpy as np
import scipy
import sklearn

# Spark

In [2]:
# Start the local Hadoop daemons (namenode, datanode, secondary namenode,
# resourcemanager, nodemanager). Re-running is harmless: as the output below
# shows, daemons that are already running are reported and left alone.
# NOTE(review): absolute, environment-specific path to the startup script.
! /home/jovyan/start-hadoop.sh

jovyan
 * Starting OpenBSD Secure Shell server sshd
start-stop-daemon: unable to set gid to 0 (Operation not permitted)
   ...fail!
 * sshd is running
Starting namenodes on [localhost]
localhost: namenode is running as process 155.  Stop it first and ensure /tmp/hadoop-jovyan-namenode.pid file is empty before retry.
Starting datanodes
localhost: datanode is running as process 262.  Stop it first and ensure /tmp/hadoop-jovyan-datanode.pid file is empty before retry.
Starting secondary namenodes [15aa28d30d56]
15aa28d30d56: secondarynamenode is running as process 447.  Stop it first and ensure /tmp/hadoop-jovyan-secondarynamenode.pid file is empty before retry.
Starting resourcemanager
resourcemanager is running as process 689.  Stop it first and ensure /tmp/hadoop-jovyan-resourcemanager.pid file is empty before retry.
Starting nodemanagers
localhost: nodemanager is running as process 790.  Stop it first and ensure /tmp/hadoop-jovyan-nodemanager.pid file is empty before retry.
689 org.ap

In [3]:
import findspark
findspark.init()  # make the local Spark installation importable as `pyspark`

import pyspark
from pyspark.sql import SparkSession, Row

# getOrCreate reuses an already-running SparkContext when this cell is re-run,
# instead of failing because only one SparkContext may be active per process.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setAppName('jupyter'))

# SparkSession is the DataFrame/SQL entry point; the builder picks up the
# SparkContext created above rather than starting a new one.
se = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023-05-10 18:50:48,154 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


## HDFS

In [4]:
# Show HDFS capacity and usage in human-readable units.
! hdfs dfs -df -h

Filesystem               Size   Used  Available  Use%
hdfs://localhost:9000  58.4 G  1.6 G     21.3 G    3%


In [6]:
# Peek at the header and first rows of the local page-views sample.
! head -n 5 page_views_sample.csv

uuid,document_id,timestamp,platform,geo_location,traffic_source
1fd5f051fba643,120,31905835,1,RS,2
8557aa9004be3b,120,32053104,1,VN>44,2
c351b277a358f0,120,54013023,1,KR>12,1
8205775c5387f9,120,44196592,1,IN>16,2


In [15]:
# Peek at the header and first rows of the local document -> topic mapping.
! head -n 5 documents_topics.csv

document_id,topic_id,confidence_level
1595802,140,0.0731131601068925
1595802,16,0.0594164867373976
1595802,143,0.0454207537554526
1595802,170,0.0388674285182961


In [9]:
! hadoop fs -copyFromLocal page_views_sample.csv

copyFromLocal: `page_views_sample.csv': File exists


In [12]:
! hadoop fs -copyFromLocal documents_topics.csv

In [13]:
# List the HDFS root directory.
! hdfs dfs -ls /

Found 2 items
drwxrwx---   - root   supergroup          0 2023-05-10 17:59 /tmp
drwxr-xr-x   - jovyan supergroup          0 2023-05-10 17:59 /user


In [14]:
# No path given: list the user's HDFS home directory (where the relative-path
# CSV uploads above landed), with human-readable sizes.
! hadoop fs -ls -h

Found 3 items
drwxr-xr-x   - jovyan supergroup          0 2023-05-10 18:50 .sparkStaging
-rw-r--r--   1 jovyan supergroup      226 M 2023-05-10 18:57 documents_topics.csv
-rw-r--r--   1 jovyan supergroup    433.3 M 2023-05-10 18:09 page_views_sample.csv


## Evaluation Assignment

Data: outbrain click prediction

Tasks:
Using Spark RDD, DataFrame API and Python, calculate:

**1**. Top 10 most visited document_ids in the page_views_sample log

**2**. How many users have at least 2 different traffic_sources in the page_views_sample log (note the value is not a count, it's an encoded enum)

**3**\*. Top 10 most visited topic_ids in page_views_sample log (use documents_topics table)

The submission format is the result.json json file with top_10_documents, users and top_10_topics keys.
For the TOP-10 results, the answer must be a list of ids ordered from TOP-1 to TOP-10.

result.json example:

    {
        "top_10_documents": [
            111,
            222,
            333,
            ...,
            1010
        ],
        "users": 10000,
        "top_10_topics": [
            11,
            22,
            33,
            ...,
            101
        ]
    }

### Spark DataFrame, RDD and Python 

#### 1. Top 10 most visited document_ids in the page_views_sample log

In [16]:
# Load the page-views log; the first CSV line supplies column names. No schema
# inference is requested, so every column comes back as a string. The relative
# path presumably resolves to the HDFS copy uploaded above.
page_views_sample = se.read.option("header", True).csv("page_views_sample.csv")
page_views_sample.show()

[Stage 1:>                                                          (0 + 1) / 1]

+--------------+-----------+---------+--------+------------+--------------+
|          uuid|document_id|timestamp|platform|geo_location|traffic_source|
+--------------+-----------+---------+--------+------------+--------------+
|1fd5f051fba643|        120| 31905835|       1|          RS|             2|
|8557aa9004be3b|        120| 32053104|       1|       VN>44|             2|
|c351b277a358f0|        120| 54013023|       1|       KR>12|             1|
|8205775c5387f9|        120| 44196592|       1|       IN>16|             2|
|9cb0ccd8458371|        120| 65817371|       1|   US>CA>807|             2|
|2aa611f32875c7|        120| 71495491|       1|       CA>ON|             2|
|f55a6eaf2b34ab|        120| 73309199|       1|       BR>27|             2|
|cc01b582c8cbff|        120| 50033577|       1|       CA>BC|             2|
|6c802978b8dd4d|        120| 66590306|       1|       CA>ON|             2|
|f4e423314303ff|        120| 48314254|       1|   US>LA>622|             1|
|3937372ca27

                                                                                

In [74]:
from pyspark.sql.functions import desc

# Count page views per document_id and keep the 10 most visited documents.
# GroupedData.count() names its result column "count" directly, so this no longer
# depends on Spark's auto-generated "count(1)" name (withColumnRenamed silently
# does nothing when the source column is missing, which would break orderBy).
# NOTE(review): documents tied on count may come out in any order at the cut-off.
Most_Visit_Doc = (
    page_views_sample.groupBy("document_id")
      .count()
      .orderBy(desc("count"))
      .limit(10)
)

### Top 10 most visited document_ids

In [75]:
# convert the result into a list of integers
# Pull the 10 result rows to the driver as plain int document ids.
# NOTE: this rebinds `Most_Visit_Doc` from a DataFrame to a list, so re-run the
# previous cell before re-running this one.
Most_Visit_Doc = list(map(lambda r: int(r.document_id), Most_Visit_Doc.collect()))
Most_Visit_Doc

                                                                                

[1811567,
 234,
 42744,
 1858440,
 1780813,
 60164,
 1790442,
 1877626,
 1821895,
 732651]

#### 2. Users with at least 2 different traffic_sources in the page_views_sample log (note: the value is not a count, it's an encoded enum)

In [23]:
from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import col

# Number of users (uuid) seen with at least 2 distinct traffic_source values.
# countDistinct already de-duplicates traffic sources per uuid, so no separate
# .select().distinct() pass (and its extra shuffle) is needed. The final
# .count() counts users, not traffic sources.
At_Least_u = (
      page_views_sample.groupBy("uuid")
        .agg(countDistinct("traffic_source").alias("distinct_traffic_sources"))
        .filter(col("distinct_traffic_sources") >= 2)
        .count()
)

                                                                                

### Users with at least 2 different traffic_sources

In [24]:
# Number of users with at least 2 distinct traffic_source values.
At_Least_u

98080

#### 3. Top 10 most visited topic_ids in page_views_sample log (use documents_topics table)

In [25]:
# Load the document -> topic mapping; the first CSV line supplies column names
# and, with no schema inference, every column is read as a string.
documents_topics = se.read.option("header", True).csv("documents_topics.csv")
documents_topics.show()

                                                                                

+-----------+--------+-------------------+
|document_id|topic_id|   confidence_level|
+-----------+--------+-------------------+
|    1595802|     140| 0.0731131601068925|
|    1595802|      16| 0.0594164867373976|
|    1595802|     143| 0.0454207537554526|
|    1595802|     170| 0.0388674285182961|
|    1524246|     113|  0.196450402209685|
|    1524246|     260|   0.14287816210395|
|    1524246|      92| 0.0331591277639303|
|    1524246|     168| 0.0140903418233461|
|    1524246|      54|0.00878222028049753|
|    1524246|     207|0.00828237207366929|
|    1617787|     113|  0.216892316441472|
|    1617787|     260|  0.096312506979167|
|    1617787|     258| 0.0487299672992729|
|    1617787|      10| 0.0311358537054966|
|    1617787|     168| 0.0131200552985547|
|    1617787|     148| 0.0131103270369802|
|    1615583|      89|  0.316306499636243|
|    1615583|     198| 0.0157218239706048|
|    1615460|     260| 0.0979641463762194|
|    1615460|      26| 0.0145508920589849|
+----------

#### Inner Join

In [95]:
# Attach topics to every page view: a view is repeated once per topic of its
# document, and views of documents with no topics are dropped (inner join).
Topic_Id = page_views_sample.join(documents_topics, "document_id", "inner")

##### Group by "topic_id", count the page views, sort in descending order and keep the top 10

In [96]:
# Count (page view, topic) pairs per topic_id and keep the 10 most visited topics.
# GroupedData.count() names its result column "count" directly, so this no longer
# depends on Spark's auto-generated "count(1)" name.
# NOTE(review): topics tied on count may come out in any order at the cut-off.
Most_Visit_Topic_Id = (
    Topic_Id.groupBy("topic_id")
        .count()
        .orderBy(desc("count"))
        .limit(10)
)

### Top 10 most visited topic_ids

In [98]:
# Pull the 10 result rows to the driver as plain int topic ids.
# NOTE: this rebinds `Most_Visit_Topic_Id` from a DataFrame to a list, so re-run
# the previous cell before re-running this one.
Most_Visit_Topic_Id = list(map(lambda r: int(r.topic_id), Most_Visit_Topic_Id.collect()))
Most_Visit_Topic_Id

[20, 16, 216, 136, 140, 143, 36, 97, 8, 269]

### Dump results to JSON

In [99]:
import json

# Sanity-check the submission before writing it. The assignment expects two
# ordered top-10 id lists and one user count. These checks also catch a stale
# DataFrame left behind if a "collect" cell was skipped.
assert len(Most_Visit_Doc) == 10, "expected exactly 10 document ids"
assert len(Most_Visit_Topic_Id) == 10, "expected exactly 10 topic ids"
assert all(isinstance(i, int) for i in list(Most_Visit_Doc) + list(Most_Visit_Topic_Id))

data = {
    "top_10_documents": Most_Visit_Doc,
    "users": int(At_Least_u),
    "top_10_topics": Most_Visit_Topic_Id,
}

# Indented output mirrors the result.json example in the assignment text.
with open("result.json", "w") as f:
    json.dump(data, f, indent=4)

### RESULT

In [100]:
# Display the dict that was written to result.json.
data

{'top_10_documents': [1811567,
  234,
  42744,
  1858440,
  1780813,
  60164,
  1790442,
  1877626,
  1821895,
  732651],
 'users': 98080,
 'top_10_topics': [20, 16, 216, 136, 140, 143, 36, 97, 8, 269]}

### Grading

In [101]:
# Submit result.json to the course grading server as a multipart form upload.
# NOTE(review): the grader URL is hard-coded for this assignment.
!curl -F file=@result.json "51.250.54.133:80/MDS-LSML1/unknown_32/w4/1"

1.0
Well done!
