Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ Pure Client example:
> 1
> ```

More usages examples find [here](./examples).

# Features

- Basic SQL.
Expand Down
29 changes: 18 additions & 11 deletions databend_py/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import json
import re
from urllib.parse import urlparse, parse_qs, unquote

from .connection import Connection
Expand All @@ -21,7 +20,8 @@ def __init__(self, *args, **kwargs):
self.query_result_cls = QueryResult
self.helper = Helper
self._debug = asbool(self.settings.get('debug', False))
self._uploader = DataUploader(self, self.settings, debug=self._debug, compress=self.settings.get('compress', False))
self._uploader = DataUploader(self, self.connection, self.settings, debug=self._debug,
compress=self.settings.get('compress', False))

def __enter__(self):
return self
Expand Down Expand Up @@ -122,17 +122,14 @@ def _process_insert_query(self, query, params):
query = query.split("values")[0] + 'values'
elif "VALUES" in query:
query = query.split("VALUES")[0] + 'VALUES'
insert_re = re.compile("(?i)^INSERT INTO\s+\x60?([\w.^\(]+)\x60?\s*(\([^\)]*\))?")
match = insert_re.match(query.strip())
if len(match.group().split(' ')) < 2:
raise Exception("Not standard insert statement")
table_name = match[1]

if len(query.split(' ')) < 3:
raise Exception("Not standard insert/replace statement")
table_name = query.split(' ')[2]
batch_size = query.count(',') + 1
if params is not None:
tuple_ls = [tuple(params[i:i + batch_size]) for i in range(0, len(params), batch_size)]
insert_rows = len(tuple_ls)
self._uploader.upload_to_table(table_name, tuple_ls)
self._uploader.upload_to_table_by_copy(table_name, tuple_ls)
return insert_rows

def _process_ordinary_query(self, query, params=None, with_column_types=False,
Expand Down Expand Up @@ -239,7 +236,17 @@ def insert(self, database_name, table_name, data):
data: the data which write into, it's a list of tuple
"""
# TODO: escape the database & table name
self._uploader.upload_to_table("%s.%s" % (database_name, table_name), data)
self._uploader.upload_to_table_by_copy("%s.%s" % (database_name, table_name), data)

def replace(self, database_name, table_name, conflict_keys, data):
"""
replace the data into database.table according to the file
database_name: the target database
table_name: the table which write into
conflict_keys: the key that use to replace into
data: the data which write into, it's a list of tuple
"""
self._uploader.upload_to_table_with_attachment("%s.%s" % (database_name, table_name), conflict_keys, data)

def upload_to_stage(self, stage_dir, file_name, data):
"""
Expand All @@ -249,4 +256,4 @@ def upload_to_stage(self, stage_dir, file_name, data):
:param data: the data value or file handler
:return:
"""
return self._uploader.upload_to_stage(stage_dir, file_name, data)
return self._uploader.upload_to_stage(stage_dir, file_name, data)
13 changes: 8 additions & 5 deletions databend_py/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ class Connection(object):
# 'database': 'default'
# }
def __init__(self, host, port=None, user=defines.DEFAULT_USER, password=defines.DEFAULT_PASSWORD,
database=defines.DEFAULT_DATABASE, secure=False, copy_purge=False, session_settings=None, persist_cookies=False):
database=defines.DEFAULT_DATABASE, secure=False, copy_purge=False, session_settings=None,
persist_cookies=False):
self.host = host
self.port = port
self.user = user
Expand Down Expand Up @@ -116,16 +117,18 @@ def disconnect(self):
@retry(times=5, exceptions=WarehouseTimeoutException)
def do_query(self, url, query_sql):
response = self.requests_session.post(url,
data=json.dumps(query_sql),
headers=self.make_headers(),
auth=HTTPBasicAuth(self.user, self.password),
verify=True)
data=json.dumps(query_sql),
headers=self.make_headers(),
auth=HTTPBasicAuth(self.user, self.password),
verify=True)
try:
resp_dict = json.loads(response.content)
except json.decoder.JSONDecodeError:
raise UnexpectedException("failed to parse response: %s" % response.content)
if resp_dict and resp_dict.get('error') and "no endpoint" in resp_dict.get('error'):
raise WarehouseTimeoutException
if resp_dict and resp_dict.get('error'):
raise UnexpectedException("failed to query: %s" % response.content)
if self.persist_cookies:
self.cookies = response.cookies
return resp_dict
Expand Down
54 changes: 52 additions & 2 deletions databend_py/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,42 @@
import json
import time
import gzip
from . import log


class DataUploader:
def __init__(self, client, settings, default_stage_dir='@~', debug=False, compress=False):
def __init__(self, client, connection, settings, default_stage_dir='@~', debug=False, compress=False):
# TODO: make it depends on Connection instead of Client
self.client = client
self.connection = connection
self.settings = settings
self.default_stage_dir = default_stage_dir
self._compress = compress
self._debug = debug

def upload_to_table(self, table_name, data):
def upload_to_table_by_copy(self, table_name, data):
if len(data) == 0:
return
stage_path = self._gen_stage_path(self.default_stage_dir)
presigned_url, headers = self._execute_presign(stage_path)
self._upload_to_presigned_url(presigned_url, headers, data)
self._execute_copy(table_name, stage_path, 'CSV')

def upload_to_table_with_attachment(self, table_name, conflict_keys, data):
"""
:param table_name: table name
:param conflict_keys: if use replace, the conflict_keys can't be None
:param data: list data to insert/replace
:return:
"""
if len(data) == 0:
return
stage_path = self._gen_stage_path(self.default_stage_dir)
presigned_url, headers = self._execute_presign(stage_path)
self._upload_to_presigned_url(presigned_url, headers, data)
sql_statement = f"REPLACE INTO {table_name} ON ({','.join(conflict_keys)}) VALUES"
self._execute_with_attachment(sql_statement, stage_path, "CSV")

def upload_to_stage(self, stage_dir, filename, data):
stage_path = self._gen_stage_path(stage_dir, filename)
presigned_url, headers = self._execute_presign(stage_path)
Expand Down Expand Up @@ -106,3 +123,36 @@ def _make_copy_statement(self, table_name, stage_path, file_type):
f"FILE_FORMAT = (type = {file_type} RECORD_DELIMITER = '\\r\\n' COMPRESSION = AUTO) " \
f"PURGE = {copy_options['PURGE']} FORCE = {copy_options['FORCE']} " \
f"SIZE_LIMIT={copy_options['SIZE_LIMIT']} ON_ERROR = {copy_options['ON_ERROR']}"

def _execute_with_attachment(self, sql_statement, stage_path, file_type):
start_time = time.time()
data = self._make_attachment(sql_statement, stage_path, file_type)
url = self.connection.format_url()

try:
resp_dict = self.connection.do_query(url, data)
self.client_session = resp_dict.get("session", self.connection.default_session())
if self._debug:
print('upload:_execute_attachment sql=%s %s' % (sql_statement, time.time() - start_time))
except Exception as e:
log.logger.error(
f"http error on {url}, SQL: {sql_statement} error msg:{str(e)}"
)
raise

def _make_attachment(self, sql_statement, stage_path, file_type):
copy_options = {}
copy_options["PURGE"] = self.settings.get("copy_purge", "False")
copy_options["FORCE"] = self.settings.get("force", "False")
copy_options["SIZE_LIMIT"] = self.settings.get("size_limit", "0")
copy_options["ON_ERROR"] = self.settings.get("on_error", "abort")

file_format_options = {}
file_format_options["type"] = file_type

data = {
"sql": sql_statement,
"stage_attachment": {"location": stage_path, "file_format_options": file_format_options,
"copy_options": copy_options}
}
return data
31 changes: 31 additions & 0 deletions examples/batch_insert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from databend_py import Client


def insert():
client = Client.from_url("http://root:root@localhost:8000")
client.execute('DROP TABLE IF EXISTS test_upload')
client.execute('CREATE TABLE if not exists test_upload (x Int32,y VARCHAR)')
client.execute('DESC test_upload')
client.insert("default", "test_upload", [(1, 'a'), (1, 'b')])
_, upload_res = client.execute('select * from test_upload')
# upload_res is [(1, 'a'), (1, 'b')]


def batch_insert():
c = Client.from_url("http://root:root@localhost:8000")
c.execute('DROP TABLE IF EXISTS test')
c.execute('CREATE TABLE if not exists test (x Int32,y VARCHAR)')
c.execute('DESC test')
_, r1 = c.execute('INSERT INTO test (x,y) VALUES (%,%)', [1, 'yy', 2, 'xx'])
_, ss = c.execute('select * from test')
# ss is [(1, 'yy'), (2, 'xx')]


def batch_insert_with_tuple():
c = Client.from_url("http://root:root@localhost:8000")
c.execute('DROP TABLE IF EXISTS test')
c.execute('CREATE TABLE if not exists test (x Int32,y VARCHAR)')
c.execute('DESC test')
# data is tuple list
_, r1 = c.execute('INSERT INTO test (x,y) VALUES', [(3, 'aa'), (4, 'bb')])
_, ss = c.execute('select * from test')
9 changes: 9 additions & 0 deletions examples/iter_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from databend_py import Client


def iter_query():
client = Client.from_url("http://root:root@localhost:8000")
result = client.execute_iter("select 1", with_column_types=False)
result_list = [i for i in result]
# result_list is [1]
print(result_list)
17 changes: 17 additions & 0 deletions examples/ordinary_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from databend_py import Client


def ordinary_query():
client = Client.from_url("http://root:root@localhost:8000")
_, res = client.execute("select 1", with_column_types=False)
# res is [(1,)]

column_type, res2 = client.execute("select 1", with_column_types=True)
# column_type is [('1', 'UInt8')]
# res2 [(1,)]
print(column_type)
print(res2)

# create table/ drop table
client.execute('DROP TABLE IF EXISTS test')
client.execute('CREATE TABLE if not exists test (x Int32,y VARCHAR)')
12 changes: 12 additions & 0 deletions examples/replace_into.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from databend_py import Client


def replace_into():
client = Client.from_url("http://root:root@localhost:8000")
client.execute('DROP TABLE IF EXISTS test_replace')
client.execute('CREATE TABLE if not exists test_replace (x Int32,y VARCHAR)')
client.execute('DESC test_replace')
client.replace("default", "test_replace", ['x'], [(1, 'a'), (2, 'b')])
client.replace("default", "test_replace", ['x'], [(1, 'c'), (2, 'd')])
_, upload_res = client.execute('select * from test_replace')
# upload_res is [(1, 'c\r'), (2, 'd\r')]
8 changes: 8 additions & 0 deletions examples/session_setting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from databend_py import Client


def session_settings():
# docs: https://databend.rs/doc/integrations/api/rest#client-side-session
session_settings = {"db": "test"}
client = Client(host="localhost", port=8000, user="root", password="root", session_settings=session_settings)
print(client)
28 changes: 28 additions & 0 deletions examples/upload_to_stage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from databend_py import Client
import os


def create_csv():
import csv
with open('upload.csv', 'w', newline='') as file:
writer = csv.writer(file)
writer.writerow([1, 'a'])
writer.writerow([1, 'b'])


def upload_to_stage():
client = Client.from_url("http://root:root@localhost:8000")
# upload [(1, 'a'), (1, 'b')] as csv to stage ~
stage_path = client.upload_to_stage('@~', "upload.csv", [(1, 'a'), (1, 'b')])
print(stage_path)
# stage_path is @~/upload.csv


def upload_file_to_stage():
create_csv()
client = Client.from_url("http://root:root@localhost:8000")
with open("upload.csv", "rb") as f:
stage_path = client.upload_to_stage('@~', "upload.csv", f)
print(stage_path)

os.remove("upload.csv")
11 changes: 11 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,16 @@ def test_insert(self):
_, upload_res = client.execute('select * from test_upload')
self.assertEqual(upload_res, [(1, 'a'), (1, 'b')])

def test_replace(self):
client = Client.from_url(self.databend_url)
client.execute('DROP TABLE IF EXISTS test_replace')
client.execute('CREATE TABLE if not exists test_replace (x Int32,y VARCHAR)')
client.execute('DESC test_replace')
client.replace("default", "test_replace", ['x'], [(1, 'a'), (2, 'b')])
client.replace("default", "test_replace", ['x'], [(1, 'c'), (2, 'd')])
_, upload_res = client.execute('select * from test_replace')
self.assertEqual(upload_res, [(1, 'c\r'), (2, 'd\r')])

def test_insert_with_compress(self):
client = Client.from_url(self.databend_url + "?compress=True&debug=True")
self.assertEqual(client._uploader._compress, True)
Expand Down Expand Up @@ -177,6 +187,7 @@ def test_cookies(self):
dt.test_batch_insert()
dt.test_iter_query()
dt.test_insert()
dt.test_replace()
dt.test_insert_with_compress()
dt.test_upload_to_stage()
dt.test_upload_file_to_stage()
Expand Down