## **Feature:** outliers

**Names:** Hailing Chen

### **What it does**
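Detects outliers in numeric columns (IQR and Z-score methods), suggests a handling strategy for each column, and applies the requested strategy (cap, remove, log-transform, or fill).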

In [2]:
# Load dotenv
import os
from dotenv import load_dotenv
load_dotenv()

# Read the OpenAI API key from the environment; warn when it is absent or empty
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if OPENAI_API_KEY is None or OPENAI_API_KEY == "":
    print("OpenAI API Key not found")

# Import libraries
from pathlib import Path
import pandas as pd
import numpy as np
from scipy import stats
import io
import contextlib

# Langchain imports
from langchain_openai import ChatOpenAI  
from langchain.schema import HumanMessage, SystemMessage

### **Helper Functions**

- `analyze_outliers(df, iqr_multiplier=1.5, z_threshold=3, contamination=0.1)`: Analyze outliers and suggest handling strategies based on heuristics.
- `get_outlier_details(df, col, method='iqr', iqr_multiplier=1.5, z_threshold=3)`: Get detailed information about outliers in a specific column using the 'iqr', 'zscore', or 'both' methods.
- `handle_outliers(df, col, method='cap', percentiles=(5, 95), fill_value=None)`: Handle outliers in a specific column using methods including 'cap', 'remove', 'transform', 'fill'

In [3]:
def _heuristic_outlier_suggestion(outlier_pct, n_unique, skewness, kurtosis, data_range):
    """Map the summary statistics of one column to a generic handling suggestion."""
    if outlier_pct == 0:
        return "No significant outliers detected"

    if outlier_pct > 0.2:  # More than 20% outliers
        if n_unique < 20:  # Likely categorical numeric data
            return "High outlier rate - check if data is truly numeric or categorical codes"
        return "Very high outlier rate (>20%) - investigate data quality or collection process"

    if outlier_pct > 0.1:  # 10-20% outliers
        if abs(skewness) > 2:
            return "Highly skewed with many outliers - consider log transformation or robust scaling"
        return "High outlier rate - use robust methods (median, IQR-based scaling) or cap outliers"

    if outlier_pct > 0.05:  # 5-10% outliers
        # NOTE(review): the value comes from pandas `Series.kurtosis()`; confirm
        # that the threshold of 3 matches the kurtosis definition pandas uses.
        if kurtosis > 3:  # Heavy-tailed distribution
            return "Moderate outliers in heavy-tailed distribution - winsorize or use robust statistics"
        if abs(skewness) > 1:
            return "Moderate outliers in skewed data - consider transformation or capping"
        return "Moderate outliers - investigate individual cases, consider capping at percentiles"

    # Less than (or exactly) 5% outliers
    if n_unique < 50 and data_range > 1000:  # Might be measurement errors
        return "Few outliers but large range - investigate for measurement errors"
    if outlier_pct > 0.01:  # 1-5% outliers
        return "Low outlier rate - investigate individual cases, likely keep unless clear errors"
    return "Very few outliers - likely natural variation, keep unless domain knowledge suggests otherwise"


def _special_case_suggestion(label, clean_data):
    """Return a column-name based override suggestion, or None when no rule applies."""
    # str() so that non-string labels (e.g. integer column names) do not crash
    name = str(label).lower()
    if name in ('id', 'index', 'key'):
        return "ID/Index column - outliers not meaningful"
    if name in ('year', 'month', 'day'):
        return "Date component - investigate outliers for data quality issues"
    if name == 'age' and (clean_data < 0).any():
        return "Negative age values detected - data quality issue"
    if name in ('price', 'cost', 'amount', 'salary') and (clean_data < 0).any():
        return "Negative financial values - investigate for data errors"
    return None


def analyze_outliers(df, iqr_multiplier=1.5, z_threshold=3, contamination=0.1):
    """
    Analyze outliers and suggest handling strategies based on heuristics.

    Side effect: spaces are removed from the string column labels of `df`
    IN PLACE, so the caller's DataFrame sees the renamed columns as well.
    Non-string labels are left untouched.

    Parameters:
    - df: pandas DataFrame
    - iqr_multiplier: multiplier for IQR method (default 1.5)
    - z_threshold: threshold for Z-score method (default 3)
    - contamination: currently unused; kept so existing callers keep working

    Returns:
    - DataFrame indexed by column name with one row per numeric column that has
      at least one non-missing value. Columns with fewer than 10 non-missing
      values get an "Insufficient data" row without skewness/kurtosis.
    """
    # Strip spaces from string labels only; the previous `.str` accessor raised
    # on non-string labels such as integer column names.
    df.rename(
        columns=lambda label: label.replace(' ', '') if isinstance(label, str) else label,
        inplace=True,
    )
    suggestions = {}

    # Only analyze numeric columns
    numeric_cols = df.select_dtypes(include=[np.number]).columns

    for col in numeric_cols:
        clean_data = df[col].dropna()
        if clean_data.empty:  # column holds only missing values
            continue

        sample_size = len(clean_data)
        if sample_size < 10:  # Skip if too few data points
            suggestions[col] = {
                "dtype": str(df[col].dtype),
                "sample_size": sample_size,
                "outliers_iqr": 0,
                "outliers_zscore": 0,
                "outlier_pct": 0,
                "suggestion": "Insufficient data for outlier detection"
            }
            continue

        # IQR Method
        q1 = clean_data.quantile(0.25)
        q3 = clean_data.quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - iqr_multiplier * iqr
        upper_bound = q3 + iqr_multiplier * iqr
        outliers_iqr = int(((clean_data < lower_bound) | (clean_data > upper_bound)).sum())

        # Z-Score Method (NaN z-scores compare as False, i.e. "not an outlier")
        z_scores = np.abs(stats.zscore(clean_data))
        outliers_zscore = int((z_scores > z_threshold).sum())

        # Take the more conservative estimate
        outlier_pct = min(outliers_iqr, outliers_zscore) / sample_size

        # Additional statistics for decision making
        skewness = clean_data.skew()
        kurtosis = clean_data.kurtosis()

        suggestion = _heuristic_outlier_suggestion(
            outlier_pct,
            clean_data.nunique(),
            skewness,
            kurtosis,
            clean_data.max() - clean_data.min(),
        )

        # Name-based special cases take precedence over the generic heuristic
        override = _special_case_suggestion(col, clean_data)
        if override is not None:
            suggestion = override

        suggestions[col] = {
            "dtype": str(df[col].dtype),
            "sample_size": sample_size,
            "outliers_iqr": outliers_iqr,
            "outliers_zscore": outliers_zscore,
            "outlier_pct": round(outlier_pct, 3),
            "skewness": round(skewness, 3),
            "kurtosis": round(kurtosis, 3),
            "suggestion": suggestion
        }

    return pd.DataFrame.from_dict(suggestions, orient="index")

In [4]:
def get_outlier_details(df, col, method='iqr', iqr_multiplier=1.5, z_threshold=3):
    """
    Get detailed information about outliers in a specific column.

    Missing values are ignored. Rows are selected with boolean masks rather
    than by index label, so a DataFrame with duplicate index labels reports
    each outlier row exactly once.

    Parameters:
    - df: pandas DataFrame
    - col: column name to analyze
    - method: 'iqr', 'zscore', or 'both'
    - iqr_multiplier: multiplier for IQR method
    - z_threshold: threshold for Z-score method

    Returns:
    - Dictionary with one entry per requested method ('iqr' and/or 'zscore')
      holding outlier indices, values, count and the bounds/threshold used;
      or {"error": <message>} when the column is missing, not numeric, or the
      method is unknown
    """
    if col not in df.columns:
        return {"error": f"Column '{col}' not found"}

    if not pd.api.types.is_numeric_dtype(df[col]):
        return {"error": f"Column '{col}' is not numeric"}

    # Previously an unknown method silently produced an empty result
    if method not in ('iqr', 'zscore', 'both'):
        return {"error": f"Unknown method '{method}' (expected 'iqr', 'zscore' or 'both')"}

    clean_data = df[col].dropna()
    results = {}

    if method in ('iqr', 'both'):
        q1 = clean_data.quantile(0.25)
        q3 = clean_data.quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - iqr_multiplier * iqr
        upper_bound = q3 + iqr_multiplier * iqr

        flagged = clean_data[(clean_data < lower_bound) | (clean_data > upper_bound)]
        results['iqr'] = {
            'lower_bound': lower_bound,
            'upper_bound': upper_bound,
            'outlier_indices': flagged.index.tolist(),
            'outlier_values': flagged.tolist(),
            'count': len(flagged)
        }

    if method in ('zscore', 'both'):
        if len(clean_data):
            z_scores = np.abs(np.asarray(stats.zscore(clean_data)))
        else:
            # Nothing to score; avoid calling zscore on an empty input
            z_scores = np.array([])
        z_mask = z_scores > z_threshold
        flagged = clean_data[z_mask]

        results['zscore'] = {
            'threshold': z_threshold,
            'outlier_indices': flagged.index.tolist(),
            'outlier_values': flagged.tolist(),
            'z_scores': z_scores[z_mask].tolist(),
            'count': len(flagged)
        }

    return results


In [5]:
def _iqr_bounds(series, multiplier=1.5):
    """Return the (lower, upper) IQR fences of `series`: Q1/Q3 -/+ multiplier * IQR."""
    q1 = series.quantile(0.25)
    q3 = series.quantile(0.75)
    spread = q3 - q1
    return q1 - multiplier * spread, q3 + multiplier * spread


def handle_outliers(df, col, method='cap', percentiles=(5, 95), fill_value=None):
    """
    Handle outliers in a specific column using various methods.

    The input DataFrame is never modified; all changes are made on a copy.
    A short summary of the action taken is printed.

    Parameters:
    - df: pandas DataFrame
    - col: column name to process
    - method: 'cap' (clip to percentiles), 'remove' (drop rows outside the
      1.5*IQR fences), 'transform' (log1p, only when every non-missing value
      is positive) or 'fill' (replace values outside the 1.5*IQR fences)
    - percentiles: tuple for capping (lower, upper percentiles)
    - fill_value: value to fill outliers with (for 'fill' method); defaults
      to the column median

    Returns:
    - Modified copy of the DataFrame. The original `df` object itself is
      returned when the column is missing, not numeric, or cannot be
      log-transformed. An unknown method returns an unchanged copy.
    """
    if col not in df.columns:
        print(f"Column '{col}' not found")
        return df

    if not pd.api.types.is_numeric_dtype(df[col]):
        print(f"Column '{col}' is not numeric")
        return df

    df_copy = df.copy()

    if method == 'cap':
        lower_cap = df_copy[col].quantile(percentiles[0] / 100)
        upper_cap = df_copy[col].quantile(percentiles[1] / 100)
        df_copy[col] = df_copy[col].clip(lower=lower_cap, upper=upper_cap)
        print(f"Capped outliers in '{col}' to [{lower_cap:.2f}, {upper_cap:.2f}]")

    elif method == 'remove':
        lower_bound, upper_bound = _iqr_bounds(df_copy[col])

        before_count = len(df_copy)
        # Missing values are not outliers: keep them so they are neither
        # dropped nor counted in the "removed" message.
        keep_mask = df_copy[col].between(lower_bound, upper_bound) | df_copy[col].isna()
        df_copy = df_copy[keep_mask]
        after_count = len(df_copy)
        print(f"Removed {before_count - after_count} outlier rows from '{col}'")

    elif method == 'transform':
        # Log transformation for positive skewed data. Missing values are
        # ignored by the positivity check and stay missing after log1p.
        if (df_copy[col].dropna() > 0).all():
            df_copy[col] = np.log1p(df_copy[col])
            print(f"Applied log transformation to '{col}'")
        else:
            print(f"Cannot apply log transformation to '{col}' (contains non-positive values)")
            return df

    elif method == 'fill':
        if fill_value is None:
            fill_value = df_copy[col].median()

        lower_bound, upper_bound = _iqr_bounds(df_copy[col])

        outlier_mask = (df_copy[col] < lower_bound) | (df_copy[col] > upper_bound)
        outlier_count = outlier_mask.sum()
        # Writing a fractional value (e.g. a median of 10.5) into an integer
        # column relies on implicit upcasting, which pandas has deprecated;
        # convert the column to float explicitly first.
        if (
            outlier_count
            and pd.api.types.is_integer_dtype(df_copy[col])
            and isinstance(fill_value, float)
            and not fill_value.is_integer()
        ):
            df_copy[col] = df_copy[col].astype(float)
        df_copy.loc[outlier_mask, col] = fill_value
        print(f"Filled {outlier_count} outliers in '{col}' with {fill_value}")

    else:
        print(f"Unknown strategy '{method}', outliers not handled.")

    return df_copy

In [6]:
# Plain-text description of the helper functions, sent to the LLM as a system
# message. The names listed here must match the keys of the `helpers` registry
# exactly, because the generated code calls them by name.
helper_docs = """ Helper functions available:
- analyze_outliers(df): Summarise outliers per numeric column and suggest a handling strategy. Returns a DataFrame.
- get_outlier_details(df, col, method): Get detailed information about outliers in a specific column. Returns a dictionary.
    - supported methods: 'iqr', 'zscore','both'
- handle_outliers(df, col, method): Handle outliers in a specific column using methods defined. Returns a modified copy, so assign the result back to df.
    - supported methods: 'cap', 'remove', 'transform', 'fill'
"""

In [8]:
# Registry of helper callables, keyed by the name generated code uses to call them
helpers = dict(
    analyze_outliers=analyze_outliers,
    get_outlier_details=get_outlier_details,
    handle_outliers=handle_outliers,
    # add more here
)

# **MAIN FEATURE FUNCTION**

In [None]:
def outliers(df, message):
    """
    Main function that gets called by the main router.

    Builds a prompt from the dataset sample, the per-column outlier analysis
    and the user request, asks the LLM for Python code, executes that code
    and returns whatever the code printed.

    Side effect: `analyze_outliers` removes spaces from the column names of
    `df` in place before the prompt is built.

    Parameters:
    - df: pandas DataFrame to work on
    - message: the user's request in natural language

    Returns:
    - String with the captured print output of the generated code, a
      placeholder when nothing was printed, or an error report containing
      the exception and the generated code.

    NOTE(review): an earlier version of this docstring said the function
    "MUST ... return df", but the code returns the captured output string and
    any `df` rebinding done by the generated code is discarded. Confirm with
    the router which contract is expected before changing the return value.
    """
    # Local imports: only needed to expose these modules to the generated code
    import datetime
    import math
    import re

    suggestions = analyze_outliers(df)

    # Create message chain
    messages = []
    messages.append(SystemMessage(content=helper_docs))
    messages.append(SystemMessage(content=f"""
    You are a data cleaning agent.
    
    Dataset info: Shape: {df.shape}, Sample: {df.head(3).to_string()}

    Outlier analysis per numeric column:
{suggestions.to_string()}

    Libraries available:
    - pd (pandas), np (numpy)
    - math, re, datetime
    - stats from scipy
    
    Rules:
    - Return only executable Python code, no explanations, no markdown blocks
    - Use helper functions if needed
    - ASSUME "df" IS ALREADY DEFINED
    - In order to generate a response/message to the user use print statements
    print("message")
    - Write a detailed print message to summarise actions taken and reasons
    """))
    messages.append(HumanMessage(content=f"User request: {message}"))

    # Call LLM with message chain
    llm = ChatOpenAI(temperature=0, model_name="gpt-4o-mini")
    response = llm.invoke(messages)
    generated_code = response.content.strip()

    # Models sometimes wrap the answer in a markdown fence despite the rules;
    # drop the opening fence line (with optional language tag) and the closing one.
    if generated_code.startswith("```"):
        code_lines = generated_code.splitlines()[1:]
        if code_lines and code_lines[-1].strip().startswith("```"):
            code_lines = code_lines[:-1]
        generated_code = "\n".join(code_lines).strip()

    buffer = io.StringIO()
    try:
        # Every library promised in the prompt must be present here, otherwise
        # the generated code fails with NameError.
        exec_globals = {
            "df": df,
            "pd": pd,
            "np": np,
            "stats": stats,
            "math": math,
            "re": re,
            "datetime": datetime,
            **helpers,
        }
        # SECURITY: this executes model-generated code with full interpreter
        # privileges. Only run it in a trusted or sandboxed environment.
        with contextlib.redirect_stdout(buffer):
            exec(generated_code, exec_globals)
        output = buffer.getvalue().strip()

        if not output:
            output = "Code executed, but nothing was printed."
        return output

    except Exception as e:
        # Boundary handler: report the failure to the caller instead of raising
        return f"Error running code:\n{e}\n\nGenerated code:\n{generated_code}"

# **Testing**

In [None]:
# Name of the CSV file to load from the "datasets" folder
dataset_name = "sample-data.csv"

# Locate the CSV via the PROJECT_ROOT environment variable (to avoid import errors)
load_dotenv()
PROJECT_ROOT = Path(os.environ["PROJECT_ROOT"])
path = PROJECT_ROOT.joinpath("datasets", dataset_name)

# Load the data; test_df is an independent copy of the loaded frame
df = pd.read_csv(path)
test_df = df.copy(deep=True)