# Project Title
### Data Engineering Capstone Project

#### Project Summary
A USA-based tourist agency wants to analyze visitors to the USA, which can be done using the I94 immigration data. The goal is to structure a data warehouse in order to answer queries about visitor behavior, such as the following:
* On average, how long do visitors from a given country stay?
* What time of year has the most visitors for tourism?
* Which state has the most visitors in a given month?
* Which states is a given group of people most likely to visit?

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

```python
# Do all imports and installs here
import pandas as pd
import os
import numpy as np
import csv
import boto3
import io
import configparser
import psycopg2
```

### Step 1: Scope the Project and Gather Data

#### Scope 

The tourist agency is new and doesn't have the budget to gather the data it needs. So it uses the I94 immigration data, an open dataset from the US National Tourism and Trade Office. This data might not answer all of the agency's future business intelligence questions, but it is a good start. The scope of this project is to build a warehouse that is ready for analyzing the behavior of tourists in US states, as captured in the I94 immigration data.

In order to analyze tourist behavior by group, US demographic data is included. The demographic data comes from the US Census Bureau's 2015 American Community Survey and is available on a website called OpenSoft.

The agency wants to build this warehouse and analyze the data as soon as possible, using familiar tools such as SQL. Because of that, the selected database is a relational database management system (RDBMS), which allows fast aggregation with familiar tools. The warehouse is an Online Analytical Processing (OLAP) database that will help the agency make informed business decisions.

For easy setup, the warehouse is built on a cloud service. Cloud services allow easy expansion and adjustment, which fits a new agency. Amazon Web Services (AWS) offers a cloud RDBMS solution called Redshift that is suitable for handling large datasets because of its parallel processing.



#### Describe and Gather Data 
##### I94 immigration dataset
The dataset comes from the US National Tourism and Trade Office. The I94 is a form filled out by non-US citizens and is used to keep track of visitors to the USA. The dataset includes 2016 data, which is good enough for the agency's goal.
##### USA Cities demographics data
The demographic data comes from the US Census Bureau's 2015 American Community Survey and is available on a website called OpenSoft. The data was gathered in 2015, which is close to the year of the I94 data. It covers all US cities with a population of 65,000 or more. As mentioned before, this project focuses on states rather than cities, because the I94 data only records the visitor's US address at the state level (`i94addr`). Because of that, the demographics data needs to be aggregated by state.

```python
# Read in the data from locally stored SAS files.
file_name = "i94_jan16_sub"
fname = '../../data/18-83510-I94-Data-2016/' + file_name + '.sas7bdat'
# format is passed by keyword; pandas 2.0 made read_sas arguments keyword-only
data_immig_chunks = pd.read_sas(fname, format='sas7bdat', encoding="ISO-8859-1", chunksize=10000)
df_chunk = next(data_immig_chunks)
display(df_chunk.shape)
```

```
(10000, 28)
```

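```python
df_chunk.head()
```

| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 7.0 | 2016.0 | 1.0 | 101.0 | 101.0 | BOS | 20465.0 | 1.0 | MA | | ... | | | 1996.0 | D/S | M | | LH | 346608285.0 | 424 | F1 |
| 1 | 8.0 | 2016.0 | 1.0 | 101.0 | 101.0 | BOS | 20465.0 | 1.0 | MA | | ... | | | 1996.0 | D/S | M | | LH | 346627585.0 | 424 | F1 |
| 2 | 9.0 | 2016.0 | 1.0 | 101.0 | 101.0 | BOS | 20469.0 | 1.0 | CT | 20480.0 | ... | | M | 1999.0 | 07152016 | F | | AF | 381092385.0 | 338 | B2 |
| 3 | 10.0 | 2016.0 | 1.0 | 101.0 | 101.0 | BOS | 20469.0 | 1.0 | CT | 20499.0 | ... | | M | 1971.0 | 07152016 | F | | AF | 381087885.0 | 338 | B2 |
| 4 | 11.0 | 2016.0 | 1.0 | 101.0 | 101.0 | BOS | 20469.0 | 1.0 | CT | 20499.0 | ... | | M | 2004.0 | 07152016 | M | | AF | 381078685.0 | 338 | B2 |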


```python
# on_bad_lines='skip' replaces error_bad_lines=False, which was removed in pandas 2.0
USA_cities_df = pd.read_csv("us-cities-demographics.csv", sep=';', on_bad_lines='skip')
USA_cities_df.head()
```

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


### Step 2: Explore and Assess the Data
#### Explore the Data 
The pandas library is used to explore the I94 immigration dataset. Because the dataset is tabular, pandas is an intuitive tool for this goal. Also, because the I94 immigration data is in SAS format, which Redshift cannot load directly, the files are converted to CSV with pandas, as shown in the steps below. The CSV files are then uploaded to an S3 bucket. This takes time, but only needs to be done once.

The I94 data has many columns that are not explained, even in the SAS description file. So the goal is to clear out this data and focus on what is useful; because of that, many columns in the staging table are not used. The I94 data includes three types of visitors based on their intended activity, which is stored in the I94VISA column of the staging table. Since the database is for a travel agency, the agency is only interested in studying visitors who come to the USA for pleasure.
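In the pipeline this filter lives in the SQL insert queries (`i94visa = 2.0`). The sketch below expresses the same rule in pandas, which can help when exploring a chunk. It assumes the I94VISA mapping from the SAS label description file (1 = Business, 2 = Pleasure, 3 = Student); the helper `keep_pleasure_visits` and the sample rows are hypothetical.

```python
import pandas as pd

# Assumed I94VISA mapping, taken from the SAS label description file.
I94VISA_LABELS = {1.0: "Business", 2.0: "Pleasure", 3.0: "Student"}


def keep_pleasure_visits(chunk: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: keep only rows whose I94VISA code is 2 (Pleasure)."""
    return chunk[chunk["i94visa"] == 2.0]


# Hypothetical mini chunk; real chunks come from pd.read_sas(..., chunksize=...)
sample_chunk = pd.DataFrame({"cicid": [7.0, 8.0, 9.0], "i94visa": [3.0, 2.0, 1.0]})
pleasure = keep_pleasure_visits(sample_chunk)

print(sample_chunk["i94visa"].map(I94VISA_LABELS).tolist())
print(pleasure["cicid"].tolist())  # [8.0]
```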
#### Cleaning Steps

Before the DataFrame chunks are converted to CSV, the data undergoes a cleaning process, shown below. One problem caused by going through CSV is that, when the files are copied into Redshift, null values in VARCHAR columns are loaded as empty strings. So once the data is in the staging table, the empty strings are converted back to null values. After that, more cleaning is done in SQL. The ETL process is a stage-then-insert process, so this cleaning is done in the insert queries; see Step 4 for the process.
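Where the empty strings come from can be reproduced locally: by default, `DataFrame.to_csv` writes missing values as empty fields (`na_rep=''`), and Redshift's COPY then loads an empty field in a VARCHAR column as an empty string. A minimal sketch with a hypothetical two-row chunk:

```python
import io

import numpy as np
import pandas as pd

# Hypothetical two-row chunk with one missing string value
sample = pd.DataFrame({"cicid": [7.0, 8.0], "gender": ["M", np.nan]})

buffer = io.StringIO()
sample.to_csv(buffer, index=False)  # na_rep defaults to '', so NaN becomes an empty field
csv_text = buffer.getvalue()
print(csv_text)
```

The second data line ends in a bare comma: nothing distinguishes "missing" from "empty string" once the value is in the CSV file.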

### <span style="color:red"> DO NOT RUN THE BELOW CELLS UNLESS YOU WANT TO UPLOAD THE LOCAL SAS FILES TO S3 </span>.

*Note: the S3 file structure is "../month/file_part#.csv".*


```python
# To reduce the number of files, the chunk size has been increased.
# Change file_month to process a different month.
file_month = 'apr'
file_name = "i94_{}16_sub".format(file_month)
fname = '../../data/18-83510-I94-Data-2016/{}.sas7bdat'.format(file_name)
data_immig_chunks = pd.read_sas(fname, format='sas7bdat', encoding="ISO-8859-1", chunksize=1000000)
print(fname)
```

```
../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
```
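
Instead of editing `file_month` by hand before each run, the twelve input paths can be generated up front and fed to the upload loop one month at a time. This sketch assumes every monthly file follows the `i94_<mon>16_sub.sas7bdat` naming seen above (`i94_jan16_sub`, `i94_apr16_sub`); `MONTHS` and `sas_paths` are illustrative names.

```python
# Assumption: all monthly files share the i94_<mon>16_sub naming used above.
DATA_DIR = "../../data/18-83510-I94-Data-2016/"
MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]

# Map each month abbreviation (also the S3 "month" folder) to its local SAS file
sas_paths = {m: "{}i94_{}16_sub.sas7bdat".format(DATA_DIR, m) for m in MONTHS}

for month, path in sas_paths.items():
    print(month, path)
```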


```python
# This dictionary translates the contents of i94cit and i94res
# from numeric codes to country names.
country_codes = pd.read_csv("country_codes.csv", sep=',', on_bad_lines='skip')
country_codes_dict = dict(zip(country_codes['code'], country_codes['country']))
```

```python
# Get info from the dwh.cfg file
config = configparser.ConfigParser()
config.read('dwh.cfg')

KEY = config.get('AWS', 'KEY')
SECRET = config.get('AWS', 'SECRET')

file_key = config.get('S3', 'file_key')
bucket = config.get('S3', 'bucket')
```

```python
# Create the S3 resource used for the uploads
s3_resource = boto3.resource('s3',
                             aws_access_key_id=KEY,
                             aws_secret_access_key=SECRET)
```

```python
# Convert DataFrame chunks into CSV files and upload them to S3
count = 1
for df_chunk in data_immig_chunks:
    # Drop rows with missing values in important columns
    print("dimensions of chunk before cleaning: " + str(df_chunk.shape))
    df_chunk.dropna(subset=['cicid', 'i94cit', 'i94res', 'i94mode',
                            'fltno', 'arrdate', 'depdate', 'i94addr'], inplace=True)
    print("dimensions of chunk after cleaning: " + str(df_chunk.shape))

    # Translate the codes of i94mode.
    df_chunk = df_chunk.replace({'i94mode': {1: 'Air', 2: 'Sea', 3: 'Land'}})
    # Translate the codes of i94cit and i94res.
    df_chunk = df_chunk.replace({'i94cit': country_codes_dict})
    df_chunk = df_chunk.replace({'i94res': country_codes_dict})

    # Write the DataFrame to an in-memory CSV buffer
    csv_buffer = io.StringIO()
    df_chunk.to_csv(csv_buffer, index=False)

    # Upload the file to S3
    s3_resource.Object(bucket, '{}/{}/{}_part{}.csv'.format(file_key, file_month, file_name, count))\
        .put(Body=csv_buffer.getvalue())
    print("file :" + '{}_part{}.csv'.format(file_name, count) + " Uploaded!")
    count = count + 1
```

```
dimensions of chunk before cleaning: (1000000, 28)
dimensions of chunk after cleaning: (928367, 28)
file :i94_apr16_sub_part1.csv Uploaded!
dimensions of chunk before cleaning: (1000000, 28)
dimensions of chunk after cleaning: (888034, 28)
file :i94_apr16_sub_part2.csv Uploaded!
dimensions of chunk before cleaning: (96313, 28)
dimensions of chunk after cleaning: (65169, 28)
file :i94_apr16_sub_part3.csv Uploaded!
```


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The model used is a star schema. This schema was selected because it keeps queries simple and aggregation fast. As the diagram below shows, the fact table is focused on dates, which is also what the I94 form focuses on. The dimension tables are for tourists, flights, and US states.

![Data Model](USA_Tourism_data_model_v1.png)
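To illustrate how the star schema answers one of the questions from the summary (average stay per country), the sketch below joins the fact table to the `tourists` dimension on `cicid` and aggregates. It uses an in-memory SQLite database as a stand-in for Redshift, models only the columns the query needs, and stores dates as ISO strings; all rows are hypothetical. On Redshift, `DATEDIFF(day, arrdate, depdate)` would take the place of SQLite's `julianday` arithmetic.

```python
import sqlite3

# SQLite stand-in for the Redshift star schema (only the needed columns)
demo_conn = sqlite3.connect(":memory:")
demo_cur = demo_conn.cursor()
demo_cur.executescript("""
CREATE TABLE tourists (cicid REAL PRIMARY KEY, i94cit TEXT, gender TEXT);
CREATE TABLE stay_durations (cicid REAL, fltno TEXT, arr_state_code TEXT,
                             arrdate TEXT, depdate TEXT);
""")

# Hypothetical rows, not taken from the I94 data
demo_cur.executemany("INSERT INTO tourists VALUES (?, ?, ?)",
                     [(1.0, "FRANCE", "F"), (2.0, "FRANCE", "M"), (3.0, "JAPAN", "F")])
demo_cur.executemany("INSERT INTO stay_durations VALUES (?, ?, ?, ?, ?)",
                     [(1.0, "424", "NY", "2016-04-01", "2016-04-11"),
                      (2.0, "338", "CA", "2016-04-03", "2016-04-05"),
                      (3.0, "101", "HI", "2016-04-05", "2016-04-12")])

# Fact table joined to the tourists dimension, averaged per country
demo_cur.execute("""
SELECT t.i94cit,
       AVG(julianday(s.depdate) - julianday(s.arrdate)) AS avg_stay_days
FROM stay_durations AS s
JOIN tourists AS t ON t.cicid = s.cicid
GROUP BY t.i94cit
ORDER BY t.i94cit
""")
avg_stay = demo_cur.fetchall()
print(avg_stay)  # [('FRANCE', 6.0), ('JAPAN', 7.0)]
```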

#### 3.2 Mapping Out Data Pipelines
Once the needed data is stored as CSV files in S3, the model can be implemented with the following steps:

1. Copy the data from S3 to Redshift.
2. Convert empty strings to null.
3. Insert and transform the data into the model tables.

The copy and insert queries are defined in `sql_queries.py`.
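The three steps can be chained in one function that stops at the first failing statement. This is a hedged sketch rather than the project's code: `run_pipeline` is a hypothetical helper that works with any DB-API connection (psycopg2 for Redshift, or SQLite for the local demo below), and the demo's first step is a plain INSERT standing in for the S3 COPY.

```python
import sqlite3


def run_pipeline(conn, steps):
    """Run (name, sql) steps in order, committing after each one.

    On failure the transaction is rolled back (PostgreSQL/Redshift otherwise
    reject further commands in the aborted transaction) and the error re-raised.
    """
    cur = conn.cursor()
    for name, sql in steps:
        try:
            cur.execute(sql)
            conn.commit()
        except Exception as exc:  # e.g. psycopg2.Error or sqlite3.Error
            conn.rollback()
            raise RuntimeError("pipeline step failed: {}".format(name)) from exc
    return [name for name, _ in steps]


# Local demo with SQLite; names mirror the project's staging and tourists tables
demo_conn = sqlite3.connect(":memory:")
demo_conn.execute("CREATE TABLE staging_i94 (cicid REAL, gender TEXT, i94visa REAL)")
demo_conn.execute("CREATE TABLE tourists (cicid REAL, gender TEXT)")
demo_conn.commit()

steps = [
    ("copy (stand-in)", "INSERT INTO staging_i94 VALUES (1.0, '', 2.0), (2.0, 'F', 1.0)"),
    ("empty strings to null", "UPDATE staging_i94 SET gender = NULLIF(gender, '')"),
    ("insert tourists", "INSERT INTO tourists SELECT DISTINCT cicid, gender "
                        "FROM staging_i94 WHERE i94visa = 2.0"),
]
completed = run_pipeline(demo_conn, steps)
print(completed)
print(demo_conn.execute("SELECT * FROM tourists").fetchall())  # [(1.0, None)]
```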

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
To create the data model, the tables first need to be created on the Redshift cluster, assuming the cluster is already up and running. This can be done by executing `run create_table.py` in an IPython console. After that, the staging query for the I94 data needs to be executed.

```python
from sql_queries import insert_table_queries, copy_table_queries, \
    states_table_insert
```

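```python
# Get connection info from the dwh.cfg file
config = configparser.ConfigParser()
config.read('dwh.cfg')

# Connect to the Redshift cluster.
# Note: this relies on the CLUSTER section listing host, dbname, user,
# password and port in exactly that order.
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}"
                        .format(*config['CLUSTER'].values()))
cur = conn.cursor()
```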

```python
# Copy data from S3 to the staging table
print("Query: " + copy_table_queries)
cur.execute(copy_table_queries)
conn.commit()
```

```
Query:  copy staging_i94 
    FROM 's3://dend-bucket-azizdul/i94_csv_files/apr/'
    credentials 'aws_iam_role=arn:aws:iam::397229940073:role/dwhRole'
	delimiter ',' removequotes
	IGNOREHEADER 1
    region 'us-east-2' ;
```
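
As an alternative to the clean-up that follows, Redshift's COPY command accepts the `EMPTYASNULL` option, which loads empty CHAR and VARCHAR fields as NULL at load time. The sketch below composes the same COPY statement with that option added; `build_copy_sql` is a hypothetical helper, and the bucket path and IAM role are placeholders, not the project's values.

```python
def build_copy_sql(table, s3_prefix, iam_role, region):
    """Hypothetical helper: compose a Redshift COPY for the staged CSV files."""
    return (
        "COPY {}\n"
        "FROM '{}'\n"
        "CREDENTIALS 'aws_iam_role={}'\n"
        "DELIMITER ',' REMOVEQUOTES\n"
        "IGNOREHEADER 1\n"
        "EMPTYASNULL\n"  # load empty CHAR/VARCHAR fields as NULL
        "REGION '{}';"
    ).format(table, s3_prefix, iam_role, region)


copy_sql = build_copy_sql(
    "staging_i94",
    "s3://<bucket>/i94_csv_files/apr/",            # placeholder S3 prefix
    "arn:aws:iam::<account-id>:role/<role-name>",  # placeholder IAM role
    "us-east-2",
)
print(copy_sql)
```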
    


```python
# Show that null values appear as empty strings
cur.execute("SELECT count(*) FROM staging_i94 WHERE '' IN (i94port, visapost, gender, airline,visatype, occup,entdepa, entdepd,entdepu,matflag,dtaddto, insnum)")
conn.commit()
query_answer = cur.fetchall()
print(query_answer)
```

```
[(1881570,)]
```


```python
# List of columns of type VARCHAR
columns_list = ['i94port', 'visapost', 'gender', 'airline', 'visatype', 'occup',
                'entdepa', 'entdepd', 'entdepu', 'matflag', 'dtaddto', 'insnum']
print("converting empty strings to null")
for column in columns_list:
    try:
        print("Update staging_i94 set {} = NULL where {} = ''".format(column, column))
        cur.execute("Update staging_i94 set {} = NULL where {} = ''".format(column, column))
        conn.commit()
    except psycopg2.Error as e:
        # Roll back so the failed transaction does not block later statements
        conn.rollback()
        print("Error: Updating Rows")
        print(e)
print('DONE!')
```
    


```
converting empty strings to null
Update staging_i94 set i94port = NULL where i94port = ''
Update staging_i94 set visapost = NULL where visapost = ''
Update staging_i94 set gender = NULL where gender = ''
Update staging_i94 set airline = NULL where airline = ''
Update staging_i94 set visatype = NULL where visatype = ''
Update staging_i94 set occup = NULL where occup = ''
Update staging_i94 set entdepa = NULL where entdepa = ''
Update staging_i94 set entdepd = NULL where entdepd = ''
Update staging_i94 set entdepu = NULL where entdepu = ''
Update staging_i94 set matflag = NULL where matflag = ''
Update staging_i94 set dtaddto = NULL where dtaddto = ''
Update staging_i94 set insnum = NULL where insnum = ''
DONE!
```
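
Each statement in the loop above is a separate UPDATE over the staging table. The same clean-up can be expressed as a single UPDATE using `NULLIF(column, '')`, which returns NULL when the value equals the empty string; Redshift supports `NULLIF`. The sketch builds that statement and checks its logic locally against SQLite, which here only stands in for Redshift; `build_nullif_update` is a hypothetical helper.

```python
import sqlite3

# Same VARCHAR columns as columns_list above
VARCHAR_COLUMNS = ["i94port", "visapost", "gender", "airline", "visatype", "occup",
                   "entdepa", "entdepd", "entdepu", "matflag", "dtaddto", "insnum"]


def build_nullif_update(table, columns):
    """Hypothetical helper: one UPDATE that turns '' into NULL in every column."""
    assignments = ",\n    ".join("{0} = NULLIF({0}, '')".format(col) for col in columns)
    return "UPDATE {} SET\n    {};".format(table, assignments)


update_sql = build_nullif_update("staging_i94", VARCHAR_COLUMNS)
print(update_sql)

# Local check of the NULLIF logic, using SQLite as a stand-in for Redshift
demo_conn = sqlite3.connect(":memory:")
demo_conn.execute("CREATE TABLE staging_i94 ({})".format(
    ", ".join("{} TEXT".format(col) for col in VARCHAR_COLUMNS)))
demo_conn.execute(
    "INSERT INTO staging_i94 VALUES ({})".format(", ".join("?" * len(VARCHAR_COLUMNS))),
    ["BOS", ""] + [""] * (len(VARCHAR_COLUMNS) - 2))
demo_conn.execute(update_sql)
row = demo_conn.execute("SELECT i94port, visapost, insnum FROM staging_i94").fetchone()
print(row)  # ('BOS', None, None)
```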


```python
# Confirm that the empty strings were turned back into NULL values
cur.execute("SELECT count(*) FROM staging_i94 WHERE '' IN (i94port, visapost, gender, airline,visatype, occup,entdepa, entdepd,entdepu,matflag,dtaddto, insnum)")
conn.commit()
query_answer = cur.fetchall()
print(query_answer)
```

```
[(0,)]
```


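```python
# Execute the insert queries that load the model tables from the staging table
for query in insert_table_queries:
    print("Query: " + query)
    try:
        cur.execute(query)
        conn.commit()
    except psycopg2.Error as e:
        conn.rollback()  # clear the aborted transaction before the next query
        print("Error: Inserting Rows")
        print(e)
```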

```
Query: 
    INSERT INTO tourists( 
    cicid, i94cit, i94res, 
    i94mode,visapost, biryear, gender, visatype)
        SELECT DISTINCT 
        cicid, i94cit, i94res, 
        i94mode, visapost, biryear,
        gender, visatype
        FROM staging_i94
        WHERE cicid IS NOT NULL
        AND i94mode IS NOT NULL
        AND i94visa = 2.0
        AND cicid NOT IN (SELECT cicid 
                         from tourists)

Query: 
    INSERT INTO flights 
    (fltno, airline, i94port)
        SELECT DISTINCT
        fltno, airline, i94port
        FROM staging_i94
        WHERE fltno IS NOT NULL
        AND fltno != 'LAND' AND fltno != 'SEA'
        AND i94visa = 2.0
        AND fltno NOT IN (SELECT fltno 
                         from flights)

Query: 
    INSERT INTO stay_durations 
    (cicid, fltno,arr_state_code, arrdate, depdate)
        SELECT DISTINCT
        cicid, fltno, i94addr,
        DATEADD(day, CAST(arrdate AS int) ,'1960-1-1'),
        DATEADD(day, CAST(depdate AS int) 
```
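
The insert query converts `arrdate` and `depdate` with `DATEADD(day, CAST(arrdate AS int), '1960-1-1')` because SAS stores dates as a count of days since 1 January 1960. The same conversion in pandas, handy for checking values while exploring a chunk, applied to two `arrdate` values from the `df_chunk.head()` output:

```python
import pandas as pd

# SAS dates count days since 1960-01-01, the same origin the insert query
# passes to DATEADD. Values taken from the arrdate column of df_chunk.head().
sas_arrdate = pd.Series([20465.0, 20469.0])
arr_dates = pd.to_datetime(sas_arrdate, unit="D", origin="1960-01-01")
print(arr_dates.dt.strftime("%Y-%m-%d").tolist())  # ['2016-01-12', '2016-01-16']
```

Both dates fall in January 2016, which matches the `i94mon` value of 1.0 in those rows.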

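```python
# Group demographics data by state.
# The file has one row per (city, race) pair, so the city-level population
# columns repeat on every race row; drop the repeats before summing.
USA_state_df = (USA_cities_df
                .drop_duplicates(subset=["City", "State"])
                .groupby(by=["State Code", "State"], as_index=False)
                .agg({
                    'Male Population': 'sum',
                    'Female Population': 'sum',
                    'Total Population': 'sum',
                    'Foreign-born': 'sum',
                    'Median Age': 'median',
                }))
```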
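
The demographics file has one row per city and race (see the `Race` and `Count` columns in the `head()` output), so the city-level population columns repeat on each race row. Summing them per state without removing the repeats overcounts. A toy example with hypothetical cities:

```python
import pandas as pd

# Hypothetical rows: "City A" appears once per race, repeating its population
cities = pd.DataFrame({
    "City": ["City A", "City A", "City B"],
    "State Code": ["MA", "MA", "NJ"],
    "Total Population": [100000, 100000, 250000],
    "Race": ["White", "Asian", "White"],
})

# Summing every row counts City A twice
naive = cities.groupby("State Code")["Total Population"].sum()
# Keeping one row per city gives the correct state total
deduped = (cities.drop_duplicates(subset=["City", "State Code"])
                 .groupby("State Code")["Total Population"].sum())
print(naive["MA"], deduped["MA"])  # 200000 100000
```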

```python
# Get the values from the DataFrame as an array of rows
USA_states_data = USA_state_df.values
```

```python
# Execute the insert query for the states table
for row in USA_states_data:
    try:
        cur.execute(states_table_insert, row)
        conn.commit()
    except psycopg2.Error as e:
        conn.rollback()  # clear the aborted transaction before the next row
        print("Error: Inserting Rows")
        print(e)
```


#### 4.2 Data Quality Checks
The following data quality checks are run to ensure the pipeline ran as expected:
 * Completeness check: every model table contains at least one row.
 * Null check: selected columns of the model tables contain no NULL values.
 
Run Quality Checks

```python
# Quality check: are any tables empty?
tables_list = ['tourists', 'flights', 'states', 'stay_durations']
print("Checking if tables in DB contain data")
for table in tables_list:
    try:
        cur.execute("select count(*) from {}".format(table))
        query_answer = cur.fetchall()
        conn.commit()
        if query_answer[0][0] > 0:
            print(table + ": Pass")
        else:
            print(table + ": Fail")
    except psycopg2.Error as e:
        conn.rollback()
        print("Error: Counting Rows")
        print(e)
```

```
Checking if tables in DB contain data
tourists: Pass
flights: Pass
states: Pass
stay_durations: Pass
```


```python
# Check whether columns contain NULL values
# Each checking_list entry is [table, primary key, column]
checking_list = [['tourists', 'cicid', 'gender'], ['stay_durations', 'cicid', 'fltno']]
print("Checking if tables in DB contain data")
for n in checking_list:
    print("SELECT {} from {} WHERE {} IS NULL".format(n[1], n[0], n[2]))
    cur.execute("SELECT {} from {} WHERE {} IS NULL".format(n[1], n[0], n[2]))
    query_answer = cur.fetchall()
    if not query_answer:
        print("Null test for {} table with column {}".format(n[0], n[2]) + ": Pass")
    else:
        print("Null test for {} table with column {}".format(n[0], n[2]) + ": Fail")
```


```
Checking if tables in DB contain data
SELECT cicid from tourists WHERE gender IS NULL
Null test for tourists table with column gender: Fail
SELECT cicid from stay_durations WHERE fltno IS NULL
Null test for stay_durations table with column fltno: Pass
```
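
The null check above fetches every matching row just to test whether any exist; counting is cheaper on large tables. The sketch below folds both checks into one report using `COUNT(*)` only. `count_rows` and `run_quality_checks` are hypothetical helpers that accept any DB-API cursor (the psycopg2 `cur` above would work); the demo runs against SQLite with hypothetical rows.

```python
import sqlite3


def count_rows(cursor, table, where=None):
    """Return COUNT(*) for a table, optionally filtered by a WHERE clause.

    Table/column names are interpolated directly, so only pass trusted identifiers.
    """
    sql = "SELECT COUNT(*) FROM {}".format(table)
    if where:
        sql += " WHERE {}".format(where)
    cursor.execute(sql)
    return cursor.fetchone()[0]


def run_quality_checks(cursor, tables, null_checks):
    """Return {check name: passed} for non-empty and no-NULL checks."""
    results = {}
    for table in tables:
        results["{} not empty".format(table)] = count_rows(cursor, table) > 0
    for table, column in null_checks:
        nulls = count_rows(cursor, table, "{} IS NULL".format(column))
        results["{}.{} has no NULLs".format(table, column)] = nulls == 0
    return results


# Local demo against SQLite (stand-in for Redshift); rows are hypothetical
demo_conn = sqlite3.connect(":memory:")
demo_cur = demo_conn.cursor()
demo_cur.execute("CREATE TABLE tourists (cicid REAL, gender TEXT)")
demo_cur.executemany("INSERT INTO tourists VALUES (?, ?)", [(1.0, "F"), (2.0, None)])

results = run_quality_checks(demo_cur, ["tourists"], [("tourists", "gender")])
for name, passed in results.items():
    print("{}: {}".format(name, "Pass" if passed else "Fail"))
```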


#### 4.3 Data dictionary 

| Field              | Type    | Table           | Explanation                                                                          | Source                 |
| ------------------ | ------- | --------------- | ------------------------------------------------------------------------------------ | ---------------------- |
| i94port            | VARCHAR | Flights         | Code of the port of entry (airport) where the visitor landed                         | I94 Immigration        |
| airline            | VARCHAR | Flights         | The airline company used by the visitor                                              | I94 Immigration        |
| fltno              | VARCHAR | Flights         | The flight number the visitor arrived on                                             | I94 Immigration        |
| arr\_state\_code   | VARCHAR | Stay\_durations | The state abbreviation/code of the visitor's US address (from i94addr)               | I94 Immigration        |
| arrdate            | NUMERIC | Stay\_durations | The date the visitor arrived                                                         | I94 Immigration        |
| depdate            | NUMERIC | Stay\_durations | The date the visitor departed                                                        | I94 Immigration        |
| cicid              | FLOAT   | Tourists        | ID of each visitor                                                                   | I94 Immigration        |
| i94cit             | NUMERIC | Tourists        | The visitor's country of citizenship                                                 | I94 Immigration        |
| i94res             | NUMERIC | Tourists        | The visitor's country of residence                                                   | I94 Immigration        |
| i94mode            | VARCHAR | Tourists        | The mode of transportation the visitor used to enter the USA                         | I94 Immigration        |
| visapost           | VARCHAR | Tourists        | Department of State where the visitor's visa was issued                              | I94 Immigration        |
| biryear            | NUMERIC | Tourists        | The visitor's birth year                                                             | I94 Immigration        |
| gender             | VARCHAR | Tourists        | The visitor's gender                                                                 | I94 Immigration        |
| visatype           | VARCHAR | Tourists        | Class of admission legally admitting the non-immigrant to temporarily stay in the U.S. | I94 Immigration      |
| state\_code        | VARCHAR | States          | The state abbreviation/code                                                          | US cities demographics |
| state              | VARCHAR | States          | The state name                                                                       | US cities demographics |
| female\_pop        | NUMERIC | States          | Total female population                                                              | US cities demographics |
| total\_pop         | NUMERIC | States          | Total population                                                                     | US cities demographics |
| foreign\_born\_num | NUMERIC | States          | Total foreign-born population                                                        | US cities demographics |
| median\_Age        | NUMERIC | States          | Median of the median ages of the state's cities                                      | US cities demographics |

#### 4.4 Q&A

### Clearly state the rationale for the choice of tools and technologies for the project.
#### AWS redshift: 
I selected Redshift because it can process large datasets such as the I94 data. The queries stated at the beginning can then be answered with SQL.
#### AWS S3: 
From experience, uploading and downloading data with S3 is fast and accessible. Also, S3 integrates well with Redshift.
#### Pandas: 
The pandas library provides an intuitive way to explore and transform two-dimensional structured data, as can be seen in the code.

### Propose how often the data should be updated and why.
This depends on how often the US National Tourism and Trade Office publishes new I94 data. Regardless, monthly updates would be the most fitting, because daily or weekly updates would not change the outcome of the intended queries.

### Write a description of how you would approach the problem differently under the following scenarios:
#### The data was increased by 100x.
I would change how the CSV files are uploaded to S3, automate the process (e.g., with Airflow), and run it on a more capable machine. Also, because large tables are moved around, I would scale up the Redshift cluster, meaning that its nodes would be more capable machines with more vCPUs on board, which also increases the cluster's storage capacity.
#### The data populates a dashboard that must be updated on a daily basis by 7am every day.
My previous assumption was that the data is updated monthly at most. In this scenario, however, a workflow management platform such as Airflow is needed. I would create two Directed Acyclic Graphs (DAGs): one that automates transforming the data and uploading it to S3, and another that moves the data from S3 to staging, then to the model tables, and then through the quality checks. Both DAGs would run on a daily schedule interval, timed to finish before 7am.
#### The database needed to be accessed by 100+ people.
In this case, the Redshift cluster needs to be scaled out, meaning that the number of nodes is increased. This would allow the cluster to handle more queries at the same time.