## Create Neural Network


In [None]:
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dense, Flatten, Dropout, BatchNormalization, LeakyReLU, GlobalAveragePooling2D
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import Adam
import tensorflow as tf
import utils_datasets as utils
import importlib
from tensorflow.keras.callbacks import TensorBoard
from tensorflow.keras.callbacks import TensorBoard, EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
import datetime
from sklearn.utils.class_weight import compute_class_weight
import numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed
from sklearn.metrics import classification_report
from tensorflow.keras import layers, models, callbacks
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.image import load_img, img_to_array
import tensorflow as tf
import os
importlib.reload(utils)

In [None]:
importlib.reload(utils)


# Input shape: 114x120 images with 3 channels (height, width, channels)
inputs = layers.Input(shape=(114, 120, 3))

# Stage 1: two 32-filter convolutions with ReLU, one batch-norm, 2x2 down-sampling
x = layers.Conv2D(32, (5,4), padding='same', activation='relu')(inputs)
x = layers.Conv2D(32, (5,4), padding='same', activation='relu')(x)
x = layers.BatchNormalization()(x)
x = layers.MaxPooling2D(pool_size=(2,2))(x)

# Stage 2: two Conv -> BatchNorm -> LeakyReLU units with 64 filters, then 2x2 down-sampling
x = layers.Conv2D(64, (4,3), padding='same')(x)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU(alpha=0.1)(x)
x = layers.Conv2D(64, (4,3), padding='same')(x)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU(alpha=0.1)(x)
x = layers.MaxPooling2D(pool_size=(2,2))(x)

# Stage 3: same pattern with 128 filters
x = layers.Conv2D(128, (4,3), padding='same')(x)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU(alpha=0.1)(x)
x = layers.Conv2D(128, (4,3), padding='same')(x)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU(alpha=0.1)(x)
x = layers.MaxPooling2D(pool_size=(2,2))(x)

# Stage 4: same pattern with 256 filters; global average pooling replaces
# the max-pool and collapses each feature map to a single value
x = layers.Conv2D(256, (4,3), padding='same')(x)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU(alpha=0.1)(x)
x = layers.Conv2D(256, (4,3), padding='same')(x)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU(alpha=0.1)(x)
x = layers.GlobalAveragePooling2D()(x)

# Classification head: L2-regularised dense layer followed by dropout
x = layers.Dense(256, kernel_regularizer=tf.keras.regularizers.l2(1e-4))(x)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU(alpha=0.1)(x)
x = layers.Dropout(0.5)(x)

# Two-way softmax: one probability per class
outputs = layers.Dense(2, activation='softmax')(x)

# Define the model
model = Model(inputs=inputs, outputs=outputs)


# Compile with sparse categorical cross-entropy.
# The head emits two softmax probabilities while the evaluation code compares the
# labels directly against argmax predictions, i.e. labels are integer class ids.
# 'binary_crossentropy' would broadcast a (batch,) label against the (batch, 2)
# output and score both probabilities against the same label, which makes the
# loss independent of the label.
# NOTE(review): assumes the datasets yield integer labels of shape (batch,) - confirm in utils.
model.compile(optimizer=Adam(learning_rate=0.0005),
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# Show architecture
model.summary()

# Apply the model to a single image from the dataset

In [None]:
from tensorflow.keras.preprocessing import image
img_path = ''  # <-- update this to your image path

# Run the network defined above on a single image.
# The cell is skipped while `img_path` is empty so that "Run All" keeps working,
# and it never rebinds `model`, which the training cell still needs.
if img_path:
    # Load with the spatial size and channel count of the model's Input layer:
    # (height, width) = (114, 120), 3 channels.
    img = image.load_img(img_path, color_mode='rgb', target_size=(114, 120))
    img_array = image.img_to_array(img)
    # NOTE(review): apply here whatever pixel scaling utils.dataframe_to_dataset_color
    # applies to the training images - not visible from this notebook.

    # The model expects a batch dimension: (1, 114, 120, 3)
    img_batch = np.expand_dims(img_array, axis=0)

    # One row holding the two class probabilities
    output = model.predict(img_batch)

## Fetch dataframes
- Load the training, validation and test datasets used to build the model

In [None]:
importlib.reload(utils)
# Use the dataset that does not contain all the data (rows were dropped in the preceding notebook).
path_dataset = ""
train_df, val_df, test_df = utils.split_global_dataset(path_dataset)

# Training stream: shuffled with a 1000-element buffer, batched by 32, prefetched.
train_ds = utils.dataframe_to_dataset_color(train_df)
train_ds = train_ds.shuffle(1000).batch(32).prefetch(tf.data.AUTOTUNE)

# Validation stream: batched and prefetched, no shuffling.
val_ds = utils.dataframe_to_dataset_color(val_df)
val_ds = val_ds.batch(32).prefetch(tf.data.AUTOTUNE)

# Test stream: batched only.
test_ds = utils.dataframe_to_dataset_color(test_df)
test_ds = test_ds.batch(32)

# 'balanced' class weights computed from the training labels (column 'y').
class_weights_array = compute_class_weight(
    class_weight='balanced',
    classes=np.unique(train_df['y']),
    y=train_df['y'],
)
# Keras expects a {class id: weight} mapping; the two classes are 0 and 1.
class_weights = {label: class_weights_array[label] for label in (0, 1)}

## Train the model

In [None]:

# Timestamped TensorBoard directory, one sub-folder per training run
log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")

# Named `training_callbacks` rather than `callbacks` so the list does not shadow
# the `callbacks` module imported from tensorflow.keras at the top of the notebook.
training_callbacks = [
    TensorBoard(log_dir=log_dir, histogram_freq=1),
    # Early stopping is currently disabled:
    #EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True),
    # Keep only the best checkpoint on disk
    ModelCheckpoint("best_model.keras", save_best_only=True),
    # Halve the learning rate after 3 epochs without val_loss improvement
    ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, verbose=1)
]

history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=50,
    class_weight=class_weights,
    # Pass the flat list of callbacks (it was previously wrapped in a second list)
    callbacks=training_callbacks,
)


In [None]:
# Reload a saved model from disk; from here on `model` refers to the loaded
# network rather than the one trained in the cell above.
from tensorflow.keras.models import load_model
model_path = ""  # <-- set to the path of the saved model file
model = load_model(model_path)

# Optional global evaluation on the test set (currently disabled):
# loss, acc = model.evaluate(test_ds)
# print(f"Global Test Loss: {loss:.4f}, Global Test Accuracy: {acc:.4f}")

# Evaluate the model

In [None]:
import numpy as np

# Labels and predictions accumulated over the whole test set
y_true, y_pred = [], []

# Each bucket stores [image, confidence] pairs.
# correct_class_k   : predicted k and the true class is k (confidence for class k)
# incorrect_class_k : true class is k but the model predicted otherwise
#                     (confidence the model gave to class k)
correct_class_1, correct_class_0 = [], []
incorrect_class_1, incorrect_class_0 = [], []

for X_batch, y_batch in test_ds:
    preds = model.predict(X_batch, verbose=0)
    preds_class = np.argmax(preds, axis=1)    # predicted class per sample
    true_class = y_batch.numpy().astype(int)  # true label per sample

    y_true.extend(true_class)
    y_pred.extend(preds_class)

    for i, predicted in enumerate(preds_class):
        actual = true_class[i]

        # Correct predictions are bucketed by the predicted class,
        # wrong ones by the true class.
        if predicted == actual:
            reference, targets = predicted, (correct_class_1, correct_class_0)
        else:
            reference, targets = actual, (incorrect_class_1, incorrect_class_0)

        # Class 1 is tested first, then class 0; any other label is ignored.
        for cls, bucket in zip((1, 0), targets):
            if reference == cls:
                bucket.append([X_batch[i], preds[i][cls]])
                break


print(classification_report(y_true, y_pred, target_names=["Class 0", "Class 1"]))

# Confusion Matrix

In [None]:
# Plot the confusion matrix of the test-set predictions.
# Relies on `y_true` and `y_pred` filled by the evaluation cell above.
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report
import matplotlib.pyplot as plt
import numpy as np

# Counts per (true label, predicted label) pair
cm = confusion_matrix(y_true, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=["Class 0", "Class 1"])
disp.plot(cmap=plt.cm.Blues)
plt.title("Confusion Matrix on Test Set")
# Turn the grid off so grid lines are not drawn over the matrix cells
plt.grid(False)
plt.show()

## Explore features of importance

In [None]:

import numpy as np
import matplotlib.pyplot as plt


def _gradcam(img_tensor, model, last_conv_layer_name, pred_index=None):
    """Shared Grad-CAM computation behind both public helpers.

    Args:
        img_tensor: Batched model input. Only the first sample's activations
            are used to build the heatmap.
        model: Keras model to explain.
        last_conv_layer_name: Name of the layer whose output is visualised.
        pred_index: Index of the output unit to explain. Defaults to the
            arg-max of the first sample's prediction.

    Returns:
        `(heatmap, predictions)` as tensors. The heatmap is clipped at zero and
        divided by its maximum, so values lie in [0, 1]; it is all zeros when
        no location contributes positively.
    """
    # `model.inputs` is already a list; wrapping it in another list would give
    # the auxiliary model a nested input structure.
    grad_model = tf.keras.models.Model(
        model.inputs,
        [model.get_layer(last_conv_layer_name).output, model.output]
    )

    # Record the forward pass so the class score can be differentiated
    # with respect to the chosen layer's activations.
    with tf.GradientTape() as tape:
        conv_outputs, predictions = grad_model(img_tensor)
        if pred_index is None:
            pred_index = tf.argmax(predictions[0])
        class_channel = predictions[:, pred_index]

    grads = tape.gradient(class_channel, conv_outputs)

    # One weight per channel: gradient averaged over batch, height and width
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

    # Channel-weighted sum of the first sample's feature maps
    heatmap = conv_outputs[0] @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)

    # Keep positive evidence only and scale by the peak. divide_no_nan returns 0
    # instead of NaN when the peak is 0 (no positive contribution anywhere).
    heatmap = tf.maximum(heatmap, 0)
    heatmap = tf.math.divide_no_nan(heatmap, tf.math.reduce_max(heatmap))
    return heatmap, predictions


def make_gradcam_heatmap(img_array, model, last_conv_layer_name, pred_index=None):
    """Return the Grad-CAM heatmap of `img_array` as a NumPy array.

    Args:
        img_array: Batched model input; the heatmap is built from the first sample.
        model: Keras model to explain.
        last_conv_layer_name: Name of the layer whose output is visualised.
        pred_index: Output unit to explain; defaults to the predicted class.

    Returns:
        Heatmap with values in [0, 1].
    """
    heatmap, _ = _gradcam(img_array, model, last_conv_layer_name, pred_index)
    return heatmap.numpy()

def make_gradcam_heatmap_from_tensor(img_tensor, model, last_conv_layer_name, pred_index=None):
    """Return the Grad-CAM heatmap together with the model's predictions.

    Args:
        img_tensor: Input tensor of shape (1, height, width, channels).
        model: Keras model to explain.
        last_conv_layer_name: Name of the layer whose output is visualised.
        pred_index: Output unit to explain; defaults to the predicted class.

    Returns:
        `(heatmap, predictions)`, both as NumPy arrays; the heatmap has values
        in [0, 1].
    """
    heatmap, predictions = _gradcam(img_tensor, model, last_conv_layer_name, pred_index)
    return heatmap.numpy(), predictions.numpy()

In [None]:
importlib.reload(utils)


# Load the trained model from disk
model_path = ""  # <-- set to the path of the saved model file
model = load_model(model_path)


# Pick one correctly classified class-1 sample; entries are [image tensor, confidence]
img_tensor, _ = correct_class_1[6]  # Take the tensor only

img_tensor_expanded = tf.expand_dims(img_tensor, axis=0)  # Model expects (1,114,120,3)

# Resolve the last convolutional layer from the loaded model instead of
# hard-coding 'conv2d_7': Keras auto-generated layer names depend on how many
# layers were created in the session that built the model.
last_conv_layer_name = next(
    layer.name for layer in reversed(model.layers) if isinstance(layer, layers.Conv2D)
)

heatmap, predictions = make_gradcam_heatmap_from_tensor(
    img_tensor_expanded, model, last_conv_layer_name=last_conv_layer_name
)


# Winning class and its probability for the explained sample
predicted_class = np.argmax(predictions[0])
predicted_confidence = np.max(predictions[0])

utils.plot_gradcam_side_by_side_from_tensor(img_tensor, heatmap, predicted_class, predicted_confidence)


# Select the stocks on which we'll test our hedging process

In [None]:
# Dataset path (the one containing all the data)
base_path = ""
tickers = os.listdir(base_path)
# NOTE(review): absolute, machine-specific path - move it to a configurable location.
model = load_model('/Users/edouardblanc/Desktop/aze.keras')
df = utils.load_all_ticker_data(base_path)

# Print one classification report per ticker.
for ticker in tickers:
    ticker_rows = df[df['ticker'] == ticker]
    if ticker_rows.empty:
        # Directory entries without matching rows (e.g. hidden files) cannot be
        # scored and would make classification_report raise.
        continue

    # Loop-local names so the test-set `y_true` / `y_pred` used by the
    # confusion-matrix cell are not overwritten.
    ticker_true = []
    ticker_pred = []
    ticker_ds = utils.dataframe_to_dataset_color(ticker_rows).batch(32).prefetch(tf.data.AUTOTUNE)
    for X_batch, y_batch in ticker_ds:
        preds = model.predict(X_batch, verbose=0)
        preds_class = np.argmax(preds, axis=1)  # Predicted classes
        true_class = y_batch.numpy().astype(int)  # True labels

        ticker_true.extend(true_class)
        ticker_pred.extend(preds_class)

    print(f'Ticker report : {ticker}')
    # `labels` pins both classes: without it the report raises when a ticker only
    # shows one class, because `target_names` would no longer match.
    # `zero_division=0` reports 0 for a class with no samples or no predictions.
    print(classification_report(
        ticker_true,
        ticker_pred,
        labels=[0, 1],
        target_names=["Class 0", "Class 1"],
        zero_division=0,
    ))


# Ticker retained for the next sections: UPS

# Visualisation of the chosen stock

In [None]:
import plotly.graph_objects as go
import yfinance as yf

ticker_selected = 'UPS'
df_ticker = utils.download_historical_prices(ticker_selected)
# First row for which the 20-day moving average is available
start_index = df_ticker['MA20'].first_valid_index()

# Build one sample (image + label) from the first complete 20-day window.
# This was a `for ... break` loop that only ever ran its first iteration.
first_day = start_index
if first_day < df_ticker.shape[0] - 20 - 1:
    df_extract = df_ticker.iloc[first_day:first_day + 20, :]
    # Named `sample_image` so the `image` module imported earlier is not shadowed
    sample_image = utils.generate_ohlc_image_bis(df_extract, include_volume=True, include_ma=True, n_days=20, target_size=None)
    # Label: 1 when the close of the day after the window exceeds the window's
    # last open by more than 0.7%
    sample_label = int(df_extract.iloc[-1]['Open'] * (1 + 0.007) < df_ticker.iloc[first_day + 20]['Close'])


# Daily prices used for the chart
df_plot = yf.download(ticker_selected, start="2000-01-01")
df_plot.columns = df_plot.columns.get_level_values(0)  # keep the first column level only
df_plot = df_plot[['Open', 'Close']].copy()
df_plot.columns.name = None
df_plot = df_plot.iloc[2206:3000]
#2206, 2245

# Markers: one open and one close point per day
open_prices = df_plot['Open'].tolist()
close_prices = df_plot['Close'].tolist()
open_x = list(df_plot.index)
close_x = list(df_plot.index)

# Line: alternating Open/Close values (each date appears twice) to form the
# zigzag price path
prices = df_plot[['Open', 'Close']].to_numpy().ravel().tolist()
x_vals = list(df_plot.index.repeat(2))

# Main price line
fig = go.Figure()
fig.add_trace(go.Scatter(
    x=x_vals,
    y=prices,
    mode='lines',
    name='Price Line',
    line=dict(color='lightblue'),
    hovertemplate='Index: %{x}<br>Price: %{y:.2f}<extra></extra>'
))

# Open price markers (small green dots)
fig.add_trace(go.Scatter(
    x=open_x,
    y=open_prices,
    mode='markers',
    name='Open',
    marker=dict(color='green', size=6, symbol='circle'),
    hovertemplate='Open: %{y:.2f}<extra></extra>'
))

# Close price markers (small red dots)
fig.add_trace(go.Scatter(
    x=close_x,
    y=close_prices,
    mode='markers',
    name='Close',
    marker=dict(color='red', size=6, symbol='circle'),
    hovertemplate='Close: %{y:.2f}<extra></extra>'
))

# Add subtle closing price overlay line (on top, thin, neutral color)
fig.add_trace(go.Scatter(
    x=df_plot.index,
    y=df_plot['Close'],
    mode='lines',
    name='Close Trace',
    line=dict(color='gray', width=1, dash='dot'),  # Thin gray dotted line
    hoverinfo='skip',  # Hide hover to avoid clutter
    showlegend=False   # Hide from legend for minimalism
))

# Layout settings
fig.update_layout(
    # The title emoji was stored as mis-decoded bytes; restored, and the ticker
    # now comes from `ticker_selected`
    title=f"📊 {ticker_selected} Price Evolution (Open vs Close)",
    xaxis_title="Time Index",
    yaxis_title="Price",
    template="plotly_dark",
    height=500,
    width=1200,
    font=dict(size=14),
    margin=dict(l=50, r=50, t=60, b=40)
)

fig.show()

## Locate the window that maximizes the event our model is trained to detect


In [None]:
# Step 1: flag the days whose close is more than 0.7% above the open
df_ticker['event'] = df_ticker['Close'] > 1.007 * df_ticker['Open']

# Step 2: rolling count of flagged days over the look-back window.
# A single constant drives both the rolling sum and the window bounds below.
# NOTE(review): the start index used to be `max_idx - 39`, a leftover from a
# 40-day window; set event_window = 40 if that horizon was the intended one.
event_window = 252
df_ticker['event_count'] = df_ticker['event'].rolling(window=event_window).sum()

# Step 3: the rolling sum is stored on the last row of each window, so the
# best window ends at the arg-max and starts `event_window - 1` rows earlier
# (assumes the integer row index that the arithmetic below already relies on).
max_idx = df_ticker['event_count'].idxmax()
start_idx = max_idx - (event_window - 1)
end_idx = max_idx
print(start_idx, end_idx)
df_plot

## Get a proxy for the risk free rate

In [None]:
# Download the series used as risk-free-rate proxy and keep its Open/Close columns.
import yfinance as yf

# NOTE(review): ^IRX is presumably the 13-week T-bill yield index - confirm units before use.
risk_free =  yf.download("^IRX", start="2000-01-01")
# Keep only the first level of the downloaded column index
risk_free.columns = risk_free.columns.get_level_values(0)
risk_free = risk_free[['Open', 'Close']].copy()
risk_free.columns.name = None  # remove 'Price' label above columns
# Display the resulting frame as the cell output
risk_free

## Get a proxy for dividends


In [None]:
# Read the dividend yield reported for UPS, used as the dividend proxy.
stock_ticker = yf.Ticker("UPS")
# Falls back to 0.0 only when the 'dividendYield' key is absent.
# NOTE(review): a key that is present with a None value would still yield None - confirm.
dividend = stock_ticker.info.get('dividendYield', 0.0)
# Display the value as the cell output
dividend