
# Data Engineering Capstone Project - Immigration Data Analytical Database

#### Project Summary
My capstone project automates the data cleaning and loading process using Airflow and Redshift. Data files are located in s3 bucket. Main database consists of one fact table and multiple dimension tables. Additional datasets are loaded into staging tables. 

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import awswrangler as wr


### Step 1: Scope the Project and Gather Data

#### Scope 
For my data engineering capstone project I developed a data pipeline that creates an analytical database and supporting tables. Analytical database contain US immigration data populated on a monthly basis. Additional datasets are also available in staging tables. Insights can be drawn from main analytical tables or combining with other information tables provided. All data files are hosted in Amazon s3 bucket.  Tables are hosted in Amazon Redshift Database and ETL/ELT pipeline was developed using Apache Airflow.


### Datasets
Following datasets were used to create analytical database:
- I94 Immigration Data: This dataset comes from the US National Tourism and Trade Office. Each data file contains monthly information on international visitors arrival. Data fields include information on arrival departure time frame, citizenship country, residence country, arrival mode, and some traveller information such as birth year, age at arrival, occupation, gender etc. Each file contains 28 data columns and 3 million rows. Immigration data comes with a data dictionary that defines column contents of the main dataset which can be parsed and used in building the data model.


*I94 immigration data sample:*

![I94 immigration data sample1](./images/immig1.png)
![I94 immigration data sample2](./images/immig2.png)


- World Temperature Data: This kaggle dataset contains city, country, latitude, longitude, average temperature, and temperature uncertainty data.

 *World temperature data sample:*

![world-temperature](./images/world-temp.png)

- U.S. City Demographic Data: This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,0000. Dataset comes from OpenSoft.

*U.S. city demographic data sample:*

![city-demo](./images/city-demo.png)

- Airport Codes: This dataset contains data on airport codes and corresponding cities. According to wikipedia, The airport codes may refer to either IATA airport code, a three-letter code which is used in passenger reservation, ticketing and baggage-handling systems, or the ICAO airport code which is a four letter code used by ATC systems and for airports that do not have an IATA airport code. 

*Airport Codes data sample:*

![Airport-codes](./images/airport.png)

- Manually Collected Data: Additional datasets describing gender definitions, and visa classes were collected through online research. These datasets improves the capabilities of main analytical database.

In [2]:
# Read in the data here. Change path based on your data file location.
pd.set_option('display.max_columns', None)
fname = './data/immigration/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat'
pd.set_option('display.max_columns', None)
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [3]:
df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,7.0,2016.0,1.0,101.0,101.0,BOS,20465.0,1.0,MA,,20.0,3.0,1.0,,,,T,,,,1996.0,D/S,M,,LH,346608285.0,424,F1
1,8.0,2016.0,1.0,101.0,101.0,BOS,20465.0,1.0,MA,,20.0,3.0,1.0,,,,T,,,,1996.0,D/S,M,,LH,346627585.0,424,F1
2,9.0,2016.0,1.0,101.0,101.0,BOS,20469.0,1.0,CT,20480.0,17.0,2.0,1.0,,,,T,N,,M,1999.0,07152016,F,,AF,381092385.0,338,B2
3,10.0,2016.0,1.0,101.0,101.0,BOS,20469.0,1.0,CT,20499.0,45.0,2.0,1.0,,,,T,N,,M,1971.0,07152016,F,,AF,381087885.0,338,B2
4,11.0,2016.0,1.0,101.0,101.0,BOS,20469.0,1.0,CT,20499.0,12.0,2.0,1.0,,,,T,N,,M,2004.0,07152016,M,,AF,381078685.0,338,B2


In [4]:
print(df.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2847924 entries, 0 to 2847923
Data columns (total 28 columns):
 #   Column    Dtype  
---  ------    -----  
 0   cicid     float64
 1   i94yr     float64
 2   i94mon    float64
 3   i94cit    float64
 4   i94res    float64
 5   i94port   object 
 6   arrdate   float64
 7   i94mode   float64
 8   i94addr   object 
 9   depdate   float64
 10  i94bir    float64
 11  i94visa   float64
 12  count     float64
 13  dtadfile  object 
 14  visapost  object 
 15  occup     object 
 16  entdepa   object 
 17  entdepd   object 
 18  entdepu   object 
 19  matflag   object 
 20  biryear   float64
 21  dtaddto   object 
 22  gender    object 
 23  insnum    object 
 24  airline   object 
 25  admnum    float64
 26  fltno     object 
 27  visatype  object 
dtypes: float64(13), object(15)
memory usage: 608.4+ MB
None


In [5]:
# Check for columns with null values
df.isna().sum()

cicid             0
i94yr             0
i94mon            0
i94cit            0
i94res            0
i94port           0
arrdate           0
i94mode          60
i94addr      177129
depdate      522612
i94bir         1190
i94visa           0
count             0
dtadfile      90486
visapost    1386375
occup       2802355
entdepa          61
entdepd      521813
entdepu     2847880
matflag      521813
biryear        1190
dtaddto         707
gender       216929
insnum      2709236
airline       61279
admnum            0
fltno         12232
visatype          0
dtype: int64

In [6]:
# Use this code to copy sas file to s3 bucket in parquet format.
#wr.s3.to_parquet(df, "s3://hg-dend/sas-files/i94_jan16_sub.parquet")

In [7]:
# Check if addnum is unique and can be used as primary key
mask = df.admnum.duplicated(keep=False)
df[mask].sort_values(by=['admnum'])

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
2700037,6036966.0,2016.0,1.0,254.0,276.0,SAI,20465.0,1.0,,20468.0,74.0,2.0,1.0,20160316,,,A,L,,M,1942.0,02252016,F,3975,OZ,0.000000e+00,00627,GMT
2697887,6034811.0,2016.0,1.0,254.0,276.0,AGA,20478.0,1.0,GU,,56.0,2.0,1.0,20160316,,,A,,,,1960.0,03092016,F,3660,TW,0.000000e+00,00311,GMT
2637012,5953924.0,2016.0,1.0,254.0,276.0,SAI,20456.0,1.0,,20460.0,43.0,2.0,1.0,20160301,,,A,L,,M,1973.0,02162016,F,3988,OZ,0.000000e+00,00607,GMT
2637011,5953923.0,2016.0,1.0,254.0,276.0,SAI,20456.0,1.0,,20458.0,35.0,2.0,1.0,20160301,,,A,L,,M,1981.0,02162016,M,3993,7C,0.000000e+00,03404,GMT
2698071,6034995.0,2016.0,1.0,254.0,276.0,AGA,20478.0,1.0,GU,,4.0,2.0,1.0,20160316,,,A,,,,2012.0,03092016,M,3688,TW,0.000000e+00,00311,GMT
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2143587,4818518.0,2016.0,1.0,111.0,111.0,XXX,20454.0,9.0,AZ,,33.0,2.0,1.0,20160126,,,A,,,,1983.0,03302016,M,3297,,9.427546e+10,,WT
2806692,5949292.0,2016.0,1.0,129.0,129.0,LUK,20472.0,3.0,AZ,20479.0,51.0,1.0,1.0,20160301,,,Z,D,,M,1965.0,04172016,M,,,9.427546e+10,LAND,WB
2632638,5949308.0,2016.0,1.0,129.0,582.0,XXX,20462.0,9.0,AZ,20479.0,51.0,1.0,1.0,20160301,,,A,D,,M,1965.0,04172016,M,3303,,9.427546e+10,,WB
49980,71726.0,2016.0,1.0,260.0,260.0,XXX,20473.0,9.0,AL,,39.0,2.0,1.0,,,,U,,,,1977.0,03282016,M,,G6B,9.463559e+10,94130,CP


### Airport data

In [8]:
airport_data = pd.read_csv(r'./data/airport/airport-codes_csv.csv', sep=',', encoding='UTF-8')
print(airport_data.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   ident         55075 non-null  object 
 1   type          55075 non-null  object 
 2   name          55075 non-null  object 
 3   elevation_ft  48069 non-null  float64
 4   continent     27356 non-null  object 
 5   iso_country   54828 non-null  object 
 6   iso_region    55075 non-null  object 
 7   municipality  49399 non-null  object 
 8   gps_code      41030 non-null  object 
 9   iata_code     9189 non-null   object 
 10  local_code    28686 non-null  object 
 11  coordinates   55075 non-null  object 
dtypes: float64(1), object(11)
memory usage: 5.0+ MB
None


In [9]:
# Check if ident can be primary key
len(airport_data)- len(airport_data.ident.unique())

0

In [10]:
airport_data.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [11]:
# Check for null values
airport_data.isnull().sum()

ident               0
type                0
name                0
elevation_ft     7006
continent       27719
iso_country       247
iso_region          0
municipality     5676
gps_code        14045
iata_code       45886
local_code      26389
coordinates         0
dtype: int64

Coordinates are not in numeric format. Need to convert to numeric and clean null values.

In [12]:
# Test clean up. Final clean up will be done using sql
values = {'elevation_ft': -99999, 'continent':'-','iso_country':'-', 'iso_country':'-', 'municipality':'-', 'gps_code':'-', 'iata_code':'-', 'local_code':'-'}
airport_data.fillna(value=values, inplace=True)

### us-cities-demographics

In [13]:
us_cities = pd.read_csv(r'./data/us-city-demo/us-cities-demographics.csv', sep=';', encoding='UTF-8')

In [14]:
us_cities.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   City                    2891 non-null   object 
 1   State                   2891 non-null   object 
 2   Median Age              2891 non-null   float64
 3   Male Population         2888 non-null   float64
 4   Female Population       2888 non-null   float64
 5   Total Population        2891 non-null   int64  
 6   Number of Veterans      2878 non-null   float64
 7   Foreign-born            2878 non-null   float64
 8   Average Household Size  2875 non-null   float64
 9   State Code              2891 non-null   object 
 10  Race                    2891 non-null   object 
 11  Count                   2891 non-null   int64  
dtypes: float64(6), int64(2), object(4)
memory usage: 271.2+ KB


In [15]:
us_cities.isnull().sum()

City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64

In [16]:
us_cities.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [17]:
# Clean column names
us_cities.columns = ['city', 'state_name', 'median_age', 'male_population', 'female_population',
       'total_population', 'num_veterans', 'foreign_born',
       'average_household_size', 'state_code', 'race', 'count']

In [18]:
us_cities['city'] = us_cities['city'].str.upper()
us_cities['state_name'] = us_cities['city'].str.upper()

In [19]:
len(us_cities.city.unique())

567

In [21]:
us_cities.head()

Unnamed: 0,city,state_name,median_age,male_population,female_population,total_population,num_veterans,foreign_born,average_household_size,state_code,race,count
0,SILVER SPRING,SILVER SPRING,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,QUINCY,QUINCY,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,HOOVER,HOOVER,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,RANCHO CUCAMONGA,RANCHO CUCAMONGA,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,NEWARK,NEWARK,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


### Load temperature data

In [22]:
fname = './data/world-temp/GlobalLandTemperaturesByCity.csv'
temperature_data = pd.read_csv(fname, index_col=0, parse_dates=[0])

In [23]:
temperature_data.head()

Unnamed: 0_level_0,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
dt,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1743-12-01,,,Århus,Denmark,57.05N,10.33E
1744-01-01,,,Århus,Denmark,57.05N,10.33E
1744-02-01,,,Århus,Denmark,57.05N,10.33E
1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [24]:
temperature_data.info()

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 8599212 entries, 1743-11-01 to 2013-09-01
Data columns (total 6 columns):
 #   Column                         Dtype  
---  ------                         -----  
 0   AverageTemperature             float64
 1   AverageTemperatureUncertainty  float64
 2   City                           object 
 3   Country                        object 
 4   Latitude                       object 
 5   Longitude                      object 
dtypes: float64(2), object(4)
memory usage: 459.2+ MB


In [25]:
# null will be replaced with -999, longitude, latitude in NSEW format. will be coverted to numeric to be compatible with airport data
temperature_data.isnull().sum()

AverageTemperature               364130
AverageTemperatureUncertainty    364130
City                                  0
Country                               0
Latitude                              0
Longitude                             0
dtype: int64

In [26]:
# # Clean up
# short_temp_data.columns = ['avg_temp', 'avg_temp_error', 'city', 'country', 'latitude', 'longitude']
# short_temp_data['city']  = short_temp_data.city.str.upper()
# short_temp_data['country']  = short_temp_data.country.str.upper()
# us_temp_data = short_temp_data[short_temp_data['country']=='UNITED STATES'].copy()
# us_temp_data['latitude'] = us_temp_data['latitude'].apply(lambda x: (1 if x[-1] == 'N' else -1) * float(x[:-1]))
# us_temp_data['longitude'] = us_temp_data['longitude'].apply(lambda x: (1 if x[-1]=='E' else -1)*float(x[:-1]))

In [27]:
# Read SAS data dictionary, parse to extract dimension tables.
with open(r'./data/immigration/I94_SAS_Labels_Descriptions.SAS') as file:
    file_string = file.read()

In [28]:
sas_source_code_tables_data = [
  {'table_name': 'country',
   'parse_string': 'i94cntyl',
   'end_string': ';',
   'columns': ['country_code', 'country'],
   'dq_checks': [{'check_sql': "SELECT COUNT(*) FROM country WHERE country_code is null", 'expected_result': 0}]
  },
  {'table_name': 'entry_port',
   'parse_string': 'i94prtl',
   'end_string': ';',
   'columns': ['port_code', 'addr', 'city', 'state'],
   'dq_checks': [{'check_sql': "SELECT COUNT(*) FROM entry_port WHERE port_code is null", 'expected_result': 0}]
  },
  {'table_name': 'arrival_mode',
   'parse_string': 'i94model',
   'end_string': ';',
   'columns': ['arrival_code', 'arrival_type'],
   'dq_checks': [{'check_sql': "SELECT COUNT(*) FROM arrival_mode WHERE arrival_code is null", 'expected_result': 0}]
  },
  {'table_name': 'region',
   'parse_string': 'i94addrl',
   'end_string': ';',
   'columns': ['region_code', 'region_name'],
   'dq_checks': [{'check_sql': "SELECT COUNT(*) FROM region_code WHERE region_code is null", 'expected_result': 0}]
  },
  {'table_name': 'visa_type',
   'parse_string': 'I94VISA',
   'end_string': '*/',
   'columns': ['visa_code', 'visa_type'],
   'dq_checks': [{'check_sql': "SELECT COUNT(*) FROM visa_type WHERE visa_code is null", 'expected_result': 0}]
  }
]

In [29]:
def parse_sas_file(file_string, parse_string, end_string, table_name, columns):
    '''Function accepts sas file as a string, segments data tables base on parse_string and end_string.
        It will further process string segment to clean and extract code and value fields as lists.
        Function returns the ziped list of code and value.
    
    '''
    filtered_string = file_string[file_string.index(parse_string):]
    filtered_string = filtered_string[:filtered_string.index(end_string)]
    # clean string  by removing ' and tabs
    filtered_string = filtered_string.replace("'", "").replace('\t', "")
    # Remove line with parse_string
    filtered_list = filtered_string.split('\n')
    filtered_list = filtered_list[1:] 

    df = pd.DataFrame(filtered_list)
    
    df[[0,1]] = df[0].str.split('=', n = 1, expand = True)
    df[0] = df[0].str.strip()
    df[1] = df[1].str.strip()
    df[0] = df[0].str.upper()
    df[1] = df[1].str.upper()
    df = df.dropna()
   
    if table_name=='entry_port':
       
        df[2] = df[1].apply(lambda x: x.split(',')[0].upper())
        df[3] = df[1].apply(lambda x: x.split(',')[-1].upper())
        df[2] = df[2].str.strip()
        df[3] = df[3].str.strip()#ARPT
        df[2] = df[2].str.replace(' #ARPT', "", regex = False)
        df[3] = df[3].str.replace(' (BPS)', "", regex = False)
        df[3] = df[3].str.replace(' #ARPT', "", regex = False)
    
        df.columns=columns
    
    else:
 
        df.columns=columns

    return df.drop_duplicates().dropna()
    
            

In [30]:
# Test 
us_ports = parse_sas_file(file_string, 'i94prtl', ';', 'entry_port', ['port_code', 'addr', 'city', 'state']) 
#us_ports = parse_sas_file(file_string, 'I94VISA', '*/', 'visa_type', ['code', 'value'])
#us_ports = parse_sas_file(file_string, 'i94model', ';', 'arrival_mode', ['code', 'value'])

In [31]:
us_ports[:5]

Unnamed: 0,port_code,addr,city,state
0,ALC,"ALCAN, AK",ALCAN,AK
1,ANC,"ANCHORAGE, AK",ANCHORAGE,AK
2,BAR,"BAKER AAF - BAKER ISLAND, AK",BAKER AAF - BAKER ISLAND,AK
3,DAC,"DALTONS CACHE, AK",DALTONS CACHE,AK
4,PIZ,"DEW STATION PT LAY DEW, AK",DEW STATION PT LAY DEW,AK


In [32]:
# file strig is defind above
for item in sas_source_code_tables_data:
    parse_string = item.get('parse_string')
    end_string = item.get('end_string')
    table_name = item.get('table_name')
    columns = item.get('columns')

  
    print(table_name)
 


    dm_df = parse_sas_file(file_string, parse_string, end_string, table_name , columns)
   

country
entry_port
arrival_mode
region
visa_type


### Read temperature data

### Step 2: Explore and Assess the Data
#### Explore the Data 
Datasets have multiple quality issues such as null values, mixed numeric and string data, date formatting issues etc. Data cleanup is performed in Airflow using table specific sql queries.


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
U.S. Immigration analytical database has a star schema with one fact table and multiple dimension tables. Dimension tables are directly populated through truncate, copy pattern or special operator that parses and loads the data. Dimension tables are relatively small in size. Consequently, truncate-copy pattern is a reasonable way to to maintain idempotent data pipeline without compromising the performance. Dimension tables are distributed across all nodes for faster query performance. Immigration dataset is first loaded into staging a staging table, which will then be cleaned and populates that fact table.

Additional datasets containing world temperature, US city demographics, and airport codes can be combined with main database to answer various analytical questions. These datasets are loaded into staging tables, cleaned and transformed into more analytical friendly formats.

Database schema is shown below:

![Main-dataset](./images/er_capstone.png)

#### 3.2 Mapping Out Data Pipelines
Following custom operators were developed for loading datasets from s3 bucket to RedShift, cleaning, and validation. 

- SASfileToRedshiftOperator : Parse and load tables from SAS data dictionary to Redshift.
- S3ToRedshiftOperator : Load files in CSV or Parquet format to Redshift using copy command.
- LoadFactOperator : Clean and load immigration fact table from staging table.
- CleanTablesOperator : Perform various data cleanup operations, for example fill null values, convert from string to numeric format etc.
- DataQualityOperator : Performs data quality checks to ensure tables are populated without errors using pre-defined set of queries and expected result.

Airflow data pipeline is shown below:

![pipeline](./images/graph_success.png)


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

Implemented using Airflow. See [README.md](./README.md) for details.

#### 4.2 Data Quality Checks
Table specific quality checks are defined in Airflow.

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

Data dictionary file is available in csv format. [immigration_data_dict.csv](./immigration_data_dict.csv)

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
   1.  Apache Airflow is a great open source tool for easy scheduling of etl workflows. In this project Airflow is used to keep the analytical database up to date and to ensure data quality.

   1. Redshift offers Massively parallel processing (MPP) that enables fast execution of the most complex queries operating on large amounts of data. Columnar storage of tables reduces i/o requirements and improves analytical query performance. Redshift is easily scalable and can be configured within minutes.

   1. S3 object storage was selected as the raw data storage solution due to performance and redshift compatibility advantages.

* Propose how often the data should be updated and why.
    1. Immigration raw datasets are updated on monthly basis. Consequently, pipeline is scheduled monthly. It can be adjusted if data is available more frequently.

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
    1. Airflow and Redshift are easily scalable to handle increased data load.

 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    1. In this case DAG need to be scheduled on daily basis to ensure most updated datasets are available for the dash board update. 

 * The database needed to be accessed by 100+ people.
    1. AWS offers many options for identity and access management based on their roles. Redshift cluster resources may need scaling up based on demand. Redshift offer auto scaling based on demand, which maximizes performance while optimizing cost.