# Immigration analysis
### Data Engineering Capstone Project

#### Project Summary
This project analyses immigration data alongside temperature, demographic, and airport code data to build a better quantitative understanding of immigration to the USA over the years.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import pandas as pd
from pyspark.sql import SparkSession

### Step 1: Scope the Project and Gather Data

#### Scope 
The project will use Airflow, AWS EMR, and Delta Lake to generate bronze, silver, and gold layers of data that facilitate the analysis of immigration data in the USA.

#### Describe and Gather Data 
Four datasets are used in this project:
- I94 Immigration Data: This data comes from the US National Tourism and Trade Office. It has information about immigration over the years;
- World Temperature Data: This dataset comes from Kaggle. It has information about temperature in cities around the world;
- U.S. City Demographic Data: This data comes from OpenSoft. It has information about US demographics;
- Airport Code Table: This is a simple table of airport codes and corresponding cities.

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

In [2]:
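# Read the AWS credentials from a local file in the form "<access_key>,<secret_key>".
# strip() removes a trailing newline that would otherwise end up in the secret key.
with open('credentials-airflow-user.txt') as f:
    access_key, secret_key = f.read().strip().split(",")[:2]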

In [3]:
# !pyspark --packages io.delta:delta-core_2.12:2.2.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
# !pip install delta

In [4]:
# !pip install delta-spark

In [6]:
from delta import *
import pyspark

builder = pyspark.sql.SparkSession.builder.appName("MyApp"). \
    config("spark.jars.repositories", "https://repos.spark-packages.org/").\
    config("spark.jars.packages", "io.delta:delta-core_2.12:2.2.0,org.apache.hadoop:hadoop-aws:3.2.2").\
    config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension").\
    config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog").\
    config("spark.hadoop.fs.s3a.access.key", access_key).\
    config("spark.hadoop.fs.s3a.secret.key", secret_key).\
    config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider').\
    enableHiveSupport()

spark = builder.getOrCreate()


In [7]:
airport_codes_df = spark.read.option("header", True).csv('s3a://landing-layer-udacity-nd//airport-codes_csv.csv')
airport_codes_df.printSchema()


root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



                                                                                

In [8]:
sas_data_df = spark.read.parquet("data/sas_data")
sas_data_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [9]:
print(f"Rows: {sas_data_df.count()}; Columns: {len(sas_data_df.columns)}")

Rows: 3096313; Columns: 28


In [10]:
pd.options.display.max_columns = None
sas_data_df.limit(10).toPandas()



Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,40.0,1.0,1.0,20160430,SYD,,G,O,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,32.0,1.0,1.0,20160430,SYD,,G,O,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,29.0,1.0,1.0,20160430,SYD,,G,O,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,29.0,1.0,1.0,20160430,SYD,,G,O,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,28.0,1.0,1.0,20160430,SYD,,G,O,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1
5,5748522.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20579.0,57.0,2.0,1.0,20160430,ACK,,G,O,,M,1959.0,10292016,M,,NZ,94981800000.0,10,B2
6,5748523.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20586.0,66.0,2.0,1.0,20160430,ACK,,G,O,,M,1950.0,10292016,F,,NZ,94979690000.0,10,B2
7,5748524.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20586.0,41.0,2.0,1.0,20160430,ACK,,G,O,,M,1975.0,10292016,F,,NZ,94979750000.0,10,B2
8,5748525.0,2016.0,4.0,245.0,464.0,HOU,20574.0,1.0,FL,20581.0,27.0,2.0,1.0,20160430,ACK,,G,O,,M,1989.0,10292016,M,,NZ,94973250000.0,28,B2
9,5748526.0,2016.0,4.0,245.0,464.0,LOS,20574.0,1.0,CA,20581.0,26.0,2.0,1.0,20160430,ACK,,G,O,,M,1990.0,10292016,F,,NZ,95013550000.0,2,B2


In [20]:
from pyspark.sql import functions as F

# Use the module alias so Python's built-in min/max are not shadowed
sas_data_df.select(F.min('i94yr'), F.max('i94yr')).collect()

[Row(min(i94yr)=2016.0, max(i94yr)=2016.0)]

This gives an idea of what the dataset looks like; all rows are from 2016. The `i94port` column seems to hold a code, so the next dataset to explore is the "Airport Code Table".

In [21]:
airport_codes_df = spark.read.option("header", True).option("inferSchema", True).csv('data/airport-codes_csv.csv')
airport_codes_df.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [22]:
airport_codes_df.limit(10).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"
5,00AS,small_airport,Fulton Airport,1100,,US,US-OK,Alex,00AS,,00AS,"-97.8180194, 34.9428028"
6,00AZ,small_airport,Cordes Airport,3810,,US,US-AZ,Cordes,00AZ,,00AZ,"-112.16500091552734, 34.305599212646484"
7,00CA,small_airport,Goldstone /Gts/ Airport,3038,,US,US-CA,Barstow,00CA,,00CA,"-116.888000488, 35.350498199499995"
8,00CL,small_airport,Williams Ag Airport,87,,US,US-CA,Biggs,00CL,,00CL,"-121.763427, 39.427188"
9,00CN,heliport,Kitchen Creek Helibase Heliport,3350,,US,US-CA,Pine Valley,00CN,,00CN,"-116.4597417, 32.7273736"


In [23]:
airport_codes_df.filter("iata_code='LOS'").show()

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
| DNMM|large_airport|Murtala Muhammed ...|         135|       AF|         NG|     NG-LA|       Lagos|    DNMM|      LOS|      null|3.321160078048706...|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+



The IATA code "LOS" exists in the airport table, but it belongs to an airport in Lagos, Nigeria, while the immigration rows with `i94port = 'LOS'` have US states in `i94addr`. This suggests that `i94port` and `iata_code` do not represent the same thing, so the data model will not define a relation between the immigration and airport tables.

Taking a look at "U.S. City Demographic Data" now:

In [24]:
us_cities_df = spark.read.option("header", True).\
                     option("delimiter", ";").\
                     option("inferSchema", True).\
                     csv("data/us-cities-demographics.csv")
us_cities_df.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [25]:
from pyspark.sql import functions as F

# Normalize column names to snake_case: lowercase, spaces and hyphens become underscores
us_cities_df = us_cities_df.select([
    F.col(x).alias(x.lower().replace(' ', '_').replace('-', '_'))
    for x in us_cities_df.columns
])

In [26]:
us_cities_df.limit(10).toPandas()

Unnamed: 0,city,state,median_age,male_population,female_population,total_population,number_of_veterans,foreign_born,average_household_size,state_code,race,count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402
5,Peoria,Illinois,33.1,56229,62432,118661,6634,7517,2.4,IL,American Indian and Alaska Native,1343
6,Avondale,Arizona,29.1,38712,41971,80683,4815,8355,3.18,AZ,Black or African-American,11592
7,West Covina,California,39.8,51629,56860,108489,3800,37038,3.56,CA,Asian,32716
8,O'Fallon,Missouri,36.0,41762,43270,85032,5783,3269,2.77,MO,Hispanic or Latino,2583
9,High Point,North Carolina,35.5,51751,58077,109828,5204,16315,2.65,NC,Asian,11060


Finally, Kaggle already provides some graphs for the temperature data, so it is not necessary to redo them here.

#### Cleaning Steps
The main data preparation step is to extract the meaning of some immigration codes from the `I94_SAS_Labels_Descriptions.SAS` file. Besides that, the temperature data could use some filtering (USA cities only, non-null values), but since this is simpler, it will be done directly in the Airflow DAG.
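
The temperature filter is not run in this notebook. As a rough, Spark-free sketch of what that DAG step could do, the block below keeps only rows for one country that have a temperature reading. The column names follow the bronze `world_temperature` table; the sample rows, the function name `filter_us_temperatures`, and the assumption that the dataset spells the country as "United States" are illustrative, not taken from the project.

```python
import csv
import io

# Hypothetical excerpt: column names follow the bronze world_temperature
# table, the values are made up for illustration.
sample_csv = """dt,averagetemperature,averagetemperatureuncertainty,city,country,latitude,longitude
2013-07-01,27.1,0.3,Houston,United States,29.74N,96.00W
2013-08-01,,,Houston,United States,29.74N,96.00W
2013-07-01,25.4,0.4,Lagos,Nigeria,6.70N,4.05E
"""


def filter_us_temperatures(rows, country_name="United States"):
    """Keep rows for the given country that have a temperature reading."""
    return [
        row for row in rows
        if row["country"] == country_name and row["averagetemperature"] != ""
    ]


rows = list(csv.DictReader(io.StringIO(sample_csv)))
us_rows = filter_us_temperatures(rows)
print(us_rows)
```

Only the first Houston row survives: the second one has no temperature and the third is outside the USA.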

In [79]:
from pyspark.sql.types import StructType, StructField, StringType

def generate_spark_dataframe_from_file(file_path, header_identifier):
    """Parse one `code = 'value'` block of the SAS labels file into a DataFrame.

    The block starts at the first occurrence of `header_identifier` and ends
    at the next semicolon.
    """
    with open(file_path) as f:
        file_content = f.read()
    start_of_values = file_content.index(header_identifier)
    end_of_values = file_content.index(";", start_of_values)

    searched_values = file_content[start_of_values:end_of_values]

    schema = StructType([
        StructField('code', StringType(), True),
        StructField('value', StringType(), True)
    ])
    values = []

    for line in searched_values.split("\n"):
        if "=" in line:
            # Split only on the first "=" so a value containing "=" stays intact
            code, value = line.split("=", 1)
            values.append([code.strip(" '\t"), value.strip(" '\t")])

    # Build the DataFrame once, after all lines have been collected
    values_dataframe = spark.createDataFrame(values, schema)

    return values_dataframe
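
To make the parsing step easier to follow, here is a minimal Spark-free sketch of the same idea applied to an in-memory string. The helper name `parse_label_block` and the sample text are hypothetical; the sample only imitates the `code = 'value'` layout that the function above expects and is not copied from the real labels file.

```python
def parse_label_block(text, header_identifier):
    """Return (code, value) pairs from the block that starts at
    header_identifier and ends at the next semicolon."""
    start = text.index(header_identifier)
    end = text.index(";", start)
    pairs = []
    for line in text[start:end].split("\n"):
        if "=" in line:
            code, value = line.split("=", 1)
            # Remove surrounding spaces, tabs and single quotes
            pairs.append((code.strip(" '\t"), value.strip(" '\t")))
    return pairs


# Hypothetical excerpt imitating the layout of the labels file
sample = """/* I94MODE - hypothetical excerpt */
value i94model
\t1 = 'Air'
\t2 = 'Sea'
\t3 = 'Land'
\t9 = 'Not reported' ;
"""

pairs = parse_label_block(sample, "I94MODE")
print(pairs)
# [('1', 'Air'), ('2', 'Sea'), ('3', 'Land'), ('9', 'Not reported')]
```

Note that the codes come out as strings, while columns such as `i94mode` are stored as doubles in the immigration data, so any join between them relies on a type conversion.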

In [81]:
labels_path = "data/I94_SAS_Labels_Descriptions.SAS"

# printSchema() prints the schema itself and returns None, so it is not wrapped in print()
I94cit_res_df = generate_spark_dataframe_from_file(labels_path, "I94CIT")
I94cit_res_df.printSchema()

I94port_df = generate_spark_dataframe_from_file(labels_path, "I94PORT")
I94port_df.printSchema()

I94mode_df = generate_spark_dataframe_from_file(labels_path, "I94MODE")
I94mode_df.printSchema()

I94addr_df = generate_spark_dataframe_from_file(labels_path, "I94ADDR")
I94addr_df.printSchema()

I94visa_df = generate_spark_dataframe_from_file(labels_path, "I94VISA")
I94visa_df.printSchema()

root
 |-- code: string (nullable = true)
 |-- value: string (nullable = true)

root
 |-- code: string (nullable = true)
 |-- value: string (nullable = true)

root
 |-- code: string (nullable = true)
 |-- value: string (nullable = true)

root
 |-- code: string (nullable = true)
 |-- value: string (nullable = true)

root
 |-- code: string (nullable = true)
 |-- value: string (nullable = true)


In [86]:
sas_data_df.join(I94cit_res_df, sas_data_df.i94cit == I94cit_res_df.code, "inner")\
    .drop("code").withColumnRenamed("value", "i94cit_value")\
    .join(I94cit_res_df, sas_data_df.i94res == I94cit_res_df.code, "inner")\
    .drop("code").withColumnRenamed("value", "i94res_value")\
    .join(I94port_df, sas_data_df.i94port == I94port_df.code, "inner")\
    .drop("code").withColumnRenamed("value", "i94port_value")\
    .join(I94mode_df, sas_data_df.i94mode == I94mode_df.code, "inner")\
    .drop("code").withColumnRenamed("value", "i94mode_value")\
    .join(I94addr_df, sas_data_df.i94addr == I94addr_df.code, "inner")\
    .drop("code").withColumnRenamed("value", "i94addr_value")\
    .join(I94visa_df, sas_data_df.i94visa == I94visa_df.code, "inner")\
    .drop("code").withColumnRenamed("value", "i94visa_value")\
    .limit(20).toPandas()

                                                                                

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype,i94cit_value,i94res_value,i94port_value,i94mode_value,i94addr_value,i94visa_value
0,5761453.0,2016.0,4.0,299.0,299.0,WAS,20574.0,1.0,DC,20580.0,50.0,1.0,1.0,20160430,ULN,,G,O,,M,1966.0,10292016,M,,KE,94964160000.0,00093,B1,MONGOLIA,MONGOLIA,WASHINGTON DC,Air,DIST. OF COLUMBIA,Business
1,5761458.0,2016.0,4.0,299.0,299.0,WAS,20574.0,1.0,WA,20588.0,40.0,1.0,1.0,20160430,ULN,,G,O,,M,1976.0,10292016,M,,KE,94964490000.0,00093,B1,MONGOLIA,MONGOLIA,WASHINGTON DC,Air,WASHINGTON,Business
2,5761459.0,2016.0,4.0,299.0,299.0,LOS,20574.0,1.0,CA,,27.0,1.0,1.0,20160430,ULN,,G,,,,1989.0,04292018,F,,KE,94975730000.0,00017,E2,MONGOLIA,MONGOLIA,"LOS ANGELES, CA",Air,CALIFORNIA,Business
3,5761467.0,2016.0,4.0,299.0,299.0,SFR,20574.0,1.0,CA,20581.0,31.0,1.0,1.0,20160430,ULN,,G,O,,M,1985.0,10292016,F,,UA,94981180000.0,00892,B1,MONGOLIA,MONGOLIA,"SAN FRANCISCO, CA",Air,CALIFORNIA,Business
4,4178372.0,2016.0,4.0,582.0,582.0,PHO,20566.0,1.0,NY,20567.0,30.0,1.0,1.0,20160422,GDL,,G,R,,M,1986.0,10212016,M,,AM,94205520000.0,0400C,B1,"MEXICO Air Sea, and Not Reported (I-94, no lan...","MEXICO Air Sea, and Not Reported (I-94, no lan...","PHOENIX, AZ",Air,NEW YORK,Business
5,5761472.0,2016.0,4.0,299.0,299.0,SFR,20574.0,1.0,VA,20581.0,32.0,1.0,1.0,20160430,ULN,,G,O,,M,1984.0,10292016,F,,UA,94980810000.0,00892,B1,MONGOLIA,MONGOLIA,"SAN FRANCISCO, CA",Air,VIRGINIA,Business
6,5761474.0,2016.0,4.0,299.0,299.0,SFR,20574.0,1.0,VA,20595.0,29.0,1.0,1.0,20160430,ULN,,G,O,,M,1987.0,10292016,F,,UA,94980750000.0,00892,B1,MONGOLIA,MONGOLIA,"SAN FRANCISCO, CA",Air,VIRGINIA,Business
7,5894568.0,2016.0,4.0,692.0,112.0,MIA,20574.0,1.0,MI,20597.0,22.0,1.0,1.0,20160430,RME,,G,O,,M,1994.0,10292016,F,,AB,94982940000.0,07210,B1,ECUADOR,GERMANY,"MIAMI, FL",Air,MICHIGAN,Business
8,5894697.0,2016.0,4.0,692.0,692.0,HOU,20574.0,1.0,TX,20584.0,26.0,1.0,1.0,20160430,GYQ,,G,O,,M,1990.0,10262016,F,,UA,95024100000.0,01033,B1,ECUADOR,ECUADOR,"HOUSTON, TX",Air,TEXAS,Business
9,5894698.0,2016.0,4.0,692.0,692.0,HOU,20574.0,1.0,TX,20584.0,25.0,1.0,1.0,20160430,GYQ,,G,O,,M,1991.0,10292016,M,,UA,95024020000.0,01033,B1,ECUADOR,ECUADOR,"HOUSTON, TX",Air,TEXAS,Business


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
To gain flexibility, reduce costs, and still keep ACID transactions, a medallion architecture (with bronze, silver, and gold layers) on Delta Lake was chosen. The data model for each layer is presented below:

**- Bronze:**

This layer holds the data as is, without filters, merges, or other operations. The schemas are:

| Table               | Columns                                                                                                                                                                                                                                       | Id    | FK                                                 | Description                                                     |
|---------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------|----------------------------------------------------|-----------------------------------------------------------------|
| i94_immigration     | cicid, i94yr, i94mon, i94cit, i94res, i94port, arrdate, i94mode, i94addr, depdate, i94bir, i94visa, count, dtadfile, visapost, occup, entdepa, entdepd, entdepu, matflag, biryear, dtaddto, gender, insnum, airline, admnum, fltno, visatype | cicid | i94cit, i94res, i94port, i94mode, i94addr, i94visa | Table with immigration data                                     |
| i94cit_res          | code, value                                                                                                                                                                                                                                   | code  | -                                                  | Table with codes and corresponding values for i94cit and i94res |
| i94port             | code, value                                                                                                                                                                                                                                   | code  | -                                                  | Table with codes and corresponding values for i94port           |
| i94mode             | code, value                                                                                                                                                                                                                                   | code  | -                                                  | Table with codes and corresponding values for i94mode           |
| i94addr             | code, value                                                                                                                                                                                                                                   | code  | -                                                  | Table with codes and corresponding values for i94addr           |
| i94visa             | code, value                                                                                                                                                                                                                                   | code  | -                                                  | Table with codes and corresponding values for i94visa           |
| world_temperature   | dt, averagetemperature, averagetemperatureuncertainty, city, country, latitude, longitude                                                                                                                                                  | -     | -                                                  | Table with temperature data                                     |
| us_city_demographic | city, state, median_age, male_population, female_population, total_population, number_of_veterans, foreign_born, average_household_size, state_code, race, count                                                                              | -     | -                                                  | Table with US city demographic data                             |
| airport_codes  | ident, type, name, elevation_ft, continent, iso_country, iso_region, municipality, gps_code, iata_code, local_code, coordinates                                                                                                               | -     | -                                                  | Table with airport code data                                    |

**- Silver:**

To reduce the number of tables, some merges and filters will be applied, resulting in tables that are easier for the business to use. The resulting schema is:

| Table               | Columns                                                                                                                                                                                                                                       | Id    | Description                                                     |
|---------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------|-----------------------------------------------------------------|
| i94_immigration             | cicid, i94yr, i94mon, i94cit_value, i94res_value, i94port_value, arrdate, i94mode_value, i94addr_value, depdate, i94bir, i94visa_value, count, dtadfile, visapost, occup, entdepa, entdepd, entdepu, matflag, biryear, dtaddto, gender, insnum, airline, admnum, fltno, visatype | cicid | Table with immigration data         |
| world_temperature   | dt, averagetemperature, averagetemperatureuncertainty, city, country, latitude, longitude                                                                                                                                                  | -     | Table with temperature data                                     |
| us_city_demographic | city, state, median_age, male_population, female_population, total_population, number_of_veterans, foreign_born, average_household_size, state_code, race, count                                                                              | -     | Table with US city demographic data                             |
| airport_codes  | ident, type, name, elevation_ft, continent, iso_country, iso_region, municipality, gps_code, iata_code, local_code, coordinates                                                                                                               | -     | Table with airport code data                                    |

**- Gold:**

The gold layer is a project-specific database. In this case it is a single table that shows immigration data grouped by US city and date, alongside the temperature for the same period, to analyse whether temperature appears to have an impact on immigration. The schema will be:

| Table                    | Columns                                                                                 | Description                                                        |
|--------------------------|-----------------------------------------------------------------------------------------|--------------------------------------------------------------------|
| grouped_immigration_data | city, state, date, immigration_count, i94mode_value, i94visa_value, average_temperature | Grouped data to analyse immigration by several criteria            |
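
The `date` column of the gold table has to be derived from the immigration data. A minimal sketch of one way to do that, assuming `arrdate` is a SAS date value (a count of days since 1960-01-01): this assumption is consistent with the sample rows shown earlier, where `arrdate` 20574 appears together with `dtadfile` 20160430. The helper name `sas_days_to_date` is hypothetical.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS counts dates as days since this day


def sas_days_to_date(sas_days):
    """Convert a SAS date number (stored as a float, possibly missing) to a date."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_days_to_date(20574.0))  # arrdate of the first sample rows
# 2016-04-30
```

Missing values are passed through as `None`, since columns such as `depdate` are empty for some rows.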

#### 3.2 Mapping Out Data Pipelines
To generate these tables, some steps are needed:
- Create the tables in Delta format
- Move the data as is from S3 to Delta tables to generate the bronze layer
- Merge and filter the bronze layer to generate the silver layer
- Perform more operations to generate the gold layer
- Verify data quality
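
The bronze to silver to gold flow can be sketched in plain Python on a handful of in-memory rows. This is only an illustration of the transformations under stated assumptions, not the Airflow/EMR pipeline itself: the sample rows, the lookup dictionaries, and the function names `build_silver` and `build_gold` are hypothetical, and the gold step here groups by state only and leaves out the temperature column.

```python
from collections import Counter

# Hypothetical bronze rows, shaped like the i94_immigration sample
bronze_rows = [
    {"cicid": 1.0, "i94addr": "CA", "arrdate": 20574.0, "i94mode": 1.0, "i94visa": 1.0},
    {"cicid": 2.0, "i94addr": "CA", "arrdate": 20574.0, "i94mode": 1.0, "i94visa": 1.0},
    {"cicid": 3.0, "i94addr": "TX", "arrdate": 20574.0, "i94mode": 1.0, "i94visa": 2.0},
    {"cicid": 4.0, "i94addr": "TX", "arrdate": 20574.0, "i94mode": 7.0, "i94visa": 2.0},
]
# Illustrative code -> value lookups, like the i94mode and i94visa tables
i94mode = {"1": "Air", "2": "Sea", "3": "Land"}
i94visa = {"1": "Business", "2": "Pleasure", "3": "Student"}


def build_silver(rows, mode_lookup, visa_lookup):
    """Replace codes with their values; rows with unknown codes are dropped,
    which mirrors the inner joins used in the exploration."""
    silver = []
    for row in rows:
        # Lookup codes are strings, the data holds doubles such as 1.0
        mode = mode_lookup.get(str(int(row["i94mode"])))
        visa = visa_lookup.get(str(int(row["i94visa"])))
        if mode is None or visa is None:
            continue
        silver.append({
            "cicid": row["cicid"],
            "state": row["i94addr"],
            "arrdate": row["arrdate"],
            "i94mode_value": mode,
            "i94visa_value": visa,
        })
    return silver


def build_gold(silver_rows):
    """Count arrivals per state, arrival date, mode and visa."""
    counts = Counter(
        (r["state"], r["arrdate"], r["i94mode_value"], r["i94visa_value"])
        for r in silver_rows
    )
    return [
        {"state": s, "arrdate": d, "i94mode_value": m,
         "i94visa_value": v, "immigration_count": n}
        for (s, d, m, v), n in sorted(counts.items())
    ]


silver = build_silver(bronze_rows, i94mode, i94visa)
gold = build_gold(silver)
for row in gold:
    print(row)
```

The fourth bronze row is dropped because its mode code has no match in the lookup; a left join would keep such rows with an empty value instead, which may be preferable if completeness matters more than clean labels.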


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [52]:
from delta.tables import *

# bronze layer
DeltaTable.createIfNotExists(spark) \
    .addColumn("cicid", "DOUBLE").addColumn("i94yr", "DOUBLE") \
    .addColumn("i94mon", "DOUBLE").addColumn("i94cit", "DOUBLE") \
    .addColumn("i94res", "DOUBLE").addColumn("i94port", "STRING") \
    .addColumn("arrdate", "DOUBLE").addColumn("i94mode", "DOUBLE") \
    .addColumn("i94addr", "STRING").addColumn("depdate", "DOUBLE") \
    .addColumn("i94bir", "DOUBLE").addColumn("i94visa", "DOUBLE") \
    .addColumn("count", "DOUBLE").addColumn("dtadfile", "STRING") \
    .addColumn("visapost", "STRING").addColumn("occup", "STRING") \
    .addColumn("entdepa", "STRING").addColumn("entdepd", "STRING") \
    .addColumn("entdepu", "STRING").addColumn("matflag", "STRING") \
    .addColumn("biryear", "DOUBLE").addColumn("dtaddto", "STRING") \
    .addColumn("gender", "STRING").addColumn("insnum", "STRING") \
    .addColumn("airline", "STRING").addColumn("admnum", "DOUBLE") \
    .addColumn("fltno", "STRING").addColumn("visatype", "STRING") \
    .location("/tmp/i94_immigration") \
    .execute()
i94_immigration = spark.read.format("delta").load("/tmp/i94_immigration")
i94_immigration.write.format("delta").mode("overwrite").save("s3a://bronze-layer-udacity-nd/i94_immigration")


DeltaTable.createIfNotExists(spark) \
    .addColumn("code", "STRING").addColumn("value", "STRING") \
    .location("/tmp/i94cit_res") \
    .execute()
i94cit_res = spark.read.format("delta").load("/tmp/i94cit_res")
i94cit_res.write.format("delta").mode("overwrite").save("s3a://bronze-layer-udacity-nd/i94cit_res")


DeltaTable.createIfNotExists(spark) \
    .addColumn("code", "STRING").addColumn("value", "STRING") \
    .location("/tmp/i94port") \
    .execute()
i94port = spark.read.format("delta").load("/tmp/i94port")
i94port.write.format("delta").mode("overwrite").save("s3a://bronze-layer-udacity-nd/i94port")


DeltaTable.createIfNotExists(spark) \
    .addColumn("code", "STRING").addColumn("value", "STRING") \
    .location("/tmp/i94mode1") \
    .execute()
i94mode = spark.read.format("delta").load("/tmp/i94mode1")
i94mode.write.format("delta").mode("overwrite").save("s3a://bronze-layer-udacity-nd/i94mode")


DeltaTable.createIfNotExists(spark) \
    .addColumn("code", "STRING").addColumn("value", "STRING") \
    .location("/tmp/i94addr") \
    .execute()
i94addr = spark.read.format("delta").load("/tmp/i94addr")
i94addr.write.format("delta").mode("overwrite").save("s3a://bronze-layer-udacity-nd/i94addr")


DeltaTable.createIfNotExists(spark) \
    .addColumn("code", "STRING").addColumn("value", "STRING") \
    .location("/tmp/i94visa") \
    .execute()
i94visa = spark.read.format("delta").load("/tmp/i94visa")
i94visa.write.format("delta").mode("overwrite").save("s3a://bronze-layer-udacity-nd/i94visa")


DeltaTable.createIfNotExists(spark) \
    .addColumn("dt", "TIMESTAMP") \
    .addColumn("averagetemperature", "DOUBLE").addColumn("averagetemperatureuncertainty", "DOUBLE") \
    .addColumn("city", "STRING").addColumn("country", "STRING") \
    .addColumn("latitude", "STRING").addColumn("longitude", "STRING") \
    .location("/tmp/world_temperature") \
    .execute()
world_temperature = spark.read.format("delta").load("/tmp/world_temperature")
world_temperature.write.format("delta").mode("overwrite").save("s3a://bronze-layer-udacity-nd/world_temperature")


DeltaTable.createIfNotExists(spark) \
    .addColumn("city", "STRING").addColumn("state", "STRING") \
    .addColumn("median_age", "DOUBLE").addColumn("male_population", "INT") \
    .addColumn("female_population", "INT").addColumn("total_population", "INT") \
    .addColumn("number_of_veterans", "INT").addColumn("foreign_born", "INT") \
    .addColumn("average_household_size", "DOUBLE").addColumn("state_code", "STRING") \
    .addColumn("race", "STRING").addColumn("count", "INT") \
    .location("/tmp/us_city_demographic") \
    .execute()
us_city_demographic = spark.read.format("delta").load("/tmp/us_city_demographic")
us_city_demographic.write.format("delta").mode("overwrite").save("s3a://bronze-layer-udacity-nd/us_city_demographic")


DeltaTable.createIfNotExists(spark) \
    .addColumn("ident", "STRING").addColumn("type", "STRING") \
    .addColumn("name", "STRING").addColumn("elevation_ft", "INT") \
    .addColumn("continent", "STRING").addColumn("iso_country", "STRING") \
    .addColumn("iso_region", "STRING").addColumn("municipality", "STRING") \
    .addColumn("gps_code", "STRING").addColumn("iata_code", "STRING") \
    .addColumn("local_code", "STRING").addColumn("coordinates", "STRING") \
    .location("/tmp/airport_codes") \
    .execute()
airport_code_table = spark.read.format("delta").load("/tmp/airport_codes")
airport_code_table.write.format("delta").mode("overwrite").save("s3a://bronze-layer-udacity-nd/airport_codes")

# the other tables will be generated by querying these
# TODO: add the code to start Airflow and explain that DAG x needs to be run

                                                                                

In [54]:
# import shutil

# shutil.rmtree('/tmp/i94cit_res')

Next steps:

~~- Tidy up the data exploration~~

~~- Do the data preparation~~

~~- Write about the data modeling~~

~~- Write the code to create the Delta tables~~

~~- Check whether the capstone requires Redshift (it does not)~~

~~- Download the datasets into a datasets folder (and organize the local folder to push to git) and run the previous cells reading from the local files~~

~~- Push the current state to a new git repository~~

~~- Put the data on S3 in a landing bucket~~

~~- Start Airflow locally and add the AWS credentials~~

~~- Test the EMR operator~~

~~- Create a task to take the data from S3 and load it into Redshift/Delta Lake~~

- Create a task to build the silver layer

- Create a task to build the gold layer

- Create a task for data validation

- Run a formatter to apply PEP 8

- Complete the README and add a picture of the architecture

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# TODO: explain in the text above how the tests will work
# TODO: show an example test here, but state that the DAG needs to run first
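
As a starting point for this section, the checks listed above could look roughly like the sketch below. It uses plain Python lists of dictionaries so it runs without Spark; in the real pipeline the same checks would be run against the Delta tables after the DAG has finished. The function names and sample rows are hypothetical.

```python
def check_not_empty(rows, table_name):
    """Completeness check: the table must contain at least one row."""
    if len(rows) == 0:
        raise ValueError(f"{table_name} is empty")


def check_unique_key(rows, key, table_name):
    """Integrity check: the key column must not contain duplicates."""
    seen = set()
    for row in rows:
        value = row[key]
        if value in seen:
            raise ValueError(f"{table_name} has a duplicate {key}: {value}")
        seen.add(value)


def check_row_count(source_count, target_count, table_name):
    """Source/count check: no rows were lost between two layers."""
    if source_count != target_count:
        raise ValueError(
            f"{table_name}: expected {source_count} rows, found {target_count}"
        )


# Hypothetical rows standing in for the landing and bronze data
landing_rows = [{"cicid": 5748517.0}, {"cicid": 5748518.0}]
bronze_rows = [{"cicid": 5748517.0}, {"cicid": 5748518.0}]

check_not_empty(bronze_rows, "i94_immigration")
check_unique_key(bronze_rows, "cicid", "i94_immigration")
check_row_count(len(landing_rows), len(bronze_rows), "i94_immigration")
print("All quality checks passed")
```

Raising an exception on failure is a deliberate choice here: when such a check runs inside a pipeline task, an uncaught error is a simple way to mark the task as failed.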

#### 4.3 Data dictionary 

In [None]:
golden_layer_dictionary = {
    "city": "US city",
    "state": "US state",
    "date": "Date related to the immigration",
    "immigration_count": "Number of immigrations on the date per mode and visa",
    "i94mode_value": "Mode of the immigration (for example: 'air', 'sea', 'land')",
    "i94visa_value": "Visa code of the immigration (for example: 'business', 'pleasure', 'student')",
    "average_temperature": "Average temperature for the date"
}

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.