# US Visitors DW
__Supporting officials' decision-making to provide better visitors experience in the US__
***
## Overview
The purpose of this data engineering capstone project is to give students a chance to combine what they've learned throughout the program. This project will be an important part of learners portfolio that will help to achieve data engineering-related career goals. We could choose to complete the project provided by the Udacity team or define the scope and data ourselves. I took the first approach in building the DW on the data on immigration to the United States provided by Udacity.

## Business Scenario
We are D2I (Data to Insights), a business consulting firm specialized in data warehouse services through assisting the enterprises with navigating their data needs and creating strategic operational solutions that deliver tangible business results. Specifically, we can help with the modernization of corporations' data warehousing infrastructure by improving performance and ease of use for end users, enhancing functionality, decreasing total cost of ownership while making it possible for real-time decision making. In total, our full suite of services includes helping enterprises with data profiling, data standardization, data acquisition, data transformation and integration.

We have been contracted by the U.S. Customs and Border Protection to help them see what is hidden behind the data flood. We aim to model and create a brand new analytics solution on top of the state-of-the-art technolgies available to enable them to unleash insights from data then providing better customer experiences when coming to the US.

## Structure of the Project
Following the Udacity guide for this project, we structured this documentation with steps below:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

## Step 1: Scope the Project and Gather Data

### The Scope 
_Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>_

The main deliverable of our work here will be a data warehouse in the cloud that will support answering questions through analytics tables and dashboards. Additionally, as we developed a general source-of-truth database, the Government of the US could open the solution through a web API so backend web services could query the warehouse for information relating to international visitors.

The main information and questions a user may want to extract from the data mart would be:

* Visitors by world regions.
* Visitors by demographics.
* Correlations between destination and source demographics.
* Correlations between destination and source climates.
* Correlations between immigration by source region, and the source region temperature.
* Correlations between visitor demographics, and states visited.

***
__IMMIGRATION DATA__

For decades, U.S. immigration officers issued the I-94 Form (Arrival/Departure Record) to foreign visitors (e.g., business visitors, tourists and foreign students) who lawfully entered the United States. The I-94 was a small white paper form that a foreign visitor received from cabin crews on arrival flights and from U.S. Customs and Border Protection at the time of entry into the United States. It listed the traveler's immigration category, port of entry, data of entry into the United States, status expiration date and had a unique 11-digit identifying number assigned to it. Its purpose was to record the traveler's lawful admission to the United States.

This is the main dataset and there is a file for each month of the year of 2016 available in the directory `../../data/18-83510-I94-Data-2016/` in the [SAS](https://www.sas.com/en_us/home.html) binary database storage format `sas7bdat`. Combined, the 12 datasets have got more than 40 million rows (40.790.529) and 28 columns.

Let's make some EDA (exploratory data analysis) using just the the month of April. The related dataset has more than three million records (3.096.313).

__Some key-words to use:__

Redshift: Massively Parallel, column-oriented
Break the large files up into smaller files and ingest them to Redshift in parallel. This is the way COPY command works. S3 and Redshift in the same region in order to avoid big latency times.

In [8]:
# Importing the libraries needed in this project
import os
import pandas as pd
from datetime import datetime

from helper.util import convert_sas_date, convert_integer

In [9]:
#immigration_fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
#immigration = pd.read_sas(immigration_fname, 'sas7bdat', encoding="ISO-8859-1")
immigration = pd.read_csv("immigration_data_sample.csv").drop("Unnamed: 0", axis=1)

In [10]:
immigration.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [15]:
list(set(immigration.columns) - set(["visapost", "occup", "entdepu", "insnum"]) - set(["count", "entdepa", "entdepd", "matflag", "dtaddto", "biryear", "fltno", "admnum"]))

['i94addr',
 'i94mon',
 'cicid',
 'i94visa',
 'i94res',
 'arrdate',
 'i94yr',
 'depdate',
 'airline',
 'i94mode',
 'i94port',
 'dtadfile',
 'visatype',
 'gender',
 'i94cit',
 'i94bir']

In [13]:
immigration.isna().mean().round(4) * 100

cicid        0.00
i94yr        0.00
i94mon       0.00
i94cit       0.00
i94res       0.00
i94port      0.00
arrdate      0.00
i94mode      0.01
i94addr      4.92
depdate      4.60
i94bir       0.03
i94visa      0.00
count        0.00
dtadfile     0.00
visapost    60.76
occup       99.74
entdepa      0.01
entdepd      4.47
entdepu     99.99
matflag      4.47
biryear      0.03
dtaddto      0.02
gender      13.38
insnum      96.33
airline      2.70
admnum       0.00
fltno        0.63
visatype     0.00
dtype: float64

In [None]:
immigration = convert_integer(immigration, ['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', \
                              'arrdate', 'i94mode', 'i94bir', 'i94visa', 'count', 'biryear', 'dtadfile', 'depdate'])

In [None]:
port = dict(zip(pd.read_csv("lookup/I94PORT.csv").to_dict("list")["ID"], pd.read_csv("lookup/I94PORT.csv").to_dict("list")["Port"]))
immigration["i94port_desc"] = immigration["i94port"].map(port, na_action='ignore')

In [None]:
countries = dict(zip(pd.read_csv("lookup/I94CIT_I94RES.csv").to_dict("list")["Code"], pd.read_csv("lookup/I94CIT_I94RES.csv").to_dict("list")["I94CTRY"]))
immigration["i94cit_desc"] = immigration["i94cit"].map(countries, na_action='ignore')
immigration["i94res_desc"] = immigration["i94res"].map(countries, na_action='ignore')

In [None]:
modes = dict(zip(pd.read_csv("lookup/I94MODE.csv").to_dict("list")["ID"], pd.read_csv("lookup/I94MODE.csv").to_dict("list")["Mode"]))
immigration["i94mode_desc"] = immigration["i94mode"].map(modes, na_action='ignore')

In [None]:
addrs = dict(zip(pd.read_csv("lookup/I94ADDR.csv").to_dict("list")["Code"], pd.read_csv("lookup/I94ADDR.csv").to_dict("list")["State"]))
immigration["i94addr_desc"] = immigration["i94addr"].map(addrs, na_action='ignore')

In [None]:
visas = dict(zip(pd.read_csv("lookup/I94VISA.csv").to_dict("list")["ID"], pd.read_csv("lookup/I94VISA.csv").to_dict("list")["Type"]))
immigration["i94visa_desc"] = immigration["i94visa"].map(visas, na_action='ignore')

__Data Dictionary__: Here, we describe the various fields of the dataset:

| Column Name | Description |
| :--- | :--- |
| CICID* | ID that uniquely identify one record in the dataset |
| I94YR | 4 digit year |
| I94MON | Numeric month |
| I94CIT | 3 digit code of source city for immigration |
| I94RES | 3 digit code of source country for immigration  |
| I94PORT | Port addmitted through |
| ARRDATE | Arrival date in the USA |
| I94MODE | Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported) |
| I94ADDR | State of arrival |
| DEPDATE | Departure date |
| I94BIR | Age of Respondent in Years |
| I94VISA | Visa codes collapsed into three categories: (1 = Business; 2 = Pleasure; 3 = Student) |
| COUNT | Used for summary statistics |
| DTADFILE | Character Date Field |
| VISAPOST | Department of State where where Visa was issued |
| OCCUP | Occupation that will be performed in U.S. |
| ENTDEPA | Arrival Flag. Whether admitted or paroled into the US |
| ENTDEPD | Departure Flag. Whether departed, lost visa, or deceased |
| ENTDEPU | Update Flag. Update of visa, either apprehended, overstayed, or updated to PR |
| MATFLAG | Match flag |
| BIRYEAR | 4 digit year of birth |
| DTADDTO | Character date field to when admitted in the US |
| GENDER | Gender |
| INSNUM | INS number |
| AIRLINE | Airline used to arrive in U.S. |
| ADMNUM | Admission number, should be unique and not nullable |
| FLTNO | Flight number of Airline used to arrive in U.S. |
| VISATYPE | Class of admission legally admitting the non-immigrant to temporarily stay in U.S. |

***

__Global Temperature Data__

There are a range of organizations that collate climate trends data. The three most cited land and ocean temperature data sets are NOAA’s MLOST, NASA’s GISTEMP and the UK’s HadCrut.

The Berkeley Earth, which is affiliated with Lawrence Berkeley National Laboratory, has repackaged the data from a newer compilation put it all together. The Berkeley Earth Surface Temperature Study combines 1.6 billion temperature reports from 16 pre-existing archives. It is nicely packaged and allows for slicing into interesting subsets (for example by country). They publish the source data and the code for the transformations they applied. They also use methods that allow weather observations from shorter time series to be included, meaning fewer observations need to be thrown away.

In the original dataset from [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data), several files are available but in this capstone project we will be using only the `GlobalLandTemperaturesByCity`.

In [14]:
temperature_fname = '../../data2/GlobalLandTemperaturesByCity.csv'
world_temperature = pd.read_csv(temperature_fname)

In [15]:
world_temperature.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


__Data Dictionary__

| Column Name | Description |
| :--- | :--- |
| dt | Date in format YYYY-MM-DD |
| AverageTemperature | Average temperature of the city in a given date |
| City | City Name |
| Country | Country Name |
| Latitude | Latitude |
| Longitude | Longitude |

The dataset provides a long period of the world's temperature (from year 1743 to 2013). However, since the immigration dataset only has data of the US National Tourism Office in the year of 2016, the vast majority of the data here is useless. We are only keeping the American cities' latitude and longitude fields to form a dimension table for cities. It would be interesting if we could cross the two tables in order to analyse how the waves of immigration to the US relate to the changes in the temperature. But this is just unfeasible due to the different dates.

In [16]:
world_temperature = world_temperature.groupby(["Country", "City"]).agg({"AverageTemperature": "mean", 
                                                                        "Latitude": "first", "Longitude": "first"}).reset_index()

In [17]:
world_temperature.head()

Unnamed: 0,Country,City,AverageTemperature,Latitude,Longitude
0,Afghanistan,Baglan,10.790278,36.17N,69.61E
1,Afghanistan,Gardez,17.27424,32.95N,69.89E
2,Afghanistan,Gazni,10.311996,32.95N,67.98E
3,Afghanistan,Herat,14.213004,34.56N,62.27E
4,Afghanistan,Jalalabad,14.342919,34.56N,70.05E


__Airports Data__

The airport codes may refer to either [IATA](https://en.wikipedia.org/wiki/IATA_airport_code) airport code, a three-letter code which is used in passenger reservation, ticketing and baggage-handling systems, or the [ICAO](https://en.wikipedia.org/wiki/ICAO_airport_code) airport code which is a four letter code used by ATC systems and for airports that do not have an IATA airport code (from wikipedia).

Airport codes from around the world. Downloaded from public domain source http://ourairports.com/data/ who compiled this data from multiple different sources.

`airport-codes.csv` contains the list of all airport codes, the attributes are identified in datapackage description. Some of the columns contain attributes identifying airport locations, other codes (IATA, local if exist) that are relevant to identification of an airport.
Original source url is http://ourairports.com/data/airports.csv (stored in archive/data.csv).

In [9]:
airport = pd.read_csv("airport-codes_csv.csv")

In [25]:
airport.head(20)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"
5,00AS,small_airport,Fulton Airport,1100.0,,US,US-OK,Alex,00AS,,00AS,"-97.8180194, 34.9428028"
6,00AZ,small_airport,Cordes Airport,3810.0,,US,US-AZ,Cordes,00AZ,,00AZ,"-112.16500091552734, 34.305599212646484"
7,00CA,small_airport,Goldstone /Gts/ Airport,3038.0,,US,US-CA,Barstow,00CA,,00CA,"-116.888000488, 35.350498199499995"
8,00CL,small_airport,Williams Ag Airport,87.0,,US,US-CA,Biggs,00CL,,00CL,"-121.763427, 39.427188"
9,00CN,heliport,Kitchen Creek Helibase Heliport,3350.0,,US,US-CA,Pine Valley,00CN,,00CN,"-116.4597417, 32.7273736"


__U.S. City Demographic Data__

This dataset contains information about the demographics of all US cities and census-designated places with a population greater or equal to 65,000. This data comes from the US Census Bureau's 2015 American Community Survey.

This product uses the Census Bureau Data API but is not endorsed or certified by the Census Bureau.

In [92]:
us_cities_demographics = pd.read_csv("us-cities-demographics.csv", sep=";")

In [95]:
us_cities_demographics.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


# ETL

In [1]:
import helper.etl as e

In [2]:
spark = e.create_spark_session()

In [3]:
#immi = e.etl_immigration_data(spark, load_size=10)
#immi = e.etl_immigration_data(spark, load_size=10, output_path=e.OUTPUT + "immigration.parquet")

In [4]:
immi = e.etl_immigration_data(spark, input_path='../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat', 
                              input_format = "com.github.saurfang.sas.spark", load_size=100)

In [6]:
immi.show(10)

+-------+------+-----+-------+------+----------+-----+----------+-------+-------+-------+--------+--------+------+------+------+
|i94addr|i94mon|cicid|i94visa|i94res|   arrdate|i94yr|   depdate|airline|i94mode|i94port|dtadfile|visatype|gender|i94cit|i94bir|
+-------+------+-----+-------+------+----------+-----+----------+-------+-------+-------+--------+--------+------+------+------+
|   null|     4|    6|      2|   692|2016-04-29| 2016|      null|   null|   null|    XXX|    null|      B2|  null|   692|    37|
|     AL|     4|    7|      3|   276|2016-04-07| 2016|      null|   null|      1|    ATL|20130811|      F1|     M|   254|    25|
|     MI|     4|   15|      2|   101|2016-04-01| 2016|2016-08-25|     OS|      1|    WAS|20160401|      B2|     M|   101|    55|
|     MA|     4|   16|      2|   101|2016-04-01| 2016|2016-04-23|     AA|      1|    NYC|20160401|      B2|  null|   101|    28|
|     MA|     4|   17|      2|   101|2016-04-01| 2016|2016-04-23|     AA|      1|    NYC|20160401

In [5]:
temp = e.etl_temperature_data(spark, load_size=10)

In [6]:
airport = e.etl_airport_data(spark, load_size=10)

In [7]:
demographics = e.etl_demographics_data(spark, load_size=10)

In [15]:
import logging
import boto3
from botocore.exceptions import ClientError
# Retrieve the list of existing buckets
s3 = boto3.client('s3')
response = s3.list_buckets()

# Output the bucket names
print('Existing buckets:')
for bucket in response['Buckets']:
    print(f'  {bucket["Name"]}')

Existing buckets:
  data-engineer-capstone
  elasticbeanstalk-us-east-1-900646315604


In [16]:
def create_bucket(bucket_name, region=None):
    """Create an S3 bucket in a specified region

    If a region is not specified, the bucket is created in the S3 default
    region (us-east-1).

    :param bucket_name: Bucket to create
    :param region: String region to create bucket in, e.g., 'us-west-2'
    :return: True if bucket created, else False
    """

    # Create bucket
    try:
        if region is None:
            s3_client = boto3.client('s3')
            s3_client.create_bucket(Bucket=bucket_name)
        else:
            s3_client = boto3.client('s3', region_name=region)
            location = {'LocationConstraint': region}
            s3_client.create_bucket(Bucket=bucket_name,
                                    CreateBucketConfiguration=location)
    except ClientError as e:
        logging.error(e)
        return False
    return True

In [19]:
create_bucket("y435")

True

In [None]:
#write to parquet
df_spark.write.parquet("sas_data")
df_spark=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [None]:
# Performing cleaning tasks here





### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# Write code here

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.