Skip to content

Commit

Permalink
Add connection arguments in S3ToSnowflakeOperator (#12564)
Browse files Browse the repository at this point in the history
* Add connection arguments in S3ToSnowflakeOperator

* delete database

* add database

* indent

Co-authored-by: javier.lopez <javier.lopez@promocionesfarma.com>
  • Loading branch information
JavierLopezT and JavierLTPromofarma committed Jan 16, 2021
1 parent 6fb4f4b commit dbf7511
Showing 1 changed file with 44 additions and 5 deletions.
49 changes: 44 additions & 5 deletions airflow/providers/snowflake/transfers/s3_to_snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,37 @@ class S3ToSnowflakeOperator(BaseOperator):
:type s3_keys: list
:param table: reference to a specific table in snowflake database
:type table: str
:param stage: reference to a specific snowflake stage
:param schema: name of schema (will overwrite schema defined in
connection)
:type schema: str
:param stage: reference to a specific snowflake stage. If the stage's schema is not the same as the
table one, it must be specified
:type stage: str
:param file_format: reference to a specific file format
:type file_format: str
:param schema: reference to a specific schema in snowflake database
:type schema: str
:param warehouse: name of warehouse (will overwrite any warehouse
defined in the connection's extra JSON)
:type warehouse: str
:param database: reference to a specific database in Snowflake connection
:type database: str
:param columns_array: reference to a specific columns array in snowflake database
:type columns_array: list
:param snowflake_conn_id: reference to a specific snowflake database
:param snowflake_conn_id: reference to a specific snowflake connection
:type snowflake_conn_id: str
:param role: name of role (will overwrite any role defined in
connection's extra JSON)
:type role: str
:param authenticator: authenticator for Snowflake.
'snowflake' (default) to use the internal Snowflake authenticator
'externalbrowser' to authenticate using your web browser and
Okta, ADFS or any other SAML 2.0-compliant identify provider
(IdP) that has been defined for your account
'https://<your_okta_account_name>.okta.com' to authenticate
through native Okta.
:type authenticator: str
:param session_parameters: You can set session-level parameters at
the time you connect to Snowflake
:type session_parameters: dict
"""

@apply_defaults
Expand All @@ -58,22 +79,40 @@ def __init__(
file_format: str,
schema: str, # TODO: shouldn't be required, rely on session/user defaults
columns_array: Optional[list] = None,
warehouse: Optional[str] = None,
database: Optional[str] = None,
autocommit: bool = True,
snowflake_conn_id: str = 'snowflake_default',
role: Optional[str] = None,
authenticator: Optional[str] = None,
session_parameters: Optional[dict] = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.s3_keys = s3_keys
self.table = table
self.warehouse = warehouse
self.database = database
self.stage = stage
self.file_format = file_format
self.schema = schema
self.columns_array = columns_array
self.autocommit = autocommit
self.snowflake_conn_id = snowflake_conn_id
self.role = role
self.authenticator = authenticator
self.session_parameters = session_parameters

def execute(self, context: Any) -> None:
snowflake_hook = SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id)
snowflake_hook = SnowflakeHook(
snowflake_conn_id=self.snowflake_conn_id,
warehouse=self.warehouse,
database=self.database,
role=self.role,
schema=self.schema,
authenticator=self.authenticator,
session_parameters=self.session_parameters,
)

# Snowflake won't accept list of files it has to be tuple only.
# but in python tuple([1]) = (1,) => which is invalid for snowflake
Expand Down

0 comments on commit dbf7511

Please sign in to comment.