Skip to content

Commit

Permalink
Merge pull request #2039 from fishtown-analytics/feature/source-tags
Browse files Browse the repository at this point in the history
Tags for sources and columns (#1906, #1586)
  • Loading branch information
beckjake committed Jan 27, 2020
2 parents fdfcd4c + 8b722c7 commit 4e23e7d
Show file tree
Hide file tree
Showing 14 changed files with 343 additions and 72 deletions.
6 changes: 2 additions & 4 deletions core/dbt/contracts/graph/parsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ class ColumnInfo(JsonSchemaMixin, Replaceable):
description: str = ''
meta: Dict[str, Any] = field(default_factory=dict)
data_type: Optional[str] = None
tags: List[str] = field(default_factory=list)


# Docrefs are not quite like regular references, as they indicate what they
Expand Down Expand Up @@ -513,6 +514,7 @@ class ParsedSourceDefinition(
columns: Dict[str, ColumnInfo] = field(default_factory=dict)
meta: Dict[str, Any] = field(default_factory=dict)
source_meta: Dict[str, Any] = field(default_factory=dict)
tags: List[str] = field(default_factory=list)

@property
def is_ephemeral_model(self):
Expand All @@ -530,10 +532,6 @@ def refs(self):
def sources(self):
return []

@property
def tags(self):
return []

@property
def has_freshness(self):
return bool(self.freshness) and self.loaded_at_field is not None
Expand Down
9 changes: 8 additions & 1 deletion core/dbt/contracts/graph/unparsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,14 @@ def __post_init__(self):
self.tests = []


@dataclass
class UnparsedColumn(NamedTested):
tags: List[str] = field(default_factory=list)


@dataclass
class ColumnDescription(JsonSchemaMixin, Replaceable):
columns: List[NamedTested] = field(default_factory=list)
columns: List[UnparsedColumn] = field(default_factory=list)


@dataclass
Expand Down Expand Up @@ -206,6 +211,7 @@ class UnparsedSourceTableDefinition(ColumnDescription, NodeDescription):
external: Optional[ExternalTable] = field(
default_factory=ExternalTable
)
tags: List[str] = field(default_factory=list)

def __post_init__(self):
NodeDescription.__post_init__(self)
Expand All @@ -225,6 +231,7 @@ class UnparsedSourceDefinition(JsonSchemaMixin, Replaceable):
)
loaded_at_field: Optional[str] = None
tables: List[UnparsedSourceTableDefinition] = field(default_factory=list)
tags: List[str] = field(default_factory=list)

@property
def yaml_key(self) -> 'str':
Expand Down
5 changes: 4 additions & 1 deletion core/dbt/graph/selector.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from enum import Enum
from itertools import chain

import networkx as nx # type: ignore

Expand Down Expand Up @@ -180,7 +181,9 @@ class TagSelector(ManifestSelector):

def search(self, included_nodes, selector):
""" yields nodes from graph that have the specified tag """
for node, real_node in self.parsed_nodes(included_nodes):
search = chain(self.parsed_nodes(included_nodes),
self.source_nodes(included_nodes))
for node, real_node in search:
if selector in real_node.tags:
yield node

Expand Down
36 changes: 28 additions & 8 deletions core/dbt/parser/schema_test_builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from dbt.clients.jinja import get_rendered
from dbt.contracts.graph.unparsed import (
UnparsedNodeUpdate, UnparsedSourceDefinition,
UnparsedSourceTableDefinition, NamedTested
UnparsedSourceTableDefinition, UnparsedColumn
)
from dbt.exceptions import raise_compiler_error
from dbt.parser.search import FileBlock
Expand Down Expand Up @@ -79,7 +79,7 @@ def name(self) -> str:
return '{0.name}_{1.name}'.format(self.source, self.table)

@property
def columns(self) -> List[NamedTested]:
def columns(self) -> List[UnparsedColumn]:
if self.table.columns is None:
return []
else:
Expand Down Expand Up @@ -136,17 +136,23 @@ def from_yaml_block(
class SchemaTestBlock(TargetBlock):
test: Dict[str, Any]
column_name: Optional[str]
tags: List[str]

@classmethod
def from_target_block(
cls, src: TargetBlock, test: Dict[str, Any], column_name: Optional[str]
cls,
src: TargetBlock,
test: Dict[str, Any],
column_name: Optional[str],
tags: List[str],
) -> 'SchemaTestBlock':
return cls(
file=src.file,
data=src.data,
target=src.target,
test=test,
column_name=column_name
column_name=column_name,
tags=tags,
)


Expand All @@ -156,16 +162,14 @@ class TestBuilder(Generic[Target]):
Test names have the following pattern:
- the test name itself may be namespaced (package.test)
- or it may not be namespaced (test)
- the test may have arguments embedded in the name (, severity=WARN)
- or it may not have arguments.
"""
TEST_NAME_PATTERN = re.compile(
r'((?P<test_namespace>([a-zA-Z_][0-9a-zA-Z_]*))\.)?'
r'(?P<test_name>([a-zA-Z_][0-9a-zA-Z_]*))'
)
# map magic keys to default values
MODIFIER_ARGS = {'severity': 'ERROR'}
MODIFIER_ARGS = {'severity': 'ERROR', 'tags': []}

def __init__(
self,
Expand Down Expand Up @@ -243,6 +247,22 @@ def extract_test_args(test, name=None) -> Tuple[str, Dict[str, Any]]:
def severity(self) -> str:
return self.modifiers.get('severity', 'ERROR').upper()

def tags(self) -> List[str]:
tags = self.modifiers.get('tags', [])
if isinstance(tags, str):
tags = [tags]
if not isinstance(tags, list):
raise_compiler_error(
f'got {tags} ({type(tags)}) for tags, expected a list of '
f'strings'
)
for tag in tags:
if not isinstance(tag, str):
raise_compiler_error(
f'got {tag} ({type(tag)}) for tag, expected a str'
)
return tags[:]

def test_kwargs_str(self) -> str:
# sort the dict so the keys are rendered deterministically (for tests)
return ', '.join((
Expand Down Expand Up @@ -286,7 +306,7 @@ def build_raw_sql(self) -> str:
model=self.build_model_str(),
macro=self.macro_name(),
kwargs=self.test_kwargs_str(),
severity=self.severity()
severity=self.severity(),
)

def build_model_str(self):
Expand Down
46 changes: 34 additions & 12 deletions core/dbt/parser/schemas.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import itertools
import os

from abc import abstractmethod
Expand All @@ -19,7 +20,7 @@
ParsedTestNode,
)
from dbt.contracts.graph.unparsed import (
UnparsedSourceDefinition, UnparsedNodeUpdate, NamedTested,
UnparsedSourceDefinition, UnparsedNodeUpdate, UnparsedColumn,
UnparsedSourceTableDefinition, FreshnessThreshold
)
from dbt.context.parser import docs
Expand Down Expand Up @@ -68,11 +69,14 @@ def __init__(self):
self.column_info: Dict[str, ColumnInfo] = {}
self.docrefs: List[Docref] = []

def add(self, column_name, description, data_type, meta):
self.column_info[column_name] = ColumnInfo(name=column_name,
description=description,
data_type=data_type,
meta=meta)
def add(self, column: UnparsedColumn, description, data_type, meta):
self.column_info[column.name] = ColumnInfo(
name=column.name,
description=description,
data_type=data_type,
meta=meta,
tags=column.tags,
)


def collect_docrefs(
Expand Down Expand Up @@ -160,21 +164,21 @@ def _yaml_from_file(
return None

def parse_column(
self, block: TargetBlock, column: NamedTested, refs: ParserRef
self, block: TargetBlock, column: UnparsedColumn, refs: ParserRef
) -> None:
column_name = column.name
description = column.description
data_type = column.data_type
meta = column.meta
collect_docrefs(block.target, refs, column_name, description)

refs.add(column_name, description, data_type, meta)
refs.add(column, description, data_type, meta)

if not column.tests:
return

for test in column.tests:
self.parse_test(block, test, column_name)
self.parse_test(block, test, column)

def parse_node(self, block: SchemaTestBlock) -> ParsedTestNode:
"""In schema parsing, we rewrite most of the part of parse_node that
Expand Down Expand Up @@ -209,11 +213,17 @@ def parse_node(self, block: SchemaTestBlock) -> ParsedTestNode:
'kwargs': builder.args,
}

# copy - we don't want to mutate the tags!
tags = block.tags[:]
tags.extend(builder.tags())
if 'schema' not in tags:
tags.append('schema')

node = self._create_parsetime_node(
block=block,
path=compiled_path,
config=config,
tags=['schema'],
tags=tags,
name=builder.fqn_name,
raw_sql=builder.build_raw_sql(),
column_name=block.column_name,
Expand All @@ -227,16 +237,24 @@ def parse_test(
self,
target_block: TargetBlock,
test: TestDef,
column_name: Optional[str]
column: Optional[UnparsedColumn],
) -> None:

if isinstance(test, str):
test = {test: {}}

if column is None:
column_name: Optional[str] = None
column_tags: List[str] = []
else:
column_name = column.name
column_tags = column.tags

block = SchemaTestBlock.from_target_block(
src=target_block,
test=test,
column_name=column_name
column_name=column_name,
tags=column_tags,
)
try:
self.parse_node(block)
Expand Down Expand Up @@ -395,6 +413,9 @@ def parse_with_refs(
path = block.path.original_file_path
source_meta = source.meta or {}

# make sure we don't do duplicate tags from source + table
tags = sorted(set(itertools.chain(source.tags, table.tags)))

result = ParsedSourceDefinition(
package_name=self.project.project_name,
database=(source.database or self.default_database),
Expand All @@ -419,6 +440,7 @@ def parse_with_refs(
quoting=quoting,
resource_type=NodeType.Source,
fqn=[self.project.project_name, source.name, table.name],
tags=tags,
)
self.results.add_source(self.yaml.file, result)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@ models:
tests:
- not_null
- unique
tags:
- table_id
- name: first_name
description: "The user's first name"
tests:
- not_null
tags:
- table_first_name
- name: ip_address
description: "The user's IP address"
tests:
Expand All @@ -28,13 +32,21 @@ models:
- name: favorite_color
description: "The user's favorite color"
tests:
- accepted_values: { values: ['blue', 'green'], quote: true }
- accepted_values: {
values: ['blue', 'green'],
quote: true,
tags: table_copy_favorite_color # tags can be a single string
}
tags:
- table_favorite_color
- name: fav_number
description: "The user's favorite number"
tests:
- accepted_values:
values: [3.14159265]
quote: false
tags: # tags can be a list of strings
- favorite_number_is_pi


- name: table_summary
Expand All @@ -47,6 +59,8 @@ models:
- unique
- accepted_values: { values: ['blue', 'green'] }
- relationships: { field: favorite_color, to: ref('table_copy') }
tags:
- table_favorite_color
- name: count
description: "The number of responses for this favorite color"
tests:
Expand All @@ -61,10 +75,14 @@ models:
tests:
- not_null
- unique
tags:
- xfail
- name: favorite_color
description: "The user's favorite color"
tests:
- accepted_values: { values: ['blue', 'green'] }
tags:
- xfail

# all of these constraints will fail
- name: table_failure_summary
Expand All @@ -75,6 +93,8 @@ models:
tests:
- accepted_values: { values: ['red'] }
- relationships: { field: favorite_color, to: ref('table_copy') }
tags:
- xfail

# this table is disabled so these tests should be ignored
- name: table_disabled
Expand All @@ -94,3 +114,5 @@ models:
description: "The user ID"
tests:
- relationships: { field: id, to: ref('table_failure_copy') }
tags:
- xfail
Loading

0 comments on commit 4e23e7d

Please sign in to comment.