In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, udf, concat_ws, to_date, lit
from pyspark.sql.types import StringType, ArrayType
from shapely.geometry import shape, Point

In [8]:
# 1. Initialisation: local Spark session used by every step below.
# If a session already exists in this kernel, getOrCreate() reuses it, so
# JVM-level settings such as driver memory are not re-applied on a re-run.
spark = (
    SparkSession.builder
    .appName("Datathon_Lyon_Final_All_Points")
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)

In [9]:
# ---------------------------------------------------------
# STEP 1: LOAD ROADWORKS AND FILTER THEM BY DATE
# ---------------------------------------------------------
# Data-collection window (ISO dates, inclusive on both ends in the filter below).
DATE_DEBUT_COLLECTE = "2025-02-28"
DATE_FIN_COLLECTE = "2025-06-02"

# Roadworks GeoJSON: a whole FeatureCollection in one document, hence multiLine.
df_travaux_raw = (
    spark.read
    .option("multiLine", True)
    .json("../data_travaux/metropole-de-lyon_lyv_lyvia.lyvchantier.json")
)

# Flatten to one row per feature, keeping id, label, date range and raw geometry.
df_travaux = (
    df_travaux_raw
    .select(explode("features").alias("feature"))
    .select(
        col("feature.properties.gid").alias("work_id"),
        col("feature.properties.nature_chantier").alias("nom_chantier"),
        to_date(col("feature.properties.date_debut")).alias("date_debut"),
        to_date(col("feature.properties.date_fin")).alias("date_fin"),
        col("feature.geometry").alias("geometry_struct"),
    )
)

# Keep works whose [date_debut, date_fin] interval overlaps the collection window.
# A null date makes the comparison evaluate to null, so such rows are dropped.
# TODO(review): confirm that open-ended works (null date_fin) should be excluded.
df_travaux_filtered = df_travaux.where(
    (col("date_fin") >= lit(DATE_DEBUT_COLLECTE))
    & (col("date_debut") <= lit(DATE_FIN_COLLECTE))
)
print(f"Travaux actifs retenus : {df_travaux_filtered.count()}")

# Prepare the roadwork geometries (shapely objects) and broadcast them to the
# executors. Each geometry is buffered so that incidents reported just outside
# the works footprint still match.
# NOTE(review): coordinates are presumably WGS84 lon/lat degrees (GeoJSON
# default), so this buffer is in degrees: ~5.6 m north-south but only ~3.9 m
# east-west at Lyon's latitude (~45.75 N). Reproject to a metric CRS if an
# exact radius matters.
BUFFER_DEG = 0.00005

travaux_list = df_travaux_filtered.collect()
prepared_geometries = []
skipped_geometries = []  # (work_id, reason) for works whose geometry could not be used

for row in travaux_list:
    geometry_struct = row["geometry_struct"]
    if not geometry_struct:
        skipped_geometries.append((row["work_id"], "geometrie absente"))
        continue
    try:
        geom = shape(geometry_struct.asDict(recursive=True))
        geom_buffer = geom.buffer(BUFFER_DEG)
    except Exception as exc:  # malformed or unsupported GeoJSON geometry
        # Record instead of silently dropping: a skipped work can never match
        # any incident, which would otherwise go unnoticed in the results.
        skipped_geometries.append((row["work_id"], repr(exc)))
        continue
    prepared_geometries.append({"id": row["work_id"], "geom": geom_buffer})

print(f"Géométries préparées : {len(prepared_geometries)} / {len(travaux_list)}")
if skipped_geometries:
    print(f"Géométries ignorées : {len(skipped_geometries)} (ex. : {skipped_geometries[:5]})")

bc_travaux = spark.sparkContext.broadcast(prepared_geometries)

Travaux actifs retenus : 91


In [10]:
# ---------------------------------------------------------
# STEP 2: LOAD INCIDENTS (NO DESCRIPTION FILTER)
# ---------------------------------------------------------
df_incidents_raw = (
    spark.read
    .option("multiLine", True)
    .json("../data_coord/points-rouges-200046977.geojson")
)

# Flatten the FeatureCollection to one row per incident.
df_incidents = (
    df_incidents_raw
    .select(explode("features").alias("feature"))
    .select(
        col("feature.properties.description").alias("description"),
        col("feature.properties.commune").alias("commune"),
        col("feature.geometry.coordinates").alias("coords"),
    )
)

# Deliberately NO filter on the description: every incident is kept.
print(f"Total des incidents chargés : {df_incidents.count()}")

Total des incidents chargés : 24256


In [11]:
# ---------------------------------------------------------
# STEP 3: SPATIAL CROSS-REFERENCING
# ---------------------------------------------------------
def check_inclusion_optim(coords):
    """Return the ids of the broadcast roadwork geometries containing a point.

    Intended to run inside a Spark Python UDF; reads the broadcast variable
    ``bc_travaux`` (a list of ``{"id": ..., "geom": shapely geometry}`` dicts).

    Args:
        coords: Coordinate array of the incident, read as ``[x, y, ...]``
            (presumably ``[longitude, latitude]`` per GeoJSON). May be ``None``.

    Returns:
        The list of matching roadwork ids, or ``None`` when the coordinates
        are missing, incomplete, contain a null component, or no geometry
        contains the point. Returning ``None`` (not ``[]``) lets the caller
        keep only matches with ``isNotNull()``.
    """
    if not coords or len(coords) < 2:
        return None
    x, y = coords[0], coords[1]
    # A null component would make Point() raise and fail the whole Spark task.
    if x is None or y is None:
        return None
    point = Point(x, y)
    # Linear scan over the broadcast geometries; fine for a few hundred works.
    matches = [work["id"] for work in bc_travaux.value if work["geom"].contains(point)]
    return matches or None

# Wrap the Python matcher as a Spark UDF returning the list of matching ids.
join_udf = udf(check_inclusion_optim, ArrayType(StringType()))

print("Lancement du croisement spatial...")
df_resultat = df_incidents.withColumn("travaux_id_match", join_udf(col("coords")))

# Keep only incidents linked to at least one roadwork.
# cache(): the Python UDF is the expensive part of the pipeline, and df_final
# is consumed twice (count() below, then the CSV export). Persisting it avoids
# running the spatial join a second time.
df_final = df_resultat.filter(col("travaux_id_match").isNotNull()).cache()
print(f"Incidents corrélés avec des travaux : {df_final.count()}")

Lancement du croisement spatial...
Incidents corrélés avec des travaux : 1759


In [12]:
# ---------------------------------------------------------
# STEP 4: CSV EXPORT
# ---------------------------------------------------------
# Flatten for CSV: the id array becomes a "|"-separated string and the
# coordinate array is split into two scalar columns.
df_export = (
    df_final
    .withColumn("travaux_id_match", concat_ws("|", col("travaux_id_match")))
    .withColumn("longitude", col("coords")[0])
    .withColumn("latitude", col("coords")[1])
    .drop("coords")
)

print("Sauvegarde en cours...")
# coalesce(1): a single partition, so a single part file in the output directory.
(
    df_export.coalesce(1)
    .write
    .mode("overwrite")
    .options(header="true", sep=";", encoding="UTF-8")
    .csv("../resultats_croisement_complet/travaux")
)

print(" Sauvegarde terminée dans 'resultats_croisement_complet/travaux'")

Sauvegarde en cours...
 Sauvegarde terminée dans 'resultats_croisement_complet/travaux'
