# Install jupyter-leaflet in the Extension Manager first

In [None]:
# For conda:
import sys
!conda env update --file environment.yml  --prune

In [None]:
# For pip on windows
import sys
!{sys.executable} -m pip install -r requirements.txt

In [None]:
import json
import os
import shutil
import urllib3

import pandas as pd
import numpy as np

import pyspark
from pyspark.sql import SparkSession

from sedona.register import SedonaRegistrator  
from sedona.utils import SedonaKryoRegistrator, KryoSerializer

from ipyleaflet import Map, basemaps, basemap_to_tiles, MarkerCluster, Marker, AwesomeIcon
from ipywidgets import Layout

In [None]:
spark = SparkSession.\
    builder.\
    master(f"local[*]").\
    appName("Sedona App").\
    config("spark.serializer", KryoSerializer.getName).\
    config("spark.kryo.registrator", SedonaKryoRegistrator.getName) .\
    config('spark.jars.packages',
           'com.acervera.osm4scala:osm4scala-spark3-shaded_2.12:1.0.11,'
           'org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.4.0,'
           'org.datasyslab:geotools-wrapper:1.4.0-28.2') .\
    getOrCreate()

SedonaRegistrator.registerAll(spark)

print(f"Spark version: {spark.version}")
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")

In [None]:
# url="https://download.geofabrik.de/europe/germany/berlin-latest.osm.pbf"
# url="https://download.geofabrik.de/europe/germany-latest.osm.pbf"
url="https://download.geofabrik.de/europe/germany/nordrhein-westfalen-latest.osm.pbf"
pbf_file = url.split('/')[-1]
if not os.path.exists(pbf_file):
    http = urllib3.PoolManager()
    with open(pbf_file, 'wb') as out:
        r = http.request('GET', url, preload_content=False)
        shutil.copyfileobj(r, out)

base_name = os.path.basename(pbf_file).split(".")[0]

In [None]:
raw_df = spark.read.format("osm.pbf").load(pbf_file)

In [None]:
node_df = raw_df.where("type = 0")
way_df = raw_df.where("type = 1")

In [None]:
node_simple_df = node_df.select("id","latitude", "longitude")
way_simple_df = way_df.drop("id","latitude", "longitude")
way_with_gps_df = way_simple_df.join(
    node_simple_df, way_simple_df.nodes.getItem(0) == node_simple_df.id)

way_simple_df = way_with_gps_df.select("latitude", "longitude", "tags")
node_simple_df = node_df.select("latitude", "longitude", "tags")

In [None]:
charge_and_food_df = way_simple_df.union(node_simple_df).\
    where("element_at(tags, 'amenity') in ('charging_station', 'fast_food')")
charge_and_food_df.cache()

In [None]:
charger_df = charge_and_food_df.select("latitude", "longitude").\
    where("element_at(tags, 'amenity') == 'charging_station' and instr(element_at(tags, 'socket:type2_combo:output'),' kW') > 0 and replace(element_at(tags, 'socket:type2_combo:output'), ' kW','') > 50")
charger_df.createOrReplaceTempView("charger")

fast_food_df = charge_and_food_df.select("latitude", "longitude").\
    where("element_at(tags, 'amenity') == 'fast_food'")
fast_food_df.createOrReplaceTempView("fast_food")

In [None]:
icon_charger = AwesomeIcon(
    name='fa-battery-full',
    marker_color='green',
    icon_color='darkgreen'
)

icon_fast_food = AwesomeIcon(
    name='fa-cutlery',
    marker_color='red',
    icon_color='black'
)

In [None]:
charger_pos = tuple([Marker(location=tuple(row), icon=icon_charger) for row in charger_df.limit(250).collect()])
fast_food_pos  = tuple([Marker(location=tuple(row), icon=icon_fast_food ) for row in fast_food_df.limit(250).collect()])

marker_charger = MarkerCluster(markers=charger_pos)
marker_fast_food = MarkerCluster(markers=fast_food_pos)

latitudes =  np.array([x.location[0] for x in charger_pos]+[x.location[0] for x in fast_food_pos])
longitudes = np.array([x.location[1] for x in charger_pos]+[x.location[1] for x in fast_food_pos])
ce = [latitudes.mean(), longitudes.mean()]

m = Map(
    basemap=basemap_to_tiles(basemaps.OpenStreetMap.Mapnik),
    center=ce,
    layout=Layout(width='50%', height='800px'),
    zoom=7
)

m.add_layer(marker_charger)
m.add_layer(marker_fast_food)

display(m)

In [None]:
epsg_code = "epsg:25832"

charger_geo = spark.sql(f"""
SELECT 
ST_Transform(ST_Point(CAST(latitude AS Decimal(24,20)), CAST(longitude AS Decimal(24,20))), 'epsg:4326', '{epsg_code}') AS charger_point 
from charger""")
charger_geo.cache()
charger_geo.createOrReplaceTempView("charger_geo")

fast_food_geo = spark.sql(f"""
SELECT ST_Transform(ST_Point(CAST(latitude AS Decimal(24,20)), CAST(longitude AS Decimal(24,20))), 'epsg:4326', '{epsg_code}') AS fast_food_point from fast_food
""")
fast_food_geo.cache()
fast_food_geo.createOrReplaceTempView("fast_food_geo")

print(f"Charger count:   {charger_geo.count()}")
print(f"Fast food count: {fast_food_geo.count()}")

In [None]:
food_near_charger_df = spark.sql(f"""
SELECT 
ST_AsGeoJSON(
   ST_Transform(charger_geo.charger_point,     '{epsg_code}', 'epsg:4326')
) charger_point, 
ST_AsGeoJSON(
   ST_Transform(fast_food_geo.fast_food_point, '{epsg_code}', 'epsg:4326')
) fast_food_point, 
ST_Distance(
  charger_geo.charger_point, fast_food_geo.fast_food_point
) distance_meter
FROM charger_geo, fast_food_geo 
WHERE 
ST_Distance(charger_geo.charger_point, fast_food_geo.fast_food_point) <= 100
""").cache()

charger_near_df = food_near_charger_df.select("charger_point").distinct()
charger_near_df.cache()
food_near_df = food_near_charger_df.select("fast_food_point").distinct()
food_near_df.cache()

In [None]:
print(f"Fast food: {food_near_df.count()}")
print(f"Charger: {charger_near_df.count()}")

In [None]:
charger_near_pos = tuple([Marker(location=tuple(json.loads(row["charger_point"])["coordinates"]), icon=icon_charger) for row in charger_near_df.collect()])
burger_near_pos  = tuple([Marker(location=tuple(json.loads(row["fast_food_point"])["coordinates"]), icon=icon_fast_food) for row in food_near_df.collect()])

marker_charger = MarkerCluster(markers=charger_near_pos)
marker_burger = MarkerCluster(markers=burger_near_pos)

latitudes =  np.array([x.location[0] for x in charger_near_pos]+[x.location[0] for x in burger_near_pos])
longitudes = np.array([x.location[1] for x in charger_near_pos]+[x.location[1] for x in burger_near_pos])

ce = [latitudes.mean(), longitudes.mean()]


m = Map(
    basemap=basemap_to_tiles(basemaps.OpenStreetMap.Mapnik),
    center=ce,
    layout=Layout(width='50%', height='800px'),
    zoom=7
)

m.add_layer(marker_charger)
m.add_layer(marker_burger)

display(m)