# MapViewer Data Pipeline

This notebook documents the complete data pipeline from raw data ingestion to price statistics computation.

## Pipeline Overview

```
DVF CSV Files → DuckDB → Cleaned Data → Statistics → GeoJSON → Frontend
```

**Data Sources:**
- **DVF (Demandes de Valeurs Foncières)**: Real estate transactions from data.gouv.fr
- **Admin Express**: Administrative boundaries from IGN (regions, départements, cantons, communes)
- **Cadastre**: Parcel boundaries from Etalab

## Setup

In [1]:
import duckdb
import pandas as pd
import json
from pathlib import Path

# Paths
DB_PATH = Path("../data/real_estate.duckdb")
STATS_PATH = Path("../src/frontend/stats_cache.json")

# Connect to database
con = duckdb.connect(str(DB_PATH), read_only=True)
con.execute("LOAD spatial;")
print(f"Connected to {DB_PATH}")

Connected to ..\data\real_estate.duckdb


---
## 1. Data Ingestion (ETL)

The `src/data/etl.py` script handles:
1. **Download DVF data** from data.gouv.fr (2019-2024)
2. **Create DuckDB database** with spatial extension
3. **Clean and deduplicate** transactions

### Raw DVF Schema

In [2]:
# Check available tables
tables = con.execute("SHOW TABLES").df()
print("Available tables:")
display(tables)

Available tables:


Unnamed: 0,name
0,dvf
1,dvf_clean
2,parcels


In [3]:
# DVF cleaned data schema
schema = con.execute("DESCRIBE dvf_clean").df()
print("DVF Clean table schema:")
display(schema)

DVF Clean table schema:


Unnamed: 0,column_name,column_type,null,key,default,extra
0,mutation_id,VARCHAR,YES,,,
1,mutation_date,DATE,YES,,,
2,nature,VARCHAR,YES,,,
3,dept_code,VARCHAR,YES,,,
4,commune_code,BIGINT,YES,,,
5,postal_code,BIGINT,YES,,,
6,commune_name,VARCHAR,YES,,,
7,property_type,VARCHAR,YES,,,
8,price,DOUBLE,YES,,,
9,total_surface,HUGEINT,YES,,,


In [4]:
# Data volume
stats = con.execute("""
    SELECT 
        COUNT(*) as total_transactions,
        COUNT(DISTINCT dept_code) as departments,
        MIN(mutation_date) as earliest_date,
        MAX(mutation_date) as latest_date,
        AVG(price_m2) as avg_price_m2
    FROM dvf_clean
""").df()
print("Dataset Statistics:")
display(stats)

Dataset Statistics:


Unnamed: 0,total_transactions,departments,earliest_date,latest_date,avg_price_m2
0,3298257,97,2020-07-01,2025-06-30,2316.638251


---
## 2. Price Calculation

The `price_m2` (price per square meter) is calculated as:

```python
price_m2 = price / total_surface
```

### Filtering Outliers

We filter transactions where `price_m2` is outside the range [100, 50000] €/m² to remove data errors.

In [None]:
# Price distribution
price_dist = con.execute("""
    SELECT 
        APPROX_QUANTILE(price_m2, 0.10) as p10,
        APPROX_QUANTILE(price_m2, 0.25) as p25,
        APPROX_QUANTILE(price_m2, 0.50) as median,
        APPROX_QUANTILE(price_m2, 0.75) as p75,
        APPROX_QUANTILE(price_m2, 0.90) as p90,
        AVG(price_m2) as mean
    FROM dvf_clean
    WHERE price_m2 BETWEEN 100 AND 50000
""").df()
print("Price/m² Distribution (€):")
display(price_dist)

---
## 3. Geographic Aggregation Levels

Statistics are computed at 6 levels:

| Level | Zoom | Count |
|-------|------|-------|
| Country | 0-5 | 1 |
| Région | 5-7 | 17 |
| Département | 7-9 | 97 |
| Canton | 9-11 | ~2000 |
| Commune | 11-17 | ~35000 |
| Parcelle | 17+ | ~74M |

In [None]:
# Stats per aggregation level
with open(STATS_PATH) as f:
    stats_cache = json.load(f)

print("Statistics by level:")
for level, data in stats_cache.items():
    if isinstance(data, dict):
        print(f"  {level}: {len(data)} areas")

---
## 4. Statistics Computation (Precompute)

The `src/data/precompute.py` script computes for each geographic area:

- **median_price_m2**: Median price per square meter
- **q25**: 25th percentile (lower quartile)
- **q75**: 75th percentile (upper quartile)
- **n_sales**: Number of transactions

### Minimum Sales Threshold

Areas with fewer than 10 sales are excluded for statistical reliability.

In [None]:
# Department-level statistics query
dept_stats = con.execute("""
    SELECT
        dept_code,
        APPROX_QUANTILE(price_m2, 0.5) AS median_price_m2,
        APPROX_QUANTILE(price_m2, 0.25) AS q25,
        APPROX_QUANTILE(price_m2, 0.75) AS q75,
        COUNT(*) AS n_sales
    FROM dvf_clean
    WHERE price_m2 BETWEEN 100 AND 50000
    GROUP BY dept_code
    HAVING COUNT(*) >= 10
    ORDER BY median_price_m2 DESC
    LIMIT 15
""").df()
print("Top 15 Departments by Median Price/m²:")
display(dept_stats)

In [None]:
# Commune-level statistics (sample)
commune_stats = con.execute("""
    SELECT
        dept_code || LPAD(CAST(commune_code AS VARCHAR), 3, '0') AS insee_code,
        commune_name,
        APPROX_QUANTILE(price_m2, 0.5) AS median_price_m2,
        COUNT(*) AS n_sales
    FROM dvf_clean
    WHERE price_m2 BETWEEN 100 AND 50000
    GROUP BY dept_code, commune_code, commune_name
    HAVING COUNT(*) >= 50
    ORDER BY median_price_m2 DESC
    LIMIT 20
""").df()
print("Top 20 Most Expensive Communes (min 50 sales):")
display(commune_stats)

---
## 5. Canton Statistics (Special Case)

Cantons don't have direct DVF data. We compute canton stats by aggregating commune stats:

```python
# Weighted average by number of sales
canton_median = sum(commune_median * commune_n_sales) / sum(commune_n_sales)
```

### Paris, Lyon, Marseille Exception

These cities don't have cantons - they use arrondissements. We create "pseudo-cantons" by aggregating all arrondissements.

In [None]:
# Paris, Lyon, Marseille as pseudo-cantons
plm_codes = {
    "75_PARIS": "Paris (20 arrondissements)",
    "69_LYON": "Lyon (9 arrondissements)", 
    "13_MARSEILLE": "Marseille (16 arrondissements)"
}

print("PLM Pseudo-Canton Stats:")
for code, name in plm_codes.items():
    if code in stats_cache.get('canton', {}):
        s = stats_cache['canton'][code]
        print(f"  {name}: {s['median_price_m2']:.0f} €/m², {s['n_sales']} sales")

---
## 6. GeoJSON Generation

The pipeline generates GeoJSON files for the frontend:

| File | Source | Simplification |
|------|--------|---------------|
| `country.geojson` | Regions dissolved | High |
| `regions.geojson` | Admin Express | 500m |
| `departements.geojson` | Admin Express | 200m |
| `cantons.geojson` | Admin Express | 100m |
| `communes.geojson` | Admin Express | 50m |
| `communes/{dept}.geojson` | Split by department | 50m |

In [None]:
# GeoJSON file sizes
frontend_dir = Path("../src/frontend")
geojson_files = list(frontend_dir.glob("*.geojson"))

print("GeoJSON Files:")
total_size = 0
for f in sorted(geojson_files):
    size_mb = f.stat().st_size / (1024 * 1024)
    total_size += size_mb
    print(f"  {f.name}: {size_mb:.2f} MB")
print(f"  Total: {total_size:.2f} MB")

---
## 7. Stats Cache Structure

The `stats_cache.json` file has the structure:

```json
{
  "region": {
    "11": {"median_price_m2": 5000, "q25": 3000, "q75": 8000, "n_sales": 50000}
  },
  "departement": {...},
  "canton": {...},
  "commune": {...}
}
```

In [None]:
# Sample of stats cache
print("Sample Region Stats (Île-de-France, code 11):")
if '11' in stats_cache.get('region', {}):
    display(stats_cache['region']['11'])

print("\nSample Department Stats (Paris, code 75):")
if '75' in stats_cache.get('departement', {}):
    display(stats_cache['departement']['75'])

---
## 8. Frontend Visualization

The frontend uses MapLibre GL JS with:

1. **Color scale**: Blue (cheap) → Green → Yellow → Orange → Red (expensive)
2. **Dynamic loading**: Communes loaded by department on viewport change
3. **Hover info**: Shows price, quartiles, and sales count

### Color Expression

```javascript
['interpolate', ['linear'], ['get', 'price_m2'],
  1000, '#3498db',  // Blue - cheap
  3000, '#27ae60',  // Green
  5000, '#f1c40f',  // Yellow
  7500, '#e67e22',  // Orange
  10000, '#e74c3c'  // Red - expensive
]
```

---
## Summary

The pipeline transforms raw DVF transaction data into an interactive price map:

1. **ETL**: Download, clean, deduplicate DVF data
2. **Precompute**: Calculate median prices at each geographic level
3. **GeoJSON**: Simplify and export boundary geometries
4. **Frontend**: Render colored polygons with MapLibre

**Run the full pipeline:**
```bash
python -m src.data.pipeline
```

In [None]:
# Close connection
con.close()
print("Done!")