Skip to content
Permalink
Browse files

Initial dagster-dbt prototype

Summary:
Finally was able to make some time for this! This is a
prototype-quality dbt integration, but it demonstrates what the shape of
this would look like.

I copied the example from https://github.com/fishtown-analytics/jaffle_shop/

This shells out to dbt itself and runs against the database in the
examples docker container. (I had to manually create the database). It
just parses stdout with regex's, which is quite fragile.

It would be better if dbt emitted some sort of structured log so that
this could be parsed more reliably. Taylor is excited enough about this
possibility that he might tackle it. See fishtown-analytics/dbt#1237

This emits materializations for each view or table created in the
example. This has not been thoroughly tested.

Next Steps:
  - Parse materializations out of the dbt project and render them as
  outputs
  - Also create a type-per-model and render metadata within dagit.
  - Consume dbt tests during a run and emit Expectations.
  - Model "seeds" as inputs into the dbt node.

Test Plan: Run jaffle example in dagit. Buildkite

Reviewers: #ft, natekupp

Reviewed By: #ft, natekupp

Subscribers: natekupp

Differential Revision: https://dagster.phacility.com/D795
  • Loading branch information...
schrockn committed Aug 10, 2019
1 parent 6c78175 commit f545529d3bdb948d8fb132a806296cc73875fcea
@@ -18,6 +18,7 @@ def define_demo_repo():
from dagster_examples.event_pipeline_demo.pipelines import event_ingest_pipeline from dagster_examples.event_pipeline_demo.pipelines import event_ingest_pipeline
from dagster_examples.pyspark_pagerank.pyspark_pagerank_pipeline import pyspark_pagerank from dagster_examples.pyspark_pagerank.pyspark_pagerank_pipeline import pyspark_pagerank
from dagster_pandas.examples import papermill_pandas_hello_world_pipeline from dagster_pandas.examples import papermill_pandas_hello_world_pipeline
from dagster_examples.jaffle_dbt.jaffle import jaffle_pipeline


return RepositoryDefinition( return RepositoryDefinition(
name='demo_repository', name='demo_repository',
@@ -34,5 +35,6 @@ def define_demo_repo():
event_ingest_pipeline, event_ingest_pipeline,
pyspark_pagerank, pyspark_pagerank,
papermill_pandas_hello_world_pipeline, papermill_pandas_hello_world_pipeline,
jaffle_pipeline,
], ],
) )
@@ -0,0 +1,10 @@
from dagster import file_relative_path, pipeline

from dagster_dbt import create_dbt_solid

jaffle_solid = create_dbt_solid(file_relative_path(__file__, 'jaffle_shop'))


@pipeline
def jaffle_pipeline():
jaffle_solid() # pylint: disable=no-value-for-parameter
@@ -56,7 +56,7 @@
} }
] ]
}, },
'message': 'Materialized value all_types.', 'message': 'a materialization with all metadata types',
'step': { 'step': {
'key': 'materialize.compute', 'key': 'materialize.compute',
'solidHandleID': 'materialize' 'solidHandleID': 'materialize'
@@ -359,7 +359,9 @@ def step_materialization(step_context, materialization):
event_type=DagsterEventType.STEP_MATERIALIZATION, event_type=DagsterEventType.STEP_MATERIALIZATION,
step_context=step_context, step_context=step_context,
event_specific_data=StepMaterializationData(materialization), event_specific_data=StepMaterializationData(materialization),
message='Materialized value{label_clause}.'.format( message=materialization.description
if materialization.description
else 'Materialized value{label_clause}.'.format(
label_clause=' {label}'.format(label=materialization.label) label_clause=' {label}'.format(label=materialization.label)
if materialization.label if materialization.label
else '' else ''
@@ -92,6 +92,7 @@ def test_cloudwatch_logging(cloudwatch_client):
now = millisecond_timestamp(datetime.datetime.utcnow()) now = millisecond_timestamp(datetime.datetime.utcnow())


attempt_num = 0 attempt_num = 0

found_orig_message = False found_orig_message = False


while not found_orig_message and attempt_num < NUM_POLL_ATTEMPTS: while not found_orig_message and attempt_num < NUM_POLL_ATTEMPTS:
@@ -0,0 +1,2 @@
[run]
branch = True
@@ -0,0 +1,101 @@
from collections import namedtuple
import io
import os
import re
import shlex
import subprocess

from dagster import (
check,
EventMetadataEntry,
Failure,
Materialization,
Nothing,
Output,
OutputDefinition,
solid,
)

CREATE_VIEW_REGEX = re.compile(r'OK created view model (\w+)\.(\w+)\.* \[CREATE VIEW')
CREATE_TABLE_REGEX = re.compile(r'OK created table model (\w+)\.(\w+)\.* \[SELECT (\d+)')
ANSI_ESCAPE = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]')


def try_parse_create_view(text):
view_match = CREATE_VIEW_REGEX.search(text)

if not view_match:
return None

return Materialization(
label='create_view',
description=text,
metadata_entries=[
EventMetadataEntry.text(view_match.group(1), 'schema'),
EventMetadataEntry.text(view_match.group(2), 'view'),
],
)


def try_parse_create_table(text):
table_match = CREATE_TABLE_REGEX.search(text)

if not table_match:
return None

return Materialization(
label='create_table',
description=text,
metadata_entries=[
EventMetadataEntry.text(table_match.group(1), 'schema'),
EventMetadataEntry.text(table_match.group(2), 'table'),
EventMetadataEntry.text(table_match.group(3), 'row_count'),
],
)


def try_parse(text):
for parser in [try_parse_create_view, try_parse_create_table]:
mat = parser(text)
if mat:
return mat


def create_dbt_solid(project_dir, name=None):
check.str_param(project_dir, 'project_dir')
check.opt_str_param(name, 'name')

@solid(
name=name if name else os.path.basename(project_dir),
output_defs=[OutputDefinition(dagster_type=Nothing, name='run_complete')],
)
def dbt_solid(_):
args = shlex.split('dbt run --project-dir {}'.format(project_dir))
proc = subprocess.Popen(args, stdout=subprocess.PIPE)

# if https://github.com/fishtown-analytics/dbt/issues/1237 gets done
# we should definitely switch to parsing the json output, as that
# would be much more reliable/resilient
for line in io.TextIOWrapper(proc.stdout, encoding='utf-8'):
text = line.rstrip()
if not text:
continue

# print to stdout
print(text)

# remove colors
text = ANSI_ESCAPE.sub('', text)

mat = try_parse(text)
if mat:
yield mat

proc.wait()

if proc.returncode != 0:
raise Failure('Dbt invocation errored')

yield Output(value=None, output_name='run_complete')

return dbt_solid
@@ -0,0 +1,3 @@
__version__ = '0.5.5'

__nightly__ = 'nightly-2019.07.29'
@@ -0,0 +1,32 @@
from dagster_dbt import CREATE_TABLE_REGEX, CREATE_VIEW_REGEX

TEST_CREATE_VIEW = (
'17:36:00 | 3 of 8 OK created view model '
'dbt_alice.stg_customers................. [CREATE VIEW in 0.18s]'
)

TEST_CREATE_TABLE = (
'17:36:01 | 4 of 8 OK created table model '
'dbt_alice.order_payments............... [SELECT 99 in 0.07s]'
)


def test_match_view_model():
m = CREATE_VIEW_REGEX.search(TEST_CREATE_VIEW)
assert m
schema = m.group(1)
assert schema == 'dbt_alice'
view = m.group(2)
assert view == 'stg_customers'


def test_match_table_model():
m = CREATE_TABLE_REGEX.search(TEST_CREATE_TABLE)
assert m

schema = m.group(1)
assert schema == 'dbt_alice'
table = m.group(2)
assert table == 'order_payments'
row_count = int(m.group(3))
assert row_count == 99
@@ -0,0 +1,51 @@
import argparse
import sys

from setuptools import find_packages, setup


def get_version(name):
version = {}
with open('dagster_dbt/version.py') as fp:
exec(fp.read(), version) # pylint: disable=W0122

if name == 'dagster-dbt':
return version['__version__']
elif name == 'dagster-dbt-nightly':
return version['__nightly__']
else:
raise Exception('Shouldn\'t be here: bad package name {name}'.format(name=name))


parser = argparse.ArgumentParser()
parser.add_argument('--nightly', action='store_true')


def _do_setup(name='dagster-dbt'):
setup(
name='dagster_dbt',
version=get_version(name),
author='Elementl',
license='Apache-2.0',
description='Package for Dagster dbt integration.',
url='https://github.com/dagster-io/dagster/tree/master/python_modules/libraries/dagster-dbt',
classifiers=[
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'License :: OSI Approved :: Apache Software License',
'Operating System :: OS Independent',
],
packages=find_packages(exclude=['test']),
install_requires=['dagster', 'dbt-core'],
zip_safe=False,
)


if __name__ == '__main__':
parsed, unparsed = parser.parse_known_args()
sys.argv = [sys.argv[0]] + unparsed
if parsed.nightly:
_do_setup('dagster-dbt-nightly')
else:
_do_setup('dagster-dbt')
@@ -0,0 +1,15 @@
[tox]
envlist = py37,py36,py35,py27

[testenv]
passenv = CI_* COVERALLS_REPO_TOKEN
deps =
-e ../../dagster
-r ../../dagster/dev-requirements.txt
-e .
commands =
coverage erase
pytest -vv --junitxml=test_results.xml --cov=dagster_dbt --cov-append --cov-report=
coverage report --omit='.tox/*,**/test_*.py' --skip-covered
coverage html --omit='.tox/*,**/test_*.py'
coverage xml --omit='.tox/*,**/test_*.py'

0 comments on commit f545529

Please sign in to comment.
You can’t perform that action at this time.