# Project Title
### Data Engineering Capstone Project

#### Project Summary
The goal of this project is to create a data warehouse for immigration using I94 Immigration Data, World Temperature Data, U.S. City Demographic Data, and Airport Code Table.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
from sqlalchemy import create_engine
from pyspark.sql import SparkSession
from datetime import timedelta, date
from pyspark.sql.functions import year, udf, col
from pyspark.sql.types import IntegerType, StringType, StructType, StructField, DateType

### Step 1: Scope the Project and Gather Data

#### Scope 
I will build one fact table (immigration) and two dimension tables (temperature and cities), using Spark for the processing.

#### Describe and Gather Data
##### I94 Immigration Data 
- cicid
- i94yr
- i94mon
- i94cit
- other immigration columns.

##### World Temperature Data
- dt
- average temperature
- average temperature uncertainty
- city
- country
- latitude
- longitude

##### Airport Code Table
- ident
- type
- name
- elevation ft
- continent
- iso country
- iso region
- municipality
- gps code
- iata code
- local code
- coordinates

##### City Demographic Data
- city
- state
- median age
- male population
- female population
- total population
- number of veterans
- foreign born
- average household size
- state code

In [2]:
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [3]:
i94_df = spark.read.parquet("sas_data")

In [4]:
temp_df = spark.read.csv('../../data2/GlobalLandTemperaturesByCity.csv', header=True)

In [5]:
airport_df = spark.read.csv('airport-codes_csv.csv', header=True)

In [6]:
cities_df = spark.read.option("delimiter", ";").csv('us-cities-demographics.csv', header = True)

### Step 2: Explore and Assess the Data
#### Explore and Clean the Data

In [7]:
i94_df.show(5)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

`i94cit` and `i94port` hold only codes. The labels for these codes are defined in I94_SAS_Labels_Descriptions.SAS. <br>
`arrdate` is stored in SAS date format.<br>
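
A SAS date is a count of days since 1960-01-01, so `arrdate` can be converted with plain date arithmetic. The following is a minimal sketch; `sas_days_to_date` is a hypothetical helper name used only for illustration, and passing nulls through unchanged is an assumption about the desired behavior.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS counts days from this date


def sas_days_to_date(days):
    """Convert a SAS day count to a date; None stays None (assumed behavior)."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


# arrdate of the first row shown in the sample output
print(sas_days_to_date(20574.0))  # 2016-04-30
```

The result agrees with the `dtadfile` value (20160430) on the same row, which is a quick sanity check on the epoch.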

In [8]:
i94_df.filter(i94_df.i94cit.isNull()).count()

0

In [9]:
i94_df.filter(i94_df.arrdate.isNull()).count()

0

In [10]:
temp_df.show(5)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



In [11]:
temp_df.select("Latitude").distinct().show(5)

+--------+
|Latitude|
+--------+
|  36.17N|
|  44.20N|
|  57.05N|
|   4.02S|
|  24.92N|
+--------+
only showing top 5 rows



The data covers many countries. I keep only the rows for the United States and drop the rest.

In [12]:
temp_df.select('Country').distinct().show(100)

+--------------------+
|             Country|
+--------------------+
|                Chad|
|              Russia|
|            Paraguay|
|               Yemen|
|             Senegal|
|              Sweden|
|              Guyana|
|               Burma|
|         Philippines|
|             Eritrea|
|            Djibouti|
|            Malaysia|
|           Singapore|
|              Turkey|
|              Malawi|
|                Iraq|
|             Germany|
|         Afghanistan|
|            Cambodia|
|              Jordan|
|              Rwanda|
|               Sudan|
|              France|
|              Greece|
|           Sri Lanka|
|              Taiwan|
|             Algeria|
|   Equatorial Guinea|
|                Togo|
|            Slovakia|
|             Reunion|
|           Argentina|
|             Belgium|
|              Angola|
|             Ecuador|
|               Qatar|
|             Lesotho|
|          Madagascar|
|             Albania|
|             Finland|
|          

In [13]:
temp_df = temp_df.filter(temp_df.Country == 'United States')

In [14]:
airport_df.show(5)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

I designed the data warehouse with the immigration data as the fact table.<br>
The airport data and the fact data share no common key, and none can be derived, so I discard the airport data. (The fact data only records the airline, from which the airport cannot be inferred.)

In [15]:
cities_df.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
##### Fact Table (immigration)
- id: pk
- arrive_date: date of arrival in the US
- birth_year: birth year
- addr: address
- gender
- admission_number: US admission number
- visa_type
- year: immigration year
- month: immigration month
- cit: label for the `i94cit` code
- port_city: immigration port city
- port_state: immigration port state
- mode: immigration mode (1 = air, 2 = sea, 3 = land, 9 = not reported)
- visa: visa category (1 = business, 2 = pleasure, 3 = student)
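
The `mode` and `visa` columns store numeric codes, so a small lookup makes query results easier to read. This is a sketch only: the dictionaries mirror the code lists above, while the `decode` helper and its `'unknown'` fallback are assumptions, not part of the notebook.

```python
# Code-to-label lookups taken from the field descriptions above.
MODE_LABELS = {1: "air", 2: "sea", 3: "land", 9: "not reported"}
VISA_LABELS = {1: "business", 2: "pleasure", 3: "student"}


def decode(code, labels):
    """Return the label for a numeric code, or 'unknown' if null or unmapped."""
    if code is None:
        return "unknown"
    # The source data stores codes as floats (for example 1.0), hence int().
    return labels.get(int(code), "unknown")


print(decode(1.0, MODE_LABELS))  # air
print(decode(2.0, VISA_LABELS))  # pleasure
```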

##### Dimension Table 1 (temperature)
- id: pk
- year
- average_temperature
- average_temperature_uncertainty
- city
- latitude
- longitude

##### Dimension Table 2 (cities)
- id: pk
- city
- state
- median_age
- male_population
- female_population
- total_population
- number_of_veterans
- foreign_born
- average_household_size
- state_code
- race
- count

#### 3.2 Mapping Out Data Pipelines
##### immigration
1. Parse the SAS label file to get `cit`, `port` and `addr`.
2. Inner join these lookups with the original immigration data.
3. Select the target columns and lowercase the string columns.
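
The parsing in step 1 can be sketched in plain Python. This assumes each label line has the form `code = 'label'` and that port labels look like `CITY, ST`; the sample line is illustrative rather than copied from the file, and `parse_label_line` and `split_port` are hypothetical helper names.

```python
def _clean(text):
    """Strip surrounding whitespace, the enclosing quotes, then inner padding."""
    return text.strip().strip("'").strip()


def parse_label_line(line):
    """Split "code = 'label'" into a (code, label) pair."""
    # partition() splits at the first '=' only, so a label containing '=' survives.
    code, _, label = line.partition("=")
    return _clean(code), _clean(label)


def split_port(label):
    """Split "CITY, ST" into lowercased (city, state); state is None without a comma."""
    city, sep, state = label.partition(",")
    return city.strip().lower(), (state.strip().lower() if sep else None)


# Illustrative line in the assumed format of I94_SAS_Labels_Descriptions.SAS
code, label = parse_label_line("   'ALC'\t=\t'ALCAN, AK             '")
print(code, split_port(label))  # ALC ('alcan', 'ak')
```

Stripping the state part matters because splitting on the comma alone leaves a leading space, which would then be stored in `port_state`.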

##### temperature
1. Parse the date to get the year.
2. Select the target columns and lowercase the string columns.

##### cities
1. Select the target columns and lowercase the string columns.

This notebook inserts only the first 100 rows of each table.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [17]:
engine = create_engine('postgresql+psycopg2://student:student@localhost:5432/studentdb')

In [18]:
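# SAS dates count days from 1960-01-01; nulls stay null instead of raising in the UDF
sas_date_to_timestamp = udf(lambda x: date(1960, 1, 1) + timedelta(days=int(x)) if x is not None else None, DateType())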
i94_df = i94_df.withColumn('arrive_date', sas_date_to_timestamp(col('arrdate')))

In [19]:
with open('I94_SAS_Labels_Descriptions.SAS') as f:
    descriptions = f.readlines()
    
def extract_code(codes):
    temp = []
    for code in codes:
        code = code.strip().replace("'", '').split('=')
        code = [c.strip() for c in code]
        temp.append(code)
    return pd.DataFrame(temp)

cits = descriptions[10:298]
cit_pd_df = extract_code(cits)
cit_pd_df.columns = ['i94cit', 'cit']
# print(cit_pd_df['i94cit'])
cit_pd_df['i94cit'] = cit_pd_df['i94cit'].astype('int')
schema = StructType([StructField('i94cit', IntegerType(), True),
                     StructField('cit', StringType(), True)])
cit_df = spark.createDataFrame(cit_pd_df,schema=schema)

ports = descriptions[303:962]
port_pd_df = extract_code(ports)
port_pd_df.columns = ['i94port', 'port']

port_pd_df['port_city'] = port_pd_df.port.str.split(',').str[0]
# Strip the space left after the comma, e.g. "CITY, ST" -> "ST"
port_pd_df['port_state'] = port_pd_df.port.str.split(',').str[1].str.strip()
port_pd_df = port_pd_df.drop(columns=['port'])
schema = StructType([StructField('i94port', StringType(), True),
                     StructField('port_city', StringType(), True),
                     StructField('port_state', StringType(), True)])
port_df = spark.createDataFrame(port_pd_df,schema=schema)

addrs = descriptions[982:1036]
addr_pd_df = extract_code(addrs)
addr_pd_df.columns = ['i94addr', 'addr']

schema = StructType([StructField('i94addr', StringType(), True),
                     StructField('addr', StringType(), True)])
addr_df = spark.createDataFrame(addr_pd_df,schema=schema)

In [20]:
i94_result_df = i94_df.join(cit_df, 'i94cit') \
    .join(port_df, 'i94port') \
    .join(addr_df, 'i94addr') \
    .select(col('arrive_date'),
            col('biryear').alias('birth_year'),
            col('addr'),
            col('gender'),
            col('admnum').alias('admission_number'),
            col('visatype').alias('visa_type'),
            col('i94yr').alias('year'),
            col('i94mon').alias('month'),
            col('cit'),
            col('port_city'),
            col('port_state'),
            col('i94mode').alias('mode'),
            col('i94visa').alias('visa')).toPandas()

In [21]:
i94_result_df['id'] = pd.RangeIndex(i94_result_df.shape[0])
i94_result_df[['cit', 'port_city', 'port_state']] = i94_result_df[['cit', 'port_city', 'port_state']].apply(lambda col: col.str.lower())
i94_result_df.head(100).to_sql('immigration', engine, if_exists='replace',index=False)

In [22]:
temp_result_df = temp_df.withColumn('year', year(col('dt'))).select(
    col('year'),
    col('AverageTemperature').alias('average_temperature'),
    col('AverageTemperatureUncertainty').alias('average_temperature_uncertainty'),
    col('City').alias('city'),
    col('Latitude').alias('latitude'),
    col('Longitude').alias('longitude')
).toPandas()

In [23]:
temp_result_df['id'] = pd.RangeIndex(temp_result_df.shape[0])
temp_result_df[['city']] = temp_result_df[['city']].apply(lambda col: col.str.lower())
temp_result_df.head(100).to_sql('temperature', engine, if_exists='replace',index=False)

In [24]:
cities_result_df = cities_df.select(
    col('City').alias('city'),
    col('State').alias('state'),
    col('Median Age').alias('median_age'),
    col('Male Population').alias('male_population'),
    col('Female Population').alias('female_population'),
    col('Total Population').alias('total_population'),
    col('Number of Veterans').alias('number_of_veterans'),
    col('Foreign-born').alias('foreign_born'),
    col('Average Household Size').alias('average_household_size'),
    col('State Code').alias('state_code'),
    col('Race').alias('race'),
    col('Count').alias('count')
).toPandas()

In [25]:
cities_result_df['id'] = pd.RangeIndex(cities_result_df.shape[0])
cities_result_df[['city', 'state']] = cities_result_df[['city', 'state']].apply(lambda col: col.str.lower())
cities_result_df.head(100).to_sql('cities', engine, if_exists='replace',index=False)

#### 4.2 Data Quality Checks
Check that each table holds exactly 100 rows.

In [26]:
immigration_check_df = pd.read_sql_query('select * from "immigration"',con=engine)
if immigration_check_df.shape[0] != 100:
    print('immigration: expected 100 rows, found', immigration_check_df.shape[0])

In [27]:
temperature_check_df = pd.read_sql_query('select * from "temperature"',con=engine)
if temperature_check_df.shape[0] != 100:
    print('temperature: expected 100 rows, found', temperature_check_df.shape[0])

In [28]:
cities_check_df = pd.read_sql_query('select * from "cities"',con=engine)
if cities_check_df.shape[0] != 100:
    print('cities: expected 100 rows, found', cities_check_df.shape[0])
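
The three checks above repeat the same logic, which could be factored into one function that raises instead of printing, so a failed check stops the pipeline. This is a sketch under stated assumptions: `check_row_count` is a hypothetical helper, and an in-memory SQLite database stands in for the PostgreSQL instance so the example runs on its own.

```python
import sqlite3


def check_row_count(conn, table, expected):
    """Raise ValueError if `table` does not hold exactly `expected` rows."""
    # Table names cannot be bound as parameters, so only pass trusted names here.
    (actual,) = conn.execute(f'SELECT COUNT(*) FROM "{table}"').fetchone()
    if actual != expected:
        raise ValueError(f"{table}: expected {expected} rows, found {actual}")
    return actual


# Stand-in database so the sketch runs without PostgreSQL.
conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE "cities" (id INTEGER PRIMARY KEY, city TEXT)')
conn.executemany('INSERT INTO "cities" (city) VALUES (?)', [("city",)] * 100)

check_row_count(conn, "cities", 100)  # passes silently
```

Counting in SQL also avoids pulling every row into pandas just to read its shape.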

#### 4.3 Data dictionary 
##### Fact Table (immigration)
- id: pk
- arrive_date: date of arrival in the US
- birth_year: birth year
- addr: address
- gender
- admission_number: US admission number
- visa_type
- year: immigration year
- month: immigration month
- cit: label for the `i94cit` code
- port_city: immigration port city
- port_state: immigration port state
- mode: immigration mode (1 = air, 2 = sea, 3 = land, 9 = not reported)
- visa: visa category (1 = business, 2 = pleasure, 3 = student)

##### Dimension Table 1 (temperature)
- id: pk
- year
- average_temperature
- average_temperature_uncertainty
- city
- latitude
- longitude

##### Dimension Table 2 (cities)
- id: pk
- city
- state
- median_age
- male_population
- female_population
- total_population
- number_of_veterans
- foreign_born
- average_household_size
- state_code
- race
- count


### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
I use Apache Spark and pandas DataFrames.
1. Some of the datasets are large.<br>
2. If the data grows, I can easily add machines or tune the Spark application.

* Propose how often the data should be updated and why.
The fact table is organized by I94 year and month, so I would update the data monthly.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
   - I would move to a cloud service such as AWS, since cloud services usually support scaling out.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
   - I would use Airflow or another tool with scheduling support (Jenkins, etc.) to run the pipeline on a schedule.
 * The database needed to be accessed by 100+ people.
   - I would add a read replica. 100+ users is not a huge load, so a replica should be enough.
   - However, if usage grows steeply or is concentrated at certain times, I would move to a cloud service (such as Redshift).