# Data Engineering Capstone Project

Final project for the Data Engineering Nanodegree.

## Immigration in the US



In [1]:
from datetime import datetime
import psycopg2
import numpy as np
import pandas as pd
import logging
import warnings
from pandas.core.common import SettingWithCopyWarning
warnings.simplefilter(action="ignore", category=SettingWithCopyWarning)

This project builds a database model for immigration data in the United States of America. Analyses of this data can support both government and business decision making.

#### Datasets

- I94 Immigration Data: This data comes from the US National Tourism and Trade Office, but for this notebook only a small sample will be used. [This is where the data comes from](https://travel.trade.gov/research/reports/i94/historical/2016.html).

- World Temperature Data: This dataset came from Kaggle. [You can read more about it here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).

- U.S. City Demographic Data: This data comes from OpenDataSoft. [You can read more about it here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).

- Airport Code Table: This is a simple table of airport codes and corresponding cities. [It comes from here](https://datahub.io/core/airport-codes#data).


## Exploring the Data




#### I94 Immigration

In [2]:
immigration = pd.read_csv('immigration_data_sample.csv')
print(f"number of rows: {immigration.shape[0]} \n")
print(immigration.dtypes)
immigration.head()

number of rows: 1000 

Unnamed: 0      int64
cicid         float64
i94yr         float64
i94mon        float64
i94cit        float64
i94res        float64
i94port        object
arrdate       float64
i94mode       float64
i94addr        object
depdate       float64
i94bir        float64
i94visa       float64
count         float64
dtadfile        int64
visapost       object
occup          object
entdepa        object
entdepd        object
entdepu       float64
matflag        object
biryear       float64
dtaddto        object
gender         object
insnum        float64
airline        object
admnum        float64
fltno          object
visatype       object
dtype: object


Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


#### World temperature by city

In [3]:
wrld_temp = pd.read_csv('GlobalLandTemperaturesByCity.zip')
# selecting only countries from North America
world_temperature = wrld_temp[wrld_temp['Country'].isin(['United States', 'Canada', 'Mexico'])]
print(f"number of rows: {world_temperature.shape[0]} \n")
print(world_temperature.dtypes)
world_temperature.head()

number of rows: 972294 

dt                                object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                              object
Country                           object
Latitude                          object
Longitude                         object
dtype: object


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
32382,1828-01-01,-2.763,2.617,Abbotsford,Canada,49.03N,122.45W
32383,1828-02-01,0.393,1.936,Abbotsford,Canada,49.03N,122.45W
32384,1828-03-01,4.137,1.91,Abbotsford,Canada,49.03N,122.45W
32385,1828-04-01,,,Abbotsford,Canada,49.03N,122.45W
32386,1828-05-01,10.63,2.346,Abbotsford,Canada,49.03N,122.45W


#### US cities

In [4]:
us_cities = pd.read_csv('us-cities-demographics.csv', sep=';')
print(f"number of rows: {us_cities.shape[0]} \n")
print(us_cities.dtypes)
us_cities.head()

number of rows: 2891 

City                       object
State                      object
Median Age                float64
Male Population           float64
Female Population         float64
Total Population            int64
Number of Veterans        float64
Foreign-born              float64
Average Household Size    float64
State Code                 object
Race                       object
Count                       int64
dtype: object


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


#### Airports

In [5]:
airports = pd.read_csv('airport-codes_csv.csv')
print(f"number of rows: {airports.shape[0]} \n")
print(airports.dtypes)
airports.head()

number of rows: 55075 

ident            object
type             object
name             object
elevation_ft    float64
continent        object
iso_country      object
iso_region       object
municipality     object
gps_code         object
iata_code        object
local_code       object
coordinates      object
dtype: object


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


### Cleaning Steps

- immigration: Several fields need changes. For example, two columns, `i94cit` and `i94res`, hold codes that correspond to location names. We will use a JSON file to join these names to the data; the same mapping could also be stored as another table in the schema.
- world_temperature: We will start by splitting the date into year and month. The coordinates use a different format from the one in the airports data frame, but the two datasets are not joined on them, so we can ignore the difference.
- us_cities: Nothing special needs to change. Some columns could be INT instead of DOUBLE or FLOAT, but pandas did not allow that conversion without first acting on the NA values (for example, removing them or filling them with zeros), which would distort the data. I'd rather model the database with the FLOAT data type than restrict analysts' options for handling missing data.
- airports: The data is not in first normal form, so let's transform it by writing the coordinates to separate columns.
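As a small illustration of the airports step, the sketch below splits one `coordinates` string into two numbers. It assumes the field is ordered "longitude, latitude", which is what the sample rows suggest (Bensalem, PA sits at roughly 40.07 N, 74.93 W). The helper name `split_coordinates` is hypothetical and not part of the notebook.

```python
def split_coordinates(raw):
    """Split a 'longitude, latitude' string into two floats.

    Hypothetical helper. The field order is an assumption based on the
    sample rows shown in the exploration step.
    """
    lon_text, lat_text = raw.split(",")
    longitude, latitude = float(lon_text), float(lat_text)
    # A latitude outside [-90, 90] usually means the two fields were swapped.
    if not -90.0 <= latitude <= 90.0:
        raise ValueError(f"latitude out of range: {latitude}")
    if not -180.0 <= longitude <= 180.0:
        raise ValueError(f"longitude out of range: {longitude}")
    return longitude, latitude


lon, lat = split_coordinates("-74.93360137939453, 40.07080078125")
print(lon, lat)
```

The range check is a cheap guard: a swapped pair such as "40.07, -151.69" fails immediately instead of being stored under the wrong column.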

In [6]:
# Data Cleaning

## immigration
#### getting location codes
locations = pd.read_json('countries.json', typ='series')
locations = pd.DataFrame(locations)
locations['id'] = locations.index
locations = locations.rename(columns = {'id': 'location_id', 0 : 'location'})

#### i94res
immigration['i94res'] = immigration['i94res'].astype(int)
df_immigration = pd.merge(locations, immigration,how='inner',left_on=['location_id'],right_on=['i94res'])
df_immigration = df_immigration.drop(columns=['i94res', 'location_id']).rename(columns = {'location': 'i94res'})

#### i94cit
df_immigration['i94cit'] = df_immigration['i94cit'].astype(int)
df_immigration = pd.merge(locations, df_immigration ,how='inner',left_on=['location_id'],right_on=['i94cit'])
df_immigration = df_immigration.drop(columns=['i94cit', 'location_id']).rename(columns = {'location': 'i94cit'})

#### convert float to int
df_immigration['i94yr'] = df_immigration['i94yr'].astype(int)
df_immigration['i94mon'] = df_immigration['i94mon'].astype(int)
df_immigration['i94mode'] = df_immigration['i94mode'].astype(int)
df_immigration['biryear'] = df_immigration['biryear'].astype(int)
df_immigration['cicid'] = df_immigration['cicid'].astype(int)
df_immigration['i94bir'] = df_immigration['i94bir'].astype(int)
df_immigration['i94visa'] = df_immigration['i94visa'].astype(int)
df_immigration['count'] = df_immigration['count'].astype(int)

#### converting dates
# arrdate and depdate are day counts starting at 1960-01-01
df_immigration['arrdate'] = pd.to_timedelta(df_immigration['arrdate'], unit='d') + datetime(1960, 1, 1)
df_immigration['depdate'] = pd.to_timedelta(df_immigration['depdate'], unit='d') + datetime(1960, 1, 1)

pd.set_option('display.max_columns', 500)

print(df_immigration.dtypes)
df_immigration.head()

i94cit                object
i94res                object
Unnamed: 0             int64
cicid                  int64
i94yr                  int64
i94mon                 int64
i94port               object
arrdate       datetime64[ns]
i94mode                int64
i94addr               object
depdate       datetime64[ns]
i94bir                 int64
i94visa                int64
count                  int64
dtadfile               int64
visapost              object
occup                 object
entdepa               object
entdepd               object
entdepu              float64
matflag               object
biryear                int64
dtaddto               object
gender                object
insnum               float64
airline               object
admnum               float64
fltno                 object
visatype              object
dtype: object


Unnamed: 0.1,i94cit,i94res,Unnamed: 0,cicid,i94yr,i94mon,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,"MEXICO Air Sea, and Not Reported (I-94, no lan...","MEXICO Air Sea, and Not Reported (I-94, no lan...",2171295,4422636,2016,4,MCA,2016-04-23,1,TX,2016-04-24,26,2,1,20160423,MTR,,G,R,,M,1990,10222016,M,,*GA,94362000000.0,XBLNG,B2
1,"MEXICO Air Sea, and Not Reported (I-94, no lan...","MEXICO Air Sea, and Not Reported (I-94, no lan...",1387092,2826530,2016,4,SNJ,2016-04-15,1,CA,2016-04-17,42,2,1,20160415,MEX,,G,O,,M,1974,10142016,F,,Y4,93617880000.0,00930,B2
2,"MEXICO Air Sea, and Not Reported (I-94, no lan...","MEXICO Air Sea, and Not Reported (I-94, no lan...",2888997,5835717,2016,4,DET,2016-04-30,1,FL,2016-05-07,35,2,1,20160430,MEX,,G,O,,M,1981,10292016,F,,AA,94957650000.0,01498,B2
3,"MEXICO Air Sea, and Not Reported (I-94, no lan...","MEXICO Air Sea, and Not Reported (I-94, no lan...",2360660,4805034,2016,4,HOU,2016-04-25,1,PA,2016-04-29,40,1,1,20160425,MEX,,G,O,,M,1976,10242016,F,,UA,94493820000.0,01085,B1
4,"MEXICO Air Sea, and Not Reported (I-94, no lan...","MEXICO Air Sea, and Not Reported (I-94, no lan...",1773904,3599863,2016,4,HOU,2016-04-19,1,TX,2016-07-04,42,3,1,20160419,MEX,,G,Q,,M,1974,D/S,F,,WN,93991160000.0,02831,F1
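The `arrdate` and `depdate` columns count days from 1 January 1960 (the SAS date epoch), which is why the cleaning cell adds a timedelta to that date. The sketch below does the same conversion for a single value using only the standard library. The function name `sas_days_to_date` is hypothetical, and returning `None` for missing values is an assumption about how gaps should be handled.

```python
import math
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)


def sas_days_to_date(days):
    """Convert a day count since 1960-01-01 into a date (None if missing)."""
    if days is None or math.isnan(days):
        return None
    return SAS_EPOCH + timedelta(days=int(days))


# 20566.0 is the arrdate of the first row in the raw sample
print(sas_days_to_date(20566.0))  # 2016-04-22
```

The result agrees with the `dtadfile` value (20160422) recorded for the same row in the raw sample.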


In [7]:
## world_temperature
world_temperature['year'] = world_temperature['dt'].apply(lambda x: datetime.strptime(x, "%Y-%m-%d").year)
world_temperature['month'] = world_temperature['dt'].apply(lambda x: datetime.strptime(x, "%Y-%m-%d").month)
print(world_temperature.dtypes)
world_temperature.head()

dt                                object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                              object
Country                           object
Latitude                          object
Longitude                         object
year                               int64
month                              int64
dtype: object


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,year,month
32382,1828-01-01,-2.763,2.617,Abbotsford,Canada,49.03N,122.45W,1828,1
32383,1828-02-01,0.393,1.936,Abbotsford,Canada,49.03N,122.45W,1828,2
32384,1828-03-01,4.137,1.91,Abbotsford,Canada,49.03N,122.45W,1828,3
32385,1828-04-01,,,Abbotsford,Canada,49.03N,122.45W,1828,4
32386,1828-05-01,10.63,2.346,Abbotsford,Canada,49.03N,122.45W,1828,5


In [8]:
## airports

# the source stores each coordinate pair as "longitude, latitude"
airports['longitude'] = airports['coordinates'].apply(lambda x: float(x.split(',')[0]))
airports['latitude'] = airports['coordinates'].apply(lambda x: float(x.split(',')[1]))
airports = airports.drop(columns=['coordinates'])
airports.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,longitude,latitude
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,-74.933601,40.070801
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,-101.473911,38.704022
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,-151.695999,59.9492
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,-86.770302,34.864799
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,-91.254898,35.6087


# Data Model

## Schema

Even though they have some limitations, relational databases are mature and well suited to many Big Data situations. Most of the upcoming analyses in this notebook do not need a cloud data warehouse such as AWS Redshift, so I opted to stick to the basics. The end of this document covers scenarios that require more speed and scalability, along with reasonable solutions for them. PostgreSQL is the RDBMS of choice because it integrates easily with Python.

The schema is described as follows:

## Fact table

- immigrations

  * id, country_cit, country_res, cicid, year,
  * month, age, birth_year, gender, transport_type,
  * state, record_date, count, occup, arrival_flag,
  * departure_flag, update_flag, match_flag, airport_code,
  * admnum, flight, visacode, visatype, visapost

## Dimension tables

- admissions

  * id, ins_num, date

- flights

  * id, airline, arrdate, depdate

- airports

  * id, type, name, elevation_ft, continent, iso_country,
  * iso_region, municipality, gps_code, iata_code, local_code,
  * latitude, longitude

- countries

  * id, name

- states

  * id, name

- cities

  * id, name

- demographics

  * city, state, median_age, male_population, female_population,
  * total_population, number_of_veterans, foreign_born,
  * average_household_size, state_code, race, count

- temperature

  * date, avg_temperature, avg_temp_uncertainty, city, country, latitude, longitude

- transports

  * id, mode (1 = 'Air', 2 = 'Sea', 3 = 'Land', 9 = 'Not reported', which also covers NA)

- visa

  * code, category (1 = Business, 2 = Pleasure, 3 = Student)
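To show how the small lookup dimensions are meant to be used, the sketch below joins a reduced fact table to `transports` and `visa` so that the numeric codes become readable labels. It runs on an in-memory SQLite database as a stand-in for PostgreSQL, keeps only three columns of the fact table, and uses made-up fact rows. None of this is the notebook's actual load.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE transports (id INTEGER PRIMARY KEY, mode TEXT);
    CREATE TABLE visa (code INTEGER PRIMARY KEY, category TEXT);
    CREATE TABLE immigrations (
        id INTEGER PRIMARY KEY,
        transport_type INTEGER,
        visacode INTEGER
    );
    INSERT INTO transports VALUES
        (1, 'Air'), (2, 'Sea'), (3, 'Land'), (9, 'Not reported');
    INSERT INTO visa VALUES (1, 'Business'), (2, 'Pleasure'), (3, 'Student');
    -- made-up fact rows, for illustration only
    INSERT INTO immigrations VALUES (1, 1, 2), (2, 3, 2), (3, 1, 1);
""")

# Decode both codes by joining the fact table to its dimensions.
rows = conn.execute("""
    SELECT i.id, t.mode, v.category
    FROM immigrations AS i
    JOIN transports AS t ON t.id = i.transport_type
    JOIN visa AS v ON v.code = i.visacode
    ORDER BY i.id
""").fetchall()
print(rows)
conn.close()
```

The same join shape should carry over to the PostgreSQL schema, since only the table and column names from the lists above are involved.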


### Schema design

![immigrations_schema](immigrations_schema.png)

In [9]:
# CREATE DATABASE
try:
    default_db = 'studentdb'
    conn = psycopg2.connect(f"host=127.0.0.1 dbname={default_db} user=student password=student")
    conn.set_session(autocommit=True)
    cur = conn.cursor()
except psycopg2.Error as e:
    print(f"Connection to {default_db} failed")
    print(e)

try:
    db = 'immigration_us'
    cur.execute(f"DROP DATABASE IF EXISTS {db} ")
    cur.execute(f"CREATE DATABASE {db} WITH ENCODING 'utf8' TEMPLATE template0")
    conn.close()
except psycopg2.Error as e:
    print(f"{db} creation failed")
    print(e)

# CONNECT TO DATABASE
try:
    conn = psycopg2.connect(f"host=127.0.0.1 dbname={db} user=student password=student")
    cur = conn.cursor()
except psycopg2.Error as e:
    print(f"Connection to {db} failed")
    print(e)

#### table statements

In [10]:
# DROP TABLES
immigrations_table_drop = "DROP TABLE IF EXISTS immigrations"
temperature_table_drop  = "DROP TABLE IF EXISTS temperature"
admissions_table_drop   = "DROP TABLE IF EXISTS admissions"
flights_table_drop      = "DROP TABLE IF EXISTS flights"
airports_table_drop     = "DROP TABLE IF EXISTS airports"
transports_table_drop   = "DROP TABLE IF EXISTS transports"
demographics_table_drop = "DROP TABLE IF EXISTS demographics"
countries_table_drop    = "DROP TABLE IF EXISTS countries"
states_table_drop       = "DROP TABLE IF EXISTS states"
cities_table_drop       = "DROP TABLE IF EXISTS cities"
visa_table_drop         = "DROP TABLE IF EXISTS visa"

# CREATE TABLES
admissions_table_create = ("""
    CREATE TABLE IF NOT EXISTS admissions(
    id      FLOAT     PRIMARY KEY,
    ins_num FLOAT,
    date    TEXT
    );
""")

flights_table_create = ("""
    CREATE TABLE IF NOT EXISTS flights(
    id      TEXT    PRIMARY KEY,
    airline TEXT,
    arrdate TEXT, 
    depdate TEXT
    );
""")

airports_table_create = ("""
    CREATE TABLE IF NOT EXISTS airports(
    id            TEXT   PRIMARY KEY,
    type          TEXT,
    name          TEXT,
    elevation_ft  FLOAT,
    continent     TEXT,
    iso_country   TEXT,
    iso_region    TEXT,
    municipality  TEXT,
    gps_code      TEXT,
    iata_code     TEXT,
    local_code    TEXT,
    latitude      TEXT,
    longitude     TEXT
    );
""")


countries_table_create = ("""
    CREATE TABLE IF NOT EXISTS  countries(
    id         TEXT   PRIMARY KEY,
    name       TEXT
    );
""")


states_table_create = ("""
    CREATE TABLE IF NOT EXISTS  states(
    id         TEXT   PRIMARY KEY,
    name       TEXT
    );
""")

temperature_table_create = ("""
    CREATE TABLE IF NOT EXISTS  temperature(
    date                 TEXT,
    avg_temperature      FLOAT,
    avg_temp_uncertainty FLOAT,
    city                 TEXT   REFERENCES cities(name) NOT NULL,
    country              TEXT,
    latitude             TEXT,
    longitude            TEXT,
    PRIMARY KEY(date, latitude, longitude)
    );
""")

cities_table_create = ("""
    CREATE TABLE IF NOT EXISTS cities(
    id   SERIAL,
    name TEXT   PRIMARY KEY
    );
""")
 
    
demographics_table_create = ("""
    CREATE TABLE IF NOT EXISTS demographics(
    city                   TEXT    PRIMARY KEY  REFERENCES cities(name) NOT NULL,
    state                  TEXT,
    median_age             FLOAT,
    male_population        FLOAT,
    female_population      FLOAT,
    total_population       FLOAT,
    number_of_veterans     FLOAT,
    foreign_born           FLOAT,
    average_household_size FLOAT,
    state_code             TEXT    REFERENCES states(id) NOT NULL,
    race                   TEXT,
    count                  INT
    );
""")


transports_table_create = ("""
    CREATE TABLE IF NOT EXISTS transports(
    id    INT  PRIMARY KEY,
    mode  TEXT
    );
""")

visa_table_create = ("""
    CREATE TABLE IF NOT EXISTS visa(
    code            INT PRIMARY KEY,
    category        TEXT
    );
""")

immigrations_table_create = ("""
    CREATE TABLE IF NOT EXISTS immigrations(   
    id             FLOAT      PRIMARY KEY,
    cicid          FLOAT        NOT NULL,
    country_cit    TEXT,
    country_res    TEXT,
    year           FLOAT,
    month          FLOAT,
    age            FLOAT,
    birth_year     FLOAT,
    gender         TEXT,
    transport_type FLOAT,
    state          TEXT,
    record_date    TEXT,
    count          FLOAT,
    occup          TEXT,
    arrival_flag   TEXT,
    departure_flag TEXT,
    update_flag    TEXT,       
    match_flag     TEXT,
    airport_code   TEXT,
    admnum         FLOAT,
    flight         TEXT,
    visacode       INT,
    visatype       TEXT,
    visapost       TEXT
    );
""")


create_table_queries = [cities_table_create, admissions_table_create, flights_table_create, 
                        airports_table_create, countries_table_create, states_table_create, 
                        demographics_table_create, temperature_table_create, transports_table_create, 
                        visa_table_create, immigrations_table_create]

drop_table_queries   = [immigrations_table_drop, demographics_table_drop, temperature_table_drop,
                        cities_table_drop, admissions_table_drop, flights_table_drop, airports_table_drop, 
                        countries_table_drop, states_table_drop, transports_table_drop, visa_table_drop]

### Creating tables

In [11]:
def drop_tables(cur, conn):
    '''
    drop_tables(cur, conn)
    Drops each table using the queries in `drop_table_queries` list.
    '''
    print("Dropping tables")
    for query in drop_table_queries:
        table = query.replace("\n", "").split('(')[0].split(' ')[-1]
        try:
            cur.execute(query)
            print(f"{table} dropped")
        except psycopg2.Error as e:
            print(f"Error: Dropping {table}")
            print(e)
        conn.commit()
    print('\n\n')
        

def create_tables(cur, conn):
    '''
    create_tables(cur, conn)
    Creates each table using the queries in `create_table_queries` list.
    '''
    print("Creating tables")
    for query in create_table_queries:
        table = query.replace("\n", "").split('(')[0].split(' ')[-1]
        try:
            cur.execute(query)
            print(f"{table} created successfully")
        except psycopg2.Error as e:
            print(f"Error: failed creating {table}")
            print(e)
        conn.commit()

drop_tables(cur, conn)
create_tables(cur, conn)

Dropping tables
immigrations dropped
demographics dropped
temperature dropped
cities dropped
admissions dropped
flights dropped
airports dropped
countries dropped
states dropped
transports dropped
visa dropped



Creating tables
cities created successfully
admissions created successfully
flights created successfully
airports created successfully
countries created successfully
states created successfully
demographics created successfully
temperature created successfully
transports created successfully
visa created successfully
immigrations created successfully


## ETL pipeline


The pipeline extracts the contents of each file, checks for inconsistencies across datasets, and finally inserts the data into the tables described by the schema.
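Loading is meant to be repeatable: the insert statements used by this pipeline end with `ON CONFLICT ... DO NOTHING`, so a row whose key already exists is skipped rather than raising an error. The sketch below demonstrates that behaviour on an in-memory SQLite database, used here only as a stand-in for PostgreSQL. It assumes SQLite 3.24 or newer, which accepts the same clause, and it uses `?` placeholders where psycopg2 uses `%s`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE visa (code INTEGER PRIMARY KEY, category TEXT)")

visa_insert = """
    INSERT INTO visa (code, category)
    VALUES (?, ?)
    ON CONFLICT (code)
    DO NOTHING
"""
records = [(1, "Business"), (2, "Pleasure"), (3, "Student")]

# Running the load twice leaves the table unchanged the second time.
for _ in range(2):
    conn.executemany(visa_insert, records)

# A conflicting row with different content is dropped silently: the first value wins.
conn.execute(visa_insert, (1, "Something else"))
conn.commit()

loaded = conn.execute("SELECT COUNT(*) FROM visa").fetchone()[0]
first = conn.execute("SELECT category FROM visa WHERE code = 1").fetchone()[0]
print(loaded, first)
conn.close()
```

One consequence worth keeping in mind is that skipped rows leave no trace, so a table can end up with fewer rows than its source file without any error being reported.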

#### insert statements

In [12]:
# 24 columns
immigrations_table_insert = ("""
    INSERT INTO immigrations (id, cicid, country_cit, country_res,\
    year, month, age, birth_year, gender, transport_type, state,\
    record_date, count, occup, arrival_flag, departure_flag, update_flag,\
    match_flag, airport_code, admnum, flight, visacode, visatype, visapost)
    VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    ON CONFLICT(id)
    DO NOTHING
""")

admissions_table_insert = ("""
    INSERT INTO admissions (id, ins_num, date)
    VALUES(%s, %s, %s)
    ON CONFLICT (id)
    DO NOTHING
""")

flights_table_insert = ("""
    INSERT INTO flights (id, airline, arrdate, depdate)
    VALUES(%s, %s, %s, %s)
    ON CONFLICT (id)
    DO NOTHING
""")

airports_table_insert = ("""
    INSERT INTO airports (id, type, name, elevation_ft, continent, \
    iso_country, iso_region, municipality, gps_code, iata_code,     \
    local_code, latitude, longitude)
    VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    ON CONFLICT (id)
    DO NOTHING
""")


countries_table_insert = ("""
    INSERT INTO countries (id, name)
    VALUES(%s, %s)
    ON CONFLICT (id)
    DO NOTHING
""")

states_table_insert = ("""
    INSERT INTO states (id, name)
    VALUES(%s, %s)
    ON CONFLICT (id)
    DO NOTHING
""")

cities_table_insert = ("""
    INSERT INTO cities (name)
    VALUES(%s)
    ON CONFLICT (name)
    DO NOTHING
""")

temperature_table_insert = ("""
    INSERT INTO temperature (date, avg_temperature, avg_temp_uncertainty,
    city, country, latitude, longitude)
    VALUES(%s, %s, %s, %s, %s, %s, %s)
    ON CONFLICT (date, latitude, longitude)
    DO NOTHING
""")

demographics_table_insert = ("""
    INSERT INTO demographics (city, state, median_age, male_population, \
    female_population, total_population, number_of_veterans, foreign_born,        \
    average_household_size, state_code, race, count)
    VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    ON CONFLICT (city)
    DO NOTHING
""")

transports_table_insert = ("""
    INSERT INTO transports (id, mode)
    VALUES(%s, %s)
    ON CONFLICT (id)
    DO NOTHING
""")

visa_table_insert = ("""
    INSERT INTO visa (code, category)
    VALUES(%s, %s)
    ON CONFLICT (code)
    DO NOTHING
""")

## Running Pipelines to Model the Data 

#### Insert functions

In [13]:

def i94_process_file(conn, cur):
    '''
    i94_process_file(conn, cur)
    Read file and insert data into flights, admissions and immigrations tables.
    '''
    df = pd.read_csv("immigration_data_sample.csv")
    # flights
    # work on a copy so the assignments below do not touch a view of df
    flights = df[['fltno', 'airline', 'arrdate', 'depdate']].copy()
    flights['arrdate'] = pd.to_timedelta(flights['arrdate'], unit='d') + datetime(1960, 1, 1)
    flights['depdate'] = pd.to_timedelta(flights['depdate'], unit='d') + datetime(1960, 1, 1)

    def convert_date(column):
        flights[column] = pd.to_datetime(flights[column]).apply(lambda x: x.date())
        flights[column] = flights[column].astype(str)
    convert_date('arrdate')
    convert_date('depdate')

    for index, row in flights.iterrows():
        cur.execute(flights_table_insert, row)
    
    # admissions
    admissions = df[['admnum', 'insnum', 'dtaddto']]
    for index, row in admissions.iterrows():
        cur.execute(admissions_table_insert, row)
        
    # immigrations
    #### convert float to int
    df['i94res'] = df['i94res'].astype(int).astype(str)
    df['Unnamed: 0'] = df['Unnamed: 0'].astype(float)
    df['i94cit'] = df['i94cit'].astype(int).astype(str)
    # df['i94yr'] = df['i94yr'].astype(int)
    # df['i94mon'] = df['i94mon'].astype(int)
    # df['i94mode'] = df['i94mode'].astype(int)
    # df['biryear'] = df['biryear'].astype(int)
    # df['i94bir'] = df['i94bir'].astype(int)
    # df['i94visa'] = df['i94visa'].astype(int)
    # df['count'] = df['count'].astype(int)
    # map missing transport modes to code 9 ('Not reported' in the transports table)
    df['i94mode'] = df['i94mode'].fillna(9)
    # rename columns
    df = df.rename(columns = {'Unnamed: 0': 'id', 'i94cit':'country_cit', 'i94res':'country_res', 'i94yr':'year', 
                         'i94mon':'month', 'i94bir':'age', 'biryear':'birth_year', 'i94visa': 'visacode',
                        'fltno': 'flight', 'dtadfile': 'record_date', 'i94port':'airport_code',
                        'i94mode':'transport_type', 'i94addr':'state', 'entdepa':"arrival_flag",
                        'entdepd':'departure_flag', 'entdepu':'update_flag', 'matflag':'match_flag'})
    # select columns
    immigrations = df[['id', 'cicid', 'country_cit', 'country_res', 'year', 'month', 
                   'age', 'birth_year', 'gender', 'transport_type', 'state', 'record_date','count',
                   'occup', 'arrival_flag', 'departure_flag', 'update_flag',  
                   'match_flag', 'airport_code', 'admnum', 'flight', 'visacode', 'visatype', 'visapost']]

    for index, row in immigrations.iterrows():
        cur.execute(immigrations_table_insert, row)


def airports_process_file(conn, cur):
    '''
    airports_process_file(conn, cur)
    Read file and insert data into airports table.
    '''
    df = pd.read_csv('airport-codes_csv.csv')
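    # coordinates are stored as "longitude, latitude"; latitude is created first
    # so the column order matches the positional airports insert statement
    df['latitude'] = df['coordinates'].apply(lambda x: float(x.split(',')[1]))
    df['longitude'] = df['coordinates'].apply(lambda x: float(x.split(',')[0]))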
    df = df.drop(columns=['coordinates'])
    df = df.rename(columns = {'ident': 'id'})
    
    for index, row in df.iterrows():
        cur.execute(airports_table_insert, row)


def countries_process_file(conn, cur):
    '''
    countries_process_file(conn, cur)
    Read file and insert data into countries table.
    '''
    df = pd.read_json('countries.json', typ='series')
    df = pd.DataFrame(df)
    df['id'] = df.index
    df = df.rename(columns = {'id': 'location_id', 0 : 'name'})
    df = df[['location_id', 'name']]
    for index, row in df.iterrows():
        cur.execute(countries_table_insert, row)

def states_process_file(conn, cur):
    '''
    states_process_file(conn, cur)
    Read file and insert data into states table.
    '''
    states = pd.read_json('states.json', typ='series')
    states = pd.DataFrame(states)
    states['id'] = states.index
    states = states.rename(columns = {'id': 'State Code', 0 : 'State'})
    states = states[['State', 'State Code']]
    us_cities = pd.read_csv('us-cities-demographics.csv', sep=';')
    df = us_cities.drop_duplicates(subset=['State Code'])
    df = df[['State','State Code']]
    df = pd.concat([df, states]).drop_duplicates(subset=['State Code'])
    df = df[['State Code','State']]
    for index, row in df.iterrows():
        cur.execute(states_table_insert, row)


def temperature_cities_process_file(conn, cur):
    '''
    temperature_cities_process_file(conn, cur)
    Read files and insert data into cities and temperature tables.
    '''
    north_america_temp = pd.read_csv('GlobalLandTemperaturesByCity.zip')
    cities =  north_america_temp.drop_duplicates(subset=['City'])
    cities = cities[['City']]
    df = pd.read_csv('us-cities-demographics.csv', sep=';')
    df = df[['City']]
    cities = pd.concat([cities, df]).drop_duplicates()
    
    for index, row in cities.iterrows():
        cur.execute(cities_table_insert, row)
    
    # select only countries from North America
    temperature = north_america_temp[north_america_temp['Country'].isin(['United States', 'Canada', 'Mexico'])]
    temperature['Country'] = temperature['Country'].apply(lambda x: x.upper())
    temperature = temperature.drop_duplicates(subset=['dt', 'Latitude', 'Longitude'])
    for index, row in temperature.iterrows():
        cur.execute(temperature_table_insert, row)


def demographics_process_file(conn, cur):
    '''
    demographics_process_file(conn, cur)
    Read file and insert data into table.
    '''
    df = pd.read_csv('us-cities-demographics.csv', sep=';')
    for index, row in df.iterrows():
        cur.execute(demographics_table_insert, row)


def transports_process_file(conn, cur):
    '''
    transports_process_file(conn, cur)
    Read file and insert data into transports table.
    '''
    data = {'id': [1, 2, 3, 9], 'mode': ['Air', 'Sea', 'Land', 'Not reported']}
    df = pd.DataFrame(data)
    for index, row in df.iterrows():
        cur.execute(transports_table_insert, row)


def visa_process_file(conn, cur):
    '''
    visa_process_file(conn, cur)
    Read file and insert data into visa table.
    '''
    data = {'code': [1, 2, 3], 'category': ['Business', 'Pleasure', 'Student']}
    df = pd.DataFrame(data)
    for index, row in df.iterrows():
        cur.execute(visa_table_insert, row)

### Running ETL process

In [14]:
def process_data(conn, cur):
    '''
    process_data(conn, cur)
    process functions that insert data into tables and commit
    '''
    funcs = [countries_process_file, states_process_file, airports_process_file,
        transports_process_file, visa_process_file, temperature_cities_process_file, demographics_process_file, i94_process_file]
    for func in funcs:
        try:
            func(conn, cur)
        except psycopg2.Error as e:
            print(f"Error running {func.__name__}")
            logging.exception(e)
        conn.commit()
        
process_data(conn, cur)
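`process_data` commits after every loader, including one that raised an exception. A possible variation, sketched below, rolls back a failed step explicitly so that its partial writes are discarded and the remaining steps start from a clean transaction. The example uses an in-memory SQLite database as a stand-in for PostgreSQL. The names `run_steps`, `load_visa` and `load_broken` are hypothetical.

```python
import sqlite3


def run_steps(conn, steps):
    """Run each step in its own transaction; return the names of failed steps."""
    failed = []
    for step in steps:
        try:
            step(conn)
            conn.commit()
        except sqlite3.Error:
            # Discard whatever the failed step wrote before it raised.
            conn.rollback()
            failed.append(step.__name__)
    return failed


def load_visa(conn):
    conn.execute("INSERT INTO visa VALUES (1, 'Business')")


def load_broken(conn):
    conn.execute("INSERT INTO visa VALUES (2, 'Pleasure')")
    # The table below does not exist, so this statement raises an error.
    conn.execute("INSERT INTO missing_table VALUES (1)")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE visa (code INTEGER PRIMARY KEY, category TEXT)")
conn.commit()

failed = run_steps(conn, [load_visa, load_broken])
count = conn.execute("SELECT COUNT(*) FROM visa").fetchone()[0]
print(failed, count)
```

Returning the list of failed steps also gives the caller something concrete to check, instead of relying on log output alone.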

### Data Quality Checks

The data quality checks include:
 * [Integrity constraints on the relational database (e.g., unique key, data type, etc.)](#table-statements);
 * [Insert statements check for duplicates before inserting data into table](#insert-statements);
 * [If an exception occurs when inserting data, display log info](#Running-ETL-process);
 * Checking that each table's records were loaded (see the code below):

In [21]:
tables = ['immigrations', 'demographics', 'temperature', 'cities',
          'admissions', 'flights', 'airports', 'countries', 'states',
          'transports', 'visa']
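for table in tables:
    try:
        cur.execute(f"SELECT COUNT(*) FROM {table}")
        # COUNT(*) always returns exactly one row, so test its value, not cur.rowcount
        count = cur.fetchone()[0]
    except psycopg2.Error as e:
        # e.g. the table does not exist: log it, reset the transaction, move on
        logging.exception(e)
        conn.rollback()
        continue
    conn.commit()
    if count < 1:
        print(f"{table} is empty")
    else:
        print(f"{table} loaded with {count} records ")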

immigrations loaded with 1000 records 
demographics loaded with 567 records 
temperature loaded with 420039 records 
cities loaded with 3763 records 
admissions loaded with 1000 records 
flights loaded with 503 records 
airports loaded with 55075 records 
countries loaded with 289 records 
states loaded with 55 records 
transports loaded with 4 records 
visa loaded with 3 records 
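
Beyond the row counts above, a further check could confirm that a key column holds no NULLs or duplicates, which is what the integrity constraints are meant to guarantee. The sketch below is only an illustration: the helper name `key_column_issues` is hypothetical, and it runs against an in-memory SQLite database purely so that it is runnable without the PostgreSQL server. The stand-in `visa` table has no constraints, so the deliberately bad rows can be inserted.

```python
import sqlite3


def key_column_issues(cur, table, column):
    """Return (null_count, duplicate_count) for one column of one table."""
    # Table and column names cannot be bound as query parameters, so they are
    # interpolated; pass only trusted, hard-coded names.
    cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {column} IS NULL")
    nulls = cur.fetchone()[0]
    # COUNT(column) ignores NULLs, so the difference counts repeated values.
    cur.execute(f"SELECT COUNT({column}) - COUNT(DISTINCT {column}) FROM {table}")
    duplicates = cur.fetchone()[0]
    return nulls, duplicates


# Stand-in database (assumption: SQLite instead of the project's PostgreSQL).
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE visa (code INTEGER, category TEXT)")
cur.executemany(
    "INSERT INTO visa VALUES (?, ?)",
    [(1, "Business"), (2, "Pleasure"), (3, "Student")],
)
clean = key_column_issues(cur, "visa", "code")

# Deliberately break the data to show what the check reports.
cur.execute("INSERT INTO visa VALUES (?, ?)", (3, "Student"))    # duplicate code
cur.execute("INSERT INTO visa VALUES (?, ?)", (None, "Unknown"))  # NULL code
dirty = key_column_issues(cur, "visa", "code")

print(f"clean table: {clean}, broken table: {dirty}")
conn.close()
```

The helper only relies on `execute` and `fetchone`, so the same function should accept the psycopg2 cursor used in this notebook.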


### Data analysis

Some possible analyses of the immigration data:

- Find patterns by gender and/or age (differences in visa type or airline chosen);
- Rank airlines and routes by number of immigrants transported;
- Track the flow of passengers flying to large urban centers;
- Find immigration patterns based on the seasons of the year;
- Check whether there is a relation between a city's temperature range and emigration.

#### Example queries

In [16]:
%load_ext sql
%sql postgresql://student:student@127.0.0.1/immigration_us
%sql SELECT * FROM immigrations LIMIT 5;

 * postgresql://student:***@127.0.0.1/immigration_us
5 rows affected.


id,cicid,country_cit,country_res,year,month,age,birth_year,gender,transport_type,state,record_date,count,occup,arrival_flag,departure_flag,update_flag,match_flag,airport_code,admnum,flight,visacode,visatype,visapost
2027561.0,4084316.0,209,209,2016.0,4.0,61.0,1955.0,F,1.0,HI,20160422,1.0,,G,O,,M,HHW,56582674633.0,00782,2,WT,
2171295.0,4422636.0,582,582,2016.0,4.0,26.0,1990.0,M,1.0,TX,20160423,1.0,,G,R,,M,MCA,94361995930.0,XBLNG,2,B2,MTR
589494.0,1195600.0,148,112,2016.0,4.0,76.0,1940.0,M,1.0,FL,20160407,1.0,,G,O,,M,OGG,55780468433.0,00464,2,WT,
2631158.0,5291768.0,297,297,2016.0,4.0,25.0,1991.0,M,1.0,CA,20160428,1.0,,G,O,,M,LOS,94789696030.0,00739,2,B2,DOH
3032257.0,985523.0,111,111,2016.0,4.0,19.0,1997.0,F,3.0,NY,20160406,1.0,,Z,K,,M,CHM,42322572633.0,LAND,2,WT,


In [22]:
%%sql
SELECT a.flight, b.name AS state
FROM immigrations a
INNER JOIN states b ON a.state = b.id
LIMIT 5;

 * postgresql://student:***@127.0.0.1/immigration_us
5 rows affected.


flight,state
00782,Hawaii
XBLNG,Texas
00464,Florida
00739,California
LAND,New York
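
The first analysis idea listed earlier (patterns by gender and visa type) could be expressed as an aggregate query. The sketch below is an assumption-laden illustration rather than part of the project: it loads only the five records shown in the `SELECT * FROM immigrations LIMIT 5` output above, keeps a subset of their columns, and uses an in-memory SQLite database so that it runs without the PostgreSQL server.

```python
import sqlite3

# Five records copied from the query output above (subset of columns):
# gender, age, visatype, flight, state
rows = [
    ("F", 61, "WT", "00782", "HI"),
    ("M", 26, "B2", "XBLNG", "TX"),
    ("M", 76, "WT", "00464", "FL"),
    ("M", 25, "B2", "00739", "CA"),
    ("F", 19, "WT", "LAND", "NY"),
]

# Stand-in database (assumption: SQLite instead of the project's PostgreSQL).
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute(
    "CREATE TABLE immigrations "
    "(gender TEXT, age INTEGER, visatype TEXT, flight TEXT, state TEXT)"
)
cur.executemany("INSERT INTO immigrations VALUES (?, ?, ?, ?, ?)", rows)

# Count arrivals for each visa type and gender combination.
cur.execute(
    """
    SELECT visatype, gender, COUNT(*) AS arrivals
    FROM immigrations
    GROUP BY visatype, gender
    ORDER BY visatype, gender
    """
)
summary = cur.fetchall()
for visatype, gender, arrivals in summary:
    print(visatype, gender, arrivals)
conn.close()
```

With only five records the counts say nothing about real patterns; the same `GROUP BY` could be run through `%%sql` against the full `immigrations` table.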


## Complete Project Write Up

A data model was created to store relational data on immigration in the USA. PostgreSQL was chosen for its easy integration with Python. As mentioned earlier, a relational database may be enough for most applications, but its limitations, such as having to design a complex schema up front, can slow down development. Moving to the cloud is always an option, and some situations where this is reasonable are described below.

Possible decisions for alternate scenarios:

- "Data increases by 100x": Instead of using a structured database on disk, a data lake could be used, since the project specifies that various types and sources of data (structured or unstructured) can be explored and it is not possible to tell up front which will be useful. A possible approach is to launch an EMR cluster and apply the schema on read.

- "The data populates a dashboard that must be updated on a daily basis by 7am every day": Schedule tasks using Apache Airflow.

- "The database needed to be accessed by 100+ people": Amazon Redshift clusters can handle the traffic while keeping a relational, SQL-based model, but it is important to monitor AWS billing to avoid unnecessary costs.

In [10]:
# DROP TABLES 
drop_tables(cur, conn)

Dropping tables
immigrations dropped
demographics dropped
temperature dropped
cities dropped
admissions dropped
flights dropped
airports dropped
countries dropped
states dropped
transports dropped
visa dropped





In [11]:
# CLOSE CONNECTION
conn.close()