In [3]:
import numpy as np
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType, StructType, StructField, StringType
import folium
from folium import plugins
from folium import LinearColormap

# Create (or reuse) the Spark session named "GridProcessing" and only log errors
spark = (
    SparkSession.builder
    .appName("GridProcessing")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")

######################################
# Grid definition
######################################
# Corner of the grid from which cells are laid out (latitude, longitude)
lat_start, lon_start = 45.687757, 4.744455

# Cell size in degrees. The original author labels both steps as "500 m".
# NOTE(review): 0.0030 deg of latitude looks closer to ~330 m -- confirm the
# intended cell size before relying on these labels.
lat_step = 0.0030
lon_step = 0.0045

rows = 40  # number of cells along the latitude axis
cols = 60  # number of cells along the longitude axis


def _make_cell(row, col):
    """Return (id, lat1, lon1, lat2, lon2, lat_center, lon_center) for one cell.

    The id is the row-major index ``row * cols + col``; (lat1, lon1) and
    (lat2, lon2) are opposite corners and the centre is their midpoint.
    """
    south = lat_start + row * lat_step
    west = lon_start + col * lon_step
    north = south + lat_step
    east = west + lon_step
    return (
        row * cols + col,
        south, west, north, east,
        (south + north) / 2,
        (west + east) / 2,
    )


# One tuple per cell, in row-major order
grid_cells = [_make_cell(r, c) for r in range(rows) for c in range(cols)]

# Turn the Python list of cell tuples into a Spark DataFrame, one row per cell
grid_cells_df = spark.createDataFrame(
    grid_cells,
    schema=["id", "lat1", "lon1", "lat2", "lon2", "lat_center", "lon_center"],
)


######################################
# Fonciere (property transactions) data
######################################
# Explicit schema for the cleaned property-transactions CSV
fonciere_schema = StructType([
    StructField("VALEUR_FONCIERE", FloatType(), False), 
    StructField("CODE_POSTAL", StringType(), False),
    StructField("NOM_COMMUNE", StringType(), False), 
    StructField("TYPE_LOCAL", StringType(), False),
    StructField("SURFACE_REELLE_BATI", FloatType(), False), 
    StructField("SURFACE_TERRAIN", FloatType(), False), 
    StructField("LONGITUDE", FloatType(), False),
    StructField("LATITUDE", FloatType(), False), 
    StructField("prix_du_m_carre", FloatType(), False)
])

fonciere_df = spark.read.option("header", True).schema(fonciere_schema).csv("./cleaned/valeurs_foncieres.csv")

# Assign every transaction to the grid cell that contains it.
# The cell intervals are half-open ([lat1, lat2) x [lon1, lon2)): with closed
# bounds on both sides a point lying on an edge shared by two cells would be
# matched to both of them and counted twice.
# The small grid side carries a broadcast hint, as in the hospital section.
fonciere_grid = F.broadcast(grid_cells_df)
joined_df = fonciere_df.join(
    fonciere_grid,
    (fonciere_df.LATITUDE >= fonciere_grid.lat1) & (fonciere_df.LATITUDE < fonciere_grid.lat2) &
    (fonciere_df.LONGITUDE >= fonciere_grid.lon1) & (fonciere_df.LONGITUDE < fonciere_grid.lon2),
    how="inner",
)

# Per cell: mean and max price per square metre, then
# score_prix = (1 - mean / max) * 100.
# The guard is on max == 0 alone so the division is never evaluated with a
# zero divisor; such cells get a score of 0.
fonciere_joined_df = joined_df.groupBy("id").agg(
    F.mean("prix_du_m_carre").alias("mean_prix_du_m_carre"),
    F.max("prix_du_m_carre").alias("max_prix_du_m_carre")
).fillna(
    0, subset=["mean_prix_du_m_carre", "max_prix_du_m_carre"]
).withColumn(
    "score_prix",
    F.when(F.col("max_prix_du_m_carre") == 0, F.lit(0.0))
    .otherwise((1 - (F.col("mean_prix_du_m_carre") / F.col("max_prix_du_m_carre"))) * 100)
)

# Left join from the grid so cells without any transaction are kept;
# their missing aggregates and score are filled with 0.
fonciere_joined_df = grid_cells_df.join(fonciere_joined_df, on="id", how="left").orderBy("score_prix")
fonciere_joined_df = fonciere_joined_df.fillna(0)

# Convert the result to a Pandas DataFrame and save to CSV
# (the same file is written again at the end of the notebook with all scores)
final_df_pandas = fonciere_joined_df.toPandas()
final_df_pandas.to_csv('grid_data.csv', index=False)



######################################
# Hopitaux (hospitals) data
######################################
# Explicit schema for the ';'-separated hospital CSV
hopitaux_schema = StructType([
 StructField("NOM", StringType(), False),
 StructField("LATITUDE", FloatType(), False),
 StructField("LONGITUDE", FloatType(), False)
])

hopitaux_df = spark.read.option("header", True).option("sep", ";").schema(hopitaux_schema).csv("./cleaned/adr_voie_lieu.adrhopital.csv")


# Mark the small grid DataFrame with a broadcast hint for the range joins below
grid_cells_df_broadcast = F.broadcast(grid_cells_df)

# Total number of hospital rows, used as the denominator of the score
hopitaux_total = hopitaux_df.count()

# Count hospitals per grid cell (inner join: cells without hospitals are absent
# here). Cell intervals are half-open ([lat1, lat2) x [lon1, lon2)) so a
# hospital on an edge shared by two cells is counted in only one of them.
# NOTE(review): the share is scaled by 1000 whereas the other scores in this
# notebook use 100 -- confirm this weighting is intentional.
hopitaux_joined_df = (
    hopitaux_df
    .join(grid_cells_df_broadcast, 
          (hopitaux_df.LATITUDE >= grid_cells_df_broadcast.lat1) & 
          (hopitaux_df.LATITUDE < grid_cells_df_broadcast.lat2) &
          (hopitaux_df.LONGITUDE >= grid_cells_df_broadcast.lon1) & 
          (hopitaux_df.LONGITUDE < grid_cells_df_broadcast.lon2))
    .groupBy("id")
    .count()
    .withColumn("score_hopitaux", (F.col("count") / hopitaux_total) * 1000)
)

######################################
# PAM (medical practitioners) data
######################################
# Explicit schema for the practitioners CSV
docteurs = StructType([
 StructField("ID", StringType(), False),
 StructField("PROFESSION", StringType(), False),
 StructField("COMMUNE", StringType(), False), # keep
 StructField("NOM", StringType(), False), # keep
 StructField("LATITUDE", FloatType(), False),
 StructField("LONGITUDE", FloatType(), False)
])

docteurs_df = spark.read.option("header", True).schema(docteurs).csv("./cleaned/pam-doc.csv")

# Number of distinct professions, used to average the per-profession ratios
distinct_profession_count = docteurs_df.select("PROFESSION").distinct().count()

# Per cell: for each profession, the share of that profession's practitioners
# located in the cell; the shares are summed, divided by the number of
# distinct professions and expressed as a percentage (calcul_practicien).
practitiens_final = (
    docteurs_df
    # The grid "id" is renamed so it does not clash with the practitioner "ID".
    # Half-open cell intervals ([lat1, lat2) x [lon1, lon2)) keep a practitioner
    # on an edge shared by two cells from being counted in both.
    .join(
        grid_cells_df_broadcast.withColumnRenamed("id", "zone_id"),
        (F.col("LATITUDE") >= F.col("lat1")) & (F.col("LATITUDE") < F.col("lat2")) &
        (F.col("LONGITUDE") >= F.col("lon1")) & (F.col("LONGITUDE") < F.col("lon2")),
        how="inner",
    )
    # Practitioners per (cell, profession)
    .groupBy("zone_id", "PROFESSION")
    .agg(F.count("*").alias("filtered_count"))
    # Totals per profession over the whole dataset
    .join(
        docteurs_df.groupBy("PROFESSION").agg(F.count("*").alias("profession_count")),
        on="PROFESSION",
        how="left"
    )
    # Share of each profession's practitioners that fall in the cell
    .withColumn(
        "ratio",
        F.when(F.col("profession_count") > 0, F.col("filtered_count") / F.col("profession_count")).otherwise(0)
    )
    .groupBy("zone_id")
    .agg(F.sum("ratio").alias("sum_ratios"))
    # Average over the number of distinct professions, as a percentage
    .withColumn(
        "calcul_practicien",
        F.when(F.col("sum_ratios").isNull(), F.lit(0))
        .otherwise(F.col("sum_ratios") / F.lit(distinct_profession_count)) * 100
    )
    # No join back to the grid here: only zone_id and calcul_practicien are
    # kept, and a left join on the unique grid id adds neither rows nor columns
    # to that selection.
    .orderBy(F.col("calcul_practicien").desc())
    .select(
        "zone_id", "calcul_practicien"
    )
    .fillna(0, subset=["calcul_practicien"])
)

######################################
# Subway stations data
######################################
# Load the subway stations (';'-separated CSV with a header row)
stations_schema = StructType([
    StructField("nom", StringType(), False),
    StructField("longitude", FloatType(), False),
    StructField("latitude", FloatType(), False),
    StructField("nb", FloatType(), False)
])

df_stations = spark.read.csv('cleaned/subway_stations_with_nb.csv', schema=stations_schema, header=True, sep=';')

# Remove duplicates, keeping a single row per station name
df_stations_distinct = df_stations.groupBy("nom").agg(
    F.first("longitude").alias("longitude"),
    F.first("latitude").alias("latitude"),
    F.first("nb").alias("nb")
)

# Broadcast hint on the small grid DataFrame for the range join
grid_cells_df_broadcast = F.broadcast(grid_cells_df)

# Number of distinct stations, used as the denominator of the score
stations_total = df_stations_distinct.count()

# Attach each station to its grid cell and sum `nb` per cell.
# Cell intervals are half-open ([lat1, lat2) x [lon1, lon2)) so a station on an
# edge shared by two cells is attributed to only one of them.
# NOTE(review): subway_count is a sum of `nb` while the denominator is the
# number of distinct stations, so score_metro is not bounded by 100 --
# confirm this normalisation is the intended one.
subway_joined_df = (
    df_stations_distinct
    .join(grid_cells_df_broadcast,
          (df_stations_distinct.latitude >= grid_cells_df_broadcast.lat1) &
          (df_stations_distinct.latitude < grid_cells_df_broadcast.lat2) &
          (df_stations_distinct.longitude >= grid_cells_df_broadcast.lon1) &
          (df_stations_distinct.longitude < grid_cells_df_broadcast.lon2),
          how="inner")
    .groupBy("id")
    .agg(F.sum("nb").alias("subway_count"))  
    .withColumn("score_metro", (F.col("subway_count") / stations_total) * 100) 
)

######################################
# Parking data
######################################

# Explicit schema for the parking CSV; capacities are read as strings and
# cast to int where they are summed
parking_schema = StructType([
     StructField("NOM", StringType(), False),
     StructField("HANDICAP", StringType(), False), 
     StructField("CAPACITE_TOTALE", StringType(), False), 
     StructField("CAPACITE_PERMANENTE", StringType(), False), 
     StructField("ACCUEIL_TEMPORAIRE", StringType(), False), 
     StructField("LONGITUDE", FloatType(), False),
     StructField("LATITUDE", FloatType(), False)
])

parking_df = spark.read.option("header", True).schema(parking_schema).csv("./cleaned/places_parking.csv")

# Integer view of the capacity column, shared by the two sums below
parking_capacity = parking_df.CAPACITE_TOTALE.cast('int')

# Get the total parking capacity (None when there is nothing to sum)
total_parking_capacity = parking_df.select(F.sum(parking_capacity).alias('total_capacity')).collect()[0]['total_capacity']

# Share of the total capacity held by a cell; with no capacity at all every
# cell scores 0 instead of dividing by zero / None.
if total_parking_capacity:
    parking_share = F.col("total_capacity_per_zone") / total_parking_capacity
else:
    parking_share = F.lit(0.0)

# Right join so every grid cell is kept, including cells without parking.
# Cell intervals are half-open ([lat1, lat2) x [lon1, lon2)) so a car park on
# an edge shared by two cells is counted in only one of them.
# NOTE(review): score_parking is a 0-1 fraction, whereas the other scores in
# this notebook are multiplied by 100 -- confirm the intended scale.
parking_final_df = (
    parking_df.join(grid_cells_df, (
        (parking_df.LATITUDE >= grid_cells_df.lat1) & 
        (parking_df.LATITUDE < grid_cells_df.lat2) &
        (parking_df.LONGITUDE >= grid_cells_df.lon1) & 
        (parking_df.LONGITUDE < grid_cells_df.lon2)
    ), how="right")
    .groupBy("id")
    .agg(
        F.sum(parking_capacity).alias('total_capacity_per_zone')
    )
    .withColumn(
        "score_parking",
        F.when(F.col("total_capacity_per_zone").isNotNull(), parking_share)
        .otherwise(0)
    )
    .select(
        "id", "score_parking"
    )
    .fillna(0, subset=["score_parking"])
    .orderBy(F.col("score_parking").desc()) 
)

######################################
# Final part
######################################
# Start from fonciere_joined_df (one row per grid cell) and left-join every
# other score so that cells missing from a score table are not lost; the
# resulting nulls are replaced by 0.
# The result is cached because it is evaluated three times below
# (show, count, toPandas).
final_df = (
    fonciere_joined_df
    .join(hopitaux_joined_df, on="id", how="left")
    .join(practitiens_final.withColumnRenamed("zone_id", "id"), on="id", how="left")
    .join(subway_joined_df, on="id", how="left")
    .join(parking_final_df, on="id", how="left")
    .fillna(0)
    .cache()
)
final_df.show(truncate=False, vertical=True)

# Sanity check: the joins must neither drop nor duplicate grid cells.
# An explicit raise is used so the check is not stripped when Python runs
# with -O.
final_row_count = final_df.count()
grid_row_count = grid_cells_df.count()
if final_row_count != grid_row_count:
    raise ValueError(
        "Both datasets must contain the same number of items "
        f"(final_df: {final_row_count}, grid_cells_df: {grid_row_count})"
    )

# Export the scored grid to CSV through pandas
final_df_pandas = final_df.toPandas()
final_df_pandas.to_csv('grid_data.csv', index=False)

# Map centre and initial zoom level
lat, lon = 45.751719, 4.880243
zoom_start = 12

# Jawg tile layer and its attribution string
tiles = 'https://tile.jawg.io/jawg-lagoon/{z}/{x}/{y}{r}.png'
attr = (
    '&copy; <a href="https://jawg.io" title="Tiles Courtesy of Jawg Maps" target="_blank">Jawg Maps</a> &copy; '
    '<a href="https://www.openstreetmap.org/copyright">OpenStreetMap</a> contributors'
)

m = folium.Map(location=[lat, lon], tiles=tiles, attr=attr, zoom_start=zoom_start)

# Colour scale applied to score_prix, defined over 0..100
colormap = LinearColormap(['blue', 'green', 'yellow', 'orange', 'red'], vmin=0, vmax=100)

# Draw one filled polygon per grid cell, coloured by its price score.
# Only score_prix drives the colour; the other score columns are not used by
# this map. itertuples avoids building a Series for every row.
for cell in final_df_pandas.itertuples(index=False):
    square_coords = [
        (cell.lat1, cell.lon1),
        (cell.lat1, cell.lon2),
        (cell.lat2, cell.lon2),
        (cell.lat2, cell.lon1),
    ]

    folium.Polygon(
        locations=square_coords,
        color=None,
        fill=True,
        fill_color=colormap(cell.score_prix),
        fill_opacity=0.2
    ).add_to(m)

# Last expression of the notebook cell: renders the map
m

-RECORD 0----------------------------------
 id                   | 0                  
 lat1                 | 45.687757          
 lon1                 | 4.744455           
 lat2                 | 45.690757          
 lon2                 | 4.7489550000000005 
 lat_center           | 45.689257          
 lon_center           | 4.746705           
 mean_prix_du_m_carre | 0.0                
 max_prix_du_m_carre  | 0.0                
 score_prix           | 0.0                
 count                | 0                  
 score_hopitaux       | 0.0                
 calcul_practicien    | 0.0                
 subway_count         | 0.0                
 score_metro          | 0.0                
 score_parking        | 0.0                
-RECORD 1----------------------------------
 id                   | 1                  
 lat1                 | 45.687757          
 lon1                 | 4.7489550000000005 
 lat2                 | 45.690757          
 lon2                 | 4.753455

final_df has 2400 rows
grid_cells_df has 2400 rows
