# First real live project 
ETL Extract Transform Load

In [36]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DateType

# Spark The Definitive Guide
Starten einer 'sparkSession'.The SparkSession instance is the way Spark executes user-defined manipulations across the cluster.
Code: 
- Input
    spark = SparkSession.builder.getOrCreate()

Output:
- SparkSession - in-memory

    SparkContext

    Spark UI

    Version
    v3.4.1
    Master
    local[*]
    AppName
    pyspark-shell



###### Start Seite 23


In [37]:
# Initialisierung der Spark session
spark = SparkSession.builder.getOrCreate()
spark

Hier kann die Sparksession weiter detailiert betrachtet werden.

http://localhost:4040

#### Erstellen eine Spalte von 1000 Reihen Zahlen.
Also 0 - 1.000
Es wird ein Dataframe erstellt. 
Dieser Zahlenbereich stellt eine verteilte Sammlung dar. Wenn er in einem Cluster ausgeführt wird, existiert jeder Teil dieses Zahlenbereichs auf einem anderen Executor. Dies ist ein Spark DataFrame. 
- S.24 Mitte

#### DataFrames
- Ein DataFrame ist die gängigste strukturierte API und stellt einfach eine Datentabelle mit Zeilen und Spalten dar. Die Liste, die die Spalten und die Typen innerhalb dieser Spalten definiert, wird als Schema bezeichnet. 
Sie können sich einen DataFrame wie eine Tabellenkalkulation mit benannten Spalten vorstellen. 

- Eine Tabellenkalkulation befindet sich auf einem Computer an einem bestimmten Ort, während ein Spark DataFrame Tausende von Computern umfassen kann. Der Grund für die Speicherung der Daten auf mehr als einem Computer sollte intuitiv sein: Entweder sind die Daten zu groß, um auf einen Computer zu passen, oder es würde einfach zu lange dauern, diese Berechnung auf einem Computer durchzuführen.

- Spark verfügt über mehrere zentrale Abstraktionen: Datensätze, DataFrames, SQL-Tabellen und belastbare verteilte Datensätze (RDDs). Diese verschiedenen Abstraktionen stellen alle verteilte Datensammlungen dar. Am einfachsten und effizientesten sind DataFrames, die in allen Sprachen verfügbar sind. Wir behandeln Datasets am Ende von Teil II, und RDDs


In [38]:
myRange = spark.range(1000).toDF("number")


In [39]:
myRange.show()

+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
|    10|
|    11|
|    12|
|    13|
|    14|
|    15|
|    16|
|    17|
|    18|
|    19|
+------+
only showing top 20 rows



                                                                                

### Partions 

- Damit jeder Executor die Arbeit parallel ausführen kann, unterteilt Spark die Daten in Stücke, die Partitionen genannt werden. 
Eine Partition ist eine Sammlung von Zeilen, die sich auf einer physischen Maschine in Ihrem Cluster befinden. Die Partitionen eines DataFrame stellen dar, wie die Daten während der Ausführung physisch über den Rechnercluster verteilt werden. Wenn Sie eine Partition haben, hat Spark nur eine Parallelität, auch wenn Sie Tausende von Executors haben. Wenn Sie viele Partitionen, aber nur einen Executor haben, hat Spark immer noch eine Parallelität von nur einer, da es nur eine Berechnungsressource gibt.

--> (Partions oder auch Parallelität bedeutet wie viele Executor parallel auf den Dataset arbeiten können. Als Team wünsch man sich eine hohe Parallelität)>

--> Im Kontext von Datenbanken können Partitionen verwendet werden, um eine große Tabelle in kleinere, handhabbare Segmente aufzuteilen. Diese Segmente können auf verschiedene Arten organisiert werden, z. B. nach bestimmten Werten in einer Spalte (z. B. nach Datum, Region oder Kundengruppe) oder nach bestimmten Kriterien (z. B. durch Zufallsverteilung).

- Ein wichtiger Punkt ist, dass Sie mit DataFrames (in den meisten Fällen) Partitionen nicht manuell oder einzeln bearbeiten. Sie legen lediglich High-Level-Transformationen von Daten in den physischen Partitionen fest, und Spark bestimmt, wie diese Arbeit tatsächlich im Cluster ausgeführt wird. Es gibt APIs auf niedrigerer Ebene (über die RDD-Schnittstelle), die wir in Teil III behandeln.

### Transformations 

In Spark sind die Kerndatenstrukturen unveränderlich, d. h. sie können nach ihrer Erstellung nicht mehr geändert werden. Dies mag auf den ersten Blick wie ein seltsames Konzept erscheinen: Wenn man sie nicht ändern kann, wie soll man sie dann verwenden? Um einen DataFrame zu "ändern", müssen Sie Spark mitteilen, wie Sie ihn ändern möchten, damit er das tut, was Sie wollen. Diese Anweisungen werden als Transformationen bezeichnet. Lassen Sie uns eine einfache Transformation durchführen, um alle geraden Zahlen in unserem aktuellen DataFrame zu finden:

Zeile 36 führt aus das wir nur grade Zahlen bekommen jedoch wurde damit nicht der Kern datensatz verändert.
Durch Zuweisung in eine andere Variable "divisBy2" haben wir den Kern immer noch nicht verändert, sondern die kurzzeitigen veränderten Datensatz in einem neue Variable

=> Auch Transformation genannt

In [40]:
divisBy2 = myRange.where("number % 2 = 0")


In [41]:
divisBy2.show()

+------+
|number|
+------+
|     0|
|     2|
|     4|
|     6|
|     8|
|    10|
|    12|
|    14|
|    16|
|    18|
|    20|
|    22|
|    24|
|    26|
|    28|
|    30|
|    32|
|    34|
|    36|
|    38|
+------+
only showing top 20 rows




- Beachten Sie, dass diese keine Ausgabe liefern. Dies liegt daran, dass wir nur eine abstrakte Transformation angegeben haben und Spark erst dann auf Transformationen reagiert, wenn wir eine Aktion aufrufen (dazu kommen wir gleich). Transformationen sind der Kern Ihrer Geschäftslogik in Spark. Es gibt zwei Arten von Transformationen: solche, die enge Abhängigkeiten angeben, und solche, die weite Abhängigkeiten angeben.




#### Narrow Transformation 
- Wenn jede Partition im übergeordneten RDD von höchstens einer Partition des untergeordneten RDD verwendet wird, liegt eine enge Abhängigkeit vor. Berechnungen von Transformationen mit dieser Art von Abhängigkeit sind recht schnell, da sie keine Datenumlagerung über das Clusternetzwerk erfordern. Darüber hinaus sind auch Optimierungen wie Pipelining möglich.

#### Wide Transformation
Bei einer Transformation im Stil einer breiten Abhängigkeit (oder breiten Transformation) tragen Eingabepartitionen zu vielen Ausgabepartitionen bei. Dies wird oft als "Shuffle" bezeichnet, wobei Spark Partitionen im gesamten Cluster austauscht. Bei schmalen Transformationen führt Spark automatisch eine Operation durch, die als Pipelining bezeichnet wird, d. h., wenn wir mehrere Filter auf DataFrames angeben, werden sie alle im Speicher ausgeführt. Das Gleiche gilt nicht für Shuffles. Wenn wir einen Shuffle durchführen, schreibt Spark die Ergebnisse auf die Festplatte.

#### Lazy Evaluation

Lazy Evaluation bedeutet, dass Spark bis zum letzten Moment wartet, um den Graphen der Berechnungsanweisungen auszuführen. Anstatt die Daten sofort zu ändern, wenn Sie eine Operation ausführen, erstellen Sie in Spark einen Plan mit Transformationen, die Sie auf Ihre Quelldaten anwenden möchten. Wenn Sie mit der Ausführung des Codes bis zur letzten Minute warten, kompiliert Spark diesen Plan aus Ihren rohen DataFrame-Transformationen zu einem rationalisierten physischen Plan, der so effizient wie möglich im Cluster ausgeführt wird. Dies bietet immense Vorteile, da Spark den gesamten
 
gesamten Datenfluss von Ende zu Ende optimieren kann. Ein Beispiel hierfür ist das so genannte Prädikat Pushdown auf DataFrames. Wenn wir einen großen Spark-Job erstellen, aber am Ende einen Filter angeben, der nur eine Zeile aus unseren Quelldaten abrufen muss, ist der effizienteste Weg zur Ausführung der Zugriff auf den einzelnen benötigten Datensatz. Spark optimiert diesen Vorgang für uns, indem es den Filter automatisch nach unten verschiebt.

In [42]:
"The Frist df:", myRange.count(), "The Second df:", divisBy2.count()

('The Frist df:', 1000, 'The Second df:', 500)

In [43]:
flightData2015 = spark.read.option("inferSchema", 
                                   'true').option("header", 
                                                  "true").csv("/Users/riccardo/Desktop/Repositorys_Github/Training/Dataset/2015-summary.csv")


Remember, sort does not modify the DataFrame. We use sort as a transformation that returns a new DataFrame by transforming the previous DataFrame. Let’s illustrate what’s happening when we call take on that resulting DataFrame

In [44]:
flightData2015.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [45]:
flightData2015.sort("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#258 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#258 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, [plan_id=540]
      +- FileScan csv [DEST_COUNTRY_NAME#256,ORIGIN_COUNTRY_NAME#257,count#258] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/riccardo/Desktop/Repositorys_Github/Training/Dataset/2015-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




Wie zuvor können wir nun eine Aktion festlegen, um diesen Plan zu starten. Bevor wir das tun, müssen wir jedoch noch eine Konfiguration festlegen. Wenn wir einen Shuffle durchführen, gibt Spark standardmäßig 200 Shuffle-Partitionen aus. Wir setzen diesen Wert auf 5, um die Anzahl der ausgegebenen Partitionen des Shuffle zu reduzieren:
  spark.conf.set("spark.sql.shuffle.partitions", "5")
  flightData2015.sort("count").take(2)
  ... Array([Vereinigte Staaten,Singapur,1], [Moldawien,Vereinigte Staaten,1])
  
Abbildung 2-9 veranschaulicht diesen Vorgang. Beachten Sie, dass zusätzlich zu den logischen Transformationen auch die Anzahl der physischen Partitionen berücksichtigt wird.

csv File ->           Dataframe ->            -> Dataframe ->     Array()
       Read (narrow)              sort (wide)             take(3)
                      1 Partion                   5 Partion

In [46]:
spark.conf.set("spark.sql.shuffle.partitions", "5")
flightData2015.sort("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

createOrReplaceTempView("flight_data_2015") <---- Var wo die Daten in der Sparsession gespeichert wurden

Nun können wir unsere Daten in SQL abfragen. Dazu verwenden wir die Funktion spark.sql (zur Erinnerung: spark ist unsere SparkSession-Variable), die praktischerweise einen neuen DataFrame zurückgibt. Obwohl dies
Logik ein wenig zirkulär erscheinen mag - dass eine SQL-Abfrage gegen einen DataFrame einen anderen DataFrame zurückgibt -, ist es tatsächlich ziemlich mächtig. Auf diese Weise können Sie Transformationen so spezifizieren, wie es für Sie zu einem bestimmten Zeitpunkt am bequemsten ist, ohne dass dies zu Lasten der Effizienz geht!

In [47]:
flightData2015.createOrReplaceTempView("flight_data_2015")

In [48]:
sqlWay = spark.sql("""
  SELECT DEST_COUNTRY_NAME, count(1)
  FROM flight_data_2015
  GROUP BY DEST_COUNTRY_NAME
  """)

In [49]:
dataFrameWay = flightData2015.groupBy("DEST_COUNTRY_NAME").count()

In [50]:
sqlWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#256], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#256, 5), ENSURE_REQUIREMENTS, [plan_id=562]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#256], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#256] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/riccardo/Desktop/Repositorys_Github/Training/Dataset/2015-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




In [51]:
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#256], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#256, 5), ENSURE_REQUIREMENTS, [plan_id=575]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#256], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#256] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/riccardo/Desktop/Repositorys_Github/Training/Dataset/2015-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




-> Next Transformation task

In [52]:
spark.sql("SELECT MAX(COUNT) from flight_data_2015").take(1)

[Row(max(COUNT)=370002)]

In [53]:
from pyspark.sql.functions import max

In [54]:
flightData2015.select(max("COUNT")).take(1)

[Row(max(COUNT)=370002)]

- Next Task


--> find the top five destination countries in the data
- Multitransformation query

In [55]:
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, SUM(COUNT) AS destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY SUM (COUNT) DESC
LIMIT 5
""")

maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [56]:
from pyspark.sql.functions import desc

In [57]:
flightData2015.groupBy("DEST_COUNTRY_NAME").sum("count")\
    .withColumnRenamed("sum(count)","destination_total")\
    .sort(desc("destination_total"))\
    .limit(5)\
    .show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



### Schritt für Schritt erklärung
#### 1. Das Einlesen der Daten
- Vorhin haben wir die Daten einer Variable zugeordnet 
-> Erinnernung Spark lies nicht die Daten bis eine "action" durch geführt wird oder om ursprünglichen DataFrame abgeleitete Aktion aufgerufen wird.

#### 2. Das Gruppieren der Daten.
- Durch den befehl des GroupBy erstellen wir ein "RealtionGroupedDataset". Jedoch muss der nutzer noch eine Aggregation ansetzten bevor es weiter queried werden kann.
Wie speziefizieren das wir ein KEY gruppieren oder ein "set of key" und das wir nun eine aggregation durch jeder dieser KEY/KEYS durchführen wollen


#### 3. Aggregation spezifizieren
- SUM() Aggregation. 
Dies nimmt eine Spalte oder simple ein Spalten name. Das Ergebnis durch das Summieren ist **ein neues Dataframe**. Es hat dardurch ein neues Schema jedoch weiß er auch den Typ jeder Spalte.

- ***WICHTIG: Es ist wichtig, (noch einmal!) zu betonen, dass keine Berechnungen durchgeführt wurden***

#### 4 Umbennen
- Mit der "withColumnRenamed" methode welche druch zwei Argumente den alten Spalten und den neuen Spalten namen mit gegeben wird. 
- ***Auch wir keine Berechnung durch geführt. Es ist nur eine Transformation***

#### 5. Sortierung der Daten
- Hier legen wir fest das wir nur die höchsten Werte nehmen möchten des Dataframes das wir erstellt haben in schritt 1 bis 4 und in absteigender rheinfolge nach "destination_total" (eine spalte die wir neu erstellt haben mit der Gruppierung sowie Aggregation) es visulisiert haben möchten 

-- Column Types order column names sind die sonoynome

#### 6. Speziefizierung wie viele Reihen das Dataframes sollen angezeigt werden.
.limit(5)
Zeigt uns nur die ersten 5 an

#### 7. Ein Action durchführen

- Hier beginnnen wir mit der Sammlung der Ergebnisse unserem Dataframes. Und Spark gibt uns eine Liste von Arrays in the Sprachen wo wir es gecoded haben.

In [58]:
flightData2015.groupBy("DEST_COUNTRY_NAME").sum("count")\
    .withColumnRenamed("sum(count)","destination_total")\
    .sort(desc("destination_total"))\
    .limit(5)\
    .explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#352L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#256,destination_total#352L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#256], functions=[sum(count#258)])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#256, 5), ENSURE_REQUIREMENTS, [plan_id=745]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#256], functions=[partial_sum(count#258)])
            +- FileScan csv [DEST_COUNTRY_NAME#256,count#258] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/riccardo/Desktop/Repositorys_Github/Training/Dataset/2015-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>




## Capter 3. A Tour of Sparks Toolset
Inhalt von Kapitel 3

- Running production applications with spark-submit Datasets: type-safe APIs for 
- structured data Structured Streaming
- Machine learning and advanced analytics
- Resilient Distributed Datasets (RDD): Spark’s low level APIs SparkR
- The third-party package ecosystem

Running Production Applications 


Wichtiger Code: 


**"Spark-submit"**
ermöglicht es Ihnen, Ihren Anwendungscode an einen Cluster zu senden und ihn dort auszuführen. Nach dem Senden wird die Anwendung ausgeführt, bis sie beendet wird (die Aufgabe abschließt) oder ein Fehler auftritt. Sie können dies mit allen von Spark unterstützten Clustermanagern tun, einschließlich Standalone, Mesos und YARN.

## Next Dataset 
https://raw.githubusercontent.com/databricks/Spark-The-Definitive-Guide/master/data/retail-data/all/online-retail-dataset.csv

In [72]:
staticDataFrame = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("/Users/riccardo/Desktop/Repositorys_Github/Training/Dataset/online-retail-dataset.csv")
staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

In [73]:
from pyspark.sql.functions import window, column, desc, col
staticDataFrame\
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost" ,
"InvoiceDate" )\
.groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
.sum("total_cost")\
.show(5)

+----------+------+---------------+
|CustomerId|window|sum(total_cost)|
+----------+------+---------------+
+----------+------+---------------+



[Stage 60:>                                                         (0 + 8) / 8]

+----------+------+---------------+
|CustomerId|window|sum(total_cost)|
+----------+------+---------------+
+----------+------+---------------+

