In [None]:
class Model:
    """Stacked-LSTM regressor assembled with the Keras functional API.

    The network is: input of shape ``(None, size)`` per sample -> a
    ``tf.keras.layers.RNN`` wrapping ``num_layers`` LSTM cells -> a ``Dense``
    layer applied to the last time step of the RNN output. The compiled
    Keras model is stored in ``self.model``.
    """

    def __init__(self, learning_rate, num_layers, size, size_layer, output_size, dropout_rate=0.1):
        """Build and compile the network.

        Args:
            learning_rate: Learning rate handed to the Adam optimizer.
            num_layers: Number of stacked LSTM cells.
            size: Number of input features per time step.
            size_layer: Number of units in every LSTM cell.
            output_size: Number of units in the final Dense layer.
            dropout_rate: Input dropout fraction applied inside every LSTM
                cell (Keras only activates it while training).
        """
        # dropout_rate used to be accepted and then ignored; pass it to the
        # cells so the hyper-parameter actually has an effect.
        self.lstm_cells = [
            tf.keras.layers.LSTMCell(size_layer, dropout=dropout_rate)
            for _ in range(num_layers)
        ]

        # The time dimension is left as None so sequences of any length fit.
        self.inputs = tf.keras.Input(shape=(None, size))

        self.rnn = tf.keras.layers.RNN(self.lstm_cells, return_sequences=True, return_state=True)

        # With return_state=True the layer returns the sequence output
        # followed by the final states of the stacked cells.
        self.outputs, *self.last_state = self.rnn(self.inputs)

        # Only the last time step feeds the output layer.
        self.logits = tf.keras.layers.Dense(output_size)(self.outputs[:, -1, :])

        self.model = tf.keras.Model(inputs=self.inputs, outputs=self.logits)

        self.model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate), loss='mean_absolute_error')

    def predict(self, X):
        """Call the underlying Keras model on ``X`` and return its output."""
        return self.model(X)

    def evaluate(self, X, y):
        """Return the result of ``self.model.evaluate(X, y)``."""
        return self.model.evaluate(X, y)

    def fit(self, X, y, epochs, batch_size, validation_data, verbose=0):
        """Train the underlying Keras model and return what ``fit`` returns.

        All arguments are forwarded unchanged to ``self.model.fit``.
        """
        return self.model.fit(X, y, epochs=epochs, batch_size=batch_size, validation_data=validation_data, verbose=verbose)

In [None]:
def predict_next_day(modelnn, historical_data):
    """Run ``modelnn.predict`` on ``historical_data`` and return a flat array.

    A new axis of length 1 is inserted at position 1 before the data is
    handed to the model; the prediction is converted with ``.numpy()`` and
    flattened to one dimension.

    NOTE(review): with 2-D input of shape (rows, features) this yields
    (rows, 1, features), i.e. one prediction per row — confirm this is the
    intended layout rather than a single (1, rows, features) window.
    """
    model_input = np.expand_dims(historical_data, axis=1)
    return modelnn.predict(model_input).numpy().flatten()

In [None]:
def forecast(X_train, y_train, X_test, y_test, learning_rate, num_layers, size, size_layer, dropout_rate, epoch, batch_size):
    """Train a single-output ``Model`` and predict on the test inputs.

    The model is fitted on the training data with the test data used as
    validation data, then evaluated and asked for predictions on ``X_test``.

    Returns:
        A tuple ``(modelnn, y_test, y_pred)`` where ``y_pred`` is whatever
        ``modelnn.predict(X_test)`` returned.
    """
    output_size = 1
    modelnn = Model(learning_rate, num_layers, size, size_layer, output_size, dropout_rate)

    holdout = (X_test, y_test)
    modelnn.fit(
        X_train,
        y_train,
        epochs=epoch,
        batch_size=batch_size,
        validation_data=holdout,
        verbose=0,
    )

    # The metrics below are computed but not part of the return value.
    test_loss = modelnn.evaluate(X_test, y_test)

    y_pred = modelnn.predict(X_test)

    test_mse = mean_squared_error(y_test, y_pred)
    test_r2 = r2_score(y_test, y_pred.numpy())

    return modelnn, y_test, y_pred

In [None]:
def stock_price_preprocessing(merged_data, mem_days, pre_days, columns, k_best_features=0):
    """Turn a price DataFrame into sliding-window arrays for a sequence model.

    Processing steps:
      1. Keep only ``columns`` and add a ``close_change`` target column holding
         the ``close`` value ``pre_days`` rows ahead.
      2. Drop columns whose share of nulls exceeds 0.5, then drop every row
         that still contains a null.
      3. Optionally keep the ``k_best_features`` best features according to
         ``mutual_info_regression``.
      4. Min-max scale the features (the scaler is fitted on all rows).
      5. Build windows of ``mem_days`` consecutive rows, each paired with one
         target value.

    Args:
        merged_data: DataFrame that contains at least the ``columns`` given,
            including a ``close`` column. It is not modified.
        mem_days: Number of consecutive rows in every window.
        pre_days: How many rows ahead the target ``close`` value is taken from.
        columns: Column labels to keep from ``merged_data``.
        k_best_features: Number of features to select; a falsy value (the
            default ``0``) disables feature selection.

    Returns:
        ``(X, y)`` as NumPy arrays. For non-empty results ``X`` has shape
        ``(n_windows, mem_days, n_features)`` and ``y`` has ``n_windows``
        entries.
    """
    null_ratio_threshold = 0.5

    # Take an explicit copy: adding a column to the bare ``merged_data[columns]``
    # slice triggers pandas' SettingWithCopyWarning.
    frame = merged_data[columns].copy()
    frame['close_change'] = frame['close'].shift(-pre_days)

    null_ratios = frame.isnull().sum() / len(frame)
    sparse_columns = null_ratios[null_ratios > null_ratio_threshold].index

    # Chained instead of in-place so no intermediate frame is mutated.
    cleaned = frame.drop(columns=sparse_columns).dropna()

    feature_columns = [col for col in cleaned.columns if col != 'close_change']
    X = cleaned[feature_columns].values
    y = cleaned['close_change'].values

    if k_best_features:
        selector = SelectKBest(mutual_info_regression, k=k_best_features)
        X_selected = selector.fit_transform(X, y)
    else:
        X_selected = X

    scaled_features = MinMaxScaler().fit_transform(X_selected)

    # Window i covers rows [i - mem_days, i) and is paired with y[i].
    # NOTE(review): y is already shifted by pre_days, so the target lies
    # pre_days rows after row i, which itself is not in the window — confirm
    # this alignment is intended.
    window_ends = range(mem_days, scaled_features.shape[0])
    X_processed = [scaled_features[end - mem_days:end] for end in window_ends]
    y_processed = [y[end] for end in window_ends]

    return np.array(X_processed), np.array(y_processed)