In [0]:
# Start from a clean slate so widgets left over from an earlier run are not reused.
dbutils.widgets.removeAll()

# Register the two notebook parameters together with their default paths.
dbutils.widgets.text(
    "Raw_File_Path",
    "/mnt/data/load_date=2023-07-17/101_Daily_Sales.014.20230606062634.csv",
)
dbutils.widgets.text(
    "Gold_File_Path",
    "/mnt/data/gold/101_Daily_Sales.014.20230606062634_gold.csv",
)

# Read back the current widget values; these drive every later cell.
rawFilePath = dbutils.widgets.get("Raw_File_Path")
goldFilePath = dbutils.widgets.get("Gold_File_Path")


In [0]:
from pyspark.sql.functions import col

# --- Load the raw file and expose it to Spark SQL -------------------------
dfs = spark.read.option("header", "true").csv(rawFilePath)
dfs.createOrReplaceTempView('vwSrcData')

# --- Keep only rows where every mandatory column is populated -------------
bronzSql = (
    "select * from vwSrcData where "
    "DIV is not null and "
    "STORE is not null  and "
    "ITEM_CONSUMER_UPC is not null and "
    "POS_DATE is not null and "
    "POS_NET_DOL_AMOUNT is not null and "
    "POS_UOM_QTY is not null and "
    "POS_UNITS is not null "
)
silverDf = spark.sql(bronzSql)

# --- Prefer UPC_TYPE 'I' rows; keep a 'C' row only when no 'I' row shares
#     its (DIV, STORE, ITEM_CONSUMER_UPC) key (left anti join) --------------
i_df = silverDf.filter("UPC_TYPE='I'")
c_df = (
    silverDf
    .filter("UPC_TYPE='C'")
    .join(i_df, ["DIV", "STORE", "ITEM_CONSUMER_UPC"], "left_anti")
)
combined_df = i_df.unionByName(c_df)
combined_df.createOrReplaceTempView('vwSilverData')

# --- Roll the measures up per DIV/STORE/UPC/date/source location ----------
rollupSql = (
    "SELECT  DIV,STORE, ITEM_CONSUMER_UPC, POS_DATE,"
    "SUM(POS_NET_DOL_AMOUNT) AS POS_NET_DOL_AMOUNT, "
    "SUM(POS_UOM_QTY) AS POS_UOM_QTY, "
    "SUM(POS_UNITS) AS POS_UNITS, "
    "FIRST(POS_UOM) AS POS_UOM,"
    "ITEM_SRC_DIV, ITEM_SRC_LOC,"
    "FIRST(MODALITY) AS MODALITY,"
    "FIRST(FULFILLMENT) AS FULFILLMENT,"
    "FIRST(UPC_TYPE) AS UPC_TYPE "
    "from vwSilverData "
    "GROUP BY  DIV,STORE, ITEM_CONSUMER_UPC, POS_DATE, ITEM_SRC_DIV, ITEM_SRC_LOC"
)
trollup_df = spark.sql(rollupSql)

# The expected ("src") side: the rollup without the three descriptive columns.
df1 = trollup_df.drop("FULFILLMENT", "MODALITY", "UPC_TYPE")
df1.createOrReplaceTempView("src_vw")

# The actual ("gld") side: the gold file, read with its header row.
df2 = spark.read.option("header", "true").csv(goldFilePath)
df2.createOrReplaceTempView("gld_vw")

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Session handle used later to build the result DataFrame.
spark_session = SparkSession.builder.appName('spark_Session').getOrCreate()
emp_RDD = spark_session.sparkContext.emptyRDD()

# Schema of one test-result row: five non-nullable string columns.
columns = StructType([
    StructField(field_name, StringType(), False)
    for field_name in (
        'TestCaseSummary',
        'TestCaseDescription',
        'Status',
        'Comments',
        'Test_Data',
    )
])

# Empty DataFrame carrying that schema.
# NOTE(review): the count-validation cell rebinds `testResult` to a plain list,
# so this DataFrame object itself is not what the later cells append to.
testResult = spark_session.createDataFrame(data=emp_RDD, schema=columns)


# Count Validation

In [0]:
# Symmetric difference of the two row counts: yields no rows when the counts
# are equal and at least one row when they differ.
dfsrccnt = spark.sql(
    "select count(1) from src_vw "
    "minus select count(1) from gld_vw "
    "union all select count(1) from gld_vw "
    "minus select count(1) from src_vw"
)

# Side-by-side counts, kept around for manual inspection when the check fails.
dfsrccountunion = spark.sql(
    "select 'src' as tabnm, count(1) from src_vw "
    "union all "
    "select 'gld' as tabnm, count(1) from gld_vw"
)

dfrowcount = dfsrccnt.count()

# Start a fresh list of result tuples; the later validation cells append to it.
testResult = []
if dfrowcount >= 1:
    testResult.append((
        'Count comparison vs target',
        'Count comparison raw file vs ETL Ready file',
        'Fail',
        'Record count is not matching, please check dfsrccountunion',
        'count check',
    ))
else:
    testResult.append((
        'Count comparison vs target',
        'Count comparison rolled_up raw file vs ETL Ready file',
        'Pass',
        'Record count is matching',
        'count check',
    ))
    

# Aggregate Validation

In [0]:
# Column totals of src that are not reproduced exactly by gld; empty on a match.
dfaggdiff = spark.sql(
    "select  sum(cast(POS_NET_DOL_AMOUNT as decimal(8,3))) , sum(cast(POS_UOM_QTY as decimal(18,4))), sum(POS_UNITS) from src_vw "
    "minus "
    "select sum(cast(POS_NET_DOL_AMOUNT as decimal(8,3))),sum(cast(POS_UOM_QTY as decimal(18,4))), sum(POS_UNITS) from gld_vw"
)

# Both sets of totals, labelled by side, for manual inspection on failure.
dfsaggunion = spark.sql(
    "select 'src' as tabnm, sum(cast(POS_NET_DOL_AMOUNT as decimal(8,3))),sum(cast(POS_UOM_QTY as decimal(18,4))), sum(POS_UNITS) from src_vw "
    "union all "
    "select 'gld' as tabnm, sum(cast(POS_NET_DOL_AMOUNT as decimal(8,3))),sum(cast(POS_UOM_QTY as decimal(18,4))), sum(POS_UNITS) from gld_vw"
)

dfaggcount = dfaggdiff.count()
if dfaggcount >= 1:
    testResult.append((
        'Aggregate validation',
        'Aggregate validation rolled up raw file vs ETL Ready file',
        'Fail',
        'Sum of Amount and unit values are not matching please refer to dfsaggunion',
        'aggregate validation',
    ))
else:
    testResult.append((
        'Aggregate validation',
        'Aggregate validation rolled up raw file vs ETL Ready file',
        'Pass',
        'Sum of Amount and unit values are matching',
        'aggregate validation',
    ))
    

# Minus query

In [0]:
# Row-level comparison of src_vw (rolled-up raw file) against gld_vw (gold file),
# run in both directions so rows missing on either side are reported.
#
# POS_UOM_QTY is cast to decimal(18,4), the same type the aggregate validation
# uses. The previous decimal(4,2) cannot represent any quantity >= 100; with
# ANSI mode off such a cast becomes NULL on both sides, MINUS treats the NULLs
# as equal, and real differences are hidden. It also rounded away the 3rd and
# 4th decimal places.
_measure_projection = (
    "cast(POS_NET_DOL_AMOUNT as decimal (8,3)),"
    "cast(POS_UOM_QTY as decimal (18,4)),"
    "cast(POS_UNITS as decimal)"
)

# Source keys are compared as-is.
_src_projection = (
    "select DIV,STORE,ITEM_CONSUMER_UPC,POS_DATE,"
    + _measure_projection
    + " from src_vw"
)

# Gold keys are left-padded with zeros to widths 3 / 5 / 13 before comparing.
# NOTE(review): presumably the gold file drops leading zeros - confirm.
_gld_projection = (
    "select lpad(DIV,3,'0'),lpad(STORE,5,'0'),lpad(ITEM_CONSUMER_UPC,13,'0'),POS_DATE,"
    + _measure_projection
    + " from gld_vw"
)

# Each projection is defined once so both directions always compare the same columns.
dfsminust = spark.sql(_src_projection + " minus " + _gld_projection)  # in src, not in gld
dftminuss = spark.sql(_gld_projection + " minus " + _src_projection)  # in gld, not in src

# Any row in either direction is a mismatch.
dfminusunion = dfsminust.union(dftminuss)
dfminuscount = dfminusunion.count()
if dfminuscount < 1:
    testResult.append(('Column level data validation','Minus query rolled-up raw file vs ETL Ready file', 'Pass', 'Data matches raw file vs ETL Ready file','full validation check'))
else:
    testResult.append(('Column level data validation','Minus query rolled-up raw file vs ETL Ready file', 'Fail', 'Data does not match, please check dfminusunion', 'full validation check'))
    

# Duplicate check on KCMS data

In [0]:
# Duplicate detection on the gold view: group on every compared column and keep
# the groups that occur more than once. The threshold is count > 1; the previous
# "> 2" let rows that appear exactly twice pass as "no duplicates".
dfdup = spark.sql(
    "select DIV,STORE,ITEM_CONSUMER_UPC,POS_DATE,POS_NET_DOL_AMOUNT,POS_UOM_QTY,POS_UNITS,POS_UOM,ITEM_SRC_DIV,ITEM_SRC_LOC "
    "from gld_vw "
    "group by DIV,STORE,ITEM_CONSUMER_UPC,POS_DATE,POS_NET_DOL_AMOUNT,POS_UOM_QTY,POS_UNITS,POS_UOM,ITEM_SRC_DIV,ITEM_SRC_LOC "
    "having count(1)>1"
)

# One row in dfdup per duplicated record; zero rows means the check passes.
dfdupcount = dfdup.count()
if dfdupcount < 1:
    testResult.append(('Duplicate check', 'Check for dupliacte records', 'Pass', 'No duplicate records', 'duplicate check'))
else:
    testResult.append(('Duplicate check', 'Check for dupliacte records', 'Fail', 'Duplicate records are foundcheck dfdup', 'duplicate check'))
    

# Result Dashboard

In [0]:
# Turn the accumulated list of result tuples into a DataFrame using the
# five-column result schema (`columns`).
dfResult = spark_session.createDataFrame (data=testResult, schema=columns)
# Expose the results to Spark SQL under the name testResult_vw.
dfResult.createOrReplaceTempView("testResult_vw")
# Render the results table in the notebook.
dfResult.display()

TestCaseSummary,TestCaseDescription,Status,Comments,Test_Data
Count comparison vs target,Count comparison rolled_up raw file vs ETL Ready file,Pass,Record count is matching,count check
Aggregate validation,Aggregate validation rolled up raw file vs ETL Ready file,Pass,Sum of Amount and unit values are matching,aggregate validation
Column level data validation,Minus query rolled-up raw file vs ETL Ready file,Pass,Data matches raw file vs ETL Ready file,full validation check
Duplicate check,Check for dupliacte records,Pass,No duplicate records,duplicate check


# Analysis

In [0]:
# Show the src and gld column totals side by side (built in the aggregate validation cell).
display(dfsaggunion)

tabnm,"sum(CAST(POS_NET_DOL_AMOUNT AS DECIMAL(8,3)))","sum(CAST(POS_UOM_QTY AS DECIMAL(18,4)))",sum(POS_UNITS)
src,20231489.032,1206146.2691,5859420.0
gld,20231489.032,1206146.2691,5859420.0


In [0]:
# Render the validation results table again (same DataFrame as the Result Dashboard cell).
dfResult.display()

TestCaseSummary,TestCaseDescription,Status,Comments,Test_Data
Count comparison vs target,Count comparison rolled_up raw file vs ETL Ready file,Pass,Record count is matching,count check
Aggregate validation,Aggregate validation rolled up raw file vs ETL Ready file,Pass,Sum of Amount and unit values are matching,aggregate validation
Column level data validation,Minus query rolled-up raw file vs ETL Ready file,Pass,Data matches raw file vs ETL Ready file,full validation check
Duplicate check,Check for dupliacte records,Pass,No duplicate records,duplicate check


In [0]:
import pandas as pd

# Column order of the test case catalogue.
testcases_schema = [
    "TestCases",
    "Summary",
    "Description",
    "TestStatus",
    "Priority",
    "Assignee",
    "Test_Data",
    "Expected_Result",
    "JIRA",
]

# The fields that vary per case: (TestCases, Summary, Description, Test_Data, Expected_Result).
_case_specs = [
    ("TestCase1", "Count comparison vs target", "Counts in both the dataframe must be equal",
     "count check", "Count are equal in source and target"),
    ("TestCase2", "Aggregate validation", "Aggregate validation rolled up raw file vs ETL Ready file",
     "aggregate validation", "Sum of Amount and unit values are matching"),
    ("TestCase3", "Column level data validation", "Column level data validation source vs target",
     "full validation check", "Values in source and target dataframe are matching"),
    ("TestCase4", "Check for dupliacte records", "Duplicates should not be present",
     "duplicate check", "No duplicates are present"),
]

# Expand each spec into a full row. TestStatus is left empty here; Priority,
# Assignee and JIRA are the same for every case.
testcases_data = [
    (case_id, summary, description, "", "High", "kon7198", test_data, expected, "DEART-23755")
    for case_id, summary, description, test_data, expected in _case_specs
]

df_testcases = spark.createDataFrame(testcases_data, testcases_schema)

df_testcases.display()

TestCases,Summary,Description,TestStatus,Priority,Assignee,Test_Data,Expected_Result,JIRA
TestCase1,Count comparison vs target,Counts in both the dataframe must be equal,,High,kon7198,count check,Count are equal in source and target,DEART-23755
TestCase2,Aggregate validation,Aggregate validation rolled up raw file vs ETL Ready file,,High,kon7198,aggregate validation,Sum of Amount and unit values are matching,DEART-23755
TestCase3,Column level data validation,Column level data validation source vs target,,High,kon7198,full validation check,Values in source and target dataframe are matching,DEART-23755
TestCase4,Check for dupliacte records,Duplicates should not be present,,High,kon7198,duplicate check,No duplicates are present,DEART-23755




In [0]:
# Attach the catalogue metadata to each executed check. The two frames are
# matched on Test_Data; a left join keeps every result row even when the
# catalogue has no matching entry.
testresults_df = dfResult.join(df_testcases, on = "Test_Data", how = "left")
# Render the joined table in the notebook.
testresults_df.display()


Test_Data,TestCaseSummary,TestCaseDescription,Status,Comments,TestCases,Summary,Description,TestStatus,Priority,Assignee,Expected_Result,JIRA
count check,Count comparison vs target,Count comparison rolled_up raw file vs ETL Ready file,Pass,Record count is matching,TestCase1,Count comparison vs target,Counts in both the dataframe must be equal,,High,kon7198,Count are equal in source and target,DEART-23755
aggregate validation,Aggregate validation,Aggregate validation rolled up raw file vs ETL Ready file,Pass,Sum of Amount and unit values are matching,TestCase2,Aggregate validation,Aggregate validation rolled up raw file vs ETL Ready file,,High,kon7198,Sum of Amount and unit values are matching,DEART-23755
full validation check,Column level data validation,Minus query rolled-up raw file vs ETL Ready file,Pass,Data matches raw file vs ETL Ready file,TestCase3,Column level data validation,Column level data validation source vs target,,High,kon7198,Values in source and target dataframe are matching,DEART-23755
duplicate check,Duplicate check,Check for dupliacte records,Pass,No duplicate records,TestCase4,Check for dupliacte records,Duplicates should not be present,,High,kon7198,No duplicates are present,DEART-23755


In [0]:
import pandas as pd
import os

import json
from pyspark.dbutils import DBUtils

# Columns to export, in output order. Status comes from the executed checks;
# the remaining fields come from the test case catalogue.
testresults_df1 = testresults_df.select(
    "TestCases",
    "Summary",
    "Description",
    "Status",
    "Priority",
    "Assignee",
    "Test_Data",
    "Expected_Result",
    "JIRA",
)

# One JSON document (as a string) per result row, collected to the driver.
json_data = testresults_df1.toJSON().collect()
print(json_data,"\n")

# Re-key the rows by test case id: {"TestCaseN": {remaining fields...}}.
testresults_json = {}
for json_string in json_data:
    json_object = json.loads(json_string)
    test_case = json_object.pop('TestCases')
    testresults_json[test_case] = json_object

print(testresults_json,"\n")

# Pretty-printed view of the same mapping.
print(json.dumps(testresults_json, indent=4))

# Persist the mapping as indented JSON on DBFS.
with open('/dbfs/Ravi/pyvalidata//pyvalidata_DIEM_TestResults_json_file', 'w') as json_file:
    json_file.write(json.dumps(testresults_json, indent=4))

print("Current working directory:", os.getcwd())


['{"TestCases":"TestCase1","Summary":"Count comparison vs target","Description":"Counts in both the dataframe must be equal","Status":"Pass","Priority":"High","Assignee":"kon7198","Test_Data":"count check","Expected_Result":"Count are equal in source and target","JIRA":"DEART-23755"}', '{"TestCases":"TestCase2","Summary":"Aggregate validation","Description":"Aggregate validation rolled up raw file vs ETL Ready file","Status":"Pass","Priority":"High","Assignee":"kon7198","Test_Data":"aggregate validation","Expected_Result":"Sum of Amount and unit values are matching","JIRA":"DEART-23755"}', '{"TestCases":"TestCase3","Summary":"Column level data validation","Description":"Column level data validation source vs target","Status":"Pass","Priority":"High","Assignee":"kon7198","Test_Data":"full validation check","Expected_Result":"Values in source and target dataframe are matching","JIRA":"DEART-23755"}', '{"TestCases":"TestCase4","Summary":"Check for dupliacte records","Description":"Duplica

In [0]:
# Destination of the results file, defined once and reused for writing,
# reporting and reading back.
TestResults_Saved_Path = "/dbfs/Ravi/pyvalidata//pyvalidata_DIEM_TestResults_json_file.json"

# Serialise the results mapping exactly once. Passing the already serialised
# text through json.dumps a second time stored a JSON *string literal* in the
# file (quoted, with escaped newlines), so json.loads returned a str and not
# the dictionary of test cases.
json_string1 = json.dumps(testresults_json, indent=4)

with open(TestResults_Saved_Path, 'w') as json_file:
    json_file.write(json_string1)

print("Fle saved at:", TestResults_Saved_Path)

# Read the file back to confirm it parses.
with open(TestResults_Saved_Path, 'r') as json_file:
    json_data = json_file.read()

# With single encoding this is a real dict keyed by test case id.
test_cases_dictionary_json = json.loads(json_data)

# Pretty-print so the cell output stays an indented JSON document.
print(json.dumps(test_cases_dictionary_json, indent=4))

Fle saved at: /dbfs/Ravi/pyvalidata//pyvalidata_DIEM_TestResults_json_file.json
{
    "TestCase1": {
        "Summary": "Count comparison vs target",
        "Description": "Counts in both the dataframe must be equal",
        "Status": "Pass",
        "Priority": "High",
        "Assignee": "kon7198",
        "Test_Data": "count check",
        "Expected_Result": "Count are equal in source and target",
        "JIRA": "DEART-23755"
    },
    "TestCase2": {
        "Summary": "Aggregate validation",
        "Description": "Aggregate validation rolled up raw file vs ETL Ready file",
        "Status": "Pass",
        "Priority": "High",
        "Assignee": "kon7198",
        "Test_Data": "aggregate validation",
        "Expected_Result": "Sum of Amount and unit values are matching",
        "JIRA": "DEART-23755"
    },
    "TestCase3": {
        "Summary": "Column level data validation",
        "Description": "Column level data validation source vs target",
        "Status": "Pass",
  