diff --git a/CHANGELOG.md b/CHANGELOG.md index e3a77fa..aff7ef7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +## 0.1.1 + +Released on September 2, 2022. + +### Added + +- a `timeout` parameter to `trigger_sync` and `export_configuration` passed to `httpx.AsyncClient` + +### Fixed + - Using `asyncio.sleep` instead of `time.sleep` within `trigger_sync` task. ### Security diff --git a/README.md b/README.md index f76d7f6..406cfe1 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ ## Welcome! -`prefect-airbyte` is a collections of prebuilt Prefect tasks that can be used to quickly construct Prefect flows. +`prefect-airbyte` is a collection of prebuilt Prefect tasks that can be used to quickly construct Prefect flows to trigger Airbyte syncs or export your connector configurations. ## Getting Started diff --git a/prefect_airbyte/client.py b/prefect_airbyte/client.py index 7a7c702..f68e339 100644 --- a/prefect_airbyte/client.py +++ b/prefect_airbyte/client.py @@ -30,30 +30,31 @@ def __init__( self, logger: logging.Logger, airbyte_base_url: str = "http://localhost:8000/api/v1", + timeout: int = 5, ) -> None: """ `AirbyteClient` constructor Args: logger: for client use, e.g. `prefect.logging.loggers.get_logger` - airbyte_base_url: Full Airbyte API endpoint. + airbyte_base_url: Full Airbyte API endpoint + timeout: seconds for httpx client timeout Returns: AirbyteClient: an instance of the `AirbyteClient` class """ self.airbyte_base_url = airbyte_base_url self.logger = logger + self.timeout = timeout async def _establish_session(self) -> httpx.AsyncClient: """ AirbyteClient method to `check_health_status` and establish a `client` session - Args: - Returns: client: `httpx.AsyncClient` used to communicate with the Airbyte API """ - client = httpx.AsyncClient() + client = httpx.AsyncClient(timeout=self.timeout) if await self.check_health_status(client): return client else: @@ -88,7 +89,7 @@ async def create_client(self) -> httpx.AsyncClient: Convenience method for establishing a healthy `httpx` Airbyte client Args: - + timeout: `int` seconds for request timeout with this client Returns: httpx.AsyncClient: client for interacting with Airbyte instance """ diff --git a/prefect_airbyte/configuration.py b/prefect_airbyte/configuration.py index 59b3e5a..3d820ed 100644 --- a/prefect_airbyte/configuration.py +++ b/prefect_airbyte/configuration.py @@ -10,6 +10,7 @@ async def export_configuration( airbyte_server_host: str = "localhost", airbyte_server_port: int = "8000", airbyte_api_version: str = "v1", + timeout: int = 5, ) -> bytearray: """ @@ -23,6 +24,7 @@ async def export_configuration( airbyte_api_version (str, optional): Version of Airbyte API to use to trigger connection sync, will overwrite the value provided at init if provided. + timeout (int): timeout in seconds on the httpx AirbyteClient Returns: bytearray: `bytearray` containing Airbyte configuration @@ -39,8 +41,8 @@ async def export_configuration( @task def zip_and_write_somewhere( - airbyte_config: bytearray - somwhere: str = 'my_destination.gz',' + airbyte_configuration: bytearray + somewhere: str = 'my_destination.gz',' ): with gzip.open('my_destination.gz', 'wb') as f: f.write(airbyte_configuration) @@ -69,7 +71,7 @@ def example_export_configuration_flow(): f"{airbyte_server_port}/api/{airbyte_api_version}" ) - airbyte = AirbyteClient(logger, airbyte_base_url) + airbyte = AirbyteClient(logger, airbyte_base_url, timeout=timeout) logger.info("Initiating export of Airbyte configuration") airbyte_config = await airbyte.export_configuration() diff --git a/prefect_airbyte/connections.py b/prefect_airbyte/connections.py index 60d253f..07913cc 100644 --- a/prefect_airbyte/connections.py +++ b/prefect_airbyte/connections.py @@ -27,6 +27,7 @@ async def trigger_sync( connection_id: str = None, poll_interval_s: int = 15, status_updates: bool = False, + timeout: int = 5, ) -> dict: """ Task run method for triggering an Airbyte Connection. @@ -45,17 +46,17 @@ async def trigger_sync( Args: str airbyte_server_host : Hostname of Airbyte server where connection is - configured. Will overwrite the value provided at init if provided. + configured. str airbyte_server_port: Port that the Airbyte server is listening on. - Will overwrite the value provided at init if provided. + str airbyte_api_version: Version of Airbyte API to use to trigger connection - sync. Will overwrite the value provided at init if provided. - str connection_id: if provided, - will overwrite the value provided at init. - int poll_interval_s: this task polls the - Airbyte API for status, if provided this value will + sync. + str connection_id: the Airbyte connection ID + int poll_interval_s: how often to poll the + Airbyte API for sync status, if provided this will override the default polling time of 15 seconds. bool status_updates: whether to log status as the task polls jobs + str timeout: The request `timeout` for the `httpx.AsyncClient` Returns: dict: connection_id (str) and succeeded_at (timestamp str) @@ -105,7 +106,7 @@ def example_trigger_sync_flow(): f"{airbyte_server_port}/api/{airbyte_api_version}" ) - airbyte = AirbyteClient(logger, airbyte_base_url) + airbyte = AirbyteClient(logger, airbyte_base_url, timeout=timeout) logger.info( f"Getting Airbyte Connection {connection_id}, poll interval " @@ -147,9 +148,12 @@ def example_trigger_sync_flow(): "job_updated_at": job_updated_at, } elif connection_status == CONNECTION_STATUS_INACTIVE: - logger.error(f"Please enable the Connection {connection_id} in Airbyte Server.") + logger.error( + f"Connection: {connection_id} is inactive" + " - you'll need to enable it in your Airbyte instance" + ) raise err.AirbyteConnectionInactiveException( - f"Please enable the Connection {connection_id} in Airbyte Server." + f"Please enable the Connection {connection_id} in Airbyte instance." ) elif connection_status == CONNECTION_STATUS_DEPRECATED: logger.error(f"Connection {connection_id} is deprecated.") diff --git a/tests/__pycache__/conftest.cpython-38-pytest-7.1.1.pyc b/tests/__pycache__/conftest.cpython-38-pytest-7.1.1.pyc deleted file mode 100644 index 20e9b59..0000000 Binary files a/tests/__pycache__/conftest.cpython-38-pytest-7.1.1.pyc and /dev/null differ diff --git a/tests/__pycache__/test_messages.cpython-38-pytest-7.1.1.pyc b/tests/__pycache__/test_messages.cpython-38-pytest-7.1.1.pyc deleted file mode 100644 index 78aa3a1..0000000 Binary files a/tests/__pycache__/test_messages.cpython-38-pytest-7.1.1.pyc and /dev/null differ