# Come to Brazil
### Data Engineering Capstone Project

#### Project Summary
Brazil is a continent-sized country, both in geographic area and in population. The Mercator projection, presented by the Flemish geographer and cartographer Gerardus Mercator in 1569, was created long before modern cartographic techniques were developed, and it inflates areas far from the equator, so Brazil looks much smaller next to Europe than it really is. In fact, South America is larger than Western Europe, and individual Brazilian states rival the largest Western European countries, such as France and Germany, in area or population. Even though Brazil is a relatively poor country, the sheer number of people in its upper economic brackets makes it a very interesting subject for commercial study. That, coupled with import tariffs of close to 100% that more than double the domestic price of many goods, and the habit among both Brazilian travelers and Brazilian immigrants in the US of smuggling goods back to Brazil, could make a service industry geared specifically towards Brazilian travelers especially profitable. This study uses airport data from the I94 Immigration Data to determine which US airports would be the most promising locations for services such as Apple stores that accept Brazilian reais (the Brazilian currency) and employ Portuguese-speaking salespeople.
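
The Mercator projection's distortion is easy to quantify: its linear scale factor at latitude φ is sec φ, so areas are drawn sec² φ times larger than they are. The short sketch below computes that factor; the function name is hypothetical and the latitudes in the usage loop are rough, illustrative assumptions.

```python
import math

def mercator_area_inflation(latitude_deg: float) -> float:
    """How many times larger the Mercator projection draws an area at this latitude.

    The linear scale factor is sec(latitude), so areas scale by sec^2(latitude).
    """
    return 1.0 / math.cos(math.radians(latitude_deg)) ** 2

# Illustrative latitudes (assumptions): Brasilia lies near 16 S, central Germany near 51 N
for place, lat in [("Brasilia", -16.0), ("central Germany", 51.0)]:
    print(f"{place}: drawn about {mercator_area_inflation(lat):.2f}x its true area")
```

At those latitudes the factor is roughly 1.1 for Brasília versus roughly 2.5 for central Germany, which is why Europe looks disproportionately large next to Brazil on a Mercator map.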

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import configparser
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))


import pandas as pd
import boto3
import json


KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")


pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
             })


ec2 = boto3.resource('ec2',
                       region_name="us-east-1",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )

s3 = boto3.resource('s3',
                       region_name="us-east-1",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                   )
s3d = boto3.client('s3',
                       region_name="us-east-1",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                   )
session = boto3.Session(
                       region_name="us-east-1",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
)
s3u = session.resource('s3')
iam = boto3.client('iam',aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET,
                     region_name='us-east-1'
                  )

redshift = boto3.client('redshift',
                       region_name="us-east-1",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                       )



### Step 1: Scope the Project and Gather Data

#### Scope 

The scope of the project is the immigration data provided by Udacity, presented graphically as a Plotly map. The same pipeline could just as well feed an auto-updating Dash website with immigration and travel data on Brazilians coming to the US. As the joke goes: if you don't come to Brazil, Brazil comes to you.

#### Describe and Gather Data 

- I94 Immigration Data: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace.
- I94_SAS_Labels_descriptions.SAS: a partial description of the SAS data, provided by Udacity.
- Airport Code Table (airport-codes_csv.csv): a simple table of airport codes, the corresponding cities and their coordinates.
- U.S. City Demographic Data (us-cities-demographics.csv): demographic statistics for US cities, such as median age, population by sex, number of veterans, foreign-born population, average household size and counts by race. This data comes from OpenSoft.
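
The I94 files encode ports, countries and dates compactly. Two small helpers can make them readable: one collects `code = 'label'` pairs from a `value` block of the SAS labels file, and one converts SAS date numbers (days since 1960-01-01, the SAS epoch) such as `arrdate` and `depdate` into Python dates. This is a minimal sketch assuming the label file uses lines shaped like `'NYC' = 'NEW YORK, NY'` or `689 = 'BRAZIL'` and that each block ends with a semicolon; both function names are hypothetical.

```python
import re
from datetime import date, timedelta

# Assumed entry shapes inside a SAS `value` block, e.g.
#     'NYC'   =   'NEW YORK, NY        '    (quoted port code)
#     689 =  'BRAZIL'                       (numeric country code)
_ENTRY = re.compile(r"^\s*'?([^'=\s]+)'?\s*=\s*'([^']*)'")

def parse_sas_value_block(text: str, block_name: str) -> dict:
    """Return {code: label} for one `value <block_name>` section of a SAS format file."""
    labels = {}
    inside = False
    for line in text.splitlines():
        stripped = line.strip()
        if not inside:
            # Start collecting after the header line, e.g. "value $i94prtl"
            inside = stripped.lower().split()[:2] == ["value", block_name.lower()]
            continue
        match = _ENTRY.match(line)
        if match:
            labels[match.group(1)] = match.group(2).strip()
        if stripped.endswith(";"):  # a semicolon closes the block
            break
    return labels

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this day

def sas_date_to_date(days: float) -> date:
    """Convert a SAS date number such as arrdate/depdate to a Python date."""
    return SAS_EPOCH + timedelta(days=int(days))
```

For example, `sas_date_to_date(20545)` returns `date(2016, 4, 1)`, consistent with the `i94mon` value of 4 in the sample rows, and passing the contents of `I94_SAS_Labels_descriptions.SAS` with the port block's name would give a code-to-city lookup for `i94port`.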

In [None]:
# Read in the data here
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
#response = s3_client.get_object(Bucket=AWS_S3_BUCKET, Key="files/books.csv")
#fname = 's3://astrogildopereirajunior/i94_apr16_sub.sas7bdat'
#df = pd.read_sas(
#    fname,
#    'sas7bdat',
#    encoding="ISO-8859-1",
#    storage_options={
#        "key": KEY,
#        "secret": SECRET,
#    },
#)
#s3d.download_file('astrogildopereirajunior', 'i94_apr16_sub.sas7bdat', 'i94.sas7bdat')
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")
#df.to_csv("staging_i94.csv",index=False)
#result = s3u.Bucket('astrogildopereirajr').upload_file('staging_i94.csv')
#print(result)
# Temperature data, inspected further below as df2
fname2 = '../../data2/GlobalLandTemperaturesByCity.csv'
df2 = pd.read_csv(fname2)

In [None]:
df = df.fillna(0)

In [73]:
df.head(2)

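|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 | 0.0 | 0 | 0.0 | ... | U | 0 | 1979.0 | 10282016 | 0 | 0 | 0 | 1897628000.0 | 0 | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL | 0.0 | ... | Y | 0 | 1991.0 | D/S | M | 0 | 0 | 3736796000.0 | 296 | F1 |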


In [43]:
df.to_csv('staging_i94.csv')

In [2]:
import boto3

#Creating Session With Boto3.
session = boto3.Session(
aws_access_key_id=KEY,
aws_secret_access_key=SECRET
)

#Creating S3 Resource From the Session.
s3Z = session.resource('s3')

result = s3Z.Bucket('astrogildopereirajunior').upload_file('staging_i94.csv','staging_i94.csv')

print(result)

None


In [78]:
session = boto3.Session(
                       region_name="us-east-1",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
)
s3u = session.resource('s3')
result = s3.meta.client.upload_file('staging_i94.csv', 'astrogildopereirajunior', 'staging_i94.csv')
#result = s3u.Bucket('astrogildopereirajunior').upload_file('staging_i94.csv')
print(result)

None


In [49]:
import logging
import boto3
from botocore.exceptions import ClientError
import os


def upload_file(file_name, bucket, object_name=None):
    """Upload a file to an S3 bucket

    :param file_name: File to upload
    :param bucket: Bucket to upload to
    :param object_name: S3 object name. If not specified then file_name is used
    :return: True if file was uploaded, else False
    """

    # If S3 object_name was not specified, use file_name
    if object_name is None:
        object_name = os.path.basename(file_name)

    # Upload the file
    s3_client = boto3.client('s3',
                       region_name="us-east-1",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                   )
    try:
        response = s3_client.upload_file(file_name, bucket, object_name)
    except ClientError as e:
        logging.error(e)
        return False
    return True

In [83]:
upload_file('staging_i94.csv','astrogildopereirajunior')

True

In [None]:
brazil = df[df['i94res']==689]

In [5]:
brazil

|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 628 | 737.0 | 2016.0 | 4.0 | 103.0 | 689.0 | NEW | 20545.0 | 1.0 | NY | 20553.0 | ... |  | M | 1980.0 | 06292016 | F |  | UA | 5.540653e+10 | 00148 | WT |
| 629 | 738.0 | 2016.0 | 4.0 | 103.0 | 689.0 | NEW | 20545.0 | 1.0 | NY | 20553.0 | ... |  | M | 1982.0 | 06292016 | F |  | UA | 5.540652e+10 | 00148 | WT |
| 9327 | 11229.0 | 2016.0 | 4.0 | 111.0 | 689.0 | DAL | 20545.0 | 1.0 | NV | 20547.0 | ... |  | M | 1984.0 | 06292016 | M |  | AA | 5.540677e+10 | 00962 | WT |
| 9328 | 11230.0 | 2016.0 | 4.0 | 111.0 | 689.0 | LVG | 20545.0 | 1.0 | NV | 20551.0 | ... |  | M | 1982.0 | 06292016 |  |  | CM | 5.544914e+10 | 00252 | WB |
| 9329 | 11231.0 | 2016.0 | 4.0 | 111.0 | 689.0 | MIA | 20545.0 | 1.0 | FL | 20549.0 | ... |  | M | 1970.0 | 06292016 |  |  | JJ | 5.540666e+10 | 08090 | WT |
| 9330 | 11232.0 | 2016.0 | 4.0 | 111.0 | 689.0 | MIA | 20545.0 | 1.0 | FL | 20556.0 | ... |  | M | 1938.0 | 06292016 |  |  | JJ | 5.544946e+10 | 08094 | WT |
| 9331 | 11233.0 | 2016.0 | 4.0 | 111.0 | 689.0 | HOU | 20545.0 | 1.0 | TX | 20574.0 | ... |  | M | 1998.0 | D/S | F |  | UA | 9.243793e+10 | 00128 | F1 |
| 13689 | 16687.0 | 2016.0 | 4.0 | 117.0 | 689.0 | ATL | 20545.0 | 1.0 | MA | 20554.0 | ... |  | M | 1971.0 | 06292016 | M |  | DL | 5.540701e+10 | 00060 | WT |
| 13690 | 16691.0 | 2016.0 | 4.0 | 117.0 | 689.0 | DAL | 20545.0 | 1.0 | TX | 20554.0 | ... |  | M | 1952.0 | 06292016 |  |  | AA | 5.540664e+10 | 00962 | WT |
| 13691 | 16692.0 | 2016.0 | 4.0 | 117.0 | 689.0 | DAL | 20545.0 | 1.0 | TX | 20554.0 | ... |  | M | 1975.0 | 06292016 |  |  | AA | 5.540664e+10 | 00962 | WT |


In [None]:
airports = brazil['i94port'].unique()

In [None]:
airports.sort()
airports

In [None]:
brazil.to_csv("brazil.csv",index=False)

In [22]:
df.columns

Index(['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
       'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count',
       'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu',
       'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline',
       'admnum', 'fltno', 'visatype'],
      dtype='object')

In [46]:
brazil.head()

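|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 628 | 737 | 2016 | 4 | 103 | 689 | NEW | 20545 | 1 | NY | 20553 | ... |  | M | 1980 | 6292016 | F |  | UA | 55406500000.0 | 148 | WT |
| 629 | 738 | 2016 | 4 | 103 | 689 | NEW | 20545 | 1 | NY | 20553 | ... |  | M | 1982 | 6292016 | F |  | UA | 55406500000.0 | 148 | WT |
| 9327 | 11229 | 2016 | 4 | 111 | 689 | DAL | 20545 | 1 | NV | 20547 | ... |  | M | 1984 | 6292016 | M |  | AA | 55406800000.0 | 962 | WT |
| 9328 | 11230 | 2016 | 4 | 111 | 689 | LVG | 20545 | 1 | NV | 20551 | ... |  | M | 1982 | 6292016 |  |  | CM | 55449100000.0 | 252 | WB |
| 9329 | 11231 | 2016 | 4 | 111 | 689 | MIA | 20545 | 1 | FL | 20549 | ... |  | M | 1970 | 6292016 |  |  | JJ | 55406700000.0 | 8090 | WT |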


In [4]:
df2.head()

|   | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 |  |  | Århus | Denmark | 57.05N | 10.33E |


In [3]:
airport_codes = pd.read_csv("airport-codes_csv.csv")
brazil = pd.read_csv("brazil.csv")
# Both frames are used further below (len(df4) and us_demographics.head())
df4 = pd.read_csv("immigration_data_sample.csv")
us_demographics = pd.read_csv("us-cities-demographics.csv",sep = ';')



In [21]:
airport_codes.head()

|   | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | 11.0 |  | US | US-PA | Bensalem | 00A |  | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435.0 |  | US | US-KS | Leoti | 00AA |  | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450.0 |  | US | US-AK | Anchor Point | 00AK |  | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820.0 |  | US | US-AL | Harvest | 00AL |  | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237.0 |  | US | US-AR | Newport |  |  |  | -91.254898, 35.6087 |


In [55]:
teste = brazil


In [4]:
brazil['i94port'] = brazil['i94port'].str.replace('NYC','JFK')

In [5]:
brazil['i94port'] = brazil['i94port'].str.replace('LOS','LAX')

In [47]:
teste.head()

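|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 628 | 737 | 2016 | 4 | 103 | 689 | NEW | 20545 | 1 | NY | 20553 | ... |  | M | 1980 | 6292016 | F |  | UA | 55406500000.0 | 148 | WT |
| 629 | 738 | 2016 | 4 | 103 | 689 | NEW | 20545 | 1 | NY | 20553 | ... |  | M | 1982 | 6292016 | F |  | UA | 55406500000.0 | 148 | WT |
| 9327 | 11229 | 2016 | 4 | 111 | 689 | DAL | 20545 | 1 | NV | 20547 | ... |  | M | 1984 | 6292016 | M |  | AA | 55406800000.0 | 962 | WT |
| 9328 | 11230 | 2016 | 4 | 111 | 689 | LVG | 20545 | 1 | NV | 20551 | ... |  | M | 1982 | 6292016 |  |  | CM | 55449100000.0 | 252 | WB |
| 9329 | 11231 | 2016 | 4 | 111 | 689 | MIA | 20545 | 1 | FL | 20549 | ... |  | M | 1970 | 6292016 |  |  | JJ | 55406700000.0 | 8090 | WT |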


In [71]:
z = brazil[brazil['i94port']=='LAX']
z

|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 16802 | 20461.0 | 2016.0 | 4.0 | 126.0 | 689.0 | LAX | 20545.0 | 1.0 | CA | 20558.0 | ... |  | M | 1948.0 | 06292016 | F |  | AA | 5.540877e+10 | 00216 | WT |
| 16803 | 20462.0 | 2016.0 | 4.0 | 126.0 | 689.0 | LAX | 20545.0 | 1.0 | CA | 20553.0 | ... |  | M | 1972.0 | 06292016 | M |  | AA | 5.540849e+10 | 00216 | WT |
| 16804 | 20463.0 | 2016.0 | 4.0 | 126.0 | 689.0 | LAX | 20545.0 | 1.0 | CA | 20556.0 | ... |  | M | 1949.0 | 06292016 | M |  | AA | 5.540848e+10 | 00216 | WT |
| 16805 | 20464.0 | 2016.0 | 4.0 | 126.0 | 689.0 | LAX | 20545.0 | 1.0 | CA | 20560.0 | ... |  | M | 1946.0 | 06292016 | M |  | AA | 5.540881e+10 | 00216 | WT |
| 16831 | 20497.0 | 2016.0 | 4.0 | 126.0 | 689.0 | LAX | 20545.0 | 1.0 | CA | 20552.0 | ... |  | M | 1971.0 | 09302016 | M |  | AM | 9.247620e+10 | 00019 | B2 |
| 43656 | 51444.0 | 2016.0 | 4.0 | 148.0 | 689.0 | LAX | 20545.0 | 1.0 | CA | 20554.0 | ... |  | M | 1962.0 | 06292016 |  |  | AA | 5.540853e+10 | 00216 | WT |
| 43657 | 51445.0 | 2016.0 | 4.0 | 148.0 | 689.0 | LAX | 20545.0 | 1.0 | OR | 20549.0 | ... |  | M | 1972.0 | 06292016 | M |  | AA | 5.540850e+10 | 00216 | WT |
| 63948 | 77905.0 | 2016.0 | 4.0 | 254.0 | 689.0 | LAX | 20545.0 | 1.0 | CA | 20548.0 | ... |  | M | 1953.0 | 06292016 |  |  | KE | 5.540430e+10 | 00062 | WT |
| 89486 | 206017.0 | 2016.0 | 4.0 | 689.0 | 689.0 | LAX | 20545.0 | 1.0 | CA | 20562.0 | ... |  | M | 1942.0 | 09302016 | F |  | LA | 9.251707e+10 | 02604 | B2 |
| 89667 | 206277.0 | 2016.0 | 4.0 | 689.0 | 689.0 | LAX | 20545.0 | 1.0 | CA | 20553.0 | ... |  | M | 1965.0 | 09302016 |  |  | AA | 9.244821e+10 | 00216 | B2 |


In [20]:
len(df4)

1000

In [26]:
us_demographics.head()

|   | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


In [19]:
dfz = airport_codes[airport_codes['iata_code']=='SKA']
dfz

|   | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 29965 | KSKA | large_airport | Fairchild Air Force Base | 2461.0 |  | US | US-WA | Spokane | KSKA | SKA | SKA | -117.65599823, 47.6151008606 |


In [6]:
brazil.head()

|   | Unnamed: 0 | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 628 | 737.0 | 2016.0 | 4.0 | 103.0 | 689.0 | NEW | 20545.0 | 1.0 | NY | ... |  | M | 1980.0 | 6292016 | F |  | UA | 55406530000.0 | 148 | WT |
| 1 | 629 | 738.0 | 2016.0 | 4.0 | 103.0 | 689.0 | NEW | 20545.0 | 1.0 | NY | ... |  | M | 1982.0 | 6292016 | F |  | UA | 55406520000.0 | 148 | WT |
| 2 | 9327 | 11229.0 | 2016.0 | 4.0 | 111.0 | 689.0 | DAL | 20545.0 | 1.0 | NV | ... |  | M | 1984.0 | 6292016 | M |  | AA | 55406770000.0 | 962 | WT |
| 3 | 9328 | 11230.0 | 2016.0 | 4.0 | 111.0 | 689.0 | LVG | 20545.0 | 1.0 | NV | ... |  | M | 1982.0 | 6292016 |  |  | CM | 55449140000.0 | 252 | WB |
| 4 | 9329 | 11231.0 | 2016.0 | 4.0 | 111.0 | 689.0 | MIA | 20545.0 | 1.0 | FL | ... |  | M | 1970.0 | 6292016 |  |  | JJ | 55406660000.0 | 8090 | WT |


In [7]:
braziliansInAirports = pd.merge(brazil,airport_codes,left_on=['i94port'], right_on=['iata_code'],how='inner')
braziliansInAirports.head()

|   | Unnamed: 0 | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | ... | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 628 | 737.0 | 2016.0 | 4.0 | 103.0 | 689.0 | NEW | 20545.0 | 1.0 | NY | ... | Lakefront Airport | 8.0 |  | US | US-LA | New Orleans | KNEW | NEW | NEW | -90.028297424316, 30.042400360107 |
| 1 | 629 | 738.0 | 2016.0 | 4.0 | 103.0 | 689.0 | NEW | 20545.0 | 1.0 | NY | ... | Lakefront Airport | 8.0 |  | US | US-LA | New Orleans | KNEW | NEW | NEW | -90.028297424316, 30.042400360107 |
| 2 | 13692 | 16693.0 | 2016.0 | 4.0 | 117.0 | 689.0 | NEW | 20545.0 | 1.0 | NY | ... | Lakefront Airport | 8.0 |  | US | US-LA | New Orleans | KNEW | NEW | NEW | -90.028297424316, 30.042400360107 |
| 3 | 43654 | 51442.0 | 2016.0 | 4.0 | 148.0 | 689.0 | NEW | 20545.0 | 1.0 | NJ | ... | Lakefront Airport | 8.0 |  | US | US-LA | New Orleans | KNEW | NEW | NEW | -90.028297424316, 30.042400360107 |
| 4 | 89564 | 206161.0 | 2016.0 | 4.0 | 689.0 | 689.0 | NEW | 20545.0 | 1.0 | NJ | ... | Lakefront Airport | 8.0 |  | US | US-LA | New Orleans | KNEW | NEW | NEW | -90.028297424316, 30.042400360107 |


In [73]:
braziliansInAirports.groupby(['municipality','coordinates','iata_code']).size().to_frame(name = 'count').reset_index().sort_values(by=['count'],ascending=False)

|   | municipality | coordinates | iata_code | count |
|---|---|---|---|---|
| 58 | Miami | -80.29060363769531, 25.79319953918457 | MIA | 36774 |
| 66 | Orlando | -81.332901, 28.5455 | ORL | 20030 |
| 61 | New York | -73.77890015, 40.63980103 | JFK | 17916 |
| 51 | Los Angeles | -118.4079971, 33.94250107 | LAX | 7390 |
| 41 | Houston | -95.27890015, 29.64539909 | HOU | 5614 |
| 3 | Atlanta | -84.428101, 33.6367 | ATL | 5021 |
| 24 | Dallas | -96.851799, 32.847099 | DAL | 4231 |
| 60 | New Orleans | -90.028297424316, 30.042400360107 | NEW | 3465 |
| 75 | Point Hope | -166.7989959716797, 68.34880065917969 | PHO | 2514 |
| 28 | Detroit | -83.00990295, 42.40919876 | DET | 1584 |


In [67]:
braziliansInAirports.to_csv("Brazilians-In-Airports2.csv")

In [8]:

df = braziliansInAirports
new = df["coordinates"].str.split(", ", n = 1, expand = True)
df['lat'] = new[1]
df['lon'] = new[0]
df = df.groupby(['municipality', 'lat','lon']).size().to_frame(name = 'count').reset_index()
df.to_csv("braziliansinairports.csv",index=False)

In [9]:
df.head()

|   | municipality | lat | lon | count |
|---|---|---|---|---|
| 0 | Agadir | 30.325000762939453 | -9.41306972503662 | 14 |
| 1 | Aguadilla | 18.49489974975586 | -67.12940216064453 | 8 |
| 2 | Angus Downs Station | -25.0325 | 132.2748 | 31 |
| 3 | Atlanta | 33.6367 | -84.428101 | 5021 |
| 4 | Auburn/Lewiston | 44.048500061 | -70.2835006714 | 20 |


In [None]:
import plotly.graph_objects as go

import pandas as pd

df = braziliansInAirports
new = df["coordinates"].str.split(", ", n = 1, expand = True)
df['lat'] = new[1]
df['lon'] = new[0]
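df = df.groupby(['municipality', 'lat','lon']).size().to_frame(name = 'count').reset_index()
# str.split() returns strings; convert the coordinates to numbers for plotting
df['lat'] = df['lat'].astype(float)
df['lon'] = df['lon'].astype(float)
# Rank cities by arrivals so each trace below holds a contiguous band of ranks
df = df.sort_values(by='count', ascending=False).reset_index(drop=True)

df['text'] = df['municipality'] + '<br>Arrivals '+ df['count'].astype(str)
# Half-open rank bands: top 3, ranks 4-10, 11-20, 21-50 and everything after
limits = [(0,3),(3,10),(10,20),(20,50),(50,3000)]
colors = ["royalblue","crimson","lightseagreen","orange","lightgrey"]
scale = 2

fig = go.Figure()

for i in range(len(limits)):
    lim = limits[i]
    # Positional slice of the ranked table: rows lim[0] .. lim[1]-1
    df_sub = df.iloc[lim[0]:lim[1]]
    fig.add_trace(go.Scattergeo(
        locationmode = 'USA-states',
        lon = df_sub['lon'],
        lat = df_sub['lat'],
        text = df_sub['text'],
        marker = dict(
            size = df_sub['count']/scale,
            color = colors[i],
            line_color='rgb(40,40,40)',
            line_width=0.5,
            sizemode = 'area'
        ),
        name = 'Ranks {0}-{1}'.format(lim[0] + 1, lim[1])))

fig.update_layout(
        title_text = 'Brazilian arrivals by US city, April 2016<br>(Click legend to toggle traces)',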
        showlegend = True,
        geo = dict(
            scope = 'usa',
            landcolor = 'rgb(217, 217, 217)',
        )
    )

fig.show()

![Map](mapz.png "Map")

In [6]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df_spark = df_spark\
        .na.fill({
            'cicid':0.0,'i94yr':0.0,'i94mon':0.0,'i94cit':0.0,\
            'i94res':0.0,'i94port':'NA','arrdate':0.0,\
            'i94mode': 0.0, 'i94addr': 'NA','depdate': 0.0, \
            'i94bir': 0.0, 'i94visa': 0.0, 'count': 0.0, \
            'dtadfile': 'NA', 'visapost': 'NA', 'occup': 'NA', \
            'entdepa': 'NA', 'entdepd': 'NA', 'entdepu': 'NA', \
            'matflag': 'NA', 'biryear': 0.0, 'dtaddto': 'NA', \
            'gender': 'NA', 'insnum': 'NA', 'airline': 'NA', \
            'admnum': 0.0, 'fltno': 'NA', 'visatype': 'NA'
            })

In [10]:
df_spark.write.option("header",True).csv("dfcapstone")

In [11]:
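# header=True keeps the column names; without it Spark names the columns _c0, _c1, ...
# and, without inferSchema, reads every column as a string
df_spark = spark.read.csv("dfcapstone", header=True, inferSchema=True)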

In [27]:
#write to parquet
df_spark.write.parquet("sas_data")
df_spark=spark.read.parquet("sas_data")

In [12]:
df_spark.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: string (nullable = true)
 |-- _c21: string (nullable = true)
 |-- _c22: string (nullable = true)
 |-- _c23: string (nullable = true)
 |-- _c24: string (nullable = true)
 |-- _c25: string (nullable = true)
 |-- _c26: string (nullable = true)
 |-- _c27: string (nullable = tru

In [40]:
df_spark.take(5)

[Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup='NA', entdepa='G', entdepd='O', entdepu='NA', matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum='NA', airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1'),
 Row(cicid=5748518.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='NV', depdate=20591.0, i94bir=32.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup='NA', entdepa='G', entdepd='O', entdepu='NA', matflag='M', biryear=1984.0, dtaddto='10292016', gender='F', insnum='NA', airline='VA', admnum=94955622830.0, fltno='00007', visatype='B1'),
 Row(cicid=5748519.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='WA', depdate=20582.0, i94bir=29.

In [None]:
# take() brings only the first 844 rows to the driver; collect() would pull the whole dataset
df_spark.take(844)[843]

### Step 2: Explore and Assess the Data
#### Explore the Data 
The data was relatively clean overall, but quite a few I94 port codes refer to whole cities (for example NYC and LOS) rather than to specific airports, which required the manual cleaning shown above.
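
One cheap way to assess how many port codes still land on the wrong airport is to list joined rows whose matched airport lies outside the US; the grouped output earlier already contains such cases (Agadir, Angus Downs Station). This is a sketch assuming the column names of `airport-codes_csv.csv` after the merge; the helper name is hypothetical.

```python
import pandas as pd

def non_us_matches(joined: pd.DataFrame) -> pd.DataFrame:
    """List I94 ports whose IATA-code match landed on an airport outside the US.

    Assumes `joined` is the I94 rows merged with airport-codes_csv.csv on
    i94port == iata_code, so it has i94port, municipality and iso_country columns.
    """
    suspicious = joined.loc[joined["iso_country"] != "US",
                            ["i94port", "municipality", "iso_country"]]
    # One row per distinct (port, airport city, country) combination
    return suspicious.drop_duplicates().sort_values("i94port", ignore_index=True)
```

Applied to `braziliansInAirports`, the result would be a short review list of ports to remap by hand; matches that stay inside the US but are still wrong need a different check.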

#### Cleaning Steps
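City-level port codes were replaced with specific airport codes (NYC → JFK and LOS → LAX) before the inner join with the airport codes dataset on the IATA code. This cleaning is still incomplete: I94 port codes are not IATA codes, so some ports match the wrong airport. For example, arrivals through NEW list New York and New Jersey addresses but are matched to Lakefront Airport in New Orleans, and PHO is matched to Point Hope, Alaska.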
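
The notebook applies the remapping with two separate `str.replace` calls. A sketch of an alternative keeps every remapping in one dictionary and uses `Series.replace`, which swaps a value only when it equals a key exactly. Only the NYC and LOS entries come from the notebook; `CITY_PORT_TO_IATA` and `normalize_ports` are hypothetical names, and any further entries should be checked against `I94_SAS_Labels_descriptions.SAS`.

```python
import pandas as pd

# City-level I94 port code -> airport IATA code (entries taken from the notebook)
CITY_PORT_TO_IATA = {"NYC": "JFK", "LOS": "LAX"}

def normalize_ports(ports: pd.Series, mapping: dict = CITY_PORT_TO_IATA) -> pd.Series:
    """Swap city-level I94 port codes for airport codes, matching whole values only."""
    return ports.replace(mapping)
```

With this, `brazil['i94port'] = normalize_ports(brazil['i94port'])` replaces both notebook cells, and adding a newly found mismatch is a one-line change to the dictionary.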

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The data model is an inner join between the arrivals filtered by Brazil's country-of-residence code (i94res = 689) and the airport codes table, on the IATA airport code. The joined rows are grouped by municipality, latitude and longitude and reduced to a count of arrivals per location, which is displayed on a map of the United States as a bubble chart built with Plotly graph objects.
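
The model described above fits in one pandas function. This is a sketch, not the notebook's exact code: the function name is hypothetical, and it assumes the column names of the I94 data and of `airport-codes_csv.csv`, where `coordinates` holds "longitude, latitude".

```python
import pandas as pd

BRAZIL_RES_CODE = 689  # i94res value the notebook uses for Brazil

def brazilian_arrivals_by_airport(i94: pd.DataFrame, airports: pd.DataFrame) -> pd.DataFrame:
    """Filter Brazilian residents, join to airports on IATA code, count arrivals per location."""
    brazil = i94[i94["i94res"] == BRAZIL_RES_CODE]
    joined = brazil.merge(airports, left_on="i94port", right_on="iata_code", how="inner")
    # airport-codes_csv.csv stores coordinates as "longitude, latitude"
    lon_lat = joined["coordinates"].str.split(", ", n=1, expand=True)
    joined = joined.assign(lon=lon_lat[0].astype(float), lat=lon_lat[1].astype(float))
    counts = joined.groupby(["municipality", "lat", "lon"]).size().reset_index(name="count")
    return counts.sort_values("count", ascending=False, ignore_index=True)
```

Called as `brazilian_arrivals_by_airport(df, airport_codes)` after the port-code cleaning, it would yield a table shaped like the `municipality, lat, lon, count` output used for the map, already sorted by arrivals.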

#### 3.2 Mapping Out Data Pipelines
The data is processed with Spark and could be served from Redshift to a live Dash web page. With this design:
* if the data grew 100x, both Spark and Redshift could scale out by adding nodes;
* the pipelines could be scheduled to run daily so that results are ready by 7 AM (or run much more often);
* the Redshift database could be accessed by 100+ people.
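
In practice a scheduler such as cron, or an Apache Airflow DAG with a cron expression like `0 5 * * *`, would trigger the daily run. The standard-library sketch below only illustrates the "once a day at a fixed time" rule; the function name and the 05:00 start time (chosen so a run can finish by 7 AM) are hypothetical.

```python
from datetime import datetime, time, timedelta

def next_daily_run(now: datetime, run_at: time = time(5, 0)) -> datetime:
    """Return the first moment at or after `now` whose wall-clock time equals `run_at`."""
    # Keep the caller's timezone (None for naive datetimes)
    candidate = datetime.combine(now.date(), run_at, tzinfo=now.tzinfo)
    if candidate < now:
        # Today's slot has passed, so the next run is tomorrow
        candidate += timedelta(days=1)
    return candidate
```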


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.
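
The cells below upload `staging_i94.csv` to S3 and create a Redshift cluster whose IAM role has S3 read-only access, but no cell loads the file into Redshift yet. A minimal sketch of that step builds a Redshift `COPY` statement. The table name `staging_i94` and the helper are hypothetical; the target table must already exist with matching columns, and because `df.to_csv('staging_i94.csv')` writes the pandas index as a leading column, the table would need an extra column for it (or the CSV could be written with `index=False`).

```python
def build_copy_sql(table: str, s3_uri: str, iam_role_arn: str, region: str = "us-east-1") -> str:
    """Build a Redshift COPY statement that loads a CSV file with one header row from S3."""
    return (
        f"COPY {table}\n"
        f"FROM '{s3_uri}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        "FORMAT AS CSV\n"
        "IGNOREHEADER 1\n"
        f"REGION '{region}';"
    )
```

For example, `build_copy_sql("staging_i94", "s3://astrogildopereirajunior/staging_i94.csv", DWH_ROLE_ARN)` would produce a statement that could then be run over the SQL connection opened further below.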

In [10]:
import pandas as pd
import boto3
import json


import configparser
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")


pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
             })



|   | Param | Value |
|---|---|---|
| 0 | DWH_CLUSTER_TYPE | multi-node |
| 1 | DWH_NUM_NODES | 4 |
| 2 | DWH_NODE_TYPE | dc2.large |
| 3 | DWH_CLUSTER_IDENTIFIER | dwhCluster |
| 4 | DWH_DB | dwh |
| 5 | DWH_DB_USER | dwhuser |
| 6 | DWH_DB_PASSWORD | Passw0rd |
| 7 | DWH_PORT | 5439 |
| 8 | DWH_IAM_ROLE_NAME | dwhRole |


In [11]:


import boto3

ec2 = boto3.resource('ec2',
                       region_name="us-east-1",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )

s3 = boto3.resource('s3',
                       region_name="us-east-1",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                   )

iam = boto3.client('iam',aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET,
                     region_name='us-east-1'
                  )

redshift = boto3.client('redshift',
                       region_name="us-east-1",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                       )



In [4]:


sampleDbBucket =  s3.Bucket("awssampledbuswest2")
for obj in sampleDbBucket.objects.filter(Prefix="ssbgz"):
    print(obj)
# for obj in sampleDbBucket.objects.all():
#     print(obj)



s3.ObjectSummary(bucket_name='awssampledbuswest2', key='ssbgz/')
s3.ObjectSummary(bucket_name='awssampledbuswest2', key='ssbgz/customer0002_part_00.gz')
s3.ObjectSummary(bucket_name='awssampledbuswest2', key='ssbgz/dwdate.tbl.gz')
s3.ObjectSummary(bucket_name='awssampledbuswest2', key='ssbgz/lineorder0000_part_00.gz')
s3.ObjectSummary(bucket_name='awssampledbuswest2', key='ssbgz/lineorder0001_part_00.gz')
s3.ObjectSummary(bucket_name='awssampledbuswest2', key='ssbgz/lineorder0002_part_00.gz')
s3.ObjectSummary(bucket_name='awssampledbuswest2', key='ssbgz/lineorder0003_part_00.gz')
s3.ObjectSummary(bucket_name='awssampledbuswest2', key='ssbgz/lineorder0004_part_00.gz')
s3.ObjectSummary(bucket_name='awssampledbuswest2', key='ssbgz/lineorder0005_part_00.gz')
s3.ObjectSummary(bucket_name='awssampledbuswest2', key='ssbgz/lineorder0006_part_00.gz')
s3.ObjectSummary(bucket_name='awssampledbuswest2', key='ssbgz/lineorder0007_part_00.gz')
s3.ObjectSummary(bucket_name='awssampledbuswest2', key='s

In [5]:


from botocore.exceptions import ClientError
#1.1 Create the role, 
try:
    print("1.1 Creating a new IAM Role") 
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )    
except Exception as e:
    print(e)
    
    
print("1.2 Attaching Policy")

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

print("1.3 Get the IAM role ARN")
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)

1.1 Creating a new IAM Role
1.2 Attaching Policy
1.3 Get the IAM role ARN
arn:aws:iam::006692115187:role/dwhRole


In [6]:


try:
    response = redshift.create_cluster(        
        #HW
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        
        #Roles (for s3 access)
        IamRoles=[roleArn]  
    )
except Exception as e:
    print(e)



In [13]:


def prettyRedshiftProps(props):
    # None disables truncation; negative values are deprecated since pandas 1.0
    pd.set_option('display.max_colwidth', None)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)



,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,dwhuser
4,DBName,dwh
5,Endpoint,"{'Address': 'dwhcluster.cl2qwsv8rq2g.us-east-1.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-0a40083bdaf72f4d0
7,NumberOfNodes,4
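Creating the cluster takes several minutes. The `Endpoint` entry of the `describe_clusters` response may not be populated until `ClusterStatus` is `available`, so running the next cell too early can fail with a `KeyError`. Below is a minimal polling sketch. `wait_for_cluster_status` is a hypothetical helper, not part of boto3. It takes the describe call as a callable so it can be tried without AWS. boto3 also ships a built-in waiter, `redshift.get_waiter('cluster_available')`, which is usually the simpler choice.

```python
import time


def wait_for_cluster_status(describe, target="available", timeout_s=900, poll_s=15, sleep=time.sleep):
    """Poll describe() until its 'ClusterStatus' equals target; raise TimeoutError otherwise.

    `describe` is any zero-argument callable returning the cluster dict
    (hypothetical helper; not a boto3 API).
    """
    deadline = time.monotonic() + timeout_s
    while True:
        props = describe()
        status = props.get("ClusterStatus")
        if status == target:
            return props
        if time.monotonic() >= deadline:
            raise TimeoutError(f"cluster still {status!r} after {timeout_s}s")
        sleep(poll_s)  # wait before asking again
```

In the notebook it could be called as `myClusterProps = wait_for_cluster_status(lambda: redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0])`.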


In [14]:
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

DWH_ENDPOINT ::  dwhcluster.cl2qwsv8rq2g.us-east-1.redshift.amazonaws.com
DWH_ROLE_ARN ::  arn:aws:iam::006692115187:role/dwhRole


In [19]:
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)


ec2.SecurityGroup(id='sg-0240fa3a38c1e4c89')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


In [15]:
%load_ext sql



In [16]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
print(conn_string)
%sql $conn_string

postgresql://dwhuser:Passw0rd@dwhcluster.cl2qwsv8rq2g.us-east-1.redshift.amazonaws.com:5439/dwh


'Connected: dwhuser@dwh'
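The connection string above is built with plain string formatting. That works for `Passw0rd`, but it would break if the password contained characters that have meaning in a URL, such as `@`, `:` or `/`. Printing it also exposes the password. Below is a sketch under that assumption. It escapes the credentials with `urllib.parse.quote_plus`, the approach SQLAlchemy's documentation recommends for database URLs, and prints a masked copy instead. Both function names are hypothetical.

```python
from urllib.parse import quote_plus


def redshift_conn_string(user, password, host, port, db):
    """Build a postgresql:// URL with URL-escaped credentials (hypothetical helper)."""
    return "postgresql://{}:{}@{}:{}/{}".format(
        quote_plus(user), quote_plus(password), host, port, db
    )


def masked(conn_string, password):
    """Replace the escaped password with *** so the URL can be printed safely."""
    return conn_string.replace(":" + quote_plus(password) + "@", ":***@", 1)
```

Usage would be `conn_string = redshift_conn_string(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT, DWH_DB)` followed by `print(masked(conn_string, DWH_DB_PASSWORD))`.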

In [4]:


#### CAREFUL!!
#-- Run only when you want to delete the cluster; this cannot be undone
redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)
#### CAREFUL!!




ClusterNotFoundFault: An error occurred (ClusterNotFound) when calling the DeleteCluster operation: Cluster dwhcluster not found.

In [5]:
#### CAREFUL!!
#-- Run only when you want to detach the policy and delete the IAM role
iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)
#### CAREFUL!!

{'ResponseMetadata': {'RequestId': '7404dd0b-e101-4765-ac39-9a669699f9bd',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '7404dd0b-e101-4765-ac39-9a669699f9bd',
   'content-type': 'text/xml',
   'content-length': '200',
   'date': 'Sun, 06 Mar 2022 21:53:08 GMT'},
  'RetryAttempts': 0}}

In [61]:
%sql DROP TABLE IF EXISTS staging_i94

 * postgresql://dwhuser:***@dwhcluster.cl2qwsv8rq2g.us-east-1.redshift.amazonaws.com:5439/dwh
Done.


[]

In [71]:
%sql DROP TABLE IF EXISTS staging_airport

 * postgresql://dwhuser:***@dwhcluster.cl2qwsv8rq2g.us-east-1.redshift.amazonaws.com:5439/dwh
Done.


[]

In [14]:
%%sql
CREATE TABLE IF NOT EXISTS staging_i94 (
    cicid       VARCHAR                 NULL,
    i94yr       VARCHAR                 NULL,
    i94mon      VARCHAR                 NULL,
    i94cit      VARCHAR                 NULL,
    i94res      VARCHAR                 NULL,
    i94port     VARCHAR                 NULL,
    arrdate     VARCHAR                 NULL,
    i94mode     VARCHAR                 NULL,
    i94addr     VARCHAR                 NULL,
    depdate     VARCHAR                 NULL,
    i94bir      VARCHAR                 NULL,
    i94visa     VARCHAR                 NULL,
    count       VARCHAR                NULL,
    dtadfile    VARCHAR                  NULL, 
    visapost    VARCHAR                 NULL,
    occup       VARCHAR                 NULL,
    entdepa     VARCHAR                 NULL,
    entdepd     VARCHAR                 NULL,
    entdepu     VARCHAR                 NULL,
    matflag     VARCHAR                 NULL,
    biryear     VARCHAR                NULL,
    dtaddto     VARCHAR                  NULL,
    gender      VARCHAR                 NULL,
    isnum       VARCHAR                 NULL,
    airline     VARCHAR                 NULL,
    admnum      VARCHAR                  NULL,
    fltno       VARCHAR                 NULL,
    visatype    VARCHAR                 NULL
);

 * postgresql://dwhuser:***@dwhcluster.cl2qwsv8rq2g.us-east-1.redshift.amazonaws.com:5439/dwh
Done.


[]

In [15]:
%%sql
CREATE TABLE IF NOT EXISTS staging_airport(
            ident               VARCHAR         NOT NULL SORTKEY DISTKEY,
            type                VARCHAR         NULL,
            name                VARCHAR         NULL,
            elevation_ft        VARCHAR         NULL,
            continent           VARCHAR         NULL,
            iso_country         VARCHAR         NULL,
            iso_region          VARCHAR         NULL,
            municipality        VARCHAR         NOT NULL,
            gps_code            VARCHAR         NULL,
            iata_code           VARCHAR         NOT NULL,
            local_code          VARCHAR         NULL,
            coordinates         VARCHAR         NOT NULL
);

 * postgresql://dwhuser:***@dwhcluster.cl2qwsv8rq2g.us-east-1.redshift.amazonaws.com:5439/dwh
Done.


[]

In [42]:
%%sql
CREATE TABLE IF NOT EXISTS braziliansinairports (
            municipality    VARCHAR             NOT NULL,
            lat             VARCHAR             NOT NULL DISTKEY SORTKEY,
            lon             VARCHAR             NOT NULL,
            count           VARCHAR             NOT NULL
);

 * postgresql://dwhuser:***@dwhcluster.cl2qwsv8rq2g.us-east-1.redshift.amazonaws.com:5439/dwh
Done.


[]

In [41]:
%sql DROP TABLE IF EXISTS braziliansinairports

 * postgresql://dwhuser:***@dwhcluster.cl2qwsv8rq2g.us-east-1.redshift.amazonaws.com:5439/dwh
Done.


[]

In [41]:
sql = "select * from staging_i94;"
dat = pd.read_sql_query(sql, conn)

In [42]:
dat

,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,isnum,airline,admnum,fltno,visatype
0,11233.0,2016.0,4.0,111.0,689.0,HOU,20545.0,1.0,TX,20574.0,...,,M,1998.0,D/S,F,,UA,92437932730.0,00128,F1
1,16697.0,2016.0,4.0,117.0,689.0,FTL,20545.0,1.0,FL,20555.0,...,,M,1951.0,06292016,,,AD,55406329033.0,08704,WT
2,16705.0,2016.0,4.0,117.0,689.0,HOU,20545.0,1.0,NV,20554.0,...,,M,1980.0,06292016,F,,UA,55407248533.0,00128,WT
3,16713.0,2016.0,4.0,117.0,689.0,NYC,20545.0,1.0,NY,20549.0,...,,M,1978.0,06292016,F,,JJ,55407524433.0,08078,WT
4,16721.0,2016.0,4.0,117.0,689.0,NYC,20545.0,1.0,NY,20550.0,...,,M,1984.0,06292016,F,,AA,55406648033.0,00950,WT
5,16729.0,2016.0,4.0,117.0,689.0,ORL,20545.0,1.0,FL,20551.0,...,,M,1989.0,06292016,,,JJ,55450716933.0,08086,WT
6,16738.0,2016.0,4.0,117.0,689.0,MIA,20545.0,1.0,FL,20550.0,...,,M,1975.0,06292016,F,,JJ,55406697633.0,08090,WT
7,16746.0,2016.0,4.0,117.0,689.0,MIA,20545.0,1.0,FL,20558.0,...,,M,1973.0,06292016,M,,JJ,55406688433.0,08090,WT
8,16755.0,2016.0,4.0,117.0,689.0,MIA,20545.0,1.0,NV,20553.0,...,,M,1983.0,06292016,,,AA,55406103633.0,00904,WT
9,20445.0,2016.0,4.0,126.0,689.0,ATL,20545.0,1.0,NC,20554.0,...,,M,1941.0,06292016,M,,DL,55406761833.0,00104,WT


In [52]:
import psycopg2
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
cur = conn.cursor()

In [18]:
query = """
COPY staging_airport
FROM 's3://astrogildopereirajunior/airport-codes_csv.csv'
credentials 'aws_iam_role={}'
csv;""".format(config.get('IAM_ROLE', 'ARN'))
print(query)
cur.execute(query)
conn.commit()


COPY staging_airport
FROM 's3://astrogildopereirajunior/airport-codes_csv.csv'
credentials 'aws_iam_role=arn:aws:iam::006692115187:role/dwhRole'
csv;


In [66]:
query = """
    COPY staging_i94
    FROM 's3://astrogildopereirajunior/brazil2.csv'
    credentials 'aws_iam_role={}'
    csv;
""".format(config.get('IAM_ROLE', 'ARN'))
cur.execute(query)
conn.commit()

In [19]:
cur = conn.cursor()
query = """
    COPY staging_i94
    FROM 's3://astrogildopereirajunior/staging_i94.csv'
    credentials 'aws_iam_role={}'
    csv;
""".format(config.get('IAM_ROLE', 'ARN'))
cur.execute(query)
conn.commit()

In [66]:
dfbrazil = pd.read_csv('brazil.csv')



In [71]:
dfbrazil.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,628,737.0,2016.0,4.0,103.0,689.0,NEW,20545.0,1.0,NY,...,,M,1980.0,6292016,F,,UA,55406530000.0,148,WT
1,629,738.0,2016.0,4.0,103.0,689.0,NEW,20545.0,1.0,NY,...,,M,1982.0,6292016,F,,UA,55406520000.0,148,WT
2,9327,11229.0,2016.0,4.0,111.0,689.0,DAL,20545.0,1.0,NV,...,,M,1984.0,6292016,M,,AA,55406770000.0,962,WT
3,9328,11230.0,2016.0,4.0,111.0,689.0,LVG,20545.0,1.0,NV,...,,M,1982.0,6292016,,,CM,55449140000.0,252,WB
4,9329,11231.0,2016.0,4.0,111.0,689.0,MIA,20545.0,1.0,FL,...,,M,1970.0,6292016,,,JJ,55406660000.0,8090,WT


In [76]:
df2 = pd.read_csv("brazil2.csv")



In [87]:
len(df2['cicid'].unique())

134907

In [75]:
df2.to_csv('brazil2.csv',index=False)

In [None]:
sql = "select * from staging_airport;"
airport_codes = pd.read_sql_query(sql, conn)
sql = "select * from staging_i94;"
df_i94 = pd.read_sql_query(sql, conn)
print("queried both tables")
# staging_i94 stores every column as VARCHAR (e.g. '689.0'), so convert before comparing;
# 689 is the I94 code for Brazil. .copy() avoids pandas' SettingWithCopyWarning below.
brazil = df_i94[pd.to_numeric(df_i94['i94res'], errors='coerce') == 689].copy()
# treat the I94 city codes NYC and LOS as the JFK and LAX airports
brazil['i94port'] = brazil['i94port'].replace({'NYC': 'JFK', 'LOS': 'LAX'})
braziliansInAirports = pd.merge(brazil, airport_codes, left_on=['i94port'], right_on=['iata_code'], how='inner')
braziliansInAirports = braziliansInAirports.groupby(['municipality','coordinates','iata_code']).size().to_frame(name = 'count').reset_index().sort_values(by=['count'],ascending=False)
print("processed part 1")
df = braziliansInAirports
# the coordinates field is stored as "lon, lat"
new = df["coordinates"].str.split(", ", n = 1, expand = True)
df['lat'] = new[1]
df['lon'] = new[0]
# sum the arrival counts per municipality; .size() here would count airports, not arrivals
df = df.groupby(['municipality', 'lat', 'lon'], as_index=False)['count'].sum()
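In the airport-codes data the `coordinates` field holds longitude first. The Newport, AR sample row shown later is stored as `-91.254898, 35.6087`, which is why the cell above takes the second part of the split as `lat`. Below is a small parsing sketch using a hypothetical helper, `split_coordinates`. It also converts both parts to floats and rejects out-of-range values. That check catches a swapped order whenever the longitude's magnitude exceeds 90°.

```python
def split_coordinates(coordinates):
    """Split an airport-codes 'coordinates' string ("lon, lat") into (lat, lon) floats.

    Hypothetical helper; raises ValueError on malformed or out-of-range input.
    """
    lon_str, lat_str = (part.strip() for part in coordinates.split(",", 1))
    lat, lon = float(lat_str), float(lon_str)
    if not (-90.0 <= lat <= 90.0 and -180.0 <= lon <= 180.0):
        raise ValueError(f"coordinates out of range: {coordinates!r}")
    return lat, lon
```

Storing the floats rather than the strings would also let Redshift sort and filter on `lat` and `lon` numerically.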

In [31]:
df

,municipality,lat,lon,count
0,Agadir,30.325000762939453,-9.413069725036621,14
1,Aguadilla,18.49489974975586,-67.12940216064453,8
2,Angus Downs Station,-25.0325,132.2748,31
3,Atlanta,33.6367,-84.428101,5021
4,Auburn/Lewiston,44.048500061,-70.2835006714,20
5,Austin,30.194499969482422,-97.6698989868164,2
6,Barcelona,10.111111,-64.692222,186
7,Batman,37.929000854499996,41.1166000366,34
8,Bedford,42.47000122,-71.28900146,13
9,Boma,-5.854000091552734,13.064000129699707,32
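Several municipalities in the output above are outside the US, such as Agadir, Batman, Boma and Angus Downs Station. This suggests that some I94 port codes coincide with the IATA codes of foreign airports. I94 port codes are not IATA codes, so the inner join on `iata_code` matches any airport worldwide that shares the same three letters. One possible mitigation is to restrict the airport side of the join to the countries of interest before matching. The sketch below uses the standard library and hypothetical names. `PORT_TO_IATA` contains only the two aliases used in the transformation cell; a complete mapping would need the I94 port list. The default `countries=("US",)` is also an assumption. US territories have their own ISO country codes (Aguadilla, for example, is in Puerto Rico), so they would need to be added explicitly.

```python
from collections import Counter

# Hypothetical alias table; only the two replacements used in the notebook.
PORT_TO_IATA = {"NYC": "JFK", "LOS": "LAX"}


def arrivals_by_municipality(i94_rows, airport_rows, countries=("US",)):
    """Count arrivals per (municipality, coordinates), joining only airports in `countries`.

    i94_rows: iterable of dicts with an 'i94port' key (already filtered to Brazil).
    airport_rows: iterable of dicts with 'iata_code', 'iso_country',
    'municipality' and 'coordinates' keys, as in staging_airport.
    """
    lookup = {}
    for airport in airport_rows:
        code = (airport.get("iata_code") or "").strip()
        if code and airport.get("iso_country") in countries:
            # keep the first airport seen for each IATA code
            lookup.setdefault(code, (airport["municipality"], airport["coordinates"]))
    counts = Counter()
    for row in i94_rows:
        port = PORT_TO_IATA.get(row["i94port"], row["i94port"])
        if port in lookup:
            counts[lookup[port]] += 1
    return counts
```

In the pandas version the equivalent change is to filter before the merge, e.g. `airport_codes = airport_codes[airport_codes['iso_country'] == 'US']`.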


In [44]:
df['count'] = df['count'].astype(str)

In [45]:
df.dtypes

municipality    object
lat             object
lon             object
count           object
dtype: object

In [48]:
query = """
INSERT into braziliansinairports(municipality, lat, lon, count) values('%s','%s','%s','%s');
""" % (df['municipality'], df['lat'], df['lon'],df['count'])
cur.execute(query)
conn.commit()

DataError: value too long for type character varying(256)
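The `INSERT` above fails because `%` formatting interpolates the printed representation of each entire pandas Series into a single value. That value exceeds the 256-character limit Redshift applies to a `VARCHAR` declared without a length. Passing one tuple per row as bound parameters avoids the problem and also handles names that contain quotes. Below is a minimal sketch with a hypothetical `insert_rows` helper. It uses only the DB-API `executemany` method, so it works with psycopg2 (placeholder `%s`) and can also be tried locally against `sqlite3` (placeholder `?`).

```python
def insert_rows(conn, rows, placeholder="%s"):
    """Insert (municipality, lat, lon, count) tuples using bound parameters.

    Hypothetical helper: use placeholder="%s" for psycopg2 and "?" for sqlite3.
    """
    marks = ", ".join([placeholder] * 4)
    sql = f"INSERT INTO braziliansinairports (municipality, lat, lon, count) VALUES ({marks})"
    cur = conn.cursor()
    cur.executemany(sql, rows)  # the driver quotes each value separately
    conn.commit()
```

With the notebook's connection, the rows could come from `df[['municipality', 'lat', 'lon', 'count']].itertuples(index=False, name=None)`. Row-by-row inserts are fine for roughly one hundred rows. For larger volumes, Redshift's `COPY` from S3, which the next cells use, is the recommended path.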


In [50]:
upload_file('braziliansinairports.csv','astrogildopereirajunior')

True

In [53]:
cur = conn.cursor()
query = """
    COPY braziliansinairports
    FROM 's3://astrogildopereirajunior/braziliansinairports.csv'
    credentials 'aws_iam_role={}'
    csv;
""".format(config.get('IAM_ROLE', 'ARN'))
cur.execute(query)
conn.commit()

In [None]:
df.to_csv("inairports.csv",index=False)

In [80]:
%sql select * from stl_load_errors;

 * postgresql://dwhuser:***@dwhcluster.cl2qwsv8rq2g.us-east-1.redshift.amazonaws.com:5439/dwh
6 rows affected.


userid,slice,tbl,starttime,session,query,filename,line_number,colname,type,col_length,position,raw_line,raw_field_value,err_code,err_reason,is_partial,start_offset
100,4,101843,2022-03-05 23:14:48.778717,1073884423,2434,s3://astrogildopereirajunior/brazil.csv,1,cicid,numeric,"18, 0",0,",cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype",,1213,Missing data for not-null field,0,0
100,3,101834,2022-03-05 22:51:19.810158,1073802524,2121,s3://astrogildopereirajunior/brazil.csv,1,i94yr,numeric,"18, 0",1,",cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype",cicid,1207,"Invalid digit, Value 'c', Pos 0, Type: Decimal",0,0
100,3,101854,2022-03-05 23:21:17.725942,1073778433,2545,s3://astrogildopereirajunior/brazil.csv,1,i94yr,numeric,"18, 0",1,",cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype",cicid,1207,"Invalid digit, Value 'c', Pos 0, Type: Decimal",0,0
100,0,101856,2022-03-05 23:33:04.292588,1073926142,2672,s3://astrogildopereirajunior/brazil2.csv,1,cicid,numeric,"18, 0",0,"cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype",cicid,1207,"Invalid digit, Value 'c', Pos 0, Type: Decimal",0,0
100,7,101841,2022-03-05 23:07:48.710011,1073778215,2331,s3://astrogildopereirajunior/brazil.csv,1,i94yr,numeric,"18, 0",1,",cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype",cicid,1207,"Invalid digit, Value 'c', Pos 0, Type: Decimal",0,0
100,7,101850,2022-03-05 23:18:25.210247,1073909489,2503,s3://astrogildopereirajunior/brazil.csv,1,i94yr,numeric,"18, 0",1,",cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype",cicid,1207,"Invalid digit, Value 'c', Pos 0, Type: Decimal",0,0
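Every row in `stl_load_errors` has `line_number` 1 and a `raw_line` equal to the CSV header. Redshift tried to load the header as data and failed on the value `cicid` ("Invalid digit, Value 'c'") while those columns were numeric. The header of `brazil.csv` also starts with an empty field, the unnamed index column that `to_csv` writes by default, which explains the "Missing data for not-null field" error. `COPY` accepts `IGNOREHEADER n` to skip leading lines. The header line `municipality,lat,lon,count` that appears as a data row in `braziliansinairports` further down is consistent with the same issue. Below is a sketch of a hypothetical helper that builds the statement with that option. It keeps the `credentials 'aws_iam_role=...'` form used in this notebook.

```python
def build_copy_sql(table, s3_uri, role_arn, header_rows=1):
    """Return a Redshift COPY statement for a CSV file in S3 (hypothetical helper).

    IGNOREHEADER skips the first `header_rows` lines so the header is not loaded as data.
    """
    if not s3_uri.startswith("s3://"):
        raise ValueError("expected an s3:// URI")
    options = ["CSV"]
    if header_rows:
        options.append(f"IGNOREHEADER {int(header_rows)}")
    return (
        f"COPY {table}\n"
        f"FROM '{s3_uri}'\n"
        f"CREDENTIALS 'aws_iam_role={role_arn}'\n"
        + "\n".join(options) + ";"
    )
```

For example: `cur.execute(build_copy_sql('staging_i94', 's3://astrogildopereirajunior/staging_i94.csv', config.get('IAM_ROLE', 'ARN')))`.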


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
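Run Quality Checks: the cells below sample rows from each table, count NULLs in key columns, and report the total row count of each table.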
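The checks below are run one cell at a time and inspected by eye. They could also be collected into a list and evaluated in one pass, so that a pipeline run fails loudly when a check does not hold. Below is a minimal sketch with the hypothetical names `CHECKS` and `run_checks`. It relies only on the DB-API cursor interface (`execute`, `fetchone`), so it should work with the psycopg2 cursor used in this notebook. The last check targets a CSV header that was loaded as a data row.

```python
# Each check: (description, SQL returning a single number, predicate on that number).
CHECKS = [
    ("braziliansinairports is not empty",
     "SELECT COUNT(*) FROM braziliansinairports", lambda n: n > 0),
    ("no NULL municipality",
     "SELECT COUNT(*) FROM braziliansinairports WHERE municipality IS NULL", lambda n: n == 0),
    ("no NULL coordinates",
     "SELECT COUNT(*) FROM braziliansinairports WHERE lat IS NULL OR lon IS NULL", lambda n: n == 0),
    ("no CSV header loaded as a data row",
     "SELECT COUNT(*) FROM braziliansinairports WHERE municipality = 'municipality'", lambda n: n == 0),
]


def run_checks(cursor, checks=CHECKS):
    """Run each check through a DB-API cursor and return a list of failure messages."""
    failures = []
    for description, sql, ok in checks:
        cursor.execute(sql)
        value = cursor.fetchone()[0]
        if not ok(value):
            failures.append(f"{description}: got {value}")
    return failures
```

A pipeline step could then `raise` when `run_checks(conn.cursor())` returns a non-empty list.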

In [17]:
%sql select * from staging_airport LIMIT 10;

 * postgresql://dwhuser:***@dwhcluster.cl2qwsv8rq2g.us-east-1.redshift.amazonaws.com:5439/dwh
10 rows affected.


ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"
00CN,heliport,Kitchen Creek Helibase Heliport,3350,,US,US-CA,Pine Valley,00CN,,00CN,"-116.4597417, 32.7273736"
00CO,closed,Cass Field,4830,,US,US-CO,Briggsdale,,,,"-104.344002, 40.622202"
00FA,small_airport,Grass Patch Airport,53,,US,US-FL,Bushnell,00FA,,00FA,"-82.21900177001953, 28.64550018310547"
00ID,small_airport,Delta Shores Airport,2064,,US,US-ID,Clark Fork,00ID,,00ID,"-116.21399688720703, 48.145301818847656"
00PN,small_airport,Ferrell Field,1301,,US,US-PA,Mercer,00PN,,00PN,"-80.211111, 41.2995"
00WN,small_airport,Hawks Run Airport,2900,,US,US-WA,Asotin,00WN,,00WN,"-117.2490005493164, 46.25"
01CA,heliport,Lugo Substation Heliport,3733,,US,US-CA,Hesperia,01CA,,01CA,"-117.370058745, 34.368240591699994"
01GA,heliport,Medical Center Heliport,319,,US,US-GA,Columbus,01GA,,01GA,"-84.9791030883789, 32.47930145263672"
01ID,small_airport,Lava Hot Springs Airport,5268,,US,US-ID,Lava Hot Springs,01ID,,01ID,"-112.031998, 42.6082"


In [27]:
%sql select COUNT(*) from staging_i94 WHERE i94port IS NULL;

 * postgresql://dwhuser:***@dwhcluster.cl2qwsv8rq2g.us-east-1.redshift.amazonaws.com:5439/dwh
1 rows affected.


count
0


In [22]:
%sql select * from staging_i94 LIMIT 10;

 * postgresql://dwhuser:***@dwhcluster.cl2qwsv8rq2g.us-east-1.redshift.amazonaws.com:5439/dwh
10 rows affected.


cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,isnum,airline,admnum,fltno,visatype
17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401,,,O,O,,M,2012.0,9302016,,,AA,92468463130.0,199,B2
27.0,2016.0,4.0,101.0,101.0,BOS,20545.0,1.0,MA,20549.0,58.0,1.0,1.0,20160401,TIA,,G,O,,M,1958.0,4062016,M,,LH,92478763830.0,422,B1
36.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20561.0,37.0,2.0,1.0,20160401,TIA,,G,O,,M,1979.0,9302016,M,,TK,92506258230.0,1,B2
48.0,2016.0,4.0,101.0,117.0,NYC,20545.0,1.0,NY,20572.0,68.0,2.0,1.0,20160401,FLR,,G,O,,M,1948.0,9302016,M,,AA,92473604830.0,199,B2
56.0,2016.0,4.0,102.0,102.0,NYC,20545.0,1.0,NY,20549.0,49.0,2.0,1.0,20160401,,,G,O,,M,1967.0,6292016,F,,KL,55446650533.0,641,WT
72.0,2016.0,4.0,103.0,103.0,ATL,20545.0,1.0,GA,20554.0,55.0,2.0,1.0,20160401,,,G,O,,M,1961.0,6292016,M,,LH,55428013833.0,444,WT
81.0,2016.0,4.0,103.0,103.0,BOS,20545.0,1.0,MA,20550.0,52.0,2.0,1.0,20160401,,,O,O,,M,1964.0,6292016,,,LH,55431242233.0,422,WT
89.0,2016.0,4.0,103.0,103.0,CLT,20545.0,1.0,FL,20553.0,50.0,2.0,1.0,20160401,,,G,O,,M,1966.0,6292016,F,,AA,55429087133.0,731,WT
97.0,2016.0,4.0,103.0,103.0,DAL,20545.0,1.0,TX,20560.0,23.0,2.0,1.0,20160401,,,G,O,,M,1993.0,6292016,M,,BA,55450569233.0,193,WT
106.0,2016.0,4.0,103.0,103.0,DET,20545.0,1.0,MI,20556.0,61.0,2.0,1.0,20160401,,,G,O,,M,1955.0,6292016,F,,LH,55426242833.0,442,WT


In [54]:
%%time
%%sql
SELECT *
FROM braziliansinairports
LIMIT 10

 * postgresql://dwhuser:***@dwhcluster.cl2qwsv8rq2g.us-east-1.redshift.amazonaws.com:5439/dwh
10 rows affected.
CPU times: user 801 µs, sys: 4.28 ms, total: 5.08 ms
Wall time: 5.96 s


municipality,lat,lon,count
Halali,-19.0285,16.4585,1
San Miguel de Tucumán,-26.8409,-65.104897,7
Savannah,32.12760162,-81.20210266,12
Dallas,32.847099,-96.851799,4231
Santa Ana,33.67570114,-117.8679962,7
Palm Springs,33.8297004699707,-116.50700378417969,1565
Ruse,43.6948013306,26.056699752800004,3
municipality,lat,lon,count
Port Bergé,-15.584286,47.623587,249
Dourados,-22.2019,-54.926601,2


In [56]:
%sql SELECT COUNT(*) FROM braziliansinairports WHERE municipality IS NULL;

 * postgresql://dwhuser:***@dwhcluster.cl2qwsv8rq2g.us-east-1.redshift.amazonaws.com:5439/dwh
1 rows affected.


count
0


In [57]:
%sql SELECT COUNT(*) FROM braziliansinairports WHERE lat IS NULL OR lon IS NULL;

 * postgresql://dwhuser:***@dwhcluster.cl2qwsv8rq2g.us-east-1.redshift.amazonaws.com:5439/dwh
1 rows affected.


count
0


In [59]:
%sql SELECT COUNT(*) AS rows FROM braziliansinairports;

 * postgresql://dwhuser:***@dwhcluster.cl2qwsv8rq2g.us-east-1.redshift.amazonaws.com:5439/dwh
1 rows affected.


rows
111


In [60]:
%sql SELECT COUNT(*) AS rows FROM staging_i94;

 * postgresql://dwhuser:***@dwhcluster.cl2qwsv8rq2g.us-east-1.redshift.amazonaws.com:5439/dwh
1 rows affected.


rows
3096314


In [62]:
%sql SELECT COUNT(*) AS rows FROM staging_airport;

 * postgresql://dwhuser:***@dwhcluster.cl2qwsv8rq2g.us-east-1.redshift.amazonaws.com:5439/dwh
1 rows affected.


rows
55076
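For the source/count check, the row counts above can be compared with the number of records in the CSV files that were copied to S3. Below is a sketch with two hypothetical helpers. `count_csv_data_rows` counts records with the `csv` module, so quoted fields that contain commas or newlines are counted correctly. `completeness_report` flags a difference of exactly one row, the signature of a header loaded as data.

```python
import csv


def count_csv_data_rows(text_stream, has_header=True):
    """Count data records in an open CSV text stream (hypothetical helper)."""
    reader = csv.reader(text_stream)
    if has_header:
        next(reader, None)  # skip the header line
    return sum(1 for _ in reader)


def completeness_report(source_rows, loaded_rows):
    """Compare the source record count with COUNT(*) of the loaded table."""
    diff = loaded_rows - source_rows
    if diff == 0:
        return "ok"
    if diff == 1:
        return "one extra row: header probably loaded as data"
    return f"mismatch: source={source_rows}, loaded={loaded_rows}"
```

For example, a local copy of `staging_i94.csv` could be opened with `open('staging_i94.csv', newline='')` and its count compared with the `COUNT(*)` of `staging_i94`.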


#### 4.3 Data Dictionary
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

The final columns of the processed `braziliansinairports` table are:
* municipality: the municipality of the airport through which arrivals from Brazil entered the US
* lat: the latitude of that airport, taken from the second value of the airport-codes `coordinates` field
* lon: the longitude of that airport, taken from the first value of the `coordinates` field
* count: the number of arrivals from Brazil through airports in that municipality

A few other columns were essential for joining the tables, although they do not appear in the final table:
* i94port: the I94 port of entry of each arrival from Brazil, from the 2016 I94 arrivals data published by the US National Travel and Tourism Office ([dataset](https://travel.trade.gov/research/reports/i94/historical/2016.html)).
* iata_code: the IATA airport code from the airport-codes [dataset](https://datahub.io/core/airport-codes#data) on DataHub.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.