In [2]:
"""
DATA PREPARATION MODULE
AINextBill Technology - Purchase Invoice Analysis

This module handles all data cleaning, standardization, and feature engineering
for the purchase invoice dataset.

Author: Data Science Intern
Date: February 2026
"""

import pandas as pd
import numpy as np
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')


class InvoiceDataPreparation:
    """
    Class to handle end-to-end data preparation for purchase invoices.
    
    This follows production-grade practices:
    - Modular functions for each cleaning step
    - Audit trail for all modifications
    - Validation checks at each stage
    """
    
    def __init__(self, filepath):
        """
        Initialize the data preparation pipeline.
        
        Args:
            filepath (str): Path to the Excel file containing invoice data
        """
        self.filepath = filepath
        self.df = None
        self.df_original = None  # Keep original for comparison
        self.cleaning_report = {}  # Track all changes made
        
    
    def load_data(self):
        """
        Load the Excel file and perform initial validation.
        
        Returns:
            pd.DataFrame: Loaded dataframe
        """
        print("=" * 80)
        print("STEP 1: LOADING DATA")
        print("=" * 80)
        
        try:
            self.df = pd.read_excel(self.filepath)
            self.df_original = self.df.copy()  # Preserve original
            
            print(f"‚úÖ Successfully loaded {len(self.df)} records")
            print(f"‚úÖ Columns: {list(self.df.columns)}")
            print(f"\nData types:")
            print(self.df.dtypes)
            
            # Initial data quality check
            print(f"\n{'='*80}")
            print("INITIAL DATA QUALITY CHECK")
            print("=" * 80)
            missing_counts = self.df.isnull().sum()
            if missing_counts.sum() > 0:
                print("‚ö†Ô∏è  Missing values detected:")
                print(missing_counts[missing_counts > 0])
            else:
                print("‚úÖ No missing values in initial load")
                
            return self.df
            
        except FileNotFoundError:
            print(f"‚ùå ERROR: File not found at {self.filepath}")
            raise
        except Exception as e:
            print(f"‚ùå ERROR: Failed to load data - {str(e)}")
            raise
    
    
    def clean_expense_categories(self):
        """
        Standardize expense category labels to fix spelling inconsistencies.
        
        Problem: Same category appears with multiple spellings:
        - "Raw Material", "raw material", "Raw Materials", "RawMaterial"
        - "Software", "software", "SW", "Software License"
        
        Solution: Create mapping dictionary and standardize all variants.
        """
        print("\n" + "=" * 80)
        print("STEP 2: CLEANING EXPENSE CATEGORIES")
        print("=" * 80)
        
        # First, let's see what we're working with
        print("\nOriginal category distribution:")
        print(self.df['expense_category'].value_counts())
        
        # Create a temporary cleaned column (lowercase, stripped)
        self.df['category_temp'] = self.df['expense_category'].str.strip().str.lower()
        
        # Define the standardization mapping
        # Key: standard name, Value: list of variants to map
        category_mapping = {
            'Raw Material': ['raw material', 'raw materials', 'rawmaterial'],
            'Software': ['software', 'sw', 'software license'],
            'Office Supplies': ['office supplies'],
            'Marketing': ['marketing'],
            'Travel': ['travel'],
            'Utilities': ['utilities'],
            'Maintenance': ['maintenance']
        }
        
        # Apply the mapping
        for standard_name, variants in category_mapping.items():
            for variant in variants:
                self.df.loc[
                    self.df['category_temp'] == variant, 
                    'expense_category'
                ] = standard_name
        
        # Clean up the temporary column
        self.df.drop('category_temp', axis=1, inplace=True)
        
        # Report on changes
        print("\n‚úÖ Standardized category distribution:")
        print(self.df['expense_category'].value_counts())
        
        # Track changes for audit
        original_categories = self.df_original['expense_category'].nunique()
        new_categories = self.df['expense_category'].nunique()
        self.cleaning_report['categories_before'] = original_categories
        self.cleaning_report['categories_after'] = new_categories
        
        print(f"\nüìä Categories reduced from {original_categories} to {new_categories}")
        
        return self.df
    
    
    def fix_missing_gst(self):
        """
        Impute missing GST amounts using the formula: GST = invoice_amount √ó GST_rate / 100
        
        WHY THIS MATTERS:
        - Missing GST leads to understated tax liability
        - Incorrect Input Tax Credit (ITC) claims
        - GSTR-2A reconciliation failures
        
        APPROACH:
        - Calculate GST for records where gst_amount is NULL
        - Flag these records for audit trail
        - Validate calculations
        """
        print("\n" + "=" * 80)
        print("STEP 3: FIXING MISSING GST AMOUNTS")
        print("=" * 80)
        
        # Identify missing GST records
        missing_gst = self.df['gst_amount'].isnull()
        missing_count = missing_gst.sum()
        
        if missing_count == 0:
            print("‚úÖ No missing GST amounts found")
            return self.df
        
        print(f"‚ö†Ô∏è  Found {missing_count} records with missing GST amounts")
        print("\nAffected invoices:")
        print(self.df[missing_gst][['invoice_id', 'vendor_name', 'invoice_amount', 
                                      'gst_rate', 'gst_amount']])
        
        # Calculate missing GST amounts
        self.df.loc[missing_gst, 'gst_amount'] = (
            self.df.loc[missing_gst, 'invoice_amount'] * 
            self.df.loc[missing_gst, 'gst_rate'] / 100
        ).round(2)
        
        # Create audit flag
        self.df['gst_amount_imputed'] = missing_gst
        
        # Calculate total imputed GST value
        total_imputed = self.df.loc[missing_gst, 'gst_amount'].sum()
        
        print(f"\n‚úÖ Imputed GST amounts:")
        print(self.df[missing_gst][['invoice_id', 'vendor_name', 'invoice_amount', 
                                      'gst_rate', 'gst_amount']])
        print(f"\nüí∞ Total imputed GST value: ‚Çπ{total_imputed:,.2f}")
        
        # Track for report
        self.cleaning_report['gst_records_imputed'] = missing_count
        self.cleaning_report['gst_value_imputed'] = total_imputed
        
        return self.df
    
    
    def parse_and_validate_dates(self):
        """
        Convert date strings to proper datetime objects and extract time components.
        
        WHY THIS MATTERS:
        - Enables proper time-series analysis
        - Prevents incorrect monthly/quarterly aggregations
        - Allows for day-of-week, seasonality analysis
        
        FEATURES CREATED:
        - invoice_month: Period for monthly aggregation
        - invoice_year: For year-over-year comparisons
        - invoice_quarter: For quarterly reporting
        - day_of_week: For payment cycle analysis
        """
        print("\n" + "=" * 80)
        print("STEP 4: PARSING AND VALIDATING DATES")
        print("=" * 80)
        
        print(f"Original date type: {self.df['invoice_date'].dtype}")
        print(f"Sample dates: {self.df['invoice_date'].head(3).tolist()}")
        
        # Parse dates with error handling
        self.df['invoice_date'] = pd.to_datetime(
            self.df['invoice_date'], 
            errors='coerce'  # Convert unparseable dates to NaT
        )
        
        # Check for parsing failures
        parsing_failures = self.df['invoice_date'].isnull().sum()
        if parsing_failures > 0:
            print(f"‚ö†Ô∏è  WARNING: {parsing_failures} dates failed to parse")
        else:
            print("‚úÖ All dates parsed successfully")
        
        # Extract time components
        self.df['invoice_month'] = self.df['invoice_date'].dt.to_period('M')
        self.df['invoice_year'] = self.df['invoice_date'].dt.year
        self.df['invoice_quarter'] = self.df['invoice_date'].dt.to_period('Q')
        self.df['day_of_week'] = self.df['invoice_date'].dt.day_name()
        self.df['day_of_month'] = self.df['invoice_date'].dt.day
        
        # Date range validation
        min_date = self.df['invoice_date'].min()
        max_date = self.df['invoice_date'].max()
        date_range_days = (max_date - min_date).days
        
        print(f"\nüìÖ Date range:")
        print(f"   Earliest: {min_date.strftime('%Y-%m-%d')}")
        print(f"   Latest: {max_date.strftime('%Y-%m-%d')}")
        print(f"   Span: {date_range_days} days ({date_range_days/30:.1f} months)")
        
        # Check for future dates (data quality issue)
        today = pd.Timestamp.now()
        future_dates = self.df['invoice_date'] > today
        if future_dates.sum() > 0:
            print(f"‚ö†Ô∏è  WARNING: {future_dates.sum()} invoices have future dates!")
            print(self.df[future_dates][['invoice_id', 'invoice_date', 'vendor_name']])
        
        # Track for report
        self.cleaning_report['date_range_days'] = date_range_days
        self.cleaning_report['date_parsing_failures'] = parsing_failures
        
        return self.df
    
    
    def validate_gst_calculations(self):
        """
        Verify that recorded GST amounts match calculated values.
        
        Formula: expected_gst = invoice_amount √ó gst_rate / 100
        
        WHY THIS MATTERS:
        - Detects data entry errors
        - Identifies rounding inconsistencies
        - Ensures compliance accuracy
        """
        print("\n" + "=" * 80)
        print("STEP 5: VALIDATING GST CALCULATIONS")
        print("=" * 80)
        
        # Calculate expected GST
        self.df['gst_calculated'] = (
            self.df['invoice_amount'] * self.df['gst_rate'] / 100
        ).round(2)
        
        # Calculate variance
        self.df['gst_variance'] = (
            self.df['gst_amount'] - self.df['gst_calculated']
        ).abs()
        
        # Flag significant mismatches (> ‚Çπ0.50)
        threshold = 0.50
        self.df['gst_error_flag'] = self.df['gst_variance'] > threshold
        
        error_count = self.df['gst_error_flag'].sum()
        
        if error_count > 0:
            print(f"‚ö†Ô∏è  Found {error_count} records with GST calculation errors (>‚Çπ{threshold})")
            print("\nAffected invoices:")
            error_records = self.df[self.df['gst_error_flag']][
                ['invoice_id', 'vendor_name', 'invoice_amount', 'gst_rate', 
                 'gst_amount', 'gst_calculated', 'gst_variance']
            ]
            print(error_records)
            
            total_variance = self.df[self.df['gst_error_flag']]['gst_variance'].sum()
            print(f"\nüí∞ Total GST variance: ‚Çπ{total_variance:,.2f}")
        else:
            print(f"‚úÖ All GST calculations are accurate (within ‚Çπ{threshold})")
        
        # Track for report
        self.cleaning_report['gst_errors_found'] = error_count
        
        return self.df
    
    
    def create_derived_features(self):
        """
        Create high-value derived features for accounting analysis.
        
        FEATURES CREATED:
        1. expense_without_gst: True expense (GST is recoverable, not an expense)
        2. gst_effective_rate: Actual GST % paid (handles edge cases)
        3. high_value_flag: Invoices in top 10% (requires scrutiny)
        4. vendor_invoice_count: How many times vendor has billed
        5. monthly_vendor_rank: Vendor rank by spend each month
        """
        print("\n" + "=" * 80)
        print("STEP 6: CREATING DERIVED FEATURES")
        print("=" * 80)
        
        # 1. Expense without GST (THE MOST IMPORTANT METRIC)
        self.df['expense_without_gst'] = self.df['invoice_amount']
        print("‚úÖ Created: expense_without_gst (base invoice amount)")
        
        # 2. Effective GST rate (handles zero amounts)
        self.df['gst_effective_rate'] = np.where(
            self.df['invoice_amount'] > 0,
            (self.df['gst_amount'] / self.df['invoice_amount'] * 100).round(2),
            0
        )
        print("‚úÖ Created: gst_effective_rate (actual GST %)")
        
        # 3. High-value transaction flag
        threshold_90 = self.df['invoice_amount'].quantile(0.90)
        self.df['high_value_flag'] = self.df['invoice_amount'] > threshold_90
        high_value_count = self.df['high_value_flag'].sum()
        print(f"‚úÖ Created: high_value_flag (>‚Çπ{threshold_90:,.2f}, {high_value_count} invoices)")
        
        # 4. Vendor invoice count (frequency analysis)
        vendor_counts = self.df.groupby('vendor_name').size()
        self.df['vendor_invoice_count'] = self.df['vendor_name'].map(vendor_counts)
        print("‚úÖ Created: vendor_invoice_count (vendor frequency)")
        
        # 5. Vendor monthly appearance count
        vendor_month_counts = self.df.groupby('vendor_name')['invoice_month'].nunique()
        self.df['vendor_months_active'] = self.df['vendor_name'].map(vendor_month_counts)
        print("‚úÖ Created: vendor_months_active (consistency metric)")
        
        # 6. Running total by category (cumulative spend)
        self.df = self.df.sort_values('invoice_date')
        self.df['category_cumulative_spend'] = self.df.groupby('expense_category')['invoice_amount'].cumsum()
        print("‚úÖ Created: category_cumulative_spend (trend analysis)")
        
        # 7. Days since last invoice from vendor
        self.df['days_since_last_invoice'] = self.df.groupby('vendor_name')['invoice_date'].diff().dt.days
        print("‚úÖ Created: days_since_last_invoice (payment cycle analysis)")
        
        # Summary statistics
        print(f"\n{'='*80}")
        print("DERIVED FEATURES SUMMARY")
        print("=" * 80)
        print(f"High-value invoices (top 10%): {high_value_count}")
        print(f"Average vendor invoice count: {self.df['vendor_invoice_count'].mean():.1f}")
        print(f"Average vendor months active: {self.df['vendor_months_active'].mean():.1f}")
        
        return self.df
    
    
    def generate_cleaning_report(self):
        """
        Generate a comprehensive summary of all data cleaning operations.
        """
        print("\n" + "=" * 80)
        print("DATA CLEANING SUMMARY REPORT")
        print("=" * 80)
        
        print(f"\nüìä DATASET OVERVIEW:")
        print(f"   Total records: {len(self.df)}")
        print(f"   Date range: {self.cleaning_report.get('date_range_days', 0)} days")
        print(f"   Unique vendors: {self.df['vendor_name'].nunique()}")
        print(f"   Unique categories: {self.cleaning_report.get('categories_after', 0)}")
        
        print(f"\nüîß CLEANING OPERATIONS:")
        print(f"   Categories standardized: {self.cleaning_report.get('categories_before', 0)} ‚Üí {self.cleaning_report.get('categories_after', 0)}")
        print(f"   GST amounts imputed: {self.cleaning_report.get('gst_records_imputed', 0)} (‚Çπ{self.cleaning_report.get('gst_value_imputed', 0):,.2f})")
        print(f"   Date parsing failures: {self.cleaning_report.get('date_parsing_failures', 0)}")
        print(f"   GST calculation errors: {self.cleaning_report.get('gst_errors_found', 0)}")
        
        print(f"\n‚úÖ DATA QUALITY STATUS:")
        total_issues = (
            self.cleaning_report.get('gst_records_imputed', 0) + 
            self.cleaning_report.get('date_parsing_failures', 0) + 
            self.cleaning_report.get('gst_errors_found', 0)
        )
        
        if total_issues == 0:
            print("   üéâ Dataset is clean and ready for analysis!")
        else:
            print(f"   ‚ö†Ô∏è  Total issues addressed: {total_issues}")
            print("   ‚úÖ All critical issues have been resolved")
        
        return self.cleaning_report
    
    
    def save_cleaned_data(self, output_path):
        """
        Save the cleaned dataset to CSV.
        
        Args:
            output_path (str): Path where cleaned CSV will be saved
        """
        print(f"\n{'='*80}")
        print("SAVING CLEANED DATA")
        print("=" * 80)
        
        try:
            self.df.to_csv(output_path, index=False)
            print(f"‚úÖ Cleaned data saved to: {output_path}")
            print(f"   Total records: {len(self.df)}")
            print(f"   Total columns: {len(self.df.columns)}")
            print(f"   File size: {pd.read_csv(output_path).memory_usage(deep=True).sum() / 1024 / 1024:.2f} MB")
        except Exception as e:
            print(f"‚ùå ERROR: Failed to save data - {str(e)}")
            raise
    
    
    def run_full_pipeline(self, output_path=None):
        """
        Execute the complete data preparation pipeline.
        
        Args:
            output_path (str, optional): Path to save cleaned data
            
        Returns:
            pd.DataFrame: Fully cleaned and prepared dataset
        """
        print("\n" + "üöÄ" * 40)
        print("STARTING DATA PREPARATION PIPELINE")
        print("üöÄ" * 40)
        
        # Step 1: Load data
        self.load_data()
        
        # Step 2: Clean categories
        self.clean_expense_categories()
        
        # Step 3: Fix missing GST
        self.fix_missing_gst()
        
        # Step 4: Parse dates
        self.parse_and_validate_dates()
        
        # Step 5: Validate GST
        self.validate_gst_calculations()
        
        # Step 6: Create features
        self.create_derived_features()
        
        # Step 7: Generate report
        self.generate_cleaning_report()
        
        # Step 8: Save if path provided
        if output_path:
            self.save_cleaned_data(output_path)
        
        print("\n" + "‚úÖ" * 40)
        print("DATA PREPARATION COMPLETE!")
        print("‚úÖ" * 40)
        
        return self.df


# =============================================================================
# USAGE EXAMPLE
# =============================================================================

if __name__ == "__main__":
    """
    Example usage of the InvoiceDataPreparation class.
    
    This demonstrates how to run the complete pipeline.
    """
    
    # Initialize the data preparation pipeline
    prep = InvoiceDataPreparation(
        filepath='purchase_invoices_dataset.xlsx'
    )
    
    # Run the full pipeline
    cleaned_df = prep.run_full_pipeline(
        output_path='cleaned_invoices.csv'
    )
    
    # Quick preview of cleaned data
    print("\n" + "=" * 80)
    print("CLEANED DATA PREVIEW")
    print("=" * 80)
    print(cleaned_df[['invoice_id', 'invoice_date', 'vendor_name', 
                       'expense_category', 'invoice_amount', 'gst_amount', 
                       'expense_without_gst']].head(10))
    
    print("\n" + "=" * 80)
    print("KEY COLUMNS SUMMARY")
    print("=" * 80)
    print(cleaned_df[['invoice_amount', 'gst_amount', 'total_amount', 
                       'expense_without_gst', 'gst_effective_rate']].describe())


üöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄ
STARTING DATA PREPARATION PIPELINE
üöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄüöÄ
STEP 1: LOADING DATA
‚úÖ Successfully loaded 1500 records
‚úÖ Columns: ['invoice_id', 'invoice_date', 'vendor_name', 'expense_category', 'invoice_amount', 'gst_rate', 'gst_amount', 'total_amount', 'payment_mode', 'invoice_description']

Data types:
invoice_id              object
invoice_date            object
vendor_name             object
expense_category        object
invoice_amount         float64
gst_rate                 int64
gst_amount             float64
total_amount           float64
payment_mode            object
invoice_description     object
dtype: object

INITIAL DATA QUALITY CHECK
‚ö†Ô∏è  Missing values detected:
gst_amou