New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Clickhouse query backend #1127
Clickhouse query backend #1127
Changes from all commits
d24b478
9dc605f
e7bb4c1
6b71f26
b855f95
5d1dea4
1ff165f
906b019
6ee06b3
2657bc5
3a4024f
a477e23
320b308
ae7fa50
5aa93b4
3a4261d
47553a4
243dff2
9d4d365
c9c6156
995192b
10d43c3
b2b8820
e8b8fd4
e6702ee
2f2ab74
5a082ea
52806fe
2a20131
f7f4c31
60e8814
e787d88
caa0b2e
a1c9d66
a95a807
5e73b8a
b13e5da
2809bbf
637dedd
6656801
b9cba61
8abe2a3
27bdc37
ff41290
c07a3c6
82c76ea
b638c0b
609aa9e
bab6ffb
2c5339b
4986f93
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
DROP TABLE IF EXISTS diamonds; | ||
|
||
CREATE TABLE diamonds ( | ||
`date` Date DEFAULT today(), | ||
carat Float64, | ||
cut String, | ||
color String, | ||
clarity String, | ||
depth Float64, | ||
`table` Float64, | ||
price Int64, | ||
x Float64, | ||
y Float64, | ||
z Float64 | ||
) ENGINE = MergeTree(date, (`carat`), 8192); | ||
|
||
DROP TABLE IF EXISTS batting; | ||
|
||
CREATE TABLE batting ( | ||
`date` Date DEFAULT today(), | ||
`playerID` String, | ||
`yearID` Int64, | ||
stint Int64, | ||
`teamID` String, | ||
`lgID` String, | ||
`G` Int64, | ||
`AB` Int64, | ||
`R` Int64, | ||
`H` Int64, | ||
`X2B` Int64, | ||
`X3B` Int64, | ||
`HR` Int64, | ||
`RBI` Int64, | ||
`SB` Int64, | ||
`CS` Int64, | ||
`BB` Int64, | ||
`SO` Int64, | ||
`IBB` Int64, | ||
`HBP` Int64, | ||
`SH` Int64, | ||
`SF` Int64, | ||
`GIDP` Int64 | ||
) ENGINE = MergeTree(date, (`playerID`), 8192); | ||
|
||
DROP TABLE IF EXISTS awards_players; | ||
|
||
CREATE TABLE awards_players ( | ||
`date` Date DEFAULT today(), | ||
`playerID` String, | ||
`awardID` String, | ||
`yearID` Int64, | ||
`lgID` String, | ||
tie String, | ||
notes String | ||
) ENGINE = MergeTree(date, (`playerID`), 8192); | ||
|
||
DROP TABLE IF EXISTS functional_alltypes; | ||
|
||
CREATE TABLE functional_alltypes ( | ||
`date` Date DEFAULT toDate(timestamp_col), | ||
`index` Int64, | ||
`Unnamed_0` Int64, | ||
id Int32, | ||
bool_col UInt8, | ||
tinyint_col Int8, | ||
smallint_col Int16, | ||
int_col Int32, | ||
bigint_col Int64, | ||
float_col Float32, | ||
double_col Float64, | ||
date_string_col String, | ||
string_col String, | ||
timestamp_col DateTime, | ||
year Int32, | ||
month Int32 | ||
) ENGINE = MergeTree(date, (`index`), 8192); | ||
|
||
DROP TABLE IF EXISTS tzone; | ||
|
||
CREATE TABLE tzone ( | ||
`date` Date DEFAULT today(), | ||
ts DateTime, | ||
key String, | ||
value Float64 | ||
) ENGINE = MergeTree(date, (key), 8192); | ||
|
||
DROP TABLE IF EXISTS array_types; | ||
|
||
CREATE TABLE IF NOT EXISTS array_types ( | ||
`date` Date DEFAULT today(), | ||
x Array(Int64), | ||
y Array(String), | ||
z Array(Float64), | ||
grouper String, | ||
scalar_column Float64 | ||
) ENGINE = MergeTree(date, (scalar_column), 8192); |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,83 @@ def cli(): | |
pass | ||
|
||
|
||
@cli.command() | ||
@click.argument('tables', nargs=-1) | ||
@click.option('-S', '--script', type=click.File('rt'), required=True) | ||
@click.option( | ||
'-d', '--database', | ||
default=os.environ.get('IBIS_TEST_CLICKHOUSE_DB', 'ibis_testing') | ||
) | ||
@click.option( | ||
'-D', '--data-directory', | ||
default=tempfile.gettempdir(), type=click.Path(exists=True) | ||
) | ||
def clickhouse(script, tables, database, data_directory): | ||
username = os.environ.get('IBIS_CLICKHOUSE_USER', 'default') | ||
host = os.environ.get('IBIS_CLICKHOUSE_HOST', 'localhost') | ||
password = os.environ.get('IBIS_CLICKHOUSE_PASS', '') | ||
|
||
url = sa.engine.url.URL( | ||
'clickhouse+native', | ||
username=username, | ||
host=host, | ||
password=password, | ||
) | ||
engine = sa.create_engine(str(url)) | ||
engine.execute('DROP DATABASE IF EXISTS "{}"'.format(database)) | ||
engine.execute('CREATE DATABASE "{}"'.format(database)) | ||
|
||
url = sa.engine.url.URL( | ||
'clickhouse+native', | ||
username=username, | ||
host=host, | ||
password=password, | ||
database=database, | ||
) | ||
engine = sa.create_engine(str(url)) | ||
script_text = script.read() | ||
|
||
# missing stmt | ||
# INSERT INTO array_types (x, y, z, grouper, scalar_column) VALUES | ||
# ([1, 2, 3], ['a', 'b', 'c'], [1.0, 2.0, 3.0], 'a', 1.0), | ||
# ([4, 5], ['d', 'e'], [4.0, 5.0], 'a', 2.0), | ||
# ([6], ['f'], [6.0], 'a', 3.0), | ||
# ([1], ['a'], [], 'b', 4.0), | ||
# ([2, 3], ['b', 'c'], [], 'b', 5.0), | ||
# ([4, 5], ['d', 'e'], [4.0, 5.0], 'c', 6.0); | ||
|
||
with engine.begin() as con: | ||
# doesn't support multiple statements | ||
for stmt in script_text.split(';'): | ||
if len(stmt.strip()): | ||
con.execute(stmt) | ||
|
||
table_paths = [ | ||
os.path.join(data_directory, '{}.csv'.format(table)) | ||
for table in tables | ||
] | ||
dtype = {'bool_col': np.bool_} | ||
for table, path in zip(tables, table_paths): | ||
# correct dtypes per table to be able to insert | ||
# TODO: cleanup, kinda ugly | ||
df = pd.read_csv(path, index_col=None, header=0, dtype=dtype) | ||
if table == 'functional_alltypes': | ||
df = df.rename(columns={'Unnamed: 0': 'Unnamed_0'}) | ||
cols = ['date_string_col', 'string_col'] | ||
df[cols] = df[cols].astype(str) | ||
df.timestamp_col = df.timestamp_col.astype('datetime64[s]') | ||
elif table == 'batting': | ||
cols = ['playerID', 'teamID', 'lgID'] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These should already be strings. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
df[cols] = df[cols].astype(str) | ||
cols = df.select_dtypes([float]).columns | ||
df[cols] = df[cols].fillna(0).astype(int) | ||
elif table == 'awards_players': | ||
cols = ['playerID', 'awardID', 'lgID', 'tie', 'notes'] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These should already be strings. |
||
df[cols] = df[cols].astype(str) | ||
|
||
df.to_sql(table, engine, index=False, if_exists='append') | ||
|
||
|
||
@cli.command() | ||
@click.argument('tables', nargs=-1) | ||
@click.option('-S', '--script', type=click.File('rt'), required=True) | ||
|
@@ -103,6 +180,8 @@ def sqlite(script, tables, database, data_directory): | |
os.path.join(data_directory, '{}.csv'.format(table)) | ||
for table in tables | ||
] | ||
click.echo(tables) | ||
click.echo(table_paths) | ||
for table, path in zip(tables, table_paths): | ||
df = pd.read_csv(path, index_col=None, header=0) | ||
with engine.begin() as con: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,3 +23,5 @@ dependencies: | |
- toolz | ||
- pip: | ||
- hdfs>=2.0.0 | ||
- clickhouse-driver | ||
- clickhouse-sqlalchemy |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,3 +21,5 @@ dependencies: | |
- toolz | ||
- pip: | ||
- hdfs>=2.0.0 | ||
- clickhouse-driver | ||
- clickhouse-sqlalchemy |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,3 +21,5 @@ dependencies: | |
- toolz | ||
- pip: | ||
- hdfs>=2.0.0 | ||
- clickhouse-driver | ||
- clickhouse-sqlalchemy |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,3 +21,5 @@ dependencies: | |
- toolz | ||
- pip: | ||
- hdfs>=2.0.0 | ||
- clickhouse-driver | ||
- clickhouse-sqlalchemy |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
import ibis.common as com | ||
|
||
from ibis.config import options | ||
from ibis.clickhouse.client import ClickhouseClient | ||
|
||
|
||
def compile(expr): | ||
""" | ||
Force compilation of expression as though it were an expression depending | ||
on Clickhouse. Note you can also call expr.compile() | ||
|
||
Returns | ||
------- | ||
compiled : string | ||
""" | ||
from .compiler import to_sql | ||
return to_sql(expr) | ||
|
||
|
||
def verify(expr): | ||
""" | ||
Determine if expression can be successfully translated to execute on | ||
Clickhouse | ||
""" | ||
try: | ||
compile(expr) | ||
return True | ||
except com.TranslationError: | ||
return False | ||
|
||
|
||
def connect(host='localhost', port=9000, database='default', user='default', | ||
password='', client_name='ibis', compression=False): | ||
"""Create an ClickhouseClient for use with Ibis. | ||
|
||
Parameters | ||
---------- | ||
host : str, optional | ||
Host name of the clickhouse server | ||
port : int, optional | ||
Clickhouse server's port | ||
database : str, optional | ||
Default database when executing queries | ||
user : str, optional | ||
User to authenticate with | ||
password : str, optional | ||
Password to authenticate with | ||
client_name: str, optional | ||
This will appear in clickhouse server logs | ||
compression: str, optional | ||
Weather or not to use compression. Default is False. | ||
Possible choices: lz4, lz4hc, quicklz, zstd | ||
True is equivalent to 'lz4'. | ||
|
||
Examples | ||
-------- | ||
>>> import ibis | ||
>>> import os | ||
>>> clickhouse_host = os.environ.get('IBIS_TEST_CLICKHOUSE_HOST', | ||
... 'localhost') | ||
>>> clickhouse_port = int(os.environ.get('IBIS_TEST_CLICKHOUSE_PORT', | ||
... 9000)) | ||
>>> client = ibis.clickhouse.connect( | ||
... host=clickhouse_host, | ||
... port=clickhouse_port | ||
... ) | ||
>>> client # doctest: +ELLIPSIS | ||
<ibis.clickhouse.client.ClickhouseClient object at 0x...> | ||
|
||
Returns | ||
------- | ||
ClickhouseClient | ||
""" | ||
|
||
client = ClickhouseClient(host, port=port, database=database, user=user, | ||
password=password, client_name=client_name, | ||
compression=compression) | ||
if options.default_backend is None: | ||
options.default_backend = client | ||
|
||
return client |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These should already be strings.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just a temporary workaround, I'd like to use zkostyan/clickhouse-sqlalchemy with pandas to_sql, but there are a couple of issues.