# Analysis of tourists to the United States
### Data Engineering Capstone Project

#### Project Summary
The project provides data in a data warehouse located on Redshift to report on and analyze tourist/immigrant travel behavior and patterns in relation to a state's characteristics. The scope is limited to tourists/immigrants entering the United States.

The project combines data from US immigration authorities detailing when, why and how people came to the United States as well as US demographics and airport codes.

The setup aims to give users a lot of flexibility in using the data while ensuring high data quality. This way, different target groups can address a variety of questions and get value from the data. However, the focus is on providing reporting capabilities to the US authorities. US authorities can, for example, report on the number of tourists/immigrants who came to the US and relate visitors' travel patterns to a state's characteristics.

Since the target groups and their goals are quite diverse, it is important to keep the skill threshold for interacting with the data low. For this reason, descriptions of all codes, e.g. travel or airport codes, are provided, which saves users time when creating a report. Additionally, Redshift is used since it requires fewer skills than, e.g., Apache Spark.

Since the data is stored in S3 buckets, the project can easily be scaled if it meets the approval of the users. How the project can be scaled is detailed under Step 5.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [3]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date, TimestampType as Timestamp
from datetime import datetime, timedelta
import pyspark.sql.functions
from pyspark.sql.functions import udf, col, expr
import os
import configparser

### Step 1: Scope the Project and Gather Data

#### Scope 

The plan is to provide high quality data on visitors to the United States in a data warehouse located on Redshift to allow for easy reports and analyses. Since visitors generate GDP and influence infrastructure decisions, it is important to base decisions on high quality data.

The project combines data from US immigration authorities detailing when, why and how people came to the United States as well as US demographic data on population composition. Lastly, airport codes are provided for easier analysis.

The aim is to give different target groups the opportunity to get value from the data. However, the focus is on providing reporting capabilities to the US authorities, allowing them to drill down or slice and dice the data as desired. US authorities can report on the number of visitors who came to the US and give details on their distribution across US states, countries of origin and visa types. They can also find out whether visitors prefer to travel to rural or urban areas by relating immigration data to the demographic data provided in this project. This can be valuable for the touristic development of areas in the US. Furthermore, data scientists can analyze travel patterns based on country of origin and age to forecast the number of tourists/immigrants per state or city.

Further possible questions that can be answered with the project dataset:
1. Do visitors' median age and the median age of the visited state correlate?
2. Are there relations between the number of visitors, their characteristics and the state population?
3. Do cities or states with a large number of foreign-born residents attract more foreign visitors?
4. Do visitors on a business visa travel to urban or rural areas?
5. How many airports does a state have and how many visitors does it attract?

The scope is limited to tourists/immigrants entering the United States. This project sample focuses only on immigration data from April 2016, which has more than three million data points; the project rubric requires the dataset to have more than one million rows. It is hence assumed that users report on visitors to the US on a monthly basis. The US demographics data comes from the 2015 American Community Survey (see below) and is used as an approximation for 2016.

Data is gathered and cleaned with Apache Spark since the dataset is medium to large-sized and Apache Spark is a fast engine for large-scale data processing. The data is then written to S3 buckets since S3 constitutes low-cost storage and allows the data to be used in a data lake if need be. In a last step, the data is loaded into Redshift since it is optimized for OLAP workloads. The goal is to create a data model that users can easily understand and use.

#### Describe and Gather Data 

* Visitor arrivals program (I94 Immigration Data): This data comes from the US National Tourism and Trade Office. Each report contains international visitor arrival statistics by world regions and select countries (including top 20), type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry (for select countries). Source: https://travel.trade.gov/research/reports/i94/historical/2016.html
* U.S. City Demographic Data: This data comes from OpenDataSoft, but it originally stems from the US Census Bureau's 2015 American Community Survey. It contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. Source: https://public.opendatasoft.com/explore/dataset/us-cities-demographics/information/
* Airport Code Table: This is a simple table of airport codes and corresponding cities. Source: https://datahub.io/core/airport-codes#data

In [None]:
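# create the config object and read the cfg file
config = configparser.ConfigParser()
config.read('dwh.cfg')
# access the AWS IAM user credentials in the dwh.cfg file using the config object
os.environ['KEY']=config['AWS']['KEY']
os.environ['SECRET']=config['AWS']['SECRET']
# Spark only picks up S3 credentials from the standard AWS variable names,
# so expose them under those names as well (must happen before the SparkSession is created)
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['KEY']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['SECRET']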
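The notebook reads the AWS credentials and the S3 output path from `dwh.cfg`, which is not included here. Based only on the keys the code accesses (`[AWS] KEY`, `[AWS] SECRET` and `[S3] output_data`), a minimal layout could look like the sketch below. The values are placeholders, the `s3a://` scheme is an assumption that fits the hadoop-aws package loaded below, and any further sections (e.g. for the Redshift cluster) are omitted.

```python
import configparser

# Hypothetical dwh.cfg contents: only the sections and keys this notebook reads.
# Placeholder values; never commit real credentials.
SAMPLE_CFG = """
[AWS]
KEY = <your-access-key-id>
SECRET = <your-secret-access-key>

[S3]
output_data = s3a://<your-bucket>/capstone/
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)

# the same lookups the notebook performs
key = config['AWS']['KEY']
secret = config['AWS']['SECRET']
output_data = config.get('S3', 'output_data')
print(output_data)
```

Since the write cells append folder names such as `us_demographics/` directly to `output_data`, the path should end with a slash.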

In [5]:
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.2").enableHiveSupport().getOrCreate()

In [6]:
df_immigration = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

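| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 | | | | ... | U | | 1979.0 | 10282016 | | | | 1897628000.0 | | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL | | ... | Y | | 1991.0 | D/S | M | | | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... | | M | 1961.0 | 09302016 | M | | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 1988.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 2012.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |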


In [8]:
df_immigration.show(5)
df_immigration.printSchema()

In [None]:
# load airport data
df_airport = spark.read.format("csv").option("header", True).load("airport-codes_csv.csv")

In [None]:
df_airport.show(5)
df_airport.printSchema()

In [None]:
# define the demographics schema explicitly to ensure that column types are not inferred incorrectly
demoSchema = R([
    Fld("city",Str()),
    Fld("state_name",Str()),
    Fld("median_age",Dbl()),
    Fld("male_population",Dbl()),
    Fld("female_population",Dbl()),
    Fld("total_population",Dbl()),
    Fld("number_of_veterans",Dbl()),
    Fld("foreign_born",Dbl()),
    Fld("avg_household_size",Dbl()),
    Fld("state_code",Str()),
    Fld("race",Str()),
    Fld("count",Dbl()),
])

In [None]:
# load demographics data
df_us_demograhics = spark.read.format("csv").option("header", True).option("delimiter", ";").schema(demoSchema).load("us-cities-demographics.csv")

In [None]:
df_us_demograhics.show(5)
df_us_demograhics.printSchema()

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

The goal is to clean the data, ensuring high quality and eliminating columns with little value, while keeping the granularity of the data so that a variety of questions can be answered with it. The next paragraphs give an overview of the identified quality issues.

#### I94 Immigration Data:
The file format is sas7bdat. The dataset contains columns that are deprecated according to the data dictionary I94_SAS_Labels_Description.SAS. It also contains code columns without code descriptions. Furthermore, some columns contain largely null values. Since end users probably won't understand travel, visa, city or country codes, the descriptions in I94_SAS_Labels_Description.SAS need to be provided for the users. So instead of 209 as the country of residence, the user can see that the visitor's residence is Japan.
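As an illustration of this decoding, the sketch below parses a few lines in the format of I94_SAS_Labels_Description.SAS with a regular expression and looks up code 209. It is an alternative to the string-splitting helper used later in this notebook, not the code the project actually runs; `parse_codes` and `CODE_LINE` are hypothetical names.

```python
import re

# A few lines in the format of I94_SAS_Labels_Description.SAS
# (an excerpt of the country codes listed later in this notebook)
LABELS_EXCERPT = """
   209 =  'JAPAN'
   687 =  'ARGENTINA '
    0  =  'INVALID: STATELESS'
"""

# numeric code, optional whitespace, '=', optional whitespace, quoted description
CODE_LINE = re.compile(r"^\s*(\d+)\s*=\s*'([^']*)'")

def parse_codes(text):
    """Return {int code: description} for every line that matches CODE_LINE."""
    codes = {}
    for line in text.splitlines():
        match = CODE_LINE.match(line)
        if match:
            # strip trailing blanks inside the quotes, e.g. 'ARGENTINA '
            codes[int(match.group(1))] = match.group(2).strip()
    return codes

country_codes = parse_codes(LABELS_EXCERPT)
print(country_codes[209])  # JAPAN
```

Converting the codes to integers up front makes them directly joinable with the integer-cast code columns of the immigration data.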

#### US city demographics dataset:
The file format is csv. Every city is listed multiple times in the dataset since every row states the number of people of one race in the city. Since 'count' is the only column that contains a per-race value and every other column holds the total across all races, the city-level values are duplicated. This makes it difficult to join the demographics data to other datasets. Hence, I decided to join based on state code but kept the information on city level to be able to determine a state's composition, for example whether it has many large cities or mostly small ones. Cities in this dataset do not fully align with cities in the other datasets. This is not an error, since not every city has a port or an airport, but it needs to be taken into consideration for the data model.
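The duplication can be illustrated with a few made-up rows in the layout of us-cities-demographics.csv (the numbers are invented, not taken from the dataset). Dropping the race-specific columns turns the per-race rows into exact duplicates, which is what selecting the city-level columns and calling `dropDuplicates()` relies on; the same city name appearing in two states also shows why the city alone is not a sufficient key.

```python
# Made-up rows mimicking the layout of us-cities-demographics.csv:
# one row per (city, race); every other column repeats the city-level total.
rows = [
    {"city": "Springfield", "state_code": "MA", "total_population": 1000.0, "race": "White", "count": 600.0},
    {"city": "Springfield", "state_code": "MA", "total_population": 1000.0, "race": "Asian", "count": 300.0},
    {"city": "Springfield", "state_code": "IL", "total_population": 2000.0, "race": "White", "count": 1500.0},
]

# Keep only city-level columns; the per-race rows collapse into one row per city.
city_columns = ("city", "state_code", "total_population")
deduplicated = {tuple(row[c] for c in city_columns) for row in rows}

# (city, state_code) identifies each remaining row, while the city name alone does not.
keys = {(city, state) for city, state, _ in deduplicated}
print(sorted(deduplicated))
```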

#### Airport dataset:
The file format is csv. There is no information on when the dataset was compiled; it is assumed that it lists the world's airports as of 2016. The dataset contains all types of airports all over the world. Hence, it needs to be filtered to fit the scope of the project.
 
#### Cleaning Steps

#### I94 Immigration Data:
* SAS dates were converted to a standard date format since this makes it easier for users to work with the data.
* Duplicate records were dropped to ensure quality.
* Deprecated columns and columns containing mostly nulls were dropped since they yield no informational value.
* Columns were renamed to make it easier for users to work with them.
* Rows with null values in column 'cicid' were dropped since it is the primary key of the dataset.
* Columns were cast to other data types where necessary to ensure that only the necessary storage space is used.
* Descriptions of codes were provided as stated above.
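SAS stores dates as the number of days since 1 January 1960, so the conversion is plain date arithmetic. The stdlib sketch below (`sas_to_date` is a hypothetical name) mirrors the UDF used further down and checks it against `arrdate` values from the sample rows shown above; 20545 maps to 1 April 2016, which fits the April 2016 extract.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days since 1 January 1960

def sas_to_date(sas_days):
    """Convert a SAS date (days since 1960-01-01, possibly a float) to a date.

    Returns None for missing values (None or NaN), like the notebook's UDF.
    """
    if sas_days is None or sas_days != sas_days:  # NaN is the only value not equal to itself
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))

print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(20573.0))  # 2016-04-29
```

Inside Spark, the same arithmetic could likely be expressed without a Python UDF, e.g. via `expr("date_add(to_date('1960-01-01'), cast(arrdate as int))")`, which avoids shipping rows to Python; this variant has not been tested against this dataset.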

#### US city demographics dataset:
* The race column 'count' was dropped due to the join problem stated above and race being a social construct.
* Furthermore, the column 'Number of Veterans' was dropped since it does not relate to travel.
* Since city is part of the primary key of this dataset, it was ensured that it does not contain null values.

#### Airport dataset:
* Code columns that do not relate to other datasets, like 'local_code' or 'ident', were dropped since they cannot be used as join criteria and yield no other informational value. GPS data and elevation were also dropped since they do not pertain to visitors' data.
* Only airports with an IATA code were kept since this is the join criterion for the relation to the visitors data.
* Since the project focuses on visitors to the United States, only US airports are considered. The 'continent' column was consequently dropped since only one continent is left, namely North America.
* The dataset contains closed airports as well as heliports, balloonports and seaplane bases. These are not relevant since visitors only come to the US via proper airports that are still open. Hence, they were filtered out.
* Duplicates were dropped to ensure quality.
* The column 'iso_region' was split into country and state code to be able to link it to table us_states.
* The column 'coordinates' was split into longitude and latitude to save the user the work of splitting it.
* Columns were cast to the desired data type where the schema was inferred incorrectly.
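The filters and splits above can be sketched in plain Python on a handful of made-up rows (the codes and coordinates are invented). One assumption worth verifying against the actual file: in the public datahub airport-codes CSV the `coordinates` column is stored as "longitude, latitude", so the first item is the longitude. `clean_airport` is a hypothetical helper, not part of the project code.

```python
# Made-up rows in the layout of airport-codes_csv.csv (only the columns kept by the notebook)
airports = [
    {"iata_code": "AAA", "iso_country": "US", "iso_region": "US-NY", "type": "large_airport",
     "coordinates": "-73.5, 40.5"},
    {"iata_code": "", "iso_country": "US", "iso_region": "US-PA", "type": "heliport",
     "coordinates": "-75.0, 40.0"},
    {"iata_code": "BBB", "iso_country": "DE", "iso_region": "DE-BE", "type": "medium_airport",
     "coordinates": "13.4, 52.5"},
    {"iata_code": "CCC", "iso_country": "US", "iso_region": "US-TX", "type": "closed",
     "coordinates": "-97.0, 32.0"},
]

def clean_airport(row):
    """Split iso_region into country/state and coordinates into two floats."""
    _country, state_code = row["iso_region"].split("-", 1)
    # assumption: coordinates are stored as "longitude, latitude"
    longitude, latitude = (float(v) for v in row["coordinates"].split(", "))
    return {"iata_code": row["iata_code"], "state_code": state_code,
            "latitude": latitude, "longitude": longitude, "type": row["type"]}

cleaned = [
    clean_airport(row) for row in airports
    if row["iata_code"]               # IATA code is the join criterion and primary key
    and row["iso_country"] == "US"    # project scope: US airports only
    and "airport" in row["type"]      # drops heliports, seaplane bases, balloonports, closed
]
print(cleaned)
```

Only the first row survives: the heliport has no IATA code, the second airport is outside the US and the last one is closed.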

In [None]:
#### Clean airport data

In [None]:
# select the relevant airport columns
df_airport = df_airport.select('iata_code', 'name', 'iso_country','iso_region','municipality','coordinates', 'type')
df_airport.head()


In [None]:
# filter out null or blank iata_codes since this column will be the primary key
df_airport = df_airport.filter(col('iata_code').isNotNull() & (col('iata_code') != ''))
# filter to only US airports
df_airport = df_airport.filter(col('iso_country') == 'US')
# only airports are relevant, not heliports, seaplane bases, balloonports or closed airports
df_airport = df_airport.filter(col('type').contains('airport')).dropDuplicates()
df_airport.show(5)

In [None]:
# split iso_region in country and state
split_col = pyspark.sql.functions.split(df_airport['iso_region'], '-')
df_airport = df_airport.withColumn('country_code', split_col.getItem(0))
df_airport = df_airport.withColumn('state_code', split_col.getItem(1))
df_airport.show(5)

In [None]:
# split coordinates into longitude and latitude
# the coordinates column is formatted as "longitude, latitude",
# so the first item is the longitude and the second the latitude
split_col = pyspark.sql.functions.split(df_airport['coordinates'], ', ')
df_airport = df_airport.withColumn('longitude', split_col.getItem(0))
df_airport = df_airport.withColumn('latitude', split_col.getItem(1))
df_airport.show(5)

In [None]:
df_airport = df_airport.drop('coordinates')
df_airport = df_airport.drop('country_code')
df_airport.show(5)

In [None]:
df_airport.printSchema()
#cast string coordinates to double
df_airport = df_airport.withColumn("latitude", df_airport["latitude"].cast(Dbl()))
df_airport = df_airport.withColumn("longitude", df_airport["longitude"].cast(Dbl()))

In [None]:
# clean demographics
# select relevant columns
df_us_demograhics = df_us_demograhics.select('city', 'state_name', 'median_age','male_population','female_population','total_population', 'foreign_born', 'avg_household_size', 'state_code').dropDuplicates()
df_us_demograhics.show(5)

In [None]:
# ensure that city is not null or blank since it is part of the primary key
df_us_demograhics = df_us_demograhics.filter(col('city').isNotNull() & (col('city') != ''))
df_us_demograhics.show(5)

In [None]:
df_us_demograhics.printSchema()

In [None]:
# clean immigration data
df_immigration.show(5)
df_immigration.printSchema()

In [None]:
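# convert SAS date (days since 1960-01-01) to a Python date
def convert_datetime(x):
    """Return the date x days after 1960-01-01, or None if x is missing or invalid."""
    try:
        start = datetime(1960, 1, 1)
        return (start + timedelta(days=int(x))).date()
    except (TypeError, ValueError, OverflowError):
        # None raises TypeError, NaN raises ValueError
        return None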

In [None]:
# convert arrival and departure date to date
udf_datetime_from_sas = udf(lambda x: convert_datetime(x), T.DateType())
df_immigration = df_immigration.withColumn("arrival_date", udf_datetime_from_sas("arrdate")).withColumn("departure_date", udf_datetime_from_sas("depdate"))

In [None]:
# drop duplicate records
df_immigration = df_immigration.dropDuplicates()
df_immigration.printSchema()

In [None]:
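# select only relevant columns and give them descriptive names
# note: i94cit holds a country code (decoded with the same country table as i94res),
# not a city code, and i94port holds the port of entry code; the aliases below are
# kept for consistency with the data model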
df_immigration = df_immigration.select('cicid', col("i94yr").alias("year"), 
                                       col("i94mon").alias("month"),
                                       col("i94cit").alias("city_code_origin"),
                                       col("i94res").alias("country_code_residence"),
                                       col("i94port").alias("city_code_destination"),
                                       col("arrival_date"),
                                       col("i94mode").alias("travel_code"),
                                       col("i94addr").alias("state_code_residence"),
                                       col("departure_date"),
                                       col("i94visa").alias("visa_code"),
                                       col("biryear").alias("birth_year"),
                                       col("gender"),
                                       col("airline")
                                      ).distinct()
df_immigration.show(5)

In [None]:
#delete nulls in cicid since it will be the primary key. No null values expected
df_immigration = df_immigration.where(col("cicid").isNotNull())

In [None]:
df_immigration.printSchema()
# cast double columns to int to save storage space
for column_name in ["year", "month", "city_code_origin", "country_code_residence",
                    "travel_code", "visa_code", "birth_year"]:
    df_immigration = df_immigration.withColumn(column_name, df_immigration[column_name].cast(Int()))
df_immigration.printSchema()
df_immigration.show(5)

In [None]:
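def split_codes_to_dict(string, separator):
    """Parse lines of the form '<code><separator><description>' into a dictionary.

    Codes and descriptions are stripped of surrounding whitespace; blank lines
    and lines without the separator are skipped.
    """
    dictionary = {}
    for line in string.split("\n"):
        line = line.strip()
        # split into code and description at the first separator only
        parts = line.split(separator, 1)
        if len(parts) == 2:
            # save in dictionary
            dictionary[parts[0].strip()] = parts[1].strip()
    return dictionary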

In [None]:
# get country codes and descriptions, used to decode both I94CIT and I94RES
country_codes= """
   582 =  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'
   236 =  'AFGHANISTAN'
   101 =  'ALBANIA'
   316 =  'ALGERIA'
   102 =  'ANDORRA'
   324 =  'ANGOLA'
   529 =  'ANGUILLA'
   518 =  'ANTIGUA-BARBUDA'
   687 =  'ARGENTINA '
   151 =  'ARMENIA'
   532 =  'ARUBA'
   438 =  'AUSTRALIA'
   103 =  'AUSTRIA'
   152 =  'AZERBAIJAN'
   512 =  'BAHAMAS'
   298 =  'BAHRAIN'
   274 =  'BANGLADESH'
   513 =  'BARBADOS'
   104 =  'BELGIUM'
   581 =  'BELIZE'
   386 =  'BENIN'
   509 =  'BERMUDA'
   153 =  'BELARUS'
   242 =  'BHUTAN'
   688 =  'BOLIVIA'
   717 =  'BONAIRE, ST EUSTATIUS, SABA' 
   164 =  'BOSNIA-HERZEGOVINA'
   336 =  'BOTSWANA'
   689 =  'BRAZIL'
   525 =  'BRITISH VIRGIN ISLANDS'
   217 =  'BRUNEI'
   105 =  'BULGARIA'
   393 =  'BURKINA FASO'
   243 =  'BURMA'
   375 =  'BURUNDI'
   310 =  'CAMEROON'
   326 =  'CAPE VERDE'
   526 =  'CAYMAN ISLANDS'
   383 =  'CENTRAL AFRICAN REPUBLIC'
   384 =  'CHAD'
   690 =  'CHILE'
   245 =  'CHINA, PRC'
   721 =  'CURACAO' 
   270 =  'CHRISTMAS ISLAND'
   271 =  'COCOS ISLANDS'
   691 =  'COLOMBIA'
   317 =  'COMOROS'
   385 =  'CONGO'
   467 =  'COOK ISLANDS'
   575 =  'COSTA RICA'
   165 =  'CROATIA'
   584 =  'CUBA'
   218 =  'CYPRUS'
   140 =  'CZECH REPUBLIC'
   723 =  'FAROE ISLANDS (PART OF DENMARK)'  
   108 =  'DENMARK'
   322 =  'DJIBOUTI'
   519 =  'DOMINICA'
   585 =  'DOMINICAN REPUBLIC'
   240 =  'EAST TIMOR'
   692 =  'ECUADOR'
   368 =  'EGYPT'
   576 =  'EL SALVADOR'
   399 =  'EQUATORIAL GUINEA'
   372 =  'ERITREA'
   109 =  'ESTONIA'
   369 =  'ETHIOPIA'
   604 =  'FALKLAND ISLANDS'
   413 =  'FIJI'
   110 =  'FINLAND'
   111 =  'FRANCE'
   601 =  'FRENCH GUIANA'
   411 =  'FRENCH POLYNESIA'
   387 =  'GABON'
   338 =  'GAMBIA'
   758 =  'GAZA STRIP' 
   154 =  'GEORGIA'
   112 =  'GERMANY'
   339 =  'GHANA'
   143 =  'GIBRALTAR'
   113 =  'GREECE'
   520 =  'GRENADA'
   507 =  'GUADELOUPE'
   577 =  'GUATEMALA'
   382 =  'GUINEA'
   327 =  'GUINEA-BISSAU'
   603 =  'GUYANA'
   586 =  'HAITI'
   726 =  'HEARD AND MCDONALD IS.'
   149 =  'HOLY SEE/VATICAN'
   528 =  'HONDURAS'
   206 =  'HONG KONG'
   114 =  'HUNGARY'
   115 =  'ICELAND'
   213 =  'INDIA'
   759 =  'INDIAN OCEAN AREAS (FRENCH)' 
   729 =  'INDIAN OCEAN TERRITORY' 
   204 =  'INDONESIA'
   249 =  'IRAN'
   250 =  'IRAQ'
   116 =  'IRELAND'
   251 =  'ISRAEL'
   117 =  'ITALY'
   388 =  'IVORY COAST'
   514 =  'JAMAICA'
   209 =  'JAPAN'
   253 =  'JORDAN'
   201 =  'KAMPUCHEA'
   155 =  'KAZAKHSTAN'
   340 =  'KENYA'
   414 =  'KIRIBATI'
   732 =  'KOSOVO' 
   272 =  'KUWAIT'
   156 =  'KYRGYZSTAN'
   203 =  'LAOS'
   118 =  'LATVIA'
   255 =  'LEBANON'
   335 =  'LESOTHO'
   370 =  'LIBERIA'
   381 =  'LIBYA'
   119 =  'LIECHTENSTEIN'
   120 =  'LITHUANIA'
   121 =  'LUXEMBOURG'
   214 =  'MACAU'
   167 =  'MACEDONIA'
   320 =  'MADAGASCAR'
   345 =  'MALAWI'
   273 =  'MALAYSIA'
   220 =  'MALDIVES'
   392 =  'MALI'
   145 =  'MALTA'
   472 =  'MARSHALL ISLANDS'
   511 =  'MARTINIQUE'
   389 =  'MAURITANIA'
   342 =  'MAURITIUS'
   760 =  'MAYOTTE (AFRICA - FRENCH)' 
   473 =  'MICRONESIA, FED. STATES OF'
   157 =  'MOLDOVA'
   122 =  'MONACO'
   299 =  'MONGOLIA'
   735 =  'MONTENEGRO' 
   521 =  'MONTSERRAT'
   332 =  'MOROCCO'
   329 =  'MOZAMBIQUE'
   371 =  'NAMIBIA'
   440 =  'NAURU'
   257 =  'NEPAL'
   123 =  'NETHERLANDS'
   508 =  'NETHERLANDS ANTILLES'
   409 =  'NEW CALEDONIA'
   464 =  'NEW ZEALAND'
   579 =  'NICARAGUA'
   390 =  'NIGER'
   343 =  'NIGERIA'
   470 =  'NIUE'
   275 =  'NORTH KOREA'
   124 =  'NORWAY'
   256 =  'OMAN'
   258 =  'PAKISTAN'
   474 =  'PALAU'
   743 =  'PALESTINE' 
   504 =  'PANAMA'
   441 =  'PAPUA NEW GUINEA'
   693 =  'PARAGUAY'
   694 =  'PERU'
   260 =  'PHILIPPINES'
   416 =  'PITCAIRN ISLANDS'
   107 =  'POLAND'
   126 =  'PORTUGAL'
   297 =  'QATAR'
   748 =  'REPUBLIC OF SOUTH SUDAN'
   321 =  'REUNION'
   127 =  'ROMANIA'
   158 =  'RUSSIA'
   376 =  'RWANDA'
   128 =  'SAN MARINO'
   330 =  'SAO TOME AND PRINCIPE'
   261 =  'SAUDI ARABIA'
   391 =  'SENEGAL'
   142 =  'SERBIA AND MONTENEGRO'
   745 =  'SERBIA' 
   347 =  'SEYCHELLES'
   348 =  'SIERRA LEONE'
   207 =  'SINGAPORE'
   141 =  'SLOVAKIA'
   166 =  'SLOVENIA'
   412 =  'SOLOMON ISLANDS'
   397 =  'SOMALIA'
   373 =  'SOUTH AFRICA'
   276 =  'SOUTH KOREA'
   129 =  'SPAIN'
   244 =  'SRI LANKA'
   346 =  'ST. HELENA'
   522 =  'ST. KITTS-NEVIS'
   523 =  'ST. LUCIA'
   502 =  'ST. PIERRE AND MIQUELON'
   524 =  'ST. VINCENT-GRENADINES'
   716 =  'SAINT BARTHELEMY' 
   736 =  'SAINT MARTIN' 
   749 =  'SAINT MAARTEN' 
   350 =  'SUDAN'
   602 =  'SURINAME'
   351 =  'SWAZILAND'
   130 =  'SWEDEN'
   131 =  'SWITZERLAND'
   262 =  'SYRIA'
   268 =  'TAIWAN'
   159 =  'TAJIKISTAN'
   353 =  'TANZANIA'
   263 =  'THAILAND'
   304 =  'TOGO'
   417 =  'TONGA'
   516 =  'TRINIDAD AND TOBAGO'
   323 =  'TUNISIA'
   264 =  'TURKEY'
   161 =  'TURKMENISTAN'
   527 =  'TURKS AND CAICOS ISLANDS'
   420 =  'TUVALU'
   352 =  'UGANDA'
   162 =  'UKRAINE'
   296 =  'UNITED ARAB EMIRATES'
   135 =  'UNITED KINGDOM'
   695 =  'URUGUAY'
   163 =  'UZBEKISTAN'
   410 =  'VANUATU'
   696 =  'VENEZUELA'
   266 =  'VIETNAM'
   469 =  'WALLIS AND FUTUNA ISLANDS'
   757 =  'WEST INDIES (FRENCH)' 
   333 =  'WESTERN SAHARA'
   465 =  'WESTERN SAMOA'
   216 =  'YEMEN'
   139 =  'YUGOSLAVIA'
   301 =  'ZAIRE'
   344 =  'ZAMBIA'
   315 =  'ZIMBABWE'
   403 =  'INVALID: AMERICAN SAMOA'
   712 =  'INVALID: ANTARCTICA' 
   700 =  'INVALID: BORN ON BOARD SHIP'
   719 =  'INVALID: BOUVET ISLAND (ANTARCTICA/NORWAY TERR.)'
   574 =  'INVALID: CANADA'
   720 =  'INVALID: CANTON AND ENDERBURY ISLS' 
   106 =  'INVALID: CZECHOSLOVAKIA'
   739 =  'INVALID: DRONNING MAUD LAND (ANTARCTICA-NORWAY)' 
   394 =  'INVALID: FRENCH SOUTHERN AND ANTARCTIC'
   501 =  'INVALID: GREENLAND'
   404 =  'INVALID: GUAM'
   730 =  'INVALID: INTERNATIONAL WATERS' 
   731 =  'INVALID: JOHNSON ISLAND' 
   471 =  'INVALID: MARIANA ISLANDS, NORTHERN'
   737 =  'INVALID: MIDWAY ISLANDS' 
   753 =  'INVALID: MINOR OUTLYING ISLANDS - USA'
   740 =  'INVALID: NEUTRAL ZONE (S. ARABIA/IRAQ)' 
   710 =  'INVALID: NON-QUOTA IMMIGRANT'
   505 =  'INVALID: PUERTO RICO'
    0  =  'INVALID: STATELESS'
   705 =  'INVALID: STATELESS'
   583 =  'INVALID: UNITED STATES'
   407 =  'INVALID: UNITED STATES'
   999 =  'INVALID: UNKNOWN'
   239 =  'INVALID: UNKNOWN COUNTRY'
   134 =  'INVALID: USSR'
   506 =  'INVALID: U.S. VIRGIN ISLANDS'
   755 =  'INVALID: WAKE ISLAND'  
   311 =  'Collapsed Tanzania (should not show)'
   741 =  'Collapsed Curacao (should not show)'
    54 =  'No Country Code (54)'
   100 =  'No Country Code (100)'
   187 =  'No Country Code (187)'
   190 =  'No Country Code (190)'
   200 =  'No Country Code (200)'
   219 =  'No Country Code (219)'
   238 =  'No Country Code (238)'
   277 =  'No Country Code (277)'
   293 =  'No Country Code (293)'
   300 =  'No Country Code (300)'
   319 =  'No Country Code (319)'
   365 =  'No Country Code (365)'
   395 =  'No Country Code (395)'
   400 =  'No Country Code (400)'
   485 =  'No Country Code (485)'
   503 =  'No Country Code (503)'
   589 =  'No Country Code (589)'
   592 =  'No Country Code (592)'
   791 =  'No Country Code (791)'
   849 =  'No Country Code (849)'
   914 =  'No Country Code (914)'
   944 =  'No Country Code (944)'
   996 =  'No Country Code (996)'"""

In [None]:
# remove quotes
country_codes = country_codes.replace("'",'')

In [None]:
# convert dictionary to data frame to be able to convert it to parquet file
#https://stackoverflow.com/questions/61339594/how-to-convert-a-dictionary-to-dataframe-in-pyspark
data_country = {}

data_country = split_codes_to_dict(country_codes, " =  ")
df_country_code = spark.createDataFrame(data_country.items(), 
                      schema=R(fields=[
                          Fld("country_code", Str()), 
                          Fld("country_name", Str())]))

df_country_code = df_country_code.withColumn("country_code", df_country_code["country_code"].cast(Int()))
df_country_code.show()
df_country_code.printSchema()

In [None]:
#delete nulls in country_code since it will be the primary key. No null values expected
df_country_code = df_country_code.where(col("country_code").isNotNull())

In [None]:
#travel code and description and remove quotes
travel_code = """
   1 = 'Air'
   2 = 'Sea'
   3 = 'Land'
   9 = 'Not reported'""".replace("'",'')

In [None]:
# convert dictionary to data frame to be able to convert it to parquet file
data_travel = {}

data_travel = split_codes_to_dict(travel_code, " = ")
df_travel_code = spark.createDataFrame(data_travel.items(), 
                      schema=R(fields=[
                          Fld("travel_code", Str()), 
                          Fld("travel_name", Str())]))

df_travel_code = df_travel_code.withColumn("travel_code", df_travel_code["travel_code"].cast(Int()))
df_travel_code.show()
df_travel_code.printSchema()

In [None]:
#delete nulls in travel_code since it will be the primary key. No null values expected
df_travel_code = df_travel_code.where(col("travel_code").isNotNull())

In [None]:
visa_codes = """
   1 = 'Business'
   2 = 'Pleasure'
   3 = 'Student'
""".replace("'",'')

In [None]:
# convert dictionary to data frame to be able to convert it to parquet file
data_visa = {}

data_visa = split_codes_to_dict(visa_codes, " = ")
df_visa_code = spark.createDataFrame(data_visa.items(), 
                      schema=R(fields=[
                          Fld("visa_code", Str()), 
                          Fld("visa_name", Str())]))

df_visa_code = df_visa_code.withColumn("visa_code", df_visa_code["visa_code"].cast(Int()))
df_visa_code.show()
df_visa_code.printSchema()

In [None]:
#delete nulls in visa_code since it will be the primary key. No null values expected
df_visa_code = df_visa_code.where(col("visa_code").isNotNull())

In [None]:
#number of rows per dataframe to compare to redshift load
print((df_immigration.count(), len(df_immigration.columns)))
print((df_airport.count(), len(df_airport.columns)))
print((df_us_demograhics.count(), len(df_us_demograhics.columns)))
print((df_country_code.count(), len(df_country_code.columns)))
print((df_travel_code.count(), len(df_travel_code.columns)))
print((df_visa_code.count(), len(df_visa_code.columns)))
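These counts can serve as expected values for a post-load check. The sketch below shows one possible comparison; `count_mismatches` is a hypothetical helper (the project's own `data_quality_count_check` in sql_queries.py is not shown), and an in-memory SQLite database stands in for Redshift so that the example runs locally. A psycopg2 cursor offers the same `execute`/`fetchone` calls used here.

```python
import sqlite3

def count_mismatches(conn, expected_counts):
    """Compare expected row counts (e.g. from the Spark DataFrames) with loaded tables.

    Returns {table: (expected, actual)} for every table whose counts differ.
    Table names are interpolated into SQL, so they must come from trusted code.
    """
    cur = conn.cursor()
    mismatches = {}
    for table, expected in expected_counts.items():
        cur.execute(f"SELECT COUNT(*) FROM {table}")
        actual = cur.fetchone()[0]
        if actual != expected:
            mismatches[table] = (expected, actual)
    return mismatches

# sqlite3 stands in for Redshift here
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE visa_code (visa_code INTEGER, visa_name TEXT)")
conn.executemany("INSERT INTO visa_code VALUES (?, ?)",
                 [(1, "Business"), (2, "Pleasure"), (3, "Student")])
print(count_mismatches(conn, {"visa_code": 3}))  # {}
print(count_mismatches(conn, {"visa_code": 4}))  # {'visa_code': (4, 3)}
```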

In [None]:
#Write to S3
#output_data = "output/" #create your own bucket
output_data = config.get('S3', 'output_data')

In [None]:
df_us_demograhics.write.mode('overwrite').parquet(output_data + "us_demographics/")

In [None]:
df_immigration.write.mode('overwrite').parquet(output_data + "us_immigration/")

In [None]:
df_airport.write.mode('overwrite').parquet(output_data + "airport/")

In [None]:
df_country_code.write.mode('overwrite').parquet(output_data + "country_code/")

In [None]:
df_travel_code.write.mode('overwrite').parquet(output_data + "travel_code/")

In [None]:
df_visa_code.write.mode('overwrite').parquet(output_data + "visa_code/")

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

As mentioned earlier, the goal of this project is to provide high-quality data that allows for a variety of analyses and reports while making it as easy as possible for users to interact with the data. Furthermore, the data will be updated in batches and does not constitute transactional data. Hence, a data warehouse with a relational setup was chosen, since it provides structured data for querying and analysis using an understandable and performant dimensional model.

A modified version of the star schema is used to model the data since analyses of travel data are the goal. The star schema simplifies queries and allows for fast aggregations, which fits the goals of this project. There are two fact tables and five dimension tables. A visual representation of the data model can be found in the README file.

Primary and foreign keys are defined in the tables even though Redshift does not enforce them since the query optimizer uses those constraints to generate more efficient query plans.
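Because Redshift accepts rows that violate the declared keys, referential integrity has to be verified after loading. A minimal sketch of a check for orphaned foreign keys, assuming the table and column names of the data model described below; the rows are made up, and SQLite again stands in for Redshift so that the example runs locally (no foreign key is declared in this stand-in schema).

```python
import sqlite3

# Count fact rows whose visa_code has no matching row in the dimension table.
ORPHAN_CHECK = """
SELECT COUNT(*)
FROM us_immigration AS i
LEFT JOIN visa_code AS v ON i.visa_code = v.visa_code
WHERE v.visa_code IS NULL
"""

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE visa_code (visa_code INTEGER PRIMARY KEY, visa_name TEXT);
CREATE TABLE us_immigration (cicid INTEGER PRIMARY KEY, visa_code INTEGER);
INSERT INTO visa_code VALUES (1, 'Business'), (2, 'Pleasure'), (3, 'Student');
INSERT INTO us_immigration VALUES (6, 2), (7, 3), (15, 9);  -- visa_code 9 has no match
""")

orphans = conn.execute(ORPHAN_CHECK).fetchone()[0]
print(orphans)  # 1
```

A non-zero result would flag a load problem; the same LEFT JOIN pattern applies to every foreign key of the fact tables.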

DISTKEY and SORTKEY are set to optimize data distribution and query processing. Where no DISTKEY is set, the choice of DISTSTYLE is left to Amazon Redshift (DISTSTYLE AUTO), as recommended by AWS best practices.

Six staging tables handle the load from the S3 buckets to increase the efficiency of the ETL process and ensure data integrity.

The two fact tables are:

#### us_immigration
This table contains all information available on visitors to the US, including travel, visa and country codes. The column 'cicid' is the primary key for this table. Foreign keys are set to illustrate relations to dimension tables, and NOT NULL constraints are added to the foreign keys. The column 'state_code_residence' was chosen as DISTKEY and SORTKEY since it is used to link to states and municipalities and will be joined with those tables often.
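To make these key choices concrete, here is a hypothetical sketch of how the `us_immigration` DDL could be written, stored as a Python string in the style of `sql_queries.py` (whose actual statements are not shown in this notebook). Only a subset of columns is included, and the data types are assumptions. Note that Redshift does not enforce PRIMARY KEY and FOREIGN KEY constraints but does enforce NOT NULL, so rows with missing codes, such as the first sample row of the immigration data above (no `i94mode`, no `i94addr`), would have to be filtered out or defaulted before the insert into the final table.

```python
# Hypothetical Redshift DDL for the us_immigration fact table (subset of columns,
# assumed data types), kept as a Python string like the statements in sql_queries.py.
us_immigration_table_create = """
CREATE TABLE IF NOT EXISTS us_immigration (
    cicid                  INTEGER    NOT NULL,
    country_code_residence INTEGER    NOT NULL,
    state_code_residence   VARCHAR(2) NOT NULL,
    travel_code            INTEGER    NOT NULL,
    visa_code              INTEGER    NOT NULL,
    arrival_date           DATE,
    departure_date         DATE,
    -- informational only: Redshift does not enforce these, but the planner uses them
    PRIMARY KEY (cicid),
    FOREIGN KEY (country_code_residence) REFERENCES country (country_code),
    FOREIGN KEY (state_code_residence) REFERENCES us_states (state_code),
    FOREIGN KEY (travel_code) REFERENCES travel_code (travel_code),
    FOREIGN KEY (visa_code) REFERENCES visa_code (visa_code)
)
DISTKEY (state_code_residence)
SORTKEY (state_code_residence);
"""
```

Declaring the DISTKEY at table level implies DISTSTYLE KEY, so rows with the same state code land on the same slice as the matching `us_states` rows.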

#### us_municipality
This table holds facts about US municipalities such as total population and average household size. The primary key is a composite of city and state_code since a city name is not necessarily unique across states. A foreign key is set on 'state_code' to illustrate the relation to table 'us_states'. The column 'state_code' was chosen as DISTKEY and SORTKEY since it is used to link to 'us_states' and will be joined with this table often. This ensures that rows with the same 'state_code' value are stored on the same slice, speeding up queries. Since the cities in this table do not align with cities in 'us_immigration' or 'airport', it is not linked to them.

The dimension tables are:

#### us_states
This table contains all state codes and state names of the US. Primary key is 'state_code'. The column 'state_code' was chosen as DISTKEY and SORTKEY since it is the DISTKEY for referenced tables 'us_immigration' and 'us_municipality'. This way data that is joined is on the same slice.

#### travel_code
This table contains information on the mode of transport used by the visitor entering the US. 'travel_code' is the primary and distribution key since it is unique and used to join the table to 'us_immigration'.

#### visa_code
This table contains information on the type of visa the visitor used. 'visa_code' is the primary and distribution key since it is unique and used to join the table to 'us_immigration'.

#### airport
This table holds information on US airports. The column 'iata_code' is the primary key since it is unique by definition. The column 'state_code' was chosen as DISTKEY and SORTKEY since it links to 'us_immigration' and will frequently be joined with that table. Making 'iata_code' the DISTKEY would not yield any efficiency gains, because 'city_code_destination' is not the DISTKEY of table 'us_immigration'.


#### country
This table contains country codes and their names; these are the countries visitors to the US come from. 'country_code' is the primary and distribution key since it is unique and used to join the table to 'us_immigration'.

The staging tables are:

* staging_airport
* staging_us_demographics
* staging_us_immigration
* staging_country_code
* staging_visa_code
* staging_travel_code
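
A minimal sketch of the staging-to-final step for one dimension table, assuming the final tables are filled with `INSERT ... SELECT` statements. Since Redshift does not enforce primary keys, the sketch filters out null keys and removes duplicate rows itself. The column `visa_description` is a hypothetical name.

```python
# Hypothetical insert from a staging table into its final table.
# The column visa_description is an illustrative assumption.
visa_code_table_insert = """
INSERT INTO visa_code (visa_code, visa_description)
SELECT DISTINCT visa_code,
       visa_description
FROM staging_visa_code
WHERE visa_code IS NOT NULL;
"""
```

`SELECT DISTINCT` only removes fully identical rows; if the staging data could contain the same code with different descriptions, an extra deduplication step on `visa_code` alone would be needed.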

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. Load datasets: the data files available as CSV are loaded from the workspace into PySpark DataFrames. The immigration data is provided in SAS format (sas7bdat) and stored on disk at ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat.
2. Clean data: ensure primary keys are not null, since Redshift does not enforce key constraints.
3. Write data as Parquet files to S3 buckets.
4. Drop tables on Redshift if necessary.
5. Create tables on Redshift.
6. Copy data from S3 into staging tables.
7. Insert data from staging tables into final tables.
8. Run data quality checks.
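
Step 6 relies on Redshift's `COPY` command reading Parquet directly from S3. A minimal sketch of how such a statement could be generated per staging table follows; the helper `build_copy_query`, the bucket and the IAM role ARN are hypothetical, and the real statements live in `copy_table_queries`.

```python
def build_copy_query(table: str, s3_path: str, iam_role_arn: str) -> str:
    """Return a Redshift COPY statement that loads the Parquet files under s3_path into table."""
    # Parquet files are self-describing, so no delimiter or header options are needed.
    # Redshift maps Parquet columns to table columns by position, so the staging
    # table must have the same number and order of columns as the files.
    return (
        f"COPY {table} "
        f"FROM '{s3_path}' "
        f"IAM_ROLE '{iam_role_arn}' "
        "FORMAT AS PARQUET;"
    )


# Hypothetical bucket and role, for illustration only
query = build_copy_query(
    "staging_airport",
    "s3://example-bucket/airport/",
    "arn:aws:iam::123456789012:role/example-redshift-role",
)
print(query)
```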


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

In [None]:
# Libraries and the SQL statements (defined in the sql_queries module) used by the pipeline
import psycopg2
import configparser
from sql_queries import create_table_queries, drop_table_queries, copy_table_queries, insert_table_queries, data_quality_count_check, data_quality_null_references_check

In [None]:
def drop_tables(cur, conn):
    """
        Drop the tables on Redshift specified in drop_table_queries
        
        Arguments:
            cur - PostgreSQL cursor object
            conn - psycopg2 connection instance
        
        Returns:
            None
    """
    for query in drop_table_queries:
        cur.execute(query)
        conn.commit()

In [None]:
def create_tables(cur, conn):
    """
        Create the staging and analytics tables on Redshift specified in create_table_queries
        
        Arguments:
            cur - PostgreSQL cursor object
            conn - psycopg2 connection instance
        
        Returns:
            None
    """
    for query in create_table_queries:
        cur.execute(query)
        conn.commit()

In [None]:
def load_staging_tables(cur, conn):
    """
        Load data in parquet files from S3 to staging tables on Redshift specified in copy_table_queries 
        
        Arguments:
            cur - PostgreSQL cursor object
            conn - psycopg2 connection instance
        
        Returns:
            None
    """
    for query in copy_table_queries:
        cur.execute(query)
        conn.commit()

In [None]:
def insert_tables(cur, conn):
    """
        Load data from staging tables to analytics tables specified in insert_table_queries
        
        Arguments:
            cur - PostgreSQL cursor object
            conn - psycopg2 connection instance
        
        Returns:
            None
    """
    for query in insert_table_queries:
        cur.execute(query)
        conn.commit()

In [None]:
# Data quality check: count the records in each Redshift table
def count_check(cur, conn):
    """
        Print the record count returned by each query in data_quality_count_check
        (the table name is taken from the last word of the query)
    """
    for query in data_quality_count_check:
        cur.execute(query)
        output = cur.fetchall()
        print("{} has {} records".format(query.split(' ')[-1], output[0][0]))
        conn.commit()

In [None]:
# Data quality check: count null foreign-key references in each table
def null_reference_check(cur, conn):
    """
        Print the number of null references returned by each query in
        data_quality_null_references_check (table and column names are
        taken from the 4th and 2nd words of the query)
    """
    for query in data_quality_null_references_check:
        cur.execute(query)
        output = cur.fetchall()
        words = query.split(' ')
        print("table {}".format(words[3]))
        print("column {} has {} null references".format(words[1], output[0][0]))
        conn.commit()

In [None]:
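# `config` is expected to be a ConfigParser loaded earlier with a [CLUSTER] section
# whose values are ordered host, dbname, user, password, port
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
cur = conn.cursor()

try:
    # Rebuild the schema, load the staging tables from S3 and populate the final tables
    drop_tables(cur, conn)
    create_tables(cur, conn)
    load_staging_tables(cur, conn)
    insert_tables(cur, conn)
    #count_check(cur, conn)
    #null_reference_check(cur, conn)
finally:
    # Close the connection even if one of the steps fails
    conn.close()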

#### 4.2 Data Quality Checks
* To ensure that the pipeline ran as expected, the row count of each Redshift table can be compared with the count of the corresponding DataFrame before it is written to S3.
* The second data quality check outputs the number of null foreign-key references in each table.
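
The count comparison described in the first bullet could be automated with a small helper such as the one below. It is a sketch: `compare_counts` is a hypothetical name, the DataFrame counts would come from `df.count()` before writing to S3, and the Redshift counts from the `count_check` queries. Counts only match one-to-one if the insert step does not drop or deduplicate rows.

```python
def compare_counts(expected: dict, actual: dict) -> dict:
    """
    Compare row counts per table.

    expected - counts taken from the Spark DataFrames before writing to S3
    actual   - counts returned by SELECT COUNT(*) on Redshift
    Returns {table: (expected, actual)} for every table whose counts differ
    or that is missing on one side.
    """
    mismatches = {}
    for table in sorted(set(expected) | set(actual)):
        exp, act = expected.get(table), actual.get(table)
        if exp != act:
            mismatches[table] = (exp, act)
    return mismatches


# Hypothetical counts, for illustration only
dataframe_counts = {"airport": 100, "country": 50}
redshift_counts = {"airport": 100, "country": 49}
print(compare_counts(dataframe_counts, redshift_counts))  # {'country': (50, 49)}
```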

In [None]:
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
cur = conn.cursor()

try:
    count_check(cur, conn)
    null_reference_check(cur, conn)
finally:
    conn.close()
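
Because Redshift only records foreign keys as informational constraints, it will accept fact rows whose key has no match in the dimension table. One possible way to detect such rows, sketched here as an assumption and not necessarily how `data_quality_null_references_check` is written, is a LEFT JOIN that counts unmatched keys. The helper `build_orphan_check` and the column `visa_code` in 'us_immigration' are hypothetical.

```python
def build_orphan_check(fact_table: str, fk_column: str, dim_table: str, dim_key: str) -> str:
    """Return SQL counting fact rows whose non-null foreign key has no match in the dimension table."""
    return (
        f"SELECT COUNT(*) FROM {fact_table} AS f "
        f"LEFT JOIN {dim_table} AS d ON f.{fk_column} = d.{dim_key} "
        f"WHERE f.{fk_column} IS NOT NULL AND d.{dim_key} IS NULL;"
    )


# Example: visa codes used in us_immigration that are missing from visa_code
query = build_orphan_check("us_immigration", "visa_code", "visa_code", "visa_code")
print(query)
```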

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

### Rationale for the choice of tools and technologies for the project
* Apache Spark is used to clean the data and write it to S3, since the datasets the project works with are medium to large-sized and Spark is a fast engine for large-scale data processing. Furthermore, it is open source and available under the Apache License 2.0.
* Data is loaded into S3 buckets since S3 is low-cost storage and the data could later be used as the basis for a data lake if need be.
* Processed data is stored in Apache Parquet format since it provides efficient columnar storage and compression and works well with Redshift. Additionally, Parquet is an open-source Apache Software Foundation project available to any project.
* Redshift is used since it requires fewer skills than e.g. Apache Spark: any user with basic SQL knowledge can get value from the data by running OLAP queries. Furthermore, Redshift is scalable, and costs can be controlled by spinning clusters up or down as needed.
* DC2 nodes are used for Redshift instead of DC1 nodes, since they cost the same as DC1 nodes of the same size but deliver significantly higher I/O performance.

### Update frequency
* I94 immigration data is published monthly and should be loaded every month to provide the latest data to the user. The codes used in the dataset should be updated from the SAS descriptions if necessary.
* U.S. City Demographic Data is published yearly and should be updated once the US Census Bureau publishes new data.
* The airport dataset should be updated with every load of I94 immigration data to ensure data quality.
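
For a monthly update, the pipeline needs to locate the file for the month being loaded. The sketch below assumes that all monthly files follow the `i94_<mon><yy>_sub.sas7bdat` pattern of the single path given in step 3.2; the helper `i94_path` is hypothetical, and the default directory is only the one that path uses.

```python
MONTH_ABBREVIATIONS = ("jan", "feb", "mar", "apr", "may", "jun",
                       "jul", "aug", "sep", "oct", "nov", "dec")


def i94_path(year: int, month: int,
             base_dir: str = "../../data/18-83510-I94-Data-2016") -> str:
    """Build the path of a monthly I94 SAS file, assuming the i94_<mon><yy>_sub naming pattern."""
    if not 1 <= month <= 12:
        raise ValueError(f"month must be between 1 and 12, got {month}")
    # Two-digit year, e.g. 2016 -> "16"
    return f"{base_dir}/i94_{MONTH_ABBREVIATIONS[month - 1]}{year % 100:02d}_sub.sas7bdat"


# Files a monthly run would pick up over 2016
monthly_files = [i94_path(2016, m) for m in range(1, 13)]
print(monthly_files[3])
```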


### Scenario: data increased by 100x
The RAM of a single machine would probably not suffice to process the data the way it is done now. The cleaning process can instead run on an Apache Spark cluster that parallelizes the work across the memory of multiple machines.
Increasing the number of prefixes in an Amazon S3 bucket allows reads and writes to be parallelized, since S3 supports at least 3,500 PUT/COPY/POST/DELETE and 5,500 GET/HEAD requests per second per prefix.
Additional nodes could also be added to the Redshift cluster, since Redshift is designed to scale out; each node adds both disk space and computing power.
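
The effect of spreading the data over more prefixes can be estimated with simple arithmetic. The sketch below assumes the load is spread evenly across prefixes, for example one prefix per month as produced by writing Parquet with Spark's `DataFrameWriter.partitionBy`; the monthly split and the helper `s3_min_request_rate` are assumptions.

```python
# Baseline S3 request rates per prefix documented by AWS ("at least" these values)
WRITE_REQUESTS_PER_PREFIX = 3_500  # PUT/COPY/POST/DELETE per second
READ_REQUESTS_PER_PREFIX = 5_500   # GET/HEAD per second


def s3_min_request_rate(num_prefixes: int, reads: bool = True) -> int:
    """Lower bound on aggregate requests per second when load is spread evenly over num_prefixes."""
    per_prefix = READ_REQUESTS_PER_PREFIX if reads else WRITE_REQUESTS_PER_PREFIX
    return num_prefixes * per_prefix


# e.g. immigration Parquet files split into 12 monthly prefixes
print(s3_min_request_rate(12))               # reads per second
print(s3_min_request_rate(12, reads=False))  # writes per second
```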

### Scenario: data populates a dashboard that must be updated on a daily basis by 7am every day
The ETL pipeline should be run with a tool designed for orchestrating ETL pipelines, which gives more control over the workflow and adds scheduling and logging.
Apache Airflow could be used to split the current pipeline into inter-dependent tasks organized in a DAG (Directed Acyclic Graph). This would allow tables to be truncated or dropped individually. The pipeline could be scheduled to run every night so that the dashboard is refreshed before 7am, with data engineers notified about any failures.
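
The dependency structure such a DAG would encode can be sketched without Airflow, using the standard library's `graphlib` (Python 3.9+). Each task name mirrors a function from step 4; in Airflow, each would become a task and the dependencies would be set with the `>>` operator. This sketch only shows a valid execution order; it does not schedule or run anything.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks that must finish before it starts.
# The two quality checks only depend on insert_tables, so they could run in parallel.
pipeline = {
    "create_tables": {"drop_tables"},
    "load_staging_tables": {"create_tables"},
    "insert_tables": {"load_staging_tables"},
    "count_check": {"insert_tables"},
    "null_reference_check": {"insert_tables"},
}

# One valid execution order that respects every dependency
run_order = list(TopologicalSorter(pipeline).static_order())
print(run_order)
```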

### Scenario: database needed to be accessed by 100+ people
This is a strength of Amazon Redshift: its columnar storage allows aggregation queries to be processed quickly, even when many users query the data at the same time. Workload management (WLM) queues and concurrency scaling can additionally be configured to handle many concurrent queries, and more nodes could be added to the cluster for more computing power.