Skip to content

Commit

Permalink
Improve schema parsing from BigQuery client (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
connor-charles committed Jan 19, 2023
1 parent 6ab5207 commit d8dba53
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 9 deletions.
5 changes: 4 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

# dbt-dry-run v0.6.3

## Bugfixes
## Bugfixes & Improvements

- Add support for INTERVAL and JSON types.

- Improved error handling when parsing the predicted schema of the dry run queries. The dry runner will now raise an
`UnknownSchemaException` whose message details the field type returned by BigQuery that it does not recognise

# dbt-dry-run v0.6.2

## Bugfixes
Expand Down
24 changes: 24 additions & 0 deletions dbt_dry_run/exception.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
from google.cloud.bigquery import SchemaField
from pydantic import ValidationError


class NotCompiledException(Exception):
    # Raised for dbt nodes that were not compiled.
    # NOTE(review): semantics inferred from the name only — the raising sites
    # are outside this view; confirm against callers.
    pass

Expand Down Expand Up @@ -32,3 +36,23 @@ class SnapshotConfigException(Exception):

class ManifestValidationError(Exception):
    # Raised when the dbt manifest fails validation.
    # NOTE(review): semantics inferred from the name only — raising sites are
    # outside this view; confirm against callers.
    pass


class UnknownSchemaException(Exception):
    """Raised when the schema returned by a BigQuery dry run cannot be parsed.

    Built via :meth:`from_validation_error` so the message distinguishes an
    unrecognised column type from any other schema-parsing failure.
    """

    @classmethod
    def from_validation_error(
        cls, schema_field: "SchemaField", e: "ValidationError"
    ) -> "UnknownSchemaException":
        """Build a readable exception from a pydantic ``ValidationError``.

        Args:
            schema_field: The BigQuery field whose parsed model failed
                validation. Only ``.name`` is read here.
            e: The pydantic error raised while building the field model.

        Returns:
            An ``UnknownSchemaException`` whose message is tailored to whether
            the failure was an unrecognised column type or something else.
        """
        errors = e.errors()
        # Errors attached to the "type" field indicate an unrecognised BigQuery
        # column type rather than some other structural problem.
        column_type_exception = list(filter(lambda err: "type" in err["loc"], errors))
        if column_type_exception:
            # Bug fix: the original concatenated two f-strings with no space,
            # producing "...column types: X.If you think...".
            return cls(
                f"BigQuery dry run field '{schema_field.name}' returned unknown "
                f"column types: {column_type_exception[0]['msg']}. "
                f"If you think this column type is valid then raise an issue on GitHub"
            )
        else:
            return cls(
                f"Couldn't understand schema returned from BigQuery for field "
                f"'{schema_field.name}' error:\n{str(e)}"
            )
7 changes: 6 additions & 1 deletion dbt_dry_run/models/table.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from enum import Enum
from typing import List, Optional, Set

import pydantic
from google.cloud.bigquery import SchemaField
from google.cloud.bigquery.table import Table as BigQueryTable
from pydantic import Field
Expand Down Expand Up @@ -39,9 +40,13 @@ class TableField(BaseModel):
name: str
type_: BigQueryFieldType = Field(..., alias="type")
mode: Optional[BigQueryFieldMode]
fields: Optional[List["TableField"]]
fields: Optional[List["TableField"]] = None
description: Optional[str]

@pydantic.validator("type_", pre=True)
def validate_type_field(cls, field: str) -> BigQueryFieldType:
    """Coerce the raw type string from BigQuery into a ``BigQueryFieldType``.

    An unknown value makes the enum constructor raise ``ValueError``, which
    pydantic surfaces to the caller as a ``ValidationError`` on the model.
    """
    coerced = BigQueryFieldType(field)
    return coerced


TableField.update_forward_refs()

Expand Down
35 changes: 28 additions & 7 deletions dbt_dry_run/sql_runner/big_query_sql_runner.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
from contextlib import contextmanager
from typing import Optional, Tuple
from typing import List, Optional, Tuple

from google.cloud.bigquery import (
Client,
DatasetReference,
QueryJob,
QueryJobConfig,
SchemaField,
TableReference,
)
from google.cloud.exceptions import BadRequest, Forbidden, NotFound
from pydantic import ValidationError
from tenacity import (
retry,
retry_if_exception_type,
Expand All @@ -17,6 +19,7 @@
)

from dbt_dry_run.adapter.service import ProjectService
from dbt_dry_run.exception import UnknownSchemaException
from dbt_dry_run.models import Table, TableField
from dbt_dry_run.models.manifest import Node
from dbt_dry_run.results import DryRunStatus
Expand Down Expand Up @@ -63,7 +66,7 @@ def query(
client = self.get_client()
try:
query_job = client.query(sql, job_config=self.JOB_CONFIG)
table = self.get_schema_from_query_job(query_job)
table = self.get_schema_from_schema_fields(query_job.schema or [])
status = DryRunStatus.SUCCESS
except (Forbidden, BadRequest, NotFound) as e:
status = DryRunStatus.FAILURE
Expand All @@ -73,9 +76,27 @@ def query(
return status, table, exception

@staticmethod
def get_schema_from_schema_fields(schema_fields: List[SchemaField]) -> Table:
    """Build a ``Table`` model from BigQuery ``SchemaField``s.

    Recurses into nested fields so the resulting ``TableField`` tree mirrors
    the structure BigQuery returned.

    Raises:
        UnknownSchemaException: if a field's type cannot be parsed into the
            project's ``BigQueryFieldType`` enum.
    """

    def _to_table_field(field: SchemaField) -> TableField:
        # Recurse first: any failure inside the nested fields has already been
        # converted to UnknownSchemaException at the inner level, so only the
        # construction of *this* field can raise a raw ValidationError.
        nested = None
        if field.fields:
            nested = BigQuerySQLRunner.get_schema_from_schema_fields(
                field.fields
            ).fields
        try:
            return TableField(
                name=field.name,
                mode=field.mode,
                type=field.field_type,
                description=field.description,
                fields=nested,
            )
        except ValidationError as e:
            raise UnknownSchemaException.from_validation_error(field, e) from e

    return Table(fields=[_to_table_field(f) for f in schema_fields])
11 changes: 11 additions & 0 deletions dbt_dry_run/test/sql_runner/test_big_query_sql_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from tenacity import RetryError, wait_none

from dbt_dry_run.adapter.service import ProjectService
from dbt_dry_run.exception import UnknownSchemaException
from dbt_dry_run.results import DryRunStatus
from dbt_dry_run.sql_runner.big_query_sql_runner import (
MAX_ATTEMPT_NUMBER,
Expand Down Expand Up @@ -104,3 +105,13 @@ def test_get_node_schema_returns_table_schema() -> None:

table_column_names = set(field.name for field in table.fields)
assert table_column_names == set("a")


def test_get_schema_from_schema_fields_raises_error_if_unknown_field_type() -> None:
    """An unrecognised field type must surface as UnknownSchemaException."""
    bad_type = "INVALID_FIELD_TYPE"
    bad_name = "a"
    expected_error_message = (
        f"BigQuery dry run field '{bad_name}' returned unknown column types: "
        f"'{bad_type}' is not a valid BigQueryFieldType"
    )
    schema = [SchemaField(name=bad_name, field_type=bad_type)]
    with pytest.raises(UnknownSchemaException, match=expected_error_message):
        BigQuerySQLRunner.get_schema_from_schema_fields(schema)

0 comments on commit d8dba53

Please sign in to comment.