Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

add poll_frequency_seconds to snowflake queries (#28) #29

Merged
merged 4 commits into from Aug 22, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 8 additions & 2 deletions prefect_snowflake/database.py
Expand Up @@ -116,6 +116,7 @@ async def snowflake_query(
snowflake_connector: SnowflakeConnector,
params: Union[Tuple[Any], Dict[str, Any]] = None,
cursor_type: SnowflakeCursor = SnowflakeCursor,
poll_frequency_seconds: int = 1,
) -> List[Tuple[Any]]:
"""
Executes a query against a Snowflake database.
Expand All @@ -125,6 +126,8 @@ async def snowflake_query(
params: The params to replace the placeholders in the query.
snowflake_connector: The credentials to use to authenticate.
cursor_type: The type of database cursor to use for the query.
poll_frequency_seconds: Number of seconds to wait in between checks for
run completion.

Returns:
The output of `response.fetchall()`.
Expand Down Expand Up @@ -168,7 +171,7 @@ def snowflake_query_flow():
while connection.is_still_running(
connection.get_query_status_throw_if_error(query_id)
):
await asyncio.sleep(0.05)
await asyncio.sleep(poll_frequency_seconds)
cursor.get_results_from_sfqid(query_id)
result = cursor.fetchall()
return result
Expand All @@ -182,6 +185,7 @@ async def snowflake_multiquery(
cursor_type: SnowflakeCursor = SnowflakeCursor,
as_transaction: bool = False,
return_transaction_control_results: bool = False,
poll_frequency_seconds: int = 1,
) -> List[List[Tuple[Any]]]:
"""
Executes multiple queries against a Snowflake database in a shared session.
Expand All @@ -195,6 +199,8 @@ async def snowflake_multiquery(
as_transaction: If True, queries are executed in a transaction.
return_transaction_control_results: Determines if the results of queries
controlling the transaction (BEGIN/COMMIT) should be returned.
poll_frequency_seconds: Number of seconds to wait in between checks for
run completion.

Returns:
List of the outputs of `response.fetchall()` for each query.
Expand Down Expand Up @@ -244,7 +250,7 @@ def snowflake_multiquery_flow():
while connection.is_still_running(
connection.get_query_status_throw_if_error(query_id)
):
await asyncio.sleep(0.05)
await asyncio.sleep(poll_frequency_seconds)
cursor.get_results_from_sfqid(query_id)
result = cursor.fetchall()
results.append(result)
Expand Down