Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

snapshot config blocks (#1613) #1759

Merged
merged 2 commits into from Sep 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 8 additions & 2 deletions core/dbt/config/project.py
Expand Up @@ -147,7 +147,8 @@ def __init__(self, project_name, version, project_root, profile_name,
source_paths, macro_paths, data_paths, test_paths,
analysis_paths, docs_paths, target_path, snapshot_paths,
clean_targets, log_path, modules_path, quoting, models,
on_run_start, on_run_end, seeds, dbt_version, packages):
on_run_start, on_run_end, seeds, snapshots, dbt_version,
packages):
self.project_name = project_name
self.version = version
self.project_root = project_root
Expand All @@ -168,6 +169,7 @@ def __init__(self, project_name, version, project_root, profile_name,
self.on_run_start = on_run_start
self.on_run_end = on_run_end
self.seeds = seeds
self.snapshots = snapshots
self.dbt_version = dbt_version
self.packages = packages

Expand All @@ -181,7 +183,7 @@ def _preprocess(project_dict):
('on-run-end',): _list_if_none_or_string,
}

for k in ('models', 'seeds'):
for k in ('models', 'seeds', 'snapshots'):
handlers[(k,)] = _dict_if_none
handlers[(k, 'vars')] = _dict_if_none
handlers[(k, 'pre-hook')] = _list_if_none_or_string
Expand Down Expand Up @@ -252,6 +254,7 @@ def from_project_config(cls, project_dict, packages_dict=None):
on_run_start = project_dict.get('on-run-start', [])
on_run_end = project_dict.get('on-run-end', [])
seeds = project_dict.get('seeds', {})
snapshots = project_dict.get('snapshots', {})
dbt_raw_version = project_dict.get('require-dbt-version', '>=0.0.0')

try:
Expand Down Expand Up @@ -285,6 +288,7 @@ def from_project_config(cls, project_dict, packages_dict=None):
on_run_start=on_run_start,
on_run_end=on_run_end,
seeds=seeds,
snapshots=snapshots,
dbt_version=dbt_version,
packages=packages
)
Expand Down Expand Up @@ -331,6 +335,7 @@ def to_project_config(self, with_packages=False):
'on-run-start': self.on_run_start,
'on-run-end': self.on_run_end,
'seeds': self.seeds,
'snapshots': self.snapshots,
'require-dbt-version': [
v.to_version_string() for v in self.dbt_version
],
Expand Down Expand Up @@ -394,6 +399,7 @@ def get_resource_config_paths(self):
return {
'models': _get_config_paths(self.models),
'seeds': _get_config_paths(self.seeds),
'snapshots': _get_config_paths(self.snapshots),
}

def get_unused_resource_config_paths(self, resource_fqns, disabled):
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/config/renderer.py
Expand Up @@ -23,7 +23,7 @@ def _is_hook_or_model_vars_path(keypath):
if first in {'on-run-start', 'on-run-end'}:
return True
# models have two things to avoid
if first in {'seeds', 'models'}:
if first in {'seeds', 'models', 'snapshots'}:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should probably add support for pre- and post- hooks in snapshots here

# model-level hooks
if 'pre-hook' in keypath or 'post-hook' in keypath:
return True
Expand Down
6 changes: 4 additions & 2 deletions core/dbt/config/runtime.py
Expand Up @@ -20,8 +20,8 @@ def __init__(self, project_name, version, project_root, source_paths,
macro_paths, data_paths, test_paths, analysis_paths,
docs_paths, target_path, snapshot_paths, clean_targets,
log_path, modules_path, quoting, models, on_run_start,
on_run_end, seeds, dbt_version, profile_name, target_name,
config, threads, credentials, packages, args):
on_run_end, seeds, snapshots, dbt_version, profile_name,
target_name, config, threads, credentials, packages, args):
# 'vars'
self.args = args
self.cli_vars = parse_cli_vars(getattr(args, 'vars', '{}'))
Expand All @@ -48,6 +48,7 @@ def __init__(self, project_name, version, project_root, source_paths,
on_run_start=on_run_start,
on_run_end=on_run_end,
seeds=seeds,
snapshots=snapshots,
dbt_version=dbt_version,
packages=packages
)
Expand Down Expand Up @@ -97,6 +98,7 @@ def from_parts(cls, project, profile, args):
on_run_start=project.on_run_start,
on_run_end=project.on_run_end,
seeds=project.seeds,
snapshots=project.snapshots,
dbt_version=project.dbt_version,
packages=project.packages,
profile_name=profile.profile_name,
Expand Down
1 change: 1 addition & 0 deletions core/dbt/contracts/project.py
Expand Up @@ -156,6 +156,7 @@ class Project(HyphenatedJsonSchemaMixin, Replaceable):
require_dbt_version: Optional[Union[List[str], str]] = None
models: Dict[str, Any] = field(default_factory=dict)
seeds: Dict[str, Any] = field(default_factory=dict)
snapshots: Dict[str, Any] = field(default_factory=dict)
packages: List[PackageSpec] = field(default_factory=list)

@classmethod
Expand Down
Expand Up @@ -199,6 +199,11 @@
{% do exceptions.relation_wrong_type(target_relation, 'table') %}
{%- endif -%}


{{ run_hooks(pre_hooks, inside_transaction=False) }}

{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% set strategy_macro = strategy_dispatch(strategy_name) %}
{% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %}

Expand Down Expand Up @@ -251,10 +256,14 @@

{% endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{{ adapter.commit() }}

{% if staging_table is defined %}
{% do post_snapshot(staging_table) %}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{% endmaterialization %}
1 change: 0 additions & 1 deletion core/dbt/parser/snapshots.py
Expand Up @@ -69,7 +69,6 @@ def transform(self, node: IntermediateSnapshotNode) -> ParsedSnapshotNode:
parsed_node = ParsedSnapshotNode.from_dict(node.to_dict())
self.set_snapshot_attributes(parsed_node)
return parsed_node

except ValidationError as exc:
raise CompilationException(validator_error_message(exc), node)

Expand Down
14 changes: 11 additions & 3 deletions core/dbt/source_config.py
Expand Up @@ -17,9 +17,17 @@ class SourceConfig:
'database',
'severity',

'incremental_strategy'
'incremental_strategy',

# snapshots
'target_database',
'target_schema',
'strategy',
'updated_at',
# this is often a list, but it should replace and not append (sometimes
# it's 'all')
'check_cols',
}

ConfigKeys = AppendListFields | ExtendDictFields | ClobberFields

def __init__(self, active_project, own_project, fqn, node_type):
Expand Down Expand Up @@ -164,7 +172,7 @@ def get_project_config(self, runtime_config):
if self.node_type == NodeType.Seed:
model_configs = runtime_config.seeds
elif self.node_type == NodeType.Snapshot:
model_configs = {}
model_configs = runtime_config.snapshots
else:
model_configs = runtime_config.models

Expand Down
@@ -0,0 +1,9 @@
{% snapshot snapshot_actual %}
select * from {{target.database}}.{{schema}}.seed
{% endsnapshot %}

{# This should be exactly the same #}
{% snapshot snapshot_checkall %}
{{ config(check_cols='all') }}
select * from {{target.database}}.{{schema}}.seed
{% endsnapshot %}
@@ -0,0 +1,41 @@
{% snapshot snapshot_actual %}

{{
config(
target_database=var('target_database', database),
target_schema=var('target_schema', schema),
)
}}
select * from {{target.database}}.{{target.schema}}.seed

{% endsnapshot %}

{% snapshot snapshot_castillo %}

{{
config(
target_database=var('target_database', database),
updated_at='"1-updated_at"',
)
}}
select id,first_name,last_name,email,gender,ip_address,updated_at as "1-updated_at" from {{target.database}}.{{schema}}.seed where last_name = 'Castillo'

{% endsnapshot %}

{% snapshot snapshot_alvarez %}

{{
config(
target_database=var('target_database', database),
)
}}
select * from {{target.database}}.{{schema}}.seed where last_name = 'Alvarez'

{% endsnapshot %}


{% snapshot snapshot_kelly %}
{# This has no target_database set, which is allowed! #}
select * from {{target.database}}.{{schema}}.seed where last_name = 'Kelly'

{% endsnapshot %}
Expand Up @@ -114,7 +114,7 @@ def project_config(self):
return {
"data-paths": ['data'],
"snapshot-paths": ['test-snapshots-select',
'test-snapshots-pg'],
'test-snapshots-pg'],
}

@use_profile('postgres')
Expand Down Expand Up @@ -159,6 +159,23 @@ def test__postgres_select_snapshots(self):
self.assertTableDoesNotExist('snapshot_actual')


class TestConfiguredSnapshotFileSelects(TestSimpleSnapshotFileSelects):
@property
def project_config(self):
return {
"data-paths": ['data'],
"snapshot-paths": ['test-snapshots-select-noconfig'],
"snapshots": {
"test": {
"target_schema": self.unique_schema(),
"unique_key": "id || '-' || first_name",
'strategy': 'timestamp',
'updated_at': 'updated_at',
}
}
}


class TestSimpleSnapshotFilesBigquery(DBTIntegrationTest):
@property
def schema(self):
Expand Down Expand Up @@ -378,6 +395,23 @@ def project_config(self):
}


class TestConfiguredCheckCols(TestCheckCols):
@property
def project_config(self):
return {
"data-paths": ['data'],
"snapshot-paths": ['test-check-col-snapshots-noconfig'],
"snapshots": {
"test": {
"target_schema": self.unique_schema(),
"unique_key": "id || '-' || first_name",
"strategy": "check",
"check_cols": ["email"],
}
}
}


class TestCheckColsBigquery(TestSimpleSnapshotFilesBigquery):
def _assertTablesEqualSql(self, relation_a, relation_b, columns=None):
# When building the equality tests, only test columns that don't start
Expand Down
@@ -0,0 +1,14 @@
{% snapshot example_snapshot %}
{{
config(
target_schema=schema,
unique_key='a',
strategy='check',
check_cols='all',
post_hook='alter table {{ this }} add column new_col int')
}}
{{
config(post_hook='update {{ this }} set new_col = 1')
}}
select * from {{ ref('example_seed') }}
{% endsnapshot %}
@@ -0,0 +1,7 @@
version: 2
models:
- name: example_snapshot
columns:
- name: new_col
tests:
- not_null
6 changes: 6 additions & 0 deletions test/integration/014_hook_tests/test-snapshots/snapshots.sql
@@ -0,0 +1,6 @@
{% snapshot example_snapshot %}
{{
config(target_schema=schema, unique_key='a', strategy='check', check_cols='all')
}}
select * from {{ ref('example_seed') }}
{% endsnapshot %}
48 changes: 47 additions & 1 deletion test/integration/014_hook_tests/test_model_hooks.py
Expand Up @@ -183,6 +183,39 @@ def test_postgres_hooks_on_seeds(self):
self.assertEqual(len(res), 1, 'Expected exactly one item')


class TestPrePostModelHooksOnSnapshots(DBTIntegrationTest):
@property
def schema(self):
return "model_hooks_014"

@property
def models(self):
return "test-snapshot-models"

@property
def project_config(self):
return {
'data-paths': ['data'],
'snapshot-paths': ['test-snapshots'],
'models': {},
'snapshots': {
'post-hook': [
'alter table {{ this }} add column new_col int',
'update {{ this }} set new_col = 1'
]
}
}

@use_profile('postgres')
def test_postgres_hooks_on_snapshots(self):
res = self.run_dbt(['seed'])
self.assertEqual(len(res), 1, 'Expected exactly one item')
res = self.run_dbt(['snapshot'])
self.assertEqual(len(res), 1, 'Expected exactly one item')
res = self.run_dbt(['test'])
self.assertEqual(len(res), 1, 'Expected exactly one item')


class TestPrePostModelHooksInConfig(BaseTestPrePost):
@property
def project_config(self):
Expand Down Expand Up @@ -229,13 +262,26 @@ def test_postgres_pre_and_post_model_hooks_model_and_project(self):
self.check_hooks('start', count=2)
self.check_hooks('end', count=2)

class TestPrePostModelHooksInConfigKwargs(TestPrePostModelHooksInConfig):

class TestPrePostModelHooksInConfigKwargs(TestPrePostModelHooksInConfig):
@property
def models(self):
return "kwargs-models"


class TestPrePostSnapshotHooksInConfigKwargs(TestPrePostModelHooksOnSnapshots):
@property
def models(self):
return "test-snapshot-models"

@property
def project_config(self):
return {
'data-paths': ['data'],
'snapshot-paths': ['test-kwargs-snapshots'],
'models': {},
}


class TestDuplicateHooksInConfigs(DBTIntegrationTest):
@property
Expand Down