In [0]:
import re
import pandas as pd
import json
import matplotlib.pyplot as plt



In [0]:
def null_check(df, column):
    # print(df[column])
    return df[column].isnull().sum()



In [0]:
def range_check(df, column, minimum, maximum):
    s = pd.Series(df[column].astype(str))
    # print(s)
    return len(s) - s.between(minimum, maximum).sum()



In [0]:
def total_error_percentage(df, errors):
    return (errors / df.size) * 100


In [0]:

def format_check(df, column, regex):
    non_errors = 0

    s = pd.Series(df[column]).astype(str)
    for string in s:
        if bool(re.fullmatch(regex, string)):
            non_errors += 1

    errors = len(s) - non_errors
    return errors

In [0]:
def value_check(df, column, value):
    errors = 0

    if isinstance(value, list):
        for row in df[column].astype(str):
            if row not in value:
                errors += 1

    else:
        for row in df[column].astype(str):
            if row != value:
                errors += 1

    return errors

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession.builder.appName("DataQuality").getOrCreate()

from pyspark.sql.types import StructType, StructField, FloatType, StringType
def visualize(data_dict, checks):
    # Define the schema
    schema = StructType([
        StructField("Columns", StringType(), True),
        StructField("Error_perc_for_"+checks, FloatType(), True)
    ])

    rows = []
    for key, value in data_dict.items():
        value = float(value) if isinstance(value, (float, int)) else value
        rows.append(Row(Columns=key, Null_percentage=value))

    df = spark.createDataFrame(rows, schema=schema)

    # Show the DataFrame
    display(df)
    return 

In [0]:

def run_quality_checks(dataset_index):
    result_map = {}

    with open('config.json', 'r') as file:
        errors = 0
        total_nulls = 0
        total_format_errors = 0
        total_value_errors = 0

        config = json.loads(file.read())

        dataset_profile = config[dataset_index]

        dataframe = pd.read_csv(dataset_profile['dataset'])

        # checks object
        checks = dataset_profile['checks'][0]

        # print(checks)

        # checks to run on the dataset
        keys = list(checks.keys())

        # print(keys)
        null_output = {}
        range_output = {}
        format_output = {}
        value_output = {}

        for i, check in enumerate(checks):

            # print(check)
            if check == 'null_check':

                for column in checks[check]:
                    # print(dataframe[column])
                    nulls = null_check(dataframe, column)

                    perc = (nulls / dataframe.size) * 100

                    null_output[column] = perc

                    total_nulls += nulls
                    errors += nulls
                    # print(nulls)


            elif check == "range_check":
                # columns
                if isinstance(checks[keys[i]], list):
                    for item in checks[keys[i]]:
                        for key, value in item.items():
                            column = key
                            range = value
                            constraints = range.split('-')
                            range_errors = range_check(dataframe, column, constraints[0], constraints[1])
                            range_output[column] = (range_errors / dataframe.size) * 100


                else:
                    columns = list(checks[check])
                    ranges = list(checks[check].values())
                    for i, column in enumerate(columns):
                        constraints = ranges[i].split('-')
                        # print(constraints[0], constraints[1])
                        range_errors = range_check(dataframe, column, constraints[0], constraints[1])
                        range_output[column] = (range_errors / dataframe.size) * 100



            elif check == "total_error_percentage":
                print("Total errors: ", total_error_percentage(dataframe, errors))

            elif check == "format_check":
                columns = list(checks[check].keys())
                regex = list(checks[check].values())
                for i, column in enumerate(columns):
                    print(column, regex[i])
                    format_errors = format_check(dataframe, column, regex[i])
                    errors += format_errors
                    total_format_errors += format_errors
                    format_output[column] = (format_errors / dataframe.size) * 100


            elif check == "value_check":
                columns = list(checks[check].keys())
                values = list(checks[check].values())
                for i, column in enumerate(columns):
                    value_errors = value_check(dataframe, column, values[i])
                    errors += value_errors
                    total_value_errors += value_errors
                    value_output[column] = (value_errors / dataframe.size) * 100


            elif check == "total_null_errors":
                print("Total nulls: ", total_nulls)

            elif check == "total_format_errors":
                print("Total format errors: ", total_format_errors)

            elif check == "total_value_errors":
                print("Total value errors: ", total_value_errors)

            elif check == "total_number_errors":
                print("Total errors: ", errors)

        visualize(null_output,"null_check")
        visualize(range_output,"range_check")
        visualize(format_output, "format_check")
        visualize(value_output, "value_check")
        
    return 



In [0]:

if __name__ == '__main__':
    for i in range(5):
        if i == 2:
            continue
        run_quality_checks(i)


    print("-"*100)


Total errors:  2.0238095238095237
Total errors:  17


Columns,Error_perc_for_null_check
Total energy consumption of primary fuels and equivalents,1.0714285
Fraction from renewable sources and waste,0.95238096


Databricks visualization. Run in Databricks to view.

Columns,Error_perc_for_range_check
Year,1.1904762


Databricks visualization. Run in Databricks to view.

Columns,Error_perc_for_format_check


Databricks visualization. Run in Databricks to view.

Columns,Error_perc_for_value_check


Databricks visualization. Run in Databricks to view.

  dataframe = pd.read_csv(dataset_profile['dataset'])


MAC Address ^([0-9A-F]{4}:){3}[0-9A-F]{4}$
Total nulls:  258640
Total errors:  3.478409171217991
Total value errors:  1788
Total format errors:  37348
Total errors:  297776


Columns,Error_perc_for_null_check
MAC Address,0.0
Driver Postal Code,0.09814624
User ID,0.0896773
County,0.9889968
Model Number,0.92221487
System S/N,0.92221487


Databricks visualization. Run in Databricks to view.

Columns,Error_perc_for_range_check


Columns,Error_perc_for_format_check
MAC Address,0.43627298


Databricks visualization. Run in Databricks to view.

Columns,Error_perc_for_value_check
Start Time Zone,0.0
End Time Zone,0.0
Postal Code,0.0
Currency,0.020886155
Org Name,0.0


Databricks visualization. Run in Databricks to view.

Total errors:  7.133069725560818
Total errors:  5466


Columns,Error_perc_for_null_check
Year,0.0
Renewable energy share in the total final energy consumption (%),0.25316787
Electricity from renewables (TWh),0.027404768
Renewables (% equivalent primary energy),2.7887614
Financial flows to developing countries (US $),2.726122
Renewable-electricity-generating-capacity-per-capita,1.2149447
Access to electricity (% of population),0.013049889


Databricks visualization. Run in Databricks to view.

Columns,Error_perc_for_range_check
Year,0.013049889
Renewable energy share in the total final energy consumption (%),0.25577784
Electricity from renewables (TWh),1.5046523
Renewables (% equivalent primary energy),2.830521
Financial flows to developing countries (US $),3.2533376
Renewable-electricity-generating-capacity-per-capita,2.5603883
Access to electricity (% of population),4.7345


Databricks visualization. Run in Databricks to view.

Columns,Error_perc_for_format_check


Columns,Error_perc_for_value_check
Entity,0.10961907


Databricks visualization. Run in Databricks to view.

Total errors:  1.403061224489796
Total errors:  22


Columns,Error_perc_for_null_check
YEAR,0.31887755
OBS_VALUE,0.12755102
CROP_LABEL,0.0
FREQ,0.0
INDICATOR,0.0
TIME_FORMAT,0.0
UNIT_MULT,0.12755102


Databricks visualization. Run in Databricks to view.

Columns,Error_perc_for_range_check


Columns,Error_perc_for_format_check


Columns,Error_perc_for_value_check
INDICATOR,0.19132653
FREQ,0.44642857
TIME_FORMAT,0.19132653


Databricks visualization. Run in Databricks to view.

----------------------------------------------------------------------------------------------------
