In the data validation phase, the primary goal is to ensure that the dataset meets the expected quality standards.

In [1]:
import os

In [2]:
%pwd

'/Users/macbookpro/Desktop/pixi_hr_project/pixi_hr/research'

In [3]:
# Move the working directory up one level (from research/ to the project root, per the
# %pwd outputs) so the relative paths used below resolve.
# NOTE: the path is relative, so re-running this cell moves up one more level each time.
os.chdir("../")

In [4]:
%pwd

'/Users/macbookpro/Desktop/pixi_hr_project/pixi_hr'

Entity update

Update config.yaml, and update the schema for validation.

In [5]:
import pandas as pd

In [6]:
from src.pixi_hr.constants import *

In [7]:
# Load the job postings CSV from the data_ingestion artifacts directory and preview
# the first rows. The path is relative to the current working directory.
df = pd.read_csv("artifacts/data_ingestion/jobs_data_simply_hired.csv")
df.head()

Unnamed: 0,date_of_job_post,title,job_location,company_name,job_link,job_summary,job_type,job_qualifications,job_description
0,2023-07-14T17:00:35Z,Data Scientist,"Toronto, ON",Cineplex,https://www.simplyhired.ca/job/tsfSmO_biXPolNH...,Lead the data/IT governance process and manage...,Full-time,"['Power BI', 'SQL', 'Tableau', 'MTA', 'Data vi...",Work location:\nHome Office 1303 Yonge St\n\nC...
1,2023-07-13T01:10:36Z,Data Scientist,"Greater Toronto Area, ON",fgf brands,https://www.simplyhired.ca/job/dnDqxJfbA0BNLi3...,We are seeking a talented and experienced Data...,Full-time,"['Analysis skills', 'TensorFlow', 'Communicati...",Job Description:\nData Scientist – Facial Reco...
2,2023-07-06T22:18:35Z,Data Scientist,"Mississauga, ON",Procom,https://www.simplyhired.ca/job/A-L5E5YUGKQWdur...,Data mining or extracting usable data from val...,Contract,"['SQL', 'Microsoft Excel', 'SAS', 'NoSQL', 'Ca...",On behalf of our client in the Transportation ...
3,2023-07-06T03:12:09Z,Staff Data Scientist,"Toronto, ON",ISG Search Inc,https://www.simplyhired.ca/job/-SAKV7mxY8l7tfm...,Lead a small team of data scientists. Collabor...,,"['TensorFlow', 'C++', 'Azure', 'C', 'Machine l...",Must Have:\n7+ years of experience with genera...
4,2023-07-05T08:44:07Z,Data Scientist,"Toronto, ON",Canada Life Assurance Company,https://www.simplyhired.ca/job/Tiz9efu8Gbf2yqV...,Make data-driven business recommendations and ...,Full-time,"['TensorFlow', 'Power BI', 'Communication skil...",Job Description:\nCanada Life is seeking a hig...


In [8]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 239 entries, 0 to 238
Data columns (total 9 columns):
 #   Column              Non-Null Count  Dtype 
---  ------              --------------  ----- 
 0   date_of_job_post    239 non-null    object
 1   title               239 non-null    object
 2   job_location        239 non-null    object
 3   company_name        239 non-null    object
 4   job_link            239 non-null    object
 5   job_summary         239 non-null    object
 6   job_type            170 non-null    object
 7   job_qualifications  239 non-null    object
 8   job_description     239 non-null    object
dtypes: object(9)
memory usage: 16.9+ KB


Entity

In [9]:
from dataclasses import dataclass
from pathlib import Path

# Using the dataclass decorator to create a class that mainly represents a data structure.
# The frozen=True parameter makes the instances of this dataclass immutable.
@dataclass(frozen=True)
class DataValidationConfig:
    """Immutable settings consumed by the data validation stage.

    Instances are built by ``ConfigurationManager.get_data_validation_config``
    from the ``data_validation`` section of the config file and the ``COLUMNS``
    section of the schema file.
    """

    # Directory for data validation artifacts; it is created when the config is built.
    root_dir: Path

    # Location of the data to validate. Despite the "dir" in the name, this value is
    # passed straight to pd.read_csv, so it must point at a CSV file.
    unzip_data_dir: Path

    # Path of the text file that validate_columns overwrites with the validation outcome.
    STATUS_FILE: str

    # Schema column definitions; the keys are treated as the expected column names.
    all_schema: dict

Configuration Manager

In [10]:
from src.pixi_hr.constants import *
from src.pixi_hr.utils.common import read_yaml, create_directories

In [11]:
class ConfigurationManager:
    """Load the project's YAML files and build per-stage configuration objects.

    The three YAML files (config, params, schema) are read once at construction
    time and kept on ``self.config``, ``self.params`` and ``self.schema``. The
    loaded objects are accessed by attribute (e.g. ``self.config.artifacts_root``).
    """

    def __init__(
            self,
            config_filepath = CONFIG_FILE_PATH,   # Path to the configuration YAML file
            params_filepath = PARAMS_FILE_PATH,   # Path to the parameters YAML file
            schema_filepath = SCHEMA_FILE_PATH):  # Path to the schema YAML file
        """Read the YAML files and make sure the artifacts root directory exists.

        Args:
            config_filepath: Path to the configuration YAML file.
            params_filepath: Path to the parameters YAML file.
            schema_filepath: Path to the schema YAML file.
        """
        # Load the configuration, parameters and schema details from their YAML files.
        # (A leftover debug print of the config path used to live here; it polluted
        # stdout on every pipeline run and has been removed.)
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)
        self.schema = read_yaml(schema_filepath)

        # Create directories as specified in the configuration (e.g., for storing artifacts)
        create_directories([self.config.artifacts_root])

    def get_data_validation_config(self) -> DataValidationConfig:
        """Build the configuration object for the data validation stage.

        Side effect: creates the stage's ``root_dir`` directory.

        Returns:
            DataValidationConfig populated from the ``data_validation`` section of
            the config file and the ``COLUMNS`` section of the schema file.
        """
        # Extract data validation configuration from the main configuration
        config = self.config.data_validation
        # Extract schema columns from schema.yaml
        schema = self.schema.COLUMNS

        # Create directories specified in the data validation configuration
        create_directories([config.root_dir])

        # Create an instance of the DataValidationConfig dataclass using the extracted configuration
        data_validation_config = DataValidationConfig(
            root_dir=config.root_dir,
            unzip_data_dir=config.unzip_data_dir,
            STATUS_FILE=config.STATUS_FILE,
            all_schema=schema
        )

        return data_validation_config


components

In [12]:
import os
from pixi_hr import logger

In [13]:
class DataValidation:
    """Check that the dataset has exactly the columns the schema expects.

    The CSV at ``config.unzip_data_dir`` is read once at construction time and
    kept on ``self.df``.
    """

    def __init__(self, config: DataValidationConfig):
        """Store the configuration and load the dataset to validate.

        Args:
            config: Data validation settings (input CSV path, status file path,
                schema columns).
        """
        self.config = config
        self.df = pd.read_csv(self.config.unzip_data_dir)

    def validate_columns(self) -> bool:
        """
        Ensure that the dataset's columns match the schema's column names exactly.

        Missing and unexpected columns are both treated as failures. The outcome is
        logged and written to ``config.STATUS_FILE``, overwriting previous content.

        Returns:
            True when there are no missing and no extra columns, False otherwise.
        """
        # Build both sets once; the schema keys are the expected column names.
        # (A leftover debug print of the expected columns has been removed.)
        actual_columns = set(self.df.columns)
        expected_columns = set(self.config.all_schema.keys())

        missing_columns = expected_columns - actual_columns
        extra_columns = actual_columns - expected_columns

        validation_status = True
        status_message = "Validation status: "

        if missing_columns:
            validation_status = False
            logger.warning(f"Missing columns: {missing_columns}")
            status_message += f"Missing columns: {missing_columns}\n"

        if extra_columns:
            validation_status = False
            logger.warning(f"Extra columns found: {extra_columns}")
            status_message += f"Extra columns found: {extra_columns}\n"

        if validation_status:
            logger.info("All expected columns are present in the dataframe.")
            status_message += "All expected columns are present."

        # Write the validation status to the file once, at the end
        with open(self.config.STATUS_FILE, 'w') as f:
            f.write(status_message)

        return validation_status


Pipeline

In [16]:
from pixi_hr import logger
from pixi_hr.config.configuration import ConfigurationManager
from pixi_hr.components.data_validation import DataValidation

class DataValidationTrainingPipeline:
    """
    Runs the data validation stage from configuration loading to column checks.

    The stage:
    1. Creates a ConfigurationManager.
    2. Asks it for the data validation configuration.
    3. Builds a DataValidation component from that configuration.
    4. Runs the column validation and logs the outcome.
    """

    STAGE_NAME = "Data Validation Stage"

    def __init__(self):
        """
        Nothing to set up: every object the stage needs is created inside ``main``.
        """

    def main(self):
        """
        Run the stage. The validation outcome is only logged; nothing is returned.
        """
        logger.info("Starting the Data Validation Pipeline...")

        # Configuration manager
        logger.info("Initializing ConfigurationManager...")
        config_manager = ConfigurationManager()

        # Stage-specific configuration
        logger.info("Fetching Data Validation Configuration...")
        validation_config = config_manager.get_data_validation_config()

        # Validation component
        logger.info("Initializing DataValidation Component...")
        validator = DataValidation(config=validation_config)

        # Column check
        logger.info("Validating Columns...")
        columns_ok = validator.validate_columns()

        # Report the outcome; a failed check is logged but does not abort the stage.
        if not columns_ok:
            logger.warning("Column validation failed. Check logs for more details.")
        else:
            logger.info("All columns successfully validated.")

        logger.info("Data Validation Pipeline completed successfully.")

# Entry point: run the data validation stage, bracketing it with start/completion log lines.
if __name__ == '__main__':
    try:
        logger.info(f">>>>>> Stage: {DataValidationTrainingPipeline.STAGE_NAME} started <<<<<<")
        data_validation_training_pipeline = DataValidationTrainingPipeline()
        data_validation_training_pipeline.main()
        logger.info(f">>>>>> Stage {DataValidationTrainingPipeline.STAGE_NAME} completed <<<<<< \n\nx==========x")
    except Exception as e:
        # Log the failure, then re-raise so the error is not swallowed.
        logger.exception(f"Error encountered during the Data Validation Pipeline: {e}")
        raise


[2023-08-15 20:32:47,895: 10: pixi_hr_project_logger: INFO: 369255095:  Starting the Data Validation Pipeline...]
[2023-08-15 20:32:47,897: 13: pixi_hr_project_logger: INFO: 369255095:  Initializing ConfigurationManager...]
<class 'pathlib.PosixPath'> config/config.yaml
[2023-08-15 20:32:47,899: 41: pixi_hr_project_logger: INFO: common:  yaml file: config/config.yaml loaded successfully]
[2023-08-15 20:32:47,900: 41: pixi_hr_project_logger: INFO: common:  yaml file: params.yaml loaded successfully]
[2023-08-15 20:32:47,901: 41: pixi_hr_project_logger: INFO: common:  yaml file: schema.yaml loaded successfully]
[2023-08-15 20:32:47,902: 64: pixi_hr_project_logger: INFO: common:  Created directory at: artifacts]
[2023-08-15 20:32:47,902: 17: pixi_hr_project_logger: INFO: 369255095:  Fetching Data Validation Configuration...]
[2023-08-15 20:32:47,903: 64: pixi_hr_project_logger: INFO: common:  Created directory at: artifacts/data_validation]
[2023-08-15 20:32:47,903: 21: pixi_hr_project_lo

In [14]:
import pandas as pd
import ast
from pixi_hr import logger

class DataValidator:
    """Run content-level sanity checks over a job-postings dataframe.

    Every check only logs its findings; none of them raises on invalid data and
    none of them returns a result. The only step that modifies the dataframe is
    ``handle_duplicates``, which drops duplicate rows in place.
    """

    def __init__(self, df):
        """Keep a reference to the dataframe to validate (no copy is made)."""
        self.df = df

    def validate(self):
        """
        Run every check in sequence, finishing with duplicate removal.
        """
        self.validate_columns()
        self.validate_date_of_job_post()
        self.validate_text_fields(['title', 'job_location', 'company_name', 'job_link', 'job_summary', 'job_description'])
        self.validate_job_link()
        self.validate_job_type()
        self.validate_job_qualifications()
        self.handle_duplicates()

    def validate_columns(self):
        """
        Ensure that all expected columns are present in the dataset.

        Missing and unexpected columns are logged as warnings.
        """
        expected_columns = [
            'date_of_job_post', 'title', 'job_location', 'company_name',
            'job_link', 'job_summary', 'job_type', 'job_qualifications',
            'job_description'
        ]
        actual_columns = set(self.df.columns)
        missing_columns = set(expected_columns) - actual_columns
        extra_columns = actual_columns - set(expected_columns)

        if missing_columns:
            logger.warning(f"Missing columns: {missing_columns}")
        if extra_columns:
            logger.warning(f"Extra columns found: {extra_columns}")
        if not missing_columns and not extra_columns:
            logger.info("All expected columns are present in the dataframe.")

    def validate_date_of_job_post(self):
        """
        Validate that 'date_of_job_post' contains valid date-time strings.

        The parsed values are kept in a local variable only. Writing them back as a
        'date_of_job_post_temp' column would leak a scratch column into the caller's
        dataframe (and into anything saved from it), and would make a second
        validation run report it as an unexpected column.
        """
        raw_dates = self.df['date_of_job_post']
        try:
            pd.to_datetime(raw_dates, errors='raise')
        except Exception as e:
            logger.warning(f"Error encountered: {e}")
            # Re-parse leniently to find out which rows could not be converted.
            parsed_dates = pd.to_datetime(raw_dates, errors='coerce')
            invalid_rows = self.df[parsed_dates.isna()]
            logger.warning("\nRows with invalid date-time strings:")
            logger.warning(invalid_rows[['date_of_job_post']])
        else:
            logger.info("All values in 'date_of_job_post' are valid date-time strings.")

    def validate_text_fields(self, columns):
        """
        Validate that the specified columns contain only text values.

        Args:
            columns: Names of the columns to check. Any value that is not a ``str``
                (including missing values) is reported.
        """
        for column in columns:
            is_text = self.df[column].apply(lambda value: isinstance(value, str)).astype(bool)
            non_text_rows = self.df[~is_text]
            if not non_text_rows.empty:
                logger.warning(f"Column '{column}' has non-text values:")
                logger.warning(non_text_rows[[column]])
            else:
                logger.info(f"Column '{column}' contains only text values.")

    def validate_job_link(self):
        """
        Validate that 'job_link' starts with "http".

        Missing or non-string links count as invalid instead of raising: negating the
        result of ``.str.startswith`` fails when the column contains NaN.
        """
        starts_with_http = self.df['job_link'].apply(
            lambda link: isinstance(link, str) and link.startswith("http")
        ).astype(bool)
        invalid_urls = self.df[~starts_with_http]
        if not invalid_urls.empty:
            logger.warning(f"Found {len(invalid_urls)} rows with URLs not starting with 'http':")
            logger.warning(invalid_urls[['job_link']])
        else:
            logger.info("All URLs in 'job_link' start with 'http'.")

    def validate_job_type(self):
        """
        Validate that 'job_type' contains only text values, excluding nulls.
        """
        non_null_job_types = self.df[self.df['job_type'].notna()]
        is_text = non_null_job_types['job_type'].apply(lambda value: isinstance(value, str)).astype(bool)
        non_text_job_types = non_null_job_types[~is_text]
        if not non_text_job_types.empty:
            logger.warning(f"Found {len(non_text_job_types)} rows in 'job_type' with non-text values:")
            logger.warning(non_text_job_types[['job_type']])
        else:
            logger.info("All non-null values in 'job_type' are of text type.")

    def validate_job_qualifications(self):
        """
        Validate that 'job_qualifications' contains valid lists of text values.

        Each cell must be a string holding a Python list literal whose items are all
        strings, e.g. "['SQL', 'Python']".
        """
        def is_valid_list(value):
            try:
                parsed = ast.literal_eval(value)
            except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
                # literal_eval can raise any of these on malformed input; all of them
                # simply mean "not a valid list literal" here.
                return False
            return isinstance(parsed, list) and all(isinstance(item, str) for item in parsed)

        is_valid = self.df['job_qualifications'].apply(is_valid_list).astype(bool)
        invalid_qualifications = self.df[~is_valid]
        if not invalid_qualifications.empty:
            logger.warning(f"Found {len(invalid_qualifications)} rows in 'job_qualifications' with invalid format:")
            logger.warning(invalid_qualifications[['job_qualifications']])
        else:
            logger.info("All values in 'job_qualifications' are valid lists of text values.")

    def handle_duplicates(self):
        """
        Handle duplicate rows based on the 'job_link' column.

        The first occurrence of each link is kept. Rows are dropped in place on
        purpose, so the dataframe object handed to the constructor reflects the
        de-duplication.
        """
        num_duplicates = int(self.df['job_link'].duplicated().sum())
        if num_duplicates > 0:
            self.df.drop_duplicates(subset='job_link', inplace=True)
            logger.info(f"Dropped {num_duplicates} duplicate rows based on the 'job_link' column.")
        else:
            logger.info("No duplicates found based on the 'job_link' column.")


In [15]:
def main(dataframe, save_dir="artifacts/data_validation"):
    """
    Validate the dataframe and save the result as a CSV file.

    Args:
    - dataframe (pd.DataFrame): The dataframe to validate. Duplicate rows are
      dropped from it in place during validation.
    - save_dir (str): Directory in which "validated_data.csv" is written; it is
      created if it does not exist. Defaults to "artifacts/data_validation".
    """
    logger.info("Starting data validation process.")

    # Initialize the DataValidator class with the dataframe
    validator = DataValidator(dataframe)

    # Run the validations
    validator.validate()

    # Ensure the directory exists or create it
    os.makedirs(save_dir, exist_ok=True)

    # Save the validator's dataframe explicitly instead of relying on it being
    # the same object as the `dataframe` argument.
    save_path = os.path.join(save_dir, "validated_data.csv")
    validator.df.to_csv(save_path, index=False)
    logger.info(f"Validated dataframe saved to {save_path}")

    logger.info("Data validation process completed.")

if __name__ == "__main__":
    # Sample usage: `df` must already exist in the namespace (in this notebook it is
    # the dataframe loaded from the ingestion CSV in an earlier cell). Replace it
    # with your actual dataframe variable.
    main(df)

[2023-08-15 16:55:51,652: 8: pixi_hr_project_logger: INFO: 211043931:  Starting data validation process.]
[2023-08-15 16:55:51,658: 43: pixi_hr_project_logger: INFO: 3972941761:  All values in 'date_of_job_post' are valid date-time strings.]
[2023-08-15 16:55:51,659: 61: pixi_hr_project_logger: INFO: 3972941761:  Column 'title' contains only text values.]
[2023-08-15 16:55:51,660: 61: pixi_hr_project_logger: INFO: 3972941761:  Column 'job_location' contains only text values.]
[2023-08-15 16:55:51,661: 61: pixi_hr_project_logger: INFO: 3972941761:  Column 'company_name' contains only text values.]
[2023-08-15 16:55:51,661: 61: pixi_hr_project_logger: INFO: 3972941761:  Column 'job_link' contains only text values.]
[2023-08-15 16:55:51,662: 61: pixi_hr_project_logger: INFO: 3972941761:  Column 'job_summary' contains only text values.]
[2023-08-15 16:55:51,663: 61: pixi_hr_project_logger: INFO: 3972941761:  Column 'job_description' contains only text values.]
[2023-08-15 16:55:51,664: 72: