# Apache Spark

## Co se naučíte?

* instalaci dockerizovaného Sparku
* seznámíte se základní datovou abstrakcí (RDD — *resilient distributed data set*)
* prozkoumáte možnosti nízkoúrovňových operací nad RDD

## Co je Spark

I když se přístup, který zavedl Hadoop ukázal jako podnětný, přináší i jistá omezení:

* data (a to i mezivýsledky) jsou ukládány do HDFS, který sice nabízí robustnost, ale i vyšší režii (ta se pro jevuje především při zpracování dat)
* základní model MapReduce není příliš flexibilní a nelte jej aplikovat na některé třídy problémů
* Java je jediný jazyk, který je v Hadoopu nativně podporován, ostatní jazyky musí využívat jen nepříliš elegantní proudové rozhraní
* obtížné ladění programů (nelze je ladit bez minimálně tří běžících služeb)

Z tohoto důvodu vzniklo několik řešení, které se při zachování výhod Hadoopu (distribuovanost dat i výpočtů) snaží nabídnout efektivnější a flexibilnější model.

Spark patří mezi ty nejambicioznější, neboť poskytuje vysokoúrovňovou platformu s podporou několika programovacích jazyků, několika API a vlastní implementací výpočetního clusteru (tj. bez závislosti na Hadoopu).

Základní charakteristikou Sparku je distribuované ukládání mezivýsledků do operační paměti. Tyto tzv. *resilient distributed dataset* jsou při zachování odolnosti proti výpadkům mnohem efektivnější než HDFS soubory. Na druhou stranu jej jejich velikost omezena na gigabyty (nejsou tak vhodné pro opravdu enormní datové sady).

## Instalace

Programy napsané pro Apache Spark lze vykonávat i bez instalace distribuovaného prostředí, neboť Apache Spark lze provozovat ve třech základních režimech:

1. lokálně = pracovní uzly běží jako lokální procesy 
2. prostřednictvím samostatného Spark (v tomto případě je již nutné zajistit distribuci dat a Pythonského prostředí)
3. prostřednictvím Hadoop správce zdrojů a plánovače Yarn (a s vyuužití HDFS pro sdílení dat i kódu)

Lokální běh sice neumožňuje využít sílu distribuovaného zpracování, hodí se však pro účely ladění. 

Nejjednodušším krokem je instalace klintská části. My zvolíme *pyspark* (další možností jsou klienti pro Scalu a R). 
Ten se instaluje jako běžný pythonský framework např. prostřednictvím `pip`.

In [3]:
!pip install pyspark



Instalací se kromě příslušných modulů nainstaluje i interaktivní (REPL) prostředí `pyspark`. Spusťte ho a tím ověříte, že klient je dobře nainstalován.

Po spuštění můžete také zkontrolovat jaký režim je právě využíván:

```
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.2.1
      /_/

Using Python version 3.9.7 (default, Oct 27 2021 01:23:21)
Spark context Web UI available at http://192.168.122.1:4040
Spark context available as 'sc' (master = local[*], app id = local-1652515703360).
SparkSession available as 'spark'.
>>> 
```

V tomto případě je aktivní režim `locaĺ[*]` tj. běh v  několika souběžných lokálních procesech.

Pokud chcete vyzkoušet distribuované prostředí (i když de facto stále omezené na jednoho hostitele) použijte docker
obraz na adrese https://github.com/big-data-europe/docker-spark.

> **Úkol**: Nainstalujte si tento obraz a prostudujte dokumentaci, jak ho používat.

Pro spuštění můžete využít následující zjednodušenou `docker-compose` konfiguraci:

```yaml
ersion: '3'
services:
  spark-master:
    image: bde2020/spark-master:3.2.0-hadoop3.2
    container_name: spark-master
    ports:
      - "8080:8080"
      - "7077:7077"
    environment:
      - INIT_DAEMON_STEP=setup_spark
  spark-worker-1:
    image: bde2020/spark-worker:3.2.0-hadoop3.2
    container_name: spark-worker-1
    depends_on:
      - spark-master
    ports:
      - "8081:8081"
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
```

Obecně vždy potřebujete službu master a alespoň jednu službu `worker`.

Po spuštění se zkuste z `pyspark` připojit k serveru `master` (IP adresa se může měnit, použijte `docker network inspect docker-spark_default` pro její zjištění).

```
pyspark --master "spark://172.20.0.2:7077"
```

Jak master tak worker zobrazují stav na na URL `http://localhost:8080` resp. `http://localhost:8081` (podívejte se na ně).

> **Úkol**: Přidejte další worker uzel v `docker-compose.yml`. Nezapomeňte změnit externí port, aby se nepřekrýval s prvním uzlem (např. na 8082). S touto konfigurací bude nadále pracovat.

pro ty, kterým to připadá triviální: 

```
dmVyc2lvbjogJzMnCnNlcnZpY2VzOgogIHNwYXJrLW1hc3RlcjoKICAgIGltYWdlOiBiZGUyMDIw
L3NwYXJrLW1hc3RlcjozLjIuMC1oYWRvb3AzLjIKICAgIGNvbnRhaW5lcl9uYW1lOiBzcGFyay1t
YXN0ZXIKICAgIHBvcnRzOgogICAgICAtICI4MDgwOjgwODAiCiAgICAgIC0gIjcwNzc6NzA3NyIK
ICAgIGVudmlyb25tZW50OgogICAgICAtIElOSVRfREFFTU9OX1NURVA9c2V0dXBfc3BhcmsKICBz
cGFyay13b3JrZXItMToKICAgIGltYWdlOiBiZGUyMDIwL3NwYXJrLXdvcmtlcjozLjIuMC1oYWRv
b3AzLjIKICAgIGNvbnRhaW5lcl9uYW1lOiBzcGFyay13b3JrZXItMQogICAgZGVwZW5kc19vbjoK
ICAgICAgLSBzcGFyay1tYXN0ZXIKICAgIHBvcnRzOgogICAgICAtICI4MDgxOjgwODEiCiAgICBl
bnZpcm9ubWVudDoKICAgICAgLSAiU1BBUktfTUFTVEVSPXNwYXJrOi8vc3BhcmstbWFzdGVyOjcw
NzciCiAgc3Bhcmstd29ya2VyLTI6CiAgICBpbWFnZTogYmRlMjAyMC9zcGFyay13b3JrZXI6My4y
LjAtaGFkb29wMy4yCiAgICBjb250YWluZXJfbmFtZTogc3Bhcmstd29ya2VyLTIKICAgIGRlcGVu
ZHNfb246CiAgICAgIC0gc3BhcmstbWFzdGVyCiAgICBwb3J0czoKICAgICAgLSAiODA4Mjo4MDgx
IgogICAgZW52aXJvbm1lbnQ6CiAgICAgIC0gIlNQQVJLX01BU1RFUj1zcGFyazovL3NwYXJrLW1h
c3Rlcjo3MDc3Igo=
```

Nyní už můžeme vytvářet první programy pro Spark. Začneme klíčovou abstrakcí frameworku Spark.

## Resilient distributed dataset

Na nejnižší úrovni programování se ve Sparku pracuje s *resilient distributed datasets*:

1) jsou to datové sady = posloupnosti n-tic (tj. de facto tabulky) včetně posloupnosti řetězců (řádků) a posloupnosti bloků bytů (podporovány jsou elementární typy i komplexní objekty podporovaných jazyků, lze )
2) jsou distribuované na jednotlivé uzly (po částech, tzv. *partitioning*, k dispozici je ale i replikace)
3) jsou odolné proti závadám na jednotlivých uzlech (fault tolerant) 

mezi další vlastnosti patří:
* neměnnost (aplikací mapovací operace vznikne nové RDD)
* úložištěm je primárně operační paměť (v případě většího objemu dat, či požadavku presistentního kešování však mouho být uložena i v distrubuovaném FS)
* mapovací operace nad RDD jsou lenivé (provedou se až tehdy, kdy je očekáván výsledek)

RDD mohou vzniknout dvěma způsoby:

* paralelizací existujícího kontejneru (v Pythonu typicky seznamu nebo numpy pole)
* čtením ze souboru (typicky ze diatribuovaného souborového systému HDFS).

Pro otestování programového přístupu vyzkoušíme elementární úlohu -- paralelizaci numpy pole (čísla 0 až 999) a následná shromáždění do seznamu.

In [12]:
from pyspark import SparkContext, SparkConf
import numpy as np

conf = SparkConf().setAppName("test").setMaster("local")
with  SparkContext(conf=conf) as sc:
    data = np.arange(0, 10_000, dtype=np.int16) #list(range(1_000))
    print(type(data))
    rdd = sc.parallelize(data, 8) # objekt representuje rozddistribuované RDD
    print(type(rdd))
    cdata = rdd.collect()
    print(cdata[:100]) # vypíšeme jen prvních sto položek
    print(type(cdata)) # cdate je seznam

<class 'numpy.ndarray'>
<class 'pyspark.rdd.RDD'>
PythonRDD[1] at RDD at PythonRDD.scala:53
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
<class 'list'>


Jádrem programu je vytvoření tzv. spark-kontextu. Ten je vytvořen na základě konfigurace, která v tomto případě uvádí jen jméno úlohy (*job*) a režim vykonávání. Jméno úlohy se zobrazuje v případě použití distribuovaného systému a režim je v tomto případě jen lokální (spuštění ve Spark clusteru si necháme na zajímavější program).

RDD vznikne paralelizací numpy pole (je možné využít i seznam či jinou sekvenci = indexovatelnou kolekci) metodou nad objektem kontextu. Další metody se volají již přímo nad RDD objektem. Zde je použita  metoda `collect`, která shromáždí data RDD ze všech výpočetních uzlů.

Context je nutno ukončit a uvolnit, což v tomto případě zajišťuje konstrukce `with`.

Reálnější (i když datově opět neadekvátně malý) ukazuje jednoduché zpracování CSV souboru.

Nejdříve je přečten textový soubor, tak že výsledkem je RDD tvořené posloupností záznamů (partitiong nicméně neprobíhá po jednotlivých záznamech ale v blocích jednotek stovek MiB, tj. )

In [14]:
%%writefile data.txt

2022-05-12,10,3
2022-05-13,8,2
2022-05-14,11,1
2022-05-15,14,-3

Writing data.txt


In [9]:
from pyspark import SparkContext, SparkConf
from operator import add

conf = SparkConf().setAppName("test").setMaster("local")
with  SparkContext(conf=conf) as sc:
    rdd = sc.textFile("data.txt")
    t_rdd = rdd.filter(lambda line: line.strip()).map(lambda line: int(line.split(",")[2]))
    result = t_rdd.reduce(add) # redukujeme součtem
    print(result)

3


Nejdříve se provede vytvoření RDD ze souboru. To se nicméně prozatím neprovede , jen se naplánuje. V dalším kroku se provedou dvě **transformace**. První odfiltruje prázdné řádky, druhá je rozloží a vrátí třetí sloupec jako číslo. Všechny transformace jsou lenivé a tak ani na tomto řádku se nic neprovede.

Posledním krokem je redukce operací `add` (= obdoba operátoru `+` v podobě funkce). To je **akce**, která způsobí vykonání všech transformací a vstupních metod na nichž závisí (kromě různých typů redukcí do akcí patří i metoda `collect`. 

> **Úkol**: Upravte kód tak, aby počítal průměr. (existuje několik řešení). Nabídku možných transformací najdete na například na https://spark.apache.org/docs/latest/rdd-programming-guide.html.

Spuštění ve dockerizovaném Spark clusteru, který se víc blíží konečnému nasazení (i když stále není možné mluvit o nějakém skutečně distribuovaném zpracování) není přímočaré.

Za prvé je nutné program uložit jako skript (do stejného adresáře jako soubor `data.txt`). Všimněte si, že se v něm změnila cesta k datovému souboru (to není chyba)!

In [17]:
%%writefile app.py

from pyspark import SparkContext, SparkConf
from operator import add

conf = SparkConf().setAppName("test")
with  SparkContext(conf=conf) as sc:
    rdd = sc.textFile("/app/app.py")
    t_rdd = rdd.filter(lambda line: line.strip()).map(lambda line: int(line.split(",")[2]))
    result = t_rdd.reduce(add) # redukujeme součtem
    print(result)

Overwriting app.py


V dalším kroku je nutné v témže adresáři vytvořit Dockerfile odvozený z bde2020/spark-python-template. (dokumentace viz popis šablony na githubu).

```Dockerfile
FROM bde2020/spark-python-template:3.2.0-hadoop3.2

ENV SPARK_APPLICATION_PYTHON_LOCATION /app/app.py
```

Dále je nutné v tomto adresáři vytvořit soubor `requirements.txt` se jmény nestandardních modulů, jež je nutno nainstalovat přes `pip` (musí být vytvořen i když zůstane prázdný).

Ten poté přeložit:

```bash
docker build --rm -t bde/spark-app .
```

A spustit:

```bash
docker run --rm --name my-spark-app -e ENABLE_INIT_DAEMON=false --link spark-master:spark-master 
           --net docker-spark_default bde/spark-app:latest
```


Výsledek vypadá takto. Je zřejmé, že se spouští celá mašinerie, v jejímž výstupu se vlastní výsledek ztrácí (pokuste se ho najít:))
```
Submit application /app/app.py to Spark master spark://spark-master:7077
Passing arguments 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
22/05/15 20:56:21 INFO SparkContext: Running Spark version 3.2.0
22/05/15 20:56:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/05/15 20:56:22 INFO ResourceUtils: ==============================================================
22/05/15 20:56:22 INFO ResourceUtils: No custom resources configured for spark.driver.
22/05/15 20:56:22 INFO ResourceUtils: ==============================================================
22/05/15 20:56:22 INFO SparkContext: Submitted application: test
22/05/15 20:56:22 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
22/05/15 20:56:22 INFO ResourceProfile: Limiting resource is cpu
22/05/15 20:56:22 INFO ResourceProfileManager: Added ResourceProfile id: 0
22/05/15 20:56:22 INFO SecurityManager: Changing view acls to: root
22/05/15 20:56:22 INFO SecurityManager: Changing modify acls to: root
22/05/15 20:56:22 INFO SecurityManager: Changing view acls groups to: 
22/05/15 20:56:22 INFO SecurityManager: Changing modify acls groups to: 
22/05/15 20:56:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
22/05/15 20:56:22 INFO Utils: Successfully started service 'sparkDriver' on port 42269.
22/05/15 20:56:22 INFO SparkEnv: Registering MapOutputTracker
22/05/15 20:56:22 INFO SparkEnv: Registering BlockManagerMaster
22/05/15 20:56:22 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
22/05/15 20:56:22 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
22/05/15 20:56:22 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
22/05/15 20:56:22 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-c972acf7-e388-49be-9a39-0c89636a6943
22/05/15 20:56:22 INFO MemoryStore: MemoryStore started with capacity 366.3 MiB
22/05/15 20:56:22 INFO SparkEnv: Registering OutputCommitCoordinator
22/05/15 20:56:22 INFO Utils: Successfully started service 'SparkUI' on port 4040.
22/05/15 20:56:22 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://b30927379e1d:4040
22/05/15 20:56:23 INFO Executor: Starting executor ID driver on host b30927379e1d
22/05/15 20:56:23 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 44891.
22/05/15 20:56:23 INFO NettyBlockTransferService: Server created on b30927379e1d:44891
22/05/15 20:56:23 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
22/05/15 20:56:23 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, b30927379e1d, 44891, None)
22/05/15 20:56:23 INFO BlockManagerMasterEndpoint: Registering block manager b30927379e1d:44891 with 366.3 MiB RAM, BlockManagerId(driver, b30927379e1d, 44891, None)
22/05/15 20:56:23 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, b30927379e1d, 44891, None)
22/05/15 20:56:23 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, b30927379e1d, 44891, None)
22/05/15 20:56:24 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 354.1 KiB, free 366.0 MiB)
22/05/15 20:56:24 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 32.0 KiB, free 365.9 MiB)
22/05/15 20:56:24 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on b30927379e1d:44891 (size: 32.0 KiB, free: 366.3 MiB)
22/05/15 20:56:24 INFO SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:0
22/05/15 20:56:24 INFO FileInputFormat: Total input files to process : 1
22/05/15 20:56:24 INFO SparkContext: Starting job: reduce at /app/app.py:9
22/05/15 20:56:24 INFO DAGScheduler: Got job 0 (reduce at /app/app.py:9) with 1 output partitions
22/05/15 20:56:24 INFO DAGScheduler: Final stage: ResultStage 0 (reduce at /app/app.py:9)
22/05/15 20:56:24 INFO DAGScheduler: Parents of final stage: List()
22/05/15 20:56:24 INFO DAGScheduler: Missing parents: List()
22/05/15 20:56:24 INFO DAGScheduler: Submitting ResultStage 0 (PythonRDD[2] at reduce at /app/app.py:9), which has no missing parents
22/05/15 20:56:24 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 8.8 KiB, free 365.9 MiB)
22/05/15 20:56:24 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 5.2 KiB, free 365.9 MiB)
22/05/15 20:56:24 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on b30927379e1d:44891 (size: 5.2 KiB, free: 366.3 MiB)
22/05/15 20:56:24 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1427
22/05/15 20:56:24 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (PythonRDD[2] at reduce at /app/app.py:9) (first 15 tasks are for partitions Vector(0))
22/05/15 20:56:24 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks resource profile 0
22/05/15 20:56:24 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (b30927379e1d, executor driver, partition 0, PROCESS_LOCAL, 4487 bytes) taskResourceAssignments Map()
22/05/15 20:56:24 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
22/05/15 20:56:25 INFO HadoopRDD: Input split: file:/app/data.txt:0+65
22/05/15 20:56:25 INFO PythonRunner: Times: total = 423, boot = 410, init = 13, finish = 0
22/05/15 20:56:25 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1440 bytes result sent to driver
22/05/15 20:56:25 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1127 ms on b30927379e1d (executor driver) (1/1)
22/05/15 20:56:25 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
22/05/15 20:56:25 INFO PythonAccumulatorV2: Connected to AccumulatorServer at host: 127.0.0.1 port: 44781
22/05/15 20:56:25 INFO DAGScheduler: ResultStage 0 (reduce at /app/app.py:9) finished in 1.272 s
22/05/15 20:56:25 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
22/05/15 20:56:25 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
22/05/15 20:56:25 INFO DAGScheduler: Job 0 finished: reduce at /app/app.py:9, took 1.332424 s
3
22/05/15 20:56:25 INFO SparkUI: Stopped Spark web UI at http://b30927379e1d:4040
22/05/15 20:56:25 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/05/15 20:56:25 INFO MemoryStore: MemoryStore cleared
22/05/15 20:56:25 INFO BlockManager: BlockManager stopped
22/05/15 20:56:25 INFO BlockManagerMaster: BlockManagerMaster stopped
22/05/15 20:56:25 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/05/15 20:56:25 INFO SparkContext: Successfully stopped SparkContext
22/05/15 20:56:26 INFO ShutdownHookManager: Shutdown hook called
22/05/15 20:56:26 INFO ShutdownHookManager: Deleting directory /tmp/spark-ca1f275f-da48-48ca-a4c2-4ee29d4c547a/pyspark-342607bc-5980-4461-995f-5e3974e557ee
22/05/15 20:56:26 INFO ShutdownHookManager: Deleting directory /tmp/spark-ca1f275f-da48-48ca-a4c2-4ee29d4c547a
22/05/15 20:56:26 INFO ShutdownHookManager: Deleting directory /tmp/spark-a9f763c7-fd12-4a43-8e80-56442b7fffa7
```

Tato šablona zajišťuje tři klíčové podmínky pro spuštění aplikace:
    
1) aplikace musí být spuštěna pomocí příkazu spark-submit, který zajišťuje vykonání tzv. driveru (řídící program na uzlu master) a dílčích úloh v uzlech `worker`.

2) identické softwarové prostředí (stejná verze Pythonu, stejná nabídka knihoven apod.)

navíc je nutné zajistit, že všechny pracovní uzly uvidí uzel na stejné URL adrese. V reálném nasazení je to většinou sdílený HDFS, lze však například využít i HTTP(S). 

> **Úkol:** Vytvořte program, který pro náš textový soubor (resp. rozsáhlejší soubor se stejnou strukturou) vrátí průměrnou hodnotu posledního sloupce a spusťtě jej na (dockerizovaném) Spark clusteru.