# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project uses Python and Spark to lay the foundation for a data warehouse built on a star schema architecture.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import re
import pandas as pd
from cleaner import Cleaner
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

### Step 1: Scope the Project and Gather Data

#### Scope 
The main scope of this project is to enrich the I94 immigration dataset with supplementary datasets in order to broaden our analysis and deepen our understanding of the data.

#### Exploring the data with Pandas as a start

##### I94 SAS Immigration Dataset
This data comes from the US National Tourism and Trade Office.

In [2]:
# Read in the data here
df_194_sample = pd.read_csv('immigration_data_sample.csv')

In [3]:
df_194_sample['occup'].unique()
#df_194_sample.head()

array([nan, 'STU', 'PHA', 'OTH'], dtype=object)

In [4]:
df_194_sample['i94port'].unique()

array(['HHW', 'MCA', 'OGG', 'LOS', 'CHM', 'ATL', 'SFR', 'NYC', 'CHI',
       'PHI', 'FTL', 'BOS', 'SAI', 'NAS', 'SEA', 'ORL', 'PSP', 'HOU',
       'NEW', 'BAL', 'SNJ', 'DET', 'AGA', 'LVG', 'MIA', 'SDP', 'VCV',
       'DUB', 'PEM', 'TAM', 'BLA', 'WAS', 'KOA', 'DAL', 'SHA', 'SPM',
       'NIA', 'PHR', 'MIL', 'SLC', 'CLT', 'EPI', 'SNA', 'MON', 'DLR',
       'SFB', 'OPF', 'X96', 'CLM', 'LIH', 'DEN', 'PHO', 'POO', 'NOL',
       'WPB', 'PBB', 'TOR', 'MAA', 'RNO', 'FMY', 'HIG', 'OAK', 'OTM',
       'ONT', 'SRQ', 'LLB', 'NCA', 'SUM', 'STR', 'HAM'], dtype=object)

In [5]:
df_194_full = pd.read_sas('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat',encoding="ISO-8859-1")

In [6]:
df_194_full.head(5)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [7]:
# Some columns need to be cast to proper data types (e.g. the ID, year and month columns are stored as floats)
df_194_full.dtypes

cicid       float64
i94yr       float64
i94mon      float64
i94cit      float64
i94res      float64
i94port      object
arrdate     float64
i94mode     float64
i94addr      object
depdate     float64
i94bir      float64
i94visa     float64
count       float64
dtadfile     object
visapost     object
occup        object
entdepa      object
entdepd      object
entdepu      object
matflag      object
biryear     float64
dtaddto      object
gender       object
insnum       object
airline      object
admnum      float64
fltno        object
visatype     object
dtype: object
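
The `arrdate` and `depdate` columns show up as floats because they hold SAS date values, which count days from 1960-01-01. Below is a minimal sketch of the conversion, assuming that epoch applies to this file. The helper name `sas_date_to_date` is illustrative and is not taken from `cleaner.py`.

```python
from datetime import date, timedelta
from typing import Optional

# SAS stores dates as the number of days since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)


def sas_date_to_date(sas_days: Optional[float]) -> Optional[date]:
    """Convert a SAS day count (e.g. arrdate) to a calendar date.

    Missing values (None or NaN) are returned as None.
    """
    # NaN is the only float that is not equal to itself.
    if sas_days is None or sas_days != sas_days:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_date_to_date(20573.0))       # 2016-04-29
print(sas_date_to_date(float("nan")))  # None
```

The first value is the `arrdate` of row 0 in the preview above; it lands in April 2016, which matches the `i94_apr16_sub` file name.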

In [8]:
df_194_full['i94port'].isna().value_counts()

False    3096313
Name: i94port, dtype: int64

In [9]:
df_194_full['i94port'].unique()

array(['XXX', 'ATL', 'WAS', 'NYC', 'TOR', 'BOS', 'HOU', 'MIA', 'CHI',
       'LOS', 'CLT', 'DEN', 'DAL', 'DET', 'NEW', 'FTL', 'LVG', 'ORL',
       'NOL', 'PIT', 'SFR', 'SPM', 'POO', 'PHI', 'SEA', 'SLC', 'TAM',
       'HAM', 'NAS', 'VCV', 'MAA', 'AUS', 'HHW', 'OGG', 'PHO', 'SDP',
       'SFB', 'EDA', 'MON', 'CLG', 'DUB', 'FMY', 'YGF', 'SAJ', 'CIN',
       'BAL', 'RDU', 'WPB', 'STT', 'OAK', 'NSV', 'SNA', 'OTT', 'X96',
       '5KE', 'CLE', 'HAR', 'PSP', 'CHR', 'HAL', 'SAA', 'KOA', 'SHA',
       'WIN', 'BGM', 'NCA', 'OPF', 'SAI', 'JFA', 'AGA', 'ONT', 'CLM',
       'STL', 'W55', 'CHS', 'SNJ', 'SRQ', 'ANC', 'LNB', 'LIH', 'MIL',
       'INP', 'KAN', 'ROC', 'SAC', 'BRO', 'LAR', 'RNO', 'SGR', 'ELP',
       'MCA', 'MDT', 'SPE', 'FPR', 'SYR', 'ICT', 'MLB', 'ADS', 'TUC',
       'DLR', 'CAE', 'CHA', 'HSV', 'WIL', 'HPN', 'HEF', 'BRG', 'BED',
       'DAB', 'JAC', 'FRB', 'SWF', 'KEY', 'PTK', 'MWH', 'X44', 'MYR',
       'APF', 'ATW', 'PVD', 'BUF', 'PIE', 'MHT', 'BDL', 'NYL', 'VNY',
       '5T6', 'LEX',

In [10]:
# Check how many duplicate rows we have.
# This gives us a sense of the size of the issue in our main dataset.
df_194_full.duplicated().value_counts()

False    3096313
dtype: int64

In [11]:
# No duplicate rows were found; every row also has a distinct cicid, so cicid can serve as a unique key
#df_194_full.isna().sum()
df_194_full['cicid'].nunique()

3096313

#### World Temperature Data:
This dataset came from Kaggle.

In [12]:
df_temp = pd.read_csv('../../data2/GlobalLandTemperaturesByCity.csv')
df_temp.head(5)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [13]:
# Let's see how many NA values we have in the world temperature dataset
df_temp.isna().sum()

dt                                    0
AverageTemperature               364130
AverageTemperatureUncertainty    364130
City                                  0
Country                               0
Latitude                              0
Longitude                             0
dtype: int64

In [14]:
#Duplicate value counts
df_temp.duplicated().value_counts()

False    8599212
dtype: int64

In [15]:
df_temp['Country'].unique()

array(['Denmark', 'Turkey', 'Kazakhstan', 'China', 'Spain', 'Germany',
       'Nigeria', 'Iran', 'Russia', 'Canada', "Côte D'Ivoire",
       'United Kingdom', 'Saudi Arabia', 'Japan', 'United States', 'India',
       'Benin', 'United Arab Emirates', 'Mexico', 'Venezuela', 'Ghana',
       'Ethiopia', 'Australia', 'Yemen', 'Indonesia', 'Morocco',
       'Pakistan', 'France', 'Libya', 'Burma', 'Brazil', 'South Africa',
       'Syria', 'Egypt', 'Algeria', 'Netherlands', 'Malaysia', 'Portugal',
       'Ecuador', 'Italy', 'Uzbekistan', 'Philippines', 'Madagascar',
       'Chile', 'Belgium', 'El Salvador', 'Romania', 'Peru', 'Colombia',
       'Tanzania', 'Tunisia', 'Turkmenistan', 'Israel', 'Eritrea',
       'Paraguay', 'Greece', 'New Zealand', 'Vietnam', 'Cameroon', 'Iraq',
       'Afghanistan', 'Argentina', 'Azerbaijan', 'Moldova', 'Mali',
       'Congo (Democratic Republic Of The)', 'Thailand',
       'Central African Republic', 'Bosnia And Herzegovina', 'Bangladesh',
       'Switzerland'

In [16]:
df_temp[df_temp['Country'] == 'United States'].head(5)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
47555,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W
47556,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W
47557,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W
47558,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W
47559,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W


#### US Cities Demographics Data
This data comes from OpenDataSoft.

In [17]:
# the delimiter here is ";"
df_demog = pd.read_csv('us-cities-demographics.csv',delimiter = ';')

In [18]:
df_demog.head(5)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [19]:
# The data types look correct
df_demog.dtypes

City                       object
State                      object
Median Age                float64
Male Population           float64
Female Population         float64
Total Population            int64
Number of Veterans        float64
Foreign-born              float64
Average Household Size    float64
State Code                 object
Race                       object
Count                       int64
dtype: object

In [20]:
# Let's see how many null values we have in each column
df_demog.isna().sum().sort_values(ascending = False)

Average Household Size    16
Foreign-born              13
Number of Veterans        13
Female Population          3
Male Population            3
Count                      0
Race                       0
State Code                 0
Total Population           0
Median Age                 0
State                      0
City                       0
dtype: int64

In [21]:
# It seems that if we exclude the rows with a missing average household size, the other NAs are excluded too.
df_demog[df_demog['Average Household Size'].notna()].isna().sum().sort_values(ascending=False)

Count                     0
Race                      0
State Code                0
Average Household Size    0
Foreign-born              0
Number of Veterans        0
Total Population          0
Female Population         0
Male Population           0
Median Age                0
State                     0
City                      0
dtype: int64

In [22]:
df_demog['Count'].sum()

141554272

In [23]:
df_demog.duplicated().value_counts()

False    2891
dtype: int64

In [24]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_194 = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [25]:
#shape of the dataset
print(df_194.count(),len(df_194.columns))

3096313 28


In [26]:
#write to parquet
#df_194.write.parquet("sas_data")
#df_194 = spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
The data cleaning tasks for the three datasets are consolidated in the cleaner.py file

In [27]:
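# Create dictionary of valid i94port codes
re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
i94port_valid = {}
with open('i94port_valid.txt') as f:
    for line in f:
        match = re_obj.search(line)
        # Skip lines that do not contain a quoted code/description pair
        if match is None:
            continue
        # Each code maps to a one-element list holding the port description
        i94port_valid[match[1]] = [match[2]]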
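
To make the parsing step concrete, here is a small sketch that applies a similar pattern to a few sample lines. The sample lines and their `'CODE' = 'City, ST'` layout are assumptions about what `i94port_valid.txt` looks like, since the file itself is not shown here. Unlike the cell above, the sketch stores each description as a stripped string rather than a one-element list.

```python
import re

# Non-greedy groups capture the code and the description between single quotes.
PORT_LINE = re.compile(r"'(.*?)'\s*=\s*'(.*?)'")

# Hypothetical sample lines; the real file layout may differ.
sample_lines = [
    "   'ATL'\t=\t'ATLANTA, GA            '",
    "   'NYC'\t=\t'NEW YORK, NY           '",
    "not a mapping line",
]

ports = {}
for line in sample_lines:
    match = PORT_LINE.search(line)
    if match is None:
        continue  # skip lines without a code/description pair
    ports[match.group(1)] = match.group(2).strip()

print(ports)  # {'ATL': 'ATLANTA, GA', 'NYC': 'NEW YORK, NY'}
```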

In [28]:
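@udf(returnType=StringType())
def get_i94port(city):
    '''
    Input: City name

    Output: Corresponding i94port code, or None if no port description contains the city name
    '''
    # Spark passes null column values as None
    if city is None:
        return None

    for key in i94port_valid:
        if city.lower() in i94port_valid[key][0].lower():
            return key
    return None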
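
Because the lookup runs inside a Spark UDF, it can be easier to reason about as a plain Python function first. The sketch below mirrors the substring match on a tiny, hypothetical mapping (the `YOR` entry is invented for the demonstration). It shows one side effect worth knowing: the first key whose description contains the city name wins, so a short city name can match an unrelated port.

```python
from typing import Dict, List, Optional

# Hypothetical subset of the port mapping, in the same {code: [description]} shape.
sample_ports: Dict[str, List[str]] = {
    "NYC": ["NEW YORK, NY"],
    "NEW": ["NEWARK/TETERBORO, NJ"],
    "YOR": ["YORK, PA"],  # invented entry to demonstrate ambiguity
}


def find_port_code(city: Optional[str], ports: Dict[str, List[str]]) -> Optional[str]:
    """Return the first port code whose description contains the city name."""
    if not city:
        return None
    needle = city.lower()
    for code, descriptions in ports.items():
        if needle in descriptions[0].lower():
            return code
    return None


print(find_port_code("Newark", sample_ports))  # NEW
print(find_port_code("York", sample_ports))    # NYC, because "york" is a substring of "new york, ny"
print(find_port_code(None, sample_ports))      # None
```

If exact matches matter, comparing against the part of the description before the comma would be one option, at the cost of missing descriptions that list several cities.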

In [29]:
# Read the demographic and temperature datasets with Spark
df_temp = spark.read.format('csv').option('header','true').load('../../data2/GlobalLandTemperaturesByCity.csv')
df_demog = spark.read.format('csv').option('header','true').option('delimiter',';').load('us-cities-demographics.csv')

In [30]:
# Performing cleaning tasks here
df_194_cleaned = Cleaner.clean_194_dataset(df_194,i94port_valid)
df_temp_cleaned = Cleaner.clean_temp_dataset(df_temp)
df_demog_cleaned = Cleaner.clean_demographic_dataset(df_demog)

root
 |-- cicid: integer (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: integer (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nulla

In [31]:
#print(port_city_dict)

#df_temp_cleaned.select('City').distinct().show()
df_temp_cleaned = df_temp_cleaned.withColumn('port_code',get_i94port(df_temp_cleaned.City))
df_demog_cleaned = df_demog_cleaned.withColumn('port_code',get_i94port(df_demog_cleaned.City))

In [32]:
df_demog_cleaned.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- median_age: string (nullable = true)
 |-- male_population: string (nullable = true)
 |-- female_population: string (nullable = true)
 |-- total_population: string (nullable = true)
 |-- number_of_veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- average_household_size: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)
 |-- port_code: string (nullable = true)



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
##### Fact Table
The fact table consists of columns from the immigration dataset joined with the temperature and demographics data.

* cicid (acts as our primary key)
* i94yr (renamed to year during the cleaning process)
* i94mon (renamed to month during the cleaning process)
* i94port (renamed to port_code during the cleaning process)
* arrdate (renamed to arrival_date during the cleaning process)
* depdate (renamed to departure_date during the cleaning process)
* AverageTemperature
* average_household_size
* number_of_veterans
* total_population
* female_population
* male_population
* median_Age

##### Dimension Table 1 (dim_supp_info) 
This table contains supplementary info from the main immigration dataset.

* cicid (acts as our primary key)
* biryear (renamed to birthday_year during the cleaning process)
* gender
* airline
* fltno (renamed to flight_number during the cleaning process)
* visatype

##### Dimension Table 2 (dim_cities_info)
* port_code (created by the get_i94port UDF)
* City
* Country
* Latitude
* Longitude

    
#### 3.2 Mapping Out Data Pipelines
* Load the I94 immigration dataset
* Load the world temperature dataset
* Load the demographics dataset
* Clean the I94 immigration dataset
* Clean the world temperature dataset
* Clean the demographics dataset
* Create the fact table
* Create the Supp_Info dimension table (dim_supp_info)
* Create the cities dimension table (dim_cities_info) from the temperature data

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.
##### Create the fact table

In [33]:
#register the three dataframes as SQL views
df_194_cleaned.createOrReplaceTempView('immigration_data')
df_temp_cleaned.createOrReplaceTempView('temperature_data')
df_demog_cleaned.createOrReplaceTempView('demographics_dataset')

In [39]:
# state_code is selected because the integrity checks and the sample query in section 4.2 read it from the fact table
df_immig_fact = spark.sql("""
SELECT
    imm.cicid,
    imm.year,
    imm.month,
    imm.port_code,
    imm.state_code,
    imm.arrival_date,
    imm.departure_date,
    temp.average_temperature,
    demog.average_household_size,
    demog.number_of_veterans,
    demog.total_population,
    demog.female_population,
    demog.male_population,
    demog.median_age
FROM immigration_data imm
JOIN temperature_data temp ON imm.port_code = temp.port_code
JOIN demographics_dataset demog ON imm.state_code = demog.state_code AND imm.port_code = demog.port_code
""")
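
One thing to watch in this query: if `temperature_data` still holds more than one row per `port_code` (the raw file has one row per city per month), the join emits one fact row per matching temperature row, and `cicid` would no longer be unique. Whether `Cleaner.clean_temp_dataset` already collapses the data is not shown here. Below is a minimal plain-Python sketch of averaging per port before joining, using invented values.

```python
from collections import defaultdict
from statistics import mean

# Invented rows: (port_code, average_temperature), several readings per port.
temperature_rows = [("NYC", 10.0), ("NYC", 12.0), ("ATL", 18.0)]
# Invented rows: (cicid, port_code).
immigration_rows = [(1, "NYC"), (2, "ATL"), (3, "NYC")]

# Collapse the temperature data to one value per port first.
readings = defaultdict(list)
for port, temp in temperature_rows:
    readings[port].append(temp)
avg_temp = {port: mean(values) for port, values in readings.items()}

# Inner join: each immigration row now matches at most one temperature value.
fact_rows = [
    (cicid, port, avg_temp[port])
    for cicid, port in immigration_rows
    if port in avg_temp
]

print(fact_rows)  # [(1, 'NYC', 11.0), (2, 'ATL', 18.0), (3, 'NYC', 11.0)]
```

In Spark SQL the same idea would be a subquery that groups `temperature_data` by `port_code` before the join.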

##### Create the first dimension table (dim_supp_info)

In [35]:
df_supp_info = spark.sql("""
SELECT
  cicid, 
  birthday_year,
  gender,
  airline,
  flight_number,
  visatype,
  state_code
  
FROM immigration_data
""")

##### Create the second dimension table (dim_cities_info)

In [36]:
df_cities_info = spark.sql(""" 
SELECT 
port_code,
City,
Country,
Latitude,
Longitude
from temperature_data
""")

#### 4.2 Data Quality Checks & Model Validation
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [37]:
def exists_data_quality_check(df_table, table_name):
    # A table passes this check when it contains at least one row
    if df_table.count() > 0:
        print("Data quality check has passed for table {}".format(table_name))
    else:
        print("Data quality check has failed for table {}".format(table_name))
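
A printed message is easy to miss when the notebook runs unattended. As a sketch of an alternative, the check below raises an exception on an empty table so that a scheduled run would stop. It takes the row count as an argument (for a Spark DataFrame that would be `df.count()`); the function name `require_rows` is illustrative.

```python
def require_rows(row_count: int, table_name: str) -> int:
    """Raise if a table is empty; otherwise return the row count unchanged."""
    if row_count <= 0:
        raise ValueError(
            "Data quality check failed for table {}: no rows".format(table_name)
        )
    return row_count


print(require_rows(3096313, "Fact_Immigration"))  # 3096313

try:
    require_rows(0, "Dim_Cities_Info")
except ValueError as err:
    print(err)  # Data quality check failed for table Dim_Cities_Info: no rows
```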
    

In [7]:
def data_integrity_checks(df_table, table_name):
    # Each check counts the distinct fact keys that have no match in the
    # dimension table (left anti join). Assumes df_immig_fact exposes both
    # state_code and port_code.
    if table_name == 'Dim_Supp_Info':
        # validate state codes
        key = 'state_code'
    elif table_name == 'Dim_Cities_Info':
        # validate port codes
        key = 'port_code'
    else:
        print('No data integrity check defined for table {}'.format(table_name))
        return

    unmatched = df_immig_fact.select(key).distinct().join(df_table, on=key, how='left_anti').count()

    if unmatched > 0:
        print('Data integrity check failed on column {} in table {}'.format(key, table_name))
    else:
        print('Data integrity check passed on column {} in table {}'.format(key, table_name))
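
A `left_anti` join keeps the rows from the left side that have no match on the right, so a count of zero means every key in the fact table exists in the dimension table. The same idea in plain Python, with invented port codes:

```python
# Invented key sets standing in for the distinct port codes of each table.
fact_port_codes = {"NYC", "ATL", "XXX"}
dim_port_codes = {"NYC", "ATL", "BOS"}

# Keys present in the fact table but missing from the dimension table.
orphans = fact_port_codes - dim_port_codes

print(sorted(orphans))    # ['XXX']
print(len(orphans) == 0)  # False, so the integrity check would fail here
```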
            

In [51]:
df_p.shape

(3490, 5)

In [None]:
exists_data_quality_check(df_immig_fact,'Fact_Immigration')
exists_data_quality_check(df_supp_info,'Dim_Supp_Info')
exists_data_quality_check(df_cities_info,'Dim_Cities_Info')


data_integrity_checks(df_supp_info,'Dim_Supp_Info')
data_integrity_checks(df_cities_info,'Dim_Cities_Info')

In [None]:
# Validate the model
df_sample_query = df_immig_fact.join(
    df_cities_info, df_immig_fact['port_code'] == df_cities_info['port_code'], 'left'
).join(
    df_supp_info, df_immig_fact['state_code'] == df_supp_info['state_code'], 'left'
)

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

##### Fact Table
The fact table consists of columns from the immigration dataset joined with the temperature and demographics data.

* cicid: primary key
* year: numeric year
* month: numeric month
* port_code: 3-character code based on the city name
* arrival_date: arrival date
* departure_date: departure date
* average_temperature: average temperature of the destination city
* average_household_size: average household size in the destination city
* number_of_veterans: number of veterans in the destination city
* total_population: total population in the destination city
* female_population: total female population in the destination city
* male_population: total male population in the destination city
* median_Age: median age in the destination city

##### Dimension Table 1 (dim_supp_info) 
This table contains supplementary info from the main immigration dataset.

* cicid: primary key
* birthday_year: year of birth
* gender: gender
* airline: airline used
* flight_number: flight number
* visatype: visa type under which the traveler was admitted (e.g. B2, F1)

##### Dimension Table 2 (dim_cities_info)
* port_code: 3-character code based on the city name
* City: city name
* Country: country name
* Latitude: latitude of the city
* Longitude: longitude of the city

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

* The main technology used in this project is Apache Spark, or more precisely PySpark, the Python API for Apache Spark. Spark is a good fit for big data analysis and is highly scalable. For the data model, I went with a star schema architecture with one fact table and two dimension tables, as it is straightforward and satisfies the needs of this project.

* The update frequency mainly depends on the velocity of the data; for this dataset, it should ideally be updated once daily.

* If the data increased by 100x, Spark could handle it comfortably. I would offload the data into a data lake (Amazon S3 on AWS, or Azure Data Lake), transform it using an EMR cluster on AWS or a Synapse Spark cluster on Azure, and then load only the warm and hot data into a Redshift data warehouse or a Synapse dedicated pool.


* For the daily update by 7am, I would use Apache Airflow to schedule the pipelines.

* If we started using AWS Redshift or Azure Synapse, the data could easily be accessed by hundreds of people and BI applications.