Skip to content

Commit

Permalink
Merge pull request #18 from PrefectHQ/httpx-timeout
Browse files Browse the repository at this point in the history
add parameter to `trigger_sync` and `export_configuration` for `httpx.AsyncClient` timeout
  • Loading branch information
zzstoatzz committed Sep 2, 2022
2 parents ea9e7e1 + 58ec73b commit 163e35b
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 19 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

## 0.1.1

Released on September 2, 2022.

### Added

- a `timeout` parameter to `trigger_sync` and `export_configuration` passed to `httpx.AsyncClient`

### Fixed

- Using `asyncio.sleep` instead of `time.sleep` within `trigger_sync` task.

### Security
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## Welcome!
<!-- &emsp; <img src="imgs/airbyte.png" width="40" height="55" /> -->

`prefect-airbyte` is a collections of prebuilt Prefect tasks that can be used to quickly construct Prefect flows.
`prefect-airbyte` is a collection of prebuilt Prefect tasks that can be used to quickly construct Prefect flows to trigger Airbyte syncs or export your connector configurations.

## Getting Started

Expand Down
11 changes: 6 additions & 5 deletions prefect_airbyte/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,30 +30,31 @@ def __init__(
self,
logger: logging.Logger,
airbyte_base_url: str = "http://localhost:8000/api/v1",
timeout: int = 5,
) -> None:
"""
`AirbyteClient` constructor
Args:
logger: for client use, e.g. `prefect.logging.loggers.get_logger`
airbyte_base_url: Full Airbyte API endpoint.
airbyte_base_url: Full Airbyte API endpoint
timeout: seconds for httpx client timeout
Returns:
AirbyteClient: an instance of the `AirbyteClient` class
"""
self.airbyte_base_url = airbyte_base_url
self.logger = logger
self.timeout = timeout

async def _establish_session(self) -> httpx.AsyncClient:
"""
AirbyteClient method to `check_health_status` and establish a `client` session
Args:
Returns:
client: `httpx.AsyncClient` used to communicate with the Airbyte API
"""
client = httpx.AsyncClient()
client = httpx.AsyncClient(timeout=self.timeout)
if await self.check_health_status(client):
return client
else:
Expand Down Expand Up @@ -88,7 +89,7 @@ async def create_client(self) -> httpx.AsyncClient:
Convenience method for establishing a healthy `httpx` Airbyte client
Args:
timeout: `int` seconds for request timeout with this client
Returns:
httpx.AsyncClient: client for interacting with Airbyte instance
"""
Expand Down
8 changes: 5 additions & 3 deletions prefect_airbyte/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ async def export_configuration(
airbyte_server_host: str = "localhost",
airbyte_server_port: int = "8000",
airbyte_api_version: str = "v1",
timeout: int = 5,
) -> bytearray:

"""
Expand All @@ -23,6 +24,7 @@ async def export_configuration(
airbyte_api_version (str, optional): Version of Airbyte API to use to
trigger connection sync, will overwrite the value provided at
init if provided.
timeout (int): timeout in seconds on the httpx AirbyteClient
Returns:
bytearray: `bytearray` containing Airbyte configuration
Expand All @@ -39,8 +41,8 @@ async def export_configuration(
@task
def zip_and_write_somewhere(
airbyte_config: bytearray
somwhere: str = 'my_destination.gz','
airbyte_configuration: bytearray
somewhere: str = 'my_destination.gz','
):
with gzip.open('my_destination.gz', 'wb') as f:
f.write(airbyte_configuration)
Expand Down Expand Up @@ -69,7 +71,7 @@ def example_export_configuration_flow():
f"{airbyte_server_port}/api/{airbyte_api_version}"
)

airbyte = AirbyteClient(logger, airbyte_base_url)
airbyte = AirbyteClient(logger, airbyte_base_url, timeout=timeout)

logger.info("Initiating export of Airbyte configuration")
airbyte_config = await airbyte.export_configuration()
Expand Down
24 changes: 14 additions & 10 deletions prefect_airbyte/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ async def trigger_sync(
connection_id: str = None,
poll_interval_s: int = 15,
status_updates: bool = False,
timeout: int = 5,
) -> dict:
"""
Task run method for triggering an Airbyte Connection.
Expand All @@ -45,17 +46,17 @@ async def trigger_sync(
Args:
str airbyte_server_host : Hostname of Airbyte server where connection is
configured. Will overwrite the value provided at init if provided.
configured.
str airbyte_server_port: Port that the Airbyte server is listening on.
Will overwrite the value provided at init if provided.
str airbyte_api_version: Version of Airbyte API to use to trigger connection
sync. Will overwrite the value provided at init if provided.
str connection_id: if provided,
will overwrite the value provided at init.
int poll_interval_s: this task polls the
Airbyte API for status, if provided this value will
sync.
str connection_id: the Airbyte connection ID
int poll_interval_s: how often to poll the
Airbyte API for sync status, if provided this will
override the default polling time of 15 seconds.
bool status_updates: whether to log status as the task polls jobs
str timeout: The request `timeout` for the `httpx.AsyncClient`
Returns:
dict: connection_id (str) and succeeded_at (timestamp str)
Expand Down Expand Up @@ -105,7 +106,7 @@ def example_trigger_sync_flow():
f"{airbyte_server_port}/api/{airbyte_api_version}"
)

airbyte = AirbyteClient(logger, airbyte_base_url)
airbyte = AirbyteClient(logger, airbyte_base_url, timeout=timeout)

logger.info(
f"Getting Airbyte Connection {connection_id}, poll interval "
Expand Down Expand Up @@ -147,9 +148,12 @@ def example_trigger_sync_flow():
"job_updated_at": job_updated_at,
}
elif connection_status == CONNECTION_STATUS_INACTIVE:
logger.error(f"Please enable the Connection {connection_id} in Airbyte Server.")
logger.error(
f"Connection: {connection_id} is inactive"
" - you'll need to enable it in your Airbyte instance"
)
raise err.AirbyteConnectionInactiveException(
f"Please enable the Connection {connection_id} in Airbyte Server."
f"Please enable the Connection {connection_id} in Airbyte instance."
)
elif connection_status == CONNECTION_STATUS_DEPRECATED:
logger.error(f"Connection {connection_id} is deprecated.")
Expand Down
Binary file removed tests/__pycache__/conftest.cpython-38-pytest-7.1.1.pyc
Binary file not shown.
Binary file not shown.

0 comments on commit 163e35b

Please sign in to comment.