Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added ability for Snowflake to attribute usage to Airflow by adding an application parameter #16420

Merged
merged 8 commits into from Jun 15, 2021
2 changes: 2 additions & 0 deletions airflow/providers/snowflake/hooks/snowflake.py
Expand Up @@ -143,6 +143,7 @@ def __init__(self, *args, **kwargs) -> None:
self.schema = kwargs.pop("schema", None)
self.authenticator = kwargs.pop("authenticator", None)
self.session_parameters = kwargs.pop("session_parameters", None)
self.application = kwargs.pop("application", None)
self.query_ids = []

def _get_conn_params(self) -> Dict[str, Optional[str]]:
Expand Down Expand Up @@ -179,6 +180,7 @@ def _get_conn_params(self) -> Dict[str, Optional[str]]:
"role": self.role or role,
"authenticator": self.authenticator or authenticator,
"session_parameters": self.session_parameters or session_parameters,
"application": self.application,
}

# If private_key_file is specified in the extra json, load the contents of the file as a private
Expand Down
5 changes: 5 additions & 0 deletions airflow/providers/snowflake/operators/snowflake.py
Expand Up @@ -64,6 +64,8 @@ class SnowflakeOperator(BaseOperator):
:param session_parameters: You can set session-level parameters at
the time you connect to Snowflake
:type session_parameters: dict
:param application: allows snowflake to attribute usage to airflow
:type application: str
"""

template_fields = ('sql',)
Expand All @@ -83,6 +85,7 @@ def __init__(
schema: Optional[str] = None,
authenticator: Optional[str] = None,
session_parameters: Optional[dict] = None,
application: Optional[str] = 'AIRFLOW',
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -97,6 +100,7 @@ def __init__(
self.authenticator = authenticator
self.session_parameters = session_parameters
self.query_ids = []
self.application = application

def get_hook(self) -> SnowflakeHook:
"""
Expand All @@ -112,6 +116,7 @@ def get_hook(self) -> SnowflakeHook:
schema=self.schema,
authenticator=self.authenticator,
session_parameters=self.session_parameters,
application=self.application,
)

def execute(self, context: Any) -> None:
Expand Down