From bece6af289109de8be4c5aa97780806ae1db8275 Mon Sep 17 00:00:00 2001 From: Nam Ngo Date: Wed, 4 May 2016 18:21:51 +1000 Subject: [PATCH] Add bulk_insert_rows() for more performant inserts. --- airflow/hooks/oracle_hook.py | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/airflow/hooks/oracle_hook.py b/airflow/hooks/oracle_hook.py index 7cd2e491e868b..d66d90a4951c2 100644 --- a/airflow/hooks/oracle_hook.py +++ b/airflow/hooks/oracle_hook.py @@ -82,3 +82,36 @@ def insert_rows(self, table, rows, target_fields = None, commit_every = 1000): cur.close() conn.close() logging.info('Done loading. Loaded a total of {i} rows'.format(**locals())) + + def bulk_insert_rows(self, table, rows, target_fields=None, commit_every=5000): + """A performant bulk insert for cx_Oracle that uses prepared statements via `executemany()`. + For best performance, pass in `rows` as an iterator. + """ + conn = self.get_conn() + cursor = conn.cursor() + values = ', '.join(':%s' % i for i in range(1, len(target_fields) + 1)) + prepared_stm = 'insert into {tablename} ({columns}) values ({values})'.format( + tablename=table, + columns=', '.join(target_fields), + values=values + ) + row_count = 0 + # Chunk the rows + row_chunk = [] + for row in rows: + row_chunk.append(row) + row_count += 1 + if row_count % commit_every == 0: + cursor.prepare(prepared_stm) + cursor.executemany(None, row_chunk) + conn.commit() + logging.info('[%s] inserted %s rows', table, row_count) + # Empty chunk + row_chunk = [] + # Commit the leftover chunk + cursor.prepare(prepared_stm) + cursor.executemany(None, row_chunk) + conn.commit() + logging.info('[%s] inserted %s rows', table, row_count) + cursor.close() + conn.close()