In [23]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, SimpleRNN, LSTM, Dropout, MultiHeadAttention, LayerNormalization
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, SimpleRNN, LSTM, MultiHeadAttention, LayerNormalization
TF_ENABLE_ONEDNN_OPTS=0

In [3]:


# Generate synthetic data (e.g., exponential decay)
def simulate_ode(k=0.1, y0=1.0, t_max=10, num_points=1000):
    t = np.linspace(0, t_max, num_points)
    y = y0 * np.exp(-k * t)
    return t, y

# Create synthetic data
t, y = simulate_ode()
X = t.reshape(-1, 1)  # Time as input
y = y.reshape(-1, 1)  # State as output

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)


In [26]:


def objective(trial, model_type):
    # Common hyperparameters
    learning_rate = trial.suggest_loguniform('learning_rate', 1e-4, 1e-1)
    batch_size = trial.suggest_categorical('batch_size', [16, 32, 64, 128])

    # Define the model based on the type
    model = Sequential()
    
    if model_type == 'rnn':
        num_layers = trial.suggest_int('num_layers', 1, 3)
        units = trial.suggest_int('units', 32, 256, step=32)
        for _ in range(num_layers):
            model.add(SimpleRNN(units, activation='tanh', return_sequences=True if _ < num_layers - 1 else False))
        model.add(Dense(1))

    elif model_type == 'lstm':
        num_layers = trial.suggest_int('num_layers', 1, 3)
        units = trial.suggest_int('units', 32, 256, step=32)
        for _ in range(num_layers):
            model.add(LSTM(units, activation='tanh', return_sequences=True if _ < num_layers - 1 else False))
        model.add(Dense(1))
        
    elif model_type == 'transformer':
        num_heads = trial.suggest_int('num_heads', 2, 8)
        key_dim = trial.suggest_int('key_dim', 16, 64, step=16)
        model.add(MultiHeadAttention(num_heads=num_heads, key_dim=key_dim, input_shape=(None, 1)))
        model.add(LayerNormalization())
        model.add(Dense(1))

    elif model_type == 'neural_net':
        num_layers = trial.suggest_int('num_layers', 1, 3)
        units = trial.suggest_int('units', 32, 256, step=32)
        for _ in range(num_layers):
            model.add(Dense(units, activation='relu'))
        model.add(Dense(1))

    elif model_type == 'ode':
        num_layers = trial.suggest_int('num_layers', 1, 3)
        units = trial.suggest_int('units', 32, 256, step=32)
        model.add(Dense(units, activation='relu', input_dim=1))
        for _ in range(num_layers):
            model.add(Dense(units, activation='relu'))
        model.add(Dense(1))  # Approximate derivative

    # Compile the model
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimizer, loss='mse', metrics=['mae'])

    # Train the model
    history = model.fit(
        X_train,
        y_train,
        validation_data=(X_test, y_test),
        epochs=10,
        batch_size=batch_size,
        verbose=0,
    )

    # Return the validation loss for Optuna to minimize
    val_loss = history.history['val_loss'][-1]
    return val_loss


In [None]:
study_rnn = optuna.create_study(direction='minimize')
study_rnn.optimize(lambda trial: objective(trial, model_type='rnn'), n_trials=20)
print("Best RNN parameters:", study_rnn.best_params)


In [None]:
study_lstm = optuna.create_study(direction='minimize')
study_lstm.optimize(lambda trial: objective(trial, model_type='lstm'), n_trials=20)
print("Best LSTM parameters:", study_lstm.best_params)


In [None]:
study_transformer = optuna.create_study(direction='minimize')
study_transformer.optimize(lambda trial: objective(trial, model_type='transformer'), n_trials=20)
print("Best Transformer parameters:", study_transformer.best_params)


In [None]:
study_nn = optuna.create_study(direction='minimize')
study_nn.optimize(lambda trial: objective(trial, model_type='neural_net'), n_trials=20)
print("Best Neural Network parameters:", study_nn.best_params)


In [None]:
study_ode = optuna.create_study(direction='minimize')
study_ode.optimize(lambda trial: objective(trial, model_type='ode'), n_trials=20)
print("Best ODE parameters:", study_ode.best_params)


In [None]:
# Example: Using the best RNN model
best_rnn_params = study_rnn.best_params
print("Best RNN Parameters:", best_rnn_params)

Here’s how to use this function to print the architecture for each model:

In [15]:
def preprocess_stock_data(csv_path, date_column='date', close_column='close', test_size=0.2, val_size=0.1, time_steps=60):
    """
    Prepares stock market price data for time series modeling.
    
    Args:
        csv_path (str): Path to the CSV file containing the data.
        date_column (str): Name of the date column in the CSV.
        close_column (str): Name of the closing price column in the CSV.
        test_size (float): Proportion of the data for testing.
        val_size (float): Proportion of the training data for validation.
        time_steps (int): Number of past time steps to use for each sample.
    
    Returns:
        X_train, y_train: Training data and labels.
        X_val, y_val: Validation data and labels.
        X_test, y_test: Testing data and labels.
        scaler: Fitted MinMaxScaler instance for inverse scaling.
    """
    # Load the dataset
    data = pd.read_csv(csv_path, parse_dates=[date_column])
    data.sort_values(by=date_column, inplace=True)
    
    # Extract the 'close' column for scaling
    close_prices = data[close_column].values.reshape(-1, 1)
    
    # Scale the data
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaled_close = scaler.fit_transform(close_prices)
    
    # Create sequences of time_steps
    X, y = [], []
    for i in range(time_steps, len(scaled_close)):
        X.append(scaled_close[i-time_steps:i])
        y.append(scaled_close[i])
    
    X, y = np.array(X), np.array(y)
    
    # Split data into train, validation, and test sets
    train_size = int((1 - test_size) * len(X))
    val_size = int(val_size * train_size)
    
    X_train, X_temp = X[:train_size], X[train_size:]
    y_train, y_temp = y[:train_size], y[train_size:]
    
    X_val, X_test = X_temp[:val_size], X_temp[val_size:]
    y_val, y_test = y_temp[:val_size], y_temp[val_size:]
    
    return X_train, y_train, X_val, y_val, X_test, y_test, scaler


In [20]:
csv_path = "inputs/google_stock_data.csv"  # Path to your CSV file
a = pd.read_csv(csv_path)
a.head()
print(len(a))

753


In [22]:

X_train, y_train, X_val, y_val, X_test, y_test, scaler = preprocess_stock_data(
    csv_path, date_column='Date', close_column='Close', time_steps=60
)

print(f"Train shape: {X_train.shape}, {y_train.shape}")
print(f"Validation shape: {X_val.shape}, {y_val.shape}")
print(f"Test shape: {X_test.shape}, {y_test.shape}")

Train shape: (554, 60, 1), (554, 1)
Validation shape: (55, 60, 1), (55, 1)
Test shape: (84, 60, 1), (84, 1)


In [12]:

def build_best_model(best_params, model_type):
    model = Sequential()
    
    if model_type == 'rnn':
        for _ in range(best_params['num_layers']):
            model.add(SimpleRNN(
                units=best_params['units'],
                activation='tanh',
                return_sequences=True if _ < best_params['num_layers'] - 1 else False
            ))
        model.add(Dense(1))

    elif model_type == 'lstm':
        for _ in range(best_params['num_layers']):
            model.add(LSTM(
                units=best_params['units'],
                activation='tanh',
                return_sequences=True if _ < best_params['num_layers'] - 1 else False
            ))
        model.add(Dense(1))

    elif model_type == 'transformer':
        model.add(MultiHeadAttention(
            num_heads=best_params['num_heads'],
            key_dim=best_params['key_dim'],
            input_shape=(None, 1)
        ))
        model.add(LayerNormalization())
        model.add(Dense(1))

    elif model_type == 'neural_net':
        for _ in range(best_params['num_layers']):
            model.add(Dense(
                units=best_params['units'],
                activation='relu'
            ))
        model.add(Dense(1))

    elif model_type == 'ode':
        model.add(Dense(
            best_params['units'], activation='relu', input_dim=1
        ))
        for _ in range(best_params['num_layers']):
            model.add(Dense(
                best_params['units'], activation='relu'
            ))
        model.add(Dense(1))  # Approximate derivative

    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=best_params['learning_rate']), loss='mse')
    return model


In [None]:
3. Output Explanation
Each summary() call will display:
The layers of the model.
The number of parameters in each layer.
The total trainable and non-trainable parameters.


In [4]:
# Example: Print the best RNN architecture
best_rnn_model = build_best_model(study_rnn.best_params, 'rnn')
print("Best RNN Architecture:")
best_rnn_model.summary()



NameError: name 'build_best_model' is not defined

In [5]:

# Example: Print the best LSTM architecture
best_lstm_model = build_best_model(study_lstm.best_params, 'lstm')
print("\nBest LSTM Architecture:")
best_lstm_model.summary()



NameError: name 'build_best_model' is not defined

In [7]:
# Example: Print the best Transformer architecture
best_transformer_model = build_best_model(study_transformer.best_params, 'transformer')
print("\nBest Transformer Architecture:")
best_transformer_model.summary()



NameError: name 'build_best_model' is not defined

In [8]:
# Example: Print the best Neural Network architecture
best_nn_model = build_best_model(study_nn.best_params, 'neural_net')
print("\nBest Neural Network Architecture:")
best_nn_model.summary()



NameError: name 'build_best_model' is not defined

In [9]:
# Example: Print the best ODE architecture
best_ode_model = build_best_model(study_ode.best_params, 'ode')
print("\nBest ODE Architecture:")
best_ode_model.summary()

NameError: name 'build_best_model' is not defined

In [24]:


def prepare_data(file_path, time_steps, test_split=0.2, val_split=0.2, target_column='Close'):
    """
    Prepares time series data for training, validation, and testing.

    Args:
        file_path (str): Path to the CSV file.
        time_steps (int): Number of time steps (look-back window) for the sequences.
        test_split (float): Fraction of data to use for testing.
        val_split (float): Fraction of training data to use for validation.
        target_column (str): Column name of the target variable.

    Returns:
        tuple: (X_train, y_train, X_val, y_val, X_test, y_test)
    """
    # Load the dataset
    data = pd.read_csv(file_path)
    values = data[target_column].values
    
    # Create sequences
    X, y = [], []
    for i in range(len(values) - time_steps):
        X.append(values[i:i + time_steps])
        y.append(values[i + time_steps])
    
    X, y = np.array(X), np.array(y)
    
    # Split data into train, validation, and test
    test_size = int(len(X) * test_split)
    val_size = int(len(X) * val_split)
    
    X_train, y_train = X[:-test_size - val_size], y[:-test_size - val_size]
    X_val, y_val = X[-test_size - val_size:-test_size], y[-test_size - val_size:-test_size]
    X_test, y_test = X[-test_size:], y[-test_size:]
    
    return X_train, y_train, X_val, y_val, X_test, y_test


In [25]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, SimpleRNN, LSTM, MultiHeadAttention, LayerNormalization
import tensorflow as tf

def build_best_model(best_params, model_type, time_steps):
    """
    Builds and compiles the best model based on the type and parameters.

    Args:
        best_params (dict): Best hyperparameters for the model.
        model_type (str): Type of model ('rnn', 'lstm', 'transformer', 'neural_net', 'ode').
        time_steps (int): Number of time steps (look-back window).

    Returns:
        Sequential: Compiled model.
    """
    model = Sequential()
    
    if model_type == 'rnn':
        for _ in range(best_params['num_layers']):
            model.add(SimpleRNN(
                units=best_params['units'],
                activation='tanh',
                return_sequences=True if _ < best_params['num_layers'] - 1 else False,
                input_shape=(time_steps, 1) if _ == 0 else None
            ))
        model.add(Dense(1))
    
    elif model_type == 'lstm':
        for _ in range(best_params['num_layers']):
            model.add(LSTM(
                units=best_params['units'],
                activation='tanh',
                return_sequences=True if _ < best_params['num_layers'] - 1 else False,
                input_shape=(time_steps, 1) if _ == 0 else None
            ))
        model.add(Dense(1))
    
    elif model_type == 'transformer':
        model.add(MultiHeadAttention(
            num_heads=best_params['num_heads'],
            key_dim=best_params['key_dim'],
            input_shape=(None, 1)
        ))
        model.add(LayerNormalization())
        model.add(Dense(1))
    
    elif model_type == 'neural_net':
        model.add(Dense(
            best_params['units'], activation='relu', input_dim=time_steps
        ))
        for _ in range(best_params['num_layers']):
            model.add(Dense(
                units=best_params['units'],
                activation='relu'
            ))
        model.add(Dense(1))
    
    elif model_type == 'ode':
        model.add(Dense(
            best_params['units'], activation='relu', input_dim=time_steps
        ))
        for _ in range(best_params['num_layers']):
            model.add(Dense(
                best_params['units'], activation='relu'
            ))
        model.add(Dense(1))  # Approximate derivative
    
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=best_params['learning_rate']), loss='mse')
    return model
