In [None]:
# ==============================================================================
# DATA CLEANING AND FORMATTING SCRIPT FOR CALL CENTER DATASET
# ==============================================================================
"""
This script processes a call center dataset by:
1. Cleaning and standardizing data
2. Applying specific formatting rules
3. Exporting to Excel with custom formatting
"""

# ----------------------
# LIBRARY IMPORTS
# ----------------------
from google.colab import drive
import pandas as pd
import numpy as np
import re
import openpyxl
from openpyxl.styles import Border, Side, Font, Alignment
from datetime import datetime
import os
import logging
import time

# ----------------------
# LOGGING CONFIGURATION
# ----------------------
# Set up logging to track script execution and capture any issues.
# Records at INFO level and above are requested, each rendered as
# "<timestamp> - <level> - <message>" with a second-resolution timestamp.
# NOTE(review): logging.basicConfig() does nothing when the root logger already
# has handlers; if the hosting environment pre-configures logging, these
# settings may be ignored — confirm, and consider force=True if so.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
# Module-level logger used by the functions defined below.
logger = logging.getLogger(__name__)

def main():
    """
    Run the whole job: mount storage, resolve paths, load, clean, export.

    Any exception raised by a stage is logged and printed rather than
    propagated, so this function always returns None.
    """
    started_at = time.time()
    logger.info("Starting data cleaning process")

    try:
        # Storage must be mounted before any path can be resolved.
        mount_drive()
        source_file, target_file = setup_file_paths()

        # Load the raw sheet and run it through the cleaning pipeline.
        cleaned = clean_and_transform_data(load_data(source_file))

        # Write the cleaned frame out as a formatted workbook.
        export_and_format_excel(cleaned, target_file)

        elapsed = round(time.time() - started_at, 2)
        logger.info(f"✅ Process completed successfully in {elapsed} seconds")
        print(f"✅ Cleaned dataset successfully saved to:\n{target_file}")
        print(f"✅ Processing time: {elapsed} seconds")

    except Exception as e:
        # Top-level boundary: report the failure instead of crashing the cell.
        failure = str(e)
        logger.error(f"Error in data processing: {failure}")
        print(f"❌ Error occurred: {failure}")

def mount_drive():
    """
    Mount Google Drive at /content/drive so the dataset files can be read.

    Raises:
        Exception: when the mount call fails; the underlying error text is
            written to the log before the generic exception is raised.
    """
    mount_point = '/content/drive'
    try:
        drive.mount(mount_point)
        logger.info("Google Drive mounted successfully")
    except Exception as e:
        reason = str(e)
        logger.error(f"Failed to mount Google Drive: {reason}")
        raise Exception("Drive mounting failed. Please check your connection.")

def setup_file_paths(base_path='/content/drive/My Drive/Data_Analyst/Portfolio_Projects/Projects/Python'):
    """
    Build and validate the input/output workbook paths.

    Args:
        base_path: Directory holding the input workbook and receiving the
            output workbook. Defaults to the project folder on Google Drive.

    Returns:
        Tuple ``(input_path, output_path)``.

    Raises:
        FileNotFoundError: if the input workbook does not exist.

    Side effects:
        If the output workbook already exists it is renamed to
        ``<name>_backup_<unix timestamp><ext>`` in the same directory. A
        failed rename is only logged as a warning (best effort).
    """
    input_path = f"{base_path}/Calls_Dataset_for_Data_Cleaning.xlsx"
    output_path = f"{base_path}/Calls_Dataset_After_Cleaning.xlsx"

    # Validate the input first: a run that cannot start must not move the
    # previous output out of the way.
    if not os.path.exists(input_path):
        logger.error(f"Input file not found: {input_path}")
        raise FileNotFoundError(f"Input file not found: {input_path}")

    # Create backup of output file if it already exists
    if os.path.exists(output_path):
        # splitext only strips the final extension, so dots elsewhere in the
        # path (e.g. in a directory name) cannot truncate the backup name.
        stem, extension = os.path.splitext(output_path)
        backup_path = f"{stem}_backup_{int(time.time())}{extension}"
        try:
            os.rename(output_path, backup_path)
            logger.info(f"Created backup of existing output file: {backup_path}")
        except OSError as e:
            logger.warning(f"Could not create backup: {str(e)}")

    return input_path, output_path

def load_data(file_path):
    """
    Read the Excel workbook at ``file_path`` into a DataFrame.

    Returns:
        The DataFrame produced by ``pd.read_excel``.

    Raises:
        Exception: on any read failure, with the original error text
            appended to "Data loading failed: ".
    """
    try:
        logger.info(f"Loading data from {file_path}")
        frame = pd.read_excel(file_path)
        row_total, column_total = frame.shape
        logger.info(f"Successfully loaded {row_total} rows and {column_total} columns")
        return frame
    except Exception as e:
        reason = str(e)
        logger.error(f"Failed to load data: {reason}")
        raise Exception(f"Data loading failed: {reason}")

def clean_and_transform_data(df):
    """
    Run the full cleaning pipeline on a copy of ``df``.

    The input frame is never modified; the returned frame has had every
    cleaning step applied, its first column dropped and its index reset.
    """
    # Steps 1-11, in execution order. Each takes a DataFrame and returns one.
    pipeline = (
        clean_column_names,                        # 1: normalise headers
        remove_duplicates_and_irrelevant_columns,  # 2: de-duplicate, drop unused columns
        handle_missing_values,                     # 3: fill gaps
        clean_string_columns,                      # 4: trim text
        clean_phone_numbers,                       # 5: digits-only phone numbers
        standardize_yes_no_fields,                 # 6: Yes/No -> Y/N
        filter_unwanted_records,                   # 7: drop rows that must not be kept
        process_address_fields,                    # 8: split the address
        clean_zip_codes,                           # 9: 5-digit ZIP codes
        convert_column_h_values,                   # 10: Yes/No in positional column 7
        format_date_column,                        # 11: date-only values in positional column 0
    )

    # Work on a copy to preserve the caller's data
    df_clean = df.copy()
    for step in pipeline:
        df_clean = step(df_clean)

    # Step 12: Remove first column (as requested)
    # NOTE(review): step 11 formats positional column 0 and this step then
    # drops positional column 0 — confirm that ordering is intended.
    if len(df_clean.columns) > 0:
        leading_column = df_clean.columns[0]
        df_clean = df_clean.drop(columns=[leading_column])
        logger.info("Removed first column as requested")

    # Step 13: Renumber rows 0..n-1 after the filtering above
    df_clean.reset_index(drop=True, inplace=True)

    return df_clean

def clean_column_names(df):
    """
    Clean and standardize column names in place.

    Each label is converted to text, stripped of surrounding whitespace, has
    spaces replaced by underscores and every remaining non-word character
    removed. Every label that changed is logged.

    Args:
        df: DataFrame whose ``columns`` are rewritten (the frame is mutated).

    Returns:
        The same DataFrame object, for chaining.
    """
    logger.info("Cleaning column names")
    original_columns = df.columns.tolist()

    df.columns = (
        df.columns
        # Labels may be non-strings (e.g. numeric headers); the .str accessor
        # only works on text, so normalise the type first.
        .astype(str)
        .str.strip()
        .str.replace(" ", "_", regex=False)
        .str.replace(r"[^\w]", "", regex=True)
    )

    # Log column name changes
    for old, new in zip(original_columns, df.columns):
        if old != new:
            logger.info(f"Column renamed: '{old}' -> '{new}'")

    return df

def remove_duplicates_and_irrelevant_columns(df):
    """
    Drop fully duplicated rows and known-useless columns, in place.

    Returns the same DataFrame object that was passed in.
    """
    # Rows that repeat an earlier row exactly
    repeated_rows = df.duplicated().sum()
    if repeated_rows > 0:
        logger.info(f"Removing {repeated_rows} duplicate rows")
        df.drop_duplicates(inplace=True)
    else:
        logger.info("No duplicate rows found")

    # Only drop the unwanted columns that are actually present
    unwanted = ('Not_Useful_Column',)
    columns_present = [name for name in unwanted if name in df.columns]
    if not columns_present:
        return df

    logger.info(f"Dropping irrelevant columns: {columns_present}")
    df.drop(columns=columns_present, inplace=True)
    return df

def handle_missing_values(df):
    """
    Replace every missing value in ``df`` with an empty string, in place.

    Logs the overall count and a per-column breakdown before filling.
    Returns the same DataFrame object.
    """
    per_column = df.isna().sum()
    overall = per_column.sum()

    if overall == 0:
        logger.info("No missing values found")
        return df

    logger.info(f"Replacing {overall} missing values with empty strings")
    # Only report the columns that actually contain gaps
    for name, gaps in per_column[per_column > 0].items():
        logger.info(f"  - Column '{name}': {gaps} missing values")
    df.fillna('', inplace=True)

    return df

def clean_string_columns(df):
    """
    Trim whitespace from every object-dtype column, in place.

    ``Last_Name``, when present, additionally loses any leading/trailing
    characters from the set ``1 2 3 . _ /`` and is trimmed again.
    Returns the same DataFrame object.
    """
    object_columns = list(df.select_dtypes(include='object').columns)
    logger.info(f"Cleaning {len(object_columns)} text columns")

    for name in object_columns:
        as_text = df[name].astype(str)
        df[name] = as_text.str.strip()

    if 'Last_Name' not in df.columns:
        return df

    # Special cleaning for Last_Name: strip stray digits and punctuation
    logger.info("Applying special cleaning to Last_Name column")
    without_noise = df['Last_Name'].str.strip("123._/")
    df['Last_Name'] = without_noise.str.strip()

    return df

def clean_phone_numbers(df):
    """
    Reduce ``Phone_Number`` to digits only, blanking anything under 10 digits.

    Does nothing when the column is absent. The frame is modified in place
    and returned.
    """
    if 'Phone_Number' not in df.columns:
        return df

    logger.info("Cleaning Phone_Number column")

    def _keep_if_long_enough(digits):
        # Fewer than 10 digits is treated as an invalid number
        return digits if len(digits) >= 10 else ''

    # Snapshot used afterwards to count how many values changed
    before = df['Phone_Number'].copy()

    without_placeholders = before.str.replace(r'nan--|Na--', '', regex=True)
    digits_only = without_placeholders.str.replace(r'[^0-9]', '', regex=True)
    df['Phone_Number'] = digits_only.apply(_keep_if_long_enough)

    changed = (before != df['Phone_Number']).sum()
    logger.info(f"Cleaned {changed} phone numbers")

    # Values that ended up empty are the invalid ones
    blanked = (df['Phone_Number'] == '').sum()
    if blanked > 0:
        logger.info(f"Found {blanked} invalid phone numbers")

    return df

def standardize_yes_no_fields(df):
    """
    Standardize the ``Do_Not_Contact`` field to upper-case Y/N, in place.

    Values are trimmed and upper-cased; a value that is exactly ``YES`` or
    ``NO`` after that (so any casing of yes/no) becomes ``Y`` or ``N``.
    Every other value is kept, upper-cased. Does nothing when the column is
    absent.

    Args:
        df: DataFrame to update; ``Do_Not_Contact`` is expected to hold text.

    Returns:
        The same DataFrame object.
    """
    if 'Do_Not_Contact' in df.columns:
        logger.info("Standardizing Do_Not_Contact field to Y/N format")

        original_values = df['Do_Not_Contact'].copy()

        df['Do_Not_Contact'] = (
            df['Do_Not_Contact']
            .str.strip()
            # Upper-case first so 'yes', 'Yes' and 'YES' are all recognised.
            .str.upper()
            # Whole-value replacement: text that merely contains "NO" or
            # "YES" must not be rewritten.
            .replace({'YES': 'Y', 'NO': 'N'})
        )

        # Count changes
        changes_count = (original_values != df['Do_Not_Contact']).sum()
        logger.info(f"Standardized {changes_count} values in Do_Not_Contact field")

    return df

def filter_unwanted_records(df):
    """
    Drop rows flagged 'Y' in ``Do_Not_Contact`` and rows with a blank phone.

    Each filter is skipped when its column is absent. Returns the filtered
    frame (row labels are left as they were) and logs how much was removed.
    """
    rows_before = len(df)

    # Filter out Do Not Contact records
    if 'Do_Not_Contact' in df.columns:
        opted_out = (df['Do_Not_Contact'] == 'Y').sum()
        if opted_out > 0:
            logger.info(f"Removing {opted_out} 'Do Not Contact' records")
            contactable = df['Do_Not_Contact'] != 'Y'
            df = df[contactable]

    # Filter out records with invalid phone numbers (counted after the
    # previous filter, so the two counts never overlap)
    if 'Phone_Number' in df.columns:
        blank_phones = (df['Phone_Number'] == '').sum()
        if blank_phones > 0:
            logger.info(f"Removing {blank_phones} records with invalid phone numbers")
            has_phone = df['Phone_Number'] != ''
            df = df[has_phone]

    # Report total filtered records
    removed = rows_before - len(df)
    if removed > 0:
        logger.info(f"Total records filtered: {removed} ({removed/rows_before:.1%} of data)")

    return df

def process_address_fields(df):
    """
    Split ``Address`` on commas into Street_Address, State and Zip_Code.

    The address is split at most twice, so everything after the second comma
    lands in ``Zip_Code``. A target column for which the split produced no
    data at all is created as empty strings; rows that simply lack a part
    keep the missing value produced by the split. Does nothing when
    ``Address`` is absent.

    Args:
        df: DataFrame to update in place; ``Address`` is expected to hold text.

    Returns:
        The same DataFrame object.
    """
    if 'Address' in df.columns:
        logger.info("Processing Address field")

        # Count how many addresses contain commas (splittable)
        comma_count = df['Address'].str.contains(',').sum()
        logger.info(f"Found {comma_count} addresses with comma separators")

        # Split address into components
        split_address = df['Address'].str.split(',', n=2, expand=True)
        available_parts = split_address.shape[1]

        # Treat all three targets uniformly so that a split yielding fewer
        # columns than expected (including none, e.g. for an empty frame)
        # produces an empty column instead of a KeyError.
        targets = ('Street_Address', 'State', 'Zip_Code')
        for position, column_name in enumerate(targets):
            if position < available_parts:
                df[column_name] = split_address[position].str.strip()
                logger.info(f"Created {column_name} column")
            else:
                df[column_name] = ''
                logger.info(f"Created empty {column_name} column (no data available)")

    return df

def clean_zip_codes(df):
    """
    Reduce ``Zip_Code`` to the first run of five digits found in each value.

    Values without five consecutive digits become missing. Does nothing when
    the column is absent.

    Args:
        df: DataFrame to update in place; ``Zip_Code`` is expected to hold text.

    Returns:
        The same DataFrame object.
    """
    if 'Zip_Code' in df.columns:
        logger.info("Cleaning ZIP codes")

        # expand=False yields a Series, so a Series (not a one-column
        # DataFrame) is assigned back to the column.
        df['Zip_Code'] = df['Zip_Code'].str.extract(r'(\d{5})', expand=False)

        # Count valid and invalid ZIP codes
        has_zip = df['Zip_Code'].notna()
        valid_zip_count = has_zip.sum()
        invalid_zip_count = (~has_zip).sum()

        logger.info(f"Extracted {valid_zip_count} valid 5-digit ZIP codes")
        if invalid_zip_count > 0:
            logger.info(f"Found {invalid_zip_count} invalid ZIP codes")

    return df

def convert_column_h_values(df):
    """
    Convert 'Yes'/'No' to 'Y'/'N' in the eighth column (spreadsheet column H).

    The column is selected by position (0-based index 7). Its values are
    converted to text and trimmed; a value that is exactly ``Yes`` or ``No``
    is then replaced, and every other value is left as the trimmed text.
    When the frame has fewer than eight columns a warning is logged and
    nothing is changed.

    Args:
        df: DataFrame to update in place.

    Returns:
        The same DataFrame object.
    """
    column_h_position = 7  # 0-based position of spreadsheet column H

    # Index 7 only exists when there are at least 8 columns.
    if len(df.columns) > column_h_position:
        col_h = df.columns[column_h_position]
        logger.info(f"Converting Yes/No values in column '{col_h}' (Column H)")

        original_values = df[col_h].copy()

        df[col_h] = (
            df[col_h]
            .astype(str)
            .str.strip()
            # Whole-value replacement: text that merely contains "Yes" or
            # "No" (e.g. "Norway") must not be altered.
            .replace({'Yes': 'Y', 'No': 'N'})
        )

        # Count changes
        changes_count = (original_values != df[col_h]).sum()
        logger.info(f"Converted {changes_count} values in column '{col_h}'")
    else:
        logger.warning("Column H not found - dataset doesn't have enough columns")

    return df

def format_date_column(df):
    """
    Strip the time component from the first column of ``df`` (logged as Column B).

    A datetime column is converted to plain dates directly. An object column
    is parsed with ``pd.to_datetime`` first; if parsing fails the column is
    left untouched and a warning is logged. Other dtypes are ignored.
    Returns the same DataFrame object.
    """
    if len(df.columns) == 0:
        return df

    col_b = df.columns[0]  # First column
    logger.info(f"Processing date formatting in column '{col_b}' (Column B)")
    values = df[col_b]

    # Already datetime: just drop the time part
    if pd.api.types.is_datetime64_any_dtype(values):
        logger.info(f"Converting datetime values in '{col_b}' to date-only format")
        df[col_b] = values.dt.date
        return df

    # Only object columns are worth attempting to parse
    if not (values.dtype == 'object'):
        return df

    try:
        logger.info(f"Attempting to convert string values in '{col_b}' to dates")
        df[col_b] = pd.to_datetime(values).dt.date
        logger.info("Conversion successful")
    except Exception as e:
        # Best effort: a column that does not parse as dates stays as it was
        logger.warning(f"Could not convert column '{col_b}' to date: {str(e)}")

    return df

def export_and_format_excel(df, output_path):
    """
    Write ``df`` to an Excel workbook and apply presentation formatting.

    The frame is written with its index, then the file is reopened with
    openpyxl to add thin borders to every data cell, a bold centred header
    row, an "Index" label in A1, a YYYY-MM-DD number format on column B and
    content-based column widths (capped at 50).

    Args:
        df: DataFrame to export.
        output_path: Destination ``.xlsx`` path; the file is written twice
            (raw export, then formatted save).

    Raises:
        Exception: on any export or formatting failure, chained to the
            original error and prefixed with "Failed to format Excel: ".
    """
    try:
        logger.info(f"Exporting data to {output_path}")

        # First, export with pandas to create the Excel file
        df.to_excel(output_path, index=True)
        logger.info(f"Basic Excel export completed: {len(df)} rows, {len(df.columns)} columns")

        # Now open with openpyxl to apply formatting
        logger.info("Applying Excel formatting")
        wb = openpyxl.load_workbook(output_path)
        ws = wb.active

        # Define styles
        thin_border = Border(
            left=Side(style='thin'),
            right=Side(style='thin'),
            top=Side(style='thin'),
            bottom=Side(style='thin')
        )
        bold_font = Font(bold=True)
        regular_font = Font(bold=False)
        centered_align = Alignment(horizontal='center')

        # Apply formatting to all cells
        logger.info("Applying borders and font formatting")
        max_row = len(df) + 1  # +1 for header row
        max_col = len(df.columns) + 1  # +1 for index column

        for row_idx, row in enumerate(ws.iter_rows(min_row=1, max_row=max_row, min_col=1, max_col=max_col)):
            is_header = row_idx == 0
            for cell in row:
                # Apply border to all cells
                cell.border = thin_border

                # Bold, centred text only on the header row
                if is_header:
                    cell.font = bold_font
                    cell.alignment = centered_align
                else:
                    cell.font = regular_font

        # Rename index header to "Index"
        logger.info("Setting 'Index' label in cell A1")
        ws['A1'] = 'Index'

        # Apply date formatting to column B
        if max_col > 1:  # Make sure we have at least 2 columns
            logger.info("Setting date format in column B")
            for cell in ws['B'][1:]:  # Skip header
                if isinstance(cell.value, (datetime, str)):
                    cell.number_format = 'YYYY-MM-DD'

        # Auto-adjust column widths for better visibility
        logger.info("Auto-adjusting column widths")
        for col in ws.columns:
            column = col[0].column_letter  # Get column letter

            # Empty cells contribute nothing rather than the length of the
            # text "None".
            max_length = max(
                (len(str(cell.value)) for cell in col if cell.value is not None),
                default=0,
            )

            ws.column_dimensions[column].width = min(max_length + 2, 50)  # Cap width at 50

        # Save the workbook
        wb.save(output_path)
        logger.info(f"Excel formatting completed and saved to {output_path}")

    except Exception as e:
        logger.error(f"Error during Excel export/formatting: {str(e)}")
        # Chain the original error so its traceback is preserved for callers.
        raise Exception(f"Failed to format Excel: {str(e)}") from e

# Execute main() only when this module is run directly; importing the module
# does not start the cleaning pipeline.
if __name__ == "__main__":
    main()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
✅ Cleaned dataset successfully saved to:
/content/drive/My Drive/Data_Analyst/Portfolio_Projects/Projects/Python/Calls_Dataset_After_Cleaning.xlsx
✅ Processing time: 2.6 seconds


  df.fillna('', inplace=True)
