# Project Title
### Data Engineering Capstone Project

#### Project Summary
* This project aims to create a schema that can derive various correlations using three datasets, demographics of US states and cities dataset, global temperature data of countries dataset and i94 immigration dataset. For example, is there any correlation beetween the average temperature of residence country and going to united state.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [34]:
# Do all imports and installs here
import pandas as pd
import os
from pyspark.sql import SparkSession
import os
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql import functions as F
import datetime as dt
from pyspark.sql.functions import isnan, when, count, col, udf, dayofmonth, dayofweek, month, year, weekofyear, avg, monotonically_increasing_id
from pyspark.sql.types import *
import requests
from pyspark.sql.functions import year, month, dayofmonth, weekofyear, date_format
from pyspark.sql import SparkSession, SQLContext, GroupedData, HiveContext
from pyspark.sql.functions import *
from pyspark.sql.functions import date_add as d_add
from pyspark.sql.types import DoubleType, StringType, IntegerType, FloatType, DateType
from pyspark.sql.functions import lit
from pyspark.sql import Row
import re
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import initcap, udf, lit, explode, split, regexp_extract, col, isnan, isnull, desc, when, sum, to_date, desc, regexp_replace, count, to_timestamp
from pyspark.sql.types import IntegerType,BooleanType,DateType
import re

### Step 1: Scope the Project and Gather Data
* The steps to create the analytics model database: 
- Use pandas and Spark python dataframe to load the data
- Explore I94 immigration dataset to identify missing values and perform data cleaning
- Explore demographics dataset to identify missing values and perform data cleaning
- Explore temperatures dataset by country to identify missing values and perform data cleaning
- Extract Port, Country , State, Visa and Mode of travel from sas_label file and perform join with I94 dataset
- Create star schema
    - Dimension tables :
        1. Extract and create ArrivalDate dimension from i94 dataset, this dimension can join the fact table through the arrdate feature. 
        2. Extract and create I94Port dimension from i94 sas_label file, this dimension can join the fact table through the i94port feature.
        3. Create demographics_df dimension from the us_cities_demographics dataset. This dimension can join the fact table through the stateCode feature.
        4. Create GlobalLandTemperaturesByCity dimension from the GlobalLandTemperaturesByCity dataset. This dimension can join the fact table through the Country feature.
        
    - Create the fact table from the cleaned I94 dataset.

-  Apache Spark is the technology that used in this project

### Datasets 
- I94 Immigration Data: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. [This](https://www.trade.gov/national-travel-and-tourism-office) is where the data comes from. There's a sample file so you can take a look at the data in csv format before reading it all in. You do not have to use the entire dataset, just use what you need to accomplish the goal you set at the beginning of the project.
- World Temperature Data: This dataset came from Kaggle. You can read more about it [here](https://www.kaggle.com/datasets/berkeleyearth/climate-change-earth-surface-temperature-data).
- U.S. City Demographic Data: This data comes from OpenSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).
- Airport Code Table: This is a simple table of airport codes and corresponding cities. It comes from [here](https://datahub.io/core/airport-codes#data).

In [35]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df = pd.read_csv(fname)


In [36]:
df.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


- dt : Date time
- AverageTemperature : Average temperature in celsius
- AverageTemperatureUncertainty : 95% confidence interval around average temperature
- City : Name of city
- Country : Name of country
- Latitude : Latitude of city
- Longitude : Longitude of city

In [37]:
df.shape

(8599212, 7)

In [38]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [39]:
df_spark_pandas=df_spark.limit(1)
df_spark_pandas.toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2


- cicid : Unique ID
- type : Airport type
- i94mon : month
- i94cit : 3 digit code for immigrant country of birth
- i94res : 3 digit code for immigrant country of residence
- i94port : Port of admission
- arrdate : Arrival Date in the USA
- i94mode : Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)
- i94addr : USA State of arrival
- depdate : Departure Date from the USA
- i94bir : Age of Respondent in Years
- i94visa : Visa codes collapsed into three categories
- count : Field used for summary statistics
- dtadfile : Character Date Field - Date added to I-94 Files
- visapost : Department of State where Visa was issued
- occup : Occupation that will be performed in U.S
- entdepa : Arrival Flag - admitted or paroled into the U.S.
- entdepd : Departure Flag - Departed, lost I-94 or is deceased
- entdepu : Update Flag - Either apprehended, overstayed, adjusted to perm residence
- matflag : Match flag - Match of arrival and departure records
- biryear : 4 digit year of birth
- dtaddto : Character Date Field - Date to which admitted to U.S. (allowed to stay until)
- gender : Non-immigrant sex
- insnum : INS number
- airline : Airline used to arrive in U.S
- admnum : Admission Number
- fltno : Flight number of Airline used to arrive in U.S.
- visatype : Class of admission legally admitting the non-immigrant to temporarily stay in U.S

In [40]:
df_spark.count()

3096313

In [41]:
#write to parquet
#df_spark.write.parquet("sas_data")
#df_spark=spark.read.parquet("sas_data")

In [42]:
airport_codes = 'airport-codes_csv.csv'
airport_df = pd.read_csv(airport_codes)

In [43]:
airport_df.shape

(55075, 12)

In [44]:
airport_df.head(1)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"


- ident : Unique identifier
- type : Airport type
- name : Airport name
- elevation_ft : Airport altitude
- continent : Continent name
- iso_country : The International Organization for Standardization code for the airport's country
- iso_region : The International Organization for Standardization code for the airport's region
- municipality : municipality where the airport is located
- gps_code : Airport GPS Code
- iata_code : Airport International Air Transport Association's Code
- local_code : Airport local code
- coordinates : Airport coordinates

In [45]:
us_cities_demographics = 'us-cities-demographics.csv'
demographics_df = pd.read_csv(us_cities_demographics, header=0, sep=';')

In [46]:
demographics_df.shape

(2891, 12)

In [47]:
demographics_df.head(1)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924


- City : City Name
- State : US State of the City
- Median Age : The median population age
- Male Population : Male population total
- Female Population : Female population total
- Total Population : Total population
- Number of Veterans : Number of veterans living in the city
- Foreign-born : Number of residents who were not born in the city
- Average Household Size : Average size of houses in the city
- State Code : Code of the state
- Race : Race class
- Count : Number of individuals in each race

In [48]:
from pyspark.sql import SparkSession
import pandas as pd

#spark = SparkSession.builder.\
#config("spark.jars.repositories", "https://repos.spark-packages.org/").\
#config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
#enableHiveSupport().getOrCreate()

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [51]:
df_spark=spark.read.parquet("sas_data")

In [52]:
df_spark.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [53]:
df_spark.count()

3096313

In [54]:
df_spark.show(2)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [55]:
from pyspark.sql.functions import sum,avg,max,count
df_spark.groupBy("i94mon").agg(count("*").alias("count")).show(truncate=False)


+------+-------+
|i94mon|count  |
+------+-------+
|4.0   |3096313|
+------+-------+



In [56]:
df_spark.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [57]:
missing_percentage_df = df_spark.select([(count(when(isnan(c) | col(c).isNull(), c))*100/count(lit(1))).alias(c) for c in df_spark.columns])
missing_percentage_df = missing_percentage_df.collect()[0].asDict()
missing_percentage_df

{'cicid': 0.0,
 'i94yr': 0.0,
 'i94mon': 0.0,
 'i94cit': 0.0,
 'i94res': 0.0,
 'i94port': 0.0,
 'arrdate': 0.0,
 'i94mode': 0.007718857880324115,
 'i94addr': 4.928183940060324,
 'depdate': 4.6008591508675,
 'i94bir': 0.025901774142342845,
 'i94visa': 0.0,
 'count': 0.0,
 'dtadfile': 3.229647648671178e-05,
 'visapost': 60.75774639062653,
 'occup': 99.73755883206898,
 'entdepa': 0.0076865614038374025,
 'entdepd': 4.470768943579024,
 'entdepu': 99.98733978121722,
 'matflag': 4.470768943579024,
 'biryear': 0.025901774142342845,
 'dtaddto': 0.015405419284161517,
 'gender': 13.379429017673601,
 'insnum': 96.32763225164898,
 'airline': 2.700857439154246,
 'admnum': 0.0,
 'fltno': 0.6313638188387285,
 'visatype': 0.0}

In [58]:
drop_columns = [k for k, v in missing_percentage_df.items() if v > 80]
drop_columns

['occup', 'entdepu', 'insnum']

In [59]:
i94_df = df_spark.drop(*drop_columns)

In [60]:
i94_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



In [61]:
i94_df.groupBy("cicid").count().where("count > 1").count()

0

In [62]:
def convert_to_dt(i):
    try:
        s = datetime(1960,1,1)
        d = timedelta(days=int(i))
        return s + d
    except:
        ex = datetime(1960,1,1)
        return ex

In [63]:
depdate_datetime_udf = udf(lambda i: convert_to_dt(i), DateType())
i94_df = i94_df.withColumn('depdate', to_date(depdate_datetime_udf(col('depdate'))))
i94_df = i94_df.withColumn('depdate', to_date(depdate_datetime_udf(col('depdate'))))

In [64]:
i94_df.show(2)

+---------+------+------+------+------+-------+-------+-------+-------+----------+------+-------+-----+--------+--------+-------+-------+-------+-------+--------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|   depdate|i94bir|i94visa|count|dtadfile|visapost|entdepa|entdepd|matflag|biryear| dtaddto|gender|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+----------+------+-------+-----+--------+--------+-------+-------+-------+-------+--------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|1960-01-01|  40.0|    1.0|  1.0|20160430|     SYD|      G|      O|      M| 1976.0|10292016|     F|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|1960-01-01|  32.0|    1.0|  1.0|20160430|     SYD|      G|      O|      M| 1984.0|10292016|     F

In [65]:
TextFile = 'I94_SAS_Labels_Descriptions.SAS'

df_label = spark.read.text(TextFile, wholetext=True)

In [66]:
@udf
def matching(TextFile, TextPattern): 
    return re.search(TextPattern, TextFile).group(0)

In [67]:

df_I94CitRes_1 = df_label.withColumn('I94CitRes_1', matching("value", lit("(\/\* I94CIT & I94RES[^;]+)"))).select("I94CitRes_1")
df_I94CitRes_2 = df_I94CitRes_1.withColumn('I94CitRes_2', explode(split("I94CitRes_1", "[\r\n]+"))).select('I94CitRes_2')
df_I94CitRes_3 = df_I94CitRes_2.filter(df_I94CitRes_2['I94CitRes_2'].rlike("([0-9]+ *\= *[0-9A-Za-z \:\',\-()\/\.#&]+)"))\
.withColumn('country_code', regexp_extract(df_I94CitRes_2['I94CitRes_2'], "[0-9]+", 0))\
.withColumn('country_code',col('country_code').cast(IntegerType()))\
.withColumn('country_name', regexp_extract(df_I94CitRes_2['I94CitRes_2'], "(?<=')([0-9A-Za-z \:\',\-()\/\.#&]+)(?=')", 0))\
.withColumn('country_name',initcap(col('country_name')))\
.withColumn('country_name',when(col('country_name') == 'Mexico Air Sea, And Not Reported (i-94, No Land Arrivals)','Mexico')\
            .otherwise(col('country_name')))\
.drop("I94CitRes_2")

In [68]:
df_I94CitRes_3.select(col("country_name")).collect()[0][0]

'Mexico'

In [69]:
df_I94CitRes_3.show(1,truncate=False)

+------------+------------+
|country_code|country_name|
+------------+------------+
|582         |Mexico      |
+------------+------------+
only showing top 1 row



In [70]:
from pyspark.sql.functions import col,isnan, when, count

df_I94CitRes_3.groupBy("country_code").count().where("count > 1").show()

+------------+-----+
|country_code|count|
+------------+-----+
+------------+-----+



In [71]:
df_I94CitRes_3.select([count(when(isnan(a) | col(a).isNull(), a)).alias(a) for a in df_I94CitRes_3.columns]
   ).show()

+------------+------------+
|country_code|country_name|
+------------+------------+
|           0|           0|
+------------+------------+



In [72]:

df_I94Port1 = df_label.withColumn('I94Port1', matching("value", lit("(\/\* I94PORT[^;]+)"))).select("I94Port1")

df_I94Port2 = df_I94Port1.withColumn('I94Port_2', explode(split("I94Port1", "[\r\n]+"))).select('I94Port_2')

df_I94Port3 = df_I94Port2.filter(df_I94Port2['I94Port_2'].rlike("([0-9A-Z.' ]+\t*\=\t*[0-9A-Za-z \',\-()\/\.#&]*)"))\
.withColumn('portCode', regexp_extract(df_I94Port2['I94Port_2'], "(?<=')[0-9A-Z. ]+(?=')", 0))\
.withColumn('cityName_stateCode', regexp_extract(col('I94Port_2'), "(?<=\t')([0-9A-Za-z ,\-()\/\.#&]+)(?=')", 0))\
.withColumn('cityName', split(col('cityName_stateCode'), ',').getItem(0))\
.withColumn('stateCode', split(col('cityName_stateCode'), ',').getItem(1))\
.withColumn('stateCode', regexp_replace(col('stateCode'), '\s*', ''))\
.withColumn('cityName',initcap(col('cityName')))\
.withColumn('stateCode', when(col('stateCode') == 'CA(BPS)','CA')\
            .when(col('stateCode') == 'AR(BPS)','AR')\
            .when(col('stateCode') == 'HWY.STATION','CA')\
            .when(col('stateCode') == 'CO#ARPT','CO')\
            .when(col('stateCode') == 'FL#ARPT','FL')\
            .when(col("stateCode").isNull(), "AZ")\
            .otherwise(col('stateCode')))\
.drop('I94Port_2')
df_I94Port3=df_I94Port3.withColumn('stateCode',when((df_I94Port3.portCode == "WAS")&(df_I94Port3.stateCode == 'AZ') , "DC")\
                                   .otherwise(col('stateCode')))
df_I94Port3=df_I94Port3.withColumn('cityName', when(df_I94Port3.portCode == 'MAP','Mariposa')\
                                   .otherwise(col('cityName')))

In [73]:
df_I94Port3.show(10,truncate=False)

+--------+----------------------------+------------------------+---------+
|portCode|cityName_stateCode          |cityName                |stateCode|
+--------+----------------------------+------------------------+---------+
|ALC     |ALCAN, AK                   |Alcan                   |AK       |
|ANC     |ANCHORAGE, AK               |Anchorage               |AK       |
|BAR     |BAKER AAF - BAKER ISLAND, AK|Baker Aaf - Baker Island|AK       |
|DAC     |DALTONS CACHE, AK           |Daltons Cache           |AK       |
|PIZ     |DEW STATION PT LAY DEW, AK  |Dew Station Pt Lay Dew  |AK       |
|DTH     |DUTCH HARBOR, AK            |Dutch Harbor            |AK       |
|EGL     |EAGLE, AK                   |Eagle                   |AK       |
|FRB     |FAIRBANKS, AK               |Fairbanks               |AK       |
|HOM     |HOMER, AK                   |Homer                   |AK       |
|HYD     |HYDER, AK                   |Hyder                   |AK       |
+--------+---------------

In [74]:
df_I94Port3.printSchema()

root
 |-- portCode: string (nullable = true)
 |-- cityName_stateCode: string (nullable = true)
 |-- cityName: string (nullable = true)
 |-- stateCode: string (nullable = true)



In [75]:
df_I94Port3.filter(df_I94Port3.cityName == "Anchorage").show(truncate=False)

+--------+----------------------+---------+---------+
|portCode|cityName_stateCode    |cityName |stateCode|
+--------+----------------------+---------+---------+
|ANC     |ANCHORAGE, AK         |Anchorage|AK       |
+--------+----------------------+---------+---------+



In [76]:
df_I94Port3.groupBy("portCode").count().where("count > 1").show()

+--------+-----+
|portCode|count|
+--------+-----+
+--------+-----+



In [77]:
from pyspark.sql.functions import col,isnan, when, count
df_I94Port3.select([count(when(isnan(a) | col(a).isNull(), a)).alias(a) for a in df_I94Port3.columns]).show()

+--------+------------------+--------+---------+
|portCode|cityName_stateCode|cityName|stateCode|
+--------+------------------+--------+---------+
|       0|                 0|       0|        0|
+--------+------------------+--------+---------+



In [78]:
df_I94Addr_1 = df_label.withColumn('I94Addr_1', matching("value", lit("(\/\* I94ADDR[^;]+)"))).select("I94Addr_1")

df_I94Addr_2 = df_I94Addr_1.withColumn('I94Addr_2', explode(split("I94Addr_1", "[\r\n]+"))).select('I94Addr_2')

df_I94Addr_3 = df_I94Addr_2.filter(df_I94Addr_2['I94Addr_2'].rlike("(\t*[0-9A-Z.' ]+\=\t*[0-9A-Za-z \',\-()\/\.#&]*)"))\
.withColumn('stateCode', regexp_extract(df_I94Addr_2['I94Addr_2'], "(?<=\t')[0-9A-Z. ]+(?=')", 0))\
.withColumn('stateName', regexp_extract(df_I94Addr_2['I94Addr_2'], "(?<=\=')([0-9A-Za-z ,\-()\/\.#&]+)(?=')", 0))\
.withColumn('stateName',initcap(col('stateName')))\
.drop("I94Addr_2")

In [79]:
df_I94Addr_3.filter(df_I94Addr_3.stateName == "Alabama").show(truncate=False)

+---------+---------+
|stateCode|stateName|
+---------+---------+
|AL       |Alabama  |
+---------+---------+



In [80]:

df_I94Mode_1 = df_label.withColumn('I94Mode_1', matching("value", lit("(\/\* I94MODE[^;]+)"))).select("I94Mode_1")

df_I94Mode_2 = df_I94Mode_1.withColumn('I94Mode_2', explode(split("I94Mode_1", "[\r\n]+"))).select('I94Mode_2')

df_I94Mode_3 = df_I94Mode_2.filter(df_I94Mode_2['I94Mode_2']\
                                                                    .rlike("(\t*[0-9]+ *\= *[0-9A-Za-z \',\-()\/\.#&]*)"))\
.withColumn('travelModeCode', regexp_extract(df_I94Mode_2['I94Mode_2'], "(?<=\t)[0-9]+(?= )", 0))\
.withColumn('travelModeName', regexp_extract(df_I94Mode_2['I94Mode_2'], "(?<=\= ')([A-Za-z ,\-()\/\.#&]+)(?=')", 0))\
.drop("I94Mode_2")

In [81]:
df_I94Mode_3.show()

+--------------+--------------+
|travelModeCode|travelModeName|
+--------------+--------------+
|             1|           Air|
|             2|           Sea|
|             3|          Land|
|             9|  Not reported|
+--------------+--------------+



In [82]:

df_I94Visa_1 = df_label.withColumn('I94Visa_1', matching("value", lit("(\/\* I94VISA[^;]+)"))).select("I94Visa_1")

df_I94Visa_2 = df_I94Visa_1.withColumn('I94Visa_2', explode(split("I94Visa_1", "[\r\n]+"))).select('I94Visa_2')

df_I94Visa_3 = df_I94Visa_2.filter(df_I94Visa_2['I94Visa_2'].rlike("(\s*[0-9]+ *\= *[A-Za-z]*)"))\
.withColumn('VisaCategoryCode', regexp_extract(df_I94Visa_2['I94Visa_2'], "[0-9]+", 0))\
.withColumn('VisaCategoryName', regexp_extract(df_I94Visa_2['I94Visa_2'], "[A-Za-z]+", 0))\
.drop("I94Visa_2")

In [83]:
df_I94Visa_3.show()

+----------------+----------------+
|VisaCategoryCode|VisaCategoryName|
+----------------+----------------+
|               1|        Business|
|               2|        Pleasure|
|               3|         Student|
+----------------+----------------+



In [84]:
airport_codes = 'airport-codes_csv.csv'
airport_df = pd.read_csv(airport_codes)

In [85]:
airport_df.shape

(55075, 12)

In [86]:
airport_df.head(5)


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [87]:
duplicate = airport_df[airport_df.duplicated(["iso_region","municipality"])]

In [88]:
duplicate

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
49,00TE,heliport,Tcjc-Northeast Campus Heliport,600.0,,US,US-TX,Fort Worth,00TE,,00TE,"-97.18949890136719, 32.847599029541016"
116,01TS,closed,St Joseph Hospital Heliport,675.0,,US,US-TX,Fort Worth,,,,"-97.324501, 32.7285"
179,02TE,heliport,Baylor Medical Center Heliport,560.0,,US,US-TX,Waxahachie,02TE,,02TE,"-96.86419677734375, 32.39540100097656"
181,02TS,closed,FWOMC Heliport,684.0,,US,US-TX,Fort Worth,,,,"-97.370003, 32.747601"
221,03MT,small_airport,Cascade Field,3580.0,,US,US-MT,Cascade,3MT7,,3MT7,"-111.71748, 47.267327"
231,03OH,small_airport,Gibbs Field,580.0,,US,US-OH,Fremont,03OH,,03OH,"-83.01740264892578, 41.418399810791016"
238,03PS,closed,Ziggy's Field,1050.0,,US,US-PA,Bellefonte,,,,"-77.905602, 40.849998"
312,05AK,small_airport,Wasilla Creek Airpark,620.0,,US,US-AK,Palmer,05AK,,05AK,"-149.1880035, 61.66830063"
327,05II,small_airport,Reichhart Airport,795.0,,US,US-IN,New Haven,05II,,05II,"-84.99720001220703, 41.02870178222656"
353,05PA,heliport,PECO Mob. Heliport,110.0,,US,US-PA,Philadelphia,05PA,,05PA,"-75.178201, 39.9548"


In [89]:
airport_df.isna().sum()

ident               0
type                0
name                0
elevation_ft     7006
continent       27719
iso_country       247
iso_region          0
municipality     5676
gps_code        14045
iata_code       45886
local_code      26389
coordinates         0
dtype: int64

In [90]:
airport_df[airport_df['municipality']=='Eagle']

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
23699,ID31,heliport,Young Heliport,2625.0,,US,US-ID,Eagle,ID31,,ID31,"-116.38899993896484, 43.71659851074219"
27016,KEGE,medium_airport,Eagle County Regional Airport,6548.0,,US,US-CO,Eagle,KEGE,EGE,EGE,"-106.9179993, 39.64260101"
38243,PAEG,small_airport,Eagle Airport,908.0,,US,US-AK,Eagle,PAEG,EAA,EAA,"-141.151001, 64.77639771"


In [91]:
mask=airport_df['municipality'].str.contains('DOUGLAS',case=False, na=False)

In [92]:
airport_df[mask].sample(3)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
20457,FK-0024,small_airport,Douglas Station Airport,,SA,FK,FK-U-A,Douglas,,,,"-58.612131, -51.459942"
6747,60KS,small_airport,Alley Field,1260.0,,US,US-KS,Douglass,60KS,,60KS,"-97.00029754638672, 37.51390075683594"
4273,3GE6,heliport,Wellstar Douglas Hospital Heliport,1120.0,,US,US-GA,Douglasville,3GE6,,3GE6,"-84.73280334472656, 33.73889923095703"


In [93]:
airport_df.type.value_counts()

small_airport     33965
heliport          11287
medium_airport     4550
closed             3606
seaplane_base      1016
large_airport       627
balloonport          24
Name: type, dtype: int64

In [94]:
len(airport_df[airport_df['iso_country']=='US'])

22757

In [95]:
null_per = airport_df.isnull().sum()/airport_df.shape[0]*100
null_per

ident            0.000000
type             0.000000
name             0.000000
elevation_ft    12.720835
continent       50.329551
iso_country      0.448479
iso_region       0.000000
municipality    10.305946
gps_code        25.501589
iata_code       83.315479
local_code      47.914662
coordinates      0.000000
dtype: float64

In [96]:
drop_columns = null_per[null_per>80].keys()
drop_columns

Index(['iata_code'], dtype='object')

In [97]:
output_df = airport_df.drop(drop_columns, axis=1)

In [98]:
output_df['State_code'] = output_df.iso_region.str.split('-', expand = True)[1]

In [99]:
output_df.loc[output_df["iso_country"].isna(), "iso_country"] = output_df.iso_region.str.split('-', expand = True)[0]

In [100]:
mapping0 = output_df.dropna().drop_duplicates().set_index("iso_country")["continent"].to_dict()
output_df.loc[output_df["continent"].isna(), "continent"] = output_df.loc[output_df["continent"].isna(), "iso_country"].map(mapping0)

In [101]:
output_df.loc[output_df["gps_code"].isna(), "gps_code"] = output_df["ident"]
output_df.loc[output_df["local_code"].isna(), "local_code"] = output_df["gps_code"]

In [102]:
output_df.isnull().sum()/output_df.shape[0]*100

ident            0.000000
type             0.000000
name             0.000000
elevation_ft    12.720835
continent        9.009532
iso_country      0.000000
iso_region       0.000000
municipality    10.305946
gps_code         0.000000
local_code       0.000000
coordinates      0.000000
State_code       0.000000
dtype: float64

In [103]:
output_df[output_df['iso_country']=='US'].isnull().sum()/output_df.shape[0]*100

ident           0.000000
type            0.000000
name            0.000000
elevation_ft    0.433954
continent       0.000000
iso_country     0.000000
iso_region      0.000000
municipality    0.185202
gps_code        0.000000
local_code      0.000000
coordinates     0.000000
State_code      0.000000
dtype: float64

In [104]:
output_df.dropna(axis=0,subset=["municipality"],inplace=True)

In [105]:
len(output_df)

49399

In [106]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df = pd.read_csv(fname)

In [107]:
df.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [108]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8599212 entries, 0 to 8599211
Data columns (total 7 columns):
dt                               object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                             object
Country                          object
Latitude                         object
Longitude                        object
dtypes: float64(2), object(5)
memory usage: 459.2+ MB


In [109]:
duplicate = df[df.duplicated(['dt'])]

In [110]:
duplicate

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
3239,1743-11-01,10.013,2.291,Çorlu,Turkey,40.99N,27.69E
3240,1743-12-01,,,Çorlu,Turkey,40.99N,27.69E
3241,1744-01-01,,,Çorlu,Turkey,40.99N,27.69E
3242,1744-02-01,,,Çorlu,Turkey,40.99N,27.69E
3243,1744-03-01,,,Çorlu,Turkey,40.99N,27.69E
3244,1744-04-01,13.685,2.162,Çorlu,Turkey,40.99N,27.69E
3245,1744-05-01,15.021,1.824,Çorlu,Turkey,40.99N,27.69E
3246,1744-06-01,19.663,1.701,Çorlu,Turkey,40.99N,27.69E
3247,1744-07-01,22.314,1.648,Çorlu,Turkey,40.99N,27.69E
3248,1744-08-01,,,Çorlu,Turkey,40.99N,27.69E


In [111]:
df.Country.value_counts()[:5]

India            1014906
China             827802
United States     687289
Brazil            475580
Russia            461234
Name: Country, dtype: int64

In [112]:
df.City.value_counts()[:5]

Springfield    9545
Worcester      8359
León           7469
Rongcheng      6526
Manchester     6478
Name: City, dtype: int64

In [113]:
df[df['Country']=='Afghanistan'].head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
539825,1833-01-01,-2.204,2.693,Baglan,Afghanistan,36.17N,69.61E
539826,1833-02-01,-1.171,2.533,Baglan,Afghanistan,36.17N,69.61E
539827,1833-03-01,3.459,2.799,Baglan,Afghanistan,36.17N,69.61E
539828,1833-04-01,9.917,2.215,Baglan,Afghanistan,36.17N,69.61E
539829,1833-05-01,15.652,2.305,Baglan,Afghanistan,36.17N,69.61E


In [114]:
len(df[df['Country']=='United States'])

687289

In [115]:
df[df['Country']=='United States'].City.value_counts()[:5]

Springfield    9545
Columbus       6478
Aurora         6101
Arlington      5564
Peoria         5384
Name: City, dtype: int64

In [116]:
null_per = df.isnull().sum()/df.shape[0]*100
null_per

dt                               0.000000
AverageTemperature               4.234458
AverageTemperatureUncertainty    4.234458
City                             0.000000
Country                          0.000000
Latitude                         0.000000
Longitude                        0.000000
dtype: float64

In [117]:
df['dt'] =pd.to_datetime(df['dt'], format="%Y-%d-%m")
df['year'] = df['dt'].dt.year
df['month'] = df['dt'].dt.month
df['week'] = df['dt'].dt.week
df['day'] = df['dt'].dt.day
df['dt'] = df['dt'].dt.strftime('%Y-%m-%d')
df['dt'] =pd.to_datetime(df['dt'], format="%Y-%m-%d")

In [118]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8599212 entries, 0 to 8599211
Data columns (total 11 columns):
dt                               datetime64[ns]
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                             object
Country                          object
Latitude                         object
Longitude                        object
year                             int64
month                            int64
week                             int64
day                              int64
dtypes: datetime64[ns](1), float64(2), int64(4), object(4)
memory usage: 721.7+ MB


In [119]:
df.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,year,month,week,day
0,1743-01-11,6.068,1.737,Århus,Denmark,57.05N,10.33E,1743,1,2,11
1,1743-01-12,,,Århus,Denmark,57.05N,10.33E,1743,1,2,12
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E,1744,1,1,1
3,1744-01-02,,,Århus,Denmark,57.05N,10.33E,1744,1,1,2
4,1744-01-03,,,Århus,Denmark,57.05N,10.33E,1744,1,1,3


In [120]:
mapping1 = df.dropna().drop_duplicates().groupby(["Country","City","month","week"])['AverageTemperature'].mean().to_dict()
df.loc[df["AverageTemperature"].isna(), "AverageTemperature"] = df.loc[df["AverageTemperature"].isna(), ["Country","City","month","week"]].apply(lambda x : mapping1[(x[0],x[1],x[2],x[3])],axis=1)


In [121]:
mapping2 = df.dropna().drop_duplicates().groupby(["Country","City","month","week"])['AverageTemperatureUncertainty'].mean().to_dict()
df.loc[df["AverageTemperatureUncertainty"].isna(), "AverageTemperatureUncertainty"] = df.loc[df["AverageTemperatureUncertainty"].isna(), ["Country","City","month","week"]].apply(lambda x : mapping2[(x[0],x[1],x[2],x[3])],axis=1)


In [122]:
df=df[df.year>2000]

In [123]:
len(df)

537030

In [124]:
us_cities_demographics = 'us-cities-demographics.csv'
demographics_df = pd.read_csv(us_cities_demographics, header=0, sep=';')

In [125]:
demographics_df.shape

(2891, 12)

In [126]:
demographics_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB


In [127]:
demographics_df.State.value_counts()[:3]

California    676
Texas         273
Florida       222
Name: State, dtype: int64

In [128]:
demographics_df[(demographics_df["Race"]=="Hispanic or Latino") & (demographics_df["State Code"]=="PR")]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
111,San Juan,Puerto Rico,41.4,155408.0,186829.0,342237,,,,PR,Hispanic or Latino,335559
155,Caguas,Puerto Rico,40.4,34743.0,42265.0,77008,,,,PR,Hispanic or Latino,76349
637,Carolina,Puerto Rico,42.0,64758.0,77308.0,142066,,,,PR,Hispanic or Latino,139967
1995,Ponce,Puerto Rico,40.5,56968.0,64615.0,121583,,,,PR,Hispanic or Latino,120705
2004,Bayamón,Puerto Rico,39.4,80128.0,90131.0,170259,,,,PR,Hispanic or Latino,169155
2589,Guaynabo,Puerto Rico,42.2,33066.0,37426.0,70492,,,,PR,Hispanic or Latino,69936
2746,Mayagüez,Puerto Rico,38.1,30799.0,35782.0,66581,,,,PR,Hispanic or Latino,65521


In [129]:
len(demographics_df[(demographics_df["Race"]=="Hispanic or Latino") & (demographics_df["State Code"]=="PR")])

7

In [130]:
null_per = demographics_df.isnull().sum()/demographics_df.shape[0]*100
null_per

City                      0.000000
State                     0.000000
Median Age                0.000000
Male Population           0.103770
Female Population         0.103770
Total Population          0.000000
Number of Veterans        0.449671
Foreign-born              0.449671
Average Household Size    0.553442
State Code                0.000000
Race                      0.000000
Count                     0.000000
dtype: float64

In [131]:
mapping1 = demographics_df.dropna().drop_duplicates().groupby(["State Code","Race"])['Male Population'].mean().to_dict()
demographics_df.loc[demographics_df["Male Population"].isna(), "Male Population"] = demographics_df.loc[demographics_df["Male Population"].isna(), ["State Code","Race"]].apply(lambda x : mapping1[(x[0],x[1])],axis=1)


In [132]:
demographics_df.loc[demographics_df["Female Population"].isna(), "Female Population"] = demographics_df["Total Population"] - demographics_df["Male Population"]

In [133]:
mapping3 = demographics_df.dropna().drop_duplicates().groupby("Race")['Foreign-born'].mean().to_dict()
demographics_df.loc[demographics_df["Foreign-born"].isna(), "Foreign-born"] = demographics_df.loc[demographics_df["Foreign-born"].isna(), ["Race"]].apply(lambda x : mapping3[(x[0])],axis=1)


In [134]:
mapping4 = demographics_df.dropna().drop_duplicates().groupby("Race")['Average Household Size'].mean().to_dict()
demographics_df.loc[demographics_df["Average Household Size"].isna(), "Average Household Size"] = demographics_df.loc[demographics_df["Average Household Size"].isna(), ["Race"]].apply(lambda x : mapping4[(x[0])],axis=1)


In [135]:
mapping5 = demographics_df.dropna().drop_duplicates().groupby("Race")['Number of Veterans'].mean().to_dict()
demographics_df.loc[demographics_df["Number of Veterans"].isna(), "Number of Veterans"] = demographics_df.loc[demographics_df["Number of Veterans"].isna(), ["Race"]].apply(lambda x : mapping5[(x[0])],axis=1)


In [136]:
null_per = demographics_df.isnull().sum()/demographics_df.shape[0]*100
null_per

City                      0.0
State                     0.0
Median Age                0.0
Male Population           0.0
Female Population         0.0
Total Population          0.0
Number of Veterans        0.0
Foreign-born              0.0
Average Household Size    0.0
State Code                0.0
Race                      0.0
Count                     0.0
dtype: float64

In [137]:
duplicate = df[df.duplicated()]
duplicate

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,year,month,week,day


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

- The travellers flow is my main interest, thus fact_i94 table is my fact table.

- demographics_df dimension table : this dimension table is the demographical details of travellers flow based on States of US and created from the us_cities_demographics dataset. This dimension can join the fact table through the stateCode feature.

- GlobalLandTemperaturesByCity dimension table: this dimension table is the temperatures  of travellers flow based on thier origin and destination countries and created from the GlobalLandTemperaturesByCity dataset. This dimension can join the fact table through the Country feature.

- ArrivalDate dimension table: this dimension table is the datetime details of travellers flow based on thier Arrival and extracted and created from the i94 dataset. This dimension can join the fact table through the i94port feature.

- I94Port dimension table: this dimension table is airport details of travellers flow based on State and city and this table extracted from the i94 sas_label file. This dimension can join the fact table through the i94port feature.


![ModelPic](ModelPic.png){:height="100px" width="100px"}

#### 3.2 Mapping Out Data Pipelines
1. Clean data by dropping columns with 80% percentage of null values or more 
2. Fill the null values based on another columns 
3. Load the data into staging tables
4. Create Dimension tables
5. Create Fact table
6. Write data into parquet files
7. Perform data quality checks

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [138]:
def create_spark_session():
    """
    the entry point of a spark application
    """
    spark = SparkSession.builder.\
    config("spark.jars.repositories", "https://repos.spark-packages.org/").\
    config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
    enableHiveSupport().getOrCreate()
    return spark

In [139]:
spark = create_spark_session()
output_data = "abdullahoutput/"

### GlobalLandTemperaturesByCity dimension table
0. After exploring the dataset, there's no need to drop any column ,but there many steps to fill some null values and I included them within the function 
1. Read the dataset with pandas
2. Convert "dt" column to date time format
3. Extract year, month, week and day from "dt" column so that i can use them to fill null values
4. Fill the null values for 'AverageTemperature' column based on "Country" , "City" , "month" and "week" columns
5. Fill the null values for 'AverageTemperatureUncertainty' column based on "Country" , "City" , "month" and "week" columns
6. Filter by year larger than 2000 since the dataset size is too large
7. Create table using apache spark and fill it with the cleaned dataset
8. Get the average of AverageTemperature and AverageTemperatureUncertainty based on the country
9. Write the table with parquet format

In [140]:
def GlobalLandTemperaturesByCity_fun(spark, output_data):
    """
    spark: the spark session that has been created.
    output_data: the path where the parquet files will be written.
    """
    
    # Read the dataset with pandas
    fname = '../../data2/GlobalLandTemperaturesByCity.csv'
    df = pd.read_csv(fname)
    
    # Convert "dt" column to date time format
    df['dt'] =pd.to_datetime(df['dt'], format="%Y-%d-%m")
    
    # Extract year, month, week and day from "dt" column so that i can use them to fill null values
    df['year'] = df['dt'].dt.year
    df['month'] = df['dt'].dt.month
    df['week'] = df['dt'].dt.week
    df['day'] = df['dt'].dt.day

    df['dt'] = df['dt'].dt.strftime('%Y-%m-%d')
    df['dt'] =pd.to_datetime(df['dt'], format="%Y-%m-%d")
    
    # Fill the null values for 'AverageTemperature' column based on "Country" , "City" , "month" and "week" columns
    mapping1 = df.dropna().drop_duplicates().groupby(["Country","City","month","week"])['AverageTemperature'].mean().to_dict()
    df.loc[df["AverageTemperature"].isna(), "AverageTemperature"] = df.loc[df["AverageTemperature"].isna(), ["Country","City","month","week"]].apply(lambda x : mapping1[(x[0],x[1],x[2],x[3])],axis=1)
    
    # Fill the null values for 'AverageTemperatureUncertainty' column based on "Country" , "City" , "month" and "week" columns
    mapping2 = df.dropna().drop_duplicates().groupby(["Country","City","month","week"])['AverageTemperatureUncertainty'].mean().to_dict()
    df.loc[df["AverageTemperatureUncertainty"].isna(), "AverageTemperatureUncertainty"] = df.loc[df["AverageTemperatureUncertainty"].isna(), ["Country","City","month","week"]].apply(lambda x : mapping2[(x[0],x[1],x[2],x[3])],axis=1)
    
    # Filter by year larger than 2000 since the dataset size is too large
    df=df[df.year>2000]
    
    #Create table using apache spark and fill it with the cleaned dataset
    schema_GlobalLandTemperaturesByCity = StructType([StructField("dt", DateType() , True)\
                                                      ,StructField("AverageTemperature", FloatType(), True)\
                                                      ,StructField("AverageTemperatureUncertainty", FloatType(), True)\
                                                      ,StructField("City", StringType(), True)\
                                                      ,StructField("Country", StringType(), True)\
                                                      ,StructField("Latitude", StringType(), True)\
                                                      ,StructField("Longitude", StringType(), True)\
                                                      ,StructField("year", IntegerType(), True)\
                                                      ,StructField("month", IntegerType(), True)\
                                                      ,StructField("week", IntegerType(), True)\
                                                      ,StructField("day", IntegerType(), True)])

    spark_GlobalLandTemperaturesByCity = spark.createDataFrame(df, schema=schema_GlobalLandTemperaturesByCity)
    
    # Get the average of AverageTemperature and AverageTemperatureUncertainty based on the country
    df_GlobalLandTemperaturesByCity = spark_GlobalLandTemperaturesByCity.groupBy(col("Country").alias("Country"))\
    .agg(mean('AverageTemperature').alias("AverageTemperature"),\
         mean("AverageTemperatureUncertainty").alias("AverageTemperatureUncertainty"))\
    .select(["Country", "AverageTemperature", "AverageTemperatureUncertainty"])
    
    # Write the table with parquet format
    df_GlobalLandTemperaturesByCity.write.mode('overwrite').parquet(output_data+'GlobalLandTemperaturesByCity/')

In [141]:
GlobalLandTemperaturesByCity_fun(spark, output_data)

### demographics_df dimension table
0. After exploring the dataset, there's no need to drop any column ,but there many steps to fill some null values and I included them within the function 
1. Read the us-cities-demographics dataset with pandas
2. Fill the null values for 'Male Population' column based on "State Code" and  "Race" columns
3. Fill the null values for 'Female Population' column by subtracting 'Male Population' column from 'Total Population' column
4. Fill the null values for 'Foreign-born' column based on "Race" column
5. Fill the null values for 'Average Household Size' column based on "Race" column
6. Fill the null values for 'Number of Veterans' column based on "Race" column
7. Create schema_demographics_df table using apache spark and fill it with the cleaned demographics_df dataset
8. Select the columns to make changes on columns names with appropriate datatype
9. Get the mean of Median_Age and Average_Household_Size and sum of Male_Population and Female_Population and Total_Population and Number_of_Veterans and Foreign_born based on the States
10. Write the table with parquet format

In [142]:
def demographics_df_fun(spark,output_data):
    # Read the us-cities-demographics dataset with pandas
    us_cities_demographics = 'us-cities-demographics.csv'
    demographics_df = pd.read_csv(us_cities_demographics, header=0, sep=';')

    # Fill the null values for 'Male Population' column based on "State Code" and "Race" columns

    mapping1 = demographics_df.dropna().drop_duplicates().groupby(["State Code","Race"])['Male Population'].mean().to_dict()
    demographics_df.loc[demographics_df["Male Population"].isna(), "Male Population"] = demographics_df.loc[demographics_df["Male Population"].isna(), ["State Code","Race"]].apply(lambda x : mapping1[(x[0],x[1])],axis=1)
    
    # Fill the null values for 'Female Population' column by subtracting 'Male Population' column from 'Total Population' column
    demographics_df.loc[demographics_df["Female Population"].isna(), "Female Population"] = demographics_df["Total Population"] - demographics_df["Male Population"]
    # Fill the null values for 'Foreign-born' column based on "Race" column
    mapping3 = demographics_df.dropna().drop_duplicates().groupby("Race")['Foreign-born'].mean().to_dict()
    demographics_df.loc[demographics_df["Foreign-born"].isna(), "Foreign-born"] = demographics_df.loc[demographics_df["Foreign-born"].isna(), ["Race"]].apply(lambda x : mapping3[(x[0])],axis=1)
    # Fill the null values for 'Average Household Size' column based on "Race" column
    mapping4 = demographics_df.dropna().drop_duplicates().groupby("Race")['Average Household Size'].mean().to_dict()
    demographics_df.loc[demographics_df["Average Household Size"].isna(), "Average Household Size"] = demographics_df.loc[demographics_df["Average Household Size"].isna(), ["Race"]].apply(lambda x : mapping4[(x[0])],axis=1)
    
    # Fill the null values for 'Number of Veterans' column based on "Race" column
    mapping5 = demographics_df.dropna().drop_duplicates().groupby("Race")['Number of Veterans'].mean().to_dict()
    demographics_df.loc[demographics_df["Number of Veterans"].isna(), "Number of Veterans"] = demographics_df.loc[demographics_df["Number of Veterans"].isna(), ["Race"]].apply(lambda x : mapping5[(x[0])],axis=1)
    # Create schema_demographics_df table using apache spark and fill it with the cleaned demographics_df dataset
    schema_demographics_df = StructType([StructField("City",StringType() , True)\
                                         ,StructField("State", StringType(), True)\
                                         ,StructField("Median Age", FloatType(), True)\
                                         ,StructField("Male Population", FloatType(), True)\
                                         ,StructField("Female Population", FloatType(), True)\
                                         ,StructField("Total Population", IntegerType(), True)\
                                         ,StructField("Number of Veterans", FloatType(), True)\
                                         ,StructField("Foreign-born", FloatType(), True)\
                                         ,StructField("Average Household Size", FloatType(), True)\
                                         ,StructField("State Code", StringType(), True)\
                                         ,StructField("Race", StringType(), True)\
                                         ,StructField("Count", IntegerType(), True)])
    # Select the columns to make changes on columns names with appropriate datatype
    spark_demographics_df = spark.createDataFrame(demographics_df, schema=schema_demographics_df)
    spark_demographics_df = spark_demographics_df.select("City","State",col("Median Age").alias("Median_Age"),col("Male Population").cast('int').alias("Male_Population"),
                                                    col("Female Population").cast('int').alias("Female_Population"),col("Total Population").alias("Total_Population"),
                                                    col("Number of Veterans").cast('int').alias("Number_of_Veterans"),col("Foreign-born").cast('int').alias("Foreign_born")
                                                    ,col("Average Household Size").alias("Average_Household_Size"),col("State Code").alias("State_Code"),"Race","Count")
    # Get the mean of Median_Age and Average_Household_Size and sum of Male_Population and Female_Population and Total_Population and Number_of_Veterans and Foreign_born based on the States
    spark_demographics_df = spark_demographics_df.groupBy(col("State_Code").alias("State_Code"),col("State").alias("State")).agg(
        round(mean('Median_Age'),2).alias("Median_Age"),
        sum("Male_Population").alias("Male_Population"),
        sum("Female_Population").alias("Female_Population"),
        sum("Total_Population").alias("Total_Population"),
        sum("Number_of_Veterans").alias("Number_of_Veterans"),
        sum("Foreign_born").alias("Foreign_born"),
        round(mean("Average_Household_Size"),2).alias("Average_Household_Size")).select(["State", "state_code", "Median_Age", "Male_Population","Female_Population",\
         "Total_Population","Number_of_Veterans","Foreign_born","Average_Household_Size"])
    # Write the table with parquet format
    spark_demographics_df.write.mode('overwrite').parquet(output_data+'demographics_df/')

In [143]:
demographics_df_fun(spark, output_data)

### I94Port dimension table
1. Read I94_SAS_Labels_Descriptions file in text format as a wholetext
2. Search for a specific section using udf in I94_SAS_Labels_Descriptions file
3. Flatten the content in this section based on Newline and Carriage return
4. Derive a new column from the flatten column and filter data by matching it with regular expressions 
5. Derive a new columns from the previous derived column to specify what the data we want to extract accurately
6. Perform some cleaning steps such as updating some values and change the datatype for columns
7. Write the extracted tables in parquet format.

**The previous steps will be applied to extract five tables(I94CitRes, I94Port, I94Addr, I94Mode and I94Visa) and the I94Port table is the only table will be used as a dimension and the rest of tables will be used within the fact table**

In [144]:
def i94_SAS_label_df_fun(spark, output_data):
    
    TextFile = 'I94_SAS_Labels_Descriptions.SAS'
    df_label = spark.read.text(TextFile, wholetext=True)
    
    @udf
    def matching(TextFile, TextPattern): 
        return re.search(TextPattern, TextFile).group(0)
    # I94CIT & I94RES
    df_I94CitRes_1 = df_label.withColumn('I94CitRes_1', matching("value", lit("(\/\* I94CIT & I94RES[^;]+)"))).select("I94CitRes_1")
    df_I94CitRes_2 = df_I94CitRes_1.withColumn('I94CitRes_2', explode(split("I94CitRes_1", "[\r\n]+"))).select('I94CitRes_2')
    df_I94CitRes_3 = df_I94CitRes_2.filter(df_I94CitRes_2['I94CitRes_2'].rlike("([0-9]+ *\= *[0-9A-Za-z \:\',\-()\/\.#&]+)"))\
    .withColumn('country_code', regexp_extract(df_I94CitRes_2['I94CitRes_2'], "[0-9]+", 0))\
    .withColumn('country_code',col('country_code').cast(IntegerType()))\
    .withColumn('country_name', regexp_extract(df_I94CitRes_2['I94CitRes_2'], "(?<=')([0-9A-Za-z \:\',\-()\/\.#&]+)(?=')", 0))\
    .withColumn('country_name',initcap(col('country_name')))\
    .withColumn('country_name',when(col('country_name') == 'Mexico Air Sea, And Not Reported (i-94, No Land Arrivals)','Mexico')\
                .otherwise(col('country_name'))).drop("I94CitRes_2")
    df_I94CitRes_3.write.mode('overwrite').parquet(output_data+'i94_SAS_label/I94CitRes/')
    
    # I94PORT
    df_I94Port1 = df_label.withColumn('I94Port1', matching("value", lit("(\/\* I94PORT[^;]+)"))).select("I94Port1")
    df_I94Port2 = df_I94Port1.withColumn('I94Port_2', explode(split("I94Port1", "[\r\n]+"))).select('I94Port_2')
    df_I94Port3 = df_I94Port2.filter(df_I94Port2['I94Port_2'].rlike("([0-9A-Z.' ]+\t*\=\t*[0-9A-Za-z \',\-()\/\.#&]*)"))\
    .withColumn('portCode', regexp_extract(df_I94Port2['I94Port_2'], "(?<=')[0-9A-Z. ]+(?=')", 0))\
    .withColumn('cityName_stateCode', regexp_extract(col('I94Port_2'), "(?<=\t')([0-9A-Za-z ,\-()\/\.#&]+)(?=')", 0))\
    .withColumn('cityName', split(col('cityName_stateCode'), ',').getItem(0))\
    .withColumn('stateCode', split(col('cityName_stateCode'), ',').getItem(1))\
    .withColumn('stateCode', regexp_replace(col('stateCode'), '\s*', ''))\
    .withColumn('cityName',initcap(col('cityName')))\
    .withColumn('stateCode', when(col('stateCode') == 'CA(BPS)','CA')\
                .when(col('stateCode') == 'AR(BPS)','AR')\
                .when(col('stateCode') == 'HWY.STATION','CA')\
                .when(col('stateCode') == 'CO#ARPT','CO')\
                .when(col('stateCode') == 'FL#ARPT','FL')\
                .when(col("stateCode").isNull(), "AZ")\
                .otherwise(col('stateCode'))).drop('I94Port_2')
    df_I94Port3=df_I94Port3.withColumn('stateCode',when((df_I94Port3.portCode == "WAS")&(df_I94Port3.stateCode == 'AZ') , "DC")\
                                       .otherwise(col('stateCode')))
    df_I94Port3=df_I94Port3.withColumn('cityName', when(df_I94Port3.portCode == 'MAP','Mariposa').otherwise(col('cityName')))
    df_I94Port3.write.mode('overwrite').parquet(output_data+'i94_SAS_label/I94Port/')
    
    # I94ADDR
    df_I94Addr_1 = df_label.withColumn('I94Addr_1', matching("value", lit("(\/\* I94ADDR[^;]+)"))).select("I94Addr_1")
    df_I94Addr_2 = df_I94Addr_1.withColumn('I94Addr_2', explode(split("I94Addr_1", "[\r\n]+"))).select('I94Addr_2')
    df_I94Addr_3 = df_I94Addr_2.filter(df_I94Addr_2['I94Addr_2'].rlike("(\t*[0-9A-Z.' ]+\=\t*[0-9A-Za-z \',\-()\/\.#&]*)"))\
    .withColumn('stateCode', regexp_extract(df_I94Addr_2['I94Addr_2'], "(?<=\t')[0-9A-Z. ]+(?=')", 0))\
    .withColumn('stateName', regexp_extract(df_I94Addr_2['I94Addr_2'], "(?<=\=')([0-9A-Za-z ,\-()\/\.#&]+)(?=')", 0))\
    .withColumn('stateName',initcap(col('stateName'))).drop("I94Addr_2")
    df_I94Addr_3.write.mode('overwrite').parquet(output_data+'i94_SAS_label/I94Addr/')
    
    # I94MODE
    df_I94Mode_1 = df_label.withColumn('I94Mode_1', matching("value", lit("(\/\* I94MODE[^;]+)"))).select("I94Mode_1")
    df_I94Mode_2 = df_I94Mode_1.withColumn('I94Mode_2', explode(split("I94Mode_1", "[\r\n]+"))).select('I94Mode_2')
    df_I94Mode_3 = df_I94Mode_2.filter(df_I94Mode_2['I94Mode_2'].rlike("(\t*[0-9]+ *\= *[0-9A-Za-z \',\-()\/\.#&]*)"))\
    .withColumn('travelModeCode', regexp_extract(df_I94Mode_2['I94Mode_2'], "(?<=\t)[0-9]+(?= )", 0))\
    .withColumn('travelModeCode',col('travelModeCode').cast(IntegerType()))\
    .withColumn('travelModeName', regexp_extract(df_I94Mode_2['I94Mode_2'], "(?<=\= ')([A-Za-z ,\-()\/\.#&]+)(?=')", 0))\
    .drop("I94Mode_2")
    df_I94Mode_3.write.mode('overwrite').parquet(output_data+'i94_SAS_label/I94Mode/')
    
    # I94Visa
    df_I94Visa_1 = df_label.withColumn('I94Visa_1', matching("value", lit("(\/\* I94VISA[^;]+)"))).select("I94Visa_1")
    df_I94Visa_2 = df_I94Visa_1.withColumn('I94Visa_2', explode(split("I94Visa_1", "[\r\n]+"))).select('I94Visa_2')
    df_I94Visa_3 = df_I94Visa_2.filter(df_I94Visa_2['I94Visa_2'].rlike("(\s*[0-9]+ *\= *[A-Za-z]*)"))\
    .withColumn('VisaCategoryCode', regexp_extract(df_I94Visa_2['I94Visa_2'], "[0-9]+", 0))\
    .withColumn('VisaCategoryCode',col('VisaCategoryCode').cast(IntegerType()))\
    .withColumn('VisaCategoryName', regexp_extract(df_I94Visa_2['I94Visa_2'], "[A-Za-z]+", 0)).drop("I94Visa_2")
    df_I94Visa_3.write.mode('overwrite').parquet(output_data+'i94_SAS_label/I94Visa/')

In [145]:
i94_SAS_label_df_fun(spark, output_data)

### ArrivalDate dimension table
1. Read sas_data file
2. Create function to get the date by adding the number of days between the chosen date and the reference date (01-01-1960) and use UDF to apply it in pyspark
3. From this Date we extract columns such as  "Year", "Month", "Week", "WeekDay" and "Day"
4. Write in parquet format

In [146]:
def ArrivalDate_fun(spark, output_data):
    #  Read sas_data file
    #df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
    df_spark=spark.read.parquet("sas_data")
    # Create function to get the date by adding the number of days between the chosen date and the reference date (01-01-1960) and use UDF to apply it in pyspark

    def convert_to_dt(i):
        try:
            s = datetime(1960,1,1)
            d = timedelta(days=int(i))
            return s + d
        except:
            ex = datetime(1960,1,1)
            return ex
        
    arrdate_datetime_udf = udf(lambda i: convert_to_dt(i), DateType())
    
    # From this Date we extract columns such as  "Year", "Month", "Week", "WeekDay" and "Day"
    arrdate_df = df_spark.select(["arrdate"])\
    .withColumn("ArrivalDate", arrdate_datetime_udf("arrdate"))\
    .withColumn('Year', year('ArrivalDate'))\
    .withColumn('Month', month('ArrivalDate'))\
    .withColumn('Week', weekofyear('ArrivalDate'))\
    .withColumn('WeekDay', dayofweek('ArrivalDate'))\
    .withColumn('Day', dayofmonth('ArrivalDate'))\
    .select(["arrdate", "ArrivalDate", "Year", "Month", "Week", "WeekDay", "Day"])\
    .dropDuplicates(["arrdate"])
    
    # Write in parquet format   
    arrdate_df.write.mode('overwrite').parquet(output_data+'ArrivalDate/')

In [147]:
ArrivalDate_fun(spark, output_data)

### fact_i94 fact table
1. Read sas_data file
2. Get the percentage of null values for each column 
3. Drop column with 80% of null values or more
4. Convert datatype of dtadfile and dtaddto columns to datetime format
5. Create function to get the date by adding the number of days between the chosen date and the reference date (01-01-1960) and use UDF to apply it in pyspark and converted to datetime format
6. Read the extracted tables I94Visa, I94Mode, I94Addr and I94CitRes from sas_labels and perform joins with i94 dataset
7. Select the columns i choose to be within the fact table which includes the foreign keys for all dimensions tables and another columns with adjusted datatype
8. Write fact_i94 tablein parquet format

In [148]:
def i94_df_fun(spark,output_data):
    # Read sas_data file
    df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
    df_spark=spark.read.parquet("sas_data")
    
    #Get the percentage of null values for each column
    missing_percentage_df = df_spark.select([(count(when(isnan(c) | col(c).isNull(), c))*100/count(lit(1))).alias(c) for c in df_spark.columns])
    missing_percentage_df = missing_percentage_df.collect()[0].asDict()
    
    # Drop column with 80% of null values or more
    drop_columns = [k for k, v in missing_percentage_df.items() if v > 80]
    i94_df = df_spark.drop(*drop_columns)
    
    # Convert datatype of dtadfile and dtaddto columns to datetime format
    i94_df = i94_df.withColumn('dtadfile', to_date(col('dtadfile'), format='yyyyMMdd'))\
    .withColumn('dtaddto', to_date(col('dtaddto'), format='MMddyyyy'))
    
    # Create function to get the date by adding the number of days between the chosen date and the reference date (01-01-1960) and use UDF to apply it in pyspark and converted to datetime format

    def convert_to_dt(i):
        try:
            s = datetime(1960,1,1)
            d = timedelta(days=int(i))
            return s + d
        except:
            ex = datetime(1960,1,1)
            return ex
    
    depdate_datetime_udf = udf(lambda i: convert_to_dt(i), DateType())
    i94_df = i94_df.withColumn('depdate', to_date(depdate_datetime_udf(col('depdate'))))
    
    # Read the extracted tables I94Visa, I94Mode, I94Addr and I94CitRes from sas_labels and perform joins with i94 dataset
    df_I94Visa=spark.read.parquet("abdullahoutput/i94_SAS_label/I94Visa")
    df_I94Mode=spark.read.parquet("abdullahoutput/i94_SAS_label/I94Mode")
    df_I94Addr=spark.read.parquet("abdullahoutput/i94_SAS_label/I94Addr")
    df_I94CitRes=spark.read.parquet("abdullahoutput/i94_SAS_label/I94CitRes")
    
    i94_full_df = i94_df.join(df_I94CitRes,[i94_df.i94res==df_I94CitRes.country_code])\
    .withColumnRenamed("country_code","country_code_res").withColumnRenamed("country_name","country_name_res")
    
    i94_full_df1 = i94_full_df.join(df_I94CitRes,[i94_full_df.i94cit==df_I94CitRes.country_code])\
    .withColumnRenamed("country_code","country_code_cit").withColumnRenamed("country_name","country_name_cit")

    i94_full_df2 = i94_full_df1.join(df_I94Mode,[i94_full_df1.i94mode==df_I94Mode.travelModeCode])
    i94_full_df3 = i94_full_df2.join(df_I94Addr,[i94_full_df2.i94addr==df_I94Addr.stateCode])
    i94_full_df4 = i94_full_df3.join(df_I94Visa,[i94_full_df3.i94visa==df_I94Visa.VisaCategoryCode])
    
    # Select the columns i choose to be within the fact table which includes the foreign keys for all dimensions tables and another columns with adjusted datatype
    i94_full_df5 = i94_full_df4.select(col("cicid").cast('int'),col("i94yr").cast('int'),col("i94mon").cast('int'),
                                       "country_code_cit","country_name_cit",
                                       "country_code_res","country_name_res","travelModeCode","travelModeName","stateCode",
                                       "stateName","fltno",col("admnum").cast('int'),"airline","gender",
                                       "dtadfile","dtaddto",col("biryear").cast('int'),"entdepa","entdepd","matflag",
                                       col("i94bir").cast('int'),"arrdate","depdate","i94port","VisaCategoryCode","VisaCategoryName",
                                       "visatype","visapost")
    # Write fact_i94 tablein parquet format
    i94_full_df5.write.mode('overwrite').parquet(output_data+'fact_i94/')
    
    
    

In [149]:
i94_df_fun(spark,output_data)

#### 4.2 Data Quality Checks
* Data quality checks in two steps for each table:
    1. data_quality_checks function is about comparing between the number of rows after the pipeline process and the real value or expected value for table. if they are identical ,then the check will pass.
    2. data_quality_checks function is about primary key duplication of the table and checking if its value is zero or not. if its value is zero, then the check will pass.
* Finally, if any table doesn't pass the quality check, then the whole data_quality_checks will fail.

In [150]:
def data_quality_checks():
    df_fact_i94=spark.read.parquet("abdullahoutput/fact_i94")
    df_ArrivalDate=spark.read.parquet("abdullahoutput/ArrivalDate")
    df_demographics_df=spark.read.parquet("abdullahoutput/demographics_df")
    df_GlobalLandTemperaturesByCity=spark.read.parquet("abdullahoutput/GlobalLandTemperaturesByCity")
    df_I94Port=spark.read.parquet("abdullahoutput/i94_SAS_label/I94Port")
    
    data_quality_checks=[{'check_count_of_Dataset': df_fact_i94.count(), 'real_count_of_Dataset': 2559296,
                          'check_count_duplicate_of_unique_key': df_fact_i94.groupBy("cicid").count().where("count > 1").count(), 'real_count_duplicate_of_unique_key': 0,"table":"df_fact_i94"},
                         {'check_count_of_Dataset': df_ArrivalDate.count(), 'real_count_of_Dataset': 30,
                          'check_count_duplicate_of_unique_key': df_ArrivalDate.groupBy("arrdate").count().where("count > 1").count(), 'real_count_duplicate_of_unique_key': 0,"table":"df_ArrivalDate"},
                         {'check_count_of_Dataset': df_demographics_df.count(), 'real_count_of_Dataset': 49,
                          'check_count_duplicate_of_unique_key': df_demographics_df.groupBy("State").count().where("count > 1").count(), 'real_count_duplicate_of_unique_key': 0,"table":"df_demographics_df"},
                         {'check_count_of_Dataset': df_GlobalLandTemperaturesByCity.count(), 'real_count_of_Dataset': 159,
                          'check_count_duplicate_of_unique_key': df_GlobalLandTemperaturesByCity.groupBy("Country").count().where("count > 1").count(), 'real_count_duplicate_of_unique_key': 0,"table":"df_GlobalLandTemperaturesByCity"},
                         {'check_count_of_Dataset': df_I94Port.count(), 'real_count_of_Dataset': 660,
                          'check_count_duplicate_of_unique_key': df_I94Port.groupBy("portCode").count().where("count > 1").count(), 'real_count_duplicate_of_unique_key': 0,"table":"df_I94Port"}]
    
    
    count_error = 0
    for check in data_quality_checks:
        check_dataset_count = check.get('check_count_of_Dataset')
        real__dataset_count = check.get('real_count_of_Dataset')
        check_uniquekey_count = check.get('check_count_duplicate_of_unique_key')
        real_uniquekey_count = check.get('real_count_duplicate_of_unique_key')
        table_name = check.get('table')
         
        if ((check_dataset_count== real__dataset_count) and (check_uniquekey_count== real_uniquekey_count)):
            print(f"Data quality check pass at {table_name} table")
            
        else:
            count_error += 1
            print(f"Data quality check fails at {table_name} table")
    if count_error > 0:
        print('Data quality checks failed')
    else:
        print('Data quality checks passed')

In [151]:
# Perform quality checks here
data_quality_checks()

Data quality check pass at df_fact_i94 table
Data quality check pass at df_ArrivalDate table
Data quality check pass at df_demographics_df table
Data quality check pass at df_GlobalLandTemperaturesByCity table
Data quality check pass at df_I94Port table
Data quality checks passed


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Fact Table: this is the fact table from the cleaned I94 dataset

| Feature      | Description |
| ----------- | ----------- |
| cicid      | Unique ID       |
|i94yr| year        |
|i94mon| month|
|country_code_cit| 3 digit code for immigrant country of birth|
|country_name_cit| Name for immigrant country of birth|
|country_code_res| 3 digit code for immigrant country of residence|
|country_name_res| Name for immigrant country of residence|
|travelModeCode| Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)|
|travelModeName| Mode name of transportation|
|stateCode| USA State code of arrival|
|stateName| USA State name of arrival|
|fltno| Flight number of Airline used to arrive in U.S.|
|admnum| Admission Number|
|airline| Airline used to arrive in U.S|
|gender| Non-immigrant sex|
|dtadfile|Character Date Field - Date added to I-94 Files|
|dtaddto|Character Date Field - Date to which admitted to U.S. (allowed to stay until)|
|biryear|4 digit year of birth|
|entdepa| Arrival Flag - admitted or paroled into the U.S.|
|entdepd|Departure Flag - Departed, lost I-94 or is deceased|
|matflag|Match flag - Match of arrival and departure records|
|i94bir|Age of Respondent in Years|
|arrdate|Arrival Date in the USA|
|depdate|Departure Date from the USA|
|i94port|Port of admission|
|VisaCategoryCode| Visa codes collapsed into three categories|
|VisaCategoryName| Visa name collapsed into three categories|
|visatype|Class of admission legally admitting the non-immigrant to temporarily stay in U.S|
|visapost|Department of State where Visa was issued|







### demographics_df dimension table : this dimension table is created from the us_cities_demographics dataset. This dimension can join the fact table through the stateCode feature.

| Feature      | Description |
| ----------- | ----------- |
|State | US State of the City|
|Median Age | The median population age|
|Male Population | Male population total|
|Female Population | Female population total|
|Total Population | Total population|
|Number of Veterans | Number of veterans living in the city|
|Foreign-born | Number of residents who were not born in the city|
|Average Household Size | Average size of houses in the city|
|State Code | Code of the state|

### GlobalLandTemperaturesByCity dimension table: this dimension table is created from the GlobalLandTemperaturesByCity dataset. This dimension can join the fact table through the Country feature.

| Feature      | Description |
| ----------- | ----------- |
|Country | Name of country|
|AverageTemperature | Average temperature in celsius|
|AverageTemperatureUncertainty | 95% confidence interval around average temperature|

### I94Port dimension table: this dimension table is extracted from the i94 sas_label file. This dimension can join the fact table through the i94port feature.

| Feature      | Description |
| ----------- | ----------- |
|portCode | Port of admission|
|cityName_stateCode | The name of city and code of the state|
|cityName |  The name of city|
|stateCode|The code of the state|

### ArrivalDate dimension table: this dimension table is extracted and created from the i94 dataset. This dimension can join the fact table through the i94port feature.

| Feature      | Description |
| ----------- | ----------- |
|arrdate | Arrival Date in the USA|
|ArrivalDate | Arrival Date in the USA in date time format|
|Year |  Year|
|Month|Month|
|Week|Week of year|
|WeekDay|Day of week|
|Day|Day of month|


## The process result

In [152]:
# The fact and dimensions tables: 
df_fact_i94=spark.read.parquet("abdullahoutput/fact_i94")
df_ArrivalDate=spark.read.parquet("abdullahoutput/ArrivalDate")
df_demographics_df=spark.read.parquet("abdullahoutput/demographics_df")
df_GlobalLandTemperaturesByCity=spark.read.parquet("abdullahoutput/GlobalLandTemperaturesByCity")
df_I94Port=spark.read.parquet("abdullahoutput/i94_SAS_label/I94Port")

In [153]:
df_fact_i94.limit(2).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,country_code_cit,country_name_cit,country_code_res,country_name_res,travelModeCode,travelModeName,stateCode,...,entdepd,matflag,i94bir,arrdate,depdate,i94port,VisaCategoryCode,VisaCategoryName,visatype,visapost
0,5748517,2016,4,245,"China, Prc",438,Australia,1,Air,CA,...,O,M,40,20574.0,1960-01-01,LOS,1,Business,B1,SYD
1,5748518,2016,4,245,"China, Prc",438,Australia,1,Air,NV,...,O,M,32,20574.0,1960-01-01,LOS,1,Business,B1,SYD


In [154]:
df_fact_i94.count()

2559296

In [155]:
df_ArrivalDate.limit(2).toPandas()

Unnamed: 0,arrdate,ArrivalDate,Year,Month,Week,WeekDay,Day
0,20558.0,1960-01-01,1960,1,53,6,1
1,20571.0,1960-01-01,1960,1,53,6,1


In [156]:
df_ArrivalDate.count()

30

In [157]:
df_demographics_df.limit(2).toPandas()

Unnamed: 0,State,state_code,Median_Age,Male_Population,Female_Population,Total_Population,Number_of_Veterans,Foreign_born,Average_Household_Size
0,District of Columbia,DC,33.8,1598525,1762615,3361140,129815,475585,2.24
1,Arkansas,AR,32.74,1400724,1482165,2882889,154390,307753,2.53


In [158]:
df_demographics_df.count()

49

In [159]:
df_GlobalLandTemperaturesByCity.limit(2).toPandas()

Unnamed: 0,Country,AverageTemperature,AverageTemperatureUncertainty
0,Bosnia And Herzegovina,11.593267,0.428826
1,United Kingdom,10.110256,0.262104


In [160]:
df_GlobalLandTemperaturesByCity.count()

159

In [161]:
df_I94Port.limit(2).toPandas()

Unnamed: 0,portCode,cityName_stateCode,cityName,stateCode
0,ALC,"ALCAN, AK",Alcan,AK
1,ANC,"ANCHORAGE, AK",Anchorage,AK


In [162]:
df_I94Port.count()

660

### How to use the tables ?

- Which State of US the people visit for pleasure and what is the mean age of the state people.


- This model could be used to explore the temperature impact on the travelers and what kind of visa they get.


- This model could be used to explore the correlation between the number of airports for each city and state and the size of travelers flow.

In [163]:
# Which State of US the male and female visit for pleasure and what is the mean age of the state people
df_fact_i94=spark.read.parquet("abdullahoutput/fact_i94")
df_demographics_df=spark.read.parquet("abdullahoutput/demographics_df")

In [164]:
dff=df_fact_i94.join(df_demographics_df,[df_fact_i94.stateCode==df_demographics_df.state_code])

In [165]:
dff=dff.filter(dff.VisaCategoryName == "Pleasure")

In [166]:
dfff=dff.select(["State","Median_Age","Male_Population","Female_Population"]).dropDuplicates()

In [167]:
dfff.show()

+-------------+----------+---------------+-----------------+
|        State|Median_Age|Male_Population|Female_Population|
+-------------+----------+---------------+-----------------+
| South Dakota|     37.05|         613590|           611900|
|       Alaska|      32.2|         764725|           728750|
|         Iowa|     32.54|        1772066|          1831937|
|     Virginia|     34.43|        5802370|          6015740|
|      Alabama|     36.16|        2448200|          2715106|
|    Tennessee|     34.31|        5124189|          5565976|
|     Colorado|     35.82|        7273095|          7405250|
|     New York|     35.57|       23422799|         25579256|
|       Oregon|     36.13|        3537215|          3645330|
|     Kentucky|     35.95|        2262415|          2386970|
|Massachusetts|     35.55|        4841101|          5155944|
|   Washington|     35.29|        6228025|          6272510|
|     Arkansas|     32.74|        1400724|          1482165|
| Pennsylvania|     33.9

In [168]:
dfff.count()

49

#### Step 5: Complete Project Write Up
* The rationale for the choice of tools and technologies for the project.
    - Apache Spark is the technology that used in this project since it can handle large amounts of data in parallel distributed fashion with high velocity
* How often the data should be updated and why.
    - The dataset should be updated monthly since the I94 dataset that i worked on is updated monthly 
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
     - I would have scaled up the cluster that i work through by increasing the number of nodes in my cluster so that the performance becomes fast
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
     - I use Apache Airflow to schedule and run data pipelines for daily execution at 7am.
 * The database needed to be accessed by 100+ people.
     - We will use Amazon Redshift to perform the database analytics