In [1]:
import pandas as pd
import numpy as np
import re
from pathlib import Path

class DataCleaningPipeline:
    """A flexible data cleaning pipeline for Excel and CSV files with multiple sheets."""
    
    def __init__(self):
        self.cleaning_functions = {
            'remove_empty_rows_cols': self.remove_empty_rows_cols,
            'standardize_column_names': self.standardize_column_names,
            'convert_data_types': self.convert_data_types,
            'handle_missing_values': self.handle_missing_values,
            'remove_duplicates': self.remove_duplicates,
            'clean_text_fields': self.clean_text_fields,
            'handle_outliers': self.handle_outliers,
            'extract_datetime_features': self.extract_datetime_features
        }
    
    def process_file(self, file_path, sheet_name=None, cleaning_steps=None):
        """
        Process a file (Excel or CSV) with the specified cleaning steps.
        
        Parameters:
        file_path (str): Path to the file
        sheet_name (str or list): Specific sheet name(s) to process (for Excel files)
        cleaning_steps (list): List of cleaning steps to apply
        
        Returns:
        dict: Dictionary of cleaned DataFrames
        """
        file_path = Path(file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")
        
        # Determine file type and read data
        if file_path.suffix.lower() in ['.xlsx', '.xls']:
            return self.process_excel(file_path, sheet_name, cleaning_steps)
        elif file_path.suffix.lower() == '.csv':
            return self.process_csv(file_path, cleaning_steps)
        else:
            raise ValueError(f"Unsupported file format: {file_path.suffix}")
    
    def process_excel(self, file_path, sheet_name=None, cleaning_steps=None):
        """Process an Excel file with multiple sheets."""
        # Read all sheets or specific sheets
        if sheet_name is None:
            sheets = pd.read_excel(file_path, sheet_name=None)
        else:
            if isinstance(sheet_name, str):
                sheet_name = [sheet_name]
            sheets = {}
            for sheet in sheet_name:
                try:
                    sheets[sheet] = pd.read_excel(file_path, sheet_name=sheet)
                except Exception as e:
                    print(f"Warning: Could not read sheet '{sheet}': {e}")
        
        # Apply cleaning steps to each sheet
        cleaned_sheets = {}
        for name, df in sheets.items():
            print(f"Processing sheet: {name}")
            cleaned_df = self.apply_cleaning_steps(df, cleaning_steps)
            cleaned_sheets[name] = cleaned_df
        
        return cleaned_sheets
    
    def process_csv(self, file_path, cleaning_steps=None):
        """Process a CSV file."""
        df = pd.read_csv(file_path)
        cleaned_df = self.apply_cleaning_steps(df, cleaning_steps)
        return {'data': cleaned_df}
    
    def apply_cleaning_steps(self, df, cleaning_steps):
        """Apply the specified cleaning steps to a DataFrame."""
        if cleaning_steps is None:
            # Apply all cleaning steps if none specified
            cleaning_steps = list(self.cleaning_functions.keys())
        
        for step in cleaning_steps:
            if step in self.cleaning_functions:
                print(f"  Applying step: {step}")
                df = self.cleaning_functions[step](df)
            else:
                print(f"  Warning: Unknown cleaning step '{step}'")
        
        return df
    
    def remove_empty_rows_cols(self, df):
        """Remove completely empty rows and columns."""
        # Remove rows where all values are missing
        df = df.dropna(how='all')
        
        # Remove columns where all values are missing
        df = df.dropna(axis=1, how='all')
        
        return df
    
    def standardize_column_names(self, df):
        """Standardize column names to snake_case."""
        new_columns = []
        for col in df.columns:
            # Handle non-string column names
            if not isinstance(col, str):
                col = str(col)
            
            # Remove special characters and extra spaces
            col = re.sub(r'[^a-zA-Z0-9\s]', ' ', col)
            col = re.sub(r'\s+', ' ', col).strip()
            
            # Convert to snake_case
            col = col.replace(' ', '_').lower()
            
            new_columns.append(col)
        
        df.columns = new_columns
        return df
    
    def convert_data_types(self, df):
        """Convert data types appropriately."""
        for col in df.columns:
            # Skip if all values are missing
            if df[col].isna().all():
                continue
            
            # Try to convert to numeric
            try:
                # Check if the column contains numeric values stored as strings
                if df[col].dtype == 'object':
                    # Try to convert to numeric, handling commas and other non-numeric characters
                    converted = pd.to_numeric(df[col].astype(str).str.replace(',', ''), errors='coerce')
                    # Only convert if we successfully converted most values
                    if converted.notna().mean() > 0.8:  # If more than 80% converted successfully
                        df[col] = converted
            except:
                pass
            
            # Try to convert to datetime
            try:
                if df[col].dtype == 'object':
                    # Try to convert to datetime
                    converted = pd.to_datetime(df[col], errors='coerce')
                    # Only convert if we successfully converted most values
                    if converted.notna().mean() > 0.8:  # If more than 80% converted successfully
                        df[col] = converted
            except:
                pass
        
        return df
    
    def handle_missing_values(self, df):
        """Handle missing values based on data type."""
        for col in df.columns:
            if df[col].isna().any():
                if df[col].dtype in ['int64', 'float64']:
                    # For numeric columns, fill with median
                    df[col] = df[col].fillna(df[col].median())
                elif df[col].dtype == 'object':
                    # For categorical columns, fill with mode
                    df[col] = df[col].fillna(df[col].mode()[0] if not df[col].mode().empty else 'Unknown')
                elif pd.api.types.is_datetime64_any_dtype(df[col]):
                    # For datetime columns, fill with the most recent date
                    df[col] = df[col].fillna(df[col].max())
        
        return df
    
    def remove_duplicates(self, df):
        """Remove duplicate rows."""
        return df.drop_duplicates()
    
    def clean_text_fields(self, df):
        """Clean text fields by removing extra whitespace and standardizing."""
        for col in df.columns:
            if df[col].dtype == 'object':
                # Remove leading/trailing whitespace
                df[col] = df[col].astype(str).str.strip()
                
                # Replace multiple spaces with single space
                df[col] = df[col].str.replace(r'\s+', ' ', regex=True)
                
                # Standardize case (title case for proper names, lower for others)
                if any(keyword in col.lower() for keyword in ['name', 'title', 'category']):
                    df[col] = df[col].str.title()
                else:
                    df[col] = df[col].str.lower()
        
        return df
    
    def handle_outliers(self, df):
        """Handle outliers in numeric columns using IQR method."""
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        
        for col in numeric_cols:
            Q1 = df[col].quantile(0.25)
            Q3 = df[col].quantile(0.75)
            IQR = Q3 - Q1
            
            # Define bounds
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            
            # Cap outliers
            df[col] = np.where(df[col] < lower_bound, lower_bound, df[col])
            df[col] = np.where(df[col] > upper_bound, upper_bound, df[col])
        
        return df
    
    def extract_datetime_features(self, df):
        """Extract features from datetime columns."""
        datetime_cols = df.select_dtypes(include=['datetime64[ns]']).columns
        
        for col in datetime_cols:
            # Extract year, month, day, etc.
            df[f'{col}_year'] = df[col].dt.year
            df[f'{col}_month'] = df[col].dt.month
            df[f'{col}_day'] = df[col].dt.day
            df[f'{col}_dayofweek'] = df[col].dt.dayofweek
            df[f'{col}_quarter'] = df[col].dt.quarter
        
        return df
    
    def save_cleaned_data(self, data_dict, output_path, format='excel'):
        """Save the cleaned data to file(s)."""
        if format == 'excel':
            with pd.ExcelWriter(output_path) as writer:
                for sheet_name, df in data_dict.items():
                    df.to_excel(writer, sheet_name=sheet_name, index=False)
        elif format == 'csv':
            # Create a directory for CSV files
            output_dir = Path(output_path).parent / 'cleaned_data'
            output_dir.mkdir(exist_ok=True)
            
            for sheet_name, df in data_dict.items():
                safe_name = re.sub(r'[^a-zA-Z0-9]', '_', sheet_name)
                df.to_csv(output_dir / f'{safe_name}.csv', index=False)
        else:
            raise ValueError(f"Unsupported output format: {format}")

# Example usage
if __name__ == "__main__":
    # Initialize the pipeline
    pipeline = DataCleaningPipeline()
    
    # Define the cleaning steps to apply
    cleaning_steps = [
        'remove_empty_rows_cols',
        'standardize_column_names',
        'convert_data_types',
        'handle_missing_values',
        'remove_duplicates',
        'clean_text_fields',
        'handle_outliers'
    ]
    
    # Process a file
    try:
        # For Excel files with multiple sheets
        cleaned_data = pipeline.process_file(
            file_path="your_data_file.xlsx",
            cleaning_steps=cleaning_steps
        )
        
        # Save the cleaned data
        pipeline.save_cleaned_data(cleaned_data, "cleaned_data.xlsx")
        
        print("Data cleaning completed successfully!")
        
    except Exception as e:
        print(f"Error during data cleaning: {e}")

Error during data cleaning: File not found: your_data_file.xlsx


In [2]:
pipline = DataCleaningPipeline()
cleaning_steps = ['standardize_column_name', 'convert_data_types', 'handle_missing_values',
                  'remove-dublicate', 'clean_text_fields']

cleaned_data = pipeline.process_file('../GovHack-Team-ASTRA/Datasets/same_dataset/agency-foi-data-2023-24.xlsx', cleaning_steps=cleaning_steps)

Processing sheet: Index
  Applying step: convert_data_types
  Applying step: handle_missing_values
  Applying step: clean_text_fields
Processing sheet: Request numbers
  Applying step: convert_data_types
  Applying step: handle_missing_values
  Applying step: clean_text_fields
Processing sheet: Action on requests
  Applying step: convert_data_types
  Applying step: handle_missing_values
  Applying step: clean_text_fields
Processing sheet: Response times
  Applying step: convert_data_types
  Applying step: handle_missing_values
  Applying step: clean_text_fields
Processing sheet: Charges
  Applying step: convert_data_types
  Applying step: handle_missing_values
  Applying step: clean_text_fields
Processing sheet: Internal review
  Applying step: convert_data_types
  Applying step: handle_missing_values
  Applying step: clean_text_fields
Processing sheet: Section 48 primary
  Applying step: convert_data_types
  Applying step: handle_missing_values
  Applying step: clean_text_fields
Proce

  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')


  Applying step: handle_missing_values
  Applying step: clean_text_fields
Processing sheet: IPS Summary of salary & admin c
  Applying step: convert_data_types
  Applying step: handle_missing_values
  Applying step: clean_text_fields
Processing sheet: FOI non-labour costs
  Applying step: convert_data_types
  Applying step: handle_missing_values
  Applying step: clean_text_fields
Processing sheet: IPS non-labour costs
  Applying step: convert_data_types
  Applying step: handle_missing_values
  Applying step: clean_text_fields
Processing sheet: Practical refusal
  Applying step: convert_data_types
  Applying step: handle_missing_values
  Applying step: clean_text_fields
Processing sheet: Exemptions
  Applying step: convert_data_types
  Applying step: handle_missing_values
  Applying step: clean_text_fields
Processing sheet: Staff years and costs by level
  Applying step: convert_data_types
  Applying step: handle_missing_values
  Applying step: clean_text_fields
Processing sheet: Agency

  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')
  converted = pd.to_datetime(df[col], errors='coerce')


In [3]:
pipeline.save_cleaned_data(cleaned_data, 'testing.xlsx')