## Applying Artificial Neural Network Algorithms to the Problem (Stock Time Series Forecasting)

# HYBRID MODEL X3 v03

In [4]:
import numpy as np
import pandas as pd
import yfinance as yf
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler
from sklearn.impute import SimpleImputer
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, SimpleRNN, LSTM, Dense, Dropout, BatchNormalization, Bidirectional
from tensorflow.keras.optimizers import Adam
from ta.momentum import RSIIndicator
from ta.volatility import BollingerBands
from ta.trend import MACD
from scipy.stats import zscore
from datetime import datetime, timedelta
from sklearn.model_selection import TimeSeriesSplit
import warnings
warnings.filterwarnings('ignore')

# Define ticker symbol and data range
ticker = "BTC-USD"
start = "2000-01-01"
end = (datetime.today() - timedelta(days=5)).strftime("%Y-%m-%d")

# Download historical data
df = yf.download(ticker, start=start, end=end)

# Preprocess the data
df['Date'] = pd.to_datetime(df.index)
df['Date'] = df['Date'].apply(lambda date: date.timestamp())

# Remove noise and outliers
z_scores = zscore(df[['Open', 'High', 'Low', 'Close', 'Volume']])
df = df[(z_scores < 3).all(axis=1)]

# Feature engineering
df['Price_Change_Pct'] = df['Close'].pct_change()
df['Log_Returns'] = np.log(df['Close'] / df['Close'].shift(1))
df['50d_MA'] = df['Close'].rolling(window=50).mean()
df['200d_MA'] = df['Close'].rolling(window=200).mean()

# Technical indicators
rsi_indicator = RSIIndicator(df['Close'], window=14)
df['RSI'] = rsi_indicator.rsi()
bb_indicator = BollingerBands(df['Close'], window=20)
df['Bollinger_Bands'] = bb_indicator.bollinger_mavg()
macd_indicator = MACD(df['Close'], window_slow=26, window_fast=12)
df['MACD'] = macd_indicator.macd()
df['MACD_Signal'] = macd_indicator.macd_signal()

# Remove NA
df = df.dropna()

# Split the data into features and target
X = df[['Price_Change_Pct', 'Log_Returns', '50d_MA', '200d_MA', 'RSI', 'Bollinger_Bands', 'MACD']]
y = df['Close']

# Impute missing values using mean imputation
imputer = SimpleImputer(strategy='mean')
X_imputed = imputer.fit_transform(X)

# Normalize the data
scaler = MinMaxScaler(feature_range=(0, 1))
X_scaled = scaler.fit_transform(X_imputed)

# Reshape X_scaled to fit CNN input
n_timesteps = 10
n_features = X_scaled.shape[1]

# Ensure that X_scaled is a multiple of n_timesteps
n_samples = (X_scaled.shape[0] // n_timesteps) * n_timesteps
X_scaled = X_scaled[:n_samples]
y = y[:n_samples]

X_scaled = np.reshape(X_scaled, (-1, n_timesteps, n_features))

# TimeSeriesSplit to split the dataset into training and test sets
tscv = TimeSeriesSplit(n_splits=10)
fold = 1
mse_scores = []

for train_index, test_index in tscv.split(X_scaled):
    print(f"Training fold {fold}...")
    
    X_train, X_test = X_scaled[train_index], X_scaled[test_index]
    y_train, y_test = y.iloc[train_index], y.iloc[test_index]

    # Build the hybrid model for each fold
    model = Sequential()

    # 1. CNN layer: Extracts features from time series
    model.add(Conv1D(filters=64, kernel_size=3, activation='relu', input_shape=(X_train.shape[1], X_train.shape[2])))
    model.add(MaxPooling1D(pool_size=2))
    model.add(BatchNormalization())

    # 2. Bidirectional RNN layer: Captures past and future dependencies
    model.add(Bidirectional(SimpleRNN(units=64, return_sequences=True)))
    model.add(BatchNormalization())

    # 3. LSTM layers: Learns long-term dependencies
    model.add(LSTM(units=256, return_sequences=True))
    model.add(Dropout(0.3))
    model.add(LSTM(units=128))
    model.add(Dropout(0.3))

    # 4. Output layer: Predict the closing price
    model.add(Dense(units=1))

    # Compile the model with Adam optimizer and a lower learning rate
    optimizer = Adam(learning_rate=0.005)
    model.compile(optimizer=optimizer, loss='mse')

    # Train the model for each fold
    model.fit(X_train, y_train, epochs=50, batch_size=64, validation_data=(X_test, y_test), verbose=0)

    # Predictions and evaluation for each fold
    predictions = model.predict(X_test)
    predictions = predictions.flatten()
    y_test = y_test.values.flatten()
    
    mse = mean_squared_error(y_test, predictions)
    mse_scores.append(mse)
    print(f"Fold {fold} - Mean Squared Error: {mse}")
    
    fold += 1

# Calculate average MSE across folds
avg_mse = np.mean(mse_scores)
print(f"Average MSE across folds: {avg_mse}")

[*********************100%***********************]  1 of 1 completed


Training fold 1...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step
Fold 1 - Mean Squared Error: 35312.91319058048
Training fold 2...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 745ms/step
Fold 2 - Mean Squared Error: 31779.008489761247
Training fold 3...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 890ms/step
Fold 3 - Mean Squared Error: 43179.79994747956
Training fold 4...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 988ms/step
Fold 4 - Mean Squared Error: 27054.459226908846
Training fold 5...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 711ms/step
Fold 5 - Mean Squared Error: 17771.92587319177
Training fold 6...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 848ms/step
Fold 6 - Mean Squared Error: 43015.864955780155
Training fold 7...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step
Fold 7 - Mean Squared Error: 51230.51788615329
Training fold 8...
[1m1/1[0m