refactor(clickhouse): use sqlglot for create_table implementation
cpcloud committed Oct 11, 2023
1 parent 8ddfa94 commit ea0826d
Showing 1 changed file with 74 additions and 43 deletions.
117 changes: 74 additions & 43 deletions ibis/backends/clickhouse/__init__.py
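
The gist of the refactor: instead of concatenating SQL fragments into an f-string, create_table now builds a sqlglot Create expression and lets the ClickHouse dialect render it. A minimal, hedged sketch of that pattern follows; the table, database, and column names are illustrative, not taken from the commit, and exact output may vary with the sqlglot version.

import sqlglot as sg

# Build CREATE TABLE as a sqlglot expression tree rather than a string.
create = sg.exp.Create(
    this=sg.exp.Schema(
        this=sg.table("t", db="db"),  # illustrative table name
        expressions=[
            sg.exp.ColumnDef(
                this=sg.to_identifier("a"),
                kind=sg.exp.DataType.build("bigint"),
            )
        ],
    ),
    kind="TABLE",
    properties=sg.exp.Properties(
        expressions=[
            # engine identifier left unquoted, as in the diff below
            sg.exp.EngineProperty(this=sg.to_identifier("Memory", quoted=False))
        ]
    ),
)

# Render with the ClickHouse dialect, e.g. CREATE TABLE db.t (a Int64) ENGINE=Memory
print(create.sql("clickhouse"))
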
@@ -486,21 +486,10 @@ def raw_sql(
         self._log(query)
         return self.con.query(query, external_data=external_data, **kwargs)
 
-    def fetch_from_cursor(self, cursor, schema):
-        import pandas as pd
-
-        from ibis.formats.pandas import PandasData
-
-        df = pd.DataFrame.from_records(iter(cursor), columns=schema.names)
-        return PandasData.convert_table(df, schema)
-
     def close(self) -> None:
         """Close ClickHouse connection."""
         self.con.close()
 
-    def _fully_qualified_name(self, name: str, database: str | None) -> str:
-        return sg.table(name, db=database).sql(dialect="clickhouse")
-
     def get_schema(self, table_name: str, database: str | None = None) -> sch.Schema:
         """Return a Schema object for the indicated table and database.
@@ -673,61 +662,103 @@ def create_table(
         Table
             The new table
         """
-        tmp = "TEMPORARY " * temp
-        replace = "OR REPLACE " * overwrite
-
         if temp and overwrite:
-            raise com.IbisInputError("Cannot specify both temp and overwrite")
-
-        if not temp:
-            table = self._fully_qualified_name(name, database)
-        else:
-            table = name
-            database = None
-        code = f"CREATE {replace}{tmp}TABLE {table}"
+            raise com.IbisInputError(
+                "Cannot specify both `temp=True` and `overwrite=True` for ClickHouse"
+            )
 
         if obj is None and schema is None:
-            raise com.IbisError("The schema or obj parameter is required")
+            raise com.IbisError("The `schema` or `obj` parameter is required")
 
         if obj is not None and not isinstance(obj, ir.Expr):
             obj = ibis.memtable(obj, schema=schema)
 
         if schema is None:
             schema = obj.schema()
 
-        serialized_schema = ", ".join(
-            f"`{name}` {ClickhouseType.to_string(typ)}" for name, typ in schema.items()
+        this = sg.exp.Schema(
+            this=sg.table(name, db=database),
+            expressions=[
+                sg.exp.ColumnDef(
+                    this=sg.to_identifier(name), kind=ClickhouseType.from_ibis(typ)
+                )
+                for name, typ in schema.items()
+            ],
         )
-
-        code += f" ({serialized_schema}) ENGINE = {engine}"
-
-        if order_by is not None:
-            code += f" ORDER BY {', '.join(util.promote_list(order_by))}"
-        elif engine == "MergeTree":
-            # empty tuple to indicate no specific order when engine is
-            # MergeTree
-            code += " ORDER BY tuple()"
+        properties = [
+            # the engine cannot be quoted, since clickhouse won't allow e.g.,
+            # "File(Native)"
+            sg.exp.EngineProperty(this=sg.to_identifier(engine, quoted=False))
+        ]
+
+        if temp:
+            properties.append(sg.exp.TemporaryProperty())
+
+        if order_by is not None or engine == "MergeTree":
+            # engine == "MergeTree" requires an order by clause, which is the
+            # empty tuple if order_by is False-y
+            properties.append(
+                sg.exp.Order(
+                    expressions=[
+                        sg.exp.Ordered(
+                            this=sg.exp.Tuple(
+                                expressions=list(map(sg.column, order_by or ()))
+                            )
+                        )
+                    ]
+                )
+            )
 
         if partition_by is not None:
-            code += f" PARTITION BY {', '.join(util.promote_list(partition_by))}"
+            properties.append(
+                sg.exp.PartitionedByProperty(
+                    this=sg.exp.Schema(
+                        expressions=list(map(sg.to_identifier, partition_by))
+                    )
+                )
+            )
 
         if sample_by is not None:
-            code += f" SAMPLE BY {sample_by}"
+            properties.append(
+                sg.exp.SampleProperty(
+                    this=sg.exp.Tuple(expressions=list(map(sg.column, sample_by)))
+                )
+            )
 
         if settings:
-            kvs = ", ".join(f"{name}={value!r}" for name, value in settings.items())
-            code += f" SETTINGS {kvs}"
+            properties.append(
+                sg.exp.SettingsProperty(
+                    expressions=[
+                        sg.exp.SetItem(
+                            this=sg.exp.EQ(
+                                this=sg.to_identifier(name), expression=lit(value)
+                            )
+                        )
+                        for name, value in settings.items()
+                    ]
+                )
+            )
+
+        external_tables = {}
+        expression = None
 
         if obj is not None:
-            code += f" AS {self.compile(obj)}"
-            external_tables = self._collect_in_memory_tables(obj)
-        else:
-            external_tables = {}
+            expression = self._to_sqlglot(obj)
+            external_tables.update(self._collect_in_memory_tables(obj))
+
+        code = sg.exp.Create(
+            this=this,
+            kind="TABLE",
+            replace=overwrite,
+            expression=expression,
+            properties=sg.exp.Properties(expressions=properties),
+        )
 
         external_data = self._normalize_external_tables(external_tables)
 
         # create the table
-        self.con.raw_query(code, external_data=external_data)
+        sql = code.sql(self.name, pretty=True)
+        self.con.raw_query(sql, external_data=external_data)
 
         return self.table(name, database=database)
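
For reference, a self-contained sketch of the kind of expression the new implementation assembles for a hypothetical table with ORDER BY, PARTITION BY, and SETTINGS clauses. The table and column names are made up, `lit` in the diff is an ibis-internal literal helper approximated here with sg.exp.convert, and the rendered DDL may vary with the sqlglot version.

import sqlglot as sg

schema = sg.exp.Schema(
    this=sg.table("events", db="analytics"),
    expressions=[
        sg.exp.ColumnDef(
            this=sg.to_identifier("ts"), kind=sg.exp.DataType.build("datetime")
        ),
        sg.exp.ColumnDef(
            this=sg.to_identifier("user_id"), kind=sg.exp.DataType.build("bigint")
        ),
    ],
)

properties = [
    # engine identifier is left unquoted, as in the diff above
    sg.exp.EngineProperty(this=sg.to_identifier("MergeTree", quoted=False)),
    # MergeTree requires an ORDER BY clause; the tuple may be empty
    sg.exp.Order(
        expressions=[sg.exp.Ordered(this=sg.exp.Tuple(expressions=[sg.column("ts")]))]
    ),
    sg.exp.PartitionedByProperty(
        this=sg.exp.Schema(expressions=[sg.to_identifier("user_id")])
    ),
    sg.exp.SettingsProperty(
        expressions=[
            sg.exp.SetItem(
                this=sg.exp.EQ(
                    this=sg.to_identifier("allow_nullable_key"),
                    expression=sg.exp.convert(1),
                )
            )
        ]
    ),
]

create = sg.exp.Create(
    this=schema,
    kind="TABLE",
    replace=False,
    properties=sg.exp.Properties(expressions=properties),
)

# Renders ClickHouse DDL along the lines of:
#   CREATE TABLE analytics.events (...) ENGINE=MergeTree ORDER BY (ts)
#   PARTITION BY (user_id) SETTINGS allow_nullable_key = 1
print(create.sql("clickhouse", pretty=True))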
