From 36fa5329c13b2610983709a0f6a67e6498f92c75 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Tue, 23 Jul 2024 22:55:54 -0700 Subject: [PATCH] improve stream-level progress prints --- airbyte/progress.py | 27 +++++++++++++-------- examples/run_sync_to_destination_w_cache.py | 2 +- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/airbyte/progress.py b/airbyte/progress.py index 9e72e72..95b26b5 100644 --- a/airbyte/progress.py +++ b/airbyte/progress.py @@ -185,7 +185,7 @@ def __init__( self.finalize_end_time: float | None = None self.total_records_finalized = 0 self.total_batches_finalized = 0 - self.finalized_stream_names: set[str] = set() + self.finalized_stream_names: list[str] = [] # Destination stream writes self.destination_stream_records_delivered: dict[str, int] = defaultdict(int) @@ -564,8 +564,9 @@ def log_cache_processing_complete(self) -> None: def log_stream_finalized(self, stream_name: str) -> None: """Log that a stream has been finalized.""" - self.finalized_stream_names.add(stream_name) - self._update_display(force_refresh=True) + if stream_name not in self.finalized_stream_names: + self.finalized_stream_names.append(stream_name) + self._update_display(force_refresh=True) def _update_display(self, *, force_refresh: bool = False) -> None: """Update the display.""" @@ -629,10 +630,13 @@ def _get_status_message(self) -> str: if len(self.stream_read_counts) <= MAX_ITEMIZED_STREAMS: # noqa: PLR2004 # Magic numbers OK here. for stream_name in self.stream_read_counts: status_message += ( - f" - {stream_name}: {self.stream_read_counts[stream_name]:,} records\n" + f" - {self.stream_read_counts[stream_name]:,} {stream_name}\n" ) else: - status_message += f" - {', '.join(self.stream_read_counts.keys())}\n" + status_message += " - " + ", ".join( + f"{self.stream_read_counts[stream_name]:,} {stream_name}" + for stream_name in self.stream_read_counts + ) # Source cache writes if self.total_records_written > 0: @@ -660,10 +664,10 @@ def _get_status_message(self) -> str: status_message += ( f"- Completed cache processing for {len(self.finalized_stream_names)} " + (f"out of {self.num_streams_expected} " if self.num_streams_expected else "") - + "streams:\n\n" + + "streams: " + + ", ".join(self.finalized_stream_names) + + "\n\n" ) - for stream_name in self.finalized_stream_names: - status_message += f" - {stream_name}\n" if self.finalize_end_time is not None: completion_time_str = _to_time_str(self.finalize_end_time) @@ -685,12 +689,15 @@ def _get_status_message(self) -> str: ) status_message += "- Stream records delivered:\n\n" for stream_name, record_count in self.destination_stream_records_delivered.items(): - status_message += f" - {stream_name}: {record_count:,} records\n" + status_message += f" - {record_count:,} {stream_name}\n" status_message += "\n" if self.end_time is not None: - status_message += f"\n\n**Total time elapsed: {self.total_time_elapsed_str}**\n\n" + status_message += ( + f"\n\n**Sync completed at `{_to_time_str(self.end_time)}`. " + f"Total time elapsed: {self.total_time_elapsed_str}**\n\n" + ) status_message += HORIZONTAL_LINE diff --git a/examples/run_sync_to_destination_w_cache.py b/examples/run_sync_to_destination_w_cache.py index 0c5f5e4..0bd905f 100644 --- a/examples/run_sync_to_destination_w_cache.py +++ b/examples/run_sync_to_destination_w_cache.py @@ -35,7 +35,7 @@ def get_my_source() -> Source: "parallelism": 16, }, install_if_missing=False, - streams=["purchases"], + streams="*", ) def get_my_destination() -> Destination: