# Turoriel

https://www.margo-group.com/fr/actualite/tutoriel-delta-lake-premiere-prise-en-main/

## Init Spark
L’initialisation de la session Spark s’écrit comme cela avec le noyau Spylon que j’utilise pour ce notebook.

Il faut veiller à inclure le jar Delta Lake et à définir des configurations additionnelles.

« io.delta.sql.DeltaSparkSessionExtension » permet l’utilisation des fonctionnalités de Delta Lake à travers la syntaxe spark.sql et

« org.apache.spark.sql.delta.catalog.DeltaCatalog » permet l’interaction avec un metastore de base de données et de tables.

Note: pour spark 3.1.2 il faut utiliser delta-core 1.0.0 et pas 0.7.0 comme dans l'article original spark 3.0

`bin/spark-shell --packages io.delta:delta-core_2.12:1.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"`

https://docs.delta.io/latest/quick-start.html#spark-scala-shell

https://jupyter-docker-stacks.readthedocs.io/en/latest/using/specifics.html#local-mode-in-scala

In [1]:
%%init_spark
#Configure Spark to use a local master
launcher.master = "local"

#launcher.num_executors = 1
#launcher.executor_cores = 1
launcher.driver_memory = '4g'
launcher.conf.set("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
launcher.conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
launcher.conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

Ci-dessous, nous notons la ligne import io.delta.sql.DeltaSparkSessionExtension qui nous permettra d’interagir programmatiquement avec les tables au format Delta :

In [2]:
// check scala version
util.Properties.versionString

Intitializing Scala interpreter ...

Spark Web UI available at http://cd6256a52912:4040
SparkContext available as 'sc' (version = 3.0.2, master = local, app id = local-1634066963096)
SparkSession available as 'spark'


res0: String = version 2.12.10


In [3]:
// Sum of the first 100 whole numbers
val rdd = sc.parallelize(0 to 100)
rdd.sum()
// 5050

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
res1: Double = 5050.0


In [4]:
import java.io.File
import scala.reflect.io.Directory
import org.apache.spark.sql.Encoders

import io.delta.sql.DeltaSparkSessionExtension

import java.io.File
import scala.reflect.io.Directory
import org.apache.spark.sql.Encoders
import io.delta.sql.DeltaSparkSessionExtension


Nous définissons des variables pour la gestion de nos dossiers de travail. Le dossier raw contient les données brutes, le dossier parquet contient les fichiers sous-jacents à la table au format Parquet que nous allons créer et le dossier delta contient les fichiers sous-jacents à la table au format Delta que nous allons créer.

In [5]:
val dataFolder: String = "/home/jovyan/data/"
val rawDataFolder: String = s"${dataFolder}raw/"
val rawDataFile: String = s"${rawDataFolder}fake_people.csv"
val parquetDataFolder: String = s"${dataFolder}parquet/"
val deltaDataFolder: String = s"${dataFolder}delta/"
val deltaLogFolder: String = s"${deltaDataFolder}_delta_log/"

dataFolder: String = /home/jovyan/data/
rawDataFolder: String = /home/jovyan/data/raw/
rawDataFile: String = /home/jovyan/data/raw/fake_people.csv
parquetDataFolder: String = /home/jovyan/data/parquet/
deltaDataFolder: String = /home/jovyan/data/delta/
deltaLogFolder: String = /home/jovyan/data/delta/_delta_log/


Nous vidons les dossiers pour retrouver le même état initial à chaque run :

In [20]:
val directoryRaw = new Directory(new File(rawDataFolder))
val directoryParquet = new Directory(new File(parquetDataFolder))
val directoryDelta = new Directory(new File(deltaDataFolder))
val directoryDeltaLog = new Directory(new File(deltaLogFolder))
directoryParquet.deleteRecursively()
directoryDelta.deleteRecursively()
directoryParquet.createDirectory()
directoryDelta.createDirectory()

directoryRaw: scala.reflect.io.Directory = /home/jovyan/data/raw
directoryParquet: scala.reflect.io.Directory = /home/jovyan/data/parquet
directoryDelta: scala.reflect.io.Directory = /home/jovyan/data/delta
directoryDeltaLog: scala.reflect.io.Directory = /home/jovyan/data/delta/_delta_log
res16: scala.reflect.io.Directory = /home/jovyan/data/delta


Enfin, nous définissons une courte fonction pour afficher le contenu d’un dossier donné. Nous utiliserons un jeu de donnée fictif créé avec la bibliothèque Python Faker.

In [7]:
def showFilesInDir(dir: Directory): Unit = {
  val it = for { file <- dir.files; if !(file.toString.contains("/.")) }
    yield f"${file} ${file.length.toDouble / 1000000} MB"
  it foreach println
}
showFilesInDir(directoryRaw)

/home/jovyan/data/raw/fake_people.csv 1.178365 MB


showFilesInDir: (dir: scala.reflect.io.Directory)Unit


In [25]:
case class Person(id: Int,
                  name: String,
                  email: String,
                  address: String,
                  city: String,
                  dateTime: java.sql.Timestamp,
                  randomInt: Int)
val personSchema = Encoders.product[Person].schema

val data = spark.read
  .format("csv")
  .schema(personSchema)
  .option("header", "true")
  .option("multiLine", true)
  .load(rawDataFile)

data.show(15)

+----+--------------------+--------------------+--------------------+--------------------+-------------------+---------+
|  id|                name|               email|             address|                city|           dateTime|randomInt|
+----+--------------------+--------------------+--------------------+--------------------+-------------------+---------+
|1000|       David Farrell|bairdmichelle@exa...|560 Edward Glens ...|        Colemanburgh|2005-05-20 00:00:00|     1494|
|1001|        Daniel Scott|michellemccormick...|672 Denise Glen
P...|           Port Jody|1996-05-23 00:00:00|     8577|
|1002|    Heather Thompson|  wjames@example.com|8537 Juan Mountai...|South Stephanieshire|2019-01-17 00:00:00|     4966|
|1003|Dr. Scott Morgan DDS|  uburch@example.com|66801 Cunningham ...|          Danielberg|1997-09-16 00:00:00|     5824|
|1004|     Jennifer Torres|randall51@example...|PSC 0139, Box 045...|        Lake Jessica|2007-02-11 00:00:00|     6989|
|1005|  Jennifer Rodriguez| zhar

defined class Person
personSchema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,false), StructField(name,StringType,true), StructField(email,StringType,true), StructField(address,StringType,true), StructField(city,StringType,true), StructField(dateTime,TimestampType,true), StructField(randomInt,IntegerType,false))
data: org.apache.spark.sql.DataFrame = [id: int, name: string ... 5 more fields]


Le jeu contient 500 000 enregistrements :

In [9]:
data.count

res5: Long = 10000


## Les tables au format Parquet
Commençons par aborder la création et la manipulation de tables avec un format classique.

Nous utilisons la syntaxe sql afin de créer un metastore (parfois appelé catalogue de metadata) :

In [10]:
val db = "deltalake_tuto_margo"
spark.sql(s"DROP DATABASE IF EXISTS ${db} CASCADE")
spark.sql(s"CREATE DATABASE ${db}")
spark.sql("SHOW DATABASES").show()

+--------------------+
|           namespace|
+--------------------+
|             default|
|deltalake_tuto_margo|
+--------------------+



db: String = deltalake_tuto_margo


In [11]:
spark.sql(s"USE $db")
spark.sql("SHOW TABLES").show

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+



Et nous créons la table à partir des données chargées avec Spark :

In [12]:
data.write
.option("path", parquetDataFolder)
.saveAsTable("parquet_table")

spark.sql("SHOW TABLES").show
spark.sql("SELECT id, name, city FROM parquet_table LIMIT 5").show(truncate=false)

+--------------------+-------------+-----------+
|            database|    tableName|isTemporary|
+--------------------+-------------+-----------+
|deltalake_tuto_margo|parquet_table|      false|
+--------------------+-------------+-----------+

+----+--------------------+--------------------+
|id  |name                |city                |
+----+--------------------+--------------------+
|1000|David Farrell       |Colemanburgh        |
|1001|Daniel Scott        |Port Jody           |
|1002|Heather Thompson    |South Stephanieshire|
|1003|Dr. Scott Morgan DDS|Danielberg          |
|1004|Jennifer Torres     |Lake Jessica        |
+----+--------------------+--------------------+



La table parquet_table est maintenant contenue dans un fichier parquet (en conditions réelles, il y aura autant de fichiers parquets que de workers mais travailler avec un seul worker simplifie cette présentation).

In [13]:
showFilesInDir(directoryParquet)

/home/jovyan/data/parquet/part-00000-a2464fb9-ffc4-4fb6-bea6-7d8b7f79f497-c000.snappy.parquet 0.724852 MB
/home/jovyan/data/parquet/_SUCCESS 0.0 MB


In [14]:
spark.sql(""" 
             INSERT INTO parquet_table
              SELECT * FROM parquet_table LIMIT 10;
          """)

res10: org.apache.spark.sql.DataFrame = []


Spark va simplement créer un second fichier parquet qui contient les dix lignes que nous venons d’ajouter à la table :

In [15]:
showFilesInDir(directoryParquet)

/home/jovyan/data/parquet/part-00000-a2464fb9-ffc4-4fb6-bea6-7d8b7f79f497-c000.snappy.parquet 0.724852 MB
/home/jovyan/data/parquet/_SUCCESS 0.0 MB
/home/jovyan/data/parquet/part-00000-0b0bf9dd-7961-4aae-8a1a-5fdcb31d2199-c000.snappy.parquet 0.003041 MB


Ce mode de stockage ne permet pas de connaître l’historique de la table car seul son état à l’instant T est disponible. L’exemple ci-dessus est simpliste mais imaginons un dossier de plusieurs centaines de fichiers sur une table régulièrement modifiée, il sera impossible de reconstituer un historique des transformations.

In [16]:
spark.sql("SELECT COUNT(*) FROM parquet_table").show

+--------+
|count(1)|
+--------+
|   10010|
+--------+



Par ailleurs, la commande suivante ne marchera pas en raison de l’immutabilité des fichiers parquet :

In [17]:
// spark.sql("DELETE FROM parquet_table WHERE id >= 480000")
// org.apache.spark.sql.AnalysisException: DELETE is only supported with v2 tables.

# Les tables au format Delta

D’un point de vue purement pratique, Delta Lake s’utilise comme un format de fichier.

Pour écrire une table, il suffit donc de remplacer .format(« parquet ») par .format(« delta ») :

Error with DeltaLake, could be caused by a bug in spark?

https://github.com/vericast/spylon-kernel/issues/40#issuecomment-318903262

In [38]:
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, TimestampType}

val id = StructField("id", IntegerType)
val name = StructField("name", StringType)
val email = StructField("email", StringType)
val address = StructField("address", StringType)
val city = StructField("city", StringType)
//val dateTime = StructField("dateTime", TimestampType)
//val randomInt = StructField("randomInt", IntegerType)


val schema = StructType(Array(id, name, email))

val df = spark.read.schema(schema).csv(rawDataFile)



import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, TimestampType}
id: org.apache.spark.sql.types.StructField = StructField(id,IntegerType,true)
name: org.apache.spark.sql.types.StructField = StructField(name,StringType,true)
email: org.apache.spark.sql.types.StructField = StructField(email,StringType,true)
address: org.apache.spark.sql.types.StructField = StructField(address,StringType,true)
city: org.apache.spark.sql.types.StructField = StructField(city,StringType,true)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(email,StringType,true))
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]


In [39]:
df.write
.format("delta")
.mode("overwrite")
.save(deltaDataFolder)

org.apache.spark.SparkException:  Job aborted.

Comme pour une table classique, Delta Lake permet l’utilisation de ces fonctionnalités via la syntaxe spark.sql :

In [None]:
spark.sql(s"""
    CREATE TABLE deltalake_table
    USING DELTA
    LOCATION '${deltaDataFolder}'
  """)

spark.sql("SELECT * FROM deltalake_table LIMIT 10;").show