# Delta Lake mini maintenance cycle – GreenFarm (Unity Catalog + Volumes)

Goal: run through a complete Delta Lake maintenance cycle on a small table.

### Steps
1) Create a Delta table **partitioned** by `ingest_date` from the CSV  
2) Simulate growth (generate and append rows)  
3) Observe fragmentation (`numFiles`, `sizeInBytes`)  
4) Run `OPTIMIZE` on the recent partitions  
5) Enable CDF, run an update and an insert, then read the changes  
6) Test Time Travel (read an earlier version)  
7) Run `VACUUM` with a short retention (24h) and observe the effect  

✅ At the end: explain what you observe before/after OPTIMIZE, then what CDF / Time Travel / VACUUM are for.


In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T
import time

# Parameters (Unity Catalog + Volumes), adjust if needed
csv_path = "/Volumes/workspace/default/oc-lakehouse/sensors_full.csv"

db_name = "greenfarm"
table_name = "sensors_maintenance"
full_name = f"{db_name}.{table_name}"

print("CSV:", csv_path)
print("Table:", full_name)


CSV: /Volumes/workspace/default/oc-lakehouse/sensors_full.csv
Table: greenfarm.sensors_maintenance


## 1) Create a partitioned Delta table (ingest_date)

We **enforce the schema** when loading the CSV to avoid type problems
(e.g. `sensor_id` read as a string).

We then add `ingest_date` and write a **Unity Catalog managed table**
partitioned by `ingest_date`.

**What to observe:**
- the final schema
- `partitionColumns = ['ingest_date']`
- the entry in `DESCRIBE HISTORY`

In [0]:
# 1) DB + drop table
spark.sql(f"CREATE DATABASE IF NOT EXISTS {db_name}")
spark.sql(f"DROP TABLE IF EXISTS {full_name}")

# 2) Explicit schema
schema = T.StructType([
    T.StructField("sensor_id", T.LongType(), True),
    T.StructField("humidity", T.DoubleType(), True),
    T.StructField("parcel", T.StringType(), True),
])

df = (
    spark.read.option("header", True)
    .schema(schema)
    .csv(csv_path)
    .withColumn("ingest_date", F.current_date())
)

display(df.limit(10))
print("Rows:", df.count())
df.printSchema()

# 3) Partitioned Delta write (UC managed table)
(df.write.format("delta")
 .mode("overwrite")
 .partitionBy("ingest_date")
 .saveAsTable(full_name)
)

# Checks
display(spark.sql(f"DESCRIBE DETAIL {full_name}"))
display(spark.sql(f"DESCRIBE HISTORY {full_name}"))

sensor_id,humidity,parcel,ingest_date
1000,30.9,West-2,2026-01-23
1001,35.0,West-1,2026-01-23
1002,47.5,East-1,2026-01-23
1003,56.2,East-2,2026-01-23
1004,54.2,North-1,2026-01-23
1005,43.5,South-1,2026-01-23
1006,36.6,East-1,2026-01-23
1007,56.6,West-2,2026-01-23
1008,55.1,East-1,2026-01-23
1009,31.0,South-2,2026-01-23


Rows: 6000
root
 |-- sensor_id: long (nullable = true)
 |-- humidity: double (nullable = true)
 |-- parcel: string (nullable = true)
 |-- ingest_date: date (nullable = false)



format,id,name,description,location,createdAt,lastModified,partitionColumns,clusteringColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion,tableFeatures,statistics,clusterByAuto
delta,56802398-b2e8-459b-a4bf-d05a1c9c75a7,workspace.greenfarm.sensors_maintenance,,,2026-01-23T09:52:28.907Z,2026-01-23T09:52:33.000Z,List(ingest_date),List(),1,17337,"Map(delta.parquet.compression.codec -> zstd, delta.enableDeletionVectors -> true)",3,7,"List(appendOnly, deletionVectors, invariants)","Map(numRowsDeletedByDeletionVectors -> 0, numDeletionVectors -> 0)",False


version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,2026-01-23T09:52:33.000Z,252167878869526,alexandre.bergere@datalex.io,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [""ingest_date""], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.parquet.compression.codec"":""zstd"",""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,List(2491241394162189),0123-095152-mnd9q50q-v2n,,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 0, numRemovedBytes -> 0, numDeletionVectorsRemoved -> 0, numOutputRows -> 6000, numOutputBytes -> 17337)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13


## 2) Simulate growth (generation + append)

The source CSV has only about 6,000 rows, so a plain `limit(200000)` cannot produce
200,000 rows (there is not enough data).

Instead we **duplicate** the dataset with a `crossJoin(range(multiplier))` to create
many new rows, then write them in `append` mode.

To create fragmentation:
- `repartition(n_files_target)`
- a low `maxRecordsPerFile`
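The two settings combine in a simple way, which can be sketched with a back-of-the-envelope estimate. The helper name `expected_file_count` is hypothetical, and the estimate assumes that `repartition` spreads rows evenly across Spark partitions and that all rows land in a single `ingest_date` partition, as they do in this step.

```python
import math


def expected_file_count(total_rows: int, n_partitions: int, max_records_per_file: int) -> int:
    """Estimate how many files an append writes.

    Assumption: rows are spread evenly across the Spark partitions, and each
    Spark partition is split into files of at most max_records_per_file rows.
    """
    rows_per_partition = math.ceil(total_rows / n_partitions)
    files_per_partition = math.ceil(rows_per_partition / max_records_per_file)
    return n_partitions * files_per_partition


# 300,000 generated rows, repartition(200), maxRecordsPerFile = 500
print(expected_file_count(300_000, 200, 500))  # 600
```

With the values used in this notebook the estimate is 600 files, which matches the `numFiles -> 600` metric of the append commit in the history.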


In [0]:
multiplier = 50        # 6000 * 50 = 300k generated rows
n_files_target = 200   # deliberate fragmentation

base = spark.table(full_name)

growth = (
    base.crossJoin(spark.range(multiplier).withColumnRenamed("id", "batch_id"))
        # Offset sensor_id per batch so generated ids differ between batches
        # (batch 0 keeps the original ids, so those rows share ids with the base table)
        .withColumn("sensor_id", F.col("sensor_id") + F.col("batch_id") * F.lit(1_000_000))
        # Simulate recent data (recent partition)
        .withColumn("ingest_date", F.date_add(F.current_date(), 1))
        .drop("batch_id")
        .repartition(n_files_target)
)

(growth.write.format("delta")
 .mode("append")
 .option("maxRecordsPerFile", 500)   # smaller => more files
 .saveAsTable(full_name)
)

print("Total rows after growth:", spark.table(full_name).count())
display(spark.sql(f"DESCRIBE HISTORY {full_name}"))

Total rows after growth: 306000


version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
1,2026-01-23T09:54:32.000Z,252167878869526,alexandre.bergere@datalex.io,WRITE,"Map(mode -> Append, statsOnLoad -> true, partitionBy -> [])",,List(2491241394162189),0123-095152-mnd9q50q-v2n,0.0,WriteSerializable,False,"Map(numFiles -> 600, numOutputRows -> 300000, numOutputBytes -> 1650083)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
0,2026-01-23T09:52:33.000Z,252167878869526,alexandre.bergere@datalex.io,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [""ingest_date""], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.parquet.compression.codec"":""zstd"",""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,List(2491241394162189),0123-095152-mnd9q50q-v2n,,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 0, numRemovedBytes -> 0, numDeletionVectorsRemoved -> 0, numOutputRows -> 6000, numOutputBytes -> 17337)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13


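## 3) Observe fragmentation

On Databricks Free Edition with Unity Catalog, fragmentation can be observed via `DESCRIBE DETAIL`:
- `numFiles` = number of Parquet files in the current version (fragmentation indicator)
- `sizeInBytes` = total size of those files

`numFiles` can be lower than the number of files written in step 2 when the platform compacts files automatically in the background (commits with `auto -> true` in the history).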
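Dividing `sizeInBytes` by `numFiles` gives an average file size, which is often easier to read than the two raw numbers. The sketch below uses a hypothetical helper, `average_file_size_kb`, and feeds it the file counts and byte sizes reported in the commit metrics of this notebook's history.

```python
def average_file_size_kb(num_files: int, size_in_bytes: int) -> float:
    """Average file size in KiB; many tiny files indicate fragmentation."""
    if num_files <= 0:
        raise ValueError("num_files must be positive")
    return size_in_bytes / num_files / 1024


# Append commit (version 1): 600 files, 1,650,083 bytes
print(round(average_file_size_kb(600, 1_650_083), 2))  # 2.69

# File written by the manual OPTIMIZE (version 4): 1 file, 360,923 bytes
print(round(average_file_size_kb(1, 360_923), 2))  # 352.46
```

Files of a few KiB each are far below the sizes at which Parquet scans are efficient, which is why compaction helps here.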

In [0]:
detail = spark.sql(f"DESCRIBE DETAIL {full_name}").collect()[0].asDict()

print("format:", detail.get("format"))
print("partitionColumns:", detail.get("partitionColumns"))
print("numFiles:", detail.get("numFiles"))
print("sizeInMB:", round((detail.get("sizeInBytes") or 0) / (1024*1024), 2))

format: delta
partitionColumns: ['ingest_date']
numFiles: 347
sizeInMB: 1.07


## 4) OPTIMIZE on recent partitions

Because the table is partitioned by `ingest_date`, we can restrict compaction to the recent partitions.

**What to observe:**
- `numFiles` decreases after OPTIMIZE
- a new entry in the history (operation = OPTIMIZE)
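To see which partitions a predicate such as `ingest_date >= date_sub(current_date(), 2)` selects, the filter can be reproduced in plain Python. This is only an illustration: `partitions_to_optimize` is a hypothetical helper, and the 2026-01-10 partition is an invented example of an older partition that does not exist in this table.

```python
from datetime import date, timedelta


def partitions_to_optimize(partition_dates, today, lookback_days=2):
    """Return the partitions matched by ingest_date >= today - lookback_days."""
    cutoff = today - timedelta(days=lookback_days)
    return sorted(d for d in partition_dates if d >= cutoff)


today = date(2026, 1, 23)
partitions = [
    date(2026, 1, 23),  # initial load
    date(2026, 1, 24),  # simulated growth (current_date + 1)
    date(2026, 1, 10),  # hypothetical older partition, left untouched
]

print(partitions_to_optimize(partitions, today))
```

Both real partitions of the table fall inside the window, so each one ends up as a single file after compaction.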

In [0]:
before = spark.sql(f"DESCRIBE DETAIL {full_name}").collect()[0].asDict()
print("BEFORE OPTIMIZE - numFiles:", before["numFiles"], "sizeMB:", round(before["sizeInBytes"]/(1024*1024), 2))

spark.sql(
    f"""
    OPTIMIZE {full_name}
    WHERE ingest_date >= date_sub(current_date(), 2)
    """
)

after = spark.sql(f"DESCRIBE DETAIL {full_name}").collect()[0].asDict()
print("AFTER OPTIMIZE  - numFiles:", after["numFiles"], "sizeMB:", round(after["sizeInBytes"]/(1024*1024), 2))

display(spark.sql(f"DESCRIBE HISTORY {full_name}"))

BEFORE OPTIMIZE - numFiles: 347 sizeMB: 1.07
AFTER OPTIMIZE  - numFiles: 2 sizeMB: 0.36


version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
4,2026-01-23T09:58:22.000Z,252167878869526,alexandre.bergere@datalex.io,OPTIMIZE,"Map(predicate -> [""('ingest_date >= 'date_sub('current_date(), 2))""], auto -> false, clusterBy -> [], zOrderBy -> [], batchId -> 0)",,List(2491241394162189),0123-095152-mnd9q50q-v2n,3.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 346, numRemovedBytes -> 1107230, p25FileSize -> 360923, numDeletionVectorsRemoved -> 0, minFileSize -> 360923, numAddedFiles -> 1, maxFileSize -> 360923, p75FileSize -> 360923, p50FileSize -> 360923, numAddedBytes -> 360923)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
3,2026-01-23T09:54:44.000Z,252167878869526,alexandre.bergere@datalex.io,OPTIMIZE,"Map(predicate -> [""('ingest_date <=> cast(2026-01-24 as date))""], auto -> true, clusterBy -> [], zOrderBy -> [], batchId -> 1)",,List(2491241394162189),0123-095152-mnd9q50q-v2n,1.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 128, numRemovedBytes -> 351842, p25FileSize -> 79574, numDeletionVectorsRemoved -> 0, conflictDetectionTimeMs -> 88, minFileSize -> 79574, numAddedFiles -> 1, maxFileSize -> 79574, p75FileSize -> 79574, p50FileSize -> 79574, numAddedBytes -> 79574)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
2,2026-01-23T09:54:41.000Z,252167878869526,alexandre.bergere@datalex.io,OPTIMIZE,"Map(predicate -> [""('ingest_date <=> cast(2026-01-24 as date))""], auto -> true, clusterBy -> [], zOrderBy -> [], batchId -> 0)",,List(2491241394162189),0123-095152-mnd9q50q-v2n,1.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 128, numRemovedBytes -> 352161, p25FileSize -> 81576, numDeletionVectorsRemoved -> 0, minFileSize -> 81576, numAddedFiles -> 1, maxFileSize -> 81576, p75FileSize -> 81576, p50FileSize -> 81576, numAddedBytes -> 81576)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
1,2026-01-23T09:54:32.000Z,252167878869526,alexandre.bergere@datalex.io,WRITE,"Map(mode -> Append, statsOnLoad -> true, partitionBy -> [])",,List(2491241394162189),0123-095152-mnd9q50q-v2n,0.0,WriteSerializable,False,"Map(numFiles -> 600, numOutputRows -> 300000, numOutputBytes -> 1650083)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
0,2026-01-23T09:52:33.000Z,252167878869526,alexandre.bergere@datalex.io,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [""ingest_date""], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.parquet.compression.codec"":""zstd"",""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,List(2491241394162189),0123-095152-mnd9q50q-v2n,,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 0, numRemovedBytes -> 0, numDeletionVectorsRemoved -> 0, numOutputRows -> 6000, numOutputBytes -> 17337)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13


## 5) Enable CDF, run an update and an insert, then read the changes

The Change Data Feed (CDF) lets you read only the rows that changed between two versions.

We will:
- enable CDF
- record the version `v_before`
- run an UPDATE and an INSERT
- record `v_after`
- read the changes with `table_changes(...)` (both version bounds are inclusive)

In [0]:
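# Enable the Change Data Feed (required before table_changes can be read)
spark.sql(f"ALTER TABLE {full_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

# Version before the changes
v_before = spark.sql(f"DESCRIBE HISTORY {full_name} LIMIT 1").collect()[0]["version"]
print("Version before changes:", v_before)

# UPDATE (on an existing sensor)
spark.sql(f"UPDATE {full_name} SET humidity = humidity + 1 WHERE sensor_id = 1000")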

# INSERT (new sensor), robust version: explicit cast
new_id = int(time.time()) % 10_000_000

sample = spark.table(full_name).orderBy(F.rand()).limit(1)

inserted = (
    sample
    .withColumn("sensor_id", F.lit(new_id).cast("long"))   # ✅ explicit cast
    .withColumn("humidity", F.col("humidity").cast("double"))  # ✅ safe
    .withColumn("ingest_date", F.current_date())
)

# (Optional) check the schema before writing
inserted.printSchema()

inserted.write.format("delta").mode("append").saveAsTable(full_name)

# Version after the changes
v_after = spark.sql(f"DESCRIBE HISTORY {full_name} LIMIT 1").collect()[0]["version"]
print("Version after changes:", v_after)
print("Inserted sensor_id:", new_id)

Version before changes: 6
root
 |-- sensor_id: long (nullable = false)
 |-- humidity: double (nullable = true)
 |-- parcel: string (nullable = true)
 |-- ingest_date: date (nullable = false)

Version after changes: 8
Inserted sensor_id: 9162485


In [0]:
display(
    spark.sql(
        f"""
        SELECT *
        FROM table_changes('{full_name}', {v_before}, {v_after})
        ORDER BY _commit_version, _commit_timestamp
        """
    )
)

sensor_id,humidity,parcel,ingest_date,_change_type,_commit_version,_commit_timestamp
1000,31.9,West-2,2026-01-24,update_postimage,6,2026-01-23T09:58:56.000Z
1000,30.9,West-2,2026-01-23,update_preimage,6,2026-01-23T09:58:56.000Z
1000,30.9,West-2,2026-01-24,update_preimage,6,2026-01-23T09:58:56.000Z
1000,31.9,West-2,2026-01-23,update_postimage,6,2026-01-23T09:58:56.000Z
1000,32.9,West-2,2026-01-24,update_postimage,7,2026-01-23T10:01:25.000Z
1000,32.9,West-2,2026-01-23,update_postimage,7,2026-01-23T10:01:25.000Z
1000,31.9,West-2,2026-01-23,update_preimage,7,2026-01-23T10:01:25.000Z
1000,31.9,West-2,2026-01-24,update_preimage,7,2026-01-23T10:01:25.000Z
9162485,38.3,North-2,2026-01-23,insert,8,2026-01-23T10:01:27.000Z
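One way to read the output above is to pair each `update_preimage` row with its `update_postimage` row. The sketch below does this in plain Python on a subset of the rows shown above. The helper `summarize_changes` is hypothetical, and pairing on `(_commit_version, sensor_id, ingest_date)` is an assumption, since the table declares no primary key.

```python
from collections import defaultdict

# Subset of the table_changes rows shown above (commit 6 and the insert)
changes = [
    {"sensor_id": 1000, "humidity": 31.9, "ingest_date": "2026-01-24",
     "_change_type": "update_postimage", "_commit_version": 6},
    {"sensor_id": 1000, "humidity": 30.9, "ingest_date": "2026-01-23",
     "_change_type": "update_preimage", "_commit_version": 6},
    {"sensor_id": 1000, "humidity": 30.9, "ingest_date": "2026-01-24",
     "_change_type": "update_preimage", "_commit_version": 6},
    {"sensor_id": 1000, "humidity": 31.9, "ingest_date": "2026-01-23",
     "_change_type": "update_postimage", "_commit_version": 6},
    {"sensor_id": 9162485, "humidity": 38.3, "ingest_date": "2026-01-23",
     "_change_type": "insert", "_commit_version": 8},
]


def summarize_changes(rows):
    """Pair pre/post images per (commit, sensor_id, ingest_date) and list inserts."""
    images = defaultdict(dict)
    inserts = []
    for row in rows:
        kind = row["_change_type"]
        if kind == "insert":
            inserts.append(row["sensor_id"])
        elif kind in ("update_preimage", "update_postimage"):
            key = (row["_commit_version"], row["sensor_id"], row["ingest_date"])
            images[key][kind] = row["humidity"]
    # Keep only keys for which both the before and after image are present
    updates = {
        key: (img["update_preimage"], img["update_postimage"])
        for key, img in images.items()
        if len(img) == 2
    }
    return updates, inserts


updates, inserts = summarize_changes(changes)
for key, (before, after) in sorted(updates.items()):
    print(key, before, "->", after)
print("inserted sensor ids:", inserts)
```

Each UPDATE produces two pairs because `sensor_id = 1000` exists in both partitions: once in the initial load and once in the generated batch 0.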


## 6) Time Travel (reading a past version)

We read version 0 (the initial state of the table) and compare it with the current state.

**What to observe:**
- version 0 does not contain the recent appends and changes
- useful for auditing, debugging and reproducibility
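Besides `VERSION AS OF`, Delta also accepts `TIMESTAMP AS OF` to address a past state by time. The sketch below only builds the SQL text. The helper `time_travel_query` is hypothetical, and the timestamp is an example value taken from commit 0 in the history; how it is interpreted depends on the session time zone.

```python
def time_travel_query(table, version=None, timestamp=None, limit=20):
    """Build a time travel SELECT using either a version number or a timestamp."""
    if (version is None) == (timestamp is None):
        raise ValueError("pass exactly one of version or timestamp")
    if version is not None:
        clause = f"VERSION AS OF {int(version)}"
    else:
        clause = f"TIMESTAMP AS OF '{timestamp}'"
    return f"SELECT * FROM {table} {clause} LIMIT {int(limit)}"


print(time_travel_query("greenfarm.sensors_maintenance", version=0))
print(time_travel_query("greenfarm.sensors_maintenance", timestamp="2026-01-23 09:52:33"))
```

In the notebook, the resulting string could be passed to `spark.sql(...)` and displayed in the same way as the queries in the next cell.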

In [0]:
display(spark.sql(f"SELECT * FROM {full_name} VERSION AS OF 0 LIMIT 20"))
display(spark.table(full_name).limit(20))  # current version

sensor_id,humidity,parcel,ingest_date
1000,30.9,West-2,2026-01-23
1001,35.0,West-1,2026-01-23
1002,47.5,East-1,2026-01-23
1003,56.2,East-2,2026-01-23
1004,54.2,North-1,2026-01-23
1005,43.5,South-1,2026-01-23
1006,36.6,East-1,2026-01-23
1007,56.6,West-2,2026-01-23
1008,55.1,East-1,2026-01-23
1009,31.0,South-2,2026-01-23


sensor_id,humidity,parcel,ingest_date
1171,33.2,North-1,2026-01-24
1371,32.5,South-2,2026-01-24
1571,50.1,North-1,2026-01-24
1771,44.0,West-2,2026-01-24
1971,44.6,East-1,2026-01-24
2171,52.9,West-1,2026-01-24
2371,37.6,West-2,2026-01-24
2571,45.3,East-2,2026-01-24
2771,40.5,South-1,2026-01-24
2971,46.8,North-1,2026-01-24


## 7) VACUUM with a short retention (24h)

On some environments (e.g. Databricks Free Edition), the retention check cannot be disabled
through a global Spark configuration.

We therefore configure retention **at the Delta table level**:
- `delta.deletedFileRetentionDuration`: how long removed data files are kept
- `delta.logRetentionDuration`: how long Delta log files are kept

Then we run `VACUUM`.

⚠️ Warning: reducing retention limits time travel to older versions. Also note that `VACUUM` only deletes files that have been obsolete for longer than the retention period, so files compacted away a few minutes earlier in this notebook are not removed yet.
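The retention rule can be illustrated with a deliberately simplified model: a data file becomes a candidate for deletion only once it has been obsolete for at least the retention period. The helper `vacuum_eligible` is hypothetical and the two run times are invented for the example. The real `VACUUM` works from the transaction log and file timestamps rather than a single date.

```python
from datetime import datetime, timedelta


def vacuum_eligible(obsolete_since, now, retention_hours=24):
    """Simplified rule: eligible once obsolete for at least retention_hours."""
    return now - obsolete_since >= timedelta(hours=retention_hours)


# Files replaced by the manual OPTIMIZE commit (version 4 in the history)
removed_by_optimize = datetime(2026, 1, 23, 9, 58, 22)

# Hypothetical VACUUM run a few minutes later: nothing to delete yet
print(vacuum_eligible(removed_by_optimize, datetime(2026, 1, 23, 10, 5)))  # False

# Hypothetical VACUUM run the next day: the old files can now be removed
print(vacuum_eligible(removed_by_optimize, datetime(2026, 1, 24, 10, 5)))  # True
```

To preview what a run would delete without removing anything, `VACUUM` also accepts a `DRY RUN` clause.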

In [0]:
# 1) Configure retention at the table level (24h)
spark.sql(f"""
ALTER TABLE {full_name}
SET TBLPROPERTIES (
  delta.deletedFileRetentionDuration = 'interval 24 hours',
  delta.logRetentionDuration = 'interval 24 hours'
)
""")

# Check that the properties have been applied
display(spark.sql(f"SHOW TBLPROPERTIES {full_name}"))

# 2) Run VACUUM (24h)
spark.sql(f"VACUUM {full_name} RETAIN 24 HOURS")

key,value
delta.deletedFileRetentionDuration,interval 24 hours
delta.enableChangeDataFeed,true
delta.enableDeletionVectors,true
delta.feature.appendOnly,supported
delta.feature.changeDataFeed,supported
delta.feature.deletionVectors,supported
delta.feature.invariants,supported
delta.logRetentionDuration,interval 24 hours
delta.minReaderVersion,3
delta.minWriterVersion,7


DataFrame[path: string]

## Conclusion (3–5 sentences)

- After several writes, the table is fragmented (numFiles grows).
- OPTIMIZE compacts the files: numFiles drops and reads become more efficient.
- CDF lets you extract only the changes between versions (useful for CDC / incremental pipelines).
- Time Travel lets you read a past state of the table (audit/debug).
- VACUUM deletes obsolete files once the retention period has passed (useful for controlling storage costs, but mind the impact on time travel).