Always use the executemany method when inserting rows in DbApiHook as it's way much faster #38715

Merged
Changes from 2 commits
69 commits
0312a8f
refactor: Always use the fast_executemany method when inserting rows …
Apr 3, 2024
219bd43
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 3, 2024
64b7a85
refactor: Only set fast_executemany option if explicitly defined in c…
Apr 3, 2024
c183404
refactor: Fixed tests related to insert_rows for Postgres
Apr 4, 2024
bc3895e
refactor: Fixed assertions on executemany for insert rows in Postgres…
Apr 4, 2024
3af4c7c
refactor: Deprecated bulk_insert_rows in Teradata as the insert_rows …
Apr 4, 2024
ef36ed0
refactor: Removed duplicate calls object definition in test_bulk_inse…
Apr 4, 2024
e222af5
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 4, 2024
d0fd763
refactor: Use DeprecationWarning instead
Apr 4, 2024
4dbdff5
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 4, 2024
42b7ab7
refactor: Use AirflowProviderDeprecationWarning instead
Apr 4, 2024
58d3cfa
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 4, 2024
04d0a47
refactor: Ignore deprecation warnings for TestTeradataHook
Apr 4, 2024
6b0ce23
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 4, 2024
1cb5e5e
refactor: Re-added check on rows parameter for bulk_insert_rows metho…
Apr 4, 2024
57318a9
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 4, 2024
4831868
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 4, 2024
6bb724d
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 4, 2024
35580fe
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 4, 2024
3a9fcbf
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 4, 2024
1e577c7
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 6, 2024
96f5019
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 8, 2024
b64351f
refactor: Fixed logging statement regarding total rows in insert_rows…
Apr 8, 2024
92775af
refactor: Changed string values back to int's but adapted expected du…
Apr 8, 2024
1369944
refactor: Added deprecation warning for executemany parameter in inse…
Apr 8, 2024
ac185d2
refactor: Reformatted mocked call for test_bulk_insert_rows_with_comm…
Apr 8, 2024
7216c3e
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 8, 2024
f8d8d92
refactor: Added explicit fast_executemany parameter to constructor of…
Apr 8, 2024
fee8e01
refactor: Reformatted second mocked call in test_bulk_insert_rows_wit…
Apr 8, 2024
39fe36b
refactor: Re-added executemany param to insert_rows method
Apr 8, 2024
7b65015
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 8, 2024
5634360
refactor: Put executemany between single quote to avoid spelling chec…
Apr 8, 2024
cdfeaf2
Revert "refactor: Added deprecation warning for executemany parameter…
Apr 8, 2024
934239a
refactor: Still support original implementation without executemany i…
Apr 8, 2024
b3ad6a6
refactor: Reformatted insert_rows method in DbApiHook
Apr 8, 2024
49e923c
fix: Fixed insert_rows method in DbApiHook and extracted common _seri…
Apr 8, 2024
4e6df4e
fix: Re-added check on fast_executemany in insert_rows method
Apr 8, 2024
a216fe9
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 8, 2024
82ca138
refactor: Put docstring of _closing_supporting_autocommit method on o…
Apr 8, 2024
be17da1
refactor: Added autocommit parameter which is False by default to _cl…
Apr 8, 2024
8e8b692
refactor: Added supports_executemany class variable which allows to s…
Apr 8, 2024
4d6d240
refactor: test_insert_rows should use executemany for Postgres
Apr 8, 2024
564b5ef
refactor: Deprecation warning for test_insert_rows_replace_executeman…
Apr 8, 2024
e6dd1a5
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 8, 2024
215d719
refactor: Deprecation warning for test_insert_rows_executemany should…
Apr 8, 2024
45b3a94
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 8, 2024
161f3d2
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 9, 2024
8566316
refactor: Removed fast_executemany
Apr 9, 2024
74e0659
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 9, 2024
ec16892
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 9, 2024
65d2ae7
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 9, 2024
4590409
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 9, 2024
f6fc1be
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 9, 2024
4b9e47b
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 10, 2024
d2b0629
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 10, 2024
5580451
refactor: Removed unused fast_executemany param from DbApiHook constr…
Apr 11, 2024
b242539
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 11, 2024
0250d49
refactor: Removed placeholder attribute from DbApiHook interface
Apr 11, 2024
e17aca5
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 11, 2024
15f76dc
Revert unnecessary format change
uranusjr Apr 11, 2024
7814ee3
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 11, 2024
65f0518
docs: Updated docstring for excutemany parameter in insert_rows method
dabla Apr 11, 2024
8850f82
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 11, 2024
3b029e1
refactor: Renamed _closing_supporting_autocommit method to _create_au…
Apr 11, 2024
85536d9
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 11, 2024
b9ccfc1
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 11, 2024
1d295c0
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 11, 2024
ad65b9b
Use comprehension instead of map
uranusjr Apr 11, 2024
1c58f99
Merge branch 'main' into feature/sql-performance-enhancement-insertmany
dabla Apr 11, 2024
44 changes: 12 additions & 32 deletions airflow/providers/common/sql/hooks/sql.py
@@ -519,8 +519,6 @@ def insert_rows(
         target_fields=None,
         commit_every=1000,
         replace=False,
-        *,
-        executemany=False,
         **kwargs,
     ):
         """Insert a collection of tuples into a table.
@@ -534,8 +532,6 @@ def insert_rows(
         :param commit_every: The maximum number of rows to insert in one
             transaction. Set to 0 to insert all rows in one transaction.
         :param replace: Whether to replace instead of insert
-        :param executemany: Insert all rows at once in chunks defined by the commit_every parameter, only
-            works if all rows have same number of column names but leads to better performance
         """
         i = 0
         with closing(self.get_conn()) as conn:
@@ -545,35 +541,19 @@ def insert_rows(
             conn.commit()

             with closing(conn.cursor()) as cur:
-                if executemany:
-                    for chunked_rows in chunked(rows, commit_every):
-                        values = list(
-                            map(
-                                lambda row: tuple(map(lambda cell: self._serialize_cell(cell, conn), row)),
-                                chunked_rows,
-                            )
+                for chunked_rows in chunked(rows, commit_every):
+                    values = list(
+                        map(
+                            lambda row: tuple(map(lambda cell: self._serialize_cell(cell, conn), row)),
+                            chunked_rows,
                         )
-                        sql = self._generate_insert_sql(table, values[0], target_fields, replace, **kwargs)
-                        self.log.debug("Generated sql: %s", sql)
-                        cur.fast_executemany = True
-                        cur.executemany(sql, values)
-                        conn.commit()
-                        self.log.info("Loaded %s rows into %s so far", len(chunked_rows), table)
-                else:
-                    for i, row in enumerate(rows, 1):
-                        lst = []
-                        for cell in row:
-                            lst.append(self._serialize_cell(cell, conn))
-                        values = tuple(lst)
-                        sql = self._generate_insert_sql(table, values, target_fields, replace, **kwargs)
-                        self.log.debug("Generated sql: %s", sql)
-                        cur.execute(sql, values)
-                        if commit_every and i % commit_every == 0:
-                            conn.commit()
-                            self.log.info("Loaded %s rows into %s so far", i, table)
-
-                if not executemany:
-                    conn.commit()
+                    )
+                    sql = self._generate_insert_sql(table, values[0], target_fields, replace, **kwargs)
+                    self.log.debug("Generated sql: %s", sql)
+                    cur.fast_executemany = True
+                    cur.executemany(sql, values)
+                    conn.commit()
+                    self.log.info("Loaded %s rows into %s so far", len(chunked_rows), table)
         self.log.info("Done loading. Loaded a total of %s rows into %s", i, table)

     @staticmethod
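For readers who want to try the batching pattern from the hunk above outside Airflow, here is a minimal sketch. It is not the hook's implementation: it assumes the standard-library `sqlite3` driver (so the placeholder is `?`), a local `chunked` helper standing in for the one the diff calls, and a hypothetical function name `insert_rows_batched`. It also assumes `commit_every` is a positive integer and does not cover the `commit_every=0` case mentioned in the docstring.

```python
import sqlite3
from contextlib import closing
from itertools import islice


def chunked(iterable, size):
    """Yield lists of at most `size` items (local stand-in, an assumption)."""
    iterator = iter(iterable)
    while chunk := list(islice(iterator, size)):
        yield chunk


def insert_rows_batched(conn, table, rows, commit_every=1000):
    """Hypothetical helper: one executemany call and one commit per batch."""
    total = 0
    with closing(conn.cursor()) as cur:
        for batch in chunked(rows, commit_every):
            values = [tuple(row) for row in batch]
            # All rows in a batch must have the same number of columns,
            # because a single statement is generated from the first row.
            placeholders = ", ".join("?" for _ in values[0])
            sql = f"INSERT INTO {table} VALUES ({placeholders})"
            cur.executemany(sql, values)
            conn.commit()
            total += len(values)
    return total


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE greetings (word TEXT)")
loaded = insert_rows_batched(
    conn, "greetings", [("hello",), ("world",), ("again",)], commit_every=2
)
count = conn.execute("SELECT COUNT(*) FROM greetings").fetchone()[0]
```

Unlike the hunk above, this sketch accumulates a running total per batch, so the returned count reflects every row that was sent to the driver.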
31 changes: 8 additions & 23 deletions tests/providers/common/sql/hooks/test_dbapi.py
@@ -21,6 +21,7 @@
 from unittest import mock

 import pytest
+from pyodbc import Cursor

 from airflow.hooks.base import BaseHook
 from airflow.models import Connection
@@ -39,7 +40,7 @@ class TestDbApiHook:
     def setup_method(self, **kwargs):
         self.cur = mock.MagicMock(
             rowcount=0,
-            spec=["description", "rowcount", "execute", "executemany", "fetchall", "fetchone", "close"],
+            spec=Cursor,
         )
         self.conn = mock.MagicMock()
         self.conn.cursor.return_value = self.cur
@@ -110,8 +111,7 @@ def test_insert_rows(self):
         assert commit_count == self.conn.commit.call_count

         sql = f"INSERT INTO {table} VALUES (%s)"
-        for row in rows:
-            self.cur.execute.assert_any_call(sql, row)
+        self.cur.executemany.assert_any_call(sql, rows)

     def test_insert_rows_replace(self):
         table = "table"
@@ -126,8 +126,7 @@ def test_insert_rows_replace(self):
         assert commit_count == self.conn.commit.call_count

         sql = f"REPLACE INTO {table} VALUES (%s)"
-        for row in rows:
-            self.cur.execute.assert_any_call(sql, row)
+        self.cur.executemany.assert_any_call(sql, rows)

     def test_insert_rows_target_fields(self):
         table = "table"
@@ -143,8 +142,7 @@ def test_insert_rows_target_fields(self):
         assert commit_count == self.conn.commit.call_count

         sql = f"INSERT INTO {table} ({target_fields[0]}) VALUES (%s)"
-        for row in rows:
-            self.cur.execute.assert_any_call(sql, row)
+        self.cur.executemany.assert_any_call(sql, rows)

     def test_insert_rows_commit_every(self):
         table = "table"
@@ -156,32 +154,19 @@ def test_insert_rows_commit_every(self):
         assert self.conn.close.call_count == 1
         assert self.cur.close.call_count == 1

-        commit_count = 2 + divmod(len(rows), commit_every)[0]
+        commit_count = len(rows) + 1
         assert commit_count == self.conn.commit.call_count

         sql = f"INSERT INTO {table} VALUES (%s)"
         for row in rows:
-            self.cur.execute.assert_any_call(sql, row)
-
-    def test_insert_rows_executemany(self):
-        table = "table"
-        rows = [("hello",), ("world",)]
-
-        self.db_hook.insert_rows(table, rows, executemany=True)
-
-        assert self.conn.close.call_count == 1
-        assert self.cur.close.call_count == 1
-        assert self.conn.commit.call_count == 2
-
-        sql = f"INSERT INTO {table} VALUES (%s)"
-        self.cur.executemany.assert_any_call(sql, rows)
+            self.cur.executemany.assert_any_call(sql, [row])

     def test_insert_rows_replace_executemany_hana_dialect(self):
         self.setup_method(replace_statement_format="UPSERT {} {} VALUES ({}) WITH PRIMARY KEY")
         table = "table"
         rows = [("hello",), ("world",)]

-        self.db_hook.insert_rows(table, rows, replace=True, executemany=True)
+        self.db_hook.insert_rows(table, rows, replace=True)

         assert self.conn.close.call_count == 1
         assert self.cur.close.call_count == 1
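The assertion style used in these tests can be tried in isolation. The sketch below is only an illustration: `load_in_batches` is a hypothetical stand-in for the hook method, and the cursor mock uses a list-based `spec` (as the tests did before this change) so the example does not need `pyodbc` installed. With a batch size of 1, each row reaches `executemany` as a one-element list, which is why the expected argument is `[row]` rather than `row`.

```python
from unittest import mock


def load_in_batches(conn, sql, rows, commit_every):
    """Hypothetical stand-in: one executemany and one commit per batch."""
    cur = conn.cursor()
    for start in range(0, len(rows), commit_every):
        cur.executemany(sql, rows[start : start + commit_every])
        conn.commit()
    cur.close()


# Restricting the mock to these attributes means an accidental call to
# cur.execute would raise AttributeError instead of passing silently.
cur = mock.MagicMock(spec=["executemany", "close"])
conn = mock.MagicMock()
conn.cursor.return_value = cur

rows = [("hello",), ("world",)]
sql = "INSERT INTO table VALUES (%s)"
load_in_batches(conn, sql, rows, commit_every=1)

for row in rows:
    cur.executemany.assert_any_call(sql, [row])
```

The stand-in commits once per batch only. The hook in the diff also commits once before the loop, which is where the extra `+ 1` in the expected `commit_count` comes from.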