# Snowflake Metadata Refresh Setup

## Import Python Libraries

In [None]:
import snowflake.connector
import os
import sys
import pandas as pd
import numpy as np
import pandas as pd
import pyarrow as pa
# pd.set_option('max_columns', 40)

## Set Snowflake Variables

In [None]:
# Snowflake connection settings. The password is read from the BISNOWPASS
# environment variable (KeyError if it is not set) instead of being stored
# in the notebook.
snowflake_user = 'JMILLER'
snowflake_password = os.environ['BISNOWPASS']
snowflake_account = 'eh69371.east-us-2.azure'
snowflake_role = 'SYSADMIN'
snowflake_warehouse = 'COMPUTE_WH'

# Database and schema used as the session context for the metadata objects.
snowflake_database = 'UTIL_DB'
snowflake_schema = 'METADATA'

# Echo the settings in a single write; the password is always masked.
print('\n'.join([
    'Using Notebook Variables:',
    'snowflake_user: ' + snowflake_user,
    'snowflake_password: ' + '***************',
    'snowflake_account: ' + snowflake_account,
    'snowflake_role: ' + snowflake_role,
    'snowflake_warehouse: ' + snowflake_warehouse,
    'snowflake_database: ' + snowflake_database,
    'snowflake_schema: ' + snowflake_schema,
]))

## Set Notebook Variables

In [None]:
# Notebook variables: source/target objects and the column classification
# that the cells below use when generating SQL.
src_database = 'UTIL_DB'
tgt_database = 'UTIL_DB'
src_schema = 'INFORMATION_SCHEMA'
tgt_schema = 'METADATA'
src_table = 'DATABASES'
tgt_table = 'D_DATABASES'
added_dim_column_names_tag = 'standard_uc'
natural_key_columns = 'DATABASE_NAME'
type_2_columns = 'DATABASE_OWNER,RETENTION_TIME'
type_0_columns = ''


def qualify_table_name(database, schema, table):
    """Return the double-quoted, dot-separated name of a table.

    Args:
        database: Database name; when empty the database part is omitted.
        schema: Schema name.
        table: Table name.

    Returns:
        '"database"."schema"."table"', or '"schema"."table"' when
        ``database`` is empty.
    """
    parts = [schema, table]
    if database:
        parts.insert(0, database)
    return '.'.join('"' + part + '"' for part in parts)


# Both names go through the same helper. Previously the "no target database"
# branch overwrote src_table_full instead of setting tgt_table_full.
src_table_full = qualify_table_name(src_database, src_schema, src_table)
tgt_table_full = qualify_table_name(tgt_database, tgt_schema, tgt_table)

print('Using Notebook Variables:')
print('src_table_full: ' + src_table_full)
print('tgt_table_full: ' + tgt_table_full)
print('added_dim_column_names_tag: ' + added_dim_column_names_tag)
print('natural_key_columns: ' + natural_key_columns)
print('type_2_columns: ' + type_2_columns)
print('type_0_columns: ' + type_0_columns)


## Check for Required Values

In [None]:
# Each required setting paired with the message reported when it is empty.
# The first empty setting, in this order, stops the run through sys.exit.
required_checks = (
    (src_schema, "src_schema is required"),
    (tgt_schema, "tgt_schema is required"),
    (src_table, "src_table is required"),
    (tgt_table, "tgt_table is required"),
    (added_dim_column_names_tag, "added_dim_column_names_tag is required"),
    (natural_key_columns, "natural_key_columns is required"),
)

for required_value, failure_message in required_checks:
    if not required_value:
        sys.exit(failure_message)

## Establish Snowflake Connection

In [None]:
# Open the Snowflake session from the connection variables set earlier and
# keep the connection (ctx) and a cursor (cur) for the cells that follow.
connection_settings = {
    'user': snowflake_user,
    'password': snowflake_password,
    'account': snowflake_account,
    'role': snowflake_role,
    'warehouse': snowflake_warehouse,
}
ctx = snowflake.connector.connect(**connection_settings)
cur = ctx.cursor()

# Round-trip a trivial query to confirm the session works, then report the
# client value returned by CURRENT_CLIENT().
cur.execute("select CURRENT_CLIENT()")
one_row = cur.fetchone()
print('Snowflake Connection Successful')
print(one_row[0])

## Set Snowflake Database and Schema Context

In [None]:
# Point the session at the metadata database first, then at its schema.
# Each statement is printed before it is executed.
for context_kind, context_name in (
    ("database", snowflake_database),
    ("schema", snowflake_schema),
):
    sql = "use " + context_kind + " " + context_name + ";"
    print(sql)
    cur.execute(sql)

## Get names for supplemental dimension columns

In [None]:
# Look up the names of the five supplemental dimension columns (current
# flag, effective/expiration dates, insert/update dates) configured for the
# selected tag in ADDED_DIM_COLUMN_NAMES.
sql = """
SELECT ROW_IS_CURRENT, ROW_EFFECTIVE_DATE, ROW_EXPIRATION_DATE, ROW_INSERT_DATE, ROW_UPDATE_DATE 
  FROM ADDED_DIM_COLUMN_NAMES 
 WHERE ADDED_DIM_COLUMN_NAMES_TAG  = '""" + added_dim_column_names_tag + """' 
"""
print(sql)
cur.execute(sql)
one_row = cur.fetchone()

# fetchone() returns None when no row matches the tag; stop with a clear
# message instead of failing on one_row[0] with an opaque TypeError.
if one_row is None:
    sys.exit("no ADDED_DIM_COLUMN_NAMES row found for added_dim_column_names_tag '"
             + added_dim_column_names_tag + "'")

row_is_current = one_row[0]
row_effective_date = one_row[1]
row_expiration_date = one_row[2]
row_insert_date = one_row[3]
row_update_date = one_row[4]

print('row_is_current:', row_is_current)
print('row_effective_date:', row_effective_date)
print('row_expiration_date:', row_expiration_date)
print('row_insert_date:', row_insert_date)
print('row_update_date:', row_update_date)

## Natural key select and joins

In [None]:
# Generate SQL fragments for the natural key columns of the source table:
# a select list prefixed with "s." and AND-chained equality predicates for
# the alias pairs r/s, t1/s, t2/s, nc/s and src/tgt.
#
# The '~' markers let the trailing "AND " be removed: a final '~' is appended
# so the last item ends in 'AND ~~', which is stripped, and then the
# remaining single '~' separators are removed.
#
# ORDER BY ORDINAL_POSITION lives inside the CTE, as in the other
# column-list cells. On the outer query it would apply to the single
# aggregated row and could not influence the column order.
sql = """
WITH base AS
(
SELECT DISTINCT TOP 1000 ORDINAL_POSITION, COLUMN_NAME
  FROM """ + src_database + """.INFORMATION_SCHEMA."COLUMNS"
 WHERE COLUMN_NAME IN (select trim(value) from table(split_to_table('""" + natural_key_columns + """', ',')))
   AND TABLE_SCHEMA = '""" + src_schema + """'
   AND TABLE_NAME = '""" + src_table + """'
 ORDER BY ORDINAL_POSITION
)
SELECT TRIM(LISTAGG(' s.' || COLUMN_NAME || ', '), ', ') || ' ' AS nat_key_select
	 , REPLACE(REPLACE(LISTAGG(' r.' || COLUMN_NAME || ' = s.' || COLUMN_NAME  || ' AND ~') || '~','AND ~~'), '~')  AS nat_key_join
	 , REPLACE(REPLACE(LISTAGG(' t1.' || COLUMN_NAME || ' = s.' || COLUMN_NAME  || ' AND ~') || '~','AND ~~'), '~')  AS nat_key_join_t1
	 , REPLACE(REPLACE(LISTAGG(' t2.' || COLUMN_NAME || ' = s.' || COLUMN_NAME  || ' AND ~') || '~','AND ~~'), '~')  AS nat_key_join_t2
	 , REPLACE(REPLACE(LISTAGG(' nc.' || COLUMN_NAME || ' = s.' || COLUMN_NAME  || ' AND ~') || '~','AND ~~'), '~')  AS nat_key_join_nc
	 , REPLACE(REPLACE(LISTAGG(' src.' || COLUMN_NAME || ' = tgt.' || COLUMN_NAME  || ' AND ~') || '~','AND ~~'), '~')  AS nat_key_join_src
  FROM base
"""
print(sql)

cur.execute(sql)
one_row = cur.fetchone()
nat_key_select = one_row[0]
nat_key_join = one_row[1]
nat_key_join_t1 = one_row[2]
nat_key_join_t2 = one_row[3]
nat_key_join_nc = one_row[4]
nat_key_join_src = one_row[5]

print('nat_key_select:', nat_key_select)
print('nat_key_join:', nat_key_join)
print('nat_key_join_t1:', nat_key_join_t1)
print('nat_key_join_t2:', nat_key_join_t2)
print('nat_key_join_nc:', nat_key_join_nc)
print('nat_key_join_src:', nat_key_join_src)

## Type 1 change check

In [None]:
# Generate the type 1 change predicate: an OR-chain comparing every source
# column that is not a natural key between aliases r and s. Both sides are
# cast to VARCHAR with NULL mapped to '-99999'. When no such column exists
# the query returns ' 1=2 ' instead.
# NOTE(review): r and s are presumably aliases in the statement that later
# embeds this predicate; confirm against that statement.
sql = f"""
WITH base AS
(
SELECT DISTINCT TOP 1000 ORDINAL_POSITION, COLUMN_NAME
  FROM {src_database}.INFORMATION_SCHEMA."COLUMNS"
 WHERE COLUMN_NAME NOT IN (select trim(value) from table(split_to_table('{natural_key_columns}', ',')))
   AND TABLE_SCHEMA = '{src_schema}'
   AND TABLE_NAME = '{src_table}'
 ORDER BY ORDINAL_POSITION
)
, base2 AS
(
SELECT TRIM(LISTAGG('\n NVL(CAST(r.' || column_name || ' AS VARCHAR),''-99999'') != NVL(CAST(s.' || column_name || ' AS VARCHAR),''-99999'')' || ' OR '), 'OR ') AS sql
  FROM base
)
SELECT CASE WHEN TRIM(sql) = '' THEN ' 1=2 ' ELSE sql END AS ret_sql
  FROM base2
"""
print(sql)

cur.execute(sql)
one_row = cur.fetchone()
# The query returns a single column holding the generated predicate.
(type_1_change_check,) = one_row

print(type_1_change_check)

## Type 2 change check

In [None]:
# Generate the type 2 change predicate: an OR-chain comparing each column
# listed in type_2_columns between aliases r and s. Both sides are cast to
# VARCHAR with NULL mapped to '-99999'. When none of the listed columns
# exists in the source table the query returns ' 1=2 ' instead.
# NOTE(review): r and s are presumably aliases in the statement that later
# embeds this predicate; confirm against that statement.
sql = f"""
WITH base AS
(
SELECT DISTINCT TOP 1000 ORDINAL_POSITION, COLUMN_NAME
  FROM {src_database}.INFORMATION_SCHEMA."COLUMNS"
 WHERE COLUMN_NAME IN (select trim(value) from table(split_to_table('{type_2_columns}', ',')))
   AND TABLE_SCHEMA = '{src_schema}'
   AND TABLE_NAME = '{src_table}'
 ORDER BY ORDINAL_POSITION
)
, base2 AS
(
SELECT TRIM(LISTAGG('\n NVL(CAST(r.' || column_name || ' AS VARCHAR),''-99999'') != NVL(CAST(s.' || column_name || ' AS VARCHAR),''-99999'')' || ' OR '), 'OR ') AS sql
  FROM base
)
SELECT CASE WHEN TRIM(sql) = '' THEN ' 1=2 ' ELSE sql END AS ret_sql
  FROM base2
"""
print(sql)

cur.execute(sql)
one_row = cur.fetchone()
# The query returns a single column holding the generated predicate.
(type_2_change_check,) = one_row

print(type_2_change_check)

## Staging columns

In [None]:
# Build three comma-separated lists of all source table columns: prefixed
# with "s." (staging select), unprefixed (merge insert list) and prefixed
# with "src." (merge output list).
sql = f"""WITH base AS
(
SELECT DISTINCT TOP 1000 ORDINAL_POSITION, COLUMN_NAME
  FROM {src_database}.INFORMATION_SCHEMA."COLUMNS"
 WHERE TABLE_SCHEMA = '{src_schema}'
   AND TABLE_NAME = '{src_table}'
 ORDER BY ORDINAL_POSITION
)
SELECT TRIM(LISTAGG('s.' || COLUMN_NAME || '' || ', '), ', ')
     , TRIM(LISTAGG( COLUMN_NAME || '' || ', '), ', ')
     , TRIM(LISTAGG('src.' || COLUMN_NAME || '' || ', '), ', ')
  FROM base;
"""
print(sql)
cur.execute(sql)
one_row = cur.fetchone()

# The three result columns map one-to-one onto the three lists.
staging_columns, merge_insert_list, merge_output_list = one_row

print('staging_columns:' , staging_columns)
print('merge_insert_list:' , merge_insert_list)
print('merge_output_list:' , merge_output_list)


## Merge update set

In [None]:
# Generate the SET items for the merge update. Each source column that is
# neither a type 0 column nor a natural key takes the SRC value when
# ChangeType is 'Type 1' and otherwise keeps the DST value.
# NOTE(review): TRIM is called without a character list here, so the
# generated text appears to keep a trailing ','. Presumably further SET
# items are appended later; confirm against the statement that uses it.
sql = f"""
WITH base AS
(
SELECT DISTINCT TOP 1000 ORDINAL_POSITION, COLUMN_NAME
  FROM {src_database}.INFORMATION_SCHEMA."COLUMNS"
 WHERE COLUMN_NAME NOT IN (select trim(value) from table(split_to_table('{type_0_columns}', ',')))
   AND COLUMN_NAME NOT IN (select trim(value) from table(split_to_table('{natural_key_columns}', ',')))  
   AND TABLE_SCHEMA = '{src_schema}'
   AND TABLE_NAME = '{src_table}'
 ORDER BY ORDINAL_POSITION
)
SELECT TRIM(LISTAGG('\n ' || column_name || ' = CASE ChangeType WHEN ''Type 1'' THEN SRC.' || column_name || ' ELSE DST.' || column_name || ' END' || ' , '))
FROM base
"""
print(sql)
cur.execute(sql)
one_row = cur.fetchone()

# The query returns a single column holding the generated SET items.
(merge_update_set,) = one_row
print('merge_update_set:' , merge_update_set)
