# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project builds a data model from I94 data on immigration into the US, combined with US city demographic data, and produces tables to be used for analysis.


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import glob

### Step 1: Scope the Project and Gather Data

#### Scope 
The scope of this project is to provide two analytical tables for analysis of US demographics and immigration. One table holds individual immigration records; the other holds US city demographic information. New columns will be created to allow comparison of the median age of a state's residents with the median age of its visitors. Additionally, a new column will show the count of visitors to each state, so that the most and least popular states can be identified and compared with their overall population.

#### Describe and Gather Data 
In this project we use three data sources.  
The first dataset contains I94 immigration data, which records immigration into the US. This includes both people moving to the US permanently and people who are only visiting.  
The second dataset contains US city demographic data.  
The final data source is a JSON file that maps each two-letter state code to the full name of the state.

In [2]:
# Read in the data here
immigration_data_df=pd.read_csv("immigration_data_sample.csv")
us_cities_df=pd.read_csv("us-cities-demographics.csv", delimiter=';')
code_state_df=pd.read_json("code_state.json")

In [3]:
# view code state mapping json
code_state_df.head()

|   | Code | State |
| --- | --- | --- |
| 0 | AL | ALABAMA |
| 1 | AK | ALASKA |
| 2 | AZ | ARIZONA |
| 3 | AR | ARKANSAS |
| 4 | CA | CALIFORNIA |


In [4]:
# view us cities demographic data
us_cities_df.head()

|   | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


In [5]:
# drop unneeded columns from us cities data
us_cities_df.drop(['Race', 'Count', 'Number of Veterans', 'Foreign-born', 'Average Household Size'], axis=1, inplace=True)

In [6]:
# view sample of immigration data
immigration_data_df.head()

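|   | Unnamed: 0 | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 2027561 | 4084316.0 | 2016.0 | 4.0 | 209.0 | 209.0 | HHW | 20566.0 | 1.0 | HI | ... |  | M | 1955.0 | 7202016 | F |  | JL | 56582670000.0 | 00782 | WT |
| 1 | 2171295 | 4422636.0 | 2016.0 | 4.0 | 582.0 | 582.0 | MCA | 20567.0 | 1.0 | TX | ... |  | M | 1990.0 | 10222016 | M |  | \*GA | 94362000000.0 | XBLNG | B2 |
| 2 | 589494 | 1195600.0 | 2016.0 | 4.0 | 148.0 | 112.0 | OGG | 20551.0 | 1.0 | FL | ... |  | M | 1940.0 | 7052016 | M |  | LH | 55780470000.0 | 00464 | WT |
| 3 | 2631158 | 5291768.0 | 2016.0 | 4.0 | 297.0 | 297.0 | LOS | 20572.0 | 1.0 | CA | ... |  | M | 1991.0 | 10272016 | M |  | QR | 94789700000.0 | 00739 | B2 |
| 4 | 3032257 | 985523.0 | 2016.0 | 4.0 | 111.0 | 111.0 | CHM | 20550.0 | 3.0 | NY | ... |  | M | 1997.0 | 7042016 | F |  |  | 42322570000.0 | LAND | WT |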


In [7]:
# set up Spark session
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()
# read in one of the files as a spark dataframe
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [8]:
# create a list of all .sas7bdat files
all_files = glob.glob('../../data/18-83510-I94-Data-2016/*.sas7bdat')
# remove the April file from the list, as it has already been used to create df_spark
all_files.remove('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
all_files

['../../data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_oct16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat']

In [9]:
# check all files have the same number and names of columns
columns=[]
for fname in all_files:
    df = spark.read.format('com.github.saurfang.sas.spark').load(fname)
    equal_columns = (df.columns == df_spark.columns)
    columns.append([fname,len(df.columns), equal_columns])
columns

[['../../data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat', 28, True],
 ['../../data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat', 28, True],
 ['../../data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat', 28, True],
 ['../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat', 34, False],
 ['../../data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat', 28, True],
 ['../../data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat', 28, True],
 ['../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat', 28, True],
 ['../../data/18-83510-I94-Data-2016/i94_oct16_sub.sas7bdat', 28, True],
 ['../../data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat', 28, True],
 ['../../data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat', 28, True],
 ['../../data/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat', 28, True]]

In [10]:
# june 2016 data has different columns
df_jun16 = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat')
df_jun16.columns

['cicid',
 'i94yr',
 'i94mon',
 'i94cit',
 'i94res',
 'i94port',
 'arrdate',
 'i94mode',
 'i94addr',
 'depdate',
 'i94bir',
 'i94visa',
 'count',
 'validres',
 'delete_days',
 'delete_mexl',
 'delete_dup',
 'delete_visa',
 'delete_recdup',
 'dtadfile',
 'visapost',
 'occup',
 'entdepa',
 'entdepd',
 'entdepu',
 'matflag',
 'biryear',
 'dtaddto',
 'gender',
 'insnum',
 'airline',
 'admnum',
 'fltno',
 'visatype']
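
Rather than comparing the two column lists by eye, a set difference shows exactly which columns one file has that the other lacks. This is a minimal sketch in plain Python; `column_differences` is a hypothetical helper, and the short lists below are an illustrative subset, not the full 28- and 34-column schemas.

```python
def column_differences(reference, other):
    """Return (missing, extra): columns in reference but not in other, and vice versa.

    Both lists keep the original column order so the result is easy to read.
    """
    ref_set, other_set = set(reference), set(other)
    missing = [c for c in reference if c not in other_set]
    extra = [c for c in other if c not in ref_set]
    return missing, extra


# illustrative subset of an April-style and a June-style column list
april_cols = ['cicid', 'i94yr', 'count', 'dtadfile']
june_cols = ['cicid', 'i94yr', 'count', 'validres', 'delete_days', 'dtadfile']

missing, extra = column_differences(april_cols, june_cols)
print(missing)  # []
print(extra)    # ['validres', 'delete_days']
```

In the notebook this would be called as `column_differences(df_spark.columns, df_jun16.columns)` to list the additional June columns.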

In [11]:
# create a list of relevant columns
df_columns=['cicid',
 'i94yr',
 'i94mon',
 'i94cit',
 'i94res',
 'i94port',
 'arrdate',
 'i94addr',
 'depdate',
 'i94bir',
 'i94visa',
 'count',
 'matflag',
 'biryear',
 'gender',
 'visatype']
# filter spark dataframe
df_spark_filtered=df_spark.select(df_columns)

In [12]:
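# union all immigration data files, selecting only the relevant columns to reduce memory use
# add each file to the running result; restarting from df_spark_filtered on every pass
# would keep only the April file plus the last file in all_files
df_filtered=df_spark_filtered
for fname in all_files:
    df = spark.read.format('com.github.saurfang.sas.spark').load(fname)
    df_2=df.select(df_columns)
    df_filtered=df_filtered.union(df_2)

In [13]:
# count of immigration records
df_filtered.count()
# the output below, and the counts in later cells, were recorded when only the April file
# and the last file in all_files had been combined, so the full 2016 total will be larger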

6529303

In [None]:
# write to parquet, overwriting any output left by a previous run
df_filtered.write.mode('overwrite').parquet("parquet_data")


In [15]:
# read in parquet data
df_filtered_pq=spark.read.parquet("parquet_data")

### Step 2: Explore and Assess the Data

#### Check for null values and remove columns with over 50% null data

In [16]:
# Check for columns which contain > 50% null values
nulls_list=[]
for col in df_filtered_pq.columns:
    count_nulls=df_filtered_pq.filter(F.col(col).isNull()).count()
    nulls_list.append([col, "No of nulls: " +str(count_nulls)])

In [17]:
# get total number of rows
df_filtered_pq_count=df_filtered_pq.count()
# get value for 50% of rows
df_filtered_pq_count*0.5
# 50 % is equal to 3264651

3264651.5

In [18]:
# show nulls_list and look for any columns where nulls are above 3264651
nulls_list

[['cicid', 'No of nulls: 0'],
 ['i94yr', 'No of nulls: 0'],
 ['i94mon', 'No of nulls: 0'],
 ['i94cit', 'No of nulls: 969'],
 ['i94res', 'No of nulls: 0'],
 ['i94port', 'No of nulls: 0'],
 ['arrdate', 'No of nulls: 0'],
 ['i94addr', 'No of nulls: 319417'],
 ['depdate', 'No of nulls: 357245'],
 ['i94bir', 'No of nulls: 1495'],
 ['i94visa', 'No of nulls: 0'],
 ['count', 'No of nulls: 0'],
 ['matflag', 'No of nulls: 342281'],
 ['biryear', 'No of nulls: 1495'],
 ['gender', 'No of nulls: 481875'],
 ['visatype', 'No of nulls: 0']]

None of the columns exceeds the 50% threshold of 3,264,651 nulls. The three columns with the most null values are:
- gender (481,875 nulls, under 8% of rows)
- depdate (357,245 nulls)
- matflag (342,281 nulls)

*depdate* represents the departure date. A null value may simply mean that the person has moved
to the US permanently or has not yet departed, so we will not drop this column.  
*matflag* indicates whether the arrival and departure records match; it does not provide much information, so it can be dropped.  
*gender* has the highest number of nulls and is not needed for the planned analysis, so it will be dropped.
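
The threshold comparison can be made explicit with a small helper applied to the counts recorded above. This is a sketch in plain Python, assuming the counts are available as a `{column: count}` dictionary (the notebook stores them as strings inside `nulls_list`, so they would need parsing first); `columns_over_threshold` is a hypothetical name.

```python
def columns_over_threshold(null_counts, total_rows, threshold=0.5):
    """Return the columns whose share of null values is above `threshold`."""
    return [col for col, nulls in null_counts.items() if nulls / total_rows > threshold]


# the largest null counts recorded in nulls_list above
null_counts = {
    'i94addr': 319417,
    'depdate': 357245,
    'matflag': 342281,
    'gender': 481875,
}
total_rows = 6529303

print(columns_over_threshold(null_counts, total_rows))  # []
# share of nulls in the worst column, gender
print(round(null_counts['gender'] / total_rows, 3))      # 0.074
```

Separately, the null-count cell launches one Spark job per column. A single aggregation such as `df_filtered_pq.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_filtered_pq.columns])` should compute every count in one pass.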

In [19]:
# drop columns that are not needed
df_dropped=df_filtered_pq.drop('matflag').drop('gender')
# drop duplicates
df_dropped=df_dropped.dropDuplicates()

#### Ensure data contains a unique ID column

In [20]:
# check cicid column is unique
df_dropped_count=df_dropped.count()
cicid_unique_count=df_dropped.select('cicid').distinct().count()
print("The count of rows in the dataframe is: " + str(df_dropped_count) + ". The count of distinct cicid values is: "
     + str(cicid_unique_count) +".")

The count of rows in the dataframe is: 6529303. The count of distinct cicid values is: 5127515.


Since *cicid* is not unique, it cannot serve as a unique ID, so we will drop it and create a new *id* column.

In [21]:
# creating new id column and dropping cicid column
df_dropped=df_dropped.withColumn('id', F.row_number().over(Window.orderBy(F.monotonically_increasing_id()))
                                ).drop('cicid')
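
The same check-then-replace logic can be tried on a small pandas frame. This is an illustrative sketch on toy records in which one `cicid` value repeats; it is not part of the Spark pipeline.

```python
import pandas as pd

# toy records in which one cicid value appears twice
records = pd.DataFrame({'cicid': [4084316.0, 4422636.0, 4084316.0],
                        'i94mon': [4.0, 4.0, 12.0]})

# the check from the cell above: row count vs. distinct count
print(len(records), records['cicid'].nunique())  # 3 2
print(records['cicid'].is_unique)                # False

# replace cicid with a sequential surrogate key starting at 1, like row_number()
records = records.drop(columns='cicid')
records.insert(0, 'id', list(range(1, len(records) + 1)))
print(records['id'].tolist())  # [1, 2, 3]
```

A note on the Spark version: `row_number()` over a window with no `partitionBy` makes Spark move every row into a single partition (it logs a warning to that effect). That works at this size but will not scale. If consecutive numbers are not required, `F.monotonically_increasing_id()` on its own gives unique, though non-contiguous, ids without that shuffle.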

In [22]:
# rename and re-format columns
immigration_data=df_dropped.select(
        F.col('id'),
        F.col('i94yr').alias('record_year'),
        F.col('i94mon').alias('record_month'),
        F.col('i94cit').alias('origin_location'),
        F.col('i94res').alias('resident_location'),
        F.col('i94port').alias('airport_code'),
        F.col('arrdate').alias('arrival_date'),
        F.col('i94addr').alias('destination'),
        F.col('depdate').alias('departure_date'),
        F.col('i94bir').alias('age'),
        F.col('visatype').alias('visa_type')
)

In [23]:
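# reference date for SAS dates (January 1, 1960); SAS stores dates as days since this date
sas_reference_date = "1960-01-01"

# convert departure and arrival date from SAS day counts to dates
# the day counts are stored as doubles, so cast them to int for date_add (nulls stay null)
immigration_data = immigration_data.withColumn(
    "arrival_date", F.expr(f"date_add('{sas_reference_date}', CAST(arrival_date AS INT))")).withColumn(
    "departure_date", F.expr(f"date_add('{sas_reference_date}', CAST(departure_date AS INT))"))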
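
To sanity-check the conversion outside Spark, the same arithmetic can be written in plain Python; `sas_to_date` is a hypothetical helper. Applied to the first `arrdate` in the sample (20566.0), it gives a date in April 2016, consistent with `i94mon` = 4 for that record.

```python
from datetime import date, timedelta

# SAS stores dates as the number of days since 1 January 1960
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days):
    """Convert a SAS day count (possibly a float, possibly None) to a datetime.date."""
    if sas_days is None:
        # a missing value, e.g. no departure date, stays missing
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20566.0))  # 2016-04-22
print(sas_to_date(None))     # None
```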

#### Perform transformations on the us_cities dataframe

In [24]:
# reformat columns: lowercase, and replace spaces and hyphens with underscores
us_cities_df.columns=us_cities_df.columns.str.lower().str.replace(' ', '_').str.replace('-', '_')
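
The renaming rule can also be written as one function, which is easy to test and reuse for other tables. This is a sketch; `normalise_column_name` is a hypothetical helper that additionally collapses runs of spaces or hyphens into a single underscore.

```python
import re


def normalise_column_name(name):
    """Lowercase a column name and replace runs of whitespace or hyphens with one underscore."""
    return re.sub(r'[\s\-]+', '_', name.strip().lower())


print(normalise_column_name('Median Age'))    # median_age
print(normalise_column_name('Foreign-born'))  # foreign_born
```

In the notebook it could be applied as `us_cities_df.columns = [normalise_column_name(c) for c in us_cities_df.columns]`.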

In [25]:
# capitalise and trim city and state names
us_cities_df['city']=us_cities_df['city'].str.strip().str.upper()
us_cities_df['state']=us_cities_df['state'].str.strip().str.upper()

In [26]:
# create a unique id column based on city and state
us_cities_df['city_state_id']=us_cities_df.state_code.str.lower()+'_'+us_cities_df.city.str.lower().str.replace(' ', '_')


In [27]:
# rename median_age column
us_cities_df=us_cities_df.rename(columns={'median_age':'median_resident_age'})

In [28]:
# drop duplicate values in US cities data
us_cities=us_cities_df.drop_duplicates()
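
Dropping duplicates matters here because the US cities file has one row per city for each *Race* value, so once *Race* and *Count* are removed the remaining columns repeat. A toy pandas illustration (the rows are shaped like the source file, but the values are only for demonstration):

```python
import pandas as pd

# toy rows shaped like us-cities-demographics.csv: one row per city per race
cities = pd.DataFrame({
    'City': ['Quincy', 'Quincy', 'Hoover'],
    'State Code': ['MA', 'MA', 'AL'],
    'Median Age': [41.0, 41.0, 38.5],
    'Race': ['White', 'Asian', 'Asian'],
    'Count': [58723, 100, 4759],
})

# dropping the per-race columns leaves identical rows for each city...
per_city = cities.drop(['Race', 'Count'], axis=1)
print(len(per_city))  # 3

# ...which drop_duplicates collapses to one row per city
per_city = per_city.drop_duplicates()
print(len(per_city))  # 2
```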

In [29]:
# change order of columns
ordered_list=[ 'city_state_id','city', 'state', 'state_code', 'male_population', 'female_population',
 'total_population','median_resident_age']
us_cities=us_cities[ordered_list]


#### Reformat column names in code_state dataframe

In [30]:
# put all column names in lowercase and replace spaces with underscores
code_state_df.columns=code_state_df.columns.str.lower().str.replace(' ', '_')


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
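The model consists of two analytical tables, linked by US state code:
- *us_cities*: one row per US city, keyed on *city_state_id*, holding the city's demographics plus the median age of visitors to its state.
- *immigration_data*: one row per I94 record, keyed on *id*, holding the arrival details plus the count of visitors to, and full name of, the destination state.

*us_cities.state_code* matches *immigration_data.destination*, so the two tables can be joined at state level to compare residents with visitors, as described in the project scope.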

#### 3.2 Mapping Out Data Pipelines
**Steps:**
1. Group *immigration_data* by *destination* and calculate the median visitor age, *median_visitor_age*, for each destination.  
2. Left join *us_cities* with the result of step 1 on *us_cities.state_code* = *destination*, keeping all columns in *us_cities* and adding *median_visitor_age*.
3. Count the visitors for each *destination* and add the result to *immigration_data* as a new column, *count_of_visitors*.
4. Join *immigration_data* with *code_state* to get the full name of each destination state.
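
The four steps can be prototyped in pandas on toy data before running them in Spark. This is a sketch, not the notebook's implementation: the table contents are made up, and the pandas calls (`groupby().median()`, `merge`, `groupby().transform('count')`) stand in for the Spark `percentile_approx` aggregation and joins used in Step 4. Note that pandas' `median` is exact, whereas `percentile_approx` is approximate.

```python
import pandas as pd

# toy stand-ins for the three tables (values are made up)
toy_immigration = pd.DataFrame({
    'id': [1, 2, 3, 4, 5],
    'destination': ['HI', 'HI', 'TX', 'TX', 'TX'],
    'age': [61.0, 25.0, 26.0, 30.0, 40.0],
})
toy_cities = pd.DataFrame({
    'city_state_id': ['hi_city_a', 'tx_city_b', 'ak_city_c'],
    'state_code': ['HI', 'TX', 'AK'],
    'median_resident_age': [41.4, 32.7, 39.0],
})
toy_code_state = pd.DataFrame({'code': ['HI', 'TX', 'AK'],
                               'state': ['HAWAII', 'TEXAS', 'ALASKA']})

# 1. median visitor age per destination state
median_age = (toy_immigration.groupby('destination')['age'].median()
              .rename('median_visitor_age').reset_index())

# 2. left join onto the cities table via state_code == destination
toy_cities = (toy_cities.merge(median_age, left_on='state_code',
                               right_on='destination', how='left')
              .drop(columns='destination'))

# 3. visitor count per destination, attached to every immigration record
toy_immigration['count_of_visitors'] = (
    toy_immigration.groupby('destination')['id'].transform('count'))

# 4. full state name for each destination
toy_immigration = (toy_immigration.merge(toy_code_state, left_on='destination',
                                         right_on='code', how='left')
                   .rename(columns={'state': 'destination_name'})
                   .drop(columns='code'))

print(toy_cities)
print(toy_immigration)
```

One difference from the Spark version: pandas `groupby` drops rows with a null *destination* by default, whereas Spark's `groupBy` keeps them as a separate group.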

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

1. Calculate median age of visitors using the immigration_data dataframe.

In [31]:
# calculate median_age of visitors
# define a function to calculate the (approximate) median of target_col for each group
def calculate_median(df, group_col, target_col):
    # aggregate in Spark, then collect the per-group results to the driver as a list of Row objects
    median_values = df.groupBy(group_col).agg(
        F.expr(f'percentile_approx({target_col}, 0.5)').alias('median_visitor_age')
    ).collect()
    
    # convert the list of Rows to a dictionary of {group value: median}
    median_dict = {row[group_col]: row['median_visitor_age'] for row in median_values}
    return median_dict
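
`calculate_median` uses `percentile_approx`, which Spark documents as returning a value taken from the column (the smallest value in sorted order that reaches the requested percentile, within an accuracy bound) rather than interpolating. For an even-sized group it therefore behaves more like a "low" median than the textbook median, which averages the two middle values. Python's `statistics` module shows the two definitions side by side on a toy list of ages; this illustrates the difference and is not what Spark runs internally.

```python
import statistics

ages = [20, 30, 40, 50]

# the textbook median averages the two middle values for an even-sized group
print(statistics.median(ages))      # 35.0
# median_low returns an actual value from the data instead
print(statistics.median_low(ages))  # 30
```

For groups as large as the visitors to a state, the gap between the two definitions is usually small.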

In [32]:
# calculate the median age by destination
median_age_by_destination = calculate_median(immigration_data, 'destination', 'age')

In [33]:
# convert the dictionary to a pandas DataFrame (built directly in pandas; no Spark round trip is needed)
median_age_df = pd.DataFrame(
    list(median_age_by_destination.items()),
    columns=['destination', 'median_visitor_age']
)

2. Add *median_visitor_age* to *us_cities* dataframe.

In [34]:
# join median_age_df with us_cities
us_cities=us_cities.merge(median_age_df, left_on='state_code', right_on='destination', how='left').drop(
    'destination', axis=1)

3. Calculate the count of visitors by *destination*.

In [35]:
# calculate the count of visitors by destination
count_of_visitors= immigration_data.select(
    F.col('id'), 
    F.col('destination').alias('visitor_destination')
).groupBy('visitor_destination').agg(
         F.count('id').alias('count_of_visitors'))

In [36]:
# join count_of_visitors with immigration_data
immigration_data=immigration_data.join(count_of_visitors, 
                                       immigration_data.destination==count_of_visitors.visitor_destination, 
                                       how='left').drop('visitor_destination')

4. Get full state names from *code_state*.

In [37]:
# create spark version of code_state_df
code_state_spark=spark.createDataFrame(code_state_df)

In [38]:
# join code_state with immigration_data to add the full destination state name
immigration_data=immigration_data.join(code_state_spark, immigration_data.destination==code_state_spark.code,
                                       how='left'
                                      ).withColumnRenamed('state', 'destination_name').drop('code')

#### 4.2 Data Quality Checks

In [39]:
# define a test to ensure the table contains data
def check_for_records(df, name='dataframe'):
    ''' 
    Checks that a pandas dataframe contains data by checking that the number of records is at least 1.
    
    Args:
        df: pandas dataframe
        name: name of the dataframe, used in the error message
    Raises:
        AssertionError if df contains no records, else nothing is returned
    '''
    assert len(df) >= 1, 'Error: the dataframe ' + name + ' does not contain any records.'

In [40]:
# run test
check_for_records(us_cities)
# test passed

In [41]:
# unit test to check expected column headers in us_cities (checking they have been reformatted)
def column_check(df, expected_columns):
    '''
    Checks column names of a dataframe against a list of expected column names. 
    Columns must be in the same order for the test to pass.
    
    Args:
        df: pandas dataframe
        expected_columns: list of expected columns in the desired order; column names are case sensitive.
    Returns:
        AssertionError if expected columns do not match df columns
    '''
    assert df.columns.tolist()==expected_columns

In [42]:
# run test
expected_us_cities_columns=['city_state_id','city', 'state', 'state_code', 'male_population', 'female_population',
       'total_population', 'median_resident_age','median_visitor_age']
column_check(us_cities, expected_us_cities_columns)
# test passed

#### All tests passed.
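
*city_state_id* is described as a unique identifier, but no test checks it. A further check in the same style as `column_check` could look like the sketch below; `check_unique` is a hypothetical helper, and in the notebook it would be called as `check_unique(us_cities, 'city_state_id')`.

```python
import pandas as pd


def check_unique(df, column):
    '''
    Checks that every value in `column` of a pandas dataframe is unique.

    Raises:
        AssertionError listing up to five duplicated values if the column is not unique
    '''
    duplicated = df.loc[df[column].duplicated(), column].unique()
    assert len(duplicated) == 0, f'{column} has duplicate values, e.g. {list(duplicated[:5])}'


# toy example: passes silently
cities = pd.DataFrame({'city_state_id': ['md_silver_spring', 'ma_quincy']})
check_unique(cities, 'city_state_id')
```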

#### 4.3 Data dictionaries

| Table | Column name | Description|
| --- | --- | --- |
| us_cities | city_state_id | Unique identifier made up of state code and city name |
| us_cities | city | US city name |
| us_cities | state | US state full name |
| us_cities | state_code | US state 2 letter code |
| us_cities | male_population | Male population volume for the city |
| us_cities | female_population | Female population volume for the city |
| us_cities | total_population | Total population volume for the city |
| us_cities | median_resident_age | Median age of residents in the city |
| us_cities | median_visitor_age | Median age of visitors to the state | 

| Table | Column name | Description|
| --- | --- | --- |
| immigration_data | id | Unique identifier |
| immigration_data | record_year | Year that the data was recorded |
| immigration_data | record_month | Month that the data was recorded represented as a number 1-12 |
| immigration_data | origin_location | Location the person flew in from |
| immigration_data | resident_location | Location the person is legally a resident of |
| immigration_data | airport_code | Code of the airport they flew to |
| immigration_data | arrival_date | Date of arrival into the US |
| immigration_data | destination | Destination of the person given as US state 2 letter code |
| immigration_data | departure_date | Date of departure from the US |
| immigration_data | age | Age in years |
| immigration_data | visa_type | Type of visa |
| immigration_data | count_of_visitors | Count of visitors by destination |
| immigration_data | destination_name | Name of destination state |

### Step 5: Complete Project Write Up

## Project write up
I used Spark for the immigration data because it is a large dataset with over 6,000,000 rows. One of the main benefits of Spark for this data is that its execution engine optimises the execution plan, and its in-memory computation increases processing speed. Because Spark is distributed, it can process data in parallel across a cluster of machines, so it handles a dataset of this size efficiently.
For the smaller datasets, such as the US city demographic data, I used pandas: they did not need Spark's distributed computing or scalability, and pandas was more compute-efficient at this size.

The data should be updated monthly using batch processing. It does not need to be updated live, because it is used for retrospective analysis, for which a monthly frequency is sufficient.

- If the data were increased by 100x, I would partition the data in Spark so it can be processed in parallel across the nodes of a cluster. I would partition by destination.  
- If the data populated a dashboard that must be updated by 7am every day, I would first increase the update frequency from monthly to daily, running at midnight. I would schedule the update with an Airflow DAG on a daily schedule and configure an SLA so that the run is expected to complete by 7am.  
- If the data needed to be accessed by 100+ people, I would host it in Snowflake, which provides a user-friendly experience and allows many users to access the same dataset at the same time.