Skip to content

Commit

Permalink
AirbyteLib: Fix no-such-table-error (#35311)
Browse files Browse the repository at this point in the history
Co-authored-by: Bindi Pankhudi <bindi@airbyte.com>
Co-authored-by: Aaron Steers <aj@airbyte.io>
  • Loading branch information
3 people committed Feb 15, 2024
1 parent b05c490 commit 48e933b
Show file tree
Hide file tree
Showing 6 changed files with 324 additions and 246 deletions.
17 changes: 13 additions & 4 deletions airbyte-lib/airbyte_lib/_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@


DEFAULT_BATCH_SIZE = 10_000
DEBUG_MODE = False # Set to True to enable additional debug logging.


class BatchHandle:
Expand All @@ -60,6 +61,7 @@ class RecordProcessor(abc.ABC):

config_class: type[CacheConfigBase]
skip_finalize_step: bool = False
_expected_streams: set[str]

def __init__(
self,
Expand Down Expand Up @@ -109,6 +111,7 @@ def register_source(
incoming_source_catalog=incoming_source_catalog,
incoming_stream_names=stream_names,
)
self._expected_streams = stream_names

@property
def _streams_with_data(self) -> set[str]:
Expand Down Expand Up @@ -203,12 +206,18 @@ def process_airbyte_messages(
# Type.LOG, Type.TRACE, Type.CONTROL, etc.
pass

# Add empty streams to the dictionary, so we create a destination table for it
for stream_name in self._expected_streams:
if stream_name not in stream_batches:
if DEBUG_MODE:
print(f"Stream {stream_name} has no data")
stream_batches[stream_name] = []

# We are at the end of the stream. Process whatever else is queued.
for stream_name, stream_batch in stream_batches.items():
if stream_batch:
record_batch = pa.Table.from_pylist(stream_batch)
self._process_batch(stream_name, record_batch)
progress.log_batch_written(stream_name, len(stream_batch))
record_batch = pa.Table.from_pylist(stream_batch)
self._process_batch(stream_name, record_batch)
progress.log_batch_written(stream_name, len(stream_batch))

# Finalize any pending batches
for stream_name in list(self._pending_batches.keys()):
Expand Down
4 changes: 2 additions & 2 deletions airbyte-lib/examples/run_github.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
source = ab.get_source("source-github")
source.set_config(
{
"repositories": ["airbytehq/quickstarts"],
"repositories": ["airbytehq/airbyte-lib-private-beta"],
"credentials": {"personal_access_token": GITHUB_TOKEN},
}
)
source.check()
source.select_streams(["issues", "pull_requests", "commits", "collaborators"])
source.select_streams(["issues", "pull_requests", "commits", "collaborators", "deployments"])

result = source.read(cache=ab.new_local_cache("github"))
print(result.processed_records)
Expand Down
Loading

0 comments on commit 48e933b

Please sign in to comment.