# Immigrant cities data
### Data Engineering Capstone Project

#### Project Summary

In this Udacity-provided project, the main dataset covers immigration to the United States, and supplementary datasets cover airport codes, U.S. city demographics, and temperature.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [None]:
# Do all imports and installs here
import pandas as pd
import re
pd.set_option('display.max_columns', 30) # to view all columns 
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import count,when,isnan, col, udf, year, month, round, dayofweek, weekofyear, isnull
from pyspark.sql.types import StringType
from pyspark.sql import functions as f

### Step 1: Scope the Project and Gather Data

#### Scope 
The objective of this project is to combine data from three sources into fact and dimension tables, built with Spark and Pandas, that support analysis of immigration to the United States alongside city average temperature and city demographics (population counts and percentages).

End users can use this data to get insights into the population size and ethnic makeup of US cities, for example in studies of clothing or food preferences.

The dataset can answer questions such as: What is the average temperature of a US city at a given time? What are the counts and percentages of each population group in a US city? Which visa types and modes of transportation do immigrants use?

#### Describe and Gather Data 
**I94 Immigration Data:** This data comes from the US [National Travel and Tourism Office](https://www.trade.gov/national-travel-and-tourism-office). A data dictionary is included in the workspace. A sample file in CSV format is available for previewing the data before reading it all in. The project does not need the entire dataset, only the parts required to meet the goal set at the beginning.

**World Temperature Data:** This dataset came from Kaggle. You can read more about it [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).

**U.S. City Demographic Data:** This data comes from OpenDataSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).



In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_i94 = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df_temp = spark.read.format("csv").option("delimiter", ",").option("header", "true").load('../../data2/GlobalLandTemperaturesByCity.csv')
df_demo = spark.read.format("csv").option("delimiter", ";").option("header", "true").load('us-cities-demographics.csv')
df_airport = spark.read.format("csv").option("delimiter", ",").option("header", "true").load('airport-codes_csv.csv')

In [None]:
#write to parquet
#df_spark.write.parquet("sas_data")
#df_spark=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 

#### Check the row count and the number of distinct rows for each dataset

In [None]:
print('Number of immigration rows: ',df_i94.count())
print('Number of distinct immigration rows: ',df_i94.distinct().count())
print('Number of demographics rows: ',df_demo.count())
print('Number of distinct demographics rows: ',df_demo.distinct().count())
print('Number of temperature rows: ',df_temp.count())
print('Number of distinct temperature rows: ',df_temp.distinct().count())

#### Check Schema for each dataset

In [None]:
print('Immigration Schema:')
df_i94.printSchema()
print('Demographics Schema:')
df_demo.printSchema()
print('Temperature Schema:')
df_temp.printSchema()

#### Display five records for each dataset

In [None]:
df_i94.limit(5).toPandas()

In [None]:
df_demo.limit(5).toPandas()

In [None]:
df_temp.limit(5).toPandas()

#### Check the number of nulls for each column in each dataset 

In [None]:
df_i94.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_i94.columns]
   ).toPandas()

The occup, entdepu and insnum columns are of little use because each is missing in over 3 million records.

In [None]:
df_demo.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_demo.columns]
   ).toPandas()

There are only a few null values, so dropping these rows will not noticeably affect the dataset.

In [None]:
df_temp.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_temp.columns]
   ).toPandas()

The temperature dataset has many null values; these rows will be dropped.

The airport dataset also has many null values in its iata_code column; these rows will be dropped as well.

In [None]:
# check duplicates in cicid column
if df_i94.count() > df_i94.dropDuplicates(['cicid']).count():
    raise ValueError('Data has duplicates')
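
The check above only reports that duplicates exist. As a minimal plain-Python sketch (not part of the original notebook), the snippet below shows one way to list which key values repeat; `duplicated_keys` and the sample values are hypothetical names and data chosen for illustration.

```python
from collections import Counter


def duplicated_keys(keys):
    """Return the keys that occur more than once, sorted for stable output."""
    counts = Counter(keys)
    return sorted(key for key, n in counts.items() if n > 1)


# Hypothetical cicid values; 3.0 appears twice.
sample_cicids = [1.0, 2.0, 3.0, 3.0, 4.0]
print(duplicated_keys(sample_cicids))  # [3.0]
```

On a Spark DataFrame the same idea would likely be expressed as a `groupBy('cicid').count()` followed by a filter on the count, which avoids collecting the keys to the driver.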

#### Cleaning Steps

In [None]:
# Drop columns from Immigration dataset
cols = ('occup', 'entdepu','insnum')
df_i94 = df_i94.drop(*cols)
df_i94 = df_i94.dropna(how="any", subset=['i94port', 'i94addr', 'gender'])
df_i94.printSchema()

In [None]:
df_i94.limit(5).toPandas()

In [None]:
print('Number of demographics rows before dropping null values: ',df_demo.count())
df_demo = df_demo.na.drop("any")
print('Number of demographics rows after dropping null values: ', df_demo.count())

In [None]:
df_demo.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_demo.columns]
   ).toPandas()

In [None]:
print('Number of temperature rows before dropping null values: ',df_temp.count())
df_temp = df_temp.na.drop("any")
print('Number of temperature rows after dropping null values: ', df_temp.count())

In [None]:
df_temp.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_temp.columns]
   ).toPandas()

In [None]:
# Create list of valid ports
i94_sas_label_descriptions = 'I94_SAS_Labels_Descriptions.SAS'
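with open(i94_sas_label_descriptions) as label_file:  # avoid shadowing the `f` alias of pyspark.sql.functions
    lines = label_file.readlines()

re_compiled = re.compile(r"\'(.*)\'.*\'(.*)\'")
valid_ports = {}
for line in lines[302:961]:
    results = re_compiled.search(line)
    if results:  # skip any line without a quoted code/description pair
        valid_ports[results.group(1)] = results.group(2)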

# Create list of valid states
valid_states = df_demo.toPandas()['State Code'].unique()
print(valid_states)
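
To make the port-parsing step easier to follow, here is a small standalone sketch of the same idea run on a few sample lines. The sample lines, `PORT_PATTERN`, and `parse_ports` are illustrative assumptions, and the pattern is a stricter variant of the one used in the notebook (it also strips the padding inside the quotes).

```python
import re

# Hypothetical lines in the style of the SAS labels file: 'CODE' = 'DESCRIPTION'
sample_lines = [
    "   'ALC'\t=\t'ALCAN, AK             '\n",
    "   'ANC'\t=\t'ANCHORAGE, AK         '\n",
    "/* a comment line with no quoted pair */\n",
]

# Two quoted groups separated by an equals sign; quotes cannot appear inside a group.
PORT_PATTERN = re.compile(r"'([^']*)'\s*=\s*'([^']*)'")


def parse_ports(lines):
    """Map each port code to its description, skipping lines that do not match."""
    ports = {}
    for line in lines:
        match = PORT_PATTERN.search(line)
        if match is None:
            continue
        ports[match.group(1).strip()] = match.group(2).strip()
    return ports


print(parse_ports(sample_lines))
# {'ALC': 'ALCAN, AK', 'ANC': 'ANCHORAGE, AK'}
```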

In [None]:
# Create udf to convert SAS date to PySpark date 
@udf(StringType())
def convert_datetime(x):
    if x:
        return (datetime(1960, 1, 1).date() + timedelta(x)).isoformat()
    return None

# Create udf to validate state
@udf(StringType())
def validate_state(x):  
    if x in valid_states:
        return x
    return 'null'
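
SAS stores dates as a number of days counted from 1960-01-01, which is what the `convert_datetime` UDF relies on. The plain-Python sketch below works through that arithmetic outside Spark; `sas_to_iso` is a hypothetical helper, and unlike the UDF it treats only `None` as missing, so a day count of 0 is converted rather than dropped.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this day


def sas_to_iso(days):
    """Convert a SAS day count to an ISO date string; None stays None."""
    if days is None:
        return None
    return (SAS_EPOCH + timedelta(days=int(days))).isoformat()


print(sas_to_iso(20545.0))  # 2016-04-01
print(sas_to_iso(0))        # 1960-01-01
print(sas_to_iso(None))     # None
```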

In [None]:
# Extract valid states 
df_i94_cleaned = df_i94.withColumn('i94addr', validate_state(df_i94['i94addr']))

# Convert arrdate from SAS to PySpark format
# (chain on df_i94_cleaned so the validated i94addr column is kept)
df_i94_cleaned = df_i94_cleaned.withColumn('arrdate', convert_datetime(df_i94_cleaned['arrdate']))

# filter out null values
df_i94_cleaned = df_i94_cleaned.filter(df_i94_cleaned.i94addr != 'null')


df_i94_staging = df_i94_cleaned.select(col('cicid').alias('id'), 
                                       col('arrdate').alias('date'),
                                       col('i94port').alias('city_code'),
                                       col('i94addr').alias('state_code'),
                                       col('i94bir').alias('age'),
                                       col('gender').alias('gender'),
                                       col('i94visa').alias('visa_type'),
                                       col('i94mode').alias('transportation_type'),
                                       'count').drop_duplicates()

df_i94_staging.limit(5).toPandas()

In [None]:
# Calculate percentages of numeric columns and create new ones
df_demo_cleaned = df_demo.withColumn("median_age", df_demo['Median Age']) \
    .withColumn("total_pop",df_demo['Total Population'])\
    .withColumn("num_male_pop", df_demo['Male Population']) \
    .withColumn("prct_male_pop", (df_demo['Male Population'] / df_demo['Total Population']) * 100) \
    .withColumn("num_female_pop", df_demo['Female Population']) \
    .withColumn("prct_female_pop", (df_demo['Female Population'] / df_demo['Total Population']) * 100) \
    .withColumn("num_veterans", df_demo['Number of Veterans']) \
    .withColumn("prct_veterans", (df_demo['Number of Veterans'] / df_demo['Total Population']) * 100) \
    .withColumn("num_foreign_born", df_demo['Foreign-born'] ) \
    .withColumn("prct_foreign_born", (df_demo['Foreign-born'] / df_demo['Total Population']) * 100) \
    .withColumn("race", df_demo['Race']) \
    .withColumn("state_code",df_demo['State Code'])\
    .withColumn("city",df_demo['City'])\
    .dropna(how='any', subset=["state_code"])

df_demo_staging = df_demo_cleaned.select("median_age",'total_pop','num_male_pop','prct_male_pop',"num_female_pop",
                                         "prct_female_pop","num_veterans","prct_veterans","num_foreign_born","prct_foreign_born",
                                         "race",'state_code','city' )
                                         

df_demo_staging.limit(5).toPandas()
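
The percentage columns above are each a population count divided by the total population, times 100. Because the CSV is loaded without schema inference, the counts arrive as strings. The sketch below shows the same arithmetic in plain Python with explicit casting and a guard for a zero total; `percentage` and the sample numbers are hypothetical.

```python
def percentage(part, total):
    """Return part as a percentage of total, or None when it cannot be computed."""
    if part is None or total is None:
        return None
    part, total = float(part), float(total)
    if total == 0:
        return None
    return round(part * 100 / total, 2)


print(percentage("48500", "100000"))  # 48.5
print(percentage(10, 0))              # None
```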

In [None]:
# keep only the United States records of the temperature dataset

df_temp_cleaned= df_temp.filter(col('Country') == 'United States') \
    .withColumn('year', year(df_temp['dt'])) \
    .withColumn('month', month(df_temp['dt'])) \
    .withColumn('week#',weekofyear(df_temp['dt']))\
    .withColumn("city", df_temp["City"])\
    .withColumn("AverageTemperature", col("AverageTemperature").cast("float")) \
    .dropna(how='any', subset=["city"])

# use temperatures from the year 2006 and above
df_temp_cleaned = df_temp_cleaned.filter(df_temp_cleaned["year"] >= 2006)

df_temp_staging = df_temp_cleaned.select(col('year'), 
                                         col('month'), 
                                         col('week#'),
                                         col('city'),
                                         round(col('AverageTemperature'), 1).alias('avg_temperature'),
                                         col('Latitude'), 
                                         col('Longitude')).drop_duplicates()

df_temp_staging.limit(5).toPandas()

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

The data model used for this project is the Star Schema model.

This model leads to simpler, faster SQL queries. We use it instead of the snowflake schema because we have few tables and because it is easier for business analysts to query with SQL.
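
As a rough illustration of why a star schema keeps queries simple, the sketch below builds a tiny fact table and one dimension table in an in-memory SQLite database and answers a question with a single join. The rows are made up, and the tables are cut-down versions of the ones in this project; it is only meant to show the query shape.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE immigration_fact (id INTEGER, date TEXT, state_code TEXT, count INTEGER);
CREATE TABLE time_dim (date TEXT PRIMARY KEY, year INTEGER, month INTEGER);
INSERT INTO immigration_fact VALUES (1, '2016-04-01', 'CA', 1),
                                    (2, '2016-04-01', 'NY', 1),
                                    (3, '2016-04-02', 'CA', 1);
INSERT INTO time_dim VALUES ('2016-04-01', 2016, 4), ('2016-04-02', 2016, 4);
""")

# One join from the fact table to a dimension answers "arrivals per state per month".
rows = conn.execute("""
    SELECT f.state_code, t.month, SUM(f.count) AS arrivals
    FROM immigration_fact AS f
    JOIN time_dim AS t ON t.date = f.date
    GROUP BY f.state_code, t.month
    ORDER BY f.state_code
""").fetchall()
print(rows)  # [('CA', 4, 2), ('NY', 4, 1)]
```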

#### Staging tables:
| Immigration stage | Demographics stage | Temperature stage |
|:-----|:------:|:-----:|
| id | total_pop | year |
| date | num_male_pop | month |
| city_code | prct_male_pop | week# |
| state_code | num_female_pop | city |
| age | prct_female_pop | avg_temperature |
| gender | num_veterans | Latitude |
| visa_type | prct_veterans | Longitude |
| transportation_type | num_foreign_born | |
| count | prct_foreign_born | |
| | race | |
| | state_code | |
| | city | |
| | median_age | |



#### Fact table:
| Immigration fact |
| ---- |
| id |
| date |
| city |
| city_code |
| state_code |
| count |
 

#### Dimension  tables
| immigration dim | demographics dim | temperature dim | time dim |
|:----|:---:|:---:|:---|
| id | state_code | city | date |
| age | city | year | year |
| visa_type | median_age | month | month |
| transportation_type | num_male_pop | week# | week# |
| gender | prct_male_pop | avg_temperature | day |
| | num_female_pop | | |
| | prct_female_pop | | |
| | num_veterans | | |
| | prct_veterans | | |
| | num_foreign_born | | |
| | prct_foreign_born | | |
| | total_pop | | |
| | race | | |
| | Longitude | | |
| | Latitude | | |


          
    

#### 3.2 Mapping Out Data Pipelines
The steps needed to pipeline the data into the chosen data model:

1. Clean the data (null values, duplicates, etc.).
2. Load staging tables.
3. Create fact and dimension tables
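
The three steps above can be sketched in plain Python on a handful of rows. This is only an illustration of the flow, not the Spark implementation used in this notebook; `clean`, `stage`, `build_fact`, and the sample rows are hypothetical.

```python
def clean(rows, required):
    """Step 1: drop rows with missing required fields, then drop exact duplicates."""
    seen, cleaned = set(), []
    for row in rows:
        if any(row.get(key) is None for key in required):
            continue
        fingerprint = tuple(sorted(row.items()))
        if fingerprint not in seen:
            seen.add(fingerprint)
            cleaned.append(row)
    return cleaned


def stage(rows, renames):
    """Step 2: keep only the mapped columns under their staging names."""
    return [{new: row[old] for old, new in renames.items()} for row in rows]


def build_fact(staged, columns):
    """Step 3: project the staging rows onto the fact table columns."""
    return [{column: row[column] for column in columns} for row in staged]


raw = [
    {"cicid": 1, "i94addr": "CA", "gender": "F"},
    {"cicid": 1, "i94addr": "CA", "gender": "F"},  # exact duplicate
    {"cicid": 2, "i94addr": None, "gender": "M"},  # missing state
]
staged = stage(
    clean(raw, ["i94addr", "gender"]),
    {"cicid": "id", "i94addr": "state_code", "gender": "gender"},
)
fact = build_fact(staged, ["id", "state_code"])
print(fact)  # [{'id': 1, 'state_code': 'CA'}]
```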

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Building the data pipelines to create the data model.

In [None]:
df_immigration_dim = df_i94_staging.select('id','age','visa_type','transportation_type','gender').drop_duplicates()

In [None]:
df_immigration_dim.limit(5).toPandas()

In [None]:
df_demographics_dim =df_demo_staging.join(df_temp_staging,'city').select('state_code',
                                                                         'city',
                                                                         'median_age',
                                                                         'total_pop',
                                                                         'num_male_pop',
                                                                         'prct_male_pop',
                                                                         'num_female_pop',
                                                                         'prct_female_pop',
                                                                         'num_veterans',
                                                                         'prct_veterans',
                                                                         'num_foreign_born',
                                                                         'prct_foreign_born',
                                                                         'race',
                                                                         'Longitude',
                                                                         'Latitude').drop_duplicates()
                                                                         

In [None]:
df_demographics_dim.limit(5).toPandas()

In [None]:
df_temperature_dim = df_temp_staging.select('city','year','month','week#','avg_temperature').drop_duplicates()

In [None]:
df_temperature_dim.limit(5).toPandas()

In [None]:
df_immigration_fact = df_i94_staging.join(df_demographics_dim,'state_code').join(df_temperature_dim,'city').select('id',
                                                                                                                  'date',
                                                                                                                  'city',
                                                                                                                  'city_code',
                                                                                                                  'state_code',
                                                                                                                  'count').drop_duplicates()

In [None]:
df_immigration_fact.limit(5).toPandas()

In [None]:
df_time_dim = df_i94_staging.select('date')\
                            .withColumn('year', year(df_i94_staging['date']))\
                            .withColumn('month', month(df_i94_staging['date']))\
                            .withColumn('day',dayofweek(df_i94_staging['date']))\
                            .withColumn('week#',weekofyear(df_i94_staging['date'])).drop_duplicates()

In [None]:
df_time_dim.limit(5).toPandas()

#### 4.2 Data Quality Checks


In [None]:
# Function to check tables availability 
def check_tables(df):
    if df is not None:
        print("Data quality check passed.\nFact and Dimension tables are available.")
        return True      
    else:
        print("Data quality check failed.\nThere are some tables that are missing!")
        return False
        
check_tables(df_immigration_dim) & \
check_tables(df_demographics_dim) & \
check_tables(df_temperature_dim) &\
check_tables(df_immigration_fact) & \
check_tables(df_time_dim)

In [None]:
# Function to check if there are values that exist in tables
def values_check(df):
    if df.count() !=0:
        print("Data quality check passed.\nValues in Fact and Dimension tables are available.")
        return True
    else:
        print("Data quality check failed.\nThere are some tables that are empty.")
        return False

values_check(df_immigration_dim) & \
values_check(df_demographics_dim) & \
values_check(df_temperature_dim) & \
values_check(df_immigration_fact) & \
values_check(df_time_dim)
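
The two checks above print a message per table. A possible variation, sketched below under stated assumptions, runs both checks in one pass and returns the names of the failing tables so that a caller can raise an error. `FakeFrame` is a hypothetical stand-in that exposes only `count()`, used so the sketch runs without Spark; `run_quality_checks` is likewise an illustrative name.

```python
class FakeFrame:
    """Hypothetical stand-in exposing only count(), so the sketch runs without Spark."""

    def __init__(self, n_rows):
        self._n_rows = n_rows

    def count(self):
        return self._n_rows


def run_quality_checks(tables):
    """Return the names of tables that are missing (None) or empty."""
    failed = []
    for name, df in tables.items():
        if df is None or df.count() == 0:
            failed.append(name)
    return failed


tables = {
    "immigration_fact": FakeFrame(10),
    "time_dim": FakeFrame(0),
    "temperature_dim": None,
}
print(run_quality_checks(tables))  # ['time_dim', 'temperature_dim']
```

With the real DataFrames, the dictionary would map each table name to the corresponding `df_*` variable.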

#### A query to validate the data model
Filter the data by transportation type and show the id and gender columns.
Note: 1 means Air, 2 means Sea, and 3 means Land.

In [None]:
# a query to validate the data model
df_immigration_dim.filter('transportation_type = 2').select("id","gender").show()
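
For readability in reports, the numeric transportation codes can be translated to labels. The sketch below uses only the three codes listed in the note above; `TRANSPORT_MODES` and `describe_mode` are hypothetical names, and any other code is reported as "Unknown".

```python
# Codes taken from the note above: 1 = Air, 2 = Sea, 3 = Land.
TRANSPORT_MODES = {1: "Air", 2: "Sea", 3: "Land"}


def describe_mode(code):
    """Translate a transportation code to its label, or 'Unknown' if unmapped."""
    if code is None:
        return "Unknown"
    return TRANSPORT_MODES.get(int(code), "Unknown")


print(describe_mode(2.0))  # Sea
print(describe_mode(9))    # Unknown
```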

#### 4.3 Data dictionary 
 
    
| Immigration fact | description |
|:----|-|
| id: | id |
| date: | arrival date |
| city: | arrival city |
| city_code: | arrival city code |
| state_code: | arrival state code |
| count: | used to count arrivals to the US |

 &nbsp;
 
| immigration dim | description |
|:----|-|
| id: | immigrant's id |
| age: | immigrant's age |
| visa_type: | immigrant's visa type |
| transportation_type: | immigrant's transportation type |
| gender: | immigrant's gender |

&nbsp;

| demographics dim | description |
|:----|-|
| state_code: | state code of the city |
| city: | city name |
| median_age: | median age of the city |
| num_male_pop: | number of the male population |
| prct_male_pop: | percentage of the male population |
| num_female_pop: | number of the female population |
| prct_female_pop: | percentage of the female population |
| num_veterans: | number of veterans |
| prct_veterans: | percentage of veterans |
| num_foreign_born: | number of the foreign-born population |
| prct_foreign_born: | percentage of the foreign-born population |
| total_pop: | total number of the city's population |
| race: | respondent race |
| Longitude: | city longitude |
| Latitude: | city latitude |

&nbsp;

| temperature dim | description |
|:----|-|
| city: | city name |
| year: | year of the record |
| month: | month of the record |
| week#: | week of the record |
| avg_temperature: | average temperature |

&nbsp;

| time dim | description |
|:---|--|
| date: | date of the record |
| year: | year of the record |
| month: | month of the record |
| week#: | week of the record |
| day: | day of the week of the record |



### Step 5: Complete Project Write Up

Spark was chosen for this project because it provides a fast, general-purpose data processing platform. According to the Apache Spark project, programs can run up to 100x faster in memory, or 10x faster on disk, than on Hadoop MapReduce.

The data is used for reporting purposes. Whenever new data is needed, rerunning this code produces cleaned and organized data.

How I would approach the problem differently under the following scenarios:

If the data was increased by 100x: We could scale up the EC2 instances hosting Spark (vertical scaling) or add more Spark worker nodes (horizontal scaling). The added capacity should reduce processing time.

If the data populates a dashboard that must be updated on a daily basis by 7am every day: We could use Airflow to schedule and automate the data pipeline jobs. Its built-in retry and monitoring mechanisms would help meet this requirement.

If the database needed to be accessed by 100+ people: We could move the solution to a production-scale cloud data warehouse, which has the capacity to serve more users and includes workload management so that resources are shared fairly among them.