In [1]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.dates import DateFormatter
from matplotlib.ticker import FuncFormatter, MaxNLocator
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense
import gradio as gr
import matplotlib.animation as animation

# Smallest window the CNN below can process: two rounds of Conv1D(kernel_size=3)
# followed by MaxPooling1D(pool_size=2) shrink 10 time steps to 8 -> 4 -> 2 -> 1.
_MIN_SEQUENCE_LENGTH = 10


def _load_price_history(symbol, start_date, end_date):
    """Load ``archive/stocks/<symbol>.csv`` and return the rows between the two dates (inclusive)."""
    data_path = os.path.join('archive', 'stocks', f"{symbol}.csv")
    history = pd.read_csv(data_path)

    # Ensure correct data types; label-based date slicing needs a sorted index.
    history['Date'] = pd.to_datetime(history['Date'])
    history = history.set_index('Date').sort_index()

    available_start_date = history.index.min()
    available_end_date = history.index.max()

    start_date = pd.to_datetime(start_date)
    end_date = pd.to_datetime(end_date)

    # An empty date textbox parses to NaT; every comparison with NaT is False,
    # so without this check the bounds test below would silently pass.
    if pd.isna(start_date) or pd.isna(end_date):
        raise ValueError("Start date and end date are required (format: YYYY-MM-DD).")
    if start_date > end_date:
        raise ValueError("Start date must not be after end date.")
    if start_date < available_start_date or end_date > available_end_date:
        raise ValueError(f"Date range out of bounds. Available range is from {available_start_date.date()} to {available_end_date.date()}")

    # Copy so that later column assignments never write into a view of the full history.
    selected = history.loc[start_date:end_date].copy()
    if selected.empty:
        raise ValueError("DataFrame is empty. Check the date range and ensure the data file contains data for the specified period.")
    return selected


def _create_sequences(data, seq_length, features, target):
    """Return (windows, labels): each window holds ``seq_length`` feature rows, its label is the next row's target."""
    feature_values = data[features].to_numpy()
    target_values = data[target].to_numpy()
    windows = [feature_values[i:i + seq_length] for i in range(len(data) - seq_length)]
    return np.array(windows), target_values[seq_length:]


def _build_cnn_model(window_length, n_features):
    """Build and compile the 1-D CNN that maps one feature window to a single predicted value."""
    cnn = Sequential([
        Conv1D(filters=32, kernel_size=3, activation='relu', input_shape=(window_length, n_features)),
        MaxPooling1D(pool_size=2),
        Conv1D(filters=32, kernel_size=3, activation='relu'),
        MaxPooling1D(pool_size=2),
        Flatten(),
        Dense(20, activation='relu'),
        Dense(1)
    ])
    cnn.compile(optimizer='adam', loss='mse')
    return cnn


# Function to predict future stock prices using the trained model
def predict_stock_prices(symbol, initial_capital, start_date, end_date):
    """Train a CNN on one ticker, simulate a local-min/local-max strategy and render it as a GIF.

    The model is trained on MinMax-scaled features, while the trading
    simulation, the logs and the chart all use the original (unscaled) prices.

    Args:
        symbol: Ticker whose data lives in ``archive/stocks/<symbol>.csv``.
        initial_capital: Starting cash for the simulated account.
        start_date: First date to use (anything ``pd.to_datetime`` accepts).
        end_date: Last date to use (anything ``pd.to_datetime`` accepts).

    Returns:
        A 4-tuple ``(gif_path, logs, capital_changes, order_summary)`` where
        ``gif_path`` is ``'trading_animation.gif'`` and the other three are
        newline-joined / formatted strings.

    Raises:
        ValueError: If the capital or a date is missing, the date range is
            invalid or out of bounds, the selection is empty, or there is too
            little data to build training/test windows.

    Side effects:
        Rebinds the module-level names listed in the ``global`` statement
        (``df`` holds the scaled model frame plus a scaled ``Predicted_Close``
        column), prints the array shapes and the model summary, and overwrites
        ``trading_animation.gif`` in the current working directory.
    """
    global cash, stocks, buy_price, sell_price, final_capital, df, test_df, sequence_length, features, model, buy_count, sell_count

    # Fail fast (before training) instead of with a TypeError inside the simulation.
    if initial_capital is None:
        raise ValueError("Initial capital is required.")

    buy_count = 0
    sell_count = 0

    # Load the dataset restricted to the requested date range
    df = _load_price_history(symbol, start_date, end_date)

    # Feature columns and target
    features = ['Open', 'High', 'Low', 'Close', 'Volume']
    target = 'Close'

    # Keep the real prices for trading and plotting; only the model sees scaled data.
    price_df = df.copy()

    # Normalize the data
    scaler = MinMaxScaler()
    df[features] = scaler.fit_transform(df[features])

    # Create sequences (e.g., using past 30 days to predict the next day)
    sequence_length = 30

    # Split the data into training and testing sets
    train_size = int(len(df) * 0.8)
    test_offset = train_size  # row position in df where test_df starts
    train_df = df[:train_size]
    test_df = df[train_size:]

    # Check if the test data is too small, and adjust accordingly.
    # NOTE(review): in this fallback the model is trained and evaluated on the same rows.
    if len(test_df) <= sequence_length:
        train_df = df
        test_df = df
        test_offset = 0
        sequence_length = max(1, int(len(df) * 0.2))

    if sequence_length < _MIN_SEQUENCE_LENGTH:
        raise ValueError(
            f"Not enough data: the model needs windows of at least {_MIN_SEQUENCE_LENGTH} rows "
            f"but only {sequence_length} are available. Choose a longer date range."
        )

    # Create sequences
    X_train, y_train = _create_sequences(train_df, sequence_length, features, target)
    X_test, y_test = _create_sequences(test_df, sequence_length, features, target)

    # Check the shapes of the data
    print(f"X_train shape: {X_train.shape}")
    print(f"y_train shape: {y_train.shape}")
    print(f"X_test shape: {X_test.shape}")
    print(f"y_test shape: {y_test.shape}")

    # Ensure the test data is not empty
    if X_test.shape[0] == 0 or y_test.shape[0] == 0:
        raise ValueError("Test data is empty. Increase the size of the dataset or adjust the sequence length.")

    # Define and train the CNN model
    model = _build_cnn_model(sequence_length, len(features))
    model.summary()
    model.fit(X_train, y_train, epochs=5, validation_data=(X_test, y_test))

    # Generate predictions for every test window in a single batch.
    # Window i of test_df predicts test row i + sequence_length, i.e. row
    # test_offset + sequence_length + i of df, so the block ends exactly at len(df).
    scaled_predictions = np.asarray(model.predict(X_test), dtype=float).reshape(-1)
    df['Predicted_Close'] = np.nan
    first_pred_row = test_offset + sequence_length
    df.iloc[first_pred_row:first_pred_row + len(scaled_predictions),
            df.columns.get_loc('Predicted_Close')] = scaled_predictions

    # Undo the MinMax scaling of the target so predictions are in price units.
    close_pos = features.index(target)
    price_df['Predicted_Close'] = (
        df['Predicted_Close'] * scaler.data_range_[close_pos] + scaler.data_min_[close_pos]
    )

    # Reset cash and stocks
    cash = initial_capital
    stocks = 0
    buy_price = None
    sell_price = None

    # Initialize logs
    logs = []
    capital_changes = [f"Initial Capital: ${cash}"]

    # Initialize data for animation
    buys = {'dates': [], 'prices': []}
    sells = {'dates': [], 'prices': []}

    # Define the buy and sell logic: a row is a signal when its Low/High is the
    # extreme of the centered 3-row window.
    # NOTE(review): the centered window includes the following row, so each
    # signal relies on data that would not be known yet on that day.
    rolling_low = price_df['Low'].rolling(window=3, center=True).min()
    rolling_high = price_df['High'].rolling(window=3, center=True).max()
    price_df['Min'] = price_df['Low'].where(price_df['Low'] == rolling_low)
    price_df['Max'] = price_df['High'].where(price_df['High'] == rolling_high)

    fig, ax1 = plt.subplots(figsize=(14, 7))
    fig.suptitle('Stock Prices with Buy/Sell Signals and Volume', fontsize=16)

    # Plot the closing prices and predicted closing prices
    ax1.set_xlabel('Date')
    ax1.set_ylabel('Price')
    date_form = DateFormatter("%Y-%m")
    ax1.xaxis.set_major_formatter(date_form)

    # Add grid lines
    ax1.grid(True, linestyle='--', alpha=0.5)

    # Plot the data with more details
    line, = ax1.plot(price_df.index, price_df['Close'], label='Close Prices', color='blue', linewidth=1)
    pred_line, = ax1.plot(price_df.index, price_df['Predicted_Close'], label='Predicted Close Prices', linestyle='--', color='orange', linewidth=1)
    buy_scatter = ax1.scatter([], [], marker='^', color='green', label='Buy Signal', s=100)
    sell_scatter = ax1.scatter([], [], marker='v', color='red', label='Sell Signal', s=100)

    # Plot moving averages
    price_df['SMA9'] = price_df['Close'].rolling(window=9).mean()
    price_df['SMA21'] = price_df['Close'].rolling(window=21).mean()
    ax1.plot(price_df.index, price_df['SMA9'], label='SMA 9', color='purple', linewidth=1)
    ax1.plot(price_df.index, price_df['SMA21'], label='SMA 21', color='cyan', linewidth=1)

    ax1.legend(loc='upper left')

    # Create a secondary y-axis for the volume bars
    ax2 = ax1.twinx()
    ax2.set_ylabel('Volume')
    ax2.bar(price_df.index, price_df['Volume'], color='blue', alpha=0.3, width=1)
    ax2.grid(False)

    # Format x-axis to show only the major ticks and labels
    ax1.xaxis.set_major_locator(MaxNLocator(nbins=8))

    # Improve y-axis formatting for better readability
    ax1.yaxis.set_major_formatter('${x:,.2f}')

    # Function to format volume axis
    def volume_formatter(x, pos):
        if x >= 1e6:
            return f'{x*1e-6:.1f}M'
        elif x >= 1e3:
            return f'{x*1e-3:.1f}K'
        else:
            return f'{x:.0f}'

    ax2.yaxis.set_major_formatter(FuncFormatter(volume_formatter))

    def animate(i):
        """Process row ``i``: execute any buy/sell signal, then reveal the chart up to that row."""
        global cash, stocks, buy_price, sell_price, final_capital, buy_count, sell_count

        if i >= len(price_df):
            return

        current_date = price_df.index[i]
        current_low = price_df['Low'].iloc[i]
        current_high = price_df['High'].iloc[i]
        current_close = price_df['Close'].iloc[i]

        # Local low: invest all cash. A non-positive price is skipped because it
        # would make the share count infinite or negative.
        if not np.isnan(price_df['Min'].iloc[i]) and cash > 0 and current_low > 0:
            buy_price = current_low
            stocks = cash / buy_price
            cash = 0
            buys['dates'].append(current_date)
            buys['prices'].append(current_close)
            logs.append(f"Bought at {buy_price:.2f} on {current_date.date()}")
            buy_count += 1

        # Local high: liquidate the whole position.
        if not np.isnan(price_df['Max'].iloc[i]) and stocks > 0:
            sell_price = current_high
            cash = stocks * sell_price
            stocks = 0
            sells['dates'].append(current_date)
            sells['prices'].append(current_close)
            logs.append(f"Sold at {sell_price:.2f} on {current_date.date()}")
            sell_count += 1
            capital_changes.append(f"Capital on {current_date.date()}: ${cash:.2f}")

        # Mark any open position to the current close.
        final_capital = cash + stocks * current_close

        # Draw by slicing up to row i rather than appending, so the extra call
        # FuncAnimation makes for its initial draw does not duplicate a point.
        visible = price_df.iloc[:i + 1]
        line.set_data(visible.index, visible['Close'].to_numpy())
        pred_line.set_data(visible.index, visible['Predicted_Close'].to_numpy())
        if buys['dates']:
            buy_scatter.set_offsets(np.c_[buys['dates'], buys['prices']])
        if sells['dates']:
            sell_scatter.set_offsets(np.c_[sells['dates'], sells['prices']])
        ax1.relim()
        ax1.autoscale_view()
        ax2.relim()
        ax2.autoscale_view()

        return line, pred_line, buy_scatter, sell_scatter

    try:
        ani = animation.FuncAnimation(fig, animate, frames=len(price_df), interval=1000, repeat=False)  # Adjust interval for frame rate
        # Saving renders every frame, which is what drives the simulation above.
        ani.save('trading_animation.gif', writer='pillow', fps=1)  # Adjust fps for realistic animation
    finally:
        # Release the figure; otherwise every call leaves one more open figure behind.
        plt.close(fig)

    logs.append(f"Final Capital: ${final_capital:.2f}")
    capital_changes.append(f"Final Capital: ${final_capital:.2f}")
    order_summary = f"Total Buy Orders: {buy_count}\nTotal Sell Orders: {sell_count}"

    return 'trading_animation.gif', '\n'.join(logs), '\n'.join(capital_changes), order_summary

def get_available_date_range(symbol):
    """Return ``(first_date, last_date)`` found in ``archive/stocks/<symbol>.csv``.

    Both values are ``datetime.date`` objects taken from the minimum and
    maximum of the file's ``Date`` column.
    """
    csv_path = os.path.join('archive', 'stocks', f"{symbol}.csv")
    trading_days = pd.to_datetime(pd.read_csv(csv_path)['Date'])
    earliest = trading_days.min().date()
    latest = trading_days.max().date()
    return earliest, latest

def update_date_range(symbol):
    """Return the one-line message describing the date range available for ``symbol``."""
    first_day, last_day = get_available_date_range(symbol)
    return f"Available date range for {symbol}: {first_day} to {last_day}"

def main(symbol, initial_capital, start_date, end_date):
    """Gradio click handler: date-range message followed by the four prediction outputs.

    Returns a 5-tuple ``(date_range, gif, logs, capital_changes, order_summary)``.
    """
    range_message = update_date_range(symbol)
    animation_path, trade_log, capital_log, summary = predict_stock_prices(
        symbol, initial_capital, start_date, end_date
    )
    return range_message, animation_path, trade_log, capital_log, summary

# Get the list of available CSV files.
# os.path.splitext removes only the trailing ".csv", so a ticker that itself
# contains a dot (file "BRK.B.csv") keeps its full symbol and maps back to the
# same filename later. Sorting gives the dropdown a stable order, because
# os.listdir returns entries in arbitrary order.
available_symbols = sorted(
    os.path.splitext(f)[0]
    for f in os.listdir(os.path.join('archive', 'stocks'))
    if f.endswith('.csv')
)

# Build the Gradio UI. Components are created in the order they appear below.
with gr.Blocks() as iface:
    # Inputs: ticker symbol (choices come from available_symbols), starting capital and date range.
    symbol_input = gr.Dropdown(label="Stock Ticker Symbol", choices=available_symbols, interactive=True, allow_custom_value=True)
    date_range_output = gr.Textbox(label="Available Date Range", interactive=False)
    initial_capital_input = gr.Number(label="Initial Capital", value=10000, step=100)
    start_date_input = gr.Textbox(label="Start Date (YYYY-MM-DD)")
    end_date_input = gr.Textbox(label="End Date (YYYY-MM-DD)")
    # Outputs filled in by main(): order summary, animation file, trade log and capital history.
    order_summary_output = gr.Textbox(label="Order Summary", interactive=False)
    gif_output = gr.Image(type="filepath")
    logs_output = gr.Textbox(label="Logs", interactive=False)
    capital_changes_output = gr.Textbox(label="Capital Changes", interactive=False)

    # Refresh the available-date-range message whenever the symbol changes.
    symbol_input.change(
        fn=update_date_range,
        inputs=symbol_input,
        outputs=date_range_output
    )

    # Run the full pipeline on click. The order of `outputs` mirrors the order
    # of the tuple returned by main().
    predict_button = gr.Button("Predict")
    predict_button.click(
        fn=main,
        inputs=[symbol_input, initial_capital_input, start_date_input, end_date_input],
        outputs=[date_range_output, gif_output, logs_output, capital_changes_output, order_summary_output]
    )

# Start the Gradio server as soon as this code is executed.
iface.launch()


  from .autonotebook import tqdm as notebook_tqdm


Running on local URL:  http://127.0.0.1:7860

To create a public link, set `share=True` in `launch()`.


