In [8]:
import os
# NOTE(review): hardcoded absolute path to one developer's machine. Every later
# cell relies on this working directory (e.g. the relative "config/config.yaml"
# default and the `src.` imports), so this cell fails on any other checkout.
# Consider deriving the project root from an environment variable instead.
os.chdir("/Users/naveenkumar/Desktop/formula-1-bot")
%pwd

'/Users/naveenkumar/Desktop/formula-1-bot'

In [9]:
# from src.formula_one.config.configuration import ConfigurationManager
# from src.formula_one.entity.config_entity import DataValidationConfig, DatabaseConfig
# from src.formula_one.components.data_ingestion import DatabaseIngestion
# from src.formula_one.logging import logger
import pandas as pd
import numpy as np
from typing import Dict, List, Any
from src.formula_one.logging import logger

In [10]:
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, List, Dict, Any

@dataclass
class DataValidationConfig:
    """Configuration for data validation.

    Attributes:
        root_dir: Base directory for data-validation artifacts.
        validation_report_dir: Directory for validation reports.
        data_quality_threshold: Minimum acceptable data quality (fraction).
        missing_value_threshold: Maximum acceptable fraction of missing
            values per column.
        outlier_threshold: Number of standard deviations for outlier
            detection.
        tables_to_validate: Names of the tables to validate. ``None`` (the
            default) is replaced with the standard table list in
            ``__post_init__``.
    """
    root_dir: Path
    validation_report_dir: Path
    data_quality_threshold: float = 0.90  # Minimum acceptable data quality
    missing_value_threshold: float = 0.25  # Maximum acceptable missing values
    outlier_threshold: float = 3.0         # Standard deviations for outlier detection

    # Tables to validate. The default is None rather than a list literal because
    # dataclasses reject mutable defaults; the annotation is Optional to match.
    tables_to_validate: Optional[List[str]] = None

    def __post_init__(self):
        """Fill in the default table list when none was supplied."""
        if self.tables_to_validate is None:
            # A fresh list per instance, so instances never share state.
            self.tables_to_validate = [
                "meetings", "sessions", "drivers", "laps",
                "pit_stops", "stints", "positions", "intervals",
                "weather", "race_control"
            ]

In [11]:
from src.formula_one.constants import *
from src.formula_one.utils.common import read_yaml, create_directories

In [12]:

class ConfigurationManager:
    """Manages configuration loading from YAML files."""

    def __init__(self, config_file_path: str = "config/config.yaml"):
        """Load the YAML file at ``config_file_path`` via ``read_yaml``.

        Args:
            config_file_path: Path to the YAML configuration file; the default
                is relative to the current working directory.
        """
        self.config_file_path = Path(config_file_path)
        self.config = read_yaml(self.config_file_path)

    def get_data_validation_config(self) -> "DataValidationConfig":
        """Get data validation configuration.

        Reads the ``data_validation`` section of the loaded config and falls
        back to built-in defaults for every key that is absent.

        Returns:
            A ``DataValidationConfig`` populated from the config section.
        """
        # `or {}` also covers a section that is present but empty/null in the
        # YAML (e.g. a bare `data_validation:` line), where `.get` returns None
        # instead of the `{}` default.
        config_data = self.config.get('data_validation', {}) or {}

        # NOTE(review): the fallback thresholds below (0.95 / 0.1) differ from
        # the defaults declared on this notebook's DataValidationConfig
        # dataclass (0.90 / 0.25) - confirm which pair is intended.
        return DataValidationConfig(
            root_dir=Path(config_data.get('root_dir', 'artifacts/data_validation')),
            validation_report_dir=Path(config_data.get('validation_report_dir', 'artifacts/data_validation/reports')),
            data_quality_threshold=config_data.get('data_quality_threshold', 0.95),
            missing_value_threshold=config_data.get('missing_value_threshold', 0.1),
            outlier_threshold=config_data.get('outlier_threshold', 3.0),
            tables_to_validate=config_data.get('tables_to_validate', [
                "meetings", "sessions", "drivers", "laps",
                "pit_stops", "stints", "positions", "intervals",
                "weather", "race_control"
            ])
        )

In [13]:
from src.formula_one.entity.config_entity import DataValidationConfig, DatabaseConfig
from src.formula_one.components.data_ingestion import DatabaseIngestion

class DataValidation:
    """Handles data validation for F1 data.

    Each configured table is read in full from the database, profiled with
    pandas (missing values, dtypes, IQR outliers, duplicate rows, key-column
    statistics) and assigned a status: PASS / WARNING / FAIL, or EMPTY / ERROR
    when it could not be profiled.

    NOTE(review): ``data_quality_threshold`` and ``outlier_threshold`` from the
    validation config are not read anywhere in this class; outlier detection
    uses a fixed IQR rule instead. Confirm whether that is intended.
    """

    # Multiplier applied to the inter-quartile range when flagging outliers.
    _IQR_MULTIPLIER = 1.5
    # Numeric columns skipped by the outlier check (metadata columns).
    _OUTLIER_SKIP_COLUMNS = ('id', 'created_at')
    # Key columns profiled by the foreign-key check.
    _KEY_COLUMNS = ('meeting_key', 'session_key')

    def __init__(self, validation_config: "DataValidationConfig", db_config: "DatabaseConfig"):
        """Store the configs and create the helper used to open DB connections.

        Args:
            validation_config: Thresholds and the list of tables to validate.
            db_config: Database settings handed to ``DatabaseIngestion``.
        """
        self.validation_config = validation_config
        self.db_config = db_config
        self.logger = logger
        self.db_ingestion = DatabaseIngestion(None, db_config, None)  # Just for DB connection

    def validate_all_data(self) -> Dict[str, Any]:
        """Validate all tables in the database.

        Returns:
            A dict mapping each table name to its validation result, plus an
            ``'overall'`` entry holding the summary.
        """
        self.logger.info("Starting comprehensive data validation")

        validation_results = {}

        for table in self.validation_config.tables_to_validate:
            self.logger.info(f"Validating table: {table}")
            validation_results[table] = self.validate_table(table)

        # Overall validation summary
        validation_results['overall'] = self._generate_validation_summary(validation_results)

        return validation_results

    def validate_table(self, table_name: str) -> Dict[str, Any]:
        """Validate a specific table.

        Args:
            table_name: Table to read; must be a plain (optionally
                schema-qualified) identifier.

        Returns:
            The validation result dict. Any failure - including an unsafe
            table name or a failed connection - is reported as
            ``{'status': 'ERROR', 'issues': [...]}`` rather than raised.
        """
        conn = None
        cursor = None

        try:
            # The table name is interpolated into the SQL text below, so
            # refuse anything that is not a plain identifier first.
            self._require_safe_identifier(table_name)

            # Connect inside the try block so a connection failure is reported
            # for this table instead of aborting the whole validation run.
            conn = self.db_ingestion.connect_to_db()
            cursor = conn.cursor()

            # Get table data
            cursor.execute(f"SELECT * FROM {table_name}")
            columns = [desc[0] for desc in cursor.description]
            data = cursor.fetchall()

            if not data:
                return {
                    'status': 'EMPTY',
                    'row_count': 0,
                    'missing_values': {},
                    'data_types': {},
                    'outliers': {},
                    'duplicates': {'duplicate_rows': 0},
                    'foreign_keys': {},
                    'issues': ['Table is empty']
                }

            # Convert to DataFrame for easier analysis
            df = pd.DataFrame(data, columns=columns)

            validation_result = {
                'row_count': len(df),
                'missing_values': self._check_missing_values(df),
                'data_types': self._check_data_types(df),
                'outliers': self._check_outliers(df, table_name),
                'duplicates': self._check_duplicates(df),
                'foreign_keys': self._check_foreign_keys(df, table_name),
                'issues': []
            }

            # Record the issues in the result so reports can show them, then
            # derive the status from the same list.
            validation_result['issues'] = self._collect_issues(validation_result)
            validation_result['status'] = self._determine_table_status(validation_result)

            return validation_result

        except Exception as e:
            self.logger.error(f"Error validating table {table_name}: {e}")
            return {
                'status': 'ERROR',
                'issues': [f"Validation error: {str(e)}"]
            }
        finally:
            # Close whatever was actually opened; the connection is closed
            # even if closing the cursor raises.
            try:
                if cursor is not None:
                    cursor.close()
            finally:
                if conn is not None:
                    conn.close()

    @staticmethod
    def _require_safe_identifier(table_name: str) -> None:
        """Raise ``ValueError`` unless ``table_name`` is a dotted identifier."""
        parts = str(table_name).split('.')
        if not all(part.isidentifier() for part in parts):
            raise ValueError(f"Unsafe table name: {table_name!r}")

    def _check_missing_values(self, df: pd.DataFrame) -> Dict[str, float]:
        """Check missing values in each column.

        Returns:
            Column name -> fraction of null values (0.0 for an empty frame).
            Columns above ``missing_value_threshold`` are logged as warnings.
        """
        row_count = len(df)
        if row_count == 0:
            # Nothing can be missing in a frame with no rows; avoid 0/0.
            return {column: 0.0 for column in df.columns}

        threshold = self.validation_config.missing_value_threshold
        missing_percentages = {}
        for column, fraction in (df.isnull().sum() / row_count).items():
            missing_percentage = float(fraction)  # plain float, not numpy scalar
            missing_percentages[column] = missing_percentage

            if missing_percentage > threshold:
                self.logger.warning(f"High missing values in {column}: {missing_percentage:.2%}")

        return missing_percentages

    def _check_data_types(self, df: pd.DataFrame) -> Dict[str, str]:
        """Check data types of each column (column name -> dtype string)."""
        return {column: str(dtype) for column, dtype in df.dtypes.items()}

    def _check_outliers(self, df: pd.DataFrame, table_name: str) -> Dict[str, List]:
        """Check for outliers in numerical columns.

        A value is an outlier when it lies more than ``_IQR_MULTIPLIER`` times
        the inter-quartile range below Q1 or above Q3.

        Returns:
            Column name -> list of index labels of outlying rows; columns
            without outliers are omitted.
        """
        outliers = {}

        for column in df.select_dtypes(include=[np.number]).columns:
            if column in self._OUTLIER_SKIP_COLUMNS:  # Skip metadata columns
                continue

            values = df[column]
            q1 = values.quantile(0.25)
            q3 = values.quantile(0.75)
            spread = self._IQR_MULTIPLIER * (q3 - q1)

            outside = (values < q1 - spread) | (values > q3 + spread)
            outlier_indices = values[outside].index.tolist()

            if outlier_indices:
                outliers[column] = outlier_indices
                self.logger.warning(f"Found {len(outlier_indices)} outliers in {table_name}.{column}")

        return outliers

    def _check_duplicates(self, df: pd.DataFrame) -> Dict[str, int]:
        """Check for duplicate rows (fully identical rows after the first)."""
        return {'duplicate_rows': int(df.duplicated().sum())}

    def _check_foreign_keys(self, df: pd.DataFrame, table_name: str) -> Dict[str, Any]:
        """Profile the key columns present in ``df``.

        This is a simplified check: it only reports the number of distinct
        values and nulls per key column and does not verify that referenced
        rows exist in the parent table.
        """
        fk_checks = {}

        for key_column in self._KEY_COLUMNS:
            if key_column in df.columns:
                fk_checks[key_column] = {
                    'unique_values': int(df[key_column].nunique()),
                    'null_count': int(df[key_column].isnull().sum())
                }

        return fk_checks

    def _collect_issues(self, validation_result: Dict[str, Any]) -> List[str]:
        """List the human-readable issues found in one table's result."""
        issues = []
        threshold = self.validation_config.missing_value_threshold

        # One issue per column whose missing fraction exceeds the threshold
        for column, missing_pct in validation_result.get('missing_values', {}).items():
            if missing_pct > threshold:
                issues.append(f"High missing values in {column}")

        # Check duplicates
        if validation_result.get('duplicates', {}).get('duplicate_rows', 0) > 0:
            issues.append("Duplicate rows found")

        # Check outliers
        if validation_result.get('outliers'):
            issues.append("Outliers detected")

        return issues

    def _determine_table_status(self, validation_result: Dict[str, Any]) -> str:
        """Determine overall status of table validation.

        Returns:
            ``'PASS'`` with no issues, ``'WARNING'`` with one or two, and
            ``'FAIL'`` with three or more.
        """
        issue_count = len(self._collect_issues(validation_result))

        if issue_count == 0:
            return 'PASS'
        if issue_count <= 2:
            return 'WARNING'
        return 'FAIL'

    def _generate_validation_summary(self, validation_results: Dict[str, Any]) -> Dict[str, Any]:
        """Generate overall validation summary.

        The overall status is ``'FAIL'`` when any table has status FAIL or
        ERROR: a table that could not be validated must not count as passing.

        Returns:
            Dict with the overall ``status``, per-status table counts and the
            ``pass_rate`` (passed / total, 0 when there are no tables).
        """
        # Skip any previous 'overall' entry so the summary can be recomputed.
        statuses = [
            result.get('status')
            for name, result in validation_results.items()
            if name != 'overall' and isinstance(result, dict)
        ]
        total_tables = len([name for name in validation_results if name != 'overall'])
        passed_tables = statuses.count('PASS')
        failed_tables = statuses.count('FAIL')
        error_tables = statuses.count('ERROR')

        overall_status = 'PASS' if failed_tables == 0 and error_tables == 0 else 'FAIL'

        return {
            'status': overall_status,
            'total_tables': total_tables,
            'passed_tables': passed_tables,
            'failed_tables': failed_tables,
            'warning_tables': statuses.count('WARNING'),
            'error_tables': error_tables,
            'empty_tables': statuses.count('EMPTY'),
            'pass_rate': passed_tables / total_tables if total_tables > 0 else 0
        }

In [14]:
 # Load configuration
# Build the validation settings from YAML and the database settings from defaults.
config_manager = ConfigurationManager()
validation_config = config_manager.get_data_validation_config()
db_config = DatabaseConfig()

# Wire both configs into the validator and run it over every configured table.
data_validation = DataValidation(validation_config, db_config)
validation_results = data_validation.validate_all_data()

# Headline numbers come from the 'overall' summary entry.
print("=== DATA VALIDATION RESULTS ===")
print(f"Overall Status: {validation_results['overall']['status']}")
print(f"Pass Rate: {validation_results['overall']['pass_rate']:.2%}")
print(f"Passed Tables: {validation_results['overall']['passed_tables']}/{validation_results['overall']['total_tables']}")

# Per-table breakdown; the summary entry itself is skipped.
print("\n=== DETAILED RESULTS ===")
for table, result in validation_results.items():
    if table == 'overall':
        continue

    print(f"\n{table.upper()}:")
    print(f"  Status: {result.get('status', 'UNKNOWN')}")
    print(f"  Rows: {result.get('row_count', 0)}")
    print(f"  Issues: {len(result.get('issues', []))}")

    # Only list columns that actually have missing values.
    missing_values = result.get('missing_values', {})
    if not any(pct > 0 for pct in missing_values.values()):
        continue

    print("  Missing Values:")
    for col, pct in missing_values.items():
        if pct > 0:
            print(f"    {col}: {pct:.2%}")

[2025-07-03 13:04:10,898: INFO: common: yaml file: config/config.yaml loaded successfully]
[2025-07-03 13:04:10,900: INFO: 2071959291: Starting comprehensive data validation]
[2025-07-03 13:04:10,901: INFO: 2071959291: Validating table: meetings]


[2025-07-03 13:04:10,991: INFO: data_ingestion: Successfully connected to PostgreSQL database]
[2025-07-03 13:04:11,004: INFO: 2071959291: Validating table: sessions]
[2025-07-03 13:04:11,007: INFO: data_ingestion: Successfully connected to PostgreSQL database]
[2025-07-03 13:04:11,016: INFO: 2071959291: Validating table: drivers]
[2025-07-03 13:04:11,019: INFO: data_ingestion: Successfully connected to PostgreSQL database]
[2025-07-03 13:04:11,029: INFO: 2071959291: Validating table: laps]
[2025-07-03 13:04:11,032: INFO: data_ingestion: Successfully connected to PostgreSQL database]
[2025-07-03 13:04:11,118: INFO: 2071959291: Validating table: pit_stops]
[2025-07-03 13:04:11,122: INFO: data_ingestion: Successfully connected to PostgreSQL database]
[2025-07-03 13:04:11,147: INFO: 2071959291: Validating table: stints]
[2025-07-03 13:04:11,151: INFO: data_ingestion: Successfully connected to PostgreSQL database]
[2025-07-03 13:04:11,166: INFO: 2071959291: Validating table: positions]
[20