diff --git a/core/dbt/events/format.py b/core/dbt/events/format.py new file mode 100644 index 00000000000..5f0a61509a5 --- /dev/null +++ b/core/dbt/events/format.py @@ -0,0 +1,35 @@ +import dbt.logger as logger # type: ignore # TODO eventually remove dependency on this logger +from dbt import ui +from typing import Optional + + +def format_fancy_output_line( + msg: str, status: str, index: Optional[int], + total: Optional[int], execution_time: Optional[float] = None, + truncate: bool = False +) -> str: + if index is None or total is None: + progress = '' + else: + progress = '{} of {} '.format(index, total) + # TODO: remove this formatting once we rip out all the old logger + prefix = "{timestamp} | {progress}{message}".format( + timestamp=logger.get_timestamp(), + progress=progress, + message=msg) + + truncate_width = ui.printer_width() - 3 + justified = prefix.ljust(ui.printer_width(), ".") + if truncate and len(justified) > truncate_width: + justified = justified[:truncate_width] + '...' + + if execution_time is None: + status_time = "" + else: + status_time = " in {execution_time:0.2f}s".format( + execution_time=execution_time) + + output = "{justified} [{status}{status_time}]".format( + justified=justified, status=status, status_time=status_time) + + return output diff --git a/core/dbt/events/types.py b/core/dbt/events/types.py index c06fea22c7e..813c8b65605 100644 --- a/core/dbt/events/types.py +++ b/core/dbt/events/types.py @@ -2,6 +2,9 @@ from dataclasses import dataclass from typing import Any, List, Optional, Dict from dbt import ui +from dbt import utils +from dbt.node_types import NodeType +from dbt.events.format import format_fancy_output_line # types to represent log levels @@ -994,6 +997,618 @@ def cli_msg(self) -> str: return "Done." +@dataclass +class ServingDocsPort(InfoLevel, CliEventABC): + address: str + port: int + + def cli_msg(self) -> str: + return f"Serving docs at {self.address}:{self.port}" + + +@dataclass +class ServingDocsAccessInfo(InfoLevel, CliEventABC): + port: str + + def cli_msg(self) -> str: + return f"To access from your browser, navigate to: http://localhost:{self.port}" + + +class ServingDocsExitInfo(InfoLevel, CliEventABC): + def cli_msg(self) -> str: + return "Press Ctrl+C to exit.\n\n" + + +@dataclass +class SeedHeader(InfoLevel, CliEventABC): + header: str + + def cli_msg(self) -> str: + return self.header + + +@dataclass +class SeedHeaderSeperator(InfoLevel, CliEventABC): + len_header: int + + def cli_msg(self) -> str: + return "-" * self.len_header + + +@dataclass +class RunResultWarning(WarnLevel, CliEventABC): + resource_type: str + node_name: str + path: str + + def cli_msg(self) -> str: + info = 'Warning' + return ui.yellow(f"{info} in {self.resource_type} {self.node_name} ({self.path})") + + +@dataclass +class RunResultFailure(ErrorLevel, CliEventABC): + resource_type: str + node_name: str + path: str + + def cli_msg(self) -> str: + info = 'Failure' + return ui.red(f"{info} in {self.resource_type} {self.node_name} ({self.path})") + + +@dataclass +class StatsLine(InfoLevel, CliEventABC): + stats: Dict + + def cli_msg(self) -> str: + stats_line = ("\nDone. PASS={pass} WARN={warn} ERROR={error} SKIP={skip} TOTAL={total}") + return stats_line.format(**self.stats) + + +@dataclass +class RunResultError(ErrorLevel, CliEventABC): + msg: str + + def cli_msg(self) -> str: + return f" {self.msg}" + + +@dataclass +class RunResultErrorNoMessage(ErrorLevel, CliEventABC): + status: str + + def cli_msg(self) -> str: + return f" Status: {self.status}" + + +@dataclass +class SQLCompiledPath(InfoLevel, CliEventABC): + path: str + + def cli_msg(self) -> str: + return f" compiled SQL at {self.path}" + + +@dataclass +class CheckNodeTestFailure(InfoLevel, CliEventABC): + relation_name: str + + def cli_msg(self) -> str: + msg = f"select * from {self.relation_name}" + border = '-' * len(msg) + return f" See test failures:\n {border}\n {msg}\n {border}" + + +@dataclass +class FirstRunResultError(ErrorLevel, CliEventABC): + msg: str + + def cli_msg(self) -> str: + return ui.yellow(self.msg) + + +@dataclass +class AfterFirstRunResultError(ErrorLevel, CliEventABC): + msg: str + + def cli_msg(self) -> str: + return self.msg + + +@dataclass +class EndOfRunSummary(InfoLevel, CliEventABC): + num_errors: int + num_warnings: int + keyboard_interrupt: bool = False + + def cli_msg(self) -> str: + error_plural = utils.pluralize(self.num_errors, 'error') + warn_plural = utils.pluralize(self.num_warnings, 'warning') + if self.keyboard_interrupt: + message = ui.yellow('Exited because of keyboard interrupt.') + elif self.num_errors > 0: + message = ui.red("Completed with {} and {}:".format( + error_plural, warn_plural)) + elif self.num_warnings > 0: + message = ui.yellow('Completed with {}:'.format(warn_plural)) + else: + message = ui.green('Completed successfully') + return message + + +@dataclass +class PrintStartLine(InfoLevel, CliEventABC): + description: str + index: int + total: int + + def cli_msg(self) -> str: + msg = "START {self.description}" + return format_fancy_output_line(msg=msg, status='RUN', index=self.index, total=self.total) + + +@dataclass +class PrintHookStartLine(InfoLevel, CliEventABC): + statement: str + index: int + total: int + truncate: bool + + def cli_msg(self) -> str: + msg = "START hook: {self.statement}" + return format_fancy_output_line(msg=msg, + status='RUN', + index=self.index, + total=self.total, + truncate=self.truncate) + + +@dataclass +class PrintHookEndLine(InfoLevel, CliEventABC): + statement: str + status: str + index: int + total: int + execution_time: int + truncate: bool + + def cli_msg(self) -> str: + msg = 'OK hook: {}'.format(self.statement) + return format_fancy_output_line(msg=msg, + status=ui.green(self.status), + index=self.index, + total=self.total, + execution_time=self.execution_time, + truncate=self.truncate) + + +@dataclass +class SkippingDetails(InfoLevel, CliEventABC): + resource_type: str + schema: str + node_name: str + index: int + total: int + + def cli_msg(self) -> str: + if self.resource_type in NodeType.refable(): + msg = f'SKIP relation {self.schema}.{self.node_name}' + else: + msg = f'SKIP {self.resource_type} {self.node_name}' + return format_fancy_output_line(msg=msg, + status=ui.yellow('SKIP'), + index=self.index, + total=self.total) + + +@dataclass +class PrintErrorTestResult(ErrorLevel, CliEventABC): + name: str + index: int + num_models: int + execution_time: int + + def cli_msg(self) -> str: + info = "ERROR" + msg = f"{info} {self.name}" + return format_fancy_output_line(msg=msg, + status=ui.red(info), + index=self.index, + total=self.num_models, + execution_time=self.execution_time) + + +@dataclass +class PrintPassTestResult(InfoLevel, CliEventABC): + name: str + index: int + num_models: int + execution_time: int + + def cli_msg(self) -> str: + info = "PASS" + msg = f"{info} {self.name}" + return format_fancy_output_line(msg=msg, + status=ui.green(info), + index=self.index, + total=self.num_models, + execution_time=self.execution_time) + + +@dataclass +class PrintWarnTestResult(WarnLevel, CliEventABC): + name: str + index: int + num_models: int + execution_time: int + failures: List[str] + + def cli_msg(self) -> str: + info = f'WARN {self.failures}' + msg = f"{info} {self.name}" + return format_fancy_output_line(msg=msg, + status=ui.yellow(info), + index=self.index, + total=self.num_models, + execution_time=self.execution_time) + + +@dataclass +class PrintFailureTestResult(ErrorLevel, CliEventABC): + name: str + index: int + num_models: int + execution_time: int + failures: List[str] + + def cli_msg(self) -> str: + info = f'FAIL {self.failures}' + msg = f"{info} {self.name}" + return format_fancy_output_line(msg=msg, + status=ui.red(info), + index=self.index, + total=self.num_models, + execution_time=self.execution_time) + + +@dataclass +class PrintSkipBecauseError(ErrorLevel, CliEventABC): + schema: str + relation: str + index: int + total: int + + def cli_msg(self) -> str: + msg = f'SKIP relation {self.schema}.{self.relation} due to ephemeral model error' + return format_fancy_output_line(msg=msg, + status=ui.red('ERROR SKIP'), + index=self.index, + total=self.total) + + +@dataclass +class PrintModelErrorResultLine(ErrorLevel, CliEventABC): + description: str + status: str + index: int + total: int + execution_time: int + + def cli_msg(self) -> str: + info = "ERROR creating" + msg = f"{info} {self.description}" + return format_fancy_output_line(msg=msg, + status=ui.red(self.status.upper()), + index=self.index, + total=self.total, + execution_time=self.execution_time) + + +@dataclass +class PrintModelResultLine(InfoLevel, CliEventABC): + description: str + status: str + index: int + total: int + execution_time: int + + def cli_msg(self) -> str: + info = "OK created" + msg = f"{info} {self.description}" + return format_fancy_output_line(msg=msg, + status=ui.green(self.status), + index=self.index, + total=self.total, + execution_time=self.execution_time) + + +@dataclass +class PrintSnapshotErrorResultLine(ErrorLevel, CliEventABC): + status: str + description: str + cfg: Dict + index: int + total: int + execution_time: int + + def cli_msg(self) -> str: + info = 'ERROR snapshotting' + msg = "{info} {description}".format(info=info, description=self.description, **self.cfg) + return format_fancy_output_line(msg=msg, + status=ui.red(self.status.upper()), + index=self.index, + total=self.total, + execution_time=self.execution_time) + + +@dataclass +class PrintSnapshotResultLine(InfoLevel, CliEventABC): + status: str + description: str + cfg: Dict + index: int + total: int + execution_time: int + + def cli_msg(self) -> str: + info = 'OK snapshotted' + msg = "{info} {description}".format(info=info, description=self.description, **self.cfg) + return format_fancy_output_line(msg=msg, + status=ui.green(self.status), + index=self.index, + total=self.total, + execution_time=self.execution_time) + + +@dataclass +class PrintSeedErrorResultLine(ErrorLevel, CliEventABC): + status: str + index: int + total: int + execution_time: int + schema: str + relation: str + + def cli_msg(self) -> str: + info = 'ERROR loading' + msg = f"{info} seed file {self.schema}.{self.relation}" + return format_fancy_output_line(msg=msg, + status=ui.red(self.status.upper()), + index=self.index, + total=self.total, + execution_time=self.execution_time) + + +@dataclass +class PrintSeedResultLine(InfoLevel, CliEventABC): + status: str + index: int + total: int + execution_time: int + schema: str + relation: str + + def cli_msg(self) -> str: + info = 'OK loaded' + msg = f"{info} seed file {self.schema}.{self.relation}" + return format_fancy_output_line(msg=msg, + status=ui.green(self.status), + index=self.index, + total=self.total, + execution_time=self.execution_time) + + +@dataclass +class PrintHookEndErrorLine(ErrorLevel, CliEventABC): + source_name: str + table_name: str + index: int + total: int + execution_time: int + + def cli_msg(self) -> str: + info = 'ERROR' + msg = f"{info} freshness of {self.source_name}.{self.table_name}" + return format_fancy_output_line(msg=msg, + status=ui.red(info), + index=self.index, + total=self.total, + execution_time=self.execution_time) + + +@dataclass +class PrintHookEndErrorStaleLine(ErrorLevel, CliEventABC): + source_name: str + table_name: str + index: int + total: int + execution_time: int + + def cli_msg(self) -> str: + info = 'ERROR STALE' + msg = f"{info} freshness of {self.source_name}.{self.table_name}" + return format_fancy_output_line(msg=msg, + status=ui.red(info), + index=self.index, + total=self.total, + execution_time=self.execution_time) + + +@dataclass +class PrintHookEndWarnLine(WarnLevel, CliEventABC): + source_name: str + table_name: str + index: int + total: int + execution_time: int + + def cli_msg(self) -> str: + info = 'WARN' + msg = f"{info} freshness of {self.source_name}.{self.table_name}" + return format_fancy_output_line(msg=msg, + status=ui.yellow(info), + index=self.index, + total=self.total, + execution_time=self.execution_time) + + +@dataclass +class PrintHookEndPassLine(InfoLevel, CliEventABC): + source_name: str + table_name: str + index: int + total: int + execution_time: int + + def cli_msg(self) -> str: + info = 'PASS' + msg = f"{info} freshness of {self.source_name}.{self.table_name}" + return format_fancy_output_line(msg=msg, + status=ui.green(info), + index=self.index, + total=self.total, + execution_time=self.execution_time) + + +@dataclass +class PrintCancelLine(ErrorLevel, CliEventABC): + conn_name: str + + def cli_msg(self) -> str: + msg = 'CANCEL query {}'.format(self.conn_name) + return format_fancy_output_line(msg=msg, + status=ui.red('CANCEL'), + index=None, + total=None) + + +@dataclass +class DefaultSelector(InfoLevel, CliEventABC): + name: str + + def cli_msg(self) -> str: + return f"Using default selector {self.name}" + + +@dataclass +class NodeStart(DebugLevel, CliEventABC): + unique_id: str + + def cli_msg(self) -> str: + return f"Began running node {self.unique_id}" + + +@dataclass +class NodeFinished(DebugLevel, CliEventABC): + unique_id: str + + def cli_msg(self) -> str: + return f"Finished running node {self.unique_id}" + + +@dataclass +class QueryCancelationUnsupported(InfoLevel, CliEventABC): + type: str + + def cli_msg(self) -> str: + msg = (f"The {self.type} adapter does not support query " + "cancellation. Some queries may still be " + "running!") + return ui.yellow(msg) + + +@dataclass +class ConcurrencyLine(InfoLevel, CliEventABC): + concurrency_line: str + + def cli_msg(self) -> str: + return self.concurrency_line + + +@dataclass +class StarterProjectPath(DebugLevel, CliEventABC): + dir: str + + def cli_msg(self) -> str: + return f"Starter project path: {self.dir}" + + +@dataclass +class ConfigFolderDirectory(InfoLevel, CliEventABC): + dir: str + + def cli_msg(self) -> str: + return f"Creating dbt configuration folder at {self.dir}" + + +@dataclass +class NoSampleProfileFound(InfoLevel, CliEventABC): + adapter: str + + def cli_msg(self) -> str: + return f"No sample profile found for {self.adapter}." + + +@dataclass +class ProfileWrittenWithSample(InfoLevel, CliEventABC): + name: str + path: str + + def cli_msg(self) -> str: + return (f"Profile {self.name} written to {self.path} " + "using target's sample configuration. Once updated, you'll be able to " + "start developing with dbt.") + + +@dataclass +class ProfileWrittenWithTargetTemplateYAML(InfoLevel, CliEventABC): + name: str + path: str + + def cli_msg(self) -> str: + return (f"Profile {self.name} written to {self.path} using target's " + "profile_template.yml and your supplied values. Run 'dbt debug' to " + "validate the connection.") + + +@dataclass +class ProfileWrittenWithProjectTemplateYAML(InfoLevel, CliEventABC): + name: str + path: str + + def cli_msg(self) -> str: + return (f"Profile {self.name} written to {self.path} using project's " + "profile_template.yml and your supplied values. Run 'dbt debug' to " + "validate the connection.") + + +class SettingUpProfile(InfoLevel, CliEventABC): + def cli_msg(self) -> str: + return "Setting up your profile." + + +class InvalidProfileTemplateYAML(InfoLevel, CliEventABC): + def cli_msg(self) -> str: + return "Invalid profile_template.yml in project." + + +@dataclass +class ProjectNameAlreadyExists(InfoLevel, CliEventABC): + name: str + + def cli_msg(self) -> str: + return f"A project called {self.name} already exists here." + + +@dataclass +class GetAddendum(InfoLevel, CliEventABC): + msg: str + + def cli_msg(self) -> str: + return self.msg + + # since mypy doesn't run on every file we need to suggest to mypy that every # class gets instantiated. But we don't actually want to run this code. # making the conditional `if False` causes mypy to skip it as dead code so @@ -1109,3 +1724,57 @@ def cli_msg(self) -> str: BuildingCatalog() CompileComplete() FreshnessCheckComplete() + ServingDocsPort(address='', port=0) + ServingDocsAccessInfo(port='') + ServingDocsExitInfo() + SeedHeader(header='') + SeedHeaderSeperator(len_header=0) + RunResultWarning(resource_type='', node_name='', path='') + RunResultFailure(resource_type='', node_name='', path='') + StatsLine(stats={}) + RunResultError(msg='') + RunResultErrorNoMessage(status='') + SQLCompiledPath(path='') + CheckNodeTestFailure(relation_name='') + FirstRunResultError(msg='') + AfterFirstRunResultError(msg='') + PrintStartLine(description='', index=0, total=0) + PrintHookStartLine(statement='', index=0, total=0, truncate=False) + PrintHookEndLine(statement='', status='', index=0, total=0, execution_time=0, truncate=False) + SkippingDetails(resource_type='', schema='', node_name='', index=0, total=0) + PrintErrorTestResult(name='', index=0, num_models=0, execution_time=0) + PrintPassTestResult(name='', index=0, num_models=0, execution_time=0) + PrintWarnTestResult(name='', index=0, num_models=0, execution_time=0, failures=[]) + PrintFailureTestResult(name='', index=0, num_models=0, execution_time=0, failures=[]) + PrintSkipBecauseError(schema='', relation='', index=0, total=0) + PrintModelErrorResultLine(description='', status='', index=0, total=0, execution_time=0) + PrintModelResultLine(description='', status='', index=0, total=0, execution_time=0) + PrintSnapshotErrorResultLine(status='', + description='', + cfg={}, + index=0, + total=0, + execution_time=0) + PrintSnapshotResultLine(status='', description='', cfg={}, index=0, total=0, execution_time=0) + PrintSeedErrorResultLine(status='', index=0, total=0, execution_time=0, schema='', relation='') + PrintSeedResultLine(status='', index=0, total=0, execution_time=0, schema='', relation='') + PrintHookEndErrorLine(source_name='', table_name='', index=0, total=0, execution_time=0) + PrintHookEndErrorStaleLine(source_name='', table_name='', index=0, total=0, execution_time=0) + PrintHookEndWarnLine(source_name='', table_name='', index=0, total=0, execution_time=0) + PrintHookEndPassLine(source_name='', table_name='', index=0, total=0, execution_time=0) + PrintCancelLine(conn_name='') + DefaultSelector(name='') + NodeStart(unique_id='') + NodeFinished(unique_id='') + QueryCancelationUnsupported(type='') + ConcurrencyLine(concurrency_line='') + StarterProjectPath(dir='') + ConfigFolderDirectory(dir='') + NoSampleProfileFound(adapter='') + ProfileWrittenWithSample(name='', path='') + ProfileWrittenWithTargetTemplateYAML(name='', path='') + ProfileWrittenWithProjectTemplateYAML(name='', path='') + SettingUpProfile() + InvalidProfileTemplateYAML() + ProjectNameAlreadyExists(name='') + GetAddendum(msg='') diff --git a/core/dbt/task/base.py b/core/dbt/task/base.py index fc21c8d1d2d..9bc6171dcda 100644 --- a/core/dbt/task/base.py +++ b/core/dbt/task/base.py @@ -20,10 +20,9 @@ DbtProjectError, DbtProjectErrorException, DbtProfileError, DbtProfileErrorException, ProfileListTitle, ListSingleProfile, NoDefinedProfiles, ProfileHelpMessage, CatchableExceptionOnRun, InternalExceptionOnRun, GenericExceptionOnRun, - NodeConnectionReleaseError, PrintDebugStackTrace + NodeConnectionReleaseError, PrintDebugStackTrace, SkippingDetails, PrintSkipBecauseError ) -from .printer import print_skip_caused_by_error, print_skip_line - +from .printer import print_run_result_error from dbt.adapters.factory import register_adapter from dbt.config import RuntimeConfig, Project @@ -309,9 +308,13 @@ def _handle_internal_exception(self, e, ctx): return str(e) def _handle_generic_exception(self, e, ctx): - fire_event(GenericExceptionOnRun(build_path=self.node.build_path, - unique_id=self.node.unique_id, - exc=e)) + fire_event( + GenericExceptionOnRun( + build_path=self.node.build_path, + unique_id=self.node.unique_id, + exc=e + ) + ) fire_event(PrintDebugStackTrace()) return str(e) @@ -394,14 +397,15 @@ def on_skip(self): # if this model was skipped due to an upstream ephemeral model # failure, print a special 'error skip' message. if self._skip_caused_by_ephemeral_failure(): - print_skip_caused_by_error( - self.node, - schema_name, - node_name, - self.node_index, - self.num_nodes, - self.skip_cause + fire_event( + PrintSkipBecauseError( + schema=schema_name, + relation=node_name, + index=self.node_index, + total=self.num_nodes + ) ) + print_run_result_error(result=self.skip_cause, newline=False) if self.skip_cause is None: # mypy appeasement raise InternalException( 'Skip cause not set but skip was somehow caused by ' @@ -415,12 +419,14 @@ def on_skip(self): self.skip_cause.node.unique_id) ) else: - print_skip_line( - self.node, - schema_name, - node_name, - self.node_index, - self.num_nodes + fire_event( + SkippingDetails( + resource_type=self.node.resource_type, + schema=schema_name, + node_name=node_name, + index=self.node_index, + total=self.num_nodes + ) ) node_result = self.skip_result(self.node, error_message) diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index e8f765da8c8..88ddbcae44f 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -4,8 +4,6 @@ from .base import BaseRunner from .printer import ( - print_start_line, - print_freshness_result_line, print_run_result_error, ) from .runnable import GraphRunnableTask @@ -17,7 +15,10 @@ ) from dbt.exceptions import RuntimeException, InternalException from dbt.events.functions import fire_event -from dbt.events.types import FreshnessCheckComplete +from dbt.events.types import ( + FreshnessCheckComplete, PrintStartLine, PrintHookEndErrorLine, + PrintHookEndErrorStaleLine, PrintHookEndWarnLine, PrintHookEndPassLine +) from dbt.node_types import NodeType from dbt.graph import ResourceTypeSelector @@ -35,10 +36,61 @@ def on_skip(self): def before_execute(self): description = 'freshness of {0.source_name}.{0.name}'.format(self.node) - print_start_line(description, self.node_index, self.num_nodes) + fire_event( + PrintStartLine( + description=description, + index=self.node_index, + total=self.num_nodes + ) + ) def after_execute(self, result): - print_freshness_result_line(result, self.node_index, self.num_nodes) + if hasattr(result, 'node'): + source_name = result.node.source_name + table_name = result.node.name + else: + source_name = result.source_name + table_name = result.table_name + if result.status == FreshnessStatus.RuntimeErr: + fire_event( + PrintHookEndErrorLine( + source_name=source_name, + table_name=table_name, + index=self.node_index, + total=self.num_nodes, + execution_time=result.execution_time + ) + ) + elif result.status == FreshnessStatus.Error: + fire_event( + PrintHookEndErrorStaleLine( + source_name=source_name, + table_name=table_name, + index=self.node_index, + total=self.num_nodes, + execution_time=result.execution_time + ) + ) + elif result.status == FreshnessStatus.Warn: + fire_event( + PrintHookEndWarnLine( + source_name=source_name, + table_name=table_name, + index=self.node_index, + total=self.num_nodes, + execution_time=result.execution_time + ) + ) + else: + fire_event( + PrintHookEndPassLine( + source_name=source_name, + table_name=table_name, + index=self.node_index, + total=self.num_nodes, + execution_time=result.execution_time + ) + ) def error_result(self, node, message, start_time, timing_info): return self._build_run_result( diff --git a/core/dbt/task/init.py b/core/dbt/task/init.py index ba54800fb9f..7446ba800aa 100644 --- a/core/dbt/task/init.py +++ b/core/dbt/task/init.py @@ -14,7 +14,12 @@ from dbt.version import _get_adapter_plugin_names from dbt.adapters.factory import load_plugin, get_include_paths -from dbt.logger import GLOBAL_LOGGER as logger +from dbt.events.functions import fire_event +from dbt.events.types import ( + StarterProjectPath, ConfigFolderDirectory, NoSampleProfileFound, ProfileWrittenWithSample, + ProfileWrittenWithTargetTemplateYAML, ProfileWrittenWithProjectTemplateYAML, SettingUpProfile, + InvalidProfileTemplateYAML, ProjectNameAlreadyExists, GetAddendum +) from dbt.include.starter_project import PACKAGE_PATH as starter_project_directory @@ -55,7 +60,7 @@ class InitTask(BaseTask): def copy_starter_repo(self, project_name): - logger.debug("Starter project path: " + starter_project_directory) + fire_event(StarterProjectPath(dir=starter_project_directory)) shutil.copytree(starter_project_directory, project_name, ignore=shutil.ignore_patterns(*IGNORE_FILES)) @@ -63,8 +68,7 @@ def create_profiles_dir(self, profiles_dir: str) -> bool: """Create the user's profiles directory if it doesn't already exist.""" profiles_path = Path(profiles_dir) if not profiles_path.exists(): - msg = "Creating dbt configuration folder at {}" - logger.info(msg.format(profiles_dir)) + fire_event(ConfigFolderDirectory(dir=profiles_dir)) dbt.clients.system.make_directory(profiles_dir) return True return False @@ -79,7 +83,7 @@ def create_profile_from_sample(self, adapter: str, profile_name: str): sample_profiles_path = adapter_path / "sample_profiles.yml" if not sample_profiles_path.exists(): - logger.debug(f"No sample profile found for {adapter}.") + fire_event(NoSampleProfileFound(adapter=adapter)) else: with open(sample_profiles_path, "r") as f: sample_profile = f.read() @@ -98,10 +102,11 @@ def create_profile_from_sample(self, adapter: str, profile_name: str): else: with open(profiles_filepath, "w") as f: f.write(sample_profile) - logger.info( - f"Profile {profile_name} written to {profiles_filepath} " - "using target's sample configuration. Once updated, you'll be able to " - "start developing with dbt." + fire_event( + ProfileWrittenWithSample( + name=profile_name, + path=str(profiles_filepath) + ) ) def get_addendum(self, project_name: str, profiles_path: str) -> str: @@ -207,10 +212,11 @@ def create_profile_from_target(self, adapter: str, profile_name: str): profile_template = yaml.safe_load(f) self.create_profile_from_profile_template(profile_template, profile_name) profiles_filepath = Path(flags.PROFILES_DIR) / Path("profiles.yml") - logger.info( - f"Profile {profile_name} written to {profiles_filepath} using target's " - "profile_template.yml and your supplied values. Run 'dbt debug' to " - "validate the connection." + fire_event( + ProfileWrittenWithTargetTemplateYAML( + name=profile_name, + path=str(profiles_filepath) + ) ) else: # For adapters without a profile_template.yml defined, fallback on @@ -244,10 +250,11 @@ def create_profile_using_project_profile_template(self, profile_name): profile_template = yaml.safe_load(f) self.create_profile_from_profile_template(profile_template, profile_name) profiles_filepath = Path(flags.PROFILES_DIR) / Path("profiles.yml") - logger.info( - f"Profile {profile_name} written to {profiles_filepath} using project's " - "profile_template.yml and your supplied values. Run 'dbt debug' to " - "validate the connection." + fire_event( + ProfileWrittenWithProjectTemplateYAML( + name=profile_name, + path=str(profiles_filepath) + ) ) def ask_for_adapter_choice(self) -> str: @@ -276,7 +283,7 @@ def run(self): if in_project: # When dbt init is run inside an existing project, # just setup the user's profile. - logger.info("Setting up your profile.") + fire_event(SettingUpProfile()) profile_name = self.get_profile_name_from_current_project() # If a profile_template.yml exists in the project root, that effectively # overrides the profile_template.yml for the given target. @@ -288,7 +295,7 @@ def run(self): self.create_profile_using_project_profile_template(profile_name) return except Exception: - logger.info("Invalid profile_template.yml in project.") + fire_event(InvalidProfileTemplateYAML()) if not self.check_if_can_write_profile(profile_name=profile_name): return adapter = self.ask_for_adapter_choice() @@ -301,9 +308,7 @@ def run(self): project_name = click.prompt("What is the desired project name?") project_path = Path(project_name) if project_path.exists(): - logger.info( - f"A project called {project_name} already exists here." - ) + fire_event(ProjectNameAlreadyExists(name=project_name)) return self.copy_starter_repo(project_name) @@ -323,4 +328,5 @@ def run(self): self.create_profile_from_target( adapter, profile_name=project_name ) - logger.info(self.get_addendum(project_name, profiles_dir)) + msg = self.get_addendum(project_name, profiles_dir) + fire_event(GetAddendum(msg=msg)) diff --git a/core/dbt/task/printer.py b/core/dbt/task/printer.py index 4062c1466d2..945d4edcd6f 100644 --- a/core/dbt/task/printer.py +++ b/core/dbt/task/printer.py @@ -1,50 +1,22 @@ -from typing import Dict, Optional, Tuple, Callable +from typing import Dict from dbt.logger import ( - GLOBAL_LOGGER as logger, DbtStatusMessage, TextOnly, - get_timestamp, ) -from dbt.node_types import NodeType +from dbt.events.functions import fire_event +from dbt.events.types import ( + EmptyLine, RunResultWarning, RunResultFailure, StatsLine, RunResultError, + RunResultErrorNoMessage, SQLCompiledPath, CheckNodeTestFailure, FirstRunResultError, + AfterFirstRunResultError, EndOfRunSummary +) from dbt.tracking import InvocationProcessor -from dbt import ui from dbt import utils from dbt.contracts.results import ( - FreshnessStatus, NodeStatus, TestStatus + NodeStatus ) - - -def print_fancy_output_line( - msg: str, status: str, logger_fn: Callable, index: Optional[int], - total: Optional[int], execution_time: Optional[float] = None, - truncate: bool = False -) -> None: - if index is None or total is None: - progress = '' - else: - progress = '{} of {} '.format(index, total) - prefix = "{timestamp} | {progress}{message}".format( - timestamp=get_timestamp(), - progress=progress, - message=msg) - - truncate_width = ui.printer_width() - 3 - justified = prefix.ljust(ui.printer_width(), ".") - if truncate and len(justified) > truncate_width: - justified = justified[:truncate_width] + '...' - - if execution_time is None: - status_time = "" - else: - status_time = " in {execution_time:0.2f}s".format( - execution_time=execution_time) - - output = "{justified} [{status}{status_time}]".format( - justified=justified, status=status, status_time=status_time) - - logger_fn(output) +from dbt.node_types import NodeType def get_counts(flat_nodes) -> str: @@ -66,179 +38,6 @@ def get_counts(flat_nodes) -> str: return stat_line -def print_start_line(description: str, index: int, total: int) -> None: - msg = "START {}".format(description) - print_fancy_output_line(msg, 'RUN', logger.info, index, total) - - -def print_hook_start_line(statement: str, index: int, total: int) -> None: - msg = 'START hook: {}'.format(statement) - print_fancy_output_line( - msg, 'RUN', logger.info, index, total, truncate=True) - - -def print_hook_end_line( - statement: str, status: str, index: int, total: int, execution_time: float -) -> None: - msg = 'OK hook: {}'.format(statement) - # hooks don't fail into this path, so always green - print_fancy_output_line(msg, ui.green(status), logger.info, index, total, - execution_time=execution_time, truncate=True) - - -def print_skip_line( - node, schema: str, relation: str, index: int, num_models: int -) -> None: - if node.resource_type in NodeType.refable(): - msg = f'SKIP relation {schema}.{relation}' - else: - msg = f'SKIP {node.resource_type} {node.name}' - print_fancy_output_line( - msg, ui.yellow('SKIP'), logger.info, index, num_models) - - -def print_cancel_line(model) -> None: - msg = 'CANCEL query {}'.format(model) - print_fancy_output_line( - msg, ui.red('CANCEL'), logger.error, index=None, total=None) - - -def get_printable_result( - result, success: str, error: str) -> Tuple[str, str, Callable]: - if result.status == NodeStatus.Error: - info = 'ERROR {}'.format(error) - status = ui.red(result.status.upper()) - logger_fn = logger.error - else: - info = 'OK {}'.format(success) - status = ui.green(result.message) - logger_fn = logger.info - - return info, status, logger_fn - - -def print_test_result_line( - result, index: int, total: int -) -> None: - model = result.node - - if result.status == TestStatus.Error: - info = "ERROR" - color = ui.red - logger_fn = logger.error - elif result.status == TestStatus.Pass: - info = 'PASS' - color = ui.green - logger_fn = logger.info - elif result.status == TestStatus.Warn: - info = f'WARN {result.failures}' - color = ui.yellow - logger_fn = logger.warning - elif result.status == TestStatus.Fail: - info = f'FAIL {result.failures}' - color = ui.red - logger_fn = logger.error - else: - raise RuntimeError("unexpected status: {}".format(result.status)) - - print_fancy_output_line( - "{info} {name}".format(info=info, name=model.name), - color(info), - logger_fn, - index, - total, - result.execution_time) - - -def print_model_result_line( - result, description: str, index: int, total: int -) -> None: - info, status, logger_fn = get_printable_result( - result, 'created', 'creating') - - print_fancy_output_line( - "{info} {description}".format(info=info, description=description), - status, - logger_fn, - index, - total, - result.execution_time) - - -def print_snapshot_result_line( - result, description: str, index: int, total: int -) -> None: - model = result.node - - info, status, logger_fn = get_printable_result( - result, 'snapshotted', 'snapshotting') - cfg = model.config.to_dict(omit_none=True) - - msg = "{info} {description}".format( - info=info, description=description, **cfg) - print_fancy_output_line( - msg, - status, - logger_fn, - index, - total, - result.execution_time) - - -def print_seed_result_line(result, schema_name: str, index: int, total: int): - model = result.node - - info, status, logger_fn = get_printable_result(result, 'loaded', 'loading') - - print_fancy_output_line( - "{info} seed file {schema}.{relation}".format( - info=info, - schema=schema_name, - relation=model.alias), - status, - logger_fn, - index, - total, - result.execution_time) - - -def print_freshness_result_line(result, index: int, total: int) -> None: - if result.status == FreshnessStatus.RuntimeErr: - info = 'ERROR' - color = ui.red - logger_fn = logger.error - elif result.status == FreshnessStatus.Error: - info = 'ERROR STALE' - color = ui.red - logger_fn = logger.error - elif result.status == FreshnessStatus.Warn: - info = 'WARN' - color = ui.yellow - logger_fn = logger.warning - else: - info = 'PASS' - color = ui.green - logger_fn = logger.info - - if hasattr(result, 'node'): - source_name = result.node.source_name - table_name = result.node.name - else: - source_name = result.source_name - table_name = result.table_name - - msg = f"{info} freshness of {source_name}.{table_name}" - - print_fancy_output_line( - msg, - color(info), - logger_fn, - index, - total, - execution_time=result.execution_time - ) - - def interpret_run_result(result) -> str: if result.status in (NodeStatus.Error, NodeStatus.Fail): return 'error' @@ -266,8 +65,7 @@ def print_run_status_line(results) -> None: stats[result_type] += 1 stats['total'] += 1 - stats_line = "\nDone. PASS={pass} WARN={warn} ERROR={error} SKIP={skip} TOTAL={total}" # noqa - logger.info(stats_line.format(**stats)) + fire_event(StatsLine(stats=stats)) def print_run_result_error( @@ -275,81 +73,51 @@ def print_run_result_error( ) -> None: if newline: with TextOnly(): - logger.info("") + fire_event(EmptyLine()) if result.status == NodeStatus.Fail or ( is_warning and result.status == NodeStatus.Warn ): if is_warning: - color = ui.yellow - info = 'Warning' - logger_fn = logger.warning + fire_event( + RunResultWarning( + resource_type=result.node.resource_type, + node_name=result.node.name, + path=result.node.original_file_path + ) + ) else: - color = ui.red - info = 'Failure' - logger_fn = logger.error - logger_fn(color("{} in {} {} ({})").format( - info, - result.node.resource_type, - result.node.name, - result.node.original_file_path)) + fire_event( + RunResultFailure( + resource_type=result.node.resource_type, + node_name=result.node.name, + path=result.node.original_file_path + ) + ) if result.message: - logger.error(f" {result.message}") + fire_event(RunResultError(msg=result.message)) else: - logger.error(f" Status: {result.status}") + fire_event(RunResultErrorNoMessage(status=result.status)) if result.node.build_path is not None: with TextOnly(): - logger.info("") - logger.info(" compiled SQL at {}".format( - result.node.compiled_path)) + fire_event(EmptyLine()) + fire_event(SQLCompiledPath(path=result.node.compiled_path)) if result.node.should_store_failures: with TextOnly(): - logger.info("") - msg = f"select * from {result.node.relation_name}" - border = '-' * len(msg) - logger.info(f" See test failures:\n {border}\n {msg}\n {border}") + fire_event(EmptyLine()) + fire_event(CheckNodeTestFailure(relation_name=result.node.relation_name)) elif result.message is not None: first = True for line in result.message.split("\n"): if first: - logger.error(ui.yellow(line)) + fire_event(FirstRunResultError(msg=line)) first = False else: - logger.error(line) - - -def print_skip_caused_by_error( - model, schema: str, relation: str, index: int, num_models: int, result -) -> None: - msg = ('SKIP relation {}.{} due to ephemeral model error' - .format(schema, relation)) - print_fancy_output_line( - msg, ui.red('ERROR SKIP'), logger.error, index, num_models) - print_run_result_error(result, newline=False) - - -def print_end_of_run_summary( - num_errors: int, num_warnings: int, keyboard_interrupt: bool = False -) -> None: - error_plural = utils.pluralize(num_errors, 'error') - warn_plural = utils.pluralize(num_warnings, 'warning') - if keyboard_interrupt: - message = ui.yellow('Exited because of keyboard interrupt.') - elif num_errors > 0: - message = ui.red("Completed with {} and {}:".format( - error_plural, warn_plural)) - elif num_warnings > 0: - message = ui.yellow('Completed with {}:'.format(warn_plural)) - else: - message = ui.green('Completed successfully') - - with TextOnly(): - logger.info('') - logger.info('{}'.format(message)) + fire_event(AfterFirstRunResultError(msg=line)) def print_run_end_messages(results, keyboard_interrupt: bool = False) -> None: @@ -369,9 +137,15 @@ def print_run_end_messages(results, keyboard_interrupt: bool = False) -> None: warnings.append(r) with DbtStatusMessage(), InvocationProcessor(): - print_end_of_run_summary(len(errors), - len(warnings), - keyboard_interrupt) + with TextOnly(): + fire_event(EmptyLine()) + fire_event( + EndOfRunSummary( + num_errors=len(errors), + num_warnings=len(warnings), + keyboard_interrupt=keyboard_interrupt + ) + ) for error in errors: print_run_result_error(error, is_warning=False) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index e5a217349f8..82ed8009903 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -8,10 +8,6 @@ from .compile import CompileRunner, CompileTask from .printer import ( - print_start_line, - print_model_result_line, - print_hook_start_line, - print_hook_end_line, print_run_end_messages, get_counts, ) @@ -33,7 +29,11 @@ missing_materialization, ) from dbt.events.functions import fire_event -from dbt.events.types import DatabaseErrorRunning, EmptyLine, HooksRunning, HookFinished +from dbt.events.types import ( + DatabaseErrorRunning, EmptyLine, HooksRunning, HookFinished, + PrintModelErrorResultLine, PrintModelResultLine, PrintStartLine, + PrintHookEndLine, PrintHookStartLine +) from dbt.logger import ( TextOnly, HookMetadata, @@ -173,13 +173,36 @@ def describe_node(self): self.get_node_representation()) def print_start_line(self): - description = self.describe_node() - print_start_line(description, self.node_index, self.num_nodes) + fire_event( + PrintStartLine( + description=self.describe_node(), + index=self.node_index, + total=self.num_nodes + ) + ) def print_result_line(self, result): description = self.describe_node() - print_model_result_line(result, description, self.node_index, - self.num_nodes) + if result.status == NodeStatus.Error: + fire_event( + PrintModelErrorResultLine( + description=description, + status=result.status, + index=self.node_index, + total=self.num_nodes, + execution_time=result.execution_time + ) + ) + else: + fire_event( + PrintModelResultLine( + description=description, + status=result.message, + index=self.node_index, + total=self.num_nodes, + execution_time=result.execution_time + ) + ) def before_execute(self): self.print_start_line() @@ -324,7 +347,14 @@ def run_hooks(self, adapter, hook_type: RunHookType, extra_context): hook_meta_ctx = HookMetadata(hook, self.index_offset(idx)) with UniqueID(hook.unique_id): with hook_meta_ctx, startctx: - print_hook_start_line(hook_text, idx, num_hooks) + fire_event( + PrintHookStartLine( + statement=hook_text, + index=idx, + total=num_hooks, + truncate=True + ) + ) status = 'OK' @@ -335,8 +365,15 @@ def run_hooks(self, adapter, hook_type: RunHookType, extra_context): self.ran_hooks.append(hook) with finishctx, DbtModelState({'node_status': 'passed'}): - print_hook_end_line( - hook_text, str(status), idx, num_hooks, timer.elapsed + fire_event( + PrintHookEndLine( + statement=hook_text, + status=str(status), + index=idx, + total=num_hooks, + execution_time=timer.elapsed, + truncate=True + ) ) self._total_executed += len(ordered_hooks) diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index 779b7428cf9..6a3af18a188 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -11,16 +11,13 @@ from .printer import ( print_run_result_error, print_run_end_messages, - print_cancel_line, ) -from dbt import ui from dbt.clients.system import write_file from dbt.task.base import ConfiguredTask from dbt.adapters.base import BaseRelation from dbt.adapters.factory import get_adapter from dbt.logger import ( - GLOBAL_LOGGER as logger, DbtProcessState, TextOnly, UniqueID, @@ -28,9 +25,12 @@ DbtModelState, ModelMetadata, NodeCount, - print_timestamped_line, ) - +from dbt.events.functions import fire_event +from dbt.events.types import ( + EmptyLine, PrintCancelLine, DefaultSelector, NodeStart, NodeFinished, + QueryCancelationUnsupported, ConcurrencyLine +) from dbt.contracts.graph.compiled import CompileResultNode from dbt.contracts.graph.manifest import Manifest from dbt.contracts.graph.parsed import ParsedSourceDefinition @@ -133,7 +133,7 @@ def get_selection_spec(self) -> SelectionSpec: spec = self.config.get_selector(self.args.selector_name) elif not (self.selection_arg or self.exclusion_arg) and default_selector_name: # use pre-defined selector (--selector) with default: true - logger.info(f"Using default selector {default_selector_name}") + fire_event(DefaultSelector(name=default_selector_name)) spec = self.config.get_selector(default_selector_name) else: # use --select and --exclude args @@ -208,8 +208,7 @@ def call_runner(self, runner): index = self.index_offset(runner.node_index) extended_metadata = ModelMetadata(runner.node, index) with startctx, extended_metadata: - logger.debug('Began running node {}'.format( - runner.node.unique_id)) + fire_event(NodeStart(unique_id=runner.node.unique_id)) status: Dict[str, str] try: result = runner.run_with_hooks(self.manifest) @@ -217,8 +216,7 @@ def call_runner(self, runner): finally: finishctx = TimestampNamed('node_finished_at') with finishctx, DbtModelState(status): - logger.debug('Finished running node {}'.format( - runner.node.unique_id)) + fire_event(NodeFinished(unique_id=runner.node.unique_id)) fail_fast = flags.FAIL_FAST @@ -337,12 +335,7 @@ def _cancel_connections(self, pool): adapter = get_adapter(self.config) if not adapter.is_cancelable(): - msg = ("The {} adapter does not support query " - "cancellation. Some queries may still be " - "running!".format(adapter.type())) - - yellow = ui.COLOR_FG_YELLOW - print_timestamped_line(msg, yellow) + fire_event(QueryCancelationUnsupported(type=adapter.type)) else: with adapter.connection_named('master'): for conn_name in adapter.cancel_open_connections(): @@ -352,7 +345,7 @@ def _cancel_connections(self, pool): continue # if we don't have a manifest/don't have a node, print # anyway. - print_cancel_line(conn_name) + fire_event(PrintCancelLine(conn_name=conn_name)) pool.join() @@ -363,9 +356,9 @@ def execute_nodes(self): text = "Concurrency: {} threads (target='{}')" concurrency_line = text.format(num_threads, target_name) with NodeCount(self.num_nodes): - print_timestamped_line(concurrency_line) + fire_event(ConcurrencyLine(concurrency_line=concurrency_line)) with TextOnly(): - print_timestamped_line("") + fire_event(EmptyLine()) pool = ThreadPool(num_threads) try: @@ -453,7 +446,7 @@ def run(self): ) else: with TextOnly(): - logger.info("") + fire_event(EmptyLine()) selected_uids = frozenset(n.unique_id for n in self._flattened_nodes) result = self.execute_with_hooks(selected_uids) diff --git a/core/dbt/task/seed.py b/core/dbt/task/seed.py index edd60f52c85..6609673a284 100644 --- a/core/dbt/task/seed.py +++ b/core/dbt/task/seed.py @@ -2,16 +2,20 @@ from .run import ModelRunner, RunTask from .printer import ( - print_start_line, - print_seed_result_line, print_run_end_messages, ) from dbt.contracts.results import RunStatus from dbt.exceptions import InternalException from dbt.graph import ResourceTypeSelector -from dbt.logger import GLOBAL_LOGGER as logger, TextOnly +from dbt.logger import TextOnly +from dbt.events.functions import fire_event +from dbt.events.types import ( + SeedHeader, SeedHeaderSeperator, EmptyLine, PrintSeedErrorResultLine, + PrintSeedResultLine, PrintStartLine +) from dbt.node_types import NodeType +from dbt.contracts.results import NodeStatus class SeedRunner(ModelRunner): @@ -19,8 +23,13 @@ def describe_node(self): return "seed file {}".format(self.get_node_representation()) def before_execute(self): - description = self.describe_node() - print_start_line(description, self.node_index, self.num_nodes) + fire_event( + PrintStartLine( + description=self.describe_node(), + index=self.node_index, + total=self.num_nodes + ) + ) def _build_run_model_result(self, model, context): result = super()._build_run_model_result(model, context) @@ -32,9 +41,29 @@ def compile(self, manifest): return self.node def print_result_line(self, result): - schema_name = self.node.schema - print_seed_result_line(result, schema_name, self.node_index, - self.num_nodes) + model = result.node + if result.status == NodeStatus.Error: + fire_event( + PrintSeedErrorResultLine( + status=result.status, + index=self.node_index, + total=self.num_nodes, + execution_time=result.execution_time, + schema=self.node.schema, + relation=model.alias + ) + ) + else: + fire_event( + PrintSeedResultLine( + status=result.message, + index=self.node_index, + total=self.num_nodes, + execution_time=result.execution_time, + schema=self.node.schema, + relation=model.alias + ) + ) class SeedTask(RunTask): @@ -75,12 +104,13 @@ def show_table(self, result): header = "Random sample of table: {}.{}".format(schema, alias) with TextOnly(): - logger.info("") - logger.info(header) - logger.info("-" * len(header)) + fire_event(EmptyLine()) + fire_event(SeedHeader(header=header)) + fire_event(SeedHeaderSeperator(len_header=len(header))) + rand_table.print_table(max_rows=10, max_columns=None) with TextOnly(): - logger.info("") + fire_event(EmptyLine()) def show_tables(self, results): for result in results: diff --git a/core/dbt/task/serve.py b/core/dbt/task/serve.py index 4d0e2473498..890241a9931 100644 --- a/core/dbt/task/serve.py +++ b/core/dbt/task/serve.py @@ -5,7 +5,8 @@ from dbt.include.global_project import DOCS_INDEX_FILE_PATH from http.server import SimpleHTTPRequestHandler from socketserver import TCPServer -from dbt.logger import GLOBAL_LOGGER as logger +from dbt.events.functions import fire_event +from dbt.events.types import ServingDocsPort, ServingDocsAccessInfo, ServingDocsExitInfo from dbt.task.base import ConfiguredTask @@ -15,19 +16,17 @@ def run(self): os.chdir(self.config.target_path) port = self.args.port + address = '0.0.0.0' shutil.copyfile(DOCS_INDEX_FILE_PATH, 'index.html') - logger.info("Serving docs at 0.0.0.0:{}".format(port)) - logger.info( - "To access from your browser, navigate to: http://localhost:{}" - .format(port) - ) - logger.info("Press Ctrl+C to exit.\n\n") + fire_event(ServingDocsPort(address=address, port=port)) + fire_event(ServingDocsAccessInfo(port=port)) + fire_event(ServingDocsExitInfo()) # mypy doesn't think SimpleHTTPRequestHandler is ok here, but it is httpd = TCPServer( # type: ignore - ('0.0.0.0', port), + (address, port), SimpleHTTPRequestHandler # type: ignore ) # type: ignore diff --git a/core/dbt/task/snapshot.py b/core/dbt/task/snapshot.py index 4515f11c583..63b1faa7279 100644 --- a/core/dbt/task/snapshot.py +++ b/core/dbt/task/snapshot.py @@ -1,9 +1,11 @@ from .run import ModelRunner, RunTask -from .printer import print_snapshot_result_line from dbt.exceptions import InternalException +from dbt.events.functions import fire_event +from dbt.events.types import PrintSnapshotErrorResultLine, PrintSnapshotResultLine from dbt.graph import ResourceTypeSelector from dbt.node_types import NodeType +from dbt.contracts.results import NodeStatus class SnapshotRunner(ModelRunner): @@ -11,11 +13,30 @@ def describe_node(self): return "snapshot {}".format(self.get_node_representation()) def print_result_line(self, result): - print_snapshot_result_line( - result, - self.get_node_representation(), - self.node_index, - self.num_nodes) + model = result.node + cfg = model.config.to_dict(omit_none=True) + if result.status == NodeStatus.Error: + fire_event( + PrintSnapshotErrorResultLine( + status=result.status, + description=self.get_node_representation(), + cfg=cfg, + index=self.node_index, + total=self.num_nodes, + execution_time=result.execution_time + ) + ) + else: + fire_event( + PrintSnapshotResultLine( + status=result.message, + description=self.get_node_representation(), + cfg=cfg, + index=self.node_index, + total=self.num_nodes, + execution_time=result.execution_time + ) + ) class SnapshotTask(RunTask): diff --git a/core/dbt/task/test.py b/core/dbt/task/test.py index c172492dc34..81f62c36bf3 100644 --- a/core/dbt/task/test.py +++ b/core/dbt/task/test.py @@ -8,7 +8,6 @@ from .compile import CompileRunner from .run import RunTask -from .printer import print_start_line, print_test_result_line from dbt.contracts.graph.compiled import ( CompiledSingularTestNode, @@ -19,6 +18,11 @@ from dbt.contracts.results import TestStatus, PrimitiveDict, RunResult from dbt.context.providers import generate_runtime_model_context from dbt.clients.jinja import MacroGenerator +from dbt.events.functions import fire_event +from dbt.events.types import ( + PrintErrorTestResult, PrintPassTestResult, PrintWarnTestResult, + PrintFailureTestResult, PrintStartLine +) from dbt.exceptions import ( InternalException, invalid_bool_error, @@ -61,11 +65,57 @@ def describe_node(self): return "test {}".format(node_name) def print_result_line(self, result): - print_test_result_line(result, self.node_index, self.num_nodes) + model = result.node + + if result.status == TestStatus.Error: + fire_event( + PrintErrorTestResult( + name=model.name, + index=self.node_index, + num_models=self.num_nodes, + execution_time=result.execution_time + ) + ) + elif result.status == TestStatus.Pass: + fire_event( + PrintPassTestResult( + name=model.name, + index=self.node_index, + num_models=self.num_nodes, + execution_time=result.execution_time + ) + ) + elif result.status == TestStatus.Warn: + fire_event( + PrintWarnTestResult( + name=model.name, + index=self.node_index, + num_models=self.num_nodes, + execution_time=result.execution_time, + failures=result.failures + ) + ) + elif result.status == TestStatus.Fail: + fire_event( + PrintFailureTestResult( + name=model.name, + index=self.node_index, + num_models=self.num_nodes, + execution_time=result.execution_time, + failures=result.failures + ) + ) + else: + raise RuntimeError("unexpected status: {}".format(result.status)) def print_start_line(self): - description = self.describe_node() - print_start_line(description, self.node_index, self.num_nodes) + fire_event( + PrintStartLine( + description=self.describe_node(), + index=self.node_index, + total=self.num_nodes + ) + ) def before_execute(self): self.print_start_line()