In [None]:
import os

In [None]:
# Show the kernel's current working directory before it is changed below.
%pwd

In [None]:
os.chdir(r'C:\Users\deepu\OneDrive\Desktop\ML_PIPELINE')
%pwd

In [None]:
import pandas as pd

In [None]:
# Load the ingested CSV (path is relative to the working directory set above)
# and preview its first rows.
data = pd.read_csv("artifacts/data_ingestion/winequality-red.csv")
data.head()

In [None]:
# Print pandas' summary of the frame: per-column non-null counts and dtypes.
data.info()

In [None]:
# Number of missing values in each column.
data.isna().sum()

In [None]:
# Inferred dtype of every column; these are what the schema check compares against.
data.dtypes

In [None]:
# Look up the dtype of one column directly on the dtypes Series.
data.dtypes['fixed acidity']

In [None]:
# (rows, columns) of the loaded frame.
data.shape

In [None]:
from dataclasses import dataclass
from pathlib import Path

In [None]:
@dataclass
class DataValidationConfig:
    """Settings consumed by the data-validation step.

    Instances are built by ``ConfigurationManager.get_data_validation_config``
    from the ``data_validation`` section of the config and the ``COLUMNS``
    section of the schema.

    NOTE(review): the values are passed through from the loaded YAML unchanged,
    so the ``Path``-annotated fields may actually hold plain strings — confirm.
    """
    # Working directory of the validation step; created by ConfigurationManager.
    root_dir: Path
    # Path of the text file that DataValidation writes its results to.
    STATUS_FILE: str
    # Path handed to pd.read_csv by DataValidation, i.e. it is read as a CSV file.
    unzip_data_dir: Path
    # Mapping of column name -> expected dtype; values are compared via str().
    all_schema: dict

In [None]:
from src.datascience.constants import *
from src.datascience.utils.common import read_yaml, create_directories

In [None]:
class ConfigurationManager:
    """Reads the project's YAML files and hands out per-step config objects."""

    def __init__(
        self,
        config_filepath=CONFIG_FILE_PATH,
        params_filepath=PARAMS_FILE_PATH,
        schema_filepath=SCHEMA_FILE_PATH,
    ):
        """Load config, params and schema, then create the artifacts root.

        Args:
            config_filepath: Location of the main configuration YAML.
            params_filepath: Location of the parameters YAML.
            schema_filepath: Location of the schema YAML.
        """
        # Files are read in the order config -> params -> schema.
        self.config, self.params, self.schema = (
            read_yaml(yaml_path)
            for yaml_path in (config_filepath, params_filepath, schema_filepath)
        )

        create_directories([self.config.artifacts_root])

    def get_data_validation_config(self) -> DataValidationConfig:
        """Build the settings object for the data-validation step.

        Also creates the step's ``root_dir`` as a side effect.

        Returns:
            A ``DataValidationConfig`` filled from the ``data_validation``
            section of the config and the ``COLUMNS`` section of the schema.
        """
        stage_cfg = self.config.data_validation
        column_schema = self.schema.COLUMNS

        create_directories([stage_cfg.root_dir])

        # NOTE(review): the config key read here is `unzip_dir` while the
        # dataclass field is `unzip_data_dir` — confirm the key name in the YAML.
        return DataValidationConfig(
            root_dir=stage_cfg.root_dir,
            STATUS_FILE=stage_cfg.STATUS_FILE,
            unzip_data_dir=stage_cfg.unzip_dir,
            all_schema=column_schema,
        )

In [None]:
import os
from src.datascience import logger

## Updated DataValidation Class with Datatype Validation

In [None]:
class DataValidation:
    """Checks the ingested CSV against the configured column schema.

    Each check returns a boolean and also records its outcome in the text file
    at ``config.STATUS_FILE``. ``validate_all_columns`` starts that file afresh
    and ``validate_all_dtypes`` appends to it, so call them in that order.

    The status file is always written as UTF-8: the report contains non-ASCII
    markers that a platform default encoding such as cp1252 cannot encode.
    """

    def __init__(self, config: DataValidationConfig):
        """Store the validation settings (CSV path, status file path, schema)."""
        self.config = config

    def validate_all_columns(self) -> bool:
        """Check that the CSV and the schema list exactly the same columns.

        A column present in the data but absent from the schema, or required by
        the schema but absent from the data, makes the check fail. Every
        offending column is reported, not just the first one.

        Returns:
            True when both column sets agree, False otherwise.

        Raises:
            Whatever ``pd.read_csv`` or ``open`` raise (missing file, etc.).
        """
        # Column names live in the header, so no data rows need to be parsed.
        header = pd.read_csv(self.config.unzip_data_dir, nrows=0)
        data_cols = list(header.columns)
        schema_cols = list(self.config.all_schema.keys())

        not_in_schema = [col for col in data_cols if col not in schema_cols]
        not_in_data = [col for col in schema_cols if col not in data_cols]
        validation_status = not (not_in_schema or not_in_data)

        # 'w' deliberately truncates: this check opens a new report.
        with open(self.config.STATUS_FILE, 'w', encoding='utf-8') as f:
            f.write(f"Column Validation Status: {validation_status}\n")
            for col in not_in_schema:
                f.write(f"Missing column in schema: {col}\n")
            for col in not_in_data:
                f.write(f"Missing column in data: {col}\n")

        for col in not_in_schema:
            logger.error(f"Column '{col}' not found in schema")
        for col in not_in_data:
            logger.error(f"Schema column '{col}' not found in data")

        if validation_status:
            logger.info("All columns validated successfully")
        return validation_status

    def validate_all_dtypes(self) -> bool:
        """Check that every CSV column has the dtype the schema expects.

        Dtypes are compared by their string names (e.g. ``"float64"``). A data
        column that the schema does not define is reported as a mismatch rather
        than raising ``KeyError``, so this check can still run after
        ``validate_all_columns`` has failed. Schema columns missing from the
        data are not examined here; ``validate_all_columns`` reports those.

        Returns:
            True when no mismatch was found, False otherwise.

        Raises:
            Whatever ``pd.read_csv`` or ``open`` raise (missing file, etc.).
        """
        data = pd.read_csv(self.config.unzip_data_dir)
        expected_schema = self.config.all_schema
        schema_cols = expected_schema.keys()

        mismatches = []
        for col, dtype in data.dtypes.items():
            actual_dtype = str(dtype)
            if col not in schema_cols:
                mismatch_msg = f"  ❌ {col}: Not defined in schema, Got '{actual_dtype}'"
            else:
                expected_dtype = str(expected_schema[col])
                if actual_dtype == expected_dtype:
                    continue
                mismatch_msg = f"  ❌ {col}: Expected '{expected_dtype}', Got '{actual_dtype}'"
            mismatches.append(mismatch_msg)
            logger.warning(mismatch_msg)

        validation_status = not mismatches

        # Append the whole section in one go so the report is never left with a
        # header but no result.
        separator = '=' * 50
        with open(self.config.STATUS_FILE, 'a', encoding='utf-8') as f:
            f.write(f"\n{separator}\n")
            f.write("Datatype Validation:\n")
            f.write(f"{separator}\n")
            f.write(f"Datatype Validation Status: {validation_status}\n")
            if mismatches:
                f.write("Mismatches Found:\n")
                for mismatch in mismatches:
                    f.write(f"{mismatch}\n")
            else:
                f.write("✅ All datatypes match the schema!\n")

        if mismatches:
            logger.error(f"Found {len(mismatches)} datatype mismatches")
        else:
            logger.info("All datatypes validated successfully")

        return validation_status

## Execute Data Validation Pipeline

In [None]:
# Run both validation steps end-to-end and report the outcome of each one.
# The status marker is chosen from the actual result, so a failed step is
# never printed next to a check mark.
try:
    config = ConfigurationManager()
    data_validation_config = config.get_data_validation_config()
    data_validation = DataValidation(config=data_validation_config)

    # Step 1: Validate column names
    print("\n" + "="*60)
    print("Step 1: Validating Column Names")
    print("="*60)
    column_status = data_validation.validate_all_columns()
    logger.info(f"Column validation status: {column_status}")
    print(f"{'✅' if column_status else '❌'} Column Validation: {'PASSED' if column_status else 'FAILED'}")

    # Step 2: Validate datatypes
    print("\n" + "="*60)
    print("Step 2: Validating Column Datatypes")
    print("="*60)
    dtype_status = data_validation.validate_all_dtypes()
    logger.info(f"Datatype validation status: {dtype_status}")
    print(f"{'✅' if dtype_status else '❌'} Datatype Validation: {'PASSED' if dtype_status else 'FAILED'}")

    # Overall validation
    print("\n" + "="*60)
    print("Final Results")
    print("="*60)
    if column_status and dtype_status:
        logger.info("✅ All validations passed!")
        print("🎉 ALL VALIDATIONS PASSED! 🎉")
        print(f"\nDetailed report saved at: {data_validation_config.STATUS_FILE}")
    else:
        # Point at the configured report path rather than assuming its file name.
        logger.warning(
            f"⚠️ Some validations failed. Check {data_validation_config.STATUS_FILE} for details."
        )
        print("⚠️ SOME VALIDATIONS FAILED")
        print(f"\nCheck details at: {data_validation_config.STATUS_FILE}")

except Exception as e:
    print(f"\n❌ ERROR OCCURRED: {str(e)}")
    # Bare `raise` re-raises the active exception with its traceback untouched.
    raise

## View Validation Status File

In [None]:
# Read and display the report written by the validation run above.
# The path comes from the same config object the run used, so this cell keeps
# working if STATUS_FILE is changed in the configuration.
# The file is decoded as UTF-8 because the report contains non-ASCII markers;
# errors="replace" keeps this display-only cell from failing on a report that
# was written with a different encoding.
with open(data_validation_config.STATUS_FILE, 'r', encoding='utf-8', errors='replace') as f:
    print(f.read())