<a href="https://colab.research.google.com/github/JMKroeger/stock-predictor/blob/main/Stock_Predictor_Phase1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Cell 0: Environment Setup and Validation
# Purpose: Initialize imports, configuration, logging, validate environment, and test yfinance connectivity.
# Inputs: None
# Outputs: pipeline.log initialized, CONFIG defined, validation results.

import logging
import pandas as pd
import numpy as np
import yfinance as yf
import sys
import os
from datetime import datetime, timedelta
import time

def retry(stop_max_attempt_number=3, wait_fixed=2):
    def decorator(func):
        def wrapper(*args, **kwargs):
            for attempt in range(stop_max_attempt_number):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == stop_max_attempt_number - 1:
                        raise
                    time.sleep(wait_fixed)
        return wrapper
    return decorator

# Configure logging
try:
    log_file = os.path.abspath('pipeline.log')
    logging.basicConfig(
        filename=log_file,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        force=True  # Force reconfiguration if needed
    )
    logging.info("Logging initialized")
    print(f"Logging initialized - pipeline.log should be at: {log_file}")
    # Force a write to ensure file creation
    logging.getLogger().handlers[0].flush()
    if os.path.exists('pipeline.log'):
        print("Confirmed: pipeline.log file created successfully.")
    else:
        print("Warning: pipeline.log file not found after setup.")
except Exception as e:
    print(f"Error setting up logging: {e}")
    raise

# Configuration
CONFIG = {
    'tickers': [
        'AAPL', 'MSFT', 'GOOGL', 'TSLA', 'NVDA', 'PLTR', 'AMD', 'AMZN', 'META', 'INTC',
        'SPY', 'QQQ', 'NFLX', 'BA', 'JPM', 'V', 'PYPL', 'DIS', 'ADBE', 'CRM',
        'CSCO', 'WMT', 'T', 'VZ', 'CMCSA', 'PFE', 'MRK', 'KO', 'PEP'
    ],
    'start_date': '2015-07-04',
    'end_date': datetime.now().strftime('%Y-%m-%d'),
    'telegram_token': '7779970479:AAFJFop5XrTe7_dP1iGDoGVM-bdWNyYso8E',
    'telegram_chat_id': '1591809098',
    'export_dir': '.',
    'confidence_threshold': 0.8,
    'intraday_interval': '5m',
    'intraday_lookback_hours': 4
}

@retry(stop_max_attempt_number=3, wait_fixed=2)
def test_yfinance():
    """Test yfinance with a single ticker and short date range."""
    try:
        start_time = time.time()
        df = yf.download(
            'AAPL',
            start=(datetime.now() - timedelta(days=3)).strftime('%Y-%m-%d'),
            end=datetime.now().strftime('%Y-%m-%d'),
            progress=False
        )
        if df.empty:
            logging.error("yfinance test failed: Empty DataFrame for AAPL")
            raise ValueError("Empty DataFrame")
        logging.info("yfinance test successful")
        print("yfinance test successful")
        return True
    except Exception as e:
        logging.error(f"yfinance test failed: {e}")
        print(f"yfinance test failed: {e}")
        raise
    finally:
        elapsed_time = time.time() - start_time
        logging.info(f"yfinance test took {elapsed_time:.2f} seconds")
        print(f"yfinance test took {elapsed_time:.2f} seconds")

def validate_environment():
    """Validate Python and library versions."""
    start_time = time.time()
    logging.info("Starting Environment Setup and Validation")
    print("Setting up and validating environment...")

    try:
        # Check Python version
        if sys.version_info[:2] < (3, 6):  # Relaxed to 3.6+ for Colab compatibility
            logging.error("Python 3.6 or higher required")
            raise ValueError("Python 3.6 or higher required")
        logging.info(f"Python version validated: {sys.version.split()[0]}")
        print(f"Python: {sys.version.split()[0]}")

        # Check library versions
        libraries = {
            'pandas': pd.__version__,
            'numpy': np.__version__,
            'yfinance': yf.__version__
        }
        for lib, version in libraries.items():
            logging.info(f"{lib}: {version}")
            print(f"{lib}: {version}")

        # Test yfinance
        test_yfinance()

        # Log configuration
        logging.info(f"CONFIG: {len(CONFIG['tickers'])} tickers, date range {CONFIG['start_date']} to {CONFIG['end_date']}")
        print("Environment setup and validation complete")

    except Exception as e:
        logging.error(f"Environment validation failed: {e}")
        print(f"Error in environment validation: {e}")
        raise
    finally:
        elapsed_time = time.time() - start_time
        logging.info(f"Environment setup and validation took {elapsed_time:.2f} seconds")
        print(f"Setup and validation took {elapsed_time:.2f} seconds")
        logging.getLogger().handlers[0].flush()  # Flush again to ensure write

if __name__ == "__main__":
    validate_environment()

Logging initialized - pipeline.log should be at: /content/pipeline.log
Confirmed: pipeline.log file created successfully.
Setting up and validating environment...
Python: 3.11.13
pandas: 2.2.2
numpy: 2.0.2
yfinance: 0.2.65


  df = yf.download(


yfinance test successful
yfinance test took 0.39 seconds
Environment setup and validation complete
Setup and validation took 0.40 seconds


In [2]:
# Cell 1: Fetch and Transform Data
# Purpose: Fetch historical stock data using yfinance, cache to SQLite, save to raw_stock_data.csv.
# Inputs: CONFIG from Cell 0, yfinance API.
# Outputs: stock_data.db, raw_stock_data.csv, logs to pipeline.log.

import pandas as pd
import yfinance as yf
import sqlite3
import logging
from datetime import datetime
import time

# Configure logging
logging.basicConfig(
    filename='pipeline.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

@retry(stop_max_attempt_number=3, wait_fixed=2)
def fetch_historical_data(ticker):
    """Fetch historical data with retry logic."""
    try:
        df = yf.download(ticker, start=CONFIG['start_date'], end=CONFIG['end_date'], progress=False)
        if df.empty:
            logging.warning(f"No data fetched for {ticker}")
            return None
        if isinstance(df.columns, pd.MultiIndex):
            df.columns = df.columns.get_level_values(0)
        df['Ticker'] = ticker
        df['Date'] = df.index
        df = df.reset_index(drop=True)
        col_map = {
            'Open': 'Open', 'High': 'High', 'Low': 'Low', 'Close': 'Close', 'Volume': 'Volume',
            'Adj Close': 'Adj_Close', 'Adjusted Close': 'Adj_Close'
        }
        df = df.rename(columns={k: v for k, v in col_map.items() if k in df.columns})
        required_cols = ['Date', 'Ticker', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj_Close']
        for col in required_cols:
            if col not in df.columns:
                df[col] = df['Close'] if col == 'Adj_Close' else None
        return df[required_cols]
    except Exception as e:
        logging.error(f"Error fetching data for {ticker}: {e}")
        return None

def save_to_sqlite(df, conn):
    """Save DataFrame to SQLite with consistent schema."""
    try:
        df.to_sql('historical_data', conn, if_exists='append', index=False)
        logging.debug(f"Data saved to SQLite for {df['Ticker'].iloc[0]}")
    except Exception as e:
        logging.error(f"Error saving to SQLite: {e}")
        raise

def initialize_sqlite():
    """Initialize SQLite database with historical_data table."""
    try:
        conn = sqlite3.connect('stock_data.db')
        cursor = conn.cursor()
        cursor.execute("DROP TABLE IF EXISTS historical_data")
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS historical_data (
                Date TEXT,
                Ticker TEXT,
                Open REAL,
                High REAL,
                Low REAL,
                Close REAL,
                Volume INTEGER,
                Adj_Close REAL
            )
        """)
        conn.commit()
        return conn
    except Exception as e:
        logging.error(f"Error initializing SQLite: {e}")
        raise

def main():
    """Fetch and store historical data."""
    start_time = time.time()
    logging.info("Starting Cell 1: Data fetching")
    print("Fetching historical data...")

    try:
        conn = initialize_sqlite()
        all_data = []
        for ticker in CONFIG['tickers']:
            df = fetch_historical_data(ticker)
            if df is not None:
                all_data.append(df)
                save_to_sqlite(df, conn)
                logging.debug(f"Fetched and saved data for {ticker}")
            else:
                logging.warning(f"Failed to fetch data for {ticker}")
        conn.close()
        if all_data:
            combined_df = pd.concat(all_data, ignore_index=True)
            combined_df.to_csv('raw_stock_data.csv', index=False)
            logging.info("Saved data to raw_stock_data.csv")
            print("Data saved to raw_stock_data.csv")
        else:
            logging.warning("No data fetched for any ticker")
            print("No data fetched")
        logging.info("Cell 1: Data fetching successful")
        print("Data fetching complete")
    except Exception as e:
        logging.error(f"Cell 1: Failed: {e}")
        print(f"Error in Cell 1: {e}")
        raise
    finally:
        elapsed_time = time.time() - start_time
        logging.info(f"Cell 1 took {elapsed_time:.2f} seconds")
        print(f"Data fetching took {elapsed_time:.2f} seconds")

if __name__ == "__main__":
    main()

Fetching historical data...


  df = yf.download(ticker, start=CONFIG['start_date'], end=CONFIG['end_date'], progress=False)
  df = yf.download(ticker, start=CONFIG['start_date'], end=CONFIG['end_date'], progress=False)
  df = yf.download(ticker, start=CONFIG['start_date'], end=CONFIG['end_date'], progress=False)
  df = yf.download(ticker, start=CONFIG['start_date'], end=CONFIG['end_date'], progress=False)
  df = yf.download(ticker, start=CONFIG['start_date'], end=CONFIG['end_date'], progress=False)
  df = yf.download(ticker, start=CONFIG['start_date'], end=CONFIG['end_date'], progress=False)
  df = yf.download(ticker, start=CONFIG['start_date'], end=CONFIG['end_date'], progress=False)
  df = yf.download(ticker, start=CONFIG['start_date'], end=CONFIG['end_date'], progress=False)
  df = yf.download(ticker, start=CONFIG['start_date'], end=CONFIG['end_date'], progress=False)
  df = yf.download(ticker, start=CONFIG['start_date'], end=CONFIG['end_date'], progress=False)
  df = yf.download(ticker, start=CONFIG['start_dat

Data saved to raw_stock_data.csv
Data fetching complete
Data fetching took 19.22 seconds


In [3]:
# Cell 2: Calculate Technical Indicators
# Purpose: Compute technical indicators for historical data to enhance ML predictions.
# Inputs: raw_stock_data.csv, stock_data.db.
# Outputs: processed_stock_data.csv, logs to pipeline.log.

import pandas as pd
import numpy as np
import logging
import sqlite3
import time
import os
import yfinance as yf
from textblob import TextBlob
import json

# Configure logging
logging.basicConfig(
    filename='pipeline.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def rsi(close, length=14):
    delta = close.diff()
    gain = delta.where(delta > 0, 0)
    loss = -delta.where(delta < 0, 0)
    avg_gain = gain.ewm(com=length-1, min_periods=length).mean()
    avg_loss = loss.ewm(com=length-1, min_periods=length).mean()
    rs = avg_gain / avg_loss
    return 100 - 100 / (1 + rs)

def stoch(high, low, close, k=14, smooth_k=3):
    l14 = low.rolling(k).min()
    h14 = high.rolling(k).max()
    per_k = 100 * (close - l14) / (h14 - l14)
    per_k = per_k.rolling(smooth_k).mean()
    return per_k

def mfi(high, low, close, volume, length=14):
    tp = (high + low + close) / 3
    mf = tp * volume
    diff = tp.diff()
    pos_mf = mf.where(diff > 0, 0).ewm(com=length-1, min_periods=length).mean()
    neg_mf = -mf.where(diff < 0, 0).ewm(com=length-1, min_periods=length).mean()
    mfr = pos_mf / neg_mf
    return 100 - 100 / (1 + mfr)

def macd(close, fast=12, slow=26, signal=9):
    ema_fast = close.ewm(span=fast, adjust=False).mean()
    ema_slow = close.ewm(span=slow, adjust=False).mean()
    macd_line = ema_fast - ema_slow
    signal_line = macd_line.ewm(span=signal, adjust=False).mean()
    return macd_line

def adx(high, low, close, length=14):
    tr1 = high - low
    tr2 = abs(high - close.shift())
    tr3 = abs(low - close.shift())
    tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
    atr = tr.ewm(com=length-1, min_periods=length).mean()
    up = (high - high.shift()).clip(lower=0)
    down = (low.shift() - low).clip(lower=0)
    pos_di = 100 * (up.ewm(com=length-1, min_periods=length).mean() / atr)
    neg_di = 100 * (down.ewm(com=length-1, min_periods=length).mean() / atr)
    dx = 100 * abs(pos_di - neg_di) / (pos_di + neg_di)
    return dx.ewm(com=length-1, min_periods=length).mean()

def ichimoku_a(high, low, close, tenkan=9, kijun=26, senkou=52):
    tenkan_max = high.rolling(tenkan).max()
    tenkan_min = low.rolling(tenkan).min()
    tenkan_sen = (tenkan_max + tenkan_min) / 2
    kijun_max = high.rolling(kijun).max()
    kijun_min = low.rolling(kijun).min()
    kijun_sen = (kijun_max + kijun_min) / 2
    senkou_a = (tenkan_sen + kijun_sen) / 2
    return senkou_a.shift(kijun)

def supertrend(high, low, close, length=10, multiplier=3):
    atr = atr_func(high, low, close, length)
    hl2 = (high + low) / 2
    upper = hl2 + multiplier * atr
    lower = hl2 - multiplier * atr
    upper = upper.where((close.shift() > upper.shift()) | upper.shift().isna(), upper.shift())
    lower = lower.where((close.shift() < lower.shift()) | lower.shift().isna(), lower.shift())
    trend = pd.Series(0.0, index=close.index)
    for i in range(1, len(close)):
        if close.iloc[i] > upper.iloc[i-1]:
            trend.iloc[i] = lower.iloc[i]
        elif close.iloc[i] < lower.iloc[i-1]:
            trend.iloc[i] = upper.iloc[i]
        else:
            trend.iloc[i] = trend.iloc[i-1]
            if trend.iloc[i-1] == upper.iloc[i-1] and close.iloc[i] < lower.iloc[i]:
                trend.iloc[i] = upper.iloc[i]
            elif trend.iloc[i-1] == lower.iloc[i-1] and close.iloc[i] > upper.iloc[i]:
                trend.iloc[i] = lower.iloc[i]
    return trend

def bbands(close, length=20, std=2):
    ma = close.rolling(length).mean()
    sd = close.rolling(length).std()
    upper = ma + std * sd
    lower = ma - std * sd
    return upper, lower

def atr_func(high, low, close, length=14):
    tr1 = high - low
    tr2 = abs(high - close.shift())
    tr3 = abs(low - close.shift())
    tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
    return tr.ewm(com=length-1, min_periods=length).mean()

def kc(high, low, close, length=20, scalar=2, mamode="ema"):
    if mamode == "ema":
        ma = close.ewm(span=length, adjust=False).mean()
    else:
        ma = close.rolling(length).mean()
    atr = atr_func(high, low, close, 10)
    upper = ma + scalar * atr
    lower = ma - scalar * atr
    return upper, lower

def vwap(high, low, close, volume):
    tp = (high + low + close) / 3
    return (tp * volume).cumsum() / volume.cumsum()

def chaikin_ad(high, low, close, volume):
    mfm = ((close - low) - (high - close)) / (high - low)
    mfv = mfm * volume
    return mfv.cumsum()

def cmf(high, low, close, volume, length=20):
    ad = chaikin_ad(high, low, close, volume)
    return ad.rolling(length).sum() / volume.rolling(length).sum()

def obv(close, volume):
    diff = close.diff()
    sign = diff.gt(0).astype(int) - diff.lt(0).astype(int)
    return (sign * volume).cumsum()

def force_index(close, volume, length=13):
    fi = close.diff() * volume
    return fi.ewm(span=length, adjust=False).mean()

def calculate_indicators(df):
    """Calculate technical indicators for a single ticker using custom functions."""
    try:
        # Momentum
        df['RSI'] = rsi(df['Close'])
        df['Stoch_K'] = stoch(df['High'], df['Low'], df['Close'])
        df['MFI'] = mfi(df['High'], df['Low'], df['Close'], df['Volume'])

        # Trend
        df['MACD'] = macd(df['Close'])
        df['ADX'] = adx(df['High'], df['Low'], df['Close'])
        df['Ichimoku_A'] = ichimoku_a(df['High'], df['Low'], df['Close'])
        df['Supertrend'] = supertrend(df['High'], df['Low'], df['Close'])

        # Volatility
        df['BB_Upper'], df['BB_Lower'] = bbands(df['Close'])
        df['ATR'] = atr_func(df['High'], df['Low'], df['Close'])
        df['KC_Upper'], df['KC_Lower'] = kc(df['High'], df['Low'], df['Close'])

        # Volume
        df['VWAP'] = vwap(df['High'], df['Low'], df['Close'], df['Volume'])
        df['AD'] = chaikin_ad(df['High'], df['Low'], df['Close'], df['Volume'])
        df['CMF'] = cmf(df['High'], df['Low'], df['Close'], df['Volume'])
        df['OBV'] = obv(df['Close'], df['Volume'])
        df['Volume_Osc'] = force_index(df['Close'], df['Volume'])

        return df.fillna(0)
    except Exception as e:
        logging.error(f"Error calculating indicators: {e}")
        return None

def get_sentiment_score(ticker):
    """Fetch recent news from yfinance and compute average sentiment using TextBlob."""
    try:
        t = yf.Ticker(ticker)
        news = t.news
        if not news:
            logging.debug(f"No news for {ticker}, using neutral sentiment")
            return 0.0
        scores = []
        for article in news:
            text = article.get('title') or article.get('summary', '')
            if text:
                scores.append(TextBlob(text).sentiment.polarity)
        if not scores:
            logging.debug(f"No valid text for sentiment in news for {ticker}")
            return 0.0
        return np.mean(scores)
    except Exception as e:
        logging.warning(f"Sentiment analysis failed for {ticker}: {e}")
        return 0.0

def main():
    """Calculate indicators and save results."""
    start_time = time.time()
    logging.info("Starting Cell 2: Indicator calculation")
    print("Calculating indicators...")

    try:
        if not os.path.exists('raw_stock_data.csv'):
            logging.error("Input file raw_stock_data.csv not found")
            raise FileNotFoundError("raw_stock_data.csv not found")

        df = pd.read_csv('raw_stock_data.csv')
        df['Date'] = pd.to_datetime(df['Date'])

        sentiment_dict = {ticker: get_sentiment_score(ticker) for ticker in df['Ticker'].unique()}
        df['Sentiment'] = df['Ticker'].map(sentiment_dict)
        with open('sentiment_scores.json', 'w') as f:
            json.dump(sentiment_dict, f)

        conn = sqlite3.connect('stock_data.db')

        all_data = []
        for ticker in df['Ticker'].unique():
            ticker_df = df[df['Ticker'] == ticker].sort_values('Date').set_index('Date')
            ticker_df = calculate_indicators(ticker_df)
            if ticker_df is not None:
                ticker_df['Ticker'] = ticker
                all_data.append(ticker_df.reset_index())

        if all_data:
            processed_df = pd.concat(all_data, ignore_index=True)
            processed_df.to_csv('processed_stock_data.csv', index=False)
            processed_df.to_sql('processed_data', conn, if_exists='replace', index=False)
            logging.info("Saved processed data to processed_stock_data.csv and SQLite")
            print("Processed data saved")
        else:
            logging.warning("No processed data generated")
            print("No processed data")

        conn.close()
        logging.info("Cell 2: Indicator calculation successful")
        print("Indicator calculation complete")

    except Exception as e:
        logging.error(f"Cell 2: Failed: {e}")
        print(f"Error in Cell 2: {e}")
        raise
    finally:
        elapsed_time = time.time() - start_time
        logging.info(f"Cell 2 took {elapsed_time:.2f} seconds")
        print(f"Indicator calculation took {elapsed_time:.2f} seconds")

if __name__ == "__main__":
    main()

Calculating indicators...
Processed data saved
Indicator calculation complete
Indicator calculation took 17.16 seconds


In [4]:
# Cell 3: One-time Historical Indicator Weighting
# Purpose: Drop and recreate indicator_weights table, compute feature importance for historical data using Random Forest, store weights in SQLite.
# Inputs: Historical data from yfinance, indicators from Cell 2.
# Outputs: indicator_weights.db updated with feature importance.

import sqlite3
import pandas as pd
import yfinance as yf
from sklearn.ensemble import RandomForestClassifier
import logging
from datetime import datetime
import numpy as np

# Configure logging
logging.basicConfig(filename='pipeline.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Define tickers and date range
TICKERS = CONFIG['tickers']
START_DATE = CONFIG['start_date']
END_DATE = CONFIG['end_date']

@retry(stop_max_attempt_number=3, wait_fixed=2000)
def fetch_historical_data(ticker):
    """Fetch historical data with retry logic."""
    try:
        df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
        if df.empty:
            logging.warning(f"No data fetched for {ticker}")
            return None
        df['Ticker'] = ticker
        return df
    except Exception as e:
        logging.error(f"Error fetching data for {ticker}: {str(e)}")
        raise

def calculate_indicators(df):
    """Calculate technical indicators using custom functions, individually with error handling."""
    indicators = {
        'RSI': rsi,
        'MACD': macd,
        'BB_Upper': lambda x: bbands(x)[0],  # Assuming bbands returns (upper, lower)
        'ATR': lambda x: atr_func(df['High'], df['Low'], df['Close']),
        'VWAP': lambda x: vwap(df['High'], df['Low'], df['Close'], df['Volume'])
    }
    for ind, func in indicators.items():
        try:
            if ind == 'BB_Upper':
                df[ind], _ = func(df['Close'])
            else:
                df[ind] = func(df['Close'])
        except Exception as e:
            logging.error(f"Error calculating {ind}: {str(e)}")
            df[ind] = np.nan
    return df

def compute_feature_importance(df):
    """Compute feature importance using Random Forest."""
    features = ['RSI', 'MACD', 'BB_Upper', 'ATR', 'VWAP']
    try:
        if df is None or df.empty:
            logging.warning("No data for feature importance")
            return pd.DataFrame({'Feature': features, 'Importance': [0.2] * len(features)})

        # Filter to available features
        available_features = [f for f in features if f in df.columns]
        if not available_features:
            logging.warning("No available features for importance")
            return pd.DataFrame({'Feature': features, 'Importance': [0.2] * len(features)})

        df = df.dropna(subset=available_features + ['Close'])
        if len(df) < 10:
            logging.warning("Too few samples for feature importance")
            return pd.DataFrame({'Feature': features, 'Importance': [0.2] * len(features)})

        df['Target'] = (df['Close'].shift(-1) > df['Close'] * 1.01).astype(int)
        df = df.dropna(subset=['Target'])
        if df.empty:
            logging.warning("Empty DF after Target dropna")
            return pd.DataFrame({'Feature': features, 'Importance': [0.2] * len(features)})

        if len(df['Target'].unique()) < 2:
            logging.warning("Only one class in Target, using default importance")
            return pd.DataFrame({'Feature': features, 'Importance': [0.2] * len(features)})

        X = df[available_features]
        # Handle inf/NaN
        X = X.replace([np.inf, -np.inf], np.nan).fillna(0)
        y = df['Target']

        model = RandomForestClassifier(n_estimators=100, random_state=42)
        model.fit(X, y)
        importance = pd.DataFrame({
            'Feature': available_features,
            'Importance': model.feature_importances_
        })
        # Pad missing features with 0 importance
        missing = set(features) - set(available_features)
        if missing:
            missing_df = pd.DataFrame({'Feature': list(missing), 'Importance': [0.0] * len(missing)})
            importance = pd.concat([importance, missing_df])
        return importance
    except Exception as e:
        logging.error(f"Error computing feature importance: {str(e)}")
        return pd.DataFrame({'Feature': features, 'Importance': [0.2] * len(features)})

def main():
    """Main function to process tickers and store weights."""
    try:
        conn = sqlite3.connect('indicator_weights.db')
        cursor = conn.cursor()

        cursor.execute("DROP TABLE IF EXISTS indicator_weights")
        cursor.execute("""
            CREATE TABLE indicator_weights (
                ticker TEXT,
                feature TEXT,
                importance REAL,
                date TEXT
            )
        """)

        for ticker in TICKERS:
            logging.info(f"Processing {ticker} for indicator weighting")

            df = fetch_historical_data(ticker)
            if df is None:
                continue

            df = calculate_indicators(df)
            if df is None:  # Though now less likely with individual tries
                continue

            importance = compute_feature_importance(df)
            # importance always returned now

            current_date = datetime.now().strftime('%Y-%m-%d')
            for _, row in importance.iterrows():
                cursor.execute("""
                    INSERT INTO indicator_weights (ticker, feature, importance, date)
                    VALUES (?, ?, ?, ?)
                """, (ticker, row['Feature'], row['Importance'], current_date))

            conn.commit()
            logging.info(f"Stored weights for {ticker}")

        conn.close()
        logging.info("Cell 3 completed successfully")

        print("Indicator weights calculated and stored in indicator_weights.db")

    except Exception as e:
        logging.error(f"Cell 3 failed: {str(e)}")
        print(f"Error in Cell 3: {str(e)}")
        raise

if __name__ == "__main__":
    main()

  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker

Indicator weights calculated and stored in indicator_weights.db


  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)


In [5]:
# Cell 4: Model Training and Feature Selection
# Purpose: Train ML models (RF, XGBoost) using Optuna for hyperparams, select features based on weights, generate predictions.
# Inputs: processed_stock_data.csv, indicator_weights.db.
# Outputs: model_rf.pkl, model_xgb.pkl, scaler.pkl, selected_features.pkl, predictions.csv.

!pip install -q optuna joblib xgboost

import pandas as pd
import numpy as np
import logging
import sqlite3
import time
import os
import json
import pickle
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import precision_score
from sklearn.model_selection import TimeSeriesSplit
from xgboost import XGBClassifier
import optuna
from joblib import Parallel, delayed

def train_model(data):
    start_time = time.time()
    try:
        df = data.copy()
        df = df.dropna(subset=['Close', 'Ticker'])

        sentiment_path = f"{CONFIG['export_dir']}/sentiment_scores.json"
        if os.path.exists(sentiment_path):
            with open(sentiment_path, 'r') as f:
                sentiment_scores = json.load(f)
            df['Sentiment'] = df['Ticker'].map(sentiment_scores).fillna(0.5)
        else:
            logging.warning("Cell 4: No sentiment scores found, using neutral values")
            df['Sentiment'] = 0.5

        features = [
            'RSI', 'Stoch_K', 'MFI', 'MACD', 'ADX', 'Ichimoku_A', 'Supertrend',
            'BB_Upper', 'BB_Lower', 'ATR', 'KC_Upper', 'KC_Lower', 'VWAP', 'AD', 'CMF', 'OBV', 'Volume_Osc', 'Sentiment'
        ]

        conn = sqlite3.connect(f"{CONFIG['export_dir']}/indicator_weights.db")
        weights_df = pd.read_sql_query("SELECT * FROM indicator_weights", conn)  # Adjusted query as 'Type' may not exist
        conn.close()
        # Aggregate importance by feature, mean across tickers
        weights_df = weights_df.groupby('feature')['importance'].mean().reset_index()
        weights = dict(zip(weights_df['feature'], weights_df['importance']))
        weighted_features = sorted(
            [f for f in features if f in weights],
            key=lambda x: weights.get(x, 0),
            reverse=True
        )[:10]
        selected_features = [col for col in weighted_features if col in df.columns]  # Ensure in df
        logging.info(f"Cell 4: Using {len(selected_features)} weighted features: {selected_features}")

        for file in ['model_rf.pkl', 'model_xgb.pkl', 'scaler.pkl', 'selected_features.pkl']:
            file_path = f"{CONFIG['export_dir']}/{file}"
            if os.path.exists(file_path):
                os.remove(file_path)
                logging.info(f"Cell 4: Cleared {file_path}")

        if not selected_features:
            selected_features = [col for col in features if col in df.columns]
            logging.warning(f"Cell 4: No weighted features, using all available: {selected_features}")

        missing_features = [f for f in selected_features if f not in df.columns]
        if missing_features:
            logging.warning(f"Cell 4: Missing features: {missing_features}, filling with zeros")
            for f in missing_features:
                df[f] = 0

        df['Volume'] = np.log1p(df['Volume'].clip(lower=1))
        if 'VWAP' in df.columns:
            df['VWAP'] = df['VWAP'].clip(lower=0, upper=df['Close'].max() * 2)
        else:
            df['VWAP'] = df['Close']
        if 'ATR' in df.columns:
            df['ATR'] = df['ATR'].clip(lower=0, upper=0.1)
        else:
            df['ATR'] = 1.0

        logging.info(f"Cell 4: Features available: {selected_features}")
        print(f"Debug: Features available: {selected_features}")
        print(f"Debug: Data shape before training: {df.shape}")

        volatility = df.groupby('Ticker')['Close'].pct_change().rolling(window=5).std().groupby(df['Ticker']).last()
        df['Next_Close'] = df.groupby('Ticker')['Close'].shift(-1)
        df['Target'] = df.apply(
            lambda row: 1 if pd.notna(row['Next_Close']) and row['Next_Close'] > row['Close'] * (1 + volatility.get(row['Ticker'], 0.003) * 0.15) else 0,
            axis=1
        )
        df = df.drop(columns=['Next_Close']).dropna(subset=['Target', 'Close'] + selected_features)

        latest_date = df['Date'].max()
        train_data = df[df['Date'] < latest_date]
        pred_data = df[df['Date'] == latest_date]

        if train_data.empty or pred_data.empty:
            raise ValueError("Cell 4: Insufficient data for training or prediction")

        X_train = train_data[selected_features]
        y_train = train_data['Target']
        X_train = X_train.fillna(0)
        y_train = y_train.fillna(0).astype(int)

        logging.info(f"Cell 4: X_train shape: {X_train.shape}, y_train shape: {y_train.shape}")
        logging.info(f"Cell 4: Target distribution: {y_train.value_counts().to_dict()}")
        print(f"Debug: X_train shape: {X_train.shape}, y_train shape: {y_train.shape}")
        print(f"Debug: Target distribution: {y_train.value_counts()}")

        def train_split(train_idx, test_idx, X_train, y_train, selected_features):
            X_train_split, X_test_split = X_train.iloc[train_idx], X_train.iloc[test_idx]
            y_train_split, y_test_split = y_train.iloc[train_idx], y_train.iloc[test_idx]
            scaler = StandardScaler()
            X_train_scaled = scaler.fit_transform(X_train_split)
            X_test_scaled = scaler.transform(X_test_split)

            # Subsample for tuning: last 20000 rows for time series
            sample_size = 20000
            if len(X_train_scaled) > sample_size:
                X_tune = X_train_scaled[-sample_size:]
                y_tune = y_train_split.iloc[-sample_size:]
            else:
                X_tune = X_train_scaled
                y_tune = y_train_split

            def rf_objective(trial):
                rf_params = {
                    'n_estimators': trial.suggest_int('n_estimators', 50, 100),
                    'max_depth': trial.suggest_int('max_depth', 3, 7),
                    'min_samples_split': trial.suggest_int('min_samples_split', 5, 10),
                    'min_samples_leaf': trial.suggest_int('min_samples_leaf', 2, 5)
                }
                rf_model = RandomForestClassifier(**rf_params, random_state=42, class_weight='balanced')
                rf_model.fit(X_tune, y_tune)
                rf_pred = rf_model.predict_proba(X_test_scaled)[:, 1]
                return precision_score(y_test_split, (rf_pred > 0.5).astype(int), zero_division=0)

            def xgb_objective(trial):
                xgb_params = {
                    'n_estimators': trial.suggest_int('n_estimators', 50, 100),
                    'max_depth': trial.suggest_int('max_depth', 3, 7),
                    'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.05)
                }
                xgb_model = XGBClassifier(**xgb_params, random_state=42, scale_pos_weight=max(1, (y_tune==0).sum() / (y_tune==1).sum()))
                xgb_model.fit(X_tune, y_tune)
                xgb_pred = xgb_model.predict_proba(X_test_scaled)[:, 1]
                return precision_score(y_test_split, (xgb_pred > 0.5).astype(int), zero_division=0)

            rf_study = optuna.create_study(direction='maximize')
            rf_study.optimize(rf_objective, n_trials=5)
            rf_model = RandomForestClassifier(**rf_study.best_params, random_state=42, class_weight='balanced')
            rf_model.fit(X_train_scaled, y_train_split)
            rf_pred = rf_model.predict_proba(X_test_scaled)[:, 1]
            rf_score = precision_score(y_test_split, (rf_pred > 0.5).astype(int), zero_division=0)

            xgb_study = optuna.create_study(direction='maximize')
            xgb_study.optimize(xgb_objective, n_trials=5)
            xgb_model = XGBClassifier(**xgb_study.best_params, random_state=42, scale_pos_weight=max(1, (y_train_split==0).sum() / (y_train_split==1).sum()))
            xgb_model.fit(X_train_scaled, y_train_split)
            xgb_pred = xgb_model.predict_proba(X_test_scaled)[:, 1]
            xgb_score = precision_score(y_test_split, (xgb_pred > 0.5).astype(int), zero_division=0)

            return rf_score, xgb_score, rf_model, xgb_model, scaler

        tscv = TimeSeriesSplit(n_splits=5)
        results = Parallel(n_jobs=-1)(
            delayed(train_split)(train_idx, test_idx, X_train, y_train, selected_features)
            for train_idx, test_idx in tscv.split(X_train)
        )
        rf_scores = [r[0] for r in results]
        xgb_scores = [r[1] for r in results]
        rf_model = results[-1][2]
        xgb_model = results[-1][3]
        scaler = results[-1][4]

        X_train_scaled = scaler.fit_transform(X_train)
        X_pred_scaled = scaler.transform(pred_data[selected_features].fillna(0))

        rf_model.fit(X_train_scaled, y_train)
        xgb_model.fit(X_train_scaled, y_train)
        rf_pred = rf_model.predict_proba(X_pred_scaled)[:, 1]
        xgb_pred = xgb_model.predict_proba(X_pred_scaled)[:, 1]
        confidence = (rf_pred + xgb_pred) / 2

        predictions_df = pred_data[['Ticker', 'Date', 'Close']].copy()
        predictions_df['Confidence'] = confidence
        predictions_df['Prediction'] = (confidence > 0.5).astype(int)
        predictions_df = predictions_df.sort_values('Confidence', ascending=False)

        rf_importance = pd.DataFrame({
            'Feature': selected_features,
            'Importance': rf_model.feature_importances_
        }).sort_values('Importance', ascending=False)
        logging.info(f"Cell 4: RF Feature Importance (Top 5): {rf_importance.head().to_dict()}")

        with open(f"{CONFIG['export_dir']}/model_rf.pkl", 'wb') as f:
            pickle.dump(rf_model, f)
        logging.info(f"Cell 4: Saved {CONFIG['export_dir']}/model_rf.pkl")
        with open(f"{CONFIG['export_dir']}/model_xgb.pkl", 'wb') as f:
            pickle.dump(xgb_model, f)
        logging.info(f"Cell 4: Saved {CONFIG['export_dir']}/model_xgb.pkl")
        with open(f"{CONFIG['export_dir']}/scaler.pkl", 'wb') as f:
            pickle.dump(scaler, f)
        logging.info(f"Cell 4: Saved {CONFIG['export_dir']}/scaler.pkl")
        with open(f"{CONFIG['export_dir']}/selected_features.pkl", 'wb') as f:
            pickle.dump(selected_features, f)
        logging.info(f"Cell 4: Saved {CONFIG['export_dir']}/selected_features.pkl")
        predictions_df.to_csv(f"{CONFIG['export_dir']}/predictions.csv", index=False)

        avg_cv_accuracy = (np.mean(rf_scores) + np.mean(xgb_scores)) / 2 * 100
        logging.info(f"Cell 4: Model trained, Avg CV Accuracy: {avg_cv_accuracy:.2f}%, Predictions shape: {predictions_df.shape}, Time: {time.time() - start_time:.2f}s")
        print(f"Cell 4: Model trained, Avg CV Accuracy: {avg_cv_accuracy:.2f}%, Predictions shape: {predictions_df.shape}, Time: {time.time() - start_time:.2f}s")
        return scaler, selected_features, predictions_df
    except Exception as e:
        logging.error(f"Cell 4: Failed: {str(e)}")
        raise

# Execute
processed_data = pd.read_csv(f"{CONFIG['export_dir']}/processed_stock_data.csv")
scaler, selected_features, predictions_df = train_model(processed_data)

Debug: Features available: ['RSI', 'MACD', 'BB_Upper', 'ATR', 'VWAP']
Debug: Data shape before training: (71731, 26)
Debug: X_train shape: (71702, 5), y_train shape: (71702,)
Debug: Target distribution: Target
0    38463
1    33239
Name: count, dtype: int64
Cell 4: Model trained, Avg CV Accuracy: 47.36%, Predictions shape: (29, 5), Time: 77.02s


In [6]:
# Cell 5: Generate Trade Candidates
# Purpose: Generate trade candidates based on predictions, excluding recent losses.
# Inputs: predictions.csv, loss_tracker.db.
# Outputs: trade_candidates.csv.

import pandas as pd
import sqlite3
import logging
import time
from datetime import datetime

def generate_trade_candidates(predictions_df, confidence_threshold=CONFIG['confidence_threshold']):
    start_time = time.time()
    try:
        if predictions_df.empty:
            raise ValueError("Cell 5: Empty predictions dataframe")

        conn = sqlite3.connect(f"{CONFIG['export_dir']}/loss_tracker.db")
        cursor = conn.cursor()
        cursor.execute("CREATE TABLE IF NOT EXISTS loss_tracker (Ticker TEXT, Loss_Date TEXT)")
        conn.commit()
        loss_data = pd.read_sql("SELECT * FROM loss_tracker", conn)
        conn.close()

        current_date = datetime.now()
        excluded_tickers = []
        for _, row in loss_data.iterrows():
            loss_date = pd.to_datetime(row['Loss_Date'])
            if (current_date - loss_date).days < 35:
                excluded_tickers.append(row['Ticker'])
        excluded_tickers = list(set(excluded_tickers))

        df = predictions_df.copy()
        df = df[~df['Ticker'].isin(excluded_tickers)]
        volatility = df.groupby('Ticker')['Close'].pct_change().rolling(window=5).std().groupby(df['Ticker']).last()
        df['Volatility'] = df['Ticker'].map(volatility).fillna(1.0)

        candidates = df[df['Confidence'] >= confidence_threshold][['Ticker', 'Date', 'Close', 'Confidence', 'Prediction', 'Volatility']]
        candidates = candidates.sort_values('Confidence', ascending=False)

        candidates.to_csv(f"{CONFIG['export_dir']}/trade_candidates.csv", index=False)
        logging.info(f"Cell 5: Confidence threshold: {confidence_threshold}, Volatility: {candidates['Volatility'].mean():.4f}")
        logging.info(f"Cell 5: Confidence stats: {candidates['Confidence'].describe().to_dict()}")
        logging.info(f"Cell 5: Generated {len(candidates)} trade candidates, Excluded tickers: {excluded_tickers}")
        logging.info(f"Cell 5: Trade candidates generated, Shape: {candidates.shape}, Time: {time.time() - start_time:.2f}s")
        print(f"Cell 5: Trade candidates generated, Shape: {candidates.shape}, Time: {time.time() - start_time:.2f}s")
        return candidates
    except Exception as e:
        logging.error(f"Cell 5: Failed: {str(e)}")
        raise

# Execute
predictions_df = pd.read_csv(f"{CONFIG['export_dir']}/predictions.csv")
trade_candidates_df = generate_trade_candidates(predictions_df)

Cell 5: Trade candidates generated, Shape: (0, 6), Time: 0.01s


In [7]:
# Cell 6: Telegram Alerts
# Purpose: Send Telegram alerts for trade candidates.
# Inputs: trade_candidates.csv.
# Outputs: Telegram messages sent, logs to pipeline.log.

import requests
import logging
import time
import pandas as pd

def send_telegram_alert(candidates_df):
    start_time = time.time()
    try:
        if candidates_df.empty:
            logging.warning("Cell 6: No candidates for Telegram alert")
            print("Cell 6: No candidates for Telegram alert")
            return

        bot_token = CONFIG['telegram_token']
        chat_id = CONFIG['telegram_chat_id']
        candidates_df['Type'] = 'Buy'  # Assume all are buy for now
        for _, row in candidates_df.iterrows():
            if row['Type'] == 'Buy':
                message = (
                    f"🚨 Trade Alert 🚨\n"
                    f"Ticker: {row['Ticker']}\n"
                    f"Date: {row['Date']}\n"
                    f"Action: Buy\n"
                    f"Entry Price: ${row['Close']:.2f}\n"
                    f"Confidence: {row['Confidence']:.2%}\n"
                    f"Volatility: {row['Volatility']:.4f}"
                )
                url = f"https://api.telegram.org/bot{bot_token}/sendMessage?chat_id={chat_id}&text={message}"
                response = requests.get(url)
                response.raise_for_status()
                logging.info(f"Cell 6: Telegram alert sent for {row['Ticker']}.")
                print(f"Cell 6: Telegram alert sent for {row['Ticker']}.")
                time.sleep(0.2)
        logging.info(f"Cell 6: Telegram alerts completed, Time: {time.time() - start_time:.2f}s")
        print(f"Cell 6: Telegram alerts completed, Time: {time.time() - start_time:.2f}s")
    except Exception as e:
        logging.error(f"Cell 6: Failed: {str(e)}")
        raise

# Execute
trade_candidates_df = pd.read_csv(f"{CONFIG['export_dir']}/trade_candidates.csv")
send_telegram_alert(trade_candidates_df)

Cell 6: No candidates for Telegram alert


In [8]:
# Cell 7: Backtest Strategy
# Purpose: Backtest the strategy on trade candidates.
# Inputs: processed_stock_data.csv, trade_candidates.csv.
# Outputs: backtest_results.csv, updates loss_tracker.db.

import pandas as pd
import sqlite3
import logging
import time
from datetime import datetime, timedelta

def backtest_strategy(data, trade_candidates, target_multiplier=0.8, stop_loss_multiplier=2.0):
    start_time = time.time()
    try:
        if trade_candidates.empty or data.empty:
            logging.warning("Cell 7: Empty trade candidates or data, skipping backtest")
            print("Cell 7: No trades to backtest")
            return pd.DataFrame()  # Return empty DF instead of raising error

        df = data.copy()
        df['Date'] = pd.to_datetime(df['Date'])
        candidates = trade_candidates.copy()
        candidates['Date'] = pd.to_datetime(candidates['Date'])

        results = []
        for _, candidate in candidates.iterrows():
            ticker = candidate['Ticker']
            entry_date = candidate['Date']
            entry_price = candidate['Close']

            start_date = entry_date - timedelta(days=30)
            end_date = entry_date
            ticker_data = df[(df['Ticker'] == ticker) & (df['Date'] >= start_date) & (df['Date'] <= end_date)].sort_values('Date')

            if len(ticker_data) < 14:
                logging.warning(f"Cell 7: Insufficient data for {ticker} from {start_date} to {end_date}, skipping")
                continue

            atr = ticker_data['ATR'].iloc[-1] if 'ATR' in ticker_data.columns else 1.0
            atr = atr if pd.notna(atr) else 1.0

            target_price = entry_price + atr * target_multiplier
            stop_loss_price = entry_price - atr * stop_loss_multiplier

            outcome = 'Loss'
            exit_price = stop_loss_price
            exit_date = entry_date

            future_data = df[(df['Ticker'] == ticker) & (df['Date'] > entry_date)].sort_values('Date')
            if future_data.empty:
                future_data = ticker_data[ticker_data['Date'] <= entry_date].iloc[-10:]

            for _, row in future_data.iterrows():
                if row['High'] >= target_price:
                    outcome = 'Win'
                    exit_price = target_price
                    exit_date = row['Date']
                    break
                if row['Low'] <= stop_loss_price:
                    outcome = 'Loss'
                    exit_price = stop_loss_price
                    exit_date = row['Date']
                    break

            profit = exit_price - entry_price
            results.append({
                'Ticker': ticker,
                'Entry_Date': entry_date,
                'Entry_Price': entry_price,
                'Exit_Date': exit_date,
                'Exit_Price': exit_price,
                'Outcome': outcome,
                'Profit': profit,
                'ATR': atr,
                'Target_Price': target_price,
                'Stop_Loss': stop_loss_price
            })

        results_df = pd.DataFrame(results)
        if results_df.empty:
            logging.warning("Cell 7: No trades executed")
            return results_df

        conn = sqlite3.connect(f"{CONFIG['export_dir']}/loss_tracker.db")
        cursor = conn.cursor()
        cursor.execute("CREATE TABLE IF NOT EXISTS loss_tracker (Ticker TEXT, Loss_Date TEXT)")
        for _, row in results_df[results_df['Outcome'] == 'Loss'].iterrows():
            cursor.execute("INSERT OR IGNORE INTO loss_tracker (Ticker, Loss_Date) VALUES (?, ?)", (row['Ticker'], str(row['Exit_Date'])))
        conn.commit()
        conn.close()

        win_rate = (results_df['Outcome'] == 'Win').mean() * 100
        reward_risk = results_df[results_df['Outcome'] == 'Win']['Profit'].sum() / abs(results_df[results_df['Outcome'] == 'Loss']['Profit'].sum() + 1e-6)

        results_df.to_csv(f"{CONFIG['export_dir']}/backtest_results.csv", index=False)
        logging.info(f"Cell 7: Backtest completed, {len(results_df)} trades, Win rate: {win_rate:.2f}%, Reward:Risk: {reward_risk:.2f}, Time: {time.time() - start_time:.2f}s")
        print(f"Cell 7: Backtest completed, {len(results_df)} trades, Win rate: {win_rate:.2f}%, Reward:Risk: {reward_risk:.2f}, Time: {time.time() - start_time:.2f}s")
        return results_df
    except Exception as e:
        logging.error(f"Cell 7: Failed: {str(e)}")
        raise

# Execute
processed_data = pd.read_csv(f"{CONFIG['export_dir']}/processed_stock_data.csv")
trade_candidates_df = pd.read_csv(f"{CONFIG['export_dir']}/trade_candidates.csv")
backtest_results = backtest_strategy(processed_data, trade_candidates_df)

Cell 7: No trades to backtest


In [9]:
# Cell 8: Track Performance
# Purpose: Track performance metrics from backtest results, including taxes.
# Inputs: backtest_results.csv.
# Outputs: performance_tracker.csv.

import pandas as pd
import numpy as np
import logging
import time
import os

def track_performance(backtest_results):
    start_time = time.time()
    try:
        if backtest_results.empty:
            logging.warning("Cell 8: No backtest results to track")
            return pd.DataFrame()

        df = backtest_results.copy()
        df['Returns'] = df['Profit'] / df['Entry_Price']

        net_returns = df['Returns'].copy()
        net_returns[df['Outcome'] == 'Win'] *= 0.6  # 60% retained after taxes
        net_profit = net_returns.sum()

        win_rate = (df['Outcome'] == 'Win').mean() * 100
        reward_risk = df[df['Outcome'] == 'Win']['Profit'].sum() * 0.6 / abs(df[df['Outcome'] == 'Loss']['Profit'].sum() + 1e-6)
        sharpe_ratio = net_returns.mean() / (net_returns.std() + 1e-6) * np.sqrt(252)
        cumulative_returns = (1 + net_returns).cumprod()
        max_drawdown = (cumulative_returns.cummax() - cumulative_returns).max()

        performance_df = df.copy()
        performance_df['Win_Rate'] = win_rate
        performance_df['Reward_Risk'] = reward_risk
        performance_df['Sharpe'] = sharpe_ratio
        performance_df['Max_Drawdown'] = max_drawdown
        performance_df['Net_Profit'] = net_profit

        performance_df.to_csv(f"{CONFIG['export_dir']}/performance_tracker.csv", index=False)
        logging.info(f"Cell 8: Performance tracked, Shape: {performance_df.shape}, Win rate: {win_rate:.2f}%, Reward:Risk: {reward_risk:.2f}, Sharpe: {sharpe_ratio:.2f}, Max Drawdown: {max_drawdown:.2f}%, Net Profit: {net_profit:.2f}, Time: {time.time() - start_time:.2f}s")
        print(f"Cell 8: Performance tracked, Shape: {performance_df.shape}, Win rate: {win_rate:.2f}%, Reward:Risk: {reward_risk:.2f}, Sharpe: {sharpe_ratio:.2f}, Max Drawdown: {max_drawdown:.2f}%, Net Profit: {net_profit:.2f}, Time: {time.time() - start_time:.2f}s")
        return performance_df
    except Exception as e:
        logging.error(f"Cell 8: Failed: {str(e)}")
        raise

# Execute
file_path = f"{CONFIG['export_dir']}/backtest_results.csv"
if os.path.exists(file_path):
    backtest_results = pd.read_csv(file_path)
else:
    logging.warning(f"Cell 8: backtest_results.csv not found, using empty DataFrame")
    backtest_results = pd.DataFrame()
performance_df = track_performance(backtest_results)

In [10]:
# Cell 9: Retrain Model with Feedback
# Purpose: Retrain model using backtest feedback as labels.
# Inputs: processed_stock_data.csv, performance_tracker.csv.
# Outputs: retrained_model.pkl, retrained_scaler.pkl.

import pandas as pd
import pickle
import logging
import time
import os

def retrain_model(processed_data, performance_df):
    start_time = time.time()
    try:
        if performance_df.empty or 'Win_Rate' not in performance_df.columns or performance_df['Win_Rate'].iloc[0] >= 50:
            logging.info("Cell 9: Performance satisfactory or no data, no retrain needed")
            return None, None, None

        logging.info("Cell 9: Win rate below 50%, triggering retrain")

        with open(f"{CONFIG['export_dir']}/model_rf.pkl", 'rb') as f:
            rf_model = pickle.load(f)
        with open(f"{CONFIG['export_dir']}/model_xgb.pkl", 'rb') as f:
            xgb_model = pickle.load(f)
        with open(f"{CONFIG['export_dir']}/scaler.pkl", 'rb') as f:
            scaler = pickle.load(f)
        with open(f"{CONFIG['export_dir']}/selected_features.pkl", 'rb') as f:
            selected_features = pickle.load(f)

        df = processed_data.copy()
        df['Target'] = (df.groupby('Ticker')['Close'].shift(-1) > df['Close']).astype(int)
        df = df.dropna()
        X = df[selected_features]
        y = df['Target']

        X_scaled = scaler.fit_transform(X)
        rf_model.fit(X_scaled, y)
        xgb_model.fit(X_scaled, y)

        with open(f"{CONFIG['export_dir']}/retrained_model_rf.pkl", 'wb') as f:
            pickle.dump(rf_model, f)
        with open(f"{CONFIG['export_dir']}/retrained_model_xgb.pkl", 'wb') as f:
            pickle.dump(xgb_model, f)
        with open(f"{CONFIG['export_dir']}/retrained_scaler.pkl", 'wb') as f:
            pickle.dump(scaler, f)

        logging.info(f"Cell 9: Models retrained, Time: {time.time() - start_time:.2f}s")
        print(f"Cell 9: Models retrained, Time: {time.time() - start_time:.2f}s")
        return rf_model, xgb_model, scaler
    except Exception as e:
        logging.error(f"Cell 9: Failed: {str(e)}")
        raise

# Execute
processed_data = pd.read_csv(f"{CONFIG['export_dir']}/processed_stock_data.csv")
file_path = f"{CONFIG['export_dir']}/performance_tracker.csv"
if os.path.exists(file_path):
    performance_df = pd.read_csv(file_path)
else:
    logging.warning(f"Cell 9: performance_tracker.csv not found, using empty DataFrame")
    performance_df = pd.DataFrame()
retrained_model_rf, retrained_model_xgb, retrained_scaler = retrain_model(processed_data, performance_df)

In [11]:
# Cell 10: Fetch Intraday Data
# Purpose: Fetch and cache intraday data for monitoring.
# Inputs: CONFIG tickers, interval.
# Outputs: intraday_data.csv.

import pandas as pd
import yfinance as yf
import sqlite3
import logging
import time
from datetime import datetime, timedelta

def fetch_intraday_data(tickers, interval=CONFIG['intraday_interval'], lookback_hours=CONFIG['intraday_lookback_hours']):
    start_time = time.time()
    try:
        conn = sqlite3.connect(f"{CONFIG['export_dir']}/stock_data.db")
        cursor = conn.cursor()
        cursor.execute('''CREATE TABLE IF NOT EXISTS intraday_data (
            Date TEXT, Ticker TEXT, Open REAL, High REAL, Low REAL, Close REAL, Volume INTEGER,
            PRIMARY KEY (Date, Ticker)
        )''')

        end_date = datetime.now()
        start_date = end_date - timedelta(days=5)  # Sufficient for intraday lookback
        cached_data = pd.read_sql_query(
            f"SELECT * FROM intraday_data WHERE Date >= ? AND Date <= ? AND Ticker IN ({','.join(['?']*len(tickers))})",
            conn, params=[start_date.strftime('%Y-%m-%d %H:%M:%S'), end_date.strftime('%Y-%m-%d %H:%M:%S')] + tickers
        )
        cached_data['Date'] = pd.to_datetime(cached_data['Date'])
        cached_tickers = set(cached_data['Ticker'])
        new_tickers = [t for t in tickers if t not in cached_tickers]

        if len(cached_data) >= len(tickers) * lookback_hours * 0.7 and not new_tickers:
            logging.info(f"Cell 10: Loaded {len(cached_data)} intraday rows from cache")
            df = cached_data
        else:
            batch_size = 5
            all_data = [cached_data] if not cached_data.empty else []
            for i in range(0, len(new_tickers), batch_size):
                batch_tickers = new_tickers[i:i + batch_size]
                try:
                    @retry(stop_max_attempt_number=3, wait_fixed=2)
                    def fetch_yfinance(tickers, start, end, interval):
                        return yf.download(tickers, start=start, end=end, interval=interval, progress=False, auto_adjust=True)

                    data = fetch_yfinance(batch_tickers, start_date, end_date, interval)
                    if data.empty:
                        raise ValueError(f"No intraday data for {batch_tickers}")
                    if isinstance(data.columns, pd.MultiIndex):
                        data.columns = [f"{col[0]}_{col[1]}" for col in data.columns]
                    data = data.reset_index().rename(columns={'Datetime': 'Date'})
                    data['Date'] = pd.to_datetime(data['Date'])
                    melted = data.melt(id_vars=['Date'], var_name='Metric', value_name='Value')
                    melted['Ticker'] = melted['Metric'].str.split('_').str[-1]
                    melted['Metric'] = melted['Metric'].str.split('_').str[0]
                    df_batch = melted.pivot_table(index=['Date', 'Ticker'], columns='Metric', values='Value').reset_index()
                    df_batch.columns.name = None
                    df_batch.columns = [col.capitalize() for col in df_batch.columns]
                    df_batch = df_batch.drop_duplicates(subset=['Date', 'Ticker'], keep='last')
                    all_data.append(df_batch)
                    logging.info(f"Cell 10: Fetched intraday data for {batch_tickers}, Shape: {df_batch.shape}")
                except Exception as e:
                    logging.warning(f"Cell 10: yfinance failed for {batch_tickers}: {str(e)}")
                    continue

            df = pd.concat(all_data, ignore_index=True) if all_data else pd.DataFrame()
            df['Date'] = pd.to_datetime(df['Date'], errors='coerce')
            df = df.dropna(subset=['Date', 'Close', 'Ticker'])
            df = df.sort_values(['Ticker', 'Date']).drop_duplicates(subset=['Date', 'Ticker'], keep='last').reset_index(drop=True)

            if not df.empty:
                existing_rows = pd.read_sql_query(
                    f"SELECT Date, Ticker FROM intraday_data WHERE Ticker IN ({','.join(['?']*len(tickers))})",
                    conn, params=tickers
                )
                existing_rows['Date'] = pd.to_datetime(existing_rows['Date'])
                existing_set = set(existing_rows[['Date', 'Ticker']].itertuples(index=False, name=None))
                new_rows = df[~df[['Date', 'Ticker']].apply(tuple, axis=1).isin(existing_set)]
                if not new_rows.empty:
                    new_rows.to_sql('intraday_data', conn, if_exists='append', index=False, method='multi')
                    logging.info(f"Cell 10: Cached {len(new_rows)} new intraday rows")

        conn.close()
        df.to_csv(f"{CONFIG['export_dir']}/intraday_data.csv", index=False)
        logging.info(f"Cell 10: Intraday data fetched, Shape: {df.shape}, Time: {time.time() - start_time:.2f}s")
        print(f"Cell 10: Intraday data fetched, Shape: {df.shape}, Time: {time.time() - start_time:.2f}s")
        return df
    except Exception as e:
        logging.error(f"Cell 10: Failed: {str(e)}")
        raise

# Execute
intraday_data = fetch_intraday_data(CONFIG['tickers'])

Cell 10: Intraday data fetched, Shape: (9048, 7), Time: 6.47s


In [12]:
# Cell 11: Diagnostics
# Purpose: Run diagnostics on files, data, models, and sentiment.
# Inputs: Various files in export_dir.
# Outputs: Logs with diagnostic info.

import os
import logging
import time
import pandas as pd
import pickle
import numpy as np

def run_diagnostics():
    start_time = time.time()
    try:
        files = os.listdir(CONFIG['export_dir'])
        logging.info(f"Cell 11: Files: {files}")

        processed_data = pd.read_csv(f"{CONFIG['export_dir']}/processed_stock_data.csv")
        nan_counts = processed_data.isna().sum()
        logging.info(f"Cell 11: Processed data shape: {processed_data.shape}, NaNs: {nan_counts[nan_counts > 0].to_dict()}")

        with open(f"{CONFIG['export_dir']}/model_rf.pkl", 'rb') as f:
            pickle.load(f)
        with open(f"{CONFIG['export_dir']}/model_xgb.pkl", 'rb') as f:
            pickle.load(f)
        logging.info("Cell 11: Models loaded successfully")

        sentiment_scores = {}
        for ticker in CONFIG['tickers']:
            sentiment_scores[ticker] = np.random.uniform(0, 1)  # Placeholder for diagnostics
            logging.info(f"Cell 11: Sentiment score for {ticker}: {sentiment_scores[ticker]:.2f}")

        with open(f"{CONFIG['export_dir']}/sentiment_scores.json", 'w') as f:
            json.dump(sentiment_scores, f)
        logging.info(f"Cell 11: Sentiment scores saved: {sentiment_scores}")
        logging.info(f"Cell 11: Diagnostics completed, Time: {time.time() - start_time:.2f}s")
        print(f"Cell 11: Diagnostics completed, Time: {time.time() - start_time:.2f}s")
    except Exception as e:
        logging.error(f"Cell 11: Failed: {str(e)}")
        raise

# Execute
run_diagnostics()

Cell 11: Diagnostics completed, Time: 1.48s


In [13]:
# Cell 12: Pipeline Orchestration
# Purpose: Orchestrate the full pipeline execution.
# Inputs: All previous outputs.
# Outputs: Full pipeline run, updates files.

import logging
import time
import pandas as pd
import pickle

def monitor_intraday(intraday_data, last_alert_time):
    start_time = time.time()
    try:
        if intraday_data.empty:
            logging.warning("Cell 12: Intraday data is empty, skipping monitoring")
            return last_alert_time

        with open(f"{CONFIG['export_dir']}/retrained_model.pkl", 'rb') as f:
            model = pickle.load(f)
        with open(f"{CONFIG['export_dir']}/retrained_scaler.pkl", 'rb') as f:
            scaler = pickle.load(f)
        with open(f"{CONFIG['export_dir']}/selected_features.pkl", 'rb') as f:
            selected_features = pickle.load(f)

        processed_intraday = calculate_indicators(intraday_data.reset_index())
        processed_intraday = processed_intraday.set_index('Date')

        X = processed_intraday[selected_features].fillna(0)
        X_scaled = scaler.transform(X)

        confidences = model.predict_proba(X_scaled)[:, 1]
        processed_intraday['Confidence'] = confidences

        signals = processed_intraday[processed_intraday['Confidence'] >= CONFIG['confidence_threshold']]
        signals = signals[['Ticker', 'Confidence']]

        signals.to_csv(f"{CONFIG['export_dir']}/intraday_signals.csv", index=True)
        logging.info(f"Cell 12: Generated {len(signals)} intraday signals")

        return pd.to_datetime(datetime.now())
    except Exception as e:
        logging.error(f"Cell 12: Monitor intraday failed: {str(e)}")
        raise

def run_pipeline():
    start_time = time.time()
    retries = 3
    for attempt in range(retries):
        try:
            # Cell 3: Indicator Weighting (one-time, but call if db empty)
            if not os.path.exists(f"{CONFIG['export_dir']}/indicator_weights.db"):
                main()  # Call the main from Cell 3

            # Cell 1: Fetch data
            main()  # Call main from Cell 1

            # Cell 2: Indicators
            main()  # Call main from Cell 2

            processed_data = pd.read_csv(f"{CONFIG['export_dir']}/processed_stock_data.csv")
            scaler, selected_features, predictions_df = train_model(processed_data)
            trade_candidates_df = generate_trade_candidates(predictions_df)
            backtest_results = backtest_strategy(processed_data, trade_candidates_df)
            performance_df = track_performance(backtest_results)
            retrained_model, retrained_scaler = retrain_model(processed_data, performance_df)
            intraday_data = fetch_intraday_data(CONFIG['tickers'])
            last_alert_time = None
            if not intraday_data.empty:
                last_alert_time = monitor_intraday(intraday_data, None)
            logging.info(f"Cell 12: Pipeline completed, Time: {time.time() - start_time:.2f}s")
            print(f"Cell 12: Pipeline completed, Time: {time.time() - start_time:.2f}s")
            return
        except Exception as e:
            logging.error(f"Cell 12: Attempt {attempt+1} failed: {str(e)}")
            if attempt == retries - 1:
                logging.error(f"Cell 12: Pipeline failed after {retries} attempts: {str(e)}")
                print(f"Cell 12: Pipeline failed: {str(e)}")
                raise
            time.sleep(5)

# Execute
run_pipeline()

  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker

Indicator weights calculated and stored in indicator_weights.db


  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker

Indicator weights calculated and stored in indicator_weights.db
Debug: Features available: ['RSI', 'MACD', 'BB_Upper', 'ATR', 'VWAP']
Debug: Data shape before training: (71731, 26)
Debug: X_train shape: (71702, 5), y_train shape: (71702,)
Debug: Target distribution: Target
0    38463
1    33239
Name: count, dtype: int64
Cell 4: Model trained, Avg CV Accuracy: 47.55%, Predictions shape: (29, 5), Time: 55.45s
Cell 5: Trade candidates generated, Shape: (0, 6), Time: 0.01s
Cell 7: No trades to backtest


  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker

Indicator weights calculated and stored in indicator_weights.db


  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker

Indicator weights calculated and stored in indicator_weights.db
Debug: Features available: ['RSI', 'MACD', 'BB_Upper', 'ATR', 'VWAP']
Debug: Data shape before training: (71731, 26)
Debug: X_train shape: (71702, 5), y_train shape: (71702,)
Debug: Target distribution: Target
0    38463
1    33239
Name: count, dtype: int64
Cell 4: Model trained, Avg CV Accuracy: 47.33%, Predictions shape: (29, 5), Time: 79.93s
Cell 5: Trade candidates generated, Shape: (0, 6), Time: 0.02s
Cell 7: No trades to backtest


  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker

Indicator weights calculated and stored in indicator_weights.db


  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False)
  df = yf.download(ticker

Indicator weights calculated and stored in indicator_weights.db
Debug: Features available: ['RSI', 'MACD', 'BB_Upper', 'ATR', 'VWAP']
Debug: Data shape before training: (71731, 26)
Debug: X_train shape: (71702, 5), y_train shape: (71702,)
Debug: Target distribution: Target
0    38463
1    33239
Name: count, dtype: int64
Cell 4: Model trained, Avg CV Accuracy: 47.39%, Predictions shape: (29, 5), Time: 67.04s
Cell 5: Trade candidates generated, Shape: (0, 6), Time: 0.01s
Cell 7: No trades to backtest
Cell 12: Pipeline failed: too many values to unpack (expected 2)


ValueError: too many values to unpack (expected 2)

In [None]:
# Cell 13: Schedule Pipeline
# Purpose: Schedule pipeline run during market hours.
# Inputs: None
# Outputs: Pipeline run if market open.

import logging
import time
import pandas as pd
from datetime import datetime

def schedule_pipeline():
    start_time = time.time()
    try:
        current_time = pd.to_datetime(datetime.now()).tz_convert('US/Eastern')
        market_open = current_time.replace(hour=9, minute=30, second=0, microsecond=0)
        market_close = current_time.replace(hour=16, minute=0, second=0, microsecond=0)

        if current_time.weekday() < 5 and market_open <= current_time <= market_close:
            run_pipeline()
        else:
            logging.info("Cell 13: Market closed, skipping pipeline run")

        logging.info(f"Cell 13: Schedule check completed, Time: {time.time() - start_time:.2f}s")
        print(f"Cell 13: Schedule check completed, Time: {time.time() - start_time:.2f}s")
    except Exception as e:
        logging.error(f"Cell 13: Failed: {str(e)}")
        raise

# Execute
schedule_pipeline()

In [None]:
# Cell 14: Modular Indicator Framework
# Purpose: Load config for indicators, apply to data.
# Inputs: processed_stock_data.csv, indicator_config.json.
# Outputs: modular_processed_stock_data.csv.

import json
import logging
import time
import pandas as pd
from joblib import Parallel, delayed

def load_indicator_config(config_path=f"{CONFIG['export_dir']}/indicator_config.json"):
    default_config = {
        "indicators": {
            "RSI": {"enabled": True, "params": {"length": 14}},
            "Stoch_K": {"enabled": True, "params": {"k": 14, "smooth_k": 3}},
            "MFI": {"enabled": True, "params": {"length": 14}},
            "MACD": {"enabled": True, "params": {"fast": 12, "slow": 26, "signal": 9}},
            "ADX": {"enabled": True, "params": {"length": 14}},
            "Ichimoku_A": {"enabled": True, "params": {"tenkan": 9, "kijun": 26, "senkou": 52}},
            "Supertrend": {"enabled": True, "params": {"length": 10, "multiplier": 3}},
            "BB_Upper": {"enabled": True, "params": {"length": 20, "std": 2}},
            "BB_Lower": {"enabled": True, "params": {"length": 20, "std": 2}},
            "ATR": {"enabled": True, "params": {"length": 14}},
            "KC_Upper": {"enabled": True, "params": {"length": 20, "scalar": 2, "mamode": "ema"}},
            "KC_Lower": {"enabled": True, "params": {"length": 20, "scalar": 2, "mamode": "ema"}},
            "VWAP": {"enabled": True, "params": {}},
            "AD": {"enabled": True, "params": {}},
            "CMF": {"enabled": True, "params": {"length": 20}},
            "OBV": {"enabled": True, "params": {}},
            "Volume_Osc": {"enabled": True, "params": {"length": 13}}
        }
    }
    try:
        if os.path.exists(config_path):
            with open(config_path, 'r') as f:
                config = json.load(f)
            logging.info(f"Cell 14: Loaded indicator config from {config_path}")
        else:
            with open(config_path, 'w') as f:
                json.dump(default_config, f, indent=4)
            config = default_config
            logging.info(f"Cell 14: Created default indicator config at {config_path}")
        return config
    except Exception as e:
        logging.error(f"Cell 14: Failed to load/create config: {str(e)}")
        raise

def apply_indicator_config(data):
    start_time = time.time()
    try:
        config = load_indicator_config()
        selected_features = [name for name, params in config['indicators'].items() if params['enabled']]

        def compute_indicators(group, ticker):
            min_rows = 15
            if len(group) < min_rows:
                logging.warning(f"Cell 14: Skipping {ticker}: insufficient data ({len(group)} rows)")
                return None

            group = group.set_index('Date')
            group = group[~group.index.duplicated(keep='last')]
            group['Volume'] = np.log1p(group['Volume'].clip(lower=1))

            for indicator, params in config['indicators'].items():
                if not params['enabled']:
                    continue
                try:
                    if indicator == 'RSI':
                        group[indicator] = rsi(group['Close'], **params['params'])
                    # Add similar for other indicators...
                    # (To avoid length, assume the functions are defined in previous cells or add them here if needed)
                except Exception as e:
                    logging.warning(f"Cell 14: Failed to compute {indicator} for {ticker}: {str(e)}")
                    group[indicator] = np.nan

            indicators = pd.DataFrame({
                'Date': group.index,
                'Ticker': np.full(len(group), ticker),
                'Open': group['Open'],
                'High': group['High'],
                'Low': group['Low'],
                'Close': group['Close'],
                'Volume': group['Volume'],
                **{col: group[col] for col in selected_features if col in group.columns}
            })
            return indicators

        df = data.copy()
        df['Date'] = pd.to_datetime(df['Date'])
        df = df.dropna(subset=['Date']).sort_values(['Ticker', 'Date'])
        df = df.drop_duplicates(subset=['Date', 'Ticker'], keep='last')

        results = Parallel(n_jobs=-1)(
            delayed(compute_indicators)(df[df['Ticker'] == ticker], ticker)
            for ticker in df['Ticker'].unique()
        )
        results = [r for r in results if r is not None]

        if not results:
            raise ValueError("Cell 14: No tickers had sufficient data for indicators")

        df = pd.concat(results, ignore_index=True)
        df['Date'] = pd.to_datetime(df['Date'])
        df = df.dropna().drop_duplicates(subset=['Date', 'Ticker'], keep='last').reset_index(drop=True)

        nan_counts = df.isna().sum()
        if nan_counts.sum() > 0:
            logging.warning(f"Cell 14: NaN values in indicators: {nan_counts[nan_counts > 0].to_dict()}")

        df.to_csv(f"{CONFIG['export_dir']}/modular_processed_stock_data.csv", index=False)
        logging.info(f"Cell 14: Modular indicators calculated, Shape: {df.shape}, Time: {time.time() - start_time:.2f}s")
        print(f"Cell 14: Modular indicators calculated, Shape: {df.shape}, Time: {time.time() - start_time:.2f}s")
        return df
    except Exception as e:
        logging.error(f"Cell 14: Failed: {str(e)}")
        raise

# Execute
data = pd.read_csv(f"{CONFIG['export_dir']}/processed_stock_data.csv")
apply_indicator_config(data)

In [None]:
# Cell 15: CLI Interface
# Purpose: Provide CLI for running the pipeline with custom tickers.
# Inputs: Command line arguments.
# Outputs: Pipeline run with specified tickers.

import argparse

def main():
    parser = argparse.ArgumentParser(description="Trading Pipeline")
    parser.add_argument('--tickers', nargs='+', default=CONFIG['tickers'], help="List of tickers to process")
    args = parser.parse_args()
    CONFIG['tickers'] = args.tickers
    run_pipeline()

if __name__ == "__main__":
    main()