# Data Cleaning Trials in Modularized Production Format

In [1]:
import os

In [2]:
# Switch the working directory to the parent of the notebook's folder.
# NOTE(review): presumably done so relative paths such as config/config.yaml
# resolve from the project root — confirm against the repository layout.
os.chdir("../")
# IPython magic: display the working directory to confirm the change.
%pwd

'/Users/macbookpro/Documents/sclable_ml_pipelines/scalable_ml_pipelines'

In [8]:
from us_used_cars_ml_pipeline.constants import *
from us_used_cars_ml_pipeline.utils.common import read_yaml
from us_used_cars_ml_pipeline import logger
from us_used_cars_ml_pipeline.entity.config_entity import (DataIngestionConfig, 
                                                           CleanDataConfig)

class ConfigurationManager:
    """
    Loads the project's YAML files and builds the configuration objects
    consumed by the individual pipeline stages.

    Attributes:
    - config: Content of the config file, as returned by read_yaml.
    - params: Content of the params file, as returned by read_yaml.
    - schema: Content of the schema file, as returned by read_yaml.

    Note: the getters read settings via attribute access (e.g.
    ``self.config.clean_data``), so read_yaml is assumed to return an object
    that supports attribute-style lookup rather than a plain dict — TODO confirm.
    """

    def __init__(self,
                 config_filepath=CONFIG_FILE_PATH,
                 params_filepath=PARAMS_FILE_PATH,
                 schema_filepath=SCHEMA_FILE_PATH):
        """
        Reads the config, params and schema files and stores their content.

        Parameters:
        - config_filepath (str): Filepath to the configuration file.
        - params_filepath (str): Filepath to the parameters file.
        - schema_filepath (str): Filepath to the schema file.

        Raises:
        - Exception: Propagated from read_yaml if any of the files cannot be read.
        """
        self.config = self._read_config_file(config_filepath, "config")
        self.params = self._read_config_file(params_filepath, "params")
        self.schema = self._read_config_file(schema_filepath, "schema")

    def _read_config_file(self, filepath: str, config_name: str) -> dict:
        """
        Reads one YAML file and returns its content.

        Parameters:
        - filepath (str): The file path to the configuration file.
        - config_name (str): Human-readable name of the file, used only in the log message.

        Returns:
        - dict: The settings loaded by read_yaml.

        Raises:
        - Exception: Whatever read_yaml raised; it is logged and re-raised unchanged.
        """
        try:
            return read_yaml(filepath)
        except Exception as e:
            logger.error(f"Error reading {config_name} file: {filepath}. Error: {e}")
            raise

    def get_data_ingestion_config(self) -> DataIngestionConfig:
        """
        Builds a DataIngestionConfig from the 'data_ingestion' section of the config file.

        Returns:
        - DataIngestionConfig: Object containing data ingestion configuration settings.

        Raises:
        - AttributeError: The 'data_ingestion' section, or one of the keys read
          from it (root_dir, source_URL, hdfs_data_file), is missing.
        """
        try:
            section = self.config.data_ingestion
            return DataIngestionConfig(
                root_dir=section.root_dir,
                source_URL=section.source_URL,
                hdfs_data_file=section.hdfs_data_file
            )
        except AttributeError as e:
            # The failure may come from the section itself or from any nested key,
            # so report the underlying error instead of blaming the section.
            logger.error(f"Invalid or incomplete 'data_ingestion' section in the config file: {e}")
            raise

    def get_clean_data_config(self) -> CleanDataConfig:
        """
        Builds a CleanDataConfig from the 'clean_data' section of the config file.

        Returns:
        - CleanDataConfig: Object containing data cleaning configuration settings.

        Raises:
        - AttributeError: The 'clean_data' section, or one of the keys read
          from it (root_dir, hdfs_data_file, clean_data_URL), is missing.
        """
        try:
            section = self.config.clean_data
            return CleanDataConfig(
                root_dir=section.root_dir,
                hdfs_data_file=section.hdfs_data_file,
                clean_data_URL=section.clean_data_URL
            )
        except AttributeError as e:
            # The failure may come from the section itself or from any nested key,
            # so report the underlying error instead of blaming the section.
            logger.error(f"Invalid or incomplete 'clean_data' section in the config file: {e}")
            raise


In [5]:
from pyspark.sql import DataFrame
from us_used_cars_ml_pipeline.entity.config_entity import CleanDataConfig
from us_used_cars_ml_pipeline import logger
from pyspark.sql import SparkSession

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, FloatType, BooleanType, DateType, DoubleType, StringType


class CleanData:
    """
    Cleans the used cars dataset: reads the raw CSV, converts columns to
    numeric/boolean/date types and derives value/rpm columns from power and torque.

    Attributes:
    - config (CleanDataConfig): Configuration for the data cleaning process.
    """

    def __init__(self, config: CleanDataConfig):
        """
        Stores the configuration used by the cleaning steps.

        Parameters:
        - config (CleanDataConfig): Configuration for the data cleaning process.
        """
        self.config = config

    def read_data_from_hdfs(self, spark: SparkSession) -> DataFrame:
        """
        Reads the CSV file at ``config.hdfs_data_file`` with a header row and schema inference.

        Parameters:
        - spark (SparkSession): SparkSession object.

        Returns:
        - DataFrame: Spark DataFrame containing the read data.

        Raises:
        - Exception: Any error raised by the read; it is logged and re-raised unchanged.
        """
        try:
            return spark.read.csv(self.config.hdfs_data_file, header=True, inferSchema=True)
        except Exception as e:
            logger.error(f"Failed to read data from HDFS. Error: {e}")
            raise

    def to_float(self, col):
        """
        Extracts the first decimal number from a column and casts it to float.

        An optional leading minus sign is kept so that negative values (for
        example longitudes) are not turned into positive ones. Values without
        any digits yield an empty match, which the cast turns into null.

        Parameters:
        - col (Column): Spark DataFrame column.

        Returns:
        - Column: Transformed column with float values.
        """
        return F.regexp_extract(col, r"(-?\d+\.?\d*)", 1).cast(FloatType())

    def to_int(self, col):
        """
        Extracts the first run of digits from a column and casts it to integer.

        Parameters:
        - col (Column): Spark DataFrame column.

        Returns:
        - Column: Transformed column with integer values.
        """
        return F.regexp_extract(col, r"(\d+)", 1).cast(IntegerType())

    def to_bool(self, col):
        """
        Casts a column to boolean.

        Parameters:
        - col (Column): Spark DataFrame column.

        Returns:
        - Column: Transformed column with boolean values.
        """
        return col.cast(BooleanType())

    def split_power_torque(self, df, col_name):
        """
        Derives ``<col_name>_value`` and ``<col_name>_rpm`` integer columns from a text column.

        The value is the first run of digits in the text; the rpm is the number
        following "@ ", with thousands separators (commas) removed.

        Parameters:
        - df (DataFrame): Spark DataFrame.
        - col_name (str): Name of the column to split.

        Returns:
        - DataFrame: DataFrame with new columns for value and rpm.
        """
        source = df[col_name]
        value = F.regexp_extract(source, r"(\d+)", 1).cast(IntegerType())
        rpm_text = F.regexp_extract(source, r"@ ([\d,]+)", 1)
        rpm = F.regexp_replace(rpm_text, ",", "").cast(IntegerType())
        return df.withColumn(f"{col_name}_value", value).withColumn(f"{col_name}_rpm", rpm)

    def perform_cleaning(self, df: DataFrame) -> DataFrame:
        """
        Applies all type conversions and derived-column steps to the input DataFrame.

        Parameters:
        - df (DataFrame): Input Spark DataFrame. It must contain every column
          referenced below; a missing column makes the column lookup fail.

        Returns:
        - DataFrame: Cleaned DataFrame.
        """
        # Column name -> converter applied to that column in place.
        conversion_dict = {
            'back_legroom': self.to_float,
            'bed_height': self.to_float,
            'bed_length': self.to_float,
            'front_legroom': self.to_float,
            'height': self.to_float,
            'length': self.to_float,
            'wheelbase': self.to_float,
            'width': self.to_float,
            'city_fuel_economy': self.to_float,
            'combine_fuel_economy': self.to_float,
            'daysonmarket': self.to_int,
            'engine_displacement': self.to_float,
            'fuel_tank_volume': self.to_float,
            'highway_fuel_economy': self.to_float,
            'horsepower': self.to_int,
            'latitude': self.to_float,
            'longitude': self.to_float,
            'mileage': self.to_float,
            'owner_count': self.to_int,
            'price': self.to_float,
            'savings_amount': self.to_float,
            'seller_rating': self.to_float,
            'year': self.to_int,
            'fleet': self.to_bool,
            'frame_damaged': self.to_bool,
            'franchise_dealer': self.to_bool,
            'has_accidents': self.to_bool,
            'isCab': self.to_bool,
            'is_certified': self.to_bool,
            'is_cpo': self.to_bool,
            'is_new': self.to_bool,
            'is_oemcpo': self.to_bool,
            'salvage': self.to_bool,
            'theft_title': self.to_bool,
        }

        # Apply conversion functions to corresponding columns
        for col, func in conversion_dict.items():
            df = df.withColumn(col, func(df[col]))

        # Convert listed_date to DateType
        df = df.withColumn('listed_date', F.to_date(df['listed_date'], 'yyyy-MM-dd'))

        # Split power and torque into value and rpm, and add new columns
        df = self.split_power_torque(df, 'power')
        df = self.split_power_torque(df, 'torque')

        # Keep only the digits of maximum_seating and convert to Integer.
        # Raw string: "\d" in a normal string literal is an invalid escape sequence.
        df = df.withColumn(
            'maximum_seating',
            F.regexp_replace(F.col('maximum_seating'), r'[^\d]+', '').cast(IntegerType())
        )

        return df  # Return cleaned DataFrame


In [9]:
# In the packaged module this would be imported instead of defined in the notebook:
# from us_used_cars_ml_pipeline.config.configuration import ConfigurationManager
from pyspark.sql import SparkSession
from us_used_cars_ml_pipeline.components.data_cleaning import CleanData
from us_used_cars_ml_pipeline import logger
from us_used_cars_ml_pipeline.utils.common import get_spark_session

class DataCleaningPipeline:
    """
    Pipeline stage that reads the raw used cars data, cleans it and writes
    the result as Parquet.

    Class Attributes:
    - STAGE_NAME (str): Stage name for logging purposes.

    Attributes:
    - config_manager (ConfigurationManager): Manager for configuration settings.
    """

    STAGE_NAME = "Data Cleaning Stage"

    def __init__(self):
        """
        Creates the ConfigurationManager used to obtain the stage configuration.
        """
        self.config_manager = ConfigurationManager()

    def initialize_spark_session(self) -> SparkSession:
        """
        Returns the Spark session provided by get_spark_session.

        Returns:
        - SparkSession: Initialized Spark session.
        """
        return get_spark_session()

    def run_data_cleaning(self):
        """
        Runs the data cleaning process.

        Fetches the data cleaning configuration, validates the output path,
        reads the raw data, cleans it with the CleanData component and writes
        the cleaned data to ``clean_data_URL`` as Parquet (overwriting existing output).

        Raises:
        - ValueError: The configured ``clean_data_URL`` is empty.
        - Exception: Any other error from the steps above; it is logged with
          its traceback and re-raised unchanged.
        """
        try:
            logger.info("Fetching data cleaning configuration...")
            data_cleaning_configuration = self.config_manager.get_clean_data_config()

            # Validate the output location before the costly read so that a bad
            # configuration fails fast instead of after the data has been loaded.
            hdfs_path = data_cleaning_configuration.clean_data_URL
            if not hdfs_path:
                logger.error("Invalid HDFS path in configuration. Aborting data cleaning process.")
                raise ValueError("Invalid HDFS path in configuration.")

            logger.info("Initializing data cleaning process...")
            data_cleaning = CleanData(config=data_cleaning_configuration)

            logger.info("Read data for cleaning process...")
            spark = self.initialize_spark_session()
            df = data_cleaning.read_data_from_hdfs(spark)

            logger.info("Perform data cleaning")
            cleaned_df = data_cleaning.perform_cleaning(df)

            logger.info(f"Writing cleaned data back to HDFS at {hdfs_path}")
            cleaned_df.write.mode('overwrite').parquet(hdfs_path)

        except Exception:
            logger.exception("An error occurred during the data cleaning process.")
            raise

    def run_pipeline(self):
        """
        Runs the Data Cleaning Pipeline, logging the start and completion of the stage.

        Raises:
        - Exception: Propagated from run_data_cleaning, which has already logged it.
        """
        logger.info(f">>>>>> Stage: {DataCleaningPipeline.STAGE_NAME} started <<<<<<")
        # Errors are logged inside run_data_cleaning, so they simply propagate here.
        self.run_data_cleaning()
        logger.info(f">>>>>> Stage {DataCleaningPipeline.STAGE_NAME} completed <<<<<< \n\nx==========x")

if __name__ == '__main__':
    # Entry point: build the data cleaning stage and execute it once.
    DataCleaningPipeline().run_pipeline()


[2023-10-08 16:12:35,807: 44: us_used_cars_ml_pipeline_logger: INFO: common:  yaml file: config/config.yaml loaded successfully]
[2023-10-08 16:12:35,813: 44: us_used_cars_ml_pipeline_logger: INFO: common:  yaml file: params.yaml loaded successfully]
[2023-10-08 16:12:35,815: 44: us_used_cars_ml_pipeline_logger: INFO: common:  yaml file: schema.yaml loaded successfully]
[2023-10-08 16:12:35,815: 72: us_used_cars_ml_pipeline_logger: INFO: 1060998365:  >>>>>> Stage: Data Cleaning Stage started <<<<<<]
[2023-10-08 16:12:35,817: 43: us_used_cars_ml_pipeline_logger: INFO: 1060998365:  Fetching data cleaning configuration...]
[2023-10-08 16:12:35,818: 46: us_used_cars_ml_pipeline_logger: INFO: 1060998365:  Initializing data cleaning process...]
[2023-10-08 16:12:35,818: 49: us_used_cars_ml_pipeline_logger: INFO: 1060998365:  Read data for cleaning process...]


23/10/08 16:12:37 WARN Utils: Your hostname, Macbooks-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.100 instead (on interface en0)
23/10/08 16:12:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/08 16:12:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/10/08 16:12:39 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.

[2023-10-08 16:13:22,147: 53: us_used_cars_ml_pipeline_logger: INFO: 1060998365:  Perform data cleaning]


                                                                                

[2023-10-08 16:13:23,729: 60: us_used_cars_ml_pipeline_logger: INFO: 1060998365:  Writing cleaned data back to HDFS at hdfs://localhost:9000/geekradius/used_cars_project/clean_data]


23/10/08 16:13:23 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/10/08 16:13:26 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/10/08 16:13:26 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/10/08 16:13:48 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/10/08 16:13:49 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/10/08 16:13:49 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/10/08 16:13:50 WARN MemoryManager: Total al

[2023-10-08 16:18:09,151: 74: us_used_cars_ml_pipeline_logger: INFO: 1060998365:  >>>>>> Stage Data Cleaning Stage completed <<<<<< 

