ENH: Abstract out the table_class and table_expr_class properties
Author: Phillip Cloud <cpcloud@gmail.com>

Closes ibis-project#1425 from cpcloud/generalize-table-class and squashes the following commits:

997f88f [Phillip Cloud] ENH: Abstract out the table_class and table_expr_class properties
cpcloud committed Apr 16, 2018
1 parent f917f7c commit 48f2bb9
Showing 13 changed files with 1,229 additions and 1,232 deletions.
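
The change replaces the per-client `_table_expr_klass` property with two overridable class attributes on `SQLClient`: `table_class`, the operation node built for a physical table, and `table_expr_class`, the expression type that wraps that node. The sketch below illustrates the hook pattern with simplified stand-ins; the class bodies are toy versions written for this note, not ibis's real implementations.

```python
# Simplified stand-ins for ibis's DatabaseTable / TableExpr / SQLClient,
# shown only to illustrate the class-attribute hook this commit introduces.


class DatabaseTable:
    """Operation node describing a physical table."""

    def __init__(self, name, schema, client):
        self.name, self.schema, self.client = name, schema, client


class TableExpr:
    """Expression wrapper around an operation node."""

    def __init__(self, op):
        self._op = op

    def op(self):
        return self._op


class SQLClient:
    # Subclasses override these attributes instead of redefining a
    # _table_expr_klass property on every backend.
    table_class = DatabaseTable
    table_expr_class = TableExpr

    def table(self, name, schema=None):
        node = self.table_class(name, schema, self)
        return self.table_expr_class(node)
```

A backend then customizes behavior by assigning its own subclasses to the two attributes, as the diffs below do for BigQuery, Clickhouse, and the file clients.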
9 changes: 5 additions & 4 deletions ibis/bigquery/client.py
@@ -286,12 +286,17 @@ def bq_param_date(param, value):
     return bq.ScalarQueryParameter(param.get_name(), 'DATE', value)


+class BigQueryTable(ops.DatabaseTable):
+    pass
+
+
 class BigQueryClient(SQLClient):

     sync_query = BigQueryQuery
     database_class = BigQueryDatabase
     proxy_class = BigQueryAPIProxy
     dialect = comp.BigQueryDialect
+    table_class = BigQueryTable

     def __init__(self, project_id, dataset_id):
         self._proxy = type(self).proxy_class(project_id)
@@ -305,10 +310,6 @@ def project_id(self):
     def dataset_id(self):
         return self._dataset_id

-    @property
-    def _table_expr_klass(self):
-        return ir.TableExpr
-
     def table(self, *args, **kwargs):
         t = super(BigQueryClient, self).table(*args, **kwargs)
         if NATIVE_PARTITION_COL in t.columns:
153 changes: 78 additions & 75 deletions ibis/clickhouse/client.py
@@ -6,6 +6,7 @@
 import ibis.expr.types as ir
 import ibis.expr.schema as sch
 import ibis.expr.datatypes as dt
+import ibis.expr.operations as ops

 from ibis.config import options
 from ibis.compat import zip as czip, parse_version
@@ -124,12 +125,89 @@ def _fetch(self, cursor):
         return self.schema().apply_to(df)


+class ClickhouseTable(ir.TableExpr, DatabaseEntity):
+    """References a physical table in Clickhouse"""
+
+    @property
+    def _qualified_name(self):
+        return self.op().args[0]
+
+    @property
+    def _unqualified_name(self):
+        return self._match_name()[1]
+
+    @property
+    def _client(self):
+        return self.op().args[2]
+
+    def _match_name(self):
+        m = fully_qualified_re.match(self._qualified_name)
+        if not m:
+            raise com.IbisError('Cannot determine database name from {0}'
+                                .format(self._qualified_name))
+        db, quoted, unquoted = m.groups()
+        return db, quoted or unquoted
+
+    @property
+    def _database(self):
+        return self._match_name()[0]
+
+    def invalidate_metadata(self):
+        self._client.invalidate_metadata(self._qualified_name)
+
+    def metadata(self):
+        """
+        Return parsed results of DESCRIBE FORMATTED statement
+
+        Returns
+        -------
+        meta : TableMetadata
+        """
+        return self._client.describe_formatted(self._qualified_name)
+
+    describe_formatted = metadata
+
+    @property
+    def name(self):
+        return self.op().name
+
+    def _execute(self, stmt):
+        return self._client._execute(stmt)
+
+    def insert(self, obj, **kwargs):
+        from .identifiers import quote_identifier
+        schema = self.schema()
+
+        assert isinstance(obj, pd.DataFrame)
+        assert set(schema.names) >= set(obj.columns)
+
+        columns = ', '.join(map(quote_identifier, obj.columns))
+        query = 'INSERT INTO {table} ({columns}) VALUES'.format(
+            table=self._qualified_name, columns=columns)
+
+        # convert data columns with datetime64 pandas dtype to native date
+        # because clickhouse-driver 0.0.10 does arithmetic operations on it
+        obj = obj.copy()
+        for col in obj.select_dtypes(include=[np.datetime64]):
+            if isinstance(schema[col], dt.Date):
+                obj[col] = obj[col].dt.date
+
+        data = obj.to_dict('records')
+        return self._client.con.process_insert_query(query, data, **kwargs)
+
+
+class ClickhouseDatabaseTable(ops.DatabaseTable):
+    pass
+
+
 class ClickhouseClient(SQLClient):
     """An Ibis client interface that uses Clickhouse"""

     database_class = ClickhouseDatabase
     sync_query = ClickhouseQuery
     dialect = ClickhouseDialect
+    table_class = ClickhouseDatabaseTable
+    table_expr_class = ClickhouseTable

     def __init__(self, *args, **kwargs):
         self.con = _DriverClient(*args, **kwargs)
@@ -142,10 +220,6 @@ def current_database(self):
         # might be better to use driver.Connection instead of Client
         return self.con.connection.database

-    @property
-    def _table_expr_klass(self):
-        return ClickhouseTable
-
     def log(self, msg):
         log(msg)

@@ -336,74 +410,3 @@ def version(self):
             raise
         else:
             return parse_version(vstring)
-
-
-class ClickhouseTable(ir.TableExpr, DatabaseEntity):
-    """References a physical table in Clickhouse"""
-
-    @property
-    def _qualified_name(self):
-        return self.op().args[0]
-
-    @property
-    def _unqualified_name(self):
-        return self._match_name()[1]
-
-    @property
-    def _client(self):
-        return self.op().args[2]
-
-    def _match_name(self):
-        m = fully_qualified_re.match(self._qualified_name)
-        if not m:
-            raise com.IbisError('Cannot determine database name from {0}'
-                                .format(self._qualified_name))
-        db, quoted, unquoted = m.groups()
-        return db, quoted or unquoted
-
-    @property
-    def _database(self):
-        return self._match_name()[0]
-
-    def invalidate_metadata(self):
-        self._client.invalidate_metadata(self._qualified_name)
-
-    def metadata(self):
-        """
-        Return parsed results of DESCRIBE FORMATTED statement
-
-        Returns
-        -------
-        meta : TableMetadata
-        """
-        return self._client.describe_formatted(self._qualified_name)
-
-    describe_formatted = metadata
-
-    @property
-    def name(self):
-        return self.op().name
-
-    def _execute(self, stmt):
-        return self._client._execute(stmt)
-
-    def insert(self, obj, **kwargs):
-        from .identifiers import quote_identifier
-        schema = self.schema()
-
-        assert isinstance(obj, pd.DataFrame)
-        assert set(schema.names) >= set(obj.columns)
-
-        columns = ', '.join(map(quote_identifier, obj.columns))
-        query = 'INSERT INTO {table} ({columns}) VALUES'.format(
-            table=self._qualified_name, columns=columns)
-
-        # convert data columns with datetime64 pandas dtype to native date
-        # because clickhouse-driver 0.0.10 does arithmetic operations on it
-        obj = obj.copy()
-        for col in obj.select_dtypes(include=[np.datetime64]):
-            if isinstance(schema[col], dt.Date):
-                obj[col] = obj[col].dt.date
-
-        data = obj.to_dict('records')
-        return self._client.con.process_insert_query(query, data, **kwargs)
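
Because `ClickhouseClient.table_expr_class` is `ClickhouseTable`, `table()` now returns an expression carrying the Clickhouse-specific methods (`insert`, `metadata`, `invalidate_metadata`) without the client overriding `table()` itself. A hedged usage sketch, assuming the clickhouse backend is installed; the host, port, and table name are placeholders:

```python
# Hypothetical usage; connection parameters and table name are placeholders.
import ibis

con = ibis.clickhouse.connect(host='localhost', port=9000)
t = con.table('events')       # a ClickhouseTable expression
print(type(t).__name__)       # ClickhouseTable, not a bare TableExpr
t.describe_formatted()        # backend-specific method is available
```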
24 changes: 4 additions & 20 deletions ibis/client.py
@@ -1,17 +1,3 @@
-# Copyright 2014 Cloudera Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
 import abc

 import six
@@ -102,6 +88,8 @@ class SQLClient(six.with_metaclass(abc.ABCMeta, Client)):
     async_query = Query

     dialect = comp.Dialect
+    table_class = ops.DatabaseTable
+    table_expr_class = ir.TableExpr

     def table(self, name, database=None):
         """
@@ -119,12 +107,8 @@ def table(self, name, database=None):
         """
         qualified_name = self._fully_qualified_name(name, database)
         schema = self._get_table_schema(qualified_name)
-        node = ops.DatabaseTable(qualified_name, schema, self)
-        return self._table_expr_klass(node)
-
-    @property
-    def _table_expr_klass(self):
-        return ir.TableExpr
+        node = self.table_class(qualified_name, schema, self)
+        return self.table_expr_class(node)

     @property
     def current_database(self):
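
With these defaults on `SQLClient`, a new backend can swap in its own node and expression types by assignment alone. A minimal sketch; the `MyBackend*` names are invented for illustration, and a real client would still implement `_fully_qualified_name` and `_get_table_schema`:

```python
import ibis.expr.operations as ops
import ibis.expr.types as ir
from ibis.client import SQLClient


class MyBackendTable(ops.DatabaseTable):
    """Hypothetical backend-specific table node."""


class MyBackendTableExpr(ir.TableExpr):
    """Hypothetical backend-specific table expression."""


class MyBackendClient(SQLClient):
    # The inherited table() now builds MyBackendTable nodes and wraps
    # them in MyBackendTableExpr; no method override is needed.
    table_class = MyBackendTable
    table_expr_class = MyBackendTableExpr
```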
5 changes: 3 additions & 2 deletions ibis/file/csv.py
@@ -49,6 +49,7 @@ class CSVClient(FileClient):

     dialect = dialect
     extension = 'csv'
+    table_class = CSVTable

     def insert(self, path, expr, index=False, **kwargs):
         path = self.root / path
@@ -71,7 +72,7 @@ def table(self, name, path=None, schema=None, **kwargs):

         # infer sample's schema and define table
         schema = sch.infer(sample)
-        table = CSVTable(name, schema, self, **kwargs).to_expr()
+        table = self.table_class(name, schema, self, **kwargs).to_expr()

         self.dictionary[name] = f

@@ -91,7 +92,7 @@ def version(self):
         return parse_version(pd.__version__)


-@pre_execute.register(CSVTable, CSVClient)
+@pre_execute.register(CSVClient.table_class, CSVClient)
 def csv_pre_execute_table(op, client, scope, **kwargs):
     # cache
     if isinstance(scope.get(op), pd.DataFrame):
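
Registering the rule on `CSVClient.table_class` is equivalent at import time to registering on `CSVTable`, since the attribute resolves to that class when the decorator runs; the rewrite just makes the attribute the single source of truth for the client/table relationship. A toy illustration of attribute-based registration using `multipledispatch`, the library that backs ibis's `pre_execute` dispatcher; all names below are invented:

```python
from multipledispatch import dispatch


class ToyTable:
    pass


class ToyClient:
    table_class = ToyTable


# Resolves to dispatch(ToyTable, ToyClient) when the decorator runs,
# but reads the type from the attribute that defines the relationship.
@dispatch(ToyClient.table_class, ToyClient)
def pre_execute(op, client):
    return 'cached'


print(pre_execute(ToyTable(), ToyClient()))  # -> 'cached'
```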
5 changes: 3 additions & 2 deletions ibis/file/hdf5.py
@@ -25,6 +25,7 @@ class HDFTable(ops.DatabaseTable):

 class HDFClient(FileClient):
     extension = 'h5'
+    table_class = HDFTable

     def insert(self, path, key, expr, format='table',
                data_columns=True, **kwargs):
@@ -43,7 +44,7 @@ def table(self, name, path):
         df = store.select(name, start=0, stop=0)
         schema = sch.infer(df)

-        t = HDFTable(name, schema, self).to_expr()
+        t = self.table_class(name, schema, self).to_expr()
         self.dictionary[name] = path
         return t

@@ -65,7 +66,7 @@ def list_databases(self, path=None):
         return self._list_databases_dirs_or_files(path)


-@pre_execute.register(HDFTable, HDFClient)
+@pre_execute.register(HDFClient.table_class, HDFClient)
 def hdf_pre_execute_table(op, client, scope, **kwargs):

     # cache
5 changes: 3 additions & 2 deletions ibis/file/parquet.py
@@ -64,6 +64,7 @@ class ParquetClient(FileClient):

     dialect = dialect
     extension = 'parquet'
+    table_class = ParquetTable

     def insert(self, path, expr, **kwargs):
         path = self.root / path
@@ -84,7 +85,7 @@ def table(self, name, path):
         parquet_file = pq.ParquetFile(str(f))
         schema = sch.infer(parquet_file.schema)

-        table = ParquetTable(name, schema, self).to_expr()
+        table = self.table_class(name, schema, self).to_expr()
         self.dictionary[name] = f

         return table
@@ -103,7 +104,7 @@ def version(self):
         return parse_version(pa.__version__)


-@pre_execute.register(ParquetTable, ParquetClient)
+@pre_execute.register(ParquetClient.table_class, ParquetClient)
 def parquet_pre_execute_client(op, client, scope, **kwargs):
     # cache
     if isinstance(scope.get(op), pd.DataFrame):
[Diffs for the remaining seven changed files are not shown.]
