Update SnowflakeOperator and UcToSnowflakeOperator to accept sql_file and custom dbx sql #116

Closed · wants to merge 30 commits into from

Changes from all commits · 30 commits
d98cc7e
Add Optional logic to read query from sql file provided
bsangars May 15, 2024
92a52e0
Add Optional logic to read query from sql file provided
bsangars May 15, 2024
e1e30bb
Add option to include custom query in extracting data from UC
bsangars May 15, 2024
f5b5634
Add option to include custom query in extracting data from UC
bsangars May 15, 2024
87d8074
Add option to include custom query in extracting data from UC
bsangars May 15, 2024
b01163c
Add option to include custom query in extracting data from UC
bsangars May 15, 2024
67ff375
update parameters in UctoSnowflake
bsangars May 15, 2024
7d934e0
updated SnowflakeOperator init
bsangars May 15, 2024
dc7990f
updated SnowflakeOperator init
bsangars May 15, 2024
952b835
Add write_mode to better control custom sql
bsangars May 15, 2024
7ffcf61
Add write_mode to better control custom sql
bsangars May 15, 2024
adc17cc
Change manadatory keys to expect one of either dbx_sql, or dbx_catalo…
bsangars May 15, 2024
a50fda1
Update Snowflake and UctoSnowflakeOperator
bsangars May 15, 2024
4ee65da
Update documentation for Snowflake and UcToSnowflakeOperator usage
bsangars May 15, 2024
61c8b5c
Updated the usage documentation
bsangars May 15, 2024
1c93204
Updated the usage documentation
bsangars May 15, 2024
155152c
updated usage documentation
bsangars May 21, 2024
d3d910d
Add function to get brickflow root to utils and respective tests
bsangars May 22, 2024
ede821b
adjusted usage to account for brickflow root
bsangars May 22, 2024
428d117
add project root
bsangars May 22, 2024
78471be
add project root function
bsangars May 22, 2024
d736f6f
add project root function with file name parameter
bsangars May 22, 2024
8dc8c14
add project root function with file name parameter
bsangars May 22, 2024
8ebfed9
Add dynamic file path in getting brickflow root
bsangars May 23, 2024
3720a5f
add file_name parameter
bsangars May 23, 2024
4c9c928
add file_name parameter
bsangars May 23, 2024
32c42ed
prepend brickflow path to sql file
bsangars May 23, 2024
a7429e1
prepend brickflow path to sql file
bsangars May 23, 2024
aaa37df
prepend brickflow path to sql file
bsangars May 23, 2024
9c3eea7
updated documentation for tasks
bsangars May 23, 2024
23 changes: 23 additions & 0 deletions brickflow/engine/utils.py
@@ -1,5 +1,8 @@
import functools
from typing import Callable, Type, List, Iterator, Union
import inspect
import pathlib
import os

from pydantic import SecretStr
from databricks.sdk import WorkspaceClient
@@ -75,3 +78,23 @@ def get_job_id(
ctx.log.info("An error occurred: %s", e)

return None


def get_bf_project_root() -> pathlib.Path:
"""Returns the root directory of the current Brickflow project

Returns:
pathlib.Path: Brickflow project root directory
"""
try:
_file_name = os.getcwd()
_project_root = pathlib.Path(_file_name).resolve().parents[0]
ctx.log.info("Setting Brickflow project root as %s", _project_root)
return _project_root
except Exception as e:
ctx.log.info("An error occurred: %s", e)
raise e
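
For orientation, a minimal sketch of how this helper might be used to resolve a file against the project root; the `src/sql/sample.sql` path is purely illustrative (it mirrors the sample added later in this PR):

```python
# Illustrative usage only; assumes the brickflow package layout from this PR.
from brickflow.engine.utils import get_bf_project_root

# The helper resolves the parent of the current working directory as the root.
root = get_bf_project_root()

# Hypothetical path, relative to the project root (see docs/tasks.md below).
sql_path = root / "src/sql/sample.sql"
print(sql_path.read_text(encoding="utf-8"))
```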

120 changes: 90 additions & 30 deletions brickflow_plugins/databricks/uc_to_snowflake_operator.py
@@ -1,5 +1,6 @@
import logging as log

from pathlib import Path
from brickflow.engine.utils import get_bf_project_root
try:
import snowflake.connector
except ImportError:
@@ -60,18 +61,32 @@ class SnowflakeOperator:

above code snippet expects the data as follows
databricks_secrets_psc contains username, password, account, warehouse, database and role keys with snowflake values
query_string : required parameter with queries separeted by semicolon(;)
query_string : optional parameter with queries separated by semicolon(;)
sql_file : optional parameter with the file path (relative to the brickflow project root) of a .sql file
parameters: optional parameter dictionary with key value pairs to substitute in the query
"""

def __init__(self, secret_scope, query_string, parameters={}, *args, **kwargs):
def __init__(
self,
secret_scope,
query_string=None,
sql_file=None,
parameters={},
*args,
**kwargs,
):
self.cur = None
self.query = None
self.sql_file = None
self.secret_scope = secret_scope
self.log = log
self.query = query_string
self.parameters = parameters
self.sql_file = sql_file
self.brickflow_root = get_bf_project_root()

if query_string is not None and sql_file is not None:
raise ValueError("Cannot specify both sql_file and query_string !")
if not self.secret_scope:
raise ValueError(
"Must provide reference to Snowflake connection in databricks secretes !"
@@ -152,6 +167,27 @@ def get_snowflake_connection(self):

return con

def read_sql_file(self):
"""
logic to read the sql file and return the query string
"""
try:
if self.sql_file is not None:
sql_loc = Path(self.sql_file)
sql_path = self.brickflow_root / sql_loc
if not sql_path.exists():
raise FileNotFoundError(
f"Unable to locate specified {sql_path.as_posix()}"
)
self.query = sql_path.read_text(encoding="utf-8")
return self
if self.query is None or len(self.query) == 0:
raise ValueError("SQL Query is empty")
return self
except Exception as e:
self.log.error("Failed to read the sql file: %s", e)
raise ValueError("Failed to read the sql file") from e

def get_cursor(self):
"""
logic to create a cursor for a successful snowflake connection to execute queries
@@ -219,12 +255,14 @@ def execute(self):
"""
logic that triggers the flow of events
"""
if self.sql_file is not None:
self.read_sql_file()
self.log.info("Executing SQL Query: " + str(self.query))
self.get_cursor()
query_string = str(self.query).strip()
# Run the query against SnowFlake
try:
self.snowflake_query_exec(self.cur, self.database, query_string)
self.snowflake_query_exec(self.cur, self.database, self.query)
except:
self.log.error("failed to execute")
finally:
@@ -246,7 +284,7 @@ class UcToSnowflakeOperator(SnowflakeOperator):

Example Usage in your brickflow task
UcToSnowflakeOperator(
secret_scope=databricks_secrets_psc
secret_scope=databricks_secrets_psc,
parameters= uc_parameters
)

@@ -267,7 +305,8 @@ class UcToSnowflakeOperator(SnowflakeOperator):
parameters = {'load_type':'incremental','dbx_catalog':'sample_catalog','dbx_database':'sample_schema',
'dbx_table':'sf_operator_1', 'sf_schema':'stage','sf_table':'SF_OPERATOR_1',
'sf_grantee_roles':'downstream_read_role', 'incremental_filter':"dt='2023-10-22'",
'sf_cluster_keys':''}
'sf_cluster_keys':'',
'dbx_sql':"select * from sample_catalog.sample_schema.sf_operator_1 where dt='2023-10-22'"}

in the parameters dictionary we have mandatory keys as follows
load_type(required): incremental/full
@@ -280,12 +319,20 @@ class UcToSnowflakeOperator(SnowflakeOperator):
incremental_filter (optional): mandatory parameter for incremental load type to delete existing data in snowflake table
dbx_data_filter (optional): parameter to filter databricks table if different from snowflake filter
sf_cluster_keys (optional): list of keys to cluster the data in snowflake
dbx_sql (optional): sql query to extract data from unity catalog

One of either dbx_sql or the combination of (dbx_catalog, dbx_database, dbx_table) must be provided
If custom sql is provided in dbx_sql, for an incremental load make sure to set write_mode or adjust the incremental filter in the Operator to align with the custom sql;
otherwise duplicates could appear in the Snowflake table

"""

def __init__(self, secret_scope, parameters={}, *args, **kwargs):
SnowflakeOperator.__init__(self, secret_scope, "", parameters)
def __init__(self, secret_scope, parameters={}, write_mode=None, *args, **kwargs):
super().__init__(secret_scope, *args, **kwargs)
self.parameters = parameters
self.dbx_data_filter = self.parameters.get("dbx_data_filter") or None
self.write_mode = None
self.dbx_sql = self.parameters.get("dbx_sql") or None
self.write_mode = write_mode
"""
self.authenticator = None
try:
@@ -349,12 +396,17 @@ def validate_input_params(self):
# Setup the mandatory params for snowflake load
mandatory_keys = (
"load_type",
"dbx_catalog",
"dbx_database",
"dbx_table",
"sf_schema",
"sf_table",
)
if self.dbx_sql is not None:
mandatory_keys = mandatory_keys + ("dbx_sql",)
else:
mandatory_keys = mandatory_keys + (
"dbx_catalog",
"dbx_database",
"dbx_table",
)
if not all(key in self.parameters for key in mandatory_keys):
self.log.info(
"Mandatory keys for UcToSnowflakeOperator(parameters): %s\n"
@@ -414,24 +466,31 @@ def apply_grants(self):
self.submit_job_snowflake(self.sf_post_grants_sql)

def extract_source(self):
if self.parameters["load_type"] == "incremental":
self.dbx_data_filter = (
self.parameters.get("dbx_data_filter")
or self.parameters.get("incremental_filter")
or "1=1"
if self.dbx_sql is not None:
self.log.info(
f"Executing Custom sql to extract data from Unity catalog. Query -: {self.dbx_sql}"
)
df = ctx.spark.sql(self.dbx_sql)
return df
else:
self.dbx_data_filter = self.parameters.get("dbx_data_filter") or "1=1"

df = ctx.spark.sql(
"""select * from {}.{}.{} where {}""".format(
self.parameters["dbx_catalog"],
self.parameters["dbx_database"],
self.parameters["dbx_table"],
self.dbx_data_filter,
if self.parameters["load_type"] == "incremental":
self.dbx_data_filter = (
self.parameters.get("dbx_data_filter")
or self.parameters.get("incremental_filter")
or "1=1"
)
else:
self.dbx_data_filter = self.parameters.get("dbx_data_filter") or "1=1"

df = ctx.spark.sql(
"""select * from {}.{}.{} where {}""".format(
self.parameters["dbx_catalog"],
self.parameters["dbx_database"],
self.parameters["dbx_table"],
self.dbx_data_filter,
)
)
)
return df
return df

def load_snowflake(self, source_df, target_table):
sf_package = "net.snowflake.spark.snowflake"
@@ -480,9 +539,10 @@ def submit_job_compute(self):
else self.parameters["sf_table"]
)
source_data = self.extract_source()
self.write_mode = (
"Overwrite" if self.parameters["load_type"] == "full" else "Append"
)
if self.write_mode is None:
self.write_mode = (
"Overwrite" if self.parameters["load_type"] == "full" else "Append"
)
self.sf_cluster_keys = (
[]
if "sf_cluster_keys" not in self.parameters.keys()
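
To make the new write_mode override concrete, here is a small standalone sketch of the precedence applied in submit_job_compute above; the resolve_write_mode helper is hypothetical and not part of the operator:

```python
# Hypothetical helper mirroring the write_mode logic in submit_job_compute:
# an explicit write_mode wins; otherwise it is derived from load_type.
from typing import Optional


def resolve_write_mode(load_type: str, write_mode: Optional[str] = None) -> str:
    if write_mode is not None:
        return write_mode
    return "Overwrite" if load_type == "full" else "Append"


assert resolve_write_mode("full") == "Overwrite"
assert resolve_write_mode("incremental") == "Append"
assert resolve_write_mode("incremental", write_mode="Overwrite") == "Overwrite"
```
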
34 changes: 26 additions & 8 deletions docs/tasks.md
@@ -436,11 +446,14 @@ As databricks secrets is a key value store, code expects the secret scope to con
    role : role to which the user has write access for ex: sample_write_role

SnowflakeOperator can accept the following as inputs
    secret_scope (required): databricks secret scope identifier
    query_string (required): queries separated by semicolon

    secret_scope (required) : databricks secret scope identifier
    query_string (required) : queries separated by semicolon
    sql_file (optional) : path to the sql file
    parameters (optional) : dictionary with variables that can be used to substitute in queries

```python title="snowflake_operator"
Only one of query_string or sql_file should be passed to the operator.
```python title="snowflake_operator_using_sql_queries"
from brickflow_plugins import SnowflakeOperator

wf = Workflow(...)
@@ -453,8 +456,20 @@ def run_snowflake_queries(*args):
parameters = {"schema":"test_schema","table":"sample_table","filter_condition":"col='something'"}
)
sf_query_run.execute()

```

```python title="snowflake_operator_using_sql_files"
# SQL file path is relative to the brickflow project root (e.g. root/products/{product_name})
@wf.task
def run_snowflake_files(*args):
sf_file_run = SnowflakeOperator(
secret_scope="sample_scope",
sql_file="src/sql/sample.sql",
parameters={"database": "sample_db"},
)
sf_file_run.execute()
```
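
Passing both query_string and sql_file at once is rejected by the new constructor check; a short illustrative snippet (the secret scope name is a placeholder):

```python title="snowflake_operator_invalid_usage"
from brickflow_plugins import SnowflakeOperator

# Supplying both query_string and sql_file raises a ValueError in __init__.
try:
    SnowflakeOperator(
        secret_scope="sample_scope",
        query_string="select 1",
        sql_file="src/sql/sample.sql",
    )
except ValueError as err:
    print(err)  # Cannot specify both sql_file and query_string !
```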

#### UC to Snowflake Operator

@@ -469,18 +484,20 @@ As databricks secrets is a key value store, code expects the secret scope to con
    role : role to which the user has write access for ex: sample_write_role

UcToSnowflakeOperator expects the following as inputs to copy data in parameters
Either dbx_sql or the combination of (dbx_catalog, dbx_database, dbx_table) needs to be provided
    load_type (required): type of data load , acceptable values full or incremental
    dbx_catalog (required) : name of the databricks catalog in which object resides
    dbx_database (required): name of the databricks schema in which object is available
    dbx_table (required) : name of the databricks object we want to copy to snowflake
    dbx_catalog (optional) : name of the databricks catalog in which object resides
    dbx_database (optional): name of the databricks schema in which object is available
    dbx_table (optional) : name of the databricks object we want to copy to snowflake
    dbx_sql (optional) : Custom sql to extract data from databricks Unity Catalog
    sf_database (optional) : name of the snowflake database if different from the one in secret_scope
    sf_schema (required): name of the snowflake schema in which we want to copy the data
    sf_table (required) : name of the snowflake object to which we want to copy from databricks
    incremental_filter (required for incremental mode) : condition to manage data before writing to snowflake
    dbx_data_filter (optional): filter condition on databricks source for full or incremental (if different from incremental_filter)
    sf_grantee_roles (optional) : snowflake roles to which we want to grant select/read access
    sf_cluster_keys (optional) : list of keys we want to cluster our snowflake table.
    write_mode (optional) : write mode for writing into the snowflake table (overwrite, append, etc.)

```python title="uc_to_snowflake_operator"
from brickflow_plugins import UcToSnowflakeOperator

@@ -490,10 +507,11 @@ wf = Workflow(...)
def run_snowflake_queries(*args):
uc_to_sf_copy = UcToSnowflakeOperator(
secret_scope = "your_databricks secrets scope name",
write_mode="overwrite",
parameters = {'load_type':'incremental','dbx_catalog':'sample_catalog','dbx_database':'sample_schema',
'dbx_table':'sf_operator_1', 'sf_schema':'stage','sf_table':'SF_OPERATOR_1',
'sf_grantee_roles':'downstream_read_role', 'incremental_filter':"dt='2023-10-22'",
'sf_cluster_keys':''}
'sf_cluster_keys':'', 'dbx_sql':'Custom sql query to read data from UC'}
)
uc_to_sf_copy.execute()
```
2 changes: 2 additions & 0 deletions examples/brickflow_examples/src/sql/sample.sql
@@ -0,0 +1,2 @@
create or replace table $database.$schema.sample as
select * from $database.$schema.source
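
The sample file uses $database/$schema placeholders; the substitution mechanism itself is not shown in this diff, but as a rough illustration, string.Template-style substitution from the parameters dict might look like this (an assumption, not necessarily the operator's actual implementation):

```python
# Illustrative only: filling $-style placeholders from a parameters dict.
# The operator's real substitution logic is not shown in this PR and may differ.
from string import Template

sql_text = (
    "create or replace table $database.$schema.sample as\n"
    "select * from $database.$schema.source"
)
query = Template(sql_text).safe_substitute({"database": "sample_db", "schema": "stage"})
print(query)
```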