## Util Functions for Set Up
All cells in this section must be run in the main notebook for the Data Quality Tests to work.

Function to read data from either a CSV or an XLSX file

In [None]:
# Function 0: Reading the dataset file
def read_data(dataset_path):
    """Load a dataset from a CSV or XLSX file into a DataFrame.

    The file extension is matched case-insensitively, so ``data.CSV`` and
    ``data.Xlsx`` are handled like their lowercase forms.

    Args:
        dataset_path: Path to a ``.csv`` or ``.xlsx`` file.

    Returns:
        The loaded ``pandas.DataFrame``, or ``None`` (after printing
        "Unsupported file type") when the extension is not supported.
    """
    _, file_extension = os.path.splitext(dataset_path)
    file_extension = file_extension.lower()

    if file_extension == ".csv":
        try:
            # utf-8-sig also strips a leading byte-order mark if present.
            return pd.read_csv(dataset_path, encoding="utf-8-sig")
        except UnicodeDecodeError:
            # Fall back to Windows-1252 for files that are not valid UTF-8.
            return pd.read_csv(dataset_path, encoding="cp1252")

    if file_extension == ".xlsx":
        return pd.read_excel(dataset_path)

    print("Unsupported file type")
    return None

Function to log the scores into an xlsx log file (the file is expected to already exist)

In [None]:
# Function to log a new row into the DQS_Log_XX.xlsx file
def log_score(test_name, dataset_name, score, selected_columns, threshold=None):
    """Append one data-quality result row to the ``DQS_Log_Beta.xlsx`` log.

    The log workbook is read, extended by one row and written back in full.
    Besides the arguments, the row records the current local time and the
    value of ``GLOBAL_USER`` (a name expected to be defined elsewhere in the
    notebook).

    Args:
        test_name: Name of the test that produced the score.
        dataset_name: Name of the dataset that was tested.
        score: Score to record; it is stored exactly as given.
        selected_columns: Columns that were tested. ``None`` is logged as
            "All columns", a single string is logged as-is, and any other
            iterable is logged as a comma-separated list.
        threshold: Threshold used by the test; ``None`` is logged as
            "no threshold".
    """
    log_file = "DQS_Log_Beta.xlsx"

    threshold_value = "no threshold" if threshold is None else threshold

    if selected_columns is None:
        columns_tested = "All columns"
    elif isinstance(selected_columns, str):
        # A bare column name must not be joined character by character.
        columns_tested = selected_columns
    else:
        # str() keeps non-string column labels (e.g. integers) from
        # breaking the join.
        columns_tested = ", ".join(str(col) for col in selected_columns)

    # Load the existing log, or start an empty one if the file is missing.
    try:
        df = read_data(log_file)
    except FileNotFoundError:
        df = pd.DataFrame(
            columns=[
                "Dataset",
                "Test",
                "Threshold",
                "Date_Calculated",
                "Score",
                "Columns_Tested",
                "User",
            ]
        )

    # Prepare the new row as a one-row DataFrame
    new_row = pd.DataFrame(
        {
            "Dataset": [dataset_name],
            "Columns_Tested": [columns_tested],
            "Test": [test_name],
            "Date_Calculated": [datetime.now().strftime("%Y-%m-%d %H:%M:%S")],
            "Threshold": [threshold_value],
            "Score": [score],
            "User": GLOBAL_USER
        }
    )

    # Append the new row and save the updated log back to the Excel file
    df = pd.concat([df, new_row], ignore_index=True)
    df.to_excel(log_file, index=False)

Function to extract dataset name from a path

In [None]:
def get_dataset_name(dataset_path):
    """Return the file name of ``dataset_path`` without its final extension.

    For example, ``"data/Dataset_A.csv"`` yields ``"Dataset_A"``.
    """
    stem, _extension = os.path.splitext(os.path.basename(dataset_path))
    return stem

Define colour variables for console outputs

In [None]:
# ANSI escape codes used to colour console output
RED = "\033[31m"  # start red text
RESET = "\033[0m"  # reset text formatting back to the default

## Completeness Test

In [None]:
def completeness_test(dataset_path, exclude_columns=None, threshold=0.75):
    """Compute and log the share of non-missing cells in a dataset.

    The "Comment" column and any ``exclude_columns`` are dropped first. Of
    the remaining columns, only those whose fraction of missing values is
    at most ``threshold`` are scored. The result is logged via
    ``log_score`` under the test name "Completeness (P)".

    Args:
        dataset_path: Path of the dataset file, loaded with ``read_data``.
        exclude_columns: Column names to leave out of the test. Names not
            present in the dataset are ignored. Defaults to no exclusions.
        threshold: Maximum fraction of missing values a column may have
            and still be included in the score.

    Returns:
        Non-missing cells divided by total cells over the scored columns,
        or NaN when no cells remain to be scored.
    """
    # A None default avoids sharing one mutable list between calls.
    if exclude_columns is None:
        exclude_columns = []

    dataset = read_data(dataset_path)

    # Exclude the 'Comment' column if it exists in the dataset
    if "Comment" in dataset.columns:
        dataset = dataset.drop(columns=["Comment"])

    # Exclude columns in exclude_columns if they exist in the dataset
    dataset = dataset.drop(
        columns=[col for col in exclude_columns if col in dataset.columns]
    )

    # Fraction of missing values in each column
    is_null_percentage = dataset.isna().mean()

    # Keep the columns whose missing fraction does not exceed the threshold
    columns_to_keep = is_null_percentage[is_null_percentage <= threshold].index
    dataset2 = dataset[columns_to_keep]

    # Share of non-missing values across the kept columns
    total_non_missing = dataset2.notna().sum().sum()
    total_obs = dataset2.shape[0] * dataset2.shape[1]
    if total_obs == 0:
        # Nothing left to score: report NaN explicitly instead of dividing
        # zero by zero.
        completeness_score = float("nan")
    else:
        completeness_score = total_non_missing / total_obs

    # log the results
    log_score(
        test_name="Completeness (P)",
        dataset_name=get_dataset_name(dataset_path),
        selected_columns=None,
        threshold=threshold,
        score=completeness_score,
    )

    return completeness_score

## Output Reports

Consistency Type 2

In [None]:
def compare_datasets(dataset_path, column_mapping, ref_dataset_path=None):
    """Flag, per row, whether each mapped column's value is a known value.

    For every ``column -> reference column`` pair in ``column_mapping`` a
    boolean column named ``<column>_comparison`` is added to the dataset.
    The allowed values come from ``get_names_used_for_column``, applied to
    the reference dataset's mapped column when ``ref_dataset_path`` is
    given and to the dataset's own column otherwise. Missing values are
    always flagged ``False``.

    Args:
        dataset_path: Path of the dataset to check.
        column_mapping: Dict mapping a column of the dataset to the
            corresponding column of the reference dataset.
        ref_dataset_path: Optional path of a reference dataset.

    Returns:
        The loaded dataset with the added ``*_comparison`` columns.
    """
    df = read_data(dataset_path)

    # Only load the reference dataset when one was supplied.
    use_reference = bool(ref_dataset_path)
    if use_reference:
        df_ref = read_data(ref_dataset_path)

    for own_column, reference_column in column_mapping.items():
        # Pick where the allowed values come from.
        if use_reference:
            allowed_values = get_names_used_for_column(df_ref, reference_column)
        else:
            allowed_values = get_names_used_for_column(df, own_column)

        # Missing values never count as a match.
        df[own_column + "_comparison"] = [
            False if pd.isnull(entry) else (entry in allowed_values)
            for entry in df[own_column]
        ]

    return df

Accuracy Type 1

In [None]:
# Function 1: Using isdigit to find non-numerical entries
def find_non_digits(s):
    """Return the characters of ``str(s)`` that are neither digits nor ".".

    An empty list means the value's text consists only of digits and dots.
    """
    offending = []
    for symbol in str(s):
        if symbol != "." and not symbol.isdigit():
            offending.append(symbol)
    return offending


# Function 2 : Check if each row has only numbers in each selected column and add results as new columns
def add_only_numbers_columns(dataset_path, selected_columns):
    """Add a ``<column>_Only_Numbers`` flag for each selected column.

    The dataset is loaded with ``read_data``. For every dataset column that
    is listed in ``selected_columns``, a boolean column is added that is
    True where ``find_non_digits`` finds no offending character in the
    value. Names in ``selected_columns`` that are not dataset columns are
    ignored.

    Returns:
        The loaded dataset with the added flag columns.
    """
    frame = read_data(dataset_path)

    def _has_only_numbers(cell):
        return not find_non_digits(cell)

    # Resolve the target columns up front, in dataset order.
    targets = [name for name in frame.columns if name in selected_columns]
    for name in targets:
        frame[name + "_Only_Numbers"] = frame[name].apply(_has_only_numbers)

    return frame