Skip to content

Commit

Permalink
improve stream-level progress prints
Browse files (browse the repository at this point in the history)
  • Loading branch information
aaronsteers committed Jul 24, 2024
1 parent c639b33 commit 36fa532
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 11 deletions.
27 changes: 17 additions & 10 deletions airbyte/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def __init__(
self.finalize_end_time: float | None = None
self.total_records_finalized = 0
self.total_batches_finalized = 0
self.finalized_stream_names: set[str] = set()
self.finalized_stream_names: list[str] = []

# Destination stream writes
self.destination_stream_records_delivered: dict[str, int] = defaultdict(int)
Expand Down Expand Up @@ -564,8 +564,9 @@ def log_cache_processing_complete(self) -> None:

def log_stream_finalized(self, stream_name: str) -> None:
"""Log that a stream has been finalized."""
self.finalized_stream_names.add(stream_name)
self._update_display(force_refresh=True)
if stream_name not in self.finalized_stream_names:
self.finalized_stream_names.append(stream_name)
self._update_display(force_refresh=True)

def _update_display(self, *, force_refresh: bool = False) -> None:
"""Update the display."""
Expand Down Expand Up @@ -629,10 +630,13 @@ def _get_status_message(self) -> str:
if len(self.stream_read_counts) <= MAX_ITEMIZED_STREAMS: # noqa: PLR2004 # Magic numbers OK here.
for stream_name in self.stream_read_counts:
status_message += (
f" - {stream_name}: {self.stream_read_counts[stream_name]:,} records\n"
f" - {self.stream_read_counts[stream_name]:,} {stream_name}\n"
)
else:
status_message += f" - {', '.join(self.stream_read_counts.keys())}\n"
status_message += " - " + ", ".join(
f"{self.stream_read_counts[stream_name]:,} {stream_name}"
for stream_name in self.stream_read_counts
)

# Source cache writes
if self.total_records_written > 0:
Expand Down Expand Up @@ -660,10 +664,10 @@ def _get_status_message(self) -> str:
status_message += (
f"- Completed cache processing for {len(self.finalized_stream_names)} "
+ (f"out of {self.num_streams_expected} " if self.num_streams_expected else "")
+ "streams:\n\n"
+ "streams: "
+ ", ".join(self.finalized_stream_names)
+ "\n\n"
)
for stream_name in self.finalized_stream_names:
status_message += f" - {stream_name}\n"

if self.finalize_end_time is not None:
completion_time_str = _to_time_str(self.finalize_end_time)
Expand All @@ -685,12 +689,15 @@ def _get_status_message(self) -> str:
)
status_message += "- Stream records delivered:\n\n"
for stream_name, record_count in self.destination_stream_records_delivered.items():
status_message += f" - {stream_name}: {record_count:,} records\n"
status_message += f" - {record_count:,} {stream_name}\n"

status_message += "\n"

if self.end_time is not None:
status_message += f"\n\n**Total time elapsed: {self.total_time_elapsed_str}**\n\n"
status_message += (
f"\n\n**Sync completed at `{_to_time_str(self.end_time)}`. "
f"Total time elapsed: {self.total_time_elapsed_str}**\n\n"
)

status_message += HORIZONTAL_LINE

Expand Down
2 changes: 1 addition & 1 deletion examples/run_sync_to_destination_w_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def get_my_source() -> Source:
"parallelism": 16,
},
install_if_missing=False,
streams=["purchases"],
streams="*",
)

def get_my_destination() -> Destination:
Expand Down

0 comments on commit 36fa532

Please sign in to comment.