In [0]:
# =============================================================================
# 📊 PIPELINE ETL - Offres d'Emploi IA/ML France
# =============================================================================

# Charger le fichier CSV depuis le Volume
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/Volumes/workspace/default/data_emploi/offres_ia_france_20260212_144803.csv")

# Afficher les premières lignes
display(df)

id,titre,entreprise,type_contrat,type_contrat_libelle,experience,salaire,ville,code_postal,departement,region,competences,nombre_competences,date_creation,date_actualisation,description,url,secteur_activite,qualification,mot_cle_recherche
203RZPZ,Spécialiste IA / Machine Learning Engineer (H/F),R(E) GREEN,CDI,CDI,48 Mois,Annuel de 50000.0 Euros à 55000.0 Euros sur 12.0 mois,75 - PARIS 09,75009,75,Île-de-France,"Développer des modèles prédictifs pour l'analyse de données, Documenter les processus et les architectures d'IA, Optimiser les performances des systèmes d'IA",3,2026-02-06 10:30:38.425000+00:00,2026-02-06 10:30:38.425000+00:00,Spécialiste IA / Machine Learning Engineer,,,,
Développement d'API IA & entraînement de modèles propriétaires,,,,,,,,,,,,,,,,,,,
Gecob.tech,,,,,,,,,,,,,,,,,,,
France / Remote possible,,,,,,,,,,,,,,,,,,,
CDI ou Freelance longue durée,,,,,,,,,,,,,,,,,,,
Contexte,,,,,,,,,,,,,,,,,,,
Gecob.tech est une plateforme SaaS et DeepTech dédiée à l'immobilier,à la data et à la décarbonation,avec une forte ambition technologique :,,,,,,,,,,,,,,,,,
développer et entraîner des modèles d'IA propriétaires,,,,,,,,,,,,,,,,,,,
connecter intelligemment des API externes,,,,,,,,,,,,,,,,,,,
industrialiser des traitements IA à forte valeur métier.,,,,,,,,,,,,,,,,,,,


In [0]:
# Afficher le schéma des données
print("📋 SCHÉMA DES DONNÉES:")
df.printSchema()

# Compter le nombre de lignes
nb_lignes = df.count()
print(f"\n📊 NOMBRE TOTAL D'OFFRES: {nb_lignes}")

📋 SCHÉMA DES DONNÉES:
root
 |-- id: string (nullable = true)
 |-- titre: string (nullable = true)
 |-- entreprise: string (nullable = true)
 |-- type_contrat: string (nullable = true)
 |-- type_contrat_libelle: string (nullable = true)
 |-- experience: string (nullable = true)
 |-- salaire: string (nullable = true)
 |-- ville: string (nullable = true)
 |-- code_postal: string (nullable = true)
 |-- departement: string (nullable = true)
 |-- region: string (nullable = true)
 |-- competences: string (nullable = true)
 |-- nombre_competences: string (nullable = true)
 |-- date_creation: string (nullable = true)
 |-- date_actualisation: string (nullable = true)
 |-- description: string (nullable = true)
 |-- url: string (nullable = true)
 |-- secteur_activite: string (nullable = true)
 |-- qualification: string (nullable = true)
 |-- mot_cle_recherche: string (nullable = true)


📊 NOMBRE TOTAL D'OFFRES: 6229


In [0]:
from pyspark.sql.functions import col, when, trim, lower, count, desc

# =============================================================================
# 🧹 NETTOYAGE DES DONNÉES
# =============================================================================

# 1. Supprimer les doublons
df_clean = df.dropDuplicates(["id"])

# 2. Nettoyer les valeurs nulles et "Non précisé"
df_clean = df_clean.withColumn(
    "region", 
    when(col("region").isNull() | (col("region") == ""), "Non précisé")
    .otherwise(col("region"))
)

df_clean = df_clean.withColumn(
    "entreprise",
    when(col("entreprise").isNull() | (col("entreprise") == ""), "Non précisé")
    .otherwise(col("entreprise"))
)

# 3. Nettoyer les espaces
df_clean = df_clean.withColumn("titre", trim(col("titre")))
df_clean = df_clean.withColumn("region", trim(col("region")))

# Afficher les stats
print(f"📊 Avant nettoyage: {df.count()} lignes")
print(f"✅ Après nettoyage: {df_clean.count()} lignes")
print(f"🗑️  Doublons supprimés: {df.count() - df_clean.count()}")

📊 Avant nettoyage: 6229 lignes
✅ Après nettoyage: 5096 lignes
🗑️  Doublons supprimés: 1133


In [0]:
print("🗺️ TOP 10 RÉGIONS:")
df_regions = df_clean.groupBy("region").count().orderBy(desc("count"))
display(df_regions)

🗺️ TOP 10 RÉGIONS:


region,count
Non précisé,3636
Île-de-France,447
Occitanie,185
Auvergne-Rhône-Alpes,151
PACA,123
Nouvelle-Aquitaine,100
Pays de la Loire,99
Bretagne,92
Hauts-de-France,80
Grand Est,58


In [0]:
# 2. Offres par type de contrat
print("📝 RÉPARTITION PAR TYPE DE CONTRAT:")
df_contrats = df_clean.groupBy("type_contrat_libelle").count().orderBy(desc("count"))
display(df_contrats)

📝 RÉPARTITION PAR TYPE DE CONTRAT:


type_contrat_libelle,count
,2444
CDI,1362
Intelligence Artificielle,334
Machine Learning,90
Data Engineer,90
Intérim - 1 Mois,84
CDD - 12 Mois,80
Cadre,57
CDD - 36 Mois,44
Data Scientist,26


In [0]:
# =============================================================================
# 💾 SAUVEGARDE EN DELTA LAKE
# =============================================================================

# 1. Sauvegarder la table principale nettoyée
df_clean.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("workspace.default.offres_emploi_ia")

print("✅ Table 'offres_emploi_ia' sauvegardée en Delta Lake!")

# 2. Sauvegarder l'agrégation par région
df_regions.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("workspace.default.offres_par_region")

print("✅ Table 'offres_par_region' sauvegardée!")

# 3. Sauvegarder l'agrégation par contrat
df_contrats.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("workspace.default.offres_par_contrat")

print("✅ Table 'offres_par_contrat' sauvegardée!")

print("\n🎉 PIPELINE ETL TERMINÉ!")

✅ Table 'offres_emploi_ia' sauvegardée en Delta Lake!
✅ Table 'offres_par_region' sauvegardée!
✅ Table 'offres_par_contrat' sauvegardée!

🎉 PIPELINE ETL TERMINÉ!
