From e34830b5a0d16c992c31c747b7d7e518e2c81d1a Mon Sep 17 00:00:00 2001 From: hantmac Date: Tue, 13 Jun 2023 10:58:16 +0800 Subject: [PATCH 1/3] feat: replace into with stage attachment --- README.md | 2 ++ databend_py/client.py | 28 ++++++++++++------- databend_py/connection.py | 13 +++++---- databend_py/uploader.py | 54 +++++++++++++++++++++++++++++++++++-- examples/batch_insert.py | 31 +++++++++++++++++++++ examples/iter_query.py | 9 +++++++ examples/ordinary_query.py | 17 ++++++++++++ examples/replace_into.py | 12 +++++++++ examples/session_setting.py | 7 +++++ examples/upload_to_stage.py | 27 +++++++++++++++++++ tests/test_client.py | 11 ++++++++ 11 files changed, 194 insertions(+), 17 deletions(-) create mode 100644 examples/batch_insert.py create mode 100644 examples/iter_query.py create mode 100644 examples/ordinary_query.py create mode 100644 examples/replace_into.py create mode 100644 examples/session_setting.py create mode 100644 examples/upload_to_stage.py diff --git a/README.md b/README.md index e85bf43..f19a978 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,8 @@ Pure Client example: > 1 > ``` +More usage examples can be found [here](./examples). + # Features - Basic SQL. 
diff --git a/databend_py/client.py b/databend_py/client.py index bb07f19..6394c95 100644 --- a/databend_py/client.py +++ b/databend_py/client.py @@ -21,7 +21,8 @@ def __init__(self, *args, **kwargs): self.query_result_cls = QueryResult self.helper = Helper self._debug = asbool(self.settings.get('debug', False)) - self._uploader = DataUploader(self, self.settings, debug=self._debug, compress=self.settings.get('compress', False)) + self._uploader = DataUploader(self, self.connection, self.settings, debug=self._debug, + compress=self.settings.get('compress', False)) def __enter__(self): return self @@ -122,17 +123,14 @@ def _process_insert_query(self, query, params): query = query.split("values")[0] + 'values' elif "VALUES" in query: query = query.split("VALUES")[0] + 'VALUES' - insert_re = re.compile("(?i)^INSERT INTO\s+\x60?([\w.^\(]+)\x60?\s*(\([^\)]*\))?") - match = insert_re.match(query.strip()) - if len(match.group().split(' ')) < 2: - raise Exception("Not standard insert statement") - table_name = match[1] - + if len(query.split(' ')) < 3: + raise Exception("Not standard insert/replace statement") + table_name = query.split(' ')[2] batch_size = query.count(',') + 1 if params is not None: tuple_ls = [tuple(params[i:i + batch_size]) for i in range(0, len(params), batch_size)] insert_rows = len(tuple_ls) - self._uploader.upload_to_table(table_name, tuple_ls) + self._uploader.upload_to_table_by_copy(table_name, tuple_ls) return insert_rows def _process_ordinary_query(self, query, params=None, with_column_types=False, @@ -239,7 +237,17 @@ def insert(self, database_name, table_name, data): data: the data which write into, it's a list of tuple """ # TODO: escape the database & table name - self._uploader.upload_to_table("%s.%s" % (database_name, table_name), data) + self._uploader.upload_to_table_by_copy("%s.%s" % (database_name, table_name), data) + + def replace(self, database_name, table_name, conflict_keys, data): + """ + replace the data into database.table 
according to the file + database_name: the target database + table_name: the table which write into + conflict_keys: the key that use to replace into + data: the data which write into, it's a list of tuple + """ + self._uploader.upload_to_table_with_attachment("%s.%s" % (database_name, table_name), conflict_keys, data) def upload_to_stage(self, stage_dir, file_name, data): """ @@ -249,4 +257,4 @@ def upload_to_stage(self, stage_dir, file_name, data): :param data: the data value or file handler :return: """ - return self._uploader.upload_to_stage(stage_dir, file_name, data) \ No newline at end of file + return self._uploader.upload_to_stage(stage_dir, file_name, data) diff --git a/databend_py/connection.py b/databend_py/connection.py index f8fad9b..d08f045 100644 --- a/databend_py/connection.py +++ b/databend_py/connection.py @@ -69,7 +69,8 @@ class Connection(object): # 'database': 'default' # } def __init__(self, host, port=None, user=defines.DEFAULT_USER, password=defines.DEFAULT_PASSWORD, - database=defines.DEFAULT_DATABASE, secure=False, copy_purge=False, session_settings=None, persist_cookies=False): + database=defines.DEFAULT_DATABASE, secure=False, copy_purge=False, session_settings=None, + persist_cookies=False): self.host = host self.port = port self.user = user @@ -116,16 +117,18 @@ def disconnect(self): @retry(times=5, exceptions=WarehouseTimeoutException) def do_query(self, url, query_sql): response = self.requests_session.post(url, - data=json.dumps(query_sql), - headers=self.make_headers(), - auth=HTTPBasicAuth(self.user, self.password), - verify=True) + data=json.dumps(query_sql), + headers=self.make_headers(), + auth=HTTPBasicAuth(self.user, self.password), + verify=True) try: resp_dict = json.loads(response.content) except json.decoder.JSONDecodeError: raise UnexpectedException("failed to parse response: %s" % response.content) if resp_dict and resp_dict.get('error') and "no endpoint" in resp_dict.get('error'): raise WarehouseTimeoutException + if 
resp_dict and resp_dict.get('error'): + raise UnexpectedException("failed to query: %s" % response.content) if self.persist_cookies: self.cookies = response.cookies return resp_dict diff --git a/databend_py/uploader.py b/databend_py/uploader.py index ab54455..2c11a28 100644 --- a/databend_py/uploader.py +++ b/databend_py/uploader.py @@ -5,18 +5,20 @@ import json import time import gzip +from . import log class DataUploader: - def __init__(self, client, settings, default_stage_dir='@~', debug=False, compress=False): + def __init__(self, client, connection, settings, default_stage_dir='@~', debug=False, compress=False): # TODO: make it depends on Connection instead of Client self.client = client + self.connection = connection self.settings = settings self.default_stage_dir = default_stage_dir self._compress = compress self._debug = debug - def upload_to_table(self, table_name, data): + def upload_to_table_by_copy(self, table_name, data): if len(data) == 0: return stage_path = self._gen_stage_path(self.default_stage_dir) @@ -24,6 +26,21 @@ def upload_to_table(self, table_name, data): self._upload_to_presigned_url(presigned_url, headers, data) self._execute_copy(table_name, stage_path, 'CSV') + def upload_to_table_with_attachment(self, table_name, conflict_keys, data): + """ + :param table_name: table name + :param conflict_keys: if use replace, the conflict_keys can't be None + :param data: list data to insert/replace + :return: + """ + if len(data) == 0: + return + stage_path = self._gen_stage_path(self.default_stage_dir) + presigned_url, headers = self._execute_presign(stage_path) + self._upload_to_presigned_url(presigned_url, headers, data) + sql_statement = f"REPLACE INTO {table_name} ON ({','.join(conflict_keys)}) VALUES" + self._execute_with_attachment(sql_statement, stage_path, "CSV") + def upload_to_stage(self, stage_dir, filename, data): stage_path = self._gen_stage_path(stage_dir, filename) presigned_url, headers = self._execute_presign(stage_path) @@ -106,3 
+123,36 @@ def _make_copy_statement(self, table_name, stage_path, file_type): f"FILE_FORMAT = (type = {file_type} RECORD_DELIMITER = '\\r\\n' COMPRESSION = AUTO) " \ f"PURGE = {copy_options['PURGE']} FORCE = {copy_options['FORCE']} " \ f"SIZE_LIMIT={copy_options['SIZE_LIMIT']} ON_ERROR = {copy_options['ON_ERROR']}" + + def _execute_with_attachment(self, sql_statement, stage_path, file_type): + start_time = time.time() + data = self._make_attachment(sql_statement, stage_path, file_type) + url = self.connection.format_url() + + try: + resp_dict = self.connection.do_query(url, data) + self.client_session = resp_dict.get("session", self.connection.default_session()) + if self._debug: + print('upload:_execute_attachment sql=%s %s' % (sql_statement, time.time() - start_time)) + except Exception as e: + log.logger.error( + f"http error on {url}, SQL: {sql_statement} error msg:{str(e)}" + ) + raise + + def _make_attachment(self, sql_statement, stage_path, file_type): + copy_options = {} + copy_options["PURGE"] = self.settings.get("copy_purge", "False") + copy_options["FORCE"] = self.settings.get("force", "False") + copy_options["SIZE_LIMIT"] = self.settings.get("size_limit", "0") + copy_options["ON_ERROR"] = self.settings.get("on_error", "abort") + + file_format_options = {} + file_format_options["type"] = file_type + + data = { + "sql": sql_statement, + "stage_attachment": {"location": stage_path, "file_format_options": file_format_options, + "copy_options": copy_options} + } + return data diff --git a/examples/batch_insert.py b/examples/batch_insert.py new file mode 100644 index 0000000..4f88ae8 --- /dev/null +++ b/examples/batch_insert.py @@ -0,0 +1,31 @@ +from databend_py import Client + + +def insert(): + client = Client.from_url("http://root:root@localhost:8000") + client.execute('DROP TABLE IF EXISTS test_upload') + client.execute('CREATE TABLE if not exists test_upload (x Int32,y VARCHAR)') + client.execute('DESC test_upload') + client.insert("default", 
"test_upload", [(1, 'a'), (1, 'b')]) + _, upload_res = client.execute('select * from test_upload') + # upload_res is [(1, 'a'), (1, 'b')] + + +def batch_insert(): + c = Client.from_url("http://root:root@localhost:8000") + c.execute('DROP TABLE IF EXISTS test') + c.execute('CREATE TABLE if not exists test (x Int32,y VARCHAR)') + c.execute('DESC test') + _, r1 = c.execute('INSERT INTO test (x,y) VALUES (%,%)', [1, 'yy', 2, 'xx']) + _, ss = c.execute('select * from test') + # ss is [(1, 'yy'), (2, 'xx')] + + +def batch_insert_with_tuple(): + c = Client.from_url("http://root:root@localhost:8000") + c.execute('DROP TABLE IF EXISTS test') + c.execute('CREATE TABLE if not exists test (x Int32,y VARCHAR)') + c.execute('DESC test') + # data is tuple list + _, r1 = c.execute('INSERT INTO test (x,y) VALUES', [(3, 'aa'), (4, 'bb')]) + _, ss = c.execute('select * from test') diff --git a/examples/iter_query.py b/examples/iter_query.py new file mode 100644 index 0000000..851943a --- /dev/null +++ b/examples/iter_query.py @@ -0,0 +1,9 @@ +from databend_py import Client + + +def iter_query(): + client = Client.from_url("http://root:root@localhost:8000") + result = client.execute_iter("select 1", with_column_types=False) + result_list = [i for i in result] + # result_list is [1] + print(result_list) diff --git a/examples/ordinary_query.py b/examples/ordinary_query.py new file mode 100644 index 0000000..702ac3b --- /dev/null +++ b/examples/ordinary_query.py @@ -0,0 +1,17 @@ +from databend_py import Client + + +def ordinary_query(): + client = Client.from_url("http://root:root@localhost:8000") + _, res = client.execute("select 1", with_column_types=False) + # res is [(1,)] + + column_type, res2 = client.execute("select 1", with_column_types=True) + # column_type is [('1', 'UInt8')] + # res2 [(1,)] + print(column_type) + print(res2) + + # create table/ drop table + client.execute('DROP TABLE IF EXISTS test') + client.execute('CREATE TABLE if not exists test (x Int32,y VARCHAR)') diff 
--git a/examples/replace_into.py b/examples/replace_into.py new file mode 100644 index 0000000..baa921a --- /dev/null +++ b/examples/replace_into.py @@ -0,0 +1,12 @@ +from databend_py import Client + + +def replace_into(): + client = Client.from_url("http://root:root@localhost:8000") + client.execute('DROP TABLE IF EXISTS test_replace') + client.execute('CREATE TABLE if not exists test_replace (x Int32,y VARCHAR)') + client.execute('DESC test_replace') + client.replace("default", "test_replace", ['x'], [(1, 'a'), (2, 'b')]) + client.replace("default", "test_replace", ['x'], [(1, 'c'), (2, 'd')]) + _, upload_res = client.execute('select * from test_replace') + # upload_res is [(1, 'c\r'), (1, 'd\r')] diff --git a/examples/session_setting.py b/examples/session_setting.py new file mode 100644 index 0000000..aba9637 --- /dev/null +++ b/examples/session_setting.py @@ -0,0 +1,7 @@ +from databend_py import Client + + +def session_settings(): + # docs: https://databend.rs/doc/integrations/api/rest#client-side-session + session_settings = {"db": "test"} + client = Client(host="localhost", port=8000, user="root", password="root", session_settings=session_settings) diff --git a/examples/upload_to_stage.py b/examples/upload_to_stage.py new file mode 100644 index 0000000..dc80833 --- /dev/null +++ b/examples/upload_to_stage.py @@ -0,0 +1,27 @@ +from databend_py import Client +import os + + +def create_csv(): + import csv + with open('upload.csv', 'w', newline='') as file: + writer = csv.writer(file) + writer.writerow([1, 'a']) + writer.writerow([1, 'b']) + + +def upload_to_stage(): + client = Client.from_url("http://root:root@localhost:8000") + # upload [(1, 'a'), (1, 'b')] as csv to stage ~ + stage_path = client.upload_to_stage('@~', "upload.csv", [(1, 'a'), (1, 'b')]) + # stage_path is @~/upload.csv + + +def upload_file_to_stage(): + create_csv() + client = Client.from_url("http://root:root@localhost:8000") + with open("upload.csv", "rb") as f: + stage_path = 
client.upload_to_stage('@~', "upload.csv", f) + print(stage_path) + + os.remove("upload.csv") diff --git a/tests/test_client.py b/tests/test_client.py index fb697ce..cd9e7aa 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -118,6 +118,16 @@ def test_insert(self): _, upload_res = client.execute('select * from test_upload') self.assertEqual(upload_res, [(1, 'a'), (1, 'b')]) + def test_replace(self): + client = Client.from_url(self.databend_url) + client.execute('DROP TABLE IF EXISTS test_replace') + client.execute('CREATE TABLE if not exists test_replace (x Int32,y VARCHAR)') + client.execute('DESC test_replace') + client.replace("default", "test_replace", ['x'], [(1, 'a'), (2, 'b')]) + client.replace("default", "test_replace", ['x'], [(1, 'c'), (2, 'd')]) + _, upload_res = client.execute('select * from test_replace') + self.assertEqual(upload_res, [(1, 'c\r'), (1, 'd\r')]) + def test_insert_with_compress(self): client = Client.from_url(self.databend_url + "?compress=True&debug=True") self.assertEqual(client._uploader._compress, True) @@ -177,6 +187,7 @@ def test_cookies(self): dt.test_batch_insert() dt.test_iter_query() dt.test_insert() + dt.test_replace() dt.test_insert_with_compress() dt.test_upload_to_stage() dt.test_upload_file_to_stage() From 2cd9080a79f37ce8062cc86ee90bc23b95913e11 Mon Sep 17 00:00:00 2001 From: hantmac Date: Tue, 13 Jun 2023 11:01:13 +0800 Subject: [PATCH 2/3] fix --- databend_py/client.py | 1 - examples/session_setting.py | 1 + examples/upload_to_stage.py | 1 + 3 files changed, 2 insertions(+), 1 deletion(-) diff --git a/databend_py/client.py b/databend_py/client.py index 6394c95..a15c84f 100644 --- a/databend_py/client.py +++ b/databend_py/client.py @@ -1,5 +1,4 @@ import json -import re from urllib.parse import urlparse, parse_qs, unquote from .connection import Connection diff --git a/examples/session_setting.py b/examples/session_setting.py index aba9637..cc93460 100644 --- a/examples/session_setting.py +++ 
b/examples/session_setting.py @@ -5,3 +5,4 @@ def session_settings(): # docs: https://databend.rs/doc/integrations/api/rest#client-side-session session_settings = {"db": "test"} client = Client(host="localhost", port=8000, user="root", password="root", session_settings=session_settings) + print(client) diff --git a/examples/upload_to_stage.py b/examples/upload_to_stage.py index dc80833..4a630e2 100644 --- a/examples/upload_to_stage.py +++ b/examples/upload_to_stage.py @@ -14,6 +14,7 @@ def upload_to_stage(): client = Client.from_url("http://root:root@localhost:8000") # upload [(1, 'a'), (1, 'b')] as csv to stage ~ stage_path = client.upload_to_stage('@~', "upload.csv", [(1, 'a'), (1, 'b')]) + print(stage_path) # stage_path is @~/upload.csv From 0d2678ba8527306a7d6b1b13d5e6ed41d330cc60 Mon Sep 17 00:00:00 2001 From: hantmac Date: Tue, 13 Jun 2023 11:03:17 +0800 Subject: [PATCH 3/3] fix tests --- examples/replace_into.py | 2 +- tests/test_client.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/replace_into.py b/examples/replace_into.py index baa921a..5c00cc5 100644 --- a/examples/replace_into.py +++ b/examples/replace_into.py @@ -9,4 +9,4 @@ def replace_into(): client.replace("default", "test_replace", ['x'], [(1, 'a'), (2, 'b')]) client.replace("default", "test_replace", ['x'], [(1, 'c'), (2, 'd')]) _, upload_res = client.execute('select * from test_replace') - # upload_res is [(1, 'c\r'), (1, 'd\r')] + # upload_res is [(1, 'c\r'), (2, 'd\r')] diff --git a/tests/test_client.py b/tests/test_client.py index cd9e7aa..6438d78 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -126,7 +126,7 @@ def test_replace(self): client.replace("default", "test_replace", ['x'], [(1, 'a'), (2, 'b')]) client.replace("default", "test_replace", ['x'], [(1, 'c'), (2, 'd')]) _, upload_res = client.execute('select * from test_replace') - self.assertEqual(upload_res, [(1, 'c\r'), (1, 'd\r')]) + self.assertEqual(upload_res, [(1, 'c\r'), (2, 
'd\r')]) def test_insert_with_compress(self): client = Client.from_url(self.databend_url + "?compress=True&debug=True")