In [11]:
import pandas as pd

from pyspark.sql.types import DoubleType, ArrayType, StructType, StructField, StringType, IntegerType, FloatType

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf,from_unixtime, min, max, to_date, pandas_udf, col, PandasUDFType, lit, round, array_except
from pyspark.sql.types import DoubleType, ArrayType, StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql import functions as F
from pyspark.sql import Window

import os, time
import subprocess
import os,shutil
from datetime import datetime
import pandas as pd
import numpy as np
from IPython.display import display, HTML

import requests
from shapely.geometry import LineString, Polygon
from shapely.ops import transform
import pyproj
from functools import partial
from shapely.geometry import LineString
from shapely.ops import transform
from pyproj import Proj, Transformer
import pandas as pd
import folium
from shapely.geometry import Polygon
from shapely.ops import unary_union
import shapely.geometry
import h3
import h3_pyspark

from sklearn.preprocessing import LabelEncoder

# Settings
project = "project_aiu"


# Getting today's date
today = datetime.today().strftime('%d %B %Y')

# Spark Session Initialization
#shutil.copy("/runtime-addons/cmladdon-2.0.40-b150/log4j.properties", "/etc/spark/conf/") # Setting logging properties

spark = SparkSession.builder \
    .appName("OSN ADEP ADES Identification") \
    .config("spark.log.level", "ERROR")\
    .config("spark.hadoop.fs.azure.ext.cab.required.group", "eur-app-aiu-dev") \
    .config("spark.kerberos.access.hadoopFileSystems", "abfs://storage-fs@cdpdldev0.dfs.core.windows.net/data/project/aiu.db/unmanaged") \
    .config("spark.driver.cores", "1") \
    .config("spark.driver.memory", "8G") \
    .config("spark.executor.memory", "10G") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.instances", "2") \
    .config("spark.dynamicAllocation.maxExecutors", "6") \
    .config("spark.network.timeout", "800s") \
    .config("spark.executor.heartbeatInterval", "400s") \
    .config("spark.driver.maxResultSize", "4g") \
    .enableHiveSupport() \
    .getOrCreate()

# Get environment variables
engine_id = os.getenv('CDSW_ENGINE_ID')
domain = os.getenv('CDSW_DOMAIN')

# Format the URL
url = f"https://spark-{engine_id}.{domain}"

# Display the clickable URL
display(HTML(f'<a href="{url}">{url}</a>'))

  from scipy.sparse import issparse
Setting spark.hadoop.yarn.resourcemanager.principal to quinten.goens


In [12]:
list_of_airports = [
    # Top 30 Y2D 2024 (25 July 2024) - As from https://ansperformance.eu/traffic/
    'LTFM', 'EHAM', 'EGLL', 'LFPG', 'EDDF', 'LEMD', 'EDDM', 'LIRF', 'EGKK',
    'LSZH', 'LGAV', 'EIDW', 'LOWW', 'EKCH', 'LEPA', 'LTFJ', 'LPPT', 'ENGM',
    'LIMC', 'LFPO', 'LTAI', 'EGSS', 'ESSA', 'EBBR', 'EGCC', 'EDDB', 'LSGG',
    'EPWA', 'LEMG',

    # BRA - EUR airports
    'SBBR', 'SBGR', 'SBSP', 'SBKP', 'SBRJ', 'SBGL', 'SBCF', 'SBSV', 'SBPA', 'SBCT',

    # USA - EUR airports
    "KSLC", "KATL", "KDTW", "KIAH", "KCLT", "KMSP", "KSEA", "KSFO", "KORD", "KLAX",
    "KPDX", "KPHL", "KIAD", "KPHX", "KDFW", "KDEN", "KSAN", "KHOU", "KBNA", "KSTL",
    "KBWI", "KDCA", "KMIA", "KMDW", "KMEM", "KDAL", "KBOS", "KLGA", "KLAS", "KTPA",
    "KJFK", "KFLL", "KEWR", "KMCO",

    "LEMD", "EFHK", "ENGM", "LIRF", "LFPO", "EKCH", "LEBL", "EDDM", "LSGG", "ESSA",
    "LOWW", "EHAM", "EGLL", "LSZH", "EDDL", "LFPG", "EDDH", "EBBR", "EPWA", "EDDF",
    "LGAV", "LEMG", "LIMC", "LFMN", "LEPA", "EDDB", "LROP", "EGGW", "EGSS", "EDDK",
    "LPPT", "LTFM", "EIDW", "EGKK",

    # CHN - EUR report
    "ZBAA", "ZSPD", "ZGGG", "ZGSZ", "ZUUU", "ZPPP", "ZLXY", "ZUCK", "ZSHC", "ZSSS",
    "ZYCC", "ZSNJ", "LTFM", "EDDF", "EHAM", "LFPG", "EGLL", "LEMD", "EDDM", "LEBL",
    "LIRF", "EGKK", "LSZH", "EFHK",

    # PBWG
    "WSSS", "VTBS", "RJAA", "RJTT", "SBGR", "SBBR"
]

# Remove duplicates by converting to a set
unique_airports = set(list_of_airports)

# Format the list for the SQL IN clause
formatted_airports = ", ".join(f"'{airport}'" for airport in unique_airports)
formatted_airports = f"({formatted_airports})"

airports_df = spark.sql(f"""
    SELECT id, ident, iso_country, continent, latitude_deg, longitude_deg, elevation_ft, type
    FROM {project}.oa_airports
    WHERE (type = 'large_airport' OR type = 'medium_airport') AND 
    ident IN {formatted_airports}
""")


import math
import json

def generate_circle_polygon(lon, lat, radius_nautical_miles, num_points=360):
    """
    Generate a polygon in GeoJSON format around a given latitude and longitude
    with a specified radius in nautical miles.
    
    :param lat: Latitude of the center point
    :param lon: Longitude of the center point
    :param radius_nautical_miles: Radius in nautical miles
    :param num_points: Number of points to generate for the polygon
    :return: A dictionary representing the polygon in GeoJSON format
    """
    # Convert radius from nautical miles to kilometers
    radius_km = radius_nautical_miles * 1.852
    
    # Function to convert from degrees to radians
    def degrees_to_radians(degrees):
        return degrees * math.pi / 180
    
    # Function to calculate the next point given a distance and bearing
    def calculate_point(lon, lat, distance_km, bearing):
        R = 6371.01  # Earth's radius in kilometers
        lat_rad = degrees_to_radians(lat)
        lon_rad = degrees_to_radians(lon)
        distance_rad = distance_km / R
        bearing_rad = degrees_to_radians(bearing)
        
        lat_new_rad = math.asin(math.sin(lat_rad) * math.cos(distance_rad) +
                                math.cos(lat_rad) * math.sin(distance_rad) * math.cos(bearing_rad))
        lon_new_rad = lon_rad + math.atan2(math.sin(bearing_rad) * math.sin(distance_rad) * math.cos(lat_rad),
                                           math.cos(distance_rad) - math.sin(lat_rad) * math.sin(lat_new_rad))
                                           
        lat_new = math.degrees(lat_new_rad)
        lon_new = math.degrees(lon_new_rad)
        return [lon_new, lat_new]
    
    # Generate points
    points = []
    for i in range(num_points):
        bearing = 360 / num_points * i
        point = calculate_point(lon, lat, radius_km, bearing)
        points.append(point)
    points.append(points[0])  # Close the polygon by repeating the first point
    
    # Create GeoJSON
    geojson = {
        "type": "Polygon",
        "coordinates": [points]
    }
    
    geojson_str = json.dumps(geojson)
    
    return geojson_str

def fill_polygon_with_hexagons(polygon_json, resolution=8):
    """Fills a polygon defined by the given JSON with hexagons with defined resolution.

    Args:
        polygon_json (str): A JSON string defining the polygon.
        resolution (int): The H3 resolution for the hexagons.

    Returns:
        list: The list contains the compact hexagon IDs.
    """
    polygon = json.loads(polygon_json)
    hexagons = h3.polyfill(polygon, resolution, geo_json_conformant=True)
    return hexagons

def fill_polygon_with_compact_hexagons(polygon_json, resolution=8):
    """Fills a polygon defined by the given JSON with compact hexagons.

    Args:
        polygon_json (str): A JSON string defining the polygon.
        resolution (int): The H3 resolution for the hexagons.

    Returns:
        tuple: A tuple containing two lists. The first list contains the compact hexagon IDs,
               and the second list contains the resolutions of these hexagons.
    """
    hexagons = fill_polygon_with_hexagons(polygon_json, resolution)
    compact_hexagons = list(h3.compact(hexagons))
    compact_hexagons_resolutions = [h3.h3_get_resolution(h) for h in compact_hexagons]
    return compact_hexagons, compact_hexagons_resolutions

generate_circle_polygon_udf = udf(generate_circle_polygon, StringType())

compact_hex_result_type = StructType([
    StructField("compact_hexagons", ArrayType(StringType()), False),
    StructField("compact_hexagons_resolutions", ArrayType(IntegerType()), False)
])

fill_polygon_with_compact_hexagons_udf = udf(fill_polygon_with_compact_hexagons, compact_hex_result_type)

Hive Session ID = 26ee71ba-7339-4e81-9016-534210eb08f2


In [None]:
for max_resolution in [8]:
    num_points = 720
    radia_nm = [0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
    area_type = [ f"C{x+10}" for x in radia_nm]

    df = pd.DataFrame.from_dict({ #c stands for circle
        'max_resolution' : max_resolution,
        'number_of_points_c' : num_points, 
        'area_type' : area_type,
        'min_c_radius_nm' : radia_nm
    })

    df['max_c_radius_nm'] = df['min_c_radius_nm'].shift(-1).fillna(np.max(radia_nm)+10)
    df['min_c_radius_nm'] = df['min_c_radius_nm'].astype(float)
    df['max_c_radius_nm'] = df['max_c_radius_nm'].astype(float)
    df['m_col'] = 1

    # Define the schema corresponding to your Pandas DataFrame structure
    schema = StructType([
        StructField("max_resolution", IntegerType(), True),
        StructField("number_of_points_c", IntegerType(), True),
        StructField("area_type", StringType(), True),
        StructField("min_c_radius_nm", FloatType(), True),
        StructField("max_c_radius_nm", FloatType(), True),
        StructField("m_col", IntegerType(), True)
    ])

    # Create PySpark DataFrame from Pandas DataFrame using the defined schema
    sdf = spark.createDataFrame(df, schema=schema)
    airports_df_m = airports_df.withColumn('m_col', lit(1)).join(sdf, on = 'm_col', how = 'left')

    res = airports_df_m.withColumn(
        "inner_circle_polygon",
        generate_circle_polygon_udf(
            F.col('longitude_deg'),
            F.col('latitude_deg'),
            F.col('min_c_radius_nm'), 
            F.col('number_of_points_c')
        )).withColumn(
        "outer_circle_polygon",
        generate_circle_polygon_udf(
            F.col('longitude_deg'),
            F.col('latitude_deg'),
            F.col('max_c_radius_nm'), 
            F.col('number_of_points_c')
        )).withColumn(
            "inner_circle_hex_ids",
            h3_pyspark.polyfill(
                F.col("inner_circle_polygon"),
                F.col("max_resolution"),
                F.lit(True)
        )).withColumn(
            "outer_circle_hex_ids",
            h3_pyspark.polyfill(
                F.col("outer_circle_polygon"),
                F.col("max_resolution"),
                F.lit(True)
        )).withColumn(
            "hex_id",
            array_except(col("outer_circle_hex_ids"), col("inner_circle_hex_ids"))
        ).drop(
            "inner_circle_polygon", 
            "outer_circle_polygon", 
            "inner_circle_hex_ids", 
            "outer_circle_hex_ids").toPandas()

    #res = res.drop(["inner_circle_polygon", "outer_circle_polygon", "inner_circle_hex_ids", "outer_circle_hex_ids"], axis=1)
    res.to_parquet(f'../../data/airport_hex/airport_concentric_c_hex_res_{max_resolution}_RQ_request.arrow')



In [3]:
import folium
from geojson import Feature, Point, FeatureCollection
import json
import pandas as pd
import matplotlib
import h3

def hexagons_dataframe_to_geojson(df_hex, file_output=None):
    """
    Produce the GeoJSON for a dataframe, constructing the geometry from the "hex_id" column
    and including all other columns as properties.
    """
    list_features = []

    for i, row in df_hex.iterrows():
        try:
            geometry_for_row = {"type": "Polygon", "coordinates": [h3.h3_to_geo_boundary(h=row["hex_id"], geo_json=True)]}
            properties = row.to_dict()  # Convert all columns to a dictionary
            properties.pop("hex_id", None)  # Remove hex_id as it's already used in geometry
            feature = Feature(geometry=geometry_for_row, id=row["hex_id"], properties=properties)
            list_features.append(feature)
        except Exception as e:
            print(f"An exception occurred for hex {row['hex_id']}: {e}")

    feat_collection = FeatureCollection(list_features)
    geojson_result = json.dumps(feat_collection)
    return geojson_result


def get_color(custom_cm, val, vmin, vmax):
    return matplotlib.colors.to_hex(custom_cm((val-vmin)/(vmax-vmin)))

def choropleth_map(df_aggreg, column_name="value", border_color='black', fill_opacity=0.7, color_map_name="Blues", initial_map=None, initial_location=[47, 4], initial_zoom=5.5, tooltip_columns=None):
    """
    Creates choropleth maps given the aggregated data. 
    initial_map can be an existing map to draw on top of.
    initial_location and initial_zoom control the initial view of the map.
    tooltip_columns is a list of column names to display in a tooltip.
    """
    # colormap
    min_value = df_aggreg[column_name].min()
    max_value = df_aggreg[column_name].max()
    mean_value = df_aggreg[column_name].mean()
    print(f"Colour column min value {min_value}, max value {max_value}, mean value {mean_value}")
    print(f"Hexagon cell count: {df_aggreg['hex_id'].nunique()}")

    # Create map if not provided
    if initial_map is None:
        initial_map = folium.Map(location=initial_location, zoom_start=initial_zoom, tiles="cartodbpositron")

    # Create geojson data from dataframe
    geojson_data = hexagons_dataframe_to_geojson(df_hex=df_aggreg)

    # Get colormap
    custom_cm = matplotlib.cm.get_cmap(color_map_name)

    # Add GeoJson to map
    folium.GeoJson(
        geojson_data,
        style_function=lambda feature: {
            'fillColor': get_color(custom_cm, feature['properties'][column_name], vmin=min_value, vmax=max_value),
            'color': border_color,
            'weight': 1,
            'fillOpacity': fill_opacity
        },
        tooltip=folium.features.GeoJsonTooltip(fields=tooltip_columns) if tooltip_columns else None,
        name="Choropleth"
    ).add_to(initial_map)

    return initial_map

In [2]:
import pandas as pd
res = pd.read_parquet(f'../../data/airport_hex/airport_concentric_c_hex_res_7_RQ_request.arrow')

In [4]:
res = res.explode("hex_id")

In [6]:
fig = choropleth_map(res[res['ident']=='EGLL'], column_name = "max_c_radius_nm")

Colour column min value 10.0, max value 110.0, mean value 78.20936584472656
Hexagon cell count: 28152


In [10]:
fig.save('example_EGLL_for_RQ.html')