# Project Title
### Data Engineering Capstone Project

#### Project Summary

The project uses Spark to load U.S. immigration data and run ETL on it.

The project saves the processed results locally in Parquet format.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
```python
import pandas as pd
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, round
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.functions import expr
from pyspark.sql.functions import unix_timestamp, from_unixtime
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DecimalType, TimestampType, ArrayType
```

In [2]:
```python
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()
```

### Step 1: Scope the Project and Gather Data

#### Scope 

* Goal: process the data into fact tables and dimension tables.

* Tools: Spark

#### Describe and Gather Data 

| Data | Source | Local path | Link |
|:---|:---|:---|:---|
| I94 Immigration Data | US National Tourism and Trade Office | /data/18-83510-I94-Data-2016/ | https://travel.trade.gov/research/reports/i94/historical/2016.html |
| World Temperature Data | Kaggle | /data2/GlobalLandTemperaturesByCity.csv | https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data |
| U.S. City Demographic Data | OpenDataSoft | /home/workspace/us-cities-demographics.csv | https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/ |
| Airport Code Table | datahub.io | /home/workspace/airport-codes_csv.csv | https://datahub.io/core/airport-codes#data |


In [3]:
```python
airport_file = "../data/airport-codes_csv.csv"
city_fiele = "../data/us-cities-demographics.csv"
temperature_file = "/data2/GlobalLandTemperaturesByCity.csv"
immigration_dir = "/data/18-83510-I94-Data-2016/"
immigration_dimensions = "../data/I94_SAS_Labels_Descriptions.SAS"
immigration_sample = "../data/immigration_data_sample.csv"

i94addrl_file = "../output/i94addrl.csv"
i94cntyl_file = "../output/i94cntyl.csv"
i94model_file = "../output/i94model.csv"
i94prtl_file = "../output/i94prtl.csv"
i94visa_file = "../output/i94visa.csv"
```
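The five lookup CSVs under `../output/` look like code-to-label tables taken from `I94_SAS_Labels_Descriptions.SAS`, but the notebook does not show how they were produced. The sketch below is one way to do it, assuming each entry in that file looks like `'AL'='ALABAMA'` or `1 = 'Air'`; `parse_sas_value_block` and `write_lookup_csv` are hypothetical helpers, not part of the project.

```python
import csv
import io
import re

# Matches one value-label entry, e.g.  'AL'='ALABAMA'  or  582 = 'MEXICO ...'
# (assumed format); header lines such as "value i94model" and ";" do not match.
ENTRY = re.compile(r"^\s*'?([^'=]+?)'?\s*=\s*'(.*)'")


def parse_sas_value_block(lines):
    """Yield (code, label) pairs from the lines of one SAS `value` block."""
    for line in lines:
        m = ENTRY.match(line)
        if m:
            # Strip the padding that the SAS file keeps inside the quotes
            yield m.group(1).strip(), m.group(2).strip()


def write_lookup_csv(pairs, out):
    """Write (code, label) pairs as ';'-separated rows.

    ';' is used because labels such as "ALCAN, AK" contain commas.
    """
    writer = csv.writer(out, delimiter=";")
    for code, label in pairs:
        writer.writerow([code, label])


if __name__ == "__main__":
    block = ["value i94prtl", "\t'ALC'\t=\t'ALCAN, AK             '", ";"]
    buf = io.StringIO()
    write_lookup_csv(parse_sas_value_block(block), buf)
    print(buf.getvalue(), end="")
```

Keeping `;` as the delimiter matches the `sep=";"` used when the lookup files are read back with `spark.read.csv`.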

In [13]:
```python
pd_immigration = pd.read_csv(immigration_sample)
pd_immigration.head()
```


| | Unnamed: 0 | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2027561 | 4084316.0 | 2016.0 | 4.0 | 209.0 | 209.0 | HHW | 20566.0 | 1.0 | HI | ... | | M | 1955.0 | 7202016 | F | | JL | 56582670000.0 | 00782 | WT |
| 1 | 2171295 | 4422636.0 | 2016.0 | 4.0 | 582.0 | 582.0 | MCA | 20567.0 | 1.0 | TX | ... | | M | 1990.0 | 10222016 | M | | \*GA | 94362000000.0 | XBLNG | B2 |
| 2 | 589494 | 1195600.0 | 2016.0 | 4.0 | 148.0 | 112.0 | OGG | 20551.0 | 1.0 | FL | ... | | M | 1940.0 | 7052016 | M | | LH | 55780470000.0 | 00464 | WT |
| 3 | 2631158 | 5291768.0 | 2016.0 | 4.0 | 297.0 | 297.0 | LOS | 20572.0 | 1.0 | CA | ... | | M | 1991.0 | 10272016 | M | | QR | 94789700000.0 | 00739 | B2 |
| 4 | 3032257 | 985523.0 | 2016.0 | 4.0 | 111.0 | 111.0 | CHM | 20550.0 | 3.0 | NY | ... | | M | 1997.0 | 7042016 | F | | | 42322570000.0 | LAND | WT |


In [14]:
```python
# Read in the data here
pd_airport = pd.read_csv(airport_file, header=0)
pd_airport.head()
```


| | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | 11.0 | | US | US-PA | Bensalem | 00A | | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435.0 | | US | US-KS | Leoti | 00AA | | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450.0 | | US | US-AK | Anchor Point | 00AK | | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820.0 | | US | US-AL | Harvest | 00AL | | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237.0 | | US | US-AR | Newport | | | | -91.254898, 35.6087 |


In [39]:
```python
coordinates = StructType([
    StructField('x', StringType(), False),
    StructField('y', StringType(), True),
])
schema = StructType([
    StructField('id', StringType(), False),
    StructField('type', StringType(), True),
    StructField('name', StringType(), True),
    StructField('elevation_ft', IntegerType(), True),
    StructField('continent', StringType(), True),
    StructField('iso_country', StringType(), True),
    StructField('iso_region', StringType(), True),
    StructField('municipality', StringType(), True),
    StructField('gps_code', StringType(), True),
    StructField('iata_code', StringType(), True),
    StructField('local_code', StringType(), True),
    StructField('coordinates', StringType(), True),
])
df_airport_raw = spark.read.csv(airport_file, header=True, schema=schema)
df_airport_raw.show(5)
df_airport_raw.count()
```

```
+----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  id|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
| 00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
|00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
|00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
|00AL|small_airport|        Epps Airpark|         820|       NA|         US|     US-AL|

55075
```

In [56]:
```python
schema = StructType([
    StructField('city', StringType(), False),
    StructField('state', StringType(), True),
    StructField('median_age', DecimalType(4, 1), True),
    StructField('male', IntegerType(), True),
    StructField('female', IntegerType(), True),
    StructField('total', IntegerType(), True),
    StructField('veterans', IntegerType(), True),
    StructField('foreign_born', IntegerType(), True),
    StructField('average_household', DecimalType(4, 2), True),
    StructField('state_code', StringType(), True),
    StructField('race', StringType(), True),
    StructField('count', IntegerType(), True),
])
#df_city=spark.read.option("header", True).option("delimiter", ";").csv(city_fiele)
df_city_raw = spark.read.csv(city_fiele, header=True, schema=schema, sep=";")
df_city_raw.show(5)
df_city_raw.count()
```

```
+----------------+-------------+----------+------+------+------+--------+------------+-----------------+----------+--------------------+-----+
|            city|        state|median_age|  male|female| total|veterans|foreign_born|average_household|state_code|                race|count|
+----------------+-------------+----------+------+------+------+--------+------------+-----------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8| 40601| 41862| 82463|    1562|       30908|             2.60|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0| 44129| 49500| 93629|    4147|       32935|             2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5| 38040| 46799| 84839|    4819|        8229|             2.58|        AL|               Asian| 4759|
|Rancho Cucamonga|   California|      34.5| 88127| 87105|175232|    5821|       33878|             3.18|        CA|Black or African-...|24437|

2891
```

In [13]:
```python
schema = StructType([
    StructField('date', TimestampType(), True),
    StructField('temperature', DoubleType(), True),
    StructField('uncertainty', DoubleType(), True),
    StructField('city', StringType(), True),
    StructField('country', StringType(), True),
    StructField('latitude', StringType(), True),
    StructField('longitude', StringType(), True),
])
df_temperature_raw = spark.read.option("header", True).csv(temperature_file, schema=schema)
df_temperature_raw.show(5)
df_temperature_raw.count()
```


```
+-------------------+-----------+------------------+-----+-------+--------+---------+
|               date|temperature|       uncertainty| city|country|latitude|longitude|
+-------------------+-----------+------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|      6.068|1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01 00:00:00|       null|              null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01 00:00:00|       null|              null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01 00:00:00|       null|              null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01 00:00:00|       null|              null|Århus|Denmark|  57.05N|   10.33E|
+-------------------+-----------+------------------+-----+-------+--------+---------+
only showing top 5 rows

8599212
```

In [19]:
```python
# Write to Parquet: one-time conversion of the April 2016 SAS file
#df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
#df_spark.write.parquet("output/sas_data")
```

In [20]:
```python
schema = StructType([
    StructField('cicid', DoubleType(), False),
    StructField('i94yr', DoubleType(), True),
    StructField('i94mon', DoubleType(), True),
    StructField('i94cit', DoubleType(), True),
    StructField('i94res', DoubleType(), True),
    StructField('i94port', StringType(), True),
    StructField('arrdate', DoubleType(), True),
    StructField('i94mode', DoubleType(), True),
    StructField('i94addr', StringType(), True),
    StructField('depdate', DoubleType(), True),
    StructField('i94bir', DoubleType(), True),
    StructField('i94visa', DoubleType(), True),
    StructField('count', DoubleType(), True),
    StructField('dtadfile', StringType(), True),
    StructField('visapost', StringType(), True),
    StructField('occup', StringType(), True),
    StructField('entdepa', StringType(), True),
    StructField('entdepd', StringType(), True),
    StructField('entdepu', StringType(), True),
    StructField('matflag', StringType(), True),
    StructField('biryear', DoubleType(), True),
    StructField('dtaddto', StringType(), True),
    StructField('gender', StringType(), True),
    StructField('insnum', StringType(), True),
    StructField('airline', StringType(), True),
    StructField('admnum', DoubleType(), True),
    StructField('fltno', StringType(), True),
    StructField('visatype', StringType(), True),
])
df_spark = spark.read.schema(schema).parquet("../output/sas_data")
#df_spark=spark.read.parquet("../output/sas_data")
df_spark.printSchema()
df_spark.show(5)
df_spark.count()
```


```
root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

3096313
```

In [21]:
```python
schema = StructType([
    StructField('id', StringType(), True),
    StructField('state', StringType(), True),
])
df_i94addr = spark.read.csv(i94addrl_file, schema=schema, sep=";")
df_i94addr.show(5)
df_i94addr.count()
```

```
+---+----------+
| id|     state|
+---+----------+
| AL|   ALABAMA|
| AK|    ALASKA|
| AZ|   ARIZONA|
| AR|  ARKANSAS|
| CA|CALIFORNIA|
+---+----------+
only showing top 5 rows

55
```

In [22]:
```python
# i94cntyl maps the i94cit/i94res codes to countries, so the label column is named "country"
schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('country', StringType(), True),
])
df_i94country = spark.read.csv(i94cntyl_file, schema=schema, sep=";")
df_i94country.show(5)
df_i94country.count()
```

```
+---+--------------------+
| id|             country|
+---+--------------------+
|582|MEXICO Air Sea, a...|
|236|         AFGHANISTAN|
|101|             ALBANIA|
|316|             ALGERIA|
|102|             ANDORRA|
+---+--------------------+
only showing top 5 rows

289
```

In [23]:
```python
# i94model maps the i94mode codes to the mode of arrival (air, sea, land)
schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('mode', StringType(), True),
])
df_i94model = spark.read.csv(i94model_file, schema=schema, sep=";")
df_i94model.show(5)
df_i94model.count()
```

```
+---+------------+
| id|        mode|
+---+------------+
|  1|         Air|
|  2|         Sea|
|  3|        Land|
|  9|Not reported|
+---+------------+

4
```

In [24]:
```python
schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('visa', StringType(), True),
])
df_i94visa = spark.read.csv(i94visa_file, schema=schema)
df_i94visa.show(5)
df_i94visa.count()
```

```
+---+--------+
| id|    visa|
+---+--------+
|  1|Business|
|  2|Pleasure|
|  3| Student|
+---+--------+

3
```

In [41]:
```python
schema = StructType([
    StructField('id', StringType(), True),
    StructField('port', StringType(), True),
])
df_i94port_raw = spark.read.csv(i94prtl_file, schema=schema, sep=";")
df_i94port_raw.show(5)
df_i94port_raw.count()
```

```
+---+--------------------+
| id|                port|
+---+--------------------+
|ALC|ALCAN, AK        ...|
|ANC|ANCHORAGE, AK    ...|
|BAR|BAKER AAF - BAKER...|
|DAC|DALTONS CACHE, AK...|
|PIZ|DEW STATION PT LA...|
+---+--------------------+
only showing top 5 rows

659
```

In [44]:
def get_state_code(x):
    i=x.rfind(",")
    return (x[:i])
def get_name(x):
    i=x.rfind(",")
    return (x[i+2:])

get_state_code_udf = udf(lambda x: get_state_code(x), StringType())
get_name_udf = udf(lambda x: get_name(x), StringType())
df = df_i94port_raw
df = df.withColumn('state_code', get_state_code_udf('port'))
df = df.withColumn('name', get_name_udf('port'))
df_i94port = df.select(["id","state_code","name"])

df_i94port.show(5)
df_i94port.printSchema()
df_i94port.toPandas()

+---+--------------------+---------------+
| id|          state_code|           name|
+---+--------------------+---------------+
|ALC|               ALCAN|AK             |
|ANC|           ANCHORAGE|    AK         |
|BAR|BAKER AAF - BAKER...|             AK|
|DAC|       DALTONS CACHE|        AK     |
|PIZ|DEW STATION PT LA...|             AK|
+---+--------------------+---------------+
only showing top 5 rows

root
 |-- id: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- name: string (nullable = true)



Unnamed: 0,id,state_code,name
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF - BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK
5,DTH,DUTCH HARBOR,AK
6,EGL,EAGLE,AK
7,FRB,FAIRBANKS,AK
8,HOM,HOMER,AK
9,HYD,HYDER,AK


### Step 2: Explore and Assess the Data
#### Explore the Data 

[explore_data.ipynb](explore_data.ipynb)

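* Many fields in the immigration data (e.g. `i94yr`, `i94mon`, `i94cit`, `i94mode`, `biryear`) are stored as doubles but hold integer values, so they should be cast to integers.

* Much of the data is redundant for this project:

  * keep only U.S.-related data

  * keep only data from 2016

* Some columns pack multiple values into one field:

  * longitude & latitude share the airport `coordinates` column (e.g. `-74.93360137939453, 40.07080078125`), and the temperature `latitude`/`longitude` values carry a hemisphere suffix (e.g. `57.05N`, `10.33E`)

  * port name & state code share the `port` column of the i94port table (e.g. `ALCAN, AK`)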
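The type fixes listed above can be tried out as plain Python functions before wrapping them with `udf`. This is a minimal sketch under stated assumptions: `arrdate`/`depdate` are SAS date numbers (days since 1960-01-01), and `dtaddto` is assumed to be `MMDDYYYY` with the leading zero of the month dropped (as in `7202016`), which is ambiguous for some values. The helper names are hypothetical.

```python
import datetime

# SAS stores dates as a day count from 1960-01-01
SAS_EPOCH = datetime.date(1960, 1, 1)


def to_int(x):
    """Cast a double-coded field such as 2016.0 to int; missing values stay None."""
    return None if x is None else int(x)


def sas_date_to_date(days):
    """Convert a SAS date number such as arrdate=20566.0 to a datetime.date."""
    return None if days is None else SAS_EPOCH + datetime.timedelta(days=int(days))


def dtaddto_to_date(s):
    """Parse a dtaddto string such as '7202016' (assumed MMDDYYYY, leading zero dropped).

    Non-numeric or unparseable values return None.
    """
    if not s or len(s) > 8 or not s.isdigit():
        return None
    try:
        return datetime.datetime.strptime(s.zfill(8), "%m%d%Y").date()
    except ValueError:
        return None


if __name__ == "__main__":
    print(to_int(2016.0), sas_date_to_date(20566.0), dtaddto_to_date("7202016"))
```

Inside Spark, the plain integer casts do not need a Python UDF at all: `col("i94yr").cast("int")` does the same job and avoids the serialization cost of a UDF.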
  
  
#### Cleaning Steps



In [47]:
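```python
# Clean the airport data: keep U.S. airports and split the "longitude, latitude"
# coordinates string into two numeric columns

def get_longitude(x):
    # "-74.93, 40.07" -> -74.93 (the first value is the longitude)
    if x is None or "," not in x:
        return None
    return float(x[:x.find(",")])

def get_latitude(x):
    # "-74.93, 40.07" -> 40.07 (the second value is the latitude)
    if x is None or "," not in x:
        return None
    return float(x[x.find(",") + 1:])

def get_state(x):
    # "US-PA" -> "PA"
    return None if x is None else x[3:]

get_longitude_udf = udf(get_longitude, DoubleType())
get_latitude_udf = udf(get_latitude, DoubleType())
get_state_udf = udf(get_state, StringType())
df = df_airport_raw.filter(col("iso_country") == "US")
df = df.withColumn('longitude', get_longitude_udf('coordinates'))
df = df.withColumn('latitude', get_latitude_udf('coordinates'))
df = df.withColumn('state', get_state_udf('iso_region'))
df_airport = df.select(["id", "type", "name", "elevation_ft", "state", "municipality",
                        "gps_code", "iata_code", "local_code", "longitude", "latitude"])

df_airport.show(5)
df_airport.printSchema()
```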

```
+----+-------------+--------------------+------------+-----+------------+--------+---------+----------+------------------+-----------------+
|  id|         type|                name|elevation_ft|state|municipality|gps_code|iata_code|local_code|         longitude|         latitude|
+----+-------------+--------------------+------------+-----+------------+--------+---------+----------+------------------+-----------------+
| 00A|     heliport|   Total Rf Heliport|          11|   PA|    Bensalem|     00A|     null|       00A|-74.93360137939453|   40.07080078125|
|00AA|small_airport|Aero B Ranch Airport|        3435|   KS|       Leoti|    00AA|     null|      00AA|       -101.473911|        38.704022|
|00AK|small_airport|        Lowell Field|         450|   AK|Anchor Point|    00AK|     null|      00AK|    -151.695999146|      59.94919968|
|00AL|small_airport|        Epps Airpark|         820|   AL|     Harvest|    00AL|     null|      00AL|-86.77030181884766|34.86479949951172|
|00AR|
```

In [63]:
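```python
# The raw file repeats the city-level columns once per race row,
# so keep one row per city and drop the race/count columns
# df = df_city_raw.filter(col("city")=="Silver Spring")
df = df_city_raw.dropDuplicates(["city", "state", "state_code"])
df = df.select(["city", "state_code", "state", "median_age", "male", "female",
                "total", "veterans", "foreign_born", "average_household"])
df_city = df
df_city.show(5)
df_city.printSchema()
```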

```
+------------+----------+-----------+----------+------+------+------+--------+------------+-----------------+
|        city|state_code|      state|median_age|  male|female| total|veterans|foreign_born|average_household|
+------------+----------+-----------+----------+------+------+------+--------+------------+-----------------+
|   Rockville|        MD|   Maryland|      38.1| 31205| 35793| 66998|    1990|       25047|             2.60|
|Delray Beach|        FL|    Florida|      47.9| 32219| 34042| 66261|    4232|       16639|             2.35|
| Jersey City|        NJ| New Jersey|      34.3|131765|132512|264277|    4374|      109186|             2.57|
|    Gulfport|        MS|Mississippi|      35.1| 33108| 38764| 71872|    6646|        3072|             2.54|
|  Cincinnati|        OH|       Ohio|      32.7|143654|154883|298537|   13699|       16896|             2.08|
+------------+----------+-----------+----------+------+------+------+--------+------------+-----------------+
only showi
```

In [39]:
```python
def get_longitude(x):
    # "10.33E" -> 10.33, "87.27W" -> -87.27 (west is negative)
    if x is None:
        return None
    l = float(x[:-1])
    if x[-1] == "W":
        l = -l
    return l

def get_latitude(x):
    # "57.05N" -> 57.05, "33.92S" -> -33.92 (south is negative)
    if x is None:
        return None
    l = float(x[:-1])
    if x[-1] == "S":
        l = -l
    return l

get_longitude_udf = udf(get_longitude, DoubleType())
get_latitude_udf = udf(get_latitude, DoubleType())
df = df_temperature_raw
# Note: the year counts shown below (from the commented-out groupBy lines) stop at 2013,
# so filtering on 2016 is likely to return no rows
df = df.filter(
    (col("date") >= datetime.datetime(2016, 1, 1, 0, 0)) &
    (col("date") < datetime.datetime(2017, 1, 1, 0, 0)) &
    (col("country") == "United States")
)
df = df.withColumn('longitude', get_longitude_udf('longitude').cast(DecimalType(5, 2)))
df = df.withColumn('latitude', get_latitude_udf('latitude').cast(DecimalType(5, 2)))
#df = df.withColumn("year", year("date"))
#df = df.groupBy("year").count().orderBy("year", ascending=False)
df.show(20)
df.printSchema()
```

```
+----+-----+
|year|count|
+----+-----+
|2013|31590|
|2012|42120|
|2011|42120|
|2010|42120|
|2009|42120|
|2008|42120|
|2007|42120|
|2006|42120|
|2005|42120|
|2004|42120|
|2003|42120|
|2002|42120|
|2001|42120|
|2000|42120|
|1999|42120|
|1998|42120|
|1997|42120|
|1996|42120|
|1995|42120|
|1994|42120|
+----+-----+
only showing top 20 rows

root
 |-- year: integer (nullable = true)
 |-- count: long (nullable = false)
```



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
```python
# Write code here
```

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
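The checks listed above can be written as small functions that raise on failure. The sketch below works on plain row counts and lists of dicts so it can be tested without a Spark session; the function names are hypothetical. In Spark, the same checks would use `df.count()` for completeness, `df.groupBy(*key).count().filter("count > 1").count() == 0` for key uniqueness, and `df.filter(col(c).isNull()).count() == 0` for required columns.

```python
def check_not_empty(name, row_count):
    """Source/count check: fail if a table came out empty."""
    if row_count == 0:
        raise ValueError(f"{name}: no rows loaded")


def check_unique_key(name, rows, key):
    """Integrity check: the key column(s) must not repeat across rows."""
    seen = set()
    for row in rows:
        k = tuple(row[c] for c in key)
        if k in seen:
            raise ValueError(f"{name}: duplicate key {k}")
        seen.add(k)


def check_not_null(name, rows, column):
    """Integrity check: a required column must not contain None."""
    missing = sum(1 for row in rows if row.get(column) is None)
    if missing:
        raise ValueError(f"{name}: {missing} null value(s) in {column}")


if __name__ == "__main__":
    ports = [{"id": "ALC", "state_code": "AK"}, {"id": "ANC", "state_code": "AK"}]
    check_not_empty("ports", len(ports))
    check_unique_key("ports", ports, ["id"])
    check_not_null("ports", ports, "state_code")
    print("all checks passed")
```

Raising instead of returning a flag makes a failed check stop the pipeline run, which is usually what you want before writing output tables.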
 
Run Quality Checks

In [None]:
```python
# Perform quality checks here
```

#### 4.3 Data dictionary 

[data dictionary](data_dicationary.ipynb)

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data increased by 100x.
  * The data populates a dashboard that must be updated daily by 7am.
  * The database needs to be accessed by 100+ people.