In [0]:
catalog_name=dbutils.widgets.get("CATALOG_NAME")
schema_name=dbutils.widgets.get("SCHEMA_NAME")
spark.sql(f"use catalog {catalog_name}")
spark.sql(f"use {schema_name}")

DataFrame[]

In [0]:
%sql
select current_catalog();

current_catalog()
dkushari_uc


In [0]:
%sql
select current_database();

current_database()
fgac


### Encrypter Function

In [0]:
%sql
CREATE OR REPLACE FUNCTION encrypt(query_text STRING) RETURNS STRING
COMMENT 'Registering Encrypt Function'
RETURN SELECT base64(aes_encrypt(query_text, secret('dkushari-scope', 'key'),'ECB')) AS ColumnName;


### Decrypter Function

In [0]:
%sql
CREATE OR REPLACE FUNCTION decrypt(enc_data STRING) RETURNS STRING
COMMENT 'Registering Decrypt Function'
RETURN SELECT aes_decrypt(unbase64(enc_data), secret('dkushari-scope', 'key'),'ECB') AS ColumnName;

### Returns Dynamic SQL statement based on predicate and Delta table version

In [0]:
%sql
CREATE OR REPLACE FUNCTION sql_query_fn(catalog_name string, schema_name string, table_name string, predicate string, version_number int)
RETURNS STRING
LANGUAGE PYTHON
AS
$$
if version_number is not None:
  table_ver = f"version as of {version_number}"
  sql_query_text = f"select * from {catalog_name}.{schema_name}.{table_name} {table_ver} where {predicate}"
  print(sql_query_text)
else:
  sql_query_text = f"select * from {catalog_name}.{schema_name}.{table_name} where {predicate}"

return sql_query_text
$$;


### Returns encrypted SQL Statement

In [0]:
%sql
CREATE OR REPLACE FUNCTION sql_query_generate(catalog_name string, schema_name string, table_name string, predicate string, version_number int)
RETURNS STRING
LANGUAGE SQL
RETURN SELECT encrypt(sql_query_fn(catalog_name, schema_name, table_name, predicate, version_number));

In [0]:
%sql
drop table if exists predicate_mapping;
create table if not exists predicate_mapping (catalog_name string, schema_name string, table_name string, predicate STRING);

In [0]:
%sql
insert into predicate_mapping values ("dkushari_uc", "fgac", "customer", """ CASE
    WHEN is_account_group_member('ANALYST_ES') THEN TRUE
    ELSE region='West'
  and age > 35 
  AND EXISTS (
    SELECT
      1
    FROM
      dkushari_uc.fgac.valid_users v
    WHERE
      v.username = CURRENT_USER()
  ) end;""")

num_affected_rows,num_inserted_rows
1,1


In [0]:
%sql
select * from predicate_mapping;

catalog_name,schema_name,table_name,predicate
dkushari_uc,fgac,customer,CASE  WHEN is_account_group_member('ANALYST_ES') THEN TRUE  ELSE region='West'  and age > 35 AND EXISTS (  SELECT  1  FROM  dkushari_uc.fgac.valid_users v  WHERE  v.username = CURRENT_USER()  ) end;


### Execute the dynamic SQL Statement

In [0]:
%sql
DECLARE OR REPLACE CATALOG_NAME STRING ;
DECLARE OR REPLACE SCHEMA_NAME STRING ;
DECLARE OR REPLACE TABLE_NAME STRING ;
DECLARE OR REPLACE VERSION_NUMBER INT DEFAULT 0;  
SET VAR VERSION_NUMBER=:VERSION_NUMBER;
SET VAR CATALOG_NAME=:CATALOG_NAME;
SET VAR SCHEMA_NAME=:SCHEMA_NAME;
SET VAR TABLE_NAME=:TABLE_NAME;
VALUES(VERSION_NUMBER);
DECLARE OR REPLACE PREDICATE STRING DEFAULT NULL ;
SET VAR PREDICATE=(select predicate from predicate_mapping where catalog_name = CATALOG_NAME and schema_name = SCHEMA_NAME and table_name=TABLE_NAME);
VALUES (PREDICATE);
DECLARE OR REPLACE DYNAMIC_GENERATE_SQL STRING DEFAULT "SELECT 1";
VALUES (DYNAMIC_GENERATE_SQL);
SET VAR DYNAMIC_GENERATE_SQL=(select sql_query_generate(CATALOG_NAME, SCHEMA_NAME, TABLE_NAME, PREDICATE, VERSION_NUMBER));
VALUES (DYNAMIC_GENERATE_SQL);
DECLARE OR REPLACE DECRYPT_SQL STRING DEFAULT "SELECT 1";
SET VAR DECRYPT_SQL = (SELECT decrypt(DYNAMIC_GENERATE_SQL));
values (DECRYPT_SQL);
EXECUTE IMMEDIATE DECRYPT_SQL;

record_id,first_name,last_name,date_of_birth,age,sex,address,ssn,region
18,Alice,Williams,1980-01-01,43,M,123 Main St,222-22-2222,West
19,Doe,Johnson,1980-01-01,43,F,101 Oak St,333-33-3333,West
27,Doe,Williams,1985-01-01,38,M,123 Main St,111-11-1111,West
35,Jane,Johnson,1980-01-01,43,M,123 Main St,222-22-2222,West
45,Doe,Williams,1985-01-01,38,M,456 Elm St,333-33-3333,West
45,Doe,Williams,1985-01-01,38,M,456 Elm St,333-33-3333,West


### Use the dataframe downstream

In [0]:
_sqldf.createOrReplaceTempView("CUST_DATA")

In [0]:
spark.sql("select * from CUST_DATA").display()

record_id,first_name,last_name,date_of_birth,age,sex,address,ssn,region
18,Alice,Williams,1980-01-01,43,M,123 Main St,222-22-2222,West
19,Doe,Johnson,1980-01-01,43,F,101 Oak St,333-33-3333,West
27,Doe,Williams,1985-01-01,38,M,123 Main St,111-11-1111,West
35,Jane,Johnson,1980-01-01,43,M,123 Main St,222-22-2222,West
45,Doe,Williams,1985-01-01,38,M,456 Elm St,333-33-3333,West
45,Doe,Williams,1985-01-01,38,M,456 Elm St,333-33-3333,West
