# In [1]:

"""
Author: Cordell Stonecipher
Filename: cnn_stock_module.ipynb
Description:
Provides functions to build, train, tune, evaluate, predict with, and interpret a 1D CNN
for stock price Buy/Sell/Hold classification.
This module accepts NumPy arrays for training, validation, and test sets.

Functions:
  - build_cnn_model(input_shape, hp=None): returns a compiled Keras model; tunable settings are drawn from hp when it is given
  - tune_hyperparameters(X_train, y_train, X_val, y_val, max_trials=10): returns best model and HP
  - train_model(model, X_train, y_train, X_val, y_val, epochs=30, batch_size=64)
      Trains and returns history
  - evaluate_model(model, X, y, class_names): prints metrics
  - predict_sample(model, sample, class_names, weights=None): returns predicted class and probabilities
  - create_explainer(X_train_flat, feature_names, class_names): returns LIME explainer
  - explain_instance(model, explainer, sample_flat, num_features, class_index)
      returns LIME explanation list

Dependencies:
  pip install tensorflow keras-tuner lime numpy scikit-learn
"""

import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
import keras_tuner as kt
from sklearn.metrics import classification_report, accuracy_score
from lime import lime_tabular

# Temporal window length: the number of time steps in each input sample.
# explain_instance reads this module-level value to reshape flattened rows
# back into (batch, WINDOW, n_features), so assign an int before relying on it.
# predict_sample does not read WINDOW.
WINDOW = None


def build_cnn_model(input_shape, hp=None, num_classes=3):
    """
    Build and compile a 1D CNN for time-series classification.

    Convolutional layers act as learnable sliding-window feature extractors:
      Conv1D: applies filters (f) of width (k) across the time axis, computing dot products.
      MaxPooling1D: downsamples by taking the maximum over non-overlapping windows,
        reducing the temporal dimension.
    Flatten: converts the 3D tensor [batch, time, features] to a 2D matrix for dense layers.
    Dense: fully connected layer applying weights W and biases b, computing x -> ReLU(Wx+b).
    Dropout: randomly zeroes a fraction d of inputs to reduce overfitting.
    Output layer: softmax activation computes exp(z_i)/sum_j exp(z_j), giving class probabilities.
    Loss: categorical_crossentropy = -sum(y_true * log(y_pred)), so targets must be one-hot.
    Optimizer: Adam (adaptive learning rates from first/second moment estimates).

    Args:
        input_shape: shape of one sample without the batch axis, passed straight to
            layers.Input. Conv1D expects (time_steps, n_features).
        hp: optional keras_tuner HyperParameters object. When given, these values
            are sampled from it; otherwise the listed defaults are used:
              - conv_blocks:   1..3                 (default 2)
              - filters_{i}:   32 / 64 / 128        (default 64)
              - kernel_{i}:    3 / 5                (default 3)
              - dense_units:   64..256, step 64     (default 128)
              - dropout:       0.1..0.5, step 0.1   (default 0.2)
              - learning_rate: 1e-2 / 1e-3 / 1e-4   (default 1e-4)
        num_classes: number of softmax output units (default 3: Buy/Sell/Hold).

    Returns:
        A compiled Keras Model tracking the 'accuracy' metric.

    Raises:
        ValueError: if num_classes is smaller than 2.
    """
    if num_classes < 2:
        raise ValueError("num_classes must be at least 2 for a softmax classifier.")

    # Test identity rather than truthiness so the decision never depends on how
    # the hyperparameter container implements bool()/len().
    tuning = hp is not None

    inp = layers.Input(shape=input_shape)
    x = inp

    # Build conv+pool blocks
    num_blocks = hp.Int('conv_blocks', 1, 3, default=2) if tuning else 2
    for i in range(num_blocks):
        filters = hp.Choice(f'filters_{i}', [32, 64, 128], default=64) if tuning else 64
        kernel_size = hp.Choice(f'kernel_{i}', [3, 5], default=3) if tuning else 3
        # Convolution: y[t, f] = sum_{u=0..k-1}( x[t+u, :] * W[:, u, f] ) + b[f]
        # padding='same' keeps the time dimension unchanged.
        x = layers.Conv1D(filters, kernel_size, activation='relu', padding='same')(x)
        # Pooling: y[t', f] = max( x[2*t':2*t'+2, f] ) -- halves the time dimension.
        # Skip it once fewer than 2 steps remain: pooling a length-1 axis would
        # leave 0 time steps and make the model impossible to build for short
        # windows. An unknown (None) length is pooled as before.
        time_steps = x.shape[1]
        if time_steps is None or time_steps >= 2:
            x = layers.MaxPooling1D(pool_size=2)(x)

    # Flatten to vector: shape -> [batch, filters * remaining_time_steps]
    x = layers.Flatten()(x)

    # Dense layer: x -> ReLU(Wx + b)
    dense_units = hp.Int('dense_units', 64, 256, step=64, default=128) if tuning else 128
    x = layers.Dense(dense_units, activation='relu')(x)
    # Dropout: zero-out a fraction of activations to regularize
    dropout_rate = hp.Float('dropout', 0.1, 0.5, step=0.1, default=0.2) if tuning else 0.2
    x = layers.Dropout(dropout_rate)(x)

    # Output layer: one softmax unit per class
    output = layers.Dense(num_classes, activation='softmax')(x)

    model = models.Model(inputs=inp, outputs=output)
    lr = hp.Choice('learning_rate', [1e-2, 1e-3, 1e-4], default=1e-4) if tuning else 1e-4
    # Adam optimizer: alpha=lr, uses moving averages of gradients
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=lr),
                  loss='categorical_crossentropy', metrics=['accuracy'])
    return model


def tune_hyperparameters(X_train, y_train, X_val, y_val, max_trials=10,
                         epochs=20, batch_size=64):
    """
    Run Bayesian optimization over the build_cnn_model search space, maximizing
    validation accuracy.

    Args:
        X_train, y_train: training inputs and targets; the model input shape is
            taken from X_train.shape[1:].
        X_val, y_val: validation inputs and targets used to score each trial.
        max_trials: number of hyperparameter configurations to try.
        epochs: maximum epochs per trial (default 20).
        batch_size: mini-batch size per trial (default 64).

    Returns:
        (best_model, best_hps): the top-ranked model from the tuner and the
        hyperparameter set that produced it.

    Trial state is written under ./cnn_tuner/cnn_stock.
    NOTE(review): KerasTuner presumably resumes from that directory on a re-run
    unless overwrite=True is passed -- confirm before tuning on new data.
    """
    def model_builder(hp):
        return build_cnn_model(X_train.shape[1:], hp)

    tuner = kt.BayesianOptimization(
        model_builder,
        objective='val_accuracy',
        max_trials=max_trials,
        directory='cnn_tuner',
        project_name='cnn_stock'
    )
    # Each trial stops after 5 epochs without improvement in the callback's
    # default monitored quantity and rolls back to its best weights.
    early_stop = tf.keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True)
    tuner.search(
        X_train, y_train,
        validation_data=(X_val, y_val),
        epochs=epochs,
        batch_size=batch_size,
        callbacks=[early_stop]
    )
    best_model = tuner.get_best_models(num_models=1)[0]
    best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
    return best_model, best_hps


def train_model(model, X_train, y_train, X_val, y_val, epochs=30, batch_size=64):
    """
    Fit `model` on the training arrays, validating on (X_val, y_val).

    An EarlyStopping callback (patience=5, restore_best_weights=True) is attached
    so training halts after five epochs without improvement and the best weights
    are restored. Progress is printed (verbose=1).

    Returns the object produced by model.fit (the training history).
    """
    stopper = tf.keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True)
    fit_options = {
        'validation_data': (X_val, y_val),
        'epochs': epochs,
        'batch_size': batch_size,
        'callbacks': [stopper],
        'verbose': 1,
    }
    return model.fit(X_train, y_train, **fit_options)


def evaluate_model(model, X, y, class_names=['Buy','Sell','Hold']):
    """
    Predict on X, then print accuracy and a per-class classification report.

    Args:
        model: object whose predict(X) returns per-class scores of shape
            (n_samples, n_classes).
        X: inputs forwarded unchanged to model.predict.
        y: ground truth, either one-hot rows (n_samples, n_classes) or integer
            class indices shaped (n_samples,) or (n_samples, 1).
        class_names: display names; position i names class index i. The default
            list is only read, never mutated.

    Returns:
        None. Results are printed.
    """
    preds = np.argmax(model.predict(X), axis=1)

    y = np.asarray(y)
    if y.ndim > 1 and y.shape[-1] > 1:
        # One-hot (or probability) rows -> class index.
        true = np.argmax(y, axis=1)
    else:
        # Already class indices. A column vector (n, 1) must be flattened, not
        # argmax'ed: argmax over a single column would turn every label into 0.
        true = y.reshape(-1)

    acc = accuracy_score(true, preds)
    print(f"Accuracy: {acc:.2%}")
    # Pin the label set to class_names so the report still works when some class
    # is absent from both y and the predictions. Without `labels`, scikit-learn
    # raises because the observed classes and target_names differ in length.
    labels = list(range(len(class_names)))
    print(classification_report(true, preds, labels=labels, target_names=class_names))

def predict_sample(model, sample, class_names=['Buy','Sell','Hold'], weights=None):
    """
    Predict the class of a single sample.

    A batch axis is prepended to `sample`, model.predict is called, and the first
    (only) row of scores is used. If `weights` is given, the scores are multiplied
    element-wise by it and renormalized to sum to 1 before the top class is chosen.

    Args:
        model: object whose predict(batch) returns per-class scores per row.
        sample: one input sample without the batch axis.
        class_names: display names; position i names class index i. The default
            list is only read, never mutated.
        weights: optional non-negative per-class multipliers.

    Returns:
        (class_name, probs): the winning class name and the (possibly reweighted)
        probability vector.

    Raises:
        ValueError: if any weight is negative, or if the weights remove all
            probability mass so the result cannot be renormalized.
    """
    sample = np.asarray(sample)
    probs = model.predict(sample[np.newaxis, ...])[0]

    if weights is not None:
        w = np.asarray(weights)
        if np.any(w < 0):
            raise ValueError("weights must be non-negative.")
        weighted = probs * w
        total = weighted.sum()
        # Dividing by a zero/NaN total would yield NaNs, and argmax over NaNs
        # silently returns index 0 -- fail loudly instead of mislabeling.
        if not np.isfinite(total) or total <= 0:
            raise ValueError("weights leave no probability mass to renormalize.")
        probs = weighted / total

    cls = int(np.argmax(probs))
    return class_names[cls], probs


def create_explainer(X_train_flat, feature_names, class_names):
    """
    Construct a LIME tabular explainer in classification mode.

    X_train_flat is handed to LIME as its training data, alongside the feature
    and class display names. Tabular LIME treats each column as an independent
    feature.
    (assumes X_train_flat is 2-D, one flattened window per row -- TODO confirm)
    """
    explainer_options = {
        'training_data': X_train_flat,
        'feature_names': feature_names,
        'class_names': class_names,
        'mode': 'classification',
    }
    return lime_tabular.LimeTabularExplainer(**explainer_options)


def explain_instance(model, explainer, sample_flat, num_features=10, class_index=0,
                     window=None):
    """
    Generate a local linear approximation around a sample to explain the model's decision.
    LIME solves a weighted linear regression locally: minimize ||f(x') - L(x')|| + λ||L||.

    Args:
        model: object whose predict(batch) accepts input shaped
            (batch, time_steps, n_features).
        explainer: LIME explainer exposing explain_instance(...).
        sample_flat: 1-D flattened sample of length time_steps * n_features.
        num_features: maximum number of features in the explanation.
        class_index: index of the class to explain.
        window: time-step length used to un-flatten rows. When omitted, the
            module-level WINDOW is used.

    Returns:
        The list produced by the explanation's as_list(label=class_index).

    Raises:
        ValueError: if no window length is available, if it is not positive, or
            if the sample length is not a positive multiple of it.
    """
    # An explicit argument wins; otherwise fall back to the module-level setting.
    steps = WINDOW if window is None else window
    if steps is None:
        raise ValueError("Set WINDOW to your time-step length before calling explain_instance.")
    if steps <= 0:
        raise ValueError("window must be a positive number of time steps.")

    sample_flat = np.asarray(sample_flat)
    # Validate here so a mismatch fails with a clear message instead of an opaque
    # reshape error raised from inside LIME's sampling loop.
    n_features, leftover = divmod(sample_flat.size, steps)
    if leftover or n_features == 0:
        raise ValueError(
            f"sample_flat has {sample_flat.size} values, which is not a positive "
            f"multiple of the window length {steps}."
        )

    def predict_fn(rows):
        # LIME passes perturbed rows as a 2-D array; restore the model's 3-D layout.
        return model.predict(rows.reshape(-1, steps, n_features))

    exp = explainer.explain_instance(
        data_row=sample_flat,
        predict_fn=predict_fn,
        num_features=num_features,
        labels=(class_index,)
    )
    return exp.as_list(label=class_index)


