In [None]:
# Checklist:
# AWS emr-5.29.0
# MASTER r5d.8xlarge 1x, no EBS
# CORE r5d.8xlarge 4x, no EBS
# Custom bootstrap action: s3://ydatazian/bootstrap.sh
# Allow ssh in master node security group

In [83]:
import tqdm.notebook as tqdm
import numpy as np
import scipy
import sklearn

# Spark

In [53]:
# connect, context, session

import findspark
findspark.init()

import spark_utils
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
sc = SparkContext("yarn", "My App", conf=spark_utils.get_spark_conf())

spark_utils.print_ui_links()


NameNode: http://ec2-3-218-152-40.compute-1.amazonaws.com:50070
YARN: http://ec2-3-218-152-40.compute-1.amazonaws.com:8088
Spark UI: http://ec2-3-218-152-40.compute-1.amazonaws.com:20888/proxy/application_1620976236162_0003


In [58]:
se = SparkSession(sc)

## HDFS

In [9]:
! hdfs dfs -df -h

Filesystem                                   Size     Used  Available  Use%
hdfs://ip-172-31-7-108.ec2.internal:8020  547.5 G  222.3 M    546.0 G    0%


In [10]:
! hdfs dfs -ls /

Found 3 items
drwxrwxrwt   - hdfs hadoop          0 2021-05-14 07:10 /tmp
drwxr-xr-x   - hdfs hadoop          0 2021-05-14 07:10 /user
drwxr-xr-x   - hdfs hadoop          0 2021-05-14 07:10 /var


## RDD

RDD (Resilient Distributed Datasets) - base data block of Spark. The system takes care about parts of the data and it's manipulations on distributed system. It could be treated as ordered sequence of rows (commonly key-value pairs like in MapReduce, but could be any arbitrary data).

RDDs are immutable. You get new RDD by making operations on initial RDD.

There is two kinds of operations on RDD: *actions* and *transformations*.

Transformations are not applied instantly, they are stacked in operations order.

Actions are used to materialize transformations (so the data is actually transformed on cluster).

Documentation: https://spark.apache.org/docs/latest/rdd-programming-guide.html

https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions

https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations

In [11]:
# Let's create simple RDD first
rdd = sc.parallelize(range(10))
rdd

PythonRDD[1] at RDD at PythonRDD.scala:53

### Actions

In [12]:
rdd.collect() # gather data into python, be careful, loads data into memory

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [13]:
rdd.count() # returns count of objects

10

In [14]:
rdd.first() # get first element of RDD

0

In [15]:
rdd.take(2) # get first N=2 elements

[0, 1]

In [16]:
rdd.mean() # mean of RDD's values

4.5

In [17]:
# We can create RDD with text data
rdd = sc.parallelize(["one", "two"])
rdd

ParallelCollectionRDD[12] at parallelize at PythonRDD.scala:195

In [18]:
rdd.collect() # get RDD values

['one', 'two']

In [19]:
rdd.saveAsTextFile("/tmp_text.txt")  # save RDD into HDFS

In [20]:
%%bash

hdfs dfs -ls /tmp_text.txt # parts

Found 501 items
-rw-r--r--   1 hadoop hadoop          0 2021-05-14 07:19 /tmp_text.txt/_SUCCESS
-rw-r--r--   1 hadoop hadoop          0 2021-05-14 07:19 /tmp_text.txt/part-00000
-rw-r--r--   1 hadoop hadoop          0 2021-05-14 07:19 /tmp_text.txt/part-00001
-rw-r--r--   1 hadoop hadoop          0 2021-05-14 07:19 /tmp_text.txt/part-00002
-rw-r--r--   1 hadoop hadoop          0 2021-05-14 07:19 /tmp_text.txt/part-00003
-rw-r--r--   1 hadoop hadoop          0 2021-05-14 07:19 /tmp_text.txt/part-00004
-rw-r--r--   1 hadoop hadoop          0 2021-05-14 07:19 /tmp_text.txt/part-00005
-rw-r--r--   1 hadoop hadoop          0 2021-05-14 07:19 /tmp_text.txt/part-00006
-rw-r--r--   1 hadoop hadoop          0 2021-05-14 07:19 /tmp_text.txt/part-00007
-rw-r--r--   1 hadoop hadoop          0 2021-05-14 07:19 /tmp_text.txt/part-00008
-rw-r--r--   1 hadoop hadoop          0 2021-05-14 07:19 /tmp_text.txt/part-00009
-rw-r--r--   1 hadoop hadoop          0 2021-05-14 07:19 /tmp_text.txt/part-00010
-r

In [21]:
%%bash

hdfs dfs -cat /tmp_text.txt/* # actual data from parts

one
two


### Transformations

In [22]:
rdd = sc.parallelize(range(10)) # RDD from range

In [23]:
# New RDD with "square + 1" transformation by two map operations;
# Map operations are similar to those from MapReduce,
# the difference - given map functions are applied to each element of rdd:
squares = rdd.map(lambda x: x**2).map(lambda x: x + 1)

# IMPORTANT NOTE - nothing is calculated right now,
# `squares` now only represents sequence for new RDD over initial data

In [24]:
squares.first() 

# Now we applied Action, so the map transformations are run
# But not all data is calculated, Spark optimized that for us
# and only the value for first row was calculated and returned

1

In [25]:
squares.collect() # get all data

[1, 2, 5, 10, 17, 26, 37, 50, 65, 82]

In [26]:
squares.sample(False, 0.5).collect()

[1, 10, 26, 37, 50, 65, 82]

In [29]:
squares.flatMap(lambda x: [x, x+1, x+2]).take(5)

[1, 2, 3, 2, 3]

In [30]:
(
    squares
    .takeOrdered(1, lambda x: -x) # top 1
)

[82]

In [31]:
rdd.filter(lambda x: x % 2).collect()

[1, 3, 5, 7, 9]

### MapReduce

In [32]:
# step by step MapReduce simulation:
rdd = sc.parallelize(["this is text", "some more text"])

In [33]:
(
    rdd
    .flatMap(lambda x: [(w, 1) for w in x.split()])
    .collect()
)

[('this', 1), ('is', 1), ('text', 1), ('some', 1), ('more', 1), ('text', 1)]

In [34]:
# we gonna use first iteration often, to save time we can cache the result:
words = rdd.flatMap(lambda x: [(w, 1) for w in x.split()]).cache()

**PairRDD**

If you have a tuple of length 2 as your RDD data type, you can use *ByKey operations on your RDD, with first value of tuple being the key and second being the value. Let's create such RDD.

We want to aggregate data by key (word), we are able to do it with `groupByKey` method, it will produce values iterable.

In [35]:
(
    words
    .groupByKey()
    .collect()
)

[('text', <pyspark.resultiterable.ResultIterable at 0x7f9b4ccefba8>),
 ('more', <pyspark.resultiterable.ResultIterable at 0x7f9b4ccefb70>),
 ('this', <pyspark.resultiterable.ResultIterable at 0x7f9b4ccefc18>),
 ('some', <pyspark.resultiterable.ResultIterable at 0x7f9b4ccefc88>),
 ('is', <pyspark.resultiterable.ResultIterable at 0x7f9b4ccefcf8>)]

And of course we can use any function in map, not just lambdas,

with regular `map` function you can change key/value, create complex keys

In [36]:
def mapToList(x):
  return x[0], list(x[1])

data = (
    words
    .groupByKey()
    .map(mapToList)
    .collect()
)

dict(data)

{'text': [1, 1], 'more': [1], 'this': [1], 'some': [1], 'is': [1]}

We may use `.mapValues` method to manipulate only with values and leave keys intact:

In [37]:
def mapValuesToLen(x):
  return len(x)

(
    words
    .groupByKey()
    .mapValues(mapValuesToLen)
    .collect()
)

[('text', 2), ('more', 1), ('this', 1), ('some', 1), ('is', 1)]

In [38]:
(
    words
    .groupByKey()
    .map(lambda x: (x[0], len(x[1])))
    .collect()
)

[('text', 2), ('more', 1), ('this', 1), ('some', 1), ('is', 1)]

Another way to manipulate values grouped by key is reduce performed by `reduceByKey` operation:

In [39]:
(
    words # -> (word, cnt=1)
    .reduceByKey(lambda a, b: a + b) # -> (word, sum(cnt))
    .collect()
)

[('text', 2), ('more', 1), ('this', 1), ('some', 1), ('is', 1)]

In [40]:
# how errors look like:

(
    rdd
    .flatMap(lambda x: [(w, len(1)) for w in x.split()]) # len from int
    .reduceByKey(lambda a, b: a + b) # -> (word, sum(cnt))
    .collect()
)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 249 in stage 45.0 failed 4 times, most recent failure: Lost task 249.3 in stage 45.0 (TID 11346, ip-172-31-15-106.ec2.internal, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1620976236162_0002/container_1620976236162_0002_01_000004/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1620976236162_0002/container_1620976236162_0002_01_000004/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2499, in pipeline_func
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2499, in pipeline_func
  File "/usr/lib/spark/python/pyspark/rdd.py", line 352, in func
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1861, in combineLocally
  File "/mnt/yarn/usercache/hadoop/appcache/application_1620976236162_0002/container_1620976236162_0002_01_000004/pyspark.zip/pyspark/shuffle.py", line 238, in mergeValues
    for k, v in iterator:
  File "/mnt/yarn/usercache/hadoop/appcache/application_1620976236162_0002/container_1620976236162_0002_01_000004/pyspark.zip/pyspark/util.py", line 113, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-40-fda08014ba43>", line 5, in <lambda>
  File "<ipython-input-40-fda08014ba43>", line 5, in <listcomp>
TypeError: object of type 'int' has no len()

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2041)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2029)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2028)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2028)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2262)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2211)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2200)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1620976236162_0002/container_1620976236162_0002_01_000004/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1620976236162_0002/container_1620976236162_0002_01_000004/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2499, in pipeline_func
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2499, in pipeline_func
  File "/usr/lib/spark/python/pyspark/rdd.py", line 352, in func
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1861, in combineLocally
  File "/mnt/yarn/usercache/hadoop/appcache/application_1620976236162_0002/container_1620976236162_0002_01_000004/pyspark.zip/pyspark/shuffle.py", line 238, in mergeValues
    for k, v in iterator:
  File "/mnt/yarn/usercache/hadoop/appcache/application_1620976236162_0002/container_1620976236162_0002_01_000004/pyspark.zip/pyspark/util.py", line 113, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-40-fda08014ba43>", line 5, in <lambda>
  File "<ipython-input-40-fda08014ba43>", line 5, in <listcomp>
TypeError: object of type 'int' has no len()

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


#### It is possible to manipulate amount of map operations and partitions of the data, we will discuss it next time

### Word Count

In [41]:
texts = sc.parallelize(['Of course we can use predefined functions with map and not just lambda',
                       'Imagine we want to have each element in the RDD as a key-value pair where the key is the tag (e.g. normal) and the value is the whole list of elements that represents the row in the CSV formatted file', 
                       'We could proceed as follows'])
words = texts.flatMap(lambda x: x.lower().split(' '))
words.take(5)

['of', 'course', 'we', 'can', 'use']

In [42]:
words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).collect()

[('element', 1),
 ('key-value', 1),
 ('in', 2),
 ('of', 2),
 ('not', 1),
 ('and', 2),
 ('csv', 1),
 ('with', 1),
 ('lambda', 1),
 ('use', 1),
 ('formatted', 1),
 ('elements', 1),
 ('just', 1),
 ('course', 1),
 ('tag', 1),
 ('map', 1),
 ('imagine', 1),
 ('follows', 1),
 ('rdd', 1),
 ('that', 1),
 ('file', 1),
 ('a', 1),
 ('pair', 1),
 ('have', 1),
 ('predefined', 1),
 ('the', 7),
 ('represents', 1),
 ('we', 3),
 ('whole', 1),
 ('where', 1),
 ('to', 1),
 ('is', 2),
 ('list', 1),
 ('as', 2),
 ('can', 1),
 ('(e.g.', 1),
 ('value', 1),
 ('key', 1),
 ('proceed', 1),
 ('each', 1),
 ('row', 1),
 ('could', 1),
 ('normal)', 1),
 ('want', 1),
 ('functions', 1)]

## Broadcast and accumulator

In [43]:
bc = sc.broadcast({"this": 0, "is": 1, "text": 2})
errors = sc.accumulator(0)

def mapper(x):
    global errors
    for w in x.split():
        if w in bc.value:
            yield (bc.value[w], 1)
        else:
            errors += 1

rdd = (
    sc
   .parallelize(["this is text", "text too"])
   .flatMap(mapper)
   .reduceByKey(lambda a, b: a + b)
)
print(rdd)
print(rdd.collect())
print("errors:", errors.value)

PythonRDD[87] at RDD at PythonRDD.scala:53
[(0, 1), (1, 1), (2, 2)]
errors: 1


## DataFrame API

RDD is much better and useful than plain MapReduce, but Spark can do even more!
Spark DataFrame is table structure over RDDs and can be treated as pandas on steroids.

It allows us to perform structured queries and benefit from it. One way is to perform SQL-styled queries (will discuss on next lesson) and another is DataFrame API.

Documentation: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html

In [48]:
# import pandas as pd

In [56]:
rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("b", 4)])
rdd.collect()

[('a', 1), ('a', 2), ('b', 3), ('b', 4)]

In [59]:
df = se.createDataFrame(rdd)
df.printSchema()
df.show()

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)

+---+---+
| _1| _2|
+---+---+
|  a|  1|
|  a|  2|
|  b|  3|
|  b|  4|
+---+---+



In [61]:
from pyspark.sql import Row

df = se.createDataFrame(
    rdd.map(lambda x: Row(col_one=x[0], col_two=x[1]))
)
df.printSchema()
df.show()

root
 |-- col_one: string (nullable = true)
 |-- col_two: long (nullable = true)

+-------+-------+
|col_one|col_two|
+-------+-------+
|      a|      1|
|      a|      2|
|      b|      3|
|      b|      4|
+-------+-------+



In [62]:
df.select(['col_one']).limit(2).show()

+-------+
|col_one|
+-------+
|      a|
|      a|
+-------+



In [63]:
df.select(['col_one']).distinct().rdd.map(lambda x: x.col_one).collect()

['b', 'a']

Docs: https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#module-pyspark.sql.functions

In [66]:
from pyspark.sql import functions as F

(
  df.select(['col_one', 'col_two'])
    .where(F.col('col_one') == 'a')
    .limit(2)
    .show()
)

+-------+-------+
|col_one|col_two|
+-------+-------+
|      a|      1|
|      a|      2|
+-------+-------+



In [79]:
df = df.select('*', df['col_two'].cast('float').alias('col_two_float'))
df.show()

+-------+-------+-------------+
|col_one|col_two|col_two_float|
+-------+-------+-------------+
|      a|      1|          1.0|
|      a|      2|          2.0|
|      b|      3|          3.0|
|      b|      4|          4.0|
+-------+-------+-------------+



In [80]:
square_df = df.select('col_one', (df['col_two_float'] * df['col_two_float']).alias('col_two_square'))
square_df.orderBy('col_two_square', ascending=False).show(5)

+-------+--------------+
|col_one|col_two_square|
+-------+--------------+
|      b|          16.0|
|      b|           9.0|
|      a|           4.0|
|      a|           1.0|
+-------+--------------+



In [81]:
from pyspark.sql import functions as F

(
    df
      .groupby('col_one')
      .agg(F.collect_list("col_two").alias("col_two_list"))
      .select(['col_one', 'col_two_list'])
      .limit(10)
      .show()
)

+-------+------------+
|col_one|col_two_list|
+-------+------------+
|      b|      [3, 4]|
|      a|      [1, 2]|
+-------+------------+



In [93]:
list(df.toLocalIterator())

[Row(col_one='a', col_two=1, col_two_float=1.0),
 Row(col_one='a', col_two=2, col_two_float=2.0),
 Row(col_one='b', col_two=3, col_two_float=3.0),
 Row(col_one='b', col_two=4, col_two_float=4.0)]

In [94]:
df = se.createDataFrame(df.toLocalIterator())

In [95]:
df.printSchema()
df.show()

root
 |-- col_one: string (nullable = true)
 |-- col_two: long (nullable = true)
 |-- col_two_float: double (nullable = true)

+-------+-------+-------------+
|col_one|col_two|col_two_float|
+-------+-------+-------------+
|      a|      1|          1.0|
|      a|      2|          2.0|
|      b|      3|          3.0|
|      b|      4|          4.0|
+-------+-------+-------------+



In [96]:
# also can get RDD from DF
df.rdd.collect()

[Row(col_one='a', col_two=1, col_two_float=1.0),
 Row(col_one='a', col_two=2, col_two_float=2.0),
 Row(col_one='b', col_two=3, col_two_float=3.0),
 Row(col_one='b', col_two=4, col_two_float=4.0)]

## Data formats

In [97]:
# We may want to operate with not just plain text, but something more complex
# For example, Parquet - it can be useful for huge datasets for faster calcs
df.write.save("data.parquet")

In [99]:
data = se.read.parquet("data.parquet")
data.rdd.collect()

[Row(col_one='a', col_two=1, col_two_float=1.0),
 Row(col_one='a', col_two=2, col_two_float=2.0),
 Row(col_one='b', col_two=3, col_two_float=3.0),
 Row(col_one='b', col_two=4, col_two_float=4.0)]

## Outbrain click prediction dataseet

https://www.kaggle.com/c/outbrain-click-prediction/data

### AWS S3

In [100]:
! aws s3 ls s3://ydatazian

                           PRE week1/
2021-05-06 13:10:24       1672 bootstrap.sh
2021-05-13 21:20:22  176843889 clicks_test.parquet
2021-05-13 21:20:22  495815517 clicks_train.parquet
2021-05-13 21:21:58   34267065 documents_categories.parquet
2021-05-13 21:21:58  206455957 documents_entities.parquet
2021-05-13 21:21:58   23859965 documents_meta.parquet
2021-05-13 21:21:58  187410196 documents_topics.parquet
2021-05-13 21:21:58  734643471 events.parquet
2021-05-13 21:56:44 50764611872 page_views.parquet
2021-05-13 21:21:58  248421413 page_views_sample.parquet
2021-05-13 21:21:59    5116927 promoted_content.parquet
2021-05-13 21:21:58  273136709 sample_submission.csv


In [101]:
# load data

df = se.read.parquet("s3://ydatazian/page_views.parquet")
df.show(5)

+--------------+-----------+---------+--------+------------+--------------+
|          uuid|document_id|timestamp|platform|geo_location|traffic_source|
+--------------+-----------+---------+--------+------------+--------------+
|1fd5f051fba643|        120| 31905835|       1|          RS|             2|
|8557aa9004be3b|        120| 32053104|       1|       VN>44|             2|
|c351b277a358f0|        120| 54013023|       1|       KR>12|             1|
|8205775c5387f9|        120| 44196592|       1|       IN>16|             2|
|9cb0ccd8458371|        120| 65817371|       1|   US>CA>807|             2|
+--------------+-----------+---------+--------+------------+--------------+
only showing top 5 rows



### Data manipulations

In [103]:
from IPython.display import display
tables = ["clicks_test", "clicks_train", 
          "documents_categories", "documents_entities", "documents_meta", "documents_topics", 
          "events", "page_views", "page_views_sample", "promoted_content"]
for name in tqdm.tqdm(tables):
    df = se.read.parquet("s3://ydatazian/{}.parquet".format(name))
    df.registerTempTable(name)
    print(name)
    df.limit(3).show()

  0%|          | 0/10 [00:00<?, ?it/s]

clicks_test
+----------+------+
|display_id| ad_id|
+----------+------+
|  16874594| 66758|
|  16874594|150083|
|  16874594|162754|
+----------+------+

clicks_train
+----------+------+-------+
|display_id| ad_id|clicked|
+----------+------+-------+
|         1| 42337|      0|
|         1|139684|      0|
|         1|144739|      1|
+----------+------+-------+

documents_categories
+-----------+-----------+----------------+
|document_id|category_id|confidence_level|
+-----------+-----------+----------------+
|    1595802|       1611|            0.92|
|    1595802|       1610|            0.07|
|    1524246|       1807|            0.92|
+-----------+-----------+----------------+

documents_entities
+-----------+--------------------+-----------------+
|document_id|           entity_id| confidence_level|
+-----------+--------------------+-----------------+
|    1524246|f9eec25663db4cd83...|0.672865314504701|
|    1524246|55ebcfbdaff1d6f60...|0.399113728441297|
|    1524246|839907a972930b17b

In [104]:
page_views = se.table("page_views")
print(page_views)

DataFrame[uuid: string, document_id: string, timestamp: string, platform: string, geo_location: string, traffic_source: string]


In [140]:
page_views.select('*').show()

+--------------+-----------+---------+--------+------------+--------------+
|          uuid|document_id|timestamp|platform|geo_location|traffic_source|
+--------------+-----------+---------+--------+------------+--------------+
|1fd5f051fba643|        120| 31905835|       1|          RS|             2|
|8557aa9004be3b|        120| 32053104|       1|       VN>44|             2|
|c351b277a358f0|        120| 54013023|       1|       KR>12|             1|
|8205775c5387f9|        120| 44196592|       1|       IN>16|             2|
|9cb0ccd8458371|        120| 65817371|       1|   US>CA>807|             2|
|2aa611f32875c7|        120| 71495491|       1|       CA>ON|             2|
|f55a6eaf2b34ab|        120| 73309199|       1|       BR>27|             2|
|cc01b582c8cbff|        120| 50033577|       1|       CA>BC|             2|
|6c802978b8dd4d|        120| 66590306|       1|       CA>ON|             2|
|f4e423314303ff|        120| 48314254|       1|   US>LA>622|             1|
|3937372ca27

In [143]:
page_views_sql = se.sql("SELECT * from page_views")
page_views_sql.show()

+--------------+-----------+---------+--------+------------+--------------+
|          uuid|document_id|timestamp|platform|geo_location|traffic_source|
+--------------+-----------+---------+--------+------------+--------------+
|1fd5f051fba643|        120| 31905835|       1|          RS|             2|
|8557aa9004be3b|        120| 32053104|       1|       VN>44|             2|
|c351b277a358f0|        120| 54013023|       1|       KR>12|             1|
|8205775c5387f9|        120| 44196592|       1|       IN>16|             2|
|9cb0ccd8458371|        120| 65817371|       1|   US>CA>807|             2|
|2aa611f32875c7|        120| 71495491|       1|       CA>ON|             2|
|f55a6eaf2b34ab|        120| 73309199|       1|       BR>27|             2|
|cc01b582c8cbff|        120| 50033577|       1|       CA>BC|             2|
|6c802978b8dd4d|        120| 66590306|       1|       CA>ON|             2|
|f4e423314303ff|        120| 48314254|       1|   US>LA>622|             1|
|3937372ca27

In [105]:
documents_topics = se.table("documents_topics")
print(documents_topics)

DataFrame[document_id: string, topic_id: string, confidence_level: string]


In [None]:
from pyspark.sql.functions import desc

(
    page_views
      .join(documents_topics, page_views.document_id == documents_topics.document_id, 'outer')
      .select(page_views.document_id, documents_topics.topic_id)
      .sort(desc("document_id"))
      .limit(50)
      .show()
)

In [130]:
(
  documents_topics
    .groupby('document_id')
    .agg(F.collect_list("topic_id").alias("topics"))
    .limit(10)
    .show()
)

+-----------+--------------------+
|document_id|              topics|
+-----------+--------------------+
|     100010|[16, 254, 192, 25...|
|    1000240|[138, 143, 89, 20...|
|    1000280|[183, 199, 235, 279]|
|    1000665|           [183, 35]|
|    1000795|           [183, 35]|
|    1000839|[184, 183, 235, 1...|
|    1000888|       [75, 143, 64]|
|     100140|      [112, 24, 181]|
|    1001866|[269, 97, 214, 43...|
|    1002011|[137, 150, 119, 1...|
+-----------+--------------------+



In [138]:
(
  documents_topics
    .groupby('document_id')
    .agg(F.count("topic_id").alias("topics_cnt"))
    .orderBy("topics_cnt")
    .limit(10)
    .show()
)

+-----------+----------+
|document_id|topics_cnt|
+-----------+----------+
|     428514|         1|
|    1951619|         1|
|    1188934|         1|
|    2259465|         1|
|    2365393|         1|
|    2450228|         1|
|    2330926|         1|
|     511261|         1|
|    2432117|         1|
|    2635049|         1|
+-----------+----------+



In [139]:
(
  documents_topics
    .groupby('document_id')
    .agg(F.countDistinct("topic_id").alias("topics_cnt"))
    .orderBy("topics_cnt")
    .limit(10)
    .show()
)

+-----------+----------+
|document_id|topics_cnt|
+-----------+----------+
|    2733387|         1|
|    2480990|         1|
|    2604517|         1|
|     769267|         1|
|    2883834|         1|
|    1591304|         1|
|    2642433|         1|
|    2152463|         1|
|    2839332|         1|
|    2961911|         1|
+-----------+----------+



In [112]:
page_views.select("traffic_source").distinct().show()

+--------------+
|traffic_source|
+--------------+
|             3|
|             1|
|             2|
+--------------+



In [125]:
page_views.select("traffic_source").distinct().explain()

== Physical Plan ==
*(2) HashAggregate(keys=[traffic_source#338], functions=[])
+- Exchange hashpartitioning(traffic_source#338, 200)
   +- *(1) HashAggregate(keys=[traffic_source#338], functions=[])
      +- *(1) Project [traffic_source#338]
         +- *(1) FileScan parquet [traffic_source#338] Batched: true, Format: Parquet, Location: InMemoryFileIndex[s3://ydatazian/page_views.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<traffic_source:string>


In [115]:
from pyspark.sql.window import Window

In [137]:
w = Window.partitionBy('topic_id')\
          .orderBy('confidence_level')
(
  documents_topics
    .withColumn('row_number', F.row_number().over(w) - 1)
    .orderBy("row_number")
    .take(100)
)

[Row(document_id='1224367', topic_id='257', confidence_level='0.00800036766405164', row_number=0),
 Row(document_id='464561', topic_id='168', confidence_level='0.00800001705882854', row_number=0),
 Row(document_id='1556465', topic_id='6', confidence_level='0.00800008064000644', row_number=0),
 Row(document_id='489921', topic_id='31', confidence_level='0.0080008014972619', row_number=0),
 Row(document_id='1721729', topic_id='100', confidence_level='0.00800006482315529', row_number=0),
 Row(document_id='1648188', topic_id='268', confidence_level='0.00800010666666665', row_number=0),
 Row(document_id='1014485', topic_id='185', confidence_level='0.00800018794669673', row_number=0),
 Row(document_id='1382424', topic_id='99', confidence_level='0.00800032192007726', row_number=0),
 Row(document_id='1771673', topic_id='107', confidence_level='0.00800083967475122', row_number=0),
 Row(document_id='1210186', topic_id='160', confidence_level='0.00800055060004129', row_number=0),
 Row(document_id=

## HW

Dataset: outbrain click prediction

Tasks:
using Spark RDD, DataFrame API and Python, complete following tasks:

**1 (1 point)**. Find 10 most visited document_ids in page_views log

**2 (1 point)**. Find out how many users have at least two different traffic_sources in their page_views log

**3* (1 additional point)**. Find 10 most visited topic_ids in page_views log (use documents_topics table)