# US Immigration data analysis
### Data Engineering Capstone Project

#### Project Summary
In this project, we analyze immigration data for cities in the United States of America over time. More precisely, we analyze the volume of arrivals by air, which can support analyses for an airline company or the government. For example, if the government knows that the number of visitors arriving in a given city is growing, it can improve the airport infrastructure in that city.

The project will be structured by the following steps:
* Step 1: Scope of the Project and Data description
* Step 2: Exploring and Assessing the Data
* Step 3: Defining the Data Model
* Step 4: Running ETL to Model the Data
* Step 5: Project Write Up

### Step 1: Scope of the Project and Data description

#### Scope of the project

In this project, I plan to study immigration volumes in US cities by month, to answer questions such as: which US city was the most visited in a given year and month? How many people arrived in that city? What are the characteristics of this city (population size, median age of the population, etc.)?

#### Description of the data :

To that end, we will use 3 data sources:
- the **I94 immigration data**: this data comes from the US National Tourism and Trade Office (see [here](https://travel.trade.gov/research/reports/i94/historical/2016.html)). This dataset contains information about entries into US territory: the date of entry, the city of arrival, the traveller's gender and age, etc. This is not open data and is actually quite expensive, which is why we only use the data for the year 2016 (provided by Udacity);
- the **U.S. City Demographic Data**: as its name indicates, this dataset describes the population of US cities, and comes from the US Census Bureau's 2015 American Community Survey (for more info, see [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/));
- the **World Temperature Data**: this dataset contains temperature records for cities around the world since 1743, and comes from Kaggle (for more info, see [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)).


 


In [155]:
# Do all imports and installs here
import pandas as pd
import numpy as np
import seaborn as sns
import s3fs
import json

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as _sum
from pyspark.sql.functions import avg

In [166]:
pd.set_option('display.max_rows', None)

In [156]:
#! pip install s3fs

In [157]:
# aws credentials
AWS_ACCESS_KEY = ''
AWS_SECRET_KEY = ''

# s3 output path for saving files
S3_PATH = 's3://udacity/capstone/project/'

In [158]:
# s3 filesystem to save file in s3
fs = s3fs.S3FileSystem(key=AWS_ACCESS_KEY, secret=AWS_SECRET_KEY)

### Step 2: Exploring and Assessing the Data

#### Exploring the Temperature data

In [3]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
temperatures_df = pd.read_csv(fname)
temperatures_df.shape

(8599212, 7)

In [4]:
temperatures_df['dt'] = pd.to_datetime(temperatures_df['dt'], format='%Y-%m-%d')
temperatures_df['year'] = temperatures_df['dt'].dt.year
temperatures_df['month'] = temperatures_df['dt'].dt.month

temperatures_df.sort_values(['year', 'month'], ascending=[True, True], inplace=True)

In [5]:
temperatures_df.head(2)

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude | year | month |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E | 1743 | 11 |
| 3239 | 1743-11-01 | 10.013 | 2.291 | Çorlu | Turkey | 40.99N | 27.69E | 1743 | 11 |


In [6]:
temperatures_df.tail(2)

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude | year | month |
|---|---|---|---|---|---|---|---|---|---|
| 8595972 | 2013-09-01 | NaN | NaN | Zuwarah | Libya | 32.95N | 12.45E | 2013 | 9 |
| 8599211 | 2013-09-01 | NaN | NaN | Zwolle | Netherlands | 52.24N | 5.26E | 2013 | 9 |


#### Dataset description : 
This dataset contains world temperature data from November 1743 to September 2013. However, our US immigration data covers 2016, so for each month we approximate the temperature in each US city by the most recent value available for that city and month in this dataset. Moreover, there are some null values in the **AverageTemperature** column, so we first remove them before keeping the latest temperature for US cities. These cleaning steps are done in the ETL pipeline.

#### Exploring the US cities demographics data :

In [11]:
us_cities_demographics = pd.read_csv('us-cities-demographics.csv', sep=';')
us_cities_demographics.shape

(2891, 12)

In [12]:
us_cities_demographics.head()

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


#### Dataset description : 
Looking at this dataset, we can see that each row corresponds to the population data of a city, with a breakdown for one particular race (White, Black or African-American, Hispanic or Latino, Asian, etc.). We are not interested in the distinction between races, so we deduplicate the rows on the pair (City, State), because the same city name can exist in different states. These steps are done in the ETL pipeline.

In [13]:
def clean_us_cities_demographics_data(us_cities_demographics_df):
    ### deduplicate the lines
    us_cities_demographics_deduplicated = us_cities_demographics_df.drop_duplicates(subset=['City', 'State'])
    
    ### keep only the useful columns
    us_cities_demographics_deduplicated = us_cities_demographics_deduplicated[['City', 'State', 'Median Age', 'Male Population', 'Female Population', 
                                                                               'Total Population', 'Foreign-born', 'Average Household Size', 'State Code']]
    
    return us_cities_demographics_deduplicated
    

In [14]:
us_cities_demographics_deduplicated = clean_us_cities_demographics_data(us_cities_demographics)

#### Exploring the i94 immigration data :

In [17]:
sample_data = pd.read_csv('immigration_data_sample.csv', index_col=0)

In [21]:
sample_data.sample(5).T

| | 867733 | 237247 | 3049432 | 1908985 | 2714770 |
|---|---|---|---|---|---|
| cicid | 1.80340e+06 | 480428 | 5.70311e+06 | 3.861e+06 | 5.4711e+06 |
| i94yr | 2016 | 2016 | 2016 | 2016 | 2016 |
| i94mon | 4 | 4 | 4 | 4 | 4 |
| i94cit | 135 | 148 | 135 | 135 | 207 |
| i94res | 135 | 112 | 135 | 135 | 207 |
| i94port | ATL | MIA | PBB | LOS | SEA |
| arrdate | 20554 | 20547 | 20574 | 20565 | 20573 |
| i94mode | 1 | 1 | 3 | 1 | 1 |
| i94addr | GA | FL | FL | NY | WA |
| depdate | 20566 | 20559 | 20589 | 20571 | 20580 |


#### I94 immigration dataset description : 
This dataset contains a lot of information, with each record corresponding to one person's trip to the United States. Much of it is not useful for our purposes (for example the arrival date, the insurance number, etc.), so we only use a handful of features for our analysis. We don't have to drop duplicate rows here, but we need to aggregate the data by month and city. The port of entry (**i94port**) is already available, and we can retrieve the corresponding city and state from the data dictionary (*I94_SAS_Labels_Descriptions.SAS*). These steps are done in the ETL pipeline.
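The pipeline in Step 4 reads a `mapping_port_city_state.json` file, but the notebook does not show how it was built. Below is a minimal sketch of one way to derive such a mapping, assuming the port entries in the data dictionary follow a `'CODE' = 'CITY, ST'` layout. The function name `parse_port_labels`, the regular expression and the sample lines are illustrative assumptions, not taken from the real file.

```python
import json
import re

# Assumed layout of a port entry in the data dictionary: 'CODE' = 'CITY, ST'
PORT_LINE = re.compile(r"'(?P<code>[^']+)'\s*=\s*'(?P<label>[^']*)'")


def parse_port_labels(lines):
    """Return {port_code: label} for every line matching the assumed layout."""
    mapping = {}
    for line in lines:
        match = PORT_LINE.search(line)
        if match:
            # Strip the padding spaces around codes and labels
            mapping[match.group("code").strip()] = match.group("label").strip()
    return mapping


# Illustrative lines only (not copied from the real file)
sample_lines = [
    "   'ATL'\t=\t'ATLANTA, GA            '",
    "   'MIA'\t=\t'MIAMI, FL              '",
    "   'XXX'\t=\t'NOT REPORTED/UNKNOWN   '",
]

port_mapping = parse_port_labels(sample_lines)
print(json.dumps(port_mapping, indent=2))
```

In practice the parser would need to be restricted to the port section of the file, since other sections of the dictionary may use a similar layout. Labels without a comma (such as the last sample line) carry no state code, which is one possible reason for unmatched cities later on.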

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Given the type of analysis we want to do, the best fit is a star schema, with one fact table (the immigration_table) and two dimension tables (the city_table and the temperature_table). This architecture is a good choice because the main analysis is to compute how many people travelled to a given city in a given period (month(s)/year), which is possible with the fact table alone. If we want more information about the city, such as its population or average temperature, we can join the fact table with the dimension tables.

##### Dimension table : temperature_table
* temperature_id : primary key
* City
* month
* AverageTemperature

##### Dimension table : city_table
* city_id : id which uniquely identifies a city (with the corresponding state)
* City
* State
* State Code
* Median Age
* Male Population
* Female Population
* Total Population
* Foreign-born
* Average Household Size

##### Fact table : immigration_table
* immigration_id : primary_key
* city_id : id which uniquely identifies a city (with the corresponding state)
* City : city, useful to join with the temperature table
* Month
* Year
* Nb_passengers : number of passengers
* Average_age : average age of the passengers
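To make the star schema concrete, here is a small sketch of the kind of query it supports: finding the most visited city for a given year and month, enriched with its population and temperature. It uses an in-memory SQLite database purely for illustration (the project itself stores CSV files), only a subset of the columns, and made-up rows. The case-insensitive comparison on `City` is an assumption about how city names are stored in each table.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE city_table (
    city_id INTEGER PRIMARY KEY, City TEXT, State TEXT, "Total Population" INTEGER);
CREATE TABLE temperature_table (
    temperature_id INTEGER PRIMARY KEY, City TEXT, month INTEGER, AverageTemperature REAL);
CREATE TABLE immigration_table (
    immigration_id INTEGER PRIMARY KEY, city_id INTEGER, City TEXT,
    Year INTEGER, Month INTEGER, Nb_passengers INTEGER, Average_age REAL);
""")

# Made-up rows, for illustration only
conn.executemany("INSERT INTO city_table VALUES (?, ?, ?, ?)",
                 [(10, "atlanta", "Georgia", 1000), (20, "miami", "Florida", 2000)])
conn.executemany("INSERT INTO temperature_table VALUES (?, ?, ?, ?)",
                 [(1, "Atlanta", 4, 16.0), (2, "Miami", 4, 24.0)])
conn.executemany("INSERT INTO immigration_table VALUES (?, ?, ?, ?, ?, ?, ?)",
                 [(1, 10, "atlanta", 2016, 4, 500, 38.5),
                  (2, 20, "miami", 2016, 4, 800, 41.0),
                  (3, 20, "miami", 2016, 5, 300, 40.0)])

query = """
SELECT i.City, c.State, i.Nb_passengers, c."Total Population", t.AverageTemperature
FROM immigration_table AS i
LEFT JOIN city_table AS c ON c.city_id = i.city_id
-- City names are compared case-insensitively (assumption about the stored values)
LEFT JOIN temperature_table AS t
       ON LOWER(t.City) = LOWER(i.City) AND t.month = i.Month
WHERE i.Year = ? AND i.Month = ?
ORDER BY i.Nb_passengers DESC
LIMIT 1
"""
top = conn.execute(query, (2016, 4)).fetchone()
print(top)
```

The fact table alone answers "which city, how many passengers"; the two left joins only add descriptive attributes, so a city missing from a dimension table still appears in the result with empty attributes.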

#### 3.2 Chosen data model

The chosen data model is a **data lake on S3**, for the following reasons:
* There is no need to access the data quickly, as it will be used for static analysis only
* The data is not big (the immigration data may seem very large, but after aggregation by month and city it has fewer than 5000 rows), so loading the files from S3 will be very fast
* It is less expensive than database solutions such as Amazon Redshift or Postgres

#### 3.3 Mapping Out Data Pipelines

The steps necessary to pipeline the data into the chosen data model are the following:
* Read all the immigration data with Spark, as the data is big (approximately 3 million rows for 1 month of data), add the city and state columns needed for the merge with the US Cities Demographic dataset, and aggregate the data as needed;
* Clean and process the US demographics data to create the city table;
* Clean and process the world temperature data to create the temperature table.


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

For information, the entire ETL pipeline has been refactored into the *etl.py* script, so it can be run directly from a terminal to generate and save the fact and dimension tables. For explanation purposes and better clarity, we also run the ETL pipeline below.

#### Cleaning and processing the temperature data

In [174]:
def clean_and_process_temperature_data(temperatures_data_path, save_local=True, fs=None, save_to_s3=False, output_path=None):
    '''
    Cleans and processes the temperature data.
    If the s3 arguments are provided, also saves the final temperature table to s3.
    '''

    temperatures_df = pd.read_csv(temperatures_data_path)

    # format the date and extract the year and month
    temperatures_df['dt'] = pd.to_datetime(temperatures_df['dt'], format='%Y-%m-%d')
    temperatures_df['year'] = temperatures_df['dt'].dt.year
    temperatures_df['month'] = temperatures_df['dt'].dt.month

    temperatures_df.sort_values(['year', 'month'], ascending=[True, True], inplace=True)

    # Remove null values in the AverageTemperature column
    temperatures_df = temperatures_df[pd.notnull(temperatures_df['AverageTemperature'])]

    # Filter on the US
    temperatures_US = temperatures_df[temperatures_df['Country'] == 'United States']

    # Keep the last information (reassign instead of modifying a filtered slice in place)
    temperatures_US = temperatures_US.drop_duplicates(subset=['City', 'month'], keep='last')

    # Create the temperature id
    temperatures_US.index = np.arange(1, len(temperatures_US) + 1)
    temperatures_US = temperatures_US.reset_index()
    temperatures_US = temperatures_US.rename(columns={'index': 'temperature_id'})

    # Keeping the useful information
    temperatures_US = temperatures_US[['temperature_id', 'City', 'month', 'AverageTemperature']]

    if save_local:
        temperatures_US.to_csv('temperature_table.csv', sep=';', index=False)

    if save_to_s3:
        with fs.open(output_path + 'temperature_table.csv', 'wb') as output_file:
            temperatures_US.to_csv(output_file, sep=';', index=False)

    return temperatures_US

In [None]:
temperature_data_path = '../../data2/GlobalLandTemperaturesByCity.csv'

temperature_table = clean_and_process_temperature_data(temperature_data_path)

#### Cleaning and processing the us demographics data

In [176]:
def clean_and_process_us_cities_demographics_data(us_cities_demographics_path, save_local=True, fs=None, save_to_s3=False, output_path=None):
    '''

    Cleans and processes the us demographics data.
    If the s3 arguments are provided, also saves the final demographics table to s3 and/or locally.
    '''

    us_cities_demographics_df = pd.read_csv(us_cities_demographics_path, sep=';')

    # deduplicate the lines (explicit copy avoids pandas' SettingWithCopyWarning on the assignments below)
    us_cities_demographics_deduplicated = us_cities_demographics_df.drop_duplicates(subset=['City', 'State']).copy()

    # create city_id for the join with the immigration table
    # note: hash() of strings is only stable within one Python process unless PYTHONHASHSEED is fixed
    us_cities_demographics_deduplicated['City'] = us_cities_demographics_deduplicated['City'].str.lower()
    us_cities_demographics_deduplicated['city_id'] = us_cities_demographics_deduplicated[['City', 'State Code']].apply(
        lambda x: hash(tuple(x)), axis=1)

    # keep only the useful columns
    us_cities_demographics_deduplicated_final = us_cities_demographics_deduplicated[
        ['city_id', 'City', 'State', 'State Code', 'Median Age', 'Male Population',
         'Female Population', 'Total Population', 'Foreign-born', 'Average Household Size']]

    if save_local:
        us_cities_demographics_deduplicated_final.to_csv('demographics_table.csv', sep=';', index=False)

    if save_to_s3:
        with fs.open(output_path + 'demographics_table.csv', 'wb') as output_file:
            us_cities_demographics_deduplicated_final.to_csv(output_file, sep=';', index=False)

    return us_cities_demographics_deduplicated_final

In [None]:
us_cities_demographics_data_path = 'us-cities-demographics.csv'

cities_table = clean_and_process_us_cities_demographics_data(us_cities_demographics_data_path)

#### Cleaning and processing the us immigrations data

In [178]:
def clean_and_process_immigration_data(immigration_data_path, save_local=True, fs=None, save_to_s3=False, output_path=None):
    '''

    Cleans and processes the us immigration data.
    If the s3 arguments are provided, also saves the final immigration table to s3 and/or locally.
    '''

    spark = SparkSession.builder. \
        config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
        .enableHiveSupport().getOrCreate()

    final_df_spark = spark.read.format('com.github.saurfang.sas.spark').load(
        immigration_data_path+'i94_jan16_sub.sas7bdat')
    final_df_spark_columns = list(final_df_spark.columns)

    months = ['feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec']

    for month in months:
        file_path = immigration_data_path + 'i94_' + month + '16_sub.sas7bdat'
        df_spark = spark.read.format('com.github.saurfang.sas.spark').load(file_path)
        df_spark = df_spark.select(final_df_spark_columns)
        final_df_spark = final_df_spark.union(df_spark)

    # filter only air transportation
    final_df_spark = final_df_spark[final_df_spark['i94mode'] == 1]

    # aggregate data
    immigration_data_aggregated = final_df_spark.groupBy("i94yr", "i94mon", "i94port").agg(_sum("count"), avg('i94bir'))

    # convert to pandas dataframe
    pd_immigration_data_aggregated_df = immigration_data_aggregated.toPandas()

    # Add the information about city and State to create city_id
    with open('mapping_port_city_state.json', 'r') as fp:
        map_port_city = json.load(fp)

    mapping_port_city_df = pd.DataFrame(
        {'i94port': list(map_port_city.keys()), 'City, State Code': list(map_port_city.values())})

    # Create City and State Code from the data dictionary of the I94 data
    mapping_port_city_df['City'] = mapping_port_city_df['City, State Code'].map(lambda x: str(x).split(',')[0])

    # np.nan replaces pd.np.NaN: the pd.np alias was deprecated in pandas 1.0 and later removed
    mapping_port_city_df['State Code'] = mapping_port_city_df['City, State Code'].map(
        lambda x: str(x).split(',')[1] if len(str(x).split(',')) > 1 else np.nan)
    mapping_port_city_df['State Code'] = mapping_port_city_df['State Code'].str.strip()
    mapping_port_city_df['State Code'] = mapping_port_city_df['State Code'].map(lambda x: str(x).split(' ')[0])

    pd_immigration_data_aggregated_df_final = pd.merge(pd_immigration_data_aggregated_df,
                                                       mapping_port_city_df[['i94port', 'City', 'State Code']],
                                                       on='i94port',
                                                       how='left')

    pd_immigration_data_aggregated_df_final['City'] = pd_immigration_data_aggregated_df_final['City'].str.lower()

    pd_immigration_data_aggregated_df_final['city_id'] = pd_immigration_data_aggregated_df_final[
        ['City', 'State Code']].apply(lambda x: hash(tuple(x)), axis=1)

    # Create primary key
    pd_immigration_data_aggregated_df_final.index = np.arange(1, len(pd_immigration_data_aggregated_df_final) + 1)
    pd_immigration_data_aggregated_df_final = pd_immigration_data_aggregated_df_final.reset_index()

    # rename the columns
    pd_immigration_data_aggregated_df_final = pd_immigration_data_aggregated_df_final.rename(columns={'i94yr': 'Year',
                                                                                                      'i94mon': 'Month',
                                                                                                      'sum(count)': 'Nb_passengers',
                                                                                                      'avg(i94bir)': 'Average_age',
                                                                                                      'index': 'immigration_id'})

    pd_immigration_data_aggregated_df_final = pd_immigration_data_aggregated_df_final[
        ['immigration_id', 'city_id', 'City',
         'Year', 'Month',
         'Nb_passengers', 'Average_age']]

    if save_local:
        pd_immigration_data_aggregated_df_final.to_csv('immigration_table.csv', sep=';', index=False)

    if save_to_s3:
        with fs.open(output_path + 'immigration_table.csv', 'wb') as output_file:
            pd_immigration_data_aggregated_df_final.to_csv(output_file, sep=';', index=False)

    return pd_immigration_data_aggregated_df_final

In [None]:
immigration_data_path = '../../data/18-83510-I94-Data-2016/'

immigration_table = clean_and_process_immigration_data(immigration_data_path)

#### 4.2 Data Quality Checks

We run two data quality checks to make sure that the pipeline ran without errors and that the resulting tables are consistent:

 * an integrity constraint check: each key of the city table (city_id) must be unique;
 * a completeness check: every city of the immigration table must have a matching row in the city table.

#### 1 - Check that each key in the city table (city_id) is unique

By construction, it is the case for the temperature_id and immigration_id (as they are created from the indexes), but as the city_id is created by hashing the city and the state, we have to check that.

In [183]:
city_table = pd.read_csv('demographics_table.csv', sep=';')

In [189]:
if city_table.city_id.nunique()== len(city_table):
    print('Good News, each city_id is unique !')
else:
    print('There is an error in the city table, the primary keys are not unique !')

Good News, each city_id is unique !


Fortunately each key is unique.
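One caveat with this key: Python salts the built-in `hash()` of strings per interpreter process (unless `PYTHONHASHSEED` is fixed), so ids computed in two separate runs, for example one run per table, would not match. A possible alternative, sketched below and not what the pipeline above does, is to derive the id from a cryptographic digest. The function name `stable_city_id` and the normalization rules are assumptions made for illustration.

```python
import hashlib


def stable_city_id(city, state_code):
    """Deterministic id built from the normalized city name and state code."""
    # Normalize so that 'Atlanta'/'ga' and ' atlanta '/'GA' give the same key
    key = f"{str(city).strip().lower()}|{str(state_code).strip().upper()}"
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    # Keep 8 bytes and clear the top bit so the id fits in a signed 64-bit integer
    return int.from_bytes(digest[:8], "big") & 0x7FFFFFFFFFFFFFFF


print(stable_city_id("Atlanta", "GA"))
print(stable_city_id("atlanta", "ga") == stable_city_id("Atlanta", "GA"))
```

Because the digest depends only on the input text, the same function can be applied independently to the demographics and immigration tables, in different processes or on different machines, and still produce matching join keys.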

#### 2 - Check that we have information about all the cities of the immigration table in the city table

This is not obvious, because the 2 tables come from different sources.

In [190]:
immigration_table =pd.read_csv('immigration_table.csv', sep=';')

In [193]:
city_table = pd.read_csv('demographics_table.csv', sep=';')

In [196]:
final_table = pd.merge(immigration_table, city_table[['city_id', 'State']],on = 'city_id', how='left')

In [199]:
if final_table.State.isnull().sum()== 0:
    print('Good News, we have information for all the cities in the immigration table !')
else:
    missing_percentage = final_table.State.isnull().sum()/len(final_table)
    print(f'There is {100*round(missing_percentage, 2)}% missing data on cities in the city table !')

There is 62.0% missing data on cities in the city table !


Well, it seems that 62% of the rows in the immigration table (one row per year, month and port) have no matching city in the city table.
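To understand a gap like this one, it helps to look at which cities fail to match and how much traffic they represent, since a share of rows is not the same as a share of passengers. The sketch below uses `indicator=True` in `pandas.merge` on two toy tables with made-up values; the column names follow the tables above, everything else is illustrative.

```python
import pandas as pd

# Toy tables with made-up values
immigration = pd.DataFrame({
    "city_id": [1, 1, 2, 3],
    "City": ["atlanta", "atlanta", "miami", "smallville"],
    "Nb_passengers": [100, 120, 80, 5],
})
cities = pd.DataFrame({"city_id": [1, 2], "State": ["Georgia", "Florida"]})

# indicator=True adds a `_merge` column telling where each row was found
merged = immigration.merge(cities, on="city_id", how="left", indicator=True)
unmatched = merged[merged["_merge"] == "left_only"]

row_share = len(unmatched) / len(merged)
passenger_share = unmatched["Nb_passengers"].sum() / merged["Nb_passengers"].sum()
missing_cities = sorted(unmatched["City"].unique())

print(f"{row_share:.0%} of rows unmatched, {passenger_share:.1%} of passengers")
print(missing_cities)
```

Listing `missing_cities` makes it possible to tell apart genuinely absent cities from formatting mismatches in the city name or state code.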

#### 4.3 Data dictionary 
You can find here a brief description of what the data is and where it came from, and also a small explanation for each field.

##### Dimension table : temperature_table (comes from the World Temperature Data from Kaggle)
* temperature_id : primary key
* City 
* month
* AverageTemperature

##### Dimension table : city_table
* city_id : id which identifies uniquely a city (with the corresponding state), created by creating a hash of the city + State Code
* City
* State
* State Code
* Median Age
* Male Population
* Female Population
* Total Population
* Foreign-born
* Average Household Size

##### Fact table : immigration_table
* immigration_id : primary_key
* city_id : id which uniquely identifies a city (with the corresponding state)
* City : city, useful to join with the temperature table
* Month
* Year
* Nb_passengers : number of passengers
* Average_age : average age of the passengers

### Step 5: Project Write Up
### Rationale for the choice of tools and technologies for the project.

* For this project, we used Spark to read and aggregate the immigration data, because one month of data is approximately 3 million rows, so 12 months amount to roughly 36 million rows. For this volume of data, Spark is a good choice as it works more efficiently than tools like pandas, which might also have been adequate for this analysis but would likely have needed more RAM and more time to do the same job. I have also chosen a **data lake on S3** as the data model, for the following reasons: there is no need to access the data quickly, as it will be used for static analysis only; the aggregated data is not big, so loading the files from S3 will be very fast; and it is less expensive than other solutions such as Amazon Redshift or Postgres.

* How often the data should be updated: for this project, the data should be updated once a month, because we analyze immigration data by city and by month, so a monthly update is the logical choice.

* How I would approach the problem differently under the following scenarios:
 * The data was increased by 100x: in this case, we would be dealing with big data, so the solution would be a NoSQL database, and consequently we would have to model the database according to the queries we want to run.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day: in this case, I would use a tool like Airflow, which is great for automation and monitoring.
 * The database needed to be accessed by 100+ people: in this case, we may have consistency issues, so we would have to use a relational database like Postgres or Amazon Redshift to solve this issue.
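As a small illustration of the monthly update mentioned above, the sketch below works out which monthly files still need to be processed. It reuses the file naming pattern of the 2016 files from the ETL code; that the same pattern holds for other years is an assumption, and both helper names are hypothetical.

```python
MONTH_ABBR = ["jan", "feb", "mar", "apr", "may", "jun",
              "jul", "aug", "sep", "oct", "nov", "dec"]


def i94_file_name(year, month):
    """File name for one month, following the pattern of the 2016 files."""
    return f"i94_{MONTH_ABBR[month - 1]}{year % 100:02d}_sub.sas7bdat"


def months_to_process(already_loaded, year, up_to_month):
    """Months of `year` (1..up_to_month) not yet present in the immigration table."""
    return [m for m in range(1, up_to_month + 1) if (year, m) not in already_loaded]


# (Year, Month) pairs already present in the immigration table (made-up state)
loaded = {(2016, 1), (2016, 2)}
pending = [i94_file_name(2016, m) for m in months_to_process(loaded, 2016, 4)]
print(pending)
```

A scheduler could run this once a month, aggregate only the pending files with Spark, and append the result to the existing immigration table instead of recomputing the whole year.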