-
Notifications
You must be signed in to change notification settings - Fork 0
/
sql_queries.py
220 lines (187 loc) · 9.06 KB
/
sql_queries.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
import configparser
# CONFIG
config = configparser.ConfigParser()
config.read('dwh.cfg')
"""
Create all the tables
"""
stagging_immigration_table_create= ("""Create table IF NOT EXISTS immigration_stagging
(
cicid DOUBLE PRECISION ,
i94yr DOUBLE PRECISION ,
i94mon DOUBLE PRECISION ,
i94cit DOUBLE PRECISION ,
i94res DOUBLE PRECISION ,
i94port varchar(254),
arrdate DOUBLE PRECISION ,
i94mode DOUBLE PRECISION ,
i94addr varchar(254),
depdate DOUBLE PRECISION ,
i94bir DOUBLE PRECISION ,
i94visa DOUBLE PRECISION ,
count DOUBLE PRECISION ,
dtadfile varchar(254),
visapost varchar(254),
occup varchar(254),
entdepa varchar(254),
entdepd varchar(254),
entdepu varchar(254),
matflag varchar(254),
biryear DOUBLE PRECISION ,
dtaddto varchar(254),
gender varchar(1),
insnum varchar(254),
airline varchar(254),
admnum DOUBLE PRECISION ,
fltno varchar(254),
visatype varchar(254),
usa_states varchar(254),
country varchar(254),
converted_date timestamp,
string_date varchar(254),
year BIGINT,
month BIGINT,
country_code BIGINT
)
""")
stagging_usa_state_table_create = ("""CREATE TABLE IF NOT EXISTS usa_demographic_stagging(
City varchar(254),
State varchar(254),
Median_Age DOUBLE PRECISION,
Male_Population DOUBLE PRECISION,
Female_Population DOUBLE PRECISION,
Total_Population int,
Number_of_Veterans DOUBLE PRECISION,
Foreign_born DOUBLE PRECISION,
Average_Household_Size DOUBLE PRECISION,
State_Code varchar(254),
Race varchar(254),
Count int
)
""")
dim_visa_create = (""" create table IF NOT EXISTS dim_visa
(
visa_id INT IDENTITY(1,1) sortkey,
visa_type varchar(254),
PRIMARY KEY (visa_id)
)diststyle all
""")
dim_date_create = (""" create table IF NOT EXISTS dim_date
(
date_id INT IDENTITY(1,1) sortkey ,
immigration_date varchar(254),
month DOUBLE PRECISION,
year DOUBLE PRECISION ,
PRIMARY KEY (date_id)
)diststyle all
""")
dim_usa_state_create = (""" create table IF NOT EXISTS dim_usa_state
(
usa_state_id INT IDENTITY(1,1) sortkey,
state_code varchar(254) ,
state varchar(254),
PRIMARY KEY (usa_state_id)
)diststyle all
""")
dim_country_create = (""" create table IF NOT EXISTS dim_country
(
country_id INT sortkey,
country varchar(254),
PRIMARY KEY (country_id)
)diststyle all
""")
fact_immigration_create = (""" create table IF NOT EXISTS fact_immigration
(
cicid DOUBLE PRECISION sortkey,
country_id INT REFERENCES dim_country(country_id),
usa_state_id INT REFERENCES dim_usa_state(usa_state_id),
date_id INT REFERENCES dim_date(date_id),
visa_id INT REFERENCES dim_visa(visa_id) distkey,
PRIMARY KEY (cicid)
)
""")
# STAGING TABLES
"""
Load the data from S3 into staging tables (Redshift)
"""
staging_immigration_copy = (""" copy immigration_stagging from '{}'
credentials 'aws_iam_role={}'
FORMAT AS PARQUET
""").format(config.get('S3','immmigration_data'),
config.get('IAM_ROLE', 'ARN')
)
staging_usa_demographic_copy = (""" copy usa_demographic_stagging from '{}'
credentials 'aws_iam_role={}'
CSV
""").format(config.get('S3','demographic_data'),
config.get('IAM_ROLE', 'ARN')
)
# FINAL TABLES
"""
Insert the data from Staging table into fact and dimnesion table .
"""
dim_visa_table_insert = (
"""
INSERT INTO dim_visa (visa_type)
SELECT DISTINCT visatype
FROM immigration_stagging a
WHERE a.visatype NOT IN (SELECT c.visa_type FROM dim_visa c)
"""
)
dim_date_table_insert = (""" INSERT INTO dim_date (immigration_date,year, month)
SELECT DISTINCT
a.string_date,
a.year,
a.month
FROM immigration_stagging a
WHERE a.string_date NOT IN (SELECT immigration_date FROM dim_date)
""")
dim_country_table_insert = (""" INSERT INTO dim_country (country_id,country)
SELECT DISTINCT
a.country_code,
a.country
FROM immigration_stagging a
WHERE a.country_code NOT IN (SELECT country_id FROM dim_country)
""")
dim_usa_state_table_insert = (""" INSERT INTO dim_usa_state (state_code, state)
SELECT DISTINCT
a.state_code,
a.state
FROM usa_demographic_stagging a
WHERE a.state_code NOT IN (SELECT state_code FROM dim_usa_state)
""")
dim_others_usa_state_table_insert = (""" INSERT INTO dim_usa_state (state_code, state)
values
('Other','Other')
""")
fact_immigration_table_insert = (""" INSERT INTO fact_immigration
(
cicid,
country_id,
usa_state_id,
date_id,
visa_id
)
SELECT DISTINCT
a.cicid,
d.country_id,
e.usa_state_id,
c.date_id,
b.visa_id
FROM immigration_stagging a
,dim_visa b
,dim_date c
,dim_country d
,dim_usa_state e
WHERE a.visatype = b.visa_type
and a.string_date = c.immigration_date
and a.country = d.country
and a.usa_states = e.state_code
and a.cicid NOT IN (SELECT cicid FROM fact_immigration)
""")
validate_immigration_stagging_table = (""" SELECT COUNT(*) FROM immigration_stagging """)
validate_fact_table = (""" SELECT COUNT(*) FROM fact_immigration """)
# QUERY LISTS
create_table_queries = [stagging_immigration_table_create, stagging_usa_state_table_create,dim_visa_create,dim_date_create,dim_usa_state_create,dim_country_create,fact_immigration_create]
copy_table_queries = [staging_immigration_copy, staging_usa_demographic_copy]
insert_table_queries = [ dim_visa_table_insert, dim_date_table_insert, dim_country_table_insert, dim_usa_state_table_insert,dim_others_usa_state_table_insert,fact_immigration_table_insert]