# 3. Use sedona to solve real world problems

Suppose we need to solve the below problems:
- Get the nearest communes of a giving location(i.e. Point)
- Get a list of hospitals and doctors(with their geometry location) in Île-de-France
- Get the nearest hospitals of a giving point.
- Count hospitals in each commune in Île-de-France.



In [1]:
from sedona.spark import *
from pathlib import Path
from pyspark.sql.functions import col, lower, lit, split, asc, count, desc
from pyspark.sql import SparkSession, DataFrame


In [2]:
import os

os.environ["PYSPARK_PYTHON"] = "python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python"

In [3]:
# get the project root dir
project_root_dir = Path.cwd().parent
# set the data path
data_dir = f"{project_root_dir}/data"
print(project_root_dir)

C:\Users\pliu\Documents\git\Webinaire_CASD_GeoParquet_Sedona


In [4]:
fr_commune_file_path = f"{data_dir}/large_ds/communes_fr_geoparquet"

In [5]:
# here we choose sedona 1.8.0 for spark 3.5.* build with scala 2.12
sedona_version = "sedona-35-212-180"
jar_folder = Path(f"{project_root_dir}/jars/{sedona_version}")
jar_list = [str(jar) for jar in jar_folder.iterdir() if jar.is_file()]
jar_path = ",".join(jar_list)

# build a sedona session (sedona = 1.8.0) offline
spark = SparkSession.builder \
    .appName("sedona_tutorial") \
    .master("local[*]") \
    .config("spark.jars", jar_path) \
    .getOrCreate()

In [6]:
# create a sedona context
sedona = SedonaContext.create(spark)

In [7]:
# get the spark context
sc = spark.sparkContext
# use utf as default encoding
sc.setSystemProperty("sedona.global.charset", "utf8")

## Q1. Get the nearest communes of a location

To solve this question, we need to:
 
- With a given gps coordinates(WGS84/EPSG:4326), get the bird-view distances between the given point and all commune in France
- Sort the distance in ascending order and show the communes with required numbers


In [8]:
fr_commune_raw = sedona.read.format("geoparquet").load(fr_commune_file_path)

In [9]:
fr_commune_raw.show(5)
fr_commune_raw.printSchema()

+--------------------+--------------------+--------------------+------------+-----+
|            geometry|           wikipedia|             surf_ha|         nom|insee|
+--------------------+--------------------+--------------------+------------+-----+
|POLYGON ((9.32016...|fr:Pie-d'Orezza  ...|     573.00000000...|Pie-d'Orezza|2B222|
|POLYGON ((9.20010...|fr:Lano          ...|     824.00000000...|        Lano|2B137|
|POLYGON ((9.27757...|fr:Cambia        ...|     833.00000000...|      Cambia|2B051|
|POLYGON ((9.25119...|fr:Érone         ...|     393.00000000...|       Érone|2B106|
|POLYGON ((9.28339...|fr:Oletta        ...|    2674.00000000...|      Oletta|2B185|
+--------------------+--------------------+--------------------+------------+-----+
only showing top 5 rows

root
 |-- geometry: geometry (nullable = true)
 |-- wikipedia: string (nullable = true)
 |-- surf_ha: string (nullable = true)
 |-- nom: string (nullable = true)
 |-- insee: string (nullable = true)



In [10]:
fr_commune_df = fr_commune_raw.select("geometry", "nom", "insee").withColumn("name", lower(col("nom"))).drop("nom")

In [11]:
fr_commune_df.show(5)
fr_commune_df.printSchema()

+--------------------+-----+------------+
|            geometry|insee|        name|
+--------------------+-----+------------+
|POLYGON ((9.32016...|2B222|pie-d'orezza|
|POLYGON ((9.20010...|2B137|        lano|
|POLYGON ((9.27757...|2B051|      cambia|
|POLYGON ((9.25119...|2B106|       érone|
|POLYGON ((9.28339...|2B185|      oletta|
+--------------------+-----+------------+
only showing top 5 rows

root
 |-- geometry: geometry (nullable = true)
 |-- insee: string (nullable = true)
 |-- name: string (nullable = true)



In [12]:
def get_nearest_commune(in_df: DataFrame, latitude: str, longitude: str, max_commune_number: int):
    """
    This function calculates the nearest commune to the given latitude and longitude.
    :param latitude: latitude of the given point
    :param longitude: longitude of the given point
    :param max_commune_number: Specify the max number of commune in the result
    :param in_df: The input french commune data frame
    :return: 
    """
    # Create reference point
    point_col = ST_PointFromText(lit(f"{longitude},{latitude}"), lit(","))

    # Compute distance + select columns
    nearest_commune_df = (in_df
                          .withColumn("commune_name", col("name"))
                          .withColumn("distance", ST_DistanceSphere(point_col, col("geometry")))
                          .select("commune_name", "insee", "distance")
                          .orderBy(asc("distance"))
                          .limit(max_commune_number)
                          )
    return nearest_commune_df

In [13]:
# the gps coordinates for casd is 48.8190155° N, 2.3081911° E

casd_latitude = "48.8190155"
casd_longitude = "2.3081911"

casd_geo = f"POINT({casd_longitude} {casd_latitude})"

In [14]:
casd_nearest_shape_df = get_nearest_commune(fr_commune_df, casd_latitude, casd_longitude, 36000)

In [15]:
%%time

casd_nearest_shape_df.show(50)
casd_nearest_shape_df.count()

+--------------------+-----+------------------+
|        commune_name|insee|          distance|
+--------------------+-----+------------------+
|           montrouge|92049| 782.4225524838963|
|            malakoff|92046| 931.0760610707576|
|              vanves|92075|1548.4653721969821|
|           châtillon|92020|2244.2297097020064|
|             bagneux|92007|2306.6895398469915|
|             arcueil|94003|2420.7221725908407|
|            gentilly|94037| 2712.776684694567|
| issy-les-moulineaux|92040|3212.5435965134634|
|              cachan|94016| 3506.334512834338|
|  fontenay-aux-roses|92032| 3619.863974489254|
|  le kremlin-bicêtre|94043| 3670.315588897698|
|      bourg-la-reine|92014| 4337.380330327545|
|             clamart|92023| 4677.596275760091|
|           villejuif|94076| 4806.259641317598|
|              sceaux|92071| 4807.635098158753|
|               paris|75056| 4891.894805477514|
|     l'haÿ-les-roses|94038| 5227.715047978959|
|boulogne-billancourt|92012| 5418.541958

34955

In [16]:
# the gps coordinates for Paul-Brousse is 48.7951606539 N, 2.3636935981 E
pb_latitude = "48.7951606539"
pb_longitude = "2.3636935981"
pb_geo = f"POINT({pb_longitude} {pb_latitude})"

In [17]:
pb_nearest_shape_df = get_nearest_commune(fr_commune_df, pb_latitude, pb_longitude, 36000)

In [18]:
%%time

pb_nearest_shape_df.show(10)
pb_nearest_shape_df.count()

+------------------+-----+------------------+
|      commune_name|insee|          distance|
+------------------+-----+------------------+
|         villejuif|94076|417.13873642972163|
|le kremlin-bicêtre|94043|1616.4950323944458|
|            cachan|94016|2350.1367372643913|
|   vitry-sur-seine|94081|2391.3950596721934|
|           arcueil|94003|2455.2075433323284|
|          gentilly|94037|2469.3891101286713|
|    ivry-sur-seine|94041|2592.6844198231634|
|   l'haÿ-les-roses|94038|2842.8348963659664|
|    chevilly-larue|94021|3239.0306951319963|
|    bourg-la-reine|92014|3788.6932471326836|
+------------------+-----+------------------+
only showing top 10 rows

CPU times: total: 0 ns
Wall time: 2.43 s


34955

## Q2. Get a list of hospitals and doctors(with their geometry location) in Île-de-France

To get the list of hospitals and doctors in Île-de-France, we use the (OSM)Open Street Map sample data.

The sample data which I will use in this notebook can be downloaded from this page: 
https://download.geofabrik.de/europe/france.html. I use the `Ile-de-France` map (`ile-de-france-latest.osm.pbf`)

In [19]:
ile_france_pbf_path = f"{data_dir}/large_ds/ile-de-france-geo-parquet"
osm_ile_france_df = spark.read.parquet(ile_france_pbf_path)

In [20]:
osm_ile_france_df.show(15)

+------+----+------------------+------------------+-----+---------+--------------------+--------------------+
|    id|type|          latitude|         longitude|nodes|relations|                tags|                info|
+------+----+------------------+------------------+-----+---------+--------------------+--------------------+
|122626|   0|49.115966300000004|         2.5549119|   []|       []|                  {}|{3, 2020-05-10 11...|
|122627|   0|49.110294100000004|         2.5521725|   []|       []|                  {}|{4, 2009-02-13 19...|
|122631|   0|        49.0834393|2.5511375000000003|   []|       []|                  {}|{15, 2021-06-30 1...|
|122632|   0|        49.0675225|2.5524679000000003|   []|       []|                  {}|{17, 2019-04-10 1...|
|122633|   0|         49.063616|2.5522412000000005|   []|       []|                  {}|{17, 2009-02-13 1...|
|122634|   0|        49.0597465|2.5509097000000005|   []|       []|                  {}|{2, 2009-02-13 19...|
|122635|  

In [21]:
osm_ile_france_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- type: byte (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- nodes: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- relations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- relationType: byte (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- tags: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- info: struct (nullable = true)
 |    |-- version: integer (nullable = true)
 |    |-- timestamp: timestamp (nullable = true)
 |    |-- changeset: long (nullable = true)
 |    |-- userId: integer (nullable = true)
 |    |-- userName: string (nullable = true)
 |    |-- visible: boolean (nullable = true)



In [22]:
def show_row_details(entity_id: int):
    sample_row = osm_ile_france_df.filter(osm_ile_france_df.id == entity_id)
    sample_row.show(truncate=False, vertical=True)

In [23]:
hospital_df = osm_ile_france_df.select("id", "latitude", "longitude", "tags").where(
    "element_at(tags, 'amenity') in ('hospital', 'clinic')")

In [24]:
doctor_df = osm_ile_france_df.select("id", "latitude", "longitude", "tags").where(
    "element_at(tags, 'amenity') == 'doctors'")

In [25]:
print(f"total hosptitals number in ile-de-france: {hospital_df.count()}")

total hosptitals number in ile-de-france: 564


In [26]:
hospital_df.show(5)

+---------+------------------+------------------+--------------------+
|       id|          latitude|         longitude|                tags|
+---------+------------------+------------------+--------------------+
|452401907| 48.78635369999983| 2.291711499999987|{amenity -> hospi...|
|476313165|48.878517699999826| 2.414601899999999|{name -> Maternit...|
|483569726| 48.82353579999984|2.2768316999999985|{website -> https...|
|670633220|48.722277100000056|2.4525995000000083|{name -> Centre H...|
|783760856| 48.83526160000006|         2.2442898|{name -> Centre d...|
+---------+------------------+------------------+--------------------+
only showing top 5 rows



In [27]:
show_row_details(670633220)

-RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 id        | 670633220                                                                                                                                                                                                                                                                                          

In [28]:
print(f"total doctors number in ile-de-france: {doctor_df.count()} ")

total doctors number in ile-de-france: 1298 


In [29]:
doctor_df.show(5)

+---------+------------------+------------------+--------------------+
|       id|          latitude|         longitude|                tags|
+---------+------------------+------------------+--------------------+
|293986528| 48.86378410000007|2.3814177000000036|{amenity -> docto...|
|302305751|        48.5833366|2.2414200000000104|{amenity -> docto...|
|416708946|48.872033599999774|2.3765707000000202|{website -> https...|
|456140610| 48.75835359999986| 3.048538999999998|{amenity -> docto...|
|477197148|49.010437500000016|2.0296950999999948|{amenity -> doctors}|
+---------+------------------+------------------+--------------------+
only showing top 5 rows



In [30]:
show_row_details(416708946)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 id        | 416708946                                                                                                                                                                                                                                                
 type      | 0                                                                                                                                                                                                                                                        
 latitude  | 48.872033599999774                                                                                                                                                                                    

## Work with different CRS system

If your geospatial datasets use different CRS system, apache sedona allows you to convert geometry from one crs to another crs system.


In [31]:
from pyspark.sql import DataFrame


def build_osm_geo_df(source_df: DataFrame, lat_col_name: str, long_col_name: str,
                     target_geo_col_name: str = "location"):
    """
    This function takes a raw osm dataframe, and return a geo dataframe which convert raw lat, long into geo column
    """
    target_df = (
        source_df.select(
            col("id"),
            col("tags"),
            ST_Point(
                col(long_col_name).cast("decimal(24,20)"),
                col(lat_col_name).cast("decimal(24,20)")
            ).alias(target_geo_col_name)
        )
    )
    return target_df


def build_osm_geo_df_with_custom_crs_code(source_df: DataFrame, lat_col_name: str, long_col_name: str,
                                          source_epsg_code: str,
                                          target_epsg_code: str, target_geo_col_name: str = "location"):
    """
    This function takes a dataframe with gps coordinate column(string type), convert the string column to a geometry column. The returned dataframe can be stored as geoparquet.
    :param source_df: 
    :type source_df: 
    :param long_col_name: longitude column name
    :type long_col_name: str
    :param lat_col_name: latitude column name
    :type lat_col_name: str
    :param source_epsg_code: the csr code of the input gps coordinates
    :type source_epsg_code: str
    :param target_epsg_code: the csr code of the output gps coordinates
    :type target_epsg_code: str
    :param target_geo_col_name: 
    :type target_geo_col_name: 
    :return: 
    :rtype: 
    """
    # create a temp view of the source df, the table name is the name of the data frame
    target_df = (
        source_df.select(
            col("id"),
            col("tags"),
            ST_Transform(
                ST_Point(
                    col(long_col_name).cast("decimal(24,20)"),
                    col(lat_col_name).cast("decimal(24,20)")
                ),
                lit(source_epsg_code),
                lit(target_epsg_code)
            ).alias(target_geo_col_name)
        )
    )
    return target_df


def geo_table_convertor(source_table_name: str, lat_col_name: str, long_col_name: str, source_epsg_code: str,
                        target_epsg_code: str, target_geo_col_name: str = "location"):
    """
    This function takes a spark temp view with gps coordinate column(string type), convert the string column to a geometry column. The returned dataframe can be stored as geoparquet.
    :param source_table_name: 
    :type source_table_name: 
    :param target_geo_col_name: 
    :type target_geo_col_name: 
    :param long_col_name: 
    :type long_col_name: 
    :param lat_col_name: 
    :type lat_col_name: 
    :param source_epsg_code: 
    :type source_epsg_code: 
    :param target_epsg_code: 
    :type target_epsg_code: 
    :return: 
    :rtype: 
    """
    # create a temp view of the source df, the table name is the name of the data frame
    target_df = sedona.sql(f"""
    SELECT 
    ST_Transform(ST_Point(CAST({long_col_name} AS Decimal(24,20)), CAST({lat_col_name} AS Decimal(24,20))), '{source_epsg_code}', '{target_epsg_code}') AS {target_geo_col_name} from {source_table_name}""")
    return target_df


In [32]:
hospital_geo_df = build_osm_geo_df(hospital_df, "latitude", "longitude")
hospital_geo_df.show(5)

+---------+--------------------+--------------------+
|       id|                tags|            location|
+---------+--------------------+--------------------+
|452401907|{amenity -> hospi...|POINT (2.29171149...|
|476313165|{name -> Maternit...|POINT (2.41460189...|
|483569726|{website -> https...|POINT (2.27683169...|
|670633220|{name -> Centre H...|POINT (2.45259950...|
|783760856|{name -> Centre d...|POINT (2.2442898 ...|
+---------+--------------------+--------------------+
only showing top 5 rows



In [33]:
# Set up the epsg code value, osm uses epsg:25832
source_epsg_code = "epsg:4326"
# eu centered epsg code, more information can be found https://epsg.io/25832
target_epsg_code = "epsg:3857"

hospital_new_code_df = build_osm_geo_df_with_custom_crs_code(hospital_df, "latitude", "longitude", source_epsg_code,
                                                             target_epsg_code)


In [34]:
hospital_new_code_df.show(5)

+---------+--------------------+--------------------+
|       id|                tags|            location|
+---------+--------------------+--------------------+
|452401907|{amenity -> hospi...|POINT (255112.157...|
|476313165|{name -> Maternit...|POINT (268792.253...|
|483569726|{website -> https...|POINT (253455.745...|
|670633220|{name -> Centre H...|POINT (273022.127...|
|783760856|{name -> Centre d...|POINT (249833.197...|
+---------+--------------------+--------------------+
only showing top 5 rows



## Q3. Get nearest hospitals of a giving point.

The osm sample data does not provide geometry column directly, so the first step is to build a geometry column with the giving gps coordinates.

In [35]:
hospital_geo_df.cache()
hospital_geo_df.show(1, truncate=False, vertical=True)

-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------
 id       | 452401907                                                                                                                                    
 tags     | {amenity -> hospital, name -> Clinalliance Fontenay-aux-Roses, ref:FR:FINESS -> 920300381, website -> https://www.clinalliance.fr/fontenay/} 
 location | POINT (2.291711499999987 48.78635369999983)                                                                                                  
only showing top 1 row



In [36]:
hospital_geo_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- tags: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- location: geometry (nullable = true)



In [37]:
def get_near_hospital_v1(hospitals: DataFrame, patient_loc: str, distance: float):
    """
    This function get the nearest hospital based on distance with a given patient location
    :param hospitals: the available hospital geo dataframe
    :param patient_loc: gps coordinates in format "POINT(longitude, latitude)"
    :param distance: the max distance between hospital and patient
    :return: 
    """
    tmp_df = (
        hospitals
        .withColumn(
            "distance_meter",
            ST_DistanceSphere(ST_GeomFromWKT(lit(patient_loc)), col("location"))
        )
        .select("id", "tags", "location", "distance_meter")
        .orderBy(asc("distance_meter"))
    )
    near_hospital_df = tmp_df.filter(tmp_df.distance_meter <= distance)
    return near_hospital_df

In [38]:
def get_near_hospital_v2(hospitals: DataFrame, patient_loc: str, distance: float):
    """
    This function get the nearest hospital based on distance with a given patient location
    :param hospitals: the available hospital geo dataframe
    :param patient_loc: gps coordinates in format "POINT(longitude, latitude)"
    :param distance: the max distance between hospital and patient
    :return:
    """
    return hospitals.filter(ST_DWithin(ST_GeomFromWKT(lit(patient_loc)), col("location"), distance))


To better understand the query, we use this [website](https://www.keene.edu/campus/maps/tool/) 
to show gps coordinates on a map.

CASD coordinates:
```text
2.3081911,48.8190155
```

Paul-Brousse coordinates:
```text
2.3636935981,48.7951606539
```



In [39]:
print(f"CASD coordinates: {casd_geo}")
print(f"Paul-Brousse coordinates: {pb_geo}")

CASD coordinates: POINT(2.3081911 48.8190155)
Paul-Brousse coordinates: POINT(2.3636935981 48.7951606539)


### Below codes returns the nearest hospital of CASD

In [40]:
# get all hospital within 5000 meter of CASD
casd_near_hos_df = get_near_hospital_v1(hospital_geo_df, casd_geo, 5000)
casd_near_hos_df.show(5)

+-----------+--------------------+--------------------+------------------+
|         id|                tags|            location|    distance_meter|
+-----------+--------------------+--------------------+------------------+
| 2506232459|{website -> http:...|POINT (2.31573960...| 999.4514393892221|
| 4936398378|{amenity -> clini...|POINT (2.31413310...|1043.5714649372987|
| 9073838043|{healthcare:speci...|POINT (2.3119306 ...|1365.4667383528551|
|10883369653|{name -> Clinique...|POINT (2.32937819...|1746.0477473786032|
|  483569726|{website -> https...|POINT (2.27683169...|2350.2594029700895|
+-----------+--------------------+--------------------+------------------+
only showing top 5 rows



In [41]:
casd_near_hos_df.count()

38

In [42]:
print(f"Hospital count within 5km radius of CASD: {casd_near_hos_df.count()} ")

Hospital count within 5km radius of CASD: 38 


In [43]:
show_row_details(2506232459)

-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 id        | 2506232459                                                                                                                                                                      
 type      | 0                                                                                                                                                                               
 latitude  | 48.81152669999993                                                                                                                                                               
 longitude | 2.3157396000000015                                                                                                                                                              
 nodes     | []                                   

In [44]:
casd_near_hos_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- tags: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- location: geometry (nullable = true)
 |-- distance_meter: double (nullable = true)



In [45]:
# get all hospital within 5000 meter of Paul-Brousse
pb_near_hos_df = get_near_hospital_v2(hospital_geo_df, pb_geo, 5000.0)
pb_near_hos_df.show(5)

+---------+--------------------+--------------------+
|       id|                tags|            location|
+---------+--------------------+--------------------+
|452401907|{amenity -> hospi...|POINT (2.29171149...|
|476313165|{name -> Maternit...|POINT (2.41460189...|
|483569726|{website -> https...|POINT (2.27683169...|
|670633220|{name -> Centre H...|POINT (2.45259950...|
|783760856|{name -> Centre d...|POINT (2.2442898 ...|
+---------+--------------------+--------------------+
only showing top 5 rows



In [46]:
print(f"Hospital count within 5km radius of Paul-Brousse: {pb_near_hos_df.count()} ")

Hospital count within 5km radius of Paul-Brousse: 213 


In [47]:
show_row_details(10926295628)

-RECORD 0-----------------------------------------------------------------------------
 id        | 10926295628                                                              
 type      | 0                                                                        
 latitude  | 48.78587790000029                                                        
 longitude | 2.366615800000005                                                        
 nodes     | []                                                                       
 relations | []                                                                       
 tags      | {amenity -> clinic, healthcare -> clinic, name -> Centre médical Aragon} 
 info      | {1, 2023-05-25 15:06:07, 0, 0, , NULL}                                   



## Q4. Count hospitals in each commune in Île-de-France

For the sake of simplicity, here we only choose the districts(communes) in a radius of 8000 meters of the paris center.

1. Get the centroid of Paris
2. Get all commune in a Radius of 8000 meters from the centroid of Paris
3. Spatial Join(ST_Contains) the hospital data frame with the commune data frame
4. Group by the commune and count the hospital numbers
5. Generate a map to visualize the results

In [48]:
from sedona.spark import *

paris_polygon_df = fr_commune_df.filter(col("name") == "paris").withColumn("centroid", ST_Centroid(col("geometry")))
paris_polygon_df.show()
paris_polygon_df.createOrReplaceTempView("paris_polygon")


+--------------------+-----+-----+--------------------+
|            geometry|insee| name|            centroid|
+--------------------+-----+-----+--------------------+
|POLYGON ((2.22412...|75056|paris|POINT (2.34287643...|
+--------------------+-----+-----+--------------------+



In [49]:
# 2.3428764301940275,48.85662219553845
paris_polygon_df.select("centroid").show(truncate=False)

+--------------------------------------------+
|centroid                                    |
+--------------------------------------------+
|POINT (2.3428764301940275 48.85662219553845)|
+--------------------------------------------+



In [50]:
distance = 8000.0
filtered_commune_df = (
    paris_polygon_df.alias("p")
    .join(fr_commune_df.alias("fr"),
          ST_DWithin(col("p.centroid"), col("fr.geometry"), lit(distance), lit(True)))
    .select(col("fr.name"))
)

In [51]:
# We get 30 communes in the radius of 8000 meters
print(f"total commune count: {filtered_commune_df.count()}")
filtered_commune_df.show()

total commune count: 30
+--------------------+
|                name|
+--------------------+
|boulogne-billancourt|
|           châtillon|
|    levallois-perret|
|              vanves|
|               paris|
|           vincennes|
|            bagnolet|
|         romainville|
|   neuilly-sur-seine|
|           les lilas|
|         saint-mandé|
|le pré-saint-gervais|
| issy-les-moulineaux|
|          courbevoie|
|           montreuil|
|       aubervilliers|
|saint-ouen-sur-seine|
|   charenton-le-pont|
|              pantin|
|      ivry-sur-seine|
+--------------------+
only showing top 20 rows



In [52]:
target_commune = [row["name"] for row in filtered_commune_df.select("name").collect()]

print(target_commune)

['boulogne-billancourt', 'châtillon', 'levallois-perret', 'vanves', 'paris', 'vincennes', 'bagnolet', 'romainville', 'neuilly-sur-seine', 'les lilas', 'saint-mandé', 'le pré-saint-gervais', 'issy-les-moulineaux', 'courbevoie', 'montreuil', 'aubervilliers', 'saint-ouen-sur-seine', 'charenton-le-pont', 'pantin', 'ivry-sur-seine', 'asnières-sur-seine', 'gentilly', 'le kremlin-bicêtre', 'arcueil', 'villejuif', 'cachan', 'malakoff', 'montrouge', 'clichy-la-garenne', 'bagneux']


In [53]:
target_commune_df = fr_commune_df.filter(col("name").isin(target_commune))


In [54]:
commune_hospital_df = (
    target_commune_df.alias("c")
    .join(
        hospital_geo_df.alias("h"),
        ST_Contains(col("c.geometry"), col("h.location"))
    )
    .select(
        col("c.name").alias("commune_name"),
        col("c.geometry").alias("commune_geometry"),
        col("h.id").alias("hospital_id")
    )
)

commune_hospital_df.show()


+--------------------+--------------------+-----------+
|        commune_name|    commune_geometry|hospital_id|
+--------------------+--------------------+-----------+
|boulogne-billancourt|POLYGON ((2.22279...| 3189803775|
|boulogne-billancourt|POLYGON ((2.22279...|  783760856|
|boulogne-billancourt|POLYGON ((2.22279...| 1224158095|
|    levallois-perret|POLYGON ((2.27102...| 5719766166|
|               paris|POLYGON ((2.22412...|10883369653|
|               paris|POLYGON ((2.22412...| 4936398378|
|               paris|POLYGON ((2.22412...| 7923610571|
|               paris|POLYGON ((2.22412...| 9073838043|
|               paris|POLYGON ((2.22412...|10061970658|
|               paris|POLYGON ((2.22412...| 8269359942|
|               paris|POLYGON ((2.22412...| 1362787029|
|               paris|POLYGON ((2.22412...| 9247842736|
|               paris|POLYGON ((2.22412...| 4657177274|
|               paris|POLYGON ((2.22412...| 3624753425|
|               paris|POLYGON ((2.22412...| 7568

In [55]:
commune_hospital_count_df = (
    commune_hospital_df
    .groupBy(col("commune_name"), col("commune_geometry"))
    .agg(count("*").alias("hospital_count"))
    .orderBy(desc("hospital_count"))
)

In [56]:
commune_hospital_count_df.show(5)

+--------------------+--------------------+--------------+
|        commune_name|    commune_geometry|hospital_count|
+--------------------+--------------------+--------------+
|               paris|POLYGON ((2.22412...|            87|
|   neuilly-sur-seine|POLYGON ((2.24562...|             4|
|          courbevoie|POLYGON ((2.23059...|             3|
|boulogne-billancourt|POLYGON ((2.22279...|             3|
|   clichy-la-garenne|POLYGON ((2.28737...|             3|
+--------------------+--------------------+--------------+
only showing top 5 rows



In [57]:
casd_near_hos_df.show(5)

+-----------+--------------------+--------------------+------------------+
|         id|                tags|            location|    distance_meter|
+-----------+--------------------+--------------------+------------------+
| 2506232459|{website -> http:...|POINT (2.31573960...| 999.4514393892221|
| 4936398378|{amenity -> clini...|POINT (2.31413310...|1043.5714649372987|
| 9073838043|{healthcare:speci...|POINT (2.3119306 ...|1365.4667383528551|
|10883369653|{name -> Clinique...|POINT (2.32937819...|1746.0477473786032|
|  483569726|{website -> https...|POINT (2.27683169...|2350.2594029700895|
+-----------+--------------------+--------------------+------------------+
only showing top 5 rows



In [None]:
# stop spark session
spark.stop()