# US Immigration Analytics
### Data Engineering Capstone Project

#### Project Summary
This project builds analytical tables that can answer queries about immigration to different US states.

Project steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. 
What data do you use? 
What does your end solution look like? What tools did you use?



**Project goal:**
- Build a star schema with the immigration data as the fact table and dates, airport codes, U.S. city demographics and world temperature as dimension tables.
- This enables various kinds of analysis on the immigration data, for example: which US state is the most common destination for immigrants arriving from Egypt?
- I will restructure the data in the workspace using Spark, upload the structured data to AWS S3 using boto, and finally create the star schema on AWS Redshift.
- I will access the tables from Python and run my queries on AWS Redshift.
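To make the goal concrete, here is a minimal sketch of the kind of star-schema query the model should answer. It uses Python's built-in `sqlite3` as a stand-in for Redshift, with hypothetical table names (`fact_immigration`, `dim_country`, `dim_state`) and made-up toy rows; the real schema, codes and SQL dialect will differ.

```python
import sqlite3

# In-memory SQLite database standing in for Redshift (illustration only).
conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Hypothetical dimension tables built from the I94 code mappings.
cur.execute("CREATE TABLE dim_country (code INTEGER PRIMARY KEY, country TEXT)")
cur.execute("CREATE TABLE dim_state (state_code TEXT PRIMARY KEY, state_name TEXT)")

# Hypothetical fact table keyed by the composite key [cicid, i94yr, i94mon].
cur.execute("""
    CREATE TABLE fact_immigration (
        cicid INTEGER, i94yr INTEGER, i94mon INTEGER,
        i94cit INTEGER REFERENCES dim_country(code),
        i94addr TEXT REFERENCES dim_state(state_code),
        PRIMARY KEY (cicid, i94yr, i94mon)
    )
""")

# Made-up toy rows; code 1 is NOT the real I94 country code for Egypt.
cur.executemany("INSERT INTO dim_country VALUES (?, ?)", [(1, "EGYPT"), (2, "MEXICO")])
cur.executemany("INSERT INTO dim_state VALUES (?, ?)",
                [("NY", "NEW YORK"), ("NJ", "NEW JERSEY"), ("TX", "TEXAS")])
cur.executemany("INSERT INTO fact_immigration VALUES (?, ?, ?, ?, ?)", [
    (10, 2016, 4, 1, "NY"),
    (11, 2016, 4, 1, "NJ"),
    (12, 2016, 5, 1, "NY"),
    (13, 2016, 5, 2, "TX"),
])

# Most common destination state for arrivals whose citizenship is Egypt
# (i94res, the country of residence, could be used instead).
top_state = cur.execute("""
    SELECT s.state_name, COUNT(*) AS arrivals
    FROM fact_immigration f
    JOIN dim_country c ON f.i94cit = c.code
    JOIN dim_state s ON f.i94addr = s.state_code
    WHERE c.country = 'EGYPT'
    GROUP BY s.state_name
    ORDER BY arrivals DESC
    LIMIT 1
""").fetchone()
print(top_state)  # ('NEW YORK', 2)
conn.close()
```

The fact table only carries codes; the human-readable names come from joins against the dimension tables, which is what keeps the fact table narrow.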

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

I'm using 4 datasets provided by Udacity:
 - **I94 Immigration Data**
   - **Source:** https://travel.trade.gov/research/reports/i94/historical/2016.html
   - **Path in Udacity workspace:** ../../data/18-83510-I94-Data-2016/
   - There's a file for each month of the year, e.g. i94_apr16_sub.sas7bdat
   - **Example full file path:** ../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat
   - These files are large
   - Data are reported in a **monthly** publication highlighting overseas visitor arrivals by country of residence, ports of entry, mode of transportation, type of visa, and more.
   - **Description:** https://travel.trade.gov/research/programs/i94/description.asp , https://travel.trade.gov/research/programs/i94/index.asp
   - **Size:** 6GB
   - **Count:** 40,790,529
   - 29 Columns:
| Column | Type | Description |
|--------|------|-------------|
| cicid | double | ID; part of the composite primary key [cicid, i94yr, i94mon] |
| i94yr | double | 4-digit year of arrival; part of the composite primary key |
| i94mon | double | Numeric month of arrival; part of the composite primary key |
| i94cit | double | Visitor's country of citizenship (3-digit country code) |
| i94res | double | Visitor's country of residence (3-digit country code) |
| i94port | string | Port of entry (3-character code), in the USA or abroad; maps to "city, state" for US ports and to a country otherwise. Candidate foreign key to World Temperature (City) and U.S. City Demographics (City and State) |
| arrdate | double | Arrival date in the USA, stored as a SAS numeric date (days since 1960-01-01) |
| i94mode | double | Mode of arrival as a 1-digit code (Air, Sea, Land, Not reported), plus missing values |
| i94addr | string | State of arrival (2-character code). Foreign key to U.S. City Demographics (State) |
| depdate | double | Departure date from the USA, stored as a SAS numeric date |
| i94bir | double | Age of respondent in years |
| i94visa | double | Visa category code (Business, Pleasure, Student), i.e. the reason for travel |
| count | double | -Used for summary statistics (possibly the number of visitors in a group recorded as a single row; unconfirmed) |
| dtadfile | string | -Character date field: date added to I-94 files |
| visapost | string | Department of State office where the visa was issued (before arriving in the USA) |
| occup | string | -Occupation that will be performed in the U.S. (profession) |
| entdepa | string | -Arrival flag: admitted or paroled into the U.S. |
| entdepd | string | -Departure flag: departed, lost I-94, or deceased |
| entdepu | string | -Update flag: apprehended, overstayed, or adjusted to permanent residence |
| matflag | string | Match flag: arrival and departure records matched (if the value is missing, there is no departure date) |
| biryear | double | 4-digit year of birth |
| dtaddto | string | Character date field: date until which the visitor is admitted to the U.S. (allowed to stay until) |
| gender | string | Non-immigrant sex |
| insnum | string | -INS number |
| airline | string | Code of the airline used to arrive in the U.S. (Foreign key to Airport Codes via IATA or local code? The municipality column in Airport Codes may also be needed) |
| admnum | double | Admission number on CBP Form I-94 or I-94W; not unique |
| fltno | string | Flight number of the airline used to arrive in the U.S. |
| visatype | string | Class of admission (B1, B2, etc.) legally admitting the non-immigrant to temporarily stay in the U.S. |
   - The dataset also comes with a data dictionary, ***I94_SAS_Labels_Descriptions.SAS***, which contains code-to-value mappings; I will create a lookup table for each mapping:
     - **I94CIT & I94RES:** 3 digits maps to a country name
     - **I94PORT:** 3 characters maps to city and state code or country
     - **I94MODE:** 1 digit maps to port type (Air, Sea, Land, Not reported)
     - **I94ADDR:** 2 characters maps to a USA state name
     - **I94VISA:** 1 digit maps to a visa category (Business, Pleasure, Student)
     

  
 - **World Temperature Data**
   - **Source:** https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data
   - **Path:** ../../data2/
   - **One file:** GlobalLandTemperaturesByCity.csv
   - **Size:** 509MB
   - **Count:** 8,599,212
   - 7 Columns
| Column                        | Type   | Description                                                      |
|-------------------------------|--------|------------------------------------------------------------------|
| dt                            | date   | Date: starts in 1750 for average land temperature (Can be empty) |
| AverageTemperature            | double | Average land temperature in Celsius (Can be empty)               |
| AverageTemperatureUncertainty | double | The 95% confidence interval around the average (Can be empty)    |
| City                          | string | 3448 Unique values                                               |
| Country                       | string |                                                                  |
| Latitude                      | string |                                                                  |
| Longitude                     | string |                                                                  |

  
 - **U.S. City Demographic Data**
   - **Source:** https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/
   - **Path:** us-cities-demographics.csv
   - **Size:** 252KB
   - **Count:** 2,891
   - This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000.
   - 12 Columns 
| Column | Type | Description |
|--------|------|-------------|
| City | string | City within the USA |
| State | string | U.S. state of the city |
| Median Age | double | Median age of the people living in the city |
| Male Population | int | Number of males in the city (Can be null) (Same across different records with the same city) |
| Female Population | int | Number of females in the city (Can be null) (Same across different records with the same city) |
| Total Population | int | Sum of male and female population (Same across different records with the same city) |
| Number of Veterans | int | Number of U.S. veterans |
| Foreign-born | int | Number of foreign-born residents |
| Average Household Size | double | Average number of people per household |
| State Code | string | Code of the state |
| Race | string | Race group that the Count column refers to (White, Asian, American Indian and Alaska Native, Hispanic or Latino, Black or African-American) |
| Count | int | Number of people in that race group (races can also be mixed) |


  
 - **Airport Codes Data**
   - **Source:** https://datahub.io/core/airport-codes#data
   - **Path:** airport-codes_csv.csv
   - Simple table of airport codes and corresponding cities
   - This data is updated **nightly**.
   - **Size:** 5.8 MB
   - **Count:** 55,075
   - The airport codes may refer to either the IATA airport code, a three-letter code used in passenger reservation, ticketing and baggage-handling systems, or the ICAO airport code, a four-letter code used by ATC systems and for airports that do not have an IATA code.
   - 12 Columns
| Column | Type | Description |
|--------|------|-------------|
| ident | string | -Identifier; not the airport code (the IATA and local codes are the airport codes) |
| type | string | Airport type [heliport, small_airport, closed, seaplane_base, balloonport, medium_airport, large_airport] |
| name | string | Airport name |
| elevation_ft | int | Elevation of the airport above sea level, in feet |
| continent | string | Continent code of the airport [NA, OC, AF, AN, EU, AS, SA]; "NA" (North America) is read as NaN by pandas' default missing-value parsing |
| iso_country | string | ISO country code of the airport |
| iso_region | string | ISO region code of the airport |
| municipality | string | City of the airport |
| gps_code | string | Airport GPS code (Contains nulls) |
| iata_code | string | IATA airport code (Contains nulls) |
| local_code | string | Local airport code (Contains nulls) |
| coordinates | string | Airport coordinates as a comma-separated "longitude, latitude" string |
  
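The demographics file stores one row per city and race, with the city-level population columns repeated on every row, so a dimension table built from it will likely need one row per city. Below is a minimal stdlib sketch of that reshaping on made-up rows (the city names, numbers and the `pivot_demographics` helper are hypothetical). In the notebook itself, a Spark `groupBy("City", "State").pivot("Race")` would be the more natural tool.

```python
# Made-up rows in the long format of us-cities-demographics.csv:
# one row per (City, State, Race), with city-level columns repeated on each row.
rows = [
    {"City": "Example City", "State": "Example State", "Total Population": 100000,
     "Race": "White", "Count": 70000},
    {"City": "Example City", "State": "Example State", "Total Population": 100000,
     "Race": "Asian", "Count": 5000},
    {"City": "Example City", "State": "Example State", "Total Population": 100000,
     "Race": "Hispanic or Latino", "Count": 12000},
    {"City": "Sample Town", "State": "Example State", "Total Population": 80000,
     "Race": "White", "Count": 60000},
]


def pivot_demographics(rows):
    """Collapse one-row-per-race records into one record per (City, State)."""
    cities = {}
    for row in rows:
        key = (row["City"], row["State"])
        if key not in cities:
            # City-level columns repeat across race rows, so keep only the first copy.
            cities[key] = {
                "City": row["City"],
                "State": row["State"],
                "Total Population": row["Total Population"],
                "race_counts": {},
            }
        # Race counts can overlap (races can be mixed), so keep them separate
        # instead of summing them into a total.
        cities[key]["race_counts"][row["Race"]] = row["Count"]
    return list(cities.values())


cities = pivot_demographics(rows)
for city in cities:
    print(city["City"], city["Total Population"], city["race_counts"])
```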

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

- I will do a full analysis of the datasets here to understand how I can use them to create the conceptual model.


In [2]:
# Read in the sample immigration data into a Pandas DataFrame
#fname = 'immigration_data_sample.csv'
#df = pd.read_csv(fname)
#df.info()
#df.head()

In [40]:
# Read SAS file using Spark
#df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
#df_spark.printSchema()

#write to parquet
#df_spark.write.parquet("i94_immigration/i94_apr16_sub")
#df_spark=spark.read.parquet("i94_immigration/i94_apr16_sub")
#df_spark.count()

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()

In [3]:
# Do all imports and installs here 
import pandas as pd
import numpy as np
import glob   # For reading all files in a directory
import ntpath # For reading file name from path
from datetime import datetime, timedelta
from pyspark.sql.functions import *
from pyspark.sql.types import *

#### i94_immigration, checking schema to decide what types need to be changed
- **cicid** type to long
- **i94yr, i94mon, i94cit, i94res, i94mode, i94bir, i94visa** to int 
- **arrdate, depdate** to date
- **matflag** has 2 values, (null, 'M'), need to be altered later to boolean
- **biryear** to int
- **admnum** type to long
- I'm not interested in the following columns **[count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,insnum]**

In [4]:
# Checking schema to decide what types need to be changed
df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df_spark.printSchema()
df_spark.show(5)

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [5]:
# drop unwanted columns
cols_to_drop = ['count','dtadfile','visapost','occup','entdepa','entdepd','entdepu','insnum']
df_spark = df_spark.drop(*cols_to_drop)
df_spark.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



In [4]:
# Used for converting matflag to boolean: 'M' -> True, anything else (including null) -> False
@udf("boolean")
def isMatch(x):
    return x == 'M'

# Used for unifying null and 0 values of i94mode into 9 ("Not reported")
@udf("integer")
def isModeNull(x):
    return 9 if (x is None) or (x == 0) else x

# Used to convert a SAS date (days since 1960-01-01) into DateType
def convert_datetime(x):
    try:
        start = datetime(1960, 1, 1)
        return (start + timedelta(days=int(x))).date()
    except (TypeError, ValueError, OverflowError):
        # null, NaN or out-of-range input -> null date
        return None
udf_datetime_from_sas = udf(convert_datetime, DateType())
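The UDF logic can be checked without a Spark session. The sketch below mirrors `convert_datetime`, `isModeNull` and `isMatch` as plain Python functions (the names `sas_to_date`, `normalize_mode` and `is_match` are hypothetical) and runs them on the raw `arrdate` values 20721 and 20572 that appear in the duplicate-`cicid` output further down.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days from 1960-01-01


def sas_to_date(x):
    """Plain-Python version of convert_datetime: SAS day count -> date, or None."""
    if x is None:
        return None
    try:
        return SAS_EPOCH + timedelta(days=int(x))
    except (TypeError, ValueError, OverflowError):
        # NaN, non-numeric or out-of-range input
        return None


def normalize_mode(x):
    """Mirror of isModeNull: missing (None) and 0 both become 9 ('Not reported')."""
    return 9 if x is None or x == 0 else x


def is_match(x):
    """Mirror of isMatch: 'M' -> True, anything else (including None) -> False."""
    return x == "M"


print(sas_to_date(20721.0))  # 2016-09-24
print(sas_to_date(20572.0))  # 2016-04-28
print(normalize_mode(None), normalize_mode(0), normalize_mode(1))  # 9 9 1
print(is_match("M"), is_match(None))  # True False
```

The decoded dates fall in the same months as the corresponding `i94mon` values (9 and 4), a quick sanity check on the 1960-01-01 epoch. As an untested alternative, Spark SQL's built-in `date_add` (for example inside `expr`) could perform the same conversion without the overhead of a Python UDF.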

In [57]:
# Alter some column types:
# cicid type to long
df_spark = df_spark.withColumn("cicid", df_spark["cicid"].cast(LongType()))
# i94yr, i94mon, i94cit, i94res, i94mode, i94bir, i94visa to int; arrdate, depdate to date
df_spark = df_spark.withColumn("i94yr", df_spark["i94yr"].cast(IntegerType()))
df_spark = df_spark.withColumn("i94mon", df_spark["i94mon"].cast(IntegerType()))
df_spark = df_spark.withColumn("i94cit", df_spark["i94cit"].cast(IntegerType()))
df_spark = df_spark.withColumn("i94res", df_spark["i94res"].cast(IntegerType()))

df_spark = df_spark.withColumn("arrdate", udf_datetime_from_sas("arrdate"))

df_spark = df_spark.withColumn("i94mode", df_spark["i94mode"].cast(IntegerType()))
# Use 9 instead of null for missing values, use 9 instead of 0 for Not reported values
df_spark = df_spark.withColumn("i94mode", isModeNull(df_spark["i94mode"]))

df_spark = df_spark.withColumn("depdate", udf_datetime_from_sas("depdate"))

df_spark = df_spark.withColumn("i94bir", df_spark["i94bir"].cast(IntegerType()))
df_spark = df_spark.withColumn("i94visa", df_spark["i94visa"].cast(IntegerType()))
# matflag has 2 values, (null, 'M'), to boolean
df_spark = df_spark.withColumn("match_flag", isMatch(df_spark["matflag"]))
df_spark = df_spark.drop("matflag")
df_spark = df_spark.withColumnRenamed("match_flag", "matflag")
# biryear to int
df_spark = df_spark.withColumn("biryear", df_spark["biryear"].cast(IntegerType()))
# admnum type to long
df_spark = df_spark.withColumn("admnum", df_spark["admnum"].cast(LongType()))
df_spark.printSchema()

root
 |-- cicid: long (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: date (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: date (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- biryear: integer (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: long (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)
 |-- matflag: boolean (nullable = true)



In [5]:
# Function to restructure the dataframe
def restructure_i94(df):
    # Drop not needed columns - Noticed some new columns in one of the months, [validres,delete_days,delete_mexl,delete_dup,delete_visa,delete_recdup]
    cols_to_drop = ['count','dtadfile','visapost','occup','entdepa','entdepd','entdepu','insnum','validres','delete_days','delete_mexl','delete_dup','delete_visa','delete_recdup']
    df = df.drop(*cols_to_drop)
    # Alter some columns types:
    # cicid type to long
    df = df.withColumn("cicid", df["cicid"].cast(LongType()))
    # i94yr, i94mon, i94cit, i94res, i94mode, i94bir, i94visa to int; arrdate, depdate to date
    df = df.withColumn("i94yr", df["i94yr"].cast(IntegerType()))
    df = df.withColumn("i94mon", df["i94mon"].cast(IntegerType()))
    df = df.withColumn("i94cit", df["i94cit"].cast(IntegerType()))
    df = df.withColumn("i94res", df["i94res"].cast(IntegerType()))

    df = df.withColumn("arrdate", udf_datetime_from_sas("arrdate"))

    df = df.withColumn("i94mode", df["i94mode"].cast(IntegerType()))
    # Use 9 instead of null (missing) and 0 (not reported)
    df = df.withColumn("i94mode", isModeNull(df["i94mode"]))

    df = df.withColumn("depdate", udf_datetime_from_sas("depdate"))

    df = df.withColumn("i94bir", df["i94bir"].cast(IntegerType()))
    df = df.withColumn("i94visa", df["i94visa"].cast(IntegerType()))
    # matflag has 2 values, (null, 'M'), to boolean
    df = df.withColumn("match_flag",isMatch(df["matflag"]))
    df = df.drop("matflag")
    df = df.withColumnRenamed("match_flag","matflag")
    # biryear to int
    df = df.withColumn("biryear", df["biryear"].cast(IntegerType()))                                                
    # admnum type to long
    df = df.withColumn("admnum", df["admnum"].cast(LongType()))
    return df

In [59]:
# Convert SAS files to Parquet files and store them locally
i94SASFiles = glob.glob("../../data/18-83510-I94-Data-2016/*") # Gets all SAS files
for i94SASFile in i94SASFiles:
    fileName = ntpath.basename(i94SASFile).split(".")[0]
    df_spark = spark.read.format('com.github.saurfang.sas.spark').load(i94SASFile)
    df_spark = restructure_i94(df_spark)
    df_spark.coalesce(1).write.parquet("i94_immigration/"+fileName)

In [6]:
# Read all i94DF parquet dataset into a Dataframe
i94DF = None # i94 Dataframe
i94ParquetFolders = glob.glob("i94_immigration/*") 

i94_schema = StructType([
    StructField('cicid', LongType(), True),
    StructField('i94yr', IntegerType(), True),
    StructField('i94mon', IntegerType(), True),
    StructField('i94cit', IntegerType(), True),
    StructField('i94res', IntegerType(), True),
    StructField('i94port', StringType(), True),
    StructField('arrdate', DateType(), True),
    StructField('i94mode', IntegerType(), True),
    StructField('i94addr', StringType(), True),
    StructField('depdate', DateType(), True),
    StructField('i94bir', IntegerType(), True),
    StructField('i94visa', IntegerType(), True),
    StructField('biryear', IntegerType(), True),
    StructField('dtaddto', StringType(), True),
    StructField('gender', StringType(), True),
    StructField('airline', StringType(), True),
    StructField('admnum', LongType(), True),
    StructField('fltno', StringType(), True),
    StructField('visatype', StringType(), True),
    StructField('matflag', BooleanType(), True)
])
for i94ParquetFolder in i94ParquetFolders:
    df_spark=spark.read.schema(i94_schema).parquet(i94ParquetFolder)
    if i94DF is None:
        i94DF = df_spark
    else:
        i94DF = i94DF.union(df_spark)
# Total count of all records in i94 
totalCount = i94DF.count()
print("total count = "+str(totalCount))
# Show some data
i94DF.show(5)
i94DF.printSchema()

total count = 40790529
+-----+-----+------+------+------+-------+----------+-------+-------+----------+------+-------+-------+--------+------+-------+-----------+-----+--------+-------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|   arrdate|i94mode|i94addr|   depdate|i94bir|i94visa|biryear| dtaddto|gender|airline|     admnum|fltno|visatype|matflag|
+-----+-----+------+------+------+-------+----------+-------+-------+----------+------+-------+-------+--------+------+-------+-----------+-----+--------+-------+
|  687| 2016|     9|   213|   213|    HOU|2016-09-01|      1|     TX|2016-09-28|    27|      1|   1989|11022016|     M|     QK|95279388530| 8111|      B1|   true|
|  799| 2016|     9|   369|   369|    WAS|2016-09-01|      1|     VA|2016-09-28|    71|      2|   1945|11032016|     M|     KL|95310934730|  651|      B2|   true|
| 1001| 2016|     9|   582|   582|    LVG|2016-09-01|      1|     NV|2016-09-05|    25|      2|   1991|10312016|     M|     4O|95076479830|  970|      B2|   true|

In [10]:
# Is cicid unique?
is_cicid_unique = (i94DF.select("cicid").distinct().count() == i94DF.count())
print("Is cicid unique? "+str(is_cicid_unique))

Is cicid unique? False


In [8]:
# Which cicid values are duplicated?
duplicated_cicid_df = i94DF.groupBy('cicid').agg(count('cicid').alias('count')).where(column('count')>1)
duplicated_cicid_df.show(3)

+-------+-----+
|  cicid|count|
+-------+-----+
|5342075|    9|
|5342416|    7|
|5342504|    7|
+-------+-----+
only showing top 3 rows



In [9]:
# Investigating one of the duplicate entries
one_duplicate = i94DF.where(i94DF.cicid == 5342416)
one_duplicate.show()
# It looks like we have a composite primary key [cicid, i94yr, i94mon]

+-------+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-------+--------+------+-------+-----------+-----+--------+-------+
|  cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|biryear| dtaddto|gender|airline|     admnum|fltno|visatype|matflag|
+-------+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-------+--------+------+-------+-----------+-----+--------+-------+
|5342416| 2016|     9|   504|   504|    MIA|  20721|      1|     FL|   null|    31|      2|   1985|03232017|     M|     AA|12054498685|  960|      B2|  false|
|5342416| 2016|     4|   576|   576|    MIA|  20572|      1|   null|  20610|    76|      2|   1940|10272016|     F|     AA|94793677030|01520|      B2|   true|
|5342416| 2016|    12|   273|   273|    HOU|  20812|      1|     TX|  20829|    20|      2|   1996|06232017|     F|     SQ|18638744885|   52|      B2|   true|
|5342416| 2016|     5|   687|   687|    MIA|  

In [10]:
# Is [cicid, i94yr, i94mon] unique?
composite_pk_df = i94DF.groupBy('cicid','i94yr','i94mon').agg(count('cicid').alias('count')).where(column('count')>1)
composite_pk_df.show()
# No duplicates are returned, so [cicid, i94yr, i94mon] is a composite primary key for the i94 dataset

+-----+-----+------+-----+
|cicid|i94yr|i94mon|count|
+-----+-----+------+-----+
+-----+-----+------+-----+
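The same uniqueness check in miniature: a stdlib sketch (the `duplicated` helper is hypothetical) that counts key occurrences the way `groupBy(...).agg(count(...)).where(count > 1)` does, applied to the four rows for `cicid` 5342416 that are visible in the truncated output above. `cicid` alone repeats, while the `[cicid, i94yr, i94mon]` triple does not.

```python
from collections import Counter

# (cicid, i94yr, i94mon) for the visible rows of cicid 5342416 in the output above.
keys = [
    (5342416, 2016, 9),
    (5342416, 2016, 4),
    (5342416, 2016, 12),
    (5342416, 2016, 5),
]


def duplicated(keys, key_fn):
    """Return the key values that occur more than once, with their counts."""
    counts = Counter(key_fn(k) for k in keys)
    return {k: n for k, n in counts.items() if n > 1}


print(duplicated(keys, lambda k: k[0]))  # {5342416: 4}
print(duplicated(keys, lambda k: k))     # {}
```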



In [17]:
# Checking if i94mode has 4 values only
i94DF.select('i94mode').distinct().show()
# It has 4 values

+-------+
|i94mode|
+-------+
|      1|
|      3|
|      9|
|      2|
+-------+



In [12]:
# Checking all values for matflag
i94DF.select('matflag').distinct().show()

+-------+
|matflag|
+-------+
|   true|
|  false|
+-------+



In [11]:
# Checking all values for gender
i94DF.select('gender').distinct().show()

+------+
|gender|
+------+
|     F|
|  null|
|     M|
|     U|
|     X|
+------+



In [14]:
# is admnum unique? 
is_admnum_unique = (i94DF.select("admnum").distinct().count() == i94DF.count())
print("Is admnum unique? "+str(is_admnum_unique))

Is admnum unique? False


In [15]:
# What is the maximum admnum
i94DF.agg({"admnum": "max"}).collect()[0]
# This exceeds the 32-bit integer maximum (2,147,483,647), so admnum must be cast to long

Row(max(admnum)=99999998130)

#### Creating mapping files for these columns:
- **I94CIT & I94RES:** 3 digits maps to a country name
- **I94PORT        :** 3 characters maps to city and state code or country
- **I94MODE        :** 1 digit maps to port type (Air, Sea, Land, Not reported)
- **I94ADDR        :** 2 characters maps to a USA state name
- **I94VISA        :** 1 digit maps to a visa category (Business, Pleasure, Student)
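The notebook reads these lookups from `lookups/*.csv` but does not show how they were produced. One possible approach, sketched with the standard library: extract `'code' = 'label'` pairs from the port section of the data dictionary with a regular expression, then split each label into port name and state code. The sample lines are an assumed approximation of the dictionary's format (only `ALC` and `ANC` come from the ports lookup shown below; `ZZZ` is made up), and the `parse_ports` helper is hypothetical.

```python
import csv
import io
import re

# Assumed shape of the port entries in I94_SAS_Labels_Descriptions.SAS:
# a quoted code, '=', then a quoted "NAME, ST" label padded with spaces.
sample = """
    'ALC'   =   'ALCAN, AK             '
    'ANC'   =   'ANCHORAGE, AK         '
    'ZZZ'   =   'UNKNOWN PORT          '
"""

# Code in single quotes, '=', label in single quotes.
ENTRY = re.compile(r"'([^']*)'\s*=\s*'([^']*)'")


def parse_ports(text):
    """Yield (code, port, state_code); state_code is None if the label has no comma."""
    for code, label in ENTRY.findall(text):
        label = label.strip()
        name, sep, state = label.rpartition(",")
        if sep:
            yield code.strip(), name.strip(), state.strip()
        else:
            yield code.strip(), label, None


rows = list(parse_ports(sample))

# Write the rows with the same columns as lookups/ports.csv (code, port, state_code).
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow(["code", "port", "state_code"])
writer.writerows(rows)
print(buf.getvalue())
```

Labels for ports outside the USA may carry a country or province after the last comma rather than a two-letter state, so the `state_code` column would likely need extra cleaning before it can join to the states lookup.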

In [8]:
#  - I94CIT & I94RES: 3 digits maps to a country name
code_country_schema = StructType([
    StructField('code', IntegerType(), True),
    StructField('country', StringType(), True)
])
code_country_df = spark.read.option("quote","\"").csv("lookups/code_country.csv",schema = code_country_schema,header = True)
code_country_df.printSchema()
code_country_df.show(3)

root
 |-- code: integer (nullable = true)
 |-- country: string (nullable = true)

+----+-----------+
|code|    country|
+----+-----------+
| 582|     MEXICO|
| 236|AFGHANISTAN|
| 101|    ALBANIA|
+----+-----------+
only showing top 3 rows



In [9]:
#  - I94PORT        : 3 characters maps to city and state code or country
ports_schema = StructType([
    StructField('code', StringType(), True),
    StructField('port', StringType(), True),
    StructField('state_code', StringType(), True)
])
ports_df = spark.read.option("quote","\"").csv("lookups/ports.csv",schema = ports_schema,header = True)
ports_df.printSchema()
ports_df.show(3)

root
 |-- code: string (nullable = true)
 |-- port: string (nullable = true)
 |-- state_code: string (nullable = true)

+----+--------------------+----------+
|code|                port|state_code|
+----+--------------------+----------+
| ALC|               ALCAN|        AK|
| ANC|           ANCHORAGE|        AK|
| BAR|BAKER AAF - BAKER...|        AK|
+----+--------------------+----------+
only showing top 3 rows



In [10]:
#  - I94MODE        : 1 digit maps to port type (Air, Sea, Land, Not reported)
port_mode_schema = StructType([
    StructField('code', IntegerType(), True),
    StructField('mode', StringType(), True),
])
port_mode_df = spark.read.option("quote","\"").option("nullValue","null").csv("lookups/port_mode.csv",schema = port_mode_schema,header = True)
port_mode_df.printSchema()
port_mode_df.show(6)

root
 |-- code: integer (nullable = true)
 |-- mode: string (nullable = true)

+----+------------+
|code|        mode|
+----+------------+
|   1|         Air|
|   2|         Sea|
|   3|        Land|
|   9|Not reported|
+----+------------+



In [11]:
#  - I94ADDR        : 2 characters maps to a USA state name
states_schema = StructType([
    StructField('state_code', StringType(), True),
    StructField('state_name', StringType(), True),
])
states_df = spark.read.option("quote","\"").csv("lookups/states.csv",schema = states_schema,header = True)
states_df.printSchema()
states_df.show(3)

root
 |-- state_code: string (nullable = true)
 |-- state_name: string (nullable = true)

+----------+----------+
|state_code|state_name|
+----------+----------+
|        AL|   ALABAMA|
|        AK|    ALASKA|
|        AZ|   ARIZONA|
+----------+----------+
only showing top 3 rows



In [12]:
#  - I94VISA        : 1 digit maps to a visa category (Business, Pleasure, Student)
visa_category_schema = StructType([
    StructField('code', IntegerType(), True),
    StructField('visa_category', StringType(), True),
])
visa_category_df = spark.read.option("quote","\"").csv("lookups/visa_category.csv",schema = visa_category_schema,header = True)
visa_category_df.printSchema()
visa_category_df.show(3)

root
 |-- code: integer (nullable = true)
 |-- visa_category: string (nullable = true)

+----+-------------+
|code|visa_category|
+----+-------------+
|   1|     Business|
|   2|     Pleasure|
|   3|      Student|
+----+-------------+



In [13]:
#Checking if i94 data matches the lookup columns
# Checking for "I94CIT & I94RES"
i94_origin_country = i94DF.join(code_country_df,(i94DF.i94cit == code_country_df.code) | (i94DF.i94res == code_country_df.code),how='left')\
                    .select(i94DF.cicid,i94DF.i94res,code_country_df.code,code_country_df.country)
i94_origin_country.where(i94_origin_country.code.isNull()).show()
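# No record is unmatched on both columns. Note: the OR condition only flags rows where neither i94cit nor i94res matches, so it doesn't validate each column individually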

+-----+------+----+-------+
|cicid|i94res|code|country|
+-----+------+----+-------+
+-----+------+----+-------+
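Because the join above uses `i94cit == code OR i94res == code`, a row only appears as unmatched when *neither* column matches, so an invalid `i94cit` can hide behind a valid `i94res`. A stricter check tests each column on its own (in Spark, one `left_anti` join per column). The plain-Python sketch below illustrates the difference; the records and codes are made up for illustration.

```python
def unmatched_codes(records, column, valid_codes):
    """Return the distinct values of `column` that are missing from valid_codes."""
    return {r[column] for r in records if r[column] not in valid_codes}

def unmatched_either(records, valid_codes):
    """Mimic the OR-join check: flag a record only if neither code is valid."""
    return [r for r in records
            if r["i94cit"] not in valid_codes and r["i94res"] not in valid_codes]

valid = {582, 236, 101}
# Illustrative records: the second has an invalid i94cit but a valid i94res
records = [
    {"cicid": 1, "i94cit": 582, "i94res": 582},
    {"cicid": 2, "i94cit": 999, "i94res": 236},
]

print(unmatched_either(records, valid))           # [] -> the OR check sees no problem
print(unmatched_codes(records, "i94cit", valid))  # {999}
print(unmatched_codes(records, "i94res", valid))  # set()
```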



In [24]:
# Checking for "I94PORT"
i94_ports = i94DF.join(ports_df,i94DF.i94port == ports_df.code,how='left')\
                    .select(i94DF.i94port,ports_df.code,ports_df.port,ports_df.state_code)
i94_ports.where(i94_ports.code.isNull()).show()
# Code OCA was missing; after some searching I found that it corresponds to "Ocean Reef Club Airport" in Florida, so I added it as a new entry in ports.csv
# Now all codes match

+-------+----+----+----------+
|i94port|code|port|state_code|
+-------+----+----+----------+
+-------+----+----+----------+



In [28]:
# Checking for "I94MODE"
i94_port_mode = i94DF.join(port_mode_df,i94DF.i94mode == port_mode_df.code,how='left')\
                    .select(i94DF.i94mode,port_mode_df.code,port_mode_df.mode)
i94_port_mode.where(i94_port_mode.code.isNull()).show()
# All match

+-------+----+----+
|i94mode|code|mode|
+-------+----+----+
+-------+----+----+



In [27]:
# Checking for "I94ADDR"
i94_states = i94DF.join(states_df,i94DF.i94addr == states_df.state_code,how='left')\
                    .select(i94DF.i94addr,states_df.state_code,states_df.state_name)
i94_states.select('i94addr').distinct().where(i94_states.state_code.isNull()).show(5)

+-------+
|i94addr|
+-------+
|     .N|
|     07|
|     DZ|
|      K|
|     LT|
+-------+
only showing top 5 rows



In [26]:
# How many distinct i94addr values don't match the states lookup?
i94_states.select('i94addr').distinct().where(i94_states.state_code.isNull()).count()

730
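730 distinct `i94addr` values have no match in `states.csv`. This section doesn't change them; one possible policy, sketched below as an assumption, is to keep only codes present in the states lookup and map everything else to `None` (null) so the column still joins cleanly. The `valid_states` set is a small hypothetical subset, the upper-casing step is an extra assumption, and the invalid codes are ones shown in the output above.

```python
from typing import Optional

def clean_state_code(code: Optional[str], valid_states: set) -> Optional[str]:
    """Return the upper-cased code if it is a known state code, else None."""
    if code is None:
        return None
    normalized = code.strip().upper()
    return normalized if normalized in valid_states else None

valid_states = {"AL", "AK", "AZ", "FL", "NY"}  # hypothetical subset of states.csv
raw_codes = ["AL", "ny", ".N", "07", "DZ", "K", "LT", None]
print([clean_state_code(c, valid_states) for c in raw_codes])
# ['AL', 'NY', None, None, None, None, None, None]
```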

In [30]:
# Checking for "I94VISA"
i94_visa_category = i94DF.join(visa_category_df,i94DF.i94visa == visa_category_df.code,how='left')\
                    .select(i94DF.i94visa,visa_category_df.code,visa_category_df.visa_category)
i94_visa_category.where(i94_visa_category.code.isNull()).show()
# All match

+-------+----+-------------+
|i94visa|code|visa_category|
+-------+----+-------------+
+-------+----+-------------+



In [53]:
# What are the minimum and maximum arrival and departure dates?
i94DF.agg(min("arrdate"),max("arrdate"),min("depdate"),max("depdate")).show()

+------------+------------+------------+------------+
|min(arrdate)|max(arrdate)|min(depdate)|max(depdate)|
+------------+------------+------------+------------+
|  2016-01-01|  2016-12-31|  1920-08-10|  2092-05-09|
+------------+------------+------------+------------+
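Arrival dates fall within 2016 as expected, but departure dates range from 1920 to 2092, which points to data-entry errors. A sketch of one possible sanity rule (both the rule and the `max_stay_days` threshold are assumptions, not something the notebook applies): treat a departure date as invalid when it precedes the arrival date or lies implausibly far after it.

```python
from datetime import date
from typing import Optional

def valid_depdate(arrdate: date, depdate: Optional[date], max_stay_days: int = 3650) -> bool:
    """Return False when depdate precedes arrdate or exceeds arrdate + max_stay_days.

    A missing depdate (traveller not yet departed) is treated as valid.
    """
    if depdate is None:
        return True
    stay = (depdate - arrdate).days
    return 0 <= stay <= max_stay_days

print(valid_depdate(date(2016, 4, 1), date(2016, 4, 15)))  # True
print(valid_depdate(date(2016, 4, 1), date(1920, 8, 10)))  # False: before arrival
print(valid_depdate(date(2016, 4, 1), date(2092, 5, 9)))   # False: about 76 years later
print(valid_depdate(date(2016, 4, 1), None))               # True
```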



#### Analysing World Temperature Dataset

In [13]:
worldTemp_schema = StructType([
    StructField('dt', DateType(), True),
    StructField('AverageTemperature', DoubleType(), True),
    StructField('AverageTemperatureUncertainty', DoubleType(), True),
    StructField('City', StringType(), True),
    StructField('Country', StringType(), True),
    StructField('Latitude', StringType(), True),
    StructField('Longitude', StringType(), True)
])
worldTempDF = spark.read.csv("../../data2/",schema = worldTemp_schema,header = True)
worldTempDF.printSchema()
worldTempDF.show(5)

root
 |-- dt: date (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null

In [14]:
# I don't need columns [AverageTemperatureUncertainty,Latitude,Longitude]
worldTempDF = worldTempDF.drop("AverageTemperatureUncertainty","Latitude","Longitude")
worldTempDF.printSchema()
worldTempDF.show(5)

root
 |-- dt: date (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)

+----------+------------------+-----+-------+
|        dt|AverageTemperature| City|Country|
+----------+------------------+-----+-------+
|1743-11-01|             6.068|Århus|Denmark|
|1743-12-01|              null|Århus|Denmark|
|1744-01-01|              null|Århus|Denmark|
|1744-02-01|              null|Århus|Denmark|
|1744-03-01|              null|Århus|Denmark|
+----------+------------------+-----+-------+
only showing top 5 rows



In [58]:
worldTempDF.count()

8599212

In [59]:
worldTempDF.where(worldTempDF.Country == "United States").select("City").distinct().show(5)

+---------------+
|           City|
+---------------+
|     Charleston|
|         Corona|
|    Springfield|
|          Tempe|
|North Las Vegas|
+---------------+
only showing top 5 rows



#### Get Min and Max dt
- The temperature data ends on 2013-09-01, so it doesn't overlap with the 2016 I94 data, which is disappointing.
- We can still use the average temperature per city and country.

In [60]:
worldTempDF.agg(min("dt"),max("dt")).show()

+----------+----------+
|   min(dt)|   max(dt)|
+----------+----------+
|1743-11-01|2013-09-01|
+----------+----------+



#### Create a new dataset from the world temperature dataset with the average temperature per city and country

In [15]:
worldAverageTempDF = worldTempDF.where(worldTempDF.AverageTemperature.isNotNull()).groupBy("City","Country").agg(avg("AverageTemperature").alias("AvgTemp"))
worldAverageTempDF.show()

+------------+------------------+------------------+
|        City|           Country|           AvgTemp|
+------------+------------------+------------------+
|   Allentown|     United States| 9.523295607566514|
|      Atyrau|        Kazakhstan|  8.06841197046249|
|     Bintulu|          Malaysia|26.156729436109686|
| Butterworth|          Malaysia|27.212006301502615|
|      Cainta|       Philippines|26.448334487877297|
|      Ciamis|         Indonesia|24.768451303885037|
|      Dodoma|          Tanzania| 22.18983868935102|
|      Fuling|             China|16.844962073931832|
|      Fuyang|             China| 15.14607384169885|
|         Ife|           Nigeria|26.373105171411947|
|  Jhunjhunun|             India|25.178274018379312|
|      Maxixe|        Mozambique|24.087632069970855|
|      Owerri|           Nigeria| 26.61457809798272|
|Puerto Plata|Dominican Republic|26.035798296727982|
| Santo André|            Brazil|19.699367843511453|
|      Soweto|      South Africa|15.0695739455

In [62]:
worldAverageTempDF.printSchema()

root
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- AvgTemp: double (nullable = true)
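The aggregation above drops null temperatures and then averages per (City, Country). The same logic in plain Python, as an illustration only: Spark's `avg` already ignores nulls, so the `isNotNull()` filter mainly removes cities whose readings are all null, which would otherwise appear with a null `AvgTemp`. Apart from the first two Århus rows (taken from the output above), the sample rows are illustrative.

```python
from collections import defaultdict

def average_temp_per_city(rows):
    """Average non-null temperatures per (city, country), skipping None values."""
    totals = defaultdict(lambda: [0.0, 0])  # (city, country) -> [sum, count]
    for city, country, temp in rows:
        if temp is None:
            continue
        acc = totals[(city, country)]
        acc[0] += temp
        acc[1] += 1
    return {key: s / n for key, (s, n) in totals.items()}

rows = [
    ("Århus", "Denmark", 6.068),
    ("Århus", "Denmark", None),
    ("Århus", "Denmark", 2.0),         # illustrative value
    ("Ghost Town", "Nowhere", None),   # hypothetical city with only null readings
]
print(average_temp_per_city(rows))
```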



In [16]:
worldAverageTempDF.count()

3490

In [17]:
# Join temp with code_country
worldAverageTempDF = worldAverageTempDF.join(code_country_df,lower(worldAverageTempDF.Country) == lower(code_country_df.country), 'left')\
                            .select(worldAverageTempDF.City,\
                                    worldAverageTempDF.Country,\
                                    worldAverageTempDF.AvgTemp,\
                                    code_country_df.code.alias("Country_Code"))
worldAverageTempDF.show()
# Code will be null for the United States, since code_country.csv comes from the dataset of immigration to the United States

+------------+------------------+------------------+------------+
|        City|           Country|           AvgTemp|Country_Code|
+------------+------------------+------------------+------------+
|   Allentown|     United States| 9.523295607566514|        null|
|      Atyrau|        Kazakhstan|  8.06841197046249|         155|
|     Bintulu|          Malaysia|26.156729436109686|         273|
| Butterworth|          Malaysia|27.212006301502615|         273|
|      Cainta|       Philippines|26.448334487877297|         260|
|      Ciamis|         Indonesia|24.768451303885037|         204|
|      Dodoma|          Tanzania| 22.18983868935102|         353|
|      Fuling|             China|16.844962073931832|         245|
|      Fuyang|             China| 15.14607384169885|         245|
|         Ife|           Nigeria|26.373105171411947|         343|
|  Jhunjhunun|             India|25.178274018379312|         213|
|      Maxixe|        Mozambique|24.087632069970855|         329|
|      Owe

In [18]:
worldAverageTempDF.select("country").distinct().filter( (column("Country") != "United States") & column("Country_Code").isNull()).show(n=10,truncate=False)

+-------------+
|country      |
+-------------+
|Cambodia     |
|Congo        |
|Côte D'Ivoire|
+-------------+
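Three countries (Cambodia, Congo, Côte D'Ivoire) don't match because the lookup spells them differently. A sketch of one possible fix: normalize names (trim, lower-case, strip accents) and apply a small alias table before joining. The alias target `"ivory coast"` and the code `999` are hypothetical; check the actual spellings in `code_country.csv` before relying on any alias. `match_country_code` is likewise a hypothetical helper.

```python
import unicodedata

def normalize_country(name: str) -> str:
    """Trim, lower-case and strip accents, e.g. "Côte D'Ivoire" -> "cote d'ivoire"."""
    decomposed = unicodedata.normalize("NFKD", name)
    ascii_only = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    return ascii_only.strip().lower()

# Hypothetical aliases: normalized temperature-dataset name -> normalized lookup name
ALIASES = {"cote d'ivoire": "ivory coast"}

def match_country_code(name, code_by_country, aliases=ALIASES):
    """Return the lookup code for a country name, or None if no match is found."""
    key = normalize_country(name)
    key = aliases.get(key, key)
    return code_by_country.get(key)

# Lookup keyed by normalized name (273 and 245 appear in the output above; 999 is hypothetical)
code_by_country = {"malaysia": 273, "china": 245, "ivory coast": 999}
print(match_country_code("Malaysia", code_by_country))       # 273
print(match_country_code("Côte D'Ivoire", code_by_country))  # 999 (hypothetical code)
print(match_country_code("Cambodia", code_by_country))       # None
```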



In [19]:
worldAverageTempDF.count()

3490

In [20]:
worldAverageTempDF.printSchema()

root
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- AvgTemp: double (nullable = true)
 |-- Country_Code: integer (nullable = true)



In [21]:
# Write dataframe to parquet file
worldAverageTempDF.coalesce(1).write.parquet("world_temp/")

#### Analysing U.S. Cities Demographics Dataset

In [133]:
usCitiesDemographics_schema = StructType([
    StructField('city', StringType(), True),
    StructField('state', StringType(), True),
    StructField('median_age', DoubleType(), True),
    StructField('male_population', IntegerType(), True),
    StructField('female_population', IntegerType(), True),
    StructField('total_population', IntegerType(), True),
    StructField('number_of_veterans', IntegerType(), True),
    StructField('foreign_born', IntegerType(), True),
    StructField('average_household_size', DoubleType(), True),
    StructField('state_code', StringType(), True),
    StructField('race', StringType(), True),
    StructField('count', IntegerType(), True)
])

usCitiesDemographicsDF = spark.read.csv("us-cities-demographics.csv",schema = usCitiesDemographics_schema,header = True,sep = ";")
usCitiesDemographicsDF.printSchema()
usCitiesDemographicsDF.orderBy(usCitiesDemographicsDF.city, usCitiesDemographicsDF.state).show(6)

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- number_of_veterans: integer (nullable = true)
 |-- foreign_born: integer (nullable = true)
 |-- average_household_size: double (nullable = true)
 |-- state_code: string (nullable = true)
 |-- race: string (nullable = true)
 |-- count: integer (nullable = true)

+-------+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   city|state|median_age|male_population|female_population|total_population|number_of_veterans|foreign_born|average_household_size|state_code|                race|count|
+-------+-----+----------+---------------+-----------------+----------------+------------------+------------+---------------

In [49]:
usCitiesDemographicsDF.count()

2891

#### Create a new dataset from the U.S. Cities Demographics dataset with only the columns and rows of interest

In [134]:
# I'm not interested in the following columns [number_of_veterans, average_household_size, race, count]
cols_to_drop = ['number_of_veterans', 'average_household_size', 'race', 'count']
usCitiesDemographicsDF = usCitiesDemographicsDF.drop(*cols_to_drop)
# Dropping these columns leaves duplicated rows, because the source has one row per race for each city, so I remove duplicates by city and state
usCitiesDemographicsDF = usCitiesDemographicsDF.dropDuplicates(["city", "state"])
usCitiesDemographicsDF.orderBy(usCitiesDemographicsDF.city, usCitiesDemographicsDF.state).show(6)
usCitiesDemographicsDF.printSchema()

+-------+----------+----------+---------------+-----------------+----------------+------------+----------+
|   city|     state|median_age|male_population|female_population|total_population|foreign_born|state_code|
+-------+----------+----------+---------------+-----------------+----------------+------------+----------+
|Abilene|     Texas|      31.3|          65212|            60664|          125876|        8129|        TX|
|  Akron|      Ohio|      38.1|          96886|           100667|          197553|       10024|        OH|
|Alafaya|   Florida|      33.5|          39504|            45760|           85264|       15842|        FL|
|Alameda|California|      41.4|          37747|            40867|           78614|       18841|        CA|
| Albany|   Georgia|      33.3|          31695|            39414|           71109|         861|        GA|
| Albany|  New York|      32.8|          47627|            50825|           98452|       11948|        NY|
+-------+----------+----------+------

In [135]:
usCitiesDemographicsDF.count()

596
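Deduplicating on a key keeps one row per (city, state). A plain-Python sketch of what `dropDuplicates(["city", "state"])` does; Spark doesn't guarantee which duplicate survives, which is harmless here because, as noted in section 3.2, the duplicated rows differ only in the dropped `race` and `count` columns. The rows below reuse values from the output above, with illustrative `race` values.

```python
def drop_duplicates(rows, key_fields):
    """Keep the first row seen for each combination of key_fields."""
    seen = set()
    result = []
    for row in rows:
        key = tuple(row[f] for f in key_fields)
        if key not in seen:
            seen.add(key)
            result.append(row)
    return result

rows = [
    {"city": "Albany", "state": "Georgia", "total_population": 71109, "race": "White"},
    {"city": "Albany", "state": "Georgia", "total_population": 71109, "race": "Asian"},
    {"city": "Albany", "state": "New York", "total_population": 98452, "race": "White"},
]
deduped = drop_duplicates(rows, ["city", "state"])
print(len(deduped))  # 2
```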

In [51]:
# Write dataframe to parquet file
usCitiesDemographicsDF.coalesce(1).write.parquet("us_cities_demographics/")

In [21]:
# Join usCitiesDemographics with i94 states lookup just to check if `state_code` matches
usCitiesDemographicsWithStateNamesDF = usCitiesDemographicsDF.join(states_df, usCitiesDemographicsDF.state_code == states_df.state_code)\
                                        .select("city",\
                                                "state",\
                                                "total_population",\
                                                "foreign_born",\
                                               usCitiesDemographicsDF.state_code,\
                                               states_df.state_name)
usCitiesDemographicsWithStateNamesDF.show(5)
# This means we can join US Cities with i94 data using state_code
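# Notice that state and state_name hold the same names, apart from case and abbreviations (e.g. "District of Columbia" vs "DIST. OF COLUMBIA")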

+----------+--------------------+----------------+------------+----------+-----------------+
|      city|               state|total_population|foreign_born|state_code|       state_name|
+----------+--------------------+----------------+------------+----------+-----------------+
|    Nashua|       New Hampshire|           87975|       12693|        NH|    NEW HAMPSHIRE|
|  Carlsbad|          California|          113466|       17689|        CA|       CALIFORNIA|
|  Missoula|             Montana|           71024|        2771|        MT|          MONTANA|
|Fort Worth|               Texas|          836969|      143404|        TX|            TEXAS|
|Washington|District of Columbia|          672228|       95117|        DC|DIST. OF COLUMBIA|
+----------+--------------------+----------------+------------+----------+-----------------+
only showing top 5 rows
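To confirm that `state` and `state_name` differ only by case, compare them case-insensitively and list whatever is left. A plain-Python sketch using pairs from the output above; `state_name_mismatches` is a hypothetical helper.

```python
def state_name_mismatches(pairs):
    """Return (state, state_name) pairs that still differ after upper-casing."""
    return [(s, n) for s, n in pairs if s.strip().upper() != n.strip().upper()]

pairs = [
    ("New Hampshire", "NEW HAMPSHIRE"),
    ("California", "CALIFORNIA"),
    ("District of Columbia", "DIST. OF COLUMBIA"),
]
print(state_name_mismatches(pairs))  # [('District of Columbia', 'DIST. OF COLUMBIA')]
```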



#### Analysing Airport Codes Dataset

In [52]:
airportCodes_schema = StructType([
    StructField('ident', StringType(), True),
    StructField('type', StringType(), True),
    StructField('name', StringType(), True),
    StructField('elevation_ft', IntegerType(), True),
    StructField('continent', StringType(), True),
    StructField('iso_country', StringType(), True),
    StructField('iso_region', StringType(), True),
    StructField('municipality', StringType(), True),
    StructField('gps_code', StringType(), True),
    StructField('iata_code', StringType(), True),
    StructField('local_code', StringType(), True),
    StructField('coordinates', StringType(), True)
])
airportCodesDF = spark.read.csv("airport_codes.csv",schema = airportCodes_schema,header = True)
airportCodesDF.printSchema()
airportCodesDF.show(5)

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|   

#### Create a new dataset from the Airport Codes dataset with only the columns and rows of interest

In [53]:
# I'm not interested in the following columns ['type', 'elevation_ft', 'continent', 'coordinates']
cols_to_drop = ['type', 'elevation_ft', 'continent', 'coordinates']
airportCodesDF = airportCodesDF.drop(*cols_to_drop)
airportCodesDF.printSchema()
airportCodesDF.show(5)

root
 |-- ident: string (nullable = true)
 |-- name: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)

+-----+--------------------+-----------+----------+------------+--------+---------+----------+
|ident|                name|iso_country|iso_region|municipality|gps_code|iata_code|local_code|
+-----+--------------------+-----------+----------+------------+--------+---------+----------+
|  00A|   Total Rf Heliport|         US|     US-PA|    Bensalem|     00A|     null|       00A|
| 00AA|Aero B Ranch Airport|         US|     US-KS|       Leoti|    00AA|     null|      00AA|
| 00AK|        Lowell Field|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|
| 00AL|        Epps Airpark|         US|     US-AL|     Harvest|    00AL|     null|      00AL|
| 00

In [54]:
# Write dataframe to parquet file
airportCodesDF.coalesce(1).write.parquet("airport_codes/")

#### Cleaning Steps
Document steps necessary to clean the data

#### Cleaning tasks performed
Details are in section **3.2 Mapping Out Data Pipelines**; this is only a summary.
- For **i94 immigration**:
  - Some columns need to be dropped
  - All column data types need to be set
  - Some values need to be normalized (`i94mode` and `matflag`)
- For **World Temperature**:
  - Some columns need to be dropped
  - Use the average temperature per city and country
  - Join with `code_country` to get the `Country_Code` column
- For **US Cities Demographics**:
  - Some columns need to be dropped
  - Remove duplicates
- For **Airport Codes**:
  - Some columns need to be dropped

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

![Conceptual Data Model](images/conceptual_data_model.png)

- The model consists of a fact table and 9 dimension tables
- It is a star schema, which keeps joins between tables simple and easy for a business user to follow
- Analytical queries are meant to be performed on the fact table

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model.

##### i94_immigration Table
- Use PySpark to read i94 immigration data from SAS files into a DataFrame; let's call it `df1`
- Drop the following columns from `df1` as they are not needed 
  - count
  - dtadfile
  - visapost
  - occup
  - entdepa
  - entdepd
  - entdepu
  - insnum
  - validres
  - delete_days
  - delete_mexl
  - delete_dup
  - delete_visa
  - delete_recdup
- Cast all columns to their desired types, and adjust values of `i94mode` and `matflag`
  - `i94mode`: Use 9 instead of null for missing values, use 9 instead of 0 for Not reported values
  - `matflag`: Make it boolean true if value is M, false otherwise
- Store `df1` as Parquet files
- Upload the Parquet files to AWS S3 using boto
- Load these files into the corresponding table in AWS Redshift
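The `i94mode` and `matflag` rules listed above can be sketched as plain-Python functions. This assumes the raw SAS values arrive as floats (for `i94mode`) and strings (for `matflag`), possibly null. In Spark the same rules could be expressed with `when(...).otherwise(...)`, but the notebook's actual transformation code isn't shown in this section.

```python
from typing import Optional

NOT_REPORTED = 9

def clean_i94mode(value: Optional[float]) -> int:
    """Map null and 0 to 9 ("Not reported"); keep other codes as integers."""
    if value is None or int(value) == 0:
        return NOT_REPORTED
    return int(value)

def clean_matflag(value: Optional[str]) -> bool:
    """True when the match flag is "M", False otherwise (including null)."""
    return value == "M"

print([clean_i94mode(v) for v in [1.0, 2.0, 3.0, 0.0, None]])  # [1, 2, 3, 9, 9]
print([clean_matflag(v) for v in ["M", None]])                 # [True, False]
```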

##### Lookup Tables
- Tables `code_country`, `ports`, `port_mode`, `states`, `visa_category` are static lookup tables.
- Upload their files to AWS S3 using boto
- Load each file into its corresponding table in AWS Redshift

##### i94_dates Table
- We could auto-generate this table, but that would create records for dates that never occur in the data
- Instead, we collect all date values from the `i94_immigration` table and insert them into the `i94_dates` table
- This runs after `i94_immigration` is loaded, as a query on AWS Redshift
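Deriving `i94_dates` from the dates that actually occur can be sketched in plain Python: collect the distinct arrival and departure dates, then expand each into calendar attributes. The attribute set (year, month, day, week, weekday) is an assumption, since this section doesn't list the table's columns; on Redshift the same idea would be an `INSERT INTO ... SELECT DISTINCT` over the fact table. `build_date_dimension` is a hypothetical helper.

```python
from datetime import date

def build_date_dimension(records, date_fields=("arrdate", "depdate")):
    """Return one row per distinct, non-null date found in date_fields."""
    dates = {r[f] for r in records for f in date_fields if r.get(f) is not None}
    return [
        {
            "date": d,
            "year": d.year,
            "month": d.month,
            "day": d.day,
            # ISO week number: 2016-01-01 falls in ISO week 53 of 2015
            "week": d.isocalendar()[1],
            "weekday": d.isoweekday(),  # 1 = Monday ... 7 = Sunday
        }
        for d in sorted(dates)
    ]

records = [
    {"arrdate": date(2016, 1, 1), "depdate": date(2016, 1, 10)},
    {"arrdate": date(2016, 1, 1), "depdate": None},
]
for row in build_date_dimension(records):
    print(row)
```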

##### world_temp Table
- Load the CSV file into a DataFrame; let's call it `df1`
- Drop the following columns from `df1` as they are not needed 
  - AverageTemperatureUncertainty
  - Latitude
  - Longitude
- The dates in this dataset don't overlap with the `i94_immigration` dataset, so we use the **average temperature per city and country**
- Load `code_country` into a DataFrame and join it with `df1` on the lower-cased `country` name to get the `Country_Code`
  - _Notice:_ The code will be null for the United States, since code_country.csv comes from the dataset of immigration to the United States
- Write `df1` to a parquet file
- Upload the file to AWS S3 using boto
- Load the file into its corresponding table in AWS Redshift

##### us_cities_demographics Table
- Load the CSV file into a DataFrame; let's call it `df1`
- Drop the following columns from `df1` as they are not needed 
  - number_of_veterans
  - average_household_size
  - race
  - count
- Dropping these columns leaves duplicated records, because the source repeats each city once per race and those rows differ only in `race` and `count`
- Drop duplicates by `city` and `state`
- Write `df1` to a parquet file
- Upload the file to AWS S3 using boto
- Load the records in this file into its corresponding table in AWS Redshift

##### airport_codes Table
- Load the CSV file into a DataFrame; let's call it `df1`
- Drop the following columns from `df1` as they are not needed 
  - type
  - elevation_ft
  - continent
  - coordinates
- Write `df1` to a parquet file
- Upload the file to AWS S3 using boto
- Load the records in this file into its corresponding table in AWS Redshift


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [2]:
import boto3
import configparser
import pandas as pd
import json
import os

#### Read Configuration

In [16]:
config = configparser.ConfigParser()

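# Normally credentials belong in ~/.aws/credentials, outside the project folder
# Use a context manager so the file handle is closed after reading
with open('aws/credentials.cfg') as f:
    config.read_file(f)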

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

BUCKET_NAME            = config.get("S3", "BUCKET_NAME")


pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME","BUCKET_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME,BUCKET_NAME]
             })

                    Param                  Value
0        DWH_CLUSTER_TYPE             multi-node
1           DWH_NUM_NODES                      4
2           DWH_NODE_TYPE              dc2.large
3  DWH_CLUSTER_IDENTIFIER             dwhCluster
4                  DWH_DB                    dwh
5             DWH_DB_USER                dwhuser
6         DWH_DB_PASSWORD               Passw0rd
7                DWH_PORT                   5439
8       DWH_IAM_ROLE_NAME                dwhRole
9             BUCKET_NAME  capstone-staging-area
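The configuration cell expects `aws/credentials.cfg` to contain `[AWS]`, `[DWH]` and `[S3]` sections. The sketch below shows that layout, parsed with `configparser.read_string` so it runs without the real file. The non-secret values mirror the output above; the key, secret and password are placeholders, and the real file should never be committed.

```python
import configparser

# Placeholder layout of aws/credentials.cfg (keys match the config.get calls above)
SAMPLE_CFG = """
[AWS]
KEY = <your-access-key-id>
SECRET = <your-secret-access-key>

[DWH]
DWH_CLUSTER_TYPE = multi-node
DWH_NUM_NODES = 4
DWH_NODE_TYPE = dc2.large
DWH_CLUSTER_IDENTIFIER = dwhCluster
DWH_DB = dwh
DWH_DB_USER = dwhuser
DWH_DB_PASSWORD = <choose-a-strong-password>
DWH_PORT = 5439
DWH_IAM_ROLE_NAME = dwhRole

[S3]
BUCKET_NAME = capstone-staging-area
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)
# getint converts the value; config.get would return the string "4"
print(config.getint("DWH", "DWH_NUM_NODES"))  # 4
print(config.get("S3", "BUCKET_NAME"))        # capstone-staging-area
```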


#### Upload files to AWS S3

In [32]:
# Define S3 resource
s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                   )

In [9]:
# Upload a single file to S3
# Filename - local file to upload
# Bucket   - bucket to upload to (the top-level container in AWS S3)
# Key      - S3 object name (may contain "/"-separated prefixes); required
# s3.meta.client.upload_file(Filename='airport_codes.csv', Bucket=BUCKET_NAME, Key='airport_codes.csv')

In [33]:
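def uploadDirectory(path,bucketname):
    """Upload every file under `path` to `bucketname`, using the local relative path as the S3 key."""
    print("Source file path = "+path)
    print("Destination bucket name = "+bucketname)
    for root,dirs,files in os.walk(path):
        # Skip hidden directories (e.g. .ipynb_checkpoints)
        if root[0] == '.' or "/." in root :
            continue
        print("======================")
        print("* Current root: "+root)
        print("---------------------")
        for file in files:
            # Skip Spark side files such as _SUCCESS and .crc checksums
            if file[0] == '_' or file[0] == '.':
                continue
            print("Current file: "+file)
            # The S3 key mirrors the local path, e.g. airport_codes/part-00000-...parquet
            s3.meta.client.upload_file(os.path.join(root,file),bucketname,root+"/"+file)
        print("======================")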
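The file-selection and key-naming logic of `uploadDirectory` can be separated from the boto3 call, which makes it testable without AWS. The sketch below is a hypothetical variant, not the notebook's function: `iter_upload_keys` prunes hidden directories in place (instead of checking `root[0] == '.'`), skips `_`/`.` files, and always builds keys with `/` separators.

```python
import os

def iter_upload_keys(path):
    """Yield (local_file, s3_key) pairs for every uploadable file under `path`."""
    # Keys are relative to the parent of `path`, so they keep the top-level folder name
    base = os.path.dirname(os.path.normpath(path)) or os.curdir
    for root, dirs, files in os.walk(path):
        # Prune hidden directories in place so os.walk doesn't descend into them
        dirs[:] = sorted(d for d in dirs if not d.startswith("."))
        for name in sorted(files):
            # Skip Spark side files such as _SUCCESS and .part-*.crc
            if name.startswith(("_", ".")):
                continue
            local_file = os.path.join(root, name)
            key = os.path.relpath(local_file, base).replace(os.sep, "/")
            yield local_file, key
```

Usage would pair each key with the same boto3 call as above, e.g. `for local_file, key in iter_upload_keys("airport_codes"): s3.meta.client.upload_file(local_file, BUCKET_NAME, key)`.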

In [59]:
# Upload airport_codes to S3 bucket
uploadDirectory("airport_codes",BUCKET_NAME)

Source file path = airport_codes
Destination bucket name = capstone-staging-area
--------------------
Current root: airport_codes
--------------------
Current file: part-00000-4e6a3157-de28-452e-98c0-4ae050c9c6a2-c000.snappy.parquet
--------------------


In [26]:
# Upload lookups to S3 bucket
uploadDirectory("lookups",BUCKET_NAME)

Source file path = lookups
Destination bucket name = capstone-staging-area
* Current root: lookups
---------------------
Current file: code_country.csv
Current file: states.csv
Current file: port_mode.csv
Current file: visa_category.csv
Current file: ports.csv


In [27]:
# Upload world_temp to S3 bucket
uploadDirectory("world_temp",BUCKET_NAME)

Source file path = world_temp
Destination bucket name = capstone-staging-area
* Current root: world_temp
---------------------
Current file: part-00000-7762b432-aa71-420d-bfe7-1cb947f370d0-c000.snappy.parquet


In [70]:
# Upload us_cities_demographics to S3 bucket
uploadDirectory("us_cities_demographics",BUCKET_NAME)

Source file path = us_cities_demographics
Destination bucket name = capstone-staging-area
* Current root: us_cities_demographics
---------------------
Current file: part-00000-c259ffeb-d6a5-456f-af19-7e0541c363f3-c000.snappy.parquet


In [71]:
# Upload i94_immigration to S3 bucket
uploadDirectory("i94_immigration",BUCKET_NAME)

Source file path = i94_immigration
Destination bucket name = capstone-staging-area
* Current root: i94_immigration
---------------------
* Current root: i94_immigration/i94_sep16_sub
---------------------
Current file: part-00000-aae5cfd9-a9bf-4e74-8627-d5087c373435-c000.snappy.parquet
* Current root: i94_immigration/i94_jan16_sub
---------------------
Current file: part-00000-7ab26318-2815-419c-9c79-f0c8a812a860-c000.snappy.parquet
* Current root: i94_immigration/i94_apr16_sub
---------------------
Current file: part-00000-768f7644-5124-4875-9d4c-0dd71daa2a74-c000.snappy.parquet
* Current root: i94_immigration/i94_dec16_sub
---------------------
Current file: part-00000-dc8ce8d3-d9ed-46ed-9449-3b3a50d6ac09-c000.snappy.parquet
* Current root: i94_immigration/i94_nov16_sub
---------------------
Current file: part-00000-53a787ce-e633-4da7-bef2-487c7e64dabf-c000.snappy.parquet
* Current root: i94_immigration/i94_may16_sub
---------------------
Current file: part-00000-9b1ffe6b-ada7-473d-8

#### Checking uploaded files to S3

In [28]:
# List the files in S3
sampleDbBucket = s3.Bucket(BUCKET_NAME)

# Iterate over all bucket objects (an empty prefix matches everything) and print them
for obj in sampleDbBucket.objects.filter(Prefix=""):
    print(obj)

s3.ObjectSummary(bucket_name='capstone-staging-area', key='airport_codes/part-00000-4e6a3157-de28-452e-98c0-4ae050c9c6a2-c000.snappy.parquet')
s3.ObjectSummary(bucket_name='capstone-staging-area', key='i94_immigration/i94_apr16_sub/part-00000-768f7644-5124-4875-9d4c-0dd71daa2a74-c000.snappy.parquet')
s3.ObjectSummary(bucket_name='capstone-staging-area', key='i94_immigration/i94_aug16_sub/part-00000-63f1afa4-c3c2-4990-b731-cbf8d19f728f-c000.snappy.parquet')
s3.ObjectSummary(bucket_name='capstone-staging-area', key='i94_immigration/i94_dec16_sub/part-00000-dc8ce8d3-d9ed-46ed-9449-3b3a50d6ac09-c000.snappy.parquet')
s3.ObjectSummary(bucket_name='capstone-staging-area', key='i94_immigration/i94_feb16_sub/part-00000-b018a727-0d19-4736-ad1d-ea004cb25ae2-c000.snappy.parquet')
s3.ObjectSummary(bucket_name='capstone-staging-area', key='i94_immigration/i94_jan16_sub/part-00000-7ab26318-2815-419c-9c79-f0c8a812a860-c000.snappy.parquet')
s3.ObjectSummary(bucket_name='capstone-staging-area', key='i94
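
Printing every object works for a spot check, but it is easy to miss a month in a long listing. The sketch below groups S3 keys by folder and reports which of the twelve `i94_<mon>16_sub` folders have no Parquet file. The folder naming is taken from the upload output above; the helper names are hypothetical. In the notebook it could be called with `[obj.key for obj in sampleDbBucket.objects.filter(Prefix="i94_immigration/")]`, and an empty result means every month was uploaded.

```python
from collections import defaultdict

# Month abbreviations used in the upload folder names, e.g. "i94_apr16_sub".
MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]


def group_keys_by_folder(keys):
    """Map each folder (the key up to its last '/') to the file names inside it."""
    folders = defaultdict(list)
    for key in keys:
        folder, _, filename = key.rpartition("/")
        folders[folder].append(filename)
    return dict(folders)


def missing_i94_months(keys, year_suffix="16"):
    """Return the expected monthly i94 folders that contain no .parquet file."""
    folders = group_keys_by_folder(keys)
    expected = [f"i94_immigration/i94_{m}{year_suffix}_sub" for m in MONTHS]
    return [
        folder for folder in expected
        if not any(name.endswith(".parquet") for name in folders.get(folder, []))
    ]
```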

#### Create AWS Redshift Cluster

In [34]:
#Define AWS resources
ec2 = boto3.resource('ec2',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )
iam = boto3.client('iam',aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET,
                     region_name='us-west-2'
                  )
redshift = boto3.client('redshift',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                       )

In [35]:
# Create an IAM Role that makes Redshift able to access S3 bucket (ReadOnly)
try:
    print('- Creating a new IAM Role')
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )    
except Exception as e:
    print(e)

- Creating a new IAM Role
An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name dwhRole already exists.


In [36]:
# Attach Policy
print('- Attaching Policy')
iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

- Attaching Policy


200

In [37]:
# Get and print the IAM role ARN
print('- Get the IAM role ARN')
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']
print(roleArn)

- Get the IAM role ARN
arn:aws:iam::347115321620:role/dwhRole


In [38]:
# Create a Redshift Cluster
try:
    response = redshift.create_cluster(        
        # Add parameters for hardware
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),
        
        # Add parameters for identifiers & credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        
        # Add parameter for role (to allow s3 access)
        IamRoles=[roleArn] 
    )
except Exception as e:
    print(e)

In [43]:
import time
# Describe the cluster and poll its status every 30 seconds until it is no longer "creating"
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', None)  # None = no truncation (-1 is deprecated in recent pandas)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
# Poll until the cluster has finished being created
while myClusterProps["ClusterStatus"] == "creating":
    time.sleep(30) # Sleep 30 sec
    myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
print("--> ClusterStatus = "+myClusterProps["ClusterStatus"])
prettyRedshiftProps(myClusterProps)

--> ClusterStatus = available


|   | Key | Value |
|---|---|---|
| 0 | ClusterIdentifier | dwhcluster |
| 1 | NodeType | dc2.large |
| 2 | ClusterStatus | available |
| 3 | MasterUsername | dwhuser |
| 4 | DBName | dwh |
| 5 | Endpoint | {'Address': 'dwhcluster.cjhgmfxe879j.us-west-2.redshift.amazonaws.com', 'Port': 5439} |
| 6 | VpcId | vpc-23c1ba5b |
| 7 | NumberOfNodes | 4 |
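
The polling loop above waits forever if the cluster never leaves "creating" and silently continues if it ends up in an unexpected state. The sketch below adds a timeout and a final status check; `wait_for_status` is a hypothetical helper, and its `sleep` parameter is injectable only so it can be exercised without waiting. For the real cluster it could be called as `wait_for_status(lambda: redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0], "available")`. boto3 also ships a ready-made waiter for this, `redshift.get_waiter("cluster_available").wait(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)`, which is usually the simpler choice.

```python
from time import monotonic, sleep as default_sleep


def wait_for_status(describe, target, pending=("creating",),
                    interval=30.0, timeout=1800.0, sleep=default_sleep):
    """Poll describe() until ClusterStatus leaves the pending states.

    describe: zero-argument callable returning a cluster-properties dict.
    Returns the final dict. Raises TimeoutError if the cluster is still pending
    after `timeout` seconds, and RuntimeError if it settles in a status other
    than `target`.
    """
    deadline = monotonic() + timeout
    props = describe()
    while props["ClusterStatus"] in pending:
        if monotonic() >= deadline:
            raise TimeoutError(
                f"status still {props['ClusterStatus']!r} after {timeout} s")
        sleep(interval)
        props = describe()
    if props["ClusterStatus"] != target:
        raise RuntimeError(f"unexpected cluster status {props['ClusterStatus']!r}")
    return props
```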


In [44]:
# Take note of the cluster endpoint and role ARN - DO NOT RUN THIS unless the cluster status becomes "Available"
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

DWH_ENDPOINT ::  dwhcluster.cjhgmfxe879j.us-west-2.redshift.amazonaws.com
DWH_ROLE_ARN ::  arn:aws:iam::347115321620:role/dwhRole


In [17]:
# Store the endpoint and role ARN in the [RedShift] section of the config
config.set('RedShift', 'DWH_ENDPOINT', DWH_ENDPOINT)
config.set('RedShift', 'DWH_ROLE_ARN', DWH_ROLE_ARN)

# save the file
with open('aws/credentials.cfg', 'w') as configfile:
    config.write(configfile)

In [45]:
# Open an incoming TCP port to access the cluster endpoint
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-010e12c359cf51059')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


In [46]:
# Make sure you can connect to the cluster
%load_ext sql
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
#print(conn_string)
%sql $conn_string

'Connected: dwhuser@dwh'
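
The connection string above inserts the password verbatim, so a password containing `@`, `:` or `/` would be misparsed as part of the URL. A minimal sketch, assuming the same five settings, percent-encodes the credentials with `urllib.parse.quote_plus`; SQLAlchemy (used by the `%sql` magic and by `create_engine`) decodes such escapes. `build_conn_string` is a hypothetical helper name.

```python
from urllib.parse import quote_plus


def build_conn_string(user, password, host, port, db):
    """Build a postgresql:// URL with percent-encoded credentials.

    quote_plus() escapes characters such as '@', ':' and '/' that would
    otherwise be read as URL separators.
    """
    return "postgresql://{}:{}@{}:{}/{}".format(
        quote_plus(user), quote_plus(password), host, int(port), db)
```

In the notebook, `conn_string = build_conn_string(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT, DWH_DB)` would replace the `format` call.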

#### Create tables in Redshift

In [47]:
%%sql 
DROP TABLE IF EXISTS "airport_codes";
CREATE TABLE "airport_codes" (
    "ident" VARCHAR NOT NULL,
    "name" VARCHAR,
    "iso_country" VARCHAR,
    "iso_region" VARCHAR,
    "municipality" VARCHAR,
    "gps_code" VARCHAR,
    "iata_code" VARCHAR,
    "local_code" VARCHAR
);

 * postgresql://dwhuser:***@dwhcluster.cjhgmfxe879j.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.


[]

In [48]:
%%sql 
DROP TABLE IF EXISTS "code_country";
CREATE TABLE "code_country" (
    "code" INTEGER NOT NULL,
    "country" VARCHAR NOT NULL
);

 * postgresql://dwhuser:***@dwhcluster.cjhgmfxe879j.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.


[]

In [49]:
%%sql 
DROP TABLE IF EXISTS "ports";
CREATE TABLE "ports" (
    "code" VARCHAR NOT NULL,
    "port" VARCHAR NOT NULL,
    "state_code" VARCHAR NOT NULL
);

 * postgresql://dwhuser:***@dwhcluster.cjhgmfxe879j.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.


[]

In [50]:
%%sql 
DROP TABLE IF EXISTS "port_mode";
CREATE TABLE "port_mode" (
    "code" INTEGER NOT NULL,
    "mode" VARCHAR NOT NULL
);

 * postgresql://dwhuser:***@dwhcluster.cjhgmfxe879j.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.


[]

In [51]:
%%sql 
DROP TABLE IF EXISTS "states";
CREATE TABLE "states" (
    "state_code" VARCHAR NOT NULL,
    "state_name" VARCHAR NOT NULL
);

 * postgresql://dwhuser:***@dwhcluster.cjhgmfxe879j.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.


[]

In [52]:
%%sql 
DROP TABLE IF EXISTS "i94_dates";
CREATE TABLE "i94_dates" (
    "full_date" DATE NOT NULL,
    "day" INTEGER NOT NULL,
    "month" INTEGER NOT NULL,
    "year" INTEGER NOT NULL,
    "week" INTEGER NOT NULL,
    "weekday" VARCHAR NOT NULL
);

 * postgresql://dwhuser:***@dwhcluster.cjhgmfxe879j.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.


[]

In [53]:
%%sql 
DROP TABLE IF EXISTS "visa_category";
CREATE TABLE "visa_category" (
    "code" INTEGER NOT NULL,
    "visa_category" VARCHAR NOT NULL
);

 * postgresql://dwhuser:***@dwhcluster.cjhgmfxe879j.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.


[]

In [54]:
%%sql 
DROP TABLE IF EXISTS "world_temp";
CREATE TABLE "world_temp" (
    "City" VARCHAR NOT NULL,
    "Country" VARCHAR NOT NULL,
    "AvgTemp" DOUBLE PRECISION,
    "Country_Code" INTEGER
);

 * postgresql://dwhuser:***@dwhcluster.cjhgmfxe879j.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.


[]

In [55]:
%%sql 
DROP TABLE IF EXISTS "us_cities_demographics";
CREATE TABLE "us_cities_demographics" (
    "city" VARCHAR NOT NULL,
    "state" VARCHAR NOT NULL,
    "median_age" DOUBLE PRECISION,
    "male_population" INTEGER,
    "female_population" INTEGER,
    "total_population" INTEGER,
    "foreign_born" INTEGER,
    "state_code" VARCHAR
);

 * postgresql://dwhuser:***@dwhcluster.cjhgmfxe879j.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.


[]

In [56]:
%%sql 
DROP TABLE IF EXISTS "i94_immigration";
CREATE TABLE "i94_immigration" (
    "cicid" BIGINT NOT NULL,
    "i94yr" INTEGER NOT NULL,
    "i94mon" INTEGER NOT NULL,
    "i94cit" INTEGER,
    "i94res" INTEGER,
    "i94port" VARCHAR,
    "arrdate" DATE,
    "i94mode" INTEGER,
    "i94addr" VARCHAR,
    "depdate" DATE,
    "i94bir" INTEGER,
    "i94visa" INTEGER,
    "biryear" INTEGER,
    "dtaddto" VARCHAR,
    "gender" VARCHAR,
    "airline" VARCHAR,
    "admnum" BIGINT,
    "fltno" VARCHAR,
    "visatype" VARCHAR,
    "matflag" BOOLEAN
);

 * postgresql://dwhuser:***@dwhcluster.cjhgmfxe879j.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.


[]

#### Load data into tables

In [57]:
%%time
qry = """
    COPY airport_codes
    FROM 's3://capstone-staging-area/airport_codes/'
    credentials 'aws_iam_role={}'
    FORMAT AS PARQUET;
""".format(DWH_ROLE_ARN)

%sql $qry

 * postgresql://dwhuser:***@dwhcluster.cjhgmfxe879j.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
CPU times: user 4.47 ms, sys: 70 µs, total: 4.54 ms
Wall time: 2.03 s


In [58]:
%%time
qry = """
    COPY code_country
    FROM 's3://capstone-staging-area/lookups/code_country.csv'
    credentials 'aws_iam_role={}'
    delimiter ',' CSV QUOTE '"' IGNOREHEADER 1 compupdate off region 'us-west-2';
""".format(DWH_ROLE_ARN)

%sql $qry

 * postgresql://dwhuser:***@dwhcluster.cjhgmfxe879j.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
CPU times: user 4.9 ms, sys: 140 µs, total: 5.04 ms
Wall time: 1.77 s


In [59]:
%%time
qry = """
    COPY ports
    FROM 's3://capstone-staging-area/lookups/ports.csv'
    credentials 'aws_iam_role={}'
    delimiter ',' CSV QUOTE '"' IGNOREHEADER 1 compupdate off region 'us-west-2';
""".format(DWH_ROLE_ARN)

%sql $qry

 * postgresql://dwhuser:***@dwhcluster.cjhgmfxe879j.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
CPU times: user 1.2 ms, sys: 3.54 ms, total: 4.73 ms
Wall time: 1.37 s


In [60]:
%%time
qry = """
    COPY port_mode
    FROM 's3://capstone-staging-area/lookups/port_mode.csv'
    credentials 'aws_iam_role={}'
    delimiter ',' CSV QUOTE '"' IGNOREHEADER 1 compupdate off region 'us-west-2';
""".format(DWH_ROLE_ARN)

%sql $qry

 * postgresql://dwhuser:***@dwhcluster.cjhgmfxe879j.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
CPU times: user 1.77 ms, sys: 3.58 ms, total: 5.35 ms
Wall time: 812 ms


In [61]:
%%time
qry = """
    COPY states
    FROM 's3://capstone-staging-area/lookups/states.csv'
    credentials 'aws_iam_role={}'
    delimiter ',' CSV QUOTE '"' IGNOREHEADER 1 compupdate off region 'us-west-2';
""".format(DWH_ROLE_ARN)

%sql $qry

 * postgresql://dwhuser:***@dwhcluster.cjhgmfxe879j.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
CPU times: user 532 µs, sys: 4.14 ms, total: 4.67 ms
Wall time: 1.29 s


In [62]:
%%time
qry = """
    COPY visa_category
    FROM 's3://capstone-staging-area/lookups/visa_category.csv'
    credentials 'aws_iam_role={}'
    delimiter ',' CSV QUOTE '"' IGNOREHEADER 1 compupdate off region 'us-west-2';
""".format(DWH_ROLE_ARN)

%sql $qry

 * postgresql://dwhuser:***@dwhcluster.cjhgmfxe879j.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
CPU times: user 3.87 ms, sys: 523 µs, total: 4.39 ms
Wall time: 799 ms


In [63]:
%%time
qry = """
    COPY world_temp
    FROM 's3://capstone-staging-area/world_temp/'
    credentials 'aws_iam_role={}'
    FORMAT AS PARQUET;
""".format(DWH_ROLE_ARN)

%sql $qry

 * postgresql://dwhuser:***@dwhcluster.cjhgmfxe879j.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
CPU times: user 6.5 ms, sys: 0 ns, total: 6.5 ms
Wall time: 14.1 s


In [64]:
%%time
qry = """
    COPY us_cities_demographics
    FROM 's3://capstone-staging-area/us_cities_demographics/'
    credentials 'aws_iam_role={}'
    FORMAT AS PARQUET;
""".format(DWH_ROLE_ARN)

%sql $qry

 * postgresql://dwhuser:***@dwhcluster.cjhgmfxe879j.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
CPU times: user 4.52 ms, sys: 0 ns, total: 4.52 ms
Wall time: 1.19 s


In [65]:
%%time
qry = """
    COPY i94_immigration
    FROM 's3://capstone-staging-area/i94_immigration/i94'
    credentials 'aws_iam_role={}'
    FORMAT AS PARQUET;
""".format(DWH_ROLE_ARN)

%sql $qry

 * postgresql://dwhuser:***@dwhcluster.cjhgmfxe879j.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
CPU times: user 6.84 ms, sys: 3.86 ms, total: 10.7 ms
Wall time: 35.3 s
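
The nine COPY cells above differ only in table name, S3 path and file format. The sketch below generates the same statements from a single list; `LOADS` and `build_copy_sql` are hypothetical names, and the paths and options are copied from the cells above. In the notebook, looping over `LOADS` and running `%sql $qry` on each generated statement would replace the near-identical cells.

```python
BUCKET = "s3://capstone-staging-area"

# (table, S3 path, format) for each COPY cell above.
LOADS = [
    ("airport_codes", f"{BUCKET}/airport_codes/", "parquet"),
    ("code_country", f"{BUCKET}/lookups/code_country.csv", "csv"),
    ("ports", f"{BUCKET}/lookups/ports.csv", "csv"),
    ("port_mode", f"{BUCKET}/lookups/port_mode.csv", "csv"),
    ("states", f"{BUCKET}/lookups/states.csv", "csv"),
    ("visa_category", f"{BUCKET}/lookups/visa_category.csv", "csv"),
    ("world_temp", f"{BUCKET}/world_temp/", "parquet"),
    ("us_cities_demographics", f"{BUCKET}/us_cities_demographics/", "parquet"),
    ("i94_immigration", f"{BUCKET}/i94_immigration/i94", "parquet"),
]


def build_copy_sql(table, s3_path, role_arn, fmt, region="us-west-2"):
    """Return the COPY statement the corresponding notebook cell runs."""
    head = (f"COPY {table}\n"
            f"FROM '{s3_path}'\n"
            f"credentials 'aws_iam_role={role_arn}'\n")
    if fmt == "parquet":
        return head + "FORMAT AS PARQUET;"
    if fmt == "csv":
        # Same CSV options as the lookup-table cells: skip the header row.
        return head + ("delimiter ',' CSV QUOTE '\"' IGNOREHEADER 1 "
                       f"compupdate off region '{region}';")
    raise ValueError(f"unsupported format: {fmt!r}")
```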


#### Load i94_dates from the arrival and departure dates in i94_immigration

In [66]:
%%time
qry = """
INSERT INTO i94_dates
SELECT  dt, 
        EXTRACT(DAY FROM dt) as day, 
        EXTRACT(MONTH FROM dt) as month, 
        EXTRACT(YEAR FROM dt) as year, 
        EXTRACT(WEEK FROM  dt) as week, 
        to_char(dt,'Day') as weekday
FROM
(
    SELECT DISTINCT dt
    FROM
    (
            SELECT DISTINCT arrdate as dt
            FROM i94_immigration
            WHERE arrdate IS NOT NULL
        UNION
            SELECT DISTINCT depdate as dt
            FROM i94_immigration
            WHERE depdate IS NOT NULL
    ) 
) as t4
"""

%sql $qry

 * postgresql://dwhuser:***@dwhcluster.cjhgmfxe879j.us-west-2.redshift.amazonaws.com:5439/dwh
732 rows affected.
CPU times: user 8.67 ms, sys: 98 µs, total: 8.77 ms
Wall time: 1.36 s
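
The query above builds the date dimension from the union of all non-null arrival and departure dates. A plain-Python sketch of the same derivation can help when unit-testing the logic. It assumes Redshift's `EXTRACT(WEEK ...)` returns the ISO-8601 week number (as PostgreSQL does). It also differs in one detail: PostgreSQL-style `to_char(dt, 'Day')` blank-pads day names to 9 characters, while the names here are unpadded.

```python
from datetime import date

WEEKDAYS = ["Monday", "Tuesday", "Wednesday", "Thursday",
            "Friday", "Saturday", "Sunday"]  # date.weekday(): Monday == 0


def date_dim_row(d):
    """One i94_dates row for date d (week assumed to be the ISO-8601 week)."""
    return {
        "full_date": d,
        "day": d.day,
        "month": d.month,
        "year": d.year,
        "week": d.isocalendar()[1],
        "weekday": WEEKDAYS[d.weekday()],
    }


def build_date_dim(arrdates, depdates):
    """Distinct non-null arrival/departure dates, one row each, in date order."""
    distinct = {d for d in [*arrdates, *depdates] if d is not None}
    return [date_dim_row(d) for d in sorted(distinct)]
```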


#### Delete created AWS Redshift Cluster

In [120]:
#### CAREFUL!!
#-- Running this cell deletes the Redshift cluster (no final snapshot is taken)
redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)
#### CAREFUL!!

{'Cluster': {'ClusterIdentifier': 'dwhcluster',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'MasterUsername': 'dwhuser',
  'DBName': 'dwh',
  'Endpoint': {'Address': 'dwhcluster.cjhgmfxe879j.us-west-2.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2020, 12, 26, 22, 21, 6, 736000, tzinfo=tzlocal()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-b9bf0890',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-23c1ba5b',
  'AvailabilityZone': 'us-west-2b',
  'PreferredMaintenanceWindow': 'sun:10:30-sun:11:00',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True,
  'NumberOfNodes': 4,
  'PubliclyAccessible': True,
  'Encrypted': False,
  'Tags': [],
  'EnhancedVpcRouting': False,
  'Ia

In [121]:
# Run this block several times until the cluster is actually deleted
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
k = prettyRedshiftProps(myClusterProps)
print("ClusterStatus = "+myClusterProps["ClusterStatus"])
k

ClusterStatus = deleting


|   | Key | Value |
|---|---|---|
| 0 | ClusterIdentifier | dwhcluster |
| 1 | NodeType | dc2.large |
| 2 | ClusterStatus | deleting |
| 3 | MasterUsername | dwhuser |
| 4 | DBName | dwh |
| 5 | Endpoint | {'Address': 'dwhcluster.cjhgmfxe879j.us-west-2.redshift.amazonaws.com', 'Port': 5439} |
| 6 | VpcId | vpc-23c1ba5b |
| 7 | NumberOfNodes | 4 |


In [None]:
#### CAREFUL!!
#-- Uncomment & run to delete the iam role
#iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
#iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)
#### CAREFUL!!

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness

In [77]:
# Perform quality checks here
#import psycopg2
#try:
#    con=psycopg2.connect(dbname= DWH_DB, host=DWH_ENDPOINT, port= DWH_PORT, user= DWH_DB_USER, password= DWH_DB_PASSWORD)
#except psycopg2.Error as e: 
#    print("Error: Could not make connection to the Redshift database")
#    print(e)

In [78]:
#try:
#    cur = con.cursor()
#except psycopg2.Error as e: 
#    print("Error: Could not get cursor to the Database")
#    print(e)

In [80]:
#con.set_session(autocommit = True)

In [84]:
#cur.execute("SELECT * FROM airport_codes")

In [None]:
#cur.fetchall()

In [4]:
#import numpy as np
#data = np.array(cur.fetchall())
#data

In [None]:
#cur.close() 
#con.close()

#### Run Quality Checks

In [90]:
from sqlalchemy import create_engine
import pandas as pd
# Test connection to Redshift using Pandas
engine = create_engine('postgresql://'+DWH_DB_USER+':'+DWH_DB_PASSWORD+'@'+DWH_ENDPOINT+':'+DWH_PORT+'/'+DWH_DB)
data_frame = pd.read_sql('SELECT * FROM airport_codes LIMIT 2', engine)
data_frame

|   | ident | name | iso_country | iso_region | municipality | gps_code | iata_code | local_code |
|---|---|---|---|---|---|---|---|---|
| 0 | 00AA | Aero B Ranch Airport | US | US-KS | Leoti | 00AA | | 00AA |
| 1 | 00CN | Kitchen Creek Helibase Heliport | US | US-CA | Pine Valley | 00CN | | 00CN |


##### Source/Count checks to ensure completeness

In [95]:
def get_count(table_name):
    counter_pdf = pd.read_sql('SELECT count(*) as counter FROM '+table_name, engine)
    return counter_pdf.iloc[0]['counter']

In [112]:
# airport_codes count = 55,075
# code_country count = 289
# ports count = 661
# port_mode count = 4
# states count = 55
# visa_category count = 3
# world_temp count = 3490
# us_cities_demographics count = 596
# i94_immigration count = 40790529
# i94_dates count = 732
assert (get_count('airport_codes') == 55075), "Error: airport_codes is not 55075"
assert (get_count('code_country') == 289), "Error: code_country is not 289"
assert (get_count('ports') == 661), "Error: ports is not 661"
assert (get_count('port_mode') == 4), "Error: port_mode is not 4"
assert (get_count('states') == 55), "Error: states is not 55"
assert (get_count('visa_category') == 3), "Error: visa_category is not 3"
assert (get_count('world_temp') == 3490), "Error: world_temp is not 3490"
assert (get_count('us_cities_demographics') == 596), "Error: us_cities_demographics is not 596"
assert (get_count('i94_immigration') == 40790529), "Error: i94_immigration is not 40790529"
assert (get_count('i94_dates') == 732), "Error: i94_dates is not 732"
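
The assertions above stop at the first mismatch, so one bad table hides the state of the others. A hedged alternative keeps the expected counts (copied from the comments above) in a dict and collects every mismatch before failing. `EXPECTED_COUNTS` and `check_counts` are hypothetical names. In the notebook, `failures = check_counts(get_count, EXPECTED_COUNTS)` followed by `assert not failures, failures` would report all failing tables at once.

```python
EXPECTED_COUNTS = {
    "airport_codes": 55075,
    "code_country": 289,
    "ports": 661,
    "port_mode": 4,
    "states": 55,
    "visa_category": 3,
    "world_temp": 3490,
    "us_cities_demographics": 596,
    "i94_immigration": 40790529,
    "i94_dates": 732,
}


def check_counts(count_fn, expected):
    """Return {table: (actual, expected)} for every table whose row count differs."""
    failures = {}
    for table, want in expected.items():
        got = count_fn(table)
        if got != want:
            failures[table] = (got, want)
    return failures
```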

##### Integrity constraints on the relational database (e.g., unique key, data type, etc.)

In [118]:
def get_duplicate_pks_count(table_name, pks):
    pks_pdf = pd.read_sql("""
                        SELECT count(*) as counter 
                        FROM (
                            SELECT """+pks+""", count(*)
                            FROM """+table_name+"""
                            GROUP BY """+pks+"""
                            HAVING count(*) > 1
                        )
                        """, engine)
    return pks_pdf.iloc[0]['counter']

In [119]:
assert (get_duplicate_pks_count('airport_codes','ident') == 0), "Error: duplicate PKs in airport_codes"
assert (get_duplicate_pks_count('code_country','code') == 0), "Error: duplicate PKs in code_country"
assert (get_duplicate_pks_count('ports','code') == 0), "Error: duplicate PKs in ports"
assert (get_duplicate_pks_count('port_mode','code') == 0), "Error: duplicate PKs in port_mode"
assert (get_duplicate_pks_count('states','state_code') == 0), "Error: duplicate PKs in states"
assert (get_duplicate_pks_count('visa_category','code') == 0), "Error: duplicate PKs in visa_category"
assert (get_duplicate_pks_count('world_temp','city,country') == 0), "Error: duplicate PKs in world_temp"
assert (get_duplicate_pks_count('us_cities_demographics','city,state') == 0), "Error: duplicate PKs in us_cities_demographics"
assert (get_duplicate_pks_count('i94_immigration','cicid,i94yr,i94mon') == 0), "Error: duplicate PKs in i94_immigration"
assert (get_duplicate_pks_count('i94_dates','full_date') == 0), "Error: duplicate PKs in i94_dates"
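
Primary-key uniqueness is one integrity constraint. The data dictionary below also describes foreign-key links from `i94_immigration` to the lookup tables, and these can be checked by counting fact rows whose code has no match in the dimension. The sketch only generates the SQL. `FK_CHECKS` and `orphan_count_sql` are hypothetical names, and the pairs are taken from the data dictionary. Because the source data may legitimately contain unmapped codes, these counts are probably better reported, for example via `pd.read_sql(orphan_count_sql("i94_immigration", col, dim, key), engine)`, than asserted to be zero.

```python
# (fact column, dimension table, dimension key) pairs from the data dictionary.
FK_CHECKS = [
    ("i94port", "ports", "code"),
    ("i94mode", "port_mode", "code"),
    ("i94visa", "visa_category", "code"),
    ("i94cit", "code_country", "code"),
    ("i94addr", "states", "state_code"),
]


def orphan_count_sql(fact, fact_col, dim, dim_col):
    """SQL counting non-null fact values with no matching dimension row."""
    return (
        f"SELECT count(*) AS counter FROM {fact} f "
        f"LEFT JOIN {dim} d ON f.{fact_col} = d.{dim_col} "
        f"WHERE f.{fact_col} IS NOT NULL AND d.{dim_col} IS NULL"
    )
```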

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

##### i94_immigration
| Column   | Type    | Description                                                                                                                                                                                                                  |
|----------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| cicid    | long    | Id, part of the composite primary key.                                                                                                                                                                                       |
| i94yr    | integer | 4 digit year of the arrival, part of the composite primary key.                                                                                                                                                              |
| i94mon   | integer | Numeric month of the arrival, part of the composite primary key.                                                                                                                                                             |
| i94cit   | integer | Visitor Country of Citizenship (3 digit code of origin country)                                                                                                                                                              |
| i94res   | integer | Visitor country of residence                                                                                                                                                                                                 |
| i94port  | string  | 3-character port-of-entry code (see `ports`). The port can be in the USA (City, State) or outside it (City, Country). Foreign key to `world_temp` (city) and U.S. City Demographics (city and state) |
| arrdate  | date    | Arrival date in the USA (a SAS numeric date in the source data) |
| i94mode  | integer | 1-digit travel mode code: Air, Sea, Land, Not reported (plus missing values). See `port_mode` |
| i94addr  | string  | Landing state. Foreign key to U.S. City Demographics (state) |
| depdate  | date    | Departure date from the USA (a SAS numeric date in the source data) |
| i94bir   | integer | Age of respondent in years |
| i94visa  | integer | Visa code (Business, Pleasure, Student), i.e. the reason for immigration. See `visa_category` |
| biryear  | integer | 4-digit year of birth |
| dtaddto  | string  | Character date field: date until which the visitor is admitted to the U.S. (allowed to stay until) |
| gender   | string  | Non-immigrant sex |
| airline  | string  | Code of the airline used to arrive in the U.S. Foreign key to Airport Codes (IATA and local code); the municipality column in Airport Codes should also be used |
| admnum   | long    | Admission number, i.e. the number on CBP Form I-94 or I-94W. Not unique |
| fltno    | string  | Flight number of the airline used to arrive in the U.S. |
| visatype | string  | Class of admission (B1, B2, etc.) legally admitting the non-immigrant to temporarily stay in the U.S. |
| matflag  | boolean | Match flag: arrival and departure records matched (if the value is missing, there is no departure date) |
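
`arrdate` and `depdate` arrive as SAS numeric dates, which count days since 1960-01-01. The Spark conversion step is not shown in this section, so the plain-Python sketch below only illustrates the arithmetic. `sas_to_date` is a hypothetical helper; missing values (None or NaN) map to None. For example, the SAS value 20545 corresponds to 2016-04-01.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days since 1960-01-01


def sas_to_date(value):
    """Convert a SAS numeric date to datetime.date; None or NaN becomes None."""
    if value is None or value != value:  # NaN is the only value not equal to itself
        return None
    return SAS_EPOCH + timedelta(days=int(value))
```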

##### i94_dates
| Column    | Type    | Description                                     |
|-----------|---------|-------------------------------------------------|
| full_date | date    | Primary key. Full date in the format YYYY-MM-DD |
| day       | integer | Day of the month of full_date                   |
| month     | integer | Month of the full_date                          |
| year      | integer | Year of the full_date                           |
| week      | integer | Week of the full_date                           |
| weekday   | string  | Weekday of the full_date                        |

##### code_country
| Column  | Type    | Description                                                               |
|---------|---------|---------------------------------------------------------------------------|
| code    | integer | Primary key. 3 digit code for I94CIT & I94RES in i94_immigration dataset. |
| country | string  | Country name                                                              |

##### ports
| Column     | Type   | Description                                                         |
|------------|--------|---------------------------------------------------------------------|
| code       | string | Primary key. 3-letter port code for I94PORT in the immigration dataset |
| port       | string | Port/City name                                                         |
| state_code | string | State code if the port is in the US, otherwise the country name        |

##### port_mode
| Column | Type    | Description                                      |
|--------|---------|--------------------------------------------------|
| code   | integer | Primary Key. For I94MODE in immigration dataset. |
| mode   | string  | Can be 'Air', 'Sea', 'Land', and 'Not Reported'  |

##### states
| Column     | Type   | Description                                                                         |
|------------|--------|-------------------------------------------------------------------------------------|
| state_code | string | Primary key. For I94ADDR in the immigration dataset. Represents the code of the US state. |
| state_name | string | Name of the US state                                                                |

##### visa_category
| Column        | Type   | Description                                           |
|---------------|--------|-------------------------------------------------------|
| code          | integer | Primary key. Code for I94VISA in the immigration dataset. |
| visa_category | string | Can be 'Business', 'Pleasure', and 'Student'          |

##### world_temp
| Column      | Type    | Description                                      |
|-------------|---------|--------------------------------------------------|
| city        | string  | City name. Part of the composite primary key.    |
| country     | string  | Country name. Part of the composite primary key. |
| avgtemp      | double  | Average temperature in the city       |
| country_code | integer | Country code as in the code_country table |

##### us_cities_demographics
| Column            | Type    | Description                                                   |
|-------------------|---------|---------------------------------------------------------------|
| city              | string  | City within USA. Part of the composite primary key.           |
| state             | string  | The USA State of the city. Part of the composite primary key. |
| median_age        | double  | Median of the Ages of the people living in that city          |
| male_population   | integer | Number of males in the city (Can be null)                     |
| female_population | integer | Number of females in the city (Can be null)                   |
| total_population  | integer | Sum of male and female population                             |
| foreign_born      | integer | Number of foreign-born residents of the city                  |
| state_code        | string  | Code of the state                                             |

##### airport_codes
| Column       | Type   | Description                                                    |
|--------------|--------|----------------------------------------------------------------|
| ident        | string | Primary key                                                    |
| name         | string | Airport name                                                   |
| iso_country  | string | ISO code of the airport's country (e.g. US)                    |
| iso_region   | string | Code for the region of the Airport - Contains state code US-CA |
| municipality | string | City of the Airport - City within the state                    |
| gps_code     | string | Airport GPS code (Contains nulls)                              |
| iata_code    | string | IATA airport code (Contains nulls)                             |
| local_code   | string | Local airport code, e.g. the FAA code for US airports (Contains nulls) |

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

#### Choice of tools and technologies for the project
- PySpark
  - Spark makes it easy to represent the data as DataFrames and to wrangle it.
  - Spark can write DataFrames directly to Parquet files.
- Parquet files
  - I chose the Parquet format because it compresses well, which reduces file sizes dramatically.
  - Parquet is a columnar format, which lets analytical queries run fast.
  - Redshift can load Parquet files directly with `COPY ... FORMAT AS PARQUET`.
- AWS S3
  - S3 serves as the staging area from which Redshift reads the uploaded files.
- AWS Redshift
  - Amazon's data warehouse service, which provides high query performance.

#### How often the data should be updated and why?

- `i94_immigration` can be updated monthly, since new data is published on a monthly basis. Only the delta (the new month's records) needs to be loaded.
- The lookup tables contain few records, so they can simply be overwritten whenever values change.
- `airport_codes` can be updated daily. It contains relatively few records, so it can be fully overwritten to ensure consistency.
- `world_temp` stores the average temperature per city per country, so the aggregation yields a small table that can be overwritten whenever new data arrives.
- `us_cities_demographics` is also small and can be overwritten.
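
Loading only the monthly delta can be sketched as a DELETE-then-COPY of a single month's folder, which makes the load idempotent: rerunning it for the same month does not duplicate rows. The folder naming follows the upload output earlier in this notebook. The helper names are hypothetical, and the sketch assumes `i94yr` holds the 4-digit year, as the data dictionary states. In practice both statements would be run in one transaction so that readers never see a half-loaded month.

```python
# Month abbreviations used in the upload folder names (e.g. i94_apr16_sub).
MONTH_ABBR = ["jan", "feb", "mar", "apr", "may", "jun",
              "jul", "aug", "sep", "oct", "nov", "dec"]


def i94_month_prefix(year, month):
    """S3 prefix of one month's i94 files, e.g. (2016, 4) -> i94_immigration/i94_apr16_sub/."""
    if not 1 <= month <= 12:
        raise ValueError(f"month must be in 1..12, got {month}")
    return f"i94_immigration/i94_{MONTH_ABBR[month - 1]}{year % 100:02d}_sub/"


def monthly_reload_sql(year, month, role_arn, bucket="capstone-staging-area"):
    """DELETE-then-COPY statements that (re)load a single month of i94 data."""
    return [
        f"DELETE FROM i94_immigration WHERE i94yr = {year} AND i94mon = {month};",
        (f"COPY i94_immigration\n"
         f"FROM 's3://{bucket}/{i94_month_prefix(year, month)}'\n"
         f"credentials 'aws_iam_role={role_arn}'\n"
         f"FORMAT AS PARQUET;"),
    ]
```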


#### What if the data was increased by 100x?

In that case, running PySpark in the Udacity Workspace would no longer be practical. Instead, we can upload the raw data to AWS S3 and process it with Spark on an AWS EMR cluster. The EMR job could then write to Redshift directly, instead of us uploading intermediate files to S3 by hand.


#### What if the data populates a dashboard that must be updated on a daily basis by 7am every day?

Since the most frequently updated table, `airport_codes`, is refreshed daily, this should not be a problem. The dashboard can read directly from Redshift while our pipelines run any required updates at midnight (00:00). This ensures that the dashboard always shows the most recently available data by 7am.

#### What if The database needed to be accessed by 100+ people?

AWS Redshift can handle 100+ connections without issue. The main restriction is concurrency: with manual workload management (WLM), at most 50 queries run concurrently, and additional queries wait in the queue.