Skip to content

Commit

Permalink
D205 Support - Providers: Snowflake to Zendesk (inclusive) (#32359)
Browse files Browse the repository at this point in the history
  • Loading branch information
ferruzzi committed Jul 6, 2023
1 parent 3318212 commit 21e8f87
Show file tree
Hide file tree
Showing 11 changed files with 47 additions and 43 deletions.
25 changes: 14 additions & 11 deletions airflow/providers/snowflake/hooks/snowflake_sql_api.py
Expand Up @@ -33,10 +33,10 @@

class SnowflakeSqlApiHook(SnowflakeHook):
"""
A client to interact with Snowflake using SQL API and allows submitting
multiple SQL statements in a single request. In combination with aiohttp, make post request to submit SQL
statements for execution, poll to check the status of the execution of a statement. Fetch query results
asynchronously.
A client to interact with Snowflake using SQL API and submit multiple SQL statements in a single request.
In combination with aiohttp, make post request to submit SQL statements for execution,
poll to check the status of the execution of a statement. Fetch query results asynchronously.
This hook requires the snowflake_conn_id connection. This hooks mainly uses account, schema, database,
warehouse, private_key_file or private_key_content field must be setup in the connection. Other inputs
Expand Down Expand Up @@ -137,7 +137,10 @@ def execute_query(
conn_config = self._get_conn_params()

req_id = uuid.uuid4()
url = f"https://{conn_config['account']}.{conn_config['region']}.snowflakecomputing.com/api/v2/statements"
url = (
f"https://{conn_config['account']}.{conn_config['region']}"
f".snowflakecomputing.com/api/v2/statements"
)
params: dict[str, Any] | None = {"requestId": str(req_id), "async": True, "pageSize": 10}
headers = self.get_headers()
if bindings is None:
Expand Down Expand Up @@ -171,9 +174,7 @@ def execute_query(
return self.query_ids

def get_headers(self) -> dict[str, Any]:
"""Based on the private key, and with connection details JWT Token is generated and header
is formed.
"""
"""Form JWT Token and header based on the private key, and connection details."""
if not self.private_key:
self.get_private_key()
conn_config = self._get_conn_params()
Expand Down Expand Up @@ -206,13 +207,15 @@ def get_request_url_header_params(self, query_id: str) -> tuple[dict[str, Any],
req_id = uuid.uuid4()
header = self.get_headers()
params = {"requestId": str(req_id)}
url = f"https://{conn_config['account']}.{conn_config['region']}.snowflakecomputing.com/api/v2/statements/{query_id}"
url = (
f"https://{conn_config['account']}.{conn_config['region']}"
f".snowflakecomputing.com/api/v2/statements/{query_id}"
)
return header, params, url

def check_query_output(self, query_ids: list[str]) -> None:
"""
Based on the query ids passed as the parameter make HTTP request to snowflake SQL API and logs
the response.
Make HTTP request to snowflake SQL API based on the provided query ids and log the response.
:param query_ids: statement handles query id for the individual statements.
"""
Expand Down
20 changes: 10 additions & 10 deletions airflow/providers/snowflake/operators/snowflake.py
Expand Up @@ -135,10 +135,11 @@ def _process_output(self, results: list[Any], descriptions: list[Sequence[Sequen

class SnowflakeCheckOperator(SQLCheckOperator):
"""
Performs a check against Snowflake. The ``SnowflakeCheckOperator`` expects
a sql query that will return a single row. Each value on that
first row is evaluated using python ``bool`` casting. If any of the
values return ``False`` the check is failed and errors out.
Performs a check against Snowflake.
The ``SnowflakeCheckOperator`` expects a sql query that will return a single row. Each
value on that first row is evaluated using python ``bool`` casting. If any of the values
return ``False`` the check is failed and errors out.
Note that Python bool casting evals the following as ``False``:
Expand Down Expand Up @@ -225,8 +226,7 @@ def __init__(

class SnowflakeValueCheckOperator(SQLValueCheckOperator):
"""
Performs a simple check using sql code against a specified value, within a
certain level of tolerance.
Performs a simple check using sql code against a specified value, within a certain level of tolerance.
:param sql: the sql to be executed
:param pass_value: the value to check against
Expand Down Expand Up @@ -293,8 +293,7 @@ def __init__(

class SnowflakeIntervalCheckOperator(SQLIntervalCheckOperator):
"""
Checks that the values of metrics given as SQL expressions are within
a certain tolerance of the ones from days_back before.
Checks that the metrics given as SQL expressions are within tolerance of the ones from days_back before.
This method constructs a query like so ::
Expand Down Expand Up @@ -479,6 +478,7 @@ def __init__(
def execute(self, context: Context) -> None:
"""
Make a POST API request to snowflake by using SnowflakeSQL and execute the query to get the ids.
By deferring the SnowflakeSqlApiTrigger class passed along with query ids.
"""
self.log.info("Executing: %s", self.sql)
Expand Down Expand Up @@ -539,8 +539,8 @@ def poll_on_queries(self):
def execute_complete(self, context: Context, event: dict[str, str | list[str]] | None = None) -> None:
"""
Callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
successful.
Relies on trigger to throw an exception, otherwise it assumes execution was successful.
"""
if event:
if "status" in event and event["status"] == "error":
Expand Down
5 changes: 1 addition & 4 deletions airflow/providers/snowflake/transfers/copy_into_snowflake.py
Expand Up @@ -15,10 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module contains abstract operator that child classes
implement "COPY INTO <TABLE> SQL in Snowflake".
"""
"""Abstract operator that child classes implement ``COPY INTO <TABLE> SQL in Snowflake``."""
from __future__ import annotations

from typing import Any, Sequence
Expand Down
14 changes: 8 additions & 6 deletions airflow/providers/snowflake/transfers/snowflake_to_slack.py
Expand Up @@ -25,12 +25,14 @@

class SnowflakeToSlackOperator(SqlToSlackOperator):
"""
Executes an SQL statement in Snowflake and sends the results to Slack. The results of the query are
rendered into the 'slack_message' parameter as a Pandas dataframe using a JINJA variable called '{{
results_df }}'. The 'results_df' variable name can be changed by specifying a different
'results_df_name' parameter. The Tabulate library is added to the JINJA environment as a filter to
allow the dataframe to be rendered nicely. For example, set 'slack_message' to {{ results_df |
tabulate(tablefmt="pretty", headers="keys") }} to send the results to Slack as an ascii rendered table.
Executes an SQL statement in Snowflake and sends the results to Slack.
The results of the query are rendered into the 'slack_message' parameter as a Pandas dataframe
using a Jinja variable called '{{ results_df }}'. The 'results_df' variable name can be changed
by specifying a different 'results_df_name' parameter. The Tabulate library is added to the
Jinja environment as a filter to allow the dataframe to be rendered nicely. For example, set
'slack_message' to {{ results_df | tabulate(tablefmt="pretty", headers="keys") }} to send the
results to Slack as an ASCII rendered table.
.. seealso::
For more information on how to use this operator, take a look at the guide:
Expand Down
6 changes: 1 addition & 5 deletions airflow/providers/snowflake/triggers/snowflake_trigger.py
Expand Up @@ -93,11 +93,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
yield TriggerEvent({"status": "error", "message": str(e)})

async def get_query_status(self, query_id: str) -> dict[str, Any]:
"""
Async function to check whether the query statement submitted via SQL API is still
running state and returns True if it is still running else
return False.
"""
"""Return True if the SQL query is still running otherwise return False."""
hook = SnowflakeSqlApiHook(
self.snowflake_conn_id,
self.token_life_time,
Expand Down
6 changes: 5 additions & 1 deletion airflow/providers/snowflake/utils/sql_api_generate_jwt.py
Expand Up @@ -42,6 +42,7 @@
class JWTGenerator:
"""
Creates and signs a JWT with the specified private key file, username, and account identifier.
The JWTGenerator keeps the generated token and only regenerates the token if a specified period
of time has passed.
Expand Down Expand Up @@ -92,6 +93,7 @@ def __init__(
def prepare_account_name_for_jwt(self, raw_account: str) -> str:
"""
Prepare the account identifier for use in the JWT.
For the JWT, the account identifier must not include the subdomain or any region or cloud provider
information.
Expand All @@ -113,7 +115,9 @@ def prepare_account_name_for_jwt(self, raw_account: str) -> str:

def get_token(self) -> str | None:
"""
Generates a new JWT. If a JWT has been already been generated earlier, return the previously
Generates a new JWT.
If a JWT has been already been generated earlier, return the previously
generated token unless the specified renewal time has passed.
"""
now = datetime.now(timezone.utc) # Fetch the current time
Expand Down
3 changes: 1 addition & 2 deletions airflow/providers/tableau/hooks/tableau.py
Expand Up @@ -162,8 +162,7 @@ def get_job_status(self, job_id: str) -> TableauJobFinishCode:

def wait_for_state(self, job_id: str, target_state: TableauJobFinishCode, check_interval: float) -> bool:
"""
Wait until the current state of a defined Tableau Job is equal
to target_state or different from PENDING.
Wait until the current state of a defined Tableau Job is target_state or different from PENDING.
:param job_id: The id of the job to check.
:param target_state: Enum that describe the Tableau job's target state
Expand Down
1 change: 1 addition & 0 deletions airflow/providers/tableau/operators/tableau.py
Expand Up @@ -95,6 +95,7 @@ def __init__(
def execute(self, context: Context) -> str:
"""
Executes the Tableau API resource and pushes the job id or downloaded file URI to xcom.
:param context: The task context during execution.
:return: the id of the job that executes the extract refresh or downloaded file URI.
"""
Expand Down
6 changes: 4 additions & 2 deletions airflow/providers/tabular/hooks/tabular.py
Expand Up @@ -30,8 +30,10 @@

class TabularHook(BaseHook):
"""
This hook acts as a base hook for tabular services. It offers the ability to generate temporary,
short-lived session tokens to use within Airflow submitted jobs.
This hook acts as a base hook for tabular services.
It offers the ability to generate temporary, short-lived
session tokens to use within Airflow submitted jobs.
:param tabular_conn_id: The :ref:`Tabular connection id<howto/connection:tabular>`
which refers to the information to connect to the Tabular OAuth service.
Expand Down
1 change: 1 addition & 0 deletions airflow/providers/telegram/operators/telegram.py
Expand Up @@ -31,6 +31,7 @@
class TelegramOperator(BaseOperator):
"""
This operator allows you to post messages to Telegram using Telegram Bot API.
Takes both Telegram Bot API token directly or connection that has Telegram token in password field.
If both supplied, token parameter will be given precedence.
Expand Down
3 changes: 1 addition & 2 deletions airflow/providers/trino/hooks/trino.py
Expand Up @@ -240,8 +240,7 @@ def insert_rows(
@staticmethod
def _serialize_cell(cell: Any, conn: Connection | None = None) -> Any:
"""
Trino will adapt all arguments to the execute() method internally,
hence we return cell without any conversion.
Trino will adapt all execute() args internally, hence we return cell without any conversion.
:param cell: The cell to insert into the table
:param conn: The database connection
Expand Down

0 comments on commit 21e8f87

Please sign in to comment.