Skip to content

Commit

Permalink
first pass using materialization for data tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Kyle Wigley committed Mar 29, 2021
1 parent ce30dfa commit a5ce658
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 55 deletions.
35 changes: 0 additions & 35 deletions core/dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from dbt.context.providers import generate_runtime_model
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.compiled import (
CompiledDataTestNode,
CompiledSchemaTestNode,
COMPILED_TYPES,
GraphMemberNode,
Expand Down Expand Up @@ -351,34 +350,6 @@ def _recursively_prepend_ctes(

return model, prepended_ctes

def _add_ctes(
self,
compiled_node: NonSourceCompiledNode,
manifest: Manifest,
extra_context: Dict[str, Any],
) -> NonSourceCompiledNode:
"""Wrap the data test SQL in a CTE."""

# for data tests, we need to insert a special CTE at the end of the
# list containing the test query, and then have the "real" query be a
# select count(*) from that model.
# the benefit of doing it this way is that _add_ctes() can be
# rewritten for different adapters to handle databases that don't
# support CTEs, or at least don't have full support.
if isinstance(compiled_node, CompiledDataTestNode):
# the last prepend (so last in order) should be the data test body.
# then we can add our select count(*) from _that_ cte as the "real"
# compiled_sql, and do the regular prepend logic from CTEs.
name = self._get_dbt_test_name()
cte = InjectedCTE(
id=name,
sql=f' {name} as (\n{compiled_node.compiled_sql}\n)'
)
compiled_node.extra_ctes.append(cte)
compiled_node.compiled_sql = f'\nselect count(*) from {name}'

return compiled_node

# creates a compiled_node from the ManifestNode passed in,
# creates a "context" dictionary for jinja rendering,
# and then renders the "compiled_sql" using the node, the
Expand Down Expand Up @@ -417,12 +388,6 @@ def _compile_node(

compiled_node.compiled = True

# add ctes for specific test nodes, and also for
# possible future use in adapters
compiled_node = self._add_ctes(
compiled_node, manifest, extra_context
)

return compiled_node

def write_graph_file(self, linker: Linker, manifest: Manifest):
Expand Down
10 changes: 10 additions & 0 deletions core/dbt/include/global_project/macros/materializations/test.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{%- materialization test, default -%}

{% call statement('main', fetch_result=True) -%}
select count(*) as validation_errors
from (
{{ sql }}
) _dbt_internal_test
{%- endcall %}

{%- endmaterialization -%}
67 changes: 47 additions & 20 deletions core/dbt/task/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@
ParsedSchemaTestNode,
)
from dbt.contracts.results import RunResult, TestStatus
from dbt.exceptions import raise_compiler_error, InternalException
from dbt.context.providers import generate_runtime_model
from dbt.clients.jinja import MacroGenerator
from dbt.exceptions import (
raise_compiler_error,
InternalException,
missing_materialization
)
from dbt.graph import (
ResourceTypeSelector,
SelectionSpec,
Expand All @@ -41,25 +47,8 @@ def print_start_line(self):
description = self.describe_node()
print_start_line(description, self.node_index, self.num_nodes)

def execute_data_test(self, test: CompiledDataTestNode):
res, table = self.adapter.execute(
test.compiled_sql, auto_begin=True, fetch=True
)

num_rows = len(table.rows)
if num_rows != 1:
num_cols = len(table.columns)
# since we just wrapped our query in `select count(*)`, we are in
# big trouble!
raise InternalException(
f"dbt internally failed to execute {test.unique_id}: "
f"Returned {num_rows} rows and {num_cols} cols, but expected "
f"1 row and 1 column"
)
return table[0][0]

def execute_schema_test(self, test: CompiledSchemaTestNode):
res, table = self.adapter.execute(
_, table = self.adapter.execute(
test.compiled_sql,
auto_begin=True,
fetch=True,
Expand All @@ -78,9 +67,47 @@ def execute_schema_test(self, test: CompiledSchemaTestNode):
def before_execute(self):
self.print_start_line()

def execute_data_test(
self,
test: CompiledDataTestNode,
manifest: Manifest
) -> int:
context = generate_runtime_model(
test, self.config, manifest
)

materialization_macro = manifest.find_materialization_macro_by_name(
self.config.project_name,
test.get_materialization(),
self.adapter.type())

if materialization_macro is None:
missing_materialization(test, self.adapter.type())

if 'config' not in context:
raise InternalException(
'Invalid materialization context generated, missing config: {}'
.format(context)
)

MacroGenerator(materialization_macro, context)()
result = context['load_result']('main')
table = result['table']
num_rows = len(table.rows)
if num_rows != 1:
num_cols = len(table.columns)
# since we just wrapped our query in `select count(*)`, we are in
# big trouble!
raise InternalException(
f"dbt internally failed to execute {test.unique_id}: "
f"Returned {num_rows} rows and {num_cols} cols, but expected "
f"1 row and 1 column"
)
return int(table[0][0])

def execute(self, test: CompiledTestNode, manifest: Manifest):
if isinstance(test, CompiledDataTestNode):
failed_rows = self.execute_data_test(test)
failed_rows = self.execute_data_test(test, manifest)
elif isinstance(test, CompiledSchemaTestNode):
failed_rows = self.execute_schema_test(test)
else:
Expand Down

0 comments on commit a5ce658

Please sign in to comment.