# Spark Query for Terminal Area IFF Flight Record 

In [1]:
# Set spark environments
import os
os.environ["SPARK_HOME"] = '/home/ypang6/anaconda3/lib/python3.7/site-packages/pyspark'
os.environ["PYTHONPATH"] = '/home/ypang6/anaconda3/bin/python3.7'
os.environ['PYSPARK_PYTHON'] = '/home/ypang6/anaconda3/bin/python3.7'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/home/ypang6/anaconda3/bin/python3.7'

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [3]:
spark = SparkSession \
        .builder \
        .appName("Terminal_Area_Flight_Data_Query") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

## Custom schema of the data
### References to IFF_2.13_Specs_Sherlock.doc

In [4]:
myschema = StructType([
    StructField("recType", ShortType(), True),  #1  //track point record type number
    StructField("recTime", StringType(), True),  #2  //seconds since midnigght 1/1/70 UTC
    StructField("fltKey", LongType(), True),  #3  //flight key
    StructField("bcnCode", IntegerType(), True),  #4  //digit range from 0 to 7
    StructField("cid", IntegerType(), True),  #5  //computer flight id
    StructField("Source", StringType(), True),  #6  //source of the record 
    StructField("msgType", StringType(), True),  #7
    StructField("acId", StringType(), True),  #8  //call sign
    StructField("recTypeCat", IntegerType(), True),  #9
    StructField("lat", DoubleType(), True),  #10
    StructField("lon", DoubleType(), True),  #11 
    StructField("alt", DoubleType(), True),  #12  //in 100s of feet
    StructField("significance", ShortType(), True),  #13 //digit range from 1 to 10
    StructField("latAcc", DoubleType(), True),  #14
    StructField("lonAcc", DoubleType(), True),  #15
    StructField("altAcc", DoubleType(), True),  #16
    StructField("groundSpeed", IntegerType(), True),  #17 //in knots
    StructField("course", DoubleType(), True),  #18  //in degrees from true north
    StructField("rateOfClimb", DoubleType(), True),  #19  //in feet per minute
    StructField("altQualifier", StringType(), True),  #20  //Altitude qualifier (the “B4 character”)
    StructField("altIndicator", StringType(), True),  #21  //Altitude indicator (the “C4 character”)
    StructField("trackPtStatus", StringType(), True),  #22  //Track point status (e.g., ‘C’ for coast)
    StructField("leaderDir", IntegerType(), True),  #23  //int 0-8 representing the direction of the leader line
    StructField("scratchPad", StringType(), True),  #24
    StructField("msawInhibitInd", ShortType(), True),  #25 // MSAW Inhibit Indicator (0=not inhibited, 1=inhibited)
    StructField("assignedAltString", StringType(), True),  #26 
    StructField("controllingFac", StringType(), True),  #27
    StructField("controllingSec", StringType(), True),  #28
    StructField("receivingFac", StringType(), True),  #29
    StructField("receivingSec", StringType(), True),  #30
    StructField("activeContr", IntegerType(), True),  #31  // the active control number
    StructField("primaryContr", IntegerType(), True),  #32  //The primary(previous, controlling, or possible next)controller number
    StructField("kybrdSubset", StringType(), True),  #33  //identifies a subset of controller keyboards
    StructField("kybrdSymbol", StringType(), True),  #34  //identifies a keyboard within the keyboard subsets
    StructField("adsCode", IntegerType(), True),  #35  //arrival departure status code
    StructField("opsType", StringType(), True),  #36  //Operations type (O/E/A/D/I/U)from ARTS and ARTS 3A data
    StructField("airportCode", StringType(), True),  #37 
    StructField("trackNumber", IntegerType(), True),  #38
    StructField("tptReturnType", StringType(), True),  #39
    StructField("modeSCode", StringType(), True)  #40
])

In [5]:
date = 20190801

In [6]:
import glob
file_path = glob.glob("/media/ypang6/paralab/Research/data/ATL/IFF_ATL+ASDEX_{}*.csv".format(date))[0]

In [7]:
df = spark.read.csv(file_path, header=False, sep=",", schema=myschema)

In [8]:
import pandas as pd
start_date = 20190801
end_date = 20190831
for date in range(start_date, end_date+1):
    #print(date)
    pass

## Count row numbers of the raw data

In [9]:
df.count()

1876430

## Show column names

In [10]:
df.printSchema()

root
 |-- recType: short (nullable = true)
 |-- recTime: string (nullable = true)
 |-- fltKey: long (nullable = true)
 |-- bcnCode: integer (nullable = true)
 |-- cid: integer (nullable = true)
 |-- Source: string (nullable = true)
 |-- msgType: string (nullable = true)
 |-- acId: string (nullable = true)
 |-- recTypeCat: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- alt: double (nullable = true)
 |-- significance: short (nullable = true)
 |-- latAcc: double (nullable = true)
 |-- lonAcc: double (nullable = true)
 |-- altAcc: double (nullable = true)
 |-- groundSpeed: integer (nullable = true)
 |-- course: double (nullable = true)
 |-- rateOfClimb: double (nullable = true)
 |-- altQualifier: string (nullable = true)
 |-- altIndicator: string (nullable = true)
 |-- trackPtStatus: string (nullable = true)
 |-- leaderDir: integer (nullable = true)
 |-- scratchPad: string (nullable = true)
 |-- msawInhibitInd: short (nullable = true)
 

## Select columns

In [11]:
cols = ['recType', 'recTime', 'acId', 'lat', 'lon', 'alt']

In [12]:
df = df.select(*cols).filter(df['recType']==3).withColumn("recTime", df['recTime'].cast(IntegerType()))

# Query

In [15]:
timestamp = 1567346400  # 2PM 
FAF_9L = (33.63465, -84.54984166666667)  # waypoint NIVII (FAF of KATL runway 9L)
FAF_9R = (33.63172777777777, -84.54940555555555)  # waypoint BURNY (FAF of KATL runway 9R)
IF_9R = (33.631397222222226, -84.71883611111112)  # waypoint GGUYY (IF of KATL runway 9R)
IAF_9L = (33.63394722222222, -84.86316388888888) # waypoint RYENN (IAF of KATL runway 9L)
IAF_9R = (33.63093611111111, -84.86295) # waypoint ANDIY (IAF of KATL runway 9R)
IF_27R = (33.63430555555556, -84.12904722222221) # waypoint MAASN (IF of KATL runway 27R)
IAF_27R = (33.633874999999996, -83.99111666666667) # waypoint YOUYU (IAF of KATL runway 27R)
radius = 0.001

### Query based on Timestamp

In [14]:
df.filter(df['recTime'] == timestamp).show()

+-------+-------+----+---+---+---+
|recType|recTime|acId|lat|lon|alt|
+-------+-------+----+---+---+---+
+-------+-------+----+---+---+---+



### Number of flight in the airspace at the given timestamp

In [None]:
df.filter(df['recTime'] == timestamp).count()

### Flight record of a given callsign

In [None]:
df.filter(df['acId'] == 'UAL533').count()

In [None]:
df.filter(df['alt'] == 20.06).count()

### Flight records of multiple given callsigns

In [None]:
import pyspark.sql.functions as f
df.where(f.col("acId").isin({"CLX56L", "DAL1323"})).count()

### Rectangular query at given location and timestamp

In [None]:
df.filter(df['recTime'] == timestamp).\
filter(df['lat']>IAF_9L[0]-radius).filter(df['lat']<IAF_9L[0]+radius).\
filter(df['lon']>IAF_9L[1]-radius).filter(df['lon']<IAF_9L[1]+radius).\
count()

In [None]:
df.filter(df['lat']>=IAF_9R[0]-radius).filter(df['lat']<=IAF_9R[0]+radius).\
filter(df['lon']>=IAF_9R[1]-radius).filter(df['lon']<=IAF_9R[1]+radius).\
count()

### Number of flight callsigns in the rawdata

In [None]:
df.select("acId").distinct().count()

### Return callsigns within a radius in lat/lon degrees
* GeoSpark needed here

In [None]:
df.show()

### List of Call Signs

In [None]:
cs_list = [x['acId'] for x in df.select("acId").distinct().collect()]

In [None]:
len(cs_list)

## Seperate departure/arrival aircrafts

In [None]:
cs_dep = []
cs_arr = []
cs_unknown = []
for x in df.select('acId').distinct().collect():
    temp_df = df.filter(df['acId'] == x['acId'])
    if temp_df.select(['alt']).take(1)[0][0] == 10.06:
        cs_dep.append(x['acId'])
    elif temp_df.orderBy(temp_df.recTime.desc()).select('alt').take(1)[0][0] == 10.06:
        cs_arr.append(x['acId'])
    else:
        cs_unknown.append(x['acId'])

In [None]:
cs_arr[:10]

## Find landing points close to FAF_9R

In [None]:
df_arr = df.filter(df.acId.isin(cs_arr) == True)

In [None]:
faf9rflight = df_arr.filter(df_arr['lat']>=FAF_9R[0]-radius).filter(df_arr['lat']<=FAF_9R[0]+radius).\
filter(df_arr['lon']>=FAF_9R[1]-radius).filter(df_arr['lon']<=FAF_9R[1]+radius)
faf9rflight.count()

### Save into csv

In [None]:
faf9rflight.coalesce(1).write.csv('./faf9rflights')

In [None]:
df.filter(df['acId'] == 'NKS1561').show(2000)  # randomly pick one callsign from cs_arr

In [None]:
len(cs_arr)

In [None]:
len(cs_dep)

In [None]:
len(cs_unknown)

### Plot multiple arrival flights in one plot

In [None]:
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline

In [None]:
n = 100 # number of flight to plot

i = 0
plt.figure(figsize=(20, 15))

for cs in cs_arr:
    df_arr = df.filter(df['acId'] == cs)
    yy = np.array(df_arr.select("lat").collect()).reshape(-1)
    xx = np.array(df_arr.select("lon").collect()).reshape(-1)
    
    plt.plot(xx, yy)
    plt.xlabel('Longitude/Degrees')
    plt.ylabel('Latitude/Degrees')
    
    i = i + 1
    if i == n:
        break

plt.plot(IAF_9L[1], IAF_9L[0], label='IAF_9L', marker='*')     
plt.plot(IAF_9R[1], IAF_9R[0], label='IAF_9R', marker='^')     
plt.plot(IAF_27R[1], IAF_27R[0], label='IAF_27R', marker='D')     
plt.plot(FAF_9L[1], FAF_9L[0], label='FAF_9L', marker='o')     
plt.plot(IF_27R[1], IF_27R[0], label='IF_27R', marker='v')  
plt.plot(FAF_9R[1], FAF_9R[0], label='FAF_9R', marker='X')     
plt.plot(IF_9R[1], IF_9R[0], label='IF_9R', marker='P')     
plt.legend()

plt.savefig('arrival_{}.png'.format(n), dpi=500)

### Plot multiple departure flights in one plot

In [None]:
n = 100 # number of flight to plot

i = 0
plt.figure(figsize=(20, 15))

for cs in cs_dep:
    df_arr = df.filter(df['acId'] == cs)
    yy = np.array(df_arr.select("lat").collect()).reshape(-1)
    xx = np.array(df_arr.select("lon").collect()).reshape(-1)
    
    plt.plot(xx, yy)
    plt.xlabel('Longitude/Degrees')
    plt.ylabel('Latitude/Degrees')
    
    i = i + 1
    if i == n:
        break

plt.plot(IAF_9L[1], IAF_9L[0], label='IAF_9L', marker='*')     
plt.plot(IAF_9R[1], IAF_9R[0], label='IAF_9R', marker='^')     
plt.plot(IAF_27R[1], IAF_27R[0], label='IAF_27R', marker='D')     
plt.plot(FAF_9L[1], FAF_9L[0], label='FAF_9L', marker='o')     
plt.plot(IF_27R[1], IF_27R[0], label='IF_27R', marker='v')  
plt.plot(FAF_9R[1], FAF_9R[0], label='FAF_9R', marker='X')     
plt.legend()

plt.savefig('departure_{}.png'.format(n), dpi=500)

### Plot multiple unknown flights in one plot

In [None]:
n = 100 # number of flight to plot

i = 0
plt.figure(figsize=(20, 15))

for cs in cs_unknown:
    df_arr = df.filter(df['acId'] == cs)
    yy = np.array(df_arr.select("lat").collect()).reshape(-1)
    xx = np.array(df_arr.select("lon").collect()).reshape(-1)
    
    plt.plot(xx, yy)
    plt.xlabel('Longitude/Degrees')
    plt.ylabel('Latitude/Degrees')
    
    i = i + 1
    if i == n:
        break

plt.plot(IAF_9L[1], IAF_9L[0], label='IAF_9L', marker='*')     
plt.plot(IAF_9R[1], IAF_9R[0], label='IAF_9R', marker='^')     
plt.plot(IAF_27R[1], IAF_27R[0], label='IAF_27R', marker='D')     
plt.plot(FAF_9L[1], FAF_9L[0], label='FAF_9L', marker='o')     
plt.plot(IF_27R[1], IF_27R[0], label='IF_27R', marker='v')
plt.plot(FAF_9R[1], FAF_9R[0], label='FAF_9R', marker='X')     
plt.legend()

plt.savefig('unknown_{}.png'.format(n), dpi=500)

### Return callsigns within a radius in km 
* Euclidean distance slow the calculation
* GeoSpark can be used here to speed up the searching with buildin tree structures

In [None]:
pandas_df = df.select("*").toPandas()

In [None]:
# distance function between two lat/lon

from math import sin, cos, sqrt, atan2, radians
def getDist(lat1, lon1, lat2, lon2):
    
  R = 6373.0
  lat1, lon1, lat2, lon2 = radians(lat1), radians(lon1), radians(lat2), radians(lon2)
  dlon, dlat = lon2 - lon1, lat2 - lat1
  a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
  c = 2 * atan2(sqrt(a), sqrt(1 - a))

  return R * c

In [None]:
# apply distance function to dataframe
pandas_df['dist']=list(map(lambda k: getDist(pandas_df.loc[k]['lat'], pandas_df.loc[k]['lon'], IAF_9R[0], IAF_9R[1]), pandas_df.index))

In [None]:
from math import sqrt

In [None]:
def getL2(lat1, lon1, lat2, lon2):
    return sqrt((lat1 - lat2)**2 + (lon1 - lon2)**2)

In [None]:
import time

In [None]:
t0 = time.time()
pandas_df['dist']=list(map(lambda k: getL2(pandas_df.loc[k]['lat'], pandas_df.loc[k]['lon'], FAF_9R[0], FAF_9R[1]), pandas_df.index))
t1 = time.time()
print(t1 - t0)

In [None]:
pandas_df[pandas_df['dist']<0.1]['acId'].nunique()

In [None]:
pandas_df[pandas_df['dist']<0.5]['acId'].nunique()

# Configure GeoSpark

In [None]:
# !pip install geospark

### Compile the packages locally
<ul>
<li>The source code for SNAPSHOT version is here: https://github.com/apache/incubator-sedona/releases</li>
<li>Download or clone the source code, in the root folder, run: “mvn clean install -DskipTests"</li>
<li>Then copy the compiled jars in core/target and sql/target to SPARK_HOME/jars </li>
</ul>

check .travis.xml in the root directory of the source code for additional informations


In [16]:
from geospark.register import upload_jars
from geospark.register import GeoSparkRegistrator
upload_jars()
GeoSparkRegistrator.registerAll(spark)
from pyspark import SparkConf
from geospark.utils import GeoSparkKryoRegistrator, KryoSerializer
SparkConf().set("spark.serializer", KryoSerializer.getName)
SparkConf().set("spark.kryo.registrator", GeoSparkKryoRegistrator.getName)
from geospark.utils.adapter import Adapter

In [17]:
# load 3rd party jars
spark.sparkContext.addPyFile("/home/ypang6/anaconda3/lib/python3.7/site-packages/pyspark/jars/geospark-sql_3.0-1.3.2-SNAPSHOT.jar")
spark.sparkContext.addPyFile("/home/ypang6/anaconda3/lib/python3.7/site-packages/pyspark/jars/geospark-1.3.2-SNAPSHOT.jar")

In [None]:
# combine columns
# from pyspark.sql import functions as f
# df = df.withColumn('point', f.concat(f.col('lat'), f.lit(','), f.col('lon')))
# df.show()

## Build SpatialDF

In [18]:
# register pyspark df in SQL
df.registerTempTable("pointtable")

# create shape column in geospark
spatialdf = spark.sql(
  """
  SELECT ST_Point(CAST(lat AS Decimal(24, 20)), CAST(lon AS Decimal(24, 20))) AS geom, recTime, acId, alt
  FROM pointtable
  """)

spatialdf.createOrReplaceTempView("spatialdf")

In [19]:
spatialdf.printSchema()

root
 |-- geom: geometry (nullable = false)
 |-- recTime: integer (nullable = true)
 |-- acId: string (nullable = true)
 |-- alt: double (nullable = true)



In [20]:
spatialdf.show(5)

+--------------------+----------+----+-----+
|                geom|   recTime|acId|  alt|
+--------------------+----------+----+-----+
|POINT (33.62935 -...|1564632506|OPS7|10.06|
|POINT (33.62948 -...|1564632507|OPS7|10.06|
|POINT (33.62951 -...|1564632508|OPS7|10.06|
|POINT (33.62955 -...|1564632509|OPS7|10.06|
|POINT (33.62955 -...|1564632510|OPS7|10.06|
+--------------------+----------+----+-----+
only showing top 5 rows



## Convert SpatialDF to SpatialRDD

In [21]:
spatial_rdd = Adapter.toSpatialRdd(spatialdf, "geom")
spatial_rdd.analyze()

True

# Spatial KNN Query with SQL APIs

In [30]:
# register pyspark df in SQL
spatialdf.registerTempTable("spatialdf")

SQL_spatial_knn_query_result = spark.sql(
  """
  SELECT ST_Distance(ST_Point(33.631727, -84.549405), geom) AS Dist, recTime, acId, alt
  FROM spatialdf
  ORDER BY Dist ASC
  LIMIT 100
  """)

In [31]:
SQL_spatial_knn_query_result.createOrReplaceTempView("query1")

In [32]:
SQL_spatial_knn_query_result.show()

+--------------------+----------+-------+-----+
|                Dist|   recTime|   acId|  alt|
+--------------------+----------+-------+-----+
|6.777905281830643E-5|1564708076|DAL2105|27.13|
|9.441398201858383E-5|1564706454|DAL2035|26.69|
|1.087841900174624...|1564707735|DAL1402|25.81|
|1.134636505735247...|1564721025|AAL2043|26.88|
|1.194738465072900...|1564708957|RPA4525|26.69|
|1.303610371190070...|1564708352|DAL1680|26.56|
|1.468128059700728...|1564709152|SWA1396|26.56|
|1.522957648883272...|1564707029|DAL2826| 25.5|
|1.524270317115531...|1564707240|DAL1705|26.75|
|1.641767340291201...|1564707834| DAL362|26.63|
|1.820823989245820...|1564714187|NKS1821|26.75|
|1.968603565886242...|1564707503| NKS221|26.25|
|2.061892334777792...|1564706630|DAL1778| 26.5|
|2.107937380432419...|1564713509|EDV5418|26.75|
|2.342520010690241...|1564712954|DAL2495|26.88|
|2.353592997986974...|1564713631|SWA1677|26.19|
|2.503477581179296E-4|1564707962|EDV3323|25.75|
|2.592951985760132...|1564709674|DAL2875

# Spatial KNN Query with Python APIs

In [22]:
# set extra class path
SPARK_GeoSpark_SQL = "/home/ypang6/anaconda3/lib/python3.7/site-packages/pyspark/jars/geospark-sql_3.0-1.3.2-SNAPSHOT.jar"
SPARK_GeoSpark = '/home/ypang6/anaconda3/lib/python3.7/site-packages/pyspark/jars/geospark-1.3.2-SNAPSHOT.jar'
SparkConf().set("spark.driver.extraClassPath", SPARK_GeoSpark)
SparkConf().set("spark.executor.extraClassPath", SPARK_GeoSpark)
SparkConf().set("spark.driver.extraClassPath", SPARK_GeoSpark_SQL)
SparkConf().set("spark.executor.extraClassPath", SPARK_GeoSpark_SQL)

<pyspark.conf.SparkConf at 0x7f732342fc50>

In [23]:
from geospark.core.spatialOperator import KNNQuery
from geospark.core.enums import IndexType
from shapely.geometry import Point

loc = (33.631727, -84.549405)

point = Point(loc[0], loc[1])

k = 5 ## K Nearest Neighbors

build_on_spatial_partitioned_rdd = False ## Set to TRUE only if run join query
spatial_rdd.buildIndex(IndexType.RTREE, build_on_spatial_partitioned_rdd)

using_index = True
result = KNNQuery.SpatialKnnQuery(spatial_rdd, point, k, using_index)

TypeError: 'JavaPackage' object is not callable

In [None]:
??KNNQuery.SpatialKnnQuery

### Spatial Range Query

In [None]:
from geospark.core.geom.envelope import Envelope
from geospark.core.enums import IndexType
from geospark.core.spatialOperator import RangeQuery

range_query_window = Envelope(-90.01, -80.01, 30.01, 40.01)
consider_boundary_intersection = False ## Only return gemeotries fully covered by the window

build_on_spatial_partitioned_rdd = False ## Set to TRUE only if run join query
spatial_rdd.buildIndex(IndexType.QUADTREE, build_on_spatial_partitioned_rdd)

using_index = True

query_result = RangeQuery.SpatialRangeQuery(
    spatial_rdd,
    range_query_window,
    consider_boundary_intersection,
    using_index
)

## Spatial Radius Query

In [None]:
from geospark.core.SpatialRDD import CircleRDD
from geospark.core.enums import GridType
from geospark.core.spatialOperator import JoinQuery

In [None]:
from shapely.geometry import Point
from pyspark.sql.types import IntegerType, StructField, StructType
from geospark.sql.types import GeometryType

schema = StructType(
    [
        StructField("recTime", IntegerType(), False),
        StructField("geom", GeometryType(), False)
    ]
)


timestamp = 1567346400
loc = (33.631727, -84.549405)

point = Point(loc[0], loc[1])

data = [
    [timestamp, point]
]


object_df = spark.createDataFrame(data, schema)



In [None]:
object_df.printSchema()

In [None]:
object_rdd = Adapter.toSpatialRdd(object_df, "geom")
object_rdd.analyze()

In [None]:
circle_rdd = CircleRDD(object_rdd, 0.1) ## Create a CircleRDD using the given distance
circle_rdd.analyze()

circle_rdd.spatialPartitioning(GridType.KDBTREE)
spatial_rdd.spatialPartitioning(circle_rdd.getPartitioner())

consider_boundary_intersection = False ## Only return gemeotries fully covered by each query window in queryWindowRDD
using_index = False

result = JoinQuery.DistanceJoinQueryFlat(spatial_rdd, circle_rdd, using_index, consider_boundary_intersection)

### create shape column in geopandas

In [None]:
# import geopandas
# gdf = geopandas.GeoDataFrame(
#     df, geometry=geopandas.points_from_xy(df.Longitude, df.Latitude))