In [1]:
import os

In [2]:
# The notebook lives one level below the project root. Step up only when the
# project's `src` package directory is not already visible from the current
# working directory, so re-running this cell does not keep climbing the tree.
if not os.path.isdir("src"):
    os.chdir("../")

In [3]:
%pwd

'/Users/macbookpro/Documents/semantic_preprocessor_model/semantic_preprocessor_model'

# Config.yaml


In [None]:
# Settings for the data validation stage; read by
# ConfigurationManager.get_data_validation_config as `config.data_validation`.
data_validation:
  # Directory where data validation results and artifacts are stored
  root_dir: artifacts/data_validation
  
  # CSV file that DataValidation loads when no file object is passed to it
  data_source_file: artifacts/data_ingestion/data.csv
  
  # Text file that receives the overall validation status line; its parent
  # directory is created by the configuration manager before validation runs
  status_file: artifacts/data_validation/status.txt

# Entity

In [4]:
from dataclasses import dataclass
from pathlib import Path
from typing import Dict

@dataclass(frozen=True)
class DataValidationConfig:
    """
    Immutable bundle of settings consumed by the data validation stage.

    Instances are frozen: fields cannot be reassigned after construction.

    Fields
    ------
    root_dir : Path
        Folder that holds validation results and related artifacts.

    data_source_file : Path
        Location of the data file to be validated.

    status_file : Path
        File to which the validation status is written.

    schema : Dict[str, Dict[str, str]]
        Expected columns keyed by column name, each mapped to its schema
        properties.
    """

    root_dir: Path
    data_source_file: Path
    status_file: Path
    schema: Dict[str, Dict[str, str]]

# Schema

In [None]:
# Schema used by the data validation stage. ConfigurationManager hands the
# `columns` mapping to DataValidation, which compares each column's `type`
# against the pandas dtype of the loaded data.
schema_type: "schema"
description: "Schema of the data before feature engineering."

columns:
  work_name: 
    type: object
    description: "Name or identifier of the work task."
  generalized_work_class: 
    type: object
    description: "Broad category or class of the work task."
  global_work_class: 
    type: object
    description: "Highest level category or class of the work task."
  # Not listed in DataValidation.required_columns, so a dataset without this
  # column does not fail the column-presence check.
  upper_works: 
    type: object
    description: "Higher-level tasks or processes associated with the work task."

# Configuration Manager


In [5]:
from src.semantic_preprocessor_model.constants import *
from src.semantic_preprocessor_model.utils.common import read_yaml, create_directories
from src.semantic_preprocessor_model import logger
from src.semantic_preprocessor_model.entity.config_entity import DataValidationConfig
import os

class ConfigurationManager:
    """
    Manager for configurations required for the semantic preprocessor data pipeline.

    Reads the configuration, parameter, and schema files, exposes their content,
    and creates the directories the configuration refers to.

    Attributes:
    - config: Configuration settings as returned by `read_yaml`.
    - params: Pipeline parameters as returned by `read_yaml`.
    - schema: Schema information as returned by `read_yaml`.
    """

    def __init__(self,
                 config_filepath=CONFIG_FILE_PATH,
                 params_filepath=PARAMS_FILE_PATH,
                 schema_filepath=SCHEMA_FILE_PATH) -> None:
        """
        Load the configuration, parameter, and schema files.

        Args:
        - config_filepath (Path): Path to the configuration file.
        - params_filepath (Path): Path to the parameters file.
        - schema_filepath (Path): Path to the schema file.

        Creates:
        - The `artifacts_root` directory named in the configuration.

        Raises:
        - Exception: Whatever `read_yaml` raises when a file cannot be read.
        """
        self.config = self._read_config_file(config_filepath, "config")
        self.params = self._read_config_file(params_filepath, "params")
        self.schema = self._read_config_file(schema_filepath, "initial_schema")

        # Ensure the directory for storing artifacts exists
        create_directories([self.config.artifacts_root])

    def _read_config_file(self, filepath: str, config_name: str) -> dict:
        """
        Read a YAML file via `read_yaml` and return its content.

        Args:
        - filepath (str): Path to the file.
        - config_name (str): Human-readable name of the file, used in the error log.

        Returns:
        - The parsed content as returned by `read_yaml`.

        Raises:
        - Exception: Re-raised unchanged after being logged.
        """
        try:
            return read_yaml(filepath)
        except Exception as e:
            logger.error(f"Error reading {config_name} file: {filepath}. Error: {e}")
            raise

    def get_data_validation_config(self) -> DataValidationConfig:
        """
        Build the DataValidationConfig from the loaded config and schema.

        Side effects:
        - Creates the validation root directory and the status file's parent
          directory (usually the same folder).

        Returns:
        - DataValidationConfig: Object containing data validation configuration.

        Raises:
        - AttributeError: If `data_validation` (or one of its fields) is missing
          from the config, or `columns` is missing from the schema.
        """
        try:
            # Extract data validation configurations
            config = self.config.data_validation

            # Extract schema for data validation
            schema = self.schema.columns

            # dict.fromkeys de-duplicates while keeping order; empty entries are
            # dropped because a bare-filename status_file has no parent directory.
            status_dir = os.path.dirname(config.status_file)
            required_dirs = [
                directory
                for directory in dict.fromkeys((str(config.root_dir), status_dir))
                if directory
            ]
            if required_dirs:
                create_directories(required_dirs)

            # Construct and return the DataValidationConfig object
            return DataValidationConfig(
                root_dir=Path(config.root_dir),
                data_source_file=Path(config.data_source_file),
                status_file=Path(config.status_file),
                schema=schema
            )

        except AttributeError as e:
            # Report the attribute that is actually missing, then re-raise with
            # the original traceback for the caller to handle.
            logger.error(
                f"A required data validation setting is missing from the config or schema file: {e}"
            )
            raise


# Component

In [16]:
import pandas as pd
from src.semantic_preprocessor_model import logger
from src.semantic_preprocessor_model.entity.config_entity import DataValidationConfig


class DataValidation:
    """
    Validates a dataset against the column schema held in the configuration.

    Two checks are provided: column presence (missing required columns and
    unexpected extra columns) and column data types. `run_all_validations`
    runs both and appends the overall outcome to the configured status file.

    Attributes:
    - config (DataValidationConfig): Schema, data source path, and status file path.
    - df (pd.DataFrame): The data to be validated.
    """

    # Columns that may appear in the data without being listed in the schema.
    # The name must match the schema's spelling ('upper_works').
    optional_columns = {'upper_works'}
    # Columns that must always be present in the data.
    required_columns = {'work_name', 'generalized_work_class', 'global_work_class'}

    def __init__(self, config: "DataValidationConfig", file_object=None):
        """
        Load the dataset to validate.

        Args:
        - config (DataValidationConfig): Configuration settings for data validation.
        - file_object (optional): Anything `pd.read_csv` accepts. When omitted
          (or falsy), the CSV at `config.data_source_file` is loaded instead.

        Raises:
        - FileNotFoundError: If the chosen source does not exist.
        """
        logger.info("Initializing DataValidation.")
        self.config = config
        # A falsy file_object falls back to the configured path.
        data_source = file_object if file_object else self.config.data_source_file
        try:
            self.df = pd.read_csv(data_source)
        except FileNotFoundError:
            # Name the source that was actually attempted, not always the config path.
            logger.error(f"File not found: {data_source}")
            raise

    @staticmethod
    def _format_columns(columns) -> str:
        """Render column labels as a sorted, comma-separated string for log messages."""
        # str() keeps non-string labels (e.g. integer headers) from breaking join().
        return ', '.join(sorted(str(column) for column in columns))

    def validate_all_features(self) -> bool:
        """
        Check column presence.

        Validation fails when a required column is missing from the dataframe,
        or when the dataframe has a column that is neither in the schema nor
        listed as optional.

        Returns:
        - bool: True if no required column is missing and no unexpected column
          is present, False otherwise.
        """
        logger.info("Starting feature validation.")

        validation_status = True

        all_columns = set(self.df.columns)
        expected_columns = set(self.config.schema.keys())

        missing_required_columns = self.required_columns - all_columns
        extra_columns = all_columns - expected_columns - self.optional_columns

        if missing_required_columns:
            validation_status = False
            logger.warning(f"Missing required columns: {self._format_columns(missing_required_columns)}")

        if extra_columns:
            validation_status = False
            logger.warning(f"Extra columns found: {self._format_columns(extra_columns)}")

        if validation_status:
            logger.info("All expected columns are present in the dataframe.")
        return validation_status

    def validate_data_types(self) -> bool:
        """
        Compare the dtype of each schema column present in the dataframe with
        the `type` declared for it in the schema.

        Schema columns absent from the dataframe are skipped here; their absence
        is the concern of `validate_all_features`.

        Returns:
        - bool: True if every checked column matches its declared type, False otherwise.
        """
        logger.info("Starting data type validation.")
        validation_status = True

        expected_data_types = {
            col: self.config.schema[col]['type']
            for col in self.config.schema
            if col in self.df.columns
        }

        for column, dtype in expected_data_types.items():
            actual_dtype = self.df[column].dtype
            if not pd.api.types.is_dtype_equal(actual_dtype, dtype):
                validation_status = False
                logger.warning(f"Data type mismatch for column '{column}': Expected {dtype} but got {actual_dtype}")

        if validation_status:
            logger.info("All data types are as expected.")
        return validation_status

    def _write_status_to_file(self, message: str, overwrite: bool = False):
        """
        Write a status message, followed by a newline, to the configured status file.

        Args:
        - message (str): The message to write.
        - overwrite (bool, optional): True replaces the file content; False
          (the default) appends to it.

        Raises:
        - Exception: Any error raised while opening or writing the file, re-raised
          after being logged.
        """
        logger.info("Writing validation status to file.")
        mode = 'w' if overwrite else 'a'
        try:
            # Explicit encoding keeps the file identical across platforms.
            with open(self.config.status_file, mode, encoding="utf-8") as f:
                f.write(message + "\n")
        except Exception as e:
            logger.error(f"Error writing to status file: {e}")
            raise

    def run_all_validations(self) -> bool:
        """
        Run the column-presence and data-type checks, log the overall outcome,
        and append it to the status file.

        Both checks always run, so every problem is logged in a single pass.

        Returns:
        - bool: True only if both checks pass.
        """
        logger.info("Running all data validations.")
        feature_validation_status = self.validate_all_features()
        data_type_validation_status = self.validate_data_types()
        all_passed = feature_validation_status and data_type_validation_status

        overall_status = "Overall Validation Status: "
        if all_passed:
            overall_status += "All validations passed."
            logger.info(overall_status)
        else:
            overall_status += "Some validations failed. Check the log for details."
            logger.error(overall_status)

        self._write_status_to_file(overall_status)
        return all_passed


# Pipeline

In [17]:
from src.semantic_preprocessor_model import logger

class InitialDataValidationPipeline:
    """
    Pipeline stage that validates the ingested data.

    It obtains the data validation configuration from a ConfigurationManager,
    builds a DataValidation component from it, and runs all validations.

    Attributes:
        STAGE_NAME (str): The name of this pipeline stage.
    """

    STAGE_NAME = "Data Validation Pipeline"

    def __init__(self):
        """
        Initializes the pipeline with a configuration manager.
        """
        self.config_manager = ConfigurationManager()

    def run_data_validation(self) -> bool:
        """
        Fetch the configuration, build the validator, and run every validation.

        Returns:
            bool: The result of `DataValidation.run_all_validations` (True when
            all validations passed).

        Raises:
            Exception: Any error raised while fetching the configuration,
            loading the data, or validating; logged and then re-raised so the
            caller does not mistake a crashed run for a completed one.
        """
        try:
            logger.info("Fetching initial data validation configuration...")
            data_validation_config = self.config_manager.get_data_validation_config()

            logger.info("Initializing data validation process...")
            data_validation = DataValidation(config=data_validation_config)

            logger.info("Executing Data Validations...")
            validation_passed = data_validation.run_all_validations()

        except Exception as e:
            logger.error(f"Error encountered during the data validation: {e}")
            raise

        # Only claim success when the data actually passed.
        if validation_passed:
            logger.info("Initial Data Validation Pipeline completed successfully.")
        else:
            logger.warning("Initial Data Validation Pipeline completed, but the data failed validation.")
        return validation_passed

    def run_pipeline(self) -> bool:
        """
        Run the whole stage, logging its start and completion.

        Returns:
            bool: True when all validations passed, False otherwise.

        Raises:
            Exception: Any error raised by `run_data_validation`; logged and
            re-raised.
        """
        try:
            logger.info(f">>>>>> Stage: {InitialDataValidationPipeline.STAGE_NAME} started <<<<<<")
            validation_passed = self.run_data_validation()
            logger.info(f">>>>>> Stage {InitialDataValidationPipeline.STAGE_NAME} completed <<<<<< \n\nx==========x")
            return validation_passed
        except Exception as e:
            logger.error(f"Error encountered during the {InitialDataValidationPipeline.STAGE_NAME}: {e}")
            raise

# Entry point: build the pipeline and run the validation stage when this code
# is executed directly (as a script or as a notebook cell) rather than imported.
if __name__ == '__main__':
    pipeline = InitialDataValidationPipeline()
    pipeline.run_pipeline()


[2023-10-23 21:51:08,049: 42: semantic_preprocessor_model_logger: INFO: common:  yaml file: config/config.yaml loaded successfully]
[2023-10-23 21:51:08,052: 42: semantic_preprocessor_model_logger: INFO: common:  yaml file: params.yaml loaded successfully]
[2023-10-23 21:51:08,054: 42: semantic_preprocessor_model_logger: INFO: common:  yaml file: schema.yaml loaded successfully]
[2023-10-23 21:51:08,055: 65: semantic_preprocessor_model_logger: INFO: common:  Created directory at: artifacts]
[2023-10-23 21:51:08,055: 53: semantic_preprocessor_model_logger: INFO: 3251083886:  >>>>>> Stage: Data Validation Pipeline started <<<<<<]
[2023-10-23 21:51:08,056: 31: semantic_preprocessor_model_logger: INFO: 3251083886:  Fetching initial data validation configuration...]
[2023-10-23 21:51:08,056: 65: semantic_preprocessor_model_logger: INFO: common:  Created directory at: artifacts/data_validation]
[2023-10-23 21:51:08,057: 34: semantic_preprocessor_model_logger: INFO: 3251083886:  Initializing 