# Drafting ETL code

## set up connection and import libraries

In [1]:
import configparser
import psycopg2
import boto3
import pandas as pd
import numpy
import fastparquet
#import pyarrow
from sql_queries import  drop_table_queries, create_table_queries, insert_querires,drop_raw_queries

In [2]:
##use pip install numpy==1.16.1 to update numpy version, otherwise writing to parquet will throw error
numpy.version.version

'1.16.1'

In [3]:
config = configparser.ConfigParser()
config.read('dwh.cfg')

['dwh.cfg']

In [4]:
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
cur = conn.cursor()

In [5]:
%load_ext sql
conn_string="postgresql://{}:{}@{}:{}/{}".format(config.get("CLUSTER","DB_USER"),
                                               config.get("CLUSTER",'DB_PASSWORD'),
                                               config.get("CLUSTER",'HOST'),
                                               config.get("CLUSTER",'DB_PORT'),
                                               config.get("CLUSTER",'DB_NAME'))
print(conn_string)
%sql $conn_string

postgresql://awsuser:Passw0rd@dwhcluster.cbu6otbv3egu.us-west-2.redshift.amazonaws.com:5439/dev


'Connected: awsuser@dev'

In [6]:
s3path = config.get('S3','LOCATION')
s3path

's3://immigrationdatamodel'

## set up s3 access

In [7]:
s3 = boto3.resource(
    service_name='s3',
    region_name='us-west-2',
    aws_access_key_id=config.get("AWS","KEY"),
    aws_secret_access_key=config.get("AWS","SECRET")
)

In [8]:
for bucket in s3.buckets.all():
    print(bucket.name)

aws-emr-resources-133824271603-us-east-1
aws-logs-133824271603-us-east-1
immigrationdatamodel


## clean up database

In [9]:
def drop_tables(cur, conn):
    '''
    this function drops tables if exists
    '''
    
    for query in drop_table_queries:
        cur.execute(query)
        conn.commit()

In [10]:
drop_tables(cur, conn)

## create dummy tables

In [11]:
def create_tables(cur, conn):
    '''
    this function drops tables if exists
    '''
    
    for query in create_table_queries:
        cur.execute(query)
        conn.commit()

In [12]:
create_tables(cur, conn)

## load fact table i94

In [13]:
#this is very large, use data sample for test purpose
#raw_i94 = pd.read_sas('../../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat','sas7bdat',encoding="ISO-8859-1")

In [19]:
raw_i94 = pd.read_csv('../rawdata/immigration_data_sample.csv')
raw_i94 = raw_i94.drop(raw_i94.columns[0], axis=1)

In [20]:
i94=raw_i94[['cicid'
                    ,'i94yr'
                    ,'i94mon'
                    ,'i94cit'
                    ,'i94res'
                    ,'i94port'
                    ,'arrdate'
                    ,'i94mode'
                    ,'i94addr'
                    ,'depdate'
                    ,'i94visa'
                    ,'dtadfile'
                    ,'visapost'
                    ,'dtaddto'
                    ,'airline'
                    ,'admnum'
                    ,'fltno'
                    ,'visatype']]

In [21]:
i94['arrdate'] = pd.to_timedelta(i94['arrdate'],unit='D') + pd.Timestamp('1960-1-1')
i94['depdate'] = pd.to_timedelta(i94['depdate'],unit='D') + pd.Timestamp('1960-1-1')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  """Entry point for launching an IPython kernel.
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  


In [22]:
i94.shape

(1000, 18)

In [23]:
i94.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94visa,dtadfile,visapost,dtaddto,airline,admnum,fltno,visatype
0,4084316.0,2016.0,4.0,209.0,209.0,HHW,2016-04-22,1.0,HI,2016-04-29,2.0,20160422,,7202016,JL,56582670000.0,00782,WT
1,4422636.0,2016.0,4.0,582.0,582.0,MCA,2016-04-23,1.0,TX,2016-04-24,2.0,20160423,MTR,10222016,*GA,94362000000.0,XBLNG,B2
2,1195600.0,2016.0,4.0,148.0,112.0,OGG,2016-04-07,1.0,FL,2016-04-27,2.0,20160407,,7052016,LH,55780470000.0,00464,WT
3,5291768.0,2016.0,4.0,297.0,297.0,LOS,2016-04-28,1.0,CA,2016-05-07,2.0,20160428,DOH,10272016,QR,94789700000.0,00739,B2
4,985523.0,2016.0,4.0,111.0,111.0,CHM,2016-04-06,3.0,NY,2016-04-09,2.0,20160406,,7042016,,42322570000.0,LAND,WT


In [19]:
#directly writing to redshift is very slow, so write to s3 then copy to redshift
#i94.to_sql('i94', conn_string, index=False, if_exists='replace')

In [24]:
i94.to_csv('../dataoutput/i94.txt',index = False)
s3.Bucket('immigrationdatamodel').upload_file(Filename='../dataoutput/i94.txt', Key='i94.txt')

In [25]:
raw_i94.to_csv('../dataoutput/raw_i94.txt',index = False)
s3.Bucket('immigrationdatamodel').upload_file(Filename='../dataoutput/raw_i94.txt', Key='raw_i94.txt')

## load dim table citydemo

In [26]:
raw_citydemo = pd.read_csv('../rawdata/us-cities-demographics.csv',sep = ';')
citydemo_unchanged = raw_citydemo.iloc[:,0:10].drop_duplicates()
citydemo_stacked = raw_citydemo.iloc[:,[0,1,10,11]]

In [27]:
citydemo_stacked['Race'] = citydemo_stacked['Race'].replace(['American Indian and Alaska Native',
                                               'Black or African-American',
                                               'Hispanic or Latino'],
                                              ['Native','Black','Hispa'])

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  after removing the cwd from sys.path.


In [28]:
citydemo_unstacked = citydemo_stacked.pivot_table(index=['City','State'], 
                                                          columns="Race", 
                                                          values="Count", 
                                                          aggfunc='first').reset_index()

In [29]:
citydemo_unstacked.head()

Race,City,State,Asian,Black,Hispa,Native,White
0,Abilene,Texas,2929.0,14449.0,33222.0,1813.0,95487.0
1,Akron,Ohio,9033.0,66551.0,3684.0,1845.0,129192.0
2,Alafaya,Florida,10336.0,6577.0,34897.0,,63666.0
3,Alameda,California,27984.0,7364.0,8265.0,1329.0,44232.0
4,Albany,Georgia,650.0,53440.0,1783.0,445.0,17160.0


In [30]:
citydemo = citydemo_unchanged.merge(citydemo_unstacked,on = ['City','State'])
citydemo.columns = citydemo.columns.str.replace(' ', '')
citydemo.columns = citydemo.columns.str.replace('-', '')

In [42]:
citydemo['citystate']=citydemo['City'].str.lower().str.replace(" ","")+citydemo['StateCode'].str.lower().str.replace(" ","")

In [43]:
citydemo.head()

Unnamed: 0,City,State,MedianAge,MalePopulation,FemalePopulation,TotalPopulation,NumberofVeterans,Foreignborn,AverageHouseholdSize,StateCode,Asian,Black,Hispa,Native,White,citystate
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,8841.0,21330.0,25924.0,1084.0,37756.0,silverspringmd
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,30473.0,3917.0,2566.0,351.0,58723.0,quincyma
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,4759.0,18191.0,3430.0,,61869.0,hooveral
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,24519.0,24437.0,65823.0,2789.0,111832.0,ranchocucamongaca
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,7349.0,144961.0,100432.0,2268.0,76402.0,newarknj


In [44]:
citydemo.to_csv('../dataoutput/citydemo.txt',index = False)
s3.Bucket('immigrationdatamodel').upload_file(Filename='../dataoutput/citydemo.txt', Key='citydemo.txt')

## load dim table airport

In [45]:
raw_airportcode = pd.read_csv('../rawdata/airport-codes_csv.csv')
airportcode = raw_airportcode.assign(iso_region2 = lambda x: x['iso_region'].str.split('-',1).str[-1])
airportcode[['lat','lon']] = airportcode['coordinates'].str.split(',',1,expand = True)

In [49]:
airportcode['citystate']=airportcode['municipality'].str.lower().str.replace(" ","")+airportcode['iso_region2'].str.lower().str.replace(" ","")

In [50]:
airportcode.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates,iso_region2,lat,lon,citystate
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125",PA,-74.93360137939453,40.07080078125,bensalempa
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022",KS,-101.473911,38.704022,leotiks
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968",AK,-151.695999146,59.94919968,anchorpointak
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172",AL,-86.77030181884766,34.86479949951172,harvestal
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087",AR,-91.254898,35.6087,newportar


In [52]:
airportcode.to_csv('../dataoutput/airport.txt',index = False)
s3.Bucket('immigrationdatamodel').upload_file(Filename='../dataoutput/airport.txt', Key='airport.txt')

## load dim table port

In [53]:
raw_port = pd.read_csv('../rawdata/USport.txt',sep = '|')

In [55]:
raw_port['citystate']=raw_port['portname'].str.lower().str.replace(" ","")+raw_port['state'].str.lower().str.replace(" ","")

In [56]:
raw_port.head()

Unnamed: 0,port,portname,state,citystate
0,ALC,ALCAN,AK,alcanak
1,ANC,ANCHORAGE,AK,anchorageak
2,BAR,BAKER AAF - BAKER ISLAND,AK,bakeraaf-bakerislandak
3,DAC,DALTONS CACHE,AK,daltonscacheak
4,PIZ,DEW STATION PT LAY DEW,AK,dewstationptlaydewak


In [31]:
raw_port.to_csv('../dataoutput/usport.txt',index = False)
s3.Bucket('immigrationdatamodel').upload_file(Filename='../dataoutput/usport.txt', Key='usport.txt')

## load dim table countries

In [32]:
raw_countries = pd.read_csv('../rawdata/countries.txt',sep = '|')

In [33]:
raw_countries.to_csv('../dataoutput/countries.txt',index = False)
s3.Bucket('immigrationdatamodel').upload_file(Filename='../dataoutput/countries.txt', Key='countries.txt')

## insert records into redshift from s3

In [34]:
for query in insert_querires:
    cur.execute(query)
    conn.commit()

## drop raw table

In [35]:
for query in drop_raw_queries:
    cur.execute(query)
    conn.commit()

## debug

In [35]:
%%sql
select * from stl_load_errors
order by starttime desc

 * postgresql://awsuser:***@dwhcluster.cbu6otbv3egu.us-west-2.redshift.amazonaws.com:5439/dev
17 rows affected.


userid,slice,tbl,starttime,session,query,filename,line_number,colname,type,col_length,position,raw_line,raw_field_value,err_code,err_reason
100,3,101795,2021-05-15 03:05:06.940823,21016,933,s3://immigrationdatamodel/usport.txt,2,,,,4,"ALC,ALCAN,AK",,1202,Extra column(s) found
100,4,101768,2021-05-15 02:58:56.206408,20394,830,s3://immigrationdatamodel/usport.txt,2,port,float8,0.0,0,"ALC,ALCAN,AK",ALC,1207,"Invalid digit, Value 'A', Pos 0, Type: Double"
100,0,101742,2021-05-15 02:55:55.491188,19589,772,s3://immigrationdatamodel/countries.txt,2,i94yr,float8,0.0,0,"582,""MEXICO Air Sea, and Not Reported (I-94, no land arrivals)""","MEXICO Air Sea, and Not Reported (I-94, no land arrivals)",1214,Delimiter not found
100,0,101717,2021-05-15 02:50:01.343425,19165,688,s3://immigrationdatamodel/raw_i94.txt,2,,,,135,"2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,07202016,F,,JL,56582674633.0,00782,WT",,1202,Extra column(s) found
100,4,101701,2021-05-15 02:47:55.029562,19008,654,s3://immigrationdatamodel/raw_i94.txt,2,arrdate,float8,0.0,41,"2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,07202016,F,,JL,56582674633.0,00782,WT",HHW,1207,"Invalid digit, Value 'H', Pos 0, Type: Double"
100,0,101685,2021-05-15 02:45:39.264709,18344,628,s3://immigrationdatamodel/raw_i94.txt,2,,,,92,"2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,07202016,F,,JL,56582674633.0,00782,WT",,1202,Extra column(s) found
100,3,101667,2021-05-15 02:38:33.661189,17461,537,s3://immigrationdatamodel/i94.txt,2,,,,108,0|4084316.0|2016.0|4.0|209.0|209.0|HHW|2016-04-22|1.0|HI|2016-04-29|2.0|20160422||07202016|JL|56582674633.0|00782|WT,,1202,Extra column(s) found
100,7,101665,2021-05-15 02:36:00.966917,17461,507,s3://immigrationdatamodel/i94.txt,2,,,,108,"0,4084316.0,2016.0,4.0,209.0,209.0,HHW,2016-04-22,1.0,HI,2016-04-29,2.0,20160422,,07202016,JL,56582674633.0,00782,WT",,1202,Extra column(s) found
100,7,101649,2021-05-15 02:34:26.652415,17350,475,s3://immigrationdatamodel/i94.txt,2,i94mode,float8,0.0,39,"0,4084316.0,2016.0,4.0,209.0,209.0,HHW,2016-04-22,1.0,HI,2016-04-29,2.0,20160422,,07202016,JL,56582674633.0,00782,WT",2016-04-22,1207,"Invalid digit, Value '-', Pos 4, Type: Double"
100,7,101633,2021-05-15 02:32:23.785991,17055,447,s3://immigrationdatamodel/i94.txt,2,arrdate,date,0.0,35,"0,4084316.0,2016.0,4.0,209.0,209.0,HHW,2016-04-22,1.0,HI,2016-04-29,2.0,20160422,,07202016,JL,56582674633.0,00782,WT",HHW,1205,Invalid Date Format - length must be 10 or more
