In [5]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
builder = (
    SparkSession.builder
        .appName('lakeHouseApp')
        .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
        .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')
    )
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [6]:
from abc import ABC, abstractmethod

class Expectation(ABC):
    def __init__(self, column, dimension, add_info = {}):
        self.column = column
        self.dimension = dimension
        self.add_info = add_info

    @abstractmethod
    def test(self, ge_df):
        pass

In [7]:
class NotNullExpectation(Expectation):
    def __init__(self, column, dimension, add_info = {}):
        super().__init__(column, dimension, add_info)

    def test(self, ge_df):
        ge_df.expect_column_values_to_not_be_null(column=self.column,
                                                  meta = {"dimension": self.dimension},
                                                  result_format="COMPLETE")

In [8]:
class UniqueExpectation(Expectation):
    def __init__(self, column, dimension, add_info = {}):
        super().__init__(column, dimension, add_info)

    def test(self, ge_df):
        ge_df.expect_column_values_to_be_unique(column=self.column,
                                                meta = {"dimension": self.dimension}, 
                                                result_format="COMPLETE")

In [9]:
class ValuesInListExpectation(Expectation):
    def __init__(self, column, dimension, add_info = {}):
        super().__init__(column, dimension, add_info)

    def test(self, ge_df):
        ge_df.expect_column_values_to_be_in_set(column=self.column, 
                                                value_set=self.add_info["value_set"],
                                                meta = {"dimension": self.dimension}, 
                                                result_format="COMPLETE")

In [10]:
class ValueLengthsToEqual(Expectation):
    def __init__(self, column, dimension, add_info = {}):
        super().__init__(column, dimension, add_info)

    def test(self, ge_df):
        ge_df.expect_column_value_lengths_to_equal(column=self.column, 
                                                   value=self.add_info["value"], 
                                                   meta = {"dimension": self.dimension}, 
                                                   result_format="COMPLETE")

In [11]:
class ValueLengthsToBeBetween(Expectation):
    def __init__(self, column, dimension, add_info = {}):
        super().__init__(column, dimension, add_info)

    def test(self, ge_df):
        ge_df.expect_column_value_lengths_to_be_between(column=self.column, 
                                                        min_value=self.add_info["min_value"], 
                                                        max_value=self.add_info["max_value"],  
                                                        meta = {"dimension": self.dimension}, 
                                                        result_format="COMPLETE")

In [12]:
class ValueToBeBetween(Expectation):
    def __init__(self, column, dimension, add_info = {}):
        super().__init__(column, dimension, add_info)

    def test(self, ge_df):
        ge_df.expect_column_values_to_be_between(column=self.column, 
                                                 min_value=self.add_info["min_value"], 
                                                 max_value=self.add_info["max_value"],  
                                                 meta = {"dimension": self.dimension}, 
                                                 result_format="COMPLETE")

In [13]:
class ValueToBeGreaterThan(Expectation):
    def __init__(self, column, dimension, add_info = {}):
        super().__init__(column, dimension, add_info)

    def test(self, ge_df):
        ge_df.expect_column_values_to_be_between(column=self.column, 
                                                 min_value=self.add_info["min_value"], 
                                                 strict_min=True,  
                                                 meta = {"dimension": self.dimension}, 
                                                 result_format="COMPLETE")

In [14]:
class ValueToBeGreaterThanOrEqualTo(Expectation):
    def __init__(self, column, dimension, add_info = {}):
        super().__init__(column, dimension, add_info)

    def test(self, ge_df):
        ge_df.expect_column_values_to_be_between(column=self.column, 
                                                 min_value=self.add_info["min_value"],  
                                                 meta = {"dimension": self.dimension}, 
                                                 result_format="COMPLETE")

In [15]:
class ValueToBeLessThan(Expectation):
    def __init__(self, column, dimension, add_info = {}):
        super().__init__(column, dimension, add_info)

    def test(self, ge_df):
        ge_df.expect_column_values_to_be_between(column=self.column, 
                                                 max_value=self.add_info["max_value"], 
                                                 strict_max=True, 
                                                 meta = {"dimension": self.dimension}, 
                                                 result_format="COMPLETE")

In [16]:
class ValueToBeLessThanOrEqualTo(Expectation):
    def __init__(self, column, dimension, add_info = {}):
        super().__init__(column, dimension, add_info)

    def test(self, ge_df):
        ge_df.expect_column_values_to_be_between(column=self.column, 
                                                 max_value=self.add_info["max_value"],  
                                                 meta = {"dimension": self.dimension}, 
                                                 result_format="COMPLETE")

In [17]:
class ValueToBeDateutilParseable(Expectation):
    def __init__(self, column, dimension, add_info = {}):
        super().__init__(column, dimension, add_info)

    def test(self, ge_df):
        ge_df.expect_column_values_to_be_dateutil_parseable(column=self.column,  
                                                            meta = {"dimension": self.dimension}, 
                                                            result_format="COMPLETE")

In [18]:
class ValueToBeJsonParseable(Expectation):
    def __init__(self, column, dimension, add_info = {}):
        super().__init__(column, dimension, add_info)

    def test(self, ge_df):
        ge_df.expect_column_values_to_be_json_parseable(column=self.column,  
                                                        meta = {"dimension": self.dimension}, 
                                                        result_format="COMPLETE")

In [19]:
class ValueToBeOfType(Expectation):
    def __init__(self, column, dimension, add_info = {}):
        super().__init__(column, dimension, add_info)

    def test(self, ge_df):
        ge_df.expect_column_values_to_be_of_type(column=self.column, 
                                                 type_=self.add_info["type"], 
                                                 meta = {"dimension": self.dimension}, 
                                                 result_format="COMPLETE")

In [20]:
class ValuesToMatchRegex(Expectation):
    def __init__(self, column, dimension, add_info = {}):
        super().__init__(column, dimension, add_info)

    def test(self, ge_df):
        ge_df.expect_column_values_to_match_regex(column=self.column, 
                                                  regex =self.add_info["regex"], 
                                                  meta = {"dimension": self.dimension}, 
                                                  result_format="COMPLETE")

In [21]:
class ValuesToNotMatchRegex(Expectation):
    def __init__(self, column, dimension, add_info = {}):
        super().__init__(column, dimension, add_info)

    def test(self, ge_df):
        ge_df.expect_column_values_to_not_match_regex(column=self.column, 
                                                      regex =self.add_info["regex"], 
                                                      meta = {"dimension": self.dimension}, 
                                                      result_format="COMPLETE")

In [22]:
class ValuesToMatchStrftimeFormat(Expectation):
    def __init__(self, column, dimension, add_info = {}):
        super().__init__(column, dimension, add_info)

    def test(self, ge_df):
        ge_df.expect_column_values_to_match_strftime_format(column=self.column,
                                                            strftime_format=self.add_info["format"],
                                                            meta = {"dimension": self.dimension},
                                                            result_format="COMPLETE")

In [27]:
class UniqueValuesCountToBeBetween(Expectation):
    def __init__(self, column, dimension, add_info = {}):
        super().__init__(column, dimension, add_info)

    def test(self, ge_df):
        ge_df.expect_column_unique_value_count_to_be_between(column=self.column,
                                                            min_value=self.add_info["min_value"],
                                                            max_value=self.add_info["max_value"], 
                                                            meta = {"dimension": self.dimension},
                                                            result_format="COMPLETE")

In [42]:
import json

class JSONFileReader:
    def __init__(self, filename):
        self.filename = filename

    def read(self):
        with open(self.filename) as f:
            return json.load(f)

In [50]:
from great_expectations.dataset.sparkdf_dataset import SparkDFDataset

class DataQuality:

    def __init__(self, pyspark_df, config_path):
        self.pyspark_df = pyspark_df
        self.config_path = config_path

    def rule_mapping(self, dq_rule):
        return {
            "check_values_to_not_be_null": "NotNullExpectation",
            "check_values_to_be_unique": "UniqueExpectation",
            "check_values_to_be_in_set": "ValuesInListExpectation",
            "check_value_lengths_to_equal": "ValueLengthsToEqual",
            "check_value_lengths_to_be_between": "ValueLengthsToBeBetween",
            "check_values_to_be_between": "ValueToBeBetween",
            "check_values_to_be_greater_than": "ValueToBeGreaterThan",
            "check_values_to_be_greater_than_or_equal_to": "ValueToBeGreaterThanOrEqualTo",
            "check_values_to_be_less_than": "ValueToBeLessThan",
            "check_values_to_be_less_than_or_equal_to": "ValueToBeLessThanOrEqualTo",
            "check_values_to_be_dateutil_parseable": "ValueToBeDateutilParseable",
            "check_values_to_be_json_parseable": "ValueToBeJsonParseable",
            "check_values_to_be_of_type": "ValueToBeOfType",
            "check_values_to_match_regex": "ValuesToMatchRegex",
            "check_values_to_not_match_regex": "ValuesToNotMatchRegex",
            "check_values_to_match_strftime_format": "ValuesToMatchStrftimeFormat",
            "check_unique_value_count_to_be_between": "UniqueValuesCountToBeBetween"
        }[dq_rule]

    def _get_expectation(self):
        class_obj = globals()[self.rule_mapping()]
        return class_obj(self.extractor_args)
    
    def convert_to_ge_df(self):
        return SparkDFDataset(self.pyspark_df)
    
    def read_config(self):
        json_reader = JSONFileReader(self.config_path)
        return json_reader.read()
      
    def run_test(self):
        ge_df = self.convert_to_ge_df()
        config = self.read_config()
        # config = json.load(conf)
        print(config)
        for column in config["columns"]:
            if column["dq_rule(s)"] is None:
                continue
            for dq_rule in column["dq_rule(s)"]:
                expectation_obj = globals()[self.rule_mapping(dq_rule["rule_name"])]
                expectation_instance = expectation_obj(column["column_name"], dq_rule["rule_dimension"], dq_rule["add_info"])
                expectation_instance.test(ge_df)

        dq_results = ge_df.validate()
        return dq_results

In [59]:
def create_df_from_dq_results(spark, dq_results):
    dq_data = []
    # print(dq_results)
    for result in dq_results["results"]:
        if result["success"] == True:
            status = 'PASSED'
        else:
            status = 'FAILED'
        unexpected_list = result["result"]["unexpected_list"]
        # print(len(unexpected_list))
        dq_data.append((
        result["expectation_config"]["kwargs"]["column"],
        result["expectation_config"]["meta"]["dimension"],
        status,
        result["expectation_config"]["expectation_type"],
        result["result"]["unexpected_count"],
        result["result"]["element_count"],
        result["result"]["unexpected_percent"],
        float(100-result["result"]["unexpected_percent"]),
        result["result"]["unexpected_list"])
        )
    dq_columns = ["column", "dimension", "status", "expectation_type", "unexpected_count", "element_count", "unexpected_percent", "percent","unexpected_values"]
    dq_df = spark.createDataFrame(data=dq_data,schema=dq_columns)
    return dq_df

In [24]:
def dqcheck(dqdf , config_path):
    dq = DataQuality(dqdf, config_path)
    dq_results = dq.run_test()
    dq_df = create_df_from_dq_results(spark, dq_results)
    return(dq_df)

In [2]:
df=spark.read.parquet("landing/mongodb/WideWorldImporters/Colors")

                                                                                

In [92]:
dq_df=dqcheck(df,"dqconfig/Colors.json")

24/04/04 22:47:29 WARN CacheManager: Asked to cache already cached data.


{'data_product_name': 'Colors', 'columns': [{'column_name': 'ColorName', 'dq_rule(s)': [{'rule_name': 'check_values_to_not_be_null', 'rule_dimension': 'Completeness', 'add_info': {}}, {'rule_name': 'check_values_to_be_unique', 'rule_dimension': 'Uniqueness', 'add_info': {}}, {'rule_name': 'check_values_to_be_in_set', 'rule_dimension': 'Validity', 'add_info': {'value_set': ['Red', 'Green', 'Yellow', 'Hindi', 'Sanskrit']}}]}, {'column_name': 'ColorID', 'dq_rule(s)': [{'rule_name': 'check_values_to_not_be_null', 'rule_dimension': 'Completeness', 'add_info': {}}, {'rule_name': 'check_values_to_be_unique', 'rule_dimension': 'Uniqueness', 'add_info': {}}, {'rule_name': 'check_values_to_be_in_set', 'rule_dimension': 'Validity', 'add_info': {'value_set': [1, 2, 3, 4, 5, 6]}}]}]}


24/04/04 22:47:29 WARN CacheManager: Asked to cache already cached data.
24/04/04 22:47:29 WARN CacheManager: Asked to cache already cached data.


In [93]:
dq_df.show(truncate=False)

+---------+------------+------+-----------------------------------+----------------+-------------+------------------+------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|column   |dimension   |status|expectation_type                   |unexpected_count|element_count|unexpected_percent|percent           |unexpected_values                                                                                                                                                                                                                                                                                       |
+---------+------------+------+-----------------------------------+----------------+-------------+------------------+---------------

In [94]:
dq_df.createOrReplaceTempView("dqtbl")

In [95]:
import pyspark.sql.types as T
from pyspark.sql.functions import col
import pandas as pd
import json

def sparkSchemaBuild(spark,dataDict_config_path,data_dict_mapping_config_path,tableNm):

    dataDictDF = spark.read.option("multiline","true").json(path=dataDict_config_path)
    pdf = pd.DataFrame()
    f = open(data_dict_mapping_config_path)
    data = json.load(f)
    f.close()
    sourceColType = []
    targetColType = []
    for (k, v) in data.items():
        sourceColType.append(k)
        targetColType.append(str(v))

    data = {'sourceColType': sourceColType,
    'targetColType': targetColType}
# Convert the dictionary into DataFrame
    pdf = pd.DataFrame(data)
    datatypeDF=spark.createDataFrame(pdf) 
    dataDictDF.createOrReplaceTempView("dataDict")
    datatypeDF.createOrReplaceTempView("dataType")
    schemaDF = spark.sql("select dataDict.ColumnName || ' ' || dataType.targetColType  ddl_schema_string ,  CAST(ColumnOrdinal AS INT) ORDINAL_POSITION from dataDict JOIN dataType ON dataDict.ColumnDataType = dataType.sourceColType where  dataDict.TableName = '" +tableNm+"'")
    schemaDF =schemaDF.sort(col("ORDINAL_POSITION"))
    ddl_schema = schemaDF.collect()
    ddl_schema_string = ''
    for x in ddl_schema:
        ddl_schema_string = ddl_schema_string + ',' + x[0]
    ddl_schema_string = ddl_schema_string.lstrip(',')
    return ddl_schema_string
    # ddl_schema = T._parse_datatype_string(ddl_schema_string.lstrip(','))
    # return ddl_schema

In [96]:
dataDictConfigPath = "/home/animesh/pysparketlframework/lakeHouseBuildFrameWork/etl-config/datadict.json"
dataDictMappingConfigPath = "/home/animesh/pysparketlframework/lakeHouseBuildFrameWork/etl-config/mongoToSparkMapping.json"
tableNm="Colors"
ddl_schema = sparkSchemaBuild(spark,dataDictConfigPath,dataDictMappingConfigPath,tableNm)
print(ddl_schema)
colList = ddl_schema.split(",")

ColorID int,ColorName string,LastEditedBy int,ValidFrom timestamp,ValidTo timestamp


In [100]:
from pyspark.sql import functions as F

df.createOrReplaceTempView("srctbl")
failedDF = spark.sql("select * from dqtbl where status = 'FAILED'")
rows_looped = failedDF.select("column", "unexpected_values","dimension").collect()

for rows in rows_looped:
       
        print(rows[0])
        for col in colList:
            if col.split(" ")[0] == rows[0]:
                dtype = col.split(" ")[1]
                break
        print(dtype)    
        listofvals = ""
        for val in rows[1]:
            if dtype == "string":
                listofvals = listofvals +"'"+ val + "',"
            elif dtype == "int":  
                listofvals = listofvals + val + ","
        print(listofvals.rstrip(","))
        errdf = spark.sql("select * from srctbl where "+rows[0]+" in ("+listofvals.rstrip(",")+")")
        errPath = "/home/animesh/pysparketlframework/lakeHouseBuildFrameWork/err/"+tableNm+"/"
        errdf=errdf.withColumn("error_desc",F.lit(rows[2]))
        errdf.write.mode("append").format("parquet").partitionBy("part_date").save(errPath)

ColorName
string
'Light Green','Orange','Olive','Ivory','Chartreuse','BlaPitchBlack','Dark Green','Navy Blue','Cyan','Royal Blue','Teal','Steel Gray','Charcoal','Plum','Hot Pink','Maroon','Indigo','Dark Brown','Blue','Beige','Wheat','Azure','Fuchsia','Tan','Silver','Khaki','Purple','Puce','Light Brown','White','Mauve','Salmon','Lavender','Gold'


                                                                                

ColorID
int
28,19,24,23,15,9,22,7,29,33,12,25,13,20,14,8,34,36,10,32,31,16,27,26,18,35,21,30,17,11


64

In [103]:
baddf = spark.read.parquet(errPath)
print(baddf.count())
badf =baddf.drop("error_desc")
# Perform the exceptAll() operation
goodDF = df.exceptAll(baddf)
# Display the resulting DataFrame
goodDF.show()

64
+-------+-------------+------------+---------+-------+--------------------+----------+----------+
|ColorID|    ColorName|LastEditedBy|ValidFrom|ValidTo|     IngestTimeStamp|error_desc| part_date|
+-------+-------------+------------+---------+-------+--------------------+----------+----------+
|     19|  Light Green|           1|      NaN|    NaN|2024-03-13 23:21:...|  Validity|2024-03-13|
|     24|       Orange|           1|      NaN|    NaN|2024-03-13 23:21:...|  Validity|2024-03-13|
|     23|        Olive|           1|      NaN|    NaN|2024-03-13 23:21:...|  Validity|2024-03-13|
|     15|        Ivory|           1|      NaN|    NaN|2024-03-13 23:21:...|  Validity|2024-03-13|
|      6|   Chartreuse|           1|      NaN|    NaN|2024-03-13 23:21:...|  Validity|2024-03-13|
|      3|BlaPitchBlack|           2|      NaN|    NaN|2024-03-13 23:21:...|  Validity|2024-03-13|
|      9|   Dark Green|           1|      NaN|    NaN|2024-03-13 23:21:...|  Validity|2024-03-13|
|     22|    Navy

In [3]:
dq_df.show(truncate=False)

                                                                                

+-------+-------------+------------+---------+-------+-----------------------+----------+
|ColorID|ColorName    |LastEditedBy|ValidFrom|ValidTo|IngestTimeStamp        |part_date |
+-------+-------------+------------+---------+-------+-----------------------+----------+
|28     |Red          |1           |NaN      |NaN    |2024-03-13 23:21:37.292|2024-03-13|
|19     |Light Green  |1           |NaN      |NaN    |2024-03-13 23:21:37.292|2024-03-13|
|24     |Orange       |1           |NaN      |NaN    |2024-03-13 23:21:37.292|2024-03-13|
|23     |Olive        |1           |NaN      |NaN    |2024-03-13 23:21:37.292|2024-03-13|
|15     |Ivory        |1           |NaN      |NaN    |2024-03-13 23:21:37.292|2024-03-13|
|6      |Chartreuse   |1           |NaN      |NaN    |2024-03-13 23:21:37.292|2024-03-13|
|3      |BlaPitchBlack|2           |NaN      |NaN    |2024-03-13 23:21:37.292|2024-03-13|
|9      |Dark Green   |1           |NaN      |NaN    |2024-03-13 23:21:37.292|2024-03-13|
|22     |N