# Landing Salers

Pipeline that loads the sales team reference data (Excel) into the Fabric Lakehouse.

## Architecture

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                           FABRIC LAKEHOUSE                                  │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  Excel "Contacts RAE RSM BD for Leads.xlsx" (3 sheets):                     │
│                                                                             │
│  ┌──────────────────────────────────────────────────────────────────────┐   │
│  │ RAE_RSM + emails  ──►  landing_salers (by COUNTRY)                   │   │
│  │   - Assigned by country                                              │   │
│  │   - JOIN with opportunities on country                               │   │
│  │   - Sees ALL verticals in their country                              │   │
│  └──────────────────────────────────────────────────────────────────────┘   │
│                                                                             │
│  ┌──────────────────────────────────────────────────────────────────────┐   │
│  │ BD + emails  ──►  landing_salers_bd (by VERTICAL)                    │   │
│  │   - Assigned by market_segment                                       │   │
│  │   - JOIN with opportunities on market_segment                        │   │
│  │   - Sees ALL countries in their vertical                             │   │
│  └──────────────────────────────────────────────────────────────────────┘   │
│                                                                             │
│  Note: 2 hybrid profiles (Zohar Pajela, Andre Pichette) are in both tables  │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
```
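The two assignment rules in the diagram can be illustrated with a small pandas sketch. The `opportunities` frame, its `opp_id` column and all sample values are hypothetical; only the join keys (`country` for RAE/RSM, `market_segment` for BD) come from the diagram.

```python
import pandas as pd

# Hypothetical sample rows; column names mirror landing_salers / landing_salers_bd.
salers = pd.DataFrame({"country": ["France", "Germany"],
                       "who": ["Rep A", "Rep B"],
                       "role": ["RAE", "RAE"]})
salers_bd = pd.DataFrame({"market_segment": ["Live Events", "Install"],
                          "who": ["BD X", "BD Y"],
                          "role": ["BD", "BD"]})
# Hypothetical opportunities table (not produced by this pipeline).
opportunities = pd.DataFrame({"opp_id": [1, 2, 3],
                              "country": ["France", "France", "Germany"],
                              "market_segment": ["Live Events", "Install", "Live Events"]})

# RAE/RSM: joined on country, so a rep sees every vertical of their country.
by_country = opportunities.merge(salers, on="country", how="left")
# BD: joined on market_segment, so a BD sees every country of their vertical.
by_vertical = opportunities.merge(salers_bd, on="market_segment", how="left")

print(by_country[["opp_id", "who"]].to_string(index=False))
print(by_vertical[["opp_id", "who"]].to_string(index=False))
```

If a country has several reps (for example an RAE and an RSM), the left join returns one row per rep for each opportunity, which is the intended fan-out here.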

## 1. Configuration

In [None]:
# =============================================================================
# CONFIGURATION
# =============================================================================

# Load mode: "one_time", "incremental", "full_refresh"
LOAD_MODE = "one_time"  # First run: load everything

# Path to the Excel file in the Fabric Lakehouse
EXCEL_PATH = "/lakehouse/default/Files/"
EXCEL_FILENAME = "Contacts RAE RSM BD for Leads.xlsx"

# Excel sheet names
SHEET_RAE_RSM = "RAE_RSM"    # Sales reps sheet (RAE + RSM), by country
SHEET_EMAILS = "emails"       # Emails sheet (Name, Email)
SHEET_BD = "BD"               # BD sheet, by vertical (Market segment)

# Lakehouse tables
TABLE_SALERS = "landing_salers"        # RAE/RSM by country
TABLE_SALERS_BD = "landing_salers_bd"  # BD by vertical

print(f"Mode: {LOAD_MODE}")
print(f"Excel: {EXCEL_PATH}{EXCEL_FILENAME}")
print(f"Sheets: {SHEET_RAE_RSM}, {SHEET_EMAILS}, {SHEET_BD}")
print(f"Target tables: {TABLE_SALERS} (by country), {TABLE_SALERS_BD} (by vertical)")
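The loading cells in section 11 only branch on the three documented modes, and `EXCEL_PATH` must end with `/` for the f-string concatenation used in section 4. A minimal sketch of two helpers that check both at configuration time, so a typo fails before any data is read; `VALID_LOAD_MODES`, `check_load_mode` and `excel_path` are hypothetical names, not part of the notebook.

```python
import posixpath

# The three values handled by the loading cells of section 11.
VALID_LOAD_MODES = ("one_time", "incremental", "full_refresh")

def check_load_mode(mode):
    """Return mode unchanged, or raise ValueError if it is not supported."""
    if mode not in VALID_LOAD_MODES:
        raise ValueError(f"LOAD_MODE must be one of {VALID_LOAD_MODES}, got {mode!r}")
    return mode

def excel_path(folder, filename):
    """Join folder and file name, with or without a trailing '/' on folder."""
    return posixpath.join(folder, filename)

# Hypothetical usage in the configuration cell:
#   check_load_mode(LOAD_MODE)
#   excel_full_path = excel_path(EXCEL_PATH, EXCEL_FILENAME)
print(check_load_mode("incremental"))
print(excel_path("/lakehouse/default/Files", "Contacts RAE RSM BD for Leads.xlsx"))
```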

## 2. Imports and Setup

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql.functions import (
    col, current_timestamp, lit, trim, upper, lower,
    regexp_replace, when
)
from datetime import datetime
from delta.tables import DeltaTable
import pandas as pd

spark = SparkSession.builder.getOrCreate()
print("Spark session ready")

## 3. Schemas of the Salers tables

In [None]:
# Target schemas (reference only: the transforms in section 10 produce these columns via select())
# Schema for RAE/RSM (by country), without market_segment
schema_salers = StructType([
    StructField("sales_zone", StringType(), True),      # EMEA, APAC, AMERICAS
    StructField("sales_region", StringType(), True),    # Sales region
    StructField("country_code", StringType(), True),    # Country code (FR, US, etc.)
    StructField("country", StringType(), True),         # Country name
    StructField("who", StringType(), True),             # Sales rep name
    StructField("role", StringType(), True),            # Role (RAE, RSM)
    StructField("email", StringType(), True),           # Sales rep email
    # Metadata
    StructField("loaded_at", TimestampType(), True),
    StructField("updated_at", TimestampType(), True),
])

# Schema for BD (by vertical)
schema_salers_bd = StructType([
    StructField("market_segment", StringType(), True),  # Vertical (Live Events, Install...)
    StructField("who", StringType(), True),             # BD name
    StructField("role", StringType(), True),            # Role (BD)
    StructField("email", StringType(), True),           # Email
    # Metadata
    StructField("loaded_at", TimestampType(), True),
    StructField("updated_at", TimestampType(), True),
])

print(f"Schema landing_salers: {len(schema_salers.fields)} columns (by country)")
print(f"Schema landing_salers_bd: {len(schema_salers_bd.fields)} columns (by vertical)")

## 4. Reading the Excel file (3 sheets)

In [None]:
# Build the full path
excel_full_path = f"{EXCEL_PATH}{EXCEL_FILENAME}"

print(f"Reading file: {excel_full_path}")

# 1. Read the RAE_RSM sheet (sales reps)
print(f"\n1. Reading sheet '{SHEET_RAE_RSM}'...")
df_rae_rsm = pd.read_excel(excel_full_path, sheet_name=SHEET_RAE_RSM)
print(f"   {len(df_rae_rsm)} rows loaded")
print(f"   Columns: {list(df_rae_rsm.columns)}")

# 2. Read the emails sheet
print(f"\n2. Reading sheet '{SHEET_EMAILS}'...")
df_emails = pd.read_excel(excel_full_path, sheet_name=SHEET_EMAILS)
print(f"   {len(df_emails)} rows loaded")
print(f"   Columns: {list(df_emails.columns)}")

# 3. Read the BD sheet (Market segment)
print(f"\n3. Reading sheet '{SHEET_BD}'...")
df_bd = pd.read_excel(excel_full_path, sheet_name=SHEET_BD)
print(f"   {len(df_bd)} rows loaded")
print(f"   Columns: {list(df_bd.columns)}")
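The renames in section 10 use `withColumnRenamed`, which is a no-op when the column is absent; a changed Excel header then only surfaces later as an unresolved column in `select`. Checking the expected headers right after reading gives a clearer error. A minimal sketch; `REQUIRED_COLUMNS` and `missing_columns` are hypothetical names, and the header lists are taken from the renames in section 10.

```python
import pandas as pd

# Header names expected by the renames in section 10 and the joins in sections 7-8.
REQUIRED_COLUMNS = {
    "RAE_RSM": ["Sales Zone", "Sales Region", "Code", "Country", "Who", "Role"],
    "BD": ["Market segment", "Who", "Role"],
}

def missing_columns(df, required):
    """Return the required headers that are not exact column names of df."""
    present = {str(c) for c in df.columns}
    return [c for c in required if c not in present]

# Hypothetical BD sheet where the "Role" header was renamed to "Job".
sample_bd = pd.DataFrame(columns=["Market segment", "Who", "Job"])
print(missing_columns(sample_bd, REQUIRED_COLUMNS["BD"]))
```

In the notebook, a non-empty result for `df_rae_rsm` or `df_bd` could simply raise a `ValueError` listing the missing headers.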

In [None]:
# Preview of the RAE_RSM data
print("RAE_RSM preview:")
df_rae_rsm.head(10)

In [None]:
# Preview of the BD data (Market segment)
print("BD preview (Market segment):")
df_bd.head(10)

## 5. Detecting the emails sheet columns

In [None]:
# Find the name and email columns in df_emails (header spelling may vary)
email_col = next((c for c in df_emails.columns if 'mail' in str(c).lower()), None)
name_col = next((c for c in df_emails.columns if str(c).strip().lower() in ['name', 'nom', 'who']), None)

print(f"Detected Name column: '{name_col}'")
print(f"Detected Email column: '{email_col}'")

# Fail early with an explicit message rather than a KeyError in the next step
if name_col is None or email_col is None:
    raise ValueError(f"Name/Email columns not found in sheet '{SHEET_EMAILS}': {list(df_emails.columns)}")

# The join itself (Who = Name) is done on normalized names in sections 7 and 8

## 6. Preparing the emails (normalized names)

In [None]:
# Prepare the emails table with normalized names for the joins
df_emails_clean = df_emails[[name_col, email_col]].copy()
df_emails_clean.columns = ['Name', 'email']
# Drop rows without a name: pandas merge matches null keys with each other
df_emails_clean = df_emails_clean.dropna(subset=['Name']).copy()
df_emails_clean['Name_normalized'] = df_emails_clean['Name'].astype(str).str.strip().str.lower()
df_emails_clean = df_emails_clean.drop_duplicates(subset=['Name_normalized'], keep='first')

print(f"{len(df_emails_clean)} unique emails prepared")
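The normalization matters for two reasons, illustrated below on hypothetical rows: names typed differently across sheets (`"Jane Doe "` vs `"jane doe"`) do not match exactly, and, unlike SQL, pandas `merge` pairs null keys with each other, so a rep with an empty name would pick up the email of a nameless row. The helper `normalize` is illustrative.

```python
import pandas as pd

# Hypothetical rows: the same people spelled differently, plus one missing name per sheet.
reps = pd.DataFrame({"Who": ["Jane Doe ", "JOHN SMITH", None]})
emails = pd.DataFrame({"Name": ["jane doe", "John Smith", None],
                       "email": ["jane@example.com", "john@example.com", "orphan@example.com"]})

def normalize(s):
    """Trim and lower-case names so 'Jane Doe ' and 'jane doe' compare equal."""
    return s.str.strip().str.lower()

# Exact keys: the two spelled-differently names miss, and the null Who
# is paired with the null Name (pandas behaviour, not SQL behaviour).
exact = reps.merge(emails, left_on="Who", right_on="Name", how="left")

# Drop nameless email rows, then join on normalized names.
emails_ok = emails.dropna(subset=["Name"]).assign(key=lambda d: normalize(d["Name"]))
fixed = reps.assign(key=normalize(reps["Who"])).merge(
    emails_ok[["key", "email"]], on="key", how="left")

print(exact["email"].tolist())
print(fixed["email"].tolist())
```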

## 7. Table 1: landing_salers (RAE/RSM by country)

In [None]:
# RAE_RSM + emails (join on Who = Name, both trimmed and lower-cased)
df_rae_rsm['Who_normalized'] = df_rae_rsm['Who'].str.strip().str.lower()

df_salers = df_rae_rsm.merge(
    df_emails_clean[['Name_normalized', 'email']],
    left_on='Who_normalized',
    right_on='Name_normalized',
    how='left'
)

# Drop the temporary join columns
df_salers = df_salers.drop(columns=['Who_normalized', 'Name_normalized'], errors='ignore')

print(f"landing_salers: {len(df_salers)} rows")
print(f"With email: {df_salers['email'].notna().sum()}")
print("\nPreview:")
df_salers.head(10)
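Rows left with a null `email` after this join are names that matched no entry of the emails sheet, often because of a spelling difference. Listing them makes the sheet easier to fix. A minimal sketch using pandas' `indicator=True` option on hypothetical data; in the notebook the inputs would be `df_rae_rsm` and `df_emails_clean`.

```python
import pandas as pd

# Hypothetical rows standing in for df_rae_rsm and df_emails_clean.
reps = pd.DataFrame({"Who": ["Jane Doe", "New Hire"], "Country": ["France", "Spain"]})
emails = pd.DataFrame({"Name_normalized": ["jane doe"], "email": ["jane@example.com"]})

reps["Who_normalized"] = reps["Who"].str.strip().str.lower()
# indicator=True adds a "_merge" column: "both" when a match was found, "left_only" otherwise.
checked = reps.merge(emails, left_on="Who_normalized", right_on="Name_normalized",
                     how="left", indicator=True)
unmatched = checked.loc[checked["_merge"] == "left_only", "Who"].drop_duplicates().tolist()
print(f"Names without an email: {unmatched}")
```

The same check applies to `df_bd` in the next step.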

## 8. Table 2: landing_salers_bd (BD by vertical)

In [None]:
# BD + emails (join on Who = Name, both trimmed and lower-cased)
df_bd['Who_normalized'] = df_bd['Who'].str.strip().str.lower()

df_salers_bd = df_bd.merge(
    df_emails_clean[['Name_normalized', 'email']],
    left_on='Who_normalized',
    right_on='Name_normalized',
    how='left'
)

# Drop the temporary join columns
df_salers_bd = df_salers_bd.drop(columns=['Who_normalized', 'Name_normalized'], errors='ignore')

print(f"landing_salers_bd: {len(df_salers_bd)} rows")
print(f"With email: {df_salers_bd['email'].notna().sum()}")
print("\nDistribution by Market segment:")
print(df_salers_bd['Market segment'].value_counts().to_string())
print("\nPreview:")
df_salers_bd.head(10)
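Section 12 lists the hybrid profiles with SQL after loading. The same check can run here on the pandas side, before anything is written, by comparing the `Who` columns of the two sheets with the trim/lower-case normalization used for the email joins. A minimal sketch on hypothetical rows; `hybrid_profiles` is an illustrative helper.

```python
import pandas as pd

def hybrid_profiles(rae_rsm, bd):
    """Return names found in both sheets, compared after trimming and lower-casing."""
    def names(df):
        return set(df["Who"].dropna().astype(str).str.strip().str.lower())
    return sorted(names(rae_rsm) & names(bd))

# Hypothetical sheets standing in for df_rae_rsm and df_bd.
rae_rsm = pd.DataFrame({"Who": ["Jane Doe", "John Smith ", None]})
bd = pd.DataFrame({"Who": ["john smith", "BD X"]})
print(hybrid_profiles(rae_rsm, bd))
```

Run on `df_rae_rsm` and `df_bd`, it would be expected to list the two hybrid profiles named in the architecture note; a different count would point to a spelling difference between the sheets.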

## 9. Conversion to Spark DataFrames

In [None]:
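# Convert to Spark DataFrames.
# Empty Excel cells arrive as NaN (a float), which can break Spark's type inference
# on text columns: cast every cell to str (or None) and pass an all-string schema.
def to_spark(pdf):
    rows = [[None if pd.isna(v) else str(v) for v in r] for r in pdf.itertuples(index=False)]
    schema = StructType([StructField(str(c), StringType(), True) for c in pdf.columns])
    return spark.createDataFrame(rows, schema)

sdf_salers_raw = to_spark(df_salers)
sdf_salers_bd_raw = to_spark(df_salers_bd)

print(f"Spark DataFrame landing_salers: {sdf_salers_raw.count()} rows")
print(f"Spark DataFrame landing_salers_bd: {sdf_salers_bd_raw.count()} rows")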

## 10. Data transformation

In [None]:
# Make sure both Spark DataFrames exist (e.g. if cells were run out of order); otherwise recreate them
try:
    sdf_salers_raw, sdf_salers_bd_raw
except NameError:
    print("Recreating the Spark DataFrames...")
    sdf_salers_raw = spark.createDataFrame(df_salers)
    sdf_salers_bd_raw = spark.createDataFrame(df_salers_bd)

# Transform landing_salers (RAE/RSM by country)
def transform_salers(df):
    df_renamed = df \
        .withColumnRenamed("Sales Zone", "sales_zone") \
        .withColumnRenamed("Sales Region", "sales_region") \
        .withColumnRenamed("Code", "country_code") \
        .withColumnRenamed("Country", "country") \
        .withColumnRenamed("Who", "who") \
        .withColumnRenamed("Role", "role")
    
    df_clean = df_renamed \
        .withColumn("sales_zone", upper(trim(col("sales_zone")))) \
        .withColumn("sales_region", trim(col("sales_region"))) \
        .withColumn("country_code", upper(trim(col("country_code")))) \
        .withColumn("country", trim(col("country"))) \
        .withColumn("who", trim(col("who"))) \
        .withColumn("role", upper(trim(col("role")))) \
        .withColumn("email", lower(trim(col("email"))))
    
    df_filtered = df_clean.filter(col("country_code").isNotNull() & (col("country_code") != ""))
    
    return df_filtered \
        .withColumn("loaded_at", current_timestamp()) \
        .withColumn("updated_at", current_timestamp()) \
        .select("sales_zone", "sales_region", "country_code", "country", "who", "role", "email", "loaded_at", "updated_at")

# Transform landing_salers_bd (BD by vertical)
def transform_salers_bd(df):
    df_renamed = df \
        .withColumnRenamed("Market segment", "market_segment") \
        .withColumnRenamed("Who", "who") \
        .withColumnRenamed("Role", "role")
    
    df_clean = df_renamed \
        .withColumn("market_segment", trim(col("market_segment"))) \
        .withColumn("who", trim(col("who"))) \
        .withColumn("role", upper(trim(col("role")))) \
        .withColumn("email", lower(trim(col("email"))))
    
    df_filtered = df_clean.filter(col("market_segment").isNotNull() & (col("market_segment") != ""))
    
    return df_filtered \
        .withColumn("loaded_at", current_timestamp()) \
        .withColumn("updated_at", current_timestamp()) \
        .select("market_segment", "who", "role", "email", "loaded_at", "updated_at")

# Apply the transformations
sdf_salers = transform_salers(sdf_salers_raw)
sdf_salers_bd = transform_salers_bd(sdf_salers_bd_raw)

print(f"landing_salers transformed: {sdf_salers.count()} rows")
sdf_salers.printSchema()

print(f"\nlanding_salers_bd transformed: {sdf_salers_bd.count()} rows")
sdf_salers_bd.printSchema()

In [None]:
# Preview of the transformed data
print("Preview of landing_salers (by country):")
sdf_salers.show(10, truncate=False)

print("\nPreview of landing_salers_bd (by vertical):")
sdf_salers_bd.show(10, truncate=False)

## 11. Loading the Delta tables

In [None]:
# =============================================================================
# LOAD TABLE 1: landing_salers (RAE/RSM by country)
# =============================================================================
print(f"=== Loading {TABLE_SALERS} ===\n")

if LOAD_MODE == "one_time":
    sdf_salers.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .saveAsTable(TABLE_SALERS)
    print(f"Table '{TABLE_SALERS}' created with {sdf_salers.count()} records")

elif LOAD_MODE == "incremental":
    if not spark.catalog.tableExists(TABLE_SALERS):
        sdf_salers.write.format("delta").mode("overwrite").saveAsTable(TABLE_SALERS)
    else:
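        # Upsert on (country_code, who): update matching rows, insert new ones.
        # Rows that disappeared from the Excel file are kept (no delete clause).
        delta_table = DeltaTable.forName(spark, TABLE_SALERS)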
        delta_table.alias("target").merge(
            sdf_salers.alias("source"),
            "target.country_code = source.country_code AND target.who = source.who"
        ).whenMatchedUpdate(set={
            "sales_zone": "source.sales_zone",
            "sales_region": "source.sales_region",
            "country": "source.country",
            "role": "source.role",
            "email": "source.email",
            "updated_at": "source.updated_at"
        }).whenNotMatchedInsertAll().execute()
    total = spark.sql(f"SELECT COUNT(*) FROM {TABLE_SALERS}").collect()[0][0]
    print(f"Table '{TABLE_SALERS}': {total} records")

elif LOAD_MODE == "full_refresh":
    sdf_salers.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(TABLE_SALERS)
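    # Delete data files no longer referenced by the table and older than 168 hours (7 days)
    spark.sql(f"VACUUM {TABLE_SALERS} RETAIN 168 HOURS")
    print(f"Table '{TABLE_SALERS}' recreated with {sdf_salers.count()} records")

else:
    raise ValueError(f"Unknown LOAD_MODE: {LOAD_MODE!r}")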
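In `incremental` mode, Delta Lake rejects a MERGE in which several source rows match the same existing target row. That would happen if the sheet listed the same `(country_code, who)` pair twice, for example one person holding both the RAE and RSM roles in one country. A minimal pandas sketch of a pre-check on hypothetical rows; `duplicate_keys` is an illustrative helper.

```python
import pandas as pd

def duplicate_keys(df, keys):
    """Return every row whose key combination appears more than once."""
    return df[df.duplicated(subset=keys, keep=False)].sort_values(keys)

# Hypothetical cleaned rows with one repeated (country_code, who) pair.
rows = pd.DataFrame({
    "country_code": ["FR", "FR", "DE"],
    "who": ["Jane Doe", "Jane Doe", "John Smith"],
    "role": ["RAE", "RSM", "RAE"],
})
dups = duplicate_keys(rows, ["country_code", "who"])
print(dups)
```

On the Spark side, an equivalent check would be `sdf_salers.groupBy("country_code", "who").count().filter("count > 1")`. If such duplicates are legitimate, adding `role` to the merge condition is one option.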

In [None]:
# =============================================================================
# LOAD TABLE 2: landing_salers_bd (BD by vertical)
# =============================================================================
print(f"=== Loading {TABLE_SALERS_BD} ===\n")

if LOAD_MODE == "one_time":
    sdf_salers_bd.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .saveAsTable(TABLE_SALERS_BD)
    print(f"Table '{TABLE_SALERS_BD}' created with {sdf_salers_bd.count()} records")

elif LOAD_MODE == "incremental":
    if not spark.catalog.tableExists(TABLE_SALERS_BD):
        sdf_salers_bd.write.format("delta").mode("overwrite").saveAsTable(TABLE_SALERS_BD)
    else:
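        # Upsert on (market_segment, who): update matching rows, insert new ones.
        # Rows that disappeared from the Excel file are kept (no delete clause).
        delta_table = DeltaTable.forName(spark, TABLE_SALERS_BD)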
        delta_table.alias("target").merge(
            sdf_salers_bd.alias("source"),
            "target.market_segment = source.market_segment AND target.who = source.who"
        ).whenMatchedUpdate(set={
            "role": "source.role",
            "email": "source.email",
            "updated_at": "source.updated_at"
        }).whenNotMatchedInsertAll().execute()
    total = spark.sql(f"SELECT COUNT(*) FROM {TABLE_SALERS_BD}").collect()[0][0]
    print(f"Table '{TABLE_SALERS_BD}': {total} records")

elif LOAD_MODE == "full_refresh":
    sdf_salers_bd.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(TABLE_SALERS_BD)
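    # Delete data files no longer referenced by the table and older than 168 hours (7 days)
    spark.sql(f"VACUUM {TABLE_SALERS_BD} RETAIN 168 HOURS")
    print(f"Table '{TABLE_SALERS_BD}' recreated with {sdf_salers_bd.count()} records")

else:
    raise ValueError(f"Unknown LOAD_MODE: {LOAD_MODE!r}")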
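The MERGE used in `incremental` mode keeps `loaded_at` from the first load, refreshes `updated_at`, and never deletes a BD who disappeared from the sheet. A plain-Python model of those rules on hypothetical rows makes the behaviour easy to check; `upsert` is an illustrative helper, not a Delta API, and unlike Delta it does not reject duplicate source keys.

```python
from datetime import datetime

def upsert(target, source, keys, update_cols):
    """Plain-Python model of the MERGE above (not Delta itself).

    Matched rows get update_cols from the source (whenMatchedUpdate),
    unmatched source rows are appended whole (whenNotMatchedInsertAll),
    and target rows absent from the source are left as they are.
    """
    by_key = {tuple(row[k] for k in keys): row for row in target}
    for row in source:
        key = tuple(row[k] for k in keys)
        if key in by_key:
            by_key[key].update({c: row[c] for c in update_cols})
        else:
            new_row = dict(row)
            target.append(new_row)
            by_key[key] = new_row
    return target

# Hypothetical rows: one existing BD whose email changed, one new BD.
t0, t1 = datetime(2024, 1, 1), datetime(2024, 2, 1)
target = [{"market_segment": "Install", "who": "BD Y", "role": "BD",
           "email": "old@example.com", "loaded_at": t0, "updated_at": t0}]
source = [
    {"market_segment": "Install", "who": "BD Y", "role": "BD",
     "email": "new@example.com", "loaded_at": t1, "updated_at": t1},
    {"market_segment": "Live Events", "who": "BD X", "role": "BD",
     "email": "x@example.com", "loaded_at": t1, "updated_at": t1},
]
result = upsert(target, source, ["market_segment", "who"], ["role", "email", "updated_at"])
for row in result:
    print(row["who"], row["email"], row["loaded_at"].date(), row["updated_at"].date())
```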

## 12. Validation and statistics

In [None]:
# =============================================================================
# STATISTICS TABLE 1: landing_salers (by country)
# =============================================================================
print(f"=== Statistics {TABLE_SALERS} (by country) ===\n")

total = spark.sql(f"SELECT COUNT(*) FROM {TABLE_SALERS}").collect()[0][0]
zones = spark.sql(f"SELECT COUNT(DISTINCT sales_zone) FROM {TABLE_SALERS}").collect()[0][0]
countries = spark.sql(f"SELECT COUNT(DISTINCT country) FROM {TABLE_SALERS}").collect()[0][0]
persons = spark.sql(f"SELECT COUNT(DISTINCT who) FROM {TABLE_SALERS}").collect()[0][0]
with_email = spark.sql(f"SELECT COUNT(*) FROM {TABLE_SALERS} WHERE email IS NOT NULL AND email != ''").collect()[0][0]

print(f"Total rows: {total}")
print(f"Distinct zones: {zones}")
print(f"Distinct countries: {countries}")
print(f"Distinct people: {persons}")
print(f"With email: {with_email} ({with_email / max(total, 1) * 100:.1f}%)")

print("\nDistribution by zone:")
spark.sql(f"""
    SELECT sales_zone, COUNT(*) as total, COUNT(DISTINCT country) as nb_pays, COUNT(DISTINCT who) as nb_personnes
    FROM {TABLE_SALERS}
    GROUP BY sales_zone ORDER BY total DESC
""").show(truncate=False)

print("Distribution by role:")
spark.sql(f"""
    SELECT role, COUNT(*) as total, COUNT(DISTINCT who) as nb_personnes
    FROM {TABLE_SALERS}
    GROUP BY role ORDER BY total DESC
""").show(truncate=False)

In [None]:
# =============================================================================
# STATISTICS TABLE 2: landing_salers_bd (by vertical)
# =============================================================================
print(f"=== Statistics {TABLE_SALERS_BD} (by vertical) ===\n")

total_bd = spark.sql(f"SELECT COUNT(*) FROM {TABLE_SALERS_BD}").collect()[0][0]
segments = spark.sql(f"SELECT COUNT(DISTINCT market_segment) FROM {TABLE_SALERS_BD}").collect()[0][0]
persons_bd = spark.sql(f"SELECT COUNT(DISTINCT who) FROM {TABLE_SALERS_BD}").collect()[0][0]
with_email_bd = spark.sql(f"SELECT COUNT(*) FROM {TABLE_SALERS_BD} WHERE email IS NOT NULL AND email != ''").collect()[0][0]

print(f"Total rows: {total_bd}")
print(f"Distinct market segments: {segments}")
print(f"Distinct people: {persons_bd}")
print(f"With email: {with_email_bd} ({with_email_bd / max(total_bd, 1) * 100:.1f}%)")

print("\nDistribution by market segment:")
spark.sql(f"""
    SELECT market_segment, COUNT(*) as total, COUNT(DISTINCT who) as nb_personnes
    FROM {TABLE_SALERS_BD}
    GROUP BY market_segment ORDER BY total DESC
""").show(truncate=False)

print("BD list:")
spark.sql(f"""
    SELECT market_segment, who, email
    FROM {TABLE_SALERS_BD}
    ORDER BY market_segment, who
""").show(50, truncate=False)

In [None]:
# Identify the hybrid profiles (present in both tables)
print("=== Hybrid profiles (in both tables) ===\n")
spark.sql(f"""
    SELECT s.who, s.country, s.role as role_pays, bd.market_segment, bd.role as role_verticale
    FROM {TABLE_SALERS} s
    INNER JOIN {TABLE_SALERS_BD} bd ON LOWER(TRIM(s.who)) = LOWER(TRIM(bd.who))
""").show(truncate=False)

## 13. Delta history

In [None]:
print(f"Delta history of '{TABLE_SALERS}':")
if spark.catalog.tableExists(TABLE_SALERS):
    spark.sql(f"DESCRIBE HISTORY {TABLE_SALERS}").select("version", "timestamp", "operation").show(5, truncate=50)

print(f"\nDelta history of '{TABLE_SALERS_BD}':")
if spark.catalog.tableExists(TABLE_SALERS_BD):
    spark.sql(f"DESCRIBE HISTORY {TABLE_SALERS_BD}").select("version", "timestamp", "operation").show(5, truncate=50)

---

## Table schemas

### landing_salers (by country)
| Colonne | Type | Description |
|---------|------|-------------|
| `sales_zone` | String | Sales zone (EMEA, APAC, AMERICAS) |
| `sales_region` | String | Sales region |
| `country_code` | String | Country code (FR, US...) |
| `country` | String | Country name |
| `who` | String | Sales rep name |
| `role` | String | Role (RAE, RSM) |
| `email` | String | Sales rep email |
| `loaded_at` | Timestamp | Load timestamp |
| `updated_at` | Timestamp | Last update timestamp |

**Warehouse usage**: JOIN with the opportunities on `country`

### landing_salers_bd (by vertical)
| Colonne | Type | Description |
|---------|------|-------------|
| `market_segment` | String | Vertical (Live Events, Install...) |
| `who` | String | BD name |
| `role` | String | Role (BD) |
| `email` | String | Email |
| `loaded_at` | Timestamp | Load timestamp |
| `updated_at` | Timestamp | Last update timestamp |

**Warehouse usage**: JOIN with the opportunities on `market_segment`