# Building a Delta Lakehouse for Flights in the United States
### Data Engineering Capstone Project

#### Project Summary
The objective of this project is to build a delta lake for US flights data and an ETL pipeline that moves the data through each table in the Delta Lake architecture, from raw (bronze) through cleaned (silver) to aggregated (gold) tables.</br>
The result is a star schema model that analysts can use to create dashboards.</br>
We also simulate receiving the raw data through a streaming application such as Apache Kafka.

The project follows these steps:
* [Step 1: Scope the Project and Gather Data](#step1)
* [Step 2: Explore and Assess the Data](#step2)
* [Step 3: Define the Data Model](#step3)
* [Step 4: Run ETL to Model the Data](#step4)
* [Step 5: Complete Project Write Up](#step5)

In [1]:
# This cell imports all necessary libraries
%run ./utilities/config

importing libraries ....

Libraries (
    os, pandas as pd, glob,logging, HTML, FlightsRaw
)
Are available now

pandas maximum column width is set to 400 and maximum number of columns to None
Setting up variables ....
vars_df is available as HTML content to display simply run HTML(vars_df)


In [2]:
# Display the variables available in the environment
HTML(vars_df)

Unnamed: 0,Name,Value,Description
0,working_dir,D:\1- Study\In-progress\DEND\flights-delta-lake,string path for current working directory
1,data_source,D:\1- Study\In-progress\DEND\flights-delta-lake\data_source\,string path for data source location
2,delta_lake,D:\1- Study\In-progress\DEND\flights-delta-lake\delta_lake\,string path for delta lake location
3,flight_raw_path,D:\1- Study\In-progress\DEND\flights-delta-lake\delta_lake\flights_raw\,string path for flight raw data
4,flight_bronz_path,D:\1- Study\In-progress\DEND\flights-delta-lake\delta_lake\flight_bronz,string path for flight bronz data
5,flight_silver_path,D:\1- Study\In-progress\DEND\flights-delta-lake\delta_lake\flight_silver,string path for flight silver data
6,flight_gold_path,D:\1- Study\In-progress\DEND\flights-delta-lake\delta_lake\flight_gold,string path for flight gold data
7,date_gold_path,D:\1- Study\In-progress\DEND\flights-delta-lake\delta_lake\date_gold,string path for date gold data
8,checkpoints_path,D:\1- Study\In-progress\DEND\flights-delta-lake\checkpoints,string path for checkpoints directory
9,flight_raw_checkpoint,D:\1- Study\In-progress\DEND\flights-delta-lake\checkpoints\flight_raw\,string path for flight raw checkpoint


In [3]:
# This cell configures spark and makes it available
%run ./utilities/spark_config

importing libraries ....

Libraries (
    pandas as pd, psutils, logging, findspark,
    pyspark.sql.SparkSession, delta.configure_spark_with_delta_pip,
    HTML, create_stream_writer, read_stream_delta, read_stream_raw,
    stop_all_streams, stop_named_stream, until_stream_is_ready,
    register_delta_table, transform_raw, get_flight_schema,
    create_or_update_date_table, load_csv_to_dataframe,
    transform_lookup_airport, transform_lookup_plane
)
Are available now

Setting up spark configurations.....
spark session is now available in the environment as spark
spark_config_df is available as HTML content to display simply run HTML(spark_config_df)
Using flights_db database..


In [4]:
# Display Spark Configurations
HTML(spark_config_df)

Unnamed: 0,Config,Value,Description
0,spark.sql.extensions,io.delta.sql.DeltaSparkSessionExtension,Using delta io extension
1,spark.sql.catalog.spark_catalog,org.apache.spark.sql.delta.catalog.DeltaCatalog,Setting spark catalog to use DeltaCatalog
2,spark.ui.port,4050,Spark UI port number
3,spark.sql.shuffle.partitions,12,setting the number of shuffle partitions to the number of cores available
4,spark.sql.adaptive.enabled,True,Enabling adaptive query optimization
5,spark.memory.offHeap.enabled,True,Enabling offHeap memory
6,spark.memory.offHeap.size,10737418240,Setting offHeap memory to 10 GB
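
The off-heap size in the table above is expressed in bytes. As a quick sanity check, the minimal sketch below converts 10 GiB to bytes; the helper name `gib_to_bytes` is hypothetical and not part of the project utilities.

```python
def gib_to_bytes(gib: int) -> int:
    """Convert gibibytes to bytes (1 GiB = 1024**3 bytes)."""
    return gib * 1024 ** 3


off_heap_bytes = gib_to_bytes(10)
print(off_heap_bytes)  # 10737418240
```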


### Step 1: Scope the Project and Gather Data <a id='step1'></a>

#### Scope 
- dataset

### Delta Lake
Building a data lake-house using [Delta Lake](https://delta.io/)</br>
Delta Lake is an open-source storage layer that brings reliability to data lakes. It provides ACID transactions and scalable metadata handling, and it unifies streaming and batch data processing. Delta Lake runs on top of an existing data lake and is fully compatible with Apache Spark APIs. For more information, see the [Delta Lake documentation](https://docs.delta.io/latest/index.html).</br>
The Delta architecture design pattern consists of landing data in successively cleaner Delta Lake tables from raw (Bronze) to clean (Silver) to aggregate (Gold). This is also referred to as a “multi-hop” architecture. You move data through this pipeline as needed.  
The end outputs are actionable insights, clean data for machine learning applications,  dashboards, and reports of business metrics.

![Architecture](images/delta_archticture.png)
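
To make the multi-hop idea concrete, here is a minimal, Spark-free sketch of the three hops using plain Python lists and dicts. The sample rows, the function names (`to_bronze`, `to_silver`, `to_gold`) and the source label are assumptions made for illustration only; the project itself moves data between Delta tables with Spark Structured Streaming.

```python
from collections import defaultdict
from datetime import datetime, timezone

# Hypothetical raw records, shaped loosely like rows of a flights CSV.
raw_rows = [
    {"Year": "1989", "Month": "5", "DayofMonth": "31", "DepDelay": "16"},
    {"Year": "1989", "Month": "5", "DayofMonth": "31", "DepDelay": "NA"},
    {"Year": "1989", "Month": "6", "DayofMonth": "1", "DepDelay": "-4"},
]


def to_bronze(rows, source):
    """Bronze: keep the raw values untouched and add ingestion metadata."""
    ingest_time = datetime.now(timezone.utc)
    return [{**row, "data_source": source, "ingest_time": ingest_time} for row in rows]


def to_silver(rows):
    """Silver: build a proper date and cast the delay, mapping 'NA' to None."""
    cleaned = []
    for row in rows:
        delay = row["DepDelay"]
        year, month, day = int(row["Year"]), int(row["Month"]), int(row["DayofMonth"])
        cleaned.append({
            "date": f"{year:04d}-{month:02d}-{day:02d}",
            "DepDelay": None if delay == "NA" else int(delay),
        })
    return cleaned


def to_gold(rows):
    """Gold: aggregate to one summary row per date."""
    flights = defaultdict(int)
    delayed = defaultdict(int)
    for row in rows:
        flights[row["date"]] += 1
        if row["DepDelay"] is not None and row["DepDelay"] > 0:
            delayed[row["date"]] += 1
    return {
        d: {"num_flights": flights[d], "count_dep_delay": delayed[d]}
        for d in sorted(flights)
    }


gold = to_gold(to_silver(to_bronze(raw_rows, "example_source")))
print(gold)
```

Each hop only reads the output of the previous one, which is what lets the bronze layer stay a faithful copy of the source while cleaning rules and aggregations evolve downstream.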

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

### Step 2: Explore and Assess the Data <a id='step2'></a>
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [5]:
flights_raw = FlightsRaw(flight_raw_checkpoint, flight_raw_data_path, flight_raw_path)

In [20]:
flights_raw.retrive(5, multiprocess=True)

Only 173 files left to retrive


In [22]:
flights_raw.retrived

{'1987': 1311826,
 '1988': 5202096,
 '1989': 5041200,
 '1990': 5270893,
 '1991': 5076925,
 '1992': 5092157,
 '1993': 5070501,
 '1994': 5180048,
 '1995': 5327435,
 '1996': 5351983,
 '1997': 5411843,
 '1998': 5384721,
 '1999': 5527884,
 '2000': 5683047,
 '2001': 5967780,
 '2002': 5271359,
 '2003': 6488540,
 '2004': 7129270,
 '2005': 7140596,
 '2006': 7141922,
 '2007': 7453215,
 '2008': 7009728,
 '200805': 606293,
 '200806': 608665,
 '200807': 627931,
 '200808': 612279,
 '200809': 540908,
 '200810': 556205,
 '200811': 523272,
 '200812': 544956,
 '200901': 532339,
 '200902': 488410,
 '200903': 557422,
 '200904': 537793,
 '200905': 546832,
 '200906': 557594,
 '200907': 580134,
 '200908': 568301,
 '200909': 510852,
 '200910': 531799,
 '200911': 509540,
 '200912': 529269,
 '201001': 521809,
 '201002': 483270,
 '201003': 549262,
 '201004': 529330,
 '201005': 542747,
 '201006': 551687,
 '201007': 570788,
 '201008': 569217,
 '201009': 526107,
 '201010': 545519,
 '201011': 520999,
 '201012': 5393

In [23]:
print(f'{flights_raw.rows_retrived:,}')

203,451,420


In [9]:
len(flights_raw.source)

173

In [10]:
flights_raw.retrived_files

['D:\\1- Study\\In-progress\\DEND\\flights-delta-lake\\delta_lake\\flights_raw\\1987\\198710.csv',
 'D:\\1- Study\\In-progress\\DEND\\flights-delta-lake\\delta_lake\\flights_raw\\1987\\198711.csv',
 'D:\\1- Study\\In-progress\\DEND\\flights-delta-lake\\delta_lake\\flights_raw\\1987\\198712.csv',
 'D:\\1- Study\\In-progress\\DEND\\flights-delta-lake\\delta_lake\\flights_raw\\1988\\198801.csv',
 'D:\\1- Study\\In-progress\\DEND\\flights-delta-lake\\delta_lake\\flights_raw\\1988\\198802.csv',
 'D:\\1- Study\\In-progress\\DEND\\flights-delta-lake\\delta_lake\\flights_raw\\1988\\198803.csv',
 'D:\\1- Study\\In-progress\\DEND\\flights-delta-lake\\delta_lake\\flights_raw\\1988\\198804.csv',
 'D:\\1- Study\\In-progress\\DEND\\flights-delta-lake\\delta_lake\\flights_raw\\1988\\198805.csv',
 'D:\\1- Study\\In-progress\\DEND\\flights-delta-lake\\delta_lake\\flights_raw\\1988\\198806.csv',
 'D:\\1- Study\\In-progress\\DEND\\flights-delta-lake\\delta_lake\\flights_raw\\1988\\198807.csv',
 'D:\\1- S

In [11]:
flights_raw.save()

Saved checkpoint at D:\1- Study\In-progress\DEND\flights-delta-lake\checkpoints\flight_raw\flight_raw.pickle


In [12]:
flights_raw = FlightsRaw(flight_raw_checkpoint)

No source is provided trying to load from latest checkpoint
loaded checkpoint from D:\1- Study\In-progress\DEND\flights-delta-lake\checkpoints\flight_raw\flight_raw.pickle
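
The save and reload cycle above suggests that `FlightsRaw` persists its progress in a pickle file and falls back to it when no source is given. Its implementation is not shown in this notebook, so the following is only a sketch of that pattern under assumptions: the class `RetrievalState`, its attributes and the file name `state.pickle` are all hypothetical.

```python
import pickle
import tempfile
from pathlib import Path


class RetrievalState:
    """Hypothetical stand-in for the state a loader might checkpoint."""

    def __init__(self, checkpoint_dir, source=None):
        self.checkpoint_file = Path(checkpoint_dir) / "state.pickle"
        if source is None:
            # No source given: fall back to the latest checkpoint.
            self.load()
        else:
            self.source = list(source)
            self.retrieved_files = []

    def retrieve(self, n):
        """Move up to n files from the pending list to the retrieved list."""
        batch, self.source = self.source[:n], self.source[n:]
        self.retrieved_files.extend(batch)

    def save(self):
        """Write the current progress to the checkpoint file."""
        state = {"source": self.source, "retrieved_files": self.retrieved_files}
        with open(self.checkpoint_file, "wb") as fh:
            pickle.dump(state, fh)

    def load(self):
        """Restore progress from the checkpoint file."""
        with open(self.checkpoint_file, "rb") as fh:
            state = pickle.load(fh)
        self.source = state["source"]
        self.retrieved_files = state["retrieved_files"]


with tempfile.TemporaryDirectory() as tmp:
    state = RetrievalState(tmp, source=["198710.csv", "198711.csv", "198712.csv"])
    state.retrieve(2)
    state.save()
    restored = RetrievalState(tmp)  # resumes from the checkpoint
    print(restored.retrieved_files, restored.source)
```

Checkpointing the pending and retrieved lists together means an interrupted session can resume without copying a file twice.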


In [13]:
flights_raw.retrived_files

['D:\\1- Study\\In-progress\\DEND\\flights-delta-lake\\delta_lake\\flights_raw\\1987\\198710.csv',
 'D:\\1- Study\\In-progress\\DEND\\flights-delta-lake\\delta_lake\\flights_raw\\1987\\198711.csv',
 'D:\\1- Study\\In-progress\\DEND\\flights-delta-lake\\delta_lake\\flights_raw\\1987\\198712.csv',
 'D:\\1- Study\\In-progress\\DEND\\flights-delta-lake\\delta_lake\\flights_raw\\1988\\198801.csv',
 'D:\\1- Study\\In-progress\\DEND\\flights-delta-lake\\delta_lake\\flights_raw\\1988\\198802.csv',
 'D:\\1- Study\\In-progress\\DEND\\flights-delta-lake\\delta_lake\\flights_raw\\1988\\198803.csv',
 'D:\\1- Study\\In-progress\\DEND\\flights-delta-lake\\delta_lake\\flights_raw\\1988\\198804.csv',
 'D:\\1- Study\\In-progress\\DEND\\flights-delta-lake\\delta_lake\\flights_raw\\1988\\198805.csv',
 'D:\\1- Study\\In-progress\\DEND\\flights-delta-lake\\delta_lake\\flights_raw\\1988\\198806.csv',
 'D:\\1- Study\\In-progress\\DEND\\flights-delta-lake\\delta_lake\\flights_raw\\1988\\198807.csv',
 'D:\\1- S

In [14]:
print(f'{flights_raw.rows_retrived:,}')

21,902,940


In [15]:
schema = get_flight_schema(spark, flights_raw)
schema

StructType(List(StructField(Year,IntegerType,true),StructField(Month,IntegerType,true),StructField(DayofMonth,IntegerType,true),StructField(DayOfWeek,IntegerType,true),StructField(DepTime,DoubleType,true),StructField(CRSDepTime,IntegerType,true),StructField(ArrTime,DoubleType,true),StructField(CRSArrTime,IntegerType,true),StructField(UniqueCarrier,StringType,true),StructField(FlightNum,IntegerType,true),StructField(TailNum,StringType,true),StructField(ActualElapsedTime,DoubleType,true),StructField(CRSElapsedTime,IntegerType,true),StructField(AirTime,StringType,true),StructField(ArrDelay,DoubleType,true),StructField(DepDelay,DoubleType,true),StructField(Origin,StringType,true),StructField(Dest,StringType,true),StructField(Distance,DoubleType,true),StructField(TaxiIn,StringType,true),StructField(TaxiOut,StringType,true),StructField(Cancelled,IntegerType,true),StructField(CancellationCode,StringType,true),StructField(Diverted,IntegerType,true),StructField(CarrierDelay,StringType,true),Str

In [16]:
stop_all_streams(spark)

False

In [17]:
flight_stream_df = read_stream_raw(spark, flight_raw_path + os.sep + '*', schema=schema)

In [18]:
flight_transformed_df = transform_raw(flight_stream_df, 'gharib_local_pc', 'Year')

In [19]:
flight_raw_writer = create_stream_writer(flight_transformed_df,
                                         flight_bronz_checkpoint,
                                         'flight_raw_to_bronz',
                                         'p_Year')

In [20]:
flight_raw_to_bronz = flight_raw_writer.start(flight_bronz_path)

In [21]:
flight_raw_to_bronz.isActive

True

In [22]:
flight_raw_to_bronz.status

{'message': 'Initializing sources',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [23]:
until_stream_is_ready(spark, 'flight_raw_to_bronz')


Timed out.


False
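
The `Timed out.` message and the `False` return value suggest that `until_stream_is_ready` polls the query status for a bounded time. The helper's source is not shown here, so the sketch below is an assumption about how such a wait loop could look; `wait_until_ready`, its parameters and the simulated statuses are hypothetical, and only the status keys are taken from the output above.

```python
import time


def wait_until_ready(get_status, timeout=5.0, poll_interval=0.1):
    """Poll get_status() until it reports available data or the timeout expires.

    Returns True if the stream became ready, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status()
        if status.get("isDataAvailable") and status.get("isTriggerActive"):
            return True
        time.sleep(poll_interval)
    print("Timed out.")
    return False


# Simulated status source: the stream reports data on the third poll.
polls = iter([
    {"message": "Initializing sources", "isDataAvailable": False, "isTriggerActive": False},
    {"message": "Initializing sources", "isDataAvailable": False, "isTriggerActive": False},
    {"message": "Processing new data", "isDataAvailable": True, "isTriggerActive": True},
])
ready = wait_until_ready(lambda: next(polls), timeout=2.0, poll_interval=0.01)
print(ready)
```

A timeout does not necessarily mean the stream failed; as the later cells show, the row counts can still catch up once the query finishes its current batch.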

In [24]:
register_delta_table(spark, 'flight_bronz', flight_bronz_path)

In [25]:
delta_flights = spark.read.table('flight_bronz')

In [26]:
print(f'{delta_flights.count():,}')

21,902,940


In [27]:
print(f'{flights_raw.rows_retrived:,}')

21,902,940


In [28]:
assert delta_flights.count() == flights_raw.rows_retrived, "number of rows retrived doesn't match"
print("Assertion passed.")

Assertion passed.


In [29]:
flight_bronz_stream_df = read_stream_delta(spark, flight_bronz_path)

In [31]:
flight_bronz_transformed_df = transform_flight_bronz(flight_bronz_stream_df)

In [32]:
flight_bronz_writer = create_stream_writer(flight_bronz_transformed_df,
                                           flight_silver_checkpoint,
                                           'flight_bronz_to_silver',
                                           'p_Year')

In [33]:
flight_bronz_to_silver = flight_bronz_writer.start(flight_silver_path)

In [34]:
flight_bronz_to_silver.isActive

True

In [35]:
flight_bronz_to_silver.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

In [36]:
until_stream_is_ready(spark, 'flight_bronz_to_silver')


Timed out.


False

In [37]:
register_delta_table(spark, 'flight_silver', flight_silver_path)

In [38]:
delta_flights_silver = spark.read.table('flight_silver')

In [39]:
print(f'{delta_flights_silver.count():,}')

21,902,940


In [40]:
assert delta_flights_silver.count() == flights_raw.rows_retrived, "number of rows retrived doesn't match"
print("Assertion passed.")

Assertion passed.


In [41]:
spark.sql("""
SELECT * FROM flight_silver LIMIT 10
""").toPandas()

Unnamed: 0,p_Year,date,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
0,1989,1989-05-31,06:45,06:29,07:46,07:31,UA,402,,61,62,,15,16,STL,ORD,258,,,False,,False,,,,,
1,1989,1989-05-01,19:24,19:12,20:12,19:56,UA,403,,48,44,,16,12,BIL,GTF,177,,,False,,False,,,,,
2,1989,1989-05-02,19:08,19:12,19:47,19:56,UA,403,,39,44,,-9,-4,BIL,GTF,177,,,False,,False,,,,,
3,1989,1989-05-04,19:08,19:12,19:51,19:56,UA,403,,43,44,,-5,-4,BIL,GTF,177,,,False,,False,,,,,
4,1989,1989-05-05,19:09,19:12,19:50,19:56,UA,403,,41,44,,-6,-3,BIL,GTF,177,,,False,,False,,,,,
5,1989,1989-05-06,19:09,19:12,19:50,19:56,UA,403,,41,44,,-6,-3,BIL,GTF,177,,,False,,False,,,,,
6,1989,1989-05-07,19:10,19:12,19:53,19:56,UA,403,,43,44,,-3,-2,BIL,GTF,177,,,False,,False,,,,,
7,1989,1989-05-08,19:10,19:12,19:49,19:56,UA,403,,39,44,,-7,-2,BIL,GTF,177,,,False,,False,,,,,
8,1989,1989-05-09,19:09,19:12,19:49,19:56,UA,403,,40,44,,-7,-3,BIL,GTF,177,,,False,,False,,,,,
9,1989,1989-05-10,19:08,19:12,19:49,19:56,UA,403,,41,44,,-7,-4,BIL,GTF,177,,,False,,False,,,,,


In [42]:
Dict_Null = {col:delta_flights.filter(delta_flights[col].isNull()).count() for col in delta_flights.columns}
Dict_Null

{'p_Year': 0,
 'Month': 0,
 'DayofMonth': 0,
 'DayOfWeek': 0,
 'DepTime': 239976,
 'CRSDepTime': 0,
 'ArrTime': 301605,
 'CRSArrTime': 0,
 'UniqueCarrier': 0,
 'FlightNum': 0,
 'TailNum': 21902940,
 'ActualElapsedTime': 301605,
 'CRSElapsedTime': 0,
 'AirTime': 21902940,
 'ArrDelay': 301605,
 'DepDelay': 239976,
 'Origin': 0,
 'Dest': 0,
 'Distance': 158803,
 'TaxiIn': 21902940,
 'TaxiOut': 21902940,
 'Cancelled': 0,
 'CancellationCode': 21902940,
 'Diverted': 0,
 'CarrierDelay': 21902940,
 'WeatherDelay': 21902940,
 'NASDelay': 21902940,
 'SecurityDelay': 21902940,
 'LateAircraftDelay': 21902940,
 'data_source': 0,
 'ingest_time': 0,
 'ingest_date': 0}

In [43]:
Dict_Null = {col:delta_flights_silver.filter(delta_flights_silver[col].isNull()).count() for col in delta_flights_silver.columns}
Dict_Null

{'p_Year': 0,
 'date': 0,
 'DepTime': 242629,
 'CRSDepTime': 16,
 'ArrTime': 308337,
 'CRSArrTime': 1169,
 'UniqueCarrier': 0,
 'FlightNum': 0,
 'TailNum': 21902940,
 'ActualElapsedTime': 301605,
 'CRSElapsedTime': 0,
 'AirTime': 21902940,
 'ArrDelay': 301605,
 'DepDelay': 239976,
 'Origin': 0,
 'Dest': 0,
 'Distance': 158803,
 'TaxiIn': 21902940,
 'TaxiOut': 21902940,
 'Cancelled': 0,
 'CancellationCode': 21902940,
 'Diverted': 0,
 'CarrierDelay': 21902940,
 'WeatherDelay': 21902940,
 'NASDelay': 21902940,
 'SecurityDelay': 21902940,
 'LateAircraftDelay': 21902940}
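
The two cells above count nulls with one filter and count per column, which scans the table once for every column. The same numbers can be collected in a single pass. The sketch below shows the idea on plain Python rows; `null_counts` and the sample rows are hypothetical. In Spark, the analogous approach would be a single aggregation with one conditional count per column, which is not shown here.

```python
def null_counts(rows, columns):
    """Count None values per column in a single pass over the rows."""
    counts = {column: 0 for column in columns}
    for row in rows:
        for column in columns:
            if row.get(column) is None:
                counts[column] += 1
    return counts


rows = [
    {"DepTime": "06:45", "ArrTime": "07:46", "TailNum": None},
    {"DepTime": None, "ArrTime": None, "TailNum": None},
    {"DepTime": "19:08", "ArrTime": None, "TailNum": None},
]
print(null_counts(rows, ["DepTime", "ArrTime", "TailNum"]))
```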

In [44]:
flights_raw.retrive(12, multiprocess=True)

In [45]:
print(f'{flights_raw.rows_retrived:,}')

87,660,238


In [46]:
print(f'{delta_flights.count():,}')

49,311,327


In [47]:
print(f'{delta_flights_silver.count():,}')

27,827,925


In [48]:
flight_raw_to_bronz.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

In [49]:
flight_bronz_to_silver.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

In [50]:
print(f'{delta_flights.count():,}')

49,311,327


In [51]:
print(f'{delta_flights_silver.count():,}')

49,311,327


In [52]:
flight_raw_to_bronz.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

In [53]:
flight_bronz_to_silver.status

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [57]:
print(f'{delta_flights.count():,}')

87,660,238


In [58]:
print(f'{delta_flights_silver.count():,}')

87,660,238


In [55]:
assert delta_flights_silver.count() == flights_raw.rows_retrived, "number of rows retrived doesn't match"
print("Assertion passed.")

Assertion passed.


In [56]:
assert delta_flights.count() == flights_raw.rows_retrived, "number of rows retrived doesn't match"
print("Assertion passed.")

Assertion passed.


In [59]:
df_num_rows_by_month = spark.sql("""
SELECT 
    p_Year,
    Month,
    FORMAT_NUMBER(COUNT(1), 0) AS Count
FROM
    flight_bronz
GROUP BY
    1, 2
ORDER BY
    1, 2
""").toPandas()

In [61]:
df_num_rows_by_month

Unnamed: 0,p_Year,Month,Count
0,1987,10,448620
1,1987,11,422803
2,1987,12,440403
3,1988,1,436950
4,1988,2,412579
...,...,...,...
190,2003,8,556984
191,2003,9,527714
192,2003,10,552370
193,2003,11,528171


In [68]:
delta_flights_silver.columns

['p_Year',
 'date',
 'DepTime',
 'CRSDepTime',
 'ArrTime',
 'CRSArrTime',
 'UniqueCarrier',
 'FlightNum',
 'TailNum',
 'ActualElapsedTime',
 'CRSElapsedTime',
 'AirTime',
 'ArrDelay',
 'DepDelay',
 'Origin',
 'Dest',
 'Distance',
 'TaxiIn',
 'TaxiOut',
 'Cancelled',
 'CancellationCode',
 'Diverted',
 'CarrierDelay',
 'WeatherDelay',
 'NASDelay',
 'SecurityDelay',
 'LateAircraftDelay']

In [97]:
spark.sql("""
SELECT 
   *
FROM
    flight_silver
WHERE Cancelled = true
LIMIT 
    5
""").toPandas()

Unnamed: 0,p_Year,date,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
0,2003,2003-07-14,,12:30,,13:06,UA,750,000000,,,,,,DEN,COS,72,0,0,True,A,False,0,0,0,0,0
1,2003,2003-07-12,,08:10,,11:42,UA,750,000000,,,,,,SEA,DEN,1024,0,0,True,A,False,0,0,0,0,0
2,2003,2003-07-08,,15:00,,18:11,UA,752,000000,,,,,,ORD,PHL,678,0,0,True,C,False,0,0,0,0,0
3,2003,2003-07-15,,15:00,,18:11,UA,752,000000,,,,,,ORD,PHL,678,0,0,True,A,False,0,0,0,0,0
4,2003,2003-07-22,,15:00,,18:11,UA,752,N385UA,,,,,,ORD,PHL,678,0,0,True,C,False,0,0,0,0,0


In [113]:
def create_or_update_flight_gold(spark, flight_silver_table, flight_gold_table):
    """Create summary table aggregated by date

    Args:
        spark (SparkSession): spark session 
        flight_silver_table (str): name of flight silver table
        flight_gold_table (str): name of flight gold table
    """
    spark.sql(f"""
        DROP TABLE IF EXISTS {flight_gold_table}
    """)

    spark.sql(f"""
        CREATE TABLE {flight_gold_table} AS (
            SELECT 
                date,
                COUNT(DISTINCT UniqueCarrier) AS num_carriers,
                SUM(CAST(Cancelled AS INTEGER)) AS total_cancelled,
                SUM(CAST(Diverted AS INTEGER)) AS total_diverted,
                ROUND(AVG(AirTime), 2) AS avg_air_time,
                SUM(AirTime) AS total_air_time,
                SUM(CASE WHEN DepDelay <= 0 OR DepDelay IS NULL THEN 0 ELSE 1 END) AS count_dep_delay,  
                SUM(CASE WHEN DepDelay < 0 OR DepDelay IS NULL THEN 0 ELSE DepDelay END) AS total_dep_delay,  
                ROUND(AVG(CASE WHEN DepDelay <= 0 THEN NULL ELSE DepDelay END), 2) AS average_dep_delay,
                SUM(CASE WHEN ArrDelay <= 0 OR ArrDelay IS NULL THEN 0 ELSE 1 END) AS count_arr_delay,  
                SUM(CASE WHEN ArrDelay < 0 OR ArrDelay IS NULL THEN 0 ELSE ArrDelay END) AS total_arr_delay,  
                ROUND(AVG(CASE WHEN ArrDelay <= 0 THEN NULL ELSE ArrDelay END), 2) AS average_arr_delay,
                SUM(CASE WHEN CarrierDelay <= 0 OR CarrierDelay IS NULL THEN 0 ELSE 1 END) AS count_carrier_delay,  
                SUM(CASE WHEN CarrierDelay IS NULL THEN 0 ELSE CarrierDelay END) AS total_carrier_delay,
                ROUND(AVG(CASE WHEN CarrierDelay <= 0 THEN NULL ELSE CarrierDelay END), 2) AS average_carrier_delay,
                SUM(CASE WHEN WeatherDelay <= 0 OR WeatherDelay IS NULL THEN 0 ELSE 1 END) AS count_weather_delay,  
                SUM(CASE WHEN WeatherDelay IS NULL THEN 0 ELSE WeatherDelay END) AS total_weather_delay,
                ROUND(AVG(CASE WHEN WeatherDelay <= 0 THEN NULL ELSE WeatherDelay END), 2) AS average_weather_delay,
                SUM(CASE WHEN NASDelay <= 0 OR NASDelay IS NULL THEN 0 ELSE 1 END) AS count_nas_delay,  
                SUM(CASE WHEN NASDelay IS NULL THEN 0 ELSE NASDelay END) AS total_nas_delay,
                ROUND(AVG(CASE WHEN NASDelay <= 0 THEN NULL ELSE NASDelay END), 2) AS average_nas_delay,
                SUM(CASE WHEN SecurityDelay <= 0 OR SecurityDelay IS NULL THEN 0 ELSE 1 END) AS count_security_delay,  
                SUM(CASE WHEN SecurityDelay IS NULL THEN 0 ELSE SecurityDelay END) AS total_security_delay,
                ROUND(AVG(CASE WHEN SecurityDelay <= 0 THEN NULL ELSE SecurityDelay END), 2) AS average_security_delay,
                SUM(CASE WHEN LateAircraftDelay <= 0 OR LateAircraftDelay IS NULL THEN 0 ELSE 1 END) AS count_aircraft_delay,  
                SUM(CASE WHEN LateAircraftDelay IS NULL THEN 0 ELSE LateAircraftDelay END) AS total_aircraft_delay,
                ROUND(AVG(CASE WHEN LateAircraftDelay <= 0 THEN NULL ELSE LateAircraftDelay END), 2) AS average_aircraft_delay,
                SUM(Distance) AS total_distance,
                ROUND(AVG(Distance), 2) AS average_distance,
                COUNT(1) AS num_flights
            FROM
                {flight_silver_table}
            GROUP BY
                date
            ORDER BY
                date
        )
    """)


In [114]:
create_or_update_flight_gold(spark, 'flight_silver', 'flight_gold')
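
The `CASE` expressions in the gold query follow one pattern for departure and arrival delays: count only strictly positive delays, treat early or missing values as zero in the total, and average over delayed flights only. The sketch below restates that pattern in plain Python for a single group; `delay_metrics` is a hypothetical helper and `None` stands in for SQL `NULL`. The totals for the carrier, weather, NAS, security and late-aircraft columns differ slightly in the query, since they only replace `NULL` with zero.

```python
def delay_metrics(delays):
    """Summarise one delay column the way the DepDelay/ArrDelay CASE expressions do.

    delays: iterable of minutes, where None stands for SQL NULL.
    """
    positive = [d for d in delays if d is not None and d > 0]
    return {
        # count_*: rows with a strictly positive delay
        "count": len(positive),
        # total_*: early (negative), on-time and NULL values contribute 0
        "total": sum(positive),
        # average_*: mean over delayed flights only; None when nothing was delayed
        "average": round(sum(positive) / len(positive), 2) if positive else None,
    }


print(delay_metrics([16, 12, -4, 0, None, -9]))
```

Averaging over delayed flights only answers "how late is a late flight", which is a different question from the mean delay over all flights.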

In [117]:
flight_gold_df = spark.read.table('flight_gold')

In [120]:
flight_gold_df.write.format('delta').save(flight_gold_path)

In [134]:
test = spark.sql("""SELECT MIN(date) as min_date, MAX(date) as max_date FROM flight_gold""").collect()

In [137]:
test[0].asDict()

{'min_date': datetime.date(1987, 10, 1),
 'max_date': datetime.date(2003, 12, 31)}

In [141]:
m = 1
y = 2019

start = pd.to_datetime(f'{m}{y}', format='%m%Y')
end = pd.to_datetime(f'{m}{y + 1}', format='%m%Y')  - pd.Timedelta('1d')

df = pd.DataFrame({'Date': pd.date_range(start, end)})

df

Unnamed: 0,Date
0,2019-01-01
1,2019-01-02
2,2019-01-03
3,2019-01-04
4,2019-01-05
...,...
360,2019-12-27
361,2019-12-28
362,2019-12-29
363,2019-12-30


In [142]:
from pyspark.sql import DataFrame

In [145]:
spark.createDataFrame(df).show()

+-------------------+
|               Date|
+-------------------+
|2019-01-01 00:00:00|
|2019-01-02 00:00:00|
|2019-01-03 00:00:00|
|2019-01-04 00:00:00|
|2019-01-05 00:00:00|
|2019-01-06 00:00:00|
|2019-01-07 00:00:00|
|2019-01-08 00:00:00|
|2019-01-09 00:00:00|
|2019-01-10 00:00:00|
|2019-01-11 00:00:00|
|2019-01-12 00:00:00|
|2019-01-13 00:00:00|
|2019-01-14 00:00:00|
|2019-01-15 00:00:00|
|2019-01-16 00:00:00|
|2019-01-17 00:00:00|
|2019-01-18 00:00:00|
|2019-01-19 00:00:00|
|2019-01-20 00:00:00|
+-------------------+
only showing top 20 rows



In [11]:
import pandas as pd
from pyspark.sql.functions import (
    col,
    current_timestamp,
    from_json,
    from_unixtime,
    lag,
    lead,
    lit,
    mean,
    stddev,
    max,
    unix_timestamp,
    lpad,
    concat_ws,
    to_date
)
import pyspark.sql.functions  as F

In [5]:
table = 'flight_gold'
query = f"""
            SELECT 
                MIN(date) as min_date,
                MAX(date) as max_date 
            FROM
                {table}
            WHERE
                YEAR(date) > 2000
        """
min_max_date_dict = spark.sql(query).collect()[0].asDict()
min_year = min_max_date_dict['min_date'].year
max_year = min_max_date_dict['max_date'].year
start_date = pd.to_datetime(f'{1}{min_year}', format='%m%Y')
end_date = pd.to_datetime(f'{1}{max_year}', format='%m%Y')
pdf = pd.DataFrame({'dte': pd.date_range(start_date, end_date)})
pdf.dte = pdf.dte.astype('str')

In [37]:
print(start_date.date())

2001-01-01


In [23]:
date_df = spark.createDataFrame(pdf)

In [33]:
test_date_df = (
    date_df.limit(10)
    .withColumn('dte', to_date('dte'))
    .withColumn('year', F.year('dte'))
    .withColumn('quarter', F.concat_ws('Q', F.lit(''), F.quarter('dte')))
    .withColumn('month', F.month('dte'))
    .withColumn('day', F.dayofmonth('dte'))
    .withColumn('dayofweek', F.dayofweek('dte'))
    .withColumn('dayofyear', F.dayofyear('dte'))
    .withColumn('weekofyear', F.weekofyear('dte'))
    .withColumn('month_short', F.date_format('dte', 'MMM'))
    .withColumn('month_name', F.date_format('dte', 'MMMM'))
    .withColumn('month_year', F.concat_ws('-', F.col('year'), F.col('month_short')))
    .withColumn('sort_month_year', F.col('year') * 100 + F.col('month'))
    .withColumn('quarter_year', F.concat_ws('-', F.col('year'), F.col('quarter')))
    .withColumn('sort_quarter_year', F.col('year') * 100 + F.quarter('dte'))
)
test_date_df.toPandas()

Unnamed: 0,dte,year,quarter,month,day,dayofweek,dayofyear,weekofyear,month_short,month_name,month_year,sort_month_year,quarter_year,sort_quarter_year
0,2001-01-01,2001,Q1,1,1,2,1,1,Jan,January,2001-Jan,200101,2001-Q1,200101
1,2001-01-02,2001,Q1,1,2,3,2,1,Jan,January,2001-Jan,200101,2001-Q1,200101
2,2001-01-03,2001,Q1,1,3,4,3,1,Jan,January,2001-Jan,200101,2001-Q1,200101
3,2001-01-04,2001,Q1,1,4,5,4,1,Jan,January,2001-Jan,200101,2001-Q1,200101
4,2001-01-05,2001,Q1,1,5,6,5,1,Jan,January,2001-Jan,200101,2001-Q1,200101
5,2001-01-06,2001,Q1,1,6,7,6,1,Jan,January,2001-Jan,200101,2001-Q1,200101
6,2001-01-07,2001,Q1,1,7,1,7,1,Jan,January,2001-Jan,200101,2001-Q1,200101
7,2001-01-08,2001,Q1,1,8,2,8,2,Jan,January,2001-Jan,200101,2001-Q1,200101
8,2001-01-09,2001,Q1,1,9,3,9,2,Jan,January,2001-Jan,200101,2001-Q1,200101
9,2001-01-10,2001,Q1,1,10,4,10,2,Jan,January,2001-Jan,200101,2001-Q1,200101
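
For readers who want to check the derived columns without a Spark session, the following sketch rebuilds the same attributes with the standard library. `date_dimension_row` and `MONTH_NAMES` are hypothetical names; the day-of-week mapping assumes Spark's convention (1 = Sunday through 7 = Saturday) and the week number assumes ISO 8601 weeks, which matches the output shown above.

```python
from datetime import date, timedelta

MONTH_NAMES = (
    "January", "February", "March", "April", "May", "June",
    "July", "August", "September", "October", "November", "December",
)


def date_dimension_row(d):
    """Derive the calendar attributes for one date."""
    quarter = (d.month - 1) // 3 + 1
    month_name = MONTH_NAMES[d.month - 1]
    return {
        "dte": d.isoformat(),
        "year": d.year,
        "quarter": f"Q{quarter}",
        "month": d.month,
        "day": d.day,
        # Spark convention: 1 = Sunday ... 7 = Saturday
        "dayofweek": d.isoweekday() % 7 + 1,
        "dayofyear": d.timetuple().tm_yday,
        # ISO 8601 week number
        "weekofyear": d.isocalendar()[1],
        "month_short": month_name[:3],
        "month_name": month_name,
        "month_year": f"{d.year}-{month_name[:3]}",
        "sort_month_year": d.year * 100 + d.month,
        "quarter_year": f"{d.year}-Q{quarter}",
        "sort_quarter_year": d.year * 100 + quarter,
    }


start = date(2001, 1, 1)
rows = [date_dimension_row(start + timedelta(days=i)) for i in range(10)]
print(rows[0])
```

The numeric `sort_*` columns exist so that labels such as `2001-Jan` can be ordered chronologically in a dashboard rather than alphabetically.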


In [171]:
date_df.printSchema()

root
 |-- dte: string (nullable = true)



In [98]:
from pyspark.sql.functions import (
    col,
    current_timestamp,
    from_json,
    from_unixtime,
    lag,
    lead,
    lit,
    mean,
    stddev,
    max,
    unix_timestamp,
    lpad,
    concat_ws,
    when,
    count,
    isnan
)
from pyspark.sql.types import StringType, IntegerType, TimestampType

In [72]:
test = delta_flights.limit(10)

In [90]:
test2 = (
    test
    .withColumn("date", concat_ws("-", col("p_Year"), col("Month"), col("DayofMonth")).cast("date"))
    .withColumn("DepTime", col("DepTime").cast("integer").cast("string"))
    .withColumn("DepTime", lpad(col("DepTime"),4,'0'))
    .withColumn("DepTime", from_unixtime(unix_timestamp(col("DepTime"), "HHmm"),"HH:mm"))
    .withColumn("Cancelled", col("Cancelled").cast('boolean'))
    .withColumn("CarrierDelay", col("CarrierDelay").cast('integer'))
    
)
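
The `DepTime` steps above turn a numeric `HHMM` value into an `HH:mm` string by casting, left-padding to four digits and reparsing. The sketch below mirrors that chain in plain Python so the intermediate values are easy to inspect. `hhmm_to_clock` is a hypothetical helper, and returning `None` for values that are not a valid time of day is an assumption of this sketch, not a statement about how Spark treats the same inputs.

```python
def hhmm_to_clock(value):
    """Convert a numeric HHMM value such as 724.0 to the string '07:24'.

    Returns None for missing or negative values and for values that are
    not a valid time of day (for example 2400 or 1275).
    """
    if value is None or value < 0:
        return None
    digits = str(int(value)).zfill(4)  # 724.0 -> '0724'
    if len(digits) != 4:
        return None
    hours, minutes = int(digits[:2]), int(digits[2:])
    if hours > 23 or minutes > 59:
        return None
    return f"{hours:02d}:{minutes:02d}"


for sample in (724.0, 1330, 5, None, 2400):
    print(sample, "->", hhmm_to_clock(sample))
```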

In [91]:
test2.printSchema()

root
 |-- p_Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- ArrDelay: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: double (nullable = true)
 |-- TaxiIn: double (nullable = true)
 |-- TaxiOut: double (nullable = true)
 |-- Cancelled: boolean (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: double (nullable = true)
 |-- Car

In [93]:
test2.columns

['p_Year',
 'Month',
 'DayofMonth',
 'DayOfWeek',
 'DepTime',
 'CRSDepTime',
 'ArrTime',
 'CRSArrTime',
 'UniqueCarrier',
 'FlightNum',
 'TailNum',
 'ActualElapsedTime',
 'CRSElapsedTime',
 'AirTime',
 'ArrDelay',
 'DepDelay',
 'Origin',
 'Dest',
 'Distance',
 'TaxiIn',
 'TaxiOut',
 'Cancelled',
 'CancellationCode',
 'Diverted',
 'CarrierDelay',
 'WeatherDelay',
 'NASDelay',
 'SecurityDelay',
 'LateAircraftDelay',
 'data_source',
 'ingest_time',
 'ingest_date',
 'date']

In [92]:
test2.toPandas()

Unnamed: 0,p_Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay,data_source,ingest_time,ingest_date,date
0,2021,3,3,3,07:14,724,818.0,843,OO,3133,N728SK,64.0,79.0,49.0,-25.0,-10.0,SGU,PHX,262.0,5.0,10.0,False,,0.0,,,,,,gharib_local_pc,2021-06-25 15:46:03.993,2021-06-25,2021-03-03
1,2021,3,3,3,09:17,922,1031.0,1040,OO,3134,N752SK,74.0,78.0,48.0,-9.0,-5.0,PHX,SGU,262.0,3.0,23.0,False,,0.0,,,,,,gharib_local_pc,2021-06-25 15:46:03.993,2021-06-25,2021-03-03
2,2021,3,3,3,13:21,1330,1501.0,1530,OO,3135,N766SK,160.0,180.0,129.0,-29.0,-9.0,MHT,ORD,843.0,16.0,15.0,False,,0.0,,,,,,gharib_local_pc,2021-06-25 15:46:03.993,2021-06-25,2021-03-03
3,2021,3,3,3,16:36,1645,2002.0,2010,OO,3136,N752EV,146.0,145.0,112.0,-8.0,-9.0,DFW,TRI,868.0,7.0,27.0,False,,0.0,,,,,,gharib_local_pc,2021-06-25 15:46:03.993,2021-06-25,2021-03-03
4,2021,3,3,3,18:38,1844,1903.0,1925,OO,3137,N715SK,85.0,101.0,69.0,-22.0,-6.0,PHX,BFL,425.0,3.0,13.0,False,,0.0,,,,,,gharib_local_pc,2021-06-25 15:46:03.993,2021-06-25,2021-03-03
5,2021,3,3,3,16:48,1650,1808.0,1834,OO,3139,N744SK,80.0,104.0,57.0,-26.0,-2.0,ORD,BNA,409.0,4.0,19.0,False,,0.0,,,,,,gharib_local_pc,2021-06-25 15:46:03.993,2021-06-25,2021-03-03
6,2021,3,3,3,16:51,1652,1929.0,1902,OO,3140,N727SK,98.0,70.0,45.0,27.0,-1.0,PSP,PHX,261.0,5.0,48.0,False,,0.0,27.0,0.0,0.0,0.0,0.0,gharib_local_pc,2021-06-25 15:46:03.993,2021-06-25,2021-03-03
7,2021,3,3,3,12:42,1245,1452.0,1456,OO,3141,N771SK,190.0,191.0,153.0,-4.0,-3.0,DFW,YUM,1022.0,5.0,32.0,False,,0.0,,,,,,gharib_local_pc,2021-06-25 15:46:03.993,2021-06-25,2021-03-03
8,2021,3,3,3,07:17,726,821.0,836,OO,3142,N614SK,124.0,130.0,104.0,-15.0,-9.0,LBB,PHX,588.0,8.0,12.0,False,,0.0,,,,,,gharib_local_pc,2021-06-25 15:46:03.993,2021-06-25,2021-03-03
9,2021,3,3,3,20:40,2045,2144.0,2215,OO,3144,N773SK,124.0,150.0,97.0,-31.0,-5.0,DFW,DRO,674.0,2.0,25.0,False,,0.0,,,,,,gharib_local_pc,2021-06-25 15:46:03.993,2021-06-25,2021-03-03


In [89]:
spark.sql("DESCRIBE EXTENDED flight_bronz").toPandas()

Unnamed: 0,col_name,data_type,comment
0,p_Year,int,Year used for partitioning the table
1,Month,int,
2,DayofMonth,int,
3,DayOfWeek,int,
4,DepTime,double,
5,CRSDepTime,int,
6,ArrTime,double,
7,CRSArrTime,int,
8,UniqueCarrier,string,
9,FlightNum,int,


In [100]:
Dict_Null = {col:delta_flights.filter(delta_flights[col].isNull()).count() for col in delta_flights.columns}
Dict_Null

{'p_Year': 0,
 'Month': 0,
 'DayofMonth': 0,
 'DayOfWeek': 0,
 'DepTime': 303021,
 'CRSDepTime': 0,
 'ArrTime': 305421,
 'CRSArrTime': 0,
 'UniqueCarrier': 0,
 'FlightNum': 0,
 'TailNum': 158280,
 'ActualElapsedTime': 313728,
 'CRSElapsedTime': 5,
 'AirTime': 313728,
 'ArrDelay': 313728,
 'DepDelay': 303075,
 'Origin': 0,
 'Dest': 0,
 'Distance': 0,
 'TaxiIn': 305423,
 'TaxiOut': 303880,
 'Cancelled': 0,
 'CancellationCode': 5365575,
 'Diverted': 0,
 'CarrierDelay': 5147504,
 'WeatherDelay': 5147504,
 'NASDelay': 5147504,
 'SecurityDelay': 5147504,
 'LateAircraftDelay': 5147504,
 'data_source': 0,
 'ingest_time': 0,
 'ingest_date': 0}

In [103]:
spark.sql("""
SELECT 
    Diverted,
    FORMAT_NUMBER(COUNT(1), 0) AS count
FROM
    flight_bronz
GROUP BY
    Diverted
ORDER BY
    COUNT(1) DESC
""").toPandas()

Unnamed: 0,Diverted,count
0,0.0,5660732
1,1.0,9285


In [39]:
lookup_tables_path = os.path.join(data_source, 'LookupTables', '')
l_plane_path = os.path.join(lookup_tables_path, 'L_PLANE.csv')
l_airport_path = os.path.join(lookup_tables_path, 'L_AIRPORT.csv')
l_cancelation_path = os.path.join(lookup_tables_path, 'L_CANCELLATION.csv')
l_unique_carrier_path = os.path.join(lookup_tables_path, 'L_UNIQUE_CARRIERS.csv')


In [38]:
def load_csv_to_dataframe(spark, path):
    """Loads csv file into spark dataframe

    Args:
        spark (SparkSession): spark session 
        path (str): string path for the csv file to be loaded

    Returns:
        dataframe: spark dataframe
    """
    dataframe = (
        spark.read.format('csv')
        .option('header', 'true')
        .option('inferSchema', 'true')
        .load(path)
    )
    
    return dataframe

In [48]:
dataframe = load_csv_to_dataframe(spark, l_plane_path)

In [51]:
dataframe.dropna(how='all', subset=dataframe.columns[1:]).limit(10).toPandas()

Unnamed: 0,tailnum,type,manufacturer,issue_date,model,status,aircraft_type,engine_type,year
0,N10156,Corporation,EMBRAER,02/13/2004,EMB-145XR,Valid,Fixed Wing Multi-Engine,Turbo-Fan,2004
1,N102UW,Corporation,AIRBUS INDUSTRIE,05/26/1999,A320-214,Valid,Fixed Wing Multi-Engine,Turbo-Fan,1998
2,N10323,Corporation,BOEING,07/01/1997,737-3TO,Valid,Fixed Wing Multi-Engine,Turbo-Jet,1986
3,N103US,Corporation,AIRBUS INDUSTRIE,06/18/1999,A320-214,Valid,Fixed Wing Multi-Engine,Turbo-Fan,1999
4,N104UA,Corporation,BOEING,01/26/1998,747-422,Valid,Fixed Wing Multi-Engine,Turbo-Fan,1998
5,N104UW,Corporation,AIRBUS INDUSTRIE,07/02/1999,A320-214,Valid,Fixed Wing Multi-Engine,Turbo-Fan,1999
6,N10575,Corporation,EMBRAER,06/24/2003,EMB-145LR,Valid,Fixed Wing Multi-Engine,Turbo-Fan,2002
7,N105UA,Corporation,BOEING,10/15/2007,747-451,Valid,Fixed Wing Multi-Engine,Turbo-Jet,1994
8,N105UW,Corporation,AIRBUS INDUSTRIE,07/22/1999,A320-214,Valid,Fixed Wing Multi-Engine,Turbo-Fan,1999
9,N106US,Corporation,AIRBUS INDUSTRIE,08/05/1999,A320-214,Valid,Fixed Wing Multi-Engine,Turbo-Fan,1999


In [47]:
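# Expects `dataframe` to hold the L_AIRPORT lookup (columns Code and Description).
# Description has the form "<city>, <state or country>: <airport name>".
df = (
    dataframe.select(
        F.col('Code').alias('airport_code'),
        F.split(F.split('Description', ', ')[1], ': ')[1].alias('airport_name'),
        F.split('Description', ', ')[0].alias('city'),
        # Holds a US state code (e.g. AK) or a country name (e.g. Malawi)
        F.split(F.split('Description', ', ')[1], ': ')[0].alias('country')
    )
)
df.toPandas()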

Unnamed: 0,airport_code,airport_name,city,country
0,01A,Afognak Lake Airport,Afognak Lake,AK
1,03A,Bear Creek Mining Strip,Granite Mountain,AK
2,04A,Lik Mining Camp,Lik,AK
3,05A,Little Squaw Airport,Little Squaw,AK
4,06A,Kizhuyak Bay,Kizhuyak,AK
...,...,...,...,...
6575,ZXZ,Waterville Airport,Waterville,WA
6576,ZYL,Savannakhet,Sylhet,Bangladesh
6577,ZZU,Mzuzu Airport,Mzuzu,Malawi
6578,ZZV,Zanesville Municipal,Zanesville,OH
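
Splitting `Description` on `', '` first truncates any airport name that itself contains a comma. The sketch below shows, in plain Python, one way to avoid that by splitting on `': '` first. `parse_airport_description` is a hypothetical helper, the first sample string is reconstructed from the output columns above rather than copied from the file, and the second one is invented to show the comma case.

```python
def parse_airport_description(description):
    """Split '<city>, <region>: <airport name>' into its three parts.

    Returns a (city, region, airport_name) tuple; a part that cannot be
    found is returned as None.
    """
    location, colon, airport_name = description.partition(": ")
    if not colon:
        # No ': ' separator, so there is no airport name to extract
        location, airport_name = description, None
    city, comma, region = location.rpartition(", ")
    if not comma:
        # No ', ' separator, so the whole location is treated as the city
        city, region = location, None
    return city, region, airport_name


# Reconstructed from the first output row (assumed source format)
print(parse_airport_description("Afognak Lake, AK: Afognak Lake Airport"))

# Invented example: the airport name contains a comma
print(parse_airport_description("Sample City, XX: Sample Field, North"))
```

Splitting on `': '` first keeps everything after the colon together, and `rpartition` takes the region from the last comma of the location part.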


In [41]:
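(
    delta_flights
    .limit(10)
    # NOTE: DepTime stores HHMM numbers such as 714.0, which this date pattern does not describe
    .withColumn("DepTime", from_unixtime(unix_timestamp(col("DepTime"), "MMM d, yyyy hh:mm:ss a"), "yyyy-MM-dd HH:mm:ss.SSSSSS"))
    .toPandas()
)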

Unnamed: 0,p_Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay,data_source,ingest_time,ingest_date
0,2021,3,3,3,714.0,724,818.0,843,OO,3133,N728SK,64.0,79.0,49.0,-25.0,-10.0,SGU,PHX,262.0,5.0,10.0,0.0,,0.0,,,,,,gharib_local_pc,2021-06-25 15:46:03.993,2021-06-25
1,2021,3,3,3,917.0,922,1031.0,1040,OO,3134,N752SK,74.0,78.0,48.0,-9.0,-5.0,PHX,SGU,262.0,3.0,23.0,0.0,,0.0,,,,,,gharib_local_pc,2021-06-25 15:46:03.993,2021-06-25
2,2021,3,3,3,1321.0,1330,1501.0,1530,OO,3135,N766SK,160.0,180.0,129.0,-29.0,-9.0,MHT,ORD,843.0,16.0,15.0,0.0,,0.0,,,,,,gharib_local_pc,2021-06-25 15:46:03.993,2021-06-25
3,2021,3,3,3,1636.0,1645,2002.0,2010,OO,3136,N752EV,146.0,145.0,112.0,-8.0,-9.0,DFW,TRI,868.0,7.0,27.0,0.0,,0.0,,,,,,gharib_local_pc,2021-06-25 15:46:03.993,2021-06-25
4,2021,3,3,3,1838.0,1844,1903.0,1925,OO,3137,N715SK,85.0,101.0,69.0,-22.0,-6.0,PHX,BFL,425.0,3.0,13.0,0.0,,0.0,,,,,,gharib_local_pc,2021-06-25 15:46:03.993,2021-06-25
5,2021,3,3,3,1648.0,1650,1808.0,1834,OO,3139,N744SK,80.0,104.0,57.0,-26.0,-2.0,ORD,BNA,409.0,4.0,19.0,0.0,,0.0,,,,,,gharib_local_pc,2021-06-25 15:46:03.993,2021-06-25
6,2021,3,3,3,1651.0,1652,1929.0,1902,OO,3140,N727SK,98.0,70.0,45.0,27.0,-1.0,PSP,PHX,261.0,5.0,48.0,0.0,,0.0,27.0,0.0,0.0,0.0,0.0,gharib_local_pc,2021-06-25 15:46:03.993,2021-06-25
7,2021,3,3,3,1242.0,1245,1452.0,1456,OO,3141,N771SK,190.0,191.0,153.0,-4.0,-3.0,DFW,YUM,1022.0,5.0,32.0,0.0,,0.0,,,,,,gharib_local_pc,2021-06-25 15:46:03.993,2021-06-25
8,2021,3,3,3,717.0,726,821.0,836,OO,3142,N614SK,124.0,130.0,104.0,-15.0,-9.0,LBB,PHX,588.0,8.0,12.0,0.0,,0.0,,,,,,gharib_local_pc,2021-06-25 15:46:03.993,2021-06-25
9,2021,3,3,3,2040.0,2045,2144.0,2215,OO,3144,N773SK,124.0,150.0,97.0,-31.0,-5.0,DFW,DRO,674.0,2.0,25.0,0.0,,0.0,,,,,,gharib_local_pc,2021-06-25 15:46:03.993,2021-06-25
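
The `DepTime` values in this table are HHMM numbers (for example `717.0`), while other outputs in this notebook show the same departures as clock strings such as `07:17`. The sketch below illustrates that conversion rule in plain Python. `hhmm_to_clock` is a hypothetical helper, and treating `2400` as midnight is an assumption; inside Spark the same rule would have to be expressed with column functions or a UDF.

```python
import math


def hhmm_to_clock(value):
    """Convert an HHMM number such as 714.0 into an 'HH:MM' string.

    Returns None for missing values (None or NaN) and for values that are
    not a valid time of day.
    """
    if value is None or (isinstance(value, float) and math.isnan(value)):
        return None
    hours, minutes = divmod(int(value), 100)
    if hours == 24 and minutes == 0:
        # Assumption: 2400 denotes midnight and is normalised to 00:00
        hours = 0
    if not (0 <= hours <= 23 and 0 <= minutes <= 59):
        return None
    return f"{hours:02d}:{minutes:02d}"


# DepTime values taken from the rows above, plus a missing value
for raw in (714.0, 1648.0, 2040.0, None):
    print(raw, "->", hhmm_to_clock(raw))
```

Returning `None` for missing or invalid input keeps the null `DepTime` rows as nulls instead of turning them into misleading times.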


In [None]:
# CSV reader with a header row; no .load(path) call has been added yet
dataframe = spark.read.format('csv').option('header', 'true')

In [None]:
# Performing cleaning tasks here





### Step 3: Define the Data Model <a id='step3'></a>
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run ETL to Model the Data <a id='step4'></a>
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# Write code here

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
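
The checks listed above could be sketched along the following lines. This is a plain-Python illustration on a handful of rows, not the checks the pipeline actually runs. All three function names are hypothetical, the key `(UniqueCarrier, FlightNum, Origin)` is only an assumed candidate key, and the third sample row is invented so that the duplicate-key and null checks both fire.

```python
def check_row_count(source_count, target_count):
    """Completeness check: the target must hold as many rows as the source."""
    return source_count == target_count


def find_duplicate_keys(rows, key_fields):
    """Integrity check: return the set of key tuples that occur more than once."""
    seen, duplicates = set(), set()
    for row in rows:
        key = tuple(row[field] for field in key_fields)
        if key in seen:
            duplicates.add(key)
        seen.add(key)
    return duplicates


def find_null_fields(rows, required_fields):
    """Integrity check: return (row index, field) pairs where a required field is None."""
    return [
        (index, field)
        for index, row in enumerate(rows)
        for field in required_fields
        if row.get(field) is None
    ]


rows = [
    {"UniqueCarrier": "OO", "FlightNum": 3139, "Origin": "ORD", "Dest": "BNA"},
    {"UniqueCarrier": "OO", "FlightNum": 3140, "Origin": "PSP", "Dest": "PHX"},
    # Invented row: repeats the key above and has no destination
    {"UniqueCarrier": "OO", "FlightNum": 3140, "Origin": "PSP", "Dest": None},
]

duplicates = find_duplicate_keys(rows, ("UniqueCarrier", "FlightNum", "Origin"))
missing = find_null_fields(rows, ["Origin", "Dest"])
counts_match = check_row_count(source_count=3, target_count=len(rows))
print(duplicates, missing, counts_match)
```

Each function returns the offending keys or positions rather than a bare boolean, so a failed check also says which rows need attention.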
 
Run Quality Checks

In [None]:
# Perform quality checks here

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up <a id='step5'></a>
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.