# Création d'un ELT avec Spark SQL

Un ELT (Extract Load Transform) est un processus de transformation de données qui consiste à extraire des données d'une source, à les charger dans un entrepôt de données, puis à les transformer en un format adapté à l'analyse.

Nous allons voir comment créer un ELT avec Spark SQL pour créer un jeu de données à partir de fichiers contenant les passagers du Titanic.

## Configuration du notebook

In [None]:
print("Hello, World from Scala kernel!")

Intitializing Scala interpreter ...

In [2]:
import org.apache.spark.sql.{DataFrame, SparkSession};

import org.apache.spark.sql.{DataFrame, SparkSession}


## Extract

Cette partie consiste à fusionner les données de plusieurs fichiers en un seul jeu de données.

In [14]:
def extract(spark: SparkSession): DataFrame = {
    // load titanic files as DataFrames
    val titanicPart1 = spark.read.format("csv").option("header", "true").load("data/titanic_part_1.txt")
    val titanicPart2 = spark.read.format("parquet").load("data/titanic_part_2.parquet")
    val titanicPart3 = spark.read.format("orc").load("data/titanic_part_3.orc")

    // combine DataFrames
    val titanic = titanicPart1.union(titanicPart2).union(titanicPart3).dropDuplicates()
    
    titanic
}

extract: (spark: org.apache.spark.sql.SparkSession)org.apache.spark.sql.DataFrame
load: (titanic: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
transform: (titanic: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
typeVariables: (titanic: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
translateToEnglish: (titanic: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


# Load

Cette partie consiste à charger les données fusionnées dans un fichier CSV.

In [None]:
def load(titanic: DataFrame): DataFrame = {
    // write dataframe to csv
    titanic.write.format("csv").option("header", "true").save("data/titanic.csv")
    titanic
}

## Transform

Cette partie consiste à transformer les données pour les rendre exploitables.

- Typage des variables
- Traduction en anglais
- Ajout de nouvelles variables

In [None]:
def addNewVariables(titanic: DataFrame): DataFrame = {
    titanic
        .withColumn("FamilySize", col("SibSp") + col("Parch") + 1)
        .withColumn("Title", regexp_extract(col("Name"), "(\\w+\\.)", 1))
        .withColumn("AgeCategory", when(col("Age") < 20, "Young")
            .when(col("Age") < 40, "Adult")
            .when(col("Age") < 60, "Old")
            .otherwise("Very Old"))
}

def translateToEnglish(titanic: DataFrame): DataFrame = {
    titanic
        // replace all 'Monsieur' by 'Mr', and all 'Madame' by 'Mrs' in the "Name" column
        .withColumn("Name", regexp_replace(col("Name"), "Monsieur", "Mr"))
        .withColumn("Name", regexp_replace(col("Name"), "Mamade", "Mrs"))
        // replace all "femme" by "female", and all "homme" by "male in the "Sex" column
        .withColumn("Sex", regexp_replace(col("Sex"), "homme", "male"))
        .withColumn("Sex", regexp_replace(col("Sex"), "femme", "female"))
}

def typeVariables(titanic: DataFrame): DataFrame = {
    titanic.select(titanic.columns.map {
        case column@("PassengerId" | "Survived" | "Pclass" | "SibSp" | "Parch") => titanic(column).cast("int").as(column)
        case column@("Age" | "Fare") => titanic(column).cast("double").as(column)
        case column => titanic(column).cast("string").as(column)
    }:_*)
}

def transform(titanic: DataFrame): DataFrame = {
    addNewVariables(translateToEnglish(typeVariables(titanic)))
}

## Exécution de l'ELT

In [None]:
val transformedTitanic = transform(load(extract(spark)))

transformedTitanic.write.format("csv").option("header", "true").save("data/transformed_titanic.csv")