# Imports

In [2]:
import pandas as pd
import numpy as np
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import udf, avg, col
from snowflake.snowpark.types import IntegerType, FloatType, StringType, BooleanType
from snowflake.snowpark.files import SnowflakeFile
import sys
import logging
sys.path.append('..')
from credentials import Credentials

In [3]:
# Build the Snowpark session from the instance attributes of the local Credentials object.
cred = Credentials()
connection_parameters = vars(cred)
session = Session.builder.configs(connection_parameters).create()

In [4]:
# Set the session context (role, database, warehouse, schema) that the
# unqualified object names used later in this notebook resolve against.
# The schema is selected after the database it belongs to.
session.use_role("SYSADMIN")
session.use_database("ACCOUNTADMIN_MGMT")
session.use_warehouse("ACCOUNTADMIN_MGMT")
session.use_schema("UTILITIES")

In [5]:
def get_tables_with_missing_columns_in_mapping(session: Session, schema_name:str) -> list:
	"""
		Find the tables of a STITCH schema that have at least one column without an
		entry in ACCOUNTADMIN_MGMT.UTILITIES.SALESFORCE_COLUMN_MAPPING.

		Only base tables are considered and the _SDC_REJECTED table is skipped.

		Args:
			session: Snowpark session used to run the query
			schema_name: schema name
		Returns:
			list of tables with missing columns in salesforce column mapping table
			(empty list when no column is missing)
	"""
	# Escape backslashes and double single quotes so the value cannot break out of
	# the single-quoted SQL string constant it is embedded in.
	schema_literal = schema_name.replace("\\", "\\\\").replace("'", "''")
	# The query is a plain f-string; no additional .format() pass is applied, so
	# braces inside the interpolated value are left untouched.
	missing_columns_table_list = session.sql(f"""SELECT DISTINCT A.TABLE_NAME
FROM (
    SELECT C.TABLE_NAME, C.COLUMN_NAME
    FROM STITCH.INFORMATION_SCHEMA.COLUMNS AS C
    INNER JOIN (
        SELECT TABLE_SCHEMA, TABLE_NAME FROM STITCH.INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{schema_literal}' AND TABLE_TYPE = 'BASE TABLE' AND TABLE_NAME != '_SDC_REJECTED'
    ) AS T
    ON C.TABLE_SCHEMA = T.TABLE_SCHEMA AND C.TABLE_NAME = T.TABLE_NAME
    WHERE C.TABLE_SCHEMA = '{schema_literal}'
) AS A
LEFT JOIN (
    SELECT NAME_SALESFORCE_OBJECT, NAME_SALESFORCE_ATTRIBUTE
    FROM ACCOUNTADMIN_MGMT.UTILITIES.SALESFORCE_COLUMN_MAPPING
) AS B
ON A.TABLE_NAME = B.NAME_SALESFORCE_OBJECT AND A.COLUMN_NAME = B.NAME_SALESFORCE_ATTRIBUTE
WHERE B.NAME_SALESFORCE_ATTRIBUTE IS NULL;""").collect()
	# An empty result set yields an empty list, so no separate branch is needed.
	return [row.asDict().get('TABLE_NAME') for row in missing_columns_table_list]

In [None]:
# Testing function
# Smoke test: list the SALESFORCEQASIT tables that still have unmapped columns (runs a SELECT only).
get_tables_with_missing_columns_in_mapping(session=session, schema_name='SALESFORCEQASIT')

In [6]:
def get_columns_from_tables_with_missing_columns(session: Session, schema_name:str, table_name:str) -> list:
	"""
		Return the columns of one STITCH table that have no entry in
		ACCOUNTADMIN_MGMT.UTILITIES.SALESFORCE_COLUMN_MAPPING for that table.

		Args:
			session: Snowpark session used to run the query
			schema_name: schema name
			table_name: table name
		Returns:
			list of dictionaries, one per missing column, with the keys
			COLUMN_NAME and COLUMN_NAME_LENGTH, ordered by name length descending
	"""
	# Escape backslashes and double single quotes so the values cannot break out of
	# the single-quoted SQL string constants they are embedded in.
	schema_literal = schema_name.replace("\\", "\\\\").replace("'", "''")
	table_literal = table_name.replace("\\", "\\\\").replace("'", "''")
	# The query is a plain f-string; no additional .format() pass is applied, so
	# braces inside the interpolated values are left untouched.
	columns_from_missing_columns_table_list = session.sql(f"""SELECT A.COLUMN_NAME, LEN(A.COLUMN_NAME) AS COLUMN_NAME_LENGTH
FROM (
    SELECT COLUMN_NAME
    FROM STITCH.INFORMATION_SCHEMA.COLUMNS
    WHERE TABLE_SCHEMA = '{schema_literal}'
      AND TABLE_NAME = '{table_literal}'
) AS A
LEFT JOIN (
    SELECT NAME_SALESFORCE_ATTRIBUTE
    FROM ACCOUNTADMIN_MGMT.UTILITIES.SALESFORCE_COLUMN_MAPPING
    WHERE NAME_SALESFORCE_OBJECT = '{table_literal}'
) AS B
ON A.COLUMN_NAME = B.NAME_SALESFORCE_ATTRIBUTE
WHERE B.NAME_SALESFORCE_ATTRIBUTE IS NULL
ORDER BY 2 DESC;""").collect()
	return [row.asDict() for row in columns_from_missing_columns_table_list]

In [None]:
# Testing function
# Smoke test: list the unmapped columns of SALESFORCEQASIT.SERVICERESOURCESKILL (runs a SELECT only).
get_columns_from_tables_with_missing_columns(session=session, schema_name='SALESFORCEQASIT', table_name='SERVICERESOURCESKILL')

In [7]:
def insert_large_columns_into_table(session: Session, table_name:str, column_name:str, column_name_length:int) -> None:
	"""
		Upsert one (table, column) pair into
		ACCOUNTADMIN_MGMT.UTILITIES.SALESFORCE_LARGE_COLUMN_MAPPING.

		An existing row for the same table and column has its COLUMN_NAME_LENGTH
		updated; otherwise a new row is inserted.

		Args:
			session: Snowpark session used to run the statement
			table_name: table name
			column_name: column name
			column_name_length: length of the column name
		Returns:
			None
		Raises:
			ValueError / TypeError: if column_name_length cannot be converted to an integer
	"""
	# Escape backslashes and double single quotes so the values cannot break out of
	# the single-quoted SQL string constants they are embedded in.
	table_literal = table_name.replace("\\", "\\\\").replace("'", "''")
	column_literal = column_name.replace("\\", "\\\\").replace("'", "''")
	# The length is spliced in as a bare number, so force it to be an integer first.
	length_literal = int(column_name_length)
	# Plain f-string only; no additional .format() pass, so braces in values are safe.
	session.sql(f"""MERGE INTO ACCOUNTADMIN_MGMT.UTILITIES.SALESFORCE_LARGE_COLUMN_MAPPING AS TARGET USING (
					SELECT '{table_literal}' AS TABLE_NAME, '{column_literal}' AS COLUMN_NAME, {length_literal} AS COLUMN_NAME_LENGTH
					) AS SOURCE ON TARGET.TABLE_NAME = SOURCE.TABLE_NAME AND TARGET.COLUMN_NAME = SOURCE.COLUMN_NAME
					WHEN MATCHED THEN UPDATE SET TARGET.COLUMN_NAME_LENGTH = SOURCE.COLUMN_NAME_LENGTH
					WHEN NOT MATCHED THEN INSERT (TABLE_NAME, COLUMN_NAME, COLUMN_NAME_LENGTH) VALUES (SOURCE.TABLE_NAME, SOURCE.COLUMN_NAME, SOURCE.COLUMN_NAME_LENGTH);
					""").collect()

In [None]:
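# Testing function (left commented out: running it executes a MERGE that writes to ACCOUNTADMIN_MGMT.UTILITIES.SALESFORCE_LARGE_COLUMN_MAPPING)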
# insert_large_columns_into_table(session=session, table_name='SERVICEAPPOINTMENT', column_name='FSM_PREVENT_STATUS_CHANGE_TO_CANCEL_FLAG__C', column_name_length=43)

In [8]:
def insert_small_columns_into_table(session: Session, table_name:str, column_name:str) -> None:
	"""
		Insert one (table, column) pair into
		ACCOUNTADMIN_MGMT.UTILITIES.SALESFORCE_COLUMN_MAPPING, using the column name
		itself as NAME_ALIAS.

		Args:
			session: Snowpark session used to run the statement
			table_name: table name
			column_name: column name
		Returns:
			None
	"""
	# Escape backslashes and double single quotes so the values cannot break out of
	# the single-quoted SQL string constants they are embedded in.
	table_literal = table_name.replace("\\", "\\\\").replace("'", "''")
	column_literal = column_name.replace("\\", "\\\\").replace("'", "''")
	# Plain f-string only; no additional .format() pass, so braces in values are safe.
	session.sql(f"""INSERT INTO ACCOUNTADMIN_MGMT.UTILITIES.SALESFORCE_COLUMN_MAPPING (NAME_SALESFORCE_OBJECT, NAME_SALESFORCE_ATTRIBUTE, NAME_ALIAS)
					VALUES ('{table_literal}','{column_literal}','{column_literal}');""").collect()

In [9]:
def send_notification_email(session: Session) -> None:
	"""
		Args:
			None
		Returns:
			None
	"""
	session.sql(f"""CALL SYSTEM$SEND_EMAIL('NOTIFICATION_INTEGRATION_ADMIN','luis.fuentes@compucom.com',
    'SALESFORCE REPLICATION: Large Column Detected',
    'Please check the following table for large columns: ACCOUNTADMIN_MGMT.UTILITIES.SALESFORCE_LARGE_COLUMN_MAPPING');""").collect()


In [10]:
def sp_dynamic_salesforce_column_detector(session: Session) -> str:
	"""
		Detect source columns that are missing from the salesforce column mapping
		table and register them.

		For each schema in the hard-coded list, every table with missing columns is
		inspected. Columns whose name is longer than 30 characters are upserted into
		the large-column table; all other missing columns are inserted into the
		column mapping table. One notification email is sent per schema in which at
		least one large column was found.

		The signature is intentionally left as (session) -> str because this function
		is registered as a stored procedure that is called without arguments.

		Args:
			session: Snowpark session used for all queries
		Returns:
			'Success' once every schema has been processed
	"""
	# Column names longer than this are routed to the large-column table.
	large_column_name_threshold = 30

	for schema_name in ['SALESFORCEQASIT', 'WORKDAYSANDBOXPREVIEW']:

		print(f"->{schema_name}")
		# Getting tables with missing columns
		tables_with_missing_columns_in_mapping:list = get_tables_with_missing_columns_in_mapping(session=session, schema_name=schema_name)

		# Nothing missing in this schema: report it and move on to the next schema
		if not tables_with_missing_columns_in_mapping:
			print("No missing columns in salesforce column mapping table")
			continue

		# Tables of this schema in which at least one large column was found
		list_of_table_names:list = []

		for table_name in tables_with_missing_columns_in_mapping:
			print(f"--->{table_name}")
			# (column name, name length) pairs in the order returned by the query.
			# An empty result (e.g. the table was mapped in the meantime) simply
			# produces no work instead of failing on a missing key.
			missing_columns:list = [
				(row['COLUMN_NAME'], int(row['COLUMN_NAME_LENGTH']))
				for row in get_columns_from_tables_with_missing_columns(session=session, schema_name=schema_name, table_name=table_name)
			]

			# CASE 1: columns with a name longer than the threshold
			large_columns:list = [pair for pair in missing_columns if pair[1] > large_column_name_threshold]
			for column_name, column_name_length in large_columns:
				insert_large_columns_into_table(session=session, table_name=table_name, column_name=column_name, column_name_length=column_name_length)
			# Record the table once, no matter how many large columns it has
			if large_columns:
				list_of_table_names.append(table_name)

			# CASE 2: columns with a name length up to and including the threshold
			small_columns:list = [pair for pair in missing_columns if pair[1] <= large_column_name_threshold]
			for column_name, _ in small_columns:
				insert_small_columns_into_table(session=session, table_name=table_name, column_name=column_name)

		# if list of table names is not empty then send notification email
		if list_of_table_names:
			send_notification_email(session=session)
			print(f"Missing columns in salesforce column mapping table for tables: {list_of_table_names}")

	return 'Success'


In [11]:
# Run the detector directly in the notebook session, before it is registered as a stored procedure.
sp_dynamic_salesforce_column_detector(session=session)

->SALESFORCEQASIT
No missing columns in salesforce column mapping table
->WORKDAYSANDBOXPREVIEW
No missing columns in salesforce column mapping table


'Success'

# Register procedure

In [12]:
# Register the detector as a permanent, caller's-rights stored procedure whose
# code is uploaded to the ACCOUNTADMIN_MGMT_STAGE stage.
session.sproc.register(
	func = sp_dynamic_salesforce_column_detector,
	# Fully qualified so the procedure is created in the schema that the task's
	# CALL statement references, independent of the session's current database/schema.
	name = "ACCOUNTADMIN_MGMT.UTILITIES.SP_DYNAMIC_SALESFORCE_COLUMN_DETECTOR",
	packages = ["snowflake-snowpark-python", "pandas", "numpy"],
	is_permanent = True,
	stage_location = "@ACCOUNTADMIN_MGMT.UTILITIES.ACCOUNTADMIN_MGMT_STAGE",
	source_code_display = False,
	execute_as = 'caller',
	replace=True
)

The version of package 'snowflake-snowpark-python' in the local environment is 1.11.1, which does not fit the criteria for the requirement 'snowflake-snowpark-python'. Your UDF might not work when the package version is different between the server and your local environment.


<snowflake.snowpark.stored_procedure.StoredProcedure at 0x7fffc1fa3370>

In [12]:
# Calling stored Procedure
# Call the registered stored procedure by its unqualified name, which is
# resolved against the session's current database and schema.
session.call("sp_dynamic_salesforce_column_detector")

'Success'

# Creating Task

In [None]:
# Re-apply the same role/database/warehouse/schema context as at the top of the
# notebook before creating the task, so this section can be run on its own.
session.use_role("SYSADMIN")
session.use_database("ACCOUNTADMIN_MGMT")
session.use_warehouse("ACCOUNTADMIN_MGMT")
session.use_schema("UTILITIES")

In [None]:
# DDL for the task that calls the detector stored procedure on the ACCOUNTADMIN_MGMT warehouse.
# NOTE(review): the cron expression '0 */5 * * *' fires at minute 0 of hours 0, 5, 10, 15
# and 20 (CET), so the gap between the 20:00 run and the following 00:00 run is 4 hours,
# not 5 as the COMMENT text states - confirm which schedule is intended.
# NOTE(review): the COMMENT text spells "salesforce" as "selesforce"; the string is left
# unchanged here because it is stored with the task.
task_run_dynamic_salesforce_column_detector = """
CREATE OR REPLACE TASK ACCOUNTADMIN_MGMT.UTILITIES.TASK_RUN_DYNAMIC_SALESFORCE_COLUMN_DETECTOR
    WAREHOUSE = 'ACCOUNTADMIN_MGMT'
    SCHEDULE = 'USING CRON 0 */5 * * * CET'
    ALLOW_OVERLAPPING_EXECUTION = FALSE
    COMMENT = 'Run dynamic selesforce_column_detector stored procedure every 5 hours'
AS
        CALL ACCOUNTADMIN_MGMT.UTILITIES.SP_DYNAMIC_SALESFORCE_COLUMN_DETECTOR();
"""

In [None]:
# Execute the CREATE OR REPLACE TASK statement held in task_run_dynamic_salesforce_column_detector.
session.sql(task_run_dynamic_salesforce_column_detector).collect()

In [None]:
# Resume the task so that its schedule becomes active.
session.sql("ALTER TASK ACCOUNTADMIN_MGMT.UTILITIES.TASK_RUN_DYNAMIC_SALESFORCE_COLUMN_DETECTOR RESUME;").collect()

In [None]:
# Close the Snowpark session opened at the top of the notebook.
session.close()