From 383f853af4a1420a0aa8c8745cd6c1aa10ee128f Mon Sep 17 00:00:00 2001 From: hantmac Date: Wed, 25 Dec 2024 15:36:22 +0800 Subject: [PATCH 1/4] fix: batch insert using attachment --- databend_py/client.py | 2 +- databend_py/uploader.py | 24 +++++++++++++++++------- tests/test_client.py | 12 +++++++++++- 3 files changed, 29 insertions(+), 9 deletions(-) diff --git a/databend_py/client.py b/databend_py/client.py index ec0affc..e442e40 100644 --- a/databend_py/client.py +++ b/databend_py/client.py @@ -147,7 +147,7 @@ def _process_insert_query(self, query, params): for i in range(0, len(params), batch_size) ] insert_rows = len(tuple_ls) - self._uploader.upload_to_table_by_copy(table_name, tuple_ls) + self._uploader.upload_to_table_by_attachment(query, tuple_ls) return insert_rows def _process_ordinary_query( diff --git a/databend_py/uploader.py b/databend_py/uploader.py index f078841..53a2030 100644 --- a/databend_py/uploader.py +++ b/databend_py/uploader.py @@ -10,13 +10,13 @@ class DataUploader: def __init__( - self, - client, - connection, - settings, - default_stage_dir="@~", - debug=False, - compress=False, + self, + client, + connection, + settings, + default_stage_dir="@~", + debug=False, + compress=False, ): # TODO: make it depends on Connection instead of Client self.client = client @@ -34,6 +34,14 @@ def upload_to_table_by_copy(self, table_name, data): self._upload_to_presigned_url(presigned_url, headers, data) self._execute_copy(table_name, stage_path, "CSV") + def upload_to_table_by_attachment(self, sql_statement, data): + if len(data) == 0: + return + stage_path = self._gen_stage_path(self.default_stage_dir) + presigned_url, headers = self._execute_presign(stage_path) + self._upload_to_presigned_url(presigned_url, headers, data) + self._execute_with_attachment(sql_statement, stage_path, "CSV") + def replace_into_table(self, table_name, conflict_keys, data): """ :param table_name: table name @@ -175,6 +183,8 @@ def _make_attachment(self, sql_statement, stage_path, file_type): file_format_options = {} file_format_options["type"] = file_type + file_format_options["RECORD_DELIMITER"] = '\r\n' + file_format_options["COMPRESSION"] = "AUTO" data = { "sql": sql_statement, diff --git a/tests/test_client.py b/tests/test_client.py index 97c9d8c..c0e6523 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -21,7 +21,7 @@ class DatabendPyTestCase(unittest.TestCase): databend_url = None def setUp(self): - self.databend_url = os.getenv("TEST_DATABEND_DSN") + self.databend_url = "https://sjh1Test:abc123@tn3ftqihs.gw.aws-us-east-2.default.databend.com:443?warehouse=test-sjh&secure=True" def assertHostsEqual(self, client, another, msg=None): self.assertEqual(client.connection.host, another, msg=msg) @@ -141,6 +141,16 @@ def test_batch_insert_with_dict_list(self): _, ss = c.execute("select * from test") self.assertEqual(ss, [(5, "cc"), (6, "dd")]) + def test_batch_insert_with_dict_multi_fields(self): + c = Client.from_url(self.databend_url) + c.execute("DROP TABLE IF EXISTS test") + c.execute("CREATE TABLE if not exists test (id int, x Int32, y VARCHAR, z Int32)") + c.execute("DESC test") + _, r1 = c.execute("INSERT INTO test (x,y) VALUES", [{"x": 7, "y": "ee"}, {"x": 8, "y": "ff"}]) + self.assertEqual(r1, 2) + _, ss = c.execute("select * from test") + self.assertEqual(ss, [('NULL', 7, 'ee', 'NULL'), ('NULL', 8, 'ff', 'NULL')]) + def test_iter_query(self): client = Client.from_url(self.databend_url) result = client.execute_iter("select 1", with_column_types=False) From 205bac375157f267bc618d2bb14933f212960a67 Mon Sep 17 00:00:00 2001 From: hantmac Date: Wed, 25 Dec 2024 15:37:06 +0800 Subject: [PATCH 2/4] f --- tests/test_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_client.py b/tests/test_client.py index c0e6523..c10b1a2 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -21,7 +21,7 @@ class DatabendPyTestCase(unittest.TestCase): databend_url = None def setUp(self): - self.databend_url = "https://sjh1Test:abc123@tn3ftqihs.gw.aws-us-east-2.default.databend.com:443?warehouse=test-sjh&secure=True" + self.databend_url = os.getenv("TEST_DATABEND_DSN") def assertHostsEqual(self, client, another, msg=None): self.assertEqual(client.connection.host, another, msg=msg) From 88dbef58234a616e1d63f3cd01a4907f5c1b9c56 Mon Sep 17 00:00:00 2001 From: hantmac Date: Wed, 25 Dec 2024 15:40:12 +0800 Subject: [PATCH 3/4] fix ci --- databend_py/client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/databend_py/client.py b/databend_py/client.py index e442e40..081ab89 100644 --- a/databend_py/client.py +++ b/databend_py/client.py @@ -133,7 +133,6 @@ def _process_insert_query(self, query, params): query = query.split("VALUES")[0] + "VALUES" if len(query.split(" ")) < 3: raise Exception("Not standard insert/replace statement") - table_name = query.split(" ")[2] batch_size = query.count(",") + 1 if params is not None and len(params) > 0: if isinstance(params[0], tuple): From 95ad6065733ef17a2d783271db33759ad55ace59 Mon Sep 17 00:00:00 2001 From: hantmac Date: Wed, 25 Dec 2024 15:42:00 +0800 Subject: [PATCH 4/4] fix tests --- tests/test_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_client.py b/tests/test_client.py index c10b1a2..42cf8c7 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -177,7 +177,7 @@ def test_replace(self): client.replace("default", "test_replace", ["x"], [(1, "a"), (2, "b")]) client.replace("default", "test_replace", ["x"], [(1, "c"), (2, "d")]) _, upload_res = client.execute("select * from test_replace") - self.assertEqual(upload_res, [(1, "c\r"), (2, "d\r")]) + self.assertEqual(upload_res, [(1, "c"), (2, "d")]) def test_insert_with_compress(self): client = Client.from_url(self.databend_url + "?compress=True&debug=True")