In [33]:
from snowflake.snowpark import Session
from snowflake.snowpark.dataframe import col
import logging


In [34]:
def setup_logger():
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)  # Set the desired log level (INFO, DEBUG, ERROR, etc.)

    # Create a log file handler to write log messages to a file
    log_file = 'pytest.log'
    file_handler = logging.FileHandler(log_file)
    file_handler.setLevel(logging.INFO)  # Set log level for the file handler

    # Create a console handler to print log messages to the console
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)  # Set log level for the console handler

    # Create a formatter to define the format of log messages
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)

    # Add handlers to the logger
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

    return logger

logger = setup_logger()

In [45]:
import os

path = './log_'
os.listdir(path=path)

['pytest.log']

In [35]:
user = 'leo'
password = 'SnowFlakeTrial222222'
account = 'woimctp-dw57998'
warehouse='COMPUTE_WH'
role = 'ACCOUNTADMIN'


connection_parameters = {
    "user": user,
    "password" : password,
    "account": account,
    "role": role,
    "warehouse": warehouse
 }

session = Session.builder.configs(connection_parameters).create()

In [36]:
session.use_database(database='HR_DB');
session.use_schema(schema='HR_SCHEMA');
session.use_warehouse(warehouse='COMPUTE_WH');

file_path = r'./transformation_query/query.sql';
file_content = open(file=file_path,mode='r+');
query = file_content.read();

In [37]:
print(query)

select 
SPLIT(TRIM(split(EMPLOYEE_NAME,',')[1]::varchar),' ')[0]::text as FIRSTNAME,TRIM(split(EMPLOYEE_NAME,',')[0]::varchar) as LASTNAME,
-- EMPLOYEE_NAME,
EMPID,SALARY,POSITION,STATE,ZIP,DOB,SEX,DATEOFHIRE,DATEOFTERMINATION,EMPLOYMENTSTATUS,DEPARTMENT,MANAGERNAME,RECRUITMENTSOURCE,PERFORMANCESCORE,LASTPERFORMANCEREVIEW_DATE 
from hr_schema.employee_copy_tbl;


In [41]:
src = session.sql(query=query.replace(';',''));
target = session.table(name='HR_SCHEMA.EMPLOYEE_DETAILS_DYNAMIC');

# print(src,target)

# INPUT for Getting COLUMNS LIST
columns_list = input("Enter the column list with ',' seperated : ").split(',')
list_of_cols = [col_.strip() for col_ in columns_list]

In [42]:
src.count() == target.count()

True

In [43]:
mismatch_cols = []
matched_cols = [tgt_col for src_col,tgt_col in zip(sorted(src.columns),sorted(target.columns)) if src_col == tgt_col]
result = src.select(matched_cols).minus(target.select(matched_cols)).collect()
log_res = 'PASSED' if not bool(result) == True else 'NOT PASSED'
logger.info(f"DATA MISMATCH : {log_res}")
assert not bool(result) == True,f"The Mismatching count is {len(mismatch_cols)} \n and the columns are {','.join(mismatch_cols)}"

2023-10-21 09:22:50,201 - __main__ - INFO - DATA MISMATCH : PASSED
2023-10-21 09:22:50,201 - __main__ - INFO - DATA MISMATCH : PASSED
