# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [29]:
# Do all imports and installs here
import pandas as pd
import glob
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
from pyspark.sql.types import StringType, IntegerType, DoubleType, BooleanType, DateType
from pyspark.sql.functions import concat, col, lit,udf
import re
import datetime as dt

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
Create an ETL pipeline for US immigration data, demographic data of US cities and airports to allow queries on the relationship of these data sources and enable deeper analytics on the data.

#### Describe and Gather Data 
i94 Immigration Data : This data comes from the US National Tourism and Trade Office. This table is used for the fact table in this project.

U.S. City Demographic Data us-cities-demographics. This dataset contains population details of all US Cities and census-designated places includes gender & race information. This data came from OpenSoft. The table is grouped by state to get aggregated statistics.¶

Airport Codes is a simple table of airport codes and corresponding cities. The rows where IATA codes are available in the table are selected for this project. 

### Step 2: Explore and Assess the Data

## Immigration Data

In [1]:
print(glob.glob("../../data/*/*"))

['../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat', '../../data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat', '../../data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat', '../../data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat', '../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat', '../../data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat', '../../data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat', '../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat', '../../data/18-83510-I94-Data-2016/i94_oct16_sub.sas7bdat', '../../data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat', '../../data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat', '../../data/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat']


-> 12 files available, one for each month in 2016.

In [45]:
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat').drop('validres','delete_days','delete_mexl','delete_dup','delete_visa','delete_recdup')
df_spark.printSchema()
print(f"Rows: {df_spark.count()}")

for file in glob.glob("../../data/*/*"):
    print('Reading file '+file)
    if  '18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat' not in file:
        df_spark = df_spark.unionByName(spark.read.format('com.github.saurfang.sas.spark').load(file))
        print(f"Rows: {df_spark.count()}")
    
#df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016')
columns_in_file=len(df_spark.columns)
rows_in_file=df_spark.count()
distinct_rows_data=df_spark.distinct().count()
print(f"The total number of columns in the data file are {columns_in_file} and number of rows {rows_in_file} ")
print(f"Count of distinct number of rows {distinct_rows_data} ")

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [46]:
#write to parquet
df_spark.write.parquet("all_sas_data")
df_spark=spark.read.parquet("all_sas_data")

Over 40m entries in immigration data files!
File from June contains additional columns to the other files. The columns were removed since they are not needed and hinder aggregation of the data.

## US Cities demographic data

In [14]:
file_name = "us-cities-demographics.csv"
demographics_df = spark.read.csv(file_name, inferSchema=True, header=True, sep=';')

# display the first five records
demographics_df.limit(10).toPandas()


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402
5,Peoria,Illinois,33.1,56229,62432,118661,6634,7517,2.4,IL,American Indian and Alaska Native,1343
6,Avondale,Arizona,29.1,38712,41971,80683,4815,8355,3.18,AZ,Black or African-American,11592
7,West Covina,California,39.8,51629,56860,108489,3800,37038,3.56,CA,Asian,32716
8,O'Fallon,Missouri,36.0,41762,43270,85032,5783,3269,2.77,MO,Hispanic or Latino,2583
9,High Point,North Carolina,35.5,51751,58077,109828,5204,16315,2.65,NC,Asian,11060


## Airport Codes (incl. IATA codes)

Filter US airports with IATA code

In [4]:
file_name = "airport-codes_csv.csv"
airports_df = spark.read.csv(file_name, inferSchema=True, header=True, sep=',')
airports_df = airports_df.filter("iso_country='US'").filter("iata_code != 'none'")
# display the first five records
airports_df.limit(10).toPandas()


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,07FA,small_airport,Ocean Reef Club Airport,8,,US,US-FL,Key Largo,07FA,OCA,07FA,"-80.274803161621, 25.325399398804"
1,0AK,small_airport,Pilot Station Airport,305,,US,US-AK,Pilot Station,,PQS,0AK,"-162.899994, 61.934601"
2,0CO2,small_airport,Crested Butte Airpark,8980,,US,US-CO,Crested Butte,0CO2,CSE,0CO2,"-106.928341, 38.851918"
3,0TE7,small_airport,LBJ Ranch Airport,1515,,US,US-TX,Johnson City,0TE7,JCY,0TE7,"-98.62249755859999, 30.251800537100003"
4,13MA,small_airport,Metropolitan Airport,418,,US,US-MA,Palmer,13MA,PMX,13MA,"-72.31140136719999, 42.223300933800004"
5,13Z,seaplane_base,Loring Seaplane Base,0,,US,US-AK,Loring,13Z,WLR,13Z,"-131.636993408, 55.6012992859"
6,16A,small_airport,Nunapitchuk Airport,12,,US,US-AK,Nunapitchuk,PPIT,NUP,16A,"-162.440454, 60.905591"
7,16K,seaplane_base,Port Alice Seaplane Base,0,,US,US-AK,Port Alice,16K,PTC,16K,"-133.597, 55.803"
8,19AK,small_airport,Icy Bay Airport,50,,US,US-AK,Icy Bay,19AK,ICY,19AK,"-141.662002563, 59.96900177"
9,19P,seaplane_base,Port Protection Seaplane Base,0,,US,US-AK,Port Protection,19P,PPV,19P,"-133.61000061035, 56.328800201416"


In [5]:
airports_df.select('type').distinct().collect()

[Row(type='large_airport'),
 Row(type='seaplane_base'),
 Row(type='heliport'),
 Row(type='closed'),
 Row(type='medium_airport'),
 Row(type='small_airport')]

In [6]:
airports_df.groupBy("type").count().orderBy("type").show()

+--------------+-----+
|          type|count|
+--------------+-----+
|        closed|   63|
|      heliport|   19|
| large_airport|  167|
|medium_airport|  653|
| seaplane_base|   72|
| small_airport| 1045|
+--------------+-----+



In [7]:
airports_df.count()

2019

In [8]:
airports_df.groupBy("iso_country","iso_region").count().orderBy("iso_region").show(60)

+-----------+----------+-----+
|iso_country|iso_region|count|
+-----------+----------+-----+
|         US|     US-AK|  334|
|         US|     US-AL|   30|
|         US|     US-AR|   29|
|         US|     US-AZ|   46|
|         US|     US-CA|  157|
|         US|     US-CO|   38|
|         US|     US-CT|    9|
|         US|     US-DC|    5|
|         US|     US-DE|    4|
|         US|     US-FL|   78|
|         US|     US-GA|   37|
|         US|     US-HI|   22|
|         US|     US-IA|   41|
|         US|     US-ID|   17|
|         US|     US-IL|   41|
|         US|     US-IN|   37|
|         US|     US-KS|   36|
|         US|     US-KY|   18|
|         US|     US-LA|   22|
|         US|     US-MA|   22|
|         US|     US-MD|   17|
|         US|     US-ME|   19|
|         US|     US-MI|   48|
|         US|     US-MN|   35|
|         US|     US-MO|   29|
|         US|     US-MS|   27|
|         US|     US-MT|   27|
|         US|     US-NC|   34|
|         US|     US-ND|   15|
|       

Add column for state code

In [9]:
def getStateCode(iso_region):
    try:
        result = iso_region.split("-")[1]
    except Exception:
        result = 'XX'
    return result

getStateCodeUDF = udf(lambda iso_region: getStateCode(iso_region), StringType())
airports_df = airports_df.withColumn("state_code", getStateCodeUDF(sf.col("iso_region")))

airports_df.head(5)

[Row(ident='07FA', type='small_airport', name='Ocean Reef Club Airport', elevation_ft=8, continent='NA', iso_country='US', iso_region='US-FL', municipality='Key Largo', gps_code='07FA', iata_code='OCA', local_code='07FA', coordinates='-80.274803161621, 25.325399398804', state_code='FL'),
 Row(ident='0AK', type='small_airport', name='Pilot Station Airport', elevation_ft=305, continent='NA', iso_country='US', iso_region='US-AK', municipality='Pilot Station', gps_code=None, iata_code='PQS', local_code='0AK', coordinates='-162.899994, 61.934601', state_code='AK'),
 Row(ident='0CO2', type='small_airport', name='Crested Butte Airpark', elevation_ft=8980, continent='NA', iso_country='US', iso_region='US-CO', municipality='Crested Butte', gps_code='0CO2', iata_code='CSE', local_code='0CO2', coordinates='-106.928341, 38.851918', state_code='CO'),
 Row(ident='0TE7', type='small_airport', name='LBJ Ranch Airport', elevation_ft=1515, continent='NA', iso_country='US', iso_region='US-TX', municipali

### Country codes extracted from I94_SAS_Labels_Descriptions.SAS

In [33]:
country_codes = 'country_codes.txt'
with open(country_codes) as f:
    lines = f.readlines()
lines = [line.replace('"','').replace('\n','').replace("'",'') for line in lines]

regexp = re.compile(r"^\s*(\d{3})\s*=\s*(.*).*")
matches = [regexp.match(line) for line in lines]
country_dict = {
    'country_code' : list(),
    'country_name' : list()
}

for m in matches:
    if m is None:
        continue
    country_dict['country_code'].append(int(m.group(1)))
    country_dict['country_name'].append(m.group(2))

countries_pd = pd.DataFrame.from_dict(country_dict)
countries_pd.count()
countries_df = spark.createDataFrame(countries_pd)

In [34]:
countries_pd

Unnamed: 0,country_code,country_name
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA
5,324,ANGOLA
6,529,ANGUILLA
7,518,ANTIGUA-BARBUDA
8,687,ARGENTINA
9,151,ARMENIA


### City codes extracted from I94_SAS_Labels_Descriptions.SAS

In [17]:
city_codes = 'city_codes.txt'
with open(city_codes) as f:
    lines = f.readlines()
lines = [line.replace("   '",'').replace("'",'').replace("\n","").replace("\t","") for line in lines]

regexp = re.compile(r'^(.{3})=(.*),(\s*\w{2})(\s+.*|$).*')
matches = [regexp.match(line) for line in lines]
city_dict = {
    'code' : list(),
    'name' : list(),
    'state_code' : list()
}
for m in matches:
    if m is None:
        continue
    city_dict['code'].append(m.group(1).strip())
    city_dict['name'].append(m.group(2).strip())
    city_dict['state_code'].append(m.group(3).strip())

us_cities_pd = pd.DataFrame.from_dict(city_dict)
us_cities_pd


Unnamed: 0,code,name,state_code
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF - BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK
5,DTH,DUTCH HARBOR,AK
6,EGL,EAGLE,AK
7,FRB,FAIRBANKS,AK
8,HOM,HOMER,AK
9,HYD,HYDER,AK


## Demographics Data

In [32]:
demographics_pd = demographics_df.toPandas()

for column in demographics_pd.columns:
    print(f"Null data in {column}? : {any(demographics_pd[column].isnull())}")

Null data in City? : False
Null data in State? : False
Null data in Median Age? : False
Null data in Male Population? : True
Null data in Female Population? : True
Null data in Total Population? : False
Null data in Number of Veterans? : True
Null data in Foreign-born? : True
Null data in Average Household Size? : True
Null data in State Code? : False
Null data in Race? : False
Null data in Count? : False


Empty cells only in non relevant columns.

In [33]:
demographics_pd.shape[0] - demographics_pd.dropna().shape[0]

16

In [8]:
demographics_pd.shape

(2891, 12)

Only 16 rows from 2891 are missing values!
### remove uninteresting columns 

In [15]:
# let's change column types 
demographics_df = demographics_df.withColumn("Median Age", demographics_df["Median Age"].cast(DoubleType()))\
                                .withColumn("Male Population", demographics_df["Male Population"].cast(IntegerType()))\
                                .withColumn("Female Population", demographics_df["Female Population"].cast(IntegerType()))\
                                .withColumn("Total Population", demographics_df["Total Population"].cast(IntegerType()))\
                                .withColumn("Count", demographics_df["Count"].cast(IntegerType()))
demographics_df = demographics_df.select("City","State","Male Population","Female Population","Total Population","State Code","Race","Count")
demographics_df.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



Add city code columns

In [18]:
def getCityCode(s, sc):
    try:
        result = us_cities_pd.loc[(us_cities_pd['name'] == s.upper()) & (us_cities_pd['state_code'] == sc)]['code'].iloc[0]
    except Exception:
        result = '000'
    return result

getCityCodeUDF = udf(lambda c,sc: getCityCode(c,sc), StringType())

demographics_df = demographics_df.withColumn("city_code", getCityCodeUDF(sf.col("City"), sf.col("State Code")))
demographics_df.filter("city_code != '000'").count()


571

Found 571 US cities in the given demographic data and in SAS data. Let's focus on these cities!

## US airports

In [13]:
def getCityCode(s, sc):
    try:
        result = us_cities_pd.loc[(us_cities_pd['name'] == s.upper()) & (us_cities_pd['state_code'] == sc)]['code'].iloc[0]
    except Exception:
        result = '000'
    return result

getCityCodeUDF = udf(lambda c,sc: getCityCode(c,sc), StringType())

airports_df = airports_df.withColumn("city_code", getCityCodeUDF(sf.col("municipality"), sf.col("state_code")))
airports_df.filter("city_code != '000'").count()


1990

Found 1990 airports in cities of the cities data set

In [16]:
airports_df.filter("city_code != '000'").head()

Row(ident='00MT', type='closed', name='Sands Ranch Airport', elevation_ft=2600, continent='NA', iso_country='US', iso_region='US-MT', municipality='Havre', gps_code=None, iata_code=None, local_code=None, coordinates='-109.705002, 48.537498', state_code='MT', city_code='HVR')

### US state codes extracted from I94_SAS_Labels_Descriptions.SAS

In [30]:
us_state_codes = 'us_state_codes.txt'
with open(us_state_codes) as f:
    lines = f.readlines()
lines = [line.replace("'",'').replace("\n","").replace("\t","") for line in lines]

regexp = re.compile(r'^(.{2})=(.*)')
matches = [regexp.match(line) for line in lines]
us_state_dict = {
    'code' : list(),
    'name' : list(),
}
for m in matches:
    if m is None:
        continue
    us_state_dict['code'].append(m.group(1))
    us_state_dict['name'].append(m.group(2))

us_states_pd = pd.DataFrame.from_dict(us_state_dict)
us_states_pd

Unnamed: 0,code,name
0,AL,ALABAMA
1,AK,ALASKA
2,AZ,ARIZONA
3,AR,ARKANSAS
4,CA,CALIFORNIA
5,CO,COLORADO
6,CT,CONNECTICUT
7,DE,DELAWARE
8,DC,DIST. OF COLUMBIA
9,FL,FLORIDA


### Immigration Data

In [4]:
immigration_df=spark.read.parquet("all_sas_data")
immigration_df.head()

Row(cicid=5680949.0, i94yr=2016.0, i94mon=7.0, i94cit=117.0, i94res=117.0, i94port='NYC', arrdate=20659.0, i94mode=1.0, i94addr='NY', depdate=None, i94bir=30.0, i94visa=3.0, count=1.0, dtadfile='20160724', visapost='NPL', occup=None, entdepa='G', entdepd=None, entdepu=None, matflag=None, biryear=1986.0, dtaddto='D/S', gender='F', insnum=None, airline='IG', admnum=2947450085.0, fltno='3940', visatype='F1')

Convert i94cit and i94res to integer and replace them with country codes

In [30]:
immitgration_df = immigration_df.withColumn("i94cit",col("i94cit").cast(IntegerType()))\
.withColumn("i94res",col("i94res").cast(IntegerType()))

immitgration_df.head()

Row(cicid=5680949.0, i94yr=2016.0, i94mon=7.0, i94cit=117, i94res=117, i94port='NYC', arrdate=20659.0, i94mode=1.0, i94addr='NY', depdate=None, i94bir=30.0, i94visa=3.0, count=1.0, dtadfile='20160724', visapost='NPL', occup=None, entdepa='G', entdepd=None, entdepu=None, matflag=None, biryear=1986.0, dtaddto='D/S', gender='F', insnum=None, airline='IG', admnum=2947450085.0, fltno='3940', visatype='F1')

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Correlation between demographic data and immigration data is evaluated based on the city code. The code has been calculated and added where necessary to all fact datasets.

#### 3.2 Mapping Out Data Pipelines
Build spark views from the explored dataframes to allow easy queuing on the data.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [30]:
immigrations_df=spark.read.parquet("all_sas_data")

In [31]:
get_datetime = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x else None)
    
immigrations_df = immigrations_df.withColumn("arrdate", get_datetime(immigrations_df.arrdate))

immigrations_df.createOrReplaceTempView('immigrations_table')
immigrations = spark.sql("""
    SELECT 
        i94yr as year,
        i94mon as month,
        i94cit as origin_city,
        i94port as destination_city,
        arrdate as arrival_date,
        visatype as visatype,
        (CASE WHEN i94visa = '1.0' 
            THEN 'business'
            ELSE 
                (CASE WHEN i94visa = '3.0' 
                     THEN 'student' 
                     ELSE (CASE WHEN i94visa = '2.0' 
                              THEN 'pleasure' 
                              ELSE 'unknown'
                          END)
                END)
        END) as visa
    FROM immigrations_table
""")
immigrations.limit(20).toPandas()
visatype = spark.sql(""" select visatype, count(*) from immigrations_table group by (visatype)""")
visatype.toPandas()

Unnamed: 0,visatype,count(1)
0,F2,68866
1,GMB,2728
2,B2,15188785
3,F1,1487432
4,CPL,236
5,I1,2825
6,WB,2940456
7,M1,16306
8,B1,2282096
9,WT,16915615


In [12]:
immigrations.groupBy("visa","visatype").count().orderBy("visa","visatype").show(truncate=False)


+--------+--------+--------+
|visa    |visatype|count   |
+--------+--------+--------+
|business|B1      |2282096 |
|business|E1      |48905   |
|business|E2      |259215  |
|business|GMB     |2728    |
|business|I       |39054   |
|business|I1      |2825    |
|business|WB      |2940456 |
|pleasure|B2      |15188785|
|pleasure|CP      |272007  |
|pleasure|CPL     |236     |
|pleasure|GMT     |1265275 |
|pleasure|SBP     |61      |
|pleasure|WT      |16915615|
|student |F1      |1487432 |
|student |F2      |68866   |
|student |M1      |16306   |
|student |M2      |667     |
+--------+--------+--------+



Join immigration data with demographics data and countries table

In [39]:
demographics_df = demographics_df.withColumnRenamed('total population', 'total_population')
demographics_df.createOrReplaceTempView('demographics_table')
countries_df.createOrReplaceTempView('countries')

demographics_immigration = spark.sql("""
    SELECT 
        i94yr as year,
        i94mon as month,
        i94cit as origin_city,
        country_name,
        i94port as destination_city,
        arrdate as arrival_date,
        visatype as visatype,
        (CASE WHEN i94visa = '1.0' 
            THEN 'business'
            ELSE 
                (CASE WHEN i94visa = '3.0' 
                     THEN 'student' 
                     ELSE (CASE WHEN i94visa = '2.0' 
                              THEN 'pleasure' 
                              ELSE 'unknown'
                          END)
                END)
        END) as visa,
        total_population,
        City as city,
        State as state
    FROM immigrations_table
    LEFT JOIN demographics_table ON demographics_table.city_code = immigrations_table.i94port
    LEFT JOIN countries ON countries.country_code = immigrations_table.i94cit
""")
demographics_immigration.limit(20).toPandas()

Unnamed: 0,year,month,origin_city,country_name,destination_city,arrival_date,visatype,visa,total_population,city,state
0,2016.0,7.0,299.0,MONGOLIA,LOS,2016-07-24,B2,pleasure,3971896,Los Angeles,California
1,2016.0,7.0,299.0,MONGOLIA,LOS,2016-07-24,B2,pleasure,3971896,Los Angeles,California
2,2016.0,7.0,299.0,MONGOLIA,LOS,2016-07-24,B2,pleasure,3971896,Los Angeles,California
3,2016.0,7.0,299.0,MONGOLIA,LOS,2016-07-24,B2,pleasure,3971896,Los Angeles,California
4,2016.0,7.0,299.0,MONGOLIA,LOS,2016-07-24,B2,pleasure,3971896,Los Angeles,California
5,2016.0,7.0,299.0,MONGOLIA,LOS,2016-07-24,B2,pleasure,3971896,Los Angeles,California
6,2016.0,7.0,299.0,MONGOLIA,LOS,2016-07-24,B2,pleasure,3971896,Los Angeles,California
7,2016.0,7.0,299.0,MONGOLIA,LOS,2016-07-24,B2,pleasure,3971896,Los Angeles,California
8,2016.0,7.0,299.0,MONGOLIA,LOS,2016-07-24,B2,pleasure,3971896,Los Angeles,California
9,2016.0,7.0,299.0,MONGOLIA,LOS,2016-07-24,B2,pleasure,3971896,Los Angeles,California


#### 4.2 Data Quality Checks

Quality Checks have been executed during loading and exploring the data. 
Each process step has benn monitored by printing out the result and checking its validity.

* <u>Demographic Data</u>\
Check for empty cells. Check if result contain all 40m datasets

* <u>Immigration data</u>\
Aggregate all given data to one dataframe. Remove columns not available in all datasets.

* <u>Airport Codes</u>\
Filter for US airports with valid state code.

#### 4.3 Data dictionary 

##### 4.3.1 Immigration Data 



<table align="left">
<tr>
    <th>Column</th><th>Description</th>
    </tr>
<tr>
    <td>cicid</td><td>Unique record ID</td>
</tr>
<tr><td>i94yr</td><td>4 digit year</td></tr>
<tr><td>ii94mon</td><td>Numeric month</td></tr>
<tr><td>ii94cit</td><td>3 digit code for immigrant country of birth</td></tr>
<tr><td>ii94res</td><td>3 digit code for immigrant country of residence</td></tr>
<tr><td>ii94port</td><td>Port of admission</td></tr>
<tr><td>iarrdate</td><td>Arrival Date in the USA</td></tr>
<tr><td>ii94mode</td><td>Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)\</td></tr>
<tr><td>ii94addr</td><td>USA State of arrival</td></tr>
<tr><td>idepdate</td><td>Departure Date from the USA</td></tr>
<tr><td>ii94bir</td><td>Age of Respondent in Years</td></tr>
<tr><td>ii94visa</td><td>Visa codes collapsed into three categories</td></tr>
<tr><td>icount</td><td>Field used for summary statistics</td></tr>
<tr><td>idtadfile</td><td>Character Date Field - Date added to I-94 Files</td></tr>
<tr><td>ivisapost</td><td>Department of State where where Visa was issued</td></tr>
<tr><td>ioccup</td><td>Occupation that will be performed in U.S</td></tr>
<tr><td>ientdepa</td><td>Arrival Flag - admitted or paroled into the U.S.</td></tr>
<tr><td>ientdepd</td><td>Departure Flag - Departed, lost I-94 or is deceased</td></tr>
<tr><td>ientdepu</td><td>Update Flag - Either apprehended, overstayed, adjusted to perm residence</td></tr>
<tr><td>imatflag</td><td>Match flag - Match of arrival and departure records</td></tr>
<tr><td>ibiryear</td><td>4 digit year of birth</td></tr>
<tr><td>idtaddto</td><td>Character Date Field - Date to which admitted to U.S. (allowed to stay until)</td></tr>
<tr><td>igender</td><td>Non-immigrant sex</td></tr>
<tr><td>iinsnum</td><td>INS number</td></tr>
<tr><td>iairline</td><td>Airline used to arrive in U.S.</td></tr>
<tr><td>iadmnum</td><td>Admission Number</td></tr>
<tr><td>ifltno</td><td>Flight number of Airline used to arrive in U.S.</td></tr>
<tr><td>ivisatype</td><td>Class of admission legally admitting the non-immigrant to temporarily stay in U.S.</td></tr>
</table>


##### 4.3.2 Demographic Data

<table align="left">
      <tr>,
        <th >Column</th>
        <th >Description</th>
      </tr>
     <tr><td>City</td><td>City Name</td>
     <tr><td>State</td><td>US State where the city is located</td>
     <tr><td>Median Age</td><td>Median age of the population</td>
     <tr><td>Male Population</td><td>Count of male population</td>
     <tr><td>Female Population</td><td>Count of female population</td>
     <tr><td>Total Population</td><td>Count of total population</td>
     <tr><td>Number of Veterans</td><td>Count of total veterans</td>
     <tr><td>Foreign born</td><td>Count of residents of the city that were not born in that city</td>
     <tr><td>Average household Size</td><td>Average household size in the city</td>
     <tr><td>State Code</td><td>2 character US State code</td>
     <tr><td>Race</td><td>Respondent race</td></tr>
    <tr><td>Count</td><td>Count of people of race</td></tr>
         
</table>


#### Step 5: Complete Project Write Up
* <u>choice of tools and technologies:</u>

To get an overview of the data and run a few queries it seem sufficient to set up a Juniper note book and use python to explore the datasets. For quick access Spark is used to load the data and store in parquet format which allows quick load and save.

* <u>update of data:</u>

Immigration data should be updated every month because it is collected monthly.
For demographic data it seem sufficient to be updated oncy a year or even less.


#### If the data was increased by 100x:
Spark scales very well horizontically. Set up spark cluster according to requirements regarding processing time.

#### The data populates a dashboard that must be updated on a daily basis by 7am every day.
Use a scheduler like Airflow. Use the code form this note book to create a DAG and schedule it accordingly

#### The database needed to be accessed by 100+ people.

Set up a distributed database like Redshift to store the data permanently in fact and dimension tables.