# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project takes US immigration data, cleans it, and extracts the key information needed to run analyses and understand patterns in it.

The main objective is to keep only the information that tells us something important about:
- How immigrants enter the country (the mode of transport and the visa type)
- Where they come from and where they go
- When they come and how long they stay in the country

All this information can support more accurate decisions about how to invest each state's money. For example, if a state receives many immigrants arriving by plane, it can invest in better airports. As another example, a state that receives many people coming to perform a specific job could invest in better education in that field.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [2]:
# Do all imports and installs here
import pandas as pd
import datetime

from pyspark.sql.functions import udf, expr, pandas_udf, year, month, dayofmonth
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, DateType, TimestampType

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

For this project, we use US immigration data from 2016. The data is provided by the US National Tourism and Trade Office through its website and describes each arrival: the place of immigration, the date of arrival, the type of visa, and other details.

The datasets will be restructured to contain less information, because we only want to understand which types of immigrants are arriving in the country, based on their visa type and the place they are immigrating to.

This information is enough to give a good idea of the immigration profile of each state, and government officials can use it to improve tourism or even education policies.

In [2]:
# Read in the data here
df = pd.read_csv('immigration_data_sample.csv')
df.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [6]:
df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [6]:
	
from pyspark.sql import SparkSession

# Load the spark-sas7bdat package so Spark can read .sas7bdat files
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [3]:
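# Without parentheses this only displays the bound method; df_spark.printSchema() prints the schema tree
df_spark.printSchema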

<bound method DataFrame.printSchema of DataFrame[cicid: double, i94yr: double, i94mon: double, i94cit: double, i94res: double, i94port: string, arrdate: double, i94mode: double, i94addr: string, depdate: double, i94bir: double, i94visa: double, count: double, dtadfile: string, visapost: string, occup: string, entdepa: string, entdepd: string, entdepu: string, matflag: string, biryear: double, dtaddto: string, gender: string, insnum: string, airline: string, admnum: double, fltno: string, visatype: string]>

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps

For the analyses we want to perform, it is more important to work with known data than to try to handle every record, so we apply the following steps:

- Remove the rows that do not have a visa type value;
- Remove the rows that do not have the state that received the immigrant;
- Remove the rows that are missing the arrival date or the departure date;
- Remove the rows that have different values in the i94cit and i94res columns.

In [24]:
df_spark = df_spark.filter("i94cit == i94res and i94addr != '' and visatype != '' and arrdate is not null and depdate is not null")

In [25]:
# dropDuplicates returns a new DataFrame, so keep the result
df_spark = df_spark.dropDuplicates()
df_spark

DataFrame[cicid: double, i94yr: double, i94mon: double, i94cit: double, i94res: double, i94port: string, arrdate: double, i94mode: double, i94addr: string, depdate: double, i94bir: double, i94visa: double, count: double, dtadfile: string, visapost: string, occup: string, entdepa: string, entdepd: string, entdepu: string, matflag: string, biryear: double, dtaddto: string, gender: string, insnum: string, airline: string, admnum: double, fltno: string, visatype: string]

In [26]:
df = df_spark.withColumn('immigration_id', expr('cast(cicid as int)')).withColumn('origin_id', expr('cast(i94cit as int)')).withColumn('visa_id', expr('cast(i94visa as int)')).withColumn('mode_id', expr('cast(i94mode as int)'))

In [27]:
# SAS dates count days since 1960-01-01; convert them to timestamps
get_timestamp_from_sas = udf(lambda sas: datetime.datetime(1960, 1, 1) + datetime.timedelta(days=int(sas)), TimestampType())

In [28]:
df = df.withColumn('arrival_date', get_timestamp_from_sas(df.arrdate)).withColumn('departure_date', get_timestamp_from_sas(df.depdate))
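
The conversion done by `get_timestamp_from_sas` can be checked in plain Python. The sketch below assumes, as the UDF does, that SAS dates count days since 1960-01-01. The names `SAS_EPOCH` and `sas_days_to_datetime` are illustrative and not part of the notebook, and the `None` handling is an addition (the UDF relies on null dates having been filtered out earlier). The input values are the `arrdate` and `depdate` of the sample row with `cicid` 15.

```python
import datetime
from typing import Optional

# Assumption carried over from the UDF: SAS dates are days since 1960-01-01
SAS_EPOCH = datetime.datetime(1960, 1, 1)


def sas_days_to_datetime(sas_days: Optional[float]) -> Optional[datetime.datetime]:
    """Convert a SAS date (days since 1960-01-01) to a datetime; None stays None."""
    if sas_days is None:
        return None
    return SAS_EPOCH + datetime.timedelta(days=int(sas_days))


print(sas_days_to_datetime(20545.0))  # 2016-04-01 00:00:00
print(sas_days_to_datetime(20691.0))  # 2016-08-25 00:00:00
print(sas_days_to_datetime(None))     # None
```

Both results agree with the `arrival_date` and `departure_date` shown for that row in the `df.show()` output.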

In [49]:
# Without parentheses this only displays the bound method; df.printSchema() prints the schema tree
df.printSchema

<bound method DataFrame.printSchema of DataFrame[cicid: double, i94yr: double, i94mon: double, i94cit: double, i94res: double, i94port: string, arrdate: double, i94mode: double, i94addr: string, depdate: double, i94bir: double, i94visa: double, count: double, dtadfile: string, visapost: string, occup: string, entdepa: string, entdepd: string, entdepu: string, matflag: string, biryear: double, dtaddto: string, gender: string, insnum: string, airline: string, admnum: double, fltno: string, visatype: string, immigration_id: int, origin_id: int, visa_id: int, mode_id: int, arrival_date: timestamp, departure_date: timestamp]>

In [29]:
df = df.selectExpr(
    'immigration_id', 'origin_id', 'i94addr as destination',
    'arrival_date', 'departure_date',
    'dayofmonth(arrival_date) as day', 'month(arrival_date) as month', 'year(arrival_date) as year',
    'mode_id', 'visa_id', 'visatype as visa_type_id'
)

In [30]:
df.show()

+--------------+---------+-----------+-------------------+-------------------+---+-----+----+-------+-------+------------+
|immigration_id|origin_id|destination|       arrival_date|     departure_date|day|month|year|mode_id|visa_id|visa_type_id|
+--------------+---------+-----------+-------------------+-------------------+---+-----+----+-------+-------+------------+
|            15|      101|         MI|2016-04-01 00:00:00|2016-08-25 00:00:00|  1|    4|2016|      1|      2|          B2|
|            16|      101|         MA|2016-04-01 00:00:00|2016-04-23 00:00:00|  1|    4|2016|      1|      2|          B2|
|            17|      101|         MA|2016-04-01 00:00:00|2016-04-23 00:00:00|  1|    4|2016|      1|      2|          B2|
|            18|      101|         MI|2016-04-01 00:00:00|2016-04-11 00:00:00|  1|    4|2016|      1|      1|          B1|
|            19|      101|         NJ|2016-04-01 00:00:00|2016-04-14 00:00:00|  1|    4|2016|      1|      2|          B2|
|            20|

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

A star schema was designed for this dataset. The data needed for the analyses and groupings is gathered in the main (fact) table, while the other tables carry descriptive information that will be important later to detail the results.

Building this data model does not take many steps, because the immigration dataset already contains the information needed for the fact table. It is only necessary to remove a few columns and convert the date columns to a proper date format.

The dimension tables will be created from information that describes the values of the dataset columns, which can be found either on government websites or in the dataset description document.

The image below shows the schema chosen for our database:
![Star Schema](images/StarSchema.png)
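
As a rough illustration of how the fact table and the dimension tables fit together, the sketch below attaches descriptive labels to two fact rows taken from the `df.show()` output. The dictionaries `MODE_DIM` and `VISA_DIM` and the function `describe` are hypothetical names, and the labels are an assumption based on the I94 label descriptions that accompany the dataset; they should be checked against the data dictionary before use.

```python
# Hypothetical dimension tables: code -> label (labels assumed from the I94 label descriptions)
MODE_DIM = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
VISA_DIM = {1: "Business", 2: "Pleasure", 3: "Student"}

# Two fact rows copied from the df.show() output (immigration_id 15 and 18)
fact_rows = [
    {"immigration_id": 15, "destination": "MI", "mode_id": 1, "visa_id": 2},
    {"immigration_id": 18, "destination": "MI", "mode_id": 1, "visa_id": 1},
]


def describe(row):
    """Attach dimension labels to one fact row (a star-schema join in miniature)."""
    return {
        **row,
        "mode": MODE_DIM.get(row["mode_id"], "Unknown"),
        "visa": VISA_DIM.get(row["visa_id"], "Unknown"),
    }


for row in fact_rows:
    print(describe(row))
```

The fact table keeps only compact ids, and the descriptive text lives once in each dimension, which is the main reason for choosing a star schema here.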


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [3]:
def read_data(filename):
    file = '../../data/18-83510-I94-Data-2016/{}'.format(filename)
    return spark.read.format('com.github.saurfang.sas.spark').load(file)

In [1]:
def clean(df):
    return df.filter("i94cit == i94res and i94addr != '' and visatype != '' and arrdate is not null and depdate is not null").dropDuplicates()

In [4]:
get_timestamp_from_sas = udf(lambda sas: datetime.datetime(1960, 1, 1) + datetime.timedelta(days=int(sas)), TimestampType())

def get_column_dates_from_sas_to_timestamp(df):
    return df.withColumn('arrival_date', get_timestamp_from_sas(df.arrdate)).withColumn('departure_date', get_timestamp_from_sas(df.depdate))

In [5]:
def get_ids_columns(df):
    return df.withColumn('immigration_id', expr('cast(cicid as int)')).withColumn('origin_id', expr('cast(i94cit as int)')).withColumn('visa_id', expr('cast(i94visa as int)')).withColumn('mode_id', expr('cast(i94mode as int)'))

In [6]:
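def select_columns(df):
    # Keep the fact-table columns; day, month and year are derived from arrival_date
    return df.selectExpr('immigration_id', 'origin_id', 'i94addr as destination', 'arrival_date', 'departure_date',
                         'dayofmonth(arrival_date) as day', 'month(arrival_date) as month', 'year(arrival_date) as year', 'mode_id', 'visa_id', 'visatype as visa_type_id')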

In [7]:
def etl(files):
    for num, name in enumerate(files, start=1):
        df_immigrations = read_data(name)

        df_immigrations = clean(df_immigrations)
        df_immigrations = get_ids_columns(df_immigrations)
        df_immigrations = get_column_dates_from_sas_to_timestamp(df_immigrations)
        df_immigrations = select_columns(df_immigrations)

        df_immigrations.write.partitionBy('visa_type_id').parquet('parquet_data/{}/'.format(num), mode='overwrite')

        print("File {}: {}.".format(num, name))

In [20]:
files = [
    'i94_jan16_sub.sas7bdat',
    'i94_feb16_sub.sas7bdat',
    'i94_mar16_sub.sas7bdat',
    'i94_apr16_sub.sas7bdat',
    'i94_may16_sub.sas7bdat',
    'i94_jun16_sub.sas7bdat',
    'i94_jul16_sub.sas7bdat',
    'i94_aug16_sub.sas7bdat',
    'i94_sep16_sub.sas7bdat',
    'i94_oct16_sub.sas7bdat',
    'i94_nov16_sub.sas7bdat',
    'i94_dec16_sub.sas7bdat'
]

etl(files)

File 1: i94_jan16_sub.sas7bdat.
File 2: i94_feb16_sub.sas7bdat.
File 3: i94_mar16_sub.sas7bdat.
File 4: i94_apr16_sub.sas7bdat.
File 5: i94_may16_sub.sas7bdat.
File 6: i94_jun16_sub.sas7bdat.
File 7: i94_jul16_sub.sas7bdat.
File 8: i94_aug16_sub.sas7bdat.
File 9: i94_sep16_sub.sas7bdat.
File 10: i94_oct16_sub.sas7bdat.
File 11: i94_nov16_sub.sas7bdat.
File 12: i94_dec16_sub.sas7bdat.


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

The only data quality check in this project verifies that the id columns are not null or empty. Only the January data is checked here to keep the process fast, but the same checks could run as an Airflow step against a database such as Redshift.

Each query below should return an empty result.

In [35]:
january_df = spark.read.parquet('parquet_data/1/')
january_df.show()

+--------------+---------+-----------+-------------------+-------------------+---+-----+----+-------+-------+------------+
|immigration_id|origin_id|destination|       arrival_date|     departure_date|day|month|year|mode_id|visa_id|visa_type_id|
+--------------+---------+-----------+-------------------+-------------------+---+-----+----+-------+-------+------------+
|          1153|      107|         IL|2016-01-16 00:00:00|2016-01-30 00:00:00| 16|    1|2016|      1|      2|          B2|
|         16124|      131|         NY|2016-01-08 00:00:00|2016-01-12 00:00:00|  8|    1|2016|      1|      2|          B2|
|         26940|      165|         CA|2016-01-01 00:00:00|2016-03-21 00:00:00|  1|    1|2016|      1|      2|          B2|
|         28627|      209|         GU|2016-01-06 00:00:00|2016-01-09 00:00:00|  6|    1|2016|      1|      2|          B2|
|         46913|      245|         IL|2016-01-08 00:00:00|2016-01-31 00:00:00|  8|    1|2016|      1|      2|          B2|
|         47174|

In [36]:
january_df.createOrReplaceTempView('immigrations_january')

In [39]:
spark.sql('SELECT * FROM immigrations_january WHERE (immigration_id IS NULL OR immigration_id = "")').show()

+--------------+---------+-----------+------------+--------------+---+-----+----+-------+-------+------------+
|immigration_id|origin_id|destination|arrival_date|departure_date|day|month|year|mode_id|visa_id|visa_type_id|
+--------------+---------+-----------+------------+--------------+---+-----+----+-------+-------+------------+
+--------------+---------+-----------+------------+--------------+---+-----+----+-------+-------+------------+



In [40]:
spark.sql('SELECT * FROM immigrations_january WHERE (origin_id IS NULL OR origin_id = "")').show()

+--------------+---------+-----------+------------+--------------+---+-----+----+-------+-------+------------+
|immigration_id|origin_id|destination|arrival_date|departure_date|day|month|year|mode_id|visa_id|visa_type_id|
+--------------+---------+-----------+------------+--------------+---+-----+----+-------+-------+------------+
+--------------+---------+-----------+------------+--------------+---+-----+----+-------+-------+------------+



In [41]:
spark.sql('SELECT * FROM immigrations_january WHERE (mode_id IS NULL OR mode_id = "")').show()

+--------------+---------+-----------+------------+--------------+---+-----+----+-------+-------+------------+
|immigration_id|origin_id|destination|arrival_date|departure_date|day|month|year|mode_id|visa_id|visa_type_id|
+--------------+---------+-----------+------------+--------------+---+-----+----+-------+-------+------------+
+--------------+---------+-----------+------------+--------------+---+-----+----+-------+-------+------------+



In [42]:
spark.sql('SELECT * FROM immigrations_january WHERE (visa_id IS NULL OR visa_id = "")').show()

+--------------+---------+-----------+------------+--------------+---+-----+----+-------+-------+------------+
|immigration_id|origin_id|destination|arrival_date|departure_date|day|month|year|mode_id|visa_id|visa_type_id|
+--------------+---------+-----------+------------+--------------+---+-----+----+-------+-------+------------+
+--------------+---------+-----------+------------+--------------+---+-----+----+-------+-------+------------+



In [43]:
spark.sql('SELECT * FROM immigrations_january WHERE (visa_type_id IS NULL OR visa_type_id = "")').show()

+--------------+---------+-----------+------------+--------------+---+-----+----+-------+-------+------------+
|immigration_id|origin_id|destination|arrival_date|departure_date|day|month|year|mode_id|visa_id|visa_type_id|
+--------------+---------+-----------+------------+--------------+---+-----+----+-------+-------+------------+
+--------------+---------+-----------+------------+--------------+---+-----+----+-------+-------+------------+
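
The five queries above repeat the same pattern for each id column, so they could be generated and evaluated in a loop. The following is a minimal sketch, not the notebook's implementation: `build_check_query` and `run_quality_checks` are hypothetical helpers, and applying the empty-string test only to string columns is an assumption about what the checks intend. The `count_rows` callable stands in for whatever executes the SQL; with Spark it might be `lambda q: spark.sql(q).collect()[0][0]`.

```python
from typing import Callable, Dict, List


def build_check_query(table: str, column: str, is_string: bool = False) -> str:
    """Build a COUNT query for rows where the column is NULL (or empty, for strings)."""
    condition = "{} IS NULL".format(column)
    if is_string:
        condition += " OR {} = ''".format(column)
    return "SELECT COUNT(*) FROM {} WHERE {}".format(table, condition)


def run_quality_checks(count_rows: Callable[[str], int], queries: Dict[str, str]) -> List[str]:
    """Run every query and return the columns that have offending rows."""
    return [column for column, query in queries.items() if count_rows(query) > 0]


# Column name -> whether the column holds strings
ID_COLUMNS = {
    "immigration_id": False,
    "origin_id": False,
    "mode_id": False,
    "visa_id": False,
    "visa_type_id": True,
}
queries = {
    column: build_check_query("immigrations_january", column, is_string)
    for column, is_string in ID_COLUMNS.items()
}

# Stand-in executor that reports no offending rows, so the sketch runs without Spark
failed_columns = run_quality_checks(lambda query: 0, queries)
print(failed_columns)
```

An empty list means every check passed; in a scheduled pipeline, a non-empty list could be turned into a raised exception so the task fails visibly.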



#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

The data dictionary is in the `data_model.SAS` file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

The purpose of this structure is to make grouping easier: with the data in this format, I can group records by visa type, by mode of arrival, or by state of arrival.
 
These groupings allow the profile of immigrants arriving in each region to be explored in depth. The objective is to understand these profiles and improve public policies according to the needs of each state. For example, when a state attracts few tourists, it may be worth checking whether there is room for more tourism in that region. Another option is to check whether a region receives many immigrants who enter the country to work, which may indicate areas of the country that could be better supported.
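
To make the grouping idea concrete, here is a small sketch that counts arrivals by state and by state plus visa type. It uses only the five complete rows visible in the `df.show()` output, so the counts are illustrative rather than real results. On the full dataset, the Spark equivalent would presumably be something like `df.groupBy('destination', 'visa_type_id').count()`.

```python
from collections import Counter

# (destination, visa_type_id) pairs from the five complete rows of the df.show() output
rows = [("MI", "B2"), ("MA", "B2"), ("MA", "B2"), ("MI", "B1"), ("NJ", "B2")]

# Arrivals per destination state
by_state = Counter(destination for destination, _ in rows)

# Arrivals per (state, visa type) combination
by_state_and_visa = Counter(rows)

print(dict(by_state))
print(by_state_and_visa[("MA", "B2")])
```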
 
Spark is already used in the project, but Airflow would be an important addition: it runs tasks on a schedule and makes the process more automatic without losing control of the individual steps.

- If the data was increased by 100x.

If the data grew substantially, a good alternative would be to process it in smaller chunks that can run in parallel, making the process faster and more efficient.
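
One way to picture chunked, parallel processing is to treat each monthly file as an independent chunk. The sketch below is only an illustration under that assumption: `process_month` is a hypothetical stand-in for the read, clean, transform and write steps of one file, and it just returns the output path instead of touching any data. On a real cluster, adding Spark executors would likely matter more than parallelism in the driver.

```python
from concurrent.futures import ThreadPoolExecutor

MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]
files = ["i94_{}16_sub.sas7bdat".format(month) for month in MONTHS]


def process_month(numbered_file):
    """Hypothetical stand-in for the per-file ETL steps; returns the output path."""
    num, name = numbered_file
    return "parquet_data/{}/".format(num)


# Each monthly file is an independent chunk, so the chunks can be submitted concurrently
with ThreadPoolExecutor(max_workers=4) as pool:
    outputs = list(pool.map(process_month, enumerate(files, start=1)))

print(outputs[0], outputs[-1])
```

Because each chunk writes to its own directory, a failed month can be re-run on its own without repeating the others.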

- If the pipelines were run on a daily basis by 7am.

Airflow makes this easy: a schedule can be defined so that the pipeline runs every day ahead of the 7am deadline and processes any new data.

- If the database needed to be accessed by 100+ people.

The idea is for the data to be available in a database, so one of the steps defined in Airflow would load the data into that database, where it becomes available to all users.

