# Meetup Lublin 29.05.19

# Data analysis with Apache Spark


<div style="text-align: right">
<b>Patryk Pilarski</b><br>
1patryk.pilarski@gmail.com<br>
p.pilarski@sages.com.pl
</div>

----

## Apache Spark

Apache Spark is an open-source engine for distributed computing. It was originally developed at UC Berkeley and later donated to the Apache Software Foundation, where it has been maintained and developed ever since. Spark provides an interface for programming clusters with implicit data parallelism and fault tolerance.

Spark can be used from Scala, Python, Java, and R.

**Spark components:**

![title](spark-stack.png)

* Spark Core - the foundation of Spark, providing the basic data abstraction called RDD
* Spark SQL - a component for working with structured data using operations familiar from SQL
* Spark MLlib - a component containing the machine learning algorithms available in Spark
* Spark Streaming - a module for processing data streams
* Spark GraphX - a component for working with graphs

**Spark architecture:**

![title](cluster-overview.png)

* driver - the process that runs the application's main function and creates the SparkContext
* executor(s) - processes launched for an application on worker nodes; they run tasks and keep data in memory or on disk. Each application has its own executors
* cluster manager - available options: YARN, Mesos, Kubernetes, Standalone

**SparkContext:**
* the entry point for working with Spark
* coordinates the processes on the cluster
* stopping the SparkContext == stopping the application
* usually named `sc`
* creating a SparkContext in PySpark:

> import pyspark<br>
> sc = pyspark.SparkContext(appName="my_app")

**SparkSession:**
* introduced in Spark 2.0
* combines SparkContext, SQLContext and HiveContext in a single entry point
* usually named `spark`
* creating a SparkSession in PySpark:

> from pyspark.sql import SparkSession<br>
> spark = SparkSession.builder.appName('my_app').getOrCreate()


**RDD:**
* the basic data abstraction in Spark
* R - resilient
* D - distributed
* D - dataset
* Matei Zaharia, et al. `Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing`
* immutable
* in-memory
* lazily evaluated
* parallel
* two kinds of operations: actions and transformations


[Documentation](https://spark.apache.org/docs/latest/api/python/pyspark.html)

In [1]:
import pyspark
sc = pyspark.SparkContext(appName="demo_rdd")

In [2]:
rdd = sc.textFile("MobyDick.txt")

In [3]:
type(rdd)

pyspark.rdd.RDD

In [4]:
rdd.take(20)

['MOBY DICK;',
 '',
 '',
 'or, THE WHALE.',
 '',
 '',
 'CHAPTER 1. Loomings.',
 '',
 'Call me Ishmael. Some years ago—never mind how long precisely—having',
 'little or no money in my purse, and nothing particular to interest me on',
 'shore, I thought I would sail about a little and see the watery part of',
 'the world. It is a way I have of driving off the spleen and regulating',
 'the circulation. Whenever I find myself growing grim about the mouth;',
 'whenever it is a damp, drizzly November in my soul; whenever I find',
 'myself involuntarily pausing before coffin warehouses, and bringing up',
 'the rear of every funeral I meet; and especially whenever my hypos get',
 'such an upper hand of me, that it requires a strong moral principle to',
 'prevent me from deliberately stepping into the street, and methodically',
 'knocking people’s hats off—then, I account it high time to get to',
 'sea as soon as I can. This is my substitute for pistol and ball. With']

In [5]:
rdd.map(lambda x: x.lower()).take(20)

['moby dick;',
 '',
 '',
 'or, the whale.',
 '',
 '',
 'chapter 1. loomings.',
 '',
 'call me ishmael. some years ago—never mind how long precisely—having',
 'little or no money in my purse, and nothing particular to interest me on',
 'shore, i thought i would sail about a little and see the watery part of',
 'the world. it is a way i have of driving off the spleen and regulating',
 'the circulation. whenever i find myself growing grim about the mouth;',
 'whenever it is a damp, drizzly november in my soul; whenever i find',
 'myself involuntarily pausing before coffin warehouses, and bringing up',
 'the rear of every funeral i meet; and especially whenever my hypos get',
 'such an upper hand of me, that it requires a strong moral principle to',
 'prevent me from deliberately stepping into the street, and methodically',
 'knocking people’s hats off—then, i account it high time to get to',
 'sea as soon as i can. this is my substitute for pistol and ball. with']

In [6]:
rdd.flatMap(lambda x: x.lower().split(" "))\
.map(lambda x: (x.strip(".,;:?!-"), 1))\
.reduceByKey(lambda x,y: x+y)\
.takeOrdered(20, lambda x: -x[1])

[('the', 14161),
 ('of', 6549),
 ('and', 6286),
 ('a', 4566),
 ('to', 4564),
 ('in', 4085),
 ('', 3560),
 ('that', 2873),
 ('his', 2479),
 ('it', 2288),
 ('i', 1832),
 ('with', 1729),
 ('but', 1728),
 ('as', 1700),
 ('he', 1697),
 ('is', 1687),
 ('was', 1617),
 ('for', 1581),
 ('all', 1456),
 ('this', 1381)]
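The chain above is the classic word count. Below is a plain-Python sketch of what it computes; no Spark is involved, and `word_count` is a hypothetical helper. It also explains the `('', 3560)` entry: `split(" ")` produces an empty string for every blank line, for consecutive spaces, and for tokens made only of the stripped punctuation.

```python
import heapq
from collections import Counter


def word_count(lines, n):
    """Plain-Python equivalent of the flatMap/map/reduceByKey/takeOrdered chain above."""
    counts = Counter()
    for line in lines:
        for word in line.lower().split(" "):       # flatMap: one line -> many words
            counts[word.strip(".,;:?!-")] += 1      # map to (word, 1), then reduceByKey(+)
    # takeOrdered(n, key=lambda x: -x[1]) keeps the n smallest keys,
    # i.e. the n pairs with the highest counts
    return heapq.nsmallest(n, counts.items(), key=lambda pair: -pair[1])


sample = ["Call me Ishmael.", "", "Call me, maybe; call ME!"]
print(word_count(sample, 2))  # [('call', 3), ('me', 3)]
```

Words with equal counts may come out in a different order in Spark, since this sketch breaks ties by first appearance.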

In [7]:
sc.stop()

**DataFrame:**
* a data abstraction from the Spark SQL module, built on top of RDDs
* immutable
* in-memory
* resilient
* distributed
* parallel
* stores additional information about the structure of the data (schema)
* a distributed collection of rows with named columns
* optimized by the Catalyst Optimizer
* allows working with data through SQL/Hive queries

[Documentation](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html)

In [8]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.ml import feature, classification, evaluation, Pipeline

spark = SparkSession.builder\
    .appName('DataFrame')\
    .getOrCreate()

In [9]:
iris = spark.read.csv("iris.csv", header=True, inferSchema=True)

In [10]:
type(iris)

pyspark.sql.dataframe.DataFrame

In [11]:
iris.show()

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
|         5.4|        3.9|         1.7|        0.4| setosa|
|         4.6|        3.4|         1.4|        0.3| setosa|
|         5.0|        3.4|         1.5|        0.2| setosa|
|         4.4|        2.9|         1.4|        0.2| setosa|
|         4.9|        3.1|         1.5|        0.1| setosa|
|         5.4|        3.7|         1.5|        0.2| setosa|
|         4.8|        3.4|         1.6|        0.2| setosa|
|         4.8|        3.0|         1.4|        0.1| setosa|
|         4.3|        3.0|         1.1| 

In [12]:
iris.select("sepal_length", "sepal_width", "species").show()

+------------+-----------+-------+
|sepal_length|sepal_width|species|
+------------+-----------+-------+
|         5.1|        3.5| setosa|
|         4.9|        3.0| setosa|
|         4.7|        3.2| setosa|
|         4.6|        3.1| setosa|
|         5.0|        3.6| setosa|
|         5.4|        3.9| setosa|
|         4.6|        3.4| setosa|
|         5.0|        3.4| setosa|
|         4.4|        2.9| setosa|
|         4.9|        3.1| setosa|
|         5.4|        3.7| setosa|
|         4.8|        3.4| setosa|
|         4.8|        3.0| setosa|
|         4.3|        3.0| setosa|
|         5.8|        4.0| setosa|
|         5.7|        4.4| setosa|
|         5.4|        3.9| setosa|
|         5.1|        3.5| setosa|
|         5.7|        3.8| setosa|
|         5.1|        3.8| setosa|
+------------+-----------+-------+
only showing top 20 rows



In [13]:
iris.createOrReplaceTempView("iris")

In [14]:
spark.sql("select sepal_length, sepal_width, species from iris").show()

+------------+-----------+-------+
|sepal_length|sepal_width|species|
+------------+-----------+-------+
|         5.1|        3.5| setosa|
|         4.9|        3.0| setosa|
|         4.7|        3.2| setosa|
|         4.6|        3.1| setosa|
|         5.0|        3.6| setosa|
|         5.4|        3.9| setosa|
|         4.6|        3.4| setosa|
|         5.0|        3.4| setosa|
|         4.4|        2.9| setosa|
|         4.9|        3.1| setosa|
|         5.4|        3.7| setosa|
|         4.8|        3.4| setosa|
|         4.8|        3.0| setosa|
|         4.3|        3.0| setosa|
|         5.8|        4.0| setosa|
|         5.7|        4.4| setosa|
|         5.4|        3.9| setosa|
|         5.1|        3.5| setosa|
|         5.7|        3.8| setosa|
|         5.1|        3.8| setosa|
+------------+-----------+-------+
only showing top 20 rows



In [15]:
iris.groupBy("species").avg().show()

+----------+-----------------+------------------+-----------------+------------------+
|   species|avg(sepal_length)|  avg(sepal_width)|avg(petal_length)|  avg(petal_width)|
+----------+-----------------+------------------+-----------------+------------------+
| virginica|6.587999999999998|2.9739999999999998|            5.552|             2.026|
|versicolor|            5.936|2.7700000000000005|             4.26|1.3259999999999998|
|    setosa|5.005999999999999|3.4180000000000006|            1.464|0.2439999999999999|
+----------+-----------------+------------------+-----------------+------------------+



In [16]:
q = """select species, avg(sepal_length), avg(sepal_width), avg(petal_length), avg(petal_width) 
from iris group by species"""

spark.sql(q).show()

+----------+-----------------+------------------+-----------------+------------------+
|   species|avg(sepal_length)|  avg(sepal_width)|avg(petal_length)|  avg(petal_width)|
+----------+-----------------+------------------+-----------------+------------------+
| virginica|6.587999999999998|2.9739999999999998|            5.552|             2.026|
|versicolor|            5.936|2.7700000000000005|             4.26|1.3259999999999998|
|    setosa|5.005999999999999|3.4180000000000006|            1.464|0.2439999999999999|
+----------+-----------------+------------------+-----------------+------------------+



In [17]:
pd_iris = iris.groupBy("species").avg().toPandas()

In [18]:
type(pd_iris)

pandas.core.frame.DataFrame

In [19]:
pd_iris

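|   | species | avg(sepal_length) | avg(sepal_width) | avg(petal_length) | avg(petal_width) |
|---|---------|-------------------|------------------|-------------------|------------------|
| 0 | virginica | 6.588 | 2.974 | 5.552 | 2.026 |
| 1 | versicolor | 5.936 | 2.77 | 4.26 | 1.326 |
| 2 | setosa | 5.006 | 3.418 | 1.464 | 0.244 |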


----

### Comparison: pandas vs PySpark on a single machine

Query 1. SELECT max(ss_list_price) FROM store_sales
<img src="image2.png" alt="drawing" width="600"/>

Query 2. SELECT count(distinct ss_customer_sk) FROM store_sales
<img src="image1.png" alt="drawing" width="600"/>

Query 3. SELECT sum(ss_net_profit) FROM store_sales GROUP BY ss_store_sk
<img src="image4.png" alt="drawing" width="600"/>

Databricks article: [link](https://databricks.com/blog/2018/05/03/benchmarking-apache-spark-on-a-single-node-machine.html)

**ML:**
* Transformer - an algorithm that turns an input DF into another DF, e.g. a trained ML model that produces a new DF containing predictions (`transform`)
* Estimator - an algorithm that is fitted on a DF and produces a transformer (`fit`)
* Pipeline - a sequence of transformers and estimators chained together into a workflow

[Documentation](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html)
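The estimator/transformer contract can be illustrated without Spark. The following is a plain-Python analogy, not PySpark code: `MeanCenterer`, `SimplePipeline` and the other classes are hypothetical, and rows are plain dicts. The key point mirrors `pyspark.ml`. Statistics are learned once in `fit` on the training data, and `transform` then reuses them on any other data.

```python
# Plain-Python analogy of the pyspark.ml fit/transform contract.
# Every class here is hypothetical and NOT part of Spark; a "DataFrame" is a list of dicts.


class MeanCenterer:
    """Estimator: fit() learns something from the data and returns a transformer."""

    def __init__(self, col):
        self.col = col

    def fit(self, rows):
        mean = sum(r[self.col] for r in rows) / len(rows)
        return MeanCentererModel(self.col, mean)


class MeanCentererModel:
    """Transformer: transform() returns new rows and leaves its input untouched."""

    def __init__(self, col, mean):
        self.col = col
        self.mean = mean

    def transform(self, rows):
        return [{**r, self.col + "_centered": r[self.col] - self.mean} for r in rows]


class SimplePipeline:
    """Pipeline: fits the stages in order, feeding each one the previous stage's output."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # estimators are fitted; plain transformers are used as they are
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)
            fitted.append(model)
        return SimplePipelineModel(fitted)


class SimplePipelineModel:
    """The fitted pipeline is itself a transformer."""

    def __init__(self, models):
        self.models = models

    def transform(self, rows):
        for model in self.models:
            rows = model.transform(rows)
        return rows


train = [{"x": 1.0}, {"x": 3.0}]
pipeline_model = SimplePipeline([MeanCenterer("x")]).fit(train)
print(pipeline_model.transform([{"x": 4.0}]))  # [{'x': 4.0, 'x_centered': 2.0}]
```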

In [20]:
iris.show()

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
|         5.4|        3.9|         1.7|        0.4| setosa|
|         4.6|        3.4|         1.4|        0.3| setosa|
|         5.0|        3.4|         1.5|        0.2| setosa|
|         4.4|        2.9|         1.4|        0.2| setosa|
|         4.9|        3.1|         1.5|        0.1| setosa|
|         5.4|        3.7|         1.5|        0.2| setosa|
|         4.8|        3.4|         1.6|        0.2| setosa|
|         4.8|        3.0|         1.4|        0.1| setosa|
|         4.3|        3.0|         1.1| 

Splitting the data into training and test sets

In [21]:
train_raw, test_raw = iris.randomSplit([0.7, 0.3], seed=42)

In [22]:
print("Train")
train_raw.groupBy("species").count().show()

print("Test")
test_raw.groupBy("species").count().show()

Train
+----------+-----+
|   species|count|
+----------+-----+
| virginica|   38|
|versicolor|   37|
|    setosa|   39|
+----------+-----+

Test
+----------+-----+
|   species|count|
+----------+-----+
| virginica|   12|
|versicolor|   13|
|    setosa|   11|
+----------+-----+
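The split is 114/36 rather than exactly 105/45 because `randomSplit` normalizes the weights and then assigns rows at random. The plain-Python sketch below shows that idea, and `random_split` is a hypothetical helper. It is a simplification: Spark samples partition by partition with its own generator, so it will not reproduce these exact assignments.

```python
import random


def random_split(rows, weights, seed=None):
    """Plain-Python sketch of randomSplit semantics (not Spark's implementation).

    Weights are normalized to sum to 1, then every row is assigned to exactly
    one split by an independent uniform draw, so split sizes only approximate
    the requested proportions.
    """
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bound of each split

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # the last split also catches any floating-point rounding in the bounds
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train_ids, test_ids = random_split(range(150), [0.7, 0.3], seed=42)
print(len(train_ids), len(test_ids))  # close to 105 / 45, rarely exactly
```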



Encoding the target variable as numeric labels

In [23]:
strIdx = feature.StringIndexer(inputCol="species", outputCol="label")

In [24]:
strIdx_trans = strIdx.fit(train_raw)
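`StringIndexer` orders labels by frequency by default (`stringOrderType="frequencyDesc"`). In the training split, setosa (39 rows) therefore becomes 0.0, virginica (38) becomes 1.0 and versicolor (37) becomes 2.0. That is why versicolor carries label 2.0 in the outputs below. The plain-Python sketch below reproduces the mapping, and `fit_string_indexer` is a hypothetical helper. Alphabetical tie-breaking is this sketch's choice. The sketch also ignores unseen labels, whereas Spark's default `handleInvalid="error"` raises on labels not seen during `fit`.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Class counts of the training split shown above
train_species = ["setosa"] * 39 + ["virginica"] * 38 + ["versicolor"] * 37
print(fit_string_indexer(train_species))
# {'setosa': 0.0, 'virginica': 1.0, 'versicolor': 2.0}
```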

In [25]:
train = strIdx_trans.transform(train_raw)
train.show()

+------------+-----------+------------+-----------+----------+-----+
|sepal_length|sepal_width|petal_length|petal_width|   species|label|
+------------+-----------+------------+-----------+----------+-----+
|         4.3|        3.0|         1.1|        0.1|    setosa|  0.0|
|         4.5|        2.3|         1.3|        0.3|    setosa|  0.0|
|         4.6|        3.1|         1.5|        0.2|    setosa|  0.0|
|         4.6|        3.2|         1.4|        0.2|    setosa|  0.0|
|         4.6|        3.4|         1.4|        0.3|    setosa|  0.0|
|         4.6|        3.6|         1.0|        0.2|    setosa|  0.0|
|         4.7|        3.2|         1.3|        0.2|    setosa|  0.0|
|         4.7|        3.2|         1.6|        0.2|    setosa|  0.0|
|         4.8|        3.0|         1.4|        0.1|    setosa|  0.0|
|         4.8|        3.0|         1.4|        0.3|    setosa|  0.0|
|         4.8|        3.4|         1.6|        0.2|    setosa|  0.0|
|         4.8|        3.4|        

Assembling the explanatory variables into a feature vector

In [26]:
vectA = feature.VectorAssembler(inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"], 
                                outputCol="feats")

In [27]:
train = vectA.transform(train)
train.show()

+------------+-----------+------------+-----------+----------+-----+-----------------+
|sepal_length|sepal_width|petal_length|petal_width|   species|label|            feats|
+------------+-----------+------------+-----------+----------+-----+-----------------+
|         4.3|        3.0|         1.1|        0.1|    setosa|  0.0|[4.3,3.0,1.1,0.1]|
|         4.5|        2.3|         1.3|        0.3|    setosa|  0.0|[4.5,2.3,1.3,0.3]|
|         4.6|        3.1|         1.5|        0.2|    setosa|  0.0|[4.6,3.1,1.5,0.2]|
|         4.6|        3.2|         1.4|        0.2|    setosa|  0.0|[4.6,3.2,1.4,0.2]|
|         4.6|        3.4|         1.4|        0.3|    setosa|  0.0|[4.6,3.4,1.4,0.3]|
|         4.6|        3.6|         1.0|        0.2|    setosa|  0.0|[4.6,3.6,1.0,0.2]|
|         4.7|        3.2|         1.3|        0.2|    setosa|  0.0|[4.7,3.2,1.3,0.2]|
|         4.7|        3.2|         1.6|        0.2|    setosa|  0.0|[4.7,3.2,1.6,0.2]|
|         4.8|        3.0|         1.4|    

Scaling the features

In [28]:
scaler = feature.StandardScaler(inputCol="feats", outputCol="features")

In [29]:
scaler_trans = scaler.fit(train)
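With its defaults (`withMean=False`, `withStd=True`), `StandardScaler` does not center the data. It only divides each feature by its corrected sample standard deviation. That is why the scaled values below are around 5.25 rather than near zero: 4.3 / 5.2513 ≈ 0.819 is the standard deviation of sepal_length in this training split. The plain-Python sketch below scales a single column, and `fit_standard_scaler` is a hypothetical helper.

```python
import statistics


def fit_standard_scaler(column, with_mean=False, with_std=True):
    """Plain-Python sketch of StandardScaler on one feature column.

    Defaults mirror Spark's: no centering, division by the corrected
    (n - 1) sample standard deviation.
    """
    mean = statistics.mean(column) if with_mean else 0.0
    std = statistics.stdev(column) if with_std else 1.0

    def transform(x):
        # a zero-variance column has nothing to scale; map it to 0.0
        return (x - mean) / std if std != 0 else 0.0

    return transform


scale = fit_standard_scaler([1.0, 2.0, 3.0])                 # stdev = 1.0
center_and_scale = fit_standard_scaler([1.0, 2.0, 3.0], with_mean=True)
print(scale(3.0), center_and_scale(3.0))                     # 3.0 1.0
```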

In [30]:
train = scaler_trans.transform(train)
train.show()

+------------+-----------+------------+-----------+----------+-----+-----------------+--------------------+
|sepal_length|sepal_width|petal_length|petal_width|   species|label|            feats|            features|
+------------+-----------+------------+-----------+----------+-----+-----------------+--------------------+
|         4.3|        3.0|         1.1|        0.1|    setosa|  0.0|[4.3,3.0,1.1,0.1]|[5.25128172868887...|
|         4.5|        2.3|         1.3|        0.3|    setosa|  0.0|[4.5,2.3,1.3,0.3]|[5.49552739048835...|
|         4.6|        3.1|         1.5|        0.2|    setosa|  0.0|[4.6,3.1,1.5,0.2]|[5.61765022138809...|
|         4.6|        3.2|         1.4|        0.2|    setosa|  0.0|[4.6,3.2,1.4,0.2]|[5.61765022138809...|
|         4.6|        3.4|         1.4|        0.3|    setosa|  0.0|[4.6,3.4,1.4,0.3]|[5.61765022138809...|
|         4.6|        3.6|         1.0|        0.2|    setosa|  0.0|[4.6,3.6,1.0,0.2]|[5.61765022138809...|
|         4.7|        3.2|  

Creating and training a logistic regression model

In [31]:
lr = classification.LogisticRegression()

In [32]:
lr_mod = lr.fit(train)
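With more than two classes, `LogisticRegression` (default `family="auto"`) fits a multinomial model. It learns one coefficient vector and one intercept per class, available on the fitted model as `lr_mod.coefficientMatrix` and `lr_mod.interceptVector`. The plain-Python sketch below scores one row with such a model. The coefficients are made up and are not taken from `lr_mod`.

```python
import math


def predict_multinomial(coef_matrix, intercepts, features):
    """One linear score per class, softmax -> probabilities, argmax -> predicted label."""
    scores = [sum(w * x for w, x in zip(row, features)) + b
              for row, b in zip(coef_matrix, intercepts)]
    top = max(scores)                                 # subtract max for numerical stability
    exps = [math.exp(s - top) for s in scores]
    total = sum(exps)
    probs = [e / total for e in exps]
    prediction = float(max(range(len(probs)), key=probs.__getitem__))
    return probs, prediction


# Hypothetical coefficients for 3 classes and 2 features (not taken from lr_mod)
coefs = [[1.0, 0.0], [0.0, 1.0], [-1.0, -1.0]]
intercepts = [0.0, 0.0, 0.0]
probs, pred = predict_multinomial(coefs, intercepts, [2.0, 0.5])
print(pred)  # 0.0
```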

Preparing the test set with the transformers already fitted on the training set

In [33]:
test = strIdx_trans.transform(test_raw)
test = vectA.transform(test)
test = scaler_trans.transform(test)
test.show()

+------------+-----------+------------+-----------+----------+-----+-----------------+--------------------+
|sepal_length|sepal_width|petal_length|petal_width|   species|label|            feats|            features|
+------------+-----------+------------+-----------+----------+-----+-----------------+--------------------+
|         4.4|        2.9|         1.4|        0.2|    setosa|  0.0|[4.4,2.9,1.4,0.2]|[5.37340455958861...|
|         4.4|        3.0|         1.3|        0.2|    setosa|  0.0|[4.4,3.0,1.3,0.2]|[5.37340455958861...|
|         4.4|        3.2|         1.3|        0.2|    setosa|  0.0|[4.4,3.2,1.3,0.2]|[5.37340455958861...|
|         4.8|        3.1|         1.6|        0.2|    setosa|  0.0|[4.8,3.1,1.6,0.2]|[5.86189588318758...|
|         4.9|        2.4|         3.3|        1.0|versicolor|  2.0|[4.9,2.4,3.3,1.0]|[5.98401871408732...|
|         5.0|        3.3|         1.4|        0.2|    setosa|  0.0|[5.0,3.3,1.4,0.2]|[6.10614154498706...|
|         5.0|        3.4|  

Inference

In [34]:
pred = lr_mod.transform(test)
pred.select("species", "label", "features", "prediction").show()

+----------+-----+--------------------+----------+
|   species|label|            features|prediction|
+----------+-----+--------------------+----------+
|    setosa|  0.0|[5.37340455958861...|       0.0|
|    setosa|  0.0|[5.37340455958861...|       0.0|
|    setosa|  0.0|[5.37340455958861...|       0.0|
|    setosa|  0.0|[5.86189588318758...|       0.0|
|versicolor|  2.0|[5.98401871408732...|       2.0|
|    setosa|  0.0|[6.10614154498706...|       0.0|
|    setosa|  0.0|[6.10614154498706...|       0.0|
|    setosa|  0.0|[6.10614154498706...|       0.0|
|    setosa|  0.0|[6.22826437588680...|       0.0|
|    setosa|  0.0|[6.22826437588680...|       0.0|
|    setosa|  0.0|[6.59463286858603...|       0.0|
|    setosa|  0.0|[6.59463286858603...|       0.0|
|versicolor|  2.0|[6.71675569948577...|       2.0|
|versicolor|  2.0|[6.71675569948577...|       2.0|
|versicolor|  2.0|[6.83887853038551...|       2.0|
|versicolor|  2.0|[6.83887853038551...|       2.0|
| virginica|  1.0|[6.9610013612

Creating an evaluator and evaluating the model

In [35]:
ev = evaluation.MulticlassClassificationEvaluator(metricName="accuracy")

In [36]:
ev.evaluate(pred)

0.9722222222222222
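The accuracy of 0.9722222222222222 is exactly 35/36: 35 of the 36 test rows are classified correctly. The plain-Python sketch below shows what `metricName="accuracy"` computes, assuming unweighted rows. `accuracy` is a hypothetical helper, and which row is misclassified is made up.

```python
def accuracy(labels, predictions):
    """Share of rows whose prediction equals the label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


# Test-split class counts from above: 11 setosa (0.0), 12 virginica (1.0), 13 versicolor (2.0).
# Which row is misclassified is hypothetical; only the number of errors matters.
labels = [0.0] * 11 + [1.0] * 12 + [2.0] * 13
predictions = labels[:-1] + [1.0]   # one versicolor predicted as virginica

print(accuracy(labels, predictions))  # 0.9722222222222222 (= 35 / 36)
```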

----

Building a pipeline

In [37]:
strIdx = feature.StringIndexer(inputCol="species", outputCol="label")
vectA = feature.VectorAssembler(inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"], 
                                outputCol="feats")
scaler = feature.StandardScaler(inputCol="feats", outputCol="features")
lr = classification.LogisticRegression()

In [38]:
pipe = Pipeline(stages=[strIdx, vectA, scaler, lr])

In [39]:
pipe_mod = pipe.fit(train_raw)

In [40]:
pipe_mod.transform(test_raw).select("species", "label", "features", "prediction").show()

+----------+-----+--------------------+----------+
|   species|label|            features|prediction|
+----------+-----+--------------------+----------+
|    setosa|  0.0|[5.37340455958861...|       0.0|
|    setosa|  0.0|[5.37340455958861...|       0.0|
|    setosa|  0.0|[5.37340455958861...|       0.0|
|    setosa|  0.0|[5.86189588318758...|       0.0|
|versicolor|  2.0|[5.98401871408732...|       2.0|
|    setosa|  0.0|[6.10614154498706...|       0.0|
|    setosa|  0.0|[6.10614154498706...|       0.0|
|    setosa|  0.0|[6.10614154498706...|       0.0|
|    setosa|  0.0|[6.22826437588680...|       0.0|
|    setosa|  0.0|[6.22826437588680...|       0.0|
|    setosa|  0.0|[6.59463286858603...|       0.0|
|    setosa|  0.0|[6.59463286858603...|       0.0|
|versicolor|  2.0|[6.71675569948577...|       2.0|
|versicolor|  2.0|[6.71675569948577...|       2.0|
|versicolor|  2.0|[6.83887853038551...|       2.0|
|versicolor|  2.0|[6.83887853038551...|       2.0|
| virginica|  1.0|[6.9610013612