In [17]:
import os
from geopandas import GeoDataFrame

from pyspark import SparkConf, SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rank
from pyspark.sql.window import Window
from pyspark.sql.types import StringType
from pyspark.storagelevel import StorageLevel

from sedona.utils import KryoSerializer, SedonaKryoRegistrator
from sedona.utils.adapter import Adapter
from sedona.register import SedonaRegistrator

### Create SparkSession

In [18]:
conf = SparkConf()

In [19]:
#Create Spark configuration to run Spark jobs with Sedona capabilities
conf.set("spark.sql.autoBroadcastJoinThreshold",-1)
conf.set("spark.serializer", KryoSerializer.getName)
conf.set("spark.kryo.registrator", SedonaKryoRegistrator.getName)
conf.set('spark.jars.packages',
           'org.apache.sedona:sedona-python-adapter-3.0_2.12:1.0.0-incubating,'
           'org.datasyslab:geotools-wrapper:geotools-24.0')

<pyspark.conf.SparkConf at 0x7f8d7bd6b978>

In [3]:
#Additional configuration to have access to S3 bucket
conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:2.7.4,com.amazonaws:aws-java-sdk-bundle:1.12.264')
conf.set("spark.hadoop.fs.s3a.access.key", os.getenv('AWS_S3_KEY'))
conf.set("spark.hadoop.fs.s3a.secret.key", os.getenv('AWS_S3_SECRET'))

<pyspark.conf.SparkConf at 0x7f042d54b3c8>

In [20]:
os.environ['PYSPARK_DRIVER_PYTHON'] = "/home/venv36/bin/python"
spark = SparkSession.builder.master("spark://spark:7077"). \
    config(conf=conf). \
    appName('OpenSky_app'). \
    getOrCreate()

In [21]:
#Add spatial functionality to the SparkSession
SedonaRegistrator.registerAll(spark)

True

### Load states boundaries dataset

In [22]:
#Load data from local file to geoDataFrame
geo_admin_url = 'admin1-us.geojson'
gdf_states = GeoDataFrame.from_file(geo_admin_url)

#Create pySpark dataframe and view
spark_states_df = spark.createDataFrame(gdf_states)
spark_states_df.createOrReplaceTempView('states')

In [23]:
spark_states_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- country: string (nullable = true)
 |-- ISO3166-1-Alpha-3: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- id: string (nullable = true)
 |-- geometry: geometry (nullable = true)



In [24]:
#Inspect view
spark.sql(
    """
    SELECT * 
    FROM states 
    ORDER BY name 
    """
).show(5)

+----------+--------------------+-----------------+----------+--------+--------------------+
|      name|             country|ISO3166-1-Alpha-3|state_code|      id|            geometry|
+----------+--------------------+-----------------+----------+--------+--------------------+
|   Alabama|United States of ...|              USA|        AL|USA-3541|POLYGON ((-85.054...|
|    Alaska|United States of ...|              USA|        AK|USA-3563|MULTIPOLYGON (((-...|
|   Arizona|United States of ...|              USA|        AZ|USA-3520|POLYGON ((-109.04...|
|  Arkansas|United States of ...|              USA|        AR|USA-3528|POLYGON ((-89.662...|
|California|United States of ...|              USA|        CA|USA-3521|POLYGON ((-114.35...|
+----------+--------------------+-----------------+----------+--------+--------------------+
only showing top 5 rows



### Create dataframe from local data

In [25]:
df=spark.read.csv('/opt/bitnami/spark/temp/states_2022-01-03-00.csv',inferSchema =True, header = True)
# df=spark.read.csv('/opt/bitnami/spark/temp/',inferSchema =True, header = True)

### Create dataframe from S3 located data

In [None]:
df=spark.read.csv('s3a://avia-data-2022-03-01/data/states_2022-01-03-00.csv',inferSchema =True, header = True)

### Discover data schema

In [26]:
df.printSchema()

root
 |-- time: integer (nullable = true)
 |-- icao24: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- velocity: double (nullable = true)
 |-- heading: double (nullable = true)
 |-- vertrate: double (nullable = true)
 |-- callsign: string (nullable = true)
 |-- onground: boolean (nullable = true)
 |-- alert: boolean (nullable = true)
 |-- spi: boolean (nullable = true)
 |-- squawk: integer (nullable = true)
 |-- baroaltitude: double (nullable = true)
 |-- geoaltitude: double (nullable = true)
 |-- lastposupdate: double (nullable = true)
 |-- lastcontact: double (nullable = true)



# A total number of flying vehicles in particular dataset

In [27]:
unicue_vehicles = df.select('icao24').distinct().count()
print(f'Number of unique vehicles is {unicue_vehicles}')

Number of unique vehicles is 10508


### Modify DataFrame and add a column with WKT coordinates, shapely geometry and direction

#### WITHOUT UDFs

In [28]:
#Filterout bad data and vegicles on the ground
df.dropna(subset=("icao24", "lat", "lon")). \
    filter(col('onground') == False). \
    createOrReplaceTempView('points')

In [13]:
#Enrich data with named directions and states based on coordinates
merged_df = spark.sql(
    """
    SELECT 
      time, 
      icao24, 
      velocity, 
      vertrate, 
      geoaltitude,
      s.name,
      CASE 
          WHEN heading > 337.5 OR heading <= 22.5 THEN 'North' 
          WHEN 67.5 >= heading AND heading > 22.5 THEN 'Northeast' 
          WHEN 112.5 >= heading AND heading > 67.5 THEN 'East' 
          WHEN 157.5 >= heading AND heading > 112.5 THEN 'Southeast' 
          WHEN 202.5 >= heading AND heading > 157.5 THEN 'South' 
          WHEN 247.5 >= heading AND heading > 202.5 THEN 'Southwest' 
          WHEN 292.5 >= heading AND heading > 247.5 THEN 'West' 
          WHEN 337.5 >= heading AND heading > 292.5 THEN 'Northwest' 
          ELSE NULL 
      END AS heading_str 
    FROM 
      (
        SELECT 
          *, 
          ST_GeomFromWKT(
            CONCAT('POINT(', lon, ' ', lat, ')')
          ) as geometry 
        FROM 
          points
      ) AS p, 
      states as s 
    WHERE 
      ST_Intersects(p.geometry, s.geometry)
    """
)
merged_df.drop(col('geometry')).persist(StorageLevel.MEMORY_ONLY).createOrReplaceTempView('merged_view')

In [14]:
merged_df.printSchema()

root
 |-- time: integer (nullable = true)
 |-- icao24: string (nullable = true)
 |-- velocity: double (nullable = true)
 |-- vertrate: double (nullable = true)
 |-- geoaltitude: double (nullable = true)
 |-- name: string (nullable = true)
 |-- heading_str: string (nullable = true)



### Find top values of vertical speed, speed of ascending and descending, and the highest flight in the all US teritory

In [33]:
TOP_values_df = spark.sql(
    """
    SELECT DISTINCT "fastest vertival" as nomination, icao24, name as state, mv.velocity as value
    FROM merged_view as mv
    INNER JOIN    
        (SELECT max(mv) as velocity
        FROM (SELECT icao24, max(velocity) as mv
                FROM merged_view
                GROUP BY icao24) as mv) as mx
                ON mv.velocity = mx.velocity
    UNION
    SELECT DISTINCT "fastest ascending" as nomination, icao24, name as state, mv.vertrate as value
    FROM merged_view as mv
    INNER JOIN    
        (SELECT max(mv) as vertrate
        FROM (SELECT icao24, max(vertrate) as mv
                FROM merged_view
                GROUP BY icao24) as mv) as mx
                ON mv.vertrate = mx.vertrate
    UNION
    SELECT DISTINCT "fastest descending" as nomination, icao24, name as state, mv.vertrate as value
    FROM merged_view as mv
    INNER JOIN    
        (SELECT min(mv) as vertrate
        FROM (SELECT icao24, min(vertrate) as mv
                FROM merged_view
                GROUP BY icao24) as mv) as mx
                ON mv.vertrate = mx.vertrate
    UNION
    SELECT DISTINCT "highest flight" as nomination, icao24, name as state, mv.geoaltitude as value
    FROM merged_view as mv
    INNER JOIN    
        (SELECT max(geoalt) as geoalt
        FROM (SELECT icao24, max(geoaltitude) as geoalt
                FROM merged_view
                GROUP BY icao24) as mv) as mx
                ON mv.geoaltitude = mx.geoalt
    """
)

In [34]:
pd_TOP_values_df = TOP_values_df.toPandas()

In [31]:
pd_TOP_values_df

Unnamed: 0,nomination,icao24,state,value
0,fastest descending,a92a7c,California,-95.9104
1,fastest ascending,a1e9a0,Indiana,165.8112
2,fastest vertival,a43ca8,Alabama,554.63386
3,fastest ascending,a585e4,Louisiana,165.8112
4,highest flight,a8793d,South Carolina,38221.92


### Find the most common side of the world as direction of flight

In [19]:
#The most common direction
spark.sql(
    """
    SELECT heading_str,
           Count (*) AS quantity
    FROM   merged_view
    WHERE  heading_str IS NOT NULL
    GROUP  BY heading_str
    ORDER  BY quantity DESC  
    """
).show()

+-----------+--------+
|heading_str|quantity|
+-----------+--------+
|       West| 1103720|
|       East|  857165|
|  Northwest|  621351|
|  Northeast|  532352|
|  Southwest|  520046|
|      North|  514020|
|      South|  471141|
|  Southeast|  430334|
+-----------+--------+



### Find top values of vertical speed, speed of ascending and descending, and the highest flight in each state
define views for every nomination

In [20]:
#Fastest vehicle
spark.sql(
    """
    SELECT DISTINCT "fastest vertival" as nomination, name as state, icao24,  mv.velocity as value
    FROM merged_view as mv
    INNER JOIN    
        (SELECT state, max(mv) as velocity
        FROM (SELECT name as state, icao24, max(velocity) as mv
                FROM merged_view
                GROUP BY name, icao24) as mv
        GROUP BY state) as mx
    ON mv.velocity = mx.velocity and mv.name = mx.state
    """
).createOrReplaceTempView('fastest_State')

In [21]:
#Fastest ascending
spark.sql(
    """
    SELECT DISTINCT "fastest ascending" as nomination, name as state, icao24,  mv.vertrate as value
        FROM merged_view as mv
        INNER JOIN    
            (SELECT state, max(mv) as vertrate
            FROM (SELECT name as state, icao24, max(vertrate) as mv
                    FROM merged_view
                    GROUP BY name, icao24) as mv
            GROUP BY state) as mx
        ON mv.vertrate = mx.vertrate and mv.name = mx.state
    """
).createOrReplaceTempView('fastest_ASC')

In [22]:
#Fastest descending
spark.sql(
    """
    SELECT DISTINCT "fastest ascending" as nomination, name as state, icao24,  mv.vertrate as value
        FROM merged_view as mv
        INNER JOIN    
            (SELECT state, min(mv) as vertrate
            FROM (SELECT name as state, icao24, min(vertrate) as mv
                    FROM merged_view
                    GROUP BY name, icao24) as mv
            GROUP BY state) as mx
        ON mv.vertrate = mx.vertrate and mv.name = mx.state
    """
).createOrReplaceTempView('fastest_DESC')

In [23]:
#Highest flight
spark.sql(
    """    
    SELECT DISTINCT "highest flight" as nomination, name as state, icao24, mv.geoaltitude as value
    FROM merged_view as mv
    INNER JOIN    
        (SELECT state, max(geoalt) as geoalt
        FROM (SELECT name as state, icao24, max(geoaltitude) as geoalt
                FROM merged_view
                GROUP BY name, icao24) as mv
        GROUP BY state) as mx
                ON mv.geoaltitude = mx.geoalt and mv.name = mx.state
    """
).createOrReplaceTempView('highest')

In [24]:
#The most common direction
windowSpec = Window.partitionBy("state").orderBy("quantity")
df_heading = spark.sql(
    """
        SELECT name as state, heading_str, count (*) as quantity
            FROM merged_view
            WHERE heading_str IS NOT NULL
            GROUP BY heading_str, name
            ORDER BY quantity DESC
    """
).withColumn("rank",rank().over(windowSpec)). \
    filter(col('rank') == 1). \
    sort(col('state')). \
    select('state', 'heading_Str')
df_heading.createOrReplaceTempView('heading_state')

In [25]:
state_result_df = spark.sql(
    """
 SELECT fst.state,
       fst.highest_altitude,
       fst.highest_plane,
       fst.fastest_desc,
       fst.fastest_dc_plain,
       fst.fastest_asc,
       fst.fastest_asc_pain,
       fst.highest_velocity,
       fst.fastest_plain,
       hs.heading_str AS direction
FROM   (SELECT fa.state,
               fa.highest_altitude,
               fa.highest_plane,
               fa.fastest_desc,
               fa.fastest_dc_plain,
               fa.fastest_asc,
               fa.fastest_asc_pain,
               fs.value  AS highest_velocity,
               fs.icao24 AS fastest_plain
        FROM   (SELECT fd.state,
                       fd.highest_altitude,
                       fd.highest_plane,
                       fd.fastest_desc,
                       fd.fastest_dc_plain,
                       fa.value  AS fastest_asc,
                       fa.icao24 AS fastest_asc_pain
                FROM   (SELECT h.state   AS state,
                               h.highest_altitude,
                               h.highest_plane,
                               fd.value  AS fastest_desc,
                               fd.icao24 AS fastest_dc_plain
                        FROM   (SELECT s.NAME   AS state,
                                       h.value  AS highest_altitude,
                                       h.icao24 AS highest_plane
                                FROM   states AS s
                                       LEFT JOIN highest AS h
                                              ON h.state = s.NAME) AS h
                               LEFT JOIN fastest_desc AS fd
                                      ON h.state = fd.state) AS fd
                       LEFT JOIN fastest_asc AS fa
                              ON fd.state = fa.state) AS fa
               LEFT JOIN fastest_state AS fs
                      ON fa.state = fs.state) AS fst
       LEFT JOIN heading_state AS hs
              ON fst.state = hs.state
       ORDER BY fst.state
    """
)

In [26]:
#Store results as csv
pd_state_result_df = state_result_df.toPandas()

In [27]:
pd_state_result_df

Unnamed: 0,state,highest_altitude,highest_plane,fastest_desc,fastest_dc_plain,fastest_asc,fastest_asc_pain,highest_velocity,fastest_plain,direction
0,Alabama,26365.2,a6c88a,-31.21152,a11d3c,42.91584,ab4010,554.63386,a43ca8,North
1,Alaska,12169.14,a7f63d,-27.96032,a8103e,29.58592,a71a72,269.353564,7811a3,Southwest
2,Arizona,14599.92,a6ff7e,-40.31488,acce1a,34.46272,a7771b,309.499532,a9961b,North
3,Arkansas,35852.1,a384f2,-27.96032,a243d2,32.512,a2c0ff,291.595373,aa5631,South
4,California,36423.6,ad32f0,-95.9104,a92a7c,165.8112,a1a278,302.578801,a9961b,Northeast
5,Colorado,38061.9,a03ce6,-28.93568,a8f607,39.66464,a63836,297.735548,a88d20,North
6,Connecticut,12527.28,abfa24,-23.73376,ac838f,38.36416,a2badf,346.995052,a05646,South
7,Delaware,14516.1,a9f221,-26.33472,a0acbb,30.8864,a5232b,331.497191,abf66c,Northwest
8,District of Columbia,13235.94,a4d236,-14.30528,a1f1de,25.68448,a80689,312.880087,4d20f9,West
9,District of Columbia,13235.94,a4d236,-14.30528,a1f1de,25.68448,a6451a,312.880087,4d20f9,West
