<a href="https://colab.research.google.com/github/Mbaroudi/DELTA_LAKE_TIPS/blob/main/Delta_Lake_SCD_Tips_Frensh.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Dans le contexte de **PySpark** et de l'utilisation de **Delta Lake** pour gérer les dimensions avec des stratégies de dimension à évolution lente (SCD), le concept de **dimension « Type 7 »** n'est pas standard. Normalement, nous avons les types 1 à 6 qui couvrent divers scénarios de gestion des modifications des données dimensionnelles.
Cependant, le « Type 7 » fait souvent référence à une **approche hybride** qui combine les fonctionnalités du Type 1 (écrasement des anciennes données) et du Type 2 (suivi des données historiques avec gestion des versions). Cette approche permet d'interroger à la fois l'état actuel des données et leurs versions historiques.

Pour implémenter une dimension **SCD de type 7 dans PySpark à l'aide de Delta Lake**, vous implémenterez essentiellement un **SCD de type 2 **mais conserverez également une vue actuelle pour un accès plus facile aux derniers enregistrements. Vous trouverez ci-dessous un exemple de la façon dont vous pouvez configurer cela à l'aide de ***PySpark et Delta Lake*** :

# Conditions préalables
**Configuration d'Apache Spark et Delta Lake :** assurez-vous qu'Apache Spark et Delta Lake sont correctement configurés dans votre environnement.
La bibliothèque de Delta Lake doit être incluse dans votre session Spark.
Configuration initiale du DataFrame :
 * supposons que vous disposez d'un DataFrame avec des données client qui
incluent ***un identifiant client, un nom_client, une adresse e-mail, revenu, une date_effet et un indicateur is_current***.

In [None]:
!pip install pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("SCD Type 7 avec Aggrégats Temporels") \
    #.config("spark.sql.warehouse.dir", "/path/to/hive/warehouse") \
    #.enableHiveSupport() \
    .master("local[*]") \
    .getOrCreate()


Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=78645a1f225b8e9e780e160038e97f693a622b52ee9f9e53cb3e97f9131a71a4
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


## **Préparez les données:** Insérez le code suivant pour initialiser vos DataFrames.

In [None]:
from pyspark.sql import SparkSession, functions as F

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("SCD Type 7 with Temporal Aggregates") \
    .master("local[*]") \
    .getOrCreate()

# Sample data with 'is_current' column added
data = [
    (1, "John Doe", "john.doe@email.com", 1000, "2020-01-15", True),
    (2, "Jane Smith", "jane.smith@email.com", 1500, "2020-01-20", True)
]
columns = ["customer_id", "customer_name", "email", "revenue", "effective_date", "is_current"]
df = spark.createDataFrame(data, schema=columns)

new_data = [
    (1, "Johnathan Doe", "john.doe@email.com", 1200, "2020-02-01", True),
    (2, "Jane Smith", "jane.smith@email.com", 1500, "2020-02-01", True),
    (3, "Mike Jones", "mike.jones@email.com", 500, "2020-02-01", True)
]
new_df = spark.createDataFrame(new_data, schema=columns)

# Convert 'effective_date' to date type
df = df.withColumn("effective_date", F.to_date(F.col("effective_date")))
new_df = new_df.withColumn("effective_date", F.to_date(F.col("effective_date")))
df.show()
new_df.show()


+-----------+-------------+--------------------+-------+--------------+----------+
|customer_id|customer_name|               email|revenue|effective_date|is_current|
+-----------+-------------+--------------------+-------+--------------+----------+
|          1|     John Doe|  john.doe@email.com|   1000|    2020-01-15|      true|
|          2|   Jane Smith|jane.smith@email.com|   1500|    2020-01-20|      true|
+-----------+-------------+--------------------+-------+--------------+----------+

+-----------+-------------+--------------------+-------+--------------+----------+
|customer_id|customer_name|               email|revenue|effective_date|is_current|
+-----------+-------------+--------------------+-------+--------------+----------+
|          1|Johnathan Doe|  john.doe@email.com|   1200|    2020-02-01|      true|
|          2|   Jane Smith|jane.smith@email.com|   1500|    2020-02-01|      true|
|          3|   Mike Jones|mike.jones@email.com|    500|    2020-02-01|      true|
+--

## **Appliquez SCD Type 7:** Utilisez le code suivant pour simuler les opérations SCD Type 7.

In [None]:
def apply_scd_type_2(base_df, updates_df):
    # Join based on customer_id and check for changes
    condition = (base_df["customer_id"] == updates_df["customer_id"]) & \
                (base_df["is_current"] == True) & \
                ((base_df["customer_name"] != updates_df["customer_name"]) |
                 (base_df["revenue"] != updates_df["revenue"]))

    # Set existing records to not current if changes are detected
    updates_df = updates_df.withColumn("is_current", F.lit(True))
    updated_existing_df = base_df.join(updates_df, "customer_id", "inner") \
                                 .filter(condition) \
                                 .select(base_df["*"]) \
                                 .withColumn("is_current", F.lit(False))

    # Union all: unchanged existing, updated existing set to false, and new updates set to true
    final_df = base_df.join(updated_existing_df, ["customer_id"], "left_anti") \
                      .unionByName(updated_existing_df) \
                      .unionByName(updates_df)

    return final_df

historical_df = apply_scd_type_2(df, new_df)
historical_df.show()


+-----------+-------------+--------------------+-------+--------------+----------+
|customer_id|customer_name|               email|revenue|effective_date|is_current|
+-----------+-------------+--------------------+-------+--------------+----------+
|          2|   Jane Smith|jane.smith@email.com|   1500|    2020-01-20|      true|
|          1|     John Doe|  john.doe@email.com|   1000|    2020-01-15|     false|
|          1|Johnathan Doe|  john.doe@email.com|   1200|    2020-02-01|      true|
|          2|   Jane Smith|jane.smith@email.com|   1500|    2020-02-01|      true|
|          3|   Mike Jones|mike.jones@email.com|    500|    2020-02-01|      true|
+-----------+-------------+--------------------+-------+--------------+----------+



# **Analysez les Aggrégats Temporels:**

In [None]:
# Calcul des agrégats temporels
from pyspark.sql import functions as F

historical_df = historical_df.withColumn("month", F.month("effective_date"))
historical_df = historical_df.withColumn("quarter", F.quarter("effective_date"))
historical_df = historical_df.withColumn("year", F.year("effective_date"))

historical_df.createOrReplaceTempView("historical_data")

# Calcul de MTD, QTD, YTD
aggregates_query = """
SELECT customer_id, customer_name,
       SUM(CASE WHEN month = 2 AND year = 2020 THEN revenue ELSE 0 END) as MTD_Revenue,
       SUM(CASE WHEN quarter = (SELECT quarter FROM historical_data WHERE month = 2 AND year = 2020 LIMIT 1) AND year = 2020 THEN revenue ELSE 0 END) as QTD_Revenue,
       SUM(CASE WHEN year = 2020 THEN revenue ELSE 0 END) as YTD_Revenue
FROM historical_data
GROUP BY customer_id, customer_name
"""
aggregates_df = spark.sql(aggregates_query)
aggregates_df.show()




+-----------+-------------+-----------+-----------+-----------+
|customer_id|customer_name|MTD_Revenue|QTD_Revenue|YTD_Revenue|
+-----------+-------------+-----------+-----------+-----------+
|          2|   Jane Smith|       1500|       3000|       3000|
|          1|     John Doe|          0|       1000|       1000|
|          1|Johnathan Doe|       1200|       1200|       1200|
|          3|   Mike Jones|        500|        500|        500|
+-----------+-------------+-----------+-----------+-----------+



# **Écriture des données historiques (y compris toutes les versions) :**

In [None]:
# Stockage dans une table de faits Hive
aggregates_df.write.mode("overwrite").saveAsTable("fact_revenue_aggregates")
# Pour ajouter des données au lieu de les écraser, vous pouvez utiliser mode("append")

# **Rédaction des enregistrements actuels :**

In [None]:
# Filtrage uniquement des enregistrements actuels
current_records_df = historical_df.filter("is_current = True")

# Écrivez ou écrasez les données de l'enregistrement actuel dans une autre table Hive
current_records_df.write.mode("overwrite").saveAsTable("hive_current_records")


Consommation de données pour les outils de tableau de bord
Une fois vos données stockées dans Hive, elles sont accessibles par divers outils de BI et de tableaux de bord qui se connectent à Hive. La configuration implique généralement :

**Connexion de l'outil à Hive :** configurez votre outil de tableau de bord (Microstrat) pour vous connecter à votre base de données Hive.
Interrogation de données : utilisez des requêtes SQL dans l'outil pour récupérer et visualiser les données, en tirant parti des tables **hive_historical_data** et **hive_current_records**.

## Inconvénients de la Méthode Full Overwrite sur Hive

### Risque de Perte de Données :

Lorsque vous écrasez une table, il y a un court intervalle durant lequel les données ne sont pas disponibles, ce qui peut être problématique pour les opérations en continu. De plus, en cas d'erreur pendant l'opération d'écriture, il est possible que les données soient partiellement écrites ou corrompues, ce qui pourrait entraîner une perte de données.
### Utilisation Inefficace des Ressources :
Écraser une table entière à chaque mise à jour est une opération très coûteuse en termes de traitement et de stockage, surtout si seules quelques lignes ou colonnes nécessitent une actualisation. Cela entraîne une utilisation inefficace des ressources de calcul et de stockage.
### Performance :
La performance peut être sérieusement affectée, surtout avec de grandes tables. Le processus de rechargement complet peut être très long, ce qui retarde la disponibilité des données mises à jour pour les utilisateurs et les processus en aval.
### Impact sur les Utilisateurs en Aval :
Pendant que la table est en cours de surcharge, les utilisateurs qui dépendent de cette table pour des rapports ou des analyses peuvent se retrouver sans données ou avec des données incomplètes, impactant ainsi leur capacité à effectuer des tâches critiques.
### Complexité de la Gestion des Versions :
Si des versions historiques des données sont nécessaires pour l'audit ou d'autres analyses rétrospectives, les gérer devient plus compliqué avec une approche de surcharge. Chaque surcharge efface l'historique antérieur, à moins que des mesures supplémentaires ne soient prises pour archiver ces données.
### Considérations de Synchronisation :
Si la table est utilisée par plusieurs utilisateurs ou processus, synchroniser les accès pour éviter les lectures de données en état de transition peut devenir une tâche administrative complexe.

# Un nouveau concept Delta Lake avec PySpark

In [None]:
!pip install pyspark==3.1.2 delta-spark


# **Étape 2 : Configuration de la Session Spark**
Après avoir installé les packages nécessaires, configurez votre session Spark pour utiliser Delta Lake :

In [None]:
!pip install pyspark==3.1.2 delta-spark

from pyspark.sql import SparkSession

builder = SparkSession.builder.appName("SCD Type 7 with Temporal Aggregates") \
    .master("local[*]") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0")

spark = builder.getOrCreate()




# **Étape 3 : Utilisation de Delta Lake**
Avec la session configurée, vous pouvez maintenant créer des DataFrames, les écrire en format Delta, et effectuer des mises à jour ou des suppressions si nécessaire. Voici comment procéder :

In [None]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType

# Initialize Spark Session with Delta Lake
spark = SparkSession.builder \
    .appName("SCD Type 7 with Temporal Aggregates") \
    .master("local[*]") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Define schema without DateType for initial creation
schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("customer_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("revenue", IntegerType(), True),
    StructField("effective_date", StringType(), True),  # Temporarily as StringType
    StructField("is_current", BooleanType(), True)
])

# Sample data, keeping dates as strings initially
data = [
    (1, "John Doe", "john.doe@email.com", 1000, "2020-01-15", True),
    (2, "Jane Smith", "jane.smith@email.com", 1500, "2020-01-20", True)
]

# Create DataFrame with dates as strings
df = spark.createDataFrame(data, schema=schema)

# Convert 'effective_date' from string to DateType within the DataFrame
df = df.withColumn("effective_date", F.to_date(F.col("effective_date")))

df.show()

# Define a path for the Delta table
delta_path = "/tmp/delta-table"

# Write the DataFrame to Delta format
df.write.format("delta").mode("overwrite").save(delta_path)

# Read the DataFrame back as a DeltaTable
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, delta_path)

# New data, converted dates within the DataFrame creation
new_data = [
    (1, "Johnathan Doe", "john.doe@email.com", 1200, "2020-02-01", True),
    (2, "Jane Smith", "jane.smith@email.com", 1500, "2020-02-01", True),
    (3, "Mike Jones", "mike.jones@email.com", 500, "2020-02-01", True)
]

new_df = spark.createDataFrame(new_data, schema=schema)
new_df = new_df.withColumn("effective_date", F.to_date(F.col("effective_date")))

new_df.show()
# Perform the merge operation
delta_table.alias("old").merge(
    new_df.alias("new"),
    "old.customer_id = new.customer_id"
).whenMatchedUpdate(
    condition = """
        old.is_current = TRUE AND (
        old.customer_name != new.customer_name OR
        old.email != new.email OR
        old.revenue != new.revenue)
    """,
    set = {
        "customer_name": F.col("new.customer_name"),
        "email": F.col("new.email"),
        "revenue": F.col("new.revenue"),
        "effective_date": F.col("new.effective_date"),
        "is_current": F.lit(False)
    }
).whenNotMatchedInsertAll().execute()

# Show the updated results
updated_df = spark.read.format("delta").load(delta_path)
updated_df.show()


+-----------+-------------+--------------------+-------+--------------+----------+
|customer_id|customer_name|               email|revenue|effective_date|is_current|
+-----------+-------------+--------------------+-------+--------------+----------+
|          1|     John Doe|  john.doe@email.com|   1000|    2020-01-15|      true|
|          2|   Jane Smith|jane.smith@email.com|   1500|    2020-01-20|      true|
+-----------+-------------+--------------------+-------+--------------+----------+

+-----------+-------------+--------------------+-------+--------------+----------+
|customer_id|customer_name|               email|revenue|effective_date|is_current|
+-----------+-------------+--------------------+-------+--------------+----------+
|          1|Johnathan Doe|  john.doe@email.com|   1200|    2020-02-01|      true|
|          2|   Jane Smith|jane.smith@email.com|   1500|    2020-02-01|      true|
|          3|   Mike Jones|mike.jones@email.com|    500|    2020-02-01|      true|
+--


Il est tout à fait possible de combiner deux approches *(Utilisation de Delta Lake pour Emuler des Opérations Transactionnelles / Utilisation de Tableaux Externes et Vues pour Simuler des Updates)* pour gérer les limitations de Hive en matière de mises à jour et de suppressions de données.

En mélangeant les choix 1 (utilisation de Delta Lake pour émuler des opérations transactionnelles) et 4 (utilisation de tableaux externes et de vues pour simuler des updates), vous pouvez créer un système hybride qui optimise les capacités de votre infrastructure existante tout en apportant une flexibilité accrue pour la gestion des données.

**Voici comment vous pourriez procéder :**

# Combinaison des Approches avec Delta Lake et Hive

## Utilisation de Delta Lake comme Couche de Traitement Primaire :
 **Stockage Temporaire :**

 Utilisez Delta Lake pour gérer les données de manière transactionnelle dans une zone de stockage temporaire. Cela vous permet d'utiliser les fonctionnalités de mise à jour, de suppression et de merge qui ne sont pas disponibles dans Hive.

**Transformation et Préparation :**

Effectuez toutes les transformations nécessaires, y compris les mises à jour et les nettoyages de données, dans cette couche avant de les transférer à Hive.

# **Exportation vers Hive en Utilisant des Tables Externes :**

**Création de Tables Externes :**

Une fois les données prêtes dans Delta Lake, exportez-les sous forme de fichiers Parquet (ou un autre format optimal pour votre cas d'usage) dans un répertoire accessible par Hive.
Tables Hive sur Fichiers Externes : Créez des tables Hive qui pointent vers ces fichiers externes. Ces tables ne seront pas transactionnelles, mais elles bénéficieront de la préparation et des mises à jour effectuées dans la couche Delta Lake.

**Utilisation de Vues pour Simuler les Mises à Jour :**

**Création de Vues :**

Dans Hive, créez des vues qui simulent les versions les plus récentes des données en utilisant des clauses SQL pour filtrer les enregistrements selon les critères que vous auriez définis (par exemple, le timestamp le plus récent, les flags de version, etc.).
Vues Aggregées : Si nécessaire, utilisez des vues pour créer des agrégats ou des synthèses qui sont régulièrement mises à jour par vos opérations dans Delta Lake et répercutées dans Hive via les nouvelles écritures de fichiers.
Avantages de cette Approche Hybride

**Flexibilité :**

Vous combinez la flexibilité transactionnelle de Delta Lake avec la capacité de stockage et l'écosystème analytique étendu de Hive.

**Performance :**

Les opérations lourdes et les mises à jour sont gérées en amont dans un environnement optimisé (Delta Lake), améliorant ainsi la performance des requêtes exécutées sur Hive.

**Sécurité des Données :**

Les données peuvent être préparées et nettoyées efficacement avant d'être exposées via Hive, réduisant le risque d'erreurs ou de données obsolètes.

# Configuration de l'Environnement Spark avec Iceberg

In [None]:
#!wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark3-runtime/0.11.0/iceberg-spark3-runtime-0.11.0.jar
#!wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql_2.12/3.0.1/spark-sql_2.12-3.0.1.jar

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /content/iceberg-spark3-runtime-0.11.0.jar,/content/spark-sql_2.12-3.0.1.jar pyspark-shell'


In [None]:
#!pip install pyspark
#!wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.2_2.12/0.13.1/iceberg-spark-runtime-3.2_2.12-0.13.1.jar


In [None]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /content/iceberg-spark-runtime-3.2_2.12-0.13.1.jar pyspark-shell'


In [None]:
#!wget -q https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.1_2.12/0.13.1/iceberg-spark-runtime-3.1_2.12-0.13.1.jar -O iceberg-runtime.jar


In [None]:
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /content/iceberg-runtime.jar pyspark-shell'


In [None]:
import os

os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.iceberg:iceberg-spark3-runtime:0.13.1 "
    "--repositories https://repo.maven.apache.org/maven2 "
    "pyspark-shell"
)

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Iceberg Integration") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "file:///content/iceberg_warehouse") \
    .getOrCreate()


In [None]:
# dropper la table Iceberg if is not exist
spark.sql("""
DROP table local.db.table_name
""")


DataFrame[]

In [None]:
# Création d'une table Iceberg if is not exist
spark.sql("""
 CREATE TABLE local.db.table_name (
    id int,
    data string
) USING iceberg
""")

# Insertion de données
spark.sql("""
INSERT INTO local.db.table_name VALUES (1, 'hello'), (2, 'world')
""")

# Lecture des données
df = spark.sql("SELECT * FROM local.db.table_name")
df.show()


In [None]:
#!pip install pyspark
#!wget -q https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.1_2.12/0.13.1/iceberg-spark-runtime-3.1_2.12-0.13.1.jar -O iceberg-runtime.jar

import os
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages org.apache.iceberg:iceberg-spark3-runtime:0.13.1 --repositories https://repo.maven.apache.org/maven2 pyspark-shell"

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Iceberg Integration") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "file:///content/iceberg_warehouse") \
    .getOrCreate()


In [None]:
schema = """
DROP TABLE IF EXISTS local.default.customer_data
"""
spark.sql(schema)


DataFrame[]

In [None]:
schema = """
CREATE TABLE IF NOT EXISTS local.default.customer_data (
    customer_id INT,
    customer_name STRING,
    email STRING,
    revenue INT,
    effective_date DATE,
    is_current BOOLEAN,
    version INT,
    end_date DATE
) USING iceberg
"""

spark.sql(schema)


DataFrame[]

In [None]:
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DateType

# Establishing Spark session
spark = SparkSession.builder \
    .appName("SCD2 Example using Iceberg") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .getOrCreate()

# Define the schema with StringType for date fields
schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("customer_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("revenue", IntegerType(), True),
    StructField("effective_date", StringType(), True),  # Use StringType initially
    StructField("is_current", BooleanType(), True),
    StructField("version", IntegerType(), True),
    StructField("end_date", StringType(), True)  # Use StringType initially
])

# Creating new data with date initially as string
new_data = [
    (1, "John Doe Updated", "john.doe@update.com", 1500, "2021-05-01", True, 2, None),  # Date as string
    (1, "John Doe", "john.doe@email.com", 1900, "2020-01-01", True, 1, None),
    (1, "John Doe", "john.new@email.com", 1000, "2020-06-01", False, 2, "2021-01-01"),
    (1, "John Doe II", "john.newII@email.com", 1200, "2021-01-02", True, 3, None),
    (2, "Jane Smith", "jane.smith@email.com", 1500, "2020-01-01", False, 1, "2020-09-30"),
    (2, "Jane S. Doe", "jane.s.doe@email.com", 1600, "2020-10-01", True, 2, None),
    (3, "Bob Brown", "bob.brown@email.com", 800, "2020-01-01", False, 1, "2020-08-15"),
    (3, "Bobby Brown", "bobby.br@email.com", 950, "2020-08-16", True, 2, None)
]

# Create DataFrame with string dates
df = spark.createDataFrame(new_data, schema=schema)

# Convert 'effective_date' and 'end_date' from string to DateType using to_date function
df = df.withColumn("effective_date", to_date(col("effective_date"), "yyyy-MM-dd"))
df = df.withColumn("end_date", to_date(col("end_date"), "yyyy-MM-dd"))

# Showing the DataFrame to verify the results
df.show()

+-----------+----------------+--------------------+-------+--------------+----------+-------+----------+
|customer_id|   customer_name|               email|revenue|effective_date|is_current|version|  end_date|
+-----------+----------------+--------------------+-------+--------------+----------+-------+----------+
|          1|John Doe Updated| john.doe@update.com|   1500|    2021-05-01|      true|      2|      null|
|          1|        John Doe|  john.doe@email.com|   1900|    2020-01-01|      true|      1|      null|
|          1|        John Doe|  john.new@email.com|   1000|    2020-06-01|     false|      2|2021-01-01|
|          1|     John Doe II|john.newII@email.com|   1200|    2021-01-02|      true|      3|      null|
|          2|      Jane Smith|jane.smith@email.com|   1500|    2020-01-01|     false|      1|2020-09-30|
|          2|     Jane S. Doe|jane.s.doe@email.com|   1600|    2020-10-01|      true|      2|      null|
|          3|       Bob Brown| bob.brown@email.com|    

In [None]:
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, col, lit, current_date
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DateType
from pyspark.sql.window import Window


# Define a window specification to get the latest record for deduplication
window_spec = Window.partitionBy("customer_id").orderBy(col("version").desc())

# Creating a data set that includes updates (marking old records as not current)
updates = current_data.withColumn("is_current", lit(False)) \
                      .withColumn("end_date", lit(current_date()))

# Append new data to updates without duplicates
deduplicated_data = new_df.withColumn("row_number", row_number().over(window_spec)) \
                          .filter(col("row_number") == 1).drop("row_number")

# Handle condition: if record exists in both new and old data, take from new data and update old
final_data = updates.unionByName(deduplicated_data).dropDuplicates(["customer_id"])

# Write results back to the Iceberg table
final_data.write.format("iceberg").mode("append").option("overwrite-mode", "dynamic").saveAsTable("local.default.customer_data")

# Verify the results
result_df = spark.table("local.default.customer_data")
result_df.show()

+-----------+----------------+--------------------+-------+--------------+----------+-------+--------+
|customer_id|   customer_name|               email|revenue|effective_date|is_current|version|end_date|
+-----------+----------------+--------------------+-------+--------------+----------+-------+--------+
|          1|John Doe Updated| john.doe@update.com|   1100|    2021-05-01|      true|      2|    null|
|          3|    New Customer|new.customer@emai...|    500|    2021-05-01|      true|      1|    null|
+-----------+----------------+--------------------+-------+--------------+----------+-------+--------+



In [None]:
result_df = spark.sql("SELECT * FROM local.default.customer_data")
result_df.show()


+-----------+----------------+--------------------+-------+--------------+----------+-------+--------+
|customer_id|   customer_name|               email|revenue|effective_date|is_current|version|end_date|
+-----------+----------------+--------------------+-------+--------------+----------+-------+--------+
|          1|John Doe Updated| john.doe@update.com|   1100|    2021-05-01|      true|      2|    null|
|          3|    New Customer|new.customer@emai...|    500|    2021-05-01|      true|      1|    null|
+-----------+----------------+--------------------+-------+--------------+----------+-------+--------+

