In [None]:
import pandas as pd
import glob
import os
import logging

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, TimestampType

In [None]:
# Set up logging: fetch the root logger and lower its threshold to INFO so the
# logging.info(...) progress messages emitted by the ETL steps are not filtered out.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

In [None]:
def create_spark_session():
    """Return a Hive-enabled SparkSession configured to pull the SAS reader package.

    The session is obtained with ``getOrCreate()``, so an already running
    session is reused instead of a second one being started.

    Returns:
        The SparkSession produced by the configured builder.
    """
    # Package coordinates (and the repository hosting them) for the
    # spark-sas7bdat data source.
    session_options = (
        ('spark.jars.repositories', 'https://repos.spark-packages.org/'),
        ('spark.jars.packages', 'saurfang:spark-sas7bdat:2.0.0-s_2.11'),
    )

    builder = SparkSession.builder
    for option_key, option_value in session_options:
        builder = builder.config(option_key, option_value)

    return builder.enableHiveSupport().getOrCreate()

In [None]:
@udf(TimestampType())
def to_timestamp_udf(x):
    """Convert a SAS date (a day count from 1960-01-01) into a timestamp.

    Args:
        x: Number of days elapsed since 1960-01-01. May be None.

    Returns:
        ``pd.Timestamp('1960-1-1')`` shifted by ``x`` days. When ``x`` cannot
        be converted (for example it is None or not numeric), or the
        conversion produces NaT, the sentinel ``pd.Timestamp('1900-1-1')`` is
        returned instead.
    """
    fallback = pd.Timestamp('1900-1-1')
    try:
        converted = pd.to_timedelta(x, unit='D') + pd.Timestamp('1960-1-1')
        # A missing value can convert to NaT instead of raising; give it the
        # same sentinel as every other unconvertible input.
        if pd.isna(converted):
            return fallback
        return converted
    except Exception:
        # Catch ordinary conversion failures only: a bare ``except`` would also
        # swallow KeyboardInterrupt and SystemExit.
        return fallback

In [1]:
def process_immigration_data(spark, input_data, output_data):
    """Build the fact_immigration and dim_immigrate_person tables from the I94 file.

    Reads the SAS immigration file, converts every double column to integer,
    then derives both tables. Each table is written as parquet under
    ``output_data`` and registered as a temporary view named after the table
    so it can be queried afterwards.

    Args:
        spark: Active SparkSession able to load the
            ``com.github.saurfang.sas.spark`` format.
        input_data: Path of the I94 immigration SAS file.
        output_data: Directory that receives the parquet folders. A trailing
            path separator is optional.
    """
    logging.info('Start processing immigration data')

    # Read immigration data file to dataframe
    df = spark.read.format('com.github.saurfang.sas.spark').load(input_data)

    # Change data type of some columns from double to integer.
    # Nulls are passed through untouched (identity test, not ``!= None``).
    toInt = udf(lambda x: int(x) if x is not None else x, IntegerType())

    for colname, coltype in df.dtypes:
        if coltype == 'double':
            df = df.withColumn(colname, toInt(colname))

    logging.info('Start processing fact_immigration table')

    # Extract columns to create fact_immigration table
    fact_immigration_df = df.select('cicid', 'i94port', 'i94addr', 'i94visa', 'i94yr',
                                    'i94mon', 'i94mode', 'arrdate', 'depdate').distinct()

    # Rename columns of fact_immigration table
    fact_immigration_df = (
        fact_immigration_df
        .withColumnRenamed('i94port', 'city_code')
        .withColumnRenamed('i94addr', 'state_code')
        .withColumnRenamed('i94visa', 'visa')
        .withColumnRenamed('i94yr', 'year')
        .withColumnRenamed('i94mon', 'month')
        .withColumnRenamed('i94mode', 'mode')
        .withColumnRenamed('arrdate', 'arrive_date')
        .withColumnRenamed('depdate', 'departure_date')
    )

    # Drop null records on state_code column (it is the partition column below)
    fact_immigration_df = fact_immigration_df.where(col('state_code').isNotNull())

    # Add country column to fact_immigration table
    fact_immigration_df = fact_immigration_df.withColumn('country', lit('United States'))

    # Change date type from SAS day counts to dates
    fact_immigration_df = (
        fact_immigration_df
        .withColumn('arrive_date', to_date(to_timestamp_udf(col('arrive_date'))))
        .withColumn('departure_date', to_date(to_timestamp_udf(col('departure_date'))))
    )

    # Write fact_immigration table to parquet files and partition by state_code.
    # os.path.join supplies the separator that plain string concatenation
    # dropped when output_data had no trailing slash.
    fact_immigration_df.write.mode('overwrite') \
        .partitionBy('state_code') \
        .parquet(os.path.join(output_data, 'fact_immigration'))

    # Create view for quality check
    fact_immigration_df.createOrReplaceTempView('fact_immigration')

    logging.info('Finish processing fact_immigration table')

    logging.info('---------------------------------------')

    logging.info('Start processing dim_immigrate_person table')

    # Extract columns to create dim_immigrate_person
    dim_immigrate_person_df = df.select('cicid', 'i94cit', 'i94res',
                                        'biryear', 'gender', 'insnum').distinct()

    # Rename columns of dim_immigrate_person table
    dim_immigrate_person_df = (
        dim_immigrate_person_df
        .withColumnRenamed('i94cit', 'citizen_country_code')
        .withColumnRenamed('i94res', 'citizen_state_code')
        .withColumnRenamed('biryear', 'birth_year')
    )

    # Write dim_immigrate_person table to parquet files
    dim_immigrate_person_df.write.mode('overwrite') \
        .parquet(os.path.join(output_data, 'dim_immigrate_person'))

    # Create view for quality check
    dim_immigrate_person_df.createOrReplaceTempView('dim_immigrate_person')

    logging.info('Finish processing dim_immigrate_person table')

    logging.info('---------------------------------------')

In [None]:
def process_temperature_data(spark, input_data, output_data):
    """Build the dim_temperature table from the temperature CSV.

    Keeps only rows whose ``Country`` is 'United States', selects and renames
    the temperature columns, parses ``dt`` into a date and derives ``year``
    and ``month`` from it. The result is written as parquet under
    ``output_data`` and registered as the temporary view ``dim_temperature``.

    Args:
        spark: Active SparkSession.
        input_data: Path of the temperature CSV file (with a header row).
        output_data: Directory that receives the parquet folder. A trailing
            path separator is optional.
    """
    logging.info('Start processing dim_temperature table')

    # Read temperature data file to dataframe
    df = spark.read.csv(input_data, header=True)

    # Filter data in United States
    df = df.where(df['Country'] == 'United States')

    # Extract columns to create dim_temperature table.
    # Every name needs its own comma: adjacent string literals are silently
    # concatenated into a single (non-existent) column name.
    dim_temperature_df = df.select('dt', 'AverageTemperature', 'AverageTemperatureUncertainty',
                                   'City', 'Country').distinct()

    # Rename columns of dim_temperature, starting from the selected frame so
    # the projection and de-duplication above are kept. ``dt`` keeps its name
    # because the date/year/month steps below read it.
    dim_temperature_df = (
        dim_temperature_df
        .withColumnRenamed('AverageTemperature', 'avg_temperture')
        .withColumnRenamed('AverageTemperatureUncertainty', 'avg_temp_uncertainty')
        .withColumnRenamed('City', 'city')
        .withColumnRenamed('Country', 'country')
    )

    # Extract year, month from dt column
    dim_temperature_df = dim_temperature_df.withColumn('dt', to_date(col('dt')))
    dim_temperature_df = dim_temperature_df.withColumn('year', year(dim_temperature_df['dt']))
    dim_temperature_df = dim_temperature_df.withColumn('month', month(dim_temperature_df['dt']))

    # Write dim_temperature to parquet files (os.path.join supplies the
    # separator when output_data has no trailing slash)
    dim_temperature_df.write.mode('overwrite') \
        .parquet(os.path.join(output_data, 'dim_temperature'))

    # Create view for quality check
    dim_temperature_df.createOrReplaceTempView('dim_temperature')

    logging.info('Finish processing dim_temperature table')

    logging.info('---------------------------------------')

In [None]:
def process_demographic_data(spark, input_data, output_data):
    """Build the dim_demographic table from the U.S. cities demographics CSV.

    Reads the semicolon-delimited CSV, renames its columns to snake_case,
    casts the numeric columns to float, writes the result as parquet under
    ``output_data`` and registers it as the temporary view ``dim_demographic``.

    Args:
        spark: Active SparkSession.
        input_data: Path of the demographics CSV file (header row, ';' separated).
        output_data: Directory that receives the parquet folder. A trailing
            path separator is optional.
    """
    logging.info('Start processing dim_demographic table')

    # Read demographic data to dataframe. The reader options are passed to
    # csv() itself: its return value is a DataFrame, which has no .option().
    df = spark.read.csv(input_data, header=True, sep=';')

    # Rename columns of dim_demographic table
    dim_demographic_df = (
        df
        .withColumnRenamed('City', 'city')
        .withColumnRenamed('State', 'state')
        .withColumnRenamed('Median Age', 'median_age')
        .withColumnRenamed('Male Population', 'male_population')
        .withColumnRenamed('Female Population', 'female_population')
        .withColumnRenamed('Total Population', 'total_population')
        .withColumnRenamed('Number of Veterans', 'number_of_veterans')
        .withColumnRenamed('Foreign-born', 'foreign_born')
        .withColumnRenamed('Average Household Size', 'avg_household_size')
        .withColumnRenamed('State Code', 'state_code')
        .withColumnRenamed('Race', 'race')
        .withColumnRenamed('Count', 'count')
    )

    # Change type of the numeric columns to float. The CSV is read without
    # schema inference, so every column arrives as a string; a native cast
    # turns unparsable values into null instead of raising. Float (not
    # integer) keeps the fractional part of columns such as median_age.
    colnames = ['median_age', 'male_population', 'female_population', 'total_population',
                'number_of_veterans', 'foreign_born', 'avg_household_size', 'count']

    for colname in colnames:
        dim_demographic_df = dim_demographic_df.withColumn(colname, col(colname).cast('float'))

    # Write dim_demographic table to parquet files (os.path.join supplies the
    # separator when output_data has no trailing slash)
    dim_demographic_df.write.mode('overwrite') \
        .parquet(os.path.join(output_data, 'dim_demographic'))

    # Create view for quality check
    dim_demographic_df.createOrReplaceTempView('dim_demographic')

    logging.info('Finish processing dim_demographic table')

    logging.info('---------------------------------------')


In [None]:
def process_airport_data(spark, input_data, output_data):
    """Build the dim_airport table from the airport codes CSV.

    Casts ``elevation_ft`` to integer, derives ``state_code`` from the part of
    ``iso_region`` after the '-', selects the dimension columns, writes them
    as parquet under ``output_data`` and registers the temporary view
    ``dim_airport``.

    Args:
        spark: Active SparkSession.
        input_data: Path of the airport codes CSV file (with a header row).
        output_data: Directory that receives the parquet folder. A trailing
            path separator is optional.
    """
    logging.info('Start processing dim_airport table')

    # Read airport data to dataframe
    airport_df = spark.read.csv(input_data, header=True)

    # Convert elevation_ft to type integer with a native cast; no conversion
    # helper is defined in this function's scope.
    airport_df = airport_df.withColumn('elevation_ft', col('elevation_ft').cast(IntegerType()))

    # Add state_code column to join with fact table
    airport_df = airport_df.withColumn('state_code', split(col('iso_region'), '-').getItem(1))

    # Extract columns to create dim_airport table
    dim_airport_df = airport_df.select('ident', 'type', 'name', 'elevation_ft', 'iso_country',
                                       'state_code', 'municipality', 'coordinates')

    # Write dim_airport table to parquet files (os.path.join supplies the
    # separator when output_data has no trailing slash)
    dim_airport_df.write.mode('overwrite') \
        .parquet(os.path.join(output_data, 'dim_airport'))

    # Create view for quality check
    dim_airport_df.createOrReplaceTempView('dim_airport')

    logging.info('Finish processing dim_airport table')

    logging.info('---------------------------------------')

In [None]:
def check_data_quality(spark):
    """Run basic quality checks on the temporary views created by the ETL steps.

    For every table the check fails when the view holds no rows. Tables that
    expose a ``state_code`` column additionally fail when any row has a NULL
    ``state_code``. Tables without that column skip the NULL check, because
    querying a missing column would abort the whole run. The outcome of each
    table is printed.

    Args:
        spark: Active SparkSession in which the five table views are registered.
    """
    logging.info('Start checking data quality')

    tables = ['fact_immigration', 'dim_immigrate_person', 'dim_temperature', 'dim_demographic', 'dim_airport']
    for table in tables:
        print(f"Checking data quality on table {table}...")
        table_df = spark.table(table)

        # Completeness: an empty table means the upstream step produced nothing.
        if not table_df.take(1):
            print(f"Data quality check failed! Table {table} is empty!")
            continue

        # Integrity: only tables that actually carry state_code can be tested for it.
        if 'state_code' in table_df.columns:
            null_count = spark.sql(f"""SELECT COUNT(*) FROM {table} WHERE state_code IS NULL""").head()[0]
            if null_count > 0:
                print(f"Data quality check failed! Found NULL values in {table} table!")
                continue

        print(f"Table {table} passed")

    logging.info('Finish checking data quality')

    logging.info('---------------------------------------')

In [None]:
# Spark session shared by every ETL step below.
spark = create_spark_session()

# Root folder for the parquet output. Keep the trailing slash: each ETL step
# appends its table folder name to this prefix.
output_data = "./output_data/"

# Input datasets.
immigration_data = "../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat"
temperature_data = "../../data2/GlobalLandTemperaturesByCity.csv"
demographic_data = "./us-cities-demographics.csv"
airport_data = "./airport-codes_csv.csv"

In [None]:
# Run the ETL steps one after another. Each step writes its parquet output
# under output_data and registers a temp view named after the table it builds.
process_immigration_data(spark, immigration_data, output_data)
process_temperature_data(spark, temperature_data, output_data)
process_demographic_data(spark, demographic_data, output_data)
process_airport_data(spark, airport_data, output_data)

# Explore United States Immigration Data
    Udacity Data Engineering Nanodegree Capstone Project

## Overview
The Organization for Tourism Development (OTD) wants to analyze migration flows into the USA in order to find insights that help develop tourism in the USA significantly and sustainably. To support this goal, they have identified a set of analyses/queries they want to run on the available raw data. This project builds an ETL data pipeline that goes from the raw data to insights on the migration flows.

The project includes 5 steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data
#### What data 
This project uses 4 datasets:
* I94 immigration data for year 2016. Used for the main analysis
* World Temperature Data
* Airport Code Table
* U.S. City Demographic Data

#### What tools
* Apache Hadoop: used to store data
* Apache Spark: used to transform and analyze data
* Apache Airflow: used to schedule tasks

### Step 2: Explore and Assess the Data

Read the file `Capstone Project Template.ipynb`

### Step 3: Define the Data Model
Conceptual data model using Star schema

![diagram](concept_model.png)

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Fact Immigration:
- Read immigration data file to dataframe
- Change data type of some columns from double to integer
- Extract columns to create fact_immigration table
- Rename columns of fact_immigration table
- Drop null records on state_code column
- Add country column to fact_immigration table
- Change date type from SAS to timestamp
- Write fact_immigration table to parquet files and partition by state_code

Dim Immigrate Person:
- Extract columns to create dim_immigrate_person from immigration data
- Rename columns of dim_immigrate_person table
- Write dim_immigrate_person table to parquet files  

Dim Temperature
- Read temperature data file to dataframe
- Filter data in United States
- Extract columns to create dim_temperature table
- Rename columns of dim_temperature
- Extract year, month from dt column
- Write dim_temperature to parquet files

Dim Demographic
- Read demographic data to dataframe
- Rename columns of dim_demographic table
- Change type of some columns to float
- Write dim_demographic table to parquet files

Dim Airport
- Read airport data to dataframe
- Convert elevation_ft to type integer
- Add state_code column to join with fact table
- Extract columns to create dim_airport table
- Write dim_airport table to parquet files

Read the file `Capstone Project Template.ipynb`

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Read the file `Capstone Project Template.ipynb`

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.
* fact_immigration
    - cicid: integer (nullable = true)
    - city_code: string (nullable = true)
    - state_code: string (nullable = true)
    - visa: integer (nullable = true)
    - year: integer (nullable = true)
    - month: integer (nullable = true)
    - mode: integer (nullable = true)
    - arrive_date: date (nullable = true)
    - departure_date: date (nullable = true)
    - country: string (nullable = false)

* dim_immigrate_person
    - cicid: integer (nullable = true)
    - citizen_country_code: integer (nullable = true)
    - citizen_state_code: integer (nullable = true)
    - birth_year: integer (nullable = true)
    - gender: string (nullable = true)
    - insnum: string (nullable = true)

* dim_temperature
    - dt: string (nullable = true)
    - avg_temperture: string (nullable = true)
    - avg_temp_uncertainty: string (nullable = true)
    - city: string (nullable = true)
    - country: string (nullable = true)

* dim_demographic
    - city: string (nullable = true)
    - state: string (nullable = true)
    - median_age: float (nullable = true)
    - male_population: float (nullable = true)
    - female_population: float (nullable = true)
    - total_population: float (nullable = true)
    - number_of_veterans: float (nullable = true)
    - foreign_born: float (nullable = true)
    - avg_household_size: float (nullable = true)
    - state_code: string (nullable = true)
    - race: string (nullable = true)
    - count: float (nullable = true)


* dim_airport
    - ident: string (nullable = true)
    - type: string (nullable = true)
    - name: string (nullable = true)
    - elevation_ft: integer (nullable = true)
    - iso_country: string (nullable = true)
    - state_code: string (nullable = true)
    - municipality: string (nullable = true)
    - coordinates: string (nullable = true) 

# Explore United States Immigration Data
### Data Engineering Capstone Project

#### Project Summary
The purpose of this data engineering capstone project is to create an ETL pipeline integrating data from different data sources for data analysis of immigration data for the US. We can explore insights from data and then make better decisions on immigration policies for those who came and will be coming in near future to the US.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project we will use the following datasets to create a data warehouse: I94 Immigration Data, World Temperature Data, and U.S. City Demographic Data.
* I94 Immigration Data: Data contains international visitor arrival statistics by world regions and select countries (including top 20), type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry (for select countries).
* World Temperature Data: This dataset is from Kaggle and contains monthly average temperature data for different countries around the world.
* U.S. City Demographic Data: This dataset contains information about the demographics of all US cities and census-designated places with a population greater or equal to 65,000.

The technology used in this project is:
- Apache Hadoop: used to store the data that is read and processed
- Apache Spark: used to transform and analyze data
- Apache Airflow: used to schedule tasks

### Step 2: Explore and Assess the Data
Refer to <mark>Capstone Project Template.ipynb</mark>

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

![diagram](image.png)

- The immigration fact table is the center of the model. Its data comes from the immigration dataset and contains state_code, which links it to the dimension tables.
- The temperature dimension table looks at the temperature data of New York City.
- The demographic dimension table allows us to explore the population of each state and which areas attract the most travellers.
- The airport dimension table gives a description of the airports in each area.

Immigration fact:
- read dataset from SAS file
- count number of record in df_immigration
- drop duplicates based on cicid
- convert arrive_date and departure_date to timestamp
- count nulls in the two columns arrive_date and departure_date
- check the difference between gender
- select columns necessary and rename for fact table
- write parquet file to hdfs

Demographic dimension:
- read dataset from csv file
- drop duplicate data
- change type of some columns to integer
- find top 5 city have the most population
- find sum of veterans at Chicago in each state
- rename all columns
- write parquet file to hdfs

Temperature dimension:
- read dataset from csv file or hdfs
- check null with AverageTemperature and AverageTemperatureUncertainty column
- limit data in US country and New York city
- check the most recent date and the previous date
- rename columns and add state_code column fill with "NY" to join with fact table
- write parquet file to hdfs

Airport dimension:
- read dataset from csv file or hdfs
- check null of some columns
- count number of types airports
- convert elevation_ft to type integer
- the average elevation in each airport
- select columns necessary and add "state_code" column to join with fact table
- write parquet file to hdfs

### Step 4: Run Pipelines to Model the Data 
