In [1]:
import numpy as np 
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Sequential  # type: ignore
from tensorflow.keras.layers import Dense,LSTM,Dropout # type: ignore
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error,mean_absolute_error
import matplotlib.pyplot as plt
from datetime import timedelta


In [2]:
sample_size = 11 #Years

In [3]:
filepath = "H:/Projects/CryptoBot/btcusd_1-min_data.csv"
data = pd.read_csv(filepath)
data['Timestamp'] =pd.to_datetime(data['Timestamp'],unit = 's')
data = data.sort_values(by = "Timestamp")
data['Timestamp']= data['Timestamp'].fillna(data["Timestamp"].median())

data = data[data['Timestamp'] >= data['Timestamp'].max() - timedelta(days=365*sample_size)]

In [4]:
lag_features = ["Open","High","Low","Close","Volume"]
data["target"] = data["Close"].shift(-1)



dat =data.copy()

for lag in range(1, 5):
    for feature in lag_features:
        data.loc[:, f"{feature}_lag_{lag}"] = data[feature].shift(lag)


data['rolling_mean_5'] = data['Close'].rolling(window=5).mean()
data['rolling_std_5'] = data['Close'].rolling(window=5).std()
data['rolling_mean_10'] = data['Close'].rolling(window=10).mean()
data['rolling_std_10'] = data['Close'].rolling(window=10).std()
data['hour'] = data['Timestamp'].dt.hour
data['day_of_week'] = data['Timestamp'].dt.dayofweek
data['price_range'] = data['High'] - data['Low']

data = data.dropna()

In [8]:
from sklearn.preprocessing import MinMaxScaler
import numpy as np

# Define the features and target
features = [
    "Open", "High", "Low", "Close", "Volume",
    "rolling_mean_5", "rolling_std_5", "rolling_mean_10", "rolling_std_10",
    "hour", "day_of_week"
]
target = "Close"

# Scale features
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(data[features + [target]])

# Convert scaled data back to a DataFrame for easy manipulation
scaled_data = pd.DataFrame(scaled_data, columns=features + [target])
scaled_data = scaled_data.dropna()


In [9]:
# Define the number of timesteps to look back
look_back = 10 # For example, use the last 60 timesteps to predict the next one

# Create sequences
X, y = [],[]
for i in range(look_back, len(scaled_data)):
    X.append(scaled_data[features].iloc[i-look_back:i].values)  # Features over the last 'look_back' timesteps
    y.append(scaled_data[target].iloc[i])  # Target value at timestep i

X, y = pd.array(X), pd.array(y)

# Print shapes to confirm
print(f"X shape: {X.shape}")  # Should be (samples, look_back, features)
print(f"y shape: {y.shape}")  # Should be (samples,)


MemoryError: Unable to allocate 518. MiB for an array with shape (12, 5662232) and data type float64

In [None]:
# Split into training and testing sets
split_ratio = 0.8
split_index = int(len(X) * split_ratio)

X_train, X_test = X[:split_index], X[split_index:]
y_train, y_test = y[:split_index], y[split_index:]

print(f"Training set shape: {X_train.shape}")
print(f"Testing set shape: {X_test.shape}")


In [None]:
# Build the LSTM model
model = Sequential()

# First LSTM layer with Dropout
model.add(LSTM(units=50, return_sequences=True, input_shape=(X_train.shape[1], X_train.shape[2])))
model.add(Dropout(0.2))

# Second LSTM layer
model.add(LSTM(units=50, return_sequences=False))
model.add(Dropout(0.2))

# Dense output layer
model.add(Dense(units=1))  # Single output (predicting 'Close')

# Compile the model
model.compile(optimizer='adam', loss='mean_squared_error')

# Print model summary
model.summary()


In [None]:
# Train the model
history = model.fit(
    X_train, y_train,
    epochs=20,
    batch_size=32,
    validation_data=(X_test, y_test),
    verbose=1
)


In [None]:
# Make predictions
y_pred = model.predict(X_test)

# Inverse scale the predictions and actual values
y_test_actual = scaler.inverse_transform(np.concatenate([np.zeros((len(y_test), len(features))), y_test.reshape(-1, 1)], axis=1))[:, -1]
y_pred_actual = scaler.inverse_transform(np.concatenate([np.zeros((len(y_pred), len(features))), y_pred], axis=1))[:, -1]

# Evaluate the model
from sklearn.metrics import mean_squared_error, mean_absolute_error

mse = mean_squared_error(y_test_actual, y_pred_actual)
mae = mean_absolute_error(y_test_actual, y_pred_actual)

print(f"Mean Squared Error (MSE): {mse}")
print(f"Mean Absolute Error (MAE): {mae}")


In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(12, 6))
plt.plot(y_test_actual, label="Actual", color="blue")
plt.plot(y_pred_actual, label="Predicted", color="orange")
plt.title("LSTM: Actual vs Predicted Closing Prices")
plt.xlabel("Time (Test Samples)")
plt.ylabel("Closing Price")
plt.legend()
plt.show()
