In [1]:
import pandas as pd

In [None]:
import pandas as pd
import numpy as np

def create_automated_dtype_mapping(data_dict_path_or_df):
    """
    Derive a {column name: dtype string} mapping from a data dictionary.

    The dictionary is either a CSV path (read with pandas) or a DataFrame
    (copied, never modified). It must provide the columns 'masked_column',
    'Type' and 'Description'. The 'Type' value selects the dtype directly,
    except for 'Numerical' rows, where the first keyword group found in the
    lower-cased description decides; unrecognised types map to 'object'.
    """
    if isinstance(data_dict_path_or_df, str):
        dictionary = pd.read_csv(data_dict_path_or_df)
    else:
        dictionary = data_dict_path_or_df.copy()

    # Types whose dtype does not depend on the description.
    # 'Label' is assumed to be a binary target; adjust if needed.
    direct_dtypes = {
        'Key': 'object',
        'Categorical': 'category',
        'Label': 'int8',
    }

    # Ordered rules for 'Numerical' columns: the first group with a keyword
    # contained in the description wins.
    numerical_rules = (
        (('whether', 'binary', 'flag', 'indicator'), 'int8'),   # 0/1 features
        (('count', 'number of', 'quantity'), 'int32'),          # counts
        (('score', 'rate', 'ratio'), 'float32'),                # scores and rates
        (('timestamp', 'time'), 'int64'),                       # or 'datetime64[ns]'
        (('date',), 'object'),                                  # convert to datetime later
    )

    def pick_numerical_dtype(text):
        for keywords, dtype in numerical_rules:
            if any(keyword in text for keyword in keywords):
                return dtype
        return 'float32'  # default numerical

    mapping = {}
    for _, entry in dictionary.iterrows():
        name = entry['masked_column']
        kind = entry['Type']
        text = str(entry['Description']).lower()

        if kind == 'Numerical':
            mapping[name] = pick_numerical_dtype(text)
        else:
            mapping[name] = direct_dtypes.get(kind, 'object')

    return mapping

def load_and_convert_parquet(parquet_path, data_dict_path, sample_first=True, nan_strategy='nullable', sample_size=1000):
    """
    Load a parquet file and convert its dtypes based on the data dictionary,
    with NaN handling.

    Parameters:
    - parquet_path: path of the parquet file to load
    - data_dict_path: data dictionary (CSV path or DataFrame) handed to
      create_automated_dtype_mapping
    - sample_first: when True, analyse NaNs and dry-run the conversions on a
      sample, adjusting the mapping before the full conversion
    - nan_strategy: 'nullable', 'float', 'fill_zero', 'fill_mode', or
      'analyze' to pick a strategy from the sample
    - sample_size: number of rows to sample for testing (if sample_first=True)

    Returns:
    - (df, dtype_mapping): the converted DataFrame and the mapping that was
      finally applied
    """
    # Create dtype mapping
    dtype_mapping = create_automated_dtype_mapping(data_dict_path)

    print(f"Created dtype mapping for {len(dtype_mapping)} columns")

    # Only set when the pandas fallback had to load everything for sampling;
    # reused below so the file is not read twice.
    df_full = None

    # Sample first to check for issues (optional)
    if sample_first:
        print(f"Sampling {sample_size} rows to check for conversion issues...")

        # PyArrow can read a single row group instead of the whole file
        try:
            import pyarrow.parquet as pq
        except ImportError:
            pq = None

        if pq is not None:
            parquet_file = pq.ParquetFile(parquet_path)
            if parquet_file.num_row_groups > 0:
                first_batch = parquet_file.read_row_group(0)
            else:
                # A file without row groups has no group 0; read the
                # (empty) table so the schema is still available.
                first_batch = parquet_file.read()
            df_sample = first_batch.to_pandas().head(sample_size)
            total_rows = parquet_file.metadata.num_rows
        else:
            # Fallback to pandas if PyArrow not available
            print("PyArrow not available, loading full dataset for sampling...")
            df_full = pd.read_parquet(parquet_path)
            df_sample = df_full.head(sample_size)
            total_rows = len(df_full)
        print(f"Dataset has {total_rows:,} total rows, using {len(df_sample)} for testing")

        # Analyze NaN impact
        nan_analysis = analyze_nan_impact(df_sample, dtype_mapping)

        if nan_strategy == 'analyze':
            nan_strategy = recommend_nan_strategy(nan_analysis)
            print(f"Recommended NaN strategy: {nan_strategy}")

        # Test conversions on sample
        conversion_issues = test_dtype_conversions(df_sample, dtype_mapping)
        if conversion_issues:
            print("Found conversion issues:")
            for col, issue in conversion_issues.items():
                print(f"  {col}: {issue}")

            # Adjust mapping based on issues
            dtype_mapping = fix_conversion_issues(df_sample, dtype_mapping, conversion_issues)
    elif nan_strategy == 'analyze':
        print("NaN analysis requires sample_first=True; using 'nullable'")

    # 'analyze' (sampling skipped) and 'no_action_needed' (no problematic
    # column in the sample) are not conversion strategies. The full dataset
    # may still contain NaNs the sample did not show, so fall back to the
    # lossless nullable-integer handling.
    if nan_strategy in ('analyze', 'no_action_needed'):
        nan_strategy = 'nullable'

    # Load full dataset
    print("Loading full dataset...")
    if df_full is not None:
        df = df_full  # Already loaded for sampling
    else:
        df = pd.read_parquet(parquet_path)

    # Convert dtypes with NaN handling
    df = safe_convert_all_dtypes(df, dtype_mapping, nan_strategy)

    return df, dtype_mapping

def test_dtype_conversions(df_sample, dtype_mapping):
    """
    Dry-run the dtype conversions on a sample to identify issues.

    Parameters:
    - df_sample: DataFrame holding a sample of the data (not modified)
    - dtype_mapping: {column name: target dtype string}; columns missing
      from the sample are skipped

    Returns:
    - dict {column name: error message} for every column whose values cannot
      be converted to the target dtype

    Missing values are ignored when checking integer targets: NaNs are dealt
    with by the NaN strategy at conversion time, so they must not be reported
    as conversion failures here.
    """
    integer_dtypes = ('int8', 'int16', 'int32', 'int64')
    float_dtypes = ('float16', 'float32', 'float64')
    issues = {}

    for col, target_dtype in dtype_mapping.items():
        if col not in df_sample.columns:
            continue

        try:
            if target_dtype in integer_dtypes or target_dtype in float_dtypes:
                numeric = pd.to_numeric(df_sample[col], errors='raise')
                if target_dtype in integer_dtypes:
                    # A plain integer cast raises on NaN; only the real
                    # values are relevant for this check.
                    numeric = numeric.dropna()
                numeric.astype(target_dtype)
            elif target_dtype == 'category':
                df_sample[col].astype('category')
        except Exception as e:
            # Any failure is reported per column instead of aborting the scan
            issues[col] = str(e)

    return issues


# Despite its name this is a data-validation helper, not a unit test;
# opt out of pytest collection, which would otherwise pick up the prefix.
test_dtype_conversions.__test__ = False

def fix_conversion_issues(df_sample, dtype_mapping, issues):
    """
    Automatically fix common conversion issues.

    Parameters:
    - df_sample: sample DataFrame the issues were detected on
    - dtype_mapping: {column name: target dtype string}; left untouched
    - issues: {column name: error message} describing failed conversions

    Returns:
    - a copy of dtype_mapping where columns with a "cannot convert" or
      "invalid literal" issue are remapped to 'float32' if their values can
      be read as numbers, otherwise to 'object'
    """
    fixed_mapping = dtype_mapping.copy()

    for col, issue in issues.items():
        if col not in df_sample.columns:
            # Nothing to inspect for a column the sample does not contain
            continue

        print(f"Fixing {col}: {issue}")

        # Check actual data in problematic column
        sample_values = df_sample[col].dropna().head(10)
        print(f"Sample values: {sample_values.tolist()}")

        # Common fixes
        message = issue.lower()
        if "cannot convert" in message or "invalid literal" in message:
            # errors='coerce' turns unparseable values into NaN instead of
            # raising, so whether float is viable has to be read from the
            # result rather than from an exception.
            try:
                coerced = pd.to_numeric(df_sample[col], errors='coerce')
            except (TypeError, ValueError):
                coerced = None

            non_null = int(df_sample[col].notna().sum())
            parsed = 0 if coerced is None else int(coerced.notna().sum())

            if coerced is not None and (non_null == 0 or parsed > 0):
                fixed_mapping[col] = 'float32'
                print(f"  -> Changed to float32")
            else:
                # None of the present values is numeric
                fixed_mapping[col] = 'object'
                print(f"  -> Keeping as object")

    return fixed_mapping

def _convert_integer_series(series, col, target_dtype, strategy):
    """Return series cast towards the integer target_dtype, handling NaNs per strategy."""
    # Smallest float dtype that represents every value of the integer dtype
    # exactly (int64 has no such float; float64 is the closest available).
    lossless_float = {
        'int8': 'float16',
        'int16': 'float32',
        'int32': 'float64',
        'int64': 'float64',
    }

    numeric = pd.to_numeric(series, errors='coerce')
    # Counted after coercion so unparseable values are included
    missing = int(numeric.isna().sum())

    if missing == 0:
        return numeric.astype(target_dtype)

    if strategy == 'float':
        # Convert to float instead (can handle NaNs)
        float_type = lossless_float[target_dtype]
        converted = numeric.astype(float_type)
        print(f"  {col}: Converted to {float_type} due to {missing} NaNs")
    elif strategy == 'fill_zero':
        converted = numeric.fillna(0).astype(target_dtype)
        print(f"  {col}: Filled {missing} NaNs with 0")
    elif strategy == 'fill_mode':
        # Fill with mode (most common value); 0 if the column is all-NaN
        modes = numeric.mode()
        mode_val = modes.iloc[0] if not modes.empty else 0
        converted = numeric.fillna(mode_val).astype(target_dtype)
        print(f"  {col}: Filled {missing} NaNs with mode value {mode_val}")
    else:
        # 'nullable': pandas nullable integer types (Int8, Int32, ...)
        nullable_type = target_dtype.replace('int', 'Int')
        converted = numeric.astype(nullable_type)
        print(f"  {col}: Using nullable {nullable_type} due to {missing} NaNs")

    return converted


def safe_convert_all_dtypes(df, dtype_mapping, handle_nans_strategy='nullable'):
    """
    Safely convert all mapped columns with comprehensive NaN handling.

    Parameters:
    - df: DataFrame to convert; its columns are replaced in place
    - dtype_mapping: {column name: target dtype string}; columns missing
      from df are skipped
    - handle_nans_strategy: 'nullable', 'float', 'fill_zero', 'fill_mode'.
      Any other value falls back to 'nullable' so that integer columns with
      NaNs are never silently left unconverted.

    Returns:
    - the same DataFrame object. A column whose conversion fails keeps its
      original values and the failure is printed.
    """
    print("Converting dtypes safely with NaN handling...")

    integer_dtypes = ('int8', 'int16', 'int32', 'int64')
    float_dtypes = ('float16', 'float32', 'float64')

    if handle_nans_strategy not in ('nullable', 'float', 'fill_zero', 'fill_mode'):
        print(f"NaN strategy '{handle_nans_strategy}' is not a conversion strategy, using 'nullable'")
        handle_nans_strategy = 'nullable'

    nan_report = {}
    failed_cols = []
    skipped_cols = 0

    for col, target_dtype in dtype_mapping.items():
        if col not in df.columns:
            skipped_cols += 1
            continue

        nan_count = df[col].isna().sum()
        nan_report[col] = nan_count

        try:
            if target_dtype == 'category':
                # Categories can handle NaNs naturally
                converted = df[col].astype('category')
            elif target_dtype in integer_dtypes:
                # Plain integer types cannot hold NaNs - need special handling
                converted = _convert_integer_series(
                    df[col], col, target_dtype, handle_nans_strategy
                )
            elif target_dtype in float_dtypes:
                # Float types can handle NaNs naturally
                converted = pd.to_numeric(df[col], errors='coerce').astype(target_dtype)
                if nan_count > 0:
                    print(f"  {col}: Keeping {nan_count} NaN values in {target_dtype}")
            else:
                # Object and other types
                converted = df[col].astype(target_dtype)
        except Exception as e:
            # Keep going; the column is left exactly as it was
            failed_cols.append(col)
            print(f"Failed to convert {col}: {e}")
        else:
            # Assign only after the whole conversion succeeded so a failure
            # cannot leave a half-converted column behind
            df[col] = converted

    # Summary of NaN handling
    print(f"\nNaN Summary:")
    total_nans = sum(nan_report.values())
    cols_with_nans = sum(1 for count in nan_report.values() if count > 0)
    print(f"Total NaN values: {total_nans:,}")
    print(f"Columns with NaNs: {cols_with_nans}/{len(nan_report)}")
    if skipped_cols:
        print(f"Mapped columns not present in the data: {skipped_cols}")
    if failed_cols:
        print(f"Failed conversions: {len(failed_cols)}")

    return df

def analyze_nan_impact(df, dtype_mapping):
    """
    Report, per mapped column present in df, how missing values interact
    with the target dtype.

    Returns a dict {column: {'nan_count', 'nan_percentage', 'target_dtype',
    'problematic'}} where 'problematic' marks columns that contain NaNs and
    target a plain integer dtype. A summary of those columns (first 10) is
    printed.
    """
    print("Analyzing NaN impact on dtype conversions...")

    integer_targets = ['int8', 'int16', 'int32', 'int64']
    report = {}

    for name, wanted in dtype_mapping.items():
        if name not in df.columns:
            continue

        missing = df[name].isna().sum()
        report[name] = {
            'nan_count': missing,
            'nan_percentage': (missing / len(df)) * 100,
            'target_dtype': wanted,
            # NaNs only matter for plain integer targets
            'problematic': missing > 0 and wanted in integer_targets,
        }

    flagged = [name for name, details in report.items() if details['problematic']]

    print(f"\nFound {len(flagged)} columns with NaNs targeting integer types:")
    preview, overflow = flagged[:10], len(flagged) - 10
    for name in preview:
        details = report[name]
        print(f"  {name}: {details['nan_count']} NaNs ({details['nan_percentage']:.1f}%) -> {details['target_dtype']}")

    if overflow > 0:
        print(f"  ... and {overflow} more")

    return report

def recommend_nan_strategy(nan_analysis):
    """
    Pick a NaN handling strategy from the per-column NaN analysis.

    Only columns flagged 'problematic' are considered. Returns
    'no_action_needed' when there are none, 'fill_zero' when more than 70%
    of them target int8 (and no more than half exceed 20% NaNs), and
    'nullable' in every other case.
    """
    flagged = {
        name: details
        for name, details in nan_analysis.items()
        if details['problematic']
    }

    if not flagged:
        return 'no_action_needed'

    # Tally the two groups the recommendation is based on
    high_nan_total = sum(
        1 for details in flagged.values() if details['nan_percentage'] > 20
    )
    binary_total = sum(
        1 for details in flagged.values() if details['target_dtype'] == 'int8'
    )

    print(f"\nRecommendations:")
    print(f"- {high_nan_total} columns have >20% NaNs - consider 'nullable' or 'float'")
    print(f"- {binary_total} binary columns with NaNs - consider 'fill_zero' or 'nullable'")

    flagged_total = len(flagged)
    if high_nan_total > flagged_total * 0.5:
        return 'nullable'  # Many high-NaN columns
    if binary_total > flagged_total * 0.7:
        return 'fill_zero'  # Mostly binary features
    return 'nullable'  # Safe default

def analyze_memory_savings(df_before, df_after):
    """
    Print the deep memory footprint (in MB) of two DataFrames and the
    absolute and relative difference between them.
    """
    bytes_per_mb = 1024**2

    def footprint_mb(frame):
        # deep=True also counts the contents of object columns
        return frame.memory_usage(deep=True).sum() / bytes_per_mb

    before_mb = footprint_mb(df_before)
    after_mb = footprint_mb(df_after)
    saved_mb = before_mb - after_mb

    print(f"\nMemory Analysis:")
    print(f"Before conversion: {before_mb:.2f} MB")
    print(f"After conversion:  {after_mb:.2f} MB")
    print(f"Memory saved:      {saved_mb:.2f} MB ({saved_mb/before_mb*100:.1f}%)")

# Usage examples with different NaN strategies
if __name__ == "__main__":
    # Inputs: the data dictionary drives the dtype mapping, the parquet file
    # holds the data to convert.
    data_dict_path = "dataset/data_dictionary.csv"
    parquet_path = "dataset/add_trans.parquet"

    # Let the loader analyse a sample and choose the NaN strategy itself.
    # To force a strategy, pass nan_strategy='nullable' (nullable integers),
    # 'float' (integers with NaNs become floats) or 'fill_zero' (NaNs
    # replaced by 0, suited to binary features) instead.
    df, dtype_mapping = load_and_convert_parquet(
        parquet_path,
        data_dict_path,
        nan_strategy='analyze',
    )

    # Overview of the resulting frame
    print("\nFinal dtypes:")
    print(df.dtypes.value_counts())

    print(f"\nDataset shape: {df.shape}")
    memory_mb = df.memory_usage(deep=True).sum() / 1024**2
    print(f"Memory usage: {memory_mb:.2f} MB")
    total_nans = df.isna().sum().sum()
    print(f"Total NaN values: {total_nans:,}")

    # Columns that still contain missing values after conversion
    has_nan = df.isna().any()
    cols_with_nans = df.columns[has_nan].tolist()
    print(f"Columns still with NaNs: {len(cols_with_nans)}")

    if cols_with_nans:
        print("Top 10 columns by NaN count:")
        per_column = df[cols_with_nans].isna().sum()
        nan_counts = per_column.sort_values(ascending=False)
        print(nan_counts.head(10))

    # Save the optimized dataset
    # df.to_parquet("optimized_data.parquet", index=False)

Created dtype mapping for 372 columns
Sampling 1000 rows to check for conversion issues...
Dataset has 6,339,465 total rows, using 1000 for testing
Analyzing NaN impact on dtype conversions...

Found 0 columns with NaNs targeting integer types:
Recommended NaN strategy: no_action_needed
Loading full dataset...
Converting dtypes safely with NaN handling...

NaN Summary:
Total NaN values: 0
Columns with NaNs: 0/1

Final dtypes:
object     8
float64    1
Name: count, dtype: int64

Dataset shape: (6339465, 9)
Memory usage: 2911.49 MB
Total NaN values: 1,304
Columns still with NaNs: 1
Top 10 columns by NaN count:
id8    1304
dtype: int64


In [3]:
# Write the converted DataFrame to the processed-data location as Parquet.
df.to_parquet('dataset/processed/add_trans_processed.parquet')