# Data Pipelines for United States International Visitor Air Travel Data
### Data Engineering Capstone Project

#### Project Summary
+ SUMMARY: This project gathers I94 Immigration data for International travel to the United States, US cities demographics data, and airport codes data in and creates a data pipeline which uses Spark in order to create a data model that includes Fact and Dimension tables for analyzing data relating to Air travel to the United States.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, TimestampType
from pyspark.sql import SparkSession

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc.

`Initially the raw data will be loaded into the project workspace.`

`This project creates a data pipeline which extracts the following datasets:`

* I94 Immigration data
* US cities demographics data
* Airport codes data

`Using Spark, the data will then be cleaned and transformed into  a data model consisting of fact and dimension tables.`

`Finally, the analytics tables with clean data will be written to parquet files which can be read and analyzed using Spark or copied into a data warehouse such as Amazon Redshift for analysis.`

`Data quality checks will be applied at various stages in the pipeline to ensure the data is loaded correctly and is of high quality.`

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

#### 1. I94 Immigration Data

* Contains international visitor arrival data by world regions and select countries (including top 20), type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry for select countries.

In [2]:
# Read in the data to a Pandas dataframe here
df = pd.read_sas('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat', encoding='unicode_escape')

In [3]:
df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


#### 2. Airport Codes Data

* This is a simple table of airport codes and corresponding cities.

* The airport codes may refer to either IATA airport code, a three-letter code which is used in passenger reservation, ticketing and baggage-handling systems, or the ICAO airport code which is a four letter code used by ATC systems and for airports that do not have an IATA airport code (from wikipedia).

In [4]:
airport_code_df = pd.read_csv('airport-codes_csv.csv')

In [5]:
airport_code_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


#### 3. US Cities Demographics Data

* This dataset contains information about the demographics of all US cities and census-designated places with a population greater or equal to 65,000. 

* This data comes from the US Census Bureau's 2015 American Community Survey.

In [6]:
us_cities_df = pd.read_csv('us-cities-demographics.csv', delimiter=';')

In [7]:
us_cities_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


#### 4. Country Codes Data

* This dataset contains information that relates the country ID from the I94 Immigration dataset to the full name of the country. 

* This data comes from the I94_SAS_Labels_Descriptions.SAS file

In [8]:
country_df = pd.read_csv('country_codes.csv', delimiter=',')

In [9]:
country_df.head()

Unnamed: 0,i94_res,country
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


### Step 2: Explore and Assess the Data

#### Checking for Null Airport ID (ident)

In [10]:
airport_code_df['ident'].isna().value_counts()

False    55075
Name: ident, dtype: int64

#### Checking for Null iata_code
+ Will need to drop records with Null iata_code

In [11]:
airport_code_df['iata_code'].isna().value_counts()

True     45886
False     9189
Name: iata_code, dtype: int64

#### Counting number of each type of airport
+ Data model will include only airports classified as small_aiport, medium_airport, large_airport, or closed

In [12]:
airport_code_df['type'].value_counts()

small_airport     33965
heliport          11287
medium_airport     4550
closed             3606
seaplane_base      1016
large_airport       627
balloonport          24
Name: type, dtype: int64

#### Checking number of records where the airline is Null

In [13]:
df['airline'].isna().value_counts()

False    3012686
True       83627
Name: airline, dtype: int64

#### Counting each state's population by race

In [14]:
pop_df = us_cities_df[['State','Race','Count']].groupby(["State","Race"])

In [15]:
pop_df2 = pop_df[['State','Race','Count']].sum(level="Race")

In [16]:
pop_df2.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,Count
State,Race,Unnamed: 2_level_1
Alabama,American Indian and Alaska Native,8084
Alabama,Asian,28769
Alabama,Black or African-American,521068
Alabama,Hispanic or Latino,39313
Alabama,White,498920


#### Checking each data set for duplicates
+ Will drop all duplicate records in later steps if found

In [17]:
print(len(df.index))
df2 = df.drop_duplicates()
print(len(df2.index))

3096313
3096313


In [18]:
print(len(us_cities_df.index))
us_cities_df2 = us_cities_df.drop_duplicates()
print(len(us_cities_df2))

2891
2891


In [19]:
print(len(airport_code_df.index))
airport_code_df2 = airport_code_df.drop_duplicates()
print(len(airport_code_df2))

55075
55075


#### Top 10 most common iata_code

In [20]:
airport_code_df['iata_code'].value_counts().head(10)

0      80
PRI     3
OHE     3
SVD     2
CQP     2
KYF     2
SHO     2
DLR     2
RCH     2
RZS     2
Name: iata_code, dtype: int64

#### Unique Airport types

In [21]:
airport_code_df['type'].unique()

array(['heliport', 'small_airport', 'closed', 'seaplane_base',
       'balloonport', 'medium_airport', 'large_airport'], dtype=object)

#### Top 10 most common destination cities

In [22]:
df['i94port'].value_counts().head(10)

NYC    485916
MIA    343941
LOS    310163
SFR    152586
ORL    149195
HHW    142720
NEW    136122
CHI    130564
HOU    101481
FTL     95977
Name: i94port, dtype: int64

#### Cleaning Steps
##### The following steps will be applied to clean the raw data:
1. Airport data will be filtered to only include airports of types small airport, medium airport, large airport, and closed. Additionally, the final airports table will only include airports with a valid IATA code.
2. Travelers table will be created by selecting only travelers who have either a valid flight number or a valid airline since we are creating data model only based on air travel data.
3. Travel records with an invalid destination city or destination state will be removed.
4. Population data will be aggregated to include the total population, male population, female populaiton, and foreign-born population totals for each state.
5. Data types will be cast to the desired types based on the data model.

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. Read the data from source files into Spark Dataframes
2. Performing data cleaning steps to handle unwanted data
3. Join and transform data into fact and dimension tables
4. Run data quality checks to ensure the fact and dimension tables are created correctly and contain records
5. Write final fact and dimension tables to destination folders stored as parquet files

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### Load data into Spark for cleaning and transformation

In [23]:
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()
#df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [24]:
i94_schema = StructType([
        StructField("cicid", LongType()),
        StructField("i94res", LongType()),
        StructField("i94addr", StringType()),
        StructField("visapost", StringType()),
        StructField("i94r", LongType()),
        StructField("i94mon", LongType()),
        StructField("arrdate", LongType()),
        StructField("depdate", LongType()),
        StructField("dtadfile", StringType()),
        StructField("dtaddto", StringType()),
        StructField("i94port", StringType()),
        StructField("traveler_id", LongType()),
        StructField("birth_year", StringType()),
        StructField("age", LongType()),
        StructField("gender", StringType()),
        StructField("airline", StringType()),
        StructField("flight_no", StringType()),
        StructField("visa_type", StringType())
    ])

In [25]:
#write to parquet
#df_spark.write.parquet("sas_data")
df_spark=spark.read.options(schema=i94_schema, header='True').parquet("sas_data")

#### Create airports Dimension Table

In [26]:
airports_dim_schema = StructType([
        StructField("airport_id", StringType()),
        StructField("iata_code", StringType()),
        StructField("airport_type", StringType()),
        StructField("airport_name", StringType()),
        StructField("municipality", StringType()),
        StructField("iso_region", StringType()),
        StructField("coordinates", StringType()),        
    ])

In [27]:
airports_spark = spark.read.options(delimiter=',',schema=airports_dim_schema, header='True').csv('airport-codes_csv.csv')

In [28]:
airports_stg = airports_spark.select('ident','iata_code','type','name','municipality','iso_region','coordinates')

In [29]:
airports_stg = airports_stg.createOrReplaceTempView("airports")

In [30]:
airports_dim = spark.sql("""SELECT 
                ident as airport_id, 
                iata_code, 
                type as airport_type, 
                name as airport_name, 
                municipality, 
                iso_region,
                coordinates 
            FROM airports WHERE type in ('small_airport','medium_airport','large_airport','closed')
            AND iata_code is not null""")

In [31]:
airports_dim.show(10)

+----------+---------+-------------+--------------------+---------------+----------+--------------------+
|airport_id|iata_code| airport_type|        airport_name|   municipality|iso_region|         coordinates|
+----------+---------+-------------+--------------------+---------------+----------+--------------------+
|       03N|      UTK|small_airport|      Utirik Airport|  Utirik Island|    MH-UTI|  169.852005, 11.222|
|      07FA|      OCA|small_airport|Ocean Reef Club A...|      Key Largo|     US-FL|-80.274803161621,...|
|       0AK|      PQS|small_airport|Pilot Station Air...|  Pilot Station|     US-AK|-162.899994, 61.9...|
|      0CO2|      CSE|small_airport|Crested Butte Air...|  Crested Butte|     US-CO|-106.928341, 38.8...|
|      0TE7|      JCY|small_airport|   LBJ Ranch Airport|   Johnson City|     US-TX|-98.6224975585999...|
|      13MA|      PMX|small_airport|Metropolitan Airport|         Palmer|     US-MA|-72.3114013671999...|
|       16A|      NUP|small_airport| Nunapitch

#### Create travelers Dimension Table

In [32]:
travelers_spark = df_spark.select('cicid','biryear','i94bir','gender','airline','fltno','visatype','visapost')

In [33]:
travelers_stg = travelers_spark.createOrReplaceTempView("travelers")

In [34]:
travelers_dim = spark.sql("""SELECT 
                cast(cicid as bigint) as traveler_id, 
                cast(biryear as int) as birth_year, 
                cast(i94bir as int) as age, 
                gender, 
                airline, 
                fltno as flight_no, 
                visatype as visa_type 
              FROM travelers
              WHERE airline is not null
              OR fltno is not null""")

In [35]:
travelers_dim.show(10)

+-----------+----------+---+------+-------+---------+---------+
|traveler_id|birth_year|age|gender|airline|flight_no|visa_type|
+-----------+----------+---+------+-------+---------+---------+
|    5748517|      1976| 40|     F|     QF|    00011|       B1|
|    5748518|      1984| 32|     F|     VA|    00007|       B1|
|    5748519|      1987| 29|     M|     DL|    00040|       B1|
|    5748520|      1987| 29|     F|     DL|    00040|       B1|
|    5748521|      1988| 28|     M|     DL|    00040|       B1|
|    5748522|      1959| 57|     M|     NZ|    00010|       B2|
|    5748523|      1950| 66|     F|     NZ|    00010|       B2|
|    5748524|      1975| 41|     F|     NZ|    00010|       B2|
|    5748525|      1989| 27|     M|     NZ|    00028|       B2|
|    5748526|      1990| 26|     F|     NZ|    00002|       B2|
+-----------+----------+---+------+-------+---------+---------+
only showing top 10 rows



#### Create travel Fact Table

In [36]:
travel_spark = df_spark.select('cicid','i94res','i94addr','visapost','i94yr','i94mon','arrdate','depdate','dtadfile','dtaddto','i94port')

In [37]:
travel_stg = travel_spark.createOrReplaceTempView("travel")

In [38]:
travel = spark.sql("""SELECT 
                cast(cicid as bigint) as traveler_id, 
                cast(i94res as int) as country_id,
                i94addr as state_code,
                i94port as destination_city,
                cast(i94yr as int) as year, 
                cast(i94mon as int) as month, 
                cast(arrdate as int) as arrival_date, 
                cast(depdate as int) as departure_date, 
                dtadfile as file_create_dt, 
                dtaddto as admitted_until_dt 
              FROM travel WHERE i94addr is not NULL
              AND i94port is not null""")

In [39]:
travel.show(10)

+-----------+----------+----------+----------------+----+-----+------------+--------------+--------------+-----------------+
|traveler_id|country_id|state_code|destination_city|year|month|arrival_date|departure_date|file_create_dt|admitted_until_dt|
+-----------+----------+----------+----------------+----+-----+------------+--------------+--------------+-----------------+
|    5748517|       438|        CA|             LOS|2016|    4|       20574|         20582|      20160430|         10292016|
|    5748518|       438|        NV|             LOS|2016|    4|       20574|         20591|      20160430|         10292016|
|    5748519|       438|        WA|             LOS|2016|    4|       20574|         20582|      20160430|         10292016|
|    5748520|       438|        WA|             LOS|2016|    4|       20574|         20588|      20160430|         10292016|
|    5748521|       438|        WA|             LOS|2016|    4|       20574|         20588|      20160430|         10292016|


#### Create us_states Dimension Table

In [40]:
us_states_dim_schema = StructType([
        StructField("State Code", LongType()),
        StructField("State", StringType()),
        StructField("Male Population", LongType()),
        StructField("Female Population", StringType()),
        StructField("Total Population", StringType()),
        StructField("Foreign-born", StringType())
    ])

In [41]:
us_states_spark = spark.read.options(delimiter=';',schema=us_states_dim_schema, header='True').csv('us-cities-demographics.csv')

In [42]:
us_states_stg = us_states_spark.select('State Code','State','Male Population','Female Population','Total Population','Foreign-born')

In [43]:
us_states_stg = us_states_stg.createOrReplaceTempView("us_states_stg")

In [44]:
us_states_dim = spark.sql("""SELECT 
                `State Code` as state_code, 
                first(State) as state, 
                sum(`Male Population`) as male_population,
                sum(`Female Population`) AS female_population, 
                sum(`Total Population`) as total_population,
                sum(`Foreign-born`) as foreign_born_population 
              FROM us_states_stg GROUP BY state_code 
              ORDER BY State""")

In [45]:
us_states_dim.show(10)

+----------+--------------------+---------------+-----------------+----------------+-----------------------+
|state_code|               state|male_population|female_population|total_population|foreign_born_population|
+----------+--------------------+---------------+-----------------+----------------+-----------------------+
|        AL|             Alabama|      2448200.0|        2715106.0|       5163306.0|               252541.0|
|        AK|              Alaska|       764725.0|         728750.0|       1493475.0|               166290.0|
|        AZ|             Arizona|    1.1137275E7|      1.1360435E7|      2.249771E7|              3411565.0|
|        AR|            Arkansas|      1400724.0|        1482165.0|       2882889.0|               307753.0|
|        CA|          California|    6.1055672E7|      6.2388681E7|    1.23444353E8|            3.7059662E7|
|        CO|            Colorado|      7273095.0|        7405250.0|     1.4678345E7|              1688155.0|
|        CT|       

#### Create country Dimension Table

In [46]:
country_dim_schema = StructType([
        StructField("i94_res", LongType()),
        StructField("country", StringType())
    ])

In [47]:
country_spark = spark.read.options(delimiter=',',schema=country_dim_schema, header='True').csv('country_codes.csv')

In [48]:
country_stg = country_spark.select('i94_res','country')

In [49]:
country_stg = country_stg.createOrReplaceTempView("country_stg")

In [50]:
country_dim = spark.sql("""SELECT 
                cast(i94_res as int) as country_id, 
                country as country_name 
             FROM country_stg
             ORDER BY country_name""")

In [51]:
country_dim.show(10)

+----------+---------------+
|country_id|   country_name|
+----------+---------------+
|       236|    AFGHANISTAN|
|       101|        ALBANIA|
|       316|        ALGERIA|
|       102|        ANDORRA|
|       324|         ANGOLA|
|       529|       ANGUILLA|
|       518|ANTIGUA-BARBUDA|
|       687|      ARGENTINA|
|       151|        ARMENIA|
|       532|          ARUBA|
+----------+---------------+
only showing top 10 rows



In [52]:
# Write code here

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [53]:
# Perform quality checks here

+ Verify that there are no Null values in the key columns of any of the tables

In [None]:
print(country_dim.filter("country_id is null").count())

In [None]:
print(airports_dim.filter("airport_id is null").count())

In [None]:
print(us_states_dim.filter("state_code is null").count())

In [None]:
print(travelers_dim.filter("traveler_id is null").count())

In [54]:
print(travel.filter("traveler_id is null").count())
print(travel.filter("country_id is null").count())
print(travel.filter("state_code is null").count())

0
0
0
0
0
0
0


+ Check the counts of the tables and verify they match the count of records in the source table views

In [55]:
print(country_dim.count())
print(airports_dim.count())
print(us_states_dim.count())
print(travelers_dim.count())
print(travel.count())

289
8978
49
3077655
2943721


In [56]:
print(spark.sql("""SELECT COUNT(1) as country_dim FROM (SELECT 
                cast(i94_res as int) as country_id, 
                country as country_name 
             FROM country_stg
             ORDER BY country_name)""").collect())
print(spark.sql("""SELECT COUNT(1) as airports_dim FROM (SELECT 
                ident as airport_id, 
                iata_code, 
                type as airport_type, 
                name as airport_name, 
                municipality, 
                iso_region,
                coordinates 
            FROM airports WHERE type in ('small_airport','medium_airport','large_airport','closed')
            AND iata_code is not null)""").collect())
print(spark.sql("""SELECT COUNT(1) as us_states_dim FROM (SELECT 
                `State Code` as state_code, 
                first(State) as state, 
                sum(`Male Population`) as male_population,
                sum(`Female Population`) AS female_population, 
                sum(`Total Population`) as total_population,
                sum(`Foreign-born`) as foreign_born_population 
              FROM us_states_stg GROUP BY state_code 
              ORDER BY State)""").collect())
print(spark.sql("""SELECT COUNT(1) as travelers_dim FROM (SELECT 
                cast(cicid as bigint) as traveler_id, 
                cast(biryear as int) as birth_year, 
                cast(i94bir as int) as age, 
                gender, 
                airline, 
                fltno as flight_no, 
                visatype as visa_type 
              FROM travelers
              WHERE airline is not null
              OR fltno is not null)""").collect())
print(spark.sql("""SELECT COUNT(1) as travel FROM (SELECT
                cast(cicid as bigint) as traveler_id, 
                cast(i94res as int) as country_id,
                i94addr as state_code,
                i94port as destination_city,
                cast(i94yr as int) as year, 
                cast(i94mon as int) as month, 
                cast(arrdate as int) as arrival_date, 
                cast(depdate as int) as departure_date, 
                dtadfile as file_create_dt, 
                dtaddto as admitted_until_dt 
              FROM travel WHERE i94addr is not NULL
              AND i94port is not null)""").collect())

[Row(country_dim=289)]
[Row(airports_dim=8978)]
[Row(us_states_dim=49)]
[Row(travelers_dim=3077655)]
[Row(travel=2943721)]


+ Verify key columns have unique values for all tables

In [57]:
print(country_dim.select("country_id").distinct().count())
print(country_dim.count())

289
289


In [58]:
print(airports_dim.select("airport_id").distinct().count())
print(airports_dim.count())

8978
8978


In [59]:
print(us_states_dim.select("state_code").distinct().count())
print(us_states_dim.count())

49
49


In [60]:
print(travelers_dim.select("traveler_id").distinct().count())
print(travelers_dim.count())

3077655
3077655


In [61]:
print(travel.select("traveler_id","country_id","state_code").distinct().count())
print(travel.count())

2943721
2943721


#### Writing the final certified tables to Parquet files

In [62]:
country_dim.write.parquet("analytics_tables/country_dim/country_dim.parquet",mode="overwrite")

In [63]:
us_states_dim.write.parquet("analytics_tables/us_states_dim/us_states_dim.parquet",mode="overwrite")

In [64]:
airports_dim.write.parquet("analytics_tables/airports_dim/airports_dim.parquet",mode="overwrite")

In [65]:
travelers_dim.write.parquet("analytics_tables/travelers_dim/travelers_dim.parquet",mode="overwrite")

In [67]:
travel.write.parquet("analytics_tables/travel/travel.parquet",partitionBy=["year","month"], mode="overwrite")

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### The data dictionary and data model are included in the file `data_dictionary.xlsx`

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

#### Rationale for choosing Pandas for data exploration and Spark for ETL pipelines and data quality checks:

+ Pandas library was chosen for data exploration and analysis because Pandas stores data in local memory and is meant for working with smaller sets of data.

+ For data transformations, cleaning, and ETL with the full data sets, Spark was chosen as it is optimized for working with large datasets and the tasks can be parallelized across the Spark cluster.

#### How often should the data be updated?

+ The data can be expected to be updated on a daily basis, with new records added to the `travel` Fact table and new travelers added to the `travelers` table as new unique records are identified

#### Changes in the approach for different scenarios:

#### 1.) The data was increased by 100x

+ If the size of the data were to be increased by 100x, we could store the datasets in S3 which provides virtually unlimited storage capacity, we could add additional nodes to the Spark cluster, and we can use a cloud data warehouse such as Amazon Redshift.

#### 2.) The data populates a dashboard that must be updated on a daily basis by 7am every day

+ If the data populates a dashboard that must be updated by a certain time each day, Airflow can be used to schedule jobs which start each day with enough time to complete before 7am. The start time can be adjusted depending on how many resources are allocated and how much data is expected to be processed for that day's batch.

#### 3.) The database needed to be accessed by 100+ people

+ If the database needed to be accessed by 100+ people, we could load the data into a managed data warehouse such as Amazon Redshift or Snowflake which are able to scale up depending on the amount of users and number of queries being run.