Skip to content

Commit

Permalink
Merge pull request #167 from analyst-collective/release/0.5.0
Browse files Browse the repository at this point in the history
Release/0.5.0
  • Loading branch information
drewbanin committed Sep 28, 2016
2 parents 5a6d107 + 815dd92 commit 0096774
Show file tree
Hide file tree
Showing 10 changed files with 182 additions and 55 deletions.
4 changes: 3 additions & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
[bumpversion]
current_version = 0.4.9
current_version = 0.5.0
commit = True
tag = True

[bumpversion:file:setup.py]

[bumpversion:file:dbt/version.py]

52 changes: 39 additions & 13 deletions dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from collections import defaultdict
import dbt.project
from dbt.source import Source
from dbt.utils import find_model_by_fqn, find_model_by_name, dependency_projects, split_path, This
from dbt.utils import find_model_by_fqn, find_model_by_name, dependency_projects, split_path, This, Var, compiler_error
from dbt.linker import Linker
import time
import sqlparse
Expand Down Expand Up @@ -82,6 +82,15 @@ def do_config(*args, **kwargs):
return ""
return do_config

def model_can_reference(self, src_model, other_model):
    """Decide whether src_model is allowed to ref() other_model.

    A reference is permitted when both models belong to the same package
    (their own_project names match), or when src_model's own package is the
    project it is being compiled under (own_project name equals project
    name). Returns a bool.
    """
    target_package = other_model.own_project['name']
    source_package = src_model.own_project['name']

    # same package: always allowed
    if target_package == source_package:
        return True

    # hack for now b/c we don't support recursive dependencies:
    # otherwise only models owned by the compiling project may reach out
    return source_package == src_model.project['name']

def __ref(self, linker, ctx, model, all_models):
schema = ctx['env']['schema']

Expand All @@ -96,15 +105,24 @@ def do_ref(*args):
other_model_package, other_model_name = args
other_model_name = self.create_template.model_name(other_model_name)
other_model = find_model_by_name(all_models, other_model_name, package_namespace=other_model_package)
else:
compiler_error(model, "ref() takes at most two arguments ({} given)".format(len(args)))

other_model_fqn = tuple(other_model.fqn[:-1] + [other_model_name])
src_fqn = ".".join(source_model)
ref_fqn = ".".join(other_model_fqn)

#if not self.model_can_reference(model, other_model):
# compiler_error(model, "Model '{}' exists but cannot be referenced from dependency model '{}'".format(ref_fqn, src_fqn))

if not other_model.is_enabled:
src_fqn = ".".join(source_model)
ref_fqn = ".".join(other_model_fqn)
raise RuntimeError("Model '{}' depends on model '{}' which is disabled in the project config".format(src_fqn, ref_fqn))

# this creates a trivial cycle -- should this be a compiler error?
if source_model != other_model_fqn:
# we can still interpolate the name w/o making a self-cycle
if source_model == other_model_fqn:
pass
else:
linker.dependency(source_model, other_model_fqn)

if other_model.is_ephemeral:
Expand All @@ -117,7 +135,9 @@ def wrapped_do_ref(*args):
try:
return do_ref(*args)
except RuntimeError as e:
print("Compiler error in {}".format(model.filepath))
root = os.path.relpath(model.root_dir, model.project['project-root'])
filepath = os.path.join(root, model.rel_filepath)
print("Compiler error in {}".format(filepath))
print("Enabled models:")
for m in all_models:
print(" - {}".format(".".join(m.fqn)))
Expand All @@ -131,16 +151,22 @@ def get_context(self, linker, model, models):
context['config'] = self.__model_config(model, linker)
context['this'] = This(context['env']['schema'], model.immediate_name, model.name)
context['compiled_at'] = time.strftime('%Y-%m-%d %H:%M:%S')
context['var'] = Var(model, context=context)
return context

def compile_model(self, linker, model, models):
jinja = jinja2.Environment(loader=jinja2.FileSystemLoader(searchpath=model.root_dir))
try:
jinja = jinja2.Environment(loader=jinja2.FileSystemLoader(searchpath=model.root_dir))

# this is a dumb jinja2 bug -- on windows, forward slashes are EXPECTED
posix_filepath = '/'.join(split_path(model.rel_filepath))
template = jinja.get_template(posix_filepath)
context = self.get_context(linker, model, models)

rendered = template.render(context)
except jinja2.exceptions.TemplateSyntaxError as e:
compiler_error(model, str(e))

# this is a dumb jinja2 bug -- on windows, forward slashes are EXPECTED
posix_filepath = '/'.join(split_path(model.rel_filepath))
template = jinja.get_template(posix_filepath)
context = self.get_context(linker, model, models)
rendered = template.render(context)
return rendered

def write_graph_file(self, linker):
Expand Down Expand Up @@ -175,10 +201,10 @@ def combine_query_with_ctes(self, model, query, ctes, compiled_models):
# these newlines are important -- comments could otherwise interfere w/ query
cte_stmts = [" {} as (\n{}\n)".format(name, contents) for (name, contents) in cte_mapping]

cte_text = ", ".join(cte_stmts)
cte_text = sqlparse.sql.Token(sqlparse.tokens.Keyword, ", ".join(cte_stmts))
parsed.insert_after(with_stmt, cte_text)

return sqlparse.format(str(parsed), keyword_case='lower', reindent=True)
return str(parsed)

def __recursive_add_ctes(self, linker, model):
if model not in linker.cte_map:
Expand Down
2 changes: 1 addition & 1 deletion dbt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,4 @@ def handle(args):
dbt.tracking.track_invocation_end(project=proj, args=parsed, result_type="ok", result=None)
except Exception as e:
dbt.tracking.track_invocation_end(project=proj, args=parsed, result_type="error", result=str(e))
raise e
raise
37 changes: 28 additions & 9 deletions dbt/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
from dbt.utils import split_path
import dbt.schema_tester
import dbt.project
from dbt.utils import This, deep_merge
from dbt.utils import This, deep_merge, DBTConfigKeys, compiler_error

class SourceConfig(object):
Materializations = ['view', 'table', 'incremental', 'ephemeral']
ConfigKeys = ['enabled', 'materialized', 'dist', 'sort', 'sql_where', 'unique_key', 'sort_type', 'pre-hook', 'post-hook']
ConfigKeys = DBTConfigKeys

def __init__(self, active_project, own_project, fqn):
self.active_project = active_project
Expand All @@ -23,7 +23,7 @@ def __init__(self, active_project, own_project, fqn):
def _merge(self, *configs):
merged_config = {}
for config in configs:
intermediary_merged = deep_merge(merged_config, config)
intermediary_merged = deep_merge(merged_config.copy(), config.copy())
merged_config.update(intermediary_merged)
return merged_config

Expand Down Expand Up @@ -82,11 +82,20 @@ def __get_hooks(self, relevant_configs, key):

def get_project_config(self, project):
# most configs are overwritten by a more specific config, but pre/post hooks are appended!
hook_fields = ['pre-hook', 'post-hook']
config = {k:[] for k in hook_fields}
append_list_fields = ['pre-hook', 'post-hook']
extend_dict_fields = ['vars']

config = {}
for k in append_list_fields:
config[k] = []
for k in extend_dict_fields:
config[k] = {}

model_configs = project['models']

if model_configs is None:
return config

fqn = self.fqn[:]
for level in fqn:
level_config = model_configs.get(level, None)
Expand All @@ -95,11 +104,15 @@ def get_project_config(self, project):

relevant_configs = {key: level_config[key] for key in level_config if key in self.ConfigKeys}

for key in hook_fields:
for key in append_list_fields:
new_hooks = self.__get_hooks(relevant_configs, key)
config[key].extend([h for h in new_hooks if h not in config[key]])

clobber_configs = {k:v for (k,v) in relevant_configs.items() if k not in hook_fields}
for key in extend_dict_fields:
dict_val = relevant_configs.get(key, {})
config[key].update(dict_val)

clobber_configs = {k:v for (k,v) in relevant_configs.items() if k not in append_list_fields and k not in extend_dict_fields}
config.update(clobber_configs)
model_configs = model_configs[level]

Expand Down Expand Up @@ -207,6 +220,9 @@ def rename_query(self, schema):

return 'alter table "{schema}"."{tmp_name}" rename to "{final_name}"'.format(**opts)

@property
def nice_name(self):
    """Short display name: the first and last fqn components joined by a dot."""
    first_part = self.fqn[0]
    last_part = self.fqn[-1]
    return "{}.{}".format(first_part, last_part)

class Model(DBTSource):
dbt_run_type = 'run'
Expand Down Expand Up @@ -271,8 +287,11 @@ def build_path(self):
return os.path.join(*path_parts)

def compile_string(self, ctx, string):
env = jinja2.Environment()
return env.from_string(string).render(ctx)
try:
env = jinja2.Environment()
return env.from_string(string).render(ctx)
except jinja2.exceptions.TemplateSyntaxError as e:
compiler_error(self, str(e))

def get_hooks(self, ctx, hook_key):
hooks = self.config.get(hook_key, [])
Expand Down
52 changes: 46 additions & 6 deletions dbt/task/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,16 @@ def __init__(self, args, project):
self.args = args
self.project = project

def __pull_repo(self, repo):
def __checkout_branch(self, branch, full_path):
    """Check out `branch` in the git working copy at `full_path`.

    Runs `git checkout <branch>` with `full_path` as the working directory
    and waits for it to finish. If git exits with a non-zero status, the
    failure is printed together with git's stderr output, so a missing or
    misspelled branch is no longer ignored silently. Returns None and does
    not raise on a failed checkout.
    """
    print(" checking out branch {}".format(branch))
    proc = subprocess.Popen(
        ['git', 'checkout', branch],
        cwd=full_path,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    # communicate() drains both pipes and waits for the process to exit,
    # after which returncode is populated
    _, err = proc.communicate()
    if proc.returncode != 0:
        print(" failed to check out branch {}: {}".format(
            branch, err.decode('utf-8', 'replace').strip()))

def __pull_repo(self, repo, branch=None):
proc = subprocess.Popen(
['git', 'clone', repo],
cwd=self.project['modules-path'],
Expand All @@ -37,34 +46,65 @@ def __pull_repo(self, repo):
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = proc.communicate()
remote_branch = 'origin/master' if branch is None else 'origin/{}'.format(branch)
proc = subprocess.Popen(
['git', 'reset', '--hard', 'origin/master'],
['git', 'reset', '--hard', remote_branch],
cwd=full_path,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = proc.communicate()
if branch is not None:
self.__checkout_branch(branch, full_path)
else:
matches = re.match("Cloning into '(.+)'", err.decode('utf-8'))
folder = matches.group(1)
full_path = os.path.join(self.project['modules-path'], folder)
print("pulled new dependency {}".format(folder))
if branch is not None:
self.__checkout_branch(branch, full_path)

return folder

def __pull_deps_recursive(self, repos, processed_repos = set()):
for repo in repos:
def __split_at_branch(self, repo_spec):
    """Split a dependency spec of the form `<repo>[@<branch>]`.

    Returns a `(repo, branch)` tuple; `branch` is None when the spec names
    no branch. Specs starting with "git@" contain one "@" that belongs to
    the repo address itself, so only an additional "@" is treated as the
    branch separator. Raises RuntimeError when the spec has more "@"
    separators than that.
    """
    pieces = repo_spec.split("@")

    # number of "@"-separated pieces that make up the repo address itself
    repo_piece_count = 2 if repo_spec.startswith("git@") else 1

    if len(pieces) == repo_piece_count:
        # no branch suffix: the whole spec is the repo
        return repo_spec, None

    if len(pieces) == repo_piece_count + 1:
        return "@".join(pieces[:repo_piece_count]), pieces[-1]

    raise RuntimeError("Invalid dep specified: '{}' -- not a repo we can clone".format(repo_spec))

def __pull_deps_recursive(self, repos, processed_repos = set(), i=0):
for repo_string in repos:
repo, branch = self.__split_at_branch(repo_string)
repo_folder = folder_from_git_remote(repo)

try:
if repo_folder in processed_repos:
print("skipping already processed dependency {}".format(repo_folder))
else:
dep_folder = self.__pull_repo(repo)
dep_folder = self.__pull_repo(repo, branch)
dep_project = project.read_project(
os.path.join(self.project['modules-path'],
dep_folder,
'dbt_project.yml')
)
processed_repos.add(dep_folder)
self.__pull_deps_recursive(dep_project['repositories'], processed_repos)
self.__pull_deps_recursive(dep_project['repositories'], processed_repos, i+1)
except IOError as e:
if e.errno == errno.ENOENT:
print("'{}' is not a valid dbt project - dbt_project.yml not found".format(repo))
Expand Down
40 changes: 21 additions & 19 deletions dbt/templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,32 @@ class BaseCreateTemplate(object):
# but you cannot explicitly set them in the CREATE TABLE ... LIKE statement.
# via http://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html
incremental_template = """
create temporary table "{identifier}__dbt_incremental_tmp" {dist_qualifier} {sort_qualifier} as (
create temporary table "{identifier}__dbt_incremental_empty_tmp" {dist_qualifier} {sort_qualifier} as (
select * from (
{query}
) as tmp limit 0
);
create table if not exists "{schema}"."{identifier}" (like "{identifier}__dbt_incremental_tmp");
create table if not exists "{schema}"."{identifier}" (like "{identifier}__dbt_incremental_empty_tmp");
{incremental_delete_statement}
insert into "{schema}"."{identifier}" (
create temporary table "{identifier}__dbt_incremental_tmp" as (
with dbt_incr_sbq as (
{query}
)
select * from dbt_incr_sbq
where ({sql_where}) or ({sql_where}) is null
);
{incremental_delete_statement}
insert into "{schema}"."{identifier}" (
select * from "{identifier}__dbt_incremental_tmp"
);
"""

incremental_delete_template = """
delete from "{schema}"."{identifier}" where ({unique_key}) in (
with dbt_delete_sbq as (
{query}
)
select ({unique_key}) from dbt_delete_sbq
where ({sql_where}) or ({sql_where}) is null
select ({unique_key}) from "{identifier}__dbt_incremental_tmp"
);
"""

Expand Down Expand Up @@ -108,33 +108,35 @@ class DryCreateTemplate(object):


incremental_template = """
create temporary table "{identifier}__dbt_incremental_tmp" {dist_qualifier} {sort_qualifier} as (
create temporary table "{identifier}__dbt_incremental_empty_tmp" {dist_qualifier} {sort_qualifier} as (
select * from (
{query}
) as tmp limit 0
);
create table if not exists "{schema}"."{identifier}" (like "{identifier}__dbt_incremental_tmp");
{incremental_delete_statement}
create table if not exists "{schema}"."{identifier}" (like "{identifier}__dbt_incremental_empty_tmp");
insert into "{schema}"."{identifier}" (
create temporary table "{identifier}__dbt_incremental_tmp" as (
with dbt_incr_sbq as (
{query}
)
select * from dbt_incr_sbq
where ({sql_where}) or ({sql_where}) is null
limit 0
);
{incremental_delete_statement}
insert into "{schema}"."{identifier}" (
select * from "{identifier}__dbt_incremental_tmp"
);
"""

incremental_delete_template = """
delete from "{schema}"."{identifier}" where ({unique_key}) in (
with dbt_delete_sbq as (
{query}
)
select ({unique_key}) from dbt_delete_sbq
where ({sql_where}) or ({sql_where}) is null
select ({unique_key}) from "{identifier}__dbt_incremental_tmp"
);
"""

Expand Down
2 changes: 1 addition & 1 deletion dbt/tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def track_invocation_start(project=None, args=None):
def track_model_run(options):
context = [SelfDescribingJson(RUN_MODEL_SPEC, options)]
model_id = options['model_id']
track(category="dbt", action='run_model', label=model_id, context=context)
track(category="dbt", action='run_model', label=invocation_id, context=context)

def track_invocation_end(project=None, args=None, result_type=None, result=None):
invocation_context = get_invocation_end_context(invocation_id, user, project, args, result_type, result)
Expand Down
Loading

0 comments on commit 0096774

Please sign in to comment.