AirbyteLib: add null cache and null writer #34587

Closed
wants to merge 60 commits into from


aaronsteers
Collaborator

@aaronsteers aaronsteers commented Jan 28, 2024

Note:

This should merge after:

This is born out of a desire to get a better understanding of performance bottlenecks. This PR adds a "NullCache" which does nothing at all with incoming records.

The goal with this is to be able to get performance benchmarks on a source connector, with little or no slowdown from AirbyteLib, the file writer, and/or the SQL cache.

@aaronsteers aaronsteers added the airbyte-lib Related to AirbyteLib label Jan 28, 2024
Contributor

Should a _null_writers package be introduced instead of adding this writer to _file_writers?

class NullWriter(FileWriterBase):
    """A Null (no-op) file writer implementation."""

    config_class = NullWriterConfig
Contributor

config_class: Final[ClassVar] = NullWriterConfig
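
For reference, a minimal sketch of a class-level annotation of this kind. `ClassVar` is the real name in `typing`; the two classes below are hypothetical stand-ins, not the actual AirbyteLib classes. The sketch uses `ClassVar` on its own because PEP 591 states that a `Final` attribute initialized in a class body is already treated as a class variable, and that the two qualifiers should not be combined.

```python
from typing import ClassVar


class WriterConfigSketch:
    """Hypothetical stand-in for a writer config class."""


class WriterSketch:
    """Hypothetical stand-in for a writer class."""

    # Declared as a class-level attribute rather than a per-instance field.
    config_class: ClassVar[type] = WriterConfigSketch
```

The attribute is then reachable from both the class and its instances, and a type checker can flag attempts to set it on an instance.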

Comment on lines +36 to +37
stream_name: str,
batch_id: str | None = None, # ULID of the batch
Contributor

If the params are not used, can you just accept *args, **kwargs?
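
A minimal sketch of what that suggestion could look like, assuming a base class with a batch-writing method. `BaseWriterSketch`, `NoOpWriterSketch`, and `write_batch` are hypothetical names for illustration and do not come from AirbyteLib.

```python
from typing import List, Optional


class BaseWriterSketch:
    """Hypothetical base class; not the real FileWriterBase."""

    def write_batch(self, stream_name: str, batch_id: Optional[str] = None) -> List[str]:
        # A real writer would persist the batch and report what it wrote.
        return [f"{stream_name}-{batch_id}"]


class NoOpWriterSketch(BaseWriterSketch):
    """Hypothetical no-op writer that ignores every argument."""

    def write_batch(self, *args: object, **kwargs: object) -> List[str]:
        # Accept and discard whatever the caller passes; nothing is written.
        return []
```

One trade-off with this approach is that the overriding signature no longer documents which parameters callers are expected to pass.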

) -> FileWriterBatchHandle:
"""Process a record batch.

Return the path to the cache file.
Contributor

This is not returning a cache file but a FileWriterBatchHandle, if I'm not mistaken.

Return the path to the cache file.
"""
_ = batch_id, record_batch # unused
output_file_path = self.get_new_cache_file_path(stream_name)
Contributor

Is the call to get_new_cache_file_path necessary here, given that it's returning a dummy object?


batch_handle = FileWriterBatchHandle()
batch_handle.files.append(output_file_path)
return batch_handle
Contributor

I'm not sure I get the purpose of returning a mutated FileWriterBatchHandle, as its content will be a dummy path. Could we create a NullWriterBatchHandle instead?
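
A rough sketch of the null-handle idea, assuming only what the diff shows: that the batch handle exposes a `files` list. Both class names below are hypothetical stand-ins and are not part of AirbyteLib.

```python
from dataclasses import dataclass, field
from pathlib import Path
from typing import List


@dataclass
class BatchHandleSketch:
    """Hypothetical stand-in for FileWriterBatchHandle: tracks written files."""

    files: List[Path] = field(default_factory=list)


@dataclass
class NullBatchHandleSketch(BatchHandleSketch):
    """Hypothetical null handle: same shape, but created with no files."""
```

A no-op writer could return `NullBatchHandleSketch()` directly, without generating a placeholder path first, while callers that only read `files` keep working.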


def _table_exists(self, table_name: str) -> bool:
    """Check if a table exists."""
    _ = table_name
Contributor

Why are you assigning to _ here (and elsewhere) if the parameter values are not used?

Comment on lines +49 to +53
"""A DuckDB implementation of the cache.

Parquet is used for local file storage before bulk loading.
Unlike the Snowflake implementation, we can't use the COPY command to load data
so we insert as values instead.
Contributor

I think this docstring is not up to date

def _execute_sql(self, sql: str) -> None:
    """Execute SQL."""
    _ = sql
    # Do nothing
Contributor

I'd appreciate an explicit "return None"

pass

@overrides
def _write_files_to_new_table(self, files: list[Path], stream_name: str, batch_id: str) -> str:
Contributor

Please change the type hint if None is returned here.

@flash1293
Contributor

flash1293 commented Jan 29, 2024

Based on the stated goal of this PR, the best you should be able to do is to just use list(source.get_records("...")). It's basically:

with little or no slowdown from AirbyteLib, the file writer, and/or the SQL cache.

I'm always for taking a chance to not write some code :)
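
A rough sketch of that approach: drain a record iterator into a list and time it. `fake_records` is a hypothetical stand-in for `source.get_records("...")`, and `time_drain` is an illustrative helper; neither is AirbyteLib API.

```python
import time
from typing import Any, Dict, Iterable, Iterator, Tuple


def fake_records(count: int) -> Iterator[Dict[str, Any]]:
    """Hypothetical stand-in for a source's record iterator."""
    for i in range(count):
        yield {"id": i}


def time_drain(records: Iterable[Dict[str, Any]]) -> Tuple[int, float]:
    """Consume every record and return (record count, elapsed seconds)."""
    start = time.perf_counter()
    consumed = list(records)  # materializes all records; no cache or writer involved
    elapsed = time.perf_counter() - start
    return len(consumed), elapsed


count, seconds = time_drain(fake_records(10_000))
print(f"{count} records in {seconds:.4f}s")
```

Note that `list(...)` holds every record in memory. For very large streams, `collections.deque(records, maxlen=0)` drains an iterator without storing anything, at the cost of not getting a count for free.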

@aaronsteers aaronsteers marked this pull request as draft January 30, 2024 06:17
Base automatically changed from aj/airbyte-lib/progress-print to master January 30, 2024 06:39
@natikgadzhi
Contributor

I'm closing because I assume you already made this change in the pyairbyte repo. If not, apologies! Seems like a huge lift to rebase and rename anyway.

Labels
airbyte-lib Related to AirbyteLib
4 participants