In [1]:
import polars as pl
import polars.selectors as cs # Import selectors module
import numpy as np
import pandas as pd # Still needed for plotting, and sklearn bridge if PowerTransform used
import matplotlib.pyplot as plt
import seaborn as sns
import os
import warnings
import gc # Garbage collector
import sys
import io # For capturing print output
import time
import re
from sklearn.preprocessing import StandardScaler, RobustScaler, PowerTransformer
from sklearn.exceptions import NotFittedError
from joblib import dump, load # For saving scaler objects

In [2]:
from matplotlib import font_manager as fm

existing_font_files = {f.fname for f in fm.fontManager.ttflist}
font_path = r"C:\Users\junting\AppData\Local\Microsoft\Windows\Fonts\NotoSansTC-Regular.ttf"

if font_path not in existing_font_files:
    fm.fontManager.addfont(font_path)

from matplotlib import font_manager
fonts = [f.name for f in font_manager.fontManager.ttflist if "Noto" in f.name]
print(fonts)

['Noto Sans TC']


In [3]:
# --- Configuration ---
VERSION = "v12" # Simplified Scaling & Font Fix
DATA_DIR = "data/"
TRAIN_FILE = os.path.join(DATA_DIR, "training.csv")
TEST_FILE = os.path.join(DATA_DIR, "public_x.csv")
OUTPUT_DIR = f"eda_output_polars_{VERSION}/"

# --- Output File Paths ---
PARQUET_OUTPUT_TRAIN_ORIGINAL = os.path.join(DATA_DIR, f"training_selected_optimized_polars_{VERSION}.parquet")
PARQUET_OUTPUT_TEST_ORIGINAL = os.path.join(DATA_DIR, f"public_x_selected_optimized_polars_{VERSION}.parquet")
PARQUET_OUTPUT_TRAIN_SCALED_TMPL = os.path.join(DATA_DIR, f"training_scaled_{{strategy}}_{VERSION}.parquet")
PARQUET_OUTPUT_TEST_SCALED_TMPL = os.path.join(DATA_DIR, f"public_x_scaled_{{strategy}}_{VERSION}.parquet")
SCALER_PT_PATH = os.path.join(OUTPUT_DIR, f"scaler_powertransformer_{VERSION}.joblib")
SCALER_FINAL_PATH_TMPL = os.path.join(OUTPUT_DIR, f"scaler_final_{{strategy}}_{VERSION}.joblib")
SCALER_STATS_PATH_TMPL = os.path.join(OUTPUT_DIR, f"scaler_stats_{{strategy}}_{VERSION}.parquet") # For pure Polars stats

# --- Analysis & Processing Parameters ---
SAMPLE_SIZE = 200000        # Sample size for computationally intensive plots/stats during EDA
HIGH_CORR_THRESHOLD = 0.90  # Threshold for identifying highly correlated features
LOW_CARDINALITY_THRESHOLD = 30 # Max unique values for a 'low-cardinality' categorical feature (non-Broker ID)
HEATMAP_THRESHOLD = 100     # Max features for plotting correlation heatmap
TOP_N_FEATURES_PLOT = 50    # Max features for IC/Corr bar plots
PAIRPLOT_N_FEATURES = 7     # Max features for pairplot (computationally expensive)
# IC_COMPARE_TOP_N = 30     # Removed as IC comparison is skipped
COLUMN_BATCH_SIZE = 500     # Process this many columns at a time during batched scaling (if using sklearn path)

TARGET_COLUMN = '飆股'     # Name of the target variable column
ID_COLUMN = 'ID'           # Name of the unique identifier column

# --- Feature Filtering Thresholds ---
# These will be applied BEFORE scaling to reduce feature count
FILTER_MISSING_THRESHOLD = 0.95 # Drop features with > 95% missing values
FILTER_IC_THRESHOLD = 0.005     # Drop features with abs(Original IC) < 0.005
FILTER_ZERO_VAR = True          # Drop features with near-zero variance
FILTER_HIGH_CORR = True         # Drop one feature from highly correlated pairs

# --- Scaling Strategy Selection ---
# Choose one: 'robust', 'power_robust', 'power_standard', 'none'
# Defaulting to 'robust' (pure Polars implementation)
SCALING_STRATEGY = 'robust'
# SCALING_STRATEGY = 'power_robust' # Use this for PowerTransform + RobustScaler (requires more memory)
# SCALING_STRATEGY = 'power_standard'# Use this for PowerTransform + StandardScaler (requires more memory)
# SCALING_STRATEGY = 'none' # Set to 'none' to skip the entire scaling section (Section 6)

SAVE_SCALER_OBJECTS = True # Set to True to save fitted scaler objects/stats using joblib/parquet

# --- Feature Selection Keywords ---
# Keywords used to initially select relevant columns from the CSV files
KEYWORDS = [
    # Fundamentals
    '營收$', '營業利益', '稅後淨利', '淨值', '盈餘', 'EPS', '本益比', '股價淨值比',
    '殖利率', '毛利率', '成長率',
    # Price/Volume/Technical Indicators
    '^收盤價$', '^最低價$', '^最高價$', '^開盤價$',
    '週轉率', '成交.*量', '成交.*值', '報酬率', '漲跌', 'K\(', 'D\(', 'MACD', 'RSI',
    '乖離率', '振幅', '高低價差', 'MA\d+', '均線',
    # Institutional Flow/Chip Analysis
    '外資', '投信', '自營商', '買賣超', '買張', '賣張', '持股',
    '券商代號', # Keep this to select the Broker ID columns (will be treated as numeric)
    '庫存', '融資', '融券', '券資比', '集保', '大戶', '散戶', '分點',
    # Always include ID and Target
    ID_COLUMN, TARGET_COLUMN
]
keyword_pattern = re.compile('|'.join(KEYWORDS), re.IGNORECASE) # Compile regex for efficiency

# --- Setup ---
# Create output directory if it doesn't exist
os.makedirs(OUTPUT_DIR, exist_ok=True)

# --- Output Logging Setup ---
log_file_path = os.path.join(OUTPUT_DIR, f'eda_log_{VERSION}.txt')
original_stdout = sys.stdout # Store original stdout

class Tee(io.StringIO):
    """Helper class to redirect stdout to both console and a file."""
    def __init__(self, *files):
        super().__init__()
        self.files = files
    def write(self, obj):
        text = str(obj) # Ensure it's a string
        for f in self.files:
            f.write(text)
            f.flush() # Ensure immediate writing
        super().write(text)
        super().flush()
    def flush(self):
        for f in self.files:
            f.flush()
        super().flush()

log_file = open(log_file_path, 'w', encoding='utf-8')

# --- Start Logging (Initial Console Output) ---
# Print initial info before redirecting stdout
print(f"--- Starting EDA & Scaling (Polars {VERSION} - Simplified Scaling, Font Fix) ---")
print(f"Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}")
try: print(f"Polars Version: {pl.__version__}")
except NameError: print("Error: Polars library not found. pip install polars"); exit()
print(f"Pandas Version: {pd.__version__}")
print(f"Seaborn Version: {sns.__version__}")
try: print(f"Scikit-learn version: {__import__('sklearn').__version__}")
except ImportError: print("Error: Scikit-learn library not found. pip install scikit-learn"); exit()
print(f"Output Directory: {OUTPUT_DIR}")
print(f"Log File: {log_file_path}")
print(f"Sample Size for Heavy Plotting/Stats: {SAMPLE_SIZE}")
print(f"Chosen Scaling Strategy: {SCALING_STRATEGY}")
print(f"Save Scaler Objects/Stats: {SAVE_SCALER_OBJECTS}")
if 'power' in SCALING_STRATEGY: print(f"Column Batch Size for Sklearn Scaling: {COLUMN_BATCH_SIZE}")
print(f"Feature Filtering: Missing > {FILTER_MISSING_THRESHOLD*100}%, Abs(IC) < {FILTER_IC_THRESHOLD}, ZeroVar={FILTER_ZERO_VAR}, HighCorr={FILTER_HIGH_CORR}")


# --- Plotting Style & Font Setup ---
# Use seaborn styles, fallback gracefully
try: plt.style.use('seaborn-v0_8-whitegrid'); print("Using plot style: 'seaborn-v0_8-whitegrid'")
except OSError:
    try: plt.style.use('seaborn-whitegrid'); print("Using plot style: 'seaborn-whitegrid'")
    except OSError:
        try: plt.style.use('ggplot'); print("Using plot style: 'ggplot'")
        except OSError: print("Warning: Using default Matplotlib style."); pass
plt.rcParams['figure.figsize'] = (14, 7) # Default figure size

# --- *** FONT CONFIGURATION (ROBUST) *** ---
# To display Chinese characters correctly in plots on Linux/WSL:
# 1. Install Noto CJK fonts (Recommended):
#    `sudo apt update && sudo apt install -y fonts-noto-cjk`
# 2. Clear Matplotlib's font cache *after* installing fonts:
#    `rm -rf ~/.cache/matplotlib/*`
# 3. Verify the exact font name Matplotlib finds using a diagnostic script (see previous response).
# 4. Update the `preferred_fonts` list below with the exact names found.

# List of preferred fonts, with Noto Sans CJK TC first (best for Traditional Chinese)
preferred_fonts = [
    'Noto Sans CJK TC',  # Preferred font for Traditional Chinese
    'Noto Sans TC',      # Alternative name
    'WenQuanYi Zen Hei', # Good fallback CJK font
    'DejaVu Sans',       # Common fallback with broad Unicode support
    'sans-serif'         # Generic system fallback
]
try:
    plt.rcParams['font.sans-serif'] = preferred_fonts
    print(f"Set Matplotlib font.sans-serif to: {preferred_fonts}")

    # Verify Matplotlib found *one* of the preferred CJK fonts
    from matplotlib import font_manager
    # Force a rebuild check, useful after cache clearing
    # Note: Might slightly increase startup time the first time after clearing cache
    # try: fm.fontManager._rebuild() # Older matplotlib
    # except: font_manager.findfont("sans-serif", rebuild_if_missing=True) # Newer matplotlib >= 3.3 ?

    found_cjk_fonts_in_list = [f.name for f in font_manager.fontManager.ttflist if f.name in preferred_fonts[:-2]] # Check Noto/WenQuanYi
    if not found_cjk_fonts_in_list:
        print("\n*** WARNING: Matplotlib did NOT find preferred CJK fonts (Noto/WenQuanYi). ***")
        print("*** Ensure 'fonts-noto-cjk' is installed via apt, cache cleared (`rm -rf ~/.cache/matplotlib/*`), and check font names. Plots with Chinese characters may fail. ***\n")
    else:
         print(f"Matplotlib found the following preferred CJK font(s): {found_cjk_fonts_in_list}")

except Exception as e:
    print(f"Error setting font.sans-serif: {e}")
    print("Plots with Chinese characters might not render correctly.")

plt.rcParams['axes.unicode_minus'] = False # Display minus sign correctly
# --- End Font Configuration ---

# Suppress common warnings
warnings.filterwarnings('ignore', category=FutureWarning)
warnings.filterwarnings('ignore', category=UserWarning)
# warnings.filterwarnings('ignore', category=pl.exceptions.PerformanceWarning) # Optionally suppress Polars perf warns

# --- Polars Configuration ---
pl.Config.set_tbl_rows(50) # Max rows to display in printouts
pl.Config.set_tbl_cols(15) # Max columns to display
pl.Config.set_float_precision(4) # Decimal places for floats

# --- NOW Redirect stdout to log file and console ---
sys.stdout = Tee(original_stdout, log_file)
print("\n--- stdout now redirected to console and log file ---")

# ==============================================================================
# --- 1. Feature Selection & Schema Definition ---
# ==============================================================================
print(f"\n--- 1. Selecting Features & Defining Schema ---")
selected_features = []; all_available_cols = []; schema_overrides = {}
initial_selected_count = 0 # Initialize here for guaranteed definition
try:
    print(f"Scanning header row from {TRAIN_FILE}...")
    lf_scan = pl.scan_csv(TRAIN_FILE, n_rows=0, infer_schema_length=0)
    all_available_cols = lf_scan.collect_schema().names()
    print(f"Found {len(all_available_cols)} total columns in header.")
    del lf_scan; gc.collect()

    # Save all column names for reference
    all_cols_file_path = os.path.join(OUTPUT_DIR, 'all_column_names.txt')
    print(f"Saving all column names to {all_cols_file_path}")
    with open(all_cols_file_path, 'w', encoding='utf-8') as f:
        for col_name in all_available_cols: f.write(f"{col_name}\n")

    # Select features based on keywords
    start_select_time = time.time()
    selected_features = [col for col in all_available_cols if keyword_pattern.search(col)]
    # Ensure ID and Target are always included if they exist
    if ID_COLUMN not in selected_features and ID_COLUMN in all_available_cols: selected_features.append(ID_COLUMN)
    if TARGET_COLUMN not in selected_features and TARGET_COLUMN in all_available_cols: selected_features.append(TARGET_COLUMN)
    selected_features = sorted(list(set(selected_features))) # Unique and sorted
    initial_selected_count = len(selected_features) # Assign actual value
    print(f"Keyword selection duration: {time.time() - start_select_time:.2f}s.")
    print(f"\nSelected {initial_selected_count} features based on keywords.")
    # Print sample of selected features
    if initial_selected_count > 0:
        if initial_selected_count < 500: print("Selected Features:", selected_features)
        else: print(f"Selected Features (First 50): {selected_features[:50]}..."); print(f"Selected Features (Last 50): ...{selected_features[-50:]}")
    else:
        raise ValueError("CRITICAL: No features were selected based on keywords. Check KEYWORDS list and CSV header.")

    # Define schema overrides for efficient loading
    print("\nDefining heuristic schema overrides (prioritizing Float32)...")
    for col in selected_features:
        if col == ID_COLUMN: schema_overrides[col] = pl.Utf8 # IDs as strings
        elif col == TARGET_COLUMN: schema_overrides[col] = pl.Int8 # Target as small integer
        elif '券商代號' in col: schema_overrides[col] = pl.Float32 # Treat Broker IDs as numeric (Float32 is robust)
        elif any(kw in col for kw in ['張數', '筆數', '家數', '日數', '名次', '成交量', 'K(', 'D(', '週轉率']): schema_overrides[col] = pl.Int32 # Specific integer types
        else: schema_overrides[col] = pl.Float32 # Default to Float32 for others
    print("Sample schema overrides defined:")
    print({k: v for i, (k, v) in enumerate(schema_overrides.items()) if i < 20})

except FileNotFoundError as e: print(f"\nCRITICAL Error: Input file not found - {e}"); sys.stdout = original_stdout; log_file.close(); exit()
except ValueError as e: print(f"\nCRITICAL Error: {e}"); sys.stdout = original_stdout; log_file.close(); exit()
except Exception as e: print(f"\nCRITICAL Error during Feature Selection/Schema setup: {e}"); sys.stdout = original_stdout; log_file.close(); exit()

# ==============================================================================
# --- 2. Load Data (Polars) ---
# ==============================================================================
print("\n\n--- 2. Loading Training Data (Selected Features using Polars Lazy API) ---")
df_train = None; gc.collect(); start_load_time = time.time(); mem_usage_mb = -1
try:
    # Lazy scan with schema hints
    lf_train = pl.scan_csv(
        TRAIN_FILE, low_memory=True, try_parse_dates=False,
        ignore_errors=True, # Skip rows with parsing errors
        infer_schema_length=10000, # Infer types from first N rows
        schema_overrides=schema_overrides # Provide our overrides as hints <<< CORRECTED ARGUMENT NAME
    )

    # Filter selected features to only those actually present in the file
    available_cols_in_file = lf_train.collect_schema().names()
    final_selected_features = [col for col in selected_features if col in available_cols_in_file]
    if len(final_selected_features) != len(selected_features):
        print(f"Warning: {len(selected_features) - len(final_selected_features)} selected features not found in {TRAIN_FILE}. Using {len(final_selected_features)} available.")
        selected_features = final_selected_features # IMPORTANT: Update main list

    if not selected_features: raise ValueError("CRITICAL: No selected features remain after checking file.")
    print(f"Selecting {len(selected_features)} features lazily...")
    lf_train = lf_train.select(selected_features)

    # Apply explicit casting if inferred type differs from override or if default Float32 needed
    print("Applying explicit casting based on schema_overrides (Lazy)...")
    lf_train_schema = lf_train.collect_schema()
    cast_expressions = []
    for col_name in selected_features: # Use updated selected_features
        if col_name not in lf_train_schema: continue # Skip if column wasn't loaded
        target_dtype = schema_overrides.get(col_name); current_dtype = lf_train_schema[col_name]
        # Cast if override specified and different from inferred
        if target_dtype and current_dtype != target_dtype: cast_expressions.append(pl.col(col_name).cast(target_dtype, strict=False))
        # Cast Float64/Int64 to Float32 if no specific override (saves memory)
        elif not target_dtype and col_name != ID_COLUMN and col_name != TARGET_COLUMN:
             if current_dtype == pl.Float64: cast_expressions.append(pl.col(col_name).cast(pl.Float32, strict=False))
             elif current_dtype == pl.Int64: cast_expressions.append(pl.col(col_name).cast(pl.Float32, strict=False)) # Cast large Ints to Float32

    if cast_expressions: print(f"  Applying {len(cast_expressions)} casting operations."); lf_train = lf_train.with_columns(cast_expressions)
    else: print("  No further casting operations needed beyond initial dtypes hint.")

    # Execute the lazy plan and load into memory
    print(f"Collecting training data into memory (executing lazy plan)...")
    df_train = lf_train.collect()
    print(f"\nSuccessfully loaded training data subset using Polars.")
    print(f"Load duration: {time.time() - start_load_time:.2f} seconds.")
    mem_usage_mb = df_train.estimated_size("mb")
    print(f"Estimated memory usage (Polars DataFrame): {mem_usage_mb:.2f} MB")
    print(f"Training Data Shape: {df_train.shape}")
    print("\nTraining Data Schema (Final):"); print(df_train.schema)
    # Check for essential columns after loading
    if ID_COLUMN not in df_train.columns: print(f"Warning: ID column '{ID_COLUMN}' is missing in loaded training data.")
    if TARGET_COLUMN not in df_train.columns: print(f"Warning: Target column '{TARGET_COLUMN}' is missing in loaded training data.")

except pl.exceptions.ComputeError as e: # Catch Polars compute errors (like OOM)
     if "out of memory" in str(e).lower(): print("\nCRITICAL POLARS OOM ERROR during collect(). Increase RAM or reduce features/rows."); df_train = None
     else: print(f"\nCRITICAL Polars Compute Error during load: {e}"); df_train = None
except Exception as e: # Catch other potential errors during loading
    print(f"\nCRITICAL Error during Polars Training Data Load: {e}"); df_train = None
finally:
    # Ensure LazyFrame is cleaned up
    if 'lf_train' in locals(): del lf_train
    gc.collect()

# --- Load Test Data (Polars) ---
# Similar process as training data loading
print(f"\n--- Loading Public Test Data (Selected Features using Polars Lazy API) ---")
df_test = None; mem_usage_test_mb = -1
try:
    # Get header columns efficiently
    test_scan = pl.scan_csv(TEST_FILE, n_rows=0, infer_schema_length=0)
    test_available_cols = test_scan.collect_schema().names(); del test_scan; gc.collect()
    # Use the final 'selected_features' list from training, exclude target, ensure ID
    test_usecols = [col for col in selected_features if col in test_available_cols and col != TARGET_COLUMN]
    if ID_COLUMN in test_available_cols and ID_COLUMN not in test_usecols: test_usecols.append(ID_COLUMN)
    test_usecols = sorted(list(set(test_usecols)))
    # Filter schema overrides for columns present in test set
    test_schema_overrides = {k: v for k, v in schema_overrides.items() if k in test_usecols}

    if not test_usecols: print("Warning: No selected features found in test set header. Skipping test data load."); df_test = None
    else:
        print(f"Loading {len(test_usecols)} features present in test set: {TEST_FILE}")
        lf_test = pl.scan_csv(TEST_FILE, low_memory=True, try_parse_dates=False,
                              ignore_errors=True, infer_schema_length=10000,
                              schema_overrides=test_schema_overrides # <<< CORRECTED ARGUMENT NAME
                             ).select(test_usecols)
        # Apply casting similar to training data
        lf_test_schema = lf_test.collect_schema(); test_cast_expressions = []
        for col_name in test_usecols:
            if col_name not in lf_test_schema: continue
            target_dtype = test_schema_overrides.get(col_name); current_dtype = lf_test_schema[col_name]
            if target_dtype and current_dtype != target_dtype: test_cast_expressions.append(pl.col(col_name).cast(target_dtype, strict=False))
            elif not target_dtype and col_name != ID_COLUMN:
                if current_dtype == pl.Float64: test_cast_expressions.append(pl.col(col_name).cast(pl.Float32, strict=False))
                elif current_dtype == pl.Int64: test_cast_expressions.append(pl.col(col_name).cast(pl.Float32, strict=False))
        if test_cast_expressions: print(f"  Applying {len(test_cast_expressions)} casting operations to test data."); lf_test = lf_test.with_columns(test_cast_expressions)
        else: print("  No further casting operations needed for test data.")
        # Collect test data
        print("Collecting test data into memory..."); df_test = lf_test.collect()
        print(f"\nSuccessfully loaded test data subset.")
        mem_usage_test_mb = df_test.estimated_size("mb"); print(f"Test Data Shape: {df_test.shape}")
        print(f"Estimated memory usage (Polars Test DataFrame): {mem_usage_test_mb:.2f} MB")
        print("\nTest Data Schema (Final):"); print(df_test.schema)
        if ID_COLUMN not in df_test.columns: print(f"Warning: ID column '{ID_COLUMN}' is missing in loaded test data.")

except FileNotFoundError: print(f"Warning: Test file {TEST_FILE} not found. Skipping test data load."); df_test = None
except Exception as e: print(f"\nError loading test data: {e}"); df_test = None
finally:
    if 'lf_test' in locals(): del lf_test
    gc.collect()

# --- Load Test Data (Polars) ---
# Similar process as training data loading
print(f"\n--- Loading Public Test Data (Selected Features using Polars Lazy API) ---")
df_test = None; mem_usage_test_mb = -1
try:
    # Get header columns efficiently
    test_scan = pl.scan_csv(TEST_FILE, n_rows=0, infer_schema_length=0)
    test_available_cols = test_scan.collect_schema().names(); del test_scan; gc.collect()
    # Use the final 'selected_features' list from training, exclude target, ensure ID
    test_usecols = [col for col in selected_features if col in test_available_cols and col != TARGET_COLUMN]
    if ID_COLUMN in test_available_cols and ID_COLUMN not in test_usecols: test_usecols.append(ID_COLUMN)
    test_usecols = sorted(list(set(test_usecols)))
    # Filter schema overrides for columns present in test set
    test_schema_overrides = {k: v for k, v in schema_overrides.items() if k in test_usecols}

    if not test_usecols: print("Warning: No selected features found in test set header. Skipping test data load."); df_test = None
    else:
        print(f"Loading {len(test_usecols)} features present in test set: {TEST_FILE}")
        lf_test = pl.scan_csv(TEST_FILE, low_memory=True, try_parse_dates=False,
                          ignore_errors=True, infer_schema_length=10000,
                          schema_overrides=test_schema_overrides # <<< RENAMED ARGUMENT
                         ).select(test_usecols)
        # Apply casting similar to training data
        lf_test_schema = lf_test.collect_schema(); test_cast_expressions = []
        for col_name in test_usecols:
            if col_name not in lf_test_schema: continue
            target_dtype = test_schema_overrides.get(col_name); current_dtype = lf_test_schema[col_name]
            if target_dtype and current_dtype != target_dtype: test_cast_expressions.append(pl.col(col_name).cast(target_dtype, strict=False))
            elif not target_dtype and col_name != ID_COLUMN:
                if current_dtype == pl.Float64: test_cast_expressions.append(pl.col(col_name).cast(pl.Float32, strict=False))
                elif current_dtype == pl.Int64: test_cast_expressions.append(pl.col(col_name).cast(pl.Float32, strict=False))
        if test_cast_expressions: print(f"  Applying {len(test_cast_expressions)} casting operations to test data."); lf_test = lf_test.with_columns(test_cast_expressions)
        else: print("  No further casting operations needed for test data.")
        # Collect test data
        print("Collecting test data into memory..."); df_test = lf_test.collect()
        print(f"\nSuccessfully loaded test data subset.")
        mem_usage_test_mb = df_test.estimated_size("mb"); print(f"Test Data Shape: {df_test.shape}")
        print(f"Estimated memory usage (Polars Test DataFrame): {mem_usage_test_mb:.2f} MB")
        print("\nTest Data Schema (Final):"); print(df_test.schema)
        if ID_COLUMN not in df_test.columns: print(f"Warning: ID column '{ID_COLUMN}' is missing in loaded test data.")

except FileNotFoundError: print(f"Warning: Test file {TEST_FILE} not found. Skipping test data load."); df_test = None
except Exception as e: print(f"\nError loading test data: {e}"); df_test = None
finally:
    if 'lf_test' in locals(): del lf_test
    gc.collect()


# ==============================================================================
# --- 3. EDA & Feature Filtering Prep ---
# ==============================================================================
print("\n\n--- 3. Detailed EDA & Feature Filtering Prep ---")

# --- CRITICAL CHECK: Ensure df_train is loaded before proceeding ---
if df_train is None or df_train.height == 0:
    print("CRITICAL Error: Training data failed to load or is empty. Aborting.")
    if 'log_file' in locals() and not log_file.closed: log_file.close() # Ensure log is closed
    sys.stdout = original_stdout # Restore stdout if redirected
    exit() # Stop script execution
else:
    print(f"Proceeding with EDA on loaded training data ({df_train.shape[0]} rows, {df_train.shape[1]} columns).")

# --- Define Feature Lists based on loaded data ---
loaded_cols_train = df_train.columns
# Get all numeric types loaded (includes Broker IDs now treated as Float32) - FIXED Deprecation
numeric_features_all = df_train.select(cs.numeric()).columns
# Get actual categorical/string types loaded (excluding ID) - FIXED Deprecation
categorical_features = df_train.select(cs.string() | cs.categorical()).drop(ID_COLUMN).columns

# --- Define features NOT intended for scaling ---
# Include ID, Target, and the Broker ID columns
cols_to_exclude_from_scaling = [ID_COLUMN]
if TARGET_COLUMN in loaded_cols_train: cols_to_exclude_from_scaling.append(TARGET_COLUMN)
cols_to_exclude_from_scaling.extend([col for col in numeric_features_all if '券商代號' in col])
cols_to_exclude_from_scaling = sorted(list(set(cols_to_exclude_from_scaling))) # Unique and sorted

# --- Initial list of numeric features intended FOR scaling (before filtering) ---
# All numeric columns EXCEPT those in the exclusion list
numeric_features_initially_for_scaling = sorted([
    col for col in numeric_features_all if col not in cols_to_exclude_from_scaling
])

print(f"\n[3.1] Initial Feature Counts:")
print(f"   Total loaded: {len(loaded_cols_train)}")
print(f"   Numeric (All): {len(numeric_features_all)}")
print(f"   Categorical/String: {len(categorical_features)}")
print(f"   Numeric initially targeted for scaling: {len(numeric_features_initially_for_scaling)}")
print(f"   Excluded from scaling (IDs, Target, Brokers): {len(cols_to_exclude_from_scaling)}")
if TARGET_COLUMN in loaded_cols_train: print(f"   Target column: '{TARGET_COLUMN}'")
else: print(f"   WARNING: Target column '{TARGET_COLUMN}' NOT FOUND in final loaded training data.")
if ID_COLUMN in loaded_cols_train: print(f"   ID column: '{ID_COLUMN}'")
else: print(f"   WARNING: ID column '{ID_COLUMN}' NOT FOUND in final loaded training data.")


# --- 3.2 Target Variable Analysis ---
print("\n[3.2] Target Variable Analysis")
target_distribution = None; imbalance_ratio = -1
if TARGET_COLUMN in df_train.columns:
    try:
        # Use pl.len() - FIXED Deprecation
        target_distribution = df_train.group_by(TARGET_COLUMN).agg(pl.len().alias("count")).sort(TARGET_COLUMN)
        total_rows_train = df_train.height; target_distribution = target_distribution.with_columns((pl.col("count") / total_rows_train * 100).round(2).alias("percentage"))
        print("Target Distribution:"); print(target_distribution)
        if target_distribution.height > 1: min_c = target_distribution['count'].min(); max_c = target_distribution['count'].max(); imbalance_ratio = min_c / max_c if max_c > 0 else 0; imbalance_desc = 'Highly' if imbalance_ratio < 0.1 else ('Moderately' if imbalance_ratio < 0.4 else 'Relatively'); print(f"\nImbalance Ratio (Minority/Majority Count): {imbalance_ratio:.4f} -> {imbalance_desc} Imbalanced.")
        elif target_distribution.height == 1: print("\nOnly one target class found.")
        else: print("\nTarget column seems empty or null.")
        # Plotting using Seaborn (should use configured fonts)
        try:
             target_dist_pd = target_distribution.to_pandas()
             plt.figure(figsize=(6, 4)); sns.barplot(x=TARGET_COLUMN, y='count', data=target_dist_pd, palette='viridis')
             plt.title(f'Distribution of Target Variable ({TARGET_COLUMN})'); plt.xlabel(f'{TARGET_COLUMN} Value'); plt.ylabel('Count'); plt.tight_layout()
             plot_path = os.path.join(OUTPUT_DIR, 'target_distribution.png'); plt.savefig(plot_path); plt.close(); print(f"Target distribution plot saved to: {plot_path}")
             del target_dist_pd # Clean up pandas df
        except Exception as e: print(f"Error generating target plot: {e}"); plt.close()
    except Exception as e: print(f"Error during target analysis: {e}")
else: print(f"Target column '{TARGET_COLUMN}' not found. Skipping target analysis.")
del target_distribution; gc.collect()


# --- 3.3 Missing Value Analysis & Filtering ---
print("\n[3.3] Missing Value Analysis & Filtering ---")
missing_summary_pl = None
high_missing_cols = []
try:
    start_missing_time = time.time()
    null_counts = df_train.null_count()
    # Create summary DataFrame
    missing_summary_pl = null_counts.unpivot(index=[], variable_name="Feature", value_name="Missing Count").with_columns(
        (pl.col("Missing Count") / df_train.height).alias("Missing Percentage")
    ).filter(pl.col("Missing Count") > 0).sort("Missing Percentage", descending=True)
    print(f"Missing value calculation duration: {time.time() - start_missing_time:.2f} seconds.")

    if missing_summary_pl.height == 0: print("No missing values found.")
    else:
        print("\nColumns with Missing Values (Top 50):"); print(missing_summary_pl.head(50))
        missing_csv_path = os.path.join(OUTPUT_DIR, 'missing_values_summary.csv')
        missing_summary_pl.write_csv(missing_csv_path); print(f"Full missing values summary saved: {missing_csv_path}")
        # Get list of columns exceeding threshold for filtering later
        high_missing_cols = missing_summary_pl.filter(pl.col("Missing Percentage") > FILTER_MISSING_THRESHOLD)['Feature'].to_list()
        print(f"\nFound {len(high_missing_cols)} features with > {FILTER_MISSING_THRESHOLD*100:.1f}% missing values (to be filtered later).")
        # Plotting missing % bar chart (should use configured fonts)
        try:
             missing_pd = missing_summary_pl.head(100).to_pandas() # Plot top 100 missing
             plt.figure(figsize=(12, max(8, len(missing_pd) * 0.2)))
             sns.barplot(x='Missing Percentage', y='Feature', data=missing_pd, palette='viridis')
             plt.title('Percentage of Missing Values by Feature (Top 100)')
             plt.xlabel('Percentage Missing (%)'); plt.ylabel('Features'); plt.tight_layout()
             missing_plot_path = os.path.join(OUTPUT_DIR, 'missing_values_barplot_subset.png')
             plt.savefig(missing_plot_path); plt.close()
             print(f"Missing values plot saved to: {missing_plot_path}")
             del missing_pd # Clean up pandas df
        except Exception as e: print(f"Error generating missing values plot: {e}"); plt.close()
    del null_counts; gc.collect() # Clean up null counts object
    # Keep missing_summary_pl for filtering in 3.6
except Exception as e: print(f"Error during missing value analysis: {e}")

# --- 3.4 Numerical Feature Analysis (EDA + Filtering Prep) ---
print("\n[3.4] Numerical Feature Analysis (EDA + Filtering Prep)")
# Use sample for EDA plots/stats if needed
df_analysis_pl = None; using_sample = False; ic_df_pl_original = None # Initialize original IC dataframe
if df_train.height > SAMPLE_SIZE * 1.5:
    print(f"Using sample ({SAMPLE_SIZE} rows) for EDA plots/stats.")
    df_analysis_pl = df_train.sample(n=SAMPLE_SIZE, seed=42, shuffle=True)
    using_sample = True
else:
    print(f"Using full loaded subset ({df_train.height} rows) for numerical analysis.")
    df_analysis_pl = df_train # Reference, not a copy
gc.collect(); analysis_target = 'Sample' if using_sample else 'Full Subset'

# [3.4.1] Descriptive Statistics (on all numeric)
print(f"\n   [3.4.1] Descriptive Statistics ({analysis_target}) - on all numerics...")
try:
    start_desc_time = time.time(); desc_stats_pl = df_analysis_pl.select(cs.numeric()).describe(percentiles=[0.01, 0.05, 0.25, 0.50, 0.75, 0.95, 0.99]) # Use selector
    print(f"Descriptive stats duration: {time.time() - start_desc_time:.2f}s.")
    with pl.Config(tbl_cols=-1, tbl_width_chars=200): print(desc_stats_pl) # Print full stats table
    desc_stats_path = os.path.join(OUTPUT_DIR, f'numerical_descriptive_stats_{analysis_target.lower()}.csv')
    desc_stats_pl.write_csv(desc_stats_path); print(f"Descriptive stats saved to: {desc_stats_path}"); del desc_stats_pl; gc.collect()
except Exception as e: print(f"Error calculating descriptive stats: {e}")

# [3.4.2] Zero Variance Check & Filtering
print(f"\n   [3.4.2] Checking for Zero/Near-Zero Variance ({analysis_target})...")
zero_variance_cols = []
try:
    # Calculate variance on the analysis df (sample or full)
    variances_pl = df_analysis_pl.select(cs.numeric()).var() # Use selector
    var_dict = {col: var for col, var in zip(variances_pl.columns, variances_pl.row(0))}
    near_zero_threshold = 1e-9
    zero_variance_cols = [col for col, var in var_dict.items() if var is not None and not np.isnan(var) and abs(var) < near_zero_threshold]
    if zero_variance_cols: print(f"   Found {len(zero_variance_cols)} features with near-zero variance (to be filtered later).")
    else: print(f"   No features with near-zero variance found.")
    del variances_pl, var_dict; gc.collect()
except Exception as e: print(f"Error calculating variance: {e}")

# [3.4.3] Distribution Plots (Exclude Broker IDs)
print(f"\n   [3.4.3] Plotting Distributions ({analysis_target}, sample of features)...")
features_for_dist_plot = [f for f in numeric_features_all if '券商代號' not in f] # Use original list from 3.1
num_features_to_plot_dist = min(len(features_for_dist_plot), 36)
if num_features_to_plot_dist > 0:
    np.random.seed(42); plot_dist_cols = np.random.choice(features_for_dist_plot, num_features_to_plot_dist, replace=False).tolist()
    print(f"    Plotting distributions for {num_features_to_plot_dist} random non-Broker-ID features...")
    try:
        plot_data_pd = df_analysis_pl.select(plot_dist_cols).to_pandas(); gc.collect()
        n_cols_plot = 6; n_rows_plot = (len(plot_dist_cols) + n_cols_plot - 1) // n_cols_plot
        fig, axes = plt.subplots(n_rows_plot, n_cols_plot, figsize=(n_cols_plot * 3.5, n_rows_plot * 3)); axes = axes.flatten()
        for i, col in enumerate(plot_dist_cols):
            # Use try-except for individual plot errors
            try:
                 # Check if axes[i] exists before plotting
                 if i < len(axes):
                      sns.histplot(plot_data_pd[col].dropna(), kde=True, bins=50, ax=axes[i], color='steelblue', line_kws={'lw': 1})
                      axes[i].set_title(f'{col}', fontsize=8); axes[i].tick_params(axis='both', labelsize=7)
                 else: print(f"      Warning: Exceeded number of axes for plot {i}, col {col}") # Should not happen with correct grid calc
            except Exception as plot_err:
                print(f"      Error plotting dist for {col}: {plot_err}")
                if i < len(axes): axes[i].set_title(f'{col}\n(Plot Error)', fontsize=8)

        for j in range(i + 1, len(axes)): # Remove any remaining empty axes
             if j < len(axes): fig.delaxes(axes[j])

        fig.suptitle(f'Sampled Numerical Distributions ({analysis_target}, Non-Broker ID)', fontsize=14); plt.tight_layout(rect=[0, 0.03, 1, 0.97])
        dist_plot_path = os.path.join(OUTPUT_DIR, f'numerical_distributions_{analysis_target.lower()}.png'); plt.savefig(dist_plot_path, dpi=150); plt.close(); del plot_data_pd; gc.collect()
        print(f"    Sample distribution plots saved: {dist_plot_path}")
    except Exception as e: print(f"    Error during distribution plotting: {e}"); plt.close()
else: print("    No non-Broker-ID numerical features available to plot distributions.")

# [3.4.4] Correlation Analysis & Filtering
# [3.4.4] Correlation Analysis & Filtering
print(f"\n   [3.4.4] Correlation Analysis & Filtering (Pearson, {analysis_target})...")
# --- NEW: Define a threshold for minimum relevance to target for pairwise calculation ---
MIN_TARGET_CORR_FOR_PAIRWISE = 0.01 # Features below this abs corr with target won't be in N'xN' matrix
print(f"      Will compute full pairwise correlations only for features with abs(Target Correlation) >= {MIN_TARGET_CORR_FOR_PAIRWISE}")
# --- END NEW ---

high_corr_pairs_to_drop = [] # Initialize list of features to drop due to high correlation (based on reduced analysis)
target_corr_pd = None # Initialize dataframe for target correlations
correlation_matrix_reduced_pd = None # Initialize reduced pairwise correlation matrix (Pandas)

# Check if there are enough features initially targeted for scaling to proceed
if len(numeric_features_initially_for_scaling) < 2:
    print("    Skipping correlation analysis: Less than 2 features initially targeted for scaling.")
else:
    # --- STEP 1: Calculate Feature-Target Correlations (Memory Efficient) ---
    target_in_analysis_df = TARGET_COLUMN in df_analysis_pl.columns
    features_to_check_target_corr = numeric_features_initially_for_scaling[:] # Start with all candidates
    target_corr_results = {} # Store {feature: corr_value}

    if target_in_analysis_df:
        print(f"      Calculating Feature-Target correlations for up to {len(features_to_check_target_corr)} features...")
        target_corr_start_time = time.time()
        # Ensure target column is present and usable in the analysis dataframe
        if TARGET_COLUMN in df_analysis_pl.columns:
            target_col_expr = pl.col(TARGET_COLUMN)
            corr_expressions = []
            valid_features_for_target_corr = []

            # Create correlation expressions, checking if feature exists in the analysis df
            for feature in features_to_check_target_corr:
                 if feature in df_analysis_pl.columns:
                      corr_expressions.append(pl.corr(target_col_expr, pl.col(feature)).alias(feature))
                      valid_features_for_target_corr.append(feature)
                 # else: Feature not in analysis_df, skip silently or log warning if needed

            if corr_expressions:
                print(f"      Attempting batch calculation of {len(corr_expressions)} target correlations...")
                try:
                    # Calculate all target correlations at once using Polars LazyFrame aggregation
                    target_corr_values_df = df_analysis_pl.lazy().select(corr_expressions).collect()
                    if target_corr_values_df.height > 0:
                         target_corr_results = target_corr_values_df.to_dicts()[0]
                    del target_corr_values_df
                    print("      Batch target correlation calculation successful.")
                except Exception as e:
                     print(f"      Error during batch target correlations: {type(e).__name__} - {e}.")
                     # Fallback: Calculate one by one (slower, but less likely to OOM than full matrix)
                     print("      Falling back to calculating target correlations individually...")
                     target_corr_results = {} # Reset results dict for fallback
                     for idx, feature in enumerate(valid_features_for_target_corr):
                         # Optional progress print for individual calculation
                         # if (idx + 1) % 500 == 0: print(f"        Target corr {idx+1}/{len(valid_features_for_target_corr)}")
                         try:
                              # Need to drop nulls for individual corr to work correctly
                              # Perform calculation lazily and collect immediately
                              corr_val = df_analysis_pl.lazy().select([
                                  target_col_expr, pl.col(feature)
                              ]).drop_nulls().select(
                                  pl.corr(target_col_expr, pl.col(feature))
                              ).collect().item() # Collect the single value
                              target_corr_results[feature] = corr_val
                         except Exception as ind_err:
                              print(f"        Error calculating target corr for {feature}: {ind_err}")
                              target_corr_results[feature] = None # Mark as failed

            else:
                print("      No valid features found in analysis dataframe for target correlation.")

            gc.collect()
            print(f"      Feature-Target correlation calculation duration: {time.time() - target_corr_start_time:.2f}s.")

            # Create the target_corr_pd DataFrame for reporting and filtering
            if target_corr_results:
                 # Filter out None values that might result from errors
                 target_corr_list = [{'Feature': k, 'Pearson_Corr': v} for k, v in target_corr_results.items() if v is not None and not np.isnan(v)]
                 if target_corr_list:
                      target_corr_pd = pd.DataFrame(target_corr_list)
                      target_corr_pd['Abs_Pearson_Corr'] = target_corr_pd['Pearson_Corr'].abs()
                      target_corr_pd = target_corr_pd.sort_values('Abs_Pearson_Corr', ascending=False).set_index('Feature')

                      print(f"\n    Top Features by Abs Pearson Corr with '{TARGET_COLUMN}':"); print(target_corr_pd.head(TOP_N_FEATURES_PLOT))
                      target_corr_path = os.path.join(OUTPUT_DIR, f'target_pearson_correlations_{analysis_target.lower()}.csv')
                      try:
                          target_corr_pd.to_csv(target_corr_path); print(f"    Target Pearson correlations saved: {target_corr_path}")
                      except Exception as save_err:
                          print(f"    Error saving target correlations CSV: {save_err}")

                      # Plotting target corr (using Seaborn, should use configured fonts)
                      top_n_corr_plot = min(TOP_N_FEATURES_PLOT, len(target_corr_pd))
                      if top_n_corr_plot > 0:
                           # Sort by actual correlation value for plotting direction
                           top_corr_features_plot = target_corr_pd.head(top_n_corr_plot).sort_values('Pearson_Corr')
                           try:
                                plt.figure(figsize=(10, max(8, len(top_corr_features_plot)*0.2)))
                                sns.barplot(x=top_corr_features_plot['Pearson_Corr'], y=top_corr_features_plot.index, palette='coolwarm_r', orient='h')
                                plt.title(f'Top {len(top_corr_features_plot)} Features vs Target ({TARGET_COLUMN}) by Pearson Correlation ({analysis_target})')
                                plt.xlabel("Pearson Correlation") # Add x label
                                plt.ylabel("Feature") # Add y label
                                plt.tight_layout()
                                target_corr_plot_path = os.path.join(OUTPUT_DIR, f'target_pearson_correlation_top_{analysis_target.lower()}.png');
                                plt.savefig(target_corr_plot_path);
                                plt.close(); # Close plot figure
                                print(f"    Target corr plot saved: {target_corr_plot_path}")
                                del top_corr_features_plot; gc.collect() # Clean up plotting df
                           except Exception as plot_err:
                                print(f"    Error plotting target corrs: {plot_err}"); plt.close() # Ensure plot is closed on error
                 else:
                      print("    No valid target correlation results obtained after filtering Nones/NaNs.")
            else:
                 print("    Target correlation calculation yielded no results dictionary.")
        else:
            print(f"    Target column '{TARGET_COLUMN}' not present in analysis dataframe columns, skipping target correlations.")
    else:
        print("    Target column not present in analysis dataframe, skipping target correlations.")


    # --- STEP 2: Identify Features for Reduced Pairwise Correlation ---
    features_for_pairwise_corr = []
    if target_corr_pd is not None:
        # Select features whose absolute correlation with target meets the threshold
        features_for_pairwise_corr = target_corr_pd[target_corr_pd['Abs_Pearson_Corr'] >= MIN_TARGET_CORR_FOR_PAIRWISE].index.tolist()
        print(f"\n      Identified {len(features_for_pairwise_corr)} features meeting target correlation threshold (>= {MIN_TARGET_CORR_FOR_PAIRWISE}) for pairwise analysis.")
        if not features_for_pairwise_corr:
             print("      Warning: No features met the minimum target correlation threshold. Pairwise analysis will be skipped.")
    else:
        # If target correlation failed or target was missing, skip the pairwise calculation.
        # Avoids attempting the full matrix calculation which would likely crash.
        print(f"\n      Target correlation results unavailable. Skipping pairwise feature correlation matrix calculation.")


    # --- STEP 3: Calculate Reduced Pairwise Correlation Matrix (N' x N') ---
    if features_for_pairwise_corr and len(features_for_pairwise_corr) > 1:
        print(f"      Calculating pairwise Pearson correlation matrix for {len(features_for_pairwise_corr)} features...")
        pairwise_corr_start_time = time.time()
        try:
            # Calculate N' x N' matrix using the reduced feature list
            # Use Polars lazy select + corr
            correlation_matrix_reduced_pl = df_analysis_pl.lazy().select(features_for_pairwise_corr).corr().collect() # Use default Pearson

            print(f"      Reduced pairwise corr matrix duration: {time.time() - pairwise_corr_start_time:.2f}s."); gc.collect()

            # Convert reduced matrix to Pandas
            if correlation_matrix_reduced_pl is not None and correlation_matrix_reduced_pl.width > 0 and correlation_matrix_reduced_pl.height > 0:
                # Polars .corr() result has the feature names as the first column
                index_col_name_reduced = correlation_matrix_reduced_pl.columns[0]
                correlation_matrix_reduced_pd = correlation_matrix_reduced_pl.to_pandas().set_index(index_col_name_reduced)
            else:
                print("      Warning: Reduced Polars correlation matrix was empty or None after calculation.")
                correlation_matrix_reduced_pd = None # Set to None if matrix is empty
            del correlation_matrix_reduced_pl; gc.collect() # Clean up polars matrix

        except Exception as e:
            print(f"\n      Error during reduced pairwise Pearson correlation analysis: {type(e).__name__} - {e}")
            import traceback
            traceback.print_exc()
            correlation_matrix_reduced_pd = None # Ensure it's None on error
    elif features_for_pairwise_corr and len(features_for_pairwise_corr) <= 1:
         print(f"      Skipping pairwise correlation: Only {len(features_for_pairwise_corr)} feature(s) selected after target correlation filtering.")
    # else: Already handled case where features_for_pairwise_corr is empty


    # --- STEP 4: High Feature Correlation Filtering (using REDUCED matrix) ---
    if FILTER_HIGH_CORR and correlation_matrix_reduced_pd is not None:
        # Note: We are now filtering based on the reduced matrix
        print(f"\n    Filtering highly correlated feature pairs (|Pearson corr| > {HIGH_CORR_THRESHOLD}) using the reduced matrix ({correlation_matrix_reduced_pd.shape[0]} features)...")
        try:
             # Ensure numeric for comparison, handle potential errors during conversion
             correlation_matrix_reduced_pd_numeric = correlation_matrix_reduced_pd.apply(pd.to_numeric, errors='coerce')

             # Find pairs with high correlation in the upper triangle
             upper_tri = correlation_matrix_reduced_pd_numeric.where(np.triu(np.ones(correlation_matrix_reduced_pd_numeric.shape, dtype=bool), k=1))
             # Stack to get pairs, drop NaNs from upper triangle + NaNs from coerce
             high_corr_pairs_df = upper_tri.stack().dropna().reset_index()
             # Rename columns appropriately (level_0 and level_1 are the default stack index names)
             high_corr_pairs_df.columns = ['Feature 1', 'Feature 2', 'Correlation']

             # Filter by the threshold
             high_corr_pairs_df = high_corr_pairs_df[high_corr_pairs_df['Correlation'].abs() > HIGH_CORR_THRESHOLD].copy()

             if not high_corr_pairs_df.empty:
                  high_corr_pairs_df['Abs Correlation'] = high_corr_pairs_df['Correlation'].abs()
                  high_corr_pairs_df = high_corr_pairs_df.sort_values(by='Abs Correlation', ascending=False)

                  print(f"    Found {len(high_corr_pairs_df)} pairs with |correlation| > {HIGH_CORR_THRESHOLD} among features with target corr >= {MIN_TARGET_CORR_FOR_PAIRWISE}.")
                  # Save these pairs (optional, path indicates it's from reduced set)
                  high_corr_path_reduced = os.path.join(OUTPUT_DIR, f'high_correlation_pairs_reduced_{analysis_target.lower()}.csv')
                  try:
                      high_corr_pairs_df.to_csv(high_corr_path_reduced, index=False); print(f"    Highly correlated pairs (reduced set) saved: {high_corr_path_reduced}")
                  except Exception as save_err:
                       print(f"    Error saving reduced high correlation pairs CSV: {save_err}")

                  # --- Simple strategy to decide which feature to drop (applied to reduced set) ---
                  # Drop features that appear only in 'Feature 2' column within these high-corr pairs
                  features_to_consider_dropping = set(high_corr_pairs_df['Feature 2'].unique())
                  features_in_feature1 = set(high_corr_pairs_df['Feature 1'].unique())
                  high_corr_pairs_to_drop = list(features_to_consider_dropping - features_in_feature1) # This list is used in Sec 3.6
                  print(f"    Identified {len(high_corr_pairs_to_drop)} features to potentially drop due to high correlation (based on reduced set analysis).")
             else:
                  print(f"    No feature pairs found with |correlation| > {HIGH_CORR_THRESHOLD} in the reduced set.")
             # Cleanup intermediate objects
             del upper_tri, high_corr_pairs_df, correlation_matrix_reduced_pd_numeric; gc.collect()
        except Exception as filter_err:
             print(f"    Error finding/filtering high correlation pairs from reduced matrix: {filter_err}")

        # Heatmap plotting (optional, using REDUCED matrix if small enough)
        num_heatmap_features = len(correlation_matrix_reduced_pd.columns)
        if 0 < num_heatmap_features <= HEATMAP_THRESHOLD:
             print(f"\n    Plotting feature correlation heatmap ({num_heatmap_features} features from reduced set)...")
             try:
                figsize_heat = (max(12, num_heatmap_features * 0.25), max(10, num_heatmap_features * 0.2))
                fontsize_heat = max(5, 10 - num_heatmap_features // 10) # Adjust font size based on number of features
                plt.figure(figsize=figsize_heat)
                sns.heatmap(correlation_matrix_reduced_pd, cmap='coolwarm', center=0, annot=False, fmt=".1f", linewidths=.1)
                plt.title(f'Reduced Feature Correlation Matrix (Target Corr >= {MIN_TARGET_CORR_FOR_PAIRWISE}, N={num_heatmap_features})', fontsize=14)
                plt.xticks(rotation=90, fontsize=fontsize_heat);
                plt.yticks(rotation=0, fontsize=fontsize_heat);
                plt.tight_layout()
                heatmap_path_reduced = os.path.join(OUTPUT_DIR, f'feature_correlation_heatmap_reduced_{analysis_target.lower()}.png');
                plt.savefig(heatmap_path_reduced, dpi=100);
                plt.close(); # Close plot figure
                print(f"    Reduced heatmap saved: {heatmap_path_reduced}")
             except Exception as plot_err:
                 print(f"    Error plotting reduced heatmap: {plot_err}"); plt.close() # Ensure plot is closed on error
        elif num_heatmap_features > 0:
             print(f"\n    Skipping reduced heatmap plot (Features: {num_heatmap_features} > {HEATMAP_THRESHOLD}).")

    elif FILTER_HIGH_CORR:
         print("\n    Skipping high correlation filtering (Reduced pairwise correlation matrix not calculated or empty).")


    # Cleanup reduced matrix from memory if it exists
    if 'correlation_matrix_reduced_pd' in locals() and correlation_matrix_reduced_pd is not None:
        del correlation_matrix_reduced_pd; gc.collect()

# End of the primary 'else' block for correlation analysis

# Note: high_corr_pairs_to_drop now contains features identified from the reduced analysis (or is empty)
# This list will be used later in section 3.6 for the final filtering

# [3.4.5] Boxplots/Violin Plots (Keep as before, uses non-Broker IDs)
print(f"\n   [3.4.5] Plotting Boxplots and Violin Plots ({analysis_target}, sample of features)...")
if 'plot_dist_cols' in locals() and plot_dist_cols: # Check if plot_dist_cols was populated
    print(f"    Plotting boxplots/violin plots for {len(plot_dist_cols)} random non-Broker-ID features...")
    try:
        plot_data_pd_outlier = df_analysis_pl.select(plot_dist_cols).to_pandas(); gc.collect()
        # Boxplot
        plt.figure(figsize=(max(16, len(plot_dist_cols) * 0.5), 7)); sns.boxplot(data=plot_data_pd_outlier, orient='v', palette='viridis', fliersize=1.5, linewidth=0.8); plt.title(f'Boxplots for Sampled Numerical Features ({analysis_target}, Non-Broker ID)', fontsize=12); plt.ylabel('Value'); plt.xticks(rotation=90, fontsize=8); plt.tight_layout(); boxplot_path = os.path.join(OUTPUT_DIR, f'numerical_boxplots_sample_{analysis_target.lower()}.png'); plt.savefig(boxplot_path, dpi=120); print(f"    Sample boxplots saved to: {boxplot_path}"); plt.close()
        # Violin Plot
        plt.figure(figsize=(max(16, len(plot_dist_cols) * 0.6), 8)); sns.violinplot(data=plot_data_pd_outlier, orient='v', palette='viridis', inner='quartile', cut=0, linewidth=0.8, scale='width'); plt.title(f'Violin Plots for Sampled Numerical Features ({analysis_target}, Non-Broker ID)', fontsize=12); plt.ylabel('Value'); plt.xticks(rotation=90, fontsize=8); plt.tight_layout(); violin_path = os.path.join(OUTPUT_DIR, f'numerical_violinplots_sample_{analysis_target.lower()}.png'); plt.savefig(violin_path, dpi=120); print(f"    Sample violin plots saved to: {violin_path}"); plt.close()
        del plot_data_pd_outlier; gc.collect()
    except Exception as e: print(f"    Error during outlier plotting: {e}"); plt.close()
else: print("    Skipping Box/Violin plots (no features selected for distribution plots).")

# [3.4.6] Scatter Plots (Keep as before)
print(f"\n   [3.4.6] Scatter Plots for Top Pearson Correlated Features vs Target ({analysis_target})...")
max_scatter_plots = 16
if target_in_analysis_df and target_corr_pd is not None and not target_corr_pd.empty:
    top_scatter_features = target_corr_pd.head(min(max_scatter_plots, len(target_corr_pd))).index.tolist()
    print(f"    Plotting scatter plots for top {len(top_scatter_features)} features vs target...")
    try:
        # Ensure scatter columns exist in the analysis df
        scatter_cols_pl_present = [c for c in top_scatter_features if c in df_analysis_pl.columns]
        if TARGET_COLUMN in df_analysis_pl.columns: scatter_cols_pl_present.append(TARGET_COLUMN)

        if len(scatter_cols_pl_present) > 1: # Need at least one feature + target
             scatter_data_pd = df_analysis_pl.select(scatter_cols_pl_present).to_pandas(); gc.collect()
             n_cols_scatter = 4; n_rows_scatter = (len(top_scatter_features) + n_cols_scatter - 1) // n_cols_scatter # Use original count for grid size
             fig, axes = plt.subplots(n_rows_scatter, n_cols_scatter, figsize=(n_cols_scatter * 4, n_rows_scatter * 3.5), sharey=True); axes = axes.flatten()
             target_jitter = scatter_data_pd[TARGET_COLUMN] + np.random.normal(0, 0.03, size=len(scatter_data_pd)) # Add jitter for binary target
             plot_idx = 0
             for col in top_scatter_features: # Iterate original list
                 if col in scatter_data_pd.columns: # Check if column was actually loaded
                      try:
                          # Use .get() on pandas Series for safer access if index missing
                          corr_val = target_corr_pd['Pearson_Corr'].get(col, np.nan)
                          ax = axes[plot_idx]
                          sns.scatterplot(x=scatter_data_pd[col], y=target_jitter, hue=scatter_data_pd[TARGET_COLUMN], palette={0: 'blue', 1: 'red'}, alpha=0.2, s=8, ax=ax, legend=False, edgecolor='none')
                          ax.set_title(f'{col}\n(Pearson Corr: {corr_val:.3f})', fontsize=9); ax.set_xlabel(col, fontsize=8); ax.set_ylabel(TARGET_COLUMN + ' (jittered)' if plot_idx % n_cols_scatter == 0 else '', fontsize=8); ax.tick_params(axis='both', which='major', labelsize=7)
                          plot_idx += 1
                      except Exception as plot_err:
                           print(f"      Error plotting scatter for {col}: {plot_err}")
                           # Still advance plot index even on error to avoid overwriting
                           if plot_idx < len(axes): axes[plot_idx].set_title(f'{col}\n(Plot Error)', fontsize=9)
                           plot_idx += 1

             for j in range(plot_idx, len(axes)): fig.delaxes(axes[j]) # Remove empty axes
             fig.suptitle(f'Scatter Plots: Top Pearson Correlated Features vs. Target ({analysis_target})', fontsize=14); plt.tight_layout(rect=[0, 0.03, 1, 0.97])
             scatter_plot_path = os.path.join(OUTPUT_DIR, f'scatter_top_pearson_vs_target_{analysis_target.lower()}.png'); plt.savefig(scatter_plot_path, dpi=150); print(f"    Scatter plots saved: {scatter_plot_path}"); plt.close(); del scatter_data_pd, target_jitter; gc.collect()
        else: print("    Skipping scatter plots: No valid feature columns found for plotting.")
    except Exception as e: print(f"    Error during scatter plotting: {e}"); plt.close()
elif not target_in_analysis_df: print("    Skipping scatter plots vs target (target column missing).")
else: print("    Skipping scatter plots (no Pearson correlation results available).")


# [3.4.7] Original IC Calculation & Filtering
print(f"\n   [3.4.7] Feature Importance Analysis & Filtering (Spearman Rank IC - Original Data, {analysis_target})...")
ic_df_pl_original = None # Initialize DataFrame to store results
low_ic_cols = []         # Initialize list to store features with low IC
target_present = TARGET_COLUMN in df_analysis_pl.columns # Check if target exists in the analysis dataframe

if target_present:
    # Calculate IC only for features initially intended for scaling that exist in analysis df
    features_for_ic_calc = [f for f in numeric_features_initially_for_scaling if f in df_analysis_pl.columns]
    if not features_for_ic_calc:
         print("    No features available in analysis dataframe to calculate IC.")
    else:
        print(f"    Calculating Spearman IC for {len(features_for_ic_calc)} features initially targeted for scaling...")
        ic_results_original = [] # List to store dictionary results for each feature
        start_ic_time = time.time()
        target_col_expr = pl.col(TARGET_COLUMN) # Pre-define target column expression

        # Iterate through features to calculate IC
        for i, feature in enumerate(features_for_ic_calc):
            # Optional progress print:
            # if (i + 1) % 500 == 0: print(f"      IC Orig Feat {i+1}/{len(features_for_ic_calc)} ({time.time()-start_ic_time:.1f}s)")

            spearman_corr = np.nan # Default to NaN for errors or insufficient data
            try:
                # Select only the two columns needed (target and feature)
                # Drop rows where *either* column is null to ensure valid pairs for correlation
                corr_df = df_analysis_pl.select([target_col_expr, pl.col(feature)]).drop_nulls()

                # Check if enough valid pairs remain after dropping nulls (need at least 2)
                if corr_df.height >= 2:
                    # Calculate Spearman correlation using Polars
                    result = corr_df.select(pl.corr(TARGET_COLUMN, feature, method="spearman")).item()
                    # Handle potential null result from Polars correlation function
                    spearman_corr = result if result is not None else np.nan
                # No 'else' needed, spearman_corr remains NaN if corr_df.height < 2

                del corr_df # Clean up the intermediate dataframe immediately to save memory
            except pl.exceptions.InvalidOperationError:
                # This can happen if a column becomes constant after dropping nulls
                spearman_corr = np.nan
            except Exception as e: # Catch any other unexpected errors during calculation
                print(f"      ERROR calculating IC for {feature}: {e}"); spearman_corr = np.nan

            # Append the result (or NaN) to the list
            ic_results_original.append({'Feature': feature, 'IC_Original': spearman_corr})

        gc.collect() # Force garbage collection after loop
        print(f"    Finished Original IC calculations. Duration: {time.time() - start_ic_time:.2f} seconds.")

        # Process the collected IC results
        if ic_results_original:
            # Create a Polars DataFrame from the results
            ic_df_pl_original = pl.from_dicts(ic_results_original)
            # Filter out features where IC calculation resulted in NaN
            ic_df_pl_original = ic_df_pl_original.filter(pl.col('IC_Original').is_not_nan())

            if ic_df_pl_original.height > 0:
                 # Calculate absolute IC and sort by it (descending)
                 ic_df_pl_original = ic_df_pl_original.with_columns(
                     pl.col('IC_Original').abs().alias('Abs_IC_Original')
                 ).sort('Abs_IC_Original', descending=True)

                 print("\n    Top Features by Absolute Original IC (Spearman):"); print(ic_df_pl_original.head(TOP_N_FEATURES_PLOT))

                 # Save the full ranking to a CSV file
                 ic_rank_path = os.path.join(OUTPUT_DIR, f'feature_ic_original_ranking_{analysis_target.lower()}.csv')
                 ic_df_pl_original.write_csv(ic_rank_path); print(f"    Original IC ranking saved: {ic_rank_path}")

                 # Filter features based on the IC threshold for later removal
                 low_ic_cols = ic_df_pl_original.filter(pl.col('Abs_IC_Original') < FILTER_IC_THRESHOLD)['Feature'].to_list()
                 print(f"\nFound {len(low_ic_cols)} features with Abs(IC) < {FILTER_IC_THRESHOLD} (to be filtered later).")

                 # --- Plotting top original IC features using Seaborn ---
                 top_n_ic_plot = min(TOP_N_FEATURES_PLOT, ic_df_pl_original.height)
                 if top_n_ic_plot > 0:
                     # Sort by IC_Original (ascending for plot) using Polars .sort() FIRST
                     ic_plot_data_pd = ic_df_pl_original.head(top_n_ic_plot).sort('IC_Original').to_pandas()
                     # Now ic_plot_data_pd is a Pandas DataFrame

                     try:
                          plt.figure(figsize=(10, max(8, len(ic_plot_data_pd)*0.2)))
                          # Use Seaborn for plotting (should use configured fonts)
                          sns.barplot(x='IC_Original', y='Feature', data=ic_plot_data_pd, palette='viridis', orient='h')
                          plt.title(f'Top {len(ic_plot_data_pd)} Features by Original IC (Spearman) ({analysis_target})')
                          plt.xlabel("Spearman IC (Original)") # Add x-axis label
                          plt.ylabel("Feature") # Add y-axis label
                          plt.tight_layout() # Adjust layout
                          ic_plot_path = os.path.join(OUTPUT_DIR, f'feature_ic_original_top_{analysis_target.lower()}.png');
                          plt.savefig(ic_plot_path); # Save the plot
                          plt.close(); # Close the plot figure to free memory
                          print(f"    Top Original IC plot saved: {ic_plot_path}")
                          del ic_plot_data_pd; gc.collect() # Clean up pandas dataframe
                     except Exception as e:
                          print(f"    Error plotting original IC: {e}"); plt.close() # Ensure plot is closed on error
                 else:
                     print("    No valid Original IC results to plot.") # If top_n_ic_plot is 0
            else:
                print("    No valid Original IC results after filtering NaNs.") # If filtering NaNs removed all rows
        else:
            print("    No Original IC results were calculated.") # If the initial loop didn't produce results
else:
    print("    Skipping Original IC calculation & filtering (target column missing from analysis dataframe).")


# [3.4.8] Pair Plot (Top Original IC Features, excluding Broker IDs)
print(f"\n   [3.4.8] Pair Plot for Top Features by Original IC ({analysis_target})...")

# Check if target was present during IC calc AND original IC dataframe exists and is not empty
if target_present and 'ic_df_pl_original' in locals() and ic_df_pl_original is not None and ic_df_pl_original.height > 0:
     try:
          # Filter out Broker IDs for a potentially cleaner and more relevant pairplot
          # Uses the original IC dataframe calculated in the previous step
          ic_df_pl_filtered = ic_df_pl_original.filter(~pl.col('Feature').str.contains('券商代號'))

          # Determine the number of features to plot (up to PAIRPLOT_N_FEATURES)
          num_pairplot_features = min(PAIRPLOT_N_FEATURES, ic_df_pl_filtered.height)

          # Check if we have at least 2 features left after filtering Broker IDs
          if num_pairplot_features >= 2:
               # Get the names of the top N non-Broker features based on original IC
               top_pairplot_features = ic_df_pl_filtered.head(num_pairplot_features)['Feature'].to_list()

               # Ensure these features actually exist in the analysis dataframe (df_analysis_pl)
               pairplot_features_present = [f for f in top_pairplot_features if f in df_analysis_pl.columns]

               # Also need the target column to be present for coloring the plot
               if TARGET_COLUMN in df_analysis_pl.columns:
                   pairplot_features_present.append(TARGET_COLUMN)

                   # Check if we still have at least 2 features + the target column
                   if len(pairplot_features_present) >= 3:
                        print(f"    Generating pair plot for top {len(pairplot_features_present)-1} non-Broker features by Original IC: {pairplot_features_present[:-1]}")
                        pairplot_cols = pairplot_features_present # List of columns to select

                        # Define sample size for the pairplot (it's computationally expensive)
                        pairplot_sample_size = min(10000, df_analysis_pl.height)
                        print(f"    Using sample of {pairplot_sample_size} rows for pairplot generation.")

                        # Select the required columns and sample the data using Polars
                        pairplot_data_pl = df_analysis_pl.select(pairplot_cols).sample(n=pairplot_sample_size, seed=43, shuffle=True)

                        # Convert the sampled Polars DataFrame to Pandas for Seaborn
                        pairplot_data_pd = pairplot_data_pl.to_pandas()
                        del pairplot_data_pl; gc.collect() # Clean up Polars sample

                        # Configure Seaborn pairplot appearance
                        pairplot_kws = {'alpha': 0.4, 's': 10, 'edgecolor': 'none'} # Scatterplot arguments
                        diag_kws = {'fill': True} # Arguments for diagonal plots (KDE)

                        # Generate the pairplot - Seaborn will use Matplotlib's rcParams for fonts
                        # Add error handling for the plot generation itself
                        try:
                            sns.pairplot(
                                pairplot_data_pd,
                                hue=TARGET_COLUMN,      # Color points by target variable
                                palette='viridis',      # Color scheme
                                corner=True,            # Only plot lower triangle (less redundancy)
                                plot_kws=pairplot_kws,  # Pass scatterplot arguments
                                diag_kind='kde',        # Use Kernel Density Estimate on diagonal
                                diag_kws=diag_kws       # Pass arguments to diagonal plots
                            )
                            # Add a title (positioned slightly higher)
                            plt.suptitle(f'Pair Plot of Top {len(pairplot_features_present)-1} Non-Broker Features by Original IC ({analysis_target}, N={pairplot_sample_size})', y=1.02)

                            # Save the plot
                            pairplot_path = os.path.join(OUTPUT_DIR, f'pairplot_top_original_ic_{analysis_target.lower()}.png');
                            plt.savefig(pairplot_path, dpi=120); # Use reasonable DPI
                            plt.close(); # Close the figure to free memory
                            print(f"    Pair plot saved: {pairplot_path}")
                        except Exception as pairplot_err:
                            print(f"      Error during sns.pairplot generation: {pairplot_err}"); plt.close()

                        del pairplot_data_pd; gc.collect() # Clean up Pandas sample dataframe
                   else:
                        print("    Skipping pair plot: Not enough valid features present in analysis dataframe after including target.")
               else:
                   print("    Skipping pair plot: Target column not found in analysis dataframe.")
          else:
              print("    Skipping pair plot: Not enough non-Broker features (>=2) with Original IC results after filtering.")
     except Exception as e:
          print(f"    Error generating pair plot: {e}"); plt.close() # Ensure plot is closed on error
else:
    print("    Skipping pair plot (Original IC results missing, target missing, or IC DataFrame empty).")

# --- Cleanup EDA analysis dataframe ---
if 'df_analysis_pl' in locals(): del df_analysis_pl; gc.collect(); print("\nCleaned up EDA analysis dataframe.")
# End of numerical feature analysis block


# --- 3.5 Categorical Feature Analysis ---
# This section analyzes only Utf8/Categorical columns (Broker IDs are now numeric)
print("\n[3.5] Categorical Feature Analysis (on Full Loaded Subset)")
if not categorical_features: print("No actual Categorical/String features identified to analyze.")
else:
    # Ensure df_train exists before proceeding
    if 'df_train' not in locals() or df_train is None:
        print("   Skipping categorical analysis: Training data not available.")
    else:
        print(f"Found {len(categorical_features)} Categorical/String features: {categorical_features[:20]}...")
        print("\n   Analyzing unique values and distributions:")
        cat_summary = []; low_cardinality_cats, high_cardinality_cats = [], []
        cat_analysis_pl = df_train # Use full training data
        for i, col in enumerate(categorical_features): # Iterate only over actual cat features
            if col not in cat_analysis_pl.columns: continue # Skip if column somehow missing
            try:
                unique_count = cat_analysis_pl.select(pl.col(col).n_unique()).item()
                # Use pl.len() - FIXED Deprecation
                top5_counts = cat_analysis_pl.group_by(col).agg(pl.len().alias("count")).sort("count", descending=True).limit(5).to_dicts()
                cat_summary.append({'Feature': col, 'Unique Count': unique_count, 'Top 5 Values': top5_counts})
                if unique_count == 1: print(f"    - '{col}': Only 1 unique value (Constant).")
                elif unique_count <= LOW_CARDINALITY_THRESHOLD: print(f"    - '{col}': {unique_count} unique values (Low Cardinality). Top 5:"); print(pl.from_dicts(top5_counts)); low_cardinality_cats.append(col)
                else: print(f"    - '{col}': {unique_count} unique values (High Cardinality). Top 5:"); print(pl.from_dicts(top5_counts)); high_cardinality_cats.append(col)
            except Exception as e: print(f"    Error analyzing cat {col}: {e}"); cat_summary.append({'Feature': col, 'Unique Count': 'Error', 'Top 5 Values': str(e)})
        # Save summary
        if cat_summary:
            try: cat_summary_df = pl.from_dicts(cat_summary); cat_summary_path = os.path.join(OUTPUT_DIR, 'categorical_summary_subset.csv'); cat_summary_df.write_csv(cat_summary_path); print(f"\n   Categorical summary saved: {cat_summary_path}"); del cat_summary_df; gc.collect()
            except Exception as e: print(f"Error saving categorical summary: {e}")

        # Plotting for low cardinality features using Seaborn (should use configured fonts)
        if low_cardinality_cats:
            print(f"\n   Plotting for {len(low_cardinality_cats)} low-cardinality features...")
            target_present_for_plots = TARGET_COLUMN in cat_analysis_pl.columns
            # Distribution Plots
            try:
                plot_data_list_dist = []
                for col in low_cardinality_cats:
                    # Use pl.len() - FIXED Deprecation
                    counts_pl = cat_analysis_pl.group_by(col).agg(pl.len().alias("count")).sort("count", descending=True).limit(LOW_CARDINALITY_THRESHOLD)
                    plot_data_list_dist.append(counts_pl.to_pandas().assign(Feature=col))
                    del counts_pl
                if plot_data_list_dist:
                     plot_data_pd_cat_dist = pd.concat(plot_data_list_dist, ignore_index=True); gc.collect()
                     n_cols_cat = 5; n_rows_cat = (len(low_cardinality_cats) + n_cols_cat - 1) // n_cols_cat
                     fig, axes = plt.subplots(n_rows_cat, n_cols_cat, figsize=(n_cols_cat * 3.5, n_rows_cat * 3.5)); axes = axes.flatten()
                     plot_idx = 0
                     for col in low_cardinality_cats: # Iterate original list
                         if plot_idx < len(axes): # Ensure we don't exceed axes bounds
                             ax = axes[plot_idx]; data_subset = plot_data_pd_cat_dist[plot_data_pd_cat_dist['Feature'] == col]
                             if not data_subset.empty: sns.barplot(data=data_subset, y=col, x='count', palette='viridis', ax=ax, orient='h'); ax.set_title(f'{col} (Top {len(data_subset)})', fontsize=9); ax.tick_params(axis='both', labelsize=7)
                             else: ax.set_title(f'No data for {col}', fontsize=9)
                             plot_idx += 1
                         else: break # Stop if we run out of axes
                     for j in range(plot_idx, len(axes)): fig.delaxes(axes[j]) # Remove remaining empty axes
                     fig.suptitle('Low-Cardinality Categorical Distributions', fontsize=14); plt.tight_layout(rect=[0, 0.03, 1, 0.97]); cat_dist_plot_path = os.path.join(OUTPUT_DIR, 'categorical_distributions_subset.png'); plt.savefig(cat_dist_plot_path, dpi=120); plt.close(); print(f"    Categorical distribution plots saved: {cat_dist_plot_path}"); del plot_data_pd_cat_dist, plot_data_list_dist; gc.collect()
            except Exception as e: print(f"    Error during categorical distribution plotting: {e}"); plt.close()
            # Target Rate Plots
            if target_present_for_plots:
                print(f"\n   Analyzing relationship with Target for {len(low_cardinality_cats)} low-cardinality features...")
                try:
                    plot_data_list_target = []
                    for col in low_cardinality_cats: target_rate_pl = cat_analysis_pl.group_by(col).agg((pl.col(TARGET_COLUMN).mean() * 100).round(2).alias("Target Rate %")).sort("Target Rate %", descending=True).limit(LOW_CARDINALITY_THRESHOLD); plot_data_list_target.append(target_rate_pl.to_pandas().assign(Feature=col)); del target_rate_pl
                    if plot_data_list_target:
                        plot_data_pd_cat_target = pd.concat(plot_data_list_target, ignore_index=True); gc.collect()
                        n_cols_cat = 5; n_rows_cat = (len(low_cardinality_cats) + n_cols_cat - 1) // n_cols_cat # Reuse grid size
                        fig, axes = plt.subplots(n_rows_cat, n_cols_cat, figsize=(n_cols_cat * 4, n_rows_cat * 3.5)); axes = axes.flatten()
                        plot_idx = 0
                        for col in low_cardinality_cats: # Iterate original list
                            if plot_idx < len(axes):
                                ax=axes[plot_idx]; data_subset = plot_data_pd_cat_target[plot_data_pd_cat_target['Feature'] == col]
                                if not data_subset.empty: sns.barplot(data=data_subset, y=col, x='Target Rate %', palette='viridis_r', ax=ax, orient='h'); ax.set_title(f'% {TARGET_COLUMN}=1 by {col} (Top {len(data_subset)})', fontsize=9); ax.tick_params(axis='both', labelsize=7)
                                else: ax.set_title(f'No target rate data for {col}', fontsize=9)
                                plot_idx += 1
                            else: break
                        for j in range(plot_idx, len(axes)): fig.delaxes(axes[j]) # Remove remaining empty axes
                        fig.suptitle(f'Categorical Features vs. Target Rate ({TARGET_COLUMN}=1 %)', fontsize=14); plt.tight_layout(rect=[0, 0.03, 1, 0.97]); cat_target_plot_path = os.path.join(OUTPUT_DIR, 'categorical_vs_target_subset.png'); plt.savefig(cat_target_plot_path, dpi=120); plt.close(); print(f"    Categorical vs Target plots saved: {cat_target_plot_path}"); del plot_data_pd_cat_target, plot_data_list_target; gc.collect()
                except Exception as e: print(f"    Error plotting categorical vs target: {e}"); plt.close()
            else: print("    Skipping categorical vs target analysis (target column not loaded).")
        # Report high cardinality features
        if high_cardinality_cats: print(f"\n   High-cardinality String/Categorical features ({len(high_cardinality_cats)}): {high_cardinality_cats[:50]}...")
# End of categorical feature analysis block


# --- 3.6 Compile Final Feature List for Scaling ---
print("\n[3.6] Compiling Final Feature List for Scaling")
features_to_drop = set()
# Add features based on filtering criteria IF the respective calculation was successful
if FILTER_MISSING_THRESHOLD < 1.0 and 'missing_summary_pl' in locals() and missing_summary_pl is not None:
    # Get features to drop from the missing summary calculated earlier
    high_missing_cols_final = missing_summary_pl.filter(pl.col("Missing Percentage") > FILTER_MISSING_THRESHOLD)['Feature'].to_list()
    features_to_drop.update(high_missing_cols_final)
    print(f"   Adding {len(high_missing_cols_final)} features from missing value filter.")
    del high_missing_cols_final # Clean up temporary list
else:
    print(f"   Skipping missing value filtering (Threshold={FILTER_MISSING_THRESHOLD} or summary missing).")

if FILTER_ZERO_VAR and 'zero_variance_cols' in locals():
    features_to_drop.update(zero_variance_cols)
    print(f"   Adding {len(zero_variance_cols)} features from zero variance filter.")
else:
    print(f"   Skipping zero variance filtering (FILTER_ZERO_VAR={FILTER_ZERO_VAR} or var calc failed).")

if FILTER_HIGH_CORR and 'high_corr_pairs_to_drop' in locals():
    features_to_drop.update(high_corr_pairs_to_drop)
    print(f"   Adding {len(high_corr_pairs_to_drop)} features from high correlation filter.")
else:
     print(f"   Skipping high correlation filtering (FILTER_HIGH_CORR={FILTER_HIGH_CORR} or corr calc failed).")

if FILTER_IC_THRESHOLD > 0 and target_present and 'low_ic_cols' in locals(): # Use low_ic_cols calculated in 3.4.7
    features_to_drop.update(low_ic_cols)
    print(f"   Adding {len(low_ic_cols)} features from low IC filter.")
else:
     print(f"   Skipping low IC filtering (Threshold={FILTER_IC_THRESHOLD}, TargetPresent={target_present}, or IC calc failed).")

# Ensure we only drop features that were initially targeted for scaling
features_to_drop = features_to_drop.intersection(set(numeric_features_initially_for_scaling))

# Final list of features TO SCALE
numeric_features_to_scale_final = sorted([
    f for f in numeric_features_initially_for_scaling if f not in features_to_drop
])

print(f"\n--- Feature Filtering Summary ---")
print(f"Initial features targeted for scaling: {len(numeric_features_initially_for_scaling)}")
# Recalculate counts based on intersection for accurate reporting
print(f"Dropped by Missing % Threshold (> {FILTER_MISSING_THRESHOLD*100:.1f}%): {len(set(missing_summary_pl.filter(pl.col('Missing Percentage') > FILTER_MISSING_THRESHOLD)['Feature'].to_list()).intersection(numeric_features_initially_for_scaling)) if 'missing_summary_pl' in locals() and missing_summary_pl is not None and FILTER_MISSING_THRESHOLD < 1.0 else 'N/A'}")
print(f"Dropped by Near-Zero Variance: {len(set(zero_variance_cols).intersection(numeric_features_initially_for_scaling)) if FILTER_ZERO_VAR and 'zero_variance_cols' in locals() else 'N/A'}")
print(f"Dropped by High Correlation (> {HIGH_CORR_THRESHOLD}): {len(set(high_corr_pairs_to_drop).intersection(numeric_features_initially_for_scaling)) if FILTER_HIGH_CORR and 'high_corr_pairs_to_drop' in locals() else 'N/A'}")
print(f"Dropped by Low Absolute IC (< {FILTER_IC_THRESHOLD}): {len(set(low_ic_cols).intersection(numeric_features_initially_for_scaling)) if FILTER_IC_THRESHOLD > 0 and target_present and 'low_ic_cols' in locals() else 'N/A'}")
print(f"Total unique features dropped: {len(features_to_drop)}")
print(f"--- Final features remaining for scaling: {len(numeric_features_to_scale_final)} ---")
if numeric_features_to_scale_final:
    print(f"Sample final features: {numeric_features_to_scale_final[:50]}{'...' if len(numeric_features_to_scale_final)>50 else ''}")

# Free up memory from filtering intermediates
if 'missing_summary_pl' in locals(): del missing_summary_pl
if 'zero_variance_cols' in locals(): del zero_variance_cols
if 'high_corr_pairs_to_drop' in locals(): del high_corr_pairs_to_drop
if 'low_ic_cols' in locals(): del low_ic_cols
del features_to_drop
gc.collect()


# ==============================================================================
# --- 4. Save *Original Optimized* Subset to Parquet ---
# ==============================================================================
# Save the data *before* scaling, containing ALL selected features
print("\n\n--- 4. Saving Original Optimized Subset to Parquet ---")
save_parquet_original = True
if save_parquet_original:
    print(f"Attempting to save original selected subset dataframes...")
    try:
        if 'df_train' in locals() and df_train is not None and df_train.height > 0:
            start_pq_train = time.time(); df_train.write_parquet(PARQUET_OUTPUT_TRAIN_ORIGINAL, compression='zstd')
            print(f"Original training subset saved: {PARQUET_OUTPUT_TRAIN_ORIGINAL} ({time.time()-start_pq_train:.2f}s)")
        else: print("Skipping original training data Parquet save (DataFrame is None or empty).")
        if 'df_test' in locals() and df_test is not None and df_test.height > 0 :
             start_pq_test = time.time(); df_test.write_parquet(PARQUET_OUTPUT_TEST_ORIGINAL, compression='zstd')
             print(f"Original test subset saved: {PARQUET_OUTPUT_TEST_ORIGINAL} ({time.time()-start_pq_test:.2f}s)")
        else: print("Skipping original test data Parquet save (DataFrame is None or empty).")
    except Exception as e: print(f"ERROR saving original Parquet: {e}")
else: print("Skipping original Parquet save step.")
gc.collect()
# --- Optionally delete original df_train/df_test now if memory is extremely tight ---
# print("Deleting original df_train/df_test from memory to conserve RAM before scaling...")
# if 'df_train' in locals(): del df_train
# if 'df_test' in locals(): del df_test
# gc.collect()
# --- If deleted, scaling MUST read from PARQUET_OUTPUT_TRAIN_ORIGINAL ---


# ==============================================================================
# --- 5. Pre-Scaling Summary ---
# ==============================================================================
print("\n\n--- 5. Pre-Scaling Summary ---")
print(f"- Initial Feature Selection: Selected {initial_selected_count} features.") # Use count from section 1
print(f"- Data Loading: Train {'OK' if ('df_train' in locals() and df_train is not None) or os.path.exists(PARQUET_OUTPUT_TRAIN_ORIGINAL) else 'FAIL'}, Test {'OK' if ('df_test' in locals() and df_test is not None) or os.path.exists(PARQUET_OUTPUT_TEST_ORIGINAL) else 'FAIL/SKIPPED'}")
print(f"- EDA Filtering Results: {len(numeric_features_to_scale_final)} features remain for scaling.")
print(f"- EDA outputs saved in: {OUTPUT_DIR}")
if os.path.exists(PARQUET_OUTPUT_TRAIN_ORIGINAL): print(f"- Original data saved to: {os.path.basename(PARQUET_OUTPUT_TRAIN_ORIGINAL)}, {os.path.basename(PARQUET_OUTPUT_TEST_ORIGINAL)}")
print(f"- Chosen Scaling Strategy: '{SCALING_STRATEGY}'")


# ==============================================================================
# --- 6. Apply Scaling (Optimized) & Save Scaled Data ---
# ==============================================================================
print("\n\n--- 6. Applying Feature Scaling ---") # Removed "Compare IC" from title

# Initialize scaled dataframes and scalers
scaler_pt = None
scaler_final = None
scaling_successful = False # Flag
scaler_stats = None # To store stats for pure Polars scaling

# --- Check if scaling should proceed ---
# Check if train source exists (either in memory or as file)
train_source_exists = ('df_train' in locals() and df_train is not None) or os.path.exists(PARQUET_OUTPUT_TRAIN_ORIGINAL)

if SCALING_STRATEGY == 'none':
    print("SCALING_STRATEGY set to 'none'. Skipping scaling.")
elif not train_source_exists:
     print("Skipping scaling: Training DataFrame missing and original Parquet file not found.")
elif not numeric_features_to_scale_final: # Check the FINAL list
    print("Skipping scaling: No features remain after filtering.")
# Removed target check here as it's not strictly needed just for scaling (only for IC)
# elif TARGET_COLUMN not in loaded_cols_train: # Check target from original load status
#      print(f"CRITICAL WARNING: Target column '{TARGET_COLUMN}' was not loaded. Cannot calculate IC. Skipping scaling.")
else:
    # --- Scaling Process ---
    print(f"Applying scaling strategy: '{SCALING_STRATEGY}'")
    print(f"Features to scale ({len(numeric_features_to_scale_final)}): {numeric_features_to_scale_final[:30]}...")
    start_scale_time = time.time()

    # --- Define file paths based on strategy ---
    scaled_train_path = PARQUET_OUTPUT_TRAIN_SCALED_TMPL.format(strategy=SCALING_STRATEGY)
    scaled_test_path = PARQUET_OUTPUT_TEST_SCALED_TMPL.format(strategy=SCALING_STRATEGY)
    final_scaler_path = SCALER_FINAL_PATH_TMPL.format(strategy=SCALING_STRATEGY)
    scaler_stats_path = SCALER_STATS_PATH_TMPL.format(strategy=SCALING_STRATEGY)

    # Determine the source for training data (memory or disk)
    train_source = df_train if 'df_train' in locals() and df_train is not None else PARQUET_OUTPUT_TRAIN_ORIGINAL
    print(f"   Using {'DataFrame in memory' if isinstance(train_source, pl.DataFrame) else 'Parquet file'} as training data source for scaling.")

    # --- Strategy: Pure Polars RobustScaler with Column Batching for Stats ---# --- Strategy: Pure Polars RobustScaler with Column Batching for Stats ---
    if SCALING_STRATEGY == 'robust':
        try:
            print("\n   [Pure Polars RobustScaler Workflow with Batched Stats Calc]")

            # --- Determine the source LazyFrame for stats calculation ---
            lazy_frame_for_stats = None
            if isinstance(train_source, pl.DataFrame):
                lazy_frame_for_stats = train_source.lazy()
                print("      Calculating stats from in-memory DataFrame.")
            elif isinstance(train_source, str) and train_source.endswith('.parquet'):
                try:
                    lazy_frame_for_stats = pl.scan_parquet(train_source)
                    print(f"      Calculating stats by scanning Parquet file: {train_source}")
                except Exception as e:
                    raise RuntimeError(f"Failed to scan Parquet source {train_source} for stats calculation: {e}") from e
            else:
                raise TypeError(f"train_source must be a Polars DataFrame or a path to a Parquet file for stats calc, got {type(train_source)}")

            # --- Step 1: Calculate Scaling Statistics (Median, IQR) in Batches ---
            print(f"      Calculating median and quantiles for scaling in batches of {COLUMN_BATCH_SIZE}...")
            stats_start_time = time.time()
            scaler_stats = {} # Store median and iqr per column {col_name: {'median': m, 'iqr': i}}
            skipped_scaling_cols = [] # Track columns skipped due to IQR issues or absence in source
            processed_cols_stats = set() # Track columns for which stats calculation was attempted

            num_batches = (len(numeric_features_to_scale_final) + COLUMN_BATCH_SIZE - 1) // COLUMN_BATCH_SIZE
            source_cols_for_stats = lazy_frame_for_stats.columns # Get columns once

            for i in range(num_batches):
                batch_start_idx = i * COLUMN_BATCH_SIZE
                batch_end_idx = min((i + 1) * COLUMN_BATCH_SIZE, len(numeric_features_to_scale_final))
                batch_cols_requested = numeric_features_to_scale_final[batch_start_idx:batch_end_idx]
                if not batch_cols_requested: continue # Skip if batch is empty

                # Filter to columns actually present in the source data
                batch_cols_present = [col for col in batch_cols_requested if col in source_cols_for_stats]
                processed_cols_stats.update(batch_cols_present) # Mark these as processed

                # Identify columns requested but not found in this batch
                cols_missing_in_batch = set(batch_cols_requested) - set(batch_cols_present)
                if cols_missing_in_batch:
                     print(f"      Info: Stats Batch {i+1}/{num_batches}, columns not found in source: {list(cols_missing_in_batch)[:5]}...") # Log missing
                     skipped_scaling_cols.extend(list(cols_missing_in_batch)) # Add to skipped list

                if not batch_cols_present:
                    print(f"      Skipping Stats Batch {i+1}/{num_batches}: No requested columns found in source.")
                    continue

                print(f"      Processing Stats Batch {i+1}/{num_batches} ({len(batch_cols_present)} columns present in source)...")
                batch_agg_exprs = []
                # Create aggregation expressions ONLY for columns present in the source
                for col in batch_cols_present:
                    batch_agg_exprs.append(pl.median(col).alias(f"{col}_median"))
                    batch_agg_exprs.append(pl.quantile(col, 0.25).alias(f"{col}_q25"))
                    batch_agg_exprs.append(pl.quantile(col, 0.75).alias(f"{col}_q75"))

                # Check if there are any expressions to compute for this batch
                if not batch_agg_exprs:
                    print(f"      Skipping Stats Batch {i+1}/{num_batches}: No valid aggregation expressions generated (unexpected).")
                    continue

                # Compute statistics for the current batch using the prepared lazy frame
                try:
                    # --- CORRECTED AGGREGATION CALL ---
                    # Apply aggregation expressions directly to the lazy frame.
                    batch_stats_result_df = lazy_frame_for_stats.agg(batch_agg_exprs).collect()
                    # --- END CORRECTED AGGREGATION CALL ---

                    # Process the results for this batch
                    if batch_stats_result_df is not None and batch_stats_result_df.height > 0:
                        batch_results_dict = batch_stats_result_df.to_dicts()[0]
                        for col in batch_cols_present: # Iterate columns processed in *this* batch
                             median = batch_results_dict.get(f"{col}_median")
                             q25 = batch_results_dict.get(f"{col}_q25")
                             q75 = batch_results_dict.get(f"{col}_q75")

                             if median is not None and q25 is not None and q75 is not None:
                                  iqr = q75 - q25
                                  # Check for None IQR and use a small epsilon for float comparison to zero
                                  if iqr is not None and not np.isclose(iqr, 0, atol=1e-9):
                                       scaler_stats[col] = {'median': median, 'iqr': iqr}
                                  else:
                                       print(f"      Warning: IQR for '{col}' is zero, near-zero ({iqr}), or None. Skipping scaling.")
                                       skipped_scaling_cols.append(col)
                             else:
                                  print(f"      Warning: Could not compute all stats (median/q25/q75) for '{col}'. Skipping scaling.")
                                  skipped_scaling_cols.append(col)
                        del batch_results_dict # Cleanup dict
                    else:
                         print(f"      Warning: Stats Batch {i+1} resulted in empty or None DataFrame. Columns in batch ({batch_cols_present[:5]}...) will be skipped.")
                         skipped_scaling_cols.extend(batch_cols_present) # Skip all cols in this failed batch

                    del batch_stats_result_df; gc.collect() # Cleanup result df

                except Exception as batch_err:
                    # Provide more context in the error message
                    print(f"      ERROR processing stats batch {i+1} ({len(batch_cols_present)} cols like '{batch_cols_present[0]}'): {type(batch_err).__name__} - {batch_err}.")
                    skipped_scaling_cols.extend(batch_cols_present) # Skip all cols in this failed batch
                    # import traceback # Uncomment for deep debug
                    # traceback.print_exc() # Uncomment for deep debug

            print(f"      Statistics calculation duration: {time.time() - stats_start_time:.2f}s")

            # Final check for any requested columns that were never processed (e.g., not in source at all)
            unprocessed_cols = set(numeric_features_to_scale_final) - processed_cols_stats
            if unprocessed_cols:
                print(f"      Info: {len(unprocessed_cols)} initially requested features were not found in the source data at all and will be skipped.")
                skipped_scaling_cols.extend(list(unprocessed_cols))

            # Remove duplicates from skipped list
            skipped_scaling_cols = sorted(list(set(skipped_scaling_cols)))
            print(f"      Total unique columns where scaling will be skipped: {len(skipped_scaling_cols)}")
            # print(f"      Sample skipped columns: {skipped_scaling_cols[:50]}") # Optional: Print sample skipped

            # Optional: Save computed valid scaler stats
            if SAVE_SCALER_OBJECTS and scaler_stats:
                try:
                     stats_to_save = [{'feature': k, 'median': v['median'], 'iqr': v['iqr']} for k,v in scaler_stats.items()]
                     if stats_to_save: # Only write if there's something to save
                         pl.from_dicts(stats_to_save).write_parquet(scaler_stats_path)
                         print(f"      Scaler statistics saved: {scaler_stats_path}")
                     else:
                         print("      No valid scaler statistics were computed to save.")
                except Exception as save_err: print(f"      Error saving scaler stats: {save_err}")

            # --- Step 2: Apply Scaling using Polars Expressions and Sink to Parquet (Train Data) ---
            print("\n      Applying scaling and writing scaled training data...")
            apply_start_time_train = time.time()

            # --- Determine the source LazyFrame for APPLYING scaling (Train) ---
            lazy_frame_for_apply_train = None
            source_schema_cols_apply_train = []
            if isinstance(train_source, pl.DataFrame):
                lazy_frame_for_apply_train = train_source.lazy()
                source_schema_cols_apply_train = train_source.columns
                print("      Applying scaling to in-memory training DataFrame.")
            elif isinstance(train_source, str) and train_source.endswith('.parquet'):
                try:
                    lazy_frame_for_apply_train = pl.scan_parquet(train_source)
                    source_schema_cols_apply_train = lazy_frame_for_apply_train.columns
                    print(f"      Applying scaling by scanning training Parquet file: {train_source}")
                except Exception as e:
                    raise RuntimeError(f"Failed to scan training Parquet source {train_source} for applying scaling: {e}") from e
            else:
                raise TypeError(f"train_source must be a Polars DataFrame or a path to a Parquet file for applying scaling, got {type(train_source)}")

            # --- Define scaling expressions and columns to select (Train) ---
            scaling_exprs_apply_train = []
            final_train_cols_in_output = [] # Track columns actually included in output

            # Start with ID if present in the source
            if ID_COLUMN in source_schema_cols_apply_train:
                final_train_cols_in_output.append(ID_COLUMN)
            else:
                 print(f"      Warning: ID column '{ID_COLUMN}' not found in training source for applying scaling.")

            # Process features intended for scaling
            for col in numeric_features_to_scale_final:
                if col not in source_schema_cols_apply_train:
                    # This column was requested but isn't in the data being processed now, skip.
                    continue

                if col in scaler_stats: # Check if we have valid stats computed earlier
                    stats = scaler_stats[col]
                    # Apply scaling: (col - median) / iqr
                    scaling_exprs_apply_train.append(
                        ((pl.col(col).cast(pl.Float64, strict=False) - stats['median']) / stats['iqr'])
                        .cast(pl.Float32) # Cast final result to Float32
                        .alias(col)       # Ensure the output column has the original name
                    )
                    final_train_cols_in_output.append(col) # Add scaled column to output list
                elif col in skipped_scaling_cols:
                    # Column was processed but skipped (e.g., zero IQR), keep original values
                    scaling_exprs_apply_train.append(pl.col(col)) # Pass-through expression
                    final_train_cols_in_output.append(col) # Add original column to output list
                # else: The column wasn't in numeric_features_to_scale_final or wasn't found in source during stats - ignore

            # Add Target column if present in the source
            if TARGET_COLUMN in source_schema_cols_apply_train:
                final_train_cols_in_output.append(TARGET_COLUMN)
            else:
                 print(f"      Warning: Target column '{TARGET_COLUMN}' not found in training source for saving scaled data.")

            # Ensure no duplicates in final list (preserves order)
            final_train_cols_in_output = list(dict.fromkeys(final_train_cols_in_output))

            # --- Apply expressions, select final columns, and sink (Train) ---
            if not final_train_cols_in_output:
                 print("      ERROR: No columns identified for scaled training output. Skipping sink.")
                 scaling_successful = False # Mark as failed
            else:
                print(f"      Executing training scaling plan and selecting {len(final_train_cols_in_output)} columns for output...")
                lazy_frame_for_apply_train.with_columns(
                    scaling_exprs_apply_train # Apply the scaling/pass-through expressions
                ).select(
                    final_train_cols_in_output # Select the final set of columns in desired order
                ).sink_parquet(
                    scaled_train_path, compression='zstd'
                )
                print(f"      Scaled training data written to: {scaled_train_path}")
                print(f"      Train scaling application duration: {time.time() - apply_start_time_train:.2f}s")
                # Set success flag only after successful sink
                scaling_successful = True # Assume success unless error below


            # --- Step 3: Apply Scaling using Polars Expressions and Sink to Parquet (Test Data) ---
            if scaling_successful: # Proceed only if train scaling seemed successful
                test_source_path_or_df = None
                test_parquet_path = PARQUET_OUTPUT_TEST_ORIGINAL
                if 'df_test' in locals() and df_test is not None:
                    test_source_path_or_df = df_test
                    print("\n   Using in-memory DataFrame as test data source for scaling.")
                elif os.path.exists(test_parquet_path):
                    test_source_path_or_df = test_parquet_path
                    print(f"\n   Using Parquet file as test data source for scaling: {test_parquet_path}")
                else:
                    print("\n   Skipping test data scaling (Test data/file not available).")

                if test_source_path_or_df is not None:
                    print("\n      Applying scaling and writing scaled test data...")
                    apply_start_time_test = time.time()

                    # --- Determine the source LazyFrame for APPLYING scaling (Test) ---
                    lazy_frame_for_apply_test = None
                    source_schema_cols_apply_test = []
                    if isinstance(test_source_path_or_df, pl.DataFrame):
                        lazy_frame_for_apply_test = test_source_path_or_df.lazy()
                        source_schema_cols_apply_test = test_source_path_or_df.columns
                        print("      Applying scaling to in-memory test DataFrame.")
                    elif isinstance(test_source_path_or_df, str) and test_source_path_or_df.endswith('.parquet'):
                        try:
                            lazy_frame_for_apply_test = pl.scan_parquet(test_source_path_or_df)
                            source_schema_cols_apply_test = lazy_frame_for_apply_test.columns
                            print(f"      Applying scaling by scanning test Parquet file: {test_source_path_or_df}")
                        except Exception as e:
                             # Log error but maybe don't stop the whole script, just skip test scaling
                            print(f"      ERROR: Failed to scan test Parquet source {test_source_path_or_df} for applying scaling: {e}")
                            lazy_frame_for_apply_test = None # Ensure it's None
                    else:
                         print(f"      ERROR: Invalid test source type: {type(test_source_path_or_df)}")
                         lazy_frame_for_apply_test = None

                    if lazy_frame_for_apply_test: # Proceed only if test frame was loaded/scanned
                        # --- Define scaling expressions and columns to select (Test) ---
                        scaling_exprs_apply_test = []
                        final_test_cols_in_output = []

                        # Start with ID if present in the test source
                        if ID_COLUMN in source_schema_cols_apply_test:
                            final_test_cols_in_output.append(ID_COLUMN)
                        else:
                            print(f"      Warning: ID column '{ID_COLUMN}' not found in test source.")

                        # Process features based on TRAIN scaling decisions
                        for col in numeric_features_to_scale_final:
                            if col not in source_schema_cols_apply_test:
                                # Feature was requested for scaling but not in test data, skip.
                                continue

                            if col in scaler_stats: # Use stats from training
                                stats = scaler_stats[col]
                                scaling_exprs_apply_test.append(
                                    ((pl.col(col).cast(pl.Float64, strict=False) - stats['median']) / stats['iqr'])
                                    .cast(pl.Float32)
                                    .alias(col)
                                )
                                final_test_cols_in_output.append(col)
                            elif col in skipped_scaling_cols:
                                # Feature was skipped during training, keep original in test too
                                scaling_exprs_apply_test.append(pl.col(col))
                                final_test_cols_in_output.append(col)
                            # else: Ignore columns not in numeric_features_to_scale_final list

                        # Ensure no duplicates
                        final_test_cols_in_output = list(dict.fromkeys(final_test_cols_in_output))

                        # --- Apply expressions, select final columns, and sink (Test) ---
                        if not final_test_cols_in_output or ID_COLUMN not in final_test_cols_in_output : # Need at least ID
                             print("      ERROR: No columns (or ID column missing) identified for scaled test output. Skipping sink.")
                        else:
                            print(f"      Executing test scaling plan and selecting {len(final_test_cols_in_output)} columns for output...")
                            lazy_frame_for_apply_test.with_columns(
                                scaling_exprs_apply_test
                            ).select(
                                final_test_cols_in_output
                            ).sink_parquet(
                                scaled_test_path, compression='zstd'
                            )
                            print(f"      Scaled test data written to: {scaled_test_path}")
                            print(f"      Test scaling application duration: {time.time() - apply_start_time_test:.2f}s")

        except Exception as e:
            print(f"\n   ERROR during Pure Polars RobustScaler workflow: {type(e).__name__} - {e}")
            import traceback
            traceback.print_exc()
            scaling_successful = False # Explicitly mark as failed on any error in the main try block

    # End of `if SCALING_STRATEGY == 'robust':` block
    # <<< REST OF SECTION 6 (e.g., sklearn strategies, Post-Scaling) would follow here >>>

    # --- Strategy: Sklearn PowerTransformer + Scaler (Batched Pandas Bridge) ---
    elif SCALING_STRATEGY in ['power_robust', 'power_standard']:
        print(f"\n   [Sklearn {SCALING_STRATEGY} Workflow - Batched Pandas Bridge on Filtered Features]")
        try:
            # --- Step 1: Fit Scalers in Batches (Train Data) ---
            print(f"      Fitting scalers in batches of {COLUMN_BATCH_SIZE} (Train Data)...")
            scaler_pt = PowerTransformer(method='yeo-johnson', standardize=False)
            if SCALING_STRATEGY == 'power_robust': scaler_final = RobustScaler()
            else: scaler_final = StandardScaler()

            is_first_batch = True
            num_batches = (len(numeric_features_to_scale_final) + COLUMN_BATCH_SIZE - 1) // COLUMN_BATCH_SIZE
            for i in range(num_batches):
                batch_start_idx = i * COLUMN_BATCH_SIZE
                batch_end_idx = min((i + 1) * COLUMN_BATCH_SIZE, len(numeric_features_to_scale_final))
                batch_cols = numeric_features_to_scale_final[batch_start_idx:batch_end_idx]
                if not batch_cols: continue

                print(f"      Fitting Batch {i+1}/{num_batches} ({len(batch_cols)} columns)...")
                cols_to_load_batch = [ID_COLUMN] + batch_cols # Load only necessary columns
                try:
                     # Read batch from source
                     batch_df_pl = pl.read_parquet(train_source, columns=cols_to_load_batch) if isinstance(train_source, str) else train_source.select(cols_to_load_batch)
                     batch_df_pd = batch_df_pl.to_pandas().set_index(ID_COLUMN)
                     del batch_df_pl; gc.collect()

                     # Fit PT and Final Scaler on the first batch only for memory constraint
                     if is_first_batch:
                          print(f"         Fitting PowerTransformer on first batch...")
                          batch_pt_np = scaler_pt.fit_transform(batch_df_pd)
                          print(f"         Fitting Final Scaler ({type(scaler_final).__name__}) on transformed first batch...")
                          scaler_final.fit(batch_pt_np)
                          is_first_batch = False
                          del batch_pt_np # Clean intermediate
                     del batch_df_pd; gc.collect() # Clean pandas batch df

                except MemoryError: raise MemoryError(f"MemoryError during scaler fitting on batch {i+1}. Reduce batch size or use pure Polars.")
                except Exception as fit_err: print(f"      ERROR fitting batch {i+1}: {fit_err}")

            # Save fitted scalers if fit was successful
            if not is_first_batch: # Indicates at least first batch was processed
                if SAVE_SCALER_OBJECTS:
                    try: dump(scaler_pt, SCALER_PT_PATH); print(f"   PowerTransformer saved: {SCALER_PT_PATH}")
                    except Exception as e: print(f"   Error saving PT scaler: {e}")
                    try: dump(scaler_final, final_scaler_path); print(f"   Final Scaler saved: {final_scaler_path}")
                    except Exception as e: print(f"   Error saving final scaler: {e}")
            else:
                raise RuntimeError("Scaler fitting failed on the first batch. Cannot proceed.")

            # --- Step 2: Transform Data in Batches and Write Lazily (Train Data) ---
            print("\n      Applying scaling and writing scaled training data (Batched Transform)...")
            apply_start_time = time.time()

            # Define a generator function to process batches lazily
            def generate_scaled_batches(source, features, batch_size, scaler_pt, scaler_final):
                num_batches_gen = (len(features) + batch_size - 1) // batch_size
                source_cols = pl.scan(source).columns
                # Determine ID and Target columns available in the source
                id_target_cols = [ID_COLUMN] if ID_COLUMN in source_cols else []
                if TARGET_COLUMN in source_cols: id_target_cols.append(TARGET_COLUMN)

                # Pre-collect ID/Target if they exist
                base_lf = None
                if id_target_cols:
                    base_lf = pl.scan(source).select(id_target_cols).collect()

                for i in range(num_batches_gen):
                    batch_start_idx_gen = i * batch_size
                    batch_end_idx_gen = min((i + 1) * batch_size, len(features))
                    batch_cols_gen = features[batch_start_idx_gen:batch_end_idx_gen]
                    # Ensure batch columns actually exist in source
                    batch_cols_present = [c for c in batch_cols_gen if c in source_cols]
                    if not batch_cols_present: continue

                    print(f"         Transforming Train Batch {i+1}/{num_batches_gen}...")
                    cols_to_read_batch = [ID_COLUMN] + batch_cols_present if ID_COLUMN in source_cols else batch_cols_present

                    try:
                        # Read only needed feature columns for the batch
                        batch_feat_lf = pl.scan(source).select(cols_to_read_batch).collect()
                        id_col_data = batch_feat_lf.select(ID_COLUMN) if ID_COLUMN in cols_to_read_batch else None
                        batch_df_pd = batch_feat_lf.drop(ID_COLUMN) if ID_COLUMN in cols_to_read_batch else batch_feat_lf.to_pandas()
                        if not isinstance(batch_df_pd, pd.DataFrame): batch_df_pd = batch_df_pd.to_pandas()

                        # Apply transforms
                        batch_pt_np = scaler_pt.transform(batch_df_pd)
                        batch_scaled_np = scaler_final.transform(batch_pt_np)
                        del batch_df_pd, batch_pt_np; gc.collect()

                        # Create Polars DF for the scaled batch results
                        batch_scaled_pl = pl.DataFrame(batch_scaled_np, schema=batch_cols_present)

                        # Combine with ID/Target
                        if id_col_data is not None:
                             batch_final_pl = pl.concat([id_col_data, batch_scaled_pl], how="horizontal")
                        else: batch_final_pl = batch_scaled_pl

                        # Add target if it was collected
                        if TARGET_COLUMN in id_target_cols and base_lf is not None:
                             # Need to join based on row number implicitly, assuming stable order
                             # A safer way might involve joining on ID if collected separately
                             if base_lf.height == batch_final_pl.height:
                                 batch_final_pl = pl.concat([batch_final_pl, base_lf.select(TARGET_COLUMN)], how="horizontal")
                             else: print(f"WARNING: Row count mismatch when adding target for batch {i+1}")


                        yield batch_final_pl # Yield the processed batch DataFrame

                    except MemoryError: print(f"ERROR: MemoryError processing train batch {i+1}."); yield None # Signal error
                    except Exception as batch_err: print(f"ERROR processing train batch {i+1}: {batch_err}"); yield None

            # Collect results from generator and write - less memory efficient than sink
            print("         Collecting and writing scaled training batches...")
            # Write first batch, then append
            # This is still experimental in Polars for Parquet, using concat might be safer for now
            scaled_batches_list = [batch_df for batch_df in generate_scaled_batches(train_source, numeric_features_to_scale_final, COLUMN_BATCH_SIZE, scaler_pt, scaler_final) if batch_df is not None]

            if scaled_batches_list:
                print("         Concatenating scaled training batches...")
                # Concatenate vertically, requires consistent schema (should be okay)
                # final_scaled_train_pl = pl.concat(scaled_batches_list) # Vertical concat

                # --- Horizontal Join Approach (safer if row order not guaranteed) ---
                final_scaled_train_pl = None
                id_target_cols_final = [ID_COLUMN]
                if TARGET_COLUMN in scaled_batches_list[0].columns: id_target_cols_final.append(TARGET_COLUMN)

                if ID_COLUMN in scaled_batches_list[0].columns:
                     final_scaled_train_pl = scaled_batches_list[0].select(id_target_cols_final) # Start with ID/Target
                     for i, batch_df in enumerate(scaled_batches_list):
                          feature_cols_in_batch = [c for c in batch_df.columns if c not in id_target_cols_final]
                          if feature_cols_in_batch:
                               # Select ID and features from the batch and join
                               final_scaled_train_pl = final_scaled_train_pl.join(
                                    batch_df.select([ID_COLUMN] + feature_cols_in_batch), on=ID_COLUMN, how="left"
                               )
                else: # Fallback if ID somehow missing (shouldn't happen with checks)
                     print("ERROR: ID column missing in scaled batches, cannot join horizontally.")
                     final_scaled_train_pl = None

                if final_scaled_train_pl is not None:
                     print(f"         Writing final joined scaled training data... Shape: {final_scaled_train_pl.shape}")
                     final_scaled_train_pl.write_parquet(scaled_train_path, compression='zstd')
                     scaling_successful = True
                else:
                     print("         ERROR: Failed to construct final training DataFrame from batches.")
                     scaling_successful = False

                del scaled_batches_list; gc.collect()
                if 'final_scaled_train_pl' in locals(): del final_scaled_train_pl; gc.collect()
            else:
                 print("         ERROR: No scaled batches were successfully processed for training data.")
                 scaling_successful = False

            print(f"      Train scaling application duration: {time.time() - apply_start_time:.2f}s")

            # --- Step 3: Transform Test Data in Batches ---
            if scaling_successful: # Only proceed if train scaling worked
                # Determine test source
                test_source = None
                test_parquet_path = PARQUET_OUTPUT_TEST_ORIGINAL
                if 'df_test' in locals() and df_test is not None: test_source = df_test
                elif os.path.exists(test_parquet_path): test_source = test_parquet_path; print("   Using Parquet file as test data source for scaling.")
                else: print("\n   Skipping test data scaling (Test data/file not available).")

                if test_source is not None:
                    print("\n      Applying scaling and writing scaled test data (Batched Transform)...")
                    test_apply_start_time = time.time()

                    # Define generator for test batches
                    def generate_scaled_batches_test(source, features, batch_size, scaler_pt, scaler_final):
                         num_batches_test = (len(features) + batch_size - 1) // batch_size
                         test_schema_cols = pl.scan(source).columns # Get available columns in test
                         id_col_exists_test = ID_COLUMN in test_schema_cols

                         if id_col_exists_test:
                             id_lf = pl.scan(source).select(ID_COLUMN).collect()
                         else: print("ERROR: ID column not found in test source."); yield None; return

                         for i in range(num_batches_test):
                              batch_start_idx_test = i * batch_size
                              batch_end_idx_test = min((i + 1) * batch_size, len(features))
                              batch_cols_train_ref = features[batch_start_idx_test:batch_end_idx_test] # Features scaled in train
                              # Only process columns actually present in the test set for this batch
                              batch_cols_test = [c for c in batch_cols_train_ref if c in test_schema_cols]
                              if not batch_cols_test: continue
                              print(f"         Transforming Test Batch {i+1}/{num_batches_test} ({len(batch_cols_test)} columns)...")

                              cols_to_read_batch_test = [ID_COLUMN] + batch_cols_test

                              try:
                                   batch_feat_lf = pl.scan(source).select(cols_to_read_batch_test).collect()
                                   batch_df_pd = batch_feat_lf.drop(ID_COLUMN).to_pandas() # Features only

                                   batch_pt_np = scaler_pt.transform(batch_df_pd)
                                   batch_scaled_np = scaler_final.transform(batch_pt_np)
                                   del batch_df_pd, batch_pt_np; gc.collect()

                                   batch_scaled_pl = pl.DataFrame(batch_scaled_np, schema=batch_cols_test)
                                   # Combine with ID (already collected)
                                   if id_lf.height == batch_scaled_pl.height:
                                       batch_final_pl = pl.concat([id_lf, batch_scaled_pl], how="horizontal")
                                       yield batch_final_pl
                                   else: print(f"WARNING: Row count mismatch for test batch {i+1}"); yield None

                              except MemoryError: print(f"ERROR: MemoryError processing test batch {i+1}."); yield None
                              except Exception as batch_err: print(f"ERROR processing test batch {i+1}: {batch_err}"); yield None

                    # Collect and write test batches
                    print("         Collecting and writing scaled test batches...")
                    scaled_test_batches_list = [batch_df for batch_df in generate_scaled_batches_test(test_source, numeric_features_to_scale_final, COLUMN_BATCH_SIZE, scaler_pt, scaler_final) if batch_df is not None]

                    if scaled_test_batches_list:
                         # --- Horizontal Join Approach for Test ---
                         final_scaled_test_pl = None
                         if ID_COLUMN in scaled_test_batches_list[0].columns:
                              final_scaled_test_pl = scaled_test_batches_list[0].select(ID_COLUMN) # Start with ID
                              for i, batch_df in enumerate(scaled_test_batches_list):
                                   feature_cols_in_batch = [c for c in batch_df.columns if c != ID_COLUMN]
                                   if feature_cols_in_batch:
                                        final_scaled_test_pl = final_scaled_test_pl.join(
                                             batch_df.select([ID_COLUMN] + feature_cols_in_batch), on=ID_COLUMN, how="left"
                                        )
                         else: print("ERROR: ID column missing in scaled test batches.")

                         if final_scaled_test_pl is not None:
                              print(f"         Writing final joined scaled test data... Shape: {final_scaled_test_pl.shape}")
                              final_scaled_test_pl.write_parquet(scaled_test_path, compression='zstd')
                         else:
                              print("         ERROR: Failed to construct final test DataFrame from batches.")
                              # Consider setting scaling_successful = False if test scaling failure is critical

                         del scaled_test_batches_list; gc.collect()
                         if 'final_scaled_test_pl' in locals(): del final_scaled_test_pl; gc.collect()
                    else:
                         print("         ERROR: No scaled batches were successfully processed for test data.")

                    print(f"      Test scaling application duration: {time.time() - test_apply_start_time:.2f}s")

        except MemoryError: print(f"\n   ERROR: MemoryError during batched sklearn scaling. Try smaller COLUMN_BATCH_SIZE or pure Polars method."); scaling_successful=False
        except Exception as e: print(f"\n   ERROR during Sklearn Batched Scaling Workflow: {type(e).__name__} - {e}"); import traceback; traceback.print_exc(); scaling_successful = False
    else:
         print(f"Unknown SCALING_STRATEGY: '{SCALING_STRATEGY}'")

    # --- Post-Scaling ---
    # Removed IC comparison section as requested

    print(f"\nScaling section duration: {time.time() - start_scale_time:.2f} seconds.")
    # Clean up scaler objects from memory if they exist
    if 'scaler_pt' in locals(): del scaler_pt
    if 'scaler_final' in locals(): del scaler_final
    if 'scaler_stats' in locals(): del scaler_stats
    gc.collect()


# ==============================================================================
# --- 7. Final Summary & Next Steps ---
# ==============================================================================
print(f"\n\n--- 7. Final Summary & Next Steps (Polars {VERSION}) ---")
print("\nEDA & Filtering complete. Key outputs generated in:", OUTPUT_DIR)
# Report status of original parquet saving
if save_parquet_original and os.path.exists(PARQUET_OUTPUT_TRAIN_ORIGINAL):
     print(f"- Original selected data saved: {os.path.basename(PARQUET_OUTPUT_TRAIN_ORIGINAL)}")
elif save_parquet_original:
     print("- Original selected data saving failed or skipped.")
else:
     print("- Original selected data saving was disabled.")

print(f"- Feature Filtering Results: {len(numeric_features_to_scale_final)} features selected for scaling.")
# Report status of scaled parquet saving
if SCALING_STRATEGY != 'none':
    scaled_train_path_final = PARQUET_OUTPUT_TRAIN_SCALED_TMPL.format(strategy=SCALING_STRATEGY)
    if scaling_successful and os.path.exists(scaled_train_path_final):
        print(f"- Scaled data (Strategy: '{SCALING_STRATEGY}') saved: {os.path.basename(scaled_train_path_final)}")
        print("  (Scaled TRAIN contains ID, Scaled/Original Features, Target; Scaled TEST contains ID, Scaled/Original Features)")
        if SAVE_SCALER_OBJECTS: print(f"- Scaler objects/stats potentially saved in: {OUTPUT_DIR} or {DATA_DIR}")
    elif scaling_successful:
         print(f"- Scaled data (Strategy: '{SCALING_STRATEGY}') file NOT FOUND, check saving step: {scaled_train_path_final}")
    else: print(f"- Scaling (Strategy: '{SCALING_STRATEGY}') was attempted but FAILED. Scaled files not saved.")
else: print("- Scaling was skipped (SCALING_STRATEGY = 'none').")


print("\nPotential Next Steps:")
print(f"1.  Load final data for modeling:")
print(f"    - If using **scaled** data: Load `{os.path.basename(PARQUET_OUTPUT_TRAIN_SCALED_TMPL.format(strategy=SCALING_STRATEGY))}`")
print(f"    - If using **original** data (post-keyword selection): Load `{os.path.basename(PARQUET_OUTPUT_TRAIN_ORIGINAL)}`")
print(f"2.  Merge back any necessary non-scaled/non-numeric features (e.g., encoded categoricals, Broker IDs if needed differently) from the original Parquet file using the '{ID_COLUMN}'.")
print(f"3.  Train and evaluate ML models. Choose algorithms appropriate for the data scale (original vs scaled) and dimensionality ({len(numeric_features_to_scale_final)} features).")
print(f"4.  Review EDA outputs ({OUTPUT_DIR}) like IC rankings, correlation lists, and missing value reports to potentially refine feature selection further.")
print(f"5.  Iterate on filtering thresholds, scaling strategy, or feature engineering based on model performance.")
print(f"6.  Consider how to handle '{'券商代號'}' features in modeling (treat as numeric, bin, encode, or drop).")

print(f"\n--- Processing Complete (Polars {VERSION}) ---")

# ==============================================================================
# --- Final Cleanup ---
# ==============================================================================
print("\nCleaning up main variables...")
# Explicitly delete potentially large objects that might still be in memory
if 'df_train' in locals(): del df_train
if 'df_test' in locals(): del df_test
if 'ic_df_pl_original' in locals(): del ic_df_pl_original # Clean up original IC df
# Scaled dfs should have been deleted after saving or within the scaling block
gc.collect(); print("Cleanup complete.")

# ==============================================================================
# --- Stop Logging ---
# ==============================================================================
sys.stdout = original_stdout # Restore standard output
if 'log_file' in locals() and not log_file.closed: log_file.close() # Ensure log is closed
print(f"\nFinished logging. Check the log file: {log_file_path}")
print(f"All EDA outputs (plots, CSVs) are saved in: {OUTPUT_DIR}")
if SCALING_STRATEGY != 'none' and scaling_successful: print(f"Scaled data saved in: {DATA_DIR} (filenames like *scaled_{SCALING_STRATEGY}_polars_{VERSION}.parquet)")
elif SCALING_STRATEGY != 'none': print("Scaled data files were NOT generated due to errors.")
else: print("Scaling was skipped.")

--- Starting EDA & Scaling (Polars v12 - Simplified Scaling, Font Fix) ---
Timestamp: 2025-03-30 21:06:23
Polars Version: 1.26.0
Pandas Version: 2.1.4
Seaborn Version: 0.13.2
Scikit-learn version: 1.4.2
Output Directory: eda_output_polars_v12/
Log File: eda_output_polars_v12/eda_log_v12.txt
Sample Size for Heavy Plotting/Stats: 200000
Chosen Scaling Strategy: robust
Save Scaler Objects/Stats: True
Feature Filtering: Missing > 95.0%, Abs(IC) < 0.005, ZeroVar=True, HighCorr=True
Using plot style: 'seaborn-v0_8-whitegrid'
Set Matplotlib font.sans-serif to: ['Noto Sans CJK TC', 'Noto Sans TC', 'WenQuanYi Zen Hei', 'DejaVu Sans', 'sans-serif']
Matplotlib found the following preferred CJK font(s): ['Noto Sans TC']

--- stdout now redirected to console and log file ---

--- 1. Selecting Features & Defining Schema ---


Scanning header row from data/training.csv...
Found 10214 total columns in header.
Saving all column names to eda_output_polars_v12/all_column_names.txt
Keyword selection duration: 0.01s.

Selected 10004 features based on keywords.
Selected Features (First 50): ['ID', '上市加權指數10天乖離率', '上市加權指數10天報酬率', '上市加權指數10天成交量波動度', '上市加權指數19天乖離率', '上市加權指數1天報酬率', '上市加權指數20天報酬率', '上市加權指數20天成交量波動度', '上市加權指數5天乖離率', '上市加權指數5天報酬率', '上市加權指數5天成交量波動度', '上市加權指數前10天成交量', '上市加權指數前11天成交量', '上市加權指數前12天成交量', '上市加權指數前13天成交量', '上市加權指數前14天成交量', '上市加權指數前15天成交量', '上市加權指數前16天成交量', '上市加權指數前17天成交量', '上市加權指數前18天成交量', '上市加權指數前19天成交量', '上市加權指數前1天成交量', '上市加權指數前20天成交量', '上市加權指數前2天成交量', '上市加權指數前3天成交量', '上市加權指數前4天成交量', '上市加權指數前5天成交量', '上市加權指數前6天成交量', '上市加權指數前7天成交量', '上市加權指數前8天成交量', '上市加權指數前9天成交量', '上市加權指數成交量', '主力券商_分點出貨比(%)', '主力券商_分點吃貨比(%)', '主力券商_分點成交力(%)', '主力券商_分點買賣力', '主力券商_分點進出', '主力券商_前10天分點出貨比(%)', '主力券商_前10天分點吃貨比(%)', '主力券商_前10天分點成交力(%)', '主力券商_前10天分點買賣力', '主力券商_前10天分點進出', '主力券商_前11天分點出貨比(%)', '主力券商_前11天分點吃貨比(%)', '主力券

Traceback (most recent call last):
  File "C:\Users\junting\AppData\Local\Temp\ipykernel_75340\2793573610.py", line 705, in <module>
    correlation_matrix_reduced_pl = df_analysis_pl.lazy().select(features_for_pairwise_corr).corr().collect() # Use default Pearson
AttributeError: 'LazyFrame' object has no attribute 'corr'


    Sample boxplots saved to: eda_output_polars_v12/numerical_boxplots_sample_full subset.png
    Sample violin plots saved to: eda_output_polars_v12/numerical_violinplots_sample_full subset.png

   [3.4.6] Scatter Plots for Top Pearson Correlated Features vs Target (Full Subset)...
    Plotting scatter plots for top 16 features vs target...
    Scatter plots saved: eda_output_polars_v12/scatter_top_pearson_vs_target_full subset.png

   [3.4.7] Feature Importance Analysis & Filtering (Spearman Rank IC - Original Data, Full Subset)...
    Calculating Spearman IC for 9372 features initially targeted for scaling...
    Finished Original IC calculations. Duration: 76.72 seconds.

    Top Features by Absolute Original IC (Spearman):
shape: (50, 3)
┌─────────────────────────────────┬─────────────┬─────────────────┐
│ Feature                         ┆ IC_Original ┆ Abs_IC_Original │
│ ---                             ┆ ---         ┆ ---             │
│ str                             ┆ f64    

  source_cols_for_stats = lazy_frame_for_stats.columns # Get columns once



      ERROR processing stats batch 12 (500 cols like '賣超第15名分點前16天買金額(千)'): AttributeError - 'LazyFrame' object has no attribute 'agg'.
      Processing Stats Batch 13/16 (500 columns present in source)...
      ERROR processing stats batch 13 (500 cols like '賣超第2名分點前16天買均值(千)'): AttributeError - 'LazyFrame' object has no attribute 'agg'.
      Processing Stats Batch 14/16 (500 columns present in source)...
      ERROR processing stats batch 14 (500 cols like '賣超第4名分點前15天賣張'): AttributeError - 'LazyFrame' object has no attribute 'agg'.
      Processing Stats Batch 15/16 (500 columns present in source)...
      ERROR processing stats batch 15 (500 cols like '賣超第6名分點前15天買金額(千)'): AttributeError - 'LazyFrame' object has no attribute 'agg'.
      Processing Stats Batch 16/16 (443 columns present in source)...
      ERROR processing stats batch 16 (443 cols like '賣超第8名分點前15天買均值(千)'): AttributeError - 'LazyFrame' object has no attribute 'agg'.
      Statistics calculation duration: 0.32s
  

Traceback (most recent call last):
  File "C:\Users\junting\AppData\Local\Temp\ipykernel_75340\2793573610.py", line 1517, in <module>
    if lazy_frame_for_apply_test: # Proceed only if test frame was loaded/scanned
  File "c:\Users\junting\anaconda3\envs\AIGO\lib\site-packages\polars\lazyframe\frame.py", line 650, in __bool__
    raise TypeError(msg)
TypeError: the truth value of a LazyFrame is ambiguous

LazyFrames cannot be used in boolean context with and/or/not operators.


Cleanup complete.

Finished logging. Check the log file: eda_output_polars_v12/eda_log_v12.txt
All EDA outputs (plots, CSVs) are saved in: eda_output_polars_v12/
Scaled data files were NOT generated due to errors.
