<img src=https://d1.awsstatic.com/logos/aws-logo-lockups/poweredbyaws/PB_AWS_logo_RGB_REV_SQ.8c88ac215fe4e441dc42865dd6962ed4f444a90d.png width="350">

# <center> Data Engineering Lab 1 </center>
<center><b> Data Architect: </b> Carlos Contreras </center>

Dataset and metadata details; i.e. column definitions, at the [USA Bureau of Transport Statistics URL](https://www.transtats.bts.gov/DL_SelectFields.asp?gnoyr_VQ=FGK&QO_fu146_anzr=b0-gvzr)

In [1]:
import sys
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Other Libs for the ETL
from datetime import datetime, timedelta
from pyspark.sql.functions import lit, to_date, max as _max, date_sub, col

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1662381385407_0002,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Spark settings
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Parameters below, to be set as Job input parameters (not hard-coded)

In [4]:
# Set the BUCKET name
BUCKET_NAME = 'acd-bayarea-2022-graphdb-dev'
# Source data path. 
s3_file_full_path = f's3://{BUCKET_NAME}/flight-performance-raw-csv-sample/'

# Job params
input_last_days_to_filter = 7
s3_vertices_location = f's3://{BUCKET_NAME}/neptune-flight-stats-graph-data/vertices/'
s3_edges_location = f's3://{BUCKET_NAME}/neptune-flight-stats-graph-data/edges/'

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# DF
df_flight_stats_raw = glueContext.create_dynamic_frame.from_options(
    's3',
    {'paths': [s3_file_full_path]},
    'csv',
    {'withHeader': True}).toDF()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Temporary print dataset / dataFrame Schema
df_flight_stats_raw.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- YEAR: string (nullable = true)
 |-- QUARTER: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- DAY_OF_MONTH: string (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- FL_DATE: string (nullable = true)
 |-- MKT_UNIQUE_CARRIER: string (nullable = true)
 |-- BRANDED_CODE_SHARE: string (nullable = true)
 |-- MKT_CARRIER_AIRLINE_ID: string (nullable = true)
 |-- MKT_CARRIER: string (nullable = true)
 |-- MKT_CARRIER_FL_NUM: string (nullable = true)
 |-- SCH_OP_UNIQUE_CARRIER: string (nullable = true)
 |-- SCH_OP_CARRIER_AIRLINE_ID: string (nullable = true)
 |-- SCH_OP_CARRIER: string (nullable = true)
 |-- SCH_OP_CARRIER_FL_NUM: string (nullable = true)
 |-- OP_UNIQUE_CARRIER: string (nullable = true)
 |-- OP_CARRIER_AIRLINE_ID: string (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- TAIL_NUM: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: string (nullable = true)
 |-- ORIGIN_AIRPORT_ID: string (nullable = true)
 |-- ORIGIN_AIRPORT

**Performance note**: To improve the execution plan, avoid PRINTS (e.g. SHOW()) like the one below, 
                    on real Spark Jobs (i.e. not like in Jupyter). 

In [7]:
# Temporary print dataset sample
df_flight_stats_raw.select('FL_DATE','ORIGIN_AIRPORT_ID','ORIGIN_CITY_NAME','DEST_AIRPORT_ID','DEST_CITY_NAME').show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-----------------+----------------+---------------+--------------+
|   FL_DATE|ORIGIN_AIRPORT_ID|ORIGIN_CITY_NAME|DEST_AIRPORT_ID|DEST_CITY_NAME|
+----------+-----------------+----------------+---------------+--------------+
|2018-01-23|            10146|      Albany, GA|          10397|   Atlanta, GA|
|2018-01-24|            10146|      Albany, GA|          10397|   Atlanta, GA|
|2018-01-25|            10146|      Albany, GA|          10397|   Atlanta, GA|
|2018-01-26|            10146|      Albany, GA|          10397|   Atlanta, GA|
|2018-01-27|            10146|      Albany, GA|          10397|   Atlanta, GA|
|2018-01-28|            10146|      Albany, GA|          10397|   Atlanta, GA|
|2018-01-29|            10146|      Albany, GA|          10397|   Atlanta, GA|
|2018-01-30|            10146|      Albany, GA|          10397|   Atlanta, GA|
|2018-01-31|            10146|      Albany, GA|          10397|   Atlanta, GA|
|2018-01-03|            10397|     Atlanta, GA|     

### Get max Flight date from S3 partition, to limit the last N days

<br>

- This might not be necessary, since you might ingest the entire incremental new dataset

<br>

In [8]:
# FL_DATE cast data type
df_flight_stats = df_flight_stats_raw.withColumn('FL_DATE',to_date(df_flight_stats_raw.FL_DATE, 'yyyy-MM-dd'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# Get max date
flight_date_max = df_flight_stats.select(_max('FL_DATE')).first()[0]

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Days to store in graph
last_days_to_filter = timedelta(input_last_days_to_filter)
flight_date_max_minus_n_days = flight_date_max-last_days_to_filter

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Collect last 7 days
df_flight_stats = df_flight_stats.where(col('FL_DATE') >= lit(str(flight_date_max_minus_n_days)))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

---

## Creating Edges File, for a Time-Series graph

Pseudo-columns for Edges file:

- ORIGIN_AIRPORT_ID to be converted as ~from
- DEST_AIRPORT_ID to be converted as ~to

Label columns: 

- String **flight_connection** to be added as ~label

Time-series columns: 

- DISTANCE 
- ORIGIN_CITY_NAME
- DEST_CITY_NAME
- MKT_CARRIER_FL_NUM
- MKT_UNIQUE_CARRIER
- FL_DATE
- DEP_DELAY
- DEP_DEL15
- ARR_DELAY
- ARR_DEL15
- CANCELLED
- ACTUAL_ELAPSED_TIME
- WEATHER_DELAY
- SECURITY_DELAY

In [12]:
# Collect columns we require.
df_edges_file = df_flight_stats.select('ORIGIN_AIRPORT_ID','ORIGIN','DEST_AIRPORT_ID','DEST','ORIGIN_CITY_NAME',
                                       'DEST_CITY_NAME',
                                       'MKT_UNIQUE_CARRIER','MKT_CARRIER_FL_NUM','DISTANCE','FL_DATE',
                                       'DEP_DELAY','DEP_DEL15','ARR_DELAY','ARR_DEL15','CANCELLED',
                                       'ACTUAL_ELAPSED_TIME','WEATHER_DELAY','SECURITY_DELAY')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# Adding new column with label "route", defined as Integer
df_edges_file = df_edges_file.withColumn('~label', lit('flight_connection'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
# Rename columns, to make it Neptune-Gremlin-compatible and more human-readable
df_edges_file = df_edges_file.withColumnRenamed('ORIGIN_AIRPORT_ID','~from')\
                        .withColumnRenamed('DEST_AIRPORT_ID','~to')\
                        .withColumnRenamed('ORIGIN','ORIGIN_CODE:string')\
                        .withColumnRenamed('DEST','DESTINATION_CODE:string')\
                        .withColumnRenamed('ORIGIN_CITY_NAME','ORIGIN_CITY_NAME:string')\
                        .withColumnRenamed('DEST_CITY_NAME','DESTINATION_CITY_NAME:string')\
                        .withColumnRenamed('MKT_UNIQUE_CARRIER','CARRIER_NAME:int')\
                        .withColumnRenamed('MKT_CARRIER_FL_NUM','CARRIER_FLIGHT_NUM:int')\
                        .withColumnRenamed('DISTANCE','DISTANCE:int')\
                        .withColumnRenamed('FL_DATE','FLIGHT_DATE:date')\
                        .withColumnRenamed('DEP_DELAY','DEPARTURE_DELAY:int')\
                        .withColumnRenamed('DEP_DEL15','DEPARTURE_DELAY_15MIN:int')\
                        .withColumnRenamed('ARR_DELAY','ARRIVAL_DELAY:int')\
                        .withColumnRenamed('ARR_DEL15','ARRIVAL_DELAY_15MIN:int')\
                        .withColumnRenamed('CANCELLED','CANCELLED:int')\
                        .withColumnRenamed('ACTUAL_ELAPSED_TIME','ACTUAL_ELAPSED_TIME:int')\
                        .withColumnRenamed('WEATHER_DELAY','WEATHER_DELAY:int')\
                        .withColumnRenamed('SECURITY_DELAY','SECURITY_DELAY:int')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
# Print sample
df_edges_file.select('~from','~to','~label','FLIGHT_DATE:date').show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-----+-----------------+----------------+
|~from|  ~to|           ~label|FLIGHT_DATE:date|
+-----+-----+-----------------+----------------+
|10146|10397|flight_connection|      2018-01-24|
|10146|10397|flight_connection|      2018-01-25|
|10146|10397|flight_connection|      2018-01-26|
|10146|10397|flight_connection|      2018-01-27|
|10146|10397|flight_connection|      2018-01-28|
|10146|10397|flight_connection|      2018-01-29|
|10146|10397|flight_connection|      2018-01-30|
|10146|10397|flight_connection|      2018-01-31|
|10397|10146|flight_connection|      2018-01-24|
|10397|10146|flight_connection|      2018-01-25|
|10397|10146|flight_connection|      2018-01-26|
|10397|10146|flight_connection|      2018-01-27|
|10397|10146|flight_connection|      2018-01-28|
|10397|10146|flight_connection|      2018-01-29|
|10397|10146|flight_connection|      2018-01-30|
|10397|10146|flight_connection|      2018-01-31|
|10397|11612|flight_connection|      2018-01-24|
|10397|11612|flight_

#### Writing data to Amazon S3

In [16]:
# Writing data to Amazon S3
df_edges_file = df_edges_file.repartition(1)
df_edges_file.write.mode("append").format("csv").option("header", "true").save(s3_edges_location)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

---

## Creating Vertices File

In [17]:
# Collect columns we require.
df_vertices_file = df_flight_stats.\
                          select(col('ORIGIN_AIRPORT_ID').alias('~id'),
                                 col('ORIGIN').alias('ORIGIN_CODE:string'),
                                 col('ORIGIN_CITY_NAME').alias('CITY_NAME:string'))\
                   .union(df_flight_stats.\
                          select(col('DEST_AIRPORT_ID').alias('~id'),
                                 col('DEST').alias('DESTINATION_CODE:string'),
                                 col('DEST_CITY_NAME').alias('CITY_NAME:string')))\
                   .distinct()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
# Adding new column with label "route", defined as Integer
df_vertices_file = df_vertices_file.withColumn('~label', lit('airport'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
df_vertices_file.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+------------------+--------------------+-------+
|  ~id|ORIGIN_CODE:string|    CITY_NAME:string| ~label|
+-----+------------------+--------------------+-------+
|10785|               BTV|      Burlington, VT|airport|
|11612|               EVV|      Evansville, IN|airport|
|10792|               BUF|         Buffalo, NY|airport|
|12953|               LGA|        New York, NY|airport|
|11042|               CLE|       Cleveland, OH|airport|
|10397|               ATL|         Atlanta, GA|airport|
|11617|               EWN|New Bern/Morehead...|airport|
|15096|               SYR|        Syracuse, NY|airport|
|13487|               MSP|     Minneapolis, MN|airport|
|10146|               ABY|          Albany, GA|airport|
|11641|               FAY|    Fayetteville, NC|airport|
|11433|               DTW|         Detroit, MI|airport|
+-----+------------------+--------------------+-------+

#### Writing data to Amazon S3

In [20]:
# Writing data to Amazon S3, to a single file
df_vertices_file = df_vertices_file.repartition(1)
df_vertices_file.write.mode("append").format("csv").option("header", "true").save(s3_vertices_location)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…