In [2]:
import tqdm.notebook as tqdm
import numpy as np
import scipy
import sklearn

# Spark

In [3]:
! /home/jovyan/start-hadoop.sh

jovyan
 * Starting OpenBSD Secure Shell server sshd
start-stop-daemon: unable to set gid to 0 (Operation not permitted)
   ...fail!
 * sshd is running
Starting namenodes on [localhost]
localhost: namenode is running as process 155.  Stop it first and ensure /tmp/hadoop-jovyan-namenode.pid file is empty before retry.
Starting datanodes
localhost: datanode is running as process 276.  Stop it first and ensure /tmp/hadoop-jovyan-datanode.pid file is empty before retry.
Starting secondary namenodes [0cddd7a95d84]
0cddd7a95d84: secondarynamenode is running as process 514.  Stop it first and ensure /tmp/hadoop-jovyan-secondarynamenode.pid file is empty before retry.
Starting resourcemanager
resourcemanager is running as process 749.  Stop it first and ensure /tmp/hadoop-jovyan-resourcemanager.pid file is empty before retry.
Starting nodemanagers
localhost: nodemanager is running as process 871.  Stop it first and ensure /tmp/hadoop-jovyan-nodemanager.pid file is empty before retry.
448390 org

In [4]:
# connect, context, session

import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext(appName='jupyter')

from pyspark.sql import SparkSession, Row
se = SparkSession(sc)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023-04-23 05:51:26,476 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


## HDFS

In [5]:
! hdfs dfs -df -h

Filesystem                Size   Used  Available  Use%
hdfs://localhost:9000  196.8 G  3.5 G    147.2 G    2%


In [6]:
! hdfs dfs -ls /

Found 3 items
drwxrwx---   - root   supergroup          0 2023-04-19 14:53 /tmp
drwxr-xr-x   - jovyan supergroup          0 2023-04-19 14:53 /user
drwxr-xr-x   - jovyan supergroup          0 2023-04-19 22:00 /wiki


## Broadcast and accumulator

In [7]:
bc = sc.broadcast({"this": 0, "is": 1, "text": 2})
errors = sc.accumulator(0)

def mapper(x):
    global errors
    for w in x.split():
        if w in bc.value:
            yield (bc.value[w], 1)
        else:
            errors += 1

rdd = (
    sc
   .parallelize(["this is text", "text too"])
   .flatMap(mapper)
   .reduceByKey(lambda a, b: a + b)
)
print(rdd)
print(rdd.collect())
print("errors:", errors.value)

PythonRDD[5] at RDD at PythonRDD.scala:53


                                                                                

[(0, 1), (1, 1), (2, 2)]
errors: 1


## DataFrame API

RDD is much better and useful than plain MapReduce, but Spark can do even more!
Spark DataFrame is table structure over RDDs and can be treated as pandas on steroids.

It allows us to perform structured queries and benefit from it. One way is to perform SQL-styled queries (will discuss on next lesson) and another is DataFrame API.

Documentation: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html

In [8]:
rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("b", 4)])
rdd.collect()

[('a', 1), ('a', 2), ('b', 3), ('b', 4)]

In [9]:
df = se.createDataFrame(rdd)
df.printSchema()
df.show()

                                                                                

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)



                                                                                

+---+---+
| _1| _2|
+---+---+
|  a|  1|
|  a|  2|
|  b|  3|
|  b|  4|
+---+---+



                                                                                

In [10]:
from pyspark.sql import Row

df = se.createDataFrame(
    rdd.map(lambda x: Row(col_one=x[0], col_two=x[1]))
)
df.printSchema()
df.show()

                                                                                

root
 |-- col_one: string (nullable = true)
 |-- col_two: long (nullable = true)



                                                                                

+-------+-------+
|col_one|col_two|
+-------+-------+
|      a|      1|
|      a|      2|
|      b|      3|
|      b|      4|
+-------+-------+



In [11]:
df.select(['col_one']).limit(2).show()



+-------+
|col_one|
+-------+
|      a|
|      a|
+-------+



                                                                                

In [12]:
df.select(['col_one']).distinct().rdd.map(lambda x: x.col_one).collect()

                                                                                

['b', 'a']

Docs: https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#module-pyspark.sql.functions

In [13]:
from pyspark.sql import functions as F

(
  df.select(['col_one', 'col_two'])
    .where(F.col('col_one') == 'a')
    .limit(2)
    .show()
)



+-------+-------+
|col_one|col_two|
+-------+-------+
|      a|      1|
|      a|      2|
+-------+-------+



                                                                                

In [14]:
df = df.select('*', df['col_two'].cast('float').alias('col_two_float'))
df.show()

                                                                                

+-------+-------+-------------+
|col_one|col_two|col_two_float|
+-------+-------+-------------+
|      a|      1|          1.0|
|      a|      2|          2.0|
|      b|      3|          3.0|
|      b|      4|          4.0|
+-------+-------+-------------+



In [15]:
square_df = df.select('col_one', (df['col_two_float'] * df['col_two_float']).alias('col_two_square'))
square_df.orderBy('col_two_square', ascending=False).show(5)



+-------+--------------+
|col_one|col_two_square|
+-------+--------------+
|      b|          16.0|
|      b|           9.0|
|      a|           4.0|
|      a|           1.0|
+-------+--------------+



                                                                                

In [16]:
from pyspark.sql import functions as F

(
    df
      .groupby('col_one')
      .agg(F.collect_list("col_two").alias("col_two_list"))
      .select(['col_one', 'col_two_list'])
      .limit(10)
      .show()
)



+-------+------------+
|col_one|col_two_list|
+-------+------------+
|      a|      [1, 2]|
|      b|      [4, 3]|
+-------+------------+



                                                                                

In [17]:
list(df.toLocalIterator())

[Row(col_one='a', col_two=1, col_two_float=1.0),
 Row(col_one='a', col_two=2, col_two_float=2.0),
 Row(col_one='b', col_two=3, col_two_float=3.0),
 Row(col_one='b', col_two=4, col_two_float=4.0)]

In [18]:
df = se.createDataFrame(df.toLocalIterator())

In [19]:
df.printSchema()
df.show()

root
 |-- col_one: string (nullable = true)
 |-- col_two: long (nullable = true)
 |-- col_two_float: double (nullable = true)



                                                                                

+-------+-------+-------------+
|col_one|col_two|col_two_float|
+-------+-------+-------------+
|      a|      1|          1.0|
|      a|      2|          2.0|
|      b|      3|          3.0|
|      b|      4|          4.0|
+-------+-------+-------------+



                                                                                

In [20]:
# also can get RDD from DF
df.rdd.collect()

                                                                                

[Row(col_one='a', col_two=1, col_two_float=1.0),
 Row(col_one='a', col_two=2, col_two_float=2.0),
 Row(col_one='b', col_two=3, col_two_float=3.0),
 Row(col_one='b', col_two=4, col_two_float=4.0)]

## Data formats

In [21]:
# We may want to operate with not just plain text, but something more complex
# For example, Parquet - it can be useful for huge datasets for faster calcs
df.write.save("data.parquet")

AnalysisException: path hdfs://localhost:9000/user/jovyan/data.parquet already exists.

In [22]:
data = se.read.parquet("data.parquet")
data.rdd.collect()

                                                                                

[Row(col_one='a', col_two=1, col_two_float=1.0),
 Row(col_one='a', col_two=2, col_two_float=2.0),
 Row(col_one='b', col_two=3, col_two_float=3.0),
 Row(col_one='b', col_two=4, col_two_float=4.0)]

## Outbrain click prediction dataseet

https://www.kaggle.com/c/outbrain-click-prediction/data - you need to register here

In [23]:
# https://github.com/Kaggle/kaggle-api
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# instruction where to get ~/.kaggle/kaggle.json file

In [24]:
! mkdir -p ~/.kaggle

In [25]:
%%writefile ~/.kaggle/kaggle.json
...

Overwriting /home/jovyan/.kaggle/kaggle.json


In [26]:
! chmod 600 ~/.kaggle/kaggle.json

In [27]:
! pip install -U urllib3 kaggle==1.5.3
! kaggle competitions download -c outbrain-click-prediction -f page_views_sample.csv.zip
! kaggle competitions download -c outbrain-click-prediction -f events.csv.zip 
! kaggle competitions download -c outbrain-click-prediction -f documents_topics.csv.zip 
! kaggle competitions download -c outbrain-click-prediction -f documents_entities.csv.zip 
! kaggle competitions download -c outbrain-click-prediction -f documents_meta.csv.zip 
! kaggle competitions download -c outbrain-click-prediction -f clicks_test.csv.zip
! kaggle competitions download -c outbrain-click-prediction -f clicks_train.csv.zip
! kaggle competitions download -c outbrain-click-prediction -f documents_categories.csv.zip
! kaggle competitions download -c outbrain-click-prediction -f promoted_content.csv.zip

Collecting urllib3
  Using cached urllib3-1.26.15-py2.py3-none-any.whl (140 kB)
Traceback (most recent call last):
  File "/opt/conda/bin/kaggle", line 5, in <module>
    from kaggle.cli import main
  File "/opt/conda/lib/python3.10/site-packages/kaggle/__init__.py", line 23, in <module>
    api.authenticate()
  File "/opt/conda/lib/python3.10/site-packages/kaggle/api/kaggle_api_extended.py", line 119, in authenticate
    self._load_config(config_data)
  File "/opt/conda/lib/python3.10/site-packages/kaggle/api/kaggle_api_extended.py", line 160, in _load_config
    raise ValueError('Error: Missing %s in configuration.' % item)
ValueError: Error: Missing username in configuration.
Traceback (most recent call last):
  File "/opt/conda/bin/kaggle", line 5, in <module>
    from kaggle.cli import main
  File "/opt/conda/lib/python3.10/site-packages/kaggle/__init__.py", line 23, in <module>
    api.authenticate()
  File "/opt/conda/lib/python3.10/site-packages/kaggle/api/kaggle_api_extended.p

In [28]:
! unzip '*.zip'
! rm -rf *.zip

unzip:  cannot find or open *.zip, *.zip.zip or *.zip.ZIP.

No zipfiles found.


In [29]:
! hdfs dfs -put page_views_sample.csv
! hdfs dfs -put events.csv
! hdfs dfs -put documents_topics.csv
! hdfs dfs -put documents_entities.csv
! hdfs dfs -put documents_meta.csv
! hdfs dfs -put clicks_test.csv
! hdfs dfs -put clicks_train.csv
! hdfs dfs -put documents_categories.csv
! hdfs dfs -put promoted_content.csv

put: `page_views_sample.csv': File exists
put: `events.csv': No such file or directory
put: `documents_topics.csv': File exists
put: `documents_entities.csv': No such file or directory
put: `documents_meta.csv': No such file or directory
put: `clicks_test.csv': No such file or directory
put: `clicks_train.csv': No such file or directory
put: `documents_categories.csv': No such file or directory
put: `promoted_content.csv': No such file or directory


In [30]:
# load data
df = se.read.csv("page_views_sample.csv", header=True)
df.show(5)

+--------------+-----------+---------+--------+------------+--------------+
|          uuid|document_id|timestamp|platform|geo_location|traffic_source|
+--------------+-----------+---------+--------+------------+--------------+
|1fd5f051fba643|        120| 31905835|       1|          RS|             2|
|8557aa9004be3b|        120| 32053104|       1|       VN>44|             2|
|c351b277a358f0|        120| 54013023|       1|       KR>12|             1|
|8205775c5387f9|        120| 44196592|       1|       IN>16|             2|
|9cb0ccd8458371|        120| 65817371|       1|   US>CA>807|             2|
+--------------+-----------+---------+--------+------------+--------------+
only showing top 5 rows



### Data manipulations

In [31]:
from IPython.display import display
tables = ["clicks_test", "clicks_train", 
          "documents_categories", "documents_entities", "documents_meta", "documents_topics", 
          "events", "page_views_sample", "promoted_content"]
for name in tqdm.tqdm(tables):
    df = se.read.csv("{}.csv".format(name), header=True)
    df.registerTempTable(name)
    print(name)
    df.limit(3).show()

  0%|          | 0/9 [00:00<?, ?it/s]

clicks_test
+----------+------+
|display_id| ad_id|
+----------+------+
|  16874594| 66758|
|  16874594|150083|
|  16874594|162754|
+----------+------+

clicks_train
+----------+------+-------+
|display_id| ad_id|clicked|
+----------+------+-------+
|         1| 42337|      0|
|         1|139684|      0|
|         1|144739|      1|
+----------+------+-------+

documents_categories
+-----------+-----------+----------------+
|document_id|category_id|confidence_level|
+-----------+-----------+----------------+
|    1595802|       1611|            0.92|
|    1595802|       1610|            0.07|
|    1524246|       1807|            0.92|
+-----------+-----------+----------------+

documents_entities
+-----------+--------------------+-----------------+
|document_id|           entity_id| confidence_level|
+-----------+--------------------+-----------------+
|    1524246|f9eec25663db4cd83...|0.672865314504701|
|    1524246|55ebcfbdaff1d6f60...|0.399113728441297|
|    1524246|839907a972930b17b

In [32]:
page_views = se.table("page_views_sample")
print(page_views)

DataFrame[uuid: string, document_id: string, timestamp: string, platform: string, geo_location: string, traffic_source: string]


In [33]:
page_views.select('*').show()

+--------------+-----------+---------+--------+------------+--------------+
|          uuid|document_id|timestamp|platform|geo_location|traffic_source|
+--------------+-----------+---------+--------+------------+--------------+
|1fd5f051fba643|        120| 31905835|       1|          RS|             2|
|8557aa9004be3b|        120| 32053104|       1|       VN>44|             2|
|c351b277a358f0|        120| 54013023|       1|       KR>12|             1|
|8205775c5387f9|        120| 44196592|       1|       IN>16|             2|
|9cb0ccd8458371|        120| 65817371|       1|   US>CA>807|             2|
|2aa611f32875c7|        120| 71495491|       1|       CA>ON|             2|
|f55a6eaf2b34ab|        120| 73309199|       1|       BR>27|             2|
|cc01b582c8cbff|        120| 50033577|       1|       CA>BC|             2|
|6c802978b8dd4d|        120| 66590306|       1|       CA>ON|             2|
|f4e423314303ff|        120| 48314254|       1|   US>LA>622|             1|
|3937372ca27

In [34]:
page_views_sql = se.sql("SELECT * from page_views_sample")
page_views_sql.show()

+--------------+-----------+---------+--------+------------+--------------+
|          uuid|document_id|timestamp|platform|geo_location|traffic_source|
+--------------+-----------+---------+--------+------------+--------------+
|1fd5f051fba643|        120| 31905835|       1|          RS|             2|
|8557aa9004be3b|        120| 32053104|       1|       VN>44|             2|
|c351b277a358f0|        120| 54013023|       1|       KR>12|             1|
|8205775c5387f9|        120| 44196592|       1|       IN>16|             2|
|9cb0ccd8458371|        120| 65817371|       1|   US>CA>807|             2|
|2aa611f32875c7|        120| 71495491|       1|       CA>ON|             2|
|f55a6eaf2b34ab|        120| 73309199|       1|       BR>27|             2|
|cc01b582c8cbff|        120| 50033577|       1|       CA>BC|             2|
|6c802978b8dd4d|        120| 66590306|       1|       CA>ON|             2|
|f4e423314303ff|        120| 48314254|       1|   US>LA>622|             1|
|3937372ca27

In [35]:
documents_topics = se.table("documents_topics")
print(documents_topics)

DataFrame[document_id: string, topic_id: string, confidence_level: string]


In [36]:
from pyspark.sql import functions as F
from pyspark.sql.functions import desc

(
    page_views
      .join(documents_topics, page_views.document_id == documents_topics.document_id, 'outer')
      .select(page_views.document_id, documents_topics.topic_id)
      .sort(desc("document_id"))
      .limit(50)
      .show()
)



+-----------+--------+
|document_id|topic_id|
+-----------+--------+
|     999924|     184|
|     999924|     184|
|     999924|     223|
|     999924|     223|
|     999924|     223|
|     999924|     223|
|     999924|     184|
|     999924|     223|
|     999924|     184|
|     999924|     223|
|     999924|     184|
|     999924|     223|
|     999924|     184|
|     999924|     184|
|     999924|     223|
|     999924|     184|
|     999924|     184|
|     999924|     184|
|     999924|     223|
|     999924|     184|
+-----------+--------+
only showing top 20 rows



                                                                                

In [37]:
(
  documents_topics
    .groupby('document_id')
    .agg(F.collect_list("topic_id").alias("topics"))
    .limit(10)
    .show()
)



+-----------+--------------------+
|document_id|              topics|
+-----------+--------------------+
|    1000087|[36, 216, 194, 15...|
|    1000123|     [184, 235, 183]|
|    1000127|               [184]|
|     100014|[181, 254, 173, 2...|
|    1000152|[184, 183, 235, 260]|
|    1000176|[27, 107, 281, 16...|
|    1000205|           [184, 35]|
|    1000228| [184, 235, 97, 281]|
|    1000242| [107, 62, 162, 104]|
|    1000264|      [183, 35, 283]|
+-----------+--------------------+



                                                                                

In [38]:
(
  documents_topics
    .groupby('document_id')
    .agg(F.count("topic_id").alias("topics_cnt"))
    .orderBy("topics_cnt")
    .limit(10)
    .show()
)

2023-04-23 06:04:36,832 ERROR cluster.YarnScheduler: Lost executor 2 on 0cddd7a95d84: Container from a bad node: container_1681926456357_0021_01_000003 on host: 0cddd7a95d84. Exit status: 137. Diagnostics: [2023-04-23 06:04:36.667]Container killed on request. Exit code is 137
[2023-04-23 06:04:36.667]Container exited with a non-zero exit code 137. 
[2023-04-23 06:04:36.668]Killed by external signal
.
2023-04-23 06:04:36,836 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 2 for reason Container from a bad node: container_1681926456357_0021_01_000003 on host: 0cddd7a95d84. Exit status: 137. Diagnostics: [2023-04-23 06:04:36.667]Container killed on request. Exit code is 137
[2023-04-23 06:04:36.667]Container exited with a non-zero exit code 137. 
[2023-04-23 06:04:36.668]Killed by external signal
.
2023-04-23 06:04:36,848 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 141.0 (TID 720) (0cddd7a95d84 executor 2): ExecutorLostFailure (execu

+-----------+----------+
|document_id|topics_cnt|
+-----------+----------+
|     428514|         1|
|    1951619|         1|
|    1188934|         1|
|    2259465|         1|
|    2365393|         1|
|    2450228|         1|
|    2330926|         1|
|     511261|         1|
|    2432117|         1|
|    2635049|         1|
+-----------+----------+



                                                                                

In [39]:
(
  documents_topics
    .groupby('document_id')
    .agg(F.countDistinct("topic_id").alias("topics_cnt"))
    .orderBy("topics_cnt")
    .limit(10)
    .show()
)



+-----------+----------+
|document_id|topics_cnt|
+-----------+----------+
|    2670217|         1|
|    2362264|         1|
|    2116816|         1|
|    2724827|         1|
|    2174411|         1|
|    2445009|         1|
|    2250784|         1|
|    2093753|         1|
|    1922610|         1|
|    2057626|         1|
+-----------+----------+



                                                                                

In [40]:
page_views.select("traffic_source").distinct().show()



+--------------+
|traffic_source|
+--------------+
|             3|
|             1|
|             2|
+--------------+



                                                                                

In [41]:
page_views.select("traffic_source").distinct().explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[traffic_source#492], functions=[])
   +- Exchange hashpartitioning(traffic_source#492, 200), ENSURE_REQUIREMENTS, [plan_id=994]
      +- HashAggregate(keys=[traffic_source#492], functions=[])
         +- FileScan csv [traffic_source#492] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/user/jovyan/page_views_sample.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<traffic_source:string>




In [42]:
from pyspark.sql.window import Window

In [43]:
w = Window.partitionBy('topic_id')\
          .orderBy('confidence_level')
(
  documents_topics
    .withColumn('row_number', F.row_number().over(w) - 1)
    .orderBy("row_number")
    .take(100)
)

                                                                                

[Row(document_id='1373067', topic_id='120', confidence_level='0.00800022845571354', row_number=0),
 Row(document_id='1721729', topic_id='100', confidence_level='0.00800006482315529', row_number=0),
 Row(document_id='16439', topic_id='126', confidence_level='0.0080008012801282', row_number=0),
 Row(document_id='1205392', topic_id='52', confidence_level='0.00800003350319164', row_number=0),
 Row(document_id='113635', topic_id='5', confidence_level='0.00800018262447156', row_number=0),
 Row(document_id='2281646', topic_id='75', confidence_level='0.00800059856285632', row_number=0),
 Row(document_id='1367225', topic_id='140', confidence_level='0.00800012204207135', row_number=0),
 Row(document_id='1450684', topic_id='187', confidence_level='0.00800008391306855', row_number=0),
 Row(document_id='1820377', topic_id='195', confidence_level='0.00800005376000286', row_number=0),
 Row(document_id='1939630', topic_id='179', confidence_level='0.00800007203623664', row_number=0),
 Row(document_id='

## Evaluation Assignment

Data: outbrain click prediction

Tasks:
Using Spark RDD, DataFrame API and Python, calculate:

**1**. Top 10 most visited document_ids in the page_views_sample log

**2**. How many users have at least 2 different traffic_sources in the page_views_sample log (note the value is not a count, it's an encoded enum)

**3***. Top 10 most visited topic_ids in page_views_sample log (use documents_topics table)

The submission format is the result.json json file with top_10_documents, users and top_10_topics keys.
For TOP-10 results, the answer must be written in the form of a sheet ordered from TOP-1 to TOP-10 with an id.

result.json example:

    {
        "top_10_documents": [
            111,
            222,
            333,
            ...,
            1010
        ],
        "users": 10000,
        "top_10_topics": [
            11,
            22,
            33,
            ...,
            101
        ]
    }

In [60]:
page_views = se.read.csv("page_views_sample.csv", header=True)
page_views.registerTempTable("page_views")
documents = se.read.csv("documents_topics.csv", header=True)
documents.registerTempTable("documents")

In [61]:
se.sql("""select * from page_views""").limit(10).toPandas()

Unnamed: 0,uuid,document_id,timestamp,platform,geo_location,traffic_source
0,1fd5f051fba643,120,31905835,1,RS,2
1,8557aa9004be3b,120,32053104,1,VN>44,2
2,c351b277a358f0,120,54013023,1,KR>12,1
3,8205775c5387f9,120,44196592,1,IN>16,2
4,9cb0ccd8458371,120,65817371,1,US>CA>807,2
5,2aa611f32875c7,120,71495491,1,CA>ON,2
6,f55a6eaf2b34ab,120,73309199,1,BR>27,2
7,cc01b582c8cbff,120,50033577,1,CA>BC,2
8,6c802978b8dd4d,120,66590306,1,CA>ON,2
9,f4e423314303ff,120,48314254,1,US>LA>622,1


**1. Top 10 most visited document_ids in the page_views_sample log**

In [59]:
se.sql(""" 
select document_id, count(*) as count_doc_id from page_views group by document_id order by count_doc_id desc 
""").limit(10).toPandas()

                                                                                

Unnamed: 0,document_id,count_doc_id
0,1811567,429551
1,234,179692
2,42744,156231
3,1858440,112140
4,1780813,109624
5,60164,104941
6,1790442,91420
7,1877626,80309
8,1821895,79956
9,732651,74630


In [100]:
top_10_documents=[1811567,
 234,
 42744,
 1858440,
 1780813,
 60164,
 1790442,
 1877626,
 1821895,
 732651]

**2. How many users have at least 2 different traffic_sources in the page_views_sample log (note the value is not a count, it's an encoded enum)**

In [90]:
se.sql("""select uuid, count(distinct(traffic_source)) as count_source from page_views group by uuid having count_source>=2""").count()

                                                                                

98080

In [101]:
users=98080

**3. Top 10 most visited topic_ids in page_views_sample log (use documents_topics table)**

In [92]:
se.sql("""select * from documents limit 5""").limit(5).toPandas()

Unnamed: 0,document_id,topic_id,confidence_level
0,1595802,140,0.0731131601068925
1,1595802,16,0.0594164867373976
2,1595802,143,0.0454207537554526
3,1595802,170,0.0388674285182961
4,1524246,113,0.196450402209685


In [99]:
se.sql("""select documents.topic_id,count(1) as count from documents inner join page_views on documents.document_id=page_views.document_id group by documents.topic_id order by count desc""").limit(10).toPandas()

                                                                                

Unnamed: 0,topic_id,count
0,20,1429253
1,16,1425838
2,216,1160563
3,136,1099382
4,140,971983
5,143,919136
6,36,855793
7,97,839328
8,8,819599
9,269,727145


In [102]:
top_10_topics=[20, 16, 216, 136, 140, 143, 36, 97, 8, 269]

In [103]:
import json

result = {
    "top_10_documents": top_10_documents,
    "users": users,
    "top_10_topics": top_10_topics,
}

with open("result.json", "w") as f:
    json.dump(result, f)

In [104]:
!curl -F file=@result.json "51.250.54.133:80/MDS-LSML1/kewlsid96/w4/1"

1.0
Well done!
