From dfe00bfee87873665f408ef0350cf9c73375b620 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 18 Feb 2021 11:15:13 -0800 Subject: [PATCH] feat(ingest): improve error reporting for pipelines (#2121) --- .../airflow/generic_recipe_sample_dag.py | 1 + .../examples/airflow/mysql_sample_dag.py | 1 + .../src/datahub/configuration/common.py | 4 ++ metadata-ingestion/src/datahub/entrypoints.py | 1 + .../src/datahub/ingestion/api/sink.py | 7 +++- .../src/datahub/ingestion/api/source.py | 17 +++++++- .../src/datahub/ingestion/run/pipeline.py | 40 ++++++++++++++++--- .../src/datahub/ingestion/source/kafka.py | 14 +------ .../datahub/ingestion/source/sql_common.py | 8 +--- .../tests/unit/serde/test_serde.py | 1 + .../tests/unit/test_pipeline.py | 1 + 11 files changed, 65 insertions(+), 30 deletions(-) diff --git a/metadata-ingestion/examples/airflow/generic_recipe_sample_dag.py b/metadata-ingestion/examples/airflow/generic_recipe_sample_dag.py index 7bb37d15ae054..a2cddb1b18bb7 100644 --- a/metadata-ingestion/examples/airflow/generic_recipe_sample_dag.py +++ b/metadata-ingestion/examples/airflow/generic_recipe_sample_dag.py @@ -33,6 +33,7 @@ def datahub_recipe(): pipeline = Pipeline.create(config) pipeline.run() + pipeline.raise_from_status() with DAG( diff --git a/metadata-ingestion/examples/airflow/mysql_sample_dag.py b/metadata-ingestion/examples/airflow/mysql_sample_dag.py index 98f7eeca34a3f..0ac8850ad9f38 100644 --- a/metadata-ingestion/examples/airflow/mysql_sample_dag.py +++ b/metadata-ingestion/examples/airflow/mysql_sample_dag.py @@ -45,6 +45,7 @@ def ingest_from_mysql(): } ) pipeline.run() + pipeline.raise_from_status() with DAG( diff --git a/metadata-ingestion/src/datahub/configuration/common.py b/metadata-ingestion/src/datahub/configuration/common.py index 4d655aae4e751..5be49df8be027 100644 --- a/metadata-ingestion/src/datahub/configuration/common.py +++ b/metadata-ingestion/src/datahub/configuration/common.py @@ -22,6 +22,10 @@ class MetaError(Exception): """A base class for all meta exceptions""" +class PipelineExecutionError(MetaError): + """An error occurred when executing the pipeline""" + + class ConfigurationError(MetaError): """A configuration error has happened""" diff --git a/metadata-ingestion/src/datahub/entrypoints.py b/metadata-ingestion/src/datahub/entrypoints.py index 0d8110d1a8a06..0a7bb34312164 100644 --- a/metadata-ingestion/src/datahub/entrypoints.py +++ b/metadata-ingestion/src/datahub/entrypoints.py @@ -62,3 +62,4 @@ def ingest(config: str): logger.debug(f"Using config: {pipeline_config}") pipeline = Pipeline.create(pipeline_config) pipeline.run() + pipeline.pretty_print_summary() diff --git a/metadata-ingestion/src/datahub/ingestion/api/sink.py b/metadata-ingestion/src/datahub/ingestion/api/sink.py index 0e0dc918b11d1..a6720f169da94 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/sink.py +++ b/metadata-ingestion/src/datahub/ingestion/api/sink.py @@ -9,13 +9,16 @@ @dataclass class SinkReport(Report): - # workunits_processed = 0 records_written = 0 + warnings: List[Any] = field(default_factory=list) failures: List[Any] = field(default_factory=list) def report_record_written(self, record_envelope: RecordEnvelope): self.records_written += 1 + def report_warning(self, info: Any) -> None: + self.warnings.append(info) + def report_failure(self, info: Any) -> None: self.failures.append(info) @@ -54,7 +57,7 @@ class Sink(Closeable, metaclass=ABCMeta): @classmethod @abstractmethod - def create(cls, config_dict: dict, ctx: PipelineContext) -> 'Sink': + def create(cls, config_dict: dict, ctx: PipelineContext) -> "Sink": pass @abstractmethod diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index bbb4d598b2338..267bf0a87da58 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -1,6 +1,6 @@ from abc import ABCMeta, abstractmethod from dataclasses import dataclass, field -from typing import Iterable, List +from typing import Dict, Iterable, List from .closeable import Closeable from .common import PipelineContext, RecordEnvelope, WorkUnit @@ -12,10 +12,23 @@ class SourceReport(Report): workunits_produced = 0 workunit_ids: List[str] = field(default_factory=list) + warnings: Dict[str, List[str]] = field(default_factory=dict) + failures: Dict[str, List[str]] = field(default_factory=dict) + def report_workunit(self, wu: WorkUnit): self.workunits_produced += 1 self.workunit_ids.append(wu.id) + def report_warning(self, key: str, reason: str) -> None: + if key not in self.warnings: + self.warnings[key] = [] + self.warnings[key].append(reason) + + def report_failure(self, key: str, reason: str) -> None: + if key not in self.failures: + self.failures[key] = [] + self.failures[key].append(reason) + class Extractor(Closeable, metaclass=ABCMeta): @abstractmethod @@ -34,7 +47,7 @@ class Source(Closeable, metaclass=ABCMeta): @classmethod @abstractmethod - def create(cls, config_dict: dict, ctx: PipelineContext) -> 'Source': + def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source": pass @abstractmethod diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py index 82730eefe0e1d..9442298b6b523 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py @@ -2,7 +2,13 @@ import logging import time -from datahub.configuration.common import ConfigModel, DynamicTypedConfig +import click + +from datahub.configuration.common import ( + ConfigModel, + DynamicTypedConfig, + PipelineExecutionError, +) from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.sink import Sink, WriteCallback from datahub.ingestion.api.source import Extractor, Source @@ -94,8 +100,30 @@ def run(self): self.sink.handle_work_unit_end(wu) self.sink.close() - print() - print("Source:") - print(self.source.get_report().as_string()) - print("Sink:") - print(self.sink.get_report().as_string()) + def raise_from_status(self, raise_warnings=False): + if self.source.get_report().failures: + raise PipelineExecutionError( + "Source reported errors", self.source.get_report() + ) + if self.sink.get_report().failures: + raise PipelineExecutionError("Sink reported errors", self.sink.get_report()) + if raise_warnings and ( + self.source.get_report().warnings or self.sink.get_report().warnings + ): + raise PipelineExecutionError( + "Source reported warnings", self.source.get_report() + ) + + def pretty_print_summary(self): + click.echo() + click.secho("Source report:", bold=True) + click.echo(self.source.get_report().as_string()) + click.secho("Sink report:", bold=True) + click.echo(self.sink.get_report().as_string()) + click.echo() + if self.source.get_report().failures or self.sink.get_report().failures: + click.secho("Pipeline finished with failures", fg="bright_red", bold=True) + elif self.source.get_report().warnings or self.sink.get_report().warnings: + click.secho("Pipeline finished with warnings", fg="yellow", bold=True) + else: + click.secho("Pipeline finished successfully", fg="green", bold=True) diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py index cb895f8e538f7..37fd2ef82a5d6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py @@ -1,7 +1,7 @@ import logging import time from dataclasses import dataclass, field -from typing import Dict, Iterable, List +from typing import Iterable, List import confluent_kafka from confluent_kafka.schema_registry.schema_registry_client import SchemaRegistryClient @@ -34,23 +34,11 @@ class KafkaSourceConfig(ConfigModel): @dataclass class KafkaSourceReport(SourceReport): topics_scanned = 0 - warnings: Dict[str, List[str]] = field(default_factory=dict) - failures: Dict[str, List[str]] = field(default_factory=dict) filtered: List[str] = field(default_factory=list) def report_topic_scanned(self, topic: str) -> None: self.topics_scanned += 1 - def report_warning(self, topic: str, reason: str) -> None: - if topic not in self.warnings: - self.warnings[topic] = [] - self.warnings[topic].append(reason) - - def report_failure(self, topic: str, reason: str) -> None: - if topic not in self.failures: - self.failures[topic] = [] - self.failures[topic].append(reason) - def report_dropped(self, topic: str) -> None: self.filtered.append(topic) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql_common.py index a732f186af396..c257d8263321e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql_common.py @@ -2,7 +2,7 @@ import time from abc import abstractmethod from dataclasses import dataclass, field -from typing import Any, Dict, List, Optional +from typing import Any, List, Optional from pydantic import BaseModel from sqlalchemy import create_engine, types @@ -34,12 +34,6 @@ class SQLSourceReport(SourceReport): tables_scanned = 0 filtered: List[str] = field(default_factory=list) - warnings: Dict[str, List[str]] = field(default_factory=dict) - - def report_warning(self, table_name: str, reason: str) -> None: - if table_name not in self.warnings: - self.warnings[table_name] = [] - self.warnings[table_name].append(reason) def report_table_scanned(self, table_name: str) -> None: self.tables_scanned += 1 diff --git a/metadata-ingestion/tests/unit/serde/test_serde.py b/metadata-ingestion/tests/unit/serde/test_serde.py index 7a17410429a8b..f62507f1e0b8c 100644 --- a/metadata-ingestion/tests/unit/serde/test_serde.py +++ b/metadata-ingestion/tests/unit/serde/test_serde.py @@ -19,6 +19,7 @@ def test_serde_large(pytestconfig, tmp_path): } ) pipeline.run() + pipeline.raise_from_status() output = mce_helpers.load_json_file(tmp_path / output_filename) golden = mce_helpers.load_json_file(golden_file) diff --git a/metadata-ingestion/tests/unit/test_pipeline.py b/metadata-ingestion/tests/unit/test_pipeline.py index c8f37063d2677..fcd732b404359 100644 --- a/metadata-ingestion/tests/unit/test_pipeline.py +++ b/metadata-ingestion/tests/unit/test_pipeline.py @@ -15,5 +15,6 @@ def test_configure(self, mock_sink, mock_source): } ) pipeline.run() + pipeline.raise_from_status() mock_source.assert_called_once() mock_sink.assert_called_once()