In [None]:
!pip install scikit-learn

In [None]:
import csv
from typing import Any
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.optimizers import Adam
import pandas as pd
from sklearn.impute import SimpleImputer
import os
from sklearn.metrics import accuracy_score

import tensorflow as tf


In [None]:
# Classification

In [None]:
# LOAD CSV INTO DICT ARRAY
def load_csv(file_name):
    """Read a CSV file and return its rows as a list of dicts.

    The first line of the file is used as the header; every following line
    becomes one dict keyed by those header names (values are left as the
    strings read from the file).

    Args:
        file_name: Path of the CSV file to read.

    Returns:
        list: One dict per data row, in file order.
    """
    # newline='' hands newline handling over to the csv module.
    with open(f'{file_name}', mode='r', newline='') as handle:
        return list(csv.DictReader(handle))

In [None]:
class create_model_suite:
    """Train, save, load and query a Keras binary classifier on price data.

    Every CSV row becomes one sample with a single timestep built from the
    ``open``, ``high``, ``low`` and ``volume`` columns. The label is 1 when
    the following row's ``close`` is greater than the current row's ``close``
    and 0 otherwise.

    NOTE(review): the label only means "next period closes higher" if the rows
    are stored oldest-first — confirm against the CSV.
    """

    # Single source of truth for the model inputs, shared by training and
    # prediction so the two code paths cannot drift apart.
    FEATURE_COLUMNS = ['open', 'high', 'low', 'volume']

    def __init__(self, data_path: str = 'finance_historical.csv'):
        """Load the price history and prepare an empty model slot.

        Args:
            data_path: CSV file to read. Defaults to the file the notebook
                has always used, so ``create_model_suite()`` is unchanged.
        """
        self.model = None
        # Imputer fitted by train_model; stays None on a suite that only loads
        # a saved model, in which case prediction uses the raw feature values.
        self.imputer = None

        self.raw_data = pd.read_csv(data_path)

        # Maps the 'type' string of a layer spec to the Keras layer class.
        self.references = {
            'lstm': LSTM,
            'dense': Dense,
        }

    ########################################################################################################
    ########################################################################################################

    def train_model(self, model_name: str, model_params: dict) -> None:
        """Build, fit, evaluate and save the classifier.

        Args:
            model_name: Base name of the output file; the model is written to
                ``'<model_name>.keras'`` in the current directory.
            model_params: Configuration dict. ``'layers'`` (required) is a
                list of dicts with ``'type'`` (a key of ``self.references``),
                ``'units'``, ``'activation_func'`` and, optionally on the
                first layer, ``'input_shape'``. Optional keys: ``'optimizer'``
                (default Adam, lr 0.001), ``'loss'`` (default
                ``'binary_crossentropy'``), ``'metrics'`` (default
                ``['accuracy']``), ``'epochs'`` (default 50) and
                ``'batch_size'`` (default 32).

        Raises:
            ValueError: If a layer ``'type'`` is not in ``self.references``.
            Exception: Any other failure is printed and re-raised unchanged.
        """
        try:
            # Inputs are the open/high/low/volume columns; 'close' is only
            # used to derive the label below.
            features = self.raw_data[self.FEATURE_COLUMNS]

            # Binary target: 1 if the next row's close > this row's close, else 0.
            self.raw_data['target'] = (
                self.raw_data['close'].shift(-1) > self.raw_data['close']
            ).astype(int)
            # The last row has no successor (shift(-1) yields NaN), so drop it
            # from both the labels and the features.
            y = self.raw_data['target'].values[:-1]
            X = features[:-1]

            # Fill missing feature values with the column median and keep the
            # fitted imputer so predict_outcome can apply the same fill.
            self.imputer = SimpleImputer(strategy='median')
            X = self.imputer.fit_transform(X)

            # Keras recurrent layers expect (samples, timesteps, features);
            # each row is a sequence of length one.
            X = X.reshape((X.shape[0], 1, X.shape[1]))

            model_layers = []
            for position, layer_params in enumerate(model_params['layers']):
                layer_type = layer_params['type']  # 'lstm' or 'dense'
                layer_class = self.references.get(layer_type)

                if layer_class is None:
                    raise ValueError(f"Layer type '{layer_type}' not found in references.")

                layer_kwargs = {
                    'units': layer_params['units'],
                    'activation': layer_params['activation_func'],
                }
                # Only the first layer may declare the input shape, and only
                # when the caller supplied one.
                if position == 0 and 'input_shape' in layer_params:
                    layer_kwargs['input_shape'] = layer_params['input_shape']

                model_layers.append(layer_class(**layer_kwargs))

            self.model = Sequential(model_layers)

            # Build the default optimizer only when the caller did not pass one.
            if 'optimizer' in model_params:
                optimizer = model_params['optimizer']
            else:
                optimizer = Adam(learning_rate=0.001)
            loss_function = model_params.get('loss', 'binary_crossentropy')
            metrics = model_params.get('metrics', ['accuracy'])

            self.model.compile(optimizer=optimizer, loss=loss_function, metrics=metrics)

            self.model.fit(
                X, y,
                epochs=model_params.get('epochs', 50),
                batch_size=model_params.get('batch_size', 32),
                verbose=1
            )

            # NOTE: this scores the model on the same rows it was fitted on,
            # so the printed figure is a training accuracy, not a held-out one.
            y_pred = (self.model.predict(X) > 0.5).astype("int32")
            accuracy = accuracy_score(y, y_pred)
            print(f"Model trained with accuracy: {accuracy}")

            # Save the model to the current directory
            self.model.save(f'{model_name}.keras')

        except Exception as e:
            print(f"Error during model training: {e}")
            raise

    ########################################################################################################
    ########################################################################################################

    def load_model(self, model_name: str) -> None:
        """Load ``'<model_name>.keras'`` into this suite.

        Args:
            model_name: Base name of the saved model, without the extension.

        Raises:
            Exception: If this suite already holds a model, or (after printing
                the error) if loading fails.
        """
        if self.model is not None:
            raise Exception('LOAD ERROR: A MODEL HAS ALREADY BEEN LOADED')
        try:
            self.model = tf.keras.models.load_model(f'{model_name}.keras')
            print(f"Model loaded from {model_name}")
        except Exception as e:
            print(f"Error loading model: {e}")
            raise

    ########################################################################################################
    ########################################################################################################

    def predict_outcome(self, row_index: int) -> Any:
        """Run the model on one row of the loaded CSV.

        Args:
            row_index: Positional index of the row in ``self.raw_data``.
                Negative values count from the end, as with Python lists.

        Returns:
            Whatever ``self.model.predict`` returns for the single sample.

        Raises:
            Exception: If no model has been trained or loaded.
            IndexError: If ``row_index`` is outside the data (printed, then
                re-raised).
        """
        if self.model is None:
            raise Exception('PREDICT ERROR: LOAD A MODEL FIRST')
        try:
            # Index with a one-element list instead of the slice
            # [row_index:row_index+1]: the slice silently returns an empty
            # frame for -1 and for out-of-range positions.
            input_data = self.raw_data.iloc[[row_index]][self.FEATURE_COLUMNS]

            # Apply the training-time imputation when it is available.
            imputer = getattr(self, 'imputer', None)
            if imputer is not None:
                values = imputer.transform(input_data)
            else:
                values = input_data.values

            # (samples, timesteps, features) = (1, 1, number of feature columns)
            X = values.reshape((1, 1, -1))

            prediction = self.model.predict(X)
            return prediction
        except Exception as e:
            print(f"Error during prediction: {e}")
            raise



# Train Model

In [None]:
# Build the suite first; the hyper-parameters below are handed to train_model.
suite = create_model_suite()

# Training configuration: run length, optimisation settings and the layer
# stack (one LSTM layer feeding a single sigmoid unit for the binary label).
model_params = {
    'epochs': 100,
    'batch_size': 32,
    'model_tag': 'v1',
    'version': 1,
    'optimizer': Adam(learning_rate=0.001),
    'loss': 'binary_crossentropy',
    'metrics': ['accuracy', 'mse'],
    'layers': [
        {'type': 'lstm', 'units': 50, 'activation_func': 'relu'},
        {'type': 'dense', 'units': 1, 'activation_func': 'sigmoid'},
    ],
}

# Fit the model and write it to 'LSTM.keras'.
suite.train_model(model_name='LSTM', model_params=model_params)



# Load the same model

In [None]:
# Start from a fresh suite: load_model refuses to run when a model is already attached.
suite = create_model_suite()
suite.load_model(model_name='LSTM')

# Score a single row of the CSV, addressed by its position.
row_index = 9
prediction = suite.predict_outcome(row_index=row_index)
print(prediction)

# Regression


In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.optimizers import Adam
import tensorflow as tf
import os

class create_model_suite:
    """Train, save, load and query a Keras sequence regressor on price data.

    Windows of ``self.timesteps`` consecutive rows of the ``open``, ``high``,
    ``low`` and ``volume`` columns (median-imputed, then standardised) are
    used to predict the ``adjusted_close`` of the row that follows the window.

    NOTE(review): this class has the same name as the classifier defined
    earlier in the notebook and replaces it once this cell runs.
    """

    # Single source of truth for inputs/target, shared by training and
    # prediction so the column order is always the one the model was fitted on.
    FEATURE_COLUMNS = ['open', 'high', 'low', 'volume']
    TARGET_COLUMN = 'adjusted_close'

    def __init__(self):
        """Create an empty suite; data, model and preprocessors start unset."""
        self.model = None
        self.raw_data = None
        self.timesteps = 3  # Number of timesteps for LSTM (look-back period)
        self.scaler = None   # StandardScaler fitted by train_model
        self.imputer = None  # SimpleImputer fitted by train_model

        # Maps the 'type' string of a layer spec to the Keras layer class.
        self.references = {
            'lstm': LSTM,
            'dense': Dense,
        }

    def load_data(self, file_name: str) -> None:
        """Load data from a CSV file into ``self.raw_data``.

        A failure is printed rather than raised; ``self.raw_data`` then keeps
        its previous value and ``train_model`` reports that no data is
        available.
        """
        try:
            self.raw_data = pd.read_csv(file_name)
            print("Data loaded successfully.")
            print(self.raw_data.head(5))
        except Exception as e:
            print(f"Failed to load data: {e}")

    def _resolve_input_shape(self, layer_specs, n_features):
        """Return the (timesteps, features) shape for the first layer, adopting a caller-declared timestep count."""
        declared = layer_specs[0].get('input_shape') if layer_specs else None
        if declared is not None:
            if len(declared) != 2:
                raise ValueError(
                    f"input_shape must be (timesteps, features); got {tuple(declared)}."
                )
            declared_steps, declared_features = declared
            if declared_features != n_features:
                raise ValueError(
                    f"input_shape declares {declared_features} features but the "
                    f"training data has {n_features}."
                )
            if declared_steps is not None:
                # The windows built by create_sequences must have exactly the
                # length the first layer is declared with.
                self.timesteps = int(declared_steps)
        return (self.timesteps, n_features)

    def _build_layers(self, layer_specs, input_shape):
        """Instantiate the configured layers; only the first one receives ``input_shape``."""
        built = []
        for position, layer_params in enumerate(layer_specs):
            layer_type = layer_params['type']
            layer_class = self.references.get(layer_type)

            if layer_class is None:
                raise ValueError(f"Layer type '{layer_type}' not found in references.")

            layer_kwargs = {
                'units': layer_params['units'],
                'activation': layer_params['activation_func'],
            }
            if position == 0:
                layer_kwargs['input_shape'] = input_shape
            built.append(layer_class(**layer_kwargs))
        return built

    def train_model(self, model_params: dict) -> None:
        """Train an LSTM model with the specified parameters and save it.

        Args:
            model_params: Configuration dict. ``'layers'`` (required) is a
                list of dicts with ``'type'`` (a key of ``self.references``),
                ``'units'`` and ``'activation_func'``. The first layer may
                carry ``'input_shape'`` as ``(timesteps, features)``; when it
                does, its timestep count replaces ``self.timesteps`` so the
                training windows match the declared shape. When it is absent
                the shape is derived from ``self.timesteps`` and the data.
                Optional keys: ``'optimizer'`` (default Adam, lr 0.001),
                ``'loss'`` (default ``'mean_squared_error'``), ``'metrics'``
                (default ``['mae']``), ``'epochs'`` (default 50),
                ``'batch_size'`` (default 32) and ``'model_tag'`` (default
                ``'v1'``, used in the output file name
                ``'model_lstm_<model_tag>.keras'``).

        Raises:
            ValueError: Unknown layer type, an ``input_shape`` that disagrees
                with the data, or too few rows to build a single window.
            Exception: Any other failure is printed and re-raised unchanged.
        """
        if self.raw_data is None or self.raw_data.empty:
            print("No data available to train the model.")
            return

        try:
            # Ensure timestamp is parsed correctly (if applicable)
            if 'timestamp' in self.raw_data.columns:
                self.raw_data['timestamp'] = pd.to_datetime(self.raw_data['timestamp'])

            X = self.raw_data[self.FEATURE_COLUMNS]
            y = self.raw_data[self.TARGET_COLUMN]

            # Fill gaps with the column median, then standardise. Both fitted
            # objects are kept because predict_outcome must reuse them.
            self.imputer = SimpleImputer(strategy='median')
            X = self.imputer.fit_transform(X)

            self.scaler = StandardScaler()
            X = self.scaler.fit_transform(X)

            layer_specs = model_params['layers']
            input_shape = self._resolve_input_shape(layer_specs, X.shape[1])

            if len(X) <= self.timesteps:
                raise ValueError(
                    f"Need more than {self.timesteps} rows to build a training "
                    f"sequence; got {len(X)}."
                )

            # Sliding windows of `timesteps` rows, each labelled with the
            # target of the row right after the window.
            X, y = self.create_sequences(X, y.values, self.timesteps)

            # NOTE(review): train_test_split shuffles by default, so
            # overlapping windows from the same period can land on both sides
            # of the split. Consider a chronological split for an honest
            # out-of-sample error.
            X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

            model_layers = self._build_layers(layer_specs, input_shape)

            # Single linear unit: the regression output.
            model_layers.append(Dense(1, activation='linear'))

            self.model = Sequential(model_layers)

            # Build the default optimizer only when the caller did not pass one.
            if 'optimizer' in model_params:
                optimizer = model_params['optimizer']
            else:
                optimizer = Adam(learning_rate=0.001)
            loss_function = model_params.get('loss', 'mean_squared_error')
            metrics = model_params.get('metrics', ['mae'])

            self.model.compile(optimizer=optimizer, loss=loss_function, metrics=metrics)
            self.model.fit(
                X_train, y_train,
                epochs=model_params.get('epochs', 50),
                batch_size=model_params.get('batch_size', 32),
                verbose=1
            )

            # Evaluate on the held-out windows.
            y_pred = self.model.predict(X_test)
            mse = mean_squared_error(y_test, y_pred)
            print(f"Model trained with MSE: {mse}")

            # Save the model
            model_filename = f'model_lstm_{model_params.get("model_tag", "v1")}.keras'
            self.model.save(model_filename)
            print(f"Model saved as {model_filename}")

        except Exception as e:
            print(f"Error during model training: {e}")
            raise

    def load_model(self, model_name: str) -> None:
        """Load ``'<model_name>.keras'`` into this suite.

        Only the network is restored. The imputer and scaler are not part of
        the saved file, so ``predict_outcome`` still needs them to be provided
        (for example by training in the same session).

        Raises:
            Exception: If this suite already holds a model, or (after printing
                the error) if loading fails.
        """
        if self.model is not None:
            raise Exception('LOAD ERROR: A MODEL HAS ALREADY BEEN LOADED')
        try:
            self.model = tf.keras.models.load_model(f'{model_name}.keras')
            print(f"Model loaded from {model_name}")
        except Exception as e:
            print(f"Error loading model: {e}")
            raise

        # Adopt the look-back length the loaded network was built with, when
        # the model exposes it as (batch, timesteps, features).
        loaded_shape = getattr(self.model, 'input_shape', None)
        if (
            isinstance(loaded_shape, tuple)
            and len(loaded_shape) == 3
            and isinstance(loaded_shape[1], int)
        ):
            self.timesteps = loaded_shape[1]

    def predict_outcome(self, input_data: dict) -> Any:
        """Predict the target for a single observation.

        Args:
            input_data: Mapping of feature name to value. Keys may come in any
                order; features that are missing are treated as NaN and filled
                by the fitted imputer, and unknown keys are ignored.

        Returns:
            The first element of the flattened model output.

        Raises:
            Exception: If no model is available, or if the imputer/scaler
                fitted during training are not set on this suite.
        """
        if self.model is None:
            raise Exception('PREDICT ERROR: LOAD A MODEL FIRST')
        if self.imputer is None or self.scaler is None:
            raise Exception('PREDICT ERROR: IMPUTER/SCALER NOT FITTED - TRAIN A MODEL FIRST')

        try:
            # One-row frame with the columns in training order, independent of
            # how the caller ordered the dict.
            input_df = pd.DataFrame([input_data]).reindex(columns=self.FEATURE_COLUMNS)

            # Same preprocessing as in training: impute, then standardise.
            row = self.scaler.transform(self.imputer.transform(input_df))

            # Only one observation is available, so repeat it to fill the
            # look-back window the network expects.
            window = np.tile(row, (self.timesteps, 1))

            # (samples, timesteps, features)
            input_sequence = np.reshape(window, (1, self.timesteps, row.shape[1]))

            prediction = self.model.predict(input_sequence)
            return prediction.flatten()[0]
        except Exception as e:
            print(f"Error during prediction: {e}")
            raise

    def create_sequences(self, data, target, timesteps):
        """Create sliding-window sequences for LSTM input.

        Args:
            data: Indexable of feature rows, aligned with ``target``.
            target: Indexable of target values.
            timesteps: Length of each window.

        Returns:
            tuple: ``(X, y)`` as numpy arrays, where ``X[i]`` is
            ``data[i:i + timesteps]`` and ``y[i]`` is ``target[i + timesteps]``.
            Both are empty when ``len(data) <= timesteps``.
        """
        X, y = [], []
        for i in range(len(data) - timesteps):
            X.append(data[i:i + timesteps])
            y.append(target[i + timesteps])
        return np.array(X), np.array(y)


In [None]:
from sklearn.preprocessing import StandardScaler
import numpy as np

# Create an instance of the model suite
suite = create_model_suite()

# Load the data
suite.load_data('finance_historical.csv')

# Define model parameters
model_params = {
    'model_tag': 'v1',
    'layers': [
        # The first layer's input shape is (timesteps, features). The timestep
        # count must equal suite.timesteps, which is the window length used to
        # build the training sequences; 4 is the number of feature columns
        # (open, high, low, volume).
        {'type': 'lstm', 'units': 50, 'activation_func': 'tanh', 'input_shape': (suite.timesteps, 4)},
        {'type': 'dense', 'units': 25, 'activation_func': 'relu'}
    ],
    'optimizer': 'adam',
    'loss': 'mean_squared_error',
    # The target is a continuous price, so track an error metric; 'accuracy'
    # is only meaningful for classification.
    'metrics': ['mae'],
    'epochs': 20,
    'batch_size': 32
}

# Train the model
suite.train_model(model_params)

# No load_model call is needed here: train_model leaves the fitted model on
# `suite`, and load_model raises when a model is already attached.

# Sample input data for prediction
sample_input = {
    'open': 7.5,
    'high': 7.6,
    'low': 7.4,
    'volume': 600000000
}

# Make a prediction
try:
    prediction = suite.predict_outcome(sample_input)
    print(f"Predicted Adjusted Close Price: {prediction}")
except Exception as e:
    print(f"Error during prediction: {e}")


In [None]:
# Fresh suite, then restore the network written by train_model under the tag 'v1'.
suite = create_model_suite()
suite.load_model(model_name='model_lstm_v1')