Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions databend_py/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ def _process_insert_query(self, query, params):
query = query.split("VALUES")[0] + "VALUES"
if len(query.split(" ")) < 3:
raise Exception("Not standard insert/replace statement")
table_name = query.split(" ")[2]
batch_size = query.count(",") + 1
if params is not None and len(params) > 0:
if isinstance(params[0], tuple):
Expand All @@ -147,7 +146,7 @@ def _process_insert_query(self, query, params):
for i in range(0, len(params), batch_size)
]
insert_rows = len(tuple_ls)
self._uploader.upload_to_table_by_copy(table_name, tuple_ls)
self._uploader.upload_to_table_by_attachment(query, tuple_ls)
return insert_rows

def _process_ordinary_query(
Expand Down
24 changes: 17 additions & 7 deletions databend_py/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@

class DataUploader:
def __init__(
self,
client,
connection,
settings,
default_stage_dir="@~",
debug=False,
compress=False,
self,
client,
connection,
settings,
default_stage_dir="@~",
debug=False,
compress=False,
):
# TODO: make it depends on Connection instead of Client
self.client = client
Expand All @@ -34,6 +34,14 @@ def upload_to_table_by_copy(self, table_name, data):
self._upload_to_presigned_url(presigned_url, headers, data)
self._execute_copy(table_name, stage_path, "CSV")

def upload_to_table_by_attachment(self, sql_statement, data):
if len(data) == 0:
return
stage_path = self._gen_stage_path(self.default_stage_dir)
presigned_url, headers = self._execute_presign(stage_path)
self._upload_to_presigned_url(presigned_url, headers, data)
self._execute_with_attachment(sql_statement, stage_path, "CSV")

def replace_into_table(self, table_name, conflict_keys, data):
"""
:param table_name: table name
Expand Down Expand Up @@ -175,6 +183,8 @@ def _make_attachment(self, sql_statement, stage_path, file_type):

file_format_options = {}
file_format_options["type"] = file_type
file_format_options["RECORD_DELIMITER"] = '\r\n'
file_format_options["COMPRESSION"] = "AUTO"

data = {
"sql": sql_statement,
Expand Down
12 changes: 11 additions & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,16 @@ def test_batch_insert_with_dict_list(self):
_, ss = c.execute("select * from test")
self.assertEqual(ss, [(5, "cc"), (6, "dd")])

def test_batch_insert_with_dict_multi_fields(self):
c = Client.from_url(self.databend_url)
c.execute("DROP TABLE IF EXISTS test")
c.execute("CREATE TABLE if not exists test (id int, x Int32, y VARCHAR, z Int32)")
c.execute("DESC test")
_, r1 = c.execute("INSERT INTO test (x,y) VALUES", [{"x": 7, "y": "ee"}, {"x": 8, "y": "ff"}])
self.assertEqual(r1, 2)
_, ss = c.execute("select * from test")
self.assertEqual(ss, [('NULL', 7, 'ee', 'NULL'), ('NULL', 8, 'ff', 'NULL')])

def test_iter_query(self):
client = Client.from_url(self.databend_url)
result = client.execute_iter("select 1", with_column_types=False)
Expand All @@ -167,7 +177,7 @@ def test_replace(self):
client.replace("default", "test_replace", ["x"], [(1, "a"), (2, "b")])
client.replace("default", "test_replace", ["x"], [(1, "c"), (2, "d")])
_, upload_res = client.execute("select * from test_replace")
self.assertEqual(upload_res, [(1, "c\r"), (2, "d\r")])
self.assertEqual(upload_res, [(1, "c"), (2, "d")])

def test_insert_with_compress(self):
client = Client.from_url(self.databend_url + "?compress=True&debug=True")
Expand Down
Loading