#### Import the required libraries

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from abc import ABC, abstractmethod
from pyspark.sql import DataFrame

# Obtain the SparkSession used by every cell below. getOrCreate() returns the
# already-active session when one exists (e.g. on Databricks), so this is only
# strictly needed when running outside Databricks.
spark = (
    SparkSession.builder
    .appName("DataStandardization")
    .getOrCreate()
)

#### Config Reader Contract (Abstract class)

In [None]:
class ConfigReaderContract(ABC):
    """Interface every configuration reader must implement.

    DataStandardizer.run() consumes a reader through exactly these four
    methods, so any configuration format (JSON, YAML, a metastore table, ...)
    can be plugged in by providing them.
    """

    @abstractmethod
    def read_source_columns_schema(self)->DataFrame:
        """Return the per-source-column settings (rename, cast type, SQL transformation)."""

    @abstractmethod
    def read_new_columns_schema(self)->DataFrame:
        """Return the definitions of the derived columns to add to the standardized table."""

    @abstractmethod
    def read_column_descriptions_metadata(self)->dict:
        """Return a mapping of column name -> description to store as the column comment."""

    @abstractmethod
    def read_column_sequence_order(self)->list:
        """Return the column names in the order they should appear in the final table."""

#### Implementation of Config Reader (JSON)

In [None]:
class ConfigReader(ConfigReaderContract):
    """
    A class that reads and extracts information from a JSON config file.

    The file is loaded with Spark's JSON reader with ``multiLine`` enabled, so
    a single pretty-printed JSON document is accepted. The accessors below read
    these fields from it:

    * ``schema.source_columns`` - array of structs with ``raw_name``,
      ``standardized_name``, ``data_type`` and ``sql_transformation``;
    * ``schema.new_columns`` - array of structs with ``name``, ``data_type``
      and ``sql_transformation``;
    * ``metadata.column_descriptions`` - struct whose field names are column
      names and whose values are the descriptions;
    * ``column_sequence_order`` - array of column names.

    Args:
        config_path (str): The path to the configuration file.

    Attributes:
        config_path (str): The path the configuration was read from.
        config_df (DataFrame): The DataFrame containing the configuration data.

    """

    def __init__(self, config_path):
        """
        Initializes a new instance of the ConfigReader class.

        Args:
            config_path (str): The path to the configuration file.

        """
        self.config_path = config_path
        self.config_df = spark.read.option("multiLine", True).json(config_path)

    def _first_config_row(self):
        """Return the first config record, raising ValueError if the file produced none."""
        row = self.config_df.first()
        if row is None:
            raise ValueError(f"Configuration file '{self.config_path}' contains no records")
        return row

    def read_source_columns_schema(self):
        """
        Reads the schema information for the source columns from the configuration file.

        Returns:
            DataFrame: One row per entry of ``schema.source_columns`` with the
            columns ``raw_name``, ``standardized_name``, ``data_type`` and
            ``sql_transformation``.

        """
        exploded_df = self.config_df.select(explode(self.config_df["schema"].source_columns).alias("source_columns"))
        source_columns_schema_df = exploded_df.selectExpr(
            "source_columns.raw_name as raw_name",
            "source_columns.standardized_name as standardized_name",
            "source_columns.data_type as data_type",
            "source_columns.sql_transformation as sql_transformation"
        )
        return source_columns_schema_df

    def read_new_columns_schema(self):
        """
        Reads the schema information for the new columns from the configuration file.

        Returns:
            DataFrame: One row per entry of ``schema.new_columns`` with the
            columns ``name``, ``data_type`` and ``sql_transformation``.

        """
        exploded_df = self.config_df.select(explode(self.config_df["schema"].new_columns).alias("new_columns"))
        new_columns_schema_df = exploded_df.selectExpr(
            "new_columns.name as name",
            "new_columns.data_type as data_type",
            "new_columns.sql_transformation as sql_transformation"
        )
        return new_columns_schema_df

    def read_column_descriptions_metadata(self):
        """
        Reads the column descriptions metadata from the configuration file.

        Returns:
            dict: Column name -> description. Empty when ``metadata`` or
            ``metadata.column_descriptions`` is null in the first record.

        Raises:
            ValueError: If the configuration file contains no records.

        """
        # Read the nested struct straight from the first record. The previous
        # version applied ``.alias`` to the DataFrame instead of the column,
        # which only worked because the selected field already had that name.
        metadata = self._first_config_row()["metadata"]
        if metadata is None or metadata["column_descriptions"] is None:
            return {}
        return metadata["column_descriptions"].asDict()

    def read_column_sequence_order(self):
        """
        Reads the column sequence order from the configuration file.

        Returns:
            list: A list containing the column sequence order.

        Raises:
            ValueError: If the configuration file contains no records or
                ``column_sequence_order`` is null.

        """
        sequence = self._first_config_row()["column_sequence_order"]
        if sequence is None:
            raise ValueError(
                f"Configuration file '{self.config_path}' does not define 'column_sequence_order'"
            )
        return list(sequence)

#### Data Standardizer Engine Implementation

In [None]:
class DataStandardizer:
    """
    A class that performs data standardization based on configuration settings.

    The pipeline reads the raw Delta table, materializes a temporary Delta
    table with cast/renamed source columns, adds derived columns, attaches
    column comments, and finally writes the columns in the configured order
    to the standardized Delta location.

    NOTE(review): ``sql_transformation`` values from the config are executed
    verbatim as SQL, so the configuration file must come from a trusted source.

    Args:
        raw_dp_path (str): The path to the raw data.
        temp_std_dp_path (str): The path to the temporary standardized data.
        std_dp_path (str): The path to the final standardized data.

    Methods:
        create_temp_std_dp_with_source_columns(source_columns_schema_df):
            Creates a temporary standardized data table with source columns based on the provided schema.

        add_new_columns_in_temp_std_dp(new_columns_schema_df):
            Adds new columns to the temporary standardized data table based on the provided schema.

        update_column_descriptions_metadata(column_descriptions_dict):
            Updates the column descriptions metadata in the temporary standardized data table.

        move_data_to_std_dp(column_sequence_order):
            Moves the data from the temporary standardized data table to the final standardized data table.

        run(config_reader):
            Runs the data standardization process based on the provided configuration reader.
    """

    def __init__(self, raw_dp_path, temp_std_dp_path, std_dp_path):
        self.raw_dp_path = raw_dp_path
        self.temp_std_dp_path = temp_std_dp_path
        self.std_dp_path = std_dp_path

    # ----- Pure SQL-building helpers (no Spark access) -----

    @staticmethod
    def _quote_identifier(name):
        """Return *name* as a back-quoted Spark SQL identifier, doubling embedded backticks."""
        return "`" + str(name).replace("`", "``") + "`"

    @staticmethod
    def _escape_sql_string(value):
        """Escape *value* for use inside a single-quoted Spark SQL string literal."""
        # Spark SQL interprets backslash escapes in string literals by default,
        # so backslashes are escaped first, then single quotes.
        return str(value).replace("\\", "\\\\").replace("'", "\\'")

    @staticmethod
    def _is_blank(text):
        """Return True when *text* is None, empty, or whitespace only."""
        return text is None or not str(text).strip()

    @classmethod
    def _build_select_expression(cls, raw_name, standardized_name, data_type, sql_transformation):
        """Build ``CAST(<source> AS <data_type>) AS `<standardized_name>``` for one source column.

        The source is the back-quoted raw column when no transformation is
        configured (None or blank), otherwise the transformation SQL itself.
        """
        if cls._is_blank(sql_transformation):
            source_expression = cls._quote_identifier(raw_name)
        else:
            source_expression = sql_transformation
        return f"CAST({source_expression} AS {data_type}) AS {cls._quote_identifier(standardized_name)}"

    @staticmethod
    def _show(df):
        """Render *df* with the notebook's ``display`` when available, else ``df.show()``."""
        try:
            renderer = display  # injected by Databricks / IPython notebooks
        except NameError:
            df.show()
        else:
            renderer(df)

    # ----- Pipeline steps -----

    def create_temp_std_dp_with_source_columns(self, source_columns_schema_df):
        """(Re)create the temporary Delta table from the raw data using the source-column config.

        Raises:
            ValueError: If no source columns are configured.
        """
        # The config is small, so build the SELECT list in Python. A NULL
        # sql_transformation (field omitted in the JSON) is treated like "";
        # previously it produced a NULL expression and the column was
        # silently dropped from the table.
        rows = source_columns_schema_df.collect()
        if not rows:
            raise ValueError("No source columns configured; cannot build the temporary standardized table")
        select_list = ", ".join(
            self._build_select_expression(
                row["raw_name"], row["standardized_name"], row["data_type"], row["sql_transformation"]
            )
            for row in rows
        )
        select_query = f"SELECT {select_list} FROM delta.`{self.raw_dp_path}`"
        spark.sql(f"CREATE OR REPLACE TABLE delta.`{self.temp_std_dp_path}` as ( {select_query})")

    def add_new_columns_in_temp_std_dp(self, new_columns_schema_df):
        """Add each configured derived column to the temporary table and run its population SQL.

        Any ``{temp_std_dp_path}`` placeholder in a transformation is replaced
        with the temporary table path. A column with a None/blank
        transformation is added but left unpopulated.
        """
        table = f"delta.`{self.temp_std_dp_path}`"
        for row in new_columns_schema_df.collect():
            spark.sql(f"ALTER TABLE {table} ADD COLUMN {self._quote_identifier(row['name'])} {row['data_type']}")
            transformation = row["sql_transformation"]
            if self._is_blank(transformation):
                continue
            spark.sql(transformation.replace("{temp_std_dp_path}", self.temp_std_dp_path))

    def update_column_descriptions_metadata(self, column_descriptions_dict):
        """Set each column's comment on the temporary table; None descriptions are skipped."""
        table = f"delta.`{self.temp_std_dp_path}`"
        for column_name, description in column_descriptions_dict.items():
            if description is None:
                continue
            # Escaping keeps quotes/backslashes in descriptions from breaking the statement.
            spark.sql(
                f"ALTER TABLE {table} CHANGE COLUMN {self._quote_identifier(column_name)} "
                f"COMMENT '{self._escape_sql_string(description)}'"
            )

    def move_data_to_std_dp(self, column_sequence_order):
        """Write the temporary table's data to the standardized location in the given column order.

        An empty or None ``column_sequence_order`` keeps the temporary table's
        own column order instead of selecting zero columns.
        """
        temp_std_df = spark.read.format("delta").load(self.temp_std_dp_path)
        if column_sequence_order:
            temp_std_df = temp_std_df.select(list(column_sequence_order))
        # NOTE(review): with mode("overwrite"), Delta's mergeSchema only adds
        # columns; dropping or retyping columns requires overwriteSchema.
        # Confirm which behavior is intended.
        temp_std_df.write.option("mergeSchema", "true").format("delta").mode("overwrite").save(self.std_dp_path)

    def run(self, config_reader):
        """Run all standardization steps using *config_reader*, printing/displaying progress."""
        print("Raw df : ")
        raw_df = spark.read.format("delta").load(self.raw_dp_path)
        self._show(raw_df)

        source_columns_schema_df = config_reader.read_source_columns_schema()
        self.create_temp_std_dp_with_source_columns(source_columns_schema_df)

        new_columns_schema_df = config_reader.read_new_columns_schema()
        self.add_new_columns_in_temp_std_dp(new_columns_schema_df)

        column_descriptions_dict = config_reader.read_column_descriptions_metadata()
        self.update_column_descriptions_metadata(column_descriptions_dict)

        column_sequence_order = config_reader.read_column_sequence_order()
        self.move_data_to_std_dp(column_sequence_order)

        print("Standardized df : ")
        std_df = spark.read.format("delta").load(self.std_dp_path)
        self._show(std_df)

        print("Schema information for Standardized df : ")
        std_df.printSchema()
        self._show(spark.sql(f"DESCRIBE TABLE delta.`{self.std_dp_path}`"))

#### Example usage

In [None]:
# Replace the paths below with your own locations.
# Inputs: the raw Delta data and the JSON standardization config.
raw_dp_path = "dbfs:/FileStore/project/supplier"
config_path = "dbfs:/FileStore/project/supplier_config.json"

# Outputs: a scratch Delta table used while building the result, and the
# final standardized Delta table.
temp_std_dp_path = "dbfs:/FileStore/project/Product_Supplier_temp"
std_dp_path = "dbfs:/FileStore/project/Product_Supplier"

In [None]:
# Load the JSON config, then wire the three Delta locations into the engine.
config_reader = ConfigReader(config_path=config_path)

data_standardizer = DataStandardizer(
    raw_dp_path=raw_dp_path,
    std_dp_path=std_dp_path,
    temp_std_dp_path=temp_std_dp_path,
)

In [None]:
# Execute the full pipeline: raw -> temp standardized table -> final table.
data_standardizer.run(config_reader=config_reader)

Raw df : 


sup_id,name,price,prod_name,quantity,email
9999,john,10,ball,100,john@email.com
9876,mary,20,kite,200,mary@email.com
8765,ram,330,bat,300,ram@email.com
7654,rahim,400,football,40,rahim@email.com
6543,sita,560,badminton,500,sita@email.com


Standardized df : 


Supplier_ID,Supplier_Name,Product_ID,Product_Name,Purchase_Price,Purchase_Quantity,Total_Cost
SUP-9999,john,PROD-05,ball,10,100,1000
SUP-9876,mary,PROD-06,kite,20,200,4000
SUP-8765,ram,PROD-04,bat,330,300,99000
SUP-7654,rahim,PROD-01,football,400,40,16000
SUP-6543,sita,PROD-03,badminton,560,500,280000


Schema information for Standardized df : 
root
 |-- Supplier_ID: string (nullable = true)
 |-- Supplier_Name: string (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Product_Name: string (nullable = true)
 |-- Purchase_Price: integer (nullable = true)
 |-- Purchase_Quantity: integer (nullable = true)
 |-- Total_Cost: integer (nullable = true)



col_name,data_type,comment
Supplier_ID,string,Unique identifier for the supplier of a product
Supplier_Name,string,Name of the supplier
Product_ID,string,Unique identifier for the product
Product_Name,string,Name of the product
Purchase_Price,int,Price at which the supplier sells the product
Purchase_Quantity,int,Quantity of the product available with the supplier
Total_Cost,int,Total amount spent on purchasing a specific quantity of items at the given purchase price.
