In [30]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/amex-cleaned-datasets/test_data.parquet
/kaggle/input/amex-cleaned-datasets/train_data_cleaned.parquet
/kaggle/input/amex-cleaned-datasets/add_events_features2.parquet
/kaggle/input/amex-cleaned-datasets/test_data_enhanced.parquet
/kaggle/input/amex-cleaned-datasets/add_events_features.parquet
/kaggle/input/amex-cleaned-datasets/train_data_capped_cleaned.parquet
/kaggle/input/amex-cleaned-datasets/trainmodeltraining.ipynb
/kaggle/input/amex-cleaned-datasets/add_trans.parquet
/kaggle/input/amex-cleaned-datasets/agg_trans_features.parquet
/kaggle/input/amex-cleaned-datasets/offer_clean.parquet
/kaggle/input/amex-cleaned-datasets/train_data_enhanced.parquet
/kaggle/input/amex-cleaned-datasets/train_data_merged.parquet
/kaggle/input/amex-cleaned-datasets/testdownloadkarnekeliye.ipynb
/kaggle/input/amex-cleaned-datasets/add_event_cleaned.parquet
/kaggle/input/amex-cleaned-datasets/offermetadata_features.parquet
/kaggle/input/amex-cleaned-datasets/cleaned_test_dataa.parquet
/ka

In [31]:
import kagglehub

# Correct handle
path = kagglehub.dataset_download("aryansachannn/amex-cleaned-datasets")

print("✅ Dataset mounted at:", path)




✅ Dataset mounted at: /kaggle/input/amex-cleaned-datasets


In [32]:
import pandas as pd
import numpy as np
from datetime import datetime
import warnings
import gc
warnings.filterwarnings('ignore')

def load_data():
    """Load all datasets with proper data type handling"""
    print("Loading datasets...")
    
    
    # ✅ Corrected paths
    train_data = pd.read_parquet('/kaggle/input/amex-cleaned-datasets/train_data_cleaned.parquet')
    test_data = pd.read_parquet('/kaggle/input/amex-cleaned-datasets/test_data_cleaned.parquet')  # ✅ fixed typo
    
    # Load additional datasets
    add_events = pd.read_parquet('/kaggle/input/amex-cleaned-datasets/add_event_cleaned.parquet')
    add_trans = pd.read_parquet('/kaggle/input/amex-cleaned-datasets/add_trans.parquet')
    offer_metadata = pd.read_parquet('/kaggle/input/amex-cleaned-datasets/offer_clean.parquet')

    
    # Convert id3 to consistent data type (int64) - Handle NaN values first
    train_data['id3'] = pd.to_numeric(train_data['id3'], errors='coerce').fillna(0).astype('int64')
    test_data['id3'] = pd.to_numeric(test_data['id3'], errors='coerce').fillna(0).astype('int64')
    offer_metadata['id3'] = pd.to_numeric(offer_metadata['id3'], errors='coerce').fillna(0).astype('int64')
    add_events['id3'] = pd.to_numeric(add_events['id3'], errors='coerce').fillna(0).astype('int64')
    
    # Convert id2 to consistent data type (int64) - Handle NaN values first
    train_data['id2'] = pd.to_numeric(train_data['id2'], errors='coerce').fillna(0).astype('int64')
    test_data['id2'] = pd.to_numeric(test_data['id2'], errors='coerce').fillna(0).astype('int64')
    add_events['id2'] = pd.to_numeric(add_events['id2'], errors='coerce').fillna(0).astype('int64')
    add_trans['id2'] = pd.to_numeric(add_trans['id2'], errors='coerce').fillna(0).astype('int64')
    
    print(f"Train data shape: {train_data.shape}")
    print(f"Test data shape: {test_data.shape}")
    print(f"Events data shape: {add_events.shape}")
    print(f"Transaction data shape: {add_trans.shape}")
    print(f"Offer metadata shape: {offer_metadata.shape}")
    
    # Force garbage collection after loading
    gc.collect()
    
    return train_data, test_data, add_events, add_trans, offer_metadata

def check_id_overlap(train_data, test_data, add_events, add_trans, offer_metadata):
    """Check for ID overlaps between datasets"""
    print("\nChecking ID overlaps...")
    
    # Use sample if datasets are too large
    train_sample = train_data.sample(n=min(100000, len(train_data)), random_state=42)
    
    train_id2 = set(train_sample['id2'].unique())
    train_id3 = set(train_sample['id3'].unique())
    
    events_id2 = set(add_events['id2'].dropna().unique())
    events_id3 = set(add_events['id3'].dropna().unique())
    
    trans_id2 = set(add_trans['id2'].dropna().unique())
    offers_id3 = set(offer_metadata['id3'].dropna().unique())
    
    overlaps = {
        'train_events_id2': len(train_id2.intersection(events_id2)),
        'train_events_id3': len(train_id3.intersection(events_id3)),
        'train_trans_id2': len(train_id2.intersection(trans_id2)),
        'train_offers_id3': len(train_id3.intersection(offers_id3))
    }
    
    for key, value in overlaps.items():
        print(f"{key}: {value}")
    
    return overlaps

def clean_numeric_column(df, column):
    """Clean numeric column by converting non-numeric values to NaN"""
    if column in df.columns:
        # Convert to numeric, coercing errors to NaN
        df[column] = pd.to_numeric(df[column], errors='coerce')
    return df

def create_temporal_features(df, timestamp_col='id4'):
    """Create temporal features from timestamp column with memory optimization"""
    print(f"Creating temporal features for {len(df)} rows...")
    
    # Process in chunks to avoid memory issues
    chunk_size = 100000
    processed_chunks = []
    
    for i in range(0, len(df), chunk_size):
        chunk = df.iloc[i:i+chunk_size].copy()
        
        # Convert timestamp to datetime if it's not already
        if not pd.api.types.is_datetime64_any_dtype(chunk[timestamp_col]):
            chunk['datetime'] = pd.to_datetime(chunk[timestamp_col], errors='coerce')
        else:
            chunk['datetime'] = chunk[timestamp_col]
        
        # Extract temporal features
        chunk['hour'] = chunk['datetime'].dt.hour
        chunk['day_of_week'] = chunk['datetime'].dt.dayofweek
        chunk['day_of_month'] = chunk['datetime'].dt.day
        chunk['month'] = chunk['datetime'].dt.month
        chunk['is_weekend'] = (chunk['day_of_week'] >= 5).astype('int8')
        chunk['is_business_hours'] = ((chunk['hour'] >= 9) & (chunk['hour'] <= 17)).astype('int8')
        chunk['is_evening'] = ((chunk['hour'] >= 18) & (chunk['hour'] <= 23)).astype('int8')
        chunk['is_night'] = ((chunk['hour'] >= 0) & (chunk['hour'] <= 6)).astype('int8')
        
        processed_chunks.append(chunk)
        
        # Force garbage collection after each chunk
        gc.collect()
    
    result = pd.concat(processed_chunks, ignore_index=True)
    del processed_chunks
    gc.collect()
    
    return result

def create_offer_features(offer_metadata, train_data, test_data):
    """Create offer-based features with memory optimization"""
    print("\nCreating offer features...")
    
    # Create copies
    train_enhanced = train_data.copy()
    test_enhanced = test_data.copy()
    
    # Clean offer metadata
    offer_clean = offer_metadata.copy()
    
    # Clean numeric columns in offer metadata
    numeric_cols = ['f375', 'f376']
    for col in numeric_cols:
        offer_clean = clean_numeric_column(offer_clean, col)
    
    # Handle datetime columns with error handling
    if 'id12' in offer_clean.columns:
        offer_clean['id12'] = pd.to_datetime(offer_clean['id12'], errors='coerce')
    if 'id13' in offer_clean.columns:
        offer_clean['id13'] = pd.to_datetime(offer_clean['id13'], errors='coerce')
    
    # Create offer aggregations with error handling
    try:
        offer_agg = offer_clean.groupby('id3').agg({
            'f375': ['mean', 'max', 'min', 'count'],
            'f376': ['mean', 'max', 'min', 'std'],
            'id8': ['count', 'nunique'] if 'id8' in offer_clean.columns else ['count'],
            'f374': ['count', 'nunique'] if 'f374' in offer_clean.columns else ['count']
        }).reset_index()
        
        # Flatten column names
        offer_agg.columns = ['id3'] + [f'offer_{col[0]}_{col[1]}' for col in offer_agg.columns[1:]]
        
        # Convert id3 to int64 for merging
        offer_agg['id3'] = offer_agg['id3'].astype('int64')
        
        # Fill NaN values in offer_agg
        offer_agg = offer_agg.fillna(0)
        
        # Convert to appropriate data types to save memory
        float_cols = offer_agg.select_dtypes(include=['float64']).columns
        offer_agg[float_cols] = offer_agg[float_cols].astype('float32')
        
    except Exception as e:
        print(f"Error creating offer aggregations: {e}")
        # Create empty aggregation with minimal columns
        offer_agg = pd.DataFrame({'id3': offer_clean['id3'].unique()})
        offer_agg['offer_count'] = 1
        offer_agg['id3'] = offer_agg['id3'].astype('int64')
    
    # Global offer statistics
    global_offer_stats = {
        'global_avg_f375': offer_clean['f375'].mean() if 'f375' in offer_clean.columns else 0,
        'global_avg_f376': offer_clean['f376'].mean() if 'f376' in offer_clean.columns else 0,
        'global_unique_offers': offer_clean['id3'].nunique(),
        'global_total_offers': len(offer_clean)
    }
    
    # Merge with train and test data
    print("Merging offer features...")
    train_enhanced = train_enhanced.merge(offer_agg, on='id3', how='left')
    test_enhanced = test_enhanced.merge(offer_agg, on='id3', how='left')
    
    # Check merge success
    merged_count = len(train_enhanced) - train_enhanced[offer_agg.columns[1]].isna().sum()
    print(f"Successfully merged offer features for {merged_count} out of {len(train_enhanced)} train records")
    
    # Force garbage collection
    del offer_clean, offer_agg
    gc.collect()
    
    return train_enhanced, test_enhanced, global_offer_stats

def create_event_features(add_events, train_data, test_data):
    """Create event-based features with memory optimization"""
    print("\nCreating event features...")
    
    # Sample events if too large
    if len(add_events) > 1000000:
        print(f"Sampling events data from {len(add_events)} to 1M rows")
        add_events = add_events.sample(n=1000000, random_state=42)
    
    # Add temporal features to events
    events_temporal = create_temporal_features(add_events, 'id4')
    
    # Create click indicator
    events_temporal['clicked'] = (events_temporal['id7'].notna()).astype('int8')
    
    # Customer-level aggregations with error handling
    try:
        customer_event_agg = events_temporal.groupby('id2').agg({
            'clicked': ['sum', 'mean', 'count'],
            'hour': ['mean', 'std'],
            'is_weekend': 'mean',
            'is_business_hours': 'mean',
            'is_evening': 'mean',
            'id6': ['count', 'nunique'] if 'id6' in events_temporal.columns else ['count']
        }).reset_index()
        
        # Flatten column names
        customer_event_agg.columns = ['id2'] + [f'customer_{col[0]}_{col[1]}' for col in customer_event_agg.columns[1:]]
        
        # Convert to appropriate data types
        float_cols = customer_event_agg.select_dtypes(include=['float64']).columns
        customer_event_agg[float_cols] = customer_event_agg[float_cols].astype('float32')
        
    except Exception as e:
        print(f"Error creating customer event aggregations: {e}")
        # Create minimal aggregation
        customer_event_agg = events_temporal.groupby('id2').agg({
            'clicked': ['sum', 'mean', 'count']
        }).reset_index()
        customer_event_agg.columns = ['id2', 'customer_total_clicks', 'customer_click_rate', 'customer_total_events']
    
    # Offer-level aggregations with error handling
    try:
        offer_event_agg = events_temporal.groupby('id3').agg({
            'clicked': ['sum', 'mean', 'count'],
            'hour': ['mean', 'std'],
            'is_weekend': 'mean',
            'is_business_hours': 'mean',
            'id2': 'nunique'
        }).reset_index()
        
        # Flatten column names
        offer_event_agg.columns = ['id3'] + [f'offer_{col[0]}_{col[1]}' for col in offer_event_agg.columns[1:]]
        
        # Convert to appropriate data types
        float_cols = offer_event_agg.select_dtypes(include=['float64']).columns
        offer_event_agg[float_cols] = offer_event_agg[float_cols].astype('float32')
        
    except Exception as e:
        print(f"Error creating offer event aggregations: {e}")
        # Create minimal aggregation
        offer_event_agg = events_temporal.groupby('id3').agg({
            'clicked': ['sum', 'mean', 'count']
        }).reset_index()
        offer_event_agg.columns = ['id3', 'offer_total_clicks', 'offer_click_rate', 'offer_total_events']
    
    # Global event statistics
    global_event_stats = {
        'global_click_rate': events_temporal['clicked'].mean(),
        'global_avg_hour': events_temporal['hour'].mean(),
        'global_weekend_rate': events_temporal['is_weekend'].mean(),
        'global_business_hours_rate': events_temporal['is_business_hours'].mean(),
        'global_total_events': len(events_temporal),
        'global_total_clicks': int(events_temporal['clicked'].sum())
    }
    
    # Simplified placement stats
    placement_dict = {'global_placement_count': events_temporal['id6'].nunique() if 'id6' in events_temporal.columns else 0}
    
    # Fill NaN values and convert IDs
    customer_event_agg = customer_event_agg.fillna(0)
    offer_event_agg = offer_event_agg.fillna(0)
    customer_event_agg['id2'] = customer_event_agg['id2'].astype('int64')
    offer_event_agg['id3'] = offer_event_agg['id3'].astype('int64')
    
    # Force garbage collection
    del events_temporal
    gc.collect()
    
    return customer_event_agg, offer_event_agg, global_event_stats, placement_dict

def create_transaction_features(add_trans, train_data, test_data):
    """Create transaction-based features with memory optimization"""
    print("\nCreating transaction features...")
    
    # Sample transactions if too large
    if len(add_trans) > 2000000:
        print(f"Sampling transaction data from {len(add_trans)} to 2M rows")
        add_trans = add_trans.sample(n=2000000, random_state=42)
    
    # Clean transaction data first
    trans_clean = add_trans.copy()
    
    # Clean numeric columns
    numeric_cols = ['f367', 'f369']
    for col in numeric_cols:
        if col in trans_clean.columns:
            trans_clean = clean_numeric_column(trans_clean, col)
    
    # Remove rows where critical numeric columns are NaN
    print(f"Original transaction data shape: {trans_clean.shape}")
    if 'f367' in trans_clean.columns:
        trans_clean = trans_clean.dropna(subset=['f367'])
        print(f"After removing NaN amounts: {trans_clean.shape}")
    
    # Add temporal features with error handling
    try:
        trans_temporal = create_temporal_features(trans_clean, 'f371')
    except Exception as e:
        print(f"Error creating temporal features for transactions: {e}")
        # Use original data without temporal features
        trans_temporal = trans_clean.copy()
        trans_temporal['hour'] = 12  # Default hour
        trans_temporal['is_weekend'] = 0
        trans_temporal['is_business_hours'] = 1
    
    # Customer-level transaction aggregations with error handling
    try:
        agg_dict = {
            'f367': ['sum', 'mean', 'std', 'count', 'max', 'min'],
            'hour': ['mean', 'std'],
            'is_weekend': 'mean',
            'is_business_hours': 'mean'
        }
        
        # Add f369 if it exists
        if 'f369' in trans_temporal.columns:
            agg_dict['f369'] = ['sum', 'mean', 'count']
        
        # Add f374 if it exists
        if 'f374' in trans_temporal.columns:
            agg_dict['f374'] = ['count', 'nunique']
        
        customer_trans_agg = trans_temporal.groupby('id2').agg(agg_dict).reset_index()
        
        # Flatten column names
        customer_trans_agg.columns = ['id2'] + [f'customer_{col[0]}_{col[1]}' for col in customer_trans_agg.columns[1:]]
        
        # Convert to appropriate data types
        float_cols = customer_trans_agg.select_dtypes(include=['float64']).columns
        customer_trans_agg[float_cols] = customer_trans_agg[float_cols].astype('float32')
        
    except Exception as e:
        print(f"Error creating customer transaction aggregations: {e}")
        # Create minimal aggregation
        customer_trans_agg = trans_temporal.groupby('id2').agg({
            'f367': ['sum', 'mean', 'count']
        }).reset_index()
        customer_trans_agg.columns = ['id2', 'customer_total_amount', 'customer_avg_amount', 'customer_total_transactions']
    
    # Global transaction statistics
    global_trans_stats = {
        'global_avg_transaction_amount': trans_temporal['f367'].mean() if 'f367' in trans_temporal.columns else 0,
        'global_total_transactions': len(trans_temporal),
        'global_avg_trans_hour': trans_temporal['hour'].mean() if 'hour' in trans_temporal.columns else 12,
        'global_trans_weekend_rate': trans_temporal['is_weekend'].mean() if 'is_weekend' in trans_temporal.columns else 0.28,
        'global_trans_business_hours_rate': trans_temporal['is_business_hours'].mean() if 'is_business_hours' in trans_temporal.columns else 0.33,
        'global_unique_industries': trans_temporal['f374'].nunique() if 'f374' in trans_temporal.columns else 0
    }
    
    # Simplified industry stats
    industry_dict = {'global_industry_count': trans_temporal['f374'].nunique() if 'f374' in trans_temporal.columns else 0}
    
    # Fill NaN values and convert IDs
    customer_trans_agg = customer_trans_agg.fillna(0)
    customer_trans_agg['id2'] = customer_trans_agg['id2'].astype('int64')
    
    # Force garbage collection
    del trans_clean, trans_temporal
    gc.collect()
    
    return customer_trans_agg, global_trans_stats, industry_dict

def merge_all_features(train_enhanced, test_enhanced, customer_event_agg, offer_event_agg,
                      customer_trans_agg, global_event_stats, global_trans_stats, 
                      global_offer_stats, placement_stats, industry_stats):
    """Merge all features into final datasets with memory optimization"""
    print("\nMerging all features...")
    
    # Merge customer event features
    print("Merging customer event features...")
    train_enhanced = train_enhanced.merge(customer_event_agg, on='id2', how='left')
    test_enhanced = test_enhanced.merge(customer_event_agg, on='id2', how='left')
    gc.collect()
    
    # Merge offer event features
    print("Merging offer event features...")
    train_enhanced = train_enhanced.merge(offer_event_agg, on='id3', how='left')
    test_enhanced = test_enhanced.merge(offer_event_agg, on='id3', how='left')
    gc.collect()
    
    # Merge customer transaction features
    print("Merging customer transaction features...")
    train_enhanced = train_enhanced.merge(customer_trans_agg, on='id2', how='left')
    test_enhanced = test_enhanced.merge(customer_trans_agg, on='id2', how='left')
    gc.collect()
    
    # Add temporal features to main datasets
    print("Adding temporal features...")
    train_enhanced = create_temporal_features(train_enhanced, 'id4')
    test_enhanced = create_temporal_features(test_enhanced, 'id4')
    gc.collect()
    
    # Add global statistics as features
    print("Adding global statistics...")
    all_global_stats = {**global_event_stats, **global_trans_stats, **global_offer_stats, 
                       **placement_stats, **industry_stats}
    
    for key, value in all_global_stats.items():
        train_enhanced[key] = value
        test_enhanced[key] = value
    
    # Fill NaN values with 0 for numeric columns
    print("Filling NaN values...")
    numeric_cols = train_enhanced.select_dtypes(include=[np.number]).columns
    train_enhanced[numeric_cols] = train_enhanced[numeric_cols].fillna(0)
    test_enhanced[numeric_cols] = test_enhanced[numeric_cols].fillna(0)
    
    # Drop intermediate datetime column
    train_enhanced = train_enhanced.drop('datetime', axis=1, errors='ignore')
    test_enhanced = test_enhanced.drop('datetime', axis=1, errors='ignore')
    
    # Convert float64 to float32 to save memory
    float64_cols = train_enhanced.select_dtypes(include=['float64']).columns
    train_enhanced[float64_cols] = train_enhanced[float64_cols].astype('float32')
    test_enhanced[float64_cols] = test_enhanced[float64_cols].astype('float32')
    
    gc.collect()
    
    return train_enhanced, test_enhanced

def main():
    """Main function to execute the entire pipeline with memory optimization"""
    print("Starting Amex Offerings Feature Engineering Pipeline...")
    
    try:
        # Load data
        train_data, test_data, add_events, add_trans, offer_metadata = load_data()
        
        # Check ID overlaps
        overlaps = check_id_overlap(train_data, test_data, add_events, add_trans, offer_metadata)
        
        # Create offer features
        train_enhanced, test_enhanced, global_offer_stats = create_offer_features(
            offer_metadata, train_data, test_data)
        
        # Force garbage collection
        del offer_metadata
        gc.collect()
        
        # Create event features
        customer_event_agg, offer_event_agg, global_event_stats, placement_stats = create_event_features(
            add_events, train_data, test_data)
        
        # Force garbage collection
        del add_events
        gc.collect()
        
        # Create transaction features
        customer_trans_agg, global_trans_stats, industry_stats = create_transaction_features(
            add_trans, train_data, test_data)
        
        # Force garbage collection
        del add_trans, train_data, test_data
        gc.collect()
        
        # Merge all features
        train_final, test_final = merge_all_features(
            train_enhanced, test_enhanced, customer_event_agg, offer_event_agg,
            customer_trans_agg, global_event_stats, global_trans_stats, 
            global_offer_stats, placement_stats, industry_stats)
        
        # Force garbage collection
        del train_enhanced, test_enhanced, customer_event_agg, offer_event_agg, customer_trans_agg
        gc.collect()
        
        # Print summary
        print(f"\nFinal train data shape: {train_final.shape}")
        print(f"Final test data shape: {test_final.shape}")
        
        # Save enhanced datasets
        print("\nSaving enhanced datasets...")
        train_final.to_parquet('train_data_enhanced_final.parquet', index=False)
        test_final.to_parquet('test_data_enhanced_final.parquet', index=False)
        
        print("\nEnhanced datasets saved successfully!")
        print("- train_data_enhanced_final.parquet")
        print("- test_data_enhanced_final.parquet")
        
        # Check for any remaining NaN values
        print(f"\nRemaining NaN values in train: {train_final.isna().sum().sum()}")
        print(f"Remaining NaN values in test: {test_final.isna().sum().sum()}")
        
        print("\nPipeline completed successfully!")
        
        return train_final, test_final
        
    except Exception as e:
        print(f"Error in main pipeline: {e}")
        import traceback
        traceback.print_exc()
        return None, None

if __name__ == "__main__":
    train_enhanced, test_enhanced = main()

Starting Amex Offerings Feature Engineering Pipeline...
Loading datasets...
Train data shape: (770164, 372)
Test data shape: (369301, 371)
Events data shape: (21457473, 5)
Transaction data shape: (6339465, 9)
Offer metadata shape: (4164, 9)

Checking ID overlaps...
train_events_id2: 0
train_events_id3: 713
train_trans_id2: 0
train_offers_id3: 709

Creating offer features...
Merging offer features...
Successfully merged offer features for 770064 out of 770164 train records

Creating event features...
Sampling events data from 21457473 to 1M rows
Creating temporal features for 1000000 rows...

Creating transaction features...
Sampling transaction data from 6339465 to 2M rows
Original transaction data shape: (2000000, 9)
After removing NaN amounts: (2000000, 9)
Creating temporal features for 2000000 rows...

Merging all features...
Merging customer event features...
Merging offer event features...
Merging customer transaction features...
Adding temporal features...
Creating temporal featu

In [33]:
import pandas as pd
import numpy as np
from datetime import datetime
import warnings
import gc
warnings.filterwarnings('ignore')

def load_data():
    """Load all datasets with proper data type handling"""
    print("Loading datasets...")
    
    # Load main datasets
    train_data = pd.read_parquet('/kaggle/input/amex-cleaned-datasets/train_data_capped_cleaned.parquet')
    test_data = pd.read_parquet('/kaggle/input/amex-cleaned-datasets/cleaned_test_dataa.parquet')
    
    # Load additional datasets
    add_events = pd.read_parquet('/kaggle/input/amex-cleaned-datasets/add_event_cleaned.parquet')
    add_trans = pd.read_parquet('/kaggle/input/amex-cleaned-datasets/add_trans.parquet')
    offer_metadata = pd.read_parquet('offer_clean.parquet')
    
    
    # Convert id3 to consistent data type (int64) - Handle NaN values first
    train_data['id3'] = pd.to_numeric(train_data['id3'], errors='coerce').fillna(0).astype('int64')
    test_data['id3'] = pd.to_numeric(test_data['id3'], errors='coerce').fillna(0).astype('int64')
    offer_metadata['id3'] = pd.to_numeric(offer_metadata['id3'], errors='coerce').fillna(0).astype('int64')
    add_events['id3'] = pd.to_numeric(add_events['id3'], errors='coerce').fillna(0).astype('int64')
    
    # Convert id2 to consistent data type (int64) - Handle NaN values first
    train_data['id2'] = pd.to_numeric(train_data['id2'], errors='coerce').fillna(0).astype('int64')
    test_data['id2'] = pd.to_numeric(test_data['id2'], errors='coerce').fillna(0).astype('int64')
    add_events['id2'] = pd.to_numeric(add_events['id2'], errors='coerce').fillna(0).astype('int64')
    add_trans['id2'] = pd.to_numeric(add_trans['id2'], errors='coerce').fillna(0).astype('int64')
    
    print(f"Train data shape: {train_data.shape}")
    print(f"Test data shape: {test_data.shape}")
    print(f"Events data shape: {add_events.shape}")
    print(f"Transaction data shape: {add_trans.shape}")
    print(f"Offer metadata shape: {offer_metadata.shape}")
    
    # Force garbage collection after loading
    gc.collect()
    
    return train_data, test_data, add_events, add_trans, offer_metadata

def check_id_overlap(train_data, test_data, add_events, add_trans, offer_metadata):
    """Check for ID overlaps between datasets"""
    print("\nChecking ID overlaps...")
    
    # Use sample if datasets are too large
    train_sample = train_data.sample(n=min(100000, len(train_data)), random_state=42)
    
    train_id2 = set(train_sample['id2'].unique())
    train_id3 = set(train_sample['id3'].unique())
    
    events_id2 = set(add_events['id2'].dropna().unique())
    events_id3 = set(add_events['id3'].dropna().unique())
    
    trans_id2 = set(add_trans['id2'].dropna().unique())
    offers_id3 = set(offer_metadata['id3'].dropna().unique())
    
    overlaps = {
        'train_events_id2': len(train_id2.intersection(events_id2)),
        'train_events_id3': len(train_id3.intersection(events_id3)),
        'train_trans_id2': len(train_id2.intersection(trans_id2)),
        'train_offers_id3': len(train_id3.intersection(offers_id3))
    }
    
    for key, value in overlaps.items():
        print(f"{key}: {value}")
    
    return overlaps

def clean_numeric_column(df, column):
    """Clean numeric column by converting non-numeric values to NaN"""
    if column in df.columns:
        # Convert to numeric, coercing errors to NaN
        df[column] = pd.to_numeric(df[column], errors='coerce')
    return df

def create_temporal_features(df, timestamp_col='id4'):
    """Create temporal features from timestamp column with memory optimization"""
    print(f"Creating temporal features for {len(df)} rows...")
    
    # Process in chunks to avoid memory issues
    chunk_size = 100000
    processed_chunks = []
    
    for i in range(0, len(df), chunk_size):
        chunk = df.iloc[i:i+chunk_size].copy()
        
        # Convert timestamp to datetime if it's not already
        if not pd.api.types.is_datetime64_any_dtype(chunk[timestamp_col]):
            chunk['datetime'] = pd.to_datetime(chunk[timestamp_col], errors='coerce')
        else:
            chunk['datetime'] = chunk[timestamp_col]
        
        # Extract temporal features
        chunk['hour'] = chunk['datetime'].dt.hour
        chunk['day_of_week'] = chunk['datetime'].dt.dayofweek
        chunk['day_of_month'] = chunk['datetime'].dt.day
        chunk['month'] = chunk['datetime'].dt.month
        chunk['is_weekend'] = (chunk['day_of_week'] >= 5).astype('int8')
        chunk['is_business_hours'] = ((chunk['hour'] >= 9) & (chunk['hour'] <= 17)).astype('int8')
        chunk['is_evening'] = ((chunk['hour'] >= 18) & (chunk['hour'] <= 23)).astype('int8')
        chunk['is_night'] = ((chunk['hour'] >= 0) & (chunk['hour'] <= 6)).astype('int8')
        
        processed_chunks.append(chunk)
        
        # Force garbage collection after each chunk
        gc.collect()
    
    result = pd.concat(processed_chunks, ignore_index=True)
    del processed_chunks
    gc.collect()
    
    return result

def create_offer_features(offer_metadata, train_data, test_data):
    """Create offer-based features with memory optimization"""
    print("\nCreating offer features...")
    
    # Create copies
    train_enhanced = train_data.copy()
    test_enhanced = test_data.copy()
    
    # Clean offer metadata
    offer_clean = offer_metadata.copy()
    
    # Clean numeric columns in offer metadata
    numeric_cols = ['f375', 'f376']
    for col in numeric_cols:
        offer_clean = clean_numeric_column(offer_clean, col)
    
    # Handle datetime columns with error handling
    if 'id12' in offer_clean.columns:
        offer_clean['id12'] = pd.to_datetime(offer_clean['id12'], errors='coerce')
    if 'id13' in offer_clean.columns:
        offer_clean['id13'] = pd.to_datetime(offer_clean['id13'], errors='coerce')
    
    # Create offer aggregations with error handling
    try:
        offer_agg = offer_clean.groupby('id3').agg({
            'f375': ['mean', 'max', 'min', 'count'],
            'f376': ['mean', 'max', 'min', 'std'],
            'id8': ['count', 'nunique'] if 'id8' in offer_clean.columns else ['count'],
            'f374': ['count', 'nunique'] if 'f374' in offer_clean.columns else ['count']
        }).reset_index()
        
        # Flatten column names
        offer_agg.columns = ['id3'] + [f'offer_{col[0]}_{col[1]}' for col in offer_agg.columns[1:]]
        
        # Convert id3 to int64 for merging
        offer_agg['id3'] = offer_agg['id3'].astype('int64')
        
        # Fill NaN values in offer_agg
        offer_agg = offer_agg.fillna(0)
        
        # Convert to appropriate data types to save memory
        float_cols = offer_agg.select_dtypes(include=['float64']).columns
        offer_agg[float_cols] = offer_agg[float_cols].astype('float32')
        
    except Exception as e:
        print(f"Error creating offer aggregations: {e}")
        # Create empty aggregation with minimal columns
        offer_agg = pd.DataFrame({'id3': offer_clean['id3'].unique()})
        offer_agg['offer_count'] = 1
        offer_agg['id3'] = offer_agg['id3'].astype('int64')
    
    # Merge with train and test data
    print("Merging offer features...")
    train_enhanced = train_enhanced.merge(offer_agg, on='id3', how='left')
    test_enhanced = test_enhanced.merge(offer_agg, on='id3', how='left')
    
    # Check merge success
    merged_count = len(train_enhanced) - train_enhanced[offer_agg.columns[1]].isna().sum()
    print(f"Successfully merged offer features for {merged_count} out of {len(train_enhanced)} train records")
    
    # Force garbage collection
    del offer_clean, offer_agg
    gc.collect()
    
    return train_enhanced, test_enhanced

def create_event_features(add_events, train_data, test_data):
    """Create event-based features with memory optimization"""
    print("\nCreating event features...")
    
    # Sample events if too large
    if len(add_events) > 1000000:
        print(f"Sampling events data from {len(add_events)} to 1M rows")
        add_events = add_events.sample(n=1000000, random_state=42)
    
    # Add temporal features to events
    events_temporal = create_temporal_features(add_events, 'id4')
    
    # Create click indicator
    events_temporal['clicked'] = (events_temporal['id7'].notna()).astype('int8')
    
    # Customer-level aggregations with error handling
    try:
        customer_event_agg = events_temporal.groupby('id2').agg({
            'clicked': ['sum', 'mean', 'count'],
            'hour': ['mean', 'std'],
            'is_weekend': 'mean',
            'is_business_hours': 'mean',
            'is_evening': 'mean',
            'id6': ['count', 'nunique'] if 'id6' in events_temporal.columns else ['count']
        }).reset_index()
        
        # Flatten column names
        customer_event_agg.columns = ['id2'] + [f'customer_{col[0]}_{col[1]}' for col in customer_event_agg.columns[1:]]
        
        # Convert to appropriate data types
        float_cols = customer_event_agg.select_dtypes(include=['float64']).columns
        customer_event_agg[float_cols] = customer_event_agg[float_cols].astype('float32')
        
    except Exception as e:
        print(f"Error creating customer event aggregations: {e}")
        # Create minimal aggregation
        customer_event_agg = events_temporal.groupby('id2').agg({
            'clicked': ['sum', 'mean', 'count']
        }).reset_index()
        customer_event_agg.columns = ['id2', 'customer_total_clicks', 'customer_click_rate', 'customer_total_events']
    
    # Offer-level aggregations with error handling
    try:
        offer_event_agg = events_temporal.groupby('id3').agg({
            'clicked': ['sum', 'mean', 'count'],
            'hour': ['mean', 'std'],
            'is_weekend': 'mean',
            'is_business_hours': 'mean',
            'id2': 'nunique'
        }).reset_index()
        
        # Flatten column names
        offer_event_agg.columns = ['id3'] + [f'offer_{col[0]}_{col[1]}' for col in offer_event_agg.columns[1:]]
        
        # Convert to appropriate data types
        float_cols = offer_event_agg.select_dtypes(include=['float64']).columns
        offer_event_agg[float_cols] = offer_event_agg[float_cols].astype('float32')
        
    except Exception as e:
        print(f"Error creating offer event aggregations: {e}")
        # Create minimal aggregation
        offer_event_agg = events_temporal.groupby('id3').agg({
            'clicked': ['sum', 'mean', 'count']
        }).reset_index()
        offer_event_agg.columns = ['id3', 'offer_total_clicks', 'offer_click_rate', 'offer_total_events']
    
    # Force garbage collection
    del events_temporal
    gc.collect()
    
    return customer_event_agg, offer_event_agg

def create_transaction_features(add_trans, train_data, test_data):
    """Create transaction-based features with memory optimization"""
    print("\nCreating transaction features...")
    
    # Sample transactions if too large
    if len(add_trans) > 2000000:
        print(f"Sampling transaction data from {len(add_trans)} to 2M rows")
        add_trans = add_trans.sample(n=2000000, random_state=42)
    
    # Clean transaction data first
    trans_clean = add_trans.copy()
    
    # Clean numeric columns
    numeric_cols = ['f367', 'f369']
    for col in numeric_cols:
        if col in trans_clean.columns:
            trans_clean = clean_numeric_column(trans_clean, col)
    
    # Remove rows where critical numeric columns are NaN
    print(f"Original transaction data shape: {trans_clean.shape}")
    if 'f367' in trans_clean.columns:
        trans_clean = trans_clean.dropna(subset=['f367'])
        print(f"After removing NaN amounts: {trans_clean.shape}")
    
    # Add temporal features with error handling
    try:
        trans_temporal = create_temporal_features(trans_clean, 'f371')
    except Exception as e:
        print(f"Error creating temporal features for transactions: {e}")
        # Use original data without temporal features
        trans_temporal = trans_clean.copy()
        trans_temporal['hour'] = 12  # Default hour
        trans_temporal['is_weekend'] = 0
        trans_temporal['is_business_hours'] = 1
    
    # Customer-level transaction aggregations with error handling
    try:
        agg_dict = {
            'f367': ['sum', 'mean', 'std', 'count', 'max', 'min'],
            'hour': ['mean', 'std'],
            'is_weekend': 'mean',
            'is_business_hours': 'mean'
        }
        
        # Add f369 if it exists
        if 'f369' in trans_temporal.columns:
            agg_dict['f369'] = ['sum', 'mean', 'count']
        
        # Add f374 if it exists
        if 'f374' in trans_temporal.columns:
            agg_dict['f374'] = ['count', 'nunique']
        
        customer_trans_agg = trans_temporal.groupby('id2').agg(agg_dict).reset_index()
        
        # Flatten column names
        customer_trans_agg.columns = ['id2'] + [f'customer_{col[0]}_{col[1]}' for col in customer_trans_agg.columns[1:]]
        
        # Convert to appropriate data types
        float_cols = customer_trans_agg.select_dtypes(include=['float64']).columns
        customer_trans_agg[float_cols] = customer_trans_agg[float_cols].astype('float32')
        
    except Exception as e:
        print(f"Error creating customer transaction aggregations: {e}")
        # Create minimal aggregation
        customer_trans_agg = trans_temporal.groupby('id2').agg({
            'f367': ['sum', 'mean', 'count']
        }).reset_index()
        customer_trans_agg.columns = ['id2', 'customer_total_amount', 'customer_avg_amount', 'customer_total_transactions']
    
    # Fill NaN values and convert IDs
    customer_trans_agg = customer_trans_agg.fillna(0)
    customer_trans_agg['id2'] = customer_trans_agg['id2'].astype('int64')
    
    # Force garbage collection
    del trans_clean, trans_temporal
    gc.collect()
    
    return customer_trans_agg

def create_relative_features(train_enhanced, test_enhanced):
    """Create relative features that compare individual values to group averages"""
    print("\nCreating relative features...")
    
    # Combine train and test to get overall statistics for relative features
    combined = pd.concat([train_enhanced, test_enhanced], ignore_index=True)
    
    # Customer-level relative features
    if 'customer_clicked_mean' in combined.columns:
        overall_click_rate = combined['customer_clicked_mean'].mean()
        train_enhanced['customer_click_rate_vs_avg'] = train_enhanced['customer_clicked_mean'] / (overall_click_rate + 1e-8)
        test_enhanced['customer_click_rate_vs_avg'] = test_enhanced['customer_clicked_mean'] / (overall_click_rate + 1e-8)
    
    # Transaction amount relative features
    if 'customer_f367_mean' in combined.columns:
        overall_trans_avg = combined['customer_f367_mean'].mean()
        train_enhanced['customer_trans_amount_vs_avg'] = train_enhanced['customer_f367_mean'] / (overall_trans_avg + 1e-8)
        test_enhanced['customer_trans_amount_vs_avg'] = test_enhanced['customer_f367_mean'] / (overall_trans_avg + 1e-8)
    
    # Offer popularity relative features
    if 'offer_clicked_sum' in combined.columns:
        overall_offer_popularity = combined['offer_clicked_sum'].mean()
        train_enhanced['offer_popularity_vs_avg'] = train_enhanced['offer_clicked_sum'] / (overall_offer_popularity + 1e-8)
        test_enhanced['offer_popularity_vs_avg'] = test_enhanced['offer_clicked_sum'] / (overall_offer_popularity + 1e-8)
    
    # Hour-based relative features
    if 'hour' in combined.columns:
        hour_click_rates = combined.groupby('hour')['customer_clicked_mean'].mean()
        train_enhanced['hour_click_rate_vs_hour_avg'] = train_enhanced.apply(
            lambda x: x['customer_clicked_mean'] / (hour_click_rates.get(x['hour'], 0.1) + 1e-8), axis=1
        )
        test_enhanced['hour_click_rate_vs_hour_avg'] = test_enhanced.apply(
            lambda x: x['customer_clicked_mean'] / (hour_click_rates.get(x['hour'], 0.1) + 1e-8), axis=1
        )
    
    del combined
    gc.collect()
    
    return train_enhanced, test_enhanced

def merge_all_features(train_enhanced, test_enhanced, customer_event_agg, offer_event_agg, customer_trans_agg):
    """Merge all features into final datasets with memory optimization"""
    print("\nMerging all features...")
    
    # Merge customer event features
    print("Merging customer event features...")
    train_enhanced = train_enhanced.merge(customer_event_agg, on='id2', how='left')
    test_enhanced = test_enhanced.merge(customer_event_agg, on='id2', how='left')
    gc.collect()
    
    # Merge offer event features
    print("Merging offer event features...")
    train_enhanced = train_enhanced.merge(offer_event_agg, on='id3', how='left')
    test_enhanced = test_enhanced.merge(offer_event_agg, on='id3', how='left')
    gc.collect()
    
    # Merge customer transaction features
    print("Merging customer transaction features...")
    train_enhanced = train_enhanced.merge(customer_trans_agg, on='id2', how='left')
    test_enhanced = test_enhanced.merge(customer_trans_agg, on='id2', how='left')
    gc.collect()
    
    # Add temporal features to main datasets
    print("Adding temporal features...")
    train_enhanced = create_temporal_features(train_enhanced, 'id4')
    test_enhanced = create_temporal_features(test_enhanced, 'id4')
    gc.collect()
    
    # Create relative features instead of global constants
    train_enhanced, test_enhanced = create_relative_features(train_enhanced, test_enhanced)
    gc.collect()
    
    # Fill NaN values with 0 for numeric columns
    print("Filling NaN values...")
    numeric_cols = train_enhanced.select_dtypes(include=[np.number]).columns
    train_enhanced[numeric_cols] = train_enhanced[numeric_cols].fillna(0)
    test_enhanced[numeric_cols] = test_enhanced[numeric_cols].fillna(0)
    
    # Drop intermediate datetime column
    train_enhanced = train_enhanced.drop('datetime', axis=1, errors='ignore')
    test_enhanced = test_enhanced.drop('datetime', axis=1, errors='ignore')
    
    # Convert float64 to float32 to save memory
    float64_cols = train_enhanced.select_dtypes(include=['float64']).columns
    train_enhanced[float64_cols] = train_enhanced[float64_cols].astype('float32')
    test_enhanced[float64_cols] = test_enhanced[float64_cols].astype('float32')
    
    gc.collect()
    
    return train_enhanced, test_enhanced

def remove_constant_features(train_final, test_final):
    """Remove features that have the same value across all samples"""
    print("\nRemoving constant features...")
    
    # Check for constant features in train
    constant_features = []
    for col in train_final.columns:
        if col not in ['id1', 'id2', 'id3', 'id4', 'target']:  # Skip ID columns and target
            if train_final[col].nunique() <= 1:
                constant_features.append(col)
    
    # Also check for features that have the same single value in both train and test
    same_value_features = []
    for col in train_final.columns:
        if col not in ['id1', 'id2', 'id3', 'id4', 'target']:  # Skip ID columns and target
            if (train_final[col].nunique() == 1 and 
                test_final[col].nunique() == 1 and 
                train_final[col].iloc[0] == test_final[col].iloc[0]):
                same_value_features.append(col)
    
    all_constant_features = list(set(constant_features + same_value_features))
    
    if all_constant_features:
        print(f"Removing {len(all_constant_features)} constant features:")
        print(all_constant_features)
        
        train_final = train_final.drop(columns=all_constant_features)
        test_final = test_final.drop(columns=all_constant_features)
    else:
        print("No constant features found.")
    
    return train_final, test_final

def main():
    """Main function to execute the entire pipeline with memory optimization"""
    print("Starting Amex Offerings Feature Engineering Pipeline...")
    
    try:
        # Load data
        train_data, test_data, add_events, add_trans, offer_metadata = load_data()
        
        # Check ID overlaps
        overlaps = check_id_overlap(train_data, test_data, add_events, add_trans, offer_metadata)
        
        # Create offer features
        train_enhanced, test_enhanced = create_offer_features(offer_metadata, train_data, test_data)
        
        # Force garbage collection
        del offer_metadata
        gc.collect()
        
        # Create event features
        customer_event_agg, offer_event_agg = create_event_features(add_events, train_data, test_data)
        
        # Force garbage collection
        del add_events
        gc.collect()
        
        # Create transaction features
        customer_trans_agg = create_transaction_features(add_trans, train_data, test_data)
        
        # Force garbage collection
        del add_trans, train_data, test_data
        gc.collect()
        
        # Merge all features
        train_final, test_final = merge_all_features(
            train_enhanced, test_enhanced, customer_event_agg, offer_event_agg, customer_trans_agg)
        
        # Force garbage collection
        del train_enhanced, test_enhanced, customer_event_agg, offer_event_agg, customer_trans_agg
        gc.collect()
        
        # Remove constant features
        train_final, test_final = remove_constant_features(train_final, test_final)
        
        # Print summary
        print(f"\nFinal train data shape: {train_final.shape}")
        print(f"Final test data shape: {test_final.shape}")
        
        # Save enhanced datasets
        print("\nSaving enhanced datasets...")
        train_final.to_parquet('train_data_enhanced_final.parquet', index=False)
        test_final.to_parquet('test_data_enhanced_final.parquet', index=False)
        
        print("\nEnhanced datasets saved successfully!")
        print("- train_data_enhanced_final.parquet")
        print("- test_data_enhanced_final.parquet")
        
        # Check for any remaining NaN values
        print(f"\nRemaining NaN values in train: {train_final.isna().sum().sum()}")
        print(f"Remaining NaN values in test: {test_final.isna().sum().sum()}")
        
        # Check for remaining constant features
        print(f"\nFinal feature count: {len(train_final.columns)}")
        
        print("\nPipeline completed successfully!")
        
        return train_final, test_final
        
    except Exception as e:
        print(f"Error in main pipeline: {e}")
        import traceback
        traceback.print_exc()
        return None, None

if _name_ == "_main_":
    train_enhanced, test_enhanced = main()

SyntaxError: invalid non-printable character U+00A0 (3652256343.py, line 533)

In [None]:
train_enhanced

In [None]:
test_enhanced 