### EEIC External r2b Parent

##### This is the parent notebook for the EEIC-Checker-External-r2b scheme notebook

- It contains all the functions that the external r2b notebook needs to run


In [0]:
%run ../../BaseToCurated/General/BaseToCurated-Parent

In [0]:
import pandas as pd
from pyspark.sql import SparkSession 
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, LongType
import pyspark.sql.functions as f
from pyspark.sql.functions import coalesce
import os
from pyspark.sql.functions import to_date
from functools import reduce
import ast

In [0]:
# Mount root that read_eeic_scheme prefixes to lake-relative paths before reading with Spark.
LAKE_ROOT_SPARK = "/mnt/datalake/"
# /dbfs-prefixed path to the ECO4 external raw area.
# NOTE(review): not referenced by any function visible in this notebook — confirm it is still needed.
external_scheme_path = "/dbfs/mnt/datalake/Raw/PMDS/OfficialSensitive/External/ECO4/"
# Lake-relative raw area.
# NOTE(review): not referenced by any function visible in this notebook — confirm it is still needed.
raw_path = "Raw/PMDS/OfficialSensitive/"
# Lake-relative base area; only used by the commented-out write at the end of generic_pipeline.
base_path = "Base/PMDS/OfficialSensitive/External/"

def is_valid_webportal_submission_path(path: str) -> bool:
    """
    Check whether a path ends in the web portal's YYYY/YYYYMM/YYYYMMDD directory layout.

    The path is split on "/" and its last three components are validated:
    - they must be 4, 6 and 8 ASCII digits long respectively;
    - the year must lie in the range 1999 to 2049 (inclusive);
    - the year-month component must start with the year, and the full date
      component must start with the year-month;
    - the full date component must be a real calendar date.

    Args:
    path (str): The directory path to validate.

    Returns:
    bool: True if the path is valid, False otherwise.
    """
    # Imported locally so the check does not rely on names supplied by another notebook.
    from datetime import datetime

    dirs = path.split("/")

    # The directories should be nested at least 3 times
    if len(dirs) < 3:
        return False

    year, year_month, year_month_day = dirs[-3:]

    for part, expected_length in ((year, 4), (year_month, 6), (year_month_day, 8)):
        if len(part) != expected_length:
            return False
        # isascii() + isdigit() rather than isnumeric(): isnumeric() also accepts
        # characters such as superscript digits, which int() cannot parse.
        if not (part.isascii() and part.isdigit()):
            return False

    if not 1999 <= int(year) <= 2049:
        return False

    # Each level must repeat the prefix of its parent (same year, same month).
    if not year_month.startswith(year) or not year_month_day.startswith(year_month):
        return False

    # strptime rejects impossible months and days (e.g. month 13, day 00, 30 February).
    try:
        datetime.strptime(year_month_day, '%Y%m%d')
    except ValueError:
        return False
    return True

def get_path_of_latest_directory_from_webportal(path):
    """
    Find the most recent dated submission directory below a root path.

    Every directory reached by os.walk is tested with
    is_valid_webportal_submission_path; among those that pass, the one whose
    final component (YYYYMMDD) is the latest date is chosen. The chosen date
    and path are printed before returning.

    Args:
    path (str): The root directory path to search for web portal submissions.

    Returns:
    str: The path of the latest valid submission directory, or None if no
         valid directory is found.
    """
    submission_dirs = [
        current_dir
        for current_dir, _subdirs, _files in os.walk(path)
        if is_valid_webportal_submission_path(current_dir)
    ]

    # Sentinel older than any accepted submission date.
    newest_date = datetime(1900, 1, 1).date()
    newest_path = None

    for candidate in submission_dirs:
        candidate_date = datetime.strptime(candidate.split('/')[-1], '%Y%m%d').date()
        if candidate_date <= newest_date:
            continue
        newest_date, newest_path = candidate_date, candidate

    print(f'latest submission: {newest_date}')
    print(f'file location path: {newest_path}')
    return newest_path

def get_path_of_latest_file_from_webportal_submissions(path):
    """
    Return the most recently modified CSV file in the latest submission directory.

    The latest directory is located with get_path_of_latest_directory_from_webportal;
    within it, regular files whose name ends in ".csv" are compared by
    modification time and the newest one is returned. The chosen file name is
    printed before returning.

    Args:
    path (str): The root directory path to search for web portal submissions.

    Returns:
    str: The path of the most recently modified CSV file in the latest directory.

    Raises:
    FileNotFoundError: If no valid submission directory exists, the latest
                       directory is empty, or it contains no CSV file.
    """
    latest_path = get_path_of_latest_directory_from_webportal(path)

    # Test for None before listing: os.listdir(None) lists the current working directory.
    if latest_path is None:
        raise FileNotFoundError("No files found in the given path.")

    entries = os.listdir(latest_path)
    if not entries:
        raise FileNotFoundError("No files found in the given path.")

    csv_paths = [
        candidate
        for candidate in (latest_path + "/" + entry for entry in entries)
        if candidate.endswith(".csv") and os.path.isfile(candidate)
    ]
    # Without this guard a directory holding only non-CSV entries would fall
    # through to os.path.basename(None) and fail with a TypeError.
    if not csv_paths:
        raise FileNotFoundError(f"No CSV files found in {latest_path}.")

    latest_file_path = max(csv_paths, key=os.path.getmtime)
    print(f'the file being processed: {os.path.basename(latest_file_path)}')
    return latest_file_path


In [0]:
from pyspark.sql.functions import col, regexp_replace, trim, when, expr

def clean_dataframe(df):
    """
    Normalise empty-looking values to null in every column of a Spark DataFrame.

    For each column, in order:
    1. A value that is empty once all non-printable characters are removed is
       replaced with null; any other value is kept as it was.
    2. The value is trimmed, and a trimmed value equal to one of
       'None', 'nan', '', 'NULL' or 'N/A' is replaced with null.

    Args:
    df (DataFrame): The input Spark DataFrame to be cleaned.

    Returns:
    DataFrame: The cleaned Spark DataFrame.
    """
    null_markers = ['None', 'nan', '', 'NULL', 'N/A']

    for name in df.columns:
        printable_part = regexp_replace(col(name), '[^\\p{Print}]', '')
        df = df.withColumn(name, when(printable_part == '', None).otherwise(col(name)))

        trimmed_value = trim(col(name))
        df = df.withColumn(name, when(trimmed_value.isin(null_markers), None).otherwise(trimmed_value))

    return df

In [0]:
# Build a dictionary mapping each scheme name to the Spark schema of its submitted columns

def get_scheme_schema_dictionary(onboarded_scheme_columns_dict: dict):
    """
    Turn per-scheme column-name lists into per-scheme Spark schemas.

    Each scheme's schema is a StructType with one nullable StringType field
    per column name; names are stripped of surrounding whitespace first.

    Args:
    onboarded_scheme_columns_dict (dict): Scheme name -> list of column names.

    Returns:
    dict: Scheme name -> StructType describing that scheme's columns.

    Raises:
    ValueError: If the input dictionary is empty.
    """
    if onboarded_scheme_columns_dict == {}:
        raise ValueError("The schema dictionary cannot be empty")

    def as_string_schema(column_names):
        # Every column is declared as a nullable string.
        fields = [StructField(name.strip(), StringType(), True) for name in column_names]
        return StructType(fields)

    return {
        scheme: as_string_schema(column_names)
        for scheme, column_names in onboarded_scheme_columns_dict.items()
    }



def validate_eeic_reference_files(mandatory_column_mapping_path, onboarded_scheme_columns_path):
    """
    Load the two EEIC reference CSV files and check that they agree.

    Both files must have a 'scheme' column, and the two 'scheme' columns must
    be equal. For every row, the first non-null cell after the scheme in the
    onboarded file is parsed as a Python list literal of column names (each
    stripped of whitespace), and every non-null value after the scheme in the
    mandatory mapping row must appear in that list.

    Args:
    mandatory_column_mapping_path (str): The file path to the mandatory column mapping CSV.
    onboarded_scheme_columns_path (str): The file path to the onboarded scheme columns CSV.

    Returns:
    tuple: A pair of
        - df_mandatory_column_mapping (DataFrame): The mandatory column mapping as read by pandas.
        - onboarded_scheme_columns_dict (dict): Scheme name -> list of stripped onboarded column names.

    Raises:
    KeyError: If either file has no 'scheme' column.
    ValueError: If the 'scheme' columns of the two files differ, or a mandatory
                column is not listed among the scheme's onboarded columns.
    """
    df_mandatory_column_mapping = pd.read_csv(mandatory_column_mapping_path)
    df_onboarded_scheme_columns = pd.read_csv(onboarded_scheme_columns_path)

    reference_files = (
        (df_mandatory_column_mapping, "The mandatory column mapping csv file is missing the 'scheme' column"),
        (df_onboarded_scheme_columns, "The onboarded scheme columns csv file is missing the 'scheme' column"),
    )
    for frame, missing_message in reference_files:
        if 'scheme' not in frame:
            raise KeyError(missing_message)

    schemes_agree = df_mandatory_column_mapping['scheme'].equals(df_onboarded_scheme_columns['scheme'])
    if not schemes_agree:
        raise ValueError("The schemes in the mandatory column mapping and onboarded scheme columns csv files do not match")

    onboarded_scheme_columns_dict = {}

    for position, mapping_row in df_mandatory_column_mapping.iterrows():
        # The column list is stored as a list literal in the first populated cell after the scheme.
        raw_column_list = df_onboarded_scheme_columns.iloc[position, 1:].dropna().tolist()[0]
        declared_columns = [name.strip() for name in ast.literal_eval(raw_column_list)]

        for required in mapping_row[1:].dropna().tolist():
            if required in declared_columns:
                continue
            raise ValueError(f"Mandatory column '{required}' from scheme '{mapping_row['scheme']}' is not captured in the onboarded scheme columns")

        onboarded_scheme_columns_dict[df_onboarded_scheme_columns.iloc[position, 0]] = declared_columns

    return df_mandatory_column_mapping, onboarded_scheme_columns_dict

def map_mandatory_columns(df, df_mandatory_column_mapping, scheme):
    """
    Rename a scheme's own mandatory column names to the standard names.

    The standard names are the headers of the mapping DataFrame after its
    first column; the row whose 'scheme' value equals `scheme` holds that
    scheme's own name for each of them. Only string cells are used for
    renaming, so blank (non-string) cells are skipped. If the scheme has no
    row in the mapping, the DataFrame is returned unchanged.

    Args:
    df (DataFrame): The input Spark DataFrame whose columns need to be renamed.
    df_mandatory_column_mapping (DataFrame): pandas DataFrame with a 'scheme' column followed by one column per standard name.
    scheme (str): The name of the scheme whose mapping should be applied.

    Returns:
    DataFrame: The DataFrame with the scheme's mandatory columns renamed.
    """
    standard_names = df_mandatory_column_mapping.columns.values.tolist()[1:]

    # scheme -> list of that scheme's own names, in the same order as standard_names
    names_by_scheme = df_mandatory_column_mapping.set_index('scheme').T.to_dict('list')

    if scheme not in names_by_scheme:
        return df

    for submitted_name, standard_name in zip(names_by_scheme[scheme], standard_names):
        if not isinstance(submitted_name, str):
            continue
        df = df.withColumnRenamed(submitted_name, standard_name)

    return df

# given an onboarded scheme and constructed schema, reads the submitted data
def read_eeic_scheme(scheme_name, schema):
    """
    Read the latest submitted CSV file of a scheme into a Spark DataFrame.

    The latest file is located with
    get_path_of_latest_file_from_webportal_submissions under
    LAKE_ROOT + "Raw/PMDS/OfficialSensitive/External/" + scheme_name. The part
    of that path after "/datalake/" is then re-rooted on LAKE_ROOT_SPARK and
    read as a CSV with a header row, UTF-8 encoding, and '"' as both the quote
    and the escape character.

    NOTE(review): LAKE_ROOT and spark are not defined in this notebook;
    presumably they come from the %run parent notebook / Databricks runtime — confirm.

    Args:
    scheme_name (str): The name of the scheme for which the CSV file should be read.
    schema (StructType): The schema to be applied when reading the CSV file.

    Returns:
    DataFrame: A Spark DataFrame containing the data from the latest CSV file for the specified scheme.
    """
    latest_file = get_path_of_latest_file_from_webportal_submissions(
        LAKE_ROOT + "Raw/PMDS/OfficialSensitive/External/" + scheme_name
    )
    # Keep only the lake-relative part so it can be prefixed with the Spark mount root.
    lake_relative_path = latest_file.split("/datalake/")[-1]

    reader = (
        spark.read
        .option("header", "true")
        .schema(schema)
        .option("encoding", "utf8")
        .option("quote", "\"")
        .option("escape", "\"")
    )
    return reader.csv(LAKE_ROOT_SPARK + lake_relative_path)



def create_db_column(df, scheme_name):
    """
    Tag every row of the DataFrame with the scheme it came from.

    A 'db_type' column holding the literal `scheme_name` is added. An existing
    'db_type' column is treated as an error rather than overwritten.

    Args:
    df (DataFrame): The input Spark DataFrame to which the 'db_type' column will be added.
    scheme_name (str): The value to be assigned to the 'db_type' column.

    Returns:
    DataFrame: The DataFrame with the added 'db_type' column.

    Raises:
    ValueError: If the 'db_type' column already exists in the DataFrame.
    """
    column_already_present = "db_type" in df.columns
    if column_already_present:
        raise ValueError("db_type column already exists, please rename column before continuing")

    return df.withColumn("db_type", f.lit(scheme_name))
                       
def process_for_curated(df, columns):
    """
    Build the 'measure_details' JSON column and reduce the DataFrame to the curated columns.

    For every row, the values of `columns` are collected into a dictionary
    keyed by column name and serialised with json.dumps. Columns named
    (case-insensitively) uprn, date_of_install, db_type, measure_type or
    postcode are left out of the JSON. datetime values are formatted as
    "%m/%d/%Y, %H:%M:%S". The intermediate DataFrame is shown with display()
    before the final selection.

    Args:
    df (DataFrame): The input Spark DataFrame to be processed.
    columns (list[str]): Names of the DataFrame columns to fold into 'measure_details'.

    Returns:
    DataFrame: A DataFrame with the columns uprn, measure_type, date_of_install,
               db_type and measure_details.

    Raises:
    ValueError: If the 'measure_details' column already exists in the DataFrame.
    """
    # Imported locally so the function does not rely on names supplied by another notebook.
    import json
    from datetime import datetime

    if "measure_details" in df.columns:
        raise ValueError("measure_details column already exists, please rename column before continuing")

    # Columns that never go into the JSON; compared in lower case.
    ignored = {"uprn", "date_of_install", "db_type", "measure_type", "postcode"}

    # Resolve once, outside the UDF, which argument positions are serialised.
    kept = [(position, name) for position, name in enumerate(columns) if name.lower() not in ignored]

    def build_measure_details(*args):
        details = {}
        for position, name in kept:
            value = args[position]
            if isinstance(value, datetime):
                value = value.strftime("%m/%d/%Y, %H:%M:%S")
            details[name] = value
        return json.dumps(details)

    # StringType is imported at the top of this notebook; the original used an alias `t` that is not.
    df = df.withColumn("measure_details", f.udf(build_measure_details, StringType())(*columns))
    # NOTE(review): display() renders the DataFrame on every call; confirm it is wanted outside debugging.
    display(df)

    return df.select("uprn", 'measure_type', "date_of_install", "db_type", 'measure_details')


def get_measure_details_columns(onboarded_schemes_dict, df_mandatory_column_mapping, scheme_name):
    """
    List a scheme's schema columns that are not mapped as mandatory columns.

    The scheme's row in the mandatory mapping supplies the names to exclude
    (its string cells after the first column). The remaining schema field
    names are returned in schema order with duplicates removed, so the result
    is the same on every run.

    Args:
    onboarded_schemes_dict (dict): Scheme name -> iterable of fields exposing a `name` attribute (e.g. a StructType).
    df_mandatory_column_mapping (DataFrame): pandas DataFrame with a 'scheme' column followed by the mapping columns.
    scheme_name (str): The scheme whose columns are wanted.

    Returns:
    list: Names of the scheme's non-mandatory columns, in schema order.

    Raises:
    IndexError: If the mapping has no row for `scheme_name`.
    KeyError: If `scheme_name` is not a key of `onboarded_schemes_dict`.
    """
    scheme_row = df_mandatory_column_mapping[df_mandatory_column_mapping['scheme'] == scheme_name].iloc[0]

    # Blank mapping cells are read by pandas as non-string values and are skipped here.
    mandatory_names = {value for value in scheme_row[1:].tolist() if isinstance(value, str)}

    # dict.fromkeys removes duplicates while keeping first-seen order, unlike set().
    schema_names = dict.fromkeys(field.name for field in onboarded_schemes_dict[scheme_name])

    return [name for name in schema_names if name not in mandatory_names]

def generic_pipeline(defined_schema_path=None):
    """
    Run the EEIC external-scheme processing steps defined in this notebook.

    Steps performed:
    1. Validate the reference files "mandatory_column_mapping.csv" and
       "onboarded_scheme_column_list.csv" found under `defined_schema_path`.
    2. Build a Spark schema for every onboarded scheme.
    3. For a scheme: read its latest submission, clean it, rename the mandatory
       columns to the standard names, add db_type and build the curated
       columns including measure_details.
    4. Union the processed DataFrames.

    As written, the loop stops after the first scheme and the unioned result
    is not saved (the write is commented out).

    Args:
    defined_schema_path (str, optional): Directory containing the two reference CSV files.
                                         Must be provided; the None default is rejected.

    Returns:
    None

    Raises:
    ValueError: If `defined_schema_path` is None.
    """
    # Without the reference files there is no schema to drive the loop; fail with a
    # clear message instead of an UnboundLocalError further down.
    if defined_schema_path is None:
        raise ValueError("defined_schema_path must be provided: it is the directory holding the reference csv files")

    # os.path.join accepts the directory with or without a trailing slash.
    df_mandatory_column_mapping, onboarded_scheme_columns_dict = validate_eeic_reference_files(
        os.path.join(defined_schema_path, "mandatory_column_mapping.csv"),
        os.path.join(defined_schema_path, "onboarded_scheme_column_list.csv"),
    )

    onboarded_schemes_dict = get_scheme_schema_dictionary(onboarded_scheme_columns_dict)

    df_list = []

    for scheme_name, scheme_schema in onboarded_schemes_dict.items():
        print("given a scheme name, read the scheme", scheme_name)
        df = read_eeic_scheme(scheme_name, scheme_schema)

        print("cleaning dataframe")
        df = clean_dataframe(df)
        print("mapping mandatory columns to internal naming")

        # todo: If richard wants to remove subname, name, number etc, then
        df = map_mandatory_columns(df, df_mandatory_column_mapping, scheme_name)

        print("adding db_type")
        df = create_db_column(df, scheme_name)

        print("processing for curated")

        measure_details_columns = get_measure_details_columns(onboarded_schemes_dict, df_mandatory_column_mapping, scheme_name)

        df = process_for_curated(df, measure_details_columns)

        df_list.append(df)
        # NOTE(review): this break means only the first scheme is processed — confirm it is intentional.
        break

    union_df = reduce(lambda df1, df2: df1.union(df2), df_list)
    # NOTE(review): the save to the base layer is disabled, so union_df is currently unused.
    # union_df.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save(LAKE_ROOT_SPARK + base_path + "valid_measures")