Import the required packages and modules

Installing Requirements

In [314]:
#Connector Packages:
from snowflake.snowpark.session import Session
from pyspark.sql import functions as F
import openpyxl

#ML Packages:
from config import *
import numpy as np
import pandas as pd
from scipy.stats import norm
from datetime import datetime

#Other Packages:
import os

In [315]:
db_name = input("DATABASE NAME : ").upper()
schema_name = input("SCHEMA NAME : ").upper()

In [316]:
summary = {}
excel_columns = ["COLUMN_NAME",]

workbook = openpyxl.Workbook()

In [317]:
def snowpark_session():
  conn_params = {
      "account" : "pr91731-production_northeurope",
      "user" : input("Username : "),
      "password" : input("Password : "),
      "database" : db_name,     
      "role" : "DATAENGINEER",
      "warehouse" : "INGESTION_WH"
  }

  session = Session.builder.configs(conn_params).create()

  return session

In [318]:
try:
    snowpark = snowpark_session()
    print("Connected Successfully")
except Exception as e:
    print(e)

Connected Successfully


Connect with the Snowflake to read the data

In [319]:
def write_to_excel_file(worksheet,summary):
    row = 2
    
    #Adding heading
    for excel_column_name in excel_columns:
        # print(excel_column_name,end=" ")
        worksheet.cell(row = 1, column = excel_columns.index(excel_column_name)+1, value = excel_column_name)
    # print()
    for column in summary.keys():
        # print(column,end=" > ")
        worksheet.cell(
            row = row, 
            column = 1,
            value = column.lower()
        )
        
        calculations = summary[column]
        
        for metric in calculations.keys():
            column = excel_columns.index(metric) + 1
            # print("\n\t",column," - ",calculations[metric],end="")
            if(isinstance(calculations[metric],str)):
                cal = calculations[metric].lower()
            else:
                cal = calculations[metric]
            
            worksheet.cell(
                row = row, 
                column = column, 
                value = cal
            )
        row += 1
        # print()
    return worksheet

In [320]:
def show(report,indent = 0,start_delimiter = "[",end_delimiter = "]",seperator = " -> "):    
    if(isinstance(report,list)):
        val = ""
        val += start_delimiter
        for value in report:
            if(report.index(value) == len(report)-1):
                val += str(value)
            else:
                val += str(value)+", "
        val+= end_delimiter
        print(indent+val)
    
    elif(isinstance(report,str)):
        print('"'+report+'"')
    
    
    elif(isinstance(report,dict)):
        for key,value in report.items():
            space = indent*'\t'
            print(f"{space}{key} {seperator} ",end="")
            if(isinstance(value,dict) or isinstance(value,list) or isinstance(value,tuple) or isinstance(value,set)):
                indent += 1
                print("")
                show(indent = indent,report = value)
                indent -=1
            elif(isinstance(value,str)):
                show(indent = indent,report = value)
            else:
                print(f"{value}")

In [321]:
def read_validity_check_list(table_name):
    try:
        validity_workbook = openpyxl.load_workbook("DQ_Validity.xlsx")
        
        if(table_name in validity_workbook.sheetnames):
            
            worksheet = validity_workbook[table_name]
            
            check_list = {}
            for row in worksheet.iter_rows(min_row=2, values_only=True):
                outer_key = row[0]
                inner_dict = {}
                for col_num, value in enumerate(row[2:], start=3):
                    if value is not None:
                        header_value = worksheet.cell(row=1, column=col_num).value
                        inner_dict[header_value] = value
                check_list[outer_key] = inner_dict
                
            validity_workbook.close()
        return check_list
    except Exception as e:
        print(e)
        return False

In [322]:
def check_validity(data_set, validation_dict):
    
    results = {}

    for col, check_dict in validation_dict.items():
        result = check_column(col, data_set, check_dict)
        results[col] = result

    return results

def check_column(col, data_set, check_dict):
    
    total_rows = data_set.shape[0]
    failed_validations = 0

    for key, check in check_dict.items():
        if key.lower() == "format":
            if check == "alnum":
                min_length = check_dict.get("min_length", None)
                max_length = check_dict.get("max_length", None)
                failed_validations += count_failed_validations(col, data_set, min_length, max_length)

        # Add more checks as needed

    success_percentage = ((total_rows - failed_validations) / total_rows) * 100 if total_rows > 0 else 0
    return {"success_percentage": success_percentage}

def count_failed_validations(col, data_set, min_length, max_length):

    failed_validations = 0

    for value in data_set[col]:
        if not isinstance(value, str):
            failed_validations += 1
        elif not value.isalnum() or (min_length is not None and len(value) < min_length) or (max_length is not None and len(value) > max_length):
            failed_validations += 1

    return failed_validations

In [329]:
def data_quality_check(table_name):
  try:
    
    ddl_sql = f"DESCRIBE TABLE {db_name}.{schema_name}.{table_name}"
    ddl = snowpark.sql(ddl_sql).collect()
    
    #collecting the data structure(Datatype)/schematics of the table for identifying the 
    data_sql = f"SELECT * FROM {db_name}.{schema_name}.{table_name}"
    data = snowpark.sql(data_sql)
    
    for row in ddl:
      
      col = row["name"]
      summary[col] = {}
      
      summary[col]["DATA_TYPE"] = row["type"]
      if("DATA_TYPE" not in excel_columns):
        excel_columns.append("DATA_TYPE")
      
      #FINDING THE NULL COUNT IN EACH COLUMN
      summary[col]["NULL_COUNT"] = data.filter(data['"'+col+'"'].isNull()).count()
      if("NULL_COUNT" not in excel_columns):
        excel_columns.append("NULL_COUNT")

      #FINDING THE TOTAL COUNT IN EACH COLUMN
      summary[col]["TOTAL_COUNT"] = data.count()
      if("TOTAL_COUNT" not in excel_columns):
        excel_columns.append("TOTAL_COUNT")
      
      #FINDING THE NOT-NULL COUNT IN EACH COLUMN
      summary[col]["NOT_NULL_COUNT"] = summary[col]["TOTAL_COUNT"] - summary[col]["NULL_COUNT"]
      if("NOT_NULL_COUNT" not in excel_columns):
        excel_columns.append("NOT_NULL_COUNT")
      
      #FINDING THE UNIQUE(DISTINCT) VALUE COUNT IN EACH COLUMN
      summary[col]["UNIQUE_COUNT"] = data.select('"'+col+'"').distinct().count() - 1
      if("UNIQUE_COUNT" not in excel_columns):
        excel_columns.append("UNIQUE_COUNT")
        
      #FINDING THE REPEARING/DUPLICATE VALUE COUNT IN EACH COLUMN EXCLUDING THE UNIQUE COUNT
      summary[col]["DUPLICATE"] = summary[col]["NOT_NULL_COUNT"] - summary[col]["UNIQUE_COUNT"]
      if("DUPLICATE" not in excel_columns):
        excel_columns.append("DUPLICATE")
      
      #Completeness
      summary[col]["COMPLETENESS"] ="{:.2f}".format((summary[col]["NOT_NULL_COUNT"] / summary[col]["TOTAL_COUNT"])*100)
      if "COMPLETENESS" not in excel_columns:
        excel_columns.append("COMPLETENESS")
        
      summary[col]["UNIQUENESS"] ="{:.2f}".format((summary[col]["NULL_COUNT"] / summary[col]["TOTAL_COUNT"])*100)
      if "UNIQUENESS" not in excel_columns:
        excel_columns.append("UNIQUENESS")
      
      
      summary[col]["ACCURACY"] ="{:.2f}".format((summary[col]["NULL_COUNT"] / summary[col]["TOTAL_COUNT"])*100)
      if "ACCURACY" not in excel_columns:
        excel_columns.append("ACCURACY")
    
      check_list = read_validity_check_list(table_name)
      
      if(check_list):
        col_check_list = check_list[col.lower()]

        print(data['"'+col+'"']," : ",col_check_list)
        # check_validity(data['"'+col+'"'],col_check_list)
        
      else:
        summary[col]["Error"] = "Cannot perform validity as checklist not available"
        if("Error" not in excel_columns):
          excel_columns.append("Error")
      
  except Exception as e:
    print("Error in calculating common data types - ",e)
    
  return summary

Initialising the required variables

In [330]:
all_tables = snowpark.sql(f"SHOW TABLES IN {db_name}.{schema_name}").collect()
table_names = [row["name"] for row in all_tables]

download_folder_path = os.path.expanduser("~" + os.path.sep + "Downloads")
report_path = os.path.join(download_folder_path, 'Data_Quality_Report.xlsx')

for table_name in table_names:
    print(f"Table selected : {table_name}")
    
    if(table_name == 'COHORT_STAGING_MASTER_DATA'):
        
        if(len(workbook.sheetnames) == 1):
            new_worksheet = workbook.active
            new_worksheet.title = table_name
        else:
            new_worksheet = workbook.create_sheet(title = table_name)
        
        report_summary = data_quality_check(table_name)
        # show(report = report_summary)    
        new_worksheet = write_to_excel_file(new_worksheet,report_summary,)
    
downloads_folder = os.path.expanduser("~" + os.path.sep + "Downloads")
file_path = os.path.join(downloads_folder,'Data_Quality_Report_'+schema_name+"("+str(datetime.now().strftime("%d-%m-%Y"))+').xlsx')
workbook.save(file_path)
print("File created...")

Table selected : ACCOUNTS_RECC_REOCC
Table selected : COHORT_STAGING_MASTER_DATA
Column["Company"]  :  {'format': 'string'}
Column["Client Account"]  :  {'format': 'string'}
Column["SF/FF Client Account ID"]  :  {'format': 'alnum', 'min_length': 4, 'max_length': 8}
Column["Entity Name"]  :  {'format': 'string'}
Column["FF Entity Code"]  :  {'format': 'alnum', 'min_length': 4, 'max_length': 8}
Column["Customer"]  :  {'format': 'alnum', 'min_length': 4, 'max_length': 8}
Column["Client Group Name"]  :  {'format': 'string'}
Column["Region"]  :  {'format': 'string'}
Column["GLA"]  :  {'format': 'alnum'}
Column["Acquired date"]  :  {'format': 'num', 'min_length': 4, 'max_length': 4, 'max_decimals': 0}
Column["DIM 1"]  :  {'format': 'string', 'min_length': 4, 'max_length': 4}
Column["Year"]  :  {'format': 'num', 'min_length': 4, 'max_length': 4}
Column["Currency"]  :  {'format': 'string', 'min_length': 3, 'max_length': 3}
Column["Jan"]  :  {'format': 'num', 'max_decimals': 2}
Column["Feb"]  :

Validation

In [325]:
#Validation

workbook = load_workbook(file_path)

# Get the number of sheets
number_of_sheets = len(workbook.sheetnames)

snowpark.sql("USE {db_name};")
snowflake_tables = list(snowpark.sql("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{schema_name}';").collect())

excel_sheets = list(workbook.sheetnames)

print(len(snowflake_tables),len(excel_sheets))

print("Worksheet not created for : ",set(snowflake_tables)-set(excel_sheets))
print("Unwanted worksheet :", set(excel_sheets)-set(snowflake_tables))

0 1
Worksheet not created for :  set()
Unwanted worksheet : {'COHORT_STAGING_MASTER_DATA'}
