# Immigration to the U.S.
## Data Engineering Capstone Project

## Project Summary
In this project I connect different data sources so that users can analyse immigration to the U.S. in relation to temperature and U.S. city demographic data.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Configure environment and start Spark Session

In [1]:
# Import all needed packages
import pandas as pd
import numpy as np
import inspect
import configparser
import os
import glob
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import split, udf, col, lower
from pyspark.sql.types import IntegerType
import logging
import re
from datetime import datetime, timedelta

In [2]:
# configure pandas settings
pd.set_option("display.max_columns", 50)

In [3]:
# configure logging format
logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO)

In [4]:
# Get params from config file
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

# input_data = "./"
# output_data = "s3a://dend-capstone-project"
input_data = './data/'
output_data = '/Users/daniel/Desktop/output/'
# output_data = 'C:/temp/'
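
The cell above expects a `dl.cfg` file with an `[AWS]` section holding the two keys it reads. A minimal sketch of that layout follows, parsed from an in-memory string so that no real file is touched. The layout is inferred from the keys read above, and the values are placeholders, not real credentials.

```python
import configparser

# Assumed layout of dl.cfg, inferred from the keys the notebook reads.
# The values are placeholders, not real credentials.
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = YOUR_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY = YOUR_SECRET_ACCESS_KEY
"""

sample_config = configparser.ConfigParser()
sample_config.read_string(SAMPLE_CFG)

# Fail early with a clear message if a required key is missing.
required = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
missing = [key for key in required if key not in sample_config["AWS"]]
if missing:
    raise KeyError(f"dl.cfg is missing: {missing}")

print(sample_config["AWS"]["AWS_ACCESS_KEY_ID"])
```

Checking for the keys up front gives a clearer error than the `KeyError` raised halfway through setting the environment variables.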

In [5]:
# Create spark session
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0,saurfang:spark-sas7bdat:2.0.0-s_2.11") \
        .getOrCreate()
    spark.conf.set("spark.sql.debug.maxToStringFields", 1000)
    return spark

In [6]:
spark = create_spark_session()

## Step 1: Scope the Project and Gather Data

### Scope 
*Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.*

The main purpose of the project is to link different data sources. This requires preprocessing the data: finding and filtering out errors, and replacing missing entries meaningfully or, where necessary, removing them.

I use the data sets provided by Udacity, which come from various sources.

The end result should be a data structure that lets users run their own queries and analyses. To create it, I use the tools and services I have learned. In particular, I use Spark to process the data and generate the data tables, and the AWS infrastructure for data processing and deployment that does not depend on the local computer.

### Describe and Gather Data 
*Describe the data sets you're using. Where did they come from? What type of information is included?*

In [7]:
# show shape of spark dataframe
def spark_shape(df):
    return (df.count(), len(df.columns))

#### I94 Immigration Data

This data comes from the [US National Tourism and Trade Office](https://travel.trade.gov/research/reports/i94/historical/2016.html) as SAS files. A smaller sample is provided as parquet files. It contains international visitor arrival statistics by world regions and select countries (including top 20), type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry (for select countries). The complete dataset has 40,790,529 rows and 34 columns.

In addition to the data files, Udacity offers a description file containing explanations of the individual parameters. Some of the codes used, for example for cities, states and countries, are decoded.

##### Smaller immigration test dataset

In [8]:
# load smaller test table from parquet files
immigration_data_small = os.path.join(input_data, 'sas_data/')
immigration_data_small

'./data/sas_data/'

In [9]:
immigration_small_df = spark.read.parquet(immigration_data_small)

In [10]:
spark_shape(immigration_small_df)

(3096313, 28)

##### Complete immigration dataset

In [None]:
# load full table from sas files 
immigration_data_path = os.path.join(input_data, 'data/18-83510-I94-Data-2016/*.sas7bdat')
immigration_data = [f for f in glob.glob(immigration_data_path)]
immigration_data

In [None]:
# Function adapted from https://stackoverflow.com/a/56666722
# Keep all columns in either df1 or df2
def outter_union(df1, df2):

    # Add missing columns to df1
    left_df = df1
    for column in set(df2.columns) - set(df1.columns):
        left_df = left_df.withColumn(column, F.lit(None))

    # Add missing columns to df2
    right_df = df2
    for column in set(df1.columns) - set(df2.columns):
        right_df = right_df.withColumn(column, F.lit(None))

    return left_df.unionByName(right_df)

In [None]:
for file in immigration_data:
    if file == immigration_data[0]:
        immigration_df = spark.read.format('com.github.saurfang.sas.spark').load(file)
    else:
        new_df = spark.read.format('com.github.saurfang.sas.spark').load(file)
        immigration_df = outter_union(immigration_df, new_df)

In [None]:
# ATTENTION: full immigration table has up to 40 million rows (slow)
spark_shape(immigration_df)

In [11]:
# parse description file
i94_desc_path = os.path.join(input_data, 'I94_SAS_Labels_Descriptions.SAS')
with open(i94_desc_path) as f:
    lines = f.readlines()
    i94_desc_string = ''.join(lines)
    i94_desc_string = i94_desc_string.replace('\n', '')
    i94_desc_string = i94_desc_string.replace('\t', '')

In [12]:
# use Regular Expressions to extract the necessary data
i94_param = re.findall(r'\/\*\s+(.*?)\*\/', i94_desc_string)
    
for item in i94_param:
    # replace multiple whitespaces
    item = re.sub(r'\s+', ' ', item)
    print(item)

I94YR - 4 digit year 
I94MON - Numeric month 
I94CIT & I94RES - This format shows all the valid and invalid codes for processing 
I94PORT - This format shows all the valid and invalid codes for processing 
ARRDATE is the Arrival Date in the USA. It is a SAS date numeric field that a permament format has not been applied. Please apply whichever date format works for you. 
I94MODE - There are missing values as well as not reported (9) 
I94ADDR - There is lots of invalid codes in this variable and the list below shows what we have found to be valid, everything else goes into 'other' 
DEPDATE is the Departure Date from the USA. It is a SAS date numeric field that a permament format has not been applied. Please apply whichever date format works for you. 
I94BIR - Age of Respondent in Years 
I94VISA - Visa codes collapsed into three categories: 1 = Business 2 = Pleasure 3 = Student
COUNT - Used for summary statistics 
DTADFILE - Character Date Field - Date added to I-94 Files - CIC does not 

| Column | Description |
|---|---|
| I94YR | 4 digit year |
| I94MON | Numeric month |
| I94CIT | 3 digit code for the immigrant's country of citizenship |
| I94RES | 3 digit code for the immigrant's country of residence |
| I94PORT | Port of arrival |
| ARRDATE | Arrival Date in the USA |
| I94MODE | Mode of transportation: 1 = Air; 2 = Sea; 3 = Land; 9 = Not reported |
| I94ADDR | State Code of arrival |
| DEPDATE | Departure Date from the USA |
| I94BIR | Age of Respondent in Years |
| I94VISA | Visa codes collapsed into three categories: 1 = Business 2 = Pleasure 3 = Student|
| COUNT | Used for summary statistics |
| DTADFILE | Character Date Field |
| VISAPOST | Department of State where the visa was issued |
| OCCUP | Occupation that will be performed in U.S. |
| ENTDEPA | Arrival Flag - admitted or paroled into the U.S. |
| ENTDEPD | Departure Flag - Departed, lost I-94 or is deceased |
| ENTDEPU | Update Flag - Either apprehended, overstayed, adjusted to perm residence |
| MATFLAG | Match flag - Match of arrival and departure records |
| BIRYEAR | 4 digit year of birth |
| DTADDTO | Character Date Field - Date to which admitted to U.S. (allowed to stay until) |
| GENDER | Non-immigrant sex |
| INSNUM | INS number |
| AIRLINE | Airline used to arrive in U.S. |
| ADMNUM | Admission Number |
| FLTNO | Flight number of Airline used to arrive in U.S. |
| VISATYPE | Class of admission legally admitting the non-immigrant to temporarily stay in U.S.|

#### World Temperature Data

This dataset comes from [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) as a CSV file. It contains average land temperatures by state, with records starting in 1750. The dataset has 645,675 rows and 5 columns.

In [13]:
temp_data = os.path.join(input_data, 'GlobalLandTemperaturesByState.csv')
temp_data

'./data/GlobalLandTemperaturesByState.csv'

In [14]:
temp_df = spark.read.csv(temp_data, header=True)

In [15]:
spark_shape(temp_df)

(645675, 5)

| Column | Description |
|---|---|
| dt | Date in YYYY-MM-DD format |
| AverageTemperature | Average land temperature for the state |
| AverageTemperatureUncertainty | 95% confidence interval around the average |
| State | Name of State |
| Country | Name of Country |

#### U.S. City Demographic Data

This data comes from [OpenDataSoft](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/) as a CSV file. It contains demographic information on all U.S. cities and census-designated places with a population greater than or equal to 65,000, taken from the US Census Bureau's 2015 American Community Survey. The dataset has 2,891 rows and 12 columns.

In [16]:
cities_data = os.path.join(input_data, 'us-cities-demographics.csv')
cities_data

'./data/us-cities-demographics.csv'

In [17]:
cities_df = spark.read.csv(cities_data, header=True, sep=';')

In [18]:
spark_shape(cities_df)

(2891, 12)

| Column | Description |
|---|---|
| City | Name of City |
| State | US state of the city |
| Median Age | Median age of the population |
| Male Population | Male population of the city |
| Female Population | Female population of the city |
| Total Population | Total population of the city |
| Number of Veterans | Number of veterans in the city |
| Foreign-born | Number of residents born outside the U.S. |
| Average Household Size | Average number of people per household in the city |
| State Code | Code of the state |
| Race | Race class |
| Count | Number of people for each race |

#### Airport Code Table

This data comes from [datahub.io](https://datahub.io/core/airport-codes#data) as a CSV file. It is a simple table of airport codes and corresponding cities. The dataset has 55,075 rows and 12 columns.

In [19]:
airports_data = os.path.join(input_data, 'airport-codes_csv.csv')
airports_data

'./data/airport-codes_csv.csv'

In [20]:
airports_df = spark.read.csv(airports_data, header=True)

In [21]:
spark_shape(airports_df)

(55075, 12)

| Column | Description |
|---|---|
| ident | Unique identifier |
| type | Type of airport |
| name | Name of Airport |
| elevation_ft | Altitude of airport |
| continent | Continent |
| iso_country | ISO code of the country of the airport |
| iso_region | ISO code for the region of the airport |
| municipality | City of airport |
| gps_code | GPS code of airport |
| iata_code | IATA code of airport |
| local_code | Local code of airport |
| coordinates | GPS coordinates of airport |

### Write raw tables as parquet files to S3

Store the raw data as parquet files in an S3 bucket as a backup.

In [22]:
# put all tables into one dictionary
df_raw_all = {'immigration_small_data': immigration_small_df,
            #'immigration_data': immigration_df, 
            'temp_data': temp_df, 
            'cities_data': cities_df,
            'airport_data': airports_df
            }

In [23]:
for file, df in df_raw_all.items():
    # remove spaces and minus in column-names of dataframes
    df = df.select([F.col(col).alias(col.replace(' ', '_').replace('-', '_')) for col in df.columns])
    # write parquet files to raw path
    path = os.path.join(output_data, 'raw', file)
    df.write.parquet(path)
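
The renaming step above replaces spaces and hyphens so that column names are safe to write to parquet and easy to reference in SQL. The same rule can be sketched in plain Python. `sanitize_column` is a hypothetical helper name, and the regex generalises the two `replace` calls to any character outside letters, digits and underscore, which is an assumption about what one might want rather than what the notebook does.

```python
import re

def sanitize_column(name):
    """Replace every character that is not a letter, digit or underscore."""
    return re.sub(r"[^0-9A-Za-z_]", "_", name)

# Column names taken from the demographics table described above.
columns = ["City", "Median Age", "Foreign-born", "Average Household Size"]
renamed = [sanitize_column(c) for c in columns]
print(renamed)
```

For these four names the result matches the renamed columns shown later in the notebook output (`Median_Age`, `Foreign_born`, and so on).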

#### Restore data from S3 bucket

In [24]:
raw_path = os.path.join(output_data, 'raw')
df_raw_all = {}
# derive each table name from its own directory path, so names and paths cannot get out of sync
for path in glob.glob(raw_path + '/*/'):
    name = os.path.basename(os.path.normpath(path))
    df_raw_all[name] = spark.read.parquet(path)
    logging.info(f'Dataframe <{name}> from parquet-file in <{path}> successfully loaded')

2020-03-18 18:08:44,934 - Dataframe <immigration_small_data> from parquet-file in </Users/daniel/Desktop/output/raw/immigration_small_data/> successfully loaded
2020-03-18 18:08:45,063 - Dataframe <airport_data> from parquet-file in </Users/daniel/Desktop/output/raw/airport_data/> successfully loaded
2020-03-18 18:08:45,215 - Dataframe <temp_data> from parquet-file in </Users/daniel/Desktop/output/raw/temp_data/> successfully loaded
2020-03-18 18:08:45,339 - Dataframe <cities_data> from parquet-file in </Users/daniel/Desktop/output/raw/cities_data/> successfully loaded


In [25]:
# replace large immigration dataframe with the smaller one for testing on local computer
df_raw_all['immigration_data'] = df_raw_all['immigration_small_data']

## Step 2: Explore and Assess the Data
### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

In [26]:
# functions for exploring

# count NULL values in columns
def na_cols_in_df(df):
    total_rows = df.count()
    table = {}
    for col in df.columns:
        total_na = df.where(f"{col} is NULL").count()
        percent_na = total_na / total_rows * 100
        table[col] = [total_na, percent_na]
    table = pd.DataFrame(table, index=['Total NA', 'Percent NA'])
    # round each number by 2 decimals
    table = table.round(2)
    print(f'Total number of rows: {total_rows}')
    return table

#### I94 Immigration Data

In [27]:
df_raw_all['immigration_data'].limit(10).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,40.0,1.0,1.0,20160430,SYD,,G,O,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,32.0,1.0,1.0,20160430,SYD,,G,O,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,29.0,1.0,1.0,20160430,SYD,,G,O,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,29.0,1.0,1.0,20160430,SYD,,G,O,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,28.0,1.0,1.0,20160430,SYD,,G,O,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1
5,5748522.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20579.0,57.0,2.0,1.0,20160430,ACK,,G,O,,M,1959.0,10292016,M,,NZ,94981800000.0,10,B2
6,5748523.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20586.0,66.0,2.0,1.0,20160430,ACK,,G,O,,M,1950.0,10292016,F,,NZ,94979690000.0,10,B2
7,5748524.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20586.0,41.0,2.0,1.0,20160430,ACK,,G,O,,M,1975.0,10292016,F,,NZ,94979750000.0,10,B2
8,5748525.0,2016.0,4.0,245.0,464.0,HOU,20574.0,1.0,FL,20581.0,27.0,2.0,1.0,20160430,ACK,,G,O,,M,1989.0,10292016,M,,NZ,94973250000.0,28,B2
9,5748526.0,2016.0,4.0,245.0,464.0,LOS,20574.0,1.0,CA,20581.0,26.0,2.0,1.0,20160430,ACK,,G,O,,M,1990.0,10292016,F,,NZ,95013550000.0,2,B2


In [28]:
na_cols_in_df(df_raw_all['immigration_data']).T

Total number of rows: 3096313


| Column | Total NA | Percent NA |
|---|---|---|
| cicid | 0.0 | 0.0 |
| i94yr | 0.0 | 0.0 |
| i94mon | 0.0 | 0.0 |
| i94cit | 0.0 | 0.0 |
| i94res | 0.0 | 0.0 |
| i94port | 0.0 | 0.0 |
| arrdate | 0.0 | 0.0 |
| i94mode | 239.0 | 0.01 |
| i94addr | 152592.0 | 4.93 |
| depdate | 142457.0 | 4.6 |


Lots of data is missing in some columns. I will focus the work on the columns with the most data available.

#### World Temperature Data

In [29]:
df_raw_all['temp_data'].limit(10).toPandas()

|   | dt | AverageTemperature | AverageTemperatureUncertainty | State | Country |
|---|---|---|---|---|---|
| 0 | 1976-05-01 | 16.558999999999994 | 0.307 | Tennessee | United States |
| 1 | 1976-06-01 | 22.253 | 0.26 | Tennessee | United States |
| 2 | 1976-07-01 | 24.023000000000003 | 0.325 | Tennessee | United States |
| 3 | 1976-08-01 | 23.02 | 0.185 | Tennessee | United States |
| 4 | 1976-09-01 | 19.52 | 0.181 | Tennessee | United States |
| 5 | 1976-10-01 | 11.659 | 0.276 | Tennessee | United States |
| 6 | 1976-11-01 | 4.643 | 0.122 | Tennessee | United States |
| 7 | 1976-12-01 | 2.062 | 0.182 | Tennessee | United States |
| 8 | 1977-01-01 | -4.7650000000000015 | 0.126 | Tennessee | United States |
| 9 | 1977-02-01 | 3.482 | 0.244 | Tennessee | United States |


In [30]:
na_cols_in_df(df_raw_all['temp_data']).T

Total number of rows: 645675


| Column | Total NA | Percent NA |
|---|---|---|
| dt | 0.0 | 0.0 |
| AverageTemperature | 25648.0 | 3.97 |
| AverageTemperatureUncertainty | 25648.0 | 3.97 |
| State | 0.0 | 0.0 |
| Country | 0.0 | 0.0 |


Some temperature data is missing. These rows will be dropped in the next steps.

#### U.S. City Demographic Data

In [31]:
df_raw_all['cities_data'].limit(10).toPandas()

Unnamed: 0,City,State,Median_Age,Male_Population,Female_Population,Total_Population,Number_of_Veterans,Foreign_born,Average_Household_Size,State_Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402
5,Peoria,Illinois,33.1,56229,62432,118661,6634,7517,2.4,IL,American Indian and Alaska Native,1343
6,Avondale,Arizona,29.1,38712,41971,80683,4815,8355,3.18,AZ,Black or African-American,11592
7,West Covina,California,39.8,51629,56860,108489,3800,37038,3.56,CA,Asian,32716
8,O'Fallon,Missouri,36.0,41762,43270,85032,5783,3269,2.77,MO,Hispanic or Latino,2583
9,High Point,North Carolina,35.5,51751,58077,109828,5204,16315,2.65,NC,Asian,11060


In [32]:
na_cols_in_df(df_raw_all['cities_data']).T

Total number of rows: 2891


| Column | Total NA | Percent NA |
|---|---|---|
| City | 0.0 | 0.0 |
| State | 0.0 | 0.0 |
| Median_Age | 0.0 | 0.0 |
| Male_Population | 3.0 | 0.1 |
| Female_Population | 3.0 | 0.1 |
| Total_Population | 0.0 | 0.0 |
| Number_of_Veterans | 13.0 | 0.45 |
| Foreign_born | 13.0 | 0.45 |
| Average_Household_Size | 16.0 | 0.55 |
| State_Code | 0.0 | 0.0 |


#### Airport Code Table

In [33]:
df_raw_all['airport_data'].limit(10).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"
5,00AS,small_airport,Fulton Airport,1100,,US,US-OK,Alex,00AS,,00AS,"-97.8180194, 34.9428028"
6,00AZ,small_airport,Cordes Airport,3810,,US,US-AZ,Cordes,00AZ,,00AZ,"-112.16500091552734, 34.305599212646484"
7,00CA,small_airport,Goldstone /Gts/ Airport,3038,,US,US-CA,Barstow,00CA,,00CA,"-116.888000488, 35.350498199499995"
8,00CL,small_airport,Williams Ag Airport,87,,US,US-CA,Biggs,00CL,,00CL,"-121.763427, 39.427188"
9,00CN,heliport,Kitchen Creek Helibase Heliport,3350,,US,US-CA,Pine Valley,00CN,,00CN,"-116.4597417, 32.7273736"


In [34]:
na_cols_in_df(df_raw_all['airport_data']).T

Total number of rows: 55075


| Column | Total NA | Percent NA |
|---|---|---|
| ident | 0.0 | 0.0 |
| type | 0.0 | 0.0 |
| name | 0.0 | 0.0 |
| elevation_ft | 7006.0 | 12.72 |
| continent | 0.0 | 0.0 |
| iso_country | 0.0 | 0.0 |
| iso_region | 0.0 | 0.0 |
| municipality | 5676.0 | 10.31 |
| gps_code | 14045.0 | 25.5 |
| iata_code | 45886.0 | 83.32 |


In [None]:
df_raw_all['airport_data'].groupBy(['name', 'type', 'coordinates']).count().where('count > 1').orderBy('count', ascending=False).show()

In [None]:
df_raw_all['airport_data'].where("name = 'Mukho Port Heliport'").show()

In [None]:
df_raw_all['airport_data'].where("name LIKE 'Cheonmi-ri South%'").show()

There are some airports in the dataset with duplicate data. These rows should be dropped in the next step.

#### Cleaning Steps
*Document steps necessary to clean the data*

* Use temperature data only for 2012, which is the last complete year in the dataset and the closest to the 2016 immigration data
* Remove rows from the 2016 immigration dataset whose dates lie before 2016
* Drop duplicates in all data tables
* Remove nulls and inconsistent data
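
The cleaning rules listed above can be illustrated on a handful of records in plain Python, without Spark. This is only a sketch: the rows and their temperature values are made up for illustration, and `clean` is a hypothetical helper, not part of the pipeline.

```python
# Made-up temperature records; values are illustrative only.
rows = [
    {"dt": "2012-01-01", "State": "Alabama", "AverageTemperature": "9.5"},
    {"dt": "2012-01-01", "State": "Alabama", "AverageTemperature": "9.5"},  # duplicate
    {"dt": "2012-02-01", "State": "Alabama", "AverageTemperature": None},   # missing value
    {"dt": "2011-12-01", "State": "Alabama", "AverageTemperature": "7.1"},  # wrong year
]

def clean(records, year="2012"):
    """Keep one copy of each complete record from the given year."""
    seen = set()
    cleaned = []
    for row in records:
        if not row["dt"].startswith(year):
            continue  # outside the year of interest
        if row["AverageTemperature"] is None:
            continue  # missing measurement
        key = tuple(sorted(row.items()))
        if key in seen:
            continue  # exact duplicate
        seen.add(key)
        cleaned.append(row)
    return cleaned

cleaned_rows = clean(rows)
print(len(cleaned_rows))
```

Only the first record survives: the second is a duplicate, the third has no temperature and the fourth falls outside 2012.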

## Step 3: Define the Data Model
### 3.1 Conceptual Data Model
*Map out the conceptual data model and explain why you chose that model*

#### Table links
![Image](images/table_links.png)

To get an overview of possible links between the individual data tables, it is useful to choose a graphical representation. From this, it is much easier to derive a decision for a later data model.

It is easy to see that there are many links between the individual tables, for example via the country and city columns.

#### Chosen data model
![Image](images/star_schema.png)

I think a star schema with one fact table and several dimension tables fits this application very well. The immigrations table is a classic fact table with numerous single events that the user can analyze. The dimension tables, which provide additional information on date, city, state and temperature, are arranged around it. I split the temperature data into an (entry) country table and a U.S. state table, which makes it possible to analyze both where travelers come from and where they go.

I left out the airport table because it adds little value for this exercise. It would contribute only the `elevation_ft` column as additional information, which I do not consider useful for the analyses in scope.

### 3.2 Mapping Out Data Pipelines
*List the steps necessary to pipeline the data into the chosen data model*

1. Staging of the necessary tables
2. Creating the fact and dimension tables

## Step 4: Run Pipelines to Model the Data 
### 4.1 Create the data model
*Build the data pipelines to create the data model.*

#### 4.1.1 Immigrations table
#### Get country codes from SAS description file

In [35]:
i94_country_codes = dict(re.findall(r'(\d{3})\s+\=\s+\'(.*?)\'', i94_desc_string))
#i94_country_codes
lol = list(map(list, i94_country_codes.items()))
countries_table = spark.createDataFrame(lol, ["country_id", "country"])
countries_table = countries_table.withColumn('country', lower(col('country')))
countries_table.show()

+----------+--------------------+
|country_id|             country|
+----------+--------------------+
|       582|mexico air sea, a...|
|       236|         afghanistan|
|       101|             albania|
|       316|             algeria|
|       102|             andorra|
|       324|              angola|
|       529|            anguilla|
|       518|     antigua-barbuda|
|       687|          argentina |
|       151|             armenia|
|       532|               aruba|
|       438|           australia|
|       103|             austria|
|       152|          azerbaijan|
|       512|             bahamas|
|       298|             bahrain|
|       274|          bangladesh|
|       513|            barbados|
|       104|             belgium|
|       581|              belize|
+----------+--------------------+
only showing top 20 rows
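
The regular expression used above can be tried on a small sample without Spark. The sketch below runs an equivalent pattern on a made-up excerpt in the same `code = 'label'` layout; the sample string is an assumption, not copied from the description file. It also strips whitespace inside the quotes, which is where entries such as `argentina ` in the output above get their trailing space.

```python
import re

# Made-up excerpt mimicking the flattened description string (newlines already removed).
sample = "   236 =  'AFGHANISTAN'   687 =  'ARGENTINA '   101 =  'ALBANIA'"

# Equivalent to the notebook's pattern: three digits, '=', then a quoted label.
pairs = re.findall(r"(\d{3})\s+=\s+'(.*?)'", sample)

# Strip stray whitespace and lower-case the label while building the mapping.
country_codes = {code: label.strip().lower() for code, label in pairs}
print(country_codes)
```

Trimming the labels at this point keeps later joins on country names from silently missing rows because of a trailing space.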



In [36]:
path = os.path.join(output_data, 'staging', 'countries_table')
countries_table.write.parquet(path)

#### Get state codes from SAS description file

In [37]:
i94_state_codes = dict(re.findall(r"'(\w{2})'\s*=\s*'(.*?)\s*'", i94_desc_string))
# remove port keys
port_keys = ['AG', 'NK']
for key in port_keys:
    del i94_state_codes[key]
#i94_state_codes
lol = list(map(list, i94_state_codes.items()))
states_staging = spark.createDataFrame(lol, ["state_id", "state"])
states_staging = states_staging.withColumn('state', lower(col('state')))
states_staging.show()

+--------+-----------------+
|state_id|            state|
+--------+-----------------+
|      AL|          alabama|
|      AK|           alaska|
|      AZ|          arizona|
|      AR|         arkansas|
|      CA|       california|
|      CO|         colorado|
|      CT|      connecticut|
|      DE|         delaware|
|      DC|dist. of columbia|
|      FL|          florida|
|      GA|          georgia|
|      GU|             guam|
|      HI|           hawaii|
|      ID|            idaho|
|      IL|         illinois|
|      IN|          indiana|
|      IA|             iowa|
|      KS|           kansas|
|      KY|         kentucky|
|      LA|        louisiana|
+--------+-----------------+
only showing top 20 rows



In [38]:
path = os.path.join(output_data, 'staging', 'states_table')
states_staging.write.parquet(path)

#### Create Mode and Visa codes from SAS description file

In [39]:
i94_mode_codes = {
    1 : 'Air',
    2 : 'Sea',
    3 : 'Land',
    9 : 'Not reported'
}
i94_visa_codes = {
    1 : 'Business',
    2 : 'Pleasure',
    3 : 'Student'
}

In [40]:
lol = list(map(list, i94_mode_codes.items()))
modes_table = spark.createDataFrame(lol, ["mode_id", "mode"])
modes_table.show()

+-------+------------+
|mode_id|        mode|
+-------+------------+
|      1|         Air|
|      2|         Sea|
|      3|        Land|
|      9|Not reported|
+-------+------------+



In [88]:
path = os.path.join(output_data, 'staging', 'modes_table')
modes_table.write.parquet(path)

In [42]:
lol = list(map(list, i94_visa_codes.items()))
visas_table = spark.createDataFrame(lol, ["visa_id", "visa"])
visas_table.show()

+-------+--------+
|visa_id|    visa|
+-------+--------+
|      1|Business|
|      2|Pleasure|
|      3| Student|
+-------+--------+



In [89]:
path = os.path.join(output_data, 'staging', 'visas_table')
visas_table.write.parquet(path)

#### Create Immigration table

In [44]:
immigrations_stage = df_raw_all['immigration_data']

In [45]:
# replace all kind of None strings with Spark Null
immigrations_stage = immigrations_stage.replace(['NaN', 'NONE', 'Null', 'null', 'None'],[None, None, None, None, None])

In [46]:
# first day in 2016 as SAS date
start_date = abs(datetime(2016, 1, 1).date() - datetime(1960, 1, 1).date()).days

In [47]:
# drop rows with dates before 2016 (rows with a NULL arrdate or depdate are dropped too, because the comparison evaluates to NULL)
immigrations_stage = immigrations_stage.filter(immigrations_stage.arrdate >= start_date).filter(immigrations_stage.depdate >= start_date)

In [48]:
# convert SAS dates (days since 1960-01-01) to ISO date strings
get_timestamp = udf(lambda x: (datetime(1960, 1, 1).date() + timedelta(x)).isoformat() if x is not None else None)
immigrations_stage = immigrations_stage.withColumn('arrdate_ts', get_timestamp(immigrations_stage.arrdate))
immigrations_stage = immigrations_stage.withColumn('depdate_ts', get_timestamp(immigrations_stage.depdate))
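
SAS stores dates as the number of days since 1960-01-01, which is what the cells above rely on. The conversion can be checked in plain Python in both directions. The helper names `sas_to_iso` and `iso_to_sas` are hypothetical and only serve this sketch.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)

def sas_to_iso(days):
    """Convert a SAS day count (int or float) to an ISO date string."""
    if days is None:
        return None
    return (SAS_EPOCH + timedelta(days=int(days))).isoformat()

def iso_to_sas(day):
    """Convert a datetime.date to a SAS day count."""
    return (day - SAS_EPOCH).days

print(sas_to_iso(20574.0))           # 2016-04-30, as in the arrdate column
print(iso_to_sas(date(2016, 1, 1)))  # 20454, the cut-off used as start_date
```

Testing for `None` explicitly, rather than for truthiness, keeps a day count of `0` (1960-01-01) from being treated as missing.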

In [49]:
# add mode columns (inner join: rows whose i94mode has no match in modes_table, including NULL, are dropped)
immigrations_stage = immigrations_stage.join(modes_table, modes_table.mode_id == immigrations_stage.i94mode)

In [50]:
# add visa columns
immigrations_stage = immigrations_stage.join(visas_table, visas_table.visa_id == immigrations_stage.i94visa)

In [51]:
# add country names to i94cit and i94res
immigrations_stage = immigrations_stage.join(countries_table, countries_table.country_id == immigrations_stage.i94cit).withColumnRenamed('country','i94cit_country').drop('country_id')
immigrations_stage = immigrations_stage.join(countries_table, countries_table.country_id == immigrations_stage.i94res).withColumnRenamed('country','i94res_country').drop('country_id')
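
The joins above use Spark's default inner join, so any row whose code has no match in the lookup table is removed from the result. The difference from a left join can be sketched with plain dictionaries. The records below are hypothetical and `mode_label` is a helper introduced only for this illustration.

```python
modes = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}

# Hypothetical records; only the i94mode values matter here.
records = [
    {"cicid": 1, "i94mode": 1.0},
    {"cicid": 2, "i94mode": None},  # missing mode
    {"cicid": 3, "i94mode": 3.0},
]

def mode_label(value):
    """Return the label for a mode code, or None when it is missing or unknown."""
    return modes.get(int(value)) if value is not None else None

# Inner-join behaviour: records without a matching code disappear.
inner = [dict(r, mode=mode_label(r["i94mode"])) for r in records
         if mode_label(r["i94mode"]) is not None]

# Left-join behaviour: every record is kept, unmatched ones get mode=None.
left = [dict(r, mode=mode_label(r["i94mode"])) for r in records]

print(len(inner), len(left))
```

In Spark, the second variant corresponds to passing `how='left'` to `join`; whether unmatched rows should be kept depends on the analysis.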

In [52]:
immigrations_stage.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype,arrdate_ts,depdate_ts,mode_id,mode,visa_id,visa,i94cit_country,i94res_country
0,5761453.0,2016.0,4.0,299.0,299.0,WAS,20574.0,1.0,DC,20580.0,50.0,1.0,1.0,20160430,ULN,,G,O,,M,1966.0,10292016,M,,KE,94964160000.0,93,B1,2016-04-30,2016-05-06,1,Air,1,Business,mongolia,mongolia
1,5761458.0,2016.0,4.0,299.0,299.0,WAS,20574.0,1.0,WA,20588.0,40.0,1.0,1.0,20160430,ULN,,G,O,,M,1976.0,10292016,M,,KE,94964490000.0,93,B1,2016-04-30,2016-05-14,1,Air,1,Business,mongolia,mongolia
2,5761467.0,2016.0,4.0,299.0,299.0,SFR,20574.0,1.0,CA,20581.0,31.0,1.0,1.0,20160430,ULN,,G,O,,M,1985.0,10292016,F,,UA,94981180000.0,892,B1,2016-04-30,2016-05-07,1,Air,1,Business,mongolia,mongolia
3,5761472.0,2016.0,4.0,299.0,299.0,SFR,20574.0,1.0,VA,20581.0,32.0,1.0,1.0,20160430,ULN,,G,O,,M,1984.0,10292016,F,,UA,94980810000.0,892,B1,2016-04-30,2016-05-07,1,Air,1,Business,mongolia,mongolia
4,5761474.0,2016.0,4.0,299.0,299.0,SFR,20574.0,1.0,VA,20595.0,29.0,1.0,1.0,20160430,ULN,,G,O,,M,1987.0,10292016,F,,UA,94980750000.0,892,B1,2016-04-30,2016-05-21,1,Air,1,Business,mongolia,mongolia


In [53]:
path = os.path.join(output_data, 'staging', 'immigrations_stage')
immigrations_stage.write.parquet(path)

In [54]:
immigrations_table = immigrations_stage.selectExpr(
    'cast(cicid as int)', 
    'arrdate_ts', 
    'depdate_ts', 
    'i94port', 
    'cast(i94mode as int)',
    'airline',
    'fltno',
    'cast(i94yr as int)',
    'cast(i94mon as int)',
    'visatype',
    'visa AS i94visa',
    'cast(i94bir as int)',
    'i94cit_country AS i94cit',
    'i94res_country AS i94res',
    'i94addr',
    'gender'
).dropDuplicates()

In [55]:
immigrations_table.limit(5).toPandas()

Unnamed: 0,cicid,arrdate_ts,depdate_ts,i94port,i94mode,airline,fltno,i94yr,i94mon,visatype,i94visa,i94bir,i94cit,i94res,i94addr,gender
0,5291801,2016-04-28,2016-06-13,DAL,1,KE,31,2016,4,B2,Pleasure,46,mongolia,mongolia,TX,
1,4111492,2016-04-22,2016-04-23,NYC,1,KE,85,2016,4,B2,Pleasure,34,mongolia,mongolia,MA,F
2,621773,2016-04-03,2016-04-08,ORL,1,AV,28,2016,4,B1,Business,38,ecuador,ecuador,FL,M
3,3478949,2016-04-18,2016-04-23,NYC,1,XL,538,2016,4,B1,Business,28,ecuador,ecuador,NY,F
4,3479800,2016-04-18,2016-04-24,ATL,1,DL,680,2016,4,B1,Business,44,ecuador,ecuador,NH,M


In [56]:
immigrations_table.printSchema()

root
 |-- cicid: integer (nullable = true)
 |-- arrdate_ts: string (nullable = true)
 |-- depdate_ts: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- airline: string (nullable = true)
 |-- fltno: string (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- visatype: string (nullable = true)
 |-- i94visa: string (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94cit: string (nullable = true)
 |-- i94res: string (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- gender: string (nullable = true)



In [57]:
path = os.path.join(output_data, 'ops', 'immigrations_table')
immigrations_table.write.parquet(path)

#### 4.1.2 Cities table
#### Get city codes from SAS description file

In [58]:
i94_city_codes = dict(re.findall(r"'(\w{3})'\s*=\s*'(.*?)\s*'", i94_desc_string))
#i94_city_codes

In [59]:
lol = list(map(list, i94_city_codes.items()))
cities_table = spark.createDataFrame(lol, ["city_id", "city_state"])
cities_table.show()

+-------+--------------------+
|city_id|          city_state|
+-------+--------------------+
|    ALC|           ALCAN, AK|
|    ANC|       ANCHORAGE, AK|
|    BAR|BAKER AAF - BAKER...|
|    DAC|   DALTONS CACHE, AK|
|    PIZ|DEW STATION PT LA...|
|    DTH|    DUTCH HARBOR, AK|
|    EGL|           EAGLE, AK|
|    FRB|       FAIRBANKS, AK|
|    HOM|           HOMER, AK|
|    HYD|           HYDER, AK|
|    JUN|          JUNEAU, AK|
|    5KE|       KETCHIKAN, AK|
|    KET|       KETCHIKAN, AK|
|    MOS|MOSES POINT INTER...|
|    NIK|         NIKISKI, AK|
|    NOM|             NOM, AK|
|    PKC|     POKER CREEK, AK|
|    ORI|  PORT LIONS SPB, AK|
|    SKA|         SKAGWAY, AK|
|    SNP| ST. PAUL ISLAND, AK|
+-------+--------------------+
only showing top 20 rows



In [60]:
# split city_state in two separate columns
cities_table = cities_table.withColumn("city", split("city_state", ", ").getItem(0)).withColumn("state_code", split("city_state", ", ").getItem(1))
cities_table = cities_table.drop('city_state')
cities_table.show()

+-------+--------------------+----------+
|city_id|                city|state_code|
+-------+--------------------+----------+
|    ALC|               ALCAN|        AK|
|    ANC|           ANCHORAGE|        AK|
|    BAR|BAKER AAF - BAKER...|        AK|
|    DAC|       DALTONS CACHE|        AK|
|    PIZ|DEW STATION PT LA...|        AK|
|    DTH|        DUTCH HARBOR|        AK|
|    EGL|               EAGLE|        AK|
|    FRB|           FAIRBANKS|        AK|
|    HOM|               HOMER|        AK|
|    HYD|               HYDER|        AK|
|    JUN|              JUNEAU|        AK|
|    5KE|           KETCHIKAN|        AK|
|    KET|           KETCHIKAN|        AK|
|    MOS|MOSES POINT INTER...|        AK|
|    NIK|             NIKISKI|        AK|
|    NOM|                 NOM|        AK|
|    PKC|         POKER CREEK|        AK|
|    ORI|      PORT LIONS SPB|        AK|
|    SKA|             SKAGWAY|        AK|
|    SNP|     ST. PAUL ISLAND|        AK|
+-------+--------------------+----------+
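The split above assumes that every entry contains exactly one `", "` separator. As a plain-Python sketch of what happens otherwise (the helper `split_city_state` and the sample values are hypothetical and not part of the notebook), splitting on the last separator keeps a city name that itself contains a comma intact and returns `None` when no state code is present:

```python
from typing import Optional, Tuple


def split_city_state(value: str) -> Tuple[str, Optional[str]]:
    """Split 'CITY, ST' on the last ', ' into (city, state_code)."""
    city, sep, state = value.rpartition(", ")
    if not sep:
        # no separator found: the whole value is the city, the state is unknown
        return value.strip(), None
    return city.strip(), state.strip()


# Invented values covering the three cases
print(split_city_state("ALCAN, AK"))
print(split_city_state("SOME PORT WITHOUT STATE"))
print(split_city_state("FIRST, SECOND, TX"))
```

A check like this can help decide whether rows with a missing `state_code` should be cleaned up before the table is written.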

In [61]:
path = os.path.join(output_data, 'ops', 'cities_table')
cities_table.write.parquet(path)

#### 4.1.3 States table
#### Get states data from Demographic table

In [62]:
df_raw_all['cities_data'].limit(10).show()

+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|         State|Median_Age|Male_Population|Female_Population|Total_Population|Number_of_Veterans|Foreign_born|Average_Household_Size|State_Code|                Race|Count|
+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|      Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy| Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|       Alabama|      38.5|          3

In [63]:
df_raw_all['cities_data'].printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median_Age: string (nullable = true)
 |-- Male_Population: string (nullable = true)
 |-- Female_Population: string (nullable = true)
 |-- Total_Population: string (nullable = true)
 |-- Number_of_Veterans: string (nullable = true)
 |-- Foreign_born: string (nullable = true)
 |-- Average_Household_Size: string (nullable = true)
 |-- State_Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [64]:
# convert necessary column to integer
demographics_table = df_raw_all['cities_data'].withColumn("Male_Population", df_raw_all['cities_data']["Male_Population"].cast(IntegerType()))\
                                                .withColumn("Female_Population", df_raw_all['cities_data']["Female_Population"].cast(IntegerType()))\
                                                .withColumn("Total_Population", df_raw_all['cities_data']["Total_Population"].cast(IntegerType()))\
                                                .withColumn("Number_of_Veterans", df_raw_all['cities_data']["Number_of_Veterans"].cast(IntegerType()))\
                                                .withColumn("Foreign_born", df_raw_all['cities_data']["Foreign_born"].cast(IntegerType()))\
                                                .withColumn("Count", df_raw_all['cities_data']["Count"].cast(IntegerType()))
# pivot race column, aggregate by sum for state view
group_cols = ['State_Code', 'State', 'Male_Population', 'Female_Population', 'Total_Population', 'Number_of_Veterans', 'Foreign_born']
demographics_table = demographics_table.groupBy(group_cols).pivot('Race').sum('Count')
demographics_table = demographics_table.groupBy(['State_Code', 'State']).sum()
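The two-step aggregation matters because the raw demographic data holds one row per city and race, so the population columns repeat on every race row of a city. The pure-Python sketch below uses invented numbers (not taken from the dataset) to show that collapsing to one record per city first avoids counting a city's population once per race. Note that the notebook groups on the population columns rather than on the city name, which has the same effect as long as no two cities in a state share identical figures.

```python
from collections import defaultdict

# Invented rows: (state_code, city, total_population, race, count)
rows = [
    ("XX", "A", 100, "Asian", 30),
    ("XX", "A", 100, "White", 70),
    ("XX", "B", 50, "Asian", 10),
    ("XX", "B", 50, "White", 40),
]

# Step 1: one record per city, with the race counts pivoted into columns
cities = {}
for state, city, total, race, count in rows:
    record = cities.setdefault((state, city), {"total_population": total})
    record[race] = record.get(race, 0) + count

# Step 2: sum the per-city records up to state level
states = defaultdict(lambda: defaultdict(int))
for (state, _city), record in cities.items():
    for column, value in record.items():
        states[state][column] += value

# Naive alternative: summing total_population over the raw rows
# counts every city once per race
naive_total = sum(total for _, _, total, _, _ in rows)

print(dict(states["XX"]))
print(naive_total)
```

Here the two-step result gives a state population of 150, while the naive sum over the raw rows gives 300.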

In [65]:
# replace blanks and hyphens with underscores, lower-case the names and strip the 'sum(...)' wrapper
demographics_table = demographics_table.select([F.col(c).alias(c.replace(' ', '_').replace('-', '_').lower().replace('sum(', '').replace(')', '')) for c in demographics_table.columns])
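The renaming can be tried out on plain strings without Spark. The sketch below mirrors the chain of replacements from the cell above; the function name `clean_column_name` is hypothetical, and the example names are what the aggregation is assumed to produce (`sum(...)` wrapped around the original column or race name).

```python
def clean_column_name(name: str) -> str:
    """Apply the same replacements as the select/alias expression above."""
    return (
        name.replace(" ", "_")
        .replace("-", "_")
        .lower()
        .replace("sum(", "")
        .replace(")", "")
    )


# Assumed examples of column names after groupBy(...).sum()
examples = [
    "State_Code",
    "sum(Male_Population)",
    "sum(American Indian and Alaska Native)",
    "sum(Black or African-American)",
]
cleaned = [clean_column_name(name) for name in examples]
print(cleaned)
```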

In [66]:
demographics_table.orderBy('State_Code').limit(10).toPandas()

Unnamed: 0,state_code,state,male_population,female_population,total_population,number_of_veterans,foreign_born,american_indian_and_alaska_native,asian,black_or_african_american,hispanic_or_latino,white
0,AK,Alaska,152945,145750,298695,27492,33258,36339,36825,23107,27261,212696
1,AL,Alabama,497248,552381,1049629,71543,52154,8084,28769,521068,39313,498920
2,AR,Arkansas,286479,303400,589879,31704,62108,9381,22062,149608,77813,384733
3,AZ,Arizona,2227455,2272087,4499542,264505,682313,129708,229183,296222,1508157,3591611
4,CA,California,12278281,12544179,24822460,928270,7448257,401386,4543730,2047009,9856464,14905129
5,CO,Colorado,1454619,1481050,2935669,187896,337631,62613,148790,208043,703722,2463916
6,CT,Connecticut,432157,453424,885581,24953,225866,10729,48311,231822,309992,505674
7,DC,District of Columbia,319705,352523,672228,25963,95117,6130,35072,328786,71129,285402
8,DE,Delaware,32680,39277,71957,3063,3336,414,1193,44182,5516,23743
9,FL,Florida,3236773,3487375,6796738,388228,1688931,46759,264933,1652619,1942022,4758144


In [87]:
# save states table
states_table = demographics_table.select('*')
# write table to ops folder
path = os.path.join(output_data, 'ops', 'states_table')
states_table.write.parquet(path)

#### 4.1.4 Temperature tables

In [74]:
temperatures_table = df_raw_all['temp_data']

In [75]:
temperatures_table.select('dt').rdd.max()

Row(dt='2013-09-01')

In [76]:
# keep only 2012, the last complete year in the dataset (the data ends on 2013-09-01)
temperatures_table = temperatures_table.where("dt LIKE '2012%'")
temperatures_table.show(20)

+----------+------------------+-----------------------------+---------+-------------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|    State|      Country|
+----------+------------------+-----------------------------+---------+-------------+
|2012-01-01|             5.799|                        0.294|Tennessee|United States|
|2012-02-01|7.0280000000000005|                         0.24|Tennessee|United States|
|2012-03-01|15.655999999999999|          0.17600000000000002|Tennessee|United States|
|2012-04-01|            15.826|                        0.254|Tennessee|United States|
|2012-05-01|            21.705|                         0.17|Tennessee|United States|
|2012-06-01|            23.524|                        0.281|Tennessee|United States|
|2012-07-01|27.383000000000003|                        0.324|Tennessee|United States|
|2012-08-01|             24.64|                        0.304|Tennessee|United States|
|2012-09-01|20.968000000000004|                       
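The choice of 2012 follows from the latest date found above (`2013-09-01`): 2013 stops in September, so 2012 is the most recent year with all twelve months. A small sketch of that reasoning on synthetic monthly dates, where the helper `last_complete_year` is hypothetical and not part of the notebook:

```python
from collections import defaultdict


def last_complete_year(dates):
    """Return the latest year for which all 12 months appear in `dates`.

    `dates` is an iterable of 'YYYY-MM-DD' strings holding monthly records.
    """
    months_by_year = defaultdict(set)
    for value in dates:
        year, month, _day = value.split("-")
        months_by_year[int(year)].add(int(month))
    complete = [y for y, months in months_by_year.items() if len(months) == 12]
    return max(complete) if complete else None


# Synthetic monthly series ending at 2013-09-01, like the maximum found above
dates = [f"2012-{m:02d}-01" for m in range(1, 13)]
dates += [f"2013-{m:02d}-01" for m in range(1, 10)]
print(last_complete_year(dates))
```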

In [77]:
# split date column to month and year because temperature in 2012 is only recorded monthly
temperatures_table = temperatures_table.selectExpr(
    'year(dt) AS year',
    'month(dt) AS month',
    'AverageTemperature AS average_temperature',
    'State AS state',
    'Country AS country'
)
temperatures_table.show(5)

+----+-----+-------------------+---------+-------------+
|year|month|average_temperature|    state|      country|
+----+-----+-------------------+---------+-------------+
|2012|    1|              5.799|Tennessee|United States|
|2012|    2| 7.0280000000000005|Tennessee|United States|
|2012|    3| 15.655999999999999|Tennessee|United States|
|2012|    4|             15.826|Tennessee|United States|
|2012|    5|             21.705|Tennessee|United States|
+----+-----+-------------------+---------+-------------+
only showing top 5 rows



In [79]:
# save states temperature table
us_state_temperatures_table = temperatures_table.select('*')
# write table to ops folder
path = os.path.join(output_data, 'ops', 'us_state_temperatures_table')
us_state_temperatures_table.write.parquet(path, partitionBy=['country', 'state', 'year', 'month'])

In [82]:
# create countries temperature table
country_temperatures_table = temperatures_table.select('*').groupBy(['Country', 'month', 'year']).agg(F.avg('average_temperature').alias('average_temperature'))
# lower country names for join
country_temperatures_table = country_temperatures_table.withColumn('Country', lower(col('Country')))

In [83]:
# write table to ops folder
path = os.path.join(output_data, 'ops', 'country_temperatures_table')
country_temperatures_table.write.parquet(path)

In [None]:
# test join
country_temperatures_table.join(countries_table, country_temperatures_table.Country == countries_table.country).show()

#### 4.1.5 Dates table

In [84]:
arrdate_table = immigrations_table.select('arrdate_ts').distinct()
depdate_table = immigrations_table.select('depdate_ts').distinct()
date_table = arrdate_table.union(depdate_table).distinct().orderBy('arrdate_ts')
date_table = date_table.selectExpr(
    'arrdate_ts AS date',
    'day(arrdate_ts) AS day',
    'month(arrdate_ts) AS month',
    'year(arrdate_ts) AS year',
    'weekofyear(arrdate_ts) AS week',
    'weekday(arrdate_ts) AS weekday'
)

In [85]:
# write table to ops folder
path = os.path.join(output_data, 'ops', 'date_table')
date_table.write.parquet(path)

In [86]:
date_table.show(5)

+----------+---+-----+----+----+-------+
|      date|day|month|year|week|weekday|
+----------+---+-----+----+----+-------+
|2016-01-03|  3|    1|2016|  53|      6|
|2016-01-05|  5|    1|2016|   1|      1|
|2016-01-06|  6|    1|2016|   1|      2|
|2016-01-10| 10|    1|2016|   1|      6|
|2016-01-13| 13|    1|2016|   2|      2|
+----------+---+-----+----+----+-------+
only showing top 5 rows
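The `week` and `weekday` values in the output above line up with ISO 8601 week numbers and a Monday-based weekday index (0 = Monday, 6 = Sunday). A standard-library sketch, where the helper `date_parts` is hypothetical, reproduces the first two rows:

```python
from datetime import date


def date_parts(d: date) -> dict:
    """Derive the same fields as the date table using the standard library."""
    _iso_year, iso_week, _iso_weekday = d.isocalendar()
    return {
        "date": d.isoformat(),
        "day": d.day,
        "month": d.month,
        "year": d.year,
        "week": iso_week,        # ISO 8601 week number
        "weekday": d.weekday(),  # 0 = Monday ... 6 = Sunday
    }


print(date_parts(date(2016, 1, 3)))
print(date_parts(date(2016, 1, 5)))
```

This also explains why 2016-01-03 falls into week 53: that Sunday still belongs to the last ISO week of 2015.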



#### 4.2 Data Quality Checks
*Explain the data quality checks you'll perform to ensure the pipeline ran as expected.*

The data quality check consists of four parts. All tables stored in the `ops` folder are read in one after the other.

First, before a table is loaded, it is checked whether the table is part of the star schema.

Second, the number of rows is determined. If the table is empty, an error is raised.

Third, the number of columns is determined. If the table has fewer than two columns, an error is raised as well.

Fourth, the list of loaded tables is compared with the star schema defined above. Any discrepancy also raises an error.
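The four checks can be sketched independently of Spark by working on table metadata only. This is a simplified illustration under stated assumptions: the function `run_quality_checks` and its input format (`{table_name: (row_count, column_count)}`) are hypothetical and not the notebook's implementation.

```python
from typing import Dict, List, Tuple


def run_quality_checks(found: Dict[str, Tuple[int, int]], expected: List[str]) -> None:
    """Validate table metadata given as {table_name: (row_count, column_count)}."""
    for name, (rows, cols) in found.items():
        # 1. the table must belong to the star schema
        if name not in expected:
            raise ValueError(f"Table {name} not known from star schema.")
        # 2. the table must not be empty
        if rows < 1:
            raise ValueError(f"<{name}> contains 0 rows.")
        # 3. the table must have more than one column
        if cols < 2:
            raise ValueError(f"<{name}> contains fewer than 2 columns.")
    # 4. every table of the star schema must have been found
    missing = sorted(set(expected) - set(found))
    if missing:
        raise ValueError(f"Tables missing from star schema: {missing}")


# Passes silently when everything matches
run_quality_checks({"date_table": (207, 6), "cities_table": (656, 3)},
                   ["date_table", "cities_table"])
```

Raising on the first failure keeps the sketch short; collecting all failures and reporting them together would be an equally reasonable design.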

In [114]:
# check part of star_schema, number of rows and columns of tables in ops folder 
star_schema_tables = ['immigrations_table', 
                   'date_table', 
                   'cities_table', 
                   'states_table', 
                   'us_state_temperatures_table', 
                   'country_temperatures_table']
found_tables = []

ops_path = os.path.join(output_data, 'ops')
# sub-directories of the ops folder, one per table
table_names = next(os.walk(ops_path))[1]
df_check = {}
for name in table_names:
    # check part of schema
    if name not in star_schema_tables:
        raise ValueError(f'Table {name} not known from star schema.')
    # load table from parquet; the path is built from the name so both always match
    df_check[name] = spark.read.parquet(os.path.join(ops_path, name))
    print(f'Dataframe <{name}> successfully loaded.')
    # count rows and columns
    rows_count = df_check[name].count()
    cols_count = len(df_check[name].columns)
    if rows_count < 1:
        raise ValueError(f'Data quality check failed! <{name}> contains 0 rows.')
    if cols_count < 2:
        raise ValueError(f'Data quality check failed! <{name}> contains fewer than 2 columns.')
    print(f'Data Quality Checks passed successfully. <{name}> contains <{rows_count}> rows and <{cols_count}> columns.\n')
    found_tables.append(name)

# sorting both the lists 
star_schema_tables.sort() 
found_tables.sort() 
# check part of star schema 
if star_schema_tables != found_tables:
    raise ValueError('Tables not identical to star schema.')
print ('All tables from star schema exist.')

Dataframe <us_state_temperatures_table> successfully loaded.
Data Quality Checks passed successfully. <us_state_temperatures_table> contains <2892> rows and <5> columns.

Dataframe <cities_table> successfully loaded.
Data Quality Checks passed successfully. <cities_table> contains <656> rows and <3> columns.

Dataframe <states_table> successfully loaded.
Data Quality Checks passed successfully. <states_table> contains <49> rows and <12> columns.

Dataframe <country_temperatures_table> successfully loaded.
Data Quality Checks passed successfully. <country_temperatures_table> contains <84> rows and <4> columns.

Dataframe <date_table> successfully loaded.
Data Quality Checks passed successfully. <date_table> contains <207> rows and <6> columns.

Dataframe <immigrations_table> successfully loaded.
Data Quality Checks passed successfully. <immigrations_table> contains <2587414> rows and <16> columns.

All tables from star schema exist.


In [119]:
# Register the tables as temporary views so they can be queried with Spark SQL
immigrations_table.createOrReplaceTempView("immigrations_table")
date_table.createOrReplaceTempView("date_table")
country_temperatures_table.createOrReplaceTempView("country_temperatures_table")
states_table.createOrReplaceTempView("states_table")
cities_table.createOrReplaceTempView("cities_table")
us_state_temperatures_table.createOrReplaceTempView("us_state_temperatures_table")

spark.sql("""
SELECT weekday, count(*) 
FROM immigrations_table i
JOIN date_table d
ON i.arrdate_ts = d.date
JOIN cities_table c
ON i.i94port = c.city_id
WHERE c.state_code = 'FL'
GROUP BY 1
""").show()

+-------+--------+
|weekday|count(1)|
+-------+--------+
|      1|   66873|
|      6|   74947|
|      3|   85483|
|      5|  108968|
|      4|  121404|
|      2|   69165|
|      0|   66645|
+-------+--------+



#### 4.3 Data dictionary 
*Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.*

#### Fact Table: Immigrations
| Column | Description |
|---|---|
| cicid | Record ID of the immigration entry |
| arrdate_ts | Arrival date in the USA |
| depdate_ts | Departure date from the USA |
| i94port | Port of arrival |
| i94mode | Mode of transportation |
| airline | Airline used to arrive in the U.S. |
| fltno | Flight number of the airline used to arrive in the U.S. |
| i94yr | 4-digit year |
| i94mon | Numeric month |
| visatype | Class of admission legally admitting the non-immigrant to temporarily stay in the U.S. |
| i94visa | Visa codes collapsed into three categories |
| i94bir | Age of respondent in years |
| i94cit | Country of birth of the immigrant |
| i94res | Country of residence of the immigrant |
| i94addr | State code of arrival |
| gender | Non-immigrant sex |

#### Dim Table: States
| Column | Description |
|---|---|
| state_code | US state code |
| state | US state name |
| male_population | Male population of the state's listed cities |
| female_population | Female population of the state's listed cities |
| total_population | Total population of the state's listed cities |
| number_of_veterans | Number of veterans in the state |
| foreign_born | Number of foreign-born residents in the state |
| american_indian_and_alaska_native | Number of American Indian and Alaska Native people |
| asian | Number of Asian people |
| black_or_african_american | Number of Black or African American people |
| hispanic_or_latino | Number of Hispanic or Latino people |
| white | Number of White people |

#### Dim Table: Dates
| Column | Description |
|---|---|
| date | Date as YYYY-MM-DD |
| day | Day |
| month | Month |
| year | Year |
| week | Week number of the year |
| weekday | Day of the week (0 = Monday, ..., 6 = Sunday) |

#### Dim Table: Cities
| Column | Description |
|---|---|
| city_id | 3-character code for the city (matches `i94port` in the fact table) |
| city | City name |
| state_code | State code |

#### Dim Table: Country_temperatures
| Column | Description |
|---|---|
| country | Country name |
| year | Year |
| month | Month |
| average_temperature | Average temperature of the country for a specific month |

#### Dim Table: US_state_temperatures
| Column | Description |
|---|---|
| state | State name |
| year | Year |
| month | Month |
| average_temperature | Average temperature of the US state for a specific month |
| country | Country name |

## Step 5: Complete Project Write Up
**Clearly state the rationale for the choice of tools and technologies for the project.**
For this project I used standard data processing tools and AWS services.

In particular:
* Apache Spark: A unified analytics engine for large-scale data processing that works well with big data sources. It can be used directly from a Jupyter Notebook, e.g. through the PySpark DataFrame API, and supports SQL queries, streaming and complex analytics.

* S3: An inexpensive, easy-to-use AWS storage service characterized by scalability, high availability, security and performance.

* EMR: A cloud-based platform for processing large data volumes quickly and cost-effectively with Spark. If performance requirements increase, the cluster can easily be extended.

**Propose how often the data should be updated and why.**
The immigration data arrives as one new file per month, so it makes sense to run the data pipeline once a month.

**Write a description of how you would approach the problem differently under the following scenarios**
 * The data was increased by 100x.

AWS's flexible services make it easy to increase storage capacity on the S3 bucket and improve performance with additional processing nodes in EMR. 

 * The data populates a dashboard that must be updated on a daily basis by 7am every day.

For a daily 7am deadline, it would be advisable to use an orchestration tool such as Apache Airflow. A DAG scheduled to run every day, early enough to finish before 7am, keeps the database up to date.

 * The database needed to be accessed by 100+ people.

As described in the first point, the AWS services used here scale easily. Additional nodes for processing requests can be added quickly to accommodate a larger user base.