Skip to content
Permalink
Browse files

Add dbt test solid

Summary: This adds a solid that wraps the dbt test command and emits dagster ExpectationResults.

Test Plan: Manual local testing on jaffle plus included unit tests

Reviewers: #ft, alangenfeld

Reviewed By: #ft, alangenfeld

Subscribers: alangenfeld

Differential Revision: https://dagster.phacility.com/D821
  • Loading branch information...
schrockn committed Aug 13, 2019
1 parent ed69837 commit 69f8dcb78787d6e3f6e93f77e038c572341a38a1
@@ -1,10 +1,16 @@
'''
'''
from dagster import file_relative_path, pipeline

from dagster_dbt import create_dbt_solid
from dagster_dbt import create_dbt_solid, create_dbt_test_solid

jaffle_solid = create_dbt_solid(file_relative_path(__file__, 'jaffle_shop'))
PROJECT_DIR = file_relative_path(__file__, 'jaffle_shop')
PROFILES_DIR = file_relative_path(__file__, 'profiles')

jaffle_solid = create_dbt_solid(PROJECT_DIR, profiles_dir=PROFILES_DIR)
jaffle_test_solid = create_dbt_test_solid(PROJECT_DIR, profiles_dir=PROFILES_DIR)


@pipeline
def jaffle_pipeline():
jaffle_solid() # pylint: disable=no-value-for-parameter
jaffle_test_solid(jaffle_solid()) # pylint: disable=no-value-for-parameter
@@ -57,7 +57,10 @@ models:
description: '{{ doc("orders_status") }}'
tests:
- accepted_values:
values: ['placed', 'shipped', 'completed', 'return_pending', 'returned']
values:
["placed", "shipped", "completed", "return_pending", "returned"]
# Kept here to inject failures: swap in the values below to make this test fail
# values: ["jdkfjkd", "ddddd"]

- name: amount
description: Total amount (AUD) of the order
@@ -0,0 +1 @@
.user.yml
@@ -0,0 +1,18 @@
# For more information on how to configure this file, please see:
# https://github.com/fishtown-analytics/dbt/blob/master/sample.profiles.yml

# Checked in only to support a trivial test.
# Normally this file should not be checked in; by default it lives at ~/.dbt/profiles.yml.

jaffle_shop:
target: dev
outputs:
dev:
type: postgres
host: localhost
user: test
pass: test
port: 5432
dbname: jaffle_shop
schema: dbt_alice
threads: 4
@@ -24,6 +24,7 @@
'dagstermill',
'dagster-aws',
'dagster-slack',
'dbt-postgres',
'descartes==1.1.0',
'geopandas==0.4.0',
'matplotlib==3.0.2; python_version >= "3.5"',
@@ -4,6 +4,7 @@

from snapshottest import Snapshot


snapshots = Snapshot()

snapshots['test_basic_expectations_within_compute_step_events 1'] = [
@@ -22,7 +23,7 @@
'success': False
},
'level': 'DEBUG',
'message': 'Expectation always_false failed',
'message': 'Failure',
'step': {
'key': 'emit_failed_expectation.compute',
'solidHandleID': 'emit_failed_expectation'
@@ -46,7 +47,7 @@
'success': True
},
'level': 'DEBUG',
'message': 'Expectation always_true passed',
'message': 'Successful',
'step': {
'key': 'emit_successful_expectation.compute',
'solidHandleID': 'emit_successful_expectation'
@@ -65,7 +66,7 @@
'success': True
},
'level': 'DEBUG',
'message': 'Expectation no_metadata passed',
'message': 'Successful',
'step': {
'key': 'emit_successful_expectation_no_metadata.compute',
'solidHandleID': 'emit_successful_expectation_no_metadata'
@@ -371,16 +371,21 @@ def step_materialization(step_context, materialization):
@staticmethod
def step_expectation_result(step_context, expectation_result):
check.inst_param(expectation_result, 'expectation_result', ExpectationResult)

def _msg():
if expectation_result.description:
return expectation_result.description

return 'Expectation{label_clause} {result_verb}'.format(
label_clause=' ' + expectation_result.label if expectation_result.label else '',
result_verb='passed' if expectation_result.success else 'failed',
)

return DagsterEvent.from_step(
event_type=DagsterEventType.STEP_EXPECTATION_RESULT,
step_context=step_context,
event_specific_data=StepExpectationResultData(expectation_result),
message='Expectation{label_clause} {result_verb}'.format(
label_clause=' {label}'.format(label=expectation_result.label)
if expectation_result.label
else '',
result_verb='passed' if expectation_result.success else 'failed',
),
message=_msg(),
)

@staticmethod
@@ -7,8 +7,10 @@

from dagster import (
check,
ExpectationResult,
EventMetadataEntry,
Failure,
InputDefinition,
Materialization,
Nothing,
Output,
@@ -19,6 +21,8 @@
CREATE_VIEW_REGEX = re.compile(r'OK created view model (\w+)\.(\w+)\.* \[CREATE VIEW')
CREATE_TABLE_REGEX = re.compile(r'OK created table model (\w+)\.(\w+)\.* \[SELECT (\d+)')
ANSI_ESCAPE = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]')
TEST_PASS_REGEX = re.compile(r'PASS (\w+)\.* \[PASS')
TEST_FAIL_REGEX = re.compile(r'FAIL (\d+) (\w+)\.* \[FAIL')


def try_parse_create_view(text):
@@ -54,23 +58,28 @@ def try_parse_create_table(text):
)


def try_parse(text):
def try_parse_run(text):
for parser in [try_parse_create_view, try_parse_create_table]:
mat = parser(text)
if mat:
return mat


def create_dbt_solid(project_dir, name=None):
def create_dbt_solid(project_dir, name=None, profiles_dir=None):
check.str_param(project_dir, 'project_dir')
check.opt_str_param(name, 'name')
check.opt_str_param(profiles_dir, 'profiles_dir')

@solid(
name=name if name else os.path.basename(project_dir),
output_defs=[OutputDefinition(dagster_type=Nothing, name='run_complete')],
)
def dbt_solid(_):
args = shlex.split('dbt run --project-dir {}'.format(project_dir))
cmd = 'dbt run --project-dir {}'.format(project_dir)
if profiles_dir:
cmd += ' --profiles-dir {}'.format(profiles_dir)

args = shlex.split(cmd)
proc = subprocess.Popen(args, stdout=subprocess.PIPE)

# if https://github.com/fishtown-analytics/dbt/issues/1237 gets done
@@ -87,7 +96,7 @@ def dbt_solid(_):
# remove colors
text = ANSI_ESCAPE.sub('', text)

mat = try_parse(text)
mat = try_parse_run(text)
if mat:
yield mat

@@ -99,3 +108,83 @@ def dbt_solid(_):
yield Output(value=None, output_name='run_complete')

return dbt_solid


def try_parse_pass(text):
    '''Turn one line of dbt test output into a passing ExpectationResult.

    Args:
        text (str): A single output line, searched with TEST_PASS_REGEX.

    Returns:
        An ExpectationResult with success=True whose description and
        'dbt_test_name' metadata entry carry the captured test name, or None
        when the line does not match TEST_PASS_REGEX.
    '''
    matched = TEST_PASS_REGEX.search(text)
    if matched is None:
        return None

    # Group 1 of TEST_PASS_REGEX is the name of the dbt test.
    dbt_test_name = matched.group(1)
    name_entry = EventMetadataEntry.text(label='dbt_test_name', text=dbt_test_name)

    return ExpectationResult(
        success=True,
        label='dbt_test',
        description='Dbt test {} passed'.format(dbt_test_name),
        metadata_entries=[name_entry],
    )


def try_parse_fail(text):
    '''Turn one line of dbt test output into a failing ExpectationResult.

    Args:
        text (str): A single output line, searched with TEST_FAIL_REGEX.

    Returns:
        An ExpectationResult with success=False carrying 'dbt_test_name' and
        'failure_count' text metadata entries, or None when the line does not
        match TEST_FAIL_REGEX.
    '''
    matched = TEST_FAIL_REGEX.search(text)
    if matched is None:
        return None

    # TEST_FAIL_REGEX captures the failure count first, then the test name.
    # The count is kept as the matched string because it is emitted as text.
    num_failures, dbt_test_name = matched.group(1), matched.group(2)

    entries = [
        EventMetadataEntry.text(label='dbt_test_name', text=dbt_test_name),
        EventMetadataEntry.text(label='failure_count', text=num_failures),
    ]

    return ExpectationResult(
        success=False,
        label='dbt_test',
        description='Dbt test {} failed'.format(dbt_test_name),
        metadata_entries=entries,
    )


def try_parse_test(text):
    '''Parse one line of dbt test output into an ExpectationResult.

    Tries try_parse_pass first, then try_parse_fail, and returns the first
    truthy result. Returns None when neither parser produces one.
    '''
    # The generator is lazy, so try_parse_fail only runs when try_parse_pass
    # did not produce a result.
    attempts = (parse(text) for parse in (try_parse_pass, try_parse_fail))
    return next((found for found in attempts if found), None)


def create_dbt_test_solid(project_dir, name=None, profiles_dir=None):
    '''Create a solid that runs ``dbt test`` and yields an ExpectationResult per test line.

    The solid streams dbt's stdout line by line, echoes each non-empty line,
    strips ANSI color codes, and yields whatever try_parse_test returns for the
    line (when it returns anything). Once dbt's output is exhausted it yields
    the ``test_complete`` output.

    Args:
        project_dir (str): Passed to dbt as ``--project-dir``.
        name (Optional[str]): Name of the solid. Defaults to the basename of
            ``project_dir`` with ``'_test'`` appended.
        profiles_dir (Optional[str]): When provided, passed to dbt as
            ``--profiles-dir``.

    Returns:
        The solid produced by the ``@solid`` decorator. It takes a Nothing
        input named ``test_start`` and has a Nothing output named
        ``test_complete``.
    '''
    check.str_param(project_dir, 'project_dir')
    check.opt_str_param(name, 'name')
    check.opt_str_param(profiles_dir, 'profiles_dir')

    # Build argv directly rather than formatting a command string and
    # shlex-splitting it: a directory containing spaces would otherwise be
    # split into several arguments (and backslashes would be consumed).
    args = ['dbt', 'test', '--project-dir', project_dir]
    if profiles_dir:
        args += ['--profiles-dir', profiles_dir]

    @solid(
        name=name if name else os.path.basename(project_dir) + '_test',
        input_defs=[InputDefinition('test_start', Nothing)],
        output_defs=[OutputDefinition(dagster_type=Nothing, name='test_complete')],
    )
    def dbt_test_solid(_):
        proc = subprocess.Popen(args, stdout=subprocess.PIPE)
        stdout = io.TextIOWrapper(proc.stdout, encoding='utf-8')
        try:
            for line in stdout:
                text = line.rstrip()
                if not text:
                    continue

                # print to stdout
                print(text)

                # remove colors
                text = ANSI_ESCAPE.sub('', text)

                expt = try_parse_test(text)

                if expt:
                    yield expt
        finally:
            # Release the pipe and reap the child even when the generator is
            # closed early or raises, so no zombie process is left behind.
            # The exit code is intentionally not inspected here: test failures
            # are reported through the ExpectationResults yielded above.
            stdout.close()
            proc.wait()

        yield Output(value=None, output_name='test_complete')

    return dbt_test_solid
@@ -1,4 +1,4 @@
from dagster_dbt import CREATE_TABLE_REGEX, CREATE_VIEW_REGEX
from dagster_dbt import CREATE_TABLE_REGEX, CREATE_VIEW_REGEX, TEST_PASS_REGEX, TEST_FAIL_REGEX

TEST_CREATE_VIEW = (
'17:36:00 | 3 of 8 OK created view model '
@@ -30,3 +30,43 @@ def test_match_table_model():
assert table == 'order_payments'
row_count = int(m.group(3))
assert row_count == 99


# Sample PASS line in which the test name is followed directly by a space,
# with no dot padding before the bracketed status.
TEST_PASS_LONG_STRING = (
'13:55:22 | 1 of 20 PASS '
'accepted_values_fct_orders_status__placed__shipped__completed__return_pending__returned '
'[PASS in 0.05s]'
)


# Sample PASS line in which the test name is padded with a run of dots before
# the bracketed status.
TEST_PASS_SHORT_STRING = (
'13:55:22 | 7 of 20 PASS not_null_fct_orders_coupon_amount'
'....................... [PASS in 0.04s]'
)

# The test name embedded in TEST_PASS_LONG_STRING; the value the first capture
# group of TEST_PASS_REGEX is expected to yield for that line.
LONG_NAME = (
'accepted_values_fct_orders_status__placed__shipped__completed__return_pending__returned'
)


def test_pass_long_string():
    '''TEST_PASS_REGEX captures a long test name that has no dot padding.'''
    match = TEST_PASS_REGEX.search(TEST_PASS_LONG_STRING)
    assert match is not None
    captured_name = match.group(1)
    assert captured_name == LONG_NAME


def test_pass_short_string():
    '''TEST_PASS_REGEX captures the test name without its trailing dot padding.'''
    match = TEST_PASS_REGEX.search(TEST_PASS_SHORT_STRING)
    assert match is not None
    captured_name = match.group(1)
    assert captured_name == 'not_null_fct_orders_coupon_amount'


def test_fail():
    '''TEST_FAIL_REGEX captures the failure count and the test name, in that order.'''
    sample_line = 'FAIL 5 accepted_values_fct_orders_status__jdkfjkd............ [FAIL 5 in 0.05s]'

    match = TEST_FAIL_REGEX.search(sample_line)
    assert match is not None

    failure_count = match.group(1)
    test_name = match.group(2)
    assert failure_count == '5'
    assert test_name == 'accepted_values_fct_orders_status__jdkfjkd'

0 comments on commit 69f8dcb

Please sign in to comment.
You can’t perform that action at this time.