# Tranformations example

- Détecte automatiquement l'environnement d'exécution (développeur vs production) en vérifiant si le nom d'utilisateur contient '@'
- Pour les développeurs, utilise un catalog personnalisé sandbox-{username}_sales et génère des données de test si les tables n'existent pas


In [1]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType


def get_test_products_df():
    """
    Crée un DataFrame de test pour les transactions
    """
    schema = StructType([
        StructField("transaction_id", IntegerType(), True),
        StructField("customer_id", IntegerType(), True),
        StructField("product_id", IntegerType(), True),
        StructField("amount", DoubleType(), True),
        StructField("transaction_date", StringType(), True)
    ])
    
    data = [
        (1, 101, 201, 100.0, "2023-01-01"),
        (2, 102, 202, 150.0, "2023-01-02"),
        (3, 103, 203, 200.0, "2023-01-03"),
        (4, 101, 202, 120.0, "2023-01-04"),
        (5, 104, 201, 80.0, "2023-01-05")
    ]
    return spark.createDataFrame(data, schema)
  


In [2]:
def table_exists(table_name):
    """
    Vérifie si une table existe
    """
    try:
        spark.sqlf("SELECT * FROM {table_name}")     
        return True
    except:
        return False


In [None]:
def read_table_or_test_data(table_name, test_df_func, is_sandbox):
    """
    Lit une table si elle existe, sinon retourne des données de test en mode sandbox
    """
    print(f"Tentative de lecture de la table: {table_name}")
    
    if table_exists(table_name):
        df = spark.sql(f"SELECT * FROM {table_name}")
        print(f"Table {table_name} lue avec succès: {df.count()} lignes")
        return df
    else:
        print(f"Table {table_name} introuvable")
        test_df = test_df_func()
        print(f"Données de test générées: {test_df.count()} lignes")
        test_df.write.saveAsTable(table_name, mode="overwrite")
        return test_df


In [None]:
from pyspark.sql.functions import current_date,col

def transform(catalog : str, is_sandbox : bool):
    """
    Fonction principale de transformation
    """
        
    
    # Table: Produits
    products_table = f"users.gregoire_portier.{catalog}_silver_products"
    products_df = read_table_or_test_data(
        products_table, 
        get_test_products_df, 
        is_sandbox
    )
    
    # Appliquer les transformations
    print("Application des transformations")
    result_df = products_df.withColumn("processing_date", current_date())
    
    
    
    # En mode sandbox, toujours créer une vue temporaire pour faciliter l'exploration
    if is_sandbox:        
        print("\nAperçu des résultats:")
        result_df.show(truncate=False)
    
    # Écriture du résultat
    output_table = f"users.gregoire_portier.{catalog}_silver_products"
    print(f"Écriture dans la table: {output_table}")

    try:
        result_df.write.mode("overwrite").saveAsTable(output_table, mode="overwrite")
        print(f"Données écrites avec succès dans la table: {output_table}")
        print("Transformation terminée avec succès!")

    except Exception as e:
        raise Exception(f"Erreur lors de l'écriture dans {output_table}: {str(e)}")    

    return result_df


In [5]:
from commons.utils import get_catalog

# Récupérer le catalog 
catalog = get_catalog()
is_sandbox = "sandbox" in catalog

print(f"Démarrage de la transformation avec catalog: {catalog}")
print(f"Mode sandbox: {is_sandbox}")

# Executer la transformation 
result = transform(catalog, is_sandbox)

# For demo debugging 
if result.isEmpty():
    is_empty = True
    print("DATAFRAME EMPTY)")
else:
    is_empty = False
    print("DATAFRAME NOT EMPTY")






Démarrage de la transformation avec catalog: sandbox_gregoire_portier
Mode sandbox: True
Tentative de lecture de la table: users.gregoire_portier.sandbox_gregoire_portier_bronze_products
Table users.gregoire_portier.sandbox_gregoire_portier_bronze_products introuvable
Données de test générées: 5 lignes
Application des transformations

Aperçu des résultats:
+--------------+-----------+----------+------+----------------+---------------+
|transaction_id|customer_id|product_id|amount|transaction_date|processing_date|
+--------------+-----------+----------+------+----------------+---------------+
|1             |101        |201       |100.0 |2023-01-01      |2025-03-16     |
|2             |102        |202       |150.0 |2023-01-02      |2025-03-16     |
|3             |103        |203       |200.0 |2023-01-03      |2025-03-16     |
|4             |101        |202       |120.0 |2023-01-04      |2025-03-16     |
|5             |104        |201       |80.0  |2023-01-05      |2025-03-16     |
+