In [1]:
# Checklist:
# AWS emr-5.29.0
# MASTER r5d.8xlarge 1x, no EBS
# CORE r5d.8xlarge 4x, no EBS
# Custom bootstrap action: s3://ydatazian/bootstrap.sh
# Allow ssh in master node security group

In [2]:
import tqdm.notebook as tqdm
import numpy as np
import scipy
import sklearn

# Spark

In [3]:
# connect, context, session

import findspark
findspark.init()

import sys
sys.path.append("..")
import spark_utils
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
sc = SparkContext("yarn", "My App", conf=spark_utils.get_spark_conf())

spark_utils.print_ui_links()


NameNode: http://ec2-3-239-36-39.compute-1.amazonaws.com:50070
YARN: http://ec2-3-239-36-39.compute-1.amazonaws.com:8088
Spark UI: http://ec2-3-239-36-39.compute-1.amazonaws.com:20888/proxy/application_1638576385598_0002


In [4]:
se = SparkSession(sc)

## HDFS

In [5]:
! hdfs dfs -df -h

Filesystem                                   Size     Used  Available  Use%
hdfs://ip-172-31-1-244.ec2.internal:8020  547.5 G  714.6 M    545.6 G    0%


In [6]:
! hdfs dfs -ls /

Found 5 items
drwxr-xr-x   - hadoop hadoop          0 2021-12-04 00:12 /test2
drwxrwxrwt   - hdfs   hadoop          0 2021-12-04 00:06 /tmp
drwxr-xr-x   - hadoop hadoop          0 2021-12-04 00:12 /tmp_text2.txt
drwxr-xr-x   - hdfs   hadoop          0 2021-12-04 00:06 /user
drwxr-xr-x   - hdfs   hadoop          0 2021-12-04 00:06 /var


## RDD

RDD (Resilient Distributed Datasets) - base data block of Spark. The system takes care about parts of the data and it's manipulations on distributed system. It could be treated as ordered sequence of rows (commonly key-value pairs like in MapReduce, but could be any arbitrary data).

RDDs are immutable. You get new RDD by making operations on initial RDD.

There is two kinds of operations on RDD: *actions* and *transformations*.

Transformations are not applied instantly, they are stacked in operations order.

Actions are used to materialize transformations (so the data is actually transformed on cluster).

Documentation: https://spark.apache.org/docs/latest/rdd-programming-guide.html

https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions

https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations

In [7]:
# Let's create simple RDD first
rdd = sc.parallelize(range(10))
rdd

PythonRDD[1] at RDD at PythonRDD.scala:53

### Actions

In [8]:
rdd.collect() # gather data into python, be careful, loads data into memory

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [9]:
rdd.cache()

PythonRDD[1] at RDD at PythonRDD.scala:53

In [10]:
rdd.count() # returns count of objects

10

In [11]:
rdd.first() # get first element of RDD

0

In [12]:
rdd.take(2) # get first N=2 elements

[0, 1]

In [13]:
rdd.mean() # mean of RDD's values

4.5

In [14]:
# We can create RDD with text data
rdd = sc.parallelize(["one", "two"] * 1000)
rdd

ParallelCollectionRDD[12] at parallelize at PythonRDD.scala:195

In [15]:
rdd.collect() # get RDD values

['one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',
 'two',
 'one',


In [None]:
rdd.saveAsTextFile("/tmp_text2.txt")  # save RDD into HDFS

In [22]:
%%bash
hdfs dfs -ls /tmp_text2.txt # parts

Found 501 items
-rw-r--r--   1 hadoop hadoop          0 2021-12-04 00:12 /tmp_text2.txt/_SUCCESS
-rw-r--r--   1 hadoop hadoop         16 2021-12-04 00:12 /tmp_text2.txt/part-00000
-rw-r--r--   1 hadoop hadoop         16 2021-12-04 00:12 /tmp_text2.txt/part-00001
-rw-r--r--   1 hadoop hadoop         16 2021-12-04 00:12 /tmp_text2.txt/part-00002
-rw-r--r--   1 hadoop hadoop         16 2021-12-04 00:12 /tmp_text2.txt/part-00003
-rw-r--r--   1 hadoop hadoop         16 2021-12-04 00:12 /tmp_text2.txt/part-00004
-rw-r--r--   1 hadoop hadoop         16 2021-12-04 00:12 /tmp_text2.txt/part-00005
-rw-r--r--   1 hadoop hadoop         16 2021-12-04 00:12 /tmp_text2.txt/part-00006
-rw-r--r--   1 hadoop hadoop         16 2021-12-04 00:12 /tmp_text2.txt/part-00007
-rw-r--r--   1 hadoop hadoop         16 2021-12-04 00:12 /tmp_text2.txt/part-00008
-rw-r--r--   1 hadoop hadoop         16 2021-12-04 00:12 /tmp_text2.txt/part-00009
-rw-r--r--   1 hadoop hadoop         16 2021-12-04 00:12 /tmp_text2.txt/p

In [23]:
%%bash
hdfs dfs -cat /tmp_text2.txt/* # actual data from parts

one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two
one
two


In [None]:
rdd = sc.parallelize([{1: 1}, {2: 2}] * 1000, 10)
rdd.saveAsTextFile("/test2")

In [25]:
%%bash
hdfs dfs -cat /test2/* # actual data from parts

{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}
{2: 2}
{1: 1}

In [26]:
%%bash
hdfs dfs -ls /test2

Found 11 items
-rw-r--r--   1 hadoop hadoop          0 2021-12-04 00:12 /test2/_SUCCESS
-rw-r--r--   1 hadoop hadoop       1400 2021-12-04 00:12 /test2/part-00000
-rw-r--r--   1 hadoop hadoop       1400 2021-12-04 00:12 /test2/part-00001
-rw-r--r--   1 hadoop hadoop       1400 2021-12-04 00:12 /test2/part-00002
-rw-r--r--   1 hadoop hadoop       1400 2021-12-04 00:12 /test2/part-00003
-rw-r--r--   1 hadoop hadoop       1400 2021-12-04 00:12 /test2/part-00004
-rw-r--r--   1 hadoop hadoop       1400 2021-12-04 00:12 /test2/part-00005
-rw-r--r--   1 hadoop hadoop       1400 2021-12-04 00:12 /test2/part-00006
-rw-r--r--   1 hadoop hadoop       1400 2021-12-04 00:12 /test2/part-00007
-rw-r--r--   1 hadoop hadoop       1400 2021-12-04 00:12 /test2/part-00008
-rw-r--r--   1 hadoop hadoop       1400 2021-12-04 00:12 /test2/part-00009


### Transformations

In [27]:
rdd = sc.parallelize(range(20), 10) # RDD from range

In [28]:
# New RDD with "square + 1" transformation by two map operations;
# Map operations are similar to those from MapReduce,
# the difference - given map functions are applied to each element of rdd:
squares = rdd.map(lambda x: x**2).map(lambda x: x + 1)

# IMPORTANT NOTE - nothing is calculated right now,
# `squares` now only represents sequence for new RDD over initial data

In [29]:
squares

PythonRDD[21] at RDD at PythonRDD.scala:53

In [30]:
squares.first()

# Now we applied Action, so the map transformations are run
# But not all data is calculated, Spark optimized that for us
# and only the value for first row was calculated and returned

1

In [31]:
squares.collect() # get all data

[1,
 2,
 5,
 10,
 17,
 26,
 37,
 50,
 65,
 82,
 101,
 122,
 145,
 170,
 197,
 226,
 257,
 290,
 325,
 362]

In [32]:
len(squares.sample(False, 0.5).collect())

10

In [33]:
squares.flatMap(lambda x: [x, x+1, x+2]).take(5)

[1, 2, 3, 2, 3]

In [34]:
(
    squares
    .takeOrdered(1, lambda x: -x) # top 1
)

[362]

In [35]:
(
    rdd
    .filter(lambda x: x % 2)
    .collect()
)

[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]

### MapReduce

In [36]:
# step by step MapReduce emulation:
rdd = sc.parallelize(["this is text", "some more text"], 10)

In [37]:
(
    rdd
    .flatMap(lambda x: [(w, 1) for w in x.split()])
    .collect()
)

[('this', 1), ('is', 1), ('text', 1), ('some', 1), ('more', 1), ('text', 1)]

In [38]:
# we gonna use first iteration often, to save time we can cache the result:
words = rdd.flatMap(lambda x: [(w, 1) for w in x.split()]).cache()
words.count()

6

In [39]:
words.collect()

[('this', 1), ('is', 1), ('text', 1), ('some', 1), ('more', 1), ('text', 1)]

**PairRDD**

If you have a tuple of length 2 as your RDD data type, you can use *ByKey operations on your RDD, with first value of tuple being the key and second being the value. Let's create such RDD.

We want to aggregate data by key (word), we are able to do it with `groupByKey` method, it will produce values iterable.

In [40]:
(
    words
    .groupByKey()
    .collect()
)

[('text', <pyspark.resultiterable.ResultIterable at 0x7fd59551db38>),
 ('more', <pyspark.resultiterable.ResultIterable at 0x7fd59551db00>),
 ('this', <pyspark.resultiterable.ResultIterable at 0x7fd59551dba8>),
 ('some', <pyspark.resultiterable.ResultIterable at 0x7fd59551dc18>),
 ('is', <pyspark.resultiterable.ResultIterable at 0x7fd59551dc88>)]

And of course we can use any function in map, not just lambdas,

with regular `map` function you can change key/value, create complex keys

In [41]:
def mapToList(x):
    return x[0], list(x[1])

data = (
    words
    .groupByKey()
    .map(mapToList)
    .collect()
)

data

[('text', [1, 1]), ('more', [1]), ('this', [1]), ('some', [1]), ('is', [1])]

We may use `.mapValues` method to manipulate only with values and leave keys intact:

In [42]:
def mapValuesToLen(x):
    return len(x)

(
    words
    .groupByKey()
    .mapValues(mapValuesToLen)
    .collect()
)

[('text', 2), ('more', 1), ('this', 1), ('some', 1), ('is', 1)]

In [43]:
(
    words
    .groupByKey()
    .map(lambda x: (x[0], len(x[1])))
    .collect()
)

[('text', 2), ('more', 1), ('this', 1), ('some', 1), ('is', 1)]

Another way to manipulate values grouped by key is reduce performed by `reduceByKey` operation:

In [44]:
(
    words # -> (word, cnt=1)
    .reduceByKey(lambda a, b: a + b) # -> (word, sum(cnt))
    .collect()
)

[('text', 2), ('more', 1), ('this', 1), ('some', 1), ('is', 1)]

In [45]:
# how errors look like:
(
    rdd
    .flatMap(lambda x: [(w, len(1)) for w in x.split()]) # len from int
    .reduceByKey(lambda a, b: a + b) # -> (word, sum(cnt))
    .collect()
)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 31.0 failed 4 times, most recent failure: Lost task 4.3 in stage 31.0 (TID 4886, ip-172-31-6-7.ec2.internal, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1638576385598_0002/container_1638576385598_0002_01_000004/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1638576385598_0002/container_1638576385598_0002_01_000004/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2499, in pipeline_func
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2499, in pipeline_func
  File "/usr/lib/spark/python/pyspark/rdd.py", line 352, in func
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1861, in combineLocally
  File "/mnt/yarn/usercache/hadoop/appcache/application_1638576385598_0002/container_1638576385598_0002_01_000004/pyspark.zip/pyspark/shuffle.py", line 238, in mergeValues
    for k, v in iterator:
  File "/mnt/yarn/usercache/hadoop/appcache/application_1638576385598_0002/container_1638576385598_0002_01_000004/pyspark.zip/pyspark/util.py", line 113, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-45-c40113ebb6e2>", line 4, in <lambda>
  File "<ipython-input-45-c40113ebb6e2>", line 4, in <listcomp>
TypeError: object of type 'int' has no len()

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2041)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2029)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2028)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2028)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2262)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2211)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2200)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor52.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1638576385598_0002/container_1638576385598_0002_01_000004/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1638576385598_0002/container_1638576385598_0002_01_000004/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2499, in pipeline_func
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2499, in pipeline_func
  File "/usr/lib/spark/python/pyspark/rdd.py", line 352, in func
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1861, in combineLocally
  File "/mnt/yarn/usercache/hadoop/appcache/application_1638576385598_0002/container_1638576385598_0002_01_000004/pyspark.zip/pyspark/shuffle.py", line 238, in mergeValues
    for k, v in iterator:
  File "/mnt/yarn/usercache/hadoop/appcache/application_1638576385598_0002/container_1638576385598_0002_01_000004/pyspark.zip/pyspark/util.py", line 113, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-45-c40113ebb6e2>", line 4, in <lambda>
  File "<ipython-input-45-c40113ebb6e2>", line 4, in <listcomp>
TypeError: object of type 'int' has no len()

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


## Broadcast and accumulator

In [46]:
bc = sc.broadcast({"this": 0, "is": 1, "text": 2})  # read-only
errors = sc.accumulator(0)  # write-only

# x - "this is text"
def mapper(x):
    global errors
    for w in x.split():
        if w in bc.value:
            yield (bc.value[w], 1)
        else:
            errors += 1

rdd = (
    sc
   .parallelize(["this is text too", "text too too"], 10)
   .flatMap(mapper)
   .reduceByKey(lambda a, b: a + b)
)
print(rdd)
print(rdd.collect())
print("errors:", errors.value)

PythonRDD[66] at RDD at PythonRDD.scala:53
[(0, 1), (1, 1), (2, 2)]
errors: 3


## DataFrame API

RDD is much better and useful than plain MapReduce, but Spark can do even more!
Spark DataFrame is table structure over RDDs and can be treated as pandas on steroids.

It allows us to perform structured queries and benefit from it. One way is to perform SQL-styled queries (will discuss on next lesson) and another is DataFrame API.

Documentation: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html

In [47]:
# import pandas as pd

In [48]:
rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("b", 4)])
rdd.collect()

[('a', 1), ('a', 2), ('b', 3), ('b', 4)]

In [49]:
df = se.createDataFrame(rdd)
df.printSchema()
df.show()

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)

+---+---+
| _1| _2|
+---+---+
|  a|  1|
|  a|  2|
|  b|  3|
|  b|  4|
+---+---+



In [50]:
# df -> rdd
df.rdd.collect()

[Row(_1='a', _2=1), Row(_1='a', _2=2), Row(_1='b', _2=3), Row(_1='b', _2=4)]

In [51]:
# rdd -> df
rdd.collect()

[('a', 1), ('a', 2), ('b', 3), ('b', 4)]

In [52]:
from pyspark.sql import Row

df = se.createDataFrame(
    rdd.map(lambda x: Row(col_one=x[0], col_two=x[1]))
)
df.printSchema()
df.show()

root
 |-- col_one: string (nullable = true)
 |-- col_two: long (nullable = true)

+-------+-------+
|col_one|col_two|
+-------+-------+
|      a|      1|
|      a|      2|
|      b|      3|
|      b|      4|
+-------+-------+



In [53]:
df.select(['col_one']).limit(2).show()

+-------+
|col_one|
+-------+
|      a|
|      a|
+-------+



In [54]:
df.select(['col_one']).distinct().show()

+-------+
|col_one|
+-------+
|      b|
|      a|
+-------+



In [55]:
# when you need it in Python
df.select(['col_one']).distinct().rdd.map(lambda x: x.col_one).collect()

['b', 'a']

Docs: https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#module-pyspark.sql.functions

In [56]:
from pyspark.sql import functions as F

(
    df
    .select(['col_one', 'col_two'])
    .where(F.col('col_one') == 'a')
    .limit(2)
    .show()
)

+-------+-------+
|col_one|col_two|
+-------+-------+
|      a|      1|
|      a|      2|
+-------+-------+



In [57]:
(
    df
    .select(['col_one', 'col_two'])
    .where(df.col_one == 'a')
    .limit(2)
    .show()
)

+-------+-------+
|col_one|col_two|
+-------+-------+
|      a|      1|
|      a|      2|
+-------+-------+



In [58]:
# the same thing
df.registerTempTable("table")
se.sql("select col_one, col_two from table where col_one = 'a' limit 2").show()

+-------+-------+
|col_one|col_two|
+-------+-------+
|      a|      2|
|      a|      1|
+-------+-------+



In [59]:
df.show()

+-------+-------+
|col_one|col_two|
+-------+-------+
|      a|      1|
|      a|      2|
|      b|      3|
|      b|      4|
+-------+-------+



In [60]:
df = df.select('*', df['col_two'].cast('float').alias('col_two_float'))
df.show()

+-------+-------+-------------+
|col_one|col_two|col_two_float|
+-------+-------+-------------+
|      a|      1|          1.0|
|      a|      2|          2.0|
|      b|      3|          3.0|
|      b|      4|          4.0|
+-------+-------+-------------+



In [61]:
# the same thing
se.sql("""
select *, cast(col_two as float) as col_two_float
from table
""").show()

+-------+-------+-------------+
|col_one|col_two|col_two_float|
+-------+-------+-------------+
|      a|      1|          1.0|
|      a|      2|          2.0|
|      b|      3|          3.0|
|      b|      4|          4.0|
+-------+-------+-------------+



In [62]:
square_df = df.select('col_one', (df['col_two_float'] * df['col_two_float']).alias('col_two_square'))
square_df.orderBy('col_two_square', ascending=False).show(5)

+-------+--------------+
|col_one|col_two_square|
+-------+--------------+
|      b|          16.0|
|      b|           9.0|
|      a|           4.0|
|      a|           1.0|
+-------+--------------+



In [63]:
square_df.registerTempTable('square_df')
se.sql('select * from square_df order by col_two_square DESC limit 5').show()

+-------+--------------+
|col_one|col_two_square|
+-------+--------------+
|      b|          16.0|
|      b|           9.0|
|      a|           4.0|
|      a|           1.0|
+-------+--------------+



In [64]:
df.show()

+-------+-------+-------------+
|col_one|col_two|col_two_float|
+-------+-------+-------------+
|      a|      1|          1.0|
|      a|      2|          2.0|
|      b|      3|          3.0|
|      b|      4|          4.0|
+-------+-------+-------------+



In [65]:
se.sql('select col_one, collect_list(col_two) as col_two_list from table group by col_one limit 10').show()

+-------+------------+
|col_one|col_two_list|
+-------+------------+
|      a|      [1, 2]|
|      b|      [3, 4]|
+-------+------------+



In [66]:
df.alias('df1').join(df.alias('df2'), on='col_one').show()

+-------+-------+-------------+-------+-------------+
|col_one|col_two|col_two_float|col_two|col_two_float|
+-------+-------+-------------+-------+-------------+
|      b|      3|          3.0|      3|          3.0|
|      b|      3|          3.0|      4|          4.0|
|      b|      4|          4.0|      3|          3.0|
|      b|      4|          4.0|      4|          4.0|
|      a|      1|          1.0|      1|          1.0|
|      a|      1|          1.0|      2|          2.0|
|      a|      2|          2.0|      1|          1.0|
|      a|      2|          2.0|      2|          2.0|
+-------+-------+-------------+-------+-------------+



In [68]:
df.alias('df1').join(square_df.alias('df2'), F.col('df1.col_one') == F.col('df2.col_one'), 'inner').show()

+-------+-------+-------------+-------+--------------+
|col_one|col_two|col_two_float|col_one|col_two_square|
+-------+-------+-------------+-------+--------------+
|      b|      3|          3.0|      b|          16.0|
|      b|      3|          3.0|      b|           9.0|
|      b|      4|          4.0|      b|          16.0|
|      b|      4|          4.0|      b|           9.0|
|      a|      1|          1.0|      a|           1.0|
|      a|      1|          1.0|      a|           4.0|
|      a|      2|          2.0|      a|           1.0|
|      a|      2|          2.0|      a|           4.0|
+-------+-------+-------------+-------+--------------+



In [69]:
se.sql('select * from table df1 join table df2 on df1.col_one == df2.col_one').show()

+-------+-------+-------+-------+
|col_one|col_two|col_one|col_two|
+-------+-------+-------+-------+
|      b|      4|      b|      4|
|      b|      4|      b|      3|
|      b|      3|      b|      4|
|      b|      3|      b|      3|
|      a|      2|      a|      2|
|      a|      2|      a|      1|
|      a|      1|      a|      2|
|      a|      1|      a|      1|
+-------+-------+-------+-------+



In [None]:
# import pandas as pd

## Data formats

In [111]:
# We may want to operate with not just plain text, but something more complex
# For example, Parquet - it can be useful for huge datasets for faster calcs
df.write.save("data.parquet")

In [114]:
! hdfs dfs -ls /user/hadoop/*

Found 1 items
drwx------   - hadoop hadoop          0 2021-11-10 09:25 /user/hadoop/.sparkStaging/application_1636529659796_0001
Found 6 items
-rw-r--r--   1 hadoop hadoop          0 2021-11-10 10:31 /user/hadoop/data.parquet/_SUCCESS
-rw-r--r--   1 hadoop hadoop        450 2021-11-10 10:31 /user/hadoop/data.parquet/part-00000-2586d0f6-6e36-4485-8b3f-04cc1f4ebaec-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop        859 2021-11-10 10:31 /user/hadoop/data.parquet/part-00124-2586d0f6-6e36-4485-8b3f-04cc1f4ebaec-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop        859 2021-11-10 10:31 /user/hadoop/data.parquet/part-00249-2586d0f6-6e36-4485-8b3f-04cc1f4ebaec-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop        859 2021-11-10 10:31 /user/hadoop/data.parquet/part-00374-2586d0f6-6e36-4485-8b3f-04cc1f4ebaec-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop        859 2021-11-10 10:31 /user/hadoop/data.parquet/part-00499-2586d0f6-6e36-4485-8b3f-04cc1f4ebaec-c000.snappy.parquet


In [115]:
data = se.read.parquet("data.parquet")
data.rdd.collect()

[Row(col_one='a', col_two=1, col_two_float=1.0),
 Row(col_one='a', col_two=2, col_two_float=2.0),
 Row(col_one='b', col_two=3, col_two_float=3.0),
 Row(col_one='b', col_two=4, col_two_float=4.0)]