In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, GRU, Dense, Dropout, Conv1D, MaxPooling1D
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
import concurrent.futures
from tqdm import tqdm
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, LSTM, Conv1D, MaxPooling1D, GRU
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import mean_squared_error
from tensorflow.keras.models import model_from_json

# Preprocessing

In [None]:
def preprocess_stock_data(file_path, stock_name):
    """Load one stock's rows from a CSV file and add rolling-window features.

    Args:
        file_path: Path of a CSV file that has at least the columns
            ``stockname``, ``timestamp`` and ``high``.
        stock_name: Ticker to keep; matched case-insensitively after
            stripping surrounding whitespace.

    Returns:
        A DataFrame with the selected stock's rows sorted by ``timestamp``,
        missing numeric values replaced by the column mean, and two extra
        columns, ``moving_average`` and ``std_dev`` (5-row rolling mean and
        standard deviation of ``high``). Remaining gaps, such as the first
        rows of the rolling columns, are back-filled.

    Raises:
        ValueError: If no row matches ``stock_name``.
    """
    df = pd.read_csv(file_path)
    df['stockname'] = df['stockname'].str.strip().str.upper()
    stock_name = stock_name.strip().upper()

    # Work on an independent copy: assigning columns on a boolean-mask slice
    # of `df` triggers SettingWithCopyWarning and may not take effect.
    filtered_df = df[df['stockname'] == stock_name].copy()
    if filtered_df.empty:
        raise ValueError(f"No data found for the stock '{stock_name}'. Please check the name.")

    filtered_df['timestamp'] = pd.to_datetime(filtered_df['timestamp'])
    filtered_df = filtered_df.sort_values(by='timestamp')

    # Mean-impute the numeric columns before deriving rolling features.
    imputer = SimpleImputer(strategy='mean')
    numeric_cols = filtered_df.select_dtypes(include=['float64', 'int64']).columns
    filtered_df[numeric_cols] = imputer.fit_transform(filtered_df[numeric_cols])

    filtered_df['moving_average'] = filtered_df['high'].rolling(window=5).mean()
    filtered_df['std_dev'] = filtered_df['high'].rolling(window=5).std()

    # The first rows of the rolling columns are NaN; back-fill them.
    # DataFrame.bfill() replaces the deprecated fillna(method='bfill').
    filtered_df = filtered_df.bfill()
    return filtered_df

def prepare_lstm_data(df, lookback=60):
    """Scale the feature columns and cut them into sliding windows.

    A new MinMaxScaler is fitted on the ``high``, ``low``, ``moving_average``
    and ``std_dev`` columns of ``df``. For every row index ``end`` from
    ``lookback`` onwards, the ``lookback`` scaled rows before it form one
    input window and the scaled ``high``/``low`` of row ``end`` form the
    target.

    Returns:
        ``(X, y, scaler)``: the stacked windows, the stacked targets and the
        fitted scaler.
    """
    feature_columns = ['high', 'low', 'moving_average', 'std_dev']
    scaler = MinMaxScaler()
    scaled = scaler.fit_transform(df[feature_columns])

    target_rows = range(lookback, len(scaled))
    windows = [scaled[end - lookback:end] for end in target_rows]
    # Columns 0 and 1 of the scaled matrix are 'high' and 'low'.
    targets = [scaled[end, [0, 1]] for end in target_rows]
    return np.array(windows), np.array(targets), scaler

# Models

In [None]:
def create_dnn_lstm_model(input_shape):
    """Build and compile the Dense -> LSTM regressor.

    The stack is Dense(128, relu) on the input, Dropout(0.3), LSTM(128)
    without returned sequences, Dropout(0.3), Dense(64, relu), Dropout(0.2)
    and a final Dense(2) output. The model is compiled with Adam
    (learning rate 0.0005) and mean-squared-error loss.
    """
    model = Sequential()
    model.add(Dense(128, activation='relu', input_shape=input_shape))
    model.add(Dropout(0.3))
    model.add(LSTM(128, return_sequences=False))
    model.add(Dropout(0.3))
    model.add(Dense(64, activation='relu'))
    model.add(Dropout(0.2))
    model.add(Dense(2))  # two outputs: scaled high and scaled low

    optimizer = Adam(learning_rate=0.0005)
    model.compile(optimizer=optimizer, loss='mean_squared_error')
    return model

def create_cnn_lstm_model(input_shape):
    """Build and compile the Conv1D -> LSTM regressor.

    The stack is Conv1D(64, kernel 3, relu) on the input, MaxPooling1D(2),
    LSTM(128) without returned sequences, Dropout(0.3), Dense(64, relu),
    Dropout(0.2) and a final Dense(2) output. The model is compiled with
    Adam (learning rate 0.0005) and mean-squared-error loss.
    """
    model = Sequential()
    model.add(Conv1D(64, kernel_size=3, activation='relu', input_shape=input_shape))
    model.add(MaxPooling1D(pool_size=2))
    model.add(LSTM(128, return_sequences=False))
    model.add(Dropout(0.3))
    model.add(Dense(64, activation='relu'))
    model.add(Dropout(0.2))
    model.add(Dense(2))  # two outputs: scaled high and scaled low

    optimizer = Adam(learning_rate=0.0005)
    model.compile(optimizer=optimizer, loss='mean_squared_error')
    return model

def create_gru_lstm_model(input_shape):
    """Build and compile the GRU -> LSTM regressor.

    The stack is GRU(128) returning full sequences on the input,
    Dropout(0.3), LSTM(128) without returned sequences, Dropout(0.3),
    Dense(64, relu), Dropout(0.2) and a final Dense(2) output. The model is
    compiled with Adam (learning rate 0.0005) and mean-squared-error loss.
    """
    model = Sequential()
    # return_sequences=True so the following LSTM receives every time step.
    model.add(GRU(128, return_sequences=True, input_shape=input_shape))
    model.add(Dropout(0.3))
    model.add(LSTM(128, return_sequences=False))
    model.add(Dropout(0.3))
    model.add(Dense(64, activation='relu'))
    model.add(Dropout(0.2))
    model.add(Dense(2))  # two outputs: scaled high and scaled low

    optimizer = Adam(learning_rate=0.0005)
    model.compile(optimizer=optimizer, loss='mean_squared_error')
    return model

# Helper functions

In [None]:
def find_exact_date(df, target_date):
    """Return ``target_date`` at midnight if ``df`` has a row on that day.

    ``target_date`` is parsed with ``pd.to_datetime`` and normalized; the
    ``timestamp`` column of ``df`` is normalized the same way before the
    lookup.

    Raises:
        ValueError: If no row of ``df`` falls on the requested day.
    """
    target_date = pd.to_datetime(target_date).normalize()
    available_days = df['timestamp'].dt.normalize().values
    # Element-wise comparison followed by any() is the membership test.
    if not np.any(available_days == target_date):
        raise ValueError(f"No data available for the exact date {target_date.date()}.")
    return target_date

# Train

In [None]:
# Load the preprocessed rows for one ticker and fix the day to predict.
data = preprocess_stock_data('/kaggle/input/stock-past-one-year-data/stocks.csv', 'CSX')
prediction_date = '2024-11-15'
prediction_date = pd.to_datetime(prediction_date).normalize()

# find_exact_date raises ValueError when the data has no row on that day.
# The error is allowed to propagate: calling exit() inside a notebook cell
# shuts the kernel down, whereas an exception simply stops the run and shows
# the message.
exact_date = find_exact_date(data, prediction_date)
print(f"Using the exact available date: {exact_date.date()} for prediction.")

In [None]:
# Rows strictly before the prediction day (exact_date is a midnight timestamp)
# are used for training; rows whose calendar day equals it are held out.
train_data = data[data['timestamp'] < exact_date]
test_data = data[data['timestamp'].dt.normalize() == exact_date]
    
if test_data.empty:
    raise ValueError(f"No data available for the prediction date: {exact_date.date()}")
    
# Build 60-step windows from the training rows; `scaler` is fitted on them.
X_train, y_train, scaler = prepare_lstm_data(train_data, lookback=60)
# Reshapes X_train to its own three dimensions, so the shape is unchanged;
# indexing shape[1] and shape[2] fails if X_train is not 3-D.
X_train = X_train.reshape((X_train.shape[0], X_train.shape[1], X_train.shape[2]))

In [None]:
# One model per architecture, all built for the (time steps, features) shape
# of the training windows. Insertion order is dnn, cnn, gru.
models = {
    model_name: build_model((X_train.shape[1], X_train.shape[2]))
    for model_name, build_model in (
        ('dnn_lstm', create_dnn_lstm_model),
        ('cnn_lstm', create_cnn_lstm_model),
        ('gru_lstm', create_gru_lstm_model),
    )
}

# Stop once val_loss has not improved for 10 epochs and keep the best weights.
early_stopping = EarlyStopping(
    restore_best_weights=True,
    patience=10,
    monitor='val_loss',
)

In [None]:
def train_model(name, model, X=None, y=None):
    """Fit one model and return its name with the Keras training history.

    Args:
        name: Label printed before training and returned with the history.
        model: Compiled Keras model to fit.
        X: Training windows. Defaults to the notebook-level ``X_train``.
        y: Training targets. Defaults to the notebook-level ``y_train``.

    Returns:
        ``(name, history)`` where ``history`` is the object returned by
        ``model.fit``.
    """
    # Callers that pass only (name, model) keep the previous behaviour of
    # training on the notebook-level arrays.
    features = X_train if X is None else X
    targets = y_train if y is None else y

    # EarlyStopping keeps per-run state (bound model, best loss, best
    # weights). This function runs concurrently for several models, so each
    # fit gets its own instance instead of sharing one callback object.
    stopper = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

    print(f"Training model: {name}")
    history = model.fit(
        features, targets,
        epochs=100,
        batch_size=32,
        validation_split=0.2,
        callbacks=[stopper],
        verbose=1
    )
    return name, history

# Training history of each model, keyed by model name.
histories = {}

# Use ThreadPoolExecutor to train models in parallel
# Each (name, model) pair in `models` is submitted as its own train_model task.
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {
        executor.submit(train_model, name, model): name 
        for name, model in models.items()
    }
    # Futures are consumed in completion order; result() re-raises any
    # exception raised inside the worker.
    for future in concurrent.futures.as_completed(futures):
        name, history = future.result()
        histories[name] = history

# test

In [None]:
# X_test, _, _ = prepare_lstm_data(pd.concat([train_data, test_data]), lookback=60)
# X_test = X_test[-1].reshape((1, X_test.shape[1], X_test.shape[2]))

# predictions = []
# for name, model in models.items():
#     print(f"Predicting with model: {name}")
#     predicted_prices = model.predict(X_test)
#     predicted_prices = scaler.inverse_transform(predicted_prices)  # Rescale predictions
#     predicted_high_price, predicted_low_price = predicted_prices[0]
#     predictions.append([predicted_high_price,predicted_low_price])

# print('predictions',predictions)

# final_prediction = np.mean(predictions, axis=0)[0]
# predicted_high, predicted_low = final_prediction  

# actual_high = test_data['high'].values[0]
# actual_low = test_data['low'].values[0]

# print(f"Prediction Date: {exact_date.date()}")
# print(f"Predicted High Price: {predicted_high:.2f}")
# print(f"Predicted Low Price: {predicted_low:.2f}")
# print(f"Actual High Price: {actual_high:.2f}")
# print(f"Actual Low Price: {actual_low:.2f}")

In [None]:
# plt.figure(figsize=(10, 6))
# plt.plot(train_data['timestamp'], train_data['high'], label="Training Data (High)")
# plt.plot(train_data['timestamp'], train_data['low'], label="Training Data (Low)")
# plt.axvline(x=exact_date, color='r', linestyle='--', label="Prediction Date")
# plt.scatter(exact_date, predicted_high, color='g', label="Predicted High")
# plt.scatter(exact_date, predicted_low, color='b', label="Predicted Low")
# plt.scatter(exact_date, actual_high, color='orange', label="Actual High")
# plt.scatter(exact_date, actual_low, color='purple', label="Actual Low")
# plt.legend()
# plt.title(f"Stock Price Prediction for {'TSLA'}")
# plt.xlabel("Date")
# plt.ylabel("Price")
# plt.xticks(rotation=45)
# plt.tight_layout()
# plt.show()

# Boost base models

In [None]:
# data = preprocess_stock_data('/kaggle/input/stock-past-one-year-data/stocks.csv', 'CSX')
# prediction_date = '2024-11-15'
# prediction_date = pd.to_datetime(prediction_date).normalize()

# try:
#     exact_date = find_exact_date(data, prediction_date)
#     print(f"Using the exact available date: {exact_date.date()} for prediction.")
# except ValueError as e:
#     print(e)
#     exit()

# # Scaling and data preparation
# train_data = data[data['timestamp'] < exact_date]
# test_data = data[data['timestamp'] == exact_date]
# X_train, y_train, _ = prepare_lstm_data(train_data, lookback=60)

# # Fit the scaler
# scaler = MinMaxScaler()
# y_train_scaled = scaler.fit_transform(y_train)

# base_models = {
#     'dnn_lstm': create_dnn_lstm_model((X_train.shape[1], X_train.shape[2])),
#     'cnn_lstm': create_cnn_lstm_model((X_train.shape[1], X_train.shape[2])),
#     'gru_lstm': create_gru_lstm_model((X_train.shape[1], X_train.shape[2])),
# }

# early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

# def boost_model(base_model, X_train, y_train, num_boosts=3):
#     models = []
#     residuals = y_train.copy()
#     model_json = base_model.to_json()
#     for i in range(num_boosts):
#         print(f"Training Boosted Model {i + 1}")
#         model = model_from_json(model_json)
#         model.compile(optimizer=Adam(learning_rate=0.0005), loss='mean_squared_error')
#         model.fit(X_train, residuals, epochs=40, batch_size=32, validation_split=0.2, callbacks=[early_stopping], verbose=1)
#         residuals -= model.predict(X_train)
#         models.append(model)
#     return models

# boosted_models = {}

# for name, model in base_models.items():
#     print("******************************************************")
#     print(f"Boosting {name}...")
#     boosted_models[name] = boost_model(model, X_train, y_train_scaled)

# # Prepare test data
# X_test, _, _ = prepare_lstm_data(pd.concat([train_data, test_data]), lookback=60)
# X_test = X_test[-1].reshape((1, X_test.shape[1], X_test.shape[2]))

# preds = []
# for name, boosted_model_list in boosted_models.items():
#     print(f"Predicting with boosted {name}")
#     combined_predictions = np.mean([model.predict(X_test) for model in boosted_model_list], axis=0)
#     combined_predictions = scaler.inverse_transform(combined_predictions)
#     predicted_high_price, predicted_low_price = combined_predictions[0]
#     preds.append([predicted_high_price, predicted_low_price])

# print('Predictions:', preds)

# Cyclic train

In [None]:
def calculate_weighted_average(predictions, weights):
    """
    Combine per-model predictions into one value using the given weights.

    predictions: one prediction per model (e.g. [model1, model2, model3])
    weights: the weight applied to the prediction at the same position

    Returns the sum of prediction * weight over the paired entries. The
    result is not divided by the total weight, so it is an average only when
    the weights sum to 1. Extra entries in the longer sequence are ignored.
    """
    total = 0
    for prediction, weight in zip(predictions, weights):
        total = total + prediction * weight
    return total

def calculate_error(predicted_high, predicted_low, actual_high, actual_low):
    """
    Return the summed squared error of a high/low prediction pair.

    The result is (predicted_high - actual_high)^2 plus
    (predicted_low - actual_low)^2; no mean or square root is taken.
    """
    high_gap = predicted_high - actual_high
    low_gap = predicted_low - actual_low
    return high_gap ** 2 + low_gap ** 2

def _ensemble_weight_grid(step_count=10):
    """Return every [w1, w2, w3] of positive multiples of 1/step_count summing to 1.

    With the default of 10 steps this is the 36-entry grid from
    [0.1, 0.1, 0.8] to [0.8, 0.1, 0.1], ordered by the first weight and then
    by the second.
    """
    return [
        [first / step_count, second / step_count, (step_count - first - second) / step_count]
        for first in range(1, step_count - 1)
        for second in range(1, step_count - first)
    ]


def trainner_and_tester(prediction_date):
    """Train the ensemble on data before ``prediction_date`` and predict that day.

    The TSLA rows are loaded and split into rows strictly before the
    prediction day (training) and rows on that day (test). Every model in the
    notebook-level ``models`` dict is fitted in parallel on the training
    windows and then predicts the test day's high and low from the last 60
    training rows.

    Notes:
        * The models in ``models`` are reused between calls, so each call
          continues training from the weights left by the previous one.
        * The weight grid has three weights per entry, matching the three
          models in ``models``.
        * The "best" weights are the grid entry with the lowest squared error
          against the actual test-day prices, so they are chosen in
          hindsight.

    Args:
        prediction_date: Anything ``pd.to_datetime`` accepts; only the
            calendar day is used.

    Returns:
        A tuple ``(predictions, date, best_weights, best_predicted_high,
        mean_high_prediction, best_predicted_low, mean_low_prediction,
        actual_high, actual_low)``. ``predictions[0]`` holds the per-model
        highs and ``predictions[1]`` the per-model lows, in the iteration
        order of ``models``.

    Raises:
        ValueError: If the data has no row on the requested day, or there are
            too few earlier rows to build a single training window.
    """
    lookback = 60
    feature_columns = ['high', 'low', 'moving_average', 'std_dev']

    data = preprocess_stock_data('/kaggle/input/stock-past-one-year-data/stocks.csv', 'TSLA')
    prediction_date = pd.to_datetime(prediction_date).normalize()

    # find_exact_date raises ValueError for a missing day. It is allowed to
    # propagate: exit() here would shut down the whole notebook kernel.
    exact_date = find_exact_date(data, prediction_date)
    print(f"Using the exact available date: {exact_date.date()} for prediction.")

    train_data = data[data['timestamp'] < exact_date]
    test_data = data[data['timestamp'].dt.normalize() == exact_date]

    if test_data.empty:
        raise ValueError(f"No data available for the prediction date: {exact_date.date()}")

    X_train, y_train, scaler = prepare_lstm_data(train_data, lookback=lookback)
    if X_train.ndim != 3:
        # prepare_lstm_data returns an empty 1-D array when there are no
        # more than `lookback` training rows.
        raise ValueError(
            f"Not enough history before {exact_date.date()} to build a {lookback}-step window."
        )

    def fit_on_window(name, model):
        """Fit one model on this call's training windows."""
        print(f"Training model: {name}")
        # One EarlyStopping per fit: the callback is stateful and the fits
        # run concurrently.
        stopper = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
        model.fit(
            X_train, y_train,
            epochs=100,
            batch_size=32,
            validation_split=0.2,
            callbacks=[stopper],
            verbose=1
        )

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(fit_on_window, name, model)
            for name, model in models.items()
        ]
        for future in concurrent.futures.as_completed(futures):
            future.result()  # re-raise any training error

    # The input for the test day is the last `lookback` training rows, scaled
    # with the scaler fitted on the training data. That same scaler is used
    # to inverse-transform the predictions below.
    recent_rows = train_data[feature_columns].tail(lookback)
    X_test = scaler.transform(recent_rows).reshape((1, lookback, len(feature_columns)))

    predictions = [[], []]
    for name, model in models.items():
        print(f"Predicting with model: {name}")
        predicted_prices = model.predict(X_test)
        # The scaler expects all feature columns; pad the two predicted
        # columns (high, low) with zeros for the remaining ones.
        dummy_columns = np.zeros((predicted_prices.shape[0], len(feature_columns) - 2))
        predicted_prices_with_dummies = np.hstack([predicted_prices, dummy_columns])
        predicted_prices_rescaled = scaler.inverse_transform(predicted_prices_with_dummies)
        predictions[0].append(predicted_prices_rescaled[0, 0])
        predictions[1].append(predicted_prices_rescaled[0, 1])

    actual_high = test_data['high'].values[0]
    actual_low = test_data['low'].values[0]

    best_weights = None
    best_error = float('inf')
    best_predicted_high = 0
    best_predicted_low = 0

    for weights in _ensemble_weight_grid():
        weighted_high = calculate_weighted_average(predictions[0], weights)
        weighted_low = calculate_weighted_average(predictions[1], weights)

        error = calculate_error(weighted_high, weighted_low, actual_high, actual_low)

        if error < best_error:
            best_error = error
            best_weights = weights
            best_predicted_high = weighted_high
            best_predicted_low = weighted_low

    mean_high_prediction = np.mean(predictions[0])
    mean_low_prediction = np.mean(predictions[1])

    return (
        predictions, exact_date.date(), best_weights, best_predicted_high, mean_high_prediction,
        best_predicted_low, mean_low_prediction, actual_high, actual_low
    )

# Run the ensemble for a single day and report its predictions.
# Note: this rebinds the notebook-level `predictions` and `exact_date`.
(
    predictions, exact_date, best_weights,
    best_predicted_high, mean_high_prediction,
    best_predicted_low, mean_low_prediction,
    actual_high, actual_low,
) = trainner_and_tester('2024-12-02')

print(f"\nPrediction Date: {exact_date}")
# The label used to start with "\b", which Python reads as a backspace escape.
print(f"Best weights: {best_weights}")
print(f"Best Predicted High: {best_predicted_high:.2f}")
print(f"Best Predicted Low: {best_predicted_low:.2f}")
print(f"mean Predicted High: {mean_high_prediction:.2f}")
print(f"mean Predicted Low: {mean_low_prediction:.2f}")
print(f"Actual High Price: {actual_high:.2f}")
print(f"Actual Low Price: {actual_low:.2f}")

# mass test

In [None]:
# Take the dates of the last 300 rows of `data` as the evaluation days
datatail = data.tail(300)
date_list = datatail['timestamp'].dt.strftime('%Y-%m-%d').tolist()
print(date_list)

# One result record per evaluated date
results = []

# Train and predict for each date in turn
for date in date_list:
    (
        predictions, exact_date, best_weights,
        best_predicted_high, mean_high_prediction,
        best_predicted_low, mean_low_prediction,
        actual_high, actual_low,
    ) = trainner_and_tester(date)

    # predictions[0] holds the per-model highs and predictions[1] the
    # per-model lows, in the order of `models`: dnn, cnn, gru.
    result = {
        'Prediction Date': exact_date,
        'Best Predicted High': best_predicted_high,
        'Best Predicted Low': best_predicted_low,
        'Mean Predicted High': mean_high_prediction,
        'Mean Predicted Low': mean_low_prediction,
        'Actual High': actual_high,
        'Actual Low': actual_low,
        'dnn_lstm_low': predictions[1][0],
        'cnn_lstm_low': predictions[1][1],
        'gru_lstm_low': predictions[1][2],
        'dnn_lstm_high': predictions[0][0],
        'cnn_lstm_high': predictions[0][1],
        # Key spelling kept as-is: later cells read 'gnn_lstm_high'.
        'gnn_lstm_high': predictions[0][2],
    }
    results.append(result)

# Convert results to a DataFrame
resultsdf = pd.DataFrame(results)

# Display the results
resultsdf

In [None]:
# Show the column names of the results table.
resultsdf.columns

In [None]:
# Actual prices recorded for every evaluated prediction date
actual_highs = resultsdf['Actual High']
actual_lows = resultsdf['Actual Low']

# Predicted values for each model
dnn_lstm_low = resultsdf['dnn_lstm_low']
cnn_lstm_low = resultsdf['cnn_lstm_low']
gru_lstm_low = resultsdf['gru_lstm_low']
dnn_lstm_high = resultsdf['dnn_lstm_high']
cnn_lstm_high = resultsdf['cnn_lstm_high']
# The results column is spelled 'gnn_lstm_high'; it is bound to gru_lstm_high here.
gru_lstm_high = resultsdf['gnn_lstm_high']

# Ensemble outputs: the plain mean of the model predictions and the
# best-weighted combination.
mean_predicted_highs = resultsdf['Mean Predicted High']
mean_predicted_lows = resultsdf['Mean Predicted Low']
weighted_predicted_highs = resultsdf['Best Predicted High']
weighted_predicted_lows = resultsdf['Best Predicted Low']

# Calculate RMSE and MAE for each model prediction (both high and low prices)
def calculate_metrics(actuals, predicted):
    """Return ``(rmse, mae)`` of ``predicted`` measured against ``actuals``."""
    squared_error_mean = mean_squared_error(actuals, predicted)
    absolute_error_mean = mean_absolute_error(actuals, predicted)
    return np.sqrt(squared_error_mean), absolute_error_mean

# Errors on the high prices, per model and per ensemble variant
rmse_dnn_lstm_high, mae_dnn_lstm_high = calculate_metrics(actual_highs, dnn_lstm_high)
rmse_cnn_lstm_high, mae_cnn_lstm_high = calculate_metrics(actual_highs, cnn_lstm_high)
rmse_gru_lstm_high, mae_gru_lstm_high = calculate_metrics(actual_highs, gru_lstm_high)
rmse_mean_high, mae_mean_high = calculate_metrics(actual_highs, mean_predicted_highs)
rmse_weighted_high, mae_weighted_high = calculate_metrics(actual_highs, weighted_predicted_highs)

# Errors on the low prices, per model and per ensemble variant
rmse_dnn_lstm_low, mae_dnn_lstm_low = calculate_metrics(actual_lows, dnn_lstm_low)
rmse_cnn_lstm_low, mae_cnn_lstm_low = calculate_metrics(actual_lows, cnn_lstm_low)
rmse_gru_lstm_low, mae_gru_lstm_low = calculate_metrics(actual_lows, gru_lstm_low)
rmse_mean_low, mae_mean_low = calculate_metrics(actual_lows, mean_predicted_lows)
rmse_weighted_low, mae_weighted_low = calculate_metrics(actual_lows, weighted_predicted_lows)

# Report every (rmse, mae) pair in a fixed order: highs first, then lows
_metric_report = [
    ("DNN LSTM Model (High Prices)", rmse_dnn_lstm_high, mae_dnn_lstm_high),
    ("CNN LSTM Model (High Prices)", rmse_cnn_lstm_high, mae_cnn_lstm_high),
    ("GRU LSTM Model (High Prices)", rmse_gru_lstm_high, mae_gru_lstm_high),
    ("Mean Model (High Prices)", rmse_mean_high, mae_mean_high),
    ("Weighted Model (High Prices)", rmse_weighted_high, mae_weighted_high),
    ("DNN LSTM Model (Low Prices)", rmse_dnn_lstm_low, mae_dnn_lstm_low),
    ("CNN LSTM Model (Low Prices)", rmse_cnn_lstm_low, mae_cnn_lstm_low),
    ("GRU LSTM Model (Low Prices)", rmse_gru_lstm_low, mae_gru_lstm_low),
    ("Mean Model (Low Prices)", rmse_mean_low, mae_mean_low),
    ("Weighted Model (Low Prices)", rmse_weighted_low, mae_weighted_low),
]

for _title, _rmse, _mae in _metric_report:
    print(f"\nRegression Metrics for {_title}:")
    print(f"RMSE: {_rmse:.2f}")
    print(f"MAE: {_mae:.2f}")

In [None]:
# Ensure the 'Prediction Date' column is in datetime format
resultsdf['Prediction Date'] = pd.to_datetime(resultsdf['Prediction Date'])

# All series share one figure. Each model has one colour, with 'o' marking
# the high series and '*' the low series.
plt.figure(figsize=(12, 8))

# (results column, legend label, colour, marker) in drawing order
_plot_series = [
    ('dnn_lstm_high', "DNN LSTM Predicted High", 'r', 'o'),
    ('dnn_lstm_low', "DNN LSTM Predicted Low", 'r', '*'),
    ('cnn_lstm_high', "CNN LSTM Predicted High", 'b', 'o'),
    ('cnn_lstm_low', "CNN LSTM Predicted Low", 'b', '*'),
    ('gnn_lstm_high', "GRU LSTM Predicted High", 'g', 'o'),
    ('gru_lstm_low', "GRU LSTM Predicted Low", 'g', '*'),
    ('Mean Predicted High', "Mean Predicted High", 'c', 'o'),
    ('Mean Predicted Low', "Mean Predicted Low", 'c', '*'),
    ('Best Predicted High', "Weighted Predicted High", 'm', 'o'),
    ('Best Predicted Low', "Weighted Predicted Low", 'm', '*'),
    ('Actual High', "Actual High", 'orange', 'o'),
    ('Actual Low', "Actual Low", 'orange', '*'),
]

for _column, _label, _color, _marker in _plot_series:
    plt.plot(
        resultsdf['Prediction Date'],
        resultsdf[_column],
        label=_label,
        color=_color,
        marker=_marker,
    )

# Title, axis labels and legend
plt.title("Stock Price Prediction (All Models) for TSLA")
plt.xlabel("Date")
plt.ylabel("Price")
plt.xticks(rotation=45)
plt.legend()

plt.tight_layout()

plt.show()

In [None]:
# Save the per-date results table as a CSV file in the working directory.
resultsdf.to_csv("prepareforsttack.csv")