# Project Title
### Data Engineering Capstone Project

#### Project Summary

Many people travel to the US every year for business, pleasure and studies.
This project looks for relationships between US immigration, the weather in the destination city and the demographics of that city.

The project follows the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up
* Step 6: Analytics performed on the fact table

In [21]:
# Import statements
import datetime

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, date_format, datediff, dayofmonth, desc, hour, month,
    monotonically_increasing_id, split, to_date, udf, weekofyear, year,
)
from pyspark.sql.types import DoubleType, TimestampType

### Step 1: Scope the Project and Gather Data

#### Scope 

The scope of the project is to identify any correlation between the number of people travelling to a US city for business, pleasure or study and the weather in that city. We also try to identify whether the number of visitors is related to the size of the foreign-born population in that city.
The project uses the following datasets:
1. Immigration data
2. Temperature Data 
3. US Cities: Demographics
4. Airport Codes


The end goal is to get the number of immigrants travelling to the US per city per month, the weather for that city in that month, and the count of the immigrant population in that city. We do so by joining the immigration dataset with the airport dataset on the port of entry column, and then joining the temperature and demographics datasets on the city column.

The tools used in the project are:
* AWS S3
* Spark
* EMR
#### Describe and Gather Data 

I94 Immigration Data: https://travel.trade.gov/research/reports/i94/historical/2016.html
This data comes from the US National Tourism and Trade Office. The dataset contains information about port of entry, type of visa, mode of transportation, age groups, states intended to visit and country of origin.

World Temperature Data: https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data
This dataset comes from Kaggle. We use the file that has land temperatures by city for each month.

U.S. City Demographic Data: https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/
This data comes from OpenDataSoft. The dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. The underlying data comes from the US Census Bureau's 2015 American Community Survey.

Airport Code Table: https://datahub.io/core/airport-codes#data
This is a simple table of airport codes and corresponding cities. The airport codes may refer to either IATA airport code, a three-letter code which is used in passenger reservation, ticketing and baggage-handling systems, or the ICAO airport code which is a four letter code used by ATC systems and for airports that do not have an IATA airport code.

#### Assumption
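The datasets were collected in different years (the immigration data is from 2016, the demographics data is from 2015, and the temperature data used here is from 2012). For this project we assume that they all describe the same year.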

In [22]:
# Read immigration SAS data
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_pd_immi = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [3]:
df_pd_immi.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [23]:
#Read temperature source file
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_pd_temp = pd.read_csv(fname)

In [7]:
df_pd_temp.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
8599207,2013-05-01,11.464,0.236,Zwolle,Netherlands,52.24N,5.26E
8599208,2013-06-01,15.043,0.261,Zwolle,Netherlands,52.24N,5.26E
8599209,2013-07-01,18.775,0.193,Zwolle,Netherlands,52.24N,5.26E
8599210,2013-08-01,18.025,0.298,Zwolle,Netherlands,52.24N,5.26E
8599211,2013-09-01,,,Zwolle,Netherlands,52.24N,5.26E


In [14]:
#Read demographics source file
fname = 'us-cities-demographics.csv'
df_pd_city = pd.read_csv(fname,sep=';')

In [5]:
df_pd_city.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [17]:
#Read airport source file
fname = 'airport-codes_csv.csv'
df_pd_airport = pd.read_csv(fname)


In [11]:
df_pd_airport.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


### Step 2: Explore and Assess the Data
#### Explore the Data 
The following pandas DataFrame functions are used to explore the data:
1. df.count() -- To get the count of non-null values in each column
2. df.isnull().sum() -- To get the count of null values in each column

#### Cleaning Steps
The following steps are performed after reading the files into a Spark dataframe:
1. Select only the required columns from the dataset
2. Remove rows with null values in the required columns
3. Remove duplicates
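
The three cleaning steps can be sketched in plain Python on a tiny in-memory sample, which makes the logic easy to check before running it on the full data. This is only an illustration: the helper name `clean` is hypothetical, and in the notebook the same steps are done with the Spark calls `select`, `dropna` and `distinct`.

```python
# Small sample in the shape of the airport dataset; the second row is a
# deliberate duplicate and the third has no IATA code.
rows = [
    {"ident": "KABE", "type": "medium_airport", "municipality": "Allentown", "iata_code": "ABE"},
    {"ident": "KABE", "type": "medium_airport", "municipality": "Allentown", "iata_code": "ABE"},
    {"ident": "00A", "type": "heliport", "municipality": "Bensalem", "iata_code": None},
    {"ident": "KOKK", "type": "small_airport", "municipality": "Kokomo", "iata_code": "OKK"},
]


def clean(rows, keep):
    """Select columns, drop rows with nulls in them, and remove duplicates."""
    seen = set()
    cleaned = []
    for row in rows:
        selected = tuple(row.get(column) for column in keep)  # 1. select required columns
        if any(value is None or value == "" for value in selected):  # 2. drop rows with nulls
            continue
        if selected in seen:  # 3. drop duplicates
            continue
        seen.add(selected)
        cleaned.append(dict(zip(keep, selected)))
    return cleaned


result = clean(rows, keep=["ident", "municipality", "iata_code"])
for row in result:
    print(row)
```

Only the Allentown and Kokomo rows survive: the duplicate is removed and the heliport without an IATA code is dropped.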

In [11]:
# Performing cleaning tasks here
# Get count of non null values for each column
df_pd_immi.count()




cicid       3096313
i94yr       3096313
i94mon      3096313
i94cit      3096313
i94res      3096313
i94port     3096313
arrdate     3096313
i94mode     3096074
i94addr     2943941
depdate     2953856
i94bir      3095511
i94visa     3096313
count       3096313
dtadfile    3096312
visapost    1215063
occup          8126
entdepa     3096075
entdepd     2957884
entdepu         392
matflag     2957884
biryear     3095511
dtaddto     3095836
gender      2682044
insnum       113708
airline     3012686
admnum      3096313
fltno       3076764
visatype    3096313
dtype: int64

In [12]:
#get count of null values for each column
df_pd_immi.isnull().sum()

cicid             0
i94yr             0
i94mon            0
i94cit            0
i94res            0
i94port           0
arrdate           0
i94mode         239
i94addr      152372
depdate      142457
i94bir          802
i94visa           0
count             0
dtadfile          1
visapost    1881250
occup       3088187
entdepa         238
entdepd      138429
entdepu     3095921
matflag      138429
biryear         802
dtaddto         477
gender       414269
insnum      2982605
airline       83627
admnum            0
fltno         19549
visatype          0
dtype: int64

In [10]:
# Get count of non null values for each column 
df_pd_temp.count()

dt                               8599212
AverageTemperature               8235082
AverageTemperatureUncertainty    8235082
City                             8599212
Country                          8599212
Latitude                         8599212
Longitude                        8599212
dtype: int64

In [13]:
#get count of null values for columns
df_pd_temp.isnull().sum()


dt                                    0
AverageTemperature               364130
AverageTemperatureUncertainty    364130
City                                  0
Country                               0
Latitude                              0
Longitude                             0
dtype: int64

In [15]:
# Get count of non null values for each column
df_pd_city.count()

City                      2891
State                     2891
Median Age                2891
Male Population           2888
Female Population         2888
Total Population          2891
Number of Veterans        2878
Foreign-born              2878
Average Household Size    2875
State Code                2891
Race                      2891
Count                     2891
dtype: int64

In [16]:
#get count of null values for columns
df_pd_city.isnull().sum()

City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64

In [18]:
# Get count of non null values for each column
df_pd_airport.count()

ident           55075
type            55075
name            55075
elevation_ft    48069
continent       27356
iso_country     54828
iso_region      55075
municipality    49399
gps_code        41030
iata_code        9189
local_code      28686
coordinates     55075
dtype: int64

In [19]:
#get count of null values for columns
df_pd_airport.isnull().sum()

ident               0
type                0
name                0
elevation_ft     7006
continent       27719
iso_country       247
iso_region          0
municipality     5676
gps_code        14045
iata_code       45886
local_code      26389
coordinates         0
dtype: int64

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Please refer to Conceptual_Data_model.pdf 

We use a star schema.
We create the following dimension tables from the source:
*  temperature_table
*  immigration_table
*  demographics_table
*  time_table
*  airport_table

We then join the dimension tables to create the fact table below.
* us_city_immigration

A star schema was chosen because the source data is already at the grain needed for the project. Each dimension table has a primary key (PK) that identifies each row, and these keys are carried into the fact table as foreign keys (FK).

#### 3.2 Mapping Out Data Pipelines
The source files are stored in an S3 bucket.
Using Spark on Amazon EMR, the source files are extracted, transformed and loaded into Parquet files.
The Parquet files are written back to S3.
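
One step of this pipeline could look roughly like the sketch below. It is not the code used in this notebook, which reads and writes local paths: the helper names `s3_uri` and `run_step`, the bucket name and the object keys are all hypothetical. The `s3://` scheme is what EMR normally uses; with the `hadoop-aws` package outside EMR the scheme would typically be `s3a://`.

```python
def s3_uri(bucket, key, scheme="s3"):
    """Build an object URI such as s3://bucket/path/to/file."""
    return f"{scheme}://{bucket}/{key.lstrip('/')}"


def run_step(spark, bucket, source_key, target_key, transform):
    """Read one CSV source from S3, transform it, and write Parquet back to S3.

    `spark` is an existing SparkSession and `transform` is a function that
    takes a dataframe and returns the cleaned dataframe.
    """
    source = spark.read.csv(s3_uri(bucket, source_key), header=True)
    transform(source).write.mode("overwrite").parquet(s3_uri(bucket, target_key))


# Hypothetical bucket and keys, for illustration only
print(s3_uri("my-capstone-bucket", "raw/airport-codes_csv.csv"))
print(s3_uri("my-capstone-bucket", "/parquet/airport_data"))
```

Passing the transformation in as a function keeps the read and write logic identical for every source file.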




### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [3]:
# Create spark session
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.1.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.0")
    .enableHiveSupport()
    .getOrCreate()
)

In [3]:
#Read source to spark dataframe
df_temp=spark.read.csv('../../data2/GlobalLandTemperaturesByCity.csv',header='true')

In [4]:
#Cleansing and transforming temperature data: keep US rows for the 12 months of 2012
df_temp = (
    df_temp.drop('Latitude', 'Longitude')
    .filter("Country == 'United States' AND dt > '2011-12-01' AND dt < '2013-01-01'")
    .withColumn('month', month(to_date(col('dt'))))
    .select(col('dt'), col('AverageTemperature').alias('average_temperature'),
            col('AverageTemperatureUncertainty').alias('average_temperature_uncertainty'),
            col('City').alias('city'), col('Country').alias('country'), col('month'))
)
df_temp.show()

+----------+--------------------+-------------------------------+-------+-------------+-----+
|        dt| average_temperature|average_temperature_uncertainty|   city|      country|month|
+----------+--------------------+-------------------------------+-------+-------------+-----+
|2012-01-01|               7.996|                          0.204|Abilene|United States|    1|
|2012-02-01|               8.434|                          0.252|Abilene|United States|    2|
|2012-03-01|              15.628|            0.17300000000000001|Abilene|United States|    3|
|2012-04-01|  21.069000000000003|            0.38799999999999996|Abilene|United States|    4|
|2012-05-01|              24.698|            0.32299999999999995|Abilene|United States|    5|
|2012-06-01|              28.217|                          0.126|Abilene|United States|    6|
|2012-07-01|              29.581|            0.28800000000000003|Abilene|United States|    7|
|2012-08-01|  29.104000000000006|                          0

In [10]:
df_temp.printSchema()

root
 |-- dt: string (nullable = true)
 |-- average_temperature: string (nullable = true)
 |-- average_temperature_uncertainty: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- month: integer (nullable = true)



In [9]:
#Check again for any nulls in the data after data transformation
df_pd = df_temp.toPandas()
df_pd.isnull().sum()

dt                                 0
average_temperature                0
average_temperature_uncertainty    0
city                               0
country                            0
month                              0
dtype: int64

In [5]:
#Read airport data to spark dataframe
df_airport=spark.read.csv('./airport-codes_csv.csv',header='true')


In [6]:
#Cleaning and transforming the airport data
df_airport = df_airport.withColumn('country',split(col("iso_region"),"-").getItem(0)).withColumn("state",split(col("iso_region"),"-").getItem(1))

In [7]:
df_airport=df_airport.drop('coordinates','local_code','gps_code','elevation_ft').dropna(how = "any", subset = ["municipality","iata_code"]).filter("iso_country == 'US'")
df_airport=df_airport.withColumn("new_municipality",split(col("municipality"),"/").getItem(0)).distinct()
df_airport.show()


+-----+--------------+--------------------+---------+-----------+----------+------------+---------+-------+-----+----------------+
|ident|          type|                name|continent|iso_country|iso_region|municipality|iata_code|country|state|new_municipality|
+-----+--------------+--------------------+---------+-----------+----------+------------+---------+-------+-----+----------------+
| KABE|medium_airport|Lehigh Valley Int...|       NA|         US|     US-PA|   Allentown|      ABE|     US|   PA|       Allentown|
| KOKK| small_airport|Kokomo Municipal ...|       NA|         US|     US-IN|      Kokomo|      OKK|     US|   IN|          Kokomo|
| PHKO|medium_airport|Ellison Onizuka K...|       NA|         US|     US-HI| Kailua/Kona|      KOA|     US|   HI|          Kailua|
| KEUF| small_airport|        Weedon Field|       NA|         US|     US-AL|     Eufaula|      EUF|     US|   AL|         Eufaula|
| KLNS|medium_airport|   Lancaster Airport|       NA|         US|     US-PA|   Lanc

In [10]:
#Check for nulls again after data transformation
df_pd = df_airport.toPandas()
df_pd.isnull().sum()

ident               0
type                0
name                0
continent           0
iso_country         0
iso_region          0
municipality        0
iata_code           0
country             0
state               0
new_municipality    0
dtype: int64

In [8]:
#Read demographics data to spark dataframe
df_us_cities=spark.read.option("sep",";").csv('./us-cities-demographics.csv',header='true')


In [9]:
#Cleanse and transform demographics data
df_us_cities=df_us_cities.select(col('City').alias('city'),col('State').alias('state'),col('Total Population').alias('total_population'),col('Foreign-born').alias('foreign_born'),col('State Code').alias('state_code')).dropna().distinct()
df_us_cities.printSchema()

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- total_population: string (nullable = true)
 |-- foreign_born: string (nullable = true)
 |-- state_code: string (nullable = true)



In [13]:
#Check for nulls again after data transformation
df_pd = df_us_cities.toPandas()
df_pd.isnull().sum()

City                0
State               0
total_population    0
foreign_born        0
state_code          0
dtype: int64

In [10]:
# Read immigration data to spark dataframe
df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df_spark.count()

#Function to convert a SAS date (days since 1960-01-01) to a timestamp; nulls are passed through
get_date = udf(lambda x: datetime.datetime(1960,1,1) + datetime.timedelta(days=int(x)) if x is not None else None, TimestampType())

#Cleanse and transform immigration data
df_spark = df_spark.withColumn('arrival_date', get_date(df_spark.arrdate))\
.dropna(how = "any", subset = ["i94mode","i94addr","i94bir"])\
.select('cicid','i94yr','i94mon','i94cit','i94res','i94port','arrdate','i94mode','i94addr','i94bir','i94visa','biryear','visatype','arrival_date')
#df_spark.show()
df_spark.printSchema()


root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- biryear: double (nullable = true)
 |-- visatype: string (nullable = true)
 |-- arrival_date: timestamp (nullable = true)
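
The SAS date conversion can be checked outside Spark. SAS stores dates as the number of days since 1960-01-01, so the `arrdate` values shown in the sample rows should map to dates in April 2016. The sketch below uses only the standard library; the function name `sas_to_date` is chosen here for illustration.

```python
import datetime

SAS_EPOCH = datetime.date(1960, 1, 1)


def sas_to_date(days):
    """Convert a SAS date (days since 1960-01-01) to a datetime.date."""
    if days is None:
        return None
    return SAS_EPOCH + datetime.timedelta(days=int(days))


print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(20551.0))  # 2016-04-07
```

Both results agree with the `arrival_date` values that Spark produces for the same `arrdate` inputs.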



In [18]:
df_spark.show()

+-----+------+------+------+------+-------+-------+-------+-------+------+-------+-------+--------+-------------------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|i94bir|i94visa|biryear|visatype|       arrival_date|
+-----+------+------+------+------+-------+-------+-------+-------+------+-------+-------+--------+-------------------+
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|  25.0|    3.0| 1991.0|      F1|2016-04-07 00:00:00|
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|  55.0|    2.0| 1961.0|      B2|2016-04-01 00:00:00|
| 16.0|2016.0|   4.0| 101.0| 101.0|    NYC|20545.0|    1.0|     MA|  28.0|    2.0| 1988.0|      B2|2016-04-01 00:00:00|
| 17.0|2016.0|   4.0| 101.0| 101.0|    NYC|20545.0|    1.0|     MA|   4.0|    2.0| 2012.0|      B2|2016-04-01 00:00:00|
| 18.0|2016.0|   4.0| 101.0| 101.0|    NYC|20545.0|    1.0|     MI|  57.0|    1.0| 1959.0|      B1|2016-04-01 00:00:00|
| 19.0|2016.0|   4.0| 101.0| 101.0|    N

In [15]:
#Check again for nulls 
df_pd = df_spark.toPandas()
df_pd.isnull().sum()

cicid           0
i94yr           0
i94mon          0
i94cit          0
i94res          0
i94port         0
arrdate         0
i94mode         0
i94addr         0
i94bir          0
i94visa         0
biryear         0
visatype        0
arrival_date    0
dtype: int64

In [11]:
#Create a dataframe for time 
df_time = df_spark.select(col('arrival_date').alias('date'),\
              hour(col('arrival_date')).alias('hour'),\
              dayofmonth(col('arrival_date')).alias('day'),\
              weekofyear(col('arrival_date')).alias('week'),\
              month(col('arrival_date')).alias('month'),\
              year(col('arrival_date')).alias('year'),\
              date_format(col('arrival_date'),"EEEE").alias('weekday'),\
             ).dropDuplicates()
df_time.show()

+-------------------+----+---+----+-----+----+---------+
|               date|hour|day|week|month|year|  weekday|
+-------------------+----+---+----+-----+----+---------+
|2016-04-02 00:00:00|   0|  2|  13|    4|2016| Saturday|
|2016-04-15 00:00:00|   0| 15|  15|    4|2016|   Friday|
|2016-04-18 00:00:00|   0| 18|  16|    4|2016|   Monday|
|2016-04-16 00:00:00|   0| 16|  15|    4|2016| Saturday|
|2016-04-11 00:00:00|   0| 11|  15|    4|2016|   Monday|
|2016-04-14 00:00:00|   0| 14|  15|    4|2016| Thursday|
|2016-04-17 00:00:00|   0| 17|  15|    4|2016|   Sunday|
|2016-04-23 00:00:00|   0| 23|  16|    4|2016| Saturday|
|2016-04-26 00:00:00|   0| 26|  17|    4|2016|  Tuesday|
|2016-04-10 00:00:00|   0| 10|  14|    4|2016|   Sunday|
|2016-04-24 00:00:00|   0| 24|  16|    4|2016|   Sunday|
|2016-04-01 00:00:00|   0|  1|  13|    4|2016|   Friday|
|2016-04-21 00:00:00|   0| 21|  16|    4|2016| Thursday|
|2016-04-05 00:00:00|   0|  5|  14|    4|2016|  Tuesday|
|2016-04-08 00:00:00|   0|  8| 
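
The columns of the time table can be reproduced for a single timestamp with the standard library, which is a quick way to sanity-check the Spark output. This is a minimal sketch: `time_row` is a hypothetical helper, and it relies on Spark's `weekofyear` following ISO 8601 week numbering, which is also what `isocalendar()` returns.

```python
import datetime

# Fixed English names so the result does not depend on the system locale
WEEKDAYS = ("Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday")


def time_row(ts):
    """Derive the time-table columns from one arrival timestamp."""
    return {
        "date": ts,
        "hour": ts.hour,
        "day": ts.day,
        "week": ts.isocalendar()[1],  # ISO 8601 week number
        "month": ts.month,
        "year": ts.year,
        "weekday": WEEKDAYS[ts.weekday()],
    }


row = time_row(datetime.datetime(2016, 4, 2))
print(row["week"], row["weekday"])  # 13 Saturday
```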

In [18]:
#Write immigration data to parquet file
df_spark.write.partitionBy("i94yr","i94mon","i94port").parquet("sas_data",'overwrite')


In [19]:
#Write temperature data to parquet file
df_temp.write.partitionBy("city").parquet("temperature_data",'overwrite')


In [20]:
#Write airport data to parquet file
df_airport.write.partitionBy("iata_code").parquet("airport_data",'overwrite')


In [21]:
#Write city data to parquet file
df_us_cities.write.partitionBy("city").parquet("cities_data",'overwrite')


In [22]:
#Write time data to parquet file
df_time.write.partitionBy("year","month").parquet("time_data",'overwrite')


In [9]:
#Read data from parquet file
df_spark=spark.read.parquet("sas_data")
df_temp=spark.read.parquet("temperature_data")
df_airport=spark.read.parquet("airport_data")
df_us_cities=spark.read.parquet("cities_data")
df_time=spark.read.parquet("time_data")

In [51]:
# Quality checks: fail if any table read back from Parquet is empty
tables = {
    "Temperature": df_temp,
    "Airport": df_airport,
    "Demography": df_us_cities,
    "Time": df_time,
    "Immigration": df_spark,
}
for name, df in tables.items():
    if df.count() == 0:
        raise ValueError(name + " data is empty")


In [24]:
#Create the fact table by joining the dimension tables
df_fact=df_spark.join(df_airport,[df_spark.i94port == df_airport.iata_code],how='inner').select(df_spark.arrival_date,df_spark.cicid,df_spark.i94mon,df_airport.new_municipality,df_airport.state,df_airport.ident)\
            .join(df_temp,[df_airport.new_municipality == df_temp.city,df_spark.i94mon == df_temp.month.cast("double")], how='inner')\
            .join(df_us_cities,[df_temp.city == df_us_cities.city,df_airport.state == df_us_cities.state_code],how ='inner')\
            .join(df_time,[df_spark.arrival_date == df_time.date],how='inner')\
            .withColumn("fact_id", monotonically_increasing_id())\
            .select(col('fact_id'),df_time.date,df_spark.cicid,df_airport.ident,df_temp.month,df_temp.city,df_us_cities.state,df_temp.average_temperature,df_us_cities.foreign_born)
    
df_fact.show()
#1218050

+-------+-------------------+---------+-----+-----+-----+-------+-------------------+------------+
|fact_id|               date|    cicid|ident|month| city|  state|average_temperature|foreign_born|
+-------+-------------------+---------+-----+-----+-----+-------+-------------------+------------+
|      0|2016-04-15 00:00:00|2688864.0| KMIA|    4|Miami|Florida|             23.115|      260789|
|      1|2016-04-15 00:00:00|2688885.0| KMIA|    4|Miami|Florida|             23.115|      260789|
|      2|2016-04-15 00:00:00|2688912.0| KMIA|    4|Miami|Florida|             23.115|      260789|
|      3|2016-04-15 00:00:00|2688920.0| KMIA|    4|Miami|Florida|             23.115|      260789|
|      4|2016-04-15 00:00:00|2688921.0| KMIA|    4|Miami|Florida|             23.115|      260789|
|      5|2016-04-15 00:00:00|2688922.0| KMIA|    4|Miami|Florida|             23.115|      260789|
|      6|2016-04-15 00:00:00|2688964.0| KMIA|    4|Miami|Florida|             23.115|      260789|
|      7|2
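
The join keys used to build the fact table are easier to see on a tiny example. The sketch below mimics the chain of inner joins with dictionary lookups: immigration to airport on `i94port = iata_code`, then temperature on city and month, then demographics on city and state code. The Miami values are taken from the output above; the second immigration row is a made-up record with no matching airport. Unlike a real join, dictionary lookups assume each key is unique, so this does not show how duplicate keys multiply rows.

```python
immigration = [
    {"cicid": 2688864.0, "i94port": "MIA", "i94mon": 4.0},
    {"cicid": 1.0, "i94port": "XXX", "i94mon": 4.0},  # made-up row, no airport match
]
airports = {"MIA": {"ident": "KMIA", "city": "Miami", "state_code": "FL"}}
temperature = {("Miami", 4): "23.115"}
demographics = {("Miami", "FL"): {"state": "Florida", "foreign_born": "260789"}}


def build_fact(immigration, airports, temperature, demographics):
    """Inner-join the sample tables; rows without a match in any table are dropped."""
    fact = []
    for row in immigration:
        airport = airports.get(row["i94port"])
        if airport is None:
            continue
        month = int(row["i94mon"])
        temp = temperature.get((airport["city"], month))
        demo = demographics.get((airport["city"], airport["state_code"]))
        if temp is None or demo is None:
            continue
        fact.append({
            "fact_id": len(fact),
            "cicid": row["cicid"],
            "ident": airport["ident"],
            "month": month,
            "city": airport["city"],
            "state": demo["state"],
            "average_temperature": temp,
            "foreign_born": demo["foreign_born"],
        })
    return fact


fact = build_fact(immigration, airports, temperature, demographics)
print(fact)
```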

In [26]:
df_fact.write.partitionBy("month","city").parquet("immigration_weather",'overwrite')

In [4]:
df_fact=spark.read.parquet("immigration_weather")

In [48]:
#type(df_fact)
df_fact.printSchema()

root
 |-- fact_id: long (nullable = false)
 |-- date: timestamp (nullable = true)
 |-- cicid: double (nullable = true)
 |-- ident: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- average_temperature: string (nullable = true)
 |-- foreign_born: string (nullable = true)



#### 4.2 Data Quality Checks
As part of the data quality checks, the following are done:

 * Check if the final fact table is empty.
 * Check if any columns are null in the fact table.
 * Check if any dimension table is empty after it is read from the Parquet file.
 
Run Quality Checks

In [36]:
# Check if the fact table is empty
if df_fact.count() == 0:
    raise ValueError("No data in the fact table")


In [50]:
# Check for null values in the fact table
if df_fact.filter("date is null or cicid is null or ident is null or month is null or city is null or state is null").count() != 0:
    raise ValueError("Key columns of the fact table cannot be null")
if df_fact.filter("average_temperature is null or foreign_born is null").count() != 0:
    raise ValueError("average_temperature and foreign_born cannot be null")
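
The null checks could be generalized so that the filter expression is built from a list of column names instead of being typed by hand. This is a sketch, not part of the notebook run: `null_filter_expr` and `check_no_nulls` are hypothetical helpers, and `df` is assumed to be a Spark dataframe (anything with `filter(expr)` and `count()` works).

```python
def null_filter_expr(columns):
    """Build a SQL filter such as 'date is null or cicid is null'."""
    return " or ".join(f"{column} is null" for column in columns)


def check_no_nulls(df, columns, label):
    """Raise ValueError if any of the given columns contains a null."""
    bad_rows = df.filter(null_filter_expr(columns)).count()
    if bad_rows:
        raise ValueError(f"{label}: {bad_rows} row(s) have nulls in {list(columns)}")


print(null_filter_expr(["date", "cicid", "ident"]))
```

With the fact table loaded, a call such as `check_no_nulls(df_fact, ["date", "cicid", "ident", "month", "city", "state"], "fact table keys")` would perform the same check as the first filter in the cell.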


#### 4.3 Data dictionary 
Refer to the file Data_Dictionary.pdf

### Step 5: Complete Project Write Up
 
 
 * Tools and technologies for the project
 
 The project uses Spark, S3 and EMR, for the following reasons:

 EMR - An EMR cluster comes preinstalled with the big data ecosystem components that we specify. We can choose the node types (for example, compute-optimized or storage-optimized nodes) and scale the number of nodes up or down for the job.

 Spark - Spark works well with large datasets. Its in-memory processing and distributed architecture let it process data in parallel, which makes it fast.
 
 S3 - S3 provides easy, low-cost, scalable storage. The final fact and dimension tables are stored as Parquet files for easier access, and S3 is a good home for them.
  
* Propose how often the data should be updated and why

The data could be updated once a month, so that we can retrieve the previous month's temperature data for each city and join it with the I94 port of entry to see whether last month's weather affected the visitor flow to that city.
  
* Approaches for the different scenarios
 * The data was increased by 100x.
 
 When the data scales up, we can improve performance by increasing the number of nodes used for processing.
 Because S3 and EMR are used together, we should also make sure that the S3 bucket and the EC2 instances are in the same region.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 
 We can use a scheduler such as Airflow to run the job daily, scheduled early enough to finish before 7am, so that the dashboard is refreshed with new data every day.
 * The database needed to be accessed by 100+ people.
   
 Here the fact and dimension tables are stored as Parquet files in S3, so we need to optimize S3 reads. S3 supports 3,500 PUT/COPY/POST/DELETE and 5,500 GET/HEAD requests per second per prefix in a bucket, and there is no limit on the number of prefixes in a bucket. We can therefore scale read performance by adding more prefixes to the bucket and parallelizing reads across them.
 We can also use Amazon CloudFront, Amazon ElastiCache, or AWS Elemental MediaStore to cache a common set of objects that receive repeated GET requests.
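
The per-prefix limits quoted above translate into simple arithmetic. The sketch below treats them as an idealized upper bound, assuming requests are spread evenly across prefixes; real throughput also depends on object sizes and client behaviour. The function names are chosen here for illustration.

```python
import math

GET_PER_PREFIX = 5500  # GET/HEAD requests per second per prefix
PUT_PER_PREFIX = 3500  # PUT/COPY/POST/DELETE requests per second per prefix


def max_request_rate(prefixes, per_prefix=GET_PER_PREFIX):
    """Upper bound on requests per second when load is spread over the prefixes."""
    return prefixes * per_prefix


def prefixes_needed(target_rps, per_prefix=GET_PER_PREFIX):
    """Smallest number of prefixes whose combined limit covers the target rate."""
    return math.ceil(target_rps / per_prefix)


print(max_request_rate(10))    # 55000
print(prefixes_needed(20000))  # 4
```

Since the fact table is partitioned by month and city, each partition directory is a separate prefix that readers can hit in parallel.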
 

### Step 6: Analytics performed on the fact table
In this Jupyter notebook only the April immigration data is considered, so all of the analytics below are for April.

In [5]:

df_fact.createOrReplaceTempView("fact_table")


In [33]:
# city with maximum visitors 
spark.sql("SELECT month,city,state, average_temperature,foreign_born,count(*) as cnt FROM fact_table group by 1,2,3,4,5 order by cnt desc").show()

+-----+--------------+--------------+-------------------+------------+------+
|month|          city|         state|average_temperature|foreign_born|   cnt|
+-----+--------------+--------------+-------------------+------------+------+
|    4|         Miami|       Florida|             23.115|      260789|330265|
|    4|       Orlando|       Florida| 22.976999999999997|       50558|154848|
|    4|   Los Angeles|    California|             14.751|     1485425|148635|
|    4|   New Orleans|     Louisiana| 22.101999999999997|       21679|133374|
|    4|       Houston|         Texas| 22.468000000000004|      696210| 95774|
|    4|       Atlanta|       Georgia|             16.287|       32016| 89237|
|    4|        Dallas|         Texas|             20.552|      326825| 68277|
|    4|        Boston| Massachusetts|              8.753|      190123| 55795|
|    4|       Seattle|    Washington|              7.063|      119840| 46445|
|    4|       Detroit|      Michigan|              8.351|       

In [15]:
#City with least visitors
spark.sql("SELECT month,city,state, average_temperature,foreign_born,count(*) as cnt FROM fact_table group by 1,2,3,4,5 order by cnt asc").show()

+-----+--------------+--------------+-------------------+------------+---+
|month|          city|         state|average_temperature|foreign_born|cnt|
+-----+--------------+--------------+-------------------+------------+---+
|    4|  Newport News|      Virginia|             14.108|       12589|  1|
|    4|    Montgomery|       Alabama|             18.724|        9337|  1|
|    4|Corpus Christi|         Texas| 24.026999999999997|       30834|  2|
|    4|   Albuquerque|    New Mexico|             13.133|       58200|  3|
|    4|        Mobile|       Alabama|             19.913|        7234|  3|
|    4|       Chicago|      Illinois|  8.383000000000001|      573463|  3|
|    4|    Huntsville|       Alabama|             16.933|       12691|  4|
|    4|       Wichita|        Kansas| 16.480999999999998|       40270|  5|
|    4|       Lansing|      Michigan|              8.625|        8371|  6|
|    4|    Manchester| New Hampshire|              8.753|       14506|  8|
|    4|     Rochester|   

In [17]:
#Top 3 cities by visitor count in each month (the data has only April data, so only April shows up)
spark.sql("select * from(select *,  rank() OVER (PARTITION BY month ORDER BY cnt DESC) rank from (SELECT month,city,state, average_temperature,foreign_born,count(*) as cnt FROM fact_table group by 1,2,3,4,5) as tbl )as tbl1 where rank <=3 order by month,rank").show()

+-----+-----------+----------+-------------------+------------+------+----+
|month|       city|     state|average_temperature|foreign_born|   cnt|rank|
+-----+-----------+----------+-------------------+------------+------+----+
|    4|      Miami|   Florida|             23.115|      260789|330265|   1|
|    4|    Orlando|   Florida| 22.976999999999997|       50558|154848|   2|
|    4|Los Angeles|California|             14.751|     1485425|148635|   3|
+-----+-----------+----------+-------------------+------------+------+----+
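
The window query above ranks cities by visitor count within each month and keeps the top three. The same logic is sketched below in plain Python, using the April counts from the earlier output. The helper `top_n_per_month` is hypothetical; it follows the semantics of SQL `rank()`, where tied counts share a rank and the next rank is skipped.

```python
def top_n_per_month(counts, n=3):
    """Return (month, city, cnt, rank) tuples with rank <= n, like SQL rank()."""
    by_month = {}
    for (month, city), cnt in counts.items():
        by_month.setdefault(month, []).append((city, cnt))

    result = []
    for month in sorted(by_month):
        ordered = sorted(by_month[month], key=lambda item: -item[1])
        rank, previous = 0, None
        for position, (city, cnt) in enumerate(ordered, start=1):
            if cnt != previous:  # ties keep the rank of the first tied row
                rank, previous = position, cnt
            if rank > n:
                break
            result.append((month, city, cnt, rank))
    return result


counts = {
    (4, "Miami"): 330265,
    (4, "Orlando"): 154848,
    (4, "Los Angeles"): 148635,
    (4, "New Orleans"): 133374,
}
top3 = top_n_per_month(counts)
for row in top3:
    print(row)
```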



In [13]:
df_spark.createOrReplaceTempView("immigration_table")

In [20]:
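#Rank the cities according to their visitor count per i94mode and get the first 5 ranks
# Note: i94mode is the mode of transportation at arrival, not the purpose of travel:
# i94mode = 1 -> Air
# i94mode = 2 -> Sea
# i94mode = 3 -> Land
# The purpose of travel is in i94visa (1 -> Business, 2 -> Pleasure, 3 -> Student)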
stmnt = "select * from\
(Select * , rank() over (PARTITION BY month,i94mode ORDER BY cnt DESC) rank from \
(SELECT month,i94mode,city,state, average_temperature,foreign_born,count(*) as cnt \
FROM fact_table join immigration_table on fact_table.cicid = \
immigration_table.cicid group by 1,2,3,4,5,6) as temp) \
as temp1 where rank<=5 order by month,i94mode,rank"
spark.sql(stmnt).show()

+-----+-------+-----------+-------------+-------------------+------------+------+----+
|month|i94mode|       city|        state|average_temperature|foreign_born|   cnt|rank|
+-----+-------+-----------+-------------+-------------------+------------+------+----+
|    4|    1.0|      Miami|      Florida|             23.115|      260789|328422|   1|
|    4|    1.0|    Orlando|      Florida| 22.976999999999997|       50558|153862|   2|
|    4|    1.0|Los Angeles|   California|             14.751|     1485425|148195|   3|
|    4|    1.0|New Orleans|    Louisiana| 22.101999999999997|       21679|133054|   4|
|    4|    1.0|    Houston|        Texas| 22.468000000000004|      696210| 95194|   5|
|    4|    2.0|      Miami|      Florida|             23.115|      260789|  1059|   1|
|    4|    2.0|    Houston|        Texas| 22.468000000000004|      696210|   336|   2|
|    4|    2.0|    Seattle|   Washington|              7.063|      119840|   170|   3|
|    4|    2.0|    Orlando|      Florida| 2