In [1]:
from resolutionselector import select_resolution
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import (
    udf, col, explode, lit, collect_list
)
from pyspark.sql.functions import (
    udf, col, explode, monotonically_increasing_id, radians,
    sin, cos, sqrt, atan2, lit, collect_list, expr, posexplode
)

from pyspark.sql.types import (
    StringType, ArrayType, StructType, StructField, DoubleType, IntegerType
)
from pyspark.sql import Window

import h3
import pandas as pd
import numpy as np
from datetime import datetime


# -----------------------------------------------------------------------------
# Spark Configuration
# -----------------------------------------------------------------------------
spark = SparkSession.builder \
    .appName("CloseEncountersH3") \
    .config("spark.driver.memory", "12g") \
    .config("spark.executor.memory", "10g") \
    .config("spark.rpc.message.maxSize", 1028) \
    .getOrCreate()



25/04/25 17:10:04 WARN Utils: Your hostname, Quintens-Laptop.local resolves to a loopback address: 127.0.0.1; using 10.130.79.222 instead (on interface en0)
25/04/25 17:10:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/25 17:10:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [8]:
# -----------------------------------------------------------------------------
# Settings
# -----------------------------------------------------------------------------
distance_nm = 5
resolution = 6  # select_resolution(distance_km)
earth_radius_km = 6378
print(f"The selected resolution for a distance of {distance_nm} NM is: {resolution}")

# -----------------------------------------------------------------------------
# Load and Filter Data
# -----------------------------------------------------------------------------
coords_df = pd.read_parquet('~/Repos/close-encounters/data/flight_profiles_cpf_20240701_filtered.parquet')
f = np.logical_and(coords_df.TIME_OVER >= datetime(2024,7,1,12,0,0), coords_df.TIME_OVER<= datetime(2024,7,1,13,0,0))
coords_df = coords_df[f]
coords_df['SEGMENT_ID'] = coords_df.index
coords_df = coords_df[coords_df.FLIGHT_LEVEL > 250]
coords_df = coords_df[['FLIGHT_ID', 'SEGMENT_ID', 'LONGITUDE', 'LATITUDE', 'TIME_OVER', 'FLIGHT_LEVEL']].rename(
    columns={
        'LATITUDE': 'latitude', 
        'LONGITUDE': 'longitude'
        }
    )
coords_df = spark.createDataFrame(coords_df)
coords_df = coords_df.repartition(100, ["FLIGHT_ID", "SEGMENT_ID"])
# -----------------------------------------------------------------------------
# Define UDFs for H3
# -----------------------------------------------------------------------------
def lat_lon_to_h3(lat, lon, resolution):
    return h3.latlng_to_cell(lat, lon, resolution)

def grid_disk_k1(cell):
    return h3.grid_disk(cell, k=1)

lat_lon_to_h3_udf = udf(lat_lon_to_h3, StringType())
grid_disk_k1_udf = udf(grid_disk_k1, ArrayType(StringType()))

# Add H3 index and neighbors
coords_df = coords_df.withColumn("h3_index", lat_lon_to_h3_udf(col("latitude"), col("longitude"), lit(resolution)))
coords_df = coords_df.withColumn("h3_neighbours", grid_disk_k1_udf(col("h3_index")))

# -----------------------------------------------------------------------------
# Explode neighbors and group by h3_neighbour to collect IDs when there's multiple FLIGHT_ID in a cell
# -----------------------------------------------------------------------------
exploded_df = coords_df.withColumn("h3_neighbour", explode(col("h3_neighbours")))

grouped_df = (exploded_df.groupBy("h3_neighbour")
              .agg(F.countDistinct("FLIGHT_ID").alias("flight_count"),
                   F.collect_list("SEGMENT_ID").alias("id_list"))
              .filter(F.col("flight_count") > 1)
              .drop("flight_count"))

grouped_df = grouped_df.filter(F.size("id_list") > 1)

# -----------------------------------------------------------------------------
# Create pairwise combinations using self-join on indexed exploded DataFrame
# -----------------------------------------------------------------------------
# Explode id_list to individual rows and add index within each h3 group
df_exploded = grouped_df.withColumn("SEGMENT_ID", explode("id_list"))
window_spec = Window.partitionBy("h3_neighbour").orderBy("SEGMENT_ID")
df_indexed = df_exploded.withColumn("idx", F.row_number().over(window_spec))

# Self-join to form unique unordered ID pairs
df_pairs = (
    df_indexed.alias("df1")
    .join(
        df_indexed.alias("df2"),
        (F.col("df1.h3_neighbour") == F.col("df2.h3_neighbour")) &
        (F.col("df1.idx") < F.col("df2.idx"))
    )
    .select(
        F.col("df1.h3_neighbour").alias("h3_group"),
        F.col("df1.SEGMENT_ID").alias("ID1"),
        F.col("df2.SEGMENT_ID").alias("ID2")
    )
)

# -----------------------------------------------------------------------------
# Clean Pairs, Create Unique Pair ID
# -----------------------------------------------------------------------------
df_pairs = df_pairs.filter(col("ID1") != col("ID2")) # should not be necessary as we join on < not <=
df_pairs = df_pairs.withColumn(
    "ID",
    F.concat_ws("_", F.array_sort(F.array(col("ID1").cast("int"), col("ID2").cast("int"))))
)
from pyspark.sql import Window
from pyspark.sql import functions as F

# Define a window partitioned by ID, ordering arbitrarily (or by some column if needed)
window_spec = Window.partitionBy("ID").orderBy(F.monotonically_increasing_id())

# Add row number to each partition
df_pairs = df_pairs.withColumn("row_num", F.row_number().over(window_spec))

# Keep only the first row per ID
df_pairs = df_pairs.filter(F.col("row_num") == 1).drop("row_num")

# -----------------------------------------------------------------------------
# Join with Original Coordinates for Each ID
# -----------------------------------------------------------------------------
coords_sdf1 = coords_df.withColumnRenamed("SEGMENT_ID", "ID1") \
    .withColumnRenamed("latitude", "lat1") \
    .withColumnRenamed("longitude", "lon1") \
    .withColumnRenamed("TIME_OVER", "time1") \
    .withColumnRenamed("FLIGHT_LEVEL", 'flight_lvl1') \
    .withColumnRenamed("FLIGHT_ID", "flight_id1") \
    .select("ID1", "lat1", "lon1", "time1", "flight_lvl1", "flight_id1")

coords_sdf2 = coords_df.withColumnRenamed("SEGMENT_ID", "ID2") \
    .withColumnRenamed("latitude", "lat2") \
    .withColumnRenamed("longitude", "lon2") \
    .withColumnRenamed("TIME_OVER", "time2") \
    .withColumnRenamed("FLIGHT_LEVEL", 'flight_lvl2') \
    .withColumnRenamed("FLIGHT_ID", "flight_id2") \
    .select("ID2", "lat2", "lon2", "time2", "flight_lvl2", "flight_id2")

coords_sdf1 = coords_sdf1.repartition(100, "ID1")
coords_sdf2 = coords_sdf2.repartition(100, "ID2")

df_pairs = df_pairs.join(coords_sdf1, on="ID1", how="left")
df_pairs = df_pairs.join(coords_sdf2, on="ID2", how="left")

# -----------------------------------------------------------------------------
# Calculate and filter based on time differense (s)
# -----------------------------------------------------------------------------
df_pairs = df_pairs.withColumn('time_diff_s', F.unix_timestamp(F.col("time1")) - F.unix_timestamp(F.col("time2")))
df_pairs = df_pairs.filter(F.abs(F.col('time_diff_s')) < 6)

# -----------------------------------------------------------------------------
# Calculate and filter based on height differense (s)
# -----------------------------------------------------------------------------
df_pairs = df_pairs.withColumn('FL_diff', F.col("flight_lvl1") - F.col("flight_lvl2"))
df_pairs = df_pairs.filter(F.abs(F.col('FL_diff')) <= 9.5)

# -----------------------------------------------------------------------------
# Calulate and filter based on distance (km)
# -----------------------------------------------------------------------------

df_pairs = df_pairs.withColumn(
    "distance_nm",
    0.539957 * 2 * earth_radius_km * atan2(
        sqrt(
            (sin(radians(col("lat2")) - radians(col("lat1"))) / 2)**2 +
            cos(radians(col("lat1"))) * cos(radians(col("lat2"))) *
            (sin(radians(col("lon2")) - radians(col("lon1"))) / 2)**2
        ),
        sqrt(1 - (
            (sin(radians(col("lat2")) - radians(col("lat1"))) / 2)**2 +
            cos(radians(col("lat1"))) * cos(radians(col("lat2"))) *
            (sin(radians(col("lon2")) - radians(col("lon1"))) / 2)**2
        ))
    )
)

df_pairs = df_pairs.filter(col('distance_nm') <= lit(distance_nm))

# -----------------------------------------------------------------------------
# Fetch sample
# -----------------------------------------------------------------------------

df_pairs.cache()
df = df_pairs.toPandas()
#print(f"Number of unique ID pairs: {df_pairs.count()}")


The selected resolution for a distance of 5 NM is: 6


25/04/25 17:15:27 WARN TaskSetManager: Stage 105 contains a task of very large size (1543 KiB). The maximum recommended task size is 1000 KiB.
25/04/25 17:15:28 WARN TaskSetManager: Stage 106 contains a task of very large size (1543 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [9]:
df = df_pairs.toPandas()

In [11]:
df

Unnamed: 0,ID2,ID1,h3_group,ID,lat1,lon1,time1,flight_lvl1,flight_id1,lat2,lon2,time2,flight_lvl2,flight_id2,time_diff_s,FL_diff,distance_nm
0,5344693,1477301,861e1d61fffffff,1477301_5344693,46.483333,20.140833,2024-07-01 12:24:55,371.0,273709925.0,46.4675,20.024722,2024-07-01 12:24:55,380.0,273705851.0,0,-9.0,4.89953
1,4264077,2001145,861f8ba07ffffff,2001145_4264077,46.965,12.386944,2024-07-01 12:42:05,310.0,273714985.0,46.963889,12.389167,2024-07-01 12:42:06,310.0,273713103.0,-1,0.0,0.113002
2,2190884,1230097,861f86ba7ffffff,1230097_2190884,48.128056,5.518889,2024-07-01 12:46:09,370.0,273712458.0,48.116389,5.454722,2024-07-01 12:46:06,361.0,273713022.0,3,9.0,2.668393
3,5866755,5764432,863f4a8c7ffffff,5764432_5866755,33.109444,30.951389,2024-07-01 12:25:31,380.0,273705272.0,33.099444,30.950833,2024-07-01 12:25:27,371.0,273710784.0,4,9.0,0.601715
4,1880209,950881,861f800c7ffffff,950881_1880209,47.883333,7.006667,2024-07-01 12:06:45,371.0,273712803.0,47.903889,6.984444,2024-07-01 12:06:45,380.0,273709701.0,0,-9.0,1.525978
5,1020422,157940,86396189fffffff,157940_1020422,43.995833,2.730833,2024-07-01 12:00:56,381.0,273710833.0,43.971944,2.805278,2024-07-01 12:00:56,390.0,273710857.0,0,-9.0,3.525297
6,4183379,1686647,861ed921fffffff,1686647_4183379,38.886389,26.326389,2024-07-01 12:31:16,360.0,273713669.0,38.899722,26.339167,2024-07-01 12:31:18,351.0,273709150.0,-2,9.0,0.999801
7,4665435,274746,863969997ffffff,274746_4665435,43.6725,6.405833,2024-07-01 12:32:10,350.0,273713497.0,43.725556,6.318611,2024-07-01 12:32:10,341.0,273713124.0,0,9.0,4.95338
8,3564640,2841760,861ef566fffffff,2841760_3564640,43.860278,21.114167,2024-07-01 12:27:54,341.0,273711475.0,43.803056,21.085,2024-07-01 12:27:58,350.0,273711509.0,-4,-9.0,3.664557
9,5165865,2667568,861eb5adfffffff,2667568_5165865,43.877778,10.381944,2024-07-01 12:04:52,370.0,273709181.0,43.9025,10.308889,2024-07-01 12:04:52,379.0,273710047.0,0,-9.0,3.496058


In [20]:
coords_df = pd.read_parquet('~/Repos/close-encounters/data/flight_profiles_cpf_20240701_filtered.parquet')
coords_df['SEGMENT_ID'] = coords_df.index
coords_df = coords_df[coords_df.FLIGHT_LEVEL > 250]
coords_df = coords_df[['FLIGHT_ID', 'SEGMENT_ID', 'LONGITUDE', 'LATITUDE', 'TIME_OVER', 'FLIGHT_LEVEL', 'AIRCRAFT_TYPE']].rename(
    columns={
        'LATITUDE': 'latitude', 
        'LONGITUDE': 'longitude'
        }
    )

c1 = coords_df[['FLIGHT_ID', 'SEGMENT_ID', 'AIRCRAFT_TYPE']].rename(
    {'FLIGHT_ID':'FLIGHT_ID1', 'SEGMENT_ID':'ID1', 'AIRCRAFT_TYPE':'AC1'},axis=1)
c2 = coords_df[['FLIGHT_ID', 'SEGMENT_ID', 'AIRCRAFT_TYPE']].rename(
    {'FLIGHT_ID':'FLIGHT_ID2', 'SEGMENT_ID':'ID2', 'AIRCRAFT_TYPE':'AC2'},axis=1)

df = df.merge(c1, how='left').merge(c2, how='left')

In [21]:
coords_df['FLIGHT_ID'] = coords_df['FLIGHT_ID'].apply(str) + '_id'
df['FLIGHT_ID1'] = df['FLIGHT_ID1'].apply(str) + '_id'
df['FLIGHT_ID2'] = df['FLIGHT_ID2'].apply(str) + '_id'
coords_df = coords_df[coords_df.FLIGHT_ID.isin(df.FLIGHT_ID1.to_list() + df.FLIGHT_ID2.to_list())]

In [22]:
coords_df.to_parquet('coords.parquet')

In [23]:
df.to_parquet('pairs.parquet')

In [36]:
print(df[df.ID2 == 2557313].lon2.values[0])

0.6858333333333333


In [35]:
print(coords_df[coords_df.SEGMENT_ID == 2557313].longitude.values[0])

0.6858333333333333


In [135]:
df_pairs.toPandas()

Unnamed: 0,ID2,ID1,h3_group,ID,lat1,lon1,time1,flight_lvl1,flight_id1,lat2,lon2,time2,flight_lvl2,flight_id2,time_diff_s,FL_diff,distance_nm
0,3761964,1697528,861f98a17ffffff,1697528_3761964,45.657778,8.363333,2024-07-01 11:25:13,340.0,273711593.0,45.640833,8.326389,2024-07-01 11:25:13,349.0,273709970.0,0,-9.0,1.856592
