Skip to content

Commit

Permalink
New SnowflakeQueryFromFile Task and SQLServerExecute Fix (#4363)
Browse files Browse the repository at this point in the history
* Adding SnowflakeFromFile task

* SnowflakeFromFile task

* black formatting

* Typo

* Rearranging

* changes YAML file

* Fixed missing import

* formatting

* adding temporary file

* moved arguments to runtime

* added fixture for sql file

* cleaning up fixture and mocks

* cleaned up mysql task and allowed runtime configurability

* mysql run configurations not applying

* removing the or statements

* Update tests/tasks/snowflake/test_snowflake.py

Co-authored-by: Zach Angell <42625717+zangell44@users.noreply.github.com>

* typos

* Typos

* moving file reading inside connection

* testing connection was made

* Update src/prefect/tasks/snowflake/snowflake.py

Co-authored-by: Zach Angell <42625717+zangell44@users.noreply.github.com>

* added warehouse in kwargs

* typo in docstring

* removing mysql stuff from this PR

* sql server test for kwargs

* linter

* Force docs rebuild

* used finally block

* remove entire except block

Co-authored-by: Zach Angell <42625717+zangell44@users.noreply.github.com>
  • Loading branch information
kvnkho and zangell44 committed Aug 30, 2021
1 parent 9e65a35 commit c720962
Show file tree
Hide file tree
Showing 6 changed files with 466 additions and 28 deletions.
2 changes: 2 additions & 0 deletions changes/pr4363.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
task:
- "Added SnowflakeQueriesFromFile Task [#4363](https://github.com/PrefectHQ/prefect/pull/4363)"
7 changes: 5 additions & 2 deletions src/prefect/tasks/snowflake/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
"""

# Import the Snowflake tasks eagerly so a missing optional dependency is
# reported with an actionable message instead of a bare ImportError.
try:
    from prefect.tasks.snowflake.snowflake import (
        SnowflakeQuery,
        SnowflakeQueriesFromFile,
    )
except ImportError as err:
    # `err` must be bound here: the `raise ... from err` below chains the
    # original failure, and without the binding it would raise NameError.
    raise ImportError(
        'Using `prefect.tasks.snowflake` requires Prefect to be installed with the "snowflake" extra.'
    ) from err
249 changes: 227 additions & 22 deletions src/prefect/tasks/snowflake/snowflake.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from pathlib import Path
import snowflake.connector as sf
from snowflake.connector.cursor import SnowflakeCursor

Expand All @@ -7,14 +8,14 @@

class SnowflakeQuery(Task):
"""
Task for executing a query against a snowflake database.
Task for executing a query against a Snowflake database.
Args:
- account (str): snowflake account name, see snowflake connector
package documentation for details
- user (str): user name used to authenticate
- password (str, optional): password used to authenticate.
password or private_lkey must be present
password or private_key must be present
- private_key (bytes, optional): pem to authenticate.
password or private_key must be present
- database (str, optional): name of the default database to use
Expand All @@ -23,17 +24,19 @@ class SnowflakeQuery(Task):
- warehouse (str, optional): name of the default warehouse to use
- query (str, optional): query to execute against database
- data (tuple, optional): values to use in query, must be specified using placeholder
is query string
in query string
- autocommit (bool, optional): set to True to autocommit, defaults to None, which
takes snowflake AUTOCOMMIT parameter
- cursor_type (SnowflakeCursor, optional): specify the type of database
cursor to use for the query, defaults to SnowflakeCursor
- **kwargs (dict, optional): additional keyword arguments to pass to the
Task constructor
"""

def __init__(
self,
account: str,
user: str,
account: str = None,
user: str = None,
password: str = None,
private_key: bytes = None,
database: str = None,
Expand All @@ -43,24 +46,47 @@ def __init__(
query: str = None,
data: tuple = None,
autocommit: bool = None,
cursor_type: SnowflakeCursor = SnowflakeCursor,
**kwargs
):
self.account = account
self.user = user
self.password = password
self.private_key = private_key
self.database = database
self.schema = schema
self.role = role
self.warehouse = warehouse
self.query = query
self.data = data
self.autocommit = autocommit
self.private_key = private_key
self.cursor_type = cursor_type
super().__init__(**kwargs)

@defaults_from_attrs("query", "data", "autocommit")
@defaults_from_attrs(
"account",
"user",
"password",
"private_key",
"database",
"schema",
"role",
"warehouse",
"query",
"data",
"autocommit",
"cursor_type",
)
def run(
self,
account: str = None,
user: str = None,
password: str = None,
private_key: bytes = None,
database: str = None,
schema: str = None,
role: str = None,
warehouse: str = None,
query: str = None,
data: tuple = None,
autocommit: bool = None,
Expand All @@ -70,37 +96,53 @@ def run(
Task run method. Executes a query against snowflake database.
Args:
- account (str, optional): snowflake account name, see snowflake connector
package documentation for details
- user (str, optional): user name used to authenticate
- password (str, optional): password used to authenticate.
password or private_key must be present
- private_key (bytes, optional): pem to authenticate.
password or private_key must be present
- database (str, optional): name of the default database to use
- schema (str, optional): name of the default schema to use
- role (str, optional): name of the default role to use
- warehouse (str, optional): name of the default warehouse to use
- query (str, optional): query to execute against database
- data (tuple, optional): values to use in query, must be specified using
placeholder is query string
- autocommit (bool, optional): set to True to autocommit, defaults to None
which takes the snowflake AUTOCOMMIT parameter
- data (tuple, optional): values to use in query, must be specified using placeholder
in query string
- autocommit (bool, optional): set to True to autocommit, defaults to None, which
takes snowflake AUTOCOMMIT parameter
- cursor_type (SnowflakeCursor, optional): specify the type of database
cursor to use for the query, defaults to SnowflakeCursor
Returns:
- None
- List[List]: output of cursor.fetchall()
Raises:
- ValueError: if query parameter is None or a blank string
- ValueError: if a required parameter is not supplied
- DatabaseError: if exception occurs when executing the query
"""
if not account:
raise ValueError("An account must be provided")
if not user:
raise ValueError("A user must be provided")
if not query:
raise ValueError("A query string must be provided")

# build the connection parameter dictionary
# we will remove `None` values next
connect_params = {
"account": self.account,
"user": self.user,
"password": self.password,
"private_key": self.private_key,
"database": self.database,
"schema": self.schema,
"role": self.role,
"warehouse": self.warehouse,
"autocommit": self.autocommit,
"account": account,
"user": user,
"password": password,
"private_key": private_key,
"database": database,
"schema": schema,
"role": role,
"warehouse": warehouse,
"autocommit": autocommit,
}

# filter out unset values
connect_params = {
param: value
Expand All @@ -123,3 +165,166 @@ def run(
except Exception as error:
conn.close()
raise error


class SnowflakeQueriesFromFile(Task):
    """
    Task for executing queries loaded from a file against a Snowflake database.
    Returns a list containing the results of the queries.

    Note that using execute_string() is vulnerable to SQL injection.

    Args:
        - account (str, optional): snowflake account name, see snowflake connector
            package documentation for details
        - user (str, optional): user name used to authenticate
        - password (str, optional): password used to authenticate.
            password or private_key must be present
        - private_key (bytes, optional): pem to authenticate.
            password or private_key must be present
        - database (str, optional): name of the default database to use
        - schema (str, optional): name of the default schema to use
        - role (str, optional): name of the default role to use
        - warehouse (str, optional): name of the default warehouse to use
        - file_path (str, optional): file path to load query from
        - autocommit (bool, optional): set to True to autocommit, defaults to None, which
            takes snowflake AUTOCOMMIT parameter
        - cursor_type (SnowflakeCursor, optional): specify the type of database
            cursor to use for the query, defaults to SnowflakeCursor
        - **kwargs (dict, optional): additional keyword arguments to pass to the
            Task constructor
    """

    def __init__(
        self,
        account: str = None,
        user: str = None,
        password: str = None,
        private_key: bytes = None,
        database: str = None,
        schema: str = None,
        role: str = None,
        warehouse: str = None,
        file_path: str = None,
        autocommit: bool = None,
        cursor_type: SnowflakeCursor = SnowflakeCursor,
        **kwargs
    ):
        self.account = account
        self.user = user
        self.password = password
        self.private_key = private_key
        self.database = database
        self.schema = schema
        self.role = role
        self.warehouse = warehouse
        self.file_path = file_path
        self.autocommit = autocommit
        self.cursor_type = cursor_type
        super().__init__(**kwargs)

    @defaults_from_attrs(
        "account",
        "user",
        "password",
        "private_key",
        "database",
        "schema",
        "role",
        "warehouse",
        "file_path",
        "autocommit",
        "cursor_type",
    )
    def run(
        self,
        account: str = None,
        user: str = None,
        password: str = None,
        private_key: bytes = None,
        database: str = None,
        schema: str = None,
        role: str = None,
        warehouse: str = None,
        file_path: str = None,
        autocommit: bool = None,
        cursor_type: SnowflakeCursor = SnowflakeCursor,
    ):
        """
        Task run method. Executes the queries found in a file against a
        snowflake database.

        Args:
            - account (str, optional): snowflake account name, see snowflake connector
                package documentation for details
            - user (str, optional): user name used to authenticate
            - password (str, optional): password used to authenticate.
                password or private_key must be present
            - private_key (bytes, optional): pem to authenticate.
                password or private_key must be present
            - database (str, optional): name of the default database to use
            - schema (str, optional): name of the default schema to use
            - role (str, optional): name of the default role to use
            - warehouse (str, optional): name of the default warehouse to use
            - file_path (str, optional): file path to load query from
            - autocommit (bool, optional): set to True to autocommit, defaults to None, which
                takes snowflake AUTOCOMMIT parameter
            - cursor_type (SnowflakeCursor, optional): specify the type of database
                cursor to use for the query, defaults to SnowflakeCursor

        Returns:
            - List[List]: containing the results of the different queries executed

        Raises:
            - ValueError: if account, user or file_path is not supplied
            - DatabaseError: if exception occurs when executing the query
            - FileNotFoundError: if File does not exist
        """
        # account / user / file_path may come from either the constructor or the
        # run call, so they are only validated here. Falsy checks (rather than
        # `is None`) also reject empty strings, matching SnowflakeQuery.run.
        if not account:
            raise ValueError("An account must be provided")
        if not user:
            raise ValueError("A user must be provided")
        if not file_path:
            raise ValueError("A file path must be provided")

        # build the connection parameter dictionary
        # we will remove `None` values next
        connect_params = {
            "account": account,
            "user": user,
            "password": password,
            "private_key": private_key,
            "database": database,
            "schema": schema,
            "role": role,
            "warehouse": warehouse,
            "autocommit": autocommit,
        }

        # filter out unset values
        connect_params = {
            param: value
            for (param, value) in connect_params.items()
            if value is not None
        }

        # connect to database
        conn = sf.connect(**connect_params)

        # try to execute query
        # context manager automatically rolls back failed transactions
        try:
            # load query from file; a missing file raises FileNotFoundError and
            # the `finally` below still closes the connection
            query = Path(file_path).read_text()

            with conn:
                # execute_string returns one cursor per statement in the file
                cursor_list = conn.execute_string(query, cursor_class=cursor_type)

                # return fetch for each cursor
                return [cursor.fetchall() for cursor in cursor_list]

        # ensure connection is closed
        finally:
            conn.close()
2 changes: 1 addition & 1 deletion src/prefect/tasks/sql_server/sql_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def run(
# context manager automatically rolls back failed transactions
try:
with cnxn.cursor() as cursor:
executed = cursor.execute(query=query, vars=data)
executed = cursor.execute(query, data)
if commit:
cnxn.commit()
else:
Expand Down
Loading

0 comments on commit c720962

Please sign in to comment.