# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [13]:
# Do all imports and installs here
import pandas as pd
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, from_unixtime, unix_timestamp, to_date, lit
from pyspark.sql.types import IntegerType
from pyspark.sql.types import TimestampType
from pyspark.sql.types import DateType
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import from_unixtime            
import pyspark.sql.functions as F

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

Udacity provided four datasets to be explored through this project. The main dataset will include data on immigration to the United States, and supplementary datasets will include data on airport codes, U.S. city demographics, and temperature data.


### 1. Immigration Dataset
This data comes from the US National Tourism and Trade Office. This dataset is large, it contains approximately 3M entries.

In [15]:
# Read in the data here
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [17]:
df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [19]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3096313 entries, 0 to 3096312
Data columns (total 28 columns):
cicid       float64
i94yr       float64
i94mon      float64
i94cit      float64
i94res      float64
i94port     object
arrdate     float64
i94mode     float64
i94addr     object
depdate     float64
i94bir      float64
i94visa     float64
count       float64
dtadfile    object
visapost    object
occup       object
entdepa     object
entdepd     object
entdepu     object
matflag     object
biryear     float64
dtaddto     object
gender      object
insnum      object
airline     object
admnum      float64
fltno       object
visatype    object
dtypes: float64(13), object(15)
memory usage: 661.4+ MB


In [21]:
df.describe()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,arrdate,i94mode,depdate,i94bir,i94visa,count,biryear,admnum
count,3096313.0,3096313.0,3096313.0,3096313.0,3096313.0,3096313.0,3096074.0,2953856.0,3095511.0,3096313.0,3096313.0,3095511.0,3096313.0
mean,3078652.0,2016.0,4.0,304.9069,303.2838,20559.85,1.07369,20573.95,41.76761,1.845393,1.0,1974.232,70828850000.0
std,1763278.0,0.0,0.0,210.0269,208.5832,8.777339,0.5158963,29.35697,17.42026,0.398391,0.0,17.42026,22154420000.0
min,6.0,2016.0,4.0,101.0,101.0,20545.0,1.0,15176.0,-3.0,1.0,1.0,1902.0,0.0
25%,1577790.0,2016.0,4.0,135.0,131.0,20552.0,1.0,20561.0,30.0,2.0,1.0,1962.0,56035230000.0
50%,3103507.0,2016.0,4.0,213.0,213.0,20560.0,1.0,20570.0,41.0,2.0,1.0,1975.0,59360940000.0
75%,4654341.0,2016.0,4.0,512.0,504.0,20567.0,1.0,20579.0,54.0,2.0,1.0,1986.0,93509870000.0
max,6102785.0,2016.0,4.0,999.0,760.0,20574.0,9.0,45427.0,114.0,3.0,1.0,2019.0,99915570000.0


In [23]:
#check for duplicates
sum(df.duplicated())

0

In [25]:
# check unique entries 
df.nunique()

cicid       3096313
i94yr             1
i94mon            1
i94cit          243
i94res          229
i94port         299
arrdate          30
i94mode           4
i94addr         458
depdate         235
i94bir          112
i94visa           3
count             1
dtadfile        117
visapost        530
occup           111
entdepa          13
entdepd          12
entdepu           2
matflag           1
biryear         112
dtaddto         777
gender            4
insnum         1913
airline         534
admnum      3075579
fltno          7152
visatype         17
dtype: int64

In [27]:
df.shape

(3096313, 28)

#### Dataset Dictionary
This dataset contains these fields:

- **CICID** unique numer of the file

- **I94YR** 4 digit year of the application

- **I94MON** Numeric month of the application

- **I94CIT** city where the applicant is living

- **I94RES** state where the applicant is living

- **I94PORT** location (port) where the application is issued

- **ARRDATE** arrival date in USA in SAS date format

- **I94MODE** how did the applicant arrived in the USA

- **I94ADDR** US state where the port is

- **DEPDATE** is the Departure Date from the USA
 
- **I94BIR** age of applicant in years

- **I94VISA** what kind of VISA

- **COUNT** used for summary statistics, always 1

- **DTADFILE** date added to I-94 Files

- **VISAPOST** department of State where where Visa was issued

- **OCCUP** occupation that will be performed in U.S.

- **ENTDEPA** arrival Flag

- **ENTDEPD** departure Flag

- **ENTDEPU** update Flag

- **MATFLAG** match flag

- **BIRYEAR** 4 digit year of birth

- **DTADDTO** date to which admitted to U.S. (allowed to stay until)

- **GENDER** non-immigrant gender

- **INSNUM** INS number

- **AIRLINE** airline used to arrive in USA

- **ADMNUM** admission Number

- **FLTNO** flight number of Airline used to arrive in USA

- **VISATYPE** class of admission legally admitting the non-immigrant to temporarily stay in USA


In [32]:
immig_dict = {'CICID': 'unique numer of the file', 'I94YR': '4 digit year of the application', 'I94MON': 'Numeric month of the application', 
              'I94CIT': 'city where the applicant is living', 'I94RES' : 'state where the applicant is living',
              'I94PORT': 'location (port) where the application is issued', 'ARRDATE': 'arrival date in USA in SAS date format',
              'I94MODE': 'how did the applicant arrived in the USA', 'I94ADDR': 'US state where the port is', 'DEPDATE': 'Departure Date from the USA',
              'I94BIR': 'age of applicant in years', 'I94VISA': ' VISA type',
              'COUNT': 'used for summary statistics always 1', 'DTADFILE': 'date added to I-94 Files',
              'VISAPOST':'department of State where where Visa was issued', 
              'OCCUP': 'occupation that will be performed in US', 'ENTDEPA': 'arrival Flag', 'ENTDEPD':'departure Flag',
              'ENTDEPU': 'update Flag', 'MATFLAG':'match flag','BIRYEAR':'4 digit year of birth', 
              'DTADDTO': 'date to which admitted to U.S. (allowed to stay until)', 'GENDER': 'non-immigrant gender',
              'INSNUM': 'INS number', 'AIRLINE': 'airline used to arrive in USA', 'ADMNUM':'admission Number', 
              'FLTNO': 'flight number of Airline used to arrive in USA', 
              'VISATYPE':' class of admission legally admitting the non-immigrant to temporarily stay in USA'}

In [34]:
immig_dict

{'CICID': 'unique numer of the file',
 'I94YR': '4 digit year of the application',
 'I94MON': 'Numeric month of the application',
 'I94CIT': 'city where the applicant is living',
 'I94RES': 'state where the applicant is living',
 'I94PORT': 'location (port) where the application is issued',
 'ARRDATE': 'arrival date in USA in SAS date format',
 'I94MODE': 'how did the applicant arrived in the USA',
 'I94ADDR': 'US state where the port is',
 'DEPDATE': 'Departure Date from the USA',
 'I94BIR': 'age of applicant in years',
 'I94VISA': ' VISA type',
 'COUNT': 'used for summary statistics always 1',
 'DTADFILE': 'date added to I-94 Files',
 'VISAPOST': 'department of State where where Visa was issued',
 'OCCUP': 'occupation that will be performed in US',
 'ENTDEPA': 'arrival Flag',
 'ENTDEPD': 'departure Flag',
 'ENTDEPU': 'update Flag',
 'MATFLAG': 'match flag',
 'BIRYEAR': '4 digit year of birth',
 'DTADDTO': 'date to which admitted to U.S. (allowed to stay until)',
 'GENDER': 'non-immig

### World Temperature Data
This dataset came from Kaggle. Data was repackaged from a newer compilation put together by the Berkeley Earth, which is affiliated with Lawrence Berkeley National Laboratory. The Berkeley Earth Surface Temperature Study combines 1.6 billion temperature reports from 16 pre-existing archives. It is nicely packaged and allows for slicing into interesting subsets (for example by country). They publish the source data and the code for the transformations they applied. They also use methods that allow weather observations from shorter time series to be included, meaning fewer observations need to be thrown away.

In [35]:
temperature_fname = '../../data2/GlobalLandTemperaturesByCity.csv'
temp_df = pd.read_csv(temperature_fname)

In [36]:
temp_df.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [37]:
# Check temperature data
temp_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8599212 entries, 0 to 8599211
Data columns (total 7 columns):
dt                               object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                             object
Country                          object
Latitude                         object
Longitude                        object
dtypes: float64(2), object(5)
memory usage: 459.2+ MB


In [38]:
temp_df.shape

(8599212, 7)

#### Temperature Dataset Dictionary

|Column Name       |	Description                                     |
| :- -: | :- |
|dt                |	Date in format YYYY-MM-DD                       |
|AverageTemperature|	Average temperature of the city in a given date |
|City              |	City Name                                       |
|Country           |	Country Name                                    |
|Latitude          |	Latitude                                        |
|Longitude         |	Longitude                                       |

### Airports Data
This is a simple table of airport codes and corresponding cities. The airport codes may refer to either IATA airport code, a three-letter code which is used in passenger reservation, ticketing and baggage-handling systems, or the ICAO airport code which is a four letter code used by ATC systems and for airports that do not have an IATA airport code
Dataset can be found through this link (https://datahub.io/core/airport-codes#data)

In [39]:
airport_df = pd.read_csv("airport-codes_csv.csv")

In [40]:
airport_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [41]:
airport_df.shape

(55075, 12)

#### Airport Dataset Dictionary

|Column Name       |	Description                                     |
| :- -: | :- |
|ident|	Unique identifier|
|type|	Type of the airport|
|name|	Airport Name|
|elevation_ft|	Altitude of the airport|
|continent|	Continent|
|iso_country|	ISO code of the country of the airport|
|iso_region|	ISO code for the region of the airport|
|municipality|	City where the airport is located|
|gps_code|	GPS code of the airport|
|iata_code|	IATA code of the airport|
|local_code|	Local code of the airport|
|coordinates|	GPS coordinates of the airport|

### US Cities: Demographics
This dataset contains information about the demographics of all US cities and census-designated places with a population greater or equal to 65,000. 

This data comes from the US Census Bureau's 2015 American Community Survey. Data is availbe through this link (https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)

In [42]:
demographics_df = pd.read_csv("us-cities-demographics.csv", sep=";")

In [43]:
demographics_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [44]:
demographics_df.shape

(2891, 12)

### US Cities: Demographics Dataset Dictionary 
|Column Name       |	Description  |
| :- -: | :- |
|City|	Name of the city|
|State|	US state of the city|
|Median Age|	The median of the age of the population|
|Male Population|	Number of the male population|
|Female Population|	Number of the female population|
|Total Population|	Number of the total population|
|Number of Veterans|	Number of veterans living in the city|
|Foreign-born|	Number of residents of the city that were not born in the city|
|Average Household Size|	Average size of the houses in the city|
|State Code|	Code of the state of the city|
|Race|	Race class|
|Count|	Number of individual of each race|

In [45]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [46]:
df_spark.count()

3096313

In [47]:
df_spark.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

Data looks exactly like the sample, therefore we are going to proceed. 

In [49]:
#write to parquet
df_spark.write.parquet("sas_data")
df_spark=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

##### Immigration dataset
A data dictionary is provided by Udacity that contains details of the missing data from Immigration dataset. Therefore, I will check the number to entries trying to find a primary key. 

In [50]:
df_spark.show(5)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [51]:
df_spark.createOrReplaceTempView("immigrants_table")

In [52]:
df_spark.count()

3096313

In [53]:
# check if cicid can be used as a primary key (if it has the same count)
spark.sql("""
SELECT COUNT (DISTINCT cicid)
FROM immigrants_table
""").show()

+---------------------+
|count(DISTINCT cicid)|
+---------------------+
|              3096313|
+---------------------+



A date in SAS format is simply the number of days between the chosen date and the reference date (01-01-1960)

In [54]:
df_spark = spark.sql("SELECT *, date_add(to_date('1960-01-01'), arrdate) AS arrival_date FROM immigrants_table")
df_spark.createOrReplaceTempView("immigrants_table")

In [55]:
#Let’s check the gender distribution
df_spark.select("gender").groupBy("gender").count().show()

+------+-------+
|gender|  count|
+------+-------+
|     F|1302743|
|  null| 414269|
|     M|1377224|
|     U|    467|
|     X|   1610|
+------+-------+



In [56]:
# remove records with missing or incorrect values
spark.sql("""SELECT * FROM immigrants_table WHERE gender IN ('F', 'M')""").createOrReplaceTempView("immigrants_table")

Dividing data in 'I94VISA' column into three categories as follow:

- 1 = Business
- 2 = Pleasure
- 3 = Student

In [57]:
spark.sql("""SELECT *, CASE 
                        WHEN i94visa = 1.0 THEN 'Business' 
                        WHEN i94visa = 2.0 THEN 'Pleasure'
                        WHEN i94visa = 3.0 THEN 'Student'
                        ELSE 'N/A' END AS visa_type 
                        
                FROM immigrants_table""").createOrReplaceTempView("immigrants_table")

In [58]:
spark.sql("""SELECT *, CASE 
                        WHEN depdate >= 1.0 THEN date_add(to_date('1960-01-01'), depdate)
                        WHEN depdate IS NULL THEN NULL
                        ELSE 'N/A' END AS departure_date 
                        
                FROM immigrants_table""").createOrReplaceTempView("immigrants_table")

In [59]:
#checking results from  previous query and confirming that there are no N/A values
spark.sql("SELECT count(*) FROM immigrants_table WHERE departure_date = 'N/A'").show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



In [60]:
#Check that departure_date comes after arrival_date
spark.sql("""
SELECT COUNT(*)
FROM immigrants_table
WHERE departure_date <= arrival_date
""").show()

+--------+
|count(1)|
+--------+
|     372|
+--------+



In [61]:
spark.sql("""
SELECT arrival_date, departure_date
FROM immigrants_table
WHERE departure_date <= arrival_date
""").show(5)

+------------+--------------+
|arrival_date|departure_date|
+------------+--------------+
|  2016-04-30|    2016-04-29|
|  2016-04-30|    2016-04-28|
|  2016-04-30|    2016-04-29|
|  2016-04-05|    2012-04-14|
|  2016-04-05|    2016-03-14|
+------------+--------------+
only showing top 5 rows



Fixing this error is impractical, therefore I will drop these records. In addition, the defected records are relatively few. 

In [62]:
spark.sql("""
SELECT *
FROM immigrants_table
WHERE departure_date >= arrival_date
""").createOrReplaceTempView("immigrants_table")

The arrival modes are defined in the dictonary as follows:

- 1 = 'Air'
- 2 = 'Sea'
- 3 = 'Land'
- 9 = 'Not reported'

Only Air arrival will be kept as since airport dataset will be joined.  

In [63]:
spark.sql("""
SELECT i94mode, count(*)
FROM immigrants_table
GROUP BY i94mode
""").show()

+-------+--------+
|i94mode|count(1)|
+-------+--------+
|    1.0| 2465354|
|    3.0|   59244|
|    2.0|   17895|
|    9.0|    2458|
+-------+--------+



Next, I will check the age records, for the following: 
- Check for any missing values in age column.
- Check the birthyear column for any missing values.
- Check if the ages are resonable.
- Check each age group

In [64]:
#Check for any missing values in age column.
spark.sql("""
SELECT COUNT(*)
FROM immigrants_table
WHERE i94bir IS NULL
""").show()

+--------+
|count(1)|
+--------+
|      43|
+--------+



In [65]:
#Check the birthyear column for any missing values.
spark.sql("""
SELECT COUNT(biryear) 
FROM immigrants_table 
WHERE biryear IS NULL""").show()

#Check the birthyear column for Max and Min birthyear.
spark.sql("""
SELECT MAX(biryear), MIN(biryear) 
FROM immigrants_table 
WHERE biryear IS NOT NULL""").show()

+--------------+
|count(biryear)|
+--------------+
|             0|
+--------------+

+------------+------------+
|max(biryear)|min(biryear)|
+------------+------------+
|      2016.0|      1916.0|
+------------+------------+



In [66]:
#count of records older than 80, was born before 1936 since dataset is from 2016 
spark.sql("""
SELECT COUNT(*)
FROM immigrants_table 
WHERE biryear IS NOT NULL
AND biryear <= 1936
""").show()

# Check each age group
spark.sql("""
SELECT biryear, COUNT(*)
FROM immigrants_table 
WHERE biryear IS NOT NULL
AND biryear <= 1936
GROUP BY biryear
ORDER BY biryear ASC
""").show()

spark.sql("""
SELECT (2016-biryear)-i94bir AS difference, count(*) 
FROM immigrants_table 
WHERE i94bir IS NOT NULL GROUP BY difference
""").show()

+--------+
|count(1)|
+--------+
|   21693|
+--------+

+-------+--------+
|biryear|count(1)|
+-------+--------+
| 1916.0|       8|
| 1917.0|      15|
| 1918.0|      20|
| 1919.0|      35|
| 1920.0|      33|
| 1921.0|      68|
| 1922.0|      83|
| 1923.0|     144|
| 1924.0|     189|
| 1925.0|     254|
| 1926.0|     371|
| 1927.0|     520|
| 1928.0|     710|
| 1929.0|     972|
| 1930.0|    1261|
| 1931.0|    1588|
| 1932.0|    1938|
| 1933.0|    2319|
| 1934.0|    2940|
| 1935.0|    3593|
+-------+--------+
only showing top 20 rows

+----------+--------+
|difference|count(1)|
+----------+--------+
|       0.0| 2544908|
+----------+--------+



Min birth year is 1916, only 8 records are 100 years old which is (2.67e-4 %). Records over 90 years are 911 which is (0.03 %). and that is an acceptable percentage. 

In [67]:
#As per data dictionary, 'i94port' codes are 3 character long 
#checking length of 'i94port'
spark.sql("""
SELECT LENGTH (i94port) AS len
FROM immigrants_table
GROUP BY len
""").show()

+---+
|len|
+---+
|  3|
+---+



In [68]:
#check cities of living
spark.sql("""
SELECT count(*) 
FROM immigrants_table
WHERE i94cit IS NULL
""").show()

#check residence states
spark.sql("""
SELECT count(*) 
FROM immigrants_table
WHERE i94res IS NULL
""").show()

#check ports where the application is issued
spark.sql("""
SELECT count(*) 
FROM immigrants_table
WHERE i94port IS NULL
""").show()

+--------+
|count(1)|
+--------+
|       0|
+--------+

+--------+
|count(1)|
+--------+
|       0|
+--------+

+--------+
|count(1)|
+--------+
|       0|
+--------+



In [69]:
#Checking for missing values in 'visatype' column 
spark.sql("""
SELECT COUNT(*)
FROM immigrants_table
WHERE visatype IS NULL
""").show()

#check 'visatype' categories
spark.sql("""
SELECT visa_type, visatype, count(*)
FROM immigrants_table
GROUP BY visa_type, visatype
ORDER BY visa_type, visatype
""").show()

+--------+
|count(1)|
+--------+
|       0|
+--------+

+---------+--------+--------+
|visa_type|visatype|count(1)|
+---------+--------+--------+
| Business|      B1|  186610|
| Business|      E1|    3182|
| Business|      E2|   16227|
| Business|     GMB|     132|
| Business|       I|    2962|
| Business|      I1|     214|
| Business|      WB|  185857|
| Pleasure|      B2|  967988|
| Pleasure|      CP|   11785|
| Pleasure|     CPL|       8|
| Pleasure|     GMT|   79454|
| Pleasure|     SBP|       2|
| Pleasure|      WT| 1060229|
|  Student|      F1|   27789|
|  Student|      F2|    1774|
|  Student|      M1|     708|
|  Student|      M2|      30|
+---------+--------+--------+



In [70]:
df_spark = spark.sql("""SELECT * FROM immigrants_table""") 

##### World Temperature Dataset

In [71]:
#checking dataset 
temp_df.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [72]:
temp_df.nunique()

dt                                 3239
AverageTemperature               111994
AverageTemperatureUncertainty     10902
City                               3448
Country                             159
Latitude                             73
Longitude                          1227
dtype: int64

I noticed to issues to be fixed. 
- Date type needs to be fixed. Also, date will be truncated as the dataset goes back to Nov. 1743
- I will use USA only in countries as we are intersted in immigrants to USA.

In [73]:
# convert date type
#temp_df['Date'] = pd.to_datetime(temp_df.dt)

In [74]:
# convert date type
def convert_date(col_name):
    var = pd.to_datetime(col_name)
    return(var)

In [75]:
temp_df['Data'] = convert_date(temp_df.dt)

In [78]:
temp_df.rename(columns = {'dt':'Date'}, inplace = True)

In [79]:
# truncate date to 1960
temp_df=temp_df[temp_df['Date']>"1960-01-01"].copy()

In [80]:
# remove contries except for USA
temp_df = temp_df[temp_df['Country']=='United States']

In [81]:
# check for null values.
temp_df.isnull().sum()

Date                             0
AverageTemperature               1
AverageTemperatureUncertainty    1
City                             0
Country                          0
Latitude                         0
Longitude                        0
Data                             0
dtype: int64

In [82]:
#check the single null record
temp_df[temp_df.AverageTemperature.isnull()]

Unnamed: 0,Date,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,Data
287781,2013-09-01,,,Anchorage,United States,61.88N,151.13W,2013-09-01


This value goes back to 2013 and the main dataset is on 2016, therefore i will leave it as it is. or I can drop the row. 

In [83]:
#check most recent date in dataset
temp_df['Date'].max()

'2013-09-01'

It seems that there is no data after 2013 and the main dataset is for the year 2016, therefore I may neglect this dataset, or assume that the aim of this project is to perform joins between datasets. 

In [84]:
temp_df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 165508 entries, 49236 to 8439246
Data columns (total 8 columns):
Date                             165508 non-null object
AverageTemperature               165507 non-null float64
AverageTemperatureUncertainty    165507 non-null float64
City                             165508 non-null object
Country                          165508 non-null object
Latitude                         165508 non-null object
Longitude                        165508 non-null object
Data                             165508 non-null datetime64[ns]
dtypes: datetime64[ns](1), float64(2), object(5)
memory usage: 11.4+ MB


##### Airport Dataset

In [85]:
airport_df.shape

(55075, 12)

In [86]:
airport_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null float64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: float64(1), object(11)
memory usage: 5.0+ MB


In [87]:
airport_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [88]:
# check airport location
airport_df.groupby('iso_country')['iso_country'].count()

iso_country
AD        2
AE       57
AF       64
AG        3
AI        1
AL       13
AM       13
AO      104
AQ       27
AR      848
AS        4
AT      145
AU     1963
AW        1
AZ       35
BA       15
BB        6
BD       16
BE      146
BF       51
BG      134
BH        4
BI        7
BJ       10
BL        1
BM        3
BN        2
BO      197
BQ        3
BR     4334
      ...  
TM       21
TN       15
TO        6
TR      124
TT        3
TV        3
TW       65
TZ      207
UA      191
UG       38
UM        6
US    22757
UY       54
UZ      176
VA        1
VC        6
VE      592
VG        3
VI        9
VN       50
VU       32
WF        2
WS        4
XK        6
YE       25
YT        1
ZA      489
ZM      103
ZW      138
ZZ        7
Name: iso_country, Length: 243, dtype: int64

In [89]:
#check null values for ISO code of the country of the airport
airport_df[airport_df['iso_country'].isna()].shape

(247, 12)

In [90]:
#check the location of null values to by continent 
airport_df[airport_df['iso_country'].isna()].groupby('continent')['continent'].count()

continent
AF    247
Name: continent, dtype: int64

It seems that all the null values are in Africa. Since we are interested only in USA these values will be droped. 

In [91]:
#drop the null values for ISO code of the country of the airport
airport_df = airport_df[airport_df['iso_country'].fillna('').str.upper().str.contains('US')].copy()

In [92]:
#check the Types of the airport
airport_df.groupby('type')['type'].count()

type
balloonport          18
closed             1326
heliport           6265
large_airport       170
medium_airport      692
seaplane_base       566
small_airport     13720
Name: type, dtype: int64

We are not interested in some of these airport, and it will be trancated. As they are not considred immigration airports. Such as 'closed', 'heliport', 'seaplane_base', and 'balloonport' airports. 

In [93]:
#drop irrelevant airports
values_to_drop = ['closed', 'heliport', 'seaplane_base', 'balloonport']
airport_df = airport_df[~airport_df['type'].str.strip().isin(values_to_drop)].copy()

In [94]:
#check for all null values:
airport_df.isnull().sum()

ident               0
type                0
name                0
elevation_ft       63
continent       14582
iso_country         0
iso_region          0
municipality       50
gps_code          399
iata_code       12717
local_code        199
coordinates         0
dtype: int64

Since 'municipality' indicates the City where the airport is located, and 'I94PORT'  indicates location (port) where the application is issued, so we can use these two columns to join both datasets. 

Now, these 50 missing values can not be fixed so, I will remove them. 

In [95]:
#drop the nan values 
airport_df = airport_df[~airport_df['municipality'].isna()].copy()

In [96]:
# convert 'municipality' to uppercase to match immigration dataset
airport_df.municipality = airport_df.municipality.str.upper()

In [97]:
#check values in 'iso_region' column
airport_df.groupby('iso_region')['iso_region'].count()

iso_region
US-AK      586
US-AL      197
US-AR      291
US-AZ      214
US-CA      551
US-CO      288
US-CT       56
US-DC        2
US-DE       36
US-FL      522
US-GA      365
US-HI       35
US-IA      232
US-ID      238
US-IL      579
US-IN      486
US-KS      372
US-KY      164
US-LA      281
US-MA       79
US-MD      157
US-ME      122
US-MI      379
US-MN      361
US-MO      411
US-MS      211
US-MT      255
US-NC      349
US-ND      297
US-NE      259
US-NH       54
US-NJ      116
US-NM      149
US-NV      113
US-NY      402
US-OH      492
US-OK      372
US-OR      357
US-PA      486
US-RI       10
US-SC      173
US-SD      162
US-TN      228
US-TX     1546
US-U-A       3
US-UT      103
US-VA      311
US-VT       66
US-WA      379
US-WI      457
US-WV       83
US-WY       95
Name: iso_region, dtype: int64

In [98]:
# check length iso_region records, it should be less than 5
airport_df['len'] = airport_df["iso_region"].apply(len)
# remove incorrect records.
airport_df = airport_df[airport_df['len']==5].copy()
# extract the state code
airport_df['state'] = airport_df['iso_region'].str.strip().str.split("-", n = 1, expand = True)[1]

##### US Cities: Demographics

In [99]:
demographics_df.shape

(2891, 12)

In [100]:
demographics_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB


In [101]:
demographics_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [102]:
#check null values
demographics_df.isnull().sum()

City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64

In [103]:
# remove spaces and convert city to upper case
demographics_df.City = demographics_df.City.str.strip().str.upper()

In [104]:
#check for duplications to find a pk 
demographics_df[demographics_df[['City', 'State','Race']].duplicated()]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model



This data model aims to check the immigrants flow to the United States. i94 data will be our fact table. Data model will have fact table called (fact_immigrant) and four dimension tables time_dim, temperature_dim, airport_dim, and demographics_dim. 

**fact_immigrant**
- cicid
- citizenship_country
- residence_country
- city pk
- state pk 
- arrival_date pk 
- departure_date pk
- age
- visa_type 
- detailed_visa_type 

**time_dim** 
- date pk
- year
- month
- day
- week
- weekday
- dayofyear

**temperature_dim**
- date
- City pk 
- average temperature
- average temperature uncertainty

**airport_dim**
- ident
- type
- name
- elevation_ft
- state pk 
- municipality
- iata_code

**demographics_dim**
- City
- state
- state_code pk
- median_age
- male_population
- female_population
- total population
- Foreign_born
- Average_Household_Size
- Race
- Count


![diagrame](capstone.png)

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

The pipeline steps are described below:
1. Extracting data as described below: 
 - Clean I94 data as described in step 2 to create Spark dataframe `df_spark` for each month.
 - Clean temperature data as described in step 2 to create Spark dataframe `temp_df`.
 - Clean airport dataset in step 2 and `airport_df` is created.
 - clean US Cities: Demographics dataset in step 2 and `demographics_df` is created.
2. Data Transformation and Loading:
 - **fact_immigrants**
   - Drop non air arrival records
   - Drop incorrect gender records
   - Convert dates
   - Replace country codes with the character string equivalents
   - Replace visa_type with character string
   - Replace port of entry with city and state
   - Drop records with non US entry port
   - Calculate age in a new colum using birth year.
   - Write to parquet
   
 - **time_dim**
   - Get all the arrival dates from dataset
   - Extract year, month, day, week from the date and insert all the values in the table
   - Write to parquet

 - **temperature_dim**
   - Drop all data for cities outside the united states;
   - Drop all data for dates before 1960 since airtravel wasn't popular before that.
   - Convert city to upper case
   - Compute the average temperature and uncertainty over date+city partitions
   - Insert into the temperature table since our dataset may include new cities in the future
   - Write to parquet
   
 - **airport_dim**
   - Drop non us airports
   - Drop port of entries like 'closed', 'heliport', 'seaplane_base', 'balloonport'
   - Drop records where municipalities are null.
   - Convert municipality to upper case
   - Insert to our table
   - Write to parquet
   
 - **demographics_dim**
   - Convert to city names to upper case
   - Insert to our table
   - Write to parquet

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [105]:
# Write code here

In [106]:
# load dictionary data
df_countryCodes = pd.read_csv('countries.csv')
df_i94portCodes = pd.read_csv('i94portCodes.csv')

# load the various csv files into pandas dataframes
demographics_df = pd.read_csv('us-cities-demographics.csv', sep=';')
temp_df = pd.read_csv('../../data2/GlobalLandTemperaturesByCity.csv')

# load the SAS data
df_spark=spark.read.parquet("sas_data")

In [107]:
# convert dictionaries data to views in spark context & perform SQL operations 
spark_df_countryCodes = spark.createDataFrame(df_countryCodes)
spark_df_countryCodes .createOrReplaceTempView("countryCodes")

In [108]:
#remove all  null records
df_i94portCodes = df_i94portCodes[~df_i94portCodes.state.isna()].copy()

In [109]:
# Drop values for non US. 
nonUSairports = ['CANADA', 'Canada', 'NETHERLANDS', 'NETH ANTILLES', 'THAILAND', 'ETHIOPIA', 'PRC', 'BERMUDA', 'COLOMBIA', 'ARGENTINA', 'MEXICO', 
               'BRAZIL', 'URUGUAY', 'IRELAND', 'GABON', 'BAHAMAS', 'MX', 'CAYMAN ISLAND', 'SEOUL KOREA', 'JAPAN', 'ROMANIA', 'INDONESIA',
               'SOUTH AFRICA', 'ENGLAND', 'KENYA', 'TURK & CAIMAN', 'PANAMA', 'NEW GUINEA', 'ECUADOR', 'ITALY', 'EL SALVADOR']
df_i94portCodes = df_i94portCodes[~df_i94portCodes.state.isin(nonUSairports)].copy()

In [110]:
spark_df_i94portCodes = spark.createDataFrame(df_i94portCodes)
spark_df_i94portCodes .createOrReplaceTempView("i94portCodes")

In [111]:
df_spark.createOrReplaceTempView("immigrants_table")

In [112]:
# drop records with non airport entry 
spark.sql("""
SELECT *
FROM immigrants_table
WHERE i94mode = 1
""").createOrReplaceTempView("immigrants_table")

In [113]:
# convert departure date to valid value
spark.sql("""SELECT *, CASE 
                        WHEN depdate >= 1.0 THEN date_add(to_date('1960-01-01'), depdate)
                        WHEN depdate IS NULL THEN NULL
                        ELSE 'N/A' END AS departure_date 
                        
                FROM immigrants_table""").createOrReplaceTempView("immigrants_table")

In [114]:
# we use an inner join to drop invalid codes
#country of citizenship
spark.sql("""
SELECT im.*, cc.country AS citizenship_country
FROM immigrants_table im
INNER JOIN countryCodes cc
ON im.i94cit = cc.code
""").createOrReplaceTempView("immigrants_table")

In [115]:
#country of residence
spark.sql("""
SELECT im.*, cc.country AS residence_country
FROM immigrants_table im
INNER JOIN countryCodes cc
ON im.i94res = cc.code
""").createOrReplaceTempView("immigrants_table")

In [116]:
# convert arrival date to valid value
spark.sql("SELECT *, date_add(to_date('1960-01-01'), arrdate) AS arrival_date FROM immigrants_table").createOrReplaceTempView("immigrants_table")

In [117]:
# Add entry_port name and state
spark.sql("""
SELECT im.*, pc.location AS entry_port, pc.state AS entry_port_state
FROM immigrants_table im 
INNER JOIN i94portCodes pc
ON im.i94port = pc.code
""").createOrReplaceTempView("immigrants_table")

In [118]:
# calculate age of each record
spark.sql("""
SELECT *, (2016-biryear) AS age 
FROM immigrants_table
""").createOrReplaceTempView("immigrants_table")

In [119]:
# Add visa character string 
spark.sql("""SELECT *, CASE 
                        WHEN i94visa = 1.0 THEN 'Business' 
                        WHEN i94visa = 2.0 THEN 'Pleasure'
                        WHEN i94visa = 3.0 THEN 'Student'
                        ELSE 'N/A' END AS visa_type 
                        
                FROM immigrants_table""").createOrReplaceTempView("immigrants_table")

In [120]:
# drop records where  gender is undefined
spark.sql("""
SELECT *
FROM immigrants_table 
WHERE gender IN ('F', 'M')
""").createOrReplaceTempView("immigrants_table")

In [121]:
# Create and Insert the fact_immigrants table data to a spark dataframe
fact_immigrants = spark.sql("""
                        SELECT 
                            cicid, 
                            citizenship_country,
                            residence_country,
                            TRIM(UPPER (entry_port)) AS city,
                            TRIM(UPPER (entry_port_state)) AS state,
                            arrival_date,
                            departure_date,
                            age,
                            visa_type,
                            visatype AS detailed_visa_type

                        FROM immigrants_table
""")

In [122]:
# extract data to create dimension table
time_dim = spark.sql("""
SELECT DISTINCT arrival_date AS date
FROM immigrants_table
""")
time_dim.createOrReplaceTempView("time_dim_table")

In [123]:
# extract year, month, day, weekofyear, dayofweek and weekofyear from the date and insert all the values in the time_dim table;
time_dim = spark.sql("""
SELECT date, 
YEAR(date) AS year, 
MONTH(date) AS month, 
DAY(date) AS day, 
WEEKOFYEAR(date) AS week, 
DAYOFWEEK(date) as weekday,
DAYOFYEAR(date) year_day
FROM time_dim_table
ORDER BY date ASC
""")

In [124]:
# Drop non US records
temp_df = temp_df[temp_df['Country']=='United States'].copy()

# Convert the date to valid datetime 
temp_df['date'] = pd.to_datetime(temp_df.dt)

# Remove all data before 1960
temp_df=temp_df[temp_df['date']>"1960-01-01"].copy()

# convert cities to upper case
temp_df.City = temp_df.City.str.strip().str.upper() 

In [125]:
# convert the dataframes from pandas to spark
spark_temp_df = spark.createDataFrame(temp_df)
spark_temp_df.createOrReplaceTempView("temperature")

In [126]:
temperature_dim = spark.sql("""
SELECT
    DISTINCT date, 
    city,
    AVG(AverageTemperature) OVER (PARTITION BY date, City) AS average_temperature, 
    AVG(AverageTemperatureUncertainty)  OVER (PARTITION BY date, City) AS average_termperature_uncertainty
    
FROM temperature
""")

In [127]:
# load the csv directly into a spark dataframe 
spark_airport_df = spark.read.format("csv").option("header", "true").load('airport-codes_csv.csv')
spark_airport_df.createOrReplaceTempView("airport")

In [128]:
#convert ISO country to upper case
spark.sql("""
SELECT *
FROM airport
WHERE iso_country IS NOT NULL
AND UPPER(TRIM(iso_country)) LIKE 'US'
""").createOrReplaceTempView("airport")

In [129]:
#Drop port of entries like 'closed', 'heliport', 'seaplane_base', 'balloonport'
#Drop records where municipalities are null.
#Convert LENGTH(iso_region) to be 5
spark.sql("""
SELECT *
FROM airport
WHERE LOWER(TRIM(type)) NOT IN ('closed', 'heliport', 'seaplane_base', 'balloonport')
AND municipality IS NOT NULL
AND LENGTH(iso_region) = 5
""").createOrReplaceTempView("airport")

In [130]:
#create airport_dim
airport_dim = spark.sql("""
SELECT TRIM(ident) AS ident,
type,
name,
elevation_ft,
SUBSTR(iso_region, 4) AS state,
TRIM(UPPER(municipality)) AS municipality,
iata_code
FROM airport
""")

In [131]:
demographics_df.City = demographics_df.City.str.strip().str.upper()
demographics_df['State Code'] = demographics_df['State Code'].str.strip().str.upper()
demographics_df.Race = demographics_df.Race.str.strip().str.upper()

# convert the dataframes from pandas to spark
spark_demographics_df = spark.createDataFrame(demographics_df)
spark_demographics_df.createOrReplaceTempView("demographics")

In [132]:
# create demographics dim table
demographics_dim = spark.sql("""
                                SELECT  City, 
                                        State, 
                                        'Median Age' AS median_age, 
                                        'Male Population' AS male_population, 
                                        'Female Population' AS female_population, 
                                        'Total Population' AS total_population, 
                                        'Foreign-born' AS foreign_born, 
                                        'Average Household Size' AS average_household_size, 
                                        'State Code' AS state_code, 
                                        Race, 
                                        Count
                                FROM demographics
""")

In [134]:
# Saving the data in parquet format
fact_immigrants.write.parquet("fact_immigrants")
time_dim.write.parquet("time_dim")
temperature_dim.write.parquet("temperature_dim")
airport_dim.write.parquet("airport_dim")
demographics_dim.write.parquet("demographics_dim")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [135]:
# Perform quality checks here
fact_immigrants.createOrReplaceTempView("fact_immigrants")
time_dim.createOrReplaceTempView("time_dim")
temperature_dim.createOrReplaceTempView("temperature_dim")
airport_dim.createOrReplaceTempView("airport_dim")
demographics_dim.createOrReplaceTempView("demographics_dim")

In [136]:
#check the number of rows in  time table 
spark.sql("""
SELECT COUNT(*) 
FROM time_dim
""").show()

# make sure each row has a distinct date key 
spark.sql("""
SELECT COUNT(DISTINCT date) 
FROM time_dim
""").show()

+--------+
|count(1)|
+--------+
|      30|
+--------+

+--------------------+
|count(DISTINCT date)|
+--------------------+
|                  30|
+--------------------+



In [137]:
#check all dates from the fact table are included in the time dimension (NULL is the expected result)
spark.sql("""
SELECT DISTINCT date
FROM time_dim

MINUS

(SELECT DISTINCT arrival_date AS date
FROM immigrants_table

WHERE departure_date IS NOT NULL)

""").show()

+----+
|date|
+----+
+----+



In [138]:
#immigration table checking

# The number of primary key from the staging table 
spark.sql("""
SELECT count(distinct cicid) 
FROM immigrants_table
""").show()

# match primary key count from the fact table 
spark.sql("""
SELECT count(distinct cicid)
FROM fact_immigrants
""").show()

#It should match the count from the fact table 
spark.sql("""
SELECT count(*)
FROM fact_immigrants
""").show()

+---------------------+
|count(DISTINCT cicid)|
+---------------------+
|              2165257|
+---------------------+

+---------------------+
|count(DISTINCT cicid)|
+---------------------+
|              2165257|
+---------------------+

+--------+
|count(1)|
+--------+
| 2165257|
+--------+



As seen above The count of primary key from the staging table matches the count from the fact table and they are unique and different

In [139]:
#checking temperature dim. table  (city + date) are the primary keys 

spark.sql("""
SELECT count(*) 
FROM temperature_dim
""").show()

#make sure that all primary keys are unique 
spark.sql("""
SELECT COUNT(DISTINCT date, city) 
FROM temperature_dim
""").show()

+--------+
|count(1)|
+--------+
|  159712|
+--------+

+--------------------------+
|count(DISTINCT date, city)|
+--------------------------+
|                    159712|
+--------------------------+



After checking temperature dim. table, we found that (city + date) unique so this combination should be used to join the tables.

In [140]:
 #checking the demographics dim. table counts
spark.sql("""
SELECT count(*) 
FROM demographics_dim
""").show()
#make sure that all primary keys are unique 
spark.sql("""
SELECT COUNT(DISTINCT city, state, race) 
FROM demographics_dim
""").show()

+--------+
|count(1)|
+--------+
|    2891|
+--------+

+---------------------------------+
|count(DISTINCT city, state, race)|
+---------------------------------+
|                             2891|
+---------------------------------+



After checking demographics dim.  table, we found that (city + state+ race) unique so this combination should be used to join the tables.

In [142]:
#checking datasets to select the best way to join demographic dim table with immigrants fact table
fact_immigrants.show(5)
demographics_dim.show(5)

+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
|    cicid|citizenship_country|residence_country|  city|state|arrival_date|departure_date| age|visa_type|detailed_visa_type|
+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
|4041803.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|    2016-05-07|49.0| Business|                B1|
|4041804.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|          null|38.0| Business|                B1|
|4041805.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|    2016-05-07|45.0| Business|                B1|
|4041806.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|    2016-05-07|25.0| Business|                B1|
| 452706.0|             NORWAY|           NORWAY|BANGOR|   ME|  2016-04-03|    2016-04-05|38.0| Business|                B1|


In [143]:
#check the distinct combinations of city and state in fact table
spark.sql("""
SELECT COUNT(DISTINCT city, state)
FROM fact_immigrants
""").show()

# checking the combinations of city and state in demographic dim and compare it to fact immigrants
spark.sql("""
SELECT COUNT(*)
FROM
(
SELECT DISTINCT city, state
FROM fact_immigrants
) fi
INNER JOIN 
(
SELECT DISTINCT City, state_code
FROM demographics_dim 
) da
ON fi.city = da.City
AND fi.state = da.state_code
""").show(5)

+---------------------------+
|count(DISTINCT city, state)|
+---------------------------+
|                        151|
+---------------------------+

+--------+
|count(1)|
+--------+
|       0|
+--------+



Almost half or less of the data in demographic dimension tables (demographics_dim) are common with data in fact table (fact_immigrants). Taking into consideration that immigration table contains data for one month only. So, this is an acceptable ratio. Therefore, they could be joined together.

I will try filtering out city/state combinations that are not common, then check ehat would happen. 

In [151]:
# checking count of common combinations of city and state 
spark.sql("""
SELECT COUNT(*)
FROM fact_immigrants
WHERE CONCAT(city, state) IN (
    SELECT CONCAT(fi.city, fi.state)
    FROM
    (
        SELECT DISTINCT city, state
        FROM fact_immigrants
    ) fi
    INNER JOIN 
    (
        SELECT DISTINCT municipality, state
        FROM airport_dim 
    ) da
    ON fi.city = da.municipality
    AND fi.state = da.state
)
""").show(2)



+--------+
|count(1)|
+--------+
| 1983869|
+--------+



Almost 91% of the data have common combinations of city and state which is a great percentage. However, I have a concern demographic dataset includes only cities with populations over 65,000 inhabitants.

In [152]:
# checking the airport dim. table (ident) should be the primary key
spark.sql("""
SELECT count(*) 
FROM airport_dim
""").show()

#make sure that all primary keys are unique 
spark.sql("""
SELECT COUNT(DISTINCT ident) 
FROM airport_dim
""").show()

+--------+
|count(1)|
+--------+
|   14529|
+--------+

+---------------------+
|count(DISTINCT ident)|
+---------------------+
|                14529|
+---------------------+



In [153]:
# checking for the best way to join airport dim table with immigrants fact table
fact_immigrants.show(5)
airport_dim.show(5)

+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
|    cicid|citizenship_country|residence_country|  city|state|arrival_date|departure_date| age|visa_type|detailed_visa_type|
+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
|4041803.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|    2016-05-07|49.0| Business|                B1|
|4041804.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|          null|38.0| Business|                B1|
|4041805.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|    2016-05-07|45.0| Business|                B1|
|4041806.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|    2016-05-07|25.0| Business|                B1|
| 452706.0|             NORWAY|           NORWAY|BANGOR|   ME|  2016-04-03|    2016-04-05|38.0| Business|                B1|


Some cities have more than one airport. Immigration dataset does not have any data about airports. Therefore, I will check (city + state) combination.

In [154]:
#select distinct combinations of city and state from fact table
spark.sql("""
SELECT COUNT(DISTINCT city, state)
FROM fact_immigrants
""").show()

# select common combinations of city and state 
spark.sql("""
SELECT COUNT(*)
FROM
(
SELECT DISTINCT city, state
FROM fact_immigrants
) fi
INNER JOIN 
(
SELECT DISTINCT municipality, state
FROM airport_dim
) da
ON fi.city = da.municipality
AND fi.state = da.state
""").show(2)

+---------------------------+
|count(DISTINCT city, state)|
+---------------------------+
|                        151|
+---------------------------+

+--------+
|count(1)|
+--------+
|     102|
+--------+



Almost two thirds of data in airports dimension tables (airpost_dim) are common with data in fact table (fact_immigrants). Taking into consideration that immigration table contains data for one month only. So, this is an acceptable ratio. Therefore, they could be joined together using a left join. 

In [155]:
# This function checks the null values
def nullValueCheck(spark_ctxt, tables_check):
    """
    This function checks null values on specific columns of given tables received as parameters and 
    raises a ValueError exception when null values are encountered.
   parameters:
    spark_ctxt: spark context where the data quality check is performed
    tables_check: A dictionary containing (table, columns) pairs  for each table.   
    """  
    for table in tables_check:
        print(f"Performing data quality check on table {table}...")
        for column in tables_check[table]:
            returnedValue = spark_ctxt.sql(f"""SELECT COUNT(*) as nbr FROM {table} WHERE {column} IS NULL""")
            if returnedValue.head()[0] > 0:
                raise ValueError(f"Data quality check failed! Found NULL values in {column} column!")
        print(f"Table {table} passed.")

In [158]:
#checking the dictionary of tables as well as columns 
tables_check = { 'fact_immigrants' : ['cicid'], 'time_dim':['date'], 'temperature_dim':['date','City'], 'airport_dim': ['ident'], 'demographics_dim':['City','state_code']}

#calling function on the spark context
nullValueCheck(spark, tables_check)


Performing data quality check on table fact_immigrants...
Table fact_immigrants passed.
Performing data quality check on table time_dim...
Table time_dim passed.
Performing data quality check on table temperature_dim...
Table temperature_dim passed.
Performing data quality check on table airport_dim...
Table airport_dim passed.
Performing data quality check on table demographics_dim...
Table demographics_dim passed.


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

What's the goal?
The goal is to check the flow of immigrants to the US using airport data, temperature data, demographic data, and a huge dataset from the US National Tourism and Trade Office. 

How often the data should be updated and why?
The data should be updated monthly in conjunction with the current raw file format.

Spark was chosen since it can easily handle multiple file formats (including SAS) containing large amounts of data. Spark SQL was chosen to process the large input files into dataframes and manipulated via standard SQL join operations to form additional tables.

If the data needs to populate a dashboard daily to meet an SLA then we could use a scheduling tool such as Airflow to run the ETL pipeline overnight.

If I had 100x times the size of the processed files I would load the data into AWS S3, then use spark to do EDA, load it back to S3 and finally ETL into Redshift.

Redshift is a good fit if 100 persons would need to access the data, it should be able to handle this with no problem. We could increase the specs of our cluster if it was not fast enough to serve everyone.