Skip to content

Commit

Permalink
feat(cli): add spinner to indicate progress (#5769)
Browse files Browse the repository at this point in the history
  • Loading branch information
shirshanka committed Aug 30, 2022
1 parent d558db9 commit 134ca67
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 26 deletions.
2 changes: 2 additions & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def get_long_description():
"aiohttp<4",
"cached_property",
"ijson",
"click-spinner",
}

kafka_common = {
Expand Down Expand Up @@ -335,6 +336,7 @@ def get_long_description():
# avrogen package requires this
"types-pytz",
"types-pyOpenSSL",
"types-click-spinner",
}

base_dev_requirements = {
Expand Down
44 changes: 24 additions & 20 deletions metadata-ingestion/src/datahub/cli/ingest_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from typing import Optional

import click
import click_spinner
from click_default_group import DefaultGroup
from tabulate import tabulate

Expand Down Expand Up @@ -123,27 +124,30 @@ def run_pipeline_to_completion(
pipeline: Pipeline, structured_report: Optional[str] = None
) -> int:
logger.info("Starting metadata ingestion")
try:
pipeline.run()
except Exception as e:
logger.info(
f"Source ({pipeline.config.source.type}) report:\n{pipeline.source.get_report().as_string()}"
)
logger.info(
f"Sink ({pipeline.config.sink.type}) report:\n{pipeline.sink.get_report().as_string()}"
)
# We dont want to log sensitive information in variables if the pipeline fails due to
# an unexpected error. Disable printing sensitive info to logs if ingestion is running
# with `--suppress-error-logs` flag.
if suppress_error_logs:
raise SensitiveError() from e
with click_spinner.spinner(
beep=False, disable=False, force=False, stream=sys.stdout
):
try:
pipeline.run()
except Exception as e:
logger.info(
f"Source ({pipeline.config.source.type}) report:\n{pipeline.source.get_report().as_string()}"
)
logger.info(
f"Sink ({pipeline.config.sink.type}) report:\n{pipeline.sink.get_report().as_string()}"
)
# We dont want to log sensitive information in variables if the pipeline fails due to
# an unexpected error. Disable printing sensitive info to logs if ingestion is running
# with `--suppress-error-logs` flag.
if suppress_error_logs:
raise SensitiveError() from e
else:
raise e
else:
raise e
else:
logger.info("Finished metadata ingestion")
pipeline.log_ingestion_stats()
ret = pipeline.pretty_print_summary(warnings_as_failure=strict_warnings)
return ret
logger.info("Finished metadata ingestion")
pipeline.log_ingestion_stats()
ret = pipeline.pretty_print_summary(warnings_as_failure=strict_warnings)
return ret

async def run_pipeline_async(pipeline: Pipeline) -> int:
loop = asyncio._get_running_loop()
Expand Down
35 changes: 29 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,17 @@ def _count_all_vals(self, d: Dict[str, List]) -> int:
result += len(val)
return result

def _get_text_color(self, running: bool, failures: bool, warnings: bool) -> str:
if running:
return "cyan"
else:
if failures:
return "bright_red"
elif warnings:
return "bright_yellow"
else:
return "bright_green"

def pretty_print_summary(
self, warnings_as_failure: bool = False, currently_running: bool = False
) -> int:
Expand All @@ -483,30 +494,42 @@ def pretty_print_summary(
click.echo(self.sink.get_report().as_string())
click.echo()
workunits_produced = self.source.get_report().events_produced
duration_message = (
f"in {self.source.get_report().running_time_in_seconds} seconds."
)

if self.source.get_report().failures or self.sink.get_report().failures:
num_failures_source = self._count_all_vals(
self.source.get_report().failures
)
num_failures_sink = len(self.sink.get_report().failures)
click.secho(
f"{'⏳' if currently_running else ''} Pipeline {'running' if currently_running else 'finished'} with {num_failures_source+num_failures_sink} failures {'so far' if currently_running else ''}; produced {workunits_produced} events",
fg="bright_red",
f"{'⏳' if currently_running else ''} Pipeline {'running' if currently_running else 'finished'} with {num_failures_source+num_failures_sink} failures {'so far' if currently_running else ''}; produced {workunits_produced} events {duration_message}",
fg=self._get_text_color(
running=currently_running,
failures=True,
warnings=False,
),
bold=True,
)
return 1
elif self.source.get_report().warnings or self.sink.get_report().warnings:
num_warn_source = self._count_all_vals(self.source.get_report().warnings)
num_warn_sink = len(self.sink.get_report().warnings)
click.secho(
f"{'⏳' if currently_running else ''} Pipeline {'running' if currently_running else 'finished'} with {num_warn_source+num_warn_sink} warnings {'so far' if currently_running else ''}; produced {workunits_produced} events",
fg="yellow",
f"{'⏳' if currently_running else ''} Pipeline {'running' if currently_running else 'finished'} with {num_warn_source+num_warn_sink} warnings {'so far' if currently_running else ''}; produced {workunits_produced} events {duration_message}",
fg=self._get_text_color(
running=currently_running, failures=False, warnings=True
),
bold=True,
)
return 1 if warnings_as_failure else 0
else:
click.secho(
f"{'⏳' if currently_running else ''} Pipeline {'running' if currently_running else 'finished'} successfully {'so far' if currently_running else ''}; produced {workunits_produced} events",
fg="green",
f"{'⏳' if currently_running else ''} Pipeline {'running' if currently_running else 'finished'} successfully {'so far' if currently_running else ''}; produced {workunits_produced} events {duration_message}",
fg=self._get_text_color(
running=currently_running, failures=False, warnings=False
),
bold=True,
)
return 0
Expand Down

0 comments on commit 134ca67

Please sign in to comment.