Validations 


In [None]:
! pip install numpy 

In [None]:
! pip install numpy 

In [None]:
! pip install openpyxl

In [None]:
import os
import pandas as pd

class FileValidator:
    """Check that an input file has a supported extension and a known meter header.

    ``run_all_validations()`` returns ``(status, errors)``. ``status`` starts as
    "PASS" and is overwritten by a failing check: "REJECT" for an unsupported
    extension or an unreadable file, "ROUTE_TO_HEADER_MAPPING_AI" when no known
    meter header is present.
    """

    VALID_EXTENSIONS = ('.xlsx', '.xls', '.csv')

    # A tuple default avoids the shared-mutable-default pitfall of a list literal.
    def __init__(self, file_path, known_headers=("MeterNumber", "Meter ID", "Meter_ID", "Meter_Number")):
        """
        :param file_path: Path of the file to validate.
        :param known_headers: Header names (matched case-insensitively) that mark a
            meter-data file. A falsy value falls back to ["nothing"].
        """
        self.file_path = file_path
        self.known_headers = list(known_headers) if known_headers else ["nothing"]
        self.errors = []
        self.status = "PASS"

    def _read_file(self):
        """Read the file with pandas, choosing CSV vs Excel by case-insensitive extension."""
        _, file_ext = os.path.splitext(self.file_path)
        if file_ext.lower() == '.csv':
            return pd.read_csv(self.file_path)
        return pd.read_excel(self.file_path)

    # 1. Validate File Extension
    def validate_file_type(self):
        """Return True if the extension is supported; otherwise record a REJECT."""
        _, file_ext = os.path.splitext(self.file_path)

        if file_ext.lower() not in self.VALID_EXTENSIONS:
            self.errors.append(f" Unsupported file type: {file_ext}")
            self.status = "REJECT"
            return False

        print(f" File type valid: {file_ext}")
        return True

    # 2. Validate Header Presence
    def validate_header_presence(self):
        """Return True if any known header appears among the file's columns.

        A read failure records REJECT; a readable file without a known header
        records ROUTE_TO_HEADER_MAPPING_AI.
        """
        try:
            df = self._read_file()
        except Exception as e:  # any read/parse failure rejects the file
            self.errors.append(f" Error reading file: {str(e)}")
            self.status = "REJECT"
            return False

        # str() so numeric column labels (possible from Excel) don't break .lower().
        headers = {str(col).lower() for col in df.columns}
        if any(str(header).lower() in headers for header in self.known_headers):
            print(" Header contains at least one known meter field.")
            return True

        self.errors.append("No known meter headers found.")
        self.status = "ROUTE_TO_HEADER_MAPPING_AI"
        return False

    # 3. Run All Validations
    def run_all_validations(self):
        """Run the extension check, then (if it passed) the header check."""
        print("\n--- Running File Validations ---")
        if not self.validate_file_type():
            return self.status, self.errors

        self.validate_header_presence()
        return self.status, self.errors


# Example usage -- edit file_path to point at a local test file before running.
file_path = (
    r"D:\AI Projects\powerthon\VALIDATIONS\Powerthon_IP_DATA_Adani_Project_V2.xlsx"
)  # Change to your test file path
validator = FileValidator(file_path)
status, errors = validator.run_all_validations()

# Summarise the outcome of the file-level checks.
print("\n--- Validation Result ---")
print("Final Status:", status)
print("Errors / Notes:", errors)


Header Normalization

In [None]:
import pandas as pd

class HeaderNormalizer:
    """Load a CSV/Excel file and rename vendor column headers to canonical names."""

    def __init__(self, file_path, header_mapping):
        """
        :param file_path: Path of the input file
        :param header_mapping: Dictionary of vendor header → canonical header;
            keys are matched case-insensitively.
        """
        self.file_path = file_path
        # Lower-cased keys (via str() so non-string keys don't raise) for case-insensitive lookup.
        self.header_mapping = {str(key).lower(): value for key, value in header_mapping.items()}
        self.original_headers = []
        self.mapped_headers = []
        self.status = "NOT_STARTED"

    def load_file(self):
        """Read the file; return ``(True, df)`` on success or ``(False, error_message)``.

        CSV vs Excel is chosen from the extension, case-insensitively. Sets
        ``status`` to "FAILED" when reading raises.
        """
        try:
            if str(self.file_path).lower().endswith('.csv'):
                df = pd.read_csv(self.file_path)
            else:
                df = pd.read_excel(self.file_path)
        except Exception as e:
            self.status = "FAILED"
            return False, str(e)

        self.original_headers = list(df.columns)
        return True, df

    def normalize_headers(self, df):
        """Rename ``df``'s columns in place via ``header_mapping`` (unmapped names are kept)."""
        # str() so numeric column labels don't raise AttributeError on .lower().
        canonical_headers = [self.header_mapping.get(str(col).lower(), col) for col in df.columns]

        self.mapped_headers = canonical_headers
        df.columns = canonical_headers  # Replace in dataframe
        self.status = "HEADERS_NORMALIZED"
        return df

    def run(self):
        """Load the file and normalize its headers.

        Returns ``(df, original_headers, mapped_headers)`` on success.
        NOTE: on a load failure only the status string ("FAILED") is returned,
        so callers must check the result type before unpacking.
        """
        print("\n--- Header Normalization Process ---")

        success, result = self.load_file()
        if not success:
            print(" Error loading file:", result)
            return self.status

        print(" File loaded successfully.")
        df = self.normalize_headers(result)
        print(" Header mapping complete.")
        print("\nOriginal Headers:", self.original_headers)
        print("Mapped Headers:   ", self.mapped_headers)

        return df, self.original_headers, self.mapped_headers


In [69]:
import os
import pandas as pd
import numpy as np


In [70]:
class FileValidator:
    """File-level checks: supported extension and presence of a known meter header.

    ``run()`` returns ``(df, status, errors)``; ``df`` is the loaded frame only
    when both checks pass, otherwise ``None``.
    """

    VALID_EXTENSIONS = ('.xlsx', '.xls', '.csv')

    def __init__(self, file_path):
        self.file_path = file_path
        self.errors = []
        self.status = "PASS"
        # Frame cached by validate_header_presence so run() does not read the file twice.
        self._df = None

    def _read_file(self):
        """Read the file with pandas, choosing CSV vs Excel by case-insensitive extension."""
        _, file_ext = os.path.splitext(self.file_path)
        if file_ext.lower() == '.csv':
            return pd.read_csv(self.file_path)
        return pd.read_excel(self.file_path)

    def validate_file_type(self):
        """Return True if the extension is supported; otherwise record a REJECT."""
        _, file_ext = os.path.splitext(self.file_path)

        if file_ext.lower() not in self.VALID_EXTENSIONS:
            self.errors.append(f" Unsupported file type: {file_ext}")
            self.status = "REJECT"
            return False
        return True

    def validate_header_presence(self, known_headers=("MeterNumber", "Meter ID", "Meter_ID", "Meter_Number")):
        """Return True if any of ``known_headers`` (case-insensitive) is a column.

        A read failure records REJECT; a readable file without a known header
        records ROUTE_TO_HEADER_MAPPING_AI. The loaded frame is cached either way.
        """
        known_headers = known_headers if known_headers else ["meter_id", "meter_no", "meter_number"]

        try:
            df = self._read_file()
        except Exception as e:  # any read/parse failure rejects the file
            self.errors.append(f" Error reading file: {e}")
            self.status = "REJECT"
            return False

        self._df = df
        # str() so numeric column labels (possible from Excel) don't break .lower().
        headers = {str(col).lower() for col in df.columns}
        if any(str(h).lower() in headers for h in known_headers):
            return True

        self.errors.append(" No known meter headers found.")
        self.status = "ROUTE_TO_HEADER_MAPPING_AI"
        return False

    def run(self):
        """Run both checks and return ``(df_or_None, status, errors)``."""
        print("🔍 Running File-Level Validation...")
        if not self.validate_file_type():
            return None, self.status, self.errors

        df = self._df if self.validate_header_presence() else None
        return df, self.status, self.errors


In [71]:
class HeaderNormalizer:
    """Rename the columns of an already-loaded DataFrame to canonical names."""

    def __init__(self, df, header_mapping):
        """
        :param df: DataFrame whose columns are renamed in place.
        :param header_mapping: vendor header -> canonical header; keys are matched
            case-insensitively.
        """
        self.df = df
        # str() so non-string keys don't raise on .lower().
        self.header_mapping = {str(k).lower(): v for k, v in header_mapping.items()}
        self.original_headers = list(df.columns)
        self.mapped_headers = []
        self.status = "PASS"

    def normalize_headers(self):
        """Rename ``self.df``'s columns in place (unmapped names kept) and return it."""
        # str() so numeric column labels don't raise AttributeError on .lower().
        renamed = [self.header_mapping.get(str(col).lower(), col) for col in self.df.columns]
        self.df.columns = renamed
        self.mapped_headers = renamed
        print(" Header Normalization Completed")
        return self.df


In [72]:
class SchemaValidator:
    """Schema-level checks: mandatory columns present, known duplicate aliases dropped."""

    def __init__(self, df, mandatory_columns=None, duplicate_mapping=None):
        """
        :param df: DataFrame to validate (not mutated; see ``remove_duplicate_columns``).
        :param mandatory_columns: required column names; ``None`` uses the defaults.
        :param duplicate_mapping: alias -> preferred column; when both exist the
            alias is dropped. ``None`` uses the defaults.
        """
        self.df = df
        self.logs = []
        self.status = "PASS"
        # `is None` rather than `or`, so an explicit empty list/dict is honoured.
        self.mandatory_columns = (
            ["MeterNumber", "Datetime", "Voltage"] if mandatory_columns is None else mandatory_columns
        )
        self.duplicate_mapping = (
            {
                "meter_no": "meter_id",
                "meter_number": "meter_id",
                "kwh": "energy",
            }
            if duplicate_mapping is None
            else duplicate_mapping
        )

    def check_mandatory(self):
        """Return missing mandatory columns; any missing sets status PARTIAL_INGEST."""
        missing = [col for col in self.mandatory_columns if col not in self.df.columns]
        if missing:
            self.logs.append(f" Missing mandatory columns: {missing}")
            self.status = "PARTIAL_INGEST"
        return missing

    def remove_duplicate_columns(self):
        """Drop each alias column whose preferred counterpart is also present."""
        for dup, main in self.duplicate_mapping.items():
            if dup in self.df.columns and main in self.df.columns:
                self.logs.append(f" Duplicate: {dup} & {main}, keeping {main}")
                # Rebind instead of inplace=True so the caller's DataFrame is not mutated.
                self.df = self.df.drop(columns=[dup])

    def run(self):
        """Run all schema checks and return ``(df, status, logs)``."""
        print("🔍 Running Schema-Level Validation...")
        self.check_mandatory()
        self.remove_duplicate_columns()
        return self.df, self.status, self.logs


In [73]:
import pandas as pd
from datetime import datetime, timedelta

class RowValidator:
    """Row-level validation of meter readings.

    ``run()`` performs, in order:
      1. timestamp parsing (the column is rewritten; unparsed values become missing),
      2. per-meter interval continuity (expected spacing ± tolerance, in minutes),
      3. meter -> consumer consistency (one consumer per meter).

    A failing check appends a message to ``logs`` and overwrites ``status``, so
    after ``run()`` ``status`` reflects the last failing check. ``parse_timestamp``
    assigns into the DataFrame passed to the constructor.
    """

    DEFAULT_TIMESTAMP_FORMATS = (
        "%Y-%m-%d %H:%M:%S",
        "%d-%m-%Y %H:%M",
        "%Y/%m/%d %H:%M",
        "%d/%m/%Y %H:%M:%S",
        "%m-%d-%Y %H:%M:%S",
    )

    def __init__(self, df, meter_col="MeterNumber", consumer_col="ConsumerID", timestamp_col="Datetime"):
        self.df = df
        self.logs = []
        self.status = "PASS"
        self.meter_col = meter_col
        self.consumer_col = consumer_col
        self.timestamp_col = timestamp_col

    @staticmethod
    def _parse_one(value, formats):
        """Return a datetime for ``value``, or None if it is missing or matches no format."""
        if isinstance(value, datetime):
            # pd.Timestamp subclasses datetime: keep it instead of round-tripping
            # through str(), which fails on sub-second precision or tz offsets.
            # pd.NaT is also a datetime instance and counts as unparsed.
            return None if pd.isna(value) else value
        text = str(value).strip()
        for fmt in formats:
            try:
                return datetime.strptime(text, fmt)
            except ValueError:
                continue
        return None

    # 1. Timestamp Parse Validation
    def parse_timestamp(self, possible_formats=None):
        """Parse the timestamp column in place and log how many values failed.

        :param possible_formats: ``strptime`` formats tried in order; ``None``
            uses ``DEFAULT_TIMESTAMP_FORMATS``.
        """
        formats = self.DEFAULT_TIMESTAMP_FORMATS if possible_formats is None else tuple(possible_formats)
        parsed_timestamps = [self._parse_one(ts, formats) for ts in self.df[self.timestamp_col]]
        failed_rows = sum(1 for ts in parsed_timestamps if ts is None)

        self.df[self.timestamp_col] = parsed_timestamps

        if failed_rows > 0:
            self.status = "TIMESTAMP_FIXER"
            self.logs.append(f" {failed_rows} timestamps could not be parsed. Send to 'timestamp fixer'.")

    # 2. Timestamp Continuity Check (30 min ± 5 min by default)
    def check_timestamp_continuity(self, expected_minutes=30, tolerance_minutes=5):
        """Count consecutive per-meter readings whose spacing is outside the window.

        Sorts ``self.df`` by meter then timestamp (rebinding ``self.df``). Pairs
        where either timestamp is missing are skipped.
        """
        self.df = self.df.sort_values(by=[self.meter_col, self.timestamp_col])
        low = expected_minutes - tolerance_minutes
        high = expected_minutes + tolerance_minutes
        missing_intervals = 0

        for _, group in self.df.groupby(self.meter_col):
            times = list(group[self.timestamp_col])
            for prev, cur in zip(times, times[1:]):
                # pd.isna catches both None and NaT; a truthiness test does not
                # reliably filter NaT once the column is datetime64.
                if pd.isna(prev) or pd.isna(cur):
                    continue
                diff = (cur - prev).total_seconds() / 60
                if not (low <= diff <= high):
                    missing_intervals += 1

        if missing_intervals > 0:
            self.status = "MISSING_INTERVAL"
            self.logs.append(
                f" {missing_intervals} timestamp gaps not within {expected_minutes}±{tolerance_minutes} minutes."
            )

    # 3. Meter → Consumer Mapping Consistency
    def check_meter_consumer_binding(self):
        """Count rows whose consumer differs from the first consumer seen for that meter."""
        conflicts = 0
        meter_map = {}

        # zip over the two columns: same values as iterrows() without building a Series per row.
        for meter, consumer in zip(self.df[self.meter_col], self.df[self.consumer_col]):
            if meter not in meter_map:
                meter_map[meter] = consumer
            elif meter_map[meter] != consumer:
                conflicts += 1

        if conflicts > 0:
            self.status = "ID_CONFLICT"
            self.logs.append(f" {conflicts} meter-consumer conflicts found (same meter, different consumer)!")

    # Run all validations
    def run(self):
        """Run all row-level checks and return ``(df, status, logs)``."""
        print(" Running Row-Level Validation...")
        self.parse_timestamp()
        self.check_timestamp_continuity()
        self.check_meter_consumer_binding()

        return self.df, self.status, self.logs


In [74]:
class DataPipeline:
    """Chain file, header, schema and row-level validation for one input file."""

    def __init__(self, file_path, header_mapping):
        """
        :param file_path: Path of the CSV/Excel file to validate.
        :param header_mapping: vendor header -> canonical header, passed to HeaderNormalizer.
        """
        self.file_path = file_path
        self.header_mapping = header_mapping

    def run(self):
        """Run every stage and return ``(final_status, all_logs)``.

        Stops after file validation when no DataFrame was produced. Row-level
        validation is skipped (and logged) when the columns it indexes are absent,
        instead of raising ``KeyError``.
        NOTE(review): ValueValidator is defined in this notebook but not invoked here.
        """
        print("\n Starting Full Pipeline...\n")

        # Step 1: File Validation
        file_validator = FileValidator(self.file_path)
        df, status, errors = file_validator.run()
        if df is None:
            return status, errors

        # Step 2: Header Normalization
        normalizer = HeaderNormalizer(df, self.header_mapping)
        df = normalizer.normalize_headers()

        # Step 3: Schema Validation
        schema = SchemaValidator(df)
        df, schema_status, schema_logs = schema.run()

        # Step 4: Row-Level Validation (only if the columns it reads exist)
        row_validator = RowValidator(df)
        required = [row_validator.meter_col, row_validator.consumer_col, row_validator.timestamp_col]
        missing = [col for col in required if col not in df.columns]
        if missing:
            row_status = "SKIPPED"
            row_logs = [f" Row-level validation skipped; missing columns: {missing}"]
        else:
            df, row_status, row_logs = row_validator.run()

        stage_statuses = (status, schema_status, row_status)
        final_status = "PASS" if all(s == "PASS" for s in stage_statuses) else "PARTIAL/FAIL"
        all_logs = errors + schema_logs + row_logs
        return final_status, all_logs


In [76]:
import pandas as pd

class ValueValidator:
    """Value-level checks on meter readings (energy, voltage, current, PF, frequency).

    Each check that finds a problem appends a message to ``logs`` and overwrites
    ``status`` with its own code, so after ``run()`` ``status`` reflects the last
    failing check. Column values are coerced with ``pd.to_numeric(errors="coerce")``;
    entries that cannot be converted become NaN and are not flagged by range checks.
    A missing column is logged once and sets status "FAIL" instead of raising.
    """

    def __init__(self, df, energy_col="Energy", voltage_col="Voltage",
                 current_col="Current", pf_col="PowerFactor", freq_col="Frequency",
                 meter_col=None):
        """
        :param meter_col: optional meter-ID column. When given and present, energy
            differences are computed within each meter rather than across the whole
            frame. Rows are assumed to already be in time order (not checked here).
        """
        self.df = df
        self.logs = []
        self.status = "PASS"
        self.energy_col = energy_col
        self.voltage_col = voltage_col
        self.current_col = current_col
        self.pf_col = pf_col
        self.freq_col = freq_col
        self.meter_col = meter_col
        # Columns already reported missing, so each is logged only once.
        self._reported_missing = set()

    def _column(self, col):
        """Return ``col`` as a numeric Series, or None (logged, status FAIL) if absent."""
        if col not in self.df.columns:
            if col not in self._reported_missing:
                self._reported_missing.add(col)
                self.logs.append(f"❌ Missing column: {col}")
            self.status = "FAIL"
            return None
        return pd.to_numeric(self.df[col], errors="coerce")

    def _energy_diff(self, energy):
        """Consecutive energy differences, per meter when ``meter_col`` is set and present."""
        if self.meter_col is not None and self.meter_col in self.df.columns:
            return energy.groupby(self.df[self.meter_col]).diff()
        return energy.diff()

    # 1. Non-negative & Non-decreasing Energy
    def check_energy_values(self):
        energy = self._column(self.energy_col)
        if energy is None:
            return

        negative_values = (energy < 0).sum()
        if negative_values > 0:
            self.logs.append(f"⚠ {negative_values} negative energy values found! (BAD_READING)")
            self.status = "BAD_READING"

        decreasing = (self._energy_diff(energy) < 0).sum()
        if decreasing > 0:
            self.logs.append(f"⚠ {decreasing} decreasing energy values detected! (ROLL_OVER)")
            self.status = "ROLL_OVER"

    # 2. Interval Energy Limits (default: ≤ 10 kWh per 30 min)
    def check_interval_bounds(self, limit=10):
        energy = self._column(self.energy_col)
        if energy is None:
            return

        outliers = (self._energy_diff(energy) > limit).sum()
        if outliers > 0:
            self.logs.append(f"⚠ {outliers} rows exceed {limit} kWh interval limit! (OUTLIER_INTERVAL)")
            self.status = "OUTLIER_INTERVAL"

    # 3. Voltage Range (default 180 - 260 V)
    def check_voltage(self, low=180, high=260):
        voltage = self._column(self.voltage_col)
        if voltage is None:
            return

        anomalies = ((voltage < low) | (voltage > high)).sum()
        if anomalies > 0:
            self.logs.append(f"⚠ {anomalies} voltage anomalies found! (VOLTAGE_ANOMALY)")
            self.status = "VOLTAGE_ANOMALY"

    # 4. Current Range (default 0 - 60 A)
    def check_current(self, max_current=60):
        current = self._column(self.current_col)
        if current is None:
            return

        anomalies = ((current < 0) | (current > max_current)).sum()
        if anomalies > 0:
            self.logs.append(f"⚠ {anomalies} current anomalies! (CURRENT_ANOMALY)")
            self.status = "CURRENT_ANOMALY"

    # 5. Power Factor Range (-1 to 1)
    def check_power_factor(self):
        pf = self._column(self.pf_col)
        if pf is None:
            return

        anomalies = ((pf < -1) | (pf > 1)).sum()
        if anomalies > 0:
            self.logs.append(f"⚠ {anomalies} PF values out of range! (PF_OUT_OF_RANGE)")
            self.status = "PF_OUT_OF_RANGE"

    # 6. Frequency Range (default 49 - 51 Hz)
    def check_frequency(self, low=49, high=51):
        freq = self._column(self.freq_col)
        if freq is None:
            return

        anomalies = ((freq < low) | (freq > high)).sum()
        if anomalies > 0:
            self.logs.append(f"⚠ {anomalies} frequency anomalies! (FREQ_ANOMALY)")
            self.status = "FREQ_ANOMALY"

    # Run all value validations
    def run(self):
        """Run every value check and return ``(df, status, logs)``."""
        print("🔍 Running Value-Level Validation...")
        self.check_energy_values()
        self.check_interval_bounds()
        self.check_voltage()
        self.check_current()
        self.check_power_factor()
        self.check_frequency()

        return self.df, self.status, self.logs


In [77]:
# Vendor header aliases (lower-case) -> canonical column names, built from
# one canonical name per group of aliases. Insertion order matches:
# "meter no", "meter_no", "reading_time", "kwh".
# NOTE(review): SchemaValidator/RowValidator defaults expect "MeterNumber" and
# "Datetime", not "meter_id"/"timestamp" -- confirm the intended canonical names.
header_map = {
    alias: canonical
    for canonical, aliases in (
        ("meter_id", ("meter no", "meter_no")),
        ("timestamp", ("reading_time",)),
        ("energy", ("kwh",)),
    )
    for alias in aliases
}

pipeline = DataPipeline(
    r"D:\AI Projects\powerthon\VALIDATIONS\Powerthon_IP_DATA_Adani_Project_V2.xlsx",
    header_map,
)
status, logs = pipeline.run()

print("\n Final Pipeline Status:", status)
print(" Logs:", logs)



 Starting Full Pipeline...

🔍 Running File-Level Validation...
 Header Normalization Completed
🔍 Running Schema-Level Validation...
 Running Row-Level Validation...

 Final Pipeline Status: PARTIAL/FAIL
 Logs: [" 640292 timestamps could not be parsed. Send to 'timestamp fixer'."]
