Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve schema parsing from BigQuery client #29

Merged
1 commit merged into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

# dbt-dry-run v0.6.3

## Bugfixes
## Bugfixes & Improvements

- Add support for INTERVAL and JSON types.

- Improved error handling when parsing the predicted schema of the dry run queries. A field type that is not
recognised now raises an `UnknownSchemaException` detailing the field type returned by BigQuery.

# dbt-dry-run v0.6.2

## Bugfixes
Expand Down
24 changes: 24 additions & 0 deletions dbt_dry_run/exception.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
from google.cloud.bigquery import SchemaField
from pydantic import ValidationError


class NotCompiledException(Exception):
    """Marker exception that adds no state or behaviour beyond ``Exception``.

    NOTE(review): the name suggests it signals that something has not been
    compiled yet -- confirm against the raise sites, which are not visible here.
    """

Expand Down Expand Up @@ -32,3 +36,23 @@ class SnapshotConfigException(Exception):

class ManifestValidationError(Exception):
    """Marker exception that adds no state or behaviour beyond ``Exception``.

    NOTE(review): presumably raised when a manifest fails validation --
    confirm against the raise sites, which are not visible here.
    """


class UnknownSchemaException(Exception):
    """Raised when a schema field returned by a BigQuery dry run cannot be parsed."""

    @classmethod
    def from_validation_error(
        cls, schema_field: SchemaField, e: ValidationError
    ) -> "UnknownSchemaException":
        """Build an exception describing why ``schema_field`` failed validation.

        Args:
            schema_field: The BigQuery field whose conversion failed; only its
                ``name`` is read here.
            e: The pydantic validation error raised during the conversion.

        Returns:
            An exception whose message quotes the first error located on the
            ``type`` key when there is one, otherwise the full text of ``e``.
        """
        # Keep only the errors reported against the column type.
        type_errors = [err for err in e.errors() if "type" in err["loc"]]
        if type_errors:
            # The trailing space after the full stop matters: adjacent string
            # literals are concatenated verbatim, so without it the two
            # sentences run together.
            return cls(
                f"BigQuery dry run field '{schema_field.name}' returned unknown column types: {type_errors[0]['msg']}. "
                f"If you think this column type is valid then raise an issue on GitHub"
            )
        return cls(
            f"Couldn't understand schema returned from BigQuery for field '{schema_field.name}' error:\n{str(e)}"
        )
7 changes: 6 additions & 1 deletion dbt_dry_run/models/table.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from enum import Enum
from typing import List, Optional, Set

import pydantic
from google.cloud.bigquery import SchemaField
from google.cloud.bigquery.table import Table as BigQueryTable
from pydantic import Field
Expand Down Expand Up @@ -39,9 +40,13 @@ class TableField(BaseModel):
name: str
type_: BigQueryFieldType = Field(..., alias="type")
mode: Optional[BigQueryFieldMode]
fields: Optional[List["TableField"]]
fields: Optional[List["TableField"]] = None
description: Optional[str]

@pydantic.validator("type_", pre=True)
def validate_type_field(cls, field: str) -> BigQueryFieldType:
    """Convert the raw ``type_`` value into a ``BigQueryFieldType``.

    Registered with ``pre=True`` for the ``type_`` field. The value is passed
    straight to the ``BigQueryFieldType`` constructor, so any error for an
    unrecognised value comes from that constructor.

    NOTE(review): presumably ``BigQueryFieldType`` is an ``Enum`` whose
    ``ValueError`` pydantic reports as a ``ValidationError`` -- confirm
    against its definition, which is not visible here.
    """
    return BigQueryFieldType(field)


TableField.update_forward_refs()

Expand Down
35 changes: 28 additions & 7 deletions dbt_dry_run/sql_runner/big_query_sql_runner.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
from contextlib import contextmanager
from typing import Optional, Tuple
from typing import List, Optional, Tuple

from google.cloud.bigquery import (
Client,
DatasetReference,
QueryJob,
QueryJobConfig,
SchemaField,
TableReference,
)
from google.cloud.exceptions import BadRequest, Forbidden, NotFound
from pydantic import ValidationError
from tenacity import (
retry,
retry_if_exception_type,
Expand All @@ -17,6 +19,7 @@
)

from dbt_dry_run.adapter.service import ProjectService
from dbt_dry_run.exception import UnknownSchemaException
from dbt_dry_run.models import Table, TableField
from dbt_dry_run.models.manifest import Node
from dbt_dry_run.results import DryRunStatus
Expand Down Expand Up @@ -63,7 +66,7 @@ def query(
client = self.get_client()
try:
query_job = client.query(sql, job_config=self.JOB_CONFIG)
table = self.get_schema_from_query_job(query_job)
table = self.get_schema_from_schema_fields(query_job.schema or [])
status = DryRunStatus.SUCCESS
except (Forbidden, BadRequest, NotFound) as e:
status = DryRunStatus.FAILURE
Expand All @@ -73,9 +76,27 @@ def query(
return status, table, exception

@staticmethod
def get_schema_from_query_job(query_job: QueryJob) -> Table:
job_fields_raw = query_job._properties["statistics"]["query"]["schema"][
"fields"
]
job_fields = [TableField.parse_obj(field) for field in job_fields_raw]
def get_schema_from_schema_fields(schema_fields: List[SchemaField]) -> Table:
    """Convert BigQuery ``SchemaField`` objects into a ``Table`` model.

    Each field is mapped to a ``TableField``. When a field has sub-fields
    they are converted recursively; otherwise ``fields`` is set to ``None``.

    Raises:
        UnknownSchemaException: if building a ``TableField`` raises a
            pydantic ``ValidationError``.
    """

    def _to_table_field(schema_field: SchemaField) -> TableField:
        try:
            if schema_field.fields:
                # Recurse through the public entry point so nested records
                # get the same conversion and error handling.
                nested_table = BigQuerySQLRunner.get_schema_from_schema_fields(
                    schema_field.fields
                )
                child_fields = nested_table.fields
            else:
                child_fields = None
            return TableField(
                name=schema_field.name,
                mode=schema_field.mode,
                type=schema_field.field_type,
                description=schema_field.description,
                fields=child_fields,
            )
        except ValidationError as e:
            raise UnknownSchemaException.from_validation_error(
                schema_field, e
            ) from e

    converted = [_to_table_field(schema_field) for schema_field in schema_fields]
    return Table(fields=converted)
11 changes: 11 additions & 0 deletions dbt_dry_run/test/sql_runner/test_big_query_sql_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from tenacity import RetryError, wait_none

from dbt_dry_run.adapter.service import ProjectService
from dbt_dry_run.exception import UnknownSchemaException
from dbt_dry_run.results import DryRunStatus
from dbt_dry_run.sql_runner.big_query_sql_runner import (
MAX_ATTEMPT_NUMBER,
Expand Down Expand Up @@ -104,3 +105,13 @@ def test_get_node_schema_returns_table_schema() -> None:

table_column_names = set(field.name for field in table.fields)
assert table_column_names == set("a")


def test_get_schema_from_schema_fields_raises_error_if_unknown_field_type() -> None:
    """An unrecognised field type must surface as ``UnknownSchemaException``."""
    invalid_field_name = "a"
    invalid_field_type = "INVALID_FIELD_TYPE"
    # Build the offending schema up front so the raises-block contains only
    # the call under test.
    schema_with_unknown_type = [
        SchemaField(name=invalid_field_name, field_type=invalid_field_type)
    ]
    expected_error_message = f"BigQuery dry run field '{invalid_field_name}' returned unknown column types: '{invalid_field_type}' is not a valid BigQueryFieldType"

    with pytest.raises(UnknownSchemaException, match=expected_error_message):
        BigQuerySQLRunner.get_schema_from_schema_fields(schema_with_unknown_type)