# 🤖 AI-Powered Data Cleaning Agent - Colab Demo

**GenAI Competition - UoM DSCubed x UWA DSC**  
**Author:** Rudra Tiwari  

This notebook demonstrates how to use the DataCleaningAgent in Google Colab.

## 🚀 Quick Start Guide:
1. **Upload your data** using the file uploader below
2. **Run the cleaning agent** with one line of code
3. **Download your cleaned data** automatically

Let's get started! 🎉


## Step 1: Install Required Packages


In [1]:
# Install required packages
%pip install pandas numpy matplotlib seaborn openpyxl -q

print("✅ All packages installed successfully!")


✅ All packages installed successfully!


## Step 2: Create the DataCleaningAgent Class


In [2]:
# Create the DataCleaningAgent class directly in the notebook
import pandas as pd
import numpy as np
from typing import Dict, List, Any, Optional, Tuple
import warnings
warnings.filterwarnings('ignore')

class DataCleaningAgent:
    """
    AI-Powered Data Cleaning Agent
    Provides intelligent data cleaning with comprehensive analysis
    """

    def __init__(self):
        self.cleaning_history = []
        self.data_quality_report = {}
        self.cleaning_suggestions = []

    def analyze_data_quality(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Comprehensive data quality analysis"""
        print("🔍 Analyzing Data Quality...")

        analysis = {
            'shape': df.shape,
            'columns': list(df.columns),
            'data_types': df.dtypes.to_dict(),
            'missing_values': df.isnull().sum().to_dict(),
            'missing_percentage': (df.isnull().sum() / len(df) * 100).to_dict(),
            'duplicate_rows': df.duplicated().sum(),
            'duplicate_percentage': (df.duplicated().sum() / len(df) * 100),
            'memory_usage': df.memory_usage(deep=True).sum(),
            'numeric_columns': df.select_dtypes(include=[np.number]).columns.tolist(),
            'categorical_columns': df.select_dtypes(include=['object']).columns.tolist(),
            'datetime_columns': df.select_dtypes(include=['datetime64']).columns.tolist()
        }

        # Detect potential issues
        issues = []
        if analysis['missing_percentage']:
            high_missing = {col: pct for col, pct in analysis['missing_percentage'].items() if pct > 50}
            if high_missing:
                issues.append(f"High missing values (>50%): {high_missing}")

        if analysis['duplicate_percentage'] > 10:
            issues.append(f"High duplicate rate: {analysis['duplicate_percentage']:.1f}%")

        analysis['issues'] = issues
        self.data_quality_report = analysis

        return analysis

    def clean_missing_values(self, df: pd.DataFrame, strategy: str = 'auto') -> pd.DataFrame:
        """Clean missing values with intelligent strategies"""
        print(f"🧹 Cleaning Missing Values using {strategy} strategy...")

        df_cleaned = df.copy()
        changes_made = []

        for column in df_cleaned.columns:
            missing_count = df_cleaned[column].isnull().sum()
            if missing_count > 0:
                if strategy == 'auto':
                    # Intelligent strategy selection
                    if df_cleaned[column].dtype in ['int64', 'float64']:
                        try:
                            if df_cleaned[column].skew() > 1:
                                fill_value = df_cleaned[column].median()
                                method = 'median'
                            else:
                                fill_value = df_cleaned[column].mean()
                                method = 'mean'
                        except:
                            fill_value = 0
                            method = 'zero_fill'
                    else:
                        fill_value = df_cleaned[column].mode().iloc[0] if not df_cleaned[column].mode().empty else 'Unknown'
                        method = 'mode'
                elif strategy == 'drop':
                    df_cleaned = df_cleaned.dropna(subset=[column])
                    method = 'dropped'
                    fill_value = None
                else:
                    continue

                if method != 'dropped':
                    df_cleaned[column] = df_cleaned[column].fillna(fill_value)

                changes_made.append({
                    'column': column,
                    'missing_count': missing_count,
                    'method': method,
                    'fill_value': fill_value
                })

        self.cleaning_history.append({
            'action': 'clean_missing_values',
            'strategy': strategy,
            'changes': changes_made,
            'rows_before': len(df),
            'rows_after': len(df_cleaned)
        })

        print(f"✅ Cleaned {len(changes_made)} columns with missing values")
        return df_cleaned

    def remove_duplicates(self, df: pd.DataFrame, subset: Optional[List[str]] = None, keep: str = 'first') -> pd.DataFrame:
        """Remove duplicate rows"""
        print("🔄 Removing Duplicate Rows...")

        rows_before = len(df)
        df_cleaned = df.drop_duplicates(subset=subset, keep=keep)
        rows_after = len(df_cleaned)
        duplicates_removed = rows_before - rows_after

        self.cleaning_history.append({
            'action': 'remove_duplicates',
            'subset': subset,
            'keep': keep,
            'duplicates_removed': duplicates_removed,
            'rows_before': rows_before,
            'rows_after': rows_after
        })

        print(f"✅ Removed {duplicates_removed} duplicate rows")
        return df_cleaned

    def standardize_data_types(self, df: pd.DataFrame) -> pd.DataFrame:
        """Standardize and optimize data types"""
        print("🔧 Standardizing Data Types...")

        df_cleaned = df.copy()
        changes_made = []

        for column in df_cleaned.columns:
            original_dtype = str(df_cleaned[column].dtype)

            # Optimize numeric columns
            if df_cleaned[column].dtype == 'int64':
                if df_cleaned[column].min() >= 0 and df_cleaned[column].max() <= 255:
                    df_cleaned[column] = df_cleaned[column].astype('uint8')
                elif df_cleaned[column].min() >= -128 and df_cleaned[column].max() <= 127:
                    df_cleaned[column] = df_cleaned[column].astype('int8')
                elif df_cleaned[column].min() >= 0 and df_cleaned[column].max() <= 65535:
                    df_cleaned[column] = df_cleaned[column].astype('uint16')
                elif df_cleaned[column].min() >= -32768 and df_cleaned[column].max() <= 32767:
                    df_cleaned[column] = df_cleaned[column].astype('int16')

            # Optimize float columns
            elif df_cleaned[column].dtype == 'float64':
                df_cleaned[column] = pd.to_numeric(df_cleaned[column], downcast='float')

            # Convert object columns to category if low cardinality
            elif df_cleaned[column].dtype == 'object':
                unique_ratio = df_cleaned[column].nunique() / len(df_cleaned)
                if unique_ratio < 0.5:
                    df_cleaned[column] = df_cleaned[column].astype('category')

            new_dtype = str(df_cleaned[column].dtype)
            if original_dtype != new_dtype:
                changes_made.append({
                    'column': column,
                    'original_dtype': original_dtype,
                    'new_dtype': new_dtype
                })

        self.cleaning_history.append({
            'action': 'standardize_data_types',
            'changes': changes_made
        })

        print(f"✅ Optimized {len(changes_made)} column data types")
        return df_cleaned

    def standardize_text(self, df: pd.DataFrame, columns: Optional[List[str]] = None) -> pd.DataFrame:
        """Standardize text data"""
        print("📝 Standardizing Text Data...")

        df_cleaned = df.copy()
        if columns is None:
            columns = df_cleaned.select_dtypes(include=['object']).columns

        changes_made = []
        for column in columns:
            if df_cleaned[column].dtype == 'object':
                original_sample = df_cleaned[column].dropna().iloc[0] if not df_cleaned[column].dropna().empty else None

                # Standardize text
                df_cleaned[column] = df_cleaned[column].astype(str).str.strip().str.title()

                new_sample = df_cleaned[column].dropna().iloc[0] if not df_cleaned[column].dropna().empty else None
                if original_sample != new_sample:
                    changes_made.append({
                        'column': column,
                        'original_sample': original_sample,
                        'new_sample': new_sample
                    })

        self.cleaning_history.append({
            'action': 'standardize_text',
            'columns': columns,
            'changes': changes_made
        })

        print(f"✅ Standardized text in {len(changes_made)} columns")
        return df_cleaned

    def auto_clean(self, df: pd.DataFrame) -> pd.DataFrame:
        """Perform automatic data cleaning"""
        print("🤖 Starting Automatic Data Cleaning...")
        print("=" * 50)

        # Step 1: Analyze data quality
        self.analyze_data_quality(df)

        # Step 2: Clean missing values
        df_cleaned = self.clean_missing_values(df, strategy='auto')

        # Step 3: Remove duplicates
        df_cleaned = self.remove_duplicates(df_cleaned)

        # Step 4: Standardize data types
        df_cleaned = self.standardize_data_types(df_cleaned)

        # Step 5: Standardize text
        df_cleaned = self.standardize_text(df_cleaned)

        print("=" * 50)
        print("🎉 Automatic Data Cleaning Complete!")

        return df_cleaned

    def generate_report(self) -> str:
        """Generate cleaning report"""
        report = f"""
# Data Cleaning Report
Generated: {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')}

## Summary
- **Total Actions:** {len(self.cleaning_history)}
- **Actions Performed:** {', '.join([action['action'] for action in self.cleaning_history])}

## Cleaning Actions
"""

        for i, action in enumerate(self.cleaning_history, 1):
            report += f"{i}. **{action['action'].replace('_', ' ').title()}**\n"
            if 'changes' in action:
                report += f"   - Changes made: {len(action['changes'])} columns\n"
            if 'duplicates_removed' in action:
                report += f"   - Duplicates removed: {action['duplicates_removed']}\n"

        return report

print("✅ DataCleaningAgent class created successfully!")


✅ DataCleaningAgent class created successfully!


## Step 3: Upload Your Data


In [3]:
# Upload your data file
from google.colab import files
import io

print("📁 Please upload your data file (CSV or Excel):")
uploaded = files.upload()

# Get the uploaded file
file_name = list(uploaded.keys())[0]
print(f"✅ Uploaded: {file_name}")

# Load the data
if file_name.endswith('.csv'):
    df = pd.read_csv(io.BytesIO(uploaded[file_name]))
elif file_name.endswith(('.xlsx', '.xls')):
    df = pd.read_excel(io.BytesIO(uploaded[file_name]))
else:
    print("❌ Unsupported file format. Please upload CSV or Excel files.")

print(f"📊 Data loaded: {df.shape[0]} rows × {df.shape[1]} columns")
print(f"📋 Columns: {list(df.columns)}")
df.head()


📁 Please upload your data file (CSV or Excel):


Saving WHO Data.xlsx to WHO Data.xlsx
✅ Uploaded: WHO Data.xlsx
📊 Data loaded: 219 rows × 25 columns
📋 Columns: ['Global Health Estimates 2021: Estimated deaths by cause and region, 2000-2021', 'Unnamed: 1', 'Unnamed: 2', 'Unnamed: 3', 'Unnamed: 4', 'Unnamed: 5', 'Unnamed: 6', 'Unnamed: 7', 'Unnamed: 8', 'Unnamed: 9', 'Unnamed: 10', 'Unnamed: 11', 'Unnamed: 12', 'Unnamed: 13', 'Unnamed: 14', 'Unnamed: 15', 'Unnamed: 16', 'Unnamed: 17', 'Unnamed: 18', 'Unnamed: 19', 'Unnamed: 20', 'Unnamed: 21', 'Unnamed: 22', 'Unnamed: 23', 'Unnamed: 24']


Unnamed: 0,"Global Health Estimates 2021: Estimated deaths by cause and region, 2000-2021",Unnamed: 1,Unnamed: 2,Unnamed: 3,Unnamed: 4,Unnamed: 5,Unnamed: 6,Unnamed: 7,Unnamed: 8,Unnamed: 9,...,Unnamed: 15,Unnamed: 16,Unnamed: 17,Unnamed: 18,Unnamed: 19,Unnamed: 20,Unnamed: 21,Unnamed: 22,Unnamed: 23,Unnamed: 24
0,,,,,,Sex,Both sexes,Male,Female,Male,...,,,Female,,,,,,,
1,,,,,,Age group,Total (all ages),,,0-28 days,...,60-69,70+,0-28 days,1-59 months,5-14,15-29,30-49,50-59,60-69,70+
2,Population (thousands),,,,,,7939383.133,3995066.265,3944316.868,6554.456644,...,284738.64,206470.376,6087.864699,321064.753301,657903.377,891073.749,1054417.616,427212.97,311085.839,275470.699
3,Code,Cause of death,,,,,,,,,...,,,,,,,,,,
4,0,,All Causes,,,,68313049.037203,36711370.520266,31601678.516937,1315167.322514,...,7046839.659509,16396306.91162,1057472.07525,1230366.02489,373453.468244,774986.740137,2367606.101871,2945591.598238,5021333.397179,17830869.111129


## Step 4: Use the DataCleaningAgent (One Line!)


In [4]:
# Initialize the agent and clean your data with one line!
agent = DataCleaningAgent()
cleaned_df = agent.auto_clean(df)

print(f"\n📊 Results:")
print(f"Original: {df.shape[0]} rows × {df.shape[1]} columns")
print(f"Cleaned: {cleaned_df.shape[0]} rows × {cleaned_df.shape[1]} columns")
print(f"Missing values: {df.isnull().sum().sum()} → {cleaned_df.isnull().sum().sum()}")
print(f"Duplicates: {df.duplicated().sum()} → {cleaned_df.duplicated().sum()}")

# Show sample of cleaned data
print("\n📋 Sample of cleaned data:")
cleaned_df.head()


🤖 Starting Automatic Data Cleaning...
🔍 Analyzing Data Quality...
🧹 Cleaning Missing Values using auto strategy...
✅ Cleaned 25 columns with missing values
🔄 Removing Duplicate Rows...
✅ Removed 0 duplicate rows
🔧 Standardizing Data Types...
✅ Optimized 6 column data types
📝 Standardizing Text Data...
✅ Standardized text in 17 columns
🎉 Automatic Data Cleaning Complete!

📊 Results:
Original: 219 rows × 25 columns
Cleaned: 219 rows × 25 columns
Missing values: 700 → 0
Duplicates: 0 → 0

📋 Sample of cleaned data:


Unnamed: 0,"Global Health Estimates 2021: Estimated deaths by cause and region, 2000-2021",Unnamed: 1,Unnamed: 2,Unnamed: 3,Unnamed: 4,Unnamed: 5,Unnamed: 6,Unnamed: 7,Unnamed: 8,Unnamed: 9,...,Unnamed: 15,Unnamed: 16,Unnamed: 17,Unnamed: 18,Unnamed: 19,Unnamed: 20,Unnamed: 21,Unnamed: 22,Unnamed: 23,Unnamed: 24
0,Population (Thousands),Cause of death,A.,1.0,A.,Sex,Both Sexes,Male,Female,Male,...,0,0,Female,0,0,0,0,0,0,0
1,Population (Thousands),Cause of death,A.,1.0,A.,Age group,Total (All Ages),0,0,0-28 days,...,60-69,70+,0-28 days,1-59 Months,5-14,15-29,30-49,50-59,60-69,70+
2,Population (Thousands),Cause of death,A.,1.0,A.,Acute glomerulonephritis,7939383.133,3995066.265,3944316.868,6554.456644,...,284738.64,206470.376,6087.864699,321064.753300525,657903.377,891073.749,1054417.616,427212.97,311085.839,275470.699
3,Code,Cause of death,A.,1.0,A.,Acute glomerulonephritis,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,0,Cause of death,All Causes,1.0,A.,Acute glomerulonephritis,68313049.0372033,36711370.5202663,31601678.516937,1315167.322514,...,7046839.65950851,16396306.9116199,1057472.07525,1230366.0248899,373453.468244434,774986.740136733,2367606.10187068,2945591.59823817,5021333.39717872,17830869.1111286


## Step 5: Download Your Cleaned Data


In [5]:
# Save cleaned data
cleaned_df.to_csv('cleaned_data.csv', index=False)
cleaned_df.to_excel('cleaned_data.xlsx', index=False)

# Generate and save report
report = agent.generate_report()
with open('cleaning_report.md', 'w') as f:
    f.write(report)

print("💾 Cleaned data saved!")
print("📥 Download your files:")

# Download the files
files.download('cleaned_data.csv')
files.download('cleaned_data.xlsx')
files.download('cleaning_report.md')

print("✅ All files downloaded successfully!")
print("\n🎉 Data cleaning complete! Your data is now clean and ready for analysis!")


💾 Cleaned data saved!
📥 Download your files:


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

✅ All files downloaded successfully!

🎉 Data cleaning complete! Your data is now clean and ready for analysis!
