# Building up Data Pipelines using Apache Airflow 
### Data Engineering Capstone Project

#### Project Summary
The main goal of this capstone project is to create a Data Lake in S3 and a DWH in Redshift, orchestrated with the data pipeline tool Apache Airflow.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [None]:
# Do all imports and installs here
import pandas as pd
import re
import os
import configparser
import boto3
from io import StringIO
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import seaborn as sns
plt.style.use('ggplot')

config = configparser.ConfigParser()
config.read('./dwh.cfg')

AWS_ACCESS_KEY = config.get('AWS', 'AWS_KEY_ID')
AWS_SECRET = config.get('AWS','AWS_SECRET')
AWS_REGION = config.get('AWS','REGION')
S3_BUCKET = config.get('S3','RAW_DATA_BUCKET')

os.environ["AWS_ACCESS_KEY_ID"]= AWS_ACCESS_KEY
os.environ["AWS_SECRET_ACCESS_KEY"]= AWS_SECRET

spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.2").enableHiveSupport().getOrCreate()

pd.set_option('display.max_columns', 500)
pd.set_option('display.max_rows', 500)

In [None]:
from pyspark.sql.functions import from_unixtime, unix_timestamp, to_date, expr,\
                                  date_add,udf,col,avg,mean,year,month,split,lit
from pyspark.sql.functions import isnan, when, count
from pyspark.sql.types import StringType, DateType

### Step 1: Project Scope and Data Gathering

The scope of the project is to create a Data Lake that is accessible to Data Scientists and a DWH that
can be queried by Data Analysts interested in deeper insights into US immigration. The main focus areas are
the visa types issued and the immigrant profiles associated with them.

**Data Sources from Udacity**
- **I94 Immigration Data**: This data comes from the US National Tourism and Trade Office found [here](https://travel.trade.gov/research/reports/i94/historical/2016.html). Each report contains international visitor arrival statistics by world regions and select countries (including top 20), type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry (for select countries)
- **U.S. City Demographics Data**: This dataset contains information about the demographics of all US cities and census-designated places with a population greater or equal to 65,000. Dataset comes from OpenSoft found [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).
- **Airport Codes**: This is a simple table of airport codes and corresponding cities. The airport codes may refer to either IATA airport code, a three-letter code which is used in passenger reservation, ticketing and baggage-handling systems, or the ICAO airport code which is a four letter code used by ATC systems and for airports that do not have an IATA airport code (from wikipedia). It comes from [here](https://datahub.io/core/airport-codes#data)

**External Data Sources**

I have enriched the dataset with the following data sources to make the analysis more useful and informative:

- **Port of Entry/Nationality Codes**: This dataset contains port of entry codes, which are used to join against the I94 immigration data, and nationality codes with the corresponding country abbreviations: [Port Codes; Nationality Codes And Port of Entry codes](https://fam.state.gov/fam/09FAM/09FAM010205.html)
- **US Visa Types**: This data is extracted from the US DHS and Wikipedia and describes the visa types offered by the US: [US Non-immigrant Visa Types](https://www.dhs.gov/immigration-statistics/nonimmigrant/NonimmigrantCOA) and [US Immigrant Visa Types](https://en.m.wikipedia.org/wiki/Visa_policy_of_the_United_States#Classes_of_visas)
- **Airline Codes**: This data source consists of airlines IATA abbreviations and the country of origin [Airline Codes](https://www.iata.org/en/about/members/airline-list?page=30&search=&ordering=Alphabetical)

In the following sections I will explore various data sets needed for the project

#### 1. Exploring the I94 Data

In [None]:
# Read in the data here
df_i94_sample = pd.read_csv('./data/immigration_data_sample.csv')

In [None]:
df_i94_sample.head()

**Observations:** 
- The columns in the immigration data are encoded. Therefore these columns could be Foreign Keys in the Fact table
- To create the corresponding Dimension Tables the `SAS_Label_Descriptions` file will be used to extract them into `csv` files which will be stored in `meta_data` folder
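
As a minimal sketch of that extraction, the snippet below parses one `value` block of a SAS label file into code/label pairs. The sample text is a made-up excerpt shaped like `I94_SAS_Labels_Descriptions.SAS`, and `parse_value_block` is a hypothetical helper name that is not part of the project.

```python
import re

# Hypothetical excerpt shaped like a block in I94_SAS_Labels_Descriptions.SAS
SAMPLE = """
/* I94MODE - There are missing values as well as not reported (9) */
value i94model
    1 = 'Air'
    2 = 'Sea'
    3 = 'Land'
    9 = 'Not reported' ;
"""

def parse_value_block(text, block_name):
    """Return [(code, label), ...] for the SAS `value <block_name>` block."""
    pattern = r"value\s+" + re.escape(block_name) + r"\b(.*?);"
    match = re.search(pattern, text, re.DOTALL)
    if match is None:
        return []
    pairs = []
    for line in match.group(1).splitlines():
        if "=" not in line:
            continue  # skip blank lines inside the block
        code, label = line.split("=", 1)
        pairs.append((code.strip().strip("'"), label.strip().strip("'").strip()))
    return pairs

mode_pairs = parse_value_block(SAMPLE, "i94model")
print(mode_pairs)
```

Each list of pairs can then be written to its own csv file and loaded later as a dimension table.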

#### 1.1 Extracting meta data from i94 immigration data label Descriptions

***Extracting Raw Data***

In [None]:
# Reading the Label Description file and extracting information which can form dimensional tables later
with open('./data/I94_SAS_Labels_Descriptions.SAS') as i94_description:
    i94_label_content = i94_description.read()

In [None]:
text = re.search(r'value i94cntyl.*? ;', i94_label_content, re.DOTALL).group()
text=text.split('\n')

In [None]:
i94cit_i94res = pd.DataFrame(columns=['i94_code','country_name'])

In [None]:
i94cit_i94res['i94_code'] = [code.split('=')[0].strip() for code in text[1:]]
i94cit_i94res['country_name'] = [code.split('=')[1].rstrip(' ;').replace("'","") for code in text[1:]]

In [None]:
i94cit_i94res.head(10)

**Observation:** This data can be used to map the country codes to their respective names

In [None]:
text = re.search(r'i94prtl.*?;', i94_label_content, re.DOTALL).group()
text=text.split('\n')

In [None]:
i94port_i94code = pd.DataFrame(columns=['i94_port_code','i94_airport_location','i94_airport_state'])

In [None]:
i94port_i94code['i94_port_code'] = [re.search('[A-Z](?=)\w+', code).group() for code in text[1:-1]]
i94port_i94code['i94_airport_location'] = [re.findall('\'(.*?),', code.split('\t=\t')[1])[0] if len(re.findall('\'(.*?),', code.split('\t=\t')[1]))>0 else code.split('\t=\t')[1].strip(" '") 
                                           for code in text[1:-1]]

In [None]:
i94port_i94code['i94_airport_state'] = [code.split('\t=\t')[1].strip(" '") for code in text[1:-1]]
i94port_i94code['i94_airport_state'] = i94port_i94code['i94_airport_state'].str.split(', ')
i94port_i94code['i94_airport_state'] = [code[-1] for code in i94port_i94code['i94_airport_state']]

In [None]:
for code in i94port_i94code.i94_airport_state:
    if len(code)>2 and 'Collapsed' not in code and 'No PORT Code' not in code:
        tmp = code.split(' ')
        if len(tmp)>1:
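            # parentheses make the intended grouping explicit: a 2-letter code followed by a (BPS) or #ARPT marker
            if len(tmp[0])==2 and ('(BPS)' in tmp[1] or '#ARPT' in tmp[1]):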
                i94port_i94code.loc[i94port_i94code.i94_airport_state==code,'i94_airport_state'] = tmp[0]

            if len(tmp[1]) == 2 and len(tmp[0])!=2:
                i94port_i94code.loc[i94port_i94code.i94_airport_state==code,'i94_airport_state'] = tmp[1]

In [None]:
i94port_i94code.head(10)

**Observations:**
- This table can help in mapping the `i94port` from the immigration data and its `airport_location` and `state_code` in US
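
The city/state split behind this mapping can be sketched in plain Python. This is only an illustration: `split_port_label` is a hypothetical helper, the labels are examples, and entries without a comma still need the manual corrections applied elsewhere in this notebook.

```python
def split_port_label(label):
    """Split 'CITY, ST' into (city, state); state is None when no comma is present."""
    city, sep, state = label.rpartition(",")
    if not sep:
        # no comma at all: keep the full label as the city and leave the state open
        return label.strip(), None
    return city.strip(), state.strip()

examples = ["ANCHORAGE, AK", "ST. PAUL, MN", "WASHINGTON DC"]  # illustrative labels
for label in examples:
    print(label, "->", split_port_label(label))
```

Splitting on the last comma keeps city names that contain commas intact.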

In [None]:
i94port_i94code.to_csv('i94_meta_data/i94port_i94code.csv',index=False)

In [None]:
text = re.search(r'value i94addrl.*? ;', i94_label_content, re.DOTALL).group()
text=text.split('\n')

In [None]:
i94_state_code = pd.DataFrame(columns=['i94_state_code','i94_state_name'])

In [None]:
i94_state_code['i94_state_code'] = [code.split("'='")[0].strip("\t'") for code in text[1:]]
i94_state_code['i94_state_name'] = [code.split("'='")[1].strip(" ';") for code in text[1:]]

In [None]:
i94_state_code.to_csv('i94_meta_data/i94_state_code.csv',index=False)

#### 1.2 Combining all the above steps

In [None]:
data_dict = {}
key_name=''
for line in i94_label_content.split("\n"):
    line = re.sub(r"\s+", " ", line)
    if '/*' in line and '-' in line:
        line = line.strip('/*')
        key_name = line.split('-')[0].strip()
        data_dict[key_name] = []
    if '=' in line and key_name!='':
        data_dict[key_name].append([item.strip(';').strip(" ").replace('\'', '').lstrip().rstrip() for item in line.split('=')])

In [None]:
for key in data_dict:
    if len(data_dict[key])>0:
        if 'CIT' in key and 'RES' in key:
            i94cit_i94res = pd.DataFrame(data_dict[key],columns=['i94_country_code','country_name'])
            i94cit_i94res.loc[i94cit_i94res.country_name.str.contains('MEXICO'),'country_name'] = 'MEXICO'
            i94cit_i94res.to_csv('i94_meta_data/i94cit_i94res.csv',index=False)
        if 'PORT' in key:
            i94port_i94code = pd.DataFrame(data_dict[key],columns=['i94_port_code','i94_airport_location'])
            # pass n as a keyword: recent pandas versions no longer accept it positionally
            i94port_i94code[['port_city', 'port_state']] = i94port_i94code['i94_airport_location'].str.rsplit(',', n=1, expand=True)
            i94port_i94code.loc[i94port_i94code.port_city == 'MARIPOSA AZ','port_state'] = 'AZ'
            i94port_i94code.loc[i94port_i94code.port_city == 'MARIPOSA AZ','port_city'] = 'MARIPOSA'
            i94port_i94code.loc[i94port_i94code.port_city == 'WASHINGTON DC','port_state'] = 'DC'
            i94port_i94code.drop(['i94_airport_location'], axis=1, inplace=True)
            i94port_i94code.to_csv('i94_meta_data/i94port_i94code.csv',index=False)
        if 'MODE' in key:
            i94mode = pd.DataFrame(data_dict[key],columns=['i94_mode_code','i94_mode'])
            i94mode.to_csv('i94_meta_data/i94mode.csv',index=False)
        if 'ADDR' in key:
            i94addr = pd.DataFrame(data_dict[key],columns=['i94_state_code','i94_state_name'])
            i94addr.to_csv('i94_meta_data/i94addr.csv',index=False)
        if 'VISA' in key:
            i94visa = pd.DataFrame(data_dict[key],columns=['i94_visa_code','i94_visa'])
            i94visa.to_csv('i94_meta_data/i94visa.csv',index=False)

In [None]:
data_dict.keys()

In [None]:
i94cit_i94res.head()

#### 1.3 Exploring the SAS data

In [None]:
!ls ../../data/18-83510-I94-Data-2016/

In [None]:
from datetime import datetime, timedelta
def to_datetime(x):
    try:
        start = datetime(1960, 1, 1)
        return start + timedelta(days=int(x))
    except:
        return None
udf_to_datetime_sas = udf(lambda x: to_datetime(x), DateType())

In [None]:
def to_datetime_frm_str(x):
    try:
        if x != 'D/S':
            return datetime.strptime(x, '%m%d%Y')
        else:
            return None
    except:
        return None
udf_to_datetime_frm_str = udf(lambda x: to_datetime_frm_str(x), DateType())
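
The two conversions above can be checked without Spark. The sketch below assumes the same conventions as the UDFs: SAS dates are day counts from 1960-01-01 and the deadline column holds `MMDDYYYY` strings or the marker `D/S`. The function names are hypothetical.

```python
from datetime import date, datetime, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS counts dates as days since this day

def sas_days_to_date(days):
    """Convert a SAS day count (e.g. 20454.0) to a date; None for missing values."""
    try:
        return SAS_EPOCH + timedelta(days=int(days))
    except (TypeError, ValueError):
        return None

def mmddyyyy_to_date(text):
    """Parse an 'MMDDYYYY' string; return None for 'D/S' or malformed input."""
    try:
        return datetime.strptime(text, "%m%d%Y").date()
    except (TypeError, ValueError):
        return None

print(sas_days_to_date(20454.0))      # 2016-01-01
print(mmddyyyy_to_date("10292016"))   # 2016-10-29
print(mmddyyyy_to_date("D/S"))        # None
```

Catching only `TypeError` and `ValueError` keeps unexpected failures visible instead of silently turning them into nulls.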

In [None]:
df_i94 =spark.read.format('com.github.saurfang.sas.spark').option("dateFormat", "yyyyMMdd").option("inferSchema", "true").\
                 load('../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat').withColumn("arrival_date", lit(udf_to_datetime_sas("arrdate")))\
                .withColumn("departure_date", lit(udf_to_datetime_sas("depdate")))\
                .withColumn("departure_deadline", lit(udf_to_datetime_frm_str("dtaddto"))) \
                .withColumn("month_year", lit('jan'+'_'+'16'))

In [None]:
# Printing shape of the dataset
print("Number of Columns: {}".format(len(df_i94.columns)))
print("Number of Rows: {}".format(df_i94.count()))

In [None]:
df_i94.limit(15).toPandas()

In [None]:
df_i94.printSchema()

In [None]:
df_i94 = df_i94.drop('validres','delete_days','delete_mexl','delete_dup','delete_recdup','depdate','delete_visa','arrdate','dtadfile','dtaddto' ,'occup', 'entdepa', 'entdepd', 'entdepu')

In [None]:
def create_cast_select_exprs(sas_cols, schema_cols):
    if sas_cols != '':
        exprs = ["{} AS {}".format(dfc,sc) for dfc, sc in zip(sas_cols, schema_cols)]
    else:
        raise ValueError('Cannot create Select Expression without proper header')
    return exprs
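
To make the output of this helper concrete, here is a small stand-alone variant that shows the generated expressions. `build_select_exprs` is a hypothetical name, and the added length check is an assumption about desired behaviour, since `zip` alone silently drops unmatched entries.

```python
def build_select_exprs(source_exprs, target_names):
    """Pair each source expression with its target column name as 'expr AS name'."""
    if len(source_exprs) != len(target_names):
        # zip() would silently drop the unmatched tail, so fail loudly instead
        raise ValueError("source and target column lists differ in length")
    return ["{} AS {}".format(src, dst) for src, dst in zip(source_exprs, target_names)]

exprs = build_select_exprs(
    ["cast(cicid as int)", "i94port", "cast(i94bir as int)"],
    ["cicid", "port_id", "age"],
)
print(exprs)
# ['cast(cicid as int) AS cicid', 'i94port AS port_id', 'cast(i94bir as int) AS age']
```

The resulting strings are in the form that `selectExpr` accepts, so casting and renaming happen in a single pass.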

In [None]:
sas_columns = ['cast(cicid as int)','cast(i94yr as int)','cast(i94mon as int)','cast(i94cit as int)',
			   'cast(i94res as int)','i94port','arrival_date','cast(i94mode as int)',
			   'i94addr','departure_date','departure_deadline','cast(i94bir as int)','cast(i94visa as int)',
			   'cast(count as int)','visapost','matflag','cast(biryear as int)',
			   'gender','insnum','airline','cast(admnum as float)','fltno','visatype',"month_year"]

schema_columns = ['cicid','entry_year','entry_month','country_id','res_id','port_id','arrival_date',
				  'mode_id','state_code','departure_date','departure_deadline','age','visa_reason_id','count','visa_post',
				  'matched_flag','birth_year','gender','ins_num','airline_abbr','admission_num','flight_no','visa_type','month_year']

In [None]:
df_i94=df_i94.selectExpr(create_cast_select_exprs(sas_columns,schema_columns))
df_i94.printSchema()

In [None]:
df_i94.limit(5).toPandas()

In [None]:
%%time
# storing the data in S3 for later use (the cell magic has to be the first line of the cell)
for file in os.listdir('../../data/18-83510-I94-Data-2016/'):
    path='../../data/18-83510-I94-Data-2016/{}'.format(file)
    df_i94 = spark.read.format('com.github.saurfang.sas.spark').option("inferSchema", "true").option("dateFormat", "yyyyMMdd").load(path)\
             .withColumn("arrival_date", udf_to_datetime_sas("arrdate")) \
             .withColumn("departure_date", udf_to_datetime_sas("depdate")).withColumn("departure_deadline", udf_to_datetime_frm_str("dtaddto"))
    df_i94=df_i94.drop('validres','delete_days','delete_mexl','delete_dup','delete_recdup','delete_visa','arrdate','dtadfile', 'occup', 'entdepa', 'entdepd', 'entdepu')
    # splitext removes the extension; str.strip would remove any of the given characters from both ends
    bucket_path = r"s3a://{}/i94_raw_data/{}_sasdata.parquet".format(S3_BUCKET,os.path.splitext(file)[0].split('_')[1])
    print(bucket_path)
    df_i94.write.parquet(bucket_path, "overwrite")
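
The target key is derived from the month token in each file name. A minimal sketch of that derivation follows, assuming names of the form `i94_jan16_sub.sas7bdat`; the helper names and the bucket are placeholders. Note that `str.strip` takes a set of characters to remove from both ends and not a suffix, which is why the sketch relies on `os.path.splitext`.

```python
import os

def month_tag(filename):
    """Return the month token of a name like 'i94_jan16_sub.sas7bdat'."""
    stem, _ext = os.path.splitext(os.path.basename(filename))
    parts = stem.split("_")
    if len(parts) < 2:
        raise ValueError("unexpected file name: {}".format(filename))
    return parts[1]

def parquet_key(bucket, filename):
    # Prefix layout mirrors the loop above; the bucket name is a placeholder
    return "s3a://{}/i94_raw_data/{}_sasdata.parquet".format(bucket, month_tag(filename))

print(parquet_key("my-raw-bucket", "i94_jan16_sub.sas7bdat"))
```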

In [None]:
%%time
df_i94_total = None
for file in os.listdir('../../data/18-83510-I94-Data-2016/'):
    path='../../data/18-83510-I94-Data-2016/{}'.format(file)
    print(path)
    df_i94 = spark.read.format('com.github.saurfang.sas.spark').option("inferSchema", "true").option("dateFormat", "yyyyMMdd").load(path)
    df_i94=df_i94.drop('validres','delete_days','delete_mexl','delete_dup','delete_recdup','delete_visa','arrdate','dtadfile', 'occup', 'entdepa', 'entdepd', 'entdepu')
    # accumulate every monthly file instead of overwriting the total on each pass
    df_i94_total = df_i94 if df_i94_total is None else df_i94_total.union(df_i94)

In [None]:
# Printing shape of the combined dataset
print("Number of Columns: {}".format(len(df_i94_total.columns)))
print("Number of Rows: {}".format(df_i94_total.count()))

**Observations:** From the dataset and the SAS_Label description file:
- The column `cicid` appears to act as a unique record identifier (similar to a UUID)
- The columns `validres, delete_days, delete_mexl, delete_dup, delete_recdup, delete_visa` do not appear in the sample csv file, so they are treated as extra columns and dropped

#### 1.4 Exploring the Airport Codes Data

In [None]:
data_spark = {} # using this dictionary for plotting in section 2

In [None]:
from pyspark.sql.types import *
def parse_latitude(x):
    y = x.strip().split(',')
    return float(y[1])


def parse_longitude(x):
    y = x.strip().split(',')
    return float(y[0])


def port_of_entry(x):
    return x.strip().split(', ')[0]


def parse_state_code(x):
    return x.strip().split('-')[-1]


def parse_country_code(x):
    return x.strip().split('-')[0]


udf_parse_port_of_entry = udf(lambda x: port_of_entry(x), StringType())
udf_parse_latitude = udf(lambda x: parse_latitude(x), FloatType())
udf_parse_longitude = udf(lambda x: parse_longitude(x), FloatType())
udf_parse_state_code = udf(lambda x: parse_state_code(x), StringType())
udf_parse_country_code = udf(lambda x: parse_country_code(x), StringType())

In [None]:
df_airport = spark.read.format('csv').options(header='true', inferSchema='true').load('./data/airport-codes.csv')

In [None]:
df_airport.limit(10).toPandas()

In [None]:
df_airport.printSchema()

In [None]:
df_airport = df_airport.withColumn("airport_latitude", udf_parse_latitude("coordinates"))\
.withColumn("airport_longitude", udf_parse_longitude("coordinates"))\
.withColumn("country", udf_parse_country_code("iso_region"))\
.withColumn("state_code", udf_parse_state_code("iso_region"))\
.withColumnRenamed("ident", "icao_code")

In [None]:
df_airport=df_airport.drop("coordinates", "gps_code", "local_code","iso_region", "iso_country")
df_airport.limit(10).toPandas()

In [None]:
# Printing Shape of the Dataset
print("Number of Columns: {}".format(len(df_airport.columns)))
print("Number of Rows: {}".format(df_airport.count()))

df_airport=df_airport.drop_duplicates()

In [None]:
df_airport.printSchema()

In [None]:
df_airport.summary("count").show()

In [None]:
data_spark['airport-codes'] = df_airport

#### 1.5 Exploring the US cities Demographics Data

In [None]:
df_demographics = spark.read.format('csv').options(header='true', inferSchema='true',delimiter=';').load('./data/us-cities-demographics.csv')\
                .withColumnRenamed("State", "state")\
                .withColumnRenamed("State Code", "state_code")\
                .withColumnRenamed("City", "city")\
                .withColumnRenamed("Median Age", "median_age")\
                .withColumnRenamed("Male Population", "male_population")\
                .withColumnRenamed("Female Population", "female_population")\
                .withColumnRenamed("Total Population", "total_population")\
                .withColumnRenamed("Number of Veterans", "num_of_veterans")\
                .withColumnRenamed("Foreign-born", "foreign_born")\
                .withColumnRenamed("Average Household Size", "avg_household_size")\
                .withColumnRenamed("Race", "predominant_race")\
                .withColumnRenamed("Count", "count")

In [None]:
# Printing Shape of the Dataset
print("Number of Columns: {}".format(len(df_demographics.columns)))
print("Number of Rows: {}".format(df_demographics.count()))

In [None]:
df_demographics=df_demographics.drop_duplicates()

In [None]:
df_demographics.limit(10).toPandas()

In [None]:
df_demographics.printSchema()

In [None]:
# Printing Shape of the Dataset after dropping duplicates
print("Number of Columns: {}".format(len(df_demographics.columns)))
print("Number of Rows: {}".format(df_demographics.count()))

In [None]:
df_demographics.summary("count").show()

In [None]:
df_demographics.where(df_demographics.male_population.isNull()).show()

In [None]:
data_spark['us-cities-demographics'] = df_demographics

#### 1.6 Exploring I94 labels data

**checking i94 meta_data**

In [None]:
i94_addr = spark.read.format('csv').load('i94_meta_data/i94addr.csv', header=True, inferSchema=True)
i94_addr.limit(5).toPandas()

In [None]:
i94_addr_df = i94_addr.selectExpr("i94_state_code as state_code","i94_state_name as state_name")
i94_addr_df.summary("count").show()

In [None]:
i94_cit = spark.read.format('csv').load('i94_meta_data/i94cit_i94res.csv', header=True, inferSchema=True)
i94_cit.limit(5).toPandas()

In [None]:
i94_cit_df = i94_cit.selectExpr("i94_country_code as country_id","country_name as country")
i94_cit_df.summary("count").show()

In [None]:
i94_mode = spark.read.format('csv').load('i94_meta_data/i94mode.csv', header=True, inferSchema=True)
i94_mode_df = i94_mode.selectExpr("i94_mode_code as mode_id","i94_mode as transportation_mode")

In [None]:
i94_mode_df.show()

In [None]:
i94_port = spark.read.format('csv').load('i94_meta_data/i94port_i94code.csv', header=True, inferSchema=True)
i94_port.limit(5).toPandas()

In [None]:
i94_port.filter(i94_port.port_state.contains('BPS')).show(20)

In [None]:
i94_port.filter(i94_port.port_state.contains('#ARPT')).show(20)

In [None]:
def parse_state_code(x):
    if x is not None and 'BPS' in x:
        return x.strip().split('(BPS)')[0].strip()
    elif x is not None and 'ARPT' in x:
        return x.strip().split('#ARPT')[0].strip()
    elif x is not None and 'SECTOR HQ' in x:
        return x.strip().split('(BP - SECTOR HQ)')[0].strip()
    elif x is not None and 'INTL' in x:
        return x.strip().split('#INTL')[0].strip()
    else:
        return x
udf_parse_state_code = udf(lambda x: parse_state_code(x), StringType())

In [None]:
i94_port=i94_port.withColumn("port_state_cleaned", udf_parse_state_code("port_state"))

In [None]:
i94_port.filter(i94_port.port_state.contains('BPS')).show(20)

In [None]:
i94_port.filter(i94_port.port_state.contains('ARPT')).show(20)

In [None]:
i94_port_df = i94_port.selectExpr("i94_port_code as port_code","port_city as city","port_state_cleaned as state_code_or_country")

In [None]:
i94_port_df.show(20)

In [None]:
data_spark['port-of-entry-codes'] = i94_port_df

In [None]:
i94_visa = spark.read.format('csv').load('i94_meta_data/i94visa.csv', header=True, inferSchema=True)
i94_visa.show()

In [None]:
i94_visa_df = i94_visa.selectExpr("i94_visa_code as visa_code","i94_visa as visa_purpose")

In [None]:
i94_visa_df.show()

In [None]:
visa = spark.read.format('csv').load('./data/visa-type.csv', header=True, inferSchema=True)
visa.show(10)

In [None]:
visa_df = visa.withColumnRenamed("visa-type", "visa_type").withColumnRenamed("description", "visa_type_description")
visa_df.limit(10).toPandas()

In [None]:
data_spark['visa-type']=visa_df

In [None]:
visa_port = spark.read.format('csv').load('./data/visa-issuing-ports.csv', header=True, inferSchema=True)
visa_port.show(5)

In [None]:
visa_port_df = visa_port.selectExpr("Post as port_of_issue","Code as visa_post_code")

In [None]:
visa_port_df.limit(10).toPandas()

In [None]:
data_spark['visa-issue-ports']=visa_port_df

In [None]:
port_of_entry = spark.read.format('csv').load('./data/port-of-entry-codes.csv', header=True, inferSchema=True)
port_of_entry.show(5)

**Note: I have used the port of entry data from I94 SAS labels**

#### 1.7 Exploring Airlines data

In [None]:
airlines = spark.read.format('csv')\
    .option("delimiter", "\t")\
    .load('./data/airlines-codes.csv', header=True, inferSchema=True)\
    .withColumnRenamed("AIRLINE NAME", "airline_name")\
    .withColumnRenamed("IATA DESIGNATOR", "airline_iata_code")\
    .withColumnRenamed("3 DIGIT CODE", "airline_3_digit_code")\
    .withColumnRenamed("ICAO CODE", "airline_icao_code")\
    .withColumnRenamed("COUNTRY / TERRITORY", "origin_country")

In [None]:
airlines.limit(10).toPandas()

In [None]:
# Printing Shape of the Dataset after dropping duplicates
airlines=airlines.drop_duplicates(['airline_iata_code'])
print("Number of Columns: {}".format(len(airlines.columns)))
print("Number of Rows: {}".format(airlines.count()))

In [None]:
airlines.summary("count").show()

In [None]:
data_spark['airlines-codes'] = airlines

### Step 2: Explore and Assess the Data
#### Explore the Data 
Exploring data quality issues, like missing values, duplicate data, etc.



In [None]:
sns.set(rc={'figure.figsize':(15,8)})

In [None]:
data_files = ['airlines-codes','airport-codes','port-of-entry-codes','us-cities-demographics','visa-issue-ports','visa-type']

for file in data_files:
    df = data_spark[file]
    null_count=df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).toPandas().T.reset_index()
    null_count.columns = ['col','null_counts']
    null_count['%_null_counts']=null_count['null_counts']/(df.count())
    if (null_count['%_null_counts']>0).any():
        plt.figure(figsize=(15,8))
        sns.barplot(x='col',y='%_null_counts',data=null_count)
        plt.title(file)
        plt.xticks(rotation=90)

**Observations:**
- In the demographics data the share of null values is very small, so these rows can be dropped
- In the airlines data the `airline_3_digit_code` column has very few null values; dropping those rows is optional
- In the airports data the `iata_code` column has a high number of null values, so this column can be dropped
- In the port of entry codes data the `state_code_or_country` column has some null values because the source does not provide a code for those ports

#### Cleaning Steps
All the necessary cleaning steps are included in the sections above.

### Step 3: Project Architecture and DWH Data Model
The following architecture is used in the project
![Project_Architecture](./AWS_Help/architechture.png)
#### 3.1 Conceptual Data Model
I used a star schema to build the DWH in Redshift, as shown below:
![Schema](./AWS_Help/data_model.png)

#### 3.2 Mapping Out Data Pipelines
To create the Data Lake and DWH the following steps are to be followed:

1. Create an AWS account and fill in the `dwh.cfg` file with the necessary details
2. Upload the data in the `data` folder to an S3 bucket using the `create_resources.py` file
3. Copy the `dag`, `emr_bootstrap` and `plugins` folders to your `AIRFLOW` directory
4. Once the data is in place, run the `capstone_meta_data_dag` to fill a new S3 bucket which is used as a **`staging area`**
5. Run the `capstone_immigration_dag` to fill the data lake in S3 with the SAS data
6. Run the `capstone_dwh_dag` to fill up the DWH in redshift
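
Before running the DAGs it can help to confirm that `dwh.cfg` is complete. The sketch below checks only the sections and keys that this notebook reads (`AWS` and `S3`); the real file may need further entries, and the values shown are placeholders.

```python
import configparser

# Only the keys read in this notebook; the project's dwh.cfg may contain more
REQUIRED = {
    "AWS": ["AWS_KEY_ID", "AWS_SECRET", "REGION"],
    "S3": ["RAW_DATA_BUCKET"],
}

def missing_settings(parser):
    """Return 'SECTION.KEY' names that are absent or empty."""
    missing = []
    for section, keys in REQUIRED.items():
        for key in keys:
            if not parser.has_option(section, key) or not parser.get(section, key).strip():
                missing.append("{}.{}".format(section, key))
    return missing

# Placeholder values only; real credentials belong in the untracked dwh.cfg
sample = """
[AWS]
AWS_KEY_ID = <your-key-id>
AWS_SECRET = <your-secret>
REGION = us-west-2

[S3]
RAW_DATA_BUCKET =
"""
config = configparser.ConfigParser()
config.read_string(sample)
print(missing_settings(config))  # ['S3.RAW_DATA_BUCKET']
```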

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

**Note: All the code is available in the dags folder**

#### 4.1.1 capstone_meta_data_dag 

**Note:** Run this DAG when the source data is refreshed. It does not need to run every day; it can run monthly or on a trigger.

**Action:** Reads the data stored in an S3 bucket, performs the regex operations on the I94 labels data and the transforms needed for the other data sets, and stores the results in parquet format in a different S3 bucket which is used as the staging area

![meta_data_dag](./AWS_Help/metadata_dag.PNG)


#### 4.1.2 capstone_immigration_dag 

**Note:** Run this DAG whenever new data is added: monthly by default, or daily if the data is delivered daily.

**Action:** Reads the SAS data stored in an S3 bucket, performs the transforms needed for the data, and stores it in parquet format in a different S3 bucket which is used as the staging area

![meta_data_dag](./AWS_Help/immigration_data_transform.png)


#### 4.1.3 capstone_dwh_dag

**Note:** Run this DAG whenever new data is added: monthly by default, or daily if the data is delivered daily.

**Action:** Reads the SAS data and other metadata stored in the staging S3 bucket, performs copy operations into AWS Redshift, and applies the constraints of the data model

![meta_data_dag](./AWS_Help/dwh_dag.png)
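
For orientation, a copy operation from parquet files in S3 into Redshift is typically a `COPY` statement. The sketch below only builds such a statement as a string. The table name, S3 prefix and role ARN are placeholders, and the project's DAG may issue its copies differently (for example through a custom operator).

```python
def build_copy_sql(table, s3_path, iam_role_arn):
    """Build a Redshift COPY statement that loads Parquet files from S3."""
    return (
        "COPY {table}\n"
        "FROM '{path}'\n"
        "IAM_ROLE '{role}'\n"
        "FORMAT AS PARQUET;"
    ).format(table=table, path=s3_path, role=iam_role_arn)

sql = build_copy_sql(
    "staging_immigration",                              # hypothetical table
    "s3://my-staging-bucket/i94_raw_data/",             # hypothetical prefix
    "arn:aws:iam::123456789012:role/redshift-s3-read",  # placeholder role ARN
)
print(sql)
```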

#### 4.2 Data Quality Checks

The data model shows that the DWH tables define a data type for every column and a primary key constraint where needed.

The following data quality checks were included:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

#### Extras

##### move files to S3

In [None]:
import configparser
import boto3
import pandas as pd
from io import StringIO

config = configparser.ConfigParser()
config.read('./dwh.cfg')

AWS_ACCESS_KEY = config.get('AWS', 'AWS_KEY_ID')
AWS_SECRET = config.get('AWS','AWS_SECRET')
AWS_REGION = config.get('AWS','REGION')
S3_BUCKET = config.get('S3','BUCKET_NAME')

In [None]:
s3 = boto3.resource('s3',region_name=AWS_REGION,
                    aws_access_key_id=AWS_ACCESS_KEY,
                    aws_secret_access_key=AWS_SECRET)

In [None]:
csv_buffer = StringIO()
df_airports = pd.read_csv('airport-codes_csv.csv')
df_airports.to_csv(csv_buffer)
s3.Object(S3_BUCKET,'airport_codes/airport-codes_csv.csv').put(Body=csv_buffer.getvalue())

In [None]:
csv_buffer = StringIO()  # fresh buffer so the airport CSV is not uploaded a second time
df_demographics = pd.read_csv('us-cities-demographics.csv')
df_demographics.to_csv(csv_buffer)
s3.Object(S3_BUCKET,'us-demographics/us-cities-demographics.csv').put(Body=csv_buffer.getvalue())
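
A `StringIO` buffer keeps everything written to it, so each upload should serialise into its own buffer. The sketch below illustrates this with the standard `csv` module and made-up rows; `rows_to_csv_text` is a hypothetical helper.

```python
import csv
from io import StringIO

def rows_to_csv_text(header, rows):
    """Serialise rows into CSV text using a buffer created for this call only."""
    buffer = StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(header)
    writer.writerows(rows)
    return buffer.getvalue()

# Made-up rows for illustration only
airports_csv = rows_to_csv_text(["ident", "name"], [["00A", "Total Rf Heliport"]])
demographics_csv = rows_to_csv_text(["city", "state"], [["Silver Spring", "Maryland"]])

print(airports_csv)
print(demographics_csv)
```

The returned text can be passed as the `Body` of the upload, and no content leaks from one file into the next.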

In [None]:
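from io import BytesIO

# Assumes df_i94 is a pandas DataFrame at this point (e.g. obtained via .toPandas());
# a Spark DataFrame would be written with df_i94.write.parquet(...) instead
out_buffer = BytesIO()
df_i94.to_parquet(out_buffer, index=False)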