feat(ingest): improve error reporting for pipelines (#2121)
hsheth2 committed Feb 18, 2021
1 parent 12ff330 commit dfe00bf
Showing 11 changed files with 65 additions and 30 deletions.
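In short: Pipeline gains a raise_from_status() method that raises the new PipelineExecutionError when the source or sink reported failures, plus a pretty_print_summary() for CLI output, and the per-source warning/failure bookkeeping moves into the shared SourceReport base class. A minimal sketch of the intended calling pattern, assuming a recipe dict like the ones in the example DAGs (the exact source/sink config shown here is illustrative):

from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {"type": "mysql", "config": {"host_port": "localhost:3306"}},
        "sink": {"type": "console"},
    }
)
pipeline.run()
# Raises PipelineExecutionError if the source or sink reported any failures.
pipeline.raise_from_status()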
@@ -33,6 +33,7 @@ def datahub_recipe():

pipeline = Pipeline.create(config)
pipeline.run()
pipeline.raise_from_status()


with DAG(
1 change: 1 addition & 0 deletions metadata-ingestion/examples/airflow/mysql_sample_dag.py
@@ -45,6 +45,7 @@ def ingest_from_mysql():
}
)
pipeline.run()
pipeline.raise_from_status()


with DAG(
4 changes: 4 additions & 0 deletions metadata-ingestion/src/datahub/configuration/common.py
@@ -22,6 +22,10 @@ class MetaError(Exception):
"""A base class for all meta exceptions"""


class PipelineExecutionError(MetaError):
"""An error occurred when executing the pipeline"""


class ConfigurationError(MetaError):
"""A configuration error has happened"""

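With a dedicated exception type, orchestration code can react to ingestion failures without string-matching generic exceptions. A small sketch of catching it, assuming a pipeline built with Pipeline.create as above (the logging here is illustrative, not part of this commit):

from datahub.configuration.common import PipelineExecutionError

try:
    pipeline.run()
    pipeline.raise_from_status()
except PipelineExecutionError as e:
    # The exception carries a message and the offending report object.
    print(f"ingestion failed: {e.args[0]}")
    raise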
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/entrypoints.py
@@ -62,3 +62,4 @@ def ingest(config: str):
logger.debug(f"Using config: {pipeline_config}")
pipeline = Pipeline.create(pipeline_config)
pipeline.run()
pipeline.pretty_print_summary()
7 changes: 5 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/api/sink.py
@@ -9,13 +9,16 @@

@dataclass
class SinkReport(Report):
# workunits_processed = 0
records_written = 0
warnings: List[Any] = field(default_factory=list)
failures: List[Any] = field(default_factory=list)

def report_record_written(self, record_envelope: RecordEnvelope):
self.records_written += 1

def report_warning(self, info: Any) -> None:
self.warnings.append(info)

def report_failure(self, info: Any) -> None:
self.failures.append(info)

@@ -54,7 +57,7 @@ class Sink(Closeable, metaclass=ABCMeta):

@classmethod
@abstractmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> 'Sink':
def create(cls, config_dict: dict, ctx: PipelineContext) -> "Sink":
pass

@abstractmethod
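SinkReport now records warnings and failures alongside the records-written counter, mirroring the source side. A quick sketch of how a concrete sink's write path might feed it (the try_write helper and its write argument are hypothetical):

from typing import Any, Callable

from datahub.ingestion.api.sink import SinkReport

def try_write(report: SinkReport, record_envelope: Any, write: Callable[[Any], None]) -> None:
    try:
        write(record_envelope)  # stands in for the sink's actual emit call
        report.report_record_written(record_envelope)
    except Exception as e:
        report.report_failure({"record": record_envelope, "error": str(e)})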
17 changes: 15 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -1,6 +1,6 @@
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass, field
from typing import Iterable, List
from typing import Dict, Iterable, List

from .closeable import Closeable
from .common import PipelineContext, RecordEnvelope, WorkUnit
@@ -12,10 +12,23 @@ class SourceReport(Report):
workunits_produced = 0
workunit_ids: List[str] = field(default_factory=list)

warnings: Dict[str, List[str]] = field(default_factory=dict)
failures: Dict[str, List[str]] = field(default_factory=dict)

def report_workunit(self, wu: WorkUnit):
self.workunits_produced += 1
self.workunit_ids.append(wu.id)

def report_warning(self, key: str, reason: str) -> None:
if key not in self.warnings:
self.warnings[key] = []
self.warnings[key].append(reason)

def report_failure(self, key: str, reason: str) -> None:
if key not in self.failures:
self.failures[key] = []
self.failures[key].append(reason)


class Extractor(Closeable, metaclass=ABCMeta):
@abstractmethod
@@ -34,7 +47,7 @@ class Source(Closeable, metaclass=ABCMeta):

@classmethod
@abstractmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> 'Source':
def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source":
pass

@abstractmethod
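Warnings and failures are keyed by an identifier (a topic, table, or similar), so a source can accumulate multiple reasons per entity. A quick illustration, assuming nothing beyond the SourceReport defined above:

from datahub.ingestion.api.source import SourceReport

report = SourceReport()
report.report_warning("orders_topic", "value schema missing from registry")
report.report_failure("orders_topic", "could not fetch partition metadata")
report.report_failure("orders_topic", "broker unreachable")
# Reasons for the same key accumulate in a list:
assert report.failures == {
    "orders_topic": ["could not fetch partition metadata", "broker unreachable"]
}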
40 changes: 34 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -2,7 +2,13 @@
import logging
import time

from datahub.configuration.common import ConfigModel, DynamicTypedConfig
import click

from datahub.configuration.common import (
ConfigModel,
DynamicTypedConfig,
PipelineExecutionError,
)
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.sink import Sink, WriteCallback
from datahub.ingestion.api.source import Extractor, Source
@@ -94,8 +100,30 @@ def run(self):
self.sink.handle_work_unit_end(wu)
self.sink.close()

print()
print("Source:")
print(self.source.get_report().as_string())
print("Sink:")
print(self.sink.get_report().as_string())
def raise_from_status(self, raise_warnings=False):
if self.source.get_report().failures:
raise PipelineExecutionError(
"Source reported errors", self.source.get_report()
)
if self.sink.get_report().failures:
raise PipelineExecutionError("Sink reported errors", self.sink.get_report())
if raise_warnings and (
self.source.get_report().warnings or self.sink.get_report().warnings
):
raise PipelineExecutionError(
"Source reported warnings", self.source.get_report()
)

def pretty_print_summary(self):
click.echo()
click.secho("Source report:", bold=True)
click.echo(self.source.get_report().as_string())
click.secho("Sink report:", bold=True)
click.echo(self.sink.get_report().as_string())
click.echo()
if self.source.get_report().failures or self.sink.get_report().failures:
click.secho("Pipeline finished with failures", fg="bright_red", bold=True)
elif self.source.get_report().warnings or self.sink.get_report().warnings:
click.secho("Pipeline finished with warnings", fg="yellow", bold=True)
else:
click.secho("Pipeline finished successfully", fg="green", bold=True)
14 changes: 1 addition & 13 deletions metadata-ingestion/src/datahub/ingestion/source/kafka.py
@@ -1,7 +1,7 @@
import logging
import time
from dataclasses import dataclass, field
from typing import Dict, Iterable, List
from typing import Iterable, List

import confluent_kafka
from confluent_kafka.schema_registry.schema_registry_client import SchemaRegistryClient
@@ -34,23 +34,11 @@ class KafkaSourceConfig(ConfigModel):
@dataclass
class KafkaSourceReport(SourceReport):
topics_scanned = 0
warnings: Dict[str, List[str]] = field(default_factory=dict)
failures: Dict[str, List[str]] = field(default_factory=dict)
filtered: List[str] = field(default_factory=list)

def report_topic_scanned(self, topic: str) -> None:
self.topics_scanned += 1

def report_warning(self, topic: str, reason: str) -> None:
if topic not in self.warnings:
self.warnings[topic] = []
self.warnings[topic].append(reason)

def report_failure(self, topic: str, reason: str) -> None:
if topic not in self.failures:
self.failures[topic] = []
self.failures[topic].append(reason)

def report_dropped(self, topic: str) -> None:
self.filtered.append(topic)

@@ -2,7 +2,7 @@
import time
from abc import abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from typing import Any, List, Optional

from pydantic import BaseModel
from sqlalchemy import create_engine, types
@@ -34,12 +34,6 @@
class SQLSourceReport(SourceReport):
tables_scanned = 0
filtered: List[str] = field(default_factory=list)
warnings: Dict[str, List[str]] = field(default_factory=dict)

def report_warning(self, table_name: str, reason: str) -> None:
if table_name not in self.warnings:
self.warnings[table_name] = []
self.warnings[table_name].append(reason)

def report_table_scanned(self, table_name: str) -> None:
self.tables_scanned += 1
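The Kafka and SQL source reports previously carried duplicate copies of this warning/failure bookkeeping; with it hoisted into SourceReport, the subclasses keep only their own counters and inherit the rest. A brief illustration (import path taken from the diff above):

from datahub.ingestion.source.kafka import KafkaSourceReport

report = KafkaSourceReport()
report.report_topic_scanned("orders")               # subclass-specific counter
report.report_warning("orders", "no schema found")  # inherited from SourceReport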
1 change: 1 addition & 0 deletions metadata-ingestion/tests/unit/serde/test_serde.py
@@ -19,6 +19,7 @@ def test_serde_large(pytestconfig, tmp_path):
}
)
pipeline.run()
pipeline.raise_from_status()

output = mce_helpers.load_json_file(tmp_path / output_filename)
golden = mce_helpers.load_json_file(golden_file)
1 change: 1 addition & 0 deletions metadata-ingestion/tests/unit/test_pipeline.py
@@ -15,5 +15,6 @@ def test_configure(self, mock_sink, mock_source):
}
)
pipeline.run()
pipeline.raise_from_status()
mock_source.assert_called_once()
mock_sink.assert_called_once()
