## **Feature:** Data Type Optimization

**Names:** Gia Bao Ngo

### **What it does**
Analyzes and optimizes data types to reduce memory usage and improve performance. Automatically detects and converts appropriate data types including downcasting numeric types, parsing dates, converting percentages, categorizing low-cardinality columns, and detecting boolean patterns.

In [17]:
# Load dotenv
import os
from dotenv import load_dotenv
load_dotenv()

# Read the OpenAI key from the environment; warn (without failing) when it is absent or empty
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if OPENAI_API_KEY is None or OPENAI_API_KEY == "":
    print("OpenAI API Key not found")

# Import libraries
from pathlib import Path
import pandas as pd
import numpy as np
# Additional imports for data type optimization
import math
import re
import datetime
from sklearn import preprocessing
import warnings
warnings.filterwarnings('ignore')

# Langchain imports
from langchain_openai import ChatOpenAI  
from langchain.schema import HumanMessage, SystemMessage

### **Helper Functions**
- `analyze_data_types(df)` - Analyze current types and suggest optimizations
- `optimize_numeric_types(df, downcast='infer')` - Downcast numeric columns to the smallest type that holds their values (e.g. int64→int8, float64→float32)
- `parse_dates_auto(df, columns=None)` - Auto-detect and parse date columns
- `convert_percentages(df, columns=None)` - Convert "50%" strings to 0.5 floats
- `categorize_low_cardinality(df, threshold=0.05)` - Convert low-cardinality objects to category
- `detect_boolean_columns(df)` - Find columns that should be boolean and convert them
- `memory_usage_comparison(df_before, df_after)` - Show memory savings

In [18]:
def analyze_data_types(df):
    """
    Analyze current data types and suggest optimizations.

    For each column the report holds its dtype, deep memory usage in MB,
    null count, unique-value count and cardinality ratio (unique values /
    row count), plus a comma-separated list of suggested conversions.
    A short summary is also printed to stdout.

    Parameters:
    - df: pandas DataFrame

    Returns:
    - DataFrame with one row per column: current usage and suggested
      optimizations ('None' when nothing is suggested)
    """
    # String values (lower-cased) that are treated as boolean flags
    bool_patterns = {'yes', 'no', 'true', 'false', '1', '0', 'y', 'n'}
    date_patterns = [
        r'\d{4}-\d{2}-\d{2}',  # YYYY-MM-DD
        r'\d{2}/\d{2}/\d{4}',  # MM/DD/YYYY
        r'\d{2}-\d{2}-\d{4}',  # MM-DD-YYYY
    ]

    n_rows = len(df)
    analysis = []

    for col in df.columns:
        series = df[col]
        n_unique = series.nunique()
        col_info = {
            'Column': col,
            'Current_Type': str(series.dtype),
            'Memory_Usage_MB': series.memory_usage(deep=True) / 1024**2,
            'Null_Count': series.isnull().sum(),
            'Unique_Values': n_unique,
            # A zero-row frame has no meaningful ratio; report 0.0 instead of
            # dividing by zero.
            'Cardinality_Ratio': n_unique / n_rows if n_rows else 0.0,
        }

        suggestions = []

        # Numeric optimization suggestions
        if pd.api.types.is_numeric_dtype(series):
            if series.dtype == 'int64':
                # Smallest signed integer width that covers the value range
                min_val, max_val = series.min(), series.max()
                if min_val >= -128 and max_val <= 127:
                    suggestions.append('int8')
                elif min_val >= -32768 and max_val <= 32767:
                    suggestions.append('int16')
                elif min_val >= -2147483648 and max_val <= 2147483647:
                    suggestions.append('int32')
            elif series.dtype == 'float64':
                suggestions.append('float32')

        # Object (string-like) optimization suggestions
        elif series.dtype == 'object':
            text = series.dropna().astype(str)

            # Low cardinality; skipped for zero-row frames where the ratio
            # is only a placeholder
            if n_rows and col_info['Cardinality_Ratio'] < 0.05:
                suggestions.append('category')

            # Any value ending in '%'
            if text.str.contains(r'%$').any():
                suggestions.append('convert_percentage')

            # Exactly two distinct values, both recognised boolean literals
            unique_vals = set(text.str.lower())
            if unique_vals.issubset(bool_patterns) and len(unique_vals) == 2:
                suggestions.append('boolean')

            # Date-like text among the first 10 non-null values
            sample_vals = text.head(10)
            if any(sample_vals.str.contains(pattern).any() for pattern in date_patterns):
                suggestions.append('datetime')

        col_info['Suggested_Optimizations'] = ', '.join(suggestions) if suggestions else 'None'
        col_info['Potential_Memory_Savings'] = 'High' if suggestions else 'Low'

        analysis.append(col_info)

    analysis_df = pd.DataFrame(analysis)

    opportunities = sum(1 for row in analysis if row['Suggested_Optimizations'] != 'None')
    print("=== DATA TYPE ANALYSIS ===")
    print(f"Total memory usage: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
    print(f"Columns analyzed: {len(df.columns)}")
    print(f"Optimization opportunities: {opportunities}")

    return analysis_df

In [19]:
def optimize_numeric_types(df, downcast='infer'):
    """
    Downcast numeric columns to smaller types to save memory.

    Each numeric column is passed through pd.to_numeric with a downcast
    target; the result is kept only when it uses less memory than the
    original column. The input DataFrame is not modified.

    Parameters:
    - df: pandas DataFrame
    - downcast: 'infer' picks 'integer' for integer columns and 'float' for
      float columns (other numeric dtypes are left alone); any other value
      ('integer', 'signed', 'unsigned', 'float') is passed to pd.to_numeric
      for every numeric column

    Returns:
    - DataFrame with optimized numeric types
    """
    result_df = df.copy()
    optimized_cols = []
    memory_savings = 0

    for col in result_df.columns:
        series = result_df[col]
        if not pd.api.types.is_numeric_dtype(series):
            continue

        if downcast == 'infer':
            if pd.api.types.is_integer_dtype(series):
                target = 'integer'
            elif pd.api.types.is_float_dtype(series):
                target = 'float'
            else:
                # Numeric but neither integer nor float (e.g. bool): nothing to do
                continue
        else:
            target = downcast

        try:
            candidate = pd.to_numeric(series, downcast=target)
        except (ValueError, OverflowError):
            # Conversion failed; keep the column as it is
            continue

        original_memory = series.memory_usage(deep=True)
        new_memory = candidate.memory_usage(deep=True)

        # Commit the candidate only when it is actually smaller
        if new_memory < original_memory:
            result_df[col] = candidate
            memory_savings += original_memory - new_memory
            optimized_cols.append({
                'column': col,
                'original_type': series.dtype,
                'new_type': candidate.dtype,
                'memory_saved_mb': (original_memory - new_memory) / 1024**2
            })

    print("=== NUMERIC TYPE OPTIMIZATION ===")
    print(f"Columns optimized: {len(optimized_cols)}")
    print(f"Total memory saved: {memory_savings / 1024**2:.2f} MB")

    if optimized_cols:
        # "\n" (not "\\n") so a blank line is printed rather than a literal backslash-n
        print("\nOptimization details:")
        for opt in optimized_cols:
            print(f"  {opt['column']}: {opt['original_type']} → {opt['new_type']} (saved {opt['memory_saved_mb']:.2f} MB)")

    return result_df

In [20]:
def parse_dates_auto(df, columns=None):
    """
    Auto-detect and parse date columns with multiple format detection.

    Each candidate column is first parsed with pandas' own format inference;
    if that does not succeed, a fixed list of explicit formats is tried and
    the best one is used. A column is converted only when more than 80% of
    its non-null values parse; values that do not parse become NaT in the
    converted column. The input DataFrame is not modified.

    Parameters:
    - df: pandas DataFrame
    - columns: list of column names to check (None = all object columns).
      Names not present in df, columns that are already datetime, and
      numeric/bool columns are skipped.

    Returns:
    - DataFrame with converted datetime columns
    """
    result_df = df.copy()
    converted_cols = []

    if columns is None:
        columns = result_df.select_dtypes(include=['object']).columns.tolist()

    # Common date formats to try
    date_formats = [
        '%Y-%m-%d',           # 2023-12-31
        '%m/%d/%Y',           # 12/31/2023
        '%m-%d-%Y',           # 12-31-2023
        '%d/%m/%Y',           # 31/12/2023
        '%d-%m-%Y',           # 31-12-2023
        '%Y/%m/%d',           # 2023/12/31
        '%Y-%m-%d %H:%M:%S',  # 2023-12-31 23:59:59
        '%m/%d/%Y %H:%M:%S',  # 12/31/2023 23:59:59
        '%Y-%m-%d %H:%M',     # 2023-12-31 23:59
        '%m/%d/%Y %H:%M',     # 12/31/2023 23:59
    ]
    min_success = 0.8  # share of non-null values that must parse

    for col in columns:
        if col not in result_df.columns:
            continue

        series = result_df[col]

        # Skip if already datetime
        if pd.api.types.is_datetime64_any_dtype(series):
            continue

        # Numbers and bools are read by to_datetime as offsets from the epoch,
        # so every value "parses" and the column would be silently turned
        # into 1970 timestamps.
        if pd.api.types.is_numeric_dtype(series):
            continue

        n_values = int(series.notna().sum())
        if n_values == 0:
            continue

        # Try pandas automatic parsing first. Null inputs stay NaT, so the
        # count of parsed values only reflects non-null inputs.
        try:
            parsed = pd.to_datetime(series, errors='coerce')
        except Exception:
            # Fall through to the explicit formats below
            parsed = None

        if parsed is not None:
            success_rate = parsed.notna().sum() / n_values
            if success_rate > min_success:
                result_df[col] = parsed
                converted_cols.append({
                    'column': col,
                    'method': 'auto_infer',
                    'success_rate': success_rate
                })
                continue

        # Try specific formats and remember the best-scoring one
        best_format = None
        best_parsed = None
        best_success_rate = 0

        for fmt in date_formats:
            try:
                candidate = pd.to_datetime(series, format=fmt, errors='coerce')
            except Exception:
                continue

            success_rate = candidate.notna().sum() / n_values
            if success_rate > best_success_rate and success_rate > min_success:
                best_format = fmt
                best_parsed = candidate
                best_success_rate = success_rate

        # Apply best format if found
        if best_format is not None:
            result_df[col] = best_parsed
            converted_cols.append({
                'column': col,
                'method': f'format: {best_format}',
                'success_rate': best_success_rate
            })

    print("=== DATE PARSING RESULTS ===")
    print(f"Columns processed: {len(columns)}")
    print(f"Columns converted: {len(converted_cols)}")

    if converted_cols:
        # "\n" (not "\\n") so a blank line is printed rather than a literal backslash-n
        print("\nConversion details:")
        for conv in converted_cols:
            print(f"  {conv['column']}: {conv['method']} (success: {conv['success_rate']:.1%})")

    return result_df

In [21]:
def convert_percentages(df, columns=None):
    """
    Convert percentage strings ("50%") to float values (0.5).

    A column is treated as a percentage column when more than 50% of its
    non-null values end with '%' (surrounding whitespace is ignored). It is
    converted only when more than 80% of the non-null values turn into
    numbers. Every value in a converted column is divided by 100, including
    numeric strings that carried no '%' sign; values that are not numeric
    become NaN. The input DataFrame is not modified.

    Parameters:
    - df: pandas DataFrame
    - columns: list of column names to check (None = all object columns)

    Returns:
    - DataFrame with converted percentage columns
    """
    result_df = df.copy()
    converted_cols = []

    if columns is None:
        columns = result_df.select_dtypes(include=['object']).columns.tolist()

    for col in columns:
        if col not in result_df.columns:
            continue

        series = result_df[col]
        non_null = series.notna()
        n_values = int(non_null.sum())
        if n_values == 0:
            continue

        # Strip whitespace so values such as "50% " or " 50%" are recognised
        text = series.astype(str).str.strip()

        # Share of non-null values that end with %
        pct_count = text[non_null].str.endswith('%').sum()
        pct_ratio = pct_count / n_values

        if pct_ratio <= 0.5:  # Not predominantly percentages
            continue

        try:
            # Remove % (and any space left before it) and convert to float
            stripped = text.str.replace('%', '', regex=False).str.strip()
            converted_values = pd.to_numeric(stripped, errors='coerce') / 100
        except (TypeError, ValueError):
            continue

        # Originally-null cells must stay null whatever their string form was
        converted_values = converted_values.where(non_null)

        # Check conversion success rate
        success_rate = converted_values.notna().sum() / n_values

        if success_rate > 0.8:  # 80% success rate
            result_df[col] = converted_values
            converted_cols.append({
                'column': col,
                'percentage_ratio': pct_ratio,
                'success_rate': success_rate
            })

    print("=== PERCENTAGE CONVERSION RESULTS ===")
    print(f"Columns processed: {len(columns)}")
    print(f"Columns converted: {len(converted_cols)}")

    if converted_cols:
        # "\n" (not "\\n") so a blank line is printed rather than a literal backslash-n
        print("\nConversion details:")
        for conv in converted_cols:
            print(f"  {conv['column']}: {conv['percentage_ratio']:.1%} had % symbols (success: {conv['success_rate']:.1%})")

    return result_df

In [22]:
def categorize_low_cardinality(df, threshold=0.05):
    """
    Convert low-cardinality object columns to category type for memory savings.

    An object column is converted when its cardinality ratio is at or below
    the threshold AND the category version uses less memory; otherwise it is
    left unchanged. Columns whose values cannot be hashed (e.g. lists) are
    skipped. The input DataFrame is not modified.

    Parameters:
    - df: pandas DataFrame
    - threshold: cardinality ratio threshold (unique values / total values)

    Returns:
    - DataFrame with categorical columns converted
    """
    result_df = df.copy()
    converted_cols = []

    object_cols = result_df.select_dtypes(include=['object']).columns.tolist()
    total_vals = len(result_df)

    for col in object_cols:
        # A zero-row frame has no cardinality ratio (and nothing to save)
        if total_vals == 0:
            break

        series = result_df[col]
        try:
            unique_vals = series.nunique()
        except TypeError:
            # Unhashable cell values cannot be counted or made categorical
            continue

        cardinality_ratio = unique_vals / total_vals
        if cardinality_ratio > threshold:
            continue

        # Build the categorical candidate and commit it only if it is smaller
        candidate = series.astype('category')
        memory_saved = series.memory_usage(deep=True) - candidate.memory_usage(deep=True)
        if memory_saved > 0:
            result_df[col] = candidate
            converted_cols.append({
                'column': col,
                'unique_values': unique_vals,
                'cardinality_ratio': cardinality_ratio,
                'memory_saved_mb': memory_saved / 1024**2
            })

    print("=== CATEGORICAL CONVERSION RESULTS ===")
    print(f"Object columns processed: {len(object_cols)}")
    print(f"Columns converted to category: {len(converted_cols)}")

    total_memory_saved = sum(conv['memory_saved_mb'] for conv in converted_cols)
    print(f"Total memory saved: {total_memory_saved:.2f} MB")

    if converted_cols:
        # "\n" (not "\\n") so a blank line is printed rather than a literal backslash-n
        print("\nConversion details:")
        for conv in converted_cols:
            print(f"  {conv['column']}: {conv['unique_values']} unique values "
                  f"({conv['cardinality_ratio']:.1%} cardinality, saved {conv['memory_saved_mb']:.2f} MB)")

    return result_df

In [23]:
def detect_boolean_columns(df):
    """
    Find columns that should be boolean and convert them.

    A column matches when its distinct non-null values (compared as
    lower-cased, stripped strings) all belong to one of the known pairs:
    yes/no, true/false, y/n, 1/0, on/off, active/inactive,
    enabled/disabled. Matching columns without nulls become dtype bool;
    matching columns with nulls become pandas' nullable 'boolean' dtype so
    missing values are kept. The input DataFrame is not modified.

    Parameters:
    - df: pandas DataFrame

    Returns:
    - DataFrame with boolean columns converted
    """
    result_df = df.copy()
    converted_cols = []

    # Pattern name -> mapping from normalised text to its boolean value.
    # Order matters: the first pattern that covers the column wins.
    boolean_patterns = {
        'yes_no': {'yes': True, 'no': False},
        'true_false': {'true': True, 'false': False},
        'y_n': {'y': True, 'n': False},
        'one_zero_str': {'1': True, '0': False},
        'on_off': {'on': True, 'off': False},
        'active_inactive': {'active': True, 'inactive': False},
        'enabled_disabled': {'enabled': True, 'disabled': False}
    }

    for col in result_df.columns:
        series = result_df[col]

        # Skip if already boolean (plain or nullable)
        if str(series.dtype) in ('bool', 'boolean'):
            continue

        non_null = series.notna()
        normalized = series.astype(str).str.lower().str.strip()

        # Unique non-null values as lowercase strings
        unique_vals = set(normalized[non_null])
        if not unique_vals:
            continue

        # First pattern whose two literals cover every observed value
        pattern_matched = next(
            (name for name, mapping in boolean_patterns.items()
             if unique_vals.issubset(mapping)),
            None,
        )
        if pattern_matched is None:
            continue

        try:
            original_memory = series.memory_usage(deep=True)
            mapped = normalized.map(boolean_patterns[pattern_matched])

            if non_null.all():
                mapped = mapped.astype(bool)
            else:
                # Nullable dtype keeps missing values instead of leaving an
                # object column of True/False/NaN
                mapped = mapped.where(non_null).astype('boolean')

            result_df[col] = mapped
            new_memory = mapped.memory_usage(deep=True)

            converted_cols.append({
                'column': col,
                'pattern': pattern_matched,
                'unique_values': list(unique_vals),
                'memory_saved_mb': (original_memory - new_memory) / 1024**2
            })

        except Exception:
            # Revert if conversion fails
            result_df[col] = df[col]

    print("=== BOOLEAN DETECTION RESULTS ===")
    print(f"Columns processed: {len(result_df.columns)}")
    print(f"Boolean columns detected and converted: {len(converted_cols)}")

    total_memory_saved = sum(conv['memory_saved_mb'] for conv in converted_cols)
    print(f"Total memory saved: {total_memory_saved:.2f} MB")

    if converted_cols:
        # "\n" (not "\\n") so a blank line is printed rather than a literal backslash-n
        print("\nConversion details:")
        for conv in converted_cols:
            print(f"  {conv['column']}: {conv['pattern']} pattern ({conv['unique_values']}) "
                  f"(saved {conv['memory_saved_mb']:.2f} MB)")

    return result_df

In [24]:
def memory_usage_comparison(df_before, df_after):
    """
    Show memory savings comparison between two DataFrames.

    Prints overall totals, the five columns with the largest savings and a
    count of dtype changes, and returns the per-column figures. Only columns
    present in both DataFrames appear in the per-column table.

    Parameters:
    - df_before: original DataFrame
    - df_after: optimized DataFrame

    Returns:
    - DataFrame with memory comparison details (one row per shared column:
      Column, Type_Before, Type_After, Memory_Before_MB, Memory_After_MB,
      Memory_Saved_MB, Savings_Percent)
    """
    comparison = []

    # Overall memory comparison
    memory_before = df_before.memory_usage(deep=True).sum() / 1024**2
    memory_after = df_after.memory_usage(deep=True).sum() / 1024**2
    memory_saved = memory_before - memory_after
    savings_pct = (memory_saved / memory_before) * 100 if memory_before > 0 else 0

    print("=== MEMORY USAGE COMPARISON ===")
    print(f"Memory before optimization: {memory_before:.2f} MB")
    print(f"Memory after optimization: {memory_after:.2f} MB")
    print(f"Total memory saved: {memory_saved:.2f} MB ({savings_pct:.1f}%)")
    print(f"Data shape: {df_after.shape}")

    # Column-by-column comparison
    for col in df_before.columns:
        if col not in df_after.columns:
            continue

        mem_before = df_before[col].memory_usage(deep=True) / 1024**2
        mem_after = df_after[col].memory_usage(deep=True) / 1024**2
        mem_saved = mem_before - mem_after

        comparison.append({
            'Column': col,
            'Type_Before': str(df_before[col].dtype),
            'Type_After': str(df_after[col].dtype),
            'Memory_Before_MB': mem_before,
            'Memory_After_MB': mem_after,
            'Memory_Saved_MB': mem_saved,
            'Savings_Percent': (mem_saved / mem_before) * 100 if mem_before > 0 else 0
        })

    comparison_df = pd.DataFrame(comparison)

    if len(comparison_df) > 0:
        # Show top memory savers
        top_savers = comparison_df[comparison_df['Memory_Saved_MB'] > 0].nlargest(5, 'Memory_Saved_MB')

        if len(top_savers) > 0:
            # "\n" (not "\\n") so a blank line is printed rather than a literal backslash-n
            print("\nTop memory saving columns:")
            for _, row in top_savers.iterrows():
                print(f"  {row['Column']}: {row['Type_Before']} → {row['Type_After']} "
                      f"(saved {row['Memory_Saved_MB']:.2f} MB, {row['Savings_Percent']:.1f}%)")

        # Summary by data type changes
        type_changes = comparison_df[comparison_df['Type_Before'] != comparison_df['Type_After']]
        if len(type_changes) > 0:
            print(f"\nType conversions: {len(type_changes)} columns")
            change_summary = type_changes.groupby(['Type_Before', 'Type_After']).size().reset_index(name='Count')
            for _, row in change_summary.iterrows():
                print(f"  {row['Type_Before']} → {row['Type_After']}: {row['Count']} columns")

    return comparison_df

In [25]:
# Prompt text describing the helper functions; data_types() sends it to the
# LLM as the first system message. This is runtime text, so any edit changes
# what the model is told.
helper_docs = """ Helper functions available:
- analyze_data_types(df): Analyze current types and suggest optimizations. Returns DataFrame with analysis.
- optimize_numeric_types(df, downcast='infer'): Downcast int64→int32, float64→float32 to save memory. Returns optimized DataFrame.
- parse_dates_auto(df, columns=None): Auto-detect and parse date columns with multiple format detection. Returns DataFrame with datetime columns.
- convert_percentages(df, columns=None): Convert "50%" strings to 0.5 floats. Returns DataFrame with converted columns.
- categorize_low_cardinality(df, threshold=0.05): Convert low-cardinality objects to category type. Returns DataFrame with categorical columns.
- detect_boolean_columns(df): Find and convert columns that should be boolean (Yes/No, True/False, 1/0 patterns). Returns DataFrame with boolean columns.
- memory_usage_comparison(df_before, df_after): Show memory savings between DataFrames. Returns comparison DataFrame.

Examples:
- "Analyze data types" -> analysis_df = analyze_data_types(df)
- "Optimize numeric types" -> df = optimize_numeric_types(df)
- "Convert dates automatically" -> df = parse_dates_auto(df)
- "Fix percentage columns" -> df = convert_percentages(df)
- "Convert to categories" -> df = categorize_low_cardinality(df)
- "Detect boolean columns" -> df = detect_boolean_columns(df)
- "Compare memory usage" -> comparison = memory_usage_comparison(original_df, df)
"""

# **MAIN FEATURE FUNCTION**

In [26]:
def _strip_code_fences(text):
    """Return text without a surrounding Markdown code fence, if one is present."""
    code = text.strip()
    if code.startswith("```"):
        lines = code.splitlines()[1:]  # drop the opening fence (and any language tag)
        if lines and lines[-1].strip().startswith("```"):
            lines = lines[:-1]
        code = "\n".join(lines).strip()
    return code


def data_types(df, user_query):
    """
    Main function that gets called by the main router.
    MUST take (df, user_query) and return df

    Sends the helper documentation, a description of the dataset and the
    user's request to the LLM, executes the Python code it returns and hands
    back the resulting `df`.

    Parameters:
    - df: pandas DataFrame (never modified; the generated code works on a copy)
    - user_query: natural-language request from the user

    Returns:
    - The DataFrame bound to `df` after the generated code ran, or an
      unmodified copy of the input when the code raises or does not leave a
      DataFrame in `df`
    """

    # Create message chain
    messages = [
        SystemMessage(content=helper_docs),
        SystemMessage(content=f"""
    You are a data cleaning agent focused on data type optimization and memory management.
    
    Dataset info: Shape: {df.shape}, Sample: {df.head(3).to_string()}

    Libraries available:
    - pd (pandas), np (numpy)
    - math, re, datetime
    - sklearn.preprocessing
    - All helper functions listed above
    
    Rules:
    - Return only executable Python code, no explanations, no markdown blocks
    - Use helper functions when appropriate for data type optimization tasks
    - ASSUME "df" IS ALREADY DEFINED
    - For analysis queries, use helper functions that print results (analyze_data_types, memory_usage_comparison)
    - For optimization, use helper functions that modify DataFrame (optimize_numeric_types, parse_dates_auto, etc.)
    - ALWAYS assign the result back to df when modifying: df = optimize_numeric_types(df)
    - In order to generate a response/message to the user use print statements
    print("message")
    - Write a detailed print message to summarise actions taken and reasons
    
    Common query patterns:
    - "Analyze data types" or "Check data types" -> analysis_df = analyze_data_types(df)
    - "Optimize data types" or "Save memory" -> df = optimize_numeric_types(df); df = categorize_low_cardinality(df)
    - "Convert dates" or "Parse dates" -> df = parse_dates_auto(df)
    - "Fix percentage columns" -> df = convert_percentages(df)
    - "Convert to categories" -> df = categorize_low_cardinality(df)
    - "Detect boolean columns" -> df = detect_boolean_columns(df)
    - "Compare memory usage" -> original_df = df.copy(); [optimizations]; comparison = memory_usage_comparison(original_df, df)
    """),
        HumanMessage(content=f"User request: {user_query}"),
    ]

    # Call LLM with message chain
    llm = ChatOpenAI(temperature=0, model_name="gpt-4o-mini")
    response = llm.invoke(messages)
    # The model sometimes wraps its answer in ``` fences despite the rules
    # above; exec() would reject those with a SyntaxError.
    generated_code = _strip_code_fences(response.content)

    original_df = df.copy()

    # One namespace for both globals and locals. With separate dicts exec()
    # runs the code as if it were a class body, so functions, lambdas and
    # comprehensions in the generated code could not see `df` or the helpers.
    # Copying globals() keeps the module-level imports (math, re, datetime, ...)
    # visible without letting the generated code rebind this module's names.
    namespace = dict(globals())
    namespace.update({
        'df': df.copy(),
        'original_df': original_df,
        'pd': pd,
        'np': np,
        'optimize_numeric_types': optimize_numeric_types,
        'categorize_low_cardinality': categorize_low_cardinality,
        'parse_dates_auto': parse_dates_auto,
        'convert_percentages': convert_percentages,
        'detect_boolean_columns': detect_boolean_columns,
        'analyze_data_types': analyze_data_types,
        'memory_usage_comparison': memory_usage_comparison,
    })

    # Execute code
    try:
        # SECURITY: this executes model-generated code with full interpreter
        # privileges. user_query and the data sample both influence that code,
        # so only run this against trusted input or inside a sandbox.
        exec(generated_code, namespace)
        result = namespace['df']
        if not isinstance(result, pd.DataFrame):
            raise TypeError(f"generated code left df as {type(result).__name__}, expected DataFrame")
        return result
    except Exception as e:
        print(f"Error: {e}")
        print(f"Generated Code:{generated_code}")
        return original_df

# **Testing**

In [None]:
# # Create sample data with various data type optimization opportunities
# test_data = {
#     'id': range(1, 101),  # Can be downcasted from int64 to int8
#     'score': [float(x) for x in range(1, 101)],  # Can be downcasted from float64 to float32
#     'category': ['A', 'B', 'C', 'A', 'B'] * 20,  # Low cardinality - can be categorical
#     'active': ['Yes', 'No', 'Yes', 'No', 'Yes'] * 20,  # Boolean pattern
#     'percentage': ['85%', '92%', '78%', '95%', '88%'] * 20,  # Percentage strings
#     'date_string': ['2023-01-15', '2023-02-20', '2023-03-10'] * 33 + ['2023-04-05'],  # Date strings
#     'large_text': ['This is some text content'] * 100,  # High cardinality text
# }

# test_df = pd.DataFrame(test_data)
# print("Test DataFrame created:")
# print(f"Shape: {test_df.shape}")
# print("\\nData types:")
# print(test_df.dtypes)
# print("\\nMemory usage:")
# print(f"Total: {test_df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

In [None]:
# query = "Optimize data types to save memory"
# result = data_types(test_df, query)

In [None]:
# result.info()

In [None]:
# test_df.info()