-
Notifications
You must be signed in to change notification settings - Fork 25
/
etl.py
304 lines (253 loc) · 11.8 KB
/
etl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
"""
Functions for transferring data in and out of databases.
"""
from itertools import zip_longest
import logging
from warnings import warn
from etlhelper.row_factories import namedtuple_rowfactory
from etlhelper.db_helper_factory import DB_HELPER_FACTORY
from etlhelper.exceptions import (
ETLHelperExtractError,
ETLHelperInsertError,
ETLHelperQueryError,
)
logger = logging.getLogger('etlhelper')
CHUNKSIZE = 5000
# iter_chunks is where data are retrieved from the source database.
# All data extraction processes call this function.
def iter_chunks(select_query, conn, parameters=(),
                row_factory=namedtuple_rowfactory,
                transform=None, read_lob=False):
    """
    Run SQL query against connection and return iterator object to loop over
    results in batches of etlhelper.etl.CHUNKSIZE (default 5000).

    The row_factory changes the output format of the results.  Other row
    factories e.g. dict_rowfactory are available.

    The transform function is applied to chunks of data as they are extracted
    from the database.

    The read_lob parameter will convert Oracle LOB objects to strings.  It is
    required to access results of some Oracle Spatial functions.

    :param select_query: str, SQL query to execute
    :param conn: dbapi connection
    :param parameters: sequence or dict of bind variables to insert in the query
    :param row_factory: function that accepts a cursor and returns a function
                        for parsing each row
    :param transform: function that accepts an iterable (e.g. list) of rows and
                      returns an iterable of rows (possibly of different shape)
    :param read_lob: bool, convert Oracle LOB objects to strings
    :raises ETLHelperExtractError: if the query fails to execute
    """
    logger.info("Fetching rows")
    logger.debug(f"Fetching:\n\n{select_query}\n\nwith parameters:\n\n"
                 f"{parameters}\n\nagainst\n\n{conn}")

    helper = DB_HELPER_FACTORY.from_conn(conn)
    with helper.cursor(conn) as cursor:
        # Run query
        try:
            cursor.execute(select_query, parameters)
        except helper.sql_exceptions as exc:
            # Even though we haven't modified data, we have to rollback to
            # clear the failed transaction before any others can be started.
            conn.rollback()
            msg = f"SQL query raised an error.\n\n{select_query}\n\n{exc}\n"
            # Chain the driver exception so its traceback is not lost
            raise ETLHelperExtractError(msg) from exc

        # Set row factory
        create_row = row_factory(cursor)

        # Parse results
        first_pass = True
        while True:
            rows = cursor.fetchmany(CHUNKSIZE)

            # No more rows to process
            if not rows:
                if first_pass:
                    msg = "No rows returned"
                else:
                    if cursor.rowcount == -1:
                        # SQLite3 driver doesn't support row count (always -1)
                        msg = "All rows returned"
                    else:
                        msg = f"{cursor.rowcount} rows returned"
                logger.info(msg)
                return

            # Convert Oracle LOBs to strings if required
            if read_lob:
                rows = _read_lob(rows)

            # Apply row_factory
            rows = (create_row(row) for row in rows)

            # Apply transform
            if transform:
                rows = transform(rows)

            # Return data
            yield rows
            first_pass = False
def iter_rows(select_query, conn, parameters=(),
              row_factory=namedtuple_rowfactory,
              transform=None, read_lob=False):
    """
    Run SQL query against connection and return iterator object to loop over
    results, row-by-row.  Thin wrapper that flattens the chunked output of
    iter_chunks.

    :param select_query: str, SQL query to execute
    :param conn: dbapi connection
    :param row_factory: function that accepts a cursor and returns a function
                        for parsing each row
    :param parameters: sequence or dict of bind variables to insert in the query
    :param transform: function that accepts an iterable (e.g. list) of rows and
                      returns an iterable of rows (possibly of different shape)
    :param read_lob: bool, convert Oracle LOB objects to strings
    """
    chunks = iter_chunks(select_query, conn, row_factory=row_factory,
                         parameters=parameters, transform=transform,
                         read_lob=read_lob)
    for chunk in chunks:
        yield from chunk
def get_rows(select_query, conn, parameters=(),
             row_factory=namedtuple_rowfactory, transform=None):
    """
    Get results of query as a list.  See iter_rows for details.

    :param select_query: str, SQL query to execute
    :param conn: dbapi connection
    :param row_factory: function that accepts a cursor and returns a function
                        for parsing each row
    :param parameters: sequence or dict of bind variables to insert in the query
    :param transform: function that accepts an iterable (e.g. list) of rows and
                      returns an iterable of rows (possibly of different shape)
    """
    row_iterator = iter_rows(select_query, conn, row_factory=row_factory,
                             parameters=parameters, transform=transform)
    return list(row_iterator)
def dump_rows(select_query, conn, output_func, parameters=(),
              row_factory=namedtuple_rowfactory, transform=None):
    """
    Call output_func(row) one-by-one on results of query.  See iter_rows for
    details.

    :param select_query: str, SQL query to execute
    :param conn: dbapi connection
    :param output_func: function to be called for each row
    :param row_factory: function that accepts a cursor and returns a function
                        for parsing each row
    :param parameters: sequence or dict of bind variables to insert in the query
    :param transform: function that accepts an iterable (e.g. list) of rows and
                      returns an iterable of rows (possibly of different shape)
    """
    row_iterator = iter_rows(select_query, conn, row_factory=row_factory,
                             parameters=parameters, transform=transform)
    for row in row_iterator:
        output_func(row)
def executemany(query, rows, conn, commit_chunks=True):
    """
    Use query to insert/update data from rows to database at conn.  This
    method uses the executemany or execute_batch (PostgreSQL) commands to
    process the data in chunks and avoid creating a new database connection
    for each row.  Row data are passed as parameters into query.

    commit_chunks controls whether the transaction should be committed after
    each chunk has been inserted.  Committing chunks means that errors during
    a long-running insert do not require all data to be loaded again.  The
    disadvantage is that investigation may be required to determine exactly
    which records have been successfully transferred.

    :param query: str, SQL insert command with placeholders for data
    :param rows: List of tuples containing data to be inserted/updated
    :param conn: dbapi connection
    :param commit_chunks: bool, commit after each chunk has been inserted/updated
    :return row_count: int, number of rows inserted/updated
    :raises ETLHelperInsertError: if the insert/update fails
    """
    msg = ("executemany parameter order will be changed in a future release to "
           "(query, conn, rows). "
           "Avoid breaking code by using named parameters for all e.g. "
           "executemany(query=my_query, conn=my_conn, rows=my_rows)")
    # stacklevel=2 attributes the warning to the caller's line, not this one
    warn(msg, DeprecationWarning, stacklevel=2)

    logger.info(f"Executing many (chunksize={CHUNKSIZE})")
    logger.debug(f"Executing:\n\n{query}\n\nagainst\n\n{conn}")

    helper = DB_HELPER_FACTORY.from_conn(conn)
    processed = 0
    with helper.cursor(conn) as cursor:
        for chunk in _chunker(rows, CHUNKSIZE):
            # Run query
            try:
                # Chunker pads to whole chunk with None; remove these
                chunk = [row for row in chunk if row is not None]

                # Show first row as example of data
                if processed == 0:
                    logger.debug(f"First row: {chunk[0]}")

                # Execute query
                helper.executemany(cursor, query, chunk)
                processed += len(chunk)

            except helper.sql_exceptions as exc:
                # Rollback to clear the failed transaction before any others
                # can be started.
                conn.rollback()
                msg = f"SQL query raised an error.\n\n{query}\n\n{exc}\n"
                # Chain the driver exception so its traceback is not lost
                raise ETLHelperInsertError(msg) from exc

            logger.info(
                f'{processed} rows processed')

            # Commit changes so far
            if commit_chunks:
                conn.commit()

    # Commit changes where not already committed
    if not commit_chunks:
        conn.commit()

    logger.info(f'{processed} rows processed in total')
    # Return the count promised by the docstring (previously returned None)
    return processed
def copy_rows(select_query, source_conn, insert_query, dest_conn,
              parameters=(), transform=None, commit_chunks=True,
              read_lob=False):
    """
    Copy rows from source_conn to dest_conn.  select_query and insert_query
    specify the data to be transferred.

    Note: ODBC driver requires separate connections for source_conn and
    dest_conn, even if they represent the same database.

    Geometry columns from Oracle spatial should be selected with:
      SDO_UTIL.TO_WKTGEOMETRY(shape_bng) AS geom_wkt
    and inserted into PostGIS with:
      ST_GeomFromText(%s, 27700)

    :param select_query: str, select rows from Oracle.
    :param source_conn: open dbapi connection
    :param insert_query: str, insert rows into destination
    :param dest_conn: open dbapi connection
    :param parameters: sequence or dict of bind variables for select query
    :param transform: function that accepts an iterable (e.g. list) of rows and
                      returns an iterable of rows (possibly of different shape)
    :param commit_chunks: bool, commit after each chunk (see executemany)
    :param read_lob: bool, convert Oracle LOB objects to strings
    """
    # Rows are streamed lazily from the source so the full result set is
    # never held in memory at once.
    source_rows = iter_rows(select_query, source_conn,
                            parameters=parameters, transform=transform,
                            read_lob=read_lob)
    executemany(insert_query, source_rows, dest_conn,
                commit_chunks=commit_chunks)
def execute(query, conn, parameters=()):
    """
    Run SQL query against connection.

    :param query: str, SQL query to execute
    :param conn: dbapi connection
    :param parameters: sequence or dict of bind variables to insert in the query
    :raises ETLHelperQueryError: if the query fails to execute
    """
    logger.info("Executing query")
    logger.debug(f"Executing:\n\n{query}\n\nwith parameters:\n\n"
                 f"{parameters}\n\nagainst\n\n{conn}")

    helper = DB_HELPER_FACTORY.from_conn(conn)
    with helper.cursor(conn) as cursor:
        # Run query
        try:
            cursor.execute(query, parameters)
            conn.commit()
        except helper.sql_exceptions as exc:
            # Even though we haven't modified data, we have to rollback to
            # clear the failed transaction before any others can be started.
            conn.rollback()
            msg = f"SQL query raised an error.\n\n{query}\n\n{exc}\n"
            # Chain the driver exception so its traceback is not lost
            raise ETLHelperQueryError(msg) from exc
def _chunker(iterable, n_chunks, fillvalue=None):
"""Collect data into fixed-length chunks or blocks.
Code from recipe at https://docs.python.org/3.6/library/itertools.html
"""
# _chunker('ABCDEFG', 3, 'x') --> ABC DEF Gxx"
args = [iter(iterable)] * n_chunks
return zip_longest(*args, fillvalue=fillvalue)
def _read_lob(rows):
"""
Replace Oracle LOB objects within rows with their string representation.
:param rows: list of tuples of output data
:return: list of tuples with LOB objects converted to strings
"""
clean_rows = []
for row in rows:
clean_row = [x.read() if str(x.__class__) == "<class 'cx_Oracle.LOB'>"
else x for x in row]
clean_rows.append(clean_row)
return clean_rows