# In [None]:
# -*- coding: utf-8 -*-
# ---
# jupyter:
#   jupytext:
#     text_representation:
#       extension: .py
#       format_name: light
#       format_version: '1.5'
#       jupytext_version: 1.14.5
#   kernelspec:
#     display_name: Python 3 (ipykernel)
#     language: python
#     name: python3
# ---

# %% [markdown]
# # Debugging Notebook: make_dataset.py File Saving
#
# This notebook replicates the logic from `src/data/make_dataset.py` to interactively debug why the output file (`all_processed_data.npz`) might not be appearing on the host machine despite logs indicating successful saving.
#
# **Instructions:**
# 1. Run this notebook using a kernel connected to your Docker environment (e.g., by running `jupyter lab` inside the `airflow-scheduler` container) or a local environment with identical dependencies.
# 2. Ensure the `config_path` variable below points to the correct location of your `params.yaml` *within the execution environment*.
# 3. Run cells sequentially (`Shift+Enter`).
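# 4. Review the log output (especially the absolute paths it reports) and the final file-existence check. If the file exists inside the container but not on the host, compare that absolute path with the container's volume mounts.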

# %%
# Core Imports
import argparse
import time
import pickle
import yaml
import numpy as np
import pandas as pd
import yfinance as yf
import ta
from pathlib import Path
import logging
import sys
import os # For final file check

# %%
# --- Configure logging ---
# Send this notebook's log records to stdout so they appear in the cell output.
logger = logging.getLogger("make_dataset_debug")
logger.setLevel(logging.INFO) # Set to DEBUG for more verbose output if needed

# Attach the handler only once so re-running this cell does not duplicate output.
# Check `logger.handlers` (this logger only), not `logger.hasHandlers()`: the latter
# also returns True when an ancestor such as the root logger has a handler (e.g. one
# installed by Airflow or the kernel), which would skip attaching ours entirely.
if not logger.handlers:
    # No level on the handler itself: the logger's level alone decides what is shown,
    # so switching the logger to DEBUG above actually surfaces debug records.
    handler = logging.StreamHandler(stream=sys.stdout) # Explicitly use stdout
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.propagate = False # Prevent duplicate logs if root logger is configured

logger.info("Logging configured for notebook.")

# %% [markdown]
# ## Function Definitions
# The function definitions below are copied from `src/data/make_dataset.py`; keep them in sync with that script so this notebook reproduces its behavior.

# %%
def load_data(tickers_list, period, interval, fetch_delay, output_dir):
    """Download price history for each ticker and save it as ``<ticker>_raw.pkl``.

    Args:
        tickers_list: Ticker symbols; each is fetched with ``yf.Ticker(t).history``.
        period: ``period`` argument forwarded to ``history``.
        interval: ``interval`` argument forwarded to ``history``.
        fetch_delay: Seconds to sleep after every download attempt (successful or not)
            to space out requests.
        output_dir: Directory for the pickle files; created if it does not exist.

    Per-ticker errors are logged and the loop continues. Empty downloads are logged as
    warnings and no file is written for them.
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    logger.info(f"Saving raw data to: {output_dir}")

    for t in tickers_list:
        output_path = output_dir / f"{t}_raw.pkl"
        # The skip-if-cached check below is commented out so every notebook run
        # re-downloads; uncomment it to reuse existing pickle files instead.
        # if output_path.exists():
        #     logger.info(f"Skipping download for {t}, file exists: {output_path}")
        #     continue
        try:
            logger.info(f"Attempting download for {t}...")
            data = yf.Ticker(t).history(period=period, interval=interval)
            if not data.empty:
                with open(output_path, 'wb') as f:
                    pickle.dump(data, f)
                logger.info(f"Loaded and saved raw data for {t} to {output_path}")
            else:
                logger.warning(f"No data loaded for {t}")
        except Exception as e:
            logger.error(f"Error loading {t}: {e}", exc_info=True)
        finally:
            # Throttle after failures too: with the sleep inside the try, a failed
            # request (e.g. a rate-limit error) was followed immediately by the next one.
            time.sleep(fetch_delay)

# %%
def add_technical_indicators(ticker_data):
    """Append technical-indicator columns to each ticker's price DataFrame.

    Args:
        ticker_data: Mapping of ticker symbol -> DataFrame with ``High``, ``Low``,
            ``Close`` and ``Volume`` columns. ``None`` or empty frames are skipped
            with a warning.

    Returns:
        The same mapping. Frames are modified in place. If any indicator raises, the
        error is logged, and the columns assigned before the failure stay on that frame.
    """
    logger.info("Adding technical indicators...")
    for t, df in ticker_data.items():
        if df is None or df.empty:
            logger.warning(f"Skipping indicators for {t} due to empty data.")
            continue
        logger.debug(f"Processing indicators for {t}...")
        try:
            high, low, close, volume = df['High'], df['Low'], df['Close'], df['Volume']

            # Indicators that yield several columns are built once, and all their outputs
            # are read from that object instead of re-instantiating it per column.
            # Each object is created right before its first use, and column assignment
            # order is unchanged (it defines the downstream feature order).

            # Core indicators
            df['EMA_50'] = ta.trend.EMAIndicator(close, 50).ema_indicator()
            df['EMA_200'] = ta.trend.EMAIndicator(close, 200).ema_indicator()
            df['RSI'] = ta.momentum.RSIIndicator(close, 14).rsi()
            df['MACD'] = ta.trend.MACD(close, window_fast=12, window_sign=9, window_slow=26).macd()
            bollinger = ta.volatility.BollingerBands(close, window=15, window_dev=2)
            df['BB_High'] = bollinger.bollinger_hband()
            df['BB_Low'] = bollinger.bollinger_lband()
            df['ATR'] = ta.volatility.AverageTrueRange(high, low, close, window=14).average_true_range()
            df['OBV'] = ta.volume.OnBalanceVolumeIndicator(close, volume).on_balance_volume()
            df['MFI'] = ta.volume.MFIIndicator(high, low, close, volume, window=14).money_flow_index()
            adx = ta.trend.ADXIndicator(high, low, close, window=14)
            df['ADX'] = adx.adx()
            # Additional trend indicators
            df['SMA_50'] = ta.trend.SMAIndicator(close, 50).sma_indicator()
            df['VWAP'] = ta.volume.VolumeWeightedAveragePrice(high, low, close, volume).volume_weighted_average_price()
            df['PSAR'] = ta.trend.PSARIndicator(high, low, close).psar()
            # Momentum indicators
            stochastic = ta.momentum.StochasticOscillator(high, low, close)
            df['Stochastic_K'] = stochastic.stoch()
            df['Stochastic_D'] = stochastic.stoch_signal()
            df['CCI'] = ta.trend.CCIIndicator(high, low, close).cci()
            df['Williams_R'] = ta.momentum.WilliamsRIndicator(high, low, close).williams_r()
            # Volatility indicators
            donchian = ta.volatility.DonchianChannel(high, low, close, window=20)
            df['Donchian_High'] = donchian.donchian_channel_hband()
            df['Donchian_Low'] = donchian.donchian_channel_lband()
            keltner = ta.volatility.KeltnerChannel(high, low, close)
            df['Keltner_High'] = keltner.keltner_channel_hband()
            df['Keltner_Low'] = keltner.keltner_channel_lband()
            # Price-based features
            df['Log_Return'] = np.log(close / close.shift(1))
            df['Price_Rate_Of_Change'] = ta.momentum.ROCIndicator(close, 12).roc()
            # Volume-based indicators
            df['Volume_SMA'] = ta.trend.SMAIndicator(volume, 20).sma_indicator()
            df['Chaikin_Money_Flow'] = ta.volume.ChaikinMoneyFlowIndicator(high, low, close, volume, window=20).chaikin_money_flow()
            df['Force_Index'] = ta.volume.ForceIndexIndicator(close, volume).force_index()
            # Trend-strength indicators (reuse the ADX object built above)
            df['DI_Positive'] = adx.adx_pos()
            df['DI_Negative'] = adx.adx_neg()
        except Exception as e:
            # df is modified in place, so columns added before the failure remain and
            # this ticker may end up with fewer feature columns than the others.
            logger.error(f"Error adding indicators for {t}: {e}", exc_info=True)

    logger.info("Finished adding technical indicators.")
    return ticker_data

# %%
def preprocess_data(df):
    """Fill gaps in one ticker's frame and add a next-row ``Target`` column.

    Args:
        df: DataFrame for a single ticker; must contain a ``Close`` column.
            It is not modified.

    Returns:
        A new DataFrame in which NaNs are forward- then back-filled, ``Target`` holds
        the following row's ``Close``, and rows without a ``Target`` (the last row) are
        dropped. Columns that are NaN in every row cannot be filled; they are left as
        NaN and reported in a warning.
    """
    logger.debug("Preprocessing data: Filling NaNs and creating Target.")
    # ffill()/bfill() return new frames, so the caller's df is left untouched.
    # NOTE(review): bfill copies later values into leading NaNs (e.g. indicator warm-up
    # rows), which puts future information into early rows; confirm this is intended.
    df_filled = df.ffill().bfill()

    if not df_filled.empty:
        all_nan = df_filled.isna().all()
        all_nan_cols = all_nan[all_nan].index.tolist()
        if all_nan_cols:
            logger.warning(
                f"Columns entirely NaN (e.g. indicator window longer than the history): {all_nan_cols}"
            )

    # Create target variable (next row's close price)
    df_filled['Target'] = df_filled['Close'].shift(-1)
    # Drop only rows lacking a Target. A plain dropna() would also drop every row
    # whenever any column is entirely NaN, silently emptying the frame.
    df_processed = df_filled.dropna(subset=['Target'])
    logger.debug(f"Original shape: {df.shape}, Processed shape: {df_processed.shape}")
    return df_processed

# %%
def align_and_process_data(ticker_data):
    """Preprocess every ticker, align them on a shared time index, and stack into arrays.

    Args:
        ticker_data: Mapping of ticker symbol -> DataFrame (or ``None``). Each usable
            frame is passed (as a copy) through ``preprocess_data``.

    Returns:
        ``(processed_data, targets, feature_columns, tickers)``:
        ``processed_data`` has shape ``(timesteps, stocks, features)``, ``targets`` has
        shape ``(timesteps, stocks)``, ``feature_columns`` is taken from the first
        remaining ticker, and ``tickers`` gives the stock-axis order.
        Returns ``(None, None, None, None)`` when no usable data remains.

    Raises:
        ValueError: if the shared index cannot be built for several tickers, or if a
            ticker's values do not fit the output arrays.
    """
    logger.info("Aligning and processing data across all tickers.")
    tickers = list(ticker_data.keys())
    logger.debug(f"Processing tickers: {tickers}")

    # Preprocess a copy of each usable frame so the caller's frames are not modified.
    processed_ticker_data = {}
    for t in tickers:
        if ticker_data[t] is not None and not ticker_data[t].empty:
            processed_ticker_data[t] = preprocess_data(ticker_data[t].copy())
        else:
            logger.warning(f"Skipping preprocessing for {t} due to empty or None data.")

    # Drop frames that preprocessing left empty.
    processed_ticker_data = {t: df for t, df in processed_ticker_data.items() if not df.empty}
    if not processed_ticker_data:
        logger.error("No valid data remaining after preprocessing step.")
        return None, None, None, None # Return Nones to indicate failure

    final_tickers = list(processed_ticker_data.keys())
    logger.info(f"Tickers remaining after preprocessing: {final_tickers}")
    # Union of all tickers' timestamps; every ticker is reindexed onto it below.
    try:
        all_indices = pd.concat(processed_ticker_data.values()).index.unique()
    except Exception as e:
        logger.error(f"Error getting common indices: {e}", exc_info=True)
        if len(final_tickers) == 1:
            all_indices = processed_ticker_data[final_tickers[0]].index
        else:
            raise ValueError("Could not determine common indices.") from e

    logger.debug(f"Total common timesteps: {len(all_indices)}")

    # Reindex and align every ticker onto the shared, sorted time index.
    aligned_data = {
        t: processed_ticker_data[t].reindex(index=all_indices).sort_index()
        for t in final_tickers
    }

    # Feature columns (excluding Target) come from the first remaining ticker.
    first_ticker = final_tickers[0]
    feature_columns = [col for col in aligned_data[first_ticker].columns if col != 'Target']
    expected_columns = feature_columns + ['Target']
    num_features = len(feature_columns)
    num_stocks = len(final_tickers)
    logger.info(f"Number of stocks: {num_stocks}, Number of features: {num_features}")

    # Create 3D arrays: (timesteps, stocks, features)
    processed_data_np = np.zeros((len(all_indices), num_stocks, num_features))
    targets_np = np.zeros((len(all_indices), num_stocks))

    for i, ticker in enumerate(final_tickers):
        aligned = aligned_data[ticker]
        missing_columns = [col for col in expected_columns if col not in aligned.columns]
        if missing_columns:
            # Happens e.g. when the indicator step failed part-way for this ticker.
            logger.warning(
                f"{ticker} lacks columns present in {first_ticker}: {missing_columns}. "
                "They will be filled like other remaining NaNs."
            )
        # reindex(columns=...) selects the expected columns in order and adds missing
        # ones as NaN, where plain [] selection raised KeyError. The ffill/bfill then
        # fills gaps introduced by reindexing onto the shared time index.
        df_aligned_filled = aligned.reindex(columns=expected_columns).ffill().bfill()

        if df_aligned_filled.isnull().values.any():
            logger.warning(f"NaNs still present in aligned data for {ticker} after ffill/bfill. Check source data.")
            df_aligned_filled = df_aligned_filled.fillna(0) # Example: fill remaining with 0

        try:
            processed_data_np[:, i, :] = df_aligned_filled[feature_columns].values
            targets_np[:, i] = df_aligned_filled['Target'].values
        except ValueError:
            logger.error(f"Shape mismatch error processing {ticker}. Aligned shape: {df_aligned_filled.shape}, Expected features: {num_features}", exc_info=True)
            raise

    # Safety net: drop any timestep that still contains NaNs. The fillna(0) above
    # normally leaves none.
    nan_mask_data = np.isnan(processed_data_np).any(axis=(1, 2))
    nan_mask_targets = np.isnan(targets_np).any(axis=1)
    combined_nan_mask = nan_mask_data | nan_mask_targets

    if np.all(combined_nan_mask):
        logger.error("All timesteps contain NaNs after alignment. Cannot proceed.")
        return None, None, None, None

    processed_data_final = processed_data_np[~combined_nan_mask]
    targets_final = targets_np[~combined_nan_mask]
    logger.info(f"Final processed data shape after NaN cleaning: {processed_data_final.shape}")
    logger.info(f"Final targets shape after NaN cleaning: {targets_final.shape}")

    if processed_data_final.shape[0] == 0:
        logger.error("No data left after NaN cleaning. Check input data quality and alignment.")
        return None, None, None, None

    return processed_data_final, targets_final, feature_columns, final_tickers


# %%
def filter_correlated_features(ticker_data, threshold=0.9):
    """
    Drop numeric features that are highly correlated with an earlier feature.

    The correlation matrix is computed on one reference ticker (the first entry that
    holds data), and the resulting column selection is applied to every ticker.

    Args:
        ticker_data: Mapping of ticker symbol -> DataFrame; entries may be ``None``.
        threshold: Absolute correlation above which the later of two columns (in
            column order) is dropped. ``Close`` is never dropped.

    Returns:
        ``(filtered_ticker_data, features_to_keep)``. ``None`` entries stay ``None``;
        every other frame keeps only those ``features_to_keep`` columns it contains.
    """
    logger.info(f"Filtering highly correlated features with threshold {threshold}")

    if not ticker_data:
        logger.warning("Ticker data is empty, skipping correlation filtering.")
        return {}, []

    # Use the first ticker that actually holds data, so a leading None entry does not
    # crash the analysis.
    first_ticker = next((t for t, d in ticker_data.items() if d is not None), None)
    if first_ticker is None:
        logger.warning("All ticker entries are None, skipping correlation filtering.")
        return dict(ticker_data), []
    df = ticker_data[first_ticker]

    # Ensure only numeric columns are used for correlation
    numeric_cols = df.select_dtypes(include=np.number).columns.tolist()
    if not numeric_cols:
        logger.warning(f"No numeric columns found for correlation analysis in ticker {first_ticker}.")
        return ticker_data, df.columns.tolist() # Return original if no numeric cols

    logger.debug(f"Calculating correlation matrix on {len(numeric_cols)} numeric columns.")
    corr_matrix = df[numeric_cols].corr().abs()

    # Keep only the strict upper triangle: each pair is considered once and the
    # diagonal (self-correlation) is ignored.
    upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))

    # A column is dropped when it exceeds the threshold against any earlier column.
    # NaN correlations (e.g. from constant columns) compare False, so such columns are kept.
    to_drop = [column for column in upper.columns if (upper[column] > threshold).any()]

    logger.info(f"Identified {len(to_drop)} potentially highly correlated numeric features to remove: {to_drop}")

    if 'Close' in to_drop:
        logger.info("Excluding 'Close' column from removal list.")
        to_drop.remove('Close')

    logger.info(f"Final list of {len(to_drop)} numeric features to remove: {to_drop}")

    # All reference columns minus the dropped ones; non-numeric columns are preserved.
    features_to_keep = [col for col in df.columns if col not in to_drop]
    logger.info(f"Total features to keep: {len(features_to_keep)}")

    filtered_ticker_data = {}
    for ticker, data in ticker_data.items():
        if data is None:
            filtered_ticker_data[ticker] = None
            continue
        # Select only the kept columns this particular frame actually has.
        filtered_ticker_data[ticker] = data[[col for col in features_to_keep if col in data.columns]]

    return filtered_ticker_data, features_to_keep

# %% [markdown]
# ## Configuration Setup
# Define the path to the `params.yaml` file. **Ensure this path is correct for the environment where you are running the notebook.** If running inside the Docker container as recommended, `/opt/airflow/config/params.yaml` should be correct.

# %%
# CONFIGURATION
# Location of params.yaml as seen from the environment running this kernel.
# Adjust this path if necessary!
config_path = '/opt/airflow/config/params.yaml' # Path inside the container

logger.info(f"Using configuration file: {config_path}")

# Stop here if the file is missing, since every later cell reads it.
if not Path(config_path).is_file():
    logger.error(f"Configuration file not found at: {config_path}")
    raise FileNotFoundError(f"Configuration file not found at: {config_path}")
logger.info("Configuration file found.")

# %% [markdown]
# ## Main Processing Logic (Adapted from `run_processing`)
# This cell executes the core data loading and processing steps.

# %%
# MAIN EXECUTION
# Runs the full pipeline: load raw data -> indicators -> correlation filter ->
# alignment -> save .npz, logging every resolved path along the way.
try:
    with open(config_path, 'r') as f:
        params = yaml.safe_load(f)
    logger.info(f"Loaded parameters: {params}")

    # Relative paths from params.yaml resolve against this kernel's working directory,
    # which may differ from the directory the Airflow task runs in.
    logger.info(f"Current working directory: {Path.cwd()}")

    # Define paths from config - Use Path objects for robustness
    raw_data_dir = Path(params['output_paths']['raw_data_template']).parent
    processed_output_path = Path(params['output_paths']['processed_data_path'])
    logger.info(f"Raw data directory: {raw_data_dir}")
    logger.info(f"Processed data output path: {processed_output_path}")

    # Create parent directory for output if it doesn't exist
    processed_output_path.parent.mkdir(parents=True, exist_ok=True)
    logger.info(f"Ensured output directory exists: {processed_output_path.parent}")

    tickers_list = params['data_loading']['tickers']
    period = params['data_loading']['period']
    interval = params['data_loading']['interval']
    fetch_delay = params['data_loading']['fetch_delay']
    corr_threshold = params['feature_engineering']['correlation_threshold']

    # 1. Load Raw Data (or ensure it's downloaded)
    logger.info("--- Step 1: Starting Data Loading ---")
    load_data(tickers_list, period, interval, fetch_delay, raw_data_dir)
    logger.info("--- Step 1: Finished Data Loading ---")

    # Load from saved pickles (written by load_data above; only unpickle trusted files).
    logger.info("--- Loading raw data from pickle files ---")
    ticker_data = {}
    for t in tickers_list:
        raw_path = raw_data_dir / f"{t}_raw.pkl"
        if not raw_path.exists():
            logger.warning(f"Raw data pickle file not found for {t} at {raw_path}")
            ticker_data[t] = None # Mark as None if file not found
            continue
        try:
            with open(raw_path, 'rb') as f:
                ticker_data[t] = pickle.load(f)
            logger.info(f"Loaded pickle for {t}")
        except Exception:
            logger.error(f"Error loading pickle file for {t} from {raw_path}", exc_info=True)
            ticker_data[t] = None # Mark as None if loading failed

    # Filter out any tickers where data wasn't loaded or failed to load
    ticker_data = {k: v for k, v in ticker_data.items() if v is not None and not v.empty}
    if not ticker_data:
        raise ValueError("No valid ticker data could be loaded from pickle files. Exiting.")
    loaded_tickers = list(ticker_data.keys())
    logger.info(f"Successfully loaded data for tickers: {loaded_tickers}")

    # 2. Add Technical Indicators
    logger.info("--- Step 2: Starting Feature Engineering (Indicators) ---")
    ticker_data = add_technical_indicators(ticker_data)
    logger.info("--- Step 2: Finished Feature Engineering (Indicators) ---")

    # 3. Filter Correlated Features
    logger.info("--- Step 3: Starting Feature Filtering ---")
    ticker_data, remaining_features = filter_correlated_features(ticker_data, corr_threshold)
    logger.info(f"Features remaining after filtering: {len(remaining_features)}")
    logger.info("--- Step 3: Finished Feature Filtering ---")

    # 4. Align and Process
    logger.info("--- Step 4: Starting Data Alignment & Processing ---")
    processed_data, targets, feature_columns, final_tickers = align_and_process_data(ticker_data)
    if processed_data is None:
        raise ValueError("Data alignment and processing failed. Check logs.")
    logger.info(f"Processed data shape: {processed_data.shape}")
    logger.info(f"Targets shape: {targets.shape}")
    logger.info(f"Final tickers in order: {final_tickers}")
    logger.info("--- Step 4: Finished Data Alignment & Processing ---")

    # 5. Save Processed Data
    logger.info(f"--- Step 5: Saving Processed Data to {processed_output_path} ---")
    # np.savez appends '.npz' to any file name that does not already end with it, so a
    # configured path without that suffix is NOT the file that gets written. Save to,
    # and verify, the path numpy will actually use.
    saved_path = processed_output_path
    if not str(saved_path).endswith('.npz'):
        saved_path = Path(f"{saved_path}.npz")
        logger.warning(f"Configured output path has no '.npz' suffix; np.savez writes to {saved_path} instead.")
    absolute_save_path = saved_path.resolve()
    logger.info(f"Attempting to save to absolute path: {absolute_save_path}")
    try:
        np.savez(
            saved_path,
            processed_data=processed_data,
            targets=targets,
            feature_columns=np.array(feature_columns, dtype=object),
            tickers=np.array(final_tickers, dtype=object)
        )
        logger.info(f"np.savez command executed for path: {saved_path}")

        # Add an existence check right after saving
        if absolute_save_path.exists():
            logger.info(f"Verified file exists immediately after saving at: {absolute_save_path}")
            file_size = absolute_save_path.stat().st_size
            logger.info(f"File size: {file_size} bytes")
            if file_size == 0:
                logger.warning("File was created but has ZERO size. Saving might have failed silently or data was empty.")
        else:
            logger.error(f"!!! File DOES NOT exist immediately after saving attempt at: {absolute_save_path}")

    except Exception:
        logger.error(f"Failed to save processed data to {saved_path}", exc_info=True)
        raise # Re-raise the exception

    logger.info("--- Step 5: Finished Saving Processed Data ---")

except FileNotFoundError as fnf:
    # The config file's existence was already checked in the configuration cell, so
    # this can be any missing file (config, raw data, output location); report which.
    logger.critical(f"Process failed due to a missing file: {fnf}", exc_info=True)
except ValueError as ve:
    logger.critical(f"Process failed due to a ValueError: {ve}", exc_info=True)
except Exception:
    logger.critical("An unexpected error occurred during the data processing pipeline.", exc_info=True)

logger.info("--- Processing finished (check logs for errors) ---")


# %% [markdown]
# ## Final File Check
# Let's explicitly check if the file exists at the target path *after* the main logic has run.

# %%
# FINAL CHECK
logger.info("--- Performing final check for output file ---")

# Reuse config_path from the configuration cell so editing it there is enough; fall
# back to the default path when this cell is run on its own.
config_path_check = globals().get('config_path', '/opt/airflow/config/params.yaml')
with open(config_path_check, 'r') as f:
    params_check = yaml.safe_load(f)
processed_output_path_check = Path(params_check['output_paths']['processed_data_path'])
# np.savez appends '.npz' when the configured name lacks it; check the file it writes.
if not str(processed_output_path_check).endswith('.npz'):
    processed_output_path_check = Path(f"{processed_output_path_check}.npz")
absolute_path_check = processed_output_path_check.resolve() # Get absolute path *inside* the environment

logger.info(f"Checking for file at absolute path: {absolute_path_check}")
logger.info(f"Current working directory: {Path.cwd()}")

if absolute_path_check.exists():
    logger.info(f"SUCCESS: File found at {absolute_path_check}")
    logger.info(f"File size: {absolute_path_check.stat().st_size} bytes")
    try:
        # allow_pickle is needed for the object arrays (feature_columns, tickers);
        # the context manager closes the underlying file handle.
        with np.load(absolute_path_check, allow_pickle=True) as data_check:
            logger.info(f"Successfully loaded npz file. Keys found: {list(data_check.keys())}")
            logger.info(f"Processed data shape from file: {data_check['processed_data'].shape}")
    except Exception as e:
        logger.error(f"Found file but FAILED to load/read npz file: {e}", exc_info=True)
else:
    logger.error(f"FAILURE: File NOT found at {absolute_path_check}")
    logger.info("Listing directory contents:")
    parent_dir = absolute_path_check.parent
    if parent_dir.exists():
        logger.info(f"Contents of {parent_dir}: {sorted(parent_dir.iterdir())}")
    else:
        logger.error(f"Parent directory {parent_dir} does not exist.")

logger.info("--- Final check finished ---")