refactor(bigquery): align datatype conversions with the new convention
kszucs authored and cpcloud committed May 25, 2023
1 parent 5d11f48 commit 70b8232
Showing 9 changed files with 153 additions and 192 deletions.
18 changes: 6 additions & 12 deletions ibis/backends/bigquery/__init__.py
@@ -15,19 +15,18 @@
 import ibis
 import ibis.common.exceptions as com
 import ibis.expr.operations as ops
-import ibis.expr.schema as sch
 import ibis.expr.types as ir
 from ibis.backends.base import Database
 from ibis.backends.base.sql import BaseSQLBackend
 from ibis.backends.bigquery.client import (
     BigQueryCursor,
-    bigquery_field_to_ibis_dtype,
     bigquery_param,
-    ibis_schema_to_bigquery_schema,
     parse_project_and_dataset,
     rename_partitioned_column,
+    schema_from_bigquery_table,
 )
 from ibis.backends.bigquery.compiler import BigQueryCompiler
+from ibis.backends.bigquery.datatypes import schema_from_bigquery, schema_to_bigquery

 with contextlib.suppress(ImportError):
     from ibis.backends.bigquery.udf import udf  # noqa: F401
@@ -218,17 +217,13 @@ def _fully_qualified_name(self, name, database):
     def _get_schema_using_query(self, query):
         job_config = bq.QueryJobConfig(dry_run=True, use_query_cache=False)
         job = self.client.query(query, job_config=job_config)
-        fields = self._adapt_types(job.schema)
-        return sch.Schema(fields)
+        return schema_from_bigquery(job.schema)

     def _get_table_schema(self, qualified_name):
         dataset, table = qualified_name.rsplit(".", 1)
         assert dataset is not None, "dataset is None"
         return self.get_schema(table, database=dataset)

-    def _adapt_types(self, descr):
-        return {col.name: bigquery_field_to_ibis_dtype(col) for col in descr}
-
     def _execute(self, stmt, results=True, query_parameters=None):
         job_config = bq.job.QueryJobConfig()
         job_config.query_parameters = query_parameters or []
@@ -383,8 +378,8 @@ def to_pyarrow_batches(
     def get_schema(self, name, database=None):
         table_id = self._fully_qualified_name(name, database)
         table_ref = bq.TableReference.from_string(table_id)
-        bq_table = self.client.get_table(table_ref)
-        return sch.infer(bq_table)
+        table = self.client.get_table(table_ref)
+        return schema_from_bigquery_table(table)

     def list_databases(self, like=None):
         results = [
@@ -428,8 +423,7 @@ def create_table(
         )
         if schema is not None:
             table_id = self._fully_qualified_name(name, database)
-            bigquery_schema = ibis_schema_to_bigquery_schema(schema)
-            table = bq.Table(table_id, schema=bigquery_schema)
+            table = bq.Table(table_id, schema=schema_to_bigquery(schema))
             self.client.create_table(table)
         else:
             project_id, dataset = self._parse_project_and_dataset(database)
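The call sites above now funnel every schema conversion through the new plain functions instead of the `sch.infer`/`dt.dtype` dispatch registrations. A minimal sketch of what the `_get_schema_using_query` path hands to `schema_from_bigquery`, assuming google-cloud-bigquery is installed (the field names are hypothetical):

import google.cloud.bigquery as bq

from ibis.backends.bigquery.datatypes import schema_from_bigquery

# A dry-run job's schema as the converter sees it: a plain list of
# bq.SchemaField values.
fields = [
    bq.SchemaField("id", "INT64", mode="REQUIRED"),
    bq.SchemaField("name", "STRING", mode="NULLABLE"),
    bq.SchemaField("tags", "STRING", mode="REPEATED"),
]

schema = schema_from_bigquery(fields)
# REQUIRED maps to a non-nullable int64, NULLABLE to a nullable string,
# and REPEATED wraps the element type in an array.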
76 changes: 9 additions & 67 deletions ibis/backends/bigquery/client.py
@@ -10,82 +10,24 @@
 import ibis.common.exceptions as com
 import ibis.expr.datatypes as dt
 import ibis.expr.operations as ops
-import ibis.expr.schema as sch
-from ibis.backends.bigquery.datatypes import ibis_type_to_bigquery_type
+from ibis.backends.bigquery.datatypes import dtype_to_bigquery, schema_from_bigquery

 NATIVE_PARTITION_COL = "_PARTITIONTIME"


-_DTYPE_TO_IBIS_TYPE = {
-    "INT64": dt.int64,
-    "FLOAT64": dt.double,
-    "BOOL": dt.boolean,
-    "STRING": dt.string,
-    "DATE": dt.date,
-    "DATETIME": dt.Timestamp(timezone=None),
-    "TIME": dt.time,
-    "TIMESTAMP": dt.Timestamp(timezone="UTC"),
-    "BYTES": dt.binary,
-    "NUMERIC": dt.Decimal(38, 9),
-    "GEOGRAPHY": dt.GeoSpatial(geotype="geography", srid=4326),
-}
-
-
-_LEGACY_TO_STANDARD = {
-    "INTEGER": "INT64",
-    "FLOAT": "FLOAT64",
-    "BOOLEAN": "BOOL",
-}
-
-
-@dt.dtype.register(bq.schema.SchemaField)
-def bigquery_field_to_ibis_dtype(field):
-    """Convert BigQuery `field` to an ibis type."""
-    typ = field.field_type
-    if typ == "RECORD":
-        fields = field.fields
-        assert fields, "RECORD fields are empty"
-        names = [el.name for el in fields]
-        ibis_types = list(map(dt.dtype, fields))
-        ibis_type = dt.Struct(dict(zip(names, ibis_types)))
-    else:
-        ibis_type = _LEGACY_TO_STANDARD.get(typ, typ)
-        ibis_type = _DTYPE_TO_IBIS_TYPE.get(ibis_type, ibis_type)
-    if field.mode == "REPEATED":
-        ibis_type = dt.Array(ibis_type)
-    return ibis_type
-
-
-@sch.infer.register(bq.table.Table)
-def bigquery_schema(table):
-    """Infer the schema of a BigQuery `table` object."""
-    fields = {el.name: dt.dtype(el) for el in table.schema}
+def schema_from_bigquery_table(table):
+    schema = schema_from_bigquery(table.schema)
+
     # Check for partitioning information
     partition_info = table._properties.get("timePartitioning", None)
-
-    # We have a partitioned table
     if partition_info is not None:
+        # We have a partitioned table
         partition_field = partition_info.get("field", NATIVE_PARTITION_COL)

         # Only add a new column if it's not already a column in the schema
-        fields.setdefault(partition_field, dt.timestamp)
-    return sch.schema(fields)
+        if partition_field not in schema:
+            schema |= {partition_field: dt.timestamp}
+

-
-def ibis_schema_to_bigquery_schema(schema: sch.Schema):
-    return [
-        (
-            bq.SchemaField(
-                name,
-                ibis_type_to_bigquery_type(type_),
-                mode='NULLABLE' if type_.nullable else 'REQUIRED',
-            )
-            if not type_.is_array()
-            else bq.SchemaField(
-                name, ibis_type_to_bigquery_type(type_.value_type), mode='REPEATED'
-            )
-        )
-        for name, type_ in schema.items()
-    ]
+    return schema


 class BigQueryCursor:
@@ -142,7 +84,7 @@ def bq_param_array(dtype: dt.Array, value, name):
     value_type = dtype.value_type

     try:
-        bigquery_type = ibis_type_to_bigquery_type(value_type)
+        bigquery_type = dtype_to_bigquery(value_type)
     except NotImplementedError:
         raise com.UnsupportedBackendType(dtype)
     else:
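A small usage sketch of the partitioning behavior preserved by `schema_from_bigquery_table` above; the project, dataset, table, and column names are hypothetical, and `bq.TimePartitioning` is the standard google-cloud-bigquery type:

import google.cloud.bigquery as bq

from ibis.backends.bigquery.client import schema_from_bigquery_table

# A partitioned table whose partition field is not in the column list.
table = bq.Table(
    "my-project.my_dataset.events",
    schema=[bq.SchemaField("user_id", "INT64")],
)
table.time_partitioning = bq.TimePartitioning(field="event_ts")

schema = schema_from_bigquery_table(table)
assert "event_ts" in schema  # the partition field is appended as a timestamp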
202 changes: 116 additions & 86 deletions ibis/backends/bigquery/datatypes.py
@@ -1,103 +1,133 @@
 from __future__ import annotations

-from multipledispatch import Dispatcher
+import google.cloud.bigquery as bq

-import ibis.common.exceptions as com
 import ibis.expr.datatypes as dt
+import ibis.expr.schema as sch

-ibis_type_to_bigquery_type = Dispatcher("ibis_type_to_bigquery_type")
-
-
-@ibis_type_to_bigquery_type.register(str)
-def trans_string_default(datatype):
-    return ibis_type_to_bigquery_type(dt.dtype(datatype))
-
-
-@ibis_type_to_bigquery_type.register(dt.Floating)
-def trans_float64(t):
-    return "FLOAT64"
-
-
-@ibis_type_to_bigquery_type.register(dt.Integer)
-def trans_integer(t):
-    return "INT64"
-
-
-@ibis_type_to_bigquery_type.register(dt.Binary)
-def trans_binary(t):
-    return "BYTES"
-
-
-@ibis_type_to_bigquery_type.register(dt.UInt64)
-def trans_lossy_integer(t):
-    raise TypeError("Conversion from uint64 to BigQuery integer type (int64) is lossy")
-
-
-@ibis_type_to_bigquery_type.register(dt.Array)
-def trans_array(t):
-    return f"ARRAY<{ibis_type_to_bigquery_type(t.value_type)}>"
-
-
-@ibis_type_to_bigquery_type.register(dt.Struct)
-def trans_struct(t):
-    return "STRUCT<{}>".format(
-        ", ".join(
-            f"{name} {ibis_type_to_bigquery_type(dt.dtype(type_))}"
-            for name, type_ in t.fields.items()
-        )
-    )
-
-
-@ibis_type_to_bigquery_type.register(dt.Date)
-def trans_date(t):
-    return "DATE"
-
-
-@ibis_type_to_bigquery_type.register(dt.Timestamp)
-def trans_timestamp(t):
-    if t.timezone is None:
-        return "DATETIME"
-    elif t.timezone == 'UTC':
-        return "TIMESTAMP"
-    else:
-        raise com.UnsupportedOperationError(
-            "BigQuery does not support timestamps with timezones other than 'UTC'"
-        )
-
-
-@ibis_type_to_bigquery_type.register(dt.DataType)
-def trans_type(t):
-    return str(t).upper()
-
-
-@ibis_type_to_bigquery_type.register(dt.Decimal)
-def trans_numeric(t):
-    if (t.precision, t.scale) == (76, 38):
-        return 'BIGNUMERIC'
-    if (t.precision, t.scale) in [(38, 9), (None, None)]:
-        return "NUMERIC"
-    raise TypeError(
-        "BigQuery only supports decimal types with precision of 38 and "
-        f"scale of 9 (NUMERIC) or precision of 76 and scale of 38 (BIGNUMERIC). "
-        f"Current precision: {t.precision}. Current scale: {t.scale}"
-    )
-
-
-@ibis_type_to_bigquery_type.register(dt.JSON)
-def trans_json(t):
-    return "JSON"
-
-
-@ibis_type_to_bigquery_type.register(dt.GeoSpatial)
-def trans_geography(t):
-    if (t.geotype, t.srid) == ("geography", 4326):
-        return "GEOGRAPHY"
-    raise com.UnsupportedOperationError(
-        "BigQuery geography uses points on WGS84 reference ellipsoid."
-        f"Current geotype: {t.geotype}, Current srid: {t.srid}"
-    )
+_from_bigquery_types = {
+    "INT64": dt.Int64,
+    "INTEGER": dt.Int64,
+    "FLOAT": dt.Float64,
+    "FLOAT64": dt.Float64,
+    "BOOL": dt.Boolean,
+    "BOOLEAN": dt.Boolean,
+    "STRING": dt.String,
+    "DATE": dt.Date,
+    "TIME": dt.Time,
+    "BYTES": dt.Binary,
+    "JSON": dt.JSON,
+}
+
+
+def dtype_from_bigquery(typ: str, nullable=True) -> dt.DataType:
+    if typ == "DATETIME":
+        return dt.Timestamp(timezone=None, nullable=nullable)
+    elif typ == "TIMESTAMP":
+        return dt.Timestamp(timezone="UTC", nullable=nullable)
+    elif typ == "NUMERIC":
+        return dt.Decimal(38, 9, nullable=nullable)
+    elif typ == "BIGNUMERIC":
+        return dt.Decimal(76, 38, nullable=nullable)
+    elif typ == "GEOGRAPHY":
+        return dt.GeoSpatial(geotype="geography", srid=4326, nullable=nullable)
+    else:
+        try:
+            return _from_bigquery_types[typ](nullable=nullable)
+        except KeyError:
+            raise TypeError(f"Unable to convert BigQuery type to ibis: {typ}")
+
+
+def dtype_from_bigquery_field(field: bq.SchemaField) -> dt.DataType:
+    typ = field.field_type
+    if typ == "RECORD":
+        assert field.fields, "RECORD fields are empty"
+        fields = {f.name: dtype_from_bigquery_field(f) for f in field.fields}
+        dtype = dt.Struct(fields)
+    else:
+        dtype = dtype_from_bigquery(typ)
+
+    mode = field.mode
+    if mode == "NULLABLE":
+        return dtype.copy(nullable=True)
+    elif mode == "REQUIRED":
+        return dtype.copy(nullable=False)
+    elif mode == "REPEATED":
+        return dt.Array(dtype)
+    else:
+        raise TypeError(f"Unknown BigQuery field.mode: {mode}")
+
+
+def dtype_to_bigquery(dtype: dt.DataType) -> str:
+    if dtype.is_floating():
+        return "FLOAT64"
+    elif dtype.is_uint64():
+        raise TypeError(
+            "Conversion from uint64 to BigQuery integer type (int64) is lossy"
+        )
+    elif dtype.is_integer():
+        return "INT64"
+    elif dtype.is_binary():
+        return "BYTES"
+    elif dtype.is_date():
+        return "DATE"
+    elif dtype.is_timestamp():
+        if dtype.timezone is None:
+            return "DATETIME"
+        elif dtype.timezone == 'UTC':
+            return "TIMESTAMP"
+        else:
+            raise TypeError(
+                "BigQuery does not support timestamps with timezones other than 'UTC'"
+            )
+    elif dtype.is_decimal():
+        if (dtype.precision, dtype.scale) == (76, 38):
+            return 'BIGNUMERIC'
+        if (dtype.precision, dtype.scale) in [(38, 9), (None, None)]:
+            return "NUMERIC"
+        raise TypeError(
+            "BigQuery only supports decimal types with precision of 38 and "
+            f"scale of 9 (NUMERIC) or precision of 76 and scale of 38 (BIGNUMERIC). "
+            f"Current precision: {dtype.precision}. Current scale: {dtype.scale}"
+        )
+    elif dtype.is_array():
+        return f"ARRAY<{dtype_to_bigquery(dtype.value_type)}>"
+    elif dtype.is_struct():
+        fields = (f"{k} {dtype_to_bigquery(v)}" for k, v in dtype.fields.items())
+        return "STRUCT<{}>".format(", ".join(fields))
+    elif dtype.is_json():
+        return "JSON"
+    elif dtype.is_geospatial():
+        if (dtype.geotype, dtype.srid) == ("geography", 4326):
+            return "GEOGRAPHY"
+        raise TypeError(
+            "BigQuery geography uses points on WGS84 reference ellipsoid."
+            f"Current geotype: {dtype.geotype}, Current srid: {dtype.srid}"
+        )
+    else:
+        return str(dtype).upper()
+
+
+def schema_to_bigquery(schema: sch.Schema) -> list[bq.SchemaField]:
+    result = []
+    for name, dtype in schema.items():
+        if isinstance(dtype, dt.Array):
+            mode = "REPEATED"
+            dtype = dtype.value_type
+        else:
+            mode = "REQUIRED" if not dtype.nullable else "NULLABLE"
+        field = bq.SchemaField(name, dtype_to_bigquery(dtype), mode=mode)
+        result.append(field)
+    return result
+
+
+def schema_from_bigquery(fields: list[bq.SchemaField]) -> sch.Schema:
+    return sch.Schema({f.name: dtype_from_bigquery_field(f) for f in fields})


+# TODO(kszucs): we can eliminate this function by making dt.DataType traversible
+# using ibis.common.graph.Node, similarly to how we traverse ops.Node instances:
+# node.find(types)
 def spread_type(dt: dt.DataType):
     """Returns a generator that contains all the types in the given type.

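A quick round-trip sketch of the new convention, exercising only behavior visible in the diff above:

import ibis.expr.datatypes as dt

from ibis.backends.bigquery.datatypes import dtype_from_bigquery, dtype_to_bigquery

# ibis -> BigQuery
assert dtype_to_bigquery(dt.Timestamp(timezone="UTC")) == "TIMESTAMP"
assert dtype_to_bigquery(dt.Timestamp(timezone=None)) == "DATETIME"
assert dtype_to_bigquery(dt.Array(dt.int64)) == "ARRAY<INT64>"
assert dtype_to_bigquery(dt.Struct({"a": dt.string})) == "STRUCT<a STRING>"

# BigQuery -> ibis (nullable by default)
assert dtype_from_bigquery("NUMERIC") == dt.Decimal(38, 9)
assert dtype_from_bigquery("TIMESTAMP") == dt.Timestamp(timezone="UTC")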