## Overzicht:
<ol>
<li>Big Data & PySpark</li>
<li>Jupyter Basics</li>
<li>Install</li>
<li>PySpark Pandas
    <ol>
    <li>Extract
      <ol>
        <li>Data inlezen</li>
        <li>Data inspecteren</li>
        <li>Schema</li>
      </ol>
    </li>
    <li>Transform
      <ol>
        <li>Select</li>
        <li>Insert</li>
        <li>Group by</li>
        <li>Remove</li>
        <li>Update</li>
      </ol>
    </li>
    </ol>
</li>
<li>Spark SQL</li>
<li>Load</li>
<li>Oefening</li>
</ol>

## Big Data & PySpark
We ontdekten wat Big Data is en welke uitdagingen het met zich mee brengt. Nu gaan we specifiek zien hoe je de ETL kan doen met PySpark. Om de scope van deze sessie te beperken focussen we enkel op de interne werking van PySpark. We gebruiken met andere woorden geen gedistribueerde file systemen, geen data orchestration noch gebruiken we een volwaardig data warehouse.

><li>We zien hoe we een PySpark sessie opzetten.</li>
><li>We ontdekken de PySpark functions voor data extraction. (Extract)</li>
><li>We ontdekken de PySpark functions voor data transformation. (Transform)</li>
><li>We maken gebruik van onze SQL kennis om dataframes te manipuleren.</li>
><li>We ontleden een code snippet om de getransformeerde data door te sturen naar een Oracle DB (Load).</li>


## Jupyter Basics
Voer de code cells uit.

### Code cells

In [None]:
# resultaten worden afgedrukt in jupyter
2*3

In [None]:
# Importeren van externe libraries in python
from math import pi
print(pi)

In [None]:
print("Hello world")

### Text cells

In jupyter kan je een onderscheid maken tussen code en tekst cells om je document structuur te geven.

### Access to the shell

In [None]:
ls

In [None]:
pwd

## Install

Installeer Dependencies: (Python is reeds geïnstalleerd in Google Colab)

1.   Java 8
2.   Apache Spark with hadoop and
3.   Findspark (used to locate the spark in the system)


In [None]:
!sudo apt update
!apt-get install openjdk-17-jdk-headless -qq > /dev/null
# Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark
!wget -q https://dlcdn.apache.org/spark/spark-4.1.0/spark-4.1.0-bin-hadoop3.tgz
!tar xf spark-4.1.0-bin-hadoop3.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

*Set* Environment Variables:

In [None]:
import os
import sys
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-4.1.0-bin-hadoop3"

In [None]:
!ls

In [None]:
# Importeer de findspark module
import findspark
# Initialiseer de Spark environment met behulp van findspark
findspark.init()

# Importeer een SparkSession module van PySpark
from pyspark.sql import SparkSession
# Maak een SparkSession object genaamd 'spark'
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Stel Spark configuratie in om eager evaluation in te schakelen in de SQL REPL (Read-Eval-Print Loop)
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

# Print het 'spark' object, die de Spark sessie representeert
spark

## PySpark Pandas

### Extract

#### Data inlezen

In [None]:
# Downloading and preprocessing Cars Data downloaded origianlly from https://perso.telecom-paristech.fr/eagan/class/igr204/datasets
!wget https://jacobceles.github.io/knowledge_repo/colab_and_pyspark/cars.csv

In [None]:
!ls

In [None]:
# De spark.read functie extract data van een source file.
# De read functie kan van verschillende sources lezen: csv, xml, text, json, db ...
# header=True geeft aan dat de eerste rij een header is
# sep=';' geeft het seperator character door om de kolommen te splitsen
df = spark.read.csv('cars.csv', header=True, sep=";")
df.show(5)
df["Car"].count


De bovenstaande cell laad de data in met spark.read in een Spark DataFrame. Een DataFrame is een 2D datastructuur waarin elke kolom een eigen datatype kan toegewezen krijgen.

#### Data Inspecteren

Er zijn meerdere methodes om een dataframe(DF) in PySpark te zien:

1.   `df.take(5)`  geeft een list van vijf Row objects terug.
2.   `df.collect()` geeft ALLE data terug van het hele dataframe. Wees heel voorzichtig met het uitvoeren van deze methode, aangezien het makkelijk de driver node kan crashen.
3.   `df.show()` is de meest gebruikte methode om data te tonen. Je kan enkele parameters meegeven, zoals de hoeveelheid van rijen en of er truncation mag gebruikt worden. Bijvoorbeeld: df.show(5, False) of df.show(5, truncate=False)
4.   `df.limit(5)` zal een nieuw DataFrame terug geven door de eerste n rijen terug te geven. Aangezien de data gedistribueerd kan zijn, is er geen garantie dat deze methode elke keer dezelfde data teruggeeft.
5.   `df.columns`  is de eigenschap die de verzameling van kolommen terug geeft.

In [None]:
df.show(5, truncate=False)

In [None]:
df.limit(5)

In [None]:
df.columns

#### Dataframe Schema

Er zijn twee methodes om datatypes van het schema (van een DataFrame) op te vragen.

In [None]:
df.dtypes

In [None]:
df.printSchema()

#### Schema Impliciet afleiden

Met de `inferschema=true` parameter kunnen we afdwingen dat de datatypes moeten afgeleid worden op basis van de data die uitgelezen wordt.

In [None]:
df = spark.read.csv('cars.csv', header=True, sep=";", inferSchema=True)
df.printSchema()

We zien nu dat de gegevens correct zijn afgeleid, maar dat is niet altijd het geval. Soms wil je deze zelf instellen (wat ook mogelijk is). Spark heeft in deze dataset rekening gehouden met de precisie van kommagetallen en maakt van de Weight kolom een decimal(4,0). Wanneer je echter meerdere source bestanden ontvangt om te verwerken van verschillende klanten, dan kan je in problemen komen. *Niet elke klant slaat zijn data even accurraat op.*

#### Expliciet Schema aanmaken

In [None]:
from pyspark.sql.types import *
df.columns

In [None]:
# Maak de list van tupels van het schema in het formaat (column_name, data_type)
labels = [
     ('Car',StringType()),
     ('MPG',DoubleType()),
     ('Cylinders',IntegerType()),
     ('Displacement',DoubleType()),
     ('Horsepower',DoubleType()),
     ('Weight',DoubleType()),
     ('Acceleration',DoubleType()),
     ('Model',IntegerType()),
     ('Origin',StringType())
]

In [None]:
# Maak het schema mbv een StructType op basis van de list
schema = StructType([StructField (x[0], x[1], True) for x in labels])
schema

In [None]:
df = spark.read.csv('cars.csv', header=True, sep=";", schema=schema)
df.printSchema()

In [None]:
df.show(truncate=False)

We zien hier dat de data succesvol is geladen met de gegeven datatypes.

### Transform

In de **Transform** sectie gaan we de volgende concepten overlopen in PySpark.

<ol>
  <li>Select:</li>
  <ol>
    <li>Selecting Columns</li>
    <li>Selecting Multiple Columns</li>
  </ol>
  <li>Inserting Columns</li>
  <ol>
    <li>Kolommen toevoegen</li>
    <li>Kolommen hernoemen</li>
  </ol>
  <li>Group by</li>
  <li>Dropping Columns</li>
  <li>DataFrame Operations on Rows</li>
  <ol>
    <li>Filtering</li>
    <li>Distinct</li>
    <li>Sort</li>
    <li>Union</li>
  </ol>
</ol>



#### Selecting Columns

Er zijn meerdere manieren om een kolom te selecteren. De punt-notatie lijkt misschien eenvoudiger, maar brengt problemen met zich mee. **Het is mogelijk dat je een kolomnaam hebt die overlapt met een eigenschap of functie van de DataFrame class**. Hierdoor zal je punt-notatie niet werken in deze gevallen.

In [None]:
# 1ste Optie
# Kolomnamen zijn case sensitive met de punt-notatie
print(df.Car)
print("*"*20)
df.select(df.Car).show(truncate=False)

In [None]:
# 2e Optie
# Kolomnamen zijn niet case sensitive met de index-notatie
print(df['car'])
print("*"*20)
df.select(df['car']).show(truncate=False)

In [None]:
# 3e Optie
# Kolomnamen zijn niet case sensitive met de `col()` functie
from pyspark.sql.functions import col
df.select(col('car')).show(truncate=False)

#### Selecting Multiple Columns

Wanneer je meerdere kolommen wil selecteren, dan geef je de gewenste kolom identifiers door aan de select methode.

In [None]:
# 1ste methode
print(df.Car, df.Cylinders)
print("*"*40)
df.select(df.Car, df.Cylinders).show(truncate=False)

In [None]:
# 2e methode
print(df['car'],df['cylinders'])
print("*"*40)
df.select(df['car'],df['cylinders']).show(truncate=False)

In [None]:
# 3e methode
from pyspark.sql.functions import col
df.select(col('car'),col('cylinders')).show(truncate=False)


<img src="https://i.ibb.co/M6cpLKk/oefeningen.png" height="200"/>

**Probeer nu zelf in het df drie kolommen te selecteren die niet "Car" of "Cylinders" zijn. Zoek met behulp van je kennis over het schema de andere bestaande kolommen op en selecteer ze.**

In [None]:
# Hier volgt jouw code


#### Kolommen toevoegen

We overlopen de volgende drie scenario's

1.   Kolom toevoegen
2.   Meerdere kolommen toevoegen
3.   Een nieuwe kolom aanmaken op basis van een bestaande kolom

In [None]:
# 1. Kolom toevoegen
# We voegen achteraan in het dataframe een nieuwe kolom toe
from pyspark.sql.functions import lit
df = df.withColumn('first_column',lit(1))
# lit betekent literal. It vult de rij met de literal value.
# Wanneer men statische data toevoegd, dan is het best practice om lit() te gebruiken.
df.show(5,truncate=False)

In [None]:
# 2. Meerdere kolommen toevoegen
# We voegen de kolommen second_column en third_column toe achteraan in het df
df = df.withColumn('second_column', lit(2)) \
       .withColumn('third_column', lit('Third Column'))
# lit betekent literal. It vult de rij met de literal value.
# Wanneer men statische data toevoegd, dan is het best practice om lit() te gebruiken.
df.show(5,truncate=False)

In [None]:
# 3.  Een nieuwe kolom aanmaken op basis van een bestaande kolom
# We voegen een nieuwe kolom toe 'car_model' die de waarde heeft van car en model aan elkaar geconcateneerd met een spatie tussen
from pyspark.sql.functions import concat
df = df.withColumn('car_model', concat(df['Car'], lit(" "), df['model']))
# lit betekent literal. It vult de rij met de literal value.
# Wanneer men statische data toevoegd, dan is het best practice om lit() te gebruiken.
df.show(5,truncate=False)

We gebruiken hier de `concat` methode om de twee kolommen aan elkaar vast te plakken.

<img src="https://i.ibb.co/M6cpLKk/oefeningen.png" height="200"/>

**Probeer nu zelf een kolom toe te voegen die het gewicht van de kolom Weight omzet van pounds (lb.) naar kilogram (kg). Noem de kolom Weight_kg**

In [None]:
# Hier volgt jouw code


#### Kolommen hernoemen

We kunnen koklommen een nieuwe naam geven door middel van de  `withColumnRenamed` functie.

In [None]:
#Renaming a column in PySpark
df = df.withColumnRenamed('first_column', 'new_column_one') \
       .withColumnRenamed('second_column', 'new_column_two') \
       .withColumnRenamed('third_column', 'new_column_three')
df.show(truncate=False)

#### Group By

We kunnen allemaal al kolommen groeperen in SQL, maar wat is de syntax voor groeperingen in DataFrames?

1.   Group By één kolom
2.   Group By meerdere kolommen

In [None]:
# Group By in PySpark
df.groupBy('Origin').count().show(5)

In [None]:
# Group By met meerdere kolommen in PySpark
df.groupBy('Origin', 'Model').count().show(5)

#### Dropping Columns

In [None]:
#Drop column in PySpark
df = df.drop('new_column_one')
df.show(5,truncate=False)

In [None]:
#Drop multiple columns:
df = df.drop('new_column_two') \
       .drop('new_column_three')
df.show(5,truncate=False)

#### DataFrame Operations on Rows

We zien de volgende onderwerpen voor rij-operaties:

1.   Filtering
2. 	 Distinct
3.   Sort
4.   Union



##### Filtering

In [None]:
# Filtering (Where) rows in PySpark
total_count = df.count()
print("TOTAL RECORD COUNT: " + str(total_count))
europe_filtered_count = df.filter(col('Origin')=='Europe').count()
print("EUROPE FILTERED RECORD COUNT: " + str(europe_filtered_count))
df.filter(col('Origin')=='Europe').show(truncate=False)

In [None]:
# Filtering op basis van meerdere voorwaardes
total_count = df.count()
print("TOTAL RECORD COUNT: " + str(total_count))
europe_filtered_count = df.filter((col('Origin')=='Europe') &
                                  (col('Cylinders')==4)).count() # Tweede voorwaarde
print("EUROPE FILTERED RECORD COUNT: " + str(europe_filtered_count))
df.filter(col('Origin')=='Europe').show(truncate=False)

Nu weet je hoe je succesvol filters kan toepassen.

<img src="https://i.ibb.co/M6cpLKk/oefeningen.png" height="200"/>

**Probeer nu zelf een filter te schrijven voor de Weight_kg kolom die alle wagens toont die meer wegen dan 2000 kg en een Horsepower hebben boven 120. Tel het verschil in rijen ten opzicht van het oorspronkelijke dataframe.**

In [None]:
# Hier volgt jouw code


##### Distinct

In [None]:
# Unieke Rijen opvragen in PySpark
df.select('Origin').distinct().show()

In [None]:
# Unieke rijen opvragen op basis van meerdere voorwaardes
df.select('Origin','model').distinct().show()

##### Sort

In [None]:
# Sorteren van rijen in PySpark
# By default is de data gesorteerd in "ascending" order
df.orderBy('Cylinders').show(truncate=False)

In [None]:
# Indien je dit niet wilt, dan zal je de ascending parameter moeten gebruiken
df.orderBy('Cylinders', ascending=False).show(truncate=False)

In [None]:
# Combineren van groupBy met orderBy
df.groupBy("Origin").count().orderBy('count', ascending=False).show(10)

##### Union

We gaan twee verschillende methodes zien om een union uit te voeren tussen twee dataframes. Het is belangrijk om de verschillen te kennen tussen de methodes, zodat je de juiste kan kiezen.

*   `union()` – Deze methode kan je gebruiken om twee dataframes te mergen met dezelfde structuur/schema. Als de twee dataframes niet hetzelfde zijn, dan krijg je een error.
*   `unionByName()` - Deze methode wordt gebruikt om twee dataframes te mergen op basis van kolom naam. Het maakt niet uit in welke volgorde de kolommen in de dataframes staan, zolang de kolomnamen gelinkt kunnen worden.

*In andere SQL talen, zorgen Unions ervoor dat duplicaten verwijdert worden, maar in PySpark gebeurt dit niet. Het is best practice om achteraf een distinct() filter of dropDuplicates() uit te voeren.*

In [None]:
# 1. Union met de kolommen in gelijke volgorde
df = spark.read.csv('cars.csv', header=True, sep=";", inferSchema=True)
europe_cars = df.filter((col('Origin')=='Europe') & (col('Cylinders')==5))
japan_cars = df.filter((col('Origin')=='Japan') & (col('Cylinders')==3))
print("EUROPE CARS: "+str(europe_cars.count()))
print("JAPAN CARS: "+str(japan_cars.count()))
print("AFTER UNION: "+str(europe_cars.union(japan_cars).count()))

Hier kan je zien hoe de data van twee verschillende dataframes aan elkaar gemerged worden. De Europese autos met 5 cylinders en Japanse autos met 3 cylinders staan nu samen in één dataframe.

In [None]:
# 2. Union Met kolommen in een andere volgorde, maar met gelijke namen
# Twee dataframes aanmaken met een andere volgorde van kolommen:
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
df1.unionByName(df2).show()

### Data Manipulation Functions

In [None]:
# Beschikbare data manipulatie functies in PySpark
from pyspark.sql import functions
# Net zoals in python, kunnen we de dir functie gebruiken om de beschikbare functies op te roepen van de library
print(dir(functions))

#### String Functions

In [None]:
# Laad de data opnieuw (een nieuwe start)
from pyspark.sql.functions import col
df = spark.read.csv('cars.csv', header=True, sep=";", inferSchema=True)

In onderstaande code cell worden de lower, upper en substring functies gebruikt om de naam van de auto te manipuleren. De alias methode wordt gebruikt om de kolom een nieuwe naam te geven.

In [None]:
from pyspark.sql.functions import col,lower, upper, substring
# Met help kan je de details van de functie afdrukken
help(substring)
# alias is used to rename the column name in the output
df.select(col('Car'),lower(col('Car')),upper(col('Car')),substring(col('Car'),1,4).alias("concatenated value")).show(5, False)

#### Numerieke functies

**Toon de oudste datum en de meest recente datum.**

In [None]:
from pyspark.sql.functions import min, max
df.select(min(col('Weight')), max(col('Weight'))).show()

**Voeg tien toe aan het minimum en maximum gewicht.**

In [None]:
from pyspark.sql.functions import min, max, lit
df.select(min(col('Weight'))+lit(10), max(col('Weight')+lit(10))).show()

### Joins in PySpark

In [None]:
# Maak twee dataframes
cars_df = spark.createDataFrame([[1, 'Car A'],[2, 'Car B'],[3, 'Car C']], ["id", "car_name"])
car_price_df = spark.createDataFrame([[1, 1000],[2, 2000],[3, 3000]], ["id", "car_price"])
cars_df.show()
car_price_df.show()

In [None]:
# Voer een inner join uit, zodat we de id, naam en prijs van elke auto zien in een rij.
cars_df.join(car_price_df, cars_df.id == car_price_df.id, 'inner').select(cars_df['id'],cars_df['car_name'],car_price_df['car_price']).show(truncate=False)

Zoals hierboven aangetoond kunnen we net zoals gewoonlijk inner joins uitvoeren tussen twee dataframes. De volgende joins zijn ook ondersteund in PySpark:
1. inner (default)
2. cross
3. outer
4. full
5. full_outer
6. left
7. left_outer
8. right
9. right_outer
10. left_semi
11. left_anti

## Spark SQL

SQL bestaat sinds de jaren 1970. Er zijn bijzonder veel ontwikkelaars die er dagelijks in schrijven en er hun brood mee verdienen. Naarmate big data populairder werd, groeide het tekort aan professionals met de technische kennis in data engineering om dit tekort op te vangen heeft men Spark SQL gemaakt.


In [None]:
# Load data
df = spark.read.csv('cars.csv', header=True, sep=";")
# Register Temporary Table
df.createOrReplaceTempView("temp")
# Select all data from temp table
sql_dataframe = spark.sql("select * from temp limit 5")
print("spark sql")
sql_dataframe.show(5)
print("spark dataframe")
sql_dataframe.orderBy(col('Horsepower'), ascending=False).show(5)
# Select count of data in table
spark.sql("select count(*) as total_count from temp").show()

Met `createOrReplaceTempView("temp_view_name")` kunnen we een dataframe omzetten naar een sql view waarin we traditionele queries kunnen uitvoeren. Nadat we een query uitgevoerd hebben op de view kunnen we het resultaat opslaan in een SQL DataFrame. Dit is niet hetzelfde als een Spark Dataframe (ook is het geen pandas DataFrame).

Om terug te werken met een DataFrame nadat je de data hebt opgeslagen als een view gebruik je de `spark.table("temp_view_name")` methode.

# Load

Tot slot is hier een voorbeeld script om data te "load"-en naar een Oracle database. Deze database bevat de verwerkte versie van je gegevens. Je slaat hier geen gegevens op, maar informatie.

Men load de data na transformatie naar een data warehouse. Een data warehouse bevat data analyse tools en meestal een RDBMS.



```
# Initieer de volgende waardes met jouw connection string
driver = 'oracle.jdbc.driver.OracleDriver'
url = 'jdbc:oracle:thin@localhost:1521/XEPDB1'
user = 'studend'
password = 'pxl'
table = 'table_name'

# Maak een nieuwe table met "createTableOptions"
df.write.format('jdbc').option('driver', driver)\
  .option('url', url)\
  .option('user', user)\
  .option('password', password)\
  .mode('overwrite')\
  .option('createTableOptions', '')\
  .option('dbtable', table).save()

# Append aan een bestaande table
df.write.format('jdbc').option('driver', driver)\
  .option('url', url)\
  .option('user', user)\
  .option('password', password)\
  .mode('append')\
  .option('dbtable', table).save()
```



# Oefening:

Inspecteer de volgende vier csv bestanden. Je werkt voor een bedrijfketen die over heel de wereld winkels heeft. Op het einde van elke werkdag sturen de winkels een verslag door (csv) waarin er een overzicht te vinden is van hoeveel producten er verkocht zijn aan welke prijs.

Je produceert je producten zelf en weet wat de productiekost is van elk product. De verschillende winkels gebruiken echter hun regionale data (munteenheid). Bovendien zijn de winkels zelf vrij om hun eigen prijzen te kiezen. Na wisselkoers hanteren niet alle winkels dezelfde prijzen.

**Fase 1: Extract**</br>
Jij moet er voor zorgen dat elk van de csv's ingeladen kan worden (EXTRACT). Ontdek de data. Wat is het schema? Maak een dataframe voor elke csv.

Je vindt de data terug via de volgende links:


1. https://github.com/PXL-Labs/Data-Expert/blob/main/europe-sales.csv
2. https://github.com/PXL-Labs/Data-Expert/blob/main/japan-sales.csv
3. https://github.com/PXL-Labs/Data-Expert/blob/main/production-costs.csv
4. https://github.com/PXL-Labs/Data-Expert/blob/main/us-sales.csv


**Fase 2: Transform**</br>

1.   Converteer elke munt naar euro (yen en dollar naar euro)
2.   Voeg aan elk winkelrapport een kolom toe `ProductionCost` met de productiekosten van elk verkocht product.
3.   Voeg een kolom `Profit` toe aan elk winkelrapport waarin de totale opbrengst berekend wordt. *(Je mag hier gewoon de verkoopsprijs ten opzicht van productiekost hanteren. Je moet niet rekening houden met transport kosten, onderhoud, werknemers uitbetalen, ...)*
4.   Voeg een kolom `CountryCode` toe aan elk winkelrapport met een country code voor elke winkel (EUR, US en JP)
5.   Verwijder de Region en Currency kolommen.
6.   Voeg alle winkelrapporten samen in één groot dataframe.

**Spark SQL**:
7.   Registreer je temp view van het dataframe dat alle gegevens bevat van de drie csv bestanden.
8.   Schrijf een query met spark.sql die berekent wat de totale opbrengst is.
9.   Schrijf een query met spark.sql die berekent wat het meest winstgevend product was per winkel.

<img src="https://i.ibb.co/M6cpLKk/oefeningen.png" height="300"/>



In [None]:
# Oefening 1:
# download files
!wget -O us_sales.csv https://github.com/PXL-Labs/Data-Expert/blob/main/us-sales.csv?raw=true
!wget -O production_costs.csv https://github.com/PXL-Labs/Data-Expert/blob/main/production-costs.csv?raw=true
!wget -O japan_sales.csv https://github.com/PXL-Labs/Data-Expert/blob/main/japan-sales.csv?raw=true
!wget -O europe_sales.csv https://github.com/PXL-Labs/Data-Expert/blob/main/europe-sales.csv?raw=true

Controleer de aanwezigheid van alle 4 bestanden:

In [None]:
ls

In [None]:
# Oplossing