## 1. Setup & Configuration

In [17]:
import yaml
import os
import sys

# Windows compatibility fix for PySpark: the stdlib `socketserver` module has no
# `UnixStreamServer` on Windows, so alias it to `TCPServer` *before* PySpark is
# imported (presumably PySpark references that name at import time — TODO confirm
# against the installed PySpark version).
if sys.platform == "win32":
    import socketserver
    if not hasattr(socketserver, 'UnixStreamServer'):
        socketserver.UnixStreamServer = socketserver.TCPServer

from pyspark.sql import SparkSession
# NOTE: importing `sum` here shadows the Python builtin for the rest of the
# notebook; every later `sum(...)` call is the Spark aggregate.
from pyspark.sql.functions import col, sum, count, countDistinct, desc, lit, udf
from pyspark.sql.types import DoubleType
# NOTE(review): these math functions are shadowed by the Spark column functions of
# the same names imported in the Haversine cell and appear otherwise unused.
from math import radians, sin, cos, sqrt, atan2

CONFIG_PATH = "../config/config.yml"

# Load configuration. Read explicitly as UTF-8: without `encoding`, open() uses the
# locale code page (e.g. cp1252 on Windows), which mis-decodes accented values.
with open(CONFIG_PATH, encoding="utf-8") as f:
    config = yaml.safe_load(f)

# Extract key parameters
buffer_m = config["params"]["buffer_m"]            # search radius around each amenagement, meters
bike_mode = config["filters"]["bike_mode_value"]   # channel `mode` value that denotes bikes
silver_dir = config["paths"]["silver_dir"]         # relative to the project root (one level up)
gold_dir = config["paths"]["gold_dir"]

print(f"✓ Buffer distance: {buffer_m}m")
print(f"✓ Bike mode filter: {bike_mode}")
print(f"✓ Silver data: {silver_dir}")
print(f"✓ Gold output: {gold_dir}")

âœ“ Buffer distance: 200m
âœ“ Bike mode filter: velo
âœ“ Silver data: data/silver
âœ“ Gold output: data/gold


In [18]:
# Initialize Spark session (Windows configuration)
import tempfile

# Make the driver and the Python workers use the interpreter that runs this
# notebook, so both sides see the same environment (the venv).
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# Scratch/spill directory for Spark: the OS temp folder.
local_temp = tempfile.gettempdir()
os.environ['SPARK_LOCAL_DIRS'] = local_temp

# Session settings, applied to the builder in this order.
spark_settings = {
    "spark.sql.adaptive.enabled": "false",
    "spark.driver.host": "localhost",
    "spark.driver.bindAddress": "localhost",
    "spark.ui.enabled": "false",
    "spark.sql.execution.arrow.pyspark.enabled": "false",
    "spark.executor.memory": "2g",
    "spark.driver.memory": "2g",
    "spark.local.dir": local_temp,
    "spark.sql.shuffle.partitions": "4",
}

session_builder = SparkSession.builder.appName("Module2_SpatialUsage").master("local[*]")
for setting_key, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_key, setting_value)
spark = session_builder.getOrCreate()

spark.sparkContext.setLogLevel("WARN")

print(f"âœ“ Spark version: {spark.version}")
print(f"âœ“ Python: {sys.executable}")
print(f"âœ“ HADOOP_HOME: {os.environ.get('HADOOP_HOME', 'Not set')}")

âœ“ Spark version: 3.5.3
âœ“ Python: c:\Users\medma\Documents\2025-2026\Big Data\datathon_velomenaj\venv\Scripts\python.exe
âœ“ HADOOP_HOME: C:\hadoop


## 2. Load Silver Data (CSV)

Les données Silver sont générées par le notebook pandas (`02_spatial_usage_pandas.ipynb`).

In [19]:
# Load Silver CSVs


def _read_silver_csv(table_name):
    """Read ``../<silver_dir>/<table_name>.csv`` (header row, inferred schema) as a Spark DataFrame."""
    return spark.read.csv(
        f"../{silver_dir}/{table_name}.csv",
        header=True,
        inferSchema=True,
    )


df_amenagements = _read_silver_csv("silver_amenagements")
df_sites = _read_silver_csv("silver_sites")
df_channels = _read_silver_csv("silver_channels")
df_measures = _read_silver_csv("silver_measures")

# Row counts as a quick sanity check (each count() scans its CSV once).
for loaded_name, loaded_df in [
    ("silver_amenagements", df_amenagements),
    ("silver_sites", df_sites),
    ("silver_channels", df_channels),
    ("silver_measures", df_measures),
]:
    print(f"✓ Loaded {loaded_name}: {loaded_df.count()} rows")

âœ“ Loaded silver_amenagements: 3 rows
âœ“ Loaded silver_sites: 3 rows
âœ“ Loaded silver_channels: 5 rows
âœ“ Loaded silver_measures: 120 rows


In [20]:
# Preview data: amenagements untruncated (geom_wkt holds long WKT strings),
# the other tables with Spark's default column truncation.
print("=== Amenagements ===")
df_amenagements.show(truncate=False)

for preview_title, preview_df in (
    ("=== Sites ===", df_sites),
    ("=== Channels ===", df_channels),
):
    print(preview_title)
    preview_df.show()

=== Amenagements ===
+--------------+---------------+----------------+-------------+----------+--------------------------------------+------------+------------+------------+
|amenagement_id|annee_livraison|type_amenagement|environnement|longueur_m|geom_wkt                              |centroid_lat|centroid_lon|commune     |
+--------------+---------------+----------------+-------------+----------+--------------------------------------+------------+------------+------------+
|AMEN_001      |2020           |Piste cyclable  |Urbain       |500.0     |LINESTRING(4.835 45.764, 4.836 45.765)|45.764      |4.835       |Lyon        |
|AMEN_002      |2021           |Bande cyclable  |PÃ©riurbain   |300.0     |LINESTRING(4.840 45.770, 4.841 45.771)|45.77       |4.84        |Villeurbanne|
|AMEN_003      |2019           |Voie verte      |Urbain       |800.0     |LINESTRING(4.850 45.750, 4.851 45.751)|45.75       |4.85        |Lyon        |
+--------------+---------------+----------------+-----------

## 3. Spatial Join: Link Counters to Infrastructure

Utilisation de la formule de Haversine pour calculer les distances et trouver les compteurs proches (≤ `buffer_m`, soit 200 m dans la configuration actuelle).

In [21]:
# Haversine distance using NATIVE Spark SQL functions (no Python UDF = no worker crash!)
from pyspark.sql.functions import radians, sin, cos, sqrt, atan2, lit, when

EARTH_RADIUS_M = 6371000  # mean Earth radius in meters


def haversine_spark(lat1_col, lon1_col, lat2_col, lon2_col, radius_m=EARTH_RADIUS_M):
    """
    Build a Spark Column computing the great-circle distance between two points.

    Uses only built-in Spark SQL functions (no Python UDF), which avoids the
    Python-worker issues seen with UDFs on Windows.

    Args:
        lat1_col, lon1_col: latitude / longitude of the first point, in decimal
            degrees (Column or column name).
        lat2_col, lon2_col: latitude / longitude of the second point, in decimal
            degrees (Column or column name).
        radius_m: sphere radius; defaults to the mean Earth radius in meters.

    Returns:
        Column: distance in the unit of ``radius_m`` (meters by default);
        null when any coordinate is null.
    """
    # Convert degrees to radians
    lat1_rad = radians(lat1_col)
    lon1_rad = radians(lon1_col)
    lat2_rad = radians(lat2_col)
    lon2_rad = radians(lon2_col)

    # Haversine formula
    dlat = lat2_rad - lat1_rad
    dlon = lon2_rad - lon1_rad

    a = sin(dlat / 2) ** 2 + cos(lat1_rad) * cos(lat2_rad) * sin(dlon / 2) ** 2
    # Rounding can push `a` marginally outside [0, 1] (e.g. near-antipodal points),
    # which would make sqrt(1 - a) NaN. Clamp with `when` rather than
    # least/greatest: those skip nulls, which would turn a null distance into 0.
    a = when(a > 1.0, lit(1.0)).when(a < 0.0, lit(0.0)).otherwise(a)
    c = 2 * atan2(sqrt(a), sqrt(1 - a))

    return lit(radius_m) * c


print("✓ Haversine function using native Spark SQL (no Python UDF)")

âœ“ Haversine function using native Spark SQL (no Python UDF)


In [22]:
# Pair every amenagement centroid with every counter site (cartesian product:
# cost grows as |amenagements| x |sites|), then keep only the pairs whose
# great-circle distance is within the configured buffer.
amen_points = df_amenagements.select(
    col("amenagement_id"),
    col("centroid_lat").alias("amen_lat"),
    col("centroid_lon").alias("amen_lon"),
)
site_points = df_sites.select(
    col("site_id"),
    col("lat").alias("site_lat"),
    col("lon").alias("site_lon"),
)
df_cross = amen_points.crossJoin(site_points)

# Distance expression built from native Spark SQL functions (no UDF)
pair_distance = haversine_spark(
    col("amen_lat"), col("amen_lon"),
    col("site_lat"), col("site_lon"),
)
df_within_buffer = (
    df_cross
    .withColumn("distance_m", pair_distance)
    .filter(col("distance_m") <= buffer_m)
)

print(f"âœ“ Found {df_within_buffer.count()} amenagement-site pairs within {buffer_m}m")
df_within_buffer.show()

âœ“ Found 2 amenagement-site pairs within 200m
+--------------+--------+--------+--------+--------+--------+----------+
|amenagement_id|amen_lat|amen_lon| site_id|site_lat|site_lon|distance_m|
+--------------+--------+--------+--------+--------+--------+----------+
|      AMEN_001|  45.764|   4.835|SITE_001|  45.764|   4.835|       0.0|
|      AMEN_002|   45.77|    4.84|SITE_002|   45.77|    4.84|       0.0|
+--------------+--------+--------+--------+--------+--------+----------+



In [23]:
# Attach the counting channels of each in-buffer site, keeping bike channels only.
# Filtering the channel side first is equivalent for an inner join, because
# `mode` exists only on df_channels.
bike_channels = df_channels.filter(col("mode") == bike_mode)
df_amen_channels = df_within_buffer.join(bike_channels, on="site_id", how="inner")

print(f"âœ“ Found {df_amen_channels.count()} amenagement-channel links (bike mode only)")
df_amen_channels.show()

âœ“ Found 3 amenagement-channel links (bike mode only)
+--------+--------------+--------+--------+--------+--------+----------+----------+----+----+
| site_id|amenagement_id|amen_lat|amen_lon|site_lat|site_lon|distance_m|channel_id|mode|sens|
+--------+--------------+--------+--------+--------+--------+----------+----------+----+----+
|SITE_001|      AMEN_001|  45.764|   4.835|  45.764|   4.835|       0.0|  CHAN_002|velo| Sud|
|SITE_001|      AMEN_001|  45.764|   4.835|  45.764|   4.835|       0.0|  CHAN_001|velo|Nord|
|SITE_002|      AMEN_002|   45.77|    4.84|   45.77|    4.84|       0.0|  CHAN_003|velo| Est|
+--------+--------------+--------+--------+--------+--------+----------+----------+----+----+



In [24]:
# Create gold_link_amenagement_channel: one row per distinct
# (amenagement, channel, site, distance) combination.
link_columns = ["amenagement_id", "channel_id", "site_id", "distance_m"]
gold_link = df_amen_channels.select(*link_columns).distinct()

# Keep in memory: reused by the flow join, the quality checks and the export.
gold_link.cache()

print(f"âœ“ Created gold_link_amenagement_channel ({gold_link.count()} rows)")
gold_link.show()

âœ“ Created gold_link_amenagement_channel (3 rows)
+--------------+----------+--------+----------+
|amenagement_id|channel_id| site_id|distance_m|
+--------------+----------+--------+----------+
|      AMEN_001|  CHAN_001|SITE_001|       0.0|
|      AMEN_002|  CHAN_003|SITE_002|       0.0|
|      AMEN_001|  CHAN_002|SITE_001|       0.0|
+--------------+----------+--------+----------+



## 4. Aggregate Daily Flows per Infrastructure

In [25]:
# Keep only measures flagged valid, then attach them to their amenagement links
# through the shared channel_id.
valid_measures = df_measures.filter(col("is_valid") == True)  # noqa: E712 (Column comparison)
df_flows = gold_link.join(valid_measures, on="channel_id", how="inner")

print(f"âœ“ Joined {df_flows.count()} measure records")
preview_columns = ["amenagement_id", "channel_id", "date", "flux"]
df_flows.select(*preview_columns).show(10)

âœ“ Joined 90 measure records
+--------------+----------+----------+----+
|amenagement_id|channel_id|      date|flux|
+--------------+----------+----------+----+
|      AMEN_001|  CHAN_001|2023-06-01| 150|
|      AMEN_001|  CHAN_002|2023-06-01| 100|
|      AMEN_002|  CHAN_003|2023-06-01| 250|
|      AMEN_001|  CHAN_001|2023-06-02| 152|
|      AMEN_001|  CHAN_002|2023-06-02| 101|
|      AMEN_002|  CHAN_003|2023-06-02| 253|
|      AMEN_001|  CHAN_001|2023-06-03| 154|
|      AMEN_001|  CHAN_002|2023-06-03| 102|
|      AMEN_002|  CHAN_003|2023-06-03| 256|
|      AMEN_001|  CHAN_001|2023-06-04| 156|
+--------------+----------+----------+----+
only showing top 10 rows



In [26]:
# Daily flow per amenagement: total flux across all linked channels (Spark `sum`),
# plus the number of distinct channels that contributed on that day.
daily_metrics = [
    sum("flux").alias("flux_estime"),
    countDistinct("channel_id").alias("n_channels"),
]
gold_flow_daily = (
    df_flows
    .groupBy("amenagement_id", "date")
    .agg(*daily_metrics)
    .orderBy("amenagement_id", "date")
)

# Keep in memory: reused by the checks, the summaries and the export.
gold_flow_daily.cache()

print(f"âœ“ Created gold_flow_amenagement_daily ({gold_flow_daily.count()} rows)")
gold_flow_daily.show(20)

âœ“ Created gold_flow_amenagement_daily (60 rows)
+--------------+----------+-----------+----------+
|amenagement_id|      date|flux_estime|n_channels|
+--------------+----------+-----------+----------+
|      AMEN_001|2023-06-01|        250|         2|
|      AMEN_001|2023-06-02|        253|         2|
|      AMEN_001|2023-06-03|        256|         2|
|      AMEN_001|2023-06-04|        259|         2|
|      AMEN_001|2023-06-05|        262|         2|
|      AMEN_001|2023-06-06|        265|         2|
|      AMEN_001|2023-06-07|        268|         2|
|      AMEN_001|2023-06-08|        271|         2|
|      AMEN_001|2023-06-09|        274|         2|
|      AMEN_001|2023-06-10|        277|         2|
|      AMEN_001|2023-06-11|        280|         2|
|      AMEN_001|2023-06-12|        283|         2|
|      AMEN_001|2023-06-13|        286|         2|
|      AMEN_001|2023-06-14|        289|         2|
|      AMEN_001|2023-06-15|        292|         2|
|      AMEN_001|2023-06-16|     

## 5. Data Quality Checks

In [27]:
# Check 1: No duplicate amenagement-channel links
duplicate_links = gold_link.groupBy("amenagement_id", "channel_id").count().filter(col("count") > 1)
assert duplicate_links.count() == 0, "FAILED: Found duplicate amenagement-channel links"
print("✓ No duplicate links")

# Check 2: at least one amenagement has flow data. Without this, an empty Gold
# table would pass every check below vacuously.
amen_with_data = gold_flow_daily.select("amenagement_id").distinct().count()
assert amen_with_data > 0, "FAILED: No amenagement has flow data"
print(f"✓ {amen_with_data} amenagements with flow data")

# Check 3: flux_estime must be present and non-negative. Nulls are checked on
# their own because `null < 0` evaluates to null and filter() drops it, so the
# negative-value check alone would silently accept null flux.
null_flux = gold_flow_daily.filter(col("flux_estime").isNull())
assert null_flux.count() == 0, "FAILED: Found null flux values"
negative_flux = gold_flow_daily.filter(col("flux_estime") < 0)
assert negative_flux.count() == 0, "FAILED: Found negative flux values"
print("✓ All flux values are non-negative")

# Check 4: n_channels should be >= 1
zero_channels = gold_flow_daily.filter(col("n_channels") < 1)
assert zero_channels.count() == 0, "FAILED: Found days with 0 channels"
print("✓ All days have at least 1 channel")

print("\n🎉 All quality checks passed!")

âœ“ No duplicate links
âœ“ 2 amenagements with flow data
âœ“ All flux values are non-negative
âœ“ All days have at least 1 channel

ðŸŽ‰ All quality checks passed!


## 6. Summary Statistics

In [28]:
# Summary by amenagement
from pyspark.sql.functions import avg, max as spark_max

# One row per amenagement, sorted so the highest average daily flux comes first.
per_amenagement_metrics = [
    count("date").alias("total_days"),
    sum("flux_estime").alias("total_flux"),
    avg("flux_estime").alias("avg_daily_flux"),
    spark_max("n_channels").alias("max_channels"),
]
summary = (
    gold_flow_daily
    .groupBy("amenagement_id")
    .agg(*per_amenagement_metrics)
    .orderBy(col("avg_daily_flux").desc())
)

print("Summary by infrastructure:")
summary.show()

Summary by infrastructure:
+--------------+----------+----------+--------------+------------+
|amenagement_id|total_days|total_flux|avg_daily_flux|max_channels|
+--------------+----------+----------+--------------+------------+
|      AMEN_001|        30|      8805|         293.5|           2|
|      AMEN_002|        30|      8805|         293.5|           1|
+--------------+----------+----------+--------------+------------+



In [29]:
# Daily flows for AMEN_001, in chronological order (first 10 days shown).
print("\nDaily flows for AMEN_001:")
amen_001_daily = gold_flow_daily.where(col("amenagement_id") == "AMEN_001").orderBy("date")
amen_001_daily.show(10)


Daily flows for AMEN_001:
+--------------+----------+-----------+----------+
|amenagement_id|      date|flux_estime|n_channels|
+--------------+----------+-----------+----------+
|      AMEN_001|2023-06-01|        250|         2|
|      AMEN_001|2023-06-02|        253|         2|
|      AMEN_001|2023-06-03|        256|         2|
|      AMEN_001|2023-06-04|        259|         2|
|      AMEN_001|2023-06-05|        262|         2|
|      AMEN_001|2023-06-06|        265|         2|
|      AMEN_001|2023-06-07|        268|         2|
|      AMEN_001|2023-06-08|        271|         2|
|      AMEN_001|2023-06-09|        274|         2|
|      AMEN_001|2023-06-10|        277|         2|
+--------------+----------+-----------+----------+
only showing top 10 rows



## 7. Save Outputs

Sauvegarde en CSV (format actuel). Option Parquet disponible pour la production.

In [30]:
# Also write Parquet copies (for production). Off by default: the original notes
# say this needs a properly configured Hadoop install — TODO confirm locally.
SAVE_AS_PARQUET = False

# Gold tables to export, keyed by their base output name.
gold_tables = {
    "gold_link_amenagement_channel": gold_link,
    "gold_flow_amenagement_daily": gold_flow_daily,
}

# Create output directory
gold_root = f"../{gold_dir}"
os.makedirs(gold_root, exist_ok=True)

# CSV (current format): coalesce(1) so each table is written as a single part
# file, with a header row, under "<name>_spark/".
for table_name, table_df in gold_tables.items():
    table_df.coalesce(1).write.mode("overwrite").option("header", True).csv(
        f"{gold_root}/{table_name}_spark"
    )
print(f"✓ Saved Gold outputs as CSV to {gold_dir}/")

if SAVE_AS_PARQUET:
    for table_name, table_df in gold_tables.items():
        table_df.write.mode("overwrite").parquet(f"{gold_root}/{table_name}")
    print(f"✓ Saved Gold outputs as Parquet to {gold_dir}/")

âœ“ Saved Gold outputs as CSV to data/gold/


## 8. Cleanup

In [31]:
# Shut down the local Spark session now that every Gold output has been written;
# any further Spark call in this kernel would need a new session.
active_session = spark
active_session.stop()
print("âœ“ Spark session stopped")

âœ“ Spark session stopped


## 9. Notes

### Workflow

1. **Générer les données mock** → Exécuter `02_spatial_usage_pandas.ipynb`
2. **Traitement PySpark** → Exécuter ce notebook
3. **Scoring** â†’ Module 3 consomme les fichiers Gold

### Fichiers produits

```
data/gold/
├── gold_link_amenagement_channel_spark/   (CSV)
└── gold_flow_amenagement_daily_spark/     (CSV)
```

