etl.py
import os
import logging as log
import configparser
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id

# my modules
import wrangle

# set up logging
logger = log.getLogger()
logger.setLevel(log.INFO)

# AWS configuration
config = configparser.ConfigParser()
config.read("configuration.cfg", encoding="utf-8-sig")

os.environ["AWS_ACCESS_KEY_ID"] = config["AWS"]["AWS_ACCESS_KEY_ID"]
os.environ["AWS_SECRET_ACCESS_KEY"] = config["AWS"]["AWS_SECRET_ACCESS_KEY"]

SOURCE = config["S3"]["SOURCE_S3_BUCKET"]
DESTINATION = config["S3"]["DEST_S3_BUCKET"]
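# The configuration.cfg read above is assumed to look roughly like this
# (placeholder values only; never commit real credentials):
#
#   [AWS]
#   AWS_ACCESS_KEY_ID = <your-access-key-id>
#   AWS_SECRET_ACCESS_KEY = <your-secret-access-key>
#
#   [S3]
#   SOURCE_S3_BUCKET = s3a://<source-bucket>/
#   DEST_S3_BUCKET = s3a://<destination-bucket>/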


# data processing functions
def create_spark_session():
    """Create (or retrieve) a SparkSession with Hive support and the
    hadoop-aws package for S3 access."""
    # Alternative config for reading raw SAS files with the saurfang package:
    # spark = (
    #     SparkSession.builder.config(
    #         "spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11"
    #     )
    #     .enableHiveSupport()
    #     .getOrCreate()
    # )
    spark = (
        SparkSession.builder.config(
            "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
        )
        .enableHiveSupport()
        .getOrCreate()
    )
    return spark
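
# Note: if a single session needs both the SAS reader and S3 access,
# spark.jars.packages accepts a comma-separated list of Maven coordinates
# (a sketch, not verified against this repo's cluster setup):
#
#   .config(
#       "spark.jars.packages",
#       "saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.0",
#   )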


def process_immigration_data(spark, input_data, output_data):
    """Process immigration data to create the fact_immigration,
    dim_immigration_personal and dim_immigration_airline tables.

    Arguments:
        spark {object}: The SparkSession object.
        input_data {str}: Source S3 endpoint.
        output_data {str}: Target S3 endpoint.

    Returns:
        None
    """
log.info("Start processing immigration dataset")
    # load the immigration data files (SAS format)
    immigration_data_path = os.path.join(
        input_data, "18-83510-I94-Data-2016/*.sas7bdat"
    )
    try:
        df = spark.read.format("com.github.saurfang.sas.spark").load(
            immigration_data_path
        )
    except Exception:
        # fall back to a local parquet copy when the SAS reader is unavailable
        df = spark.read.parquet("sas_data")
df = wrangle.drop_immigration_empty_columns(df)
log.info("Start processing fact_immigration table")
# extract columns to create fact_immigration table
fact_immigration = (
df.select(
"cicid",
"i94yr",
"i94mon",
"i94addr",
"i94port",
"depdate",
"arrdate",
"i94visa",
"i94mode",
)
.distinct()
.withColumn("immigration_id", monotonically_increasing_id())
)
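    # Note: monotonically_increasing_id() guarantees unique, monotonically
    # increasing ids, but not consecutive ones; gaps between ids are expected.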
# wrangling data to match data model
fact_immigration = wrangle.fact_immigration(fact_immigration)
    # writing fact_immigration table to parquet files partitioned by state_code
fact_immigration.write.mode("overwrite").partitionBy("state_code").parquet(
path=output_data + "fact_immigration"
)
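    # partitionBy("state_code") writes one sub-directory per state, so
    # downstream per-state reads can prune partitions instead of scanning
    # the whole table.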
log.info("Start processing dim_immigration_personal")
# extract columns to create dim_immigration_personal table
dim_immigration_personal = (
df.select("cicid", "i94cit", "i94res", "biryear", "gender")
.distinct()
.withColumn("immi_personal_id", monotonically_increasing_id())
)
# data wrangling to match data model
dim_immigration_personal = wrangle.dim_immigration_personal(
dim_immigration_personal
)
# writing dim_immigration_personal table to parquet files
dim_immigration_personal.write.mode("overwrite").parquet(
path=output_data + "dim_immigration_personal"
)
log.info("Start processing dim_immigration_airline")
# extract columns to create dim_immigration_airline table
dim_immigration_airline = (
df.select("cicid", "airline", "admnum", "fltno", "visatype")
.distinct()
.withColumn("immi_airline_id", monotonically_increasing_id())
)
# data wrangling to match data model
dim_immigration_airline = wrangle.dim_immigration_airline(dim_immigration_airline)
# writing dim_immigration_airline table to parquet files
dim_immigration_airline.write.mode("overwrite").parquet(
path=output_data + "dim_immigration_airline"
)


def process_label_descriptions(spark, input_data, output_data):
    """Parse the SAS labels description file to extract country, city and
    state code mappings.

    Arguments:
        spark {object}: The SparkSession object.
        input_data {str}: Source S3 endpoint.
        output_data {str}: Target S3 endpoint.

    Returns:
        None
    """
log.info("Start processing label descriptions")
    label_file_path = os.path.join(input_data, "I94_SAS_Labels_Descriptions.SAS")
with open(label_file_path) as file:
contents = file.readlines()
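    # The labels file holds SAS value-mapping blocks; an illustrative (not
    # verbatim) excerpt:
    #
    #   value i94cntyl
    #      582 = 'MEXICO'
    #      236 = 'AFGHANISTAN'
    #
    # wrangle.country_code / city_code / state_code are assumed to parse such
    # blocks into {code: name} dicts.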
country_code = wrangle.country_code(contents)
spark.createDataFrame(country_code.items(), ["code", "country"]).write.mode(
"overwrite"
).parquet(path=output_data + "country_code")
city_code = wrangle.city_code(contents)
spark.createDataFrame(city_code.items(), ["code", "city"]).write.mode(
"overwrite"
).parquet(path=output_data + "city_code")
state_code = wrangle.state_code(contents)
spark.createDataFrame(state_code.items(), ["code", "state"]).write.mode(
"overwrite"
).parquet(path=output_data + "state_code")


def process_temperature_data(spark, input_data, output_data):
    """Process temperature data to create the dim_temperature table.

    Arguments:
        spark {object}: The SparkSession object.
        input_data {str}: Source S3 endpoint.
        output_data {str}: Target S3 endpoint.

    Returns:
        None
    """
log.info("Start processing dim_temperature table")
    # read the temperature data file
    temperature_data_path = os.path.join(
        input_data, "GlobalLandTemperaturesByCity.csv"
    )
    df = spark.read.csv(temperature_data_path, header=True)
    # keep only United States records
    df = df.where(df["Country"] == "United States")
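    # Note: spark.read.csv with header=True still loads every column as a
    # string; pass inferSchema=True or cast explicitly if numeric types are
    # needed downstream (presumably handled inside wrangle.dim_temperature).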
df = wrangle.drop_temperature_nulls_and_duplicates_rows(df)
dim_temperature = df.select(
["dt", "AverageTemperature", "AverageTemperatureUncertainty", "City", "Country"]
).distinct()
dim_temperature = wrangle.dim_temperature(dim_temperature)
# writing dim_temperature table to parquet files.
dim_temperature.write.mode("overwrite").parquet(
path=output_data + "dim_temperature"
)


def process_demography_data(spark, input_data, output_data):
    """Process demography data to create the dim_demog_population and
    dim_demog_statistics tables.

    Arguments:
        spark {object}: The SparkSession object.
        input_data {str}: Source S3 endpoint.
        output_data {str}: Target S3 endpoint.

    Returns:
        None
    """
log.info("Start processing dim_demog_populaiton table.")
# read demography data file
    demog_data_path = os.path.join(input_data, "us-cities-demographics.csv")
df = (
spark.read.format("csv")
.options(header=True, delimiter=";")
.load(demog_data_path)
)
df = wrangle.drop_demographics_nulls_and_duplicates_rows(df)
dim_demog_population = (
df.select(
[
"City",
"State",
"Male Population",
"Female Population",
"Number of Veterans",
"Foreign-born",
"Race",
]
)
.distinct()
.withColumn("demog_pop_id", monotonically_increasing_id())
)
dim_demog_population = wrangle.dim_demog_population(dim_demog_population)
# writing dim_demog_population table to parquet files.
dim_demog_population.write.mode("overwrite").parquet(
path=output_data + "dim_demog_population"
)
log.info("Start processing dim_demog_statistics")
dim_demog_statistics = (
df.select(["City", "State", "Median Age", "Average Household Size"])
.distinct()
.withColumn("demog_stat_id", monotonically_increasing_id())
)
dim_demog_statistics = wrangle.dim_demog_statistics(dim_demog_statistics)
# writing dim_demog_statistics table to parquet files.
dim_demog_statistics.write.mode("overwrite").parquet(
path=output_data + "dim_demog_statistics"
)


def process_datasets(spark, input_data, output_data):
    """Run all processing steps against the given S3 endpoints."""
    process_immigration_data(spark, input_data, output_data)
    process_label_descriptions(spark, input_data, output_data)
    process_temperature_data(spark, input_data, output_data)
    process_demography_data(spark, input_data, output_data)


def process_datasets_local(spark):
    """Run all processing steps against local sample data instead of S3."""
    log.info("Processing datasets from local workspace...")
    output_data = "output_data/"
    input_data = "../../data"
    process_immigration_data(spark, input_data, output_data)
    process_label_descriptions(spark, input_data, output_data)
    input_data = "../../data2"
    process_temperature_data(spark, input_data, output_data)
    input_data = "sample_data"
    process_demography_data(spark, input_data, output_data)


def main():
    spark = create_spark_session()
    input_data = SOURCE
    output_data = DESTINATION
    # run against S3:
    # process_datasets(spark, input_data, output_data)
    process_datasets_local(spark)
    log.info("Data processing completed!")


if __name__ == "__main__":
    main()
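
# Typical invocation (a sketch; exact flags depend on the cluster setup):
#
#   spark-submit --packages org.apache.hadoop:hadoop-aws:2.7.0 etl.py
#
# with configuration.cfg present in the working directory.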