diff --git a/databend_py/client.py b/databend_py/client.py index ec0affc..081ab89 100644 --- a/databend_py/client.py +++ b/databend_py/client.py @@ -133,7 +133,6 @@ def _process_insert_query(self, query, params): query = query.split("VALUES")[0] + "VALUES" if len(query.split(" ")) < 3: raise Exception("Not standard insert/replace statement") - table_name = query.split(" ")[2] batch_size = query.count(",") + 1 if params is not None and len(params) > 0: if isinstance(params[0], tuple): @@ -147,7 +146,7 @@ def _process_insert_query(self, query, params): for i in range(0, len(params), batch_size) ] insert_rows = len(tuple_ls) - self._uploader.upload_to_table_by_copy(table_name, tuple_ls) + self._uploader.upload_to_table_by_attachment(query, tuple_ls) return insert_rows def _process_ordinary_query( diff --git a/databend_py/uploader.py b/databend_py/uploader.py index f078841..53a2030 100644 --- a/databend_py/uploader.py +++ b/databend_py/uploader.py @@ -10,13 +10,13 @@ class DataUploader: def __init__( - self, - client, - connection, - settings, - default_stage_dir="@~", - debug=False, - compress=False, + self, + client, + connection, + settings, + default_stage_dir="@~", + debug=False, + compress=False, ): # TODO: make it depends on Connection instead of Client self.client = client @@ -34,6 +34,14 @@ def upload_to_table_by_copy(self, table_name, data): self._upload_to_presigned_url(presigned_url, headers, data) self._execute_copy(table_name, stage_path, "CSV") + def upload_to_table_by_attachment(self, sql_statement, data): + if len(data) == 0: + return + stage_path = self._gen_stage_path(self.default_stage_dir) + presigned_url, headers = self._execute_presign(stage_path) + self._upload_to_presigned_url(presigned_url, headers, data) + self._execute_with_attachment(sql_statement, stage_path, "CSV") + def replace_into_table(self, table_name, conflict_keys, data): """ :param table_name: table name @@ -175,6 +183,8 @@ def _make_attachment(self, sql_statement, stage_path, file_type): file_format_options = {} file_format_options["type"] = file_type + file_format_options["RECORD_DELIMITER"] = '\r\n' + file_format_options["COMPRESSION"] = "AUTO" data = { "sql": sql_statement, diff --git a/tests/test_client.py b/tests/test_client.py index 97c9d8c..42cf8c7 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -141,6 +141,16 @@ def test_batch_insert_with_dict_list(self): _, ss = c.execute("select * from test") self.assertEqual(ss, [(5, "cc"), (6, "dd")]) + def test_batch_insert_with_dict_multi_fields(self): + c = Client.from_url(self.databend_url) + c.execute("DROP TABLE IF EXISTS test") + c.execute("CREATE TABLE if not exists test (id int, x Int32, y VARCHAR, z Int32)") + c.execute("DESC test") + _, r1 = c.execute("INSERT INTO test (x,y) VALUES", [{"x": 7, "y": "ee"}, {"x": 8, "y": "ff"}]) + self.assertEqual(r1, 2) + _, ss = c.execute("select * from test") + self.assertEqual(ss, [('NULL', 7, 'ee', 'NULL'), ('NULL', 8, 'ff', 'NULL')]) + def test_iter_query(self): client = Client.from_url(self.databend_url) result = client.execute_iter("select 1", with_column_types=False) @@ -167,7 +177,7 @@ def test_replace(self): client.replace("default", "test_replace", ["x"], [(1, "a"), (2, "b")]) client.replace("default", "test_replace", ["x"], [(1, "c"), (2, "d")]) _, upload_res = client.execute("select * from test_replace") - self.assertEqual(upload_res, [(1, "c\r"), (2, "d\r")]) + self.assertEqual(upload_res, [(1, "c"), (2, "d")]) def test_insert_with_compress(self): client = Client.from_url(self.databend_url + "?compress=True&debug=True")