In [87]:
import pandas as pd
import aws_keys
import boto3

In [88]:
# load access keys
access_key, secret_key = aws_keys.aws_keys()

In [89]:
# create client object
s3 = boto3.client('s3',
                aws_access_key_id = access_key,
                aws_secret_access_key = secret_key,
                region_name='us-east-2')

In [90]:
# list buckets
response = s3.list_buckets()
response['Buckets']

[{'Name': 'aws-glue-assets-276295123250-us-east-2',
  'CreationDate': datetime.datetime(2023, 6, 11, 13, 18, 17, tzinfo=tzutc())},
 {'Name': 'flights-data-processed',
  'CreationDate': datetime.datetime(2023, 6, 11, 0, 26, 54, tzinfo=tzutc())},
 {'Name': 'flights-data-raw',
  'CreationDate': datetime.datetime(2023, 6, 7, 2, 11, 37, tzinfo=tzutc())}]

In [91]:
import io 

# read raw data in a polars data frame
obj = s3.get_object(Bucket='flights-data-raw', Key='Airline_Delay_Cause.csv')
flights = pd.read_csv(io.BytesIO(obj['Body'].read()))
flights.shape

(337562, 21)

In [92]:
# preview of raw data
flights.head(2)

Unnamed: 0,year,month,carrier,carrier_name,airport,airport_name,arr_flights,arr_del15,carrier_ct,weather_ct,...,security_ct,late_aircraft_ct,arr_cancelled,arr_diverted,arr_delay,carrier_delay,weather_delay,nas_delay,security_delay,late_aircraft_delay
0,2023,3,9E,Endeavor Air Inc.,ABY,"Albany, GA: Southwest Georgia Regional",89.0,8.0,4.46,1.0,...,0.0,0.93,1.0,1.0,412.0,262.0,38.0,53.0,0.0,59.0
1,2023,3,9E,Endeavor Air Inc.,AEX,"Alexandria, LA: Alexandria International",62.0,8.0,3.95,0.37,...,0.0,2.4,0.0,0.0,357.0,188.0,7.0,44.0,0.0,118.0


In [93]:
# drop unwanted columns 
flights = flights.drop(['carrier_ct','weather_ct','nas_ct','security_ct','late_aircraft_ct'], axis=1)

In [94]:

flights['city_state']=flights['airport_name'].apply(lambda x: x.split(': ')[0])
flights['airport_name']=flights['airport_name'].apply(lambda x: x.split(': ')[1])
flights['city']=flights['city_state'].apply(lambda x: x.split(', ')[0])
flights['state']=flights['city_state'].apply(lambda x: x.split(', ')[1])
flights = flights.drop('city_state', axis=1)




In [95]:
# preview of data
flights.head(2)

Unnamed: 0,year,month,carrier,carrier_name,airport,airport_name,arr_flights,arr_del15,arr_cancelled,arr_diverted,arr_delay,carrier_delay,weather_delay,nas_delay,security_delay,late_aircraft_delay,city,state
0,2023,3,9E,Endeavor Air Inc.,ABY,Southwest Georgia Regional,89.0,8.0,1.0,1.0,412.0,262.0,38.0,53.0,0.0,59.0,Albany,GA
1,2023,3,9E,Endeavor Air Inc.,AEX,Alexandria International,62.0,8.0,0.0,0.0,357.0,188.0,7.0,44.0,0.0,118.0,Alexandria,LA


### Create Carrier Table

In [96]:
# create a carrier table

carrier = flights[['carrier', 'carrier_name']].drop_duplicates(['carrier', 'carrier_name'], keep='first').sort_values('carrier')
# create an id column 
id_ = [i for i in range(1, len(carrier)+1)]
carrier['id'] = id_
carrier = carrier[['id', 'carrier', 'carrier_name']]
carrier.head(2)

Unnamed: 0,id,carrier,carrier_name
0,1,9E,Endeavor Air Inc.
163905,2,9E,Pinnacle Airlines Inc.


In [97]:

# replace the carrier and carrier_name columns with a foreign key
flights = flights.merge(carrier, on=['carrier', 'carrier_name']).drop(['carrier', 'carrier_name'], axis=1)
flights = flights.rename(columns={'id': 'carrier_id'})
flights.head(2)
flights.shape

(337562, 17)

### Create Airport Table

In [98]:
airport = flights[['airport', 'airport_name', 'city', 'state']].drop_duplicates('airport', keep='first').sort_values('airport')

id_ = [i for i in range(1, len(airport)+1)]
airport['id'] = id_
airport = airport[['id', 'airport', 'airport_name', 'city', 'state']]
airport.head(2)

Unnamed: 0,id,airport,airport_name,city,state
191,1,ABE,Lehigh Valley International,Allentown/Bethlehem/Easton,PA
107711,2,ABI,Abilene Regional,Abilene,TX


In [99]:
airport.head(3)

Unnamed: 0,id,airport,airport_name,city,state
191,1,ABE,Lehigh Valley International,Allentown/Bethlehem/Easton,PA
107711,2,ABI,Abilene Regional,Abilene,TX
7131,3,ABQ,Albuquerque International Sunport,Albuquerque,NM


In [100]:
flights = flights.merge(airport, on=['airport', 'airport_name', 'city', 'state']).drop(['airport', 'airport_name', 'city', 'state'], axis=1)
flights = flights.rename(columns={'id': 'airport_id'})

arr_flights = Number of Flights
arr_del15 = Number of delayed flights (15 min after schedule)
arr_diverted = number of diverte lights
num_cancelled = number of cancelled flights
arr_delay = total delay in minutes
carrier_delay = carrier delay in minutes
weather_delay = weather delay in minutes
nas_delay = national air system delay in minutes
security_delay = security delay in minutes
late_aircraft_delay = late aircraft delay in minutes


In [101]:
flights.head(5)

Unnamed: 0,year,month,arr_flights,arr_del15,arr_cancelled,arr_diverted,arr_delay,carrier_delay,weather_delay,nas_delay,security_delay,late_aircraft_delay,carrier_id,airport_id
0,2023,3,89.0,8.0,1.0,1.0,412.0,262.0,38.0,53.0,0.0,59.0,1,5
1,2023,2,68.0,4.0,0.0,0.0,89.0,26.0,0.0,63.0,0.0,0.0,1,5
2,2023,1,82.0,10.0,0.0,0.0,1800.0,1393.0,272.0,56.0,0.0,79.0,1,5
3,2022,12,69.0,3.0,0.0,0.0,136.0,38.0,0.0,1.0,0.0,97.0,1,5
4,2022,11,86.0,2.0,0.0,0.0,54.0,0.0,26.0,28.0,0.0,0.0,1,5


### Create Date Table

In [102]:
flights.head()

Unnamed: 0,year,month,arr_flights,arr_del15,arr_cancelled,arr_diverted,arr_delay,carrier_delay,weather_delay,nas_delay,security_delay,late_aircraft_delay,carrier_id,airport_id
0,2023,3,89.0,8.0,1.0,1.0,412.0,262.0,38.0,53.0,0.0,59.0,1,5
1,2023,2,68.0,4.0,0.0,0.0,89.0,26.0,0.0,63.0,0.0,0.0,1,5
2,2023,1,82.0,10.0,0.0,0.0,1800.0,1393.0,272.0,56.0,0.0,79.0,1,5
3,2022,12,69.0,3.0,0.0,0.0,136.0,38.0,0.0,1.0,0.0,97.0,1,5
4,2022,11,86.0,2.0,0.0,0.0,54.0,0.0,26.0,28.0,0.0,0.0,1,5


In [103]:
flights['year'].value_counts()

year
2019    20133
2018    19868
2022    19664
2021    19221
2007    18821
2008    18275
2020    18096
2006    17376
2009    17052
2010    16934
2005    16339
2004    16061
2013    15557
2011    15055
2012    13931
2014    13508
2015    13058
2017    12052
2016    11765
2003     8449
2023     4555
Name: count, dtype: int64

In [104]:
# Create date column
flights['date'] = pd.to_datetime(flights[['year', 'month']].assign(day=1))
flights = flights.sort_values('date')

# Create date ID column
flights['date_id'] = pd.factorize(flights['date'])[0] + 1

# Create a new table with unique year and month data
date = flights[['date_id', 'date', 'year', 'month']].drop_duplicates().reset_index(drop=True)

# delete the year and month rows from the flights table
flights = flights.drop(['date','year', 'month'], axis=1)
flights.head()


Unnamed: 0,arr_flights,arr_del15,arr_cancelled,arr_diverted,arr_delay,carrier_delay,weather_delay,nas_delay,security_delay,late_aircraft_delay,carrier_id,airport_id,date_id
14937,109.0,19.0,0.0,0.0,771.0,275.0,155.0,204.0,0.0,137.0,9,33,1
51042,257.0,60.0,4.0,1.0,2938.0,376.0,91.0,1242.0,0.0,1229.0,27,102,1
194313,102.0,24.0,2.0,0.0,1427.0,200.0,97.0,557.0,0.0,573.0,3,178,1
45500,30.0,8.0,3.0,0.0,390.0,68.0,59.0,260.0,0.0,3.0,22,95,1
80562,270.0,48.0,0.0,0.0,2742.0,489.0,103.0,519.0,0.0,1631.0,29,196,1


In [105]:
# reorder columns

flights['id'] = flights.reset_index().index+1
flights = flights[['id','date_id','carrier_id','airport_id','arr_flights','arr_del15','arr_cancelled','arr_diverted','arr_delay','carrier_delay','weather_delay','nas_delay','security_delay','late_aircraft_delay']]

In [106]:
flights.head()

Unnamed: 0,id,date_id,carrier_id,airport_id,arr_flights,arr_del15,arr_cancelled,arr_diverted,arr_delay,carrier_delay,weather_delay,nas_delay,security_delay,late_aircraft_delay
14937,1,1,9,33,109.0,19.0,0.0,0.0,771.0,275.0,155.0,204.0,0.0,137.0
51042,2,1,27,102,257.0,60.0,4.0,1.0,2938.0,376.0,91.0,1242.0,0.0,1229.0
194313,3,1,3,178,102.0,24.0,2.0,0.0,1427.0,200.0,97.0,557.0,0.0,573.0
45500,4,1,22,95,30.0,8.0,3.0,0.0,390.0,68.0,59.0,260.0,0.0,3.0
80562,5,1,29,196,270.0,48.0,0.0,0.0,2742.0,489.0,103.0,519.0,0.0,1631.0


In [107]:
carrier.head()

Unnamed: 0,id,carrier,carrier_name
0,1,9E,Endeavor Air Inc.
163905,2,9E,Pinnacle Airlines Inc.
95,3,AA,American Airlines Inc.
254412,4,AQ,Aloha Airlines Inc.
215,5,AS,Alaska Airlines Inc.


In [108]:
airport.head()

Unnamed: 0,id,airport,airport_name,city,state
191,1,ABE,Lehigh Valley International,Allentown/Bethlehem/Easton,PA
107711,2,ABI,Abilene Regional,Abilene,TX
7131,3,ABQ,Albuquerque International Sunport,Albuquerque,NM
129098,4,ABR,Aberdeen Regional,Aberdeen,SD
0,5,ABY,Southwest Georgia Regional,Albany,GA


In [109]:
date.head()

Unnamed: 0,date_id,date,year,month
0,1,2003-06-01,2003,6
1,2,2003-07-01,2003,7
2,3,2003-08-01,2003,8
3,4,2003-09-01,2003,9
4,5,2003-10-01,2003,10


### Export Tables to S3

In [60]:
# uploade files to bucket
csv_buffer = io.StringIO()
flights_pd = pd.DataFrame(flights.to_numpy(), columns=flights.columns)
flights_pd.to_csv(csv_buffer, index=False)
s3.put_object(Bucket='flights-data-processed', 
              Body=csv_buffer.getvalue(), 
              Key='flights.csv')


# uploade files to bucket
csv_buffer = io.StringIO()
carrier_pd = pd.DataFrame(carrier.to_numpy(), columns=carrier.columns)
carrier_pd.to_csv(csv_buffer, index=False)
s3.put_object(Bucket='flights-data-processed', 
              Body=csv_buffer.getvalue(), 
              Key='carriers.csv')


# uploade files to bucket
csv_buffer = io.StringIO()
airport_pd = pd.DataFrame(airport.to_numpy(), columns=airport.columns)
airport_pd.to_csv(csv_buffer, index=False)
s3.put_object(Bucket='flights-data-processed', 
              Body=csv_buffer.getvalue(), 
              Key='airports.csv')

{'ResponseMetadata': {'RequestId': 'DJPCBJCYW1E7EXBM',
  'HostId': 'ybmf8x910ON8l9RHE1C5b0qTtQ9fk0a8cqK9aEd2NxFzUjSyfKej59bls/V86dmGwOz36qct1EM=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'ybmf8x910ON8l9RHE1C5b0qTtQ9fk0a8cqK9aEd2NxFzUjSyfKej59bls/V86dmGwOz36qct1EM=',
   'x-amz-request-id': 'DJPCBJCYW1E7EXBM',
   'date': 'Sun, 11 Jun 2023 14:24:38 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"2d155295dda89966c4ec3c1b7660ab42"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"2d155295dda89966c4ec3c1b7660ab42"',
 'ServerSideEncryption': 'AES256'}

### Send Tables to Redshift Cluster