In [None]:
import sys;
import json;

from snowflake.snowpark.context import get_active_session
from snowflake.snowpark import Session
from snowflake.snowpark.functions import hash, sha2, lit, concat_ws, col, current_timestamp, array_construct
from snowflake.snowpark.types import *

session = get_active_session()

# Get JSON input parameters
# the result is a Python dictionary:
y = json.loads(sys.argv[0])

# Add quotes "'" around column name used in Snowpark query.  Don't Add "'" for SQL query

src_schema ='BRONZE_LAYER'# Absolute name (database.schema)
target_schema = 'SILVER_LAYER'
src_table = y["bronze_table"]
target_table = y["silver_table"]
technicalKey_name = y["silver_technicalKey_name"]
functionalKey_name = y["silver_functionalKey_name"]
ruleTechnicalKey = y["silver_ruleTechnicalKey"]
ruleFunctionalKey = y["silver_ruleFunctionalKey"]

print("src_schema param: " + src_schema)
print("src_table param: " + src_table)
print("target_schema param: " + target_schema)
print("target_table param: " + target_table)

print("technicalKey_name param: " + technicalKey_name)
print("functionalKey_name param: " + functionalKey_name)

print("Technical RULE param: " + ruleTechnicalKey)
print("Functional RULE param: " + ruleFunctionalKey)



In [None]:
select *, {{ ruleTechnicalKey }} as {{ technicalKey_name }}, {{ ruleFunctionalKey }} as {{ functionalKey_name }}, CURRENT_TIMESTAMP() AS SYS_DATE_CREATE 
from {{ src_table }};

In [None]:
res_table_df = build_table_with_new_fields.to_df()

res_table_df.write.mode("overwrite").save_as_table(target_table)

In [None]:
SELECT EXECUTION_STATUS as status,ERROR_MESSAGE  as first_error,  ROWS_PRODUCED as rows_parsed, ROWS_INSERTED as rows_loaded
FROM table(information_schema.QUERY_HISTORY_BY_SESSION())
WHERE QUERY_ID = LAST_QUERY_ID();

In [None]:
insert_result_df = get_insert_results.to_df()

monitoring_final_df = insert_result_df.with_column("src_table",array_construct(lit(src_schema + "."+src_table))).with_column("ingestion_time",current_timestamp()).with_column("layer",lit(target_schema))

#column_order="name" allows not to specify all the column name in the dataframe
monitoring_final_df.write.mode("append").save_as_table("MONITORING_LAYER.MONITORING_INGEST",  column_order="name")

In [None]:
/* the insert query is a success*/

EXECUTE IMMEDIATE $$
DECLARE
  status_number NUMBER := (SELECT count(*) from {{ get_insert_results }} where status != 'SUCCESS') ;
  LOAD_NOT_COMPLETE_EXCEPTION EXCEPTION (-20002, 'TABLE NOT LOADED CORRECTLY, CHECK MONITORING TABLE PLZ!');
BEGIN
  IF (status_number != 0) THEN
    RAISE LOAD_NOT_COMPLETE_EXCEPTION;
  END IF;
EXCEPTION
  WHEN OTHER THEN
    RAISE; -- Raise the same exception that you are handling.
END;
$$;