In [2]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import mean_absolute_error, mean_squared_error
import numpy as np
import kagglehub
from pathlib import Path
import joblib
from pathlib import Path

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# Download latest version
path = Path(kagglehub.dataset_download("sandersekreve/bitcoin-dataset-1h"))

In [4]:
csv_path = path / 'btc_1h_data_2018_to_2025.csv'
df = pd.read_csv(csv_path)

In [5]:
df.head()

Unnamed: 0,Open time,Open,High,Low,Close,Volume,Close time,Quote asset volume,Number of trades,Taker buy base asset volume,Taker buy quote asset volume,Ignore
0,2018-01-01 00:00:00,13715.65,13715.65,13400.01,13529.01,443.356199,2018-01-01 00:59:59.999,5993910.0,5228,228.521921,3090541.0,0
1,2018-01-01 01:00:00,13528.99,13595.89,13155.38,13203.06,383.697006,2018-01-01 01:59:59.999,5154522.0,4534,180.840403,2430449.0,0
2,2018-01-01 02:00:00,13203.0,13418.43,13200.0,13330.18,429.064572,2018-01-01 02:59:59.999,5710192.0,4887,192.237935,2558505.0,0
3,2018-01-01 03:00:00,13330.26,13611.27,13290.0,13410.03,420.08703,2018-01-01 03:59:59.999,5657448.0,4789,137.918407,1858041.0,0
4,2018-01-01 04:00:00,13434.98,13623.29,13322.15,13601.01,340.807329,2018-01-01 04:59:59.999,4588047.0,4563,172.957635,2328058.0,0


In [6]:
# Add technical indicators

# Simple Moving Averages
df["SMA_7"] = df["Close"].rolling(window=7).mean()
df["SMA_21"] = df["Close"].rolling(window=21).mean()

# RSI (Relative Strength Index)
delta = df["Close"].diff()
gain = (delta.where(delta > 0, 0)).rolling(14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
rs = gain / loss
df["RSI_14"] = 100 - (100 / (1 + rs))

# MACD (Moving Average Convergence Divergence)
exp1 = df["Close"].ewm(span=12, adjust=False).mean()
exp2 = df["Close"].ewm(span=26, adjust=False).mean()
df["MACD"] = exp1 - exp2
df["Signal_line"] = df["MACD"].ewm(span=9, adjust=False).mean()

df.dropna(inplace=True)

In [7]:
# === Feature Selection ===
features = [
    # 'Open', 'High', 'Low', 'Close', 'Volume',
    # MACD, Signal_line, SMA_7, RSI_14
    # 'High', 'Low', 'Close', 'SMA_21'
    'High', 'Close', 'Volume', 'Signal_line'
]
target_col = "High"
target_idx = features.index(target_col)

data = df[features]


corr_matrix = df[features].corr()
print(corr_matrix)


                 High     Close    Volume  Signal_line
High         1.000000  0.999971 -0.120442     0.060202
Close        0.999971  1.000000 -0.122295     0.060914
Volume      -0.120442 -0.122295  1.000000    -0.019840
Signal_line  0.060202  0.060914 -0.019840     1.000000


In [8]:
# === Train/Test Split ===
sequence_length = 80
split_idx = int(len(data) * 0.8)

train_data = data.iloc[:split_idx]
test_data = data.iloc[split_idx:]

In [9]:
# === Scaling ===
scaler = MinMaxScaler()
train_scaled = scaler.fit_transform(train_data)
test_scaled = scaler.transform(test_data)

In [10]:
# === Build Sequences ===
def build_sequences(scaled_data):
    X, y = [], []
    for i in range(sequence_length, len(scaled_data)):
        X.append(scaled_data[i-sequence_length:i])
        y.append(scaled_data[i, target_idx])
    return np.array(X), np.array(y)

X_train, y_train = build_sequences(train_scaled)
X_test, y_test = build_sequences(test_scaled)

In [11]:
# === Model Definition ===
model = Sequential([
    LSTM(64, return_sequences=True, input_shape=(X_train.shape[1], X_train.shape[2])),
    Dropout(0.2),
    LSTM(64),
    Dropout(0.2),
    Dense(1)
])

model.compile(optimizer="adam", loss="mean_squared_error")
history = model.fit(X_train, y_train, epochs=25, batch_size=32, validation_split=0.1)

Epoch 1/25


  super().__init__(**kwargs)


[1m1437/1437[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m63s[0m 42ms/step - loss: 0.0028 - val_loss: 2.8313e-04
Epoch 2/25
[1m1437/1437[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m62s[0m 43ms/step - loss: 5.7089e-04 - val_loss: 1.1509e-04
Epoch 3/25
[1m1437/1437[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m65s[0m 45ms/step - loss: 4.4640e-04 - val_loss: 8.3681e-05
Epoch 4/25
[1m1437/1437[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m67s[0m 47ms/step - loss: 4.2260e-04 - val_loss: 1.5999e-05
Epoch 5/25
[1m1437/1437[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m64s[0m 44ms/step - loss: 4.1578e-04 - val_loss: 6.2717e-05
Epoch 6/25
[1m1437/1437[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m61s[0m 42ms/step - loss: 3.9669e-04 - val_loss: 8.3397e-06
Epoch 7/25
[1m1437/1437[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m64s[0m 45ms/step - loss: 3.9913e-04 - val_loss: 1.7956e-05
Epoch 8/25
[1m1437/1437[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m65s[0m 45ms/step -

In [12]:
# === Prediction ===
predicted_scaled = model.predict(X_test)

[1m398/398[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 18ms/step


In [13]:
# === Invert Scaling for Target ===
high_mean = scaler.data_min_[target_idx]
high_scale = scaler.data_max_[target_idx] - scaler.data_min_[target_idx]

predicted_high = predicted_scaled * high_scale + high_mean
actual_high = y_test * high_scale + high_mean

In [14]:
# save model
model.save("bitcoin_lstm_model8.keras")

In [None]:

# === Evaluation ===
mae = mean_absolute_error(actual_high, predicted_high)
rmse = np.sqrt(mean_squared_error(actual_high, predicted_high))

def mean_absolute_percentage_error(y_true, y_pred):
    y_true, y_pred = np.array(y_true), np.array(y_pred)
    epsilon = 1e-10  # to avoid division by zero
    return np.mean(np.abs((y_true - y_pred) / (y_true + epsilon))) * 100

mape = mean_absolute_percentage_error(actual_high, predicted_high)

print(f"📏 MAE: ${mae:.2f}")
print(f"📏 RMSE: ${rmse:.2f}")
print(f"📉 MAPE: {mape:.2f}%")

# save model
model.save(f"bitcoin_1h_lstm_model_{mae:.2f}.keras")

# save scaler
joblib.dump(scaler, f"1h_scaler_{mae:.2f}.pkl")

📏 MAE: $4680.47
📏 RMSE: $7367.79
📉 MAPE: 29.29%


['1h_scaler4680.47.pkl']

In [None]:
# === (Optional) Plotting ===
plt.figure(figsize=(12, 6))
plt.plot(actual_high, label="Actual High", color="blue")
plt.plot(predicted_high, label="Predicted High", color="orange")
plt.title("Bitcoin High Price Prediction")
plt.xlabel("Days")
plt.ylabel("Price (USD)")
plt.legend()
plt.show()

In [None]:
plt.plot(history.history['loss'], label='Train loss')
plt.plot(history.history['val_loss'], label='Val loss')
plt.legend()
plt.title("Training vs Validation Loss")
plt.show()


In [None]:
plt.figure(figsize=(14,6))
plt.plot(actual_high, color='black', label='Actual High Price')
plt.plot(predicted_high, color='green', label='Predicted High Price')
plt.title('Bitcoin High Price Prediction')
plt.xlabel('Days')
plt.ylabel('Price')
plt.legend()
plt.show()


In [None]:
# import itertools
# from sklearn.model_selection import train_test_split
# from sklearn.preprocessing import MinMaxScaler
# from sklearn.metrics import mean_absolute_error
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import LSTM, Dropout, Dense
# import numpy as np
# import pandas as pd
# from tqdm import tqdm

# # Full feature set
# all_features = [
#     'Open', 'High', 'Low', 'Close', 'Volume',
#     'MACD', 'Signal_line', 'SMA_7', 'RSI_14',
#     'SMA_21'
#     # 'Open', 'High', 'Low', 'Volume', 'SMA_21'
# ]

# sequence_length = 30
# best_mae = float('inf')
# best_features = []
# results = []

# # Brute-force loop
# for subset_size in range(4, 9):  # Try 4 to 8 feature combinations
#     for subset in tqdm(list(itertools.combinations(all_features, subset_size))):
#         try:
#             data = df[list(subset)].copy()
#             target_col = "High"
#             if target_col not in subset:
#                 continue

#             split_idx = int(len(data) * 0.8)
#             train_data = data.iloc[:split_idx]
#             test_data = data.iloc[split_idx:]

#             scaler = MinMaxScaler()
#             train_scaled = scaler.fit_transform(train_data)
#             test_scaled = scaler.transform(test_data)
#             target_idx = list(subset).index(target_col)

#             def build_seq(scaled):
#                 X, y = [], []
#                 for i in range(sequence_length, len(scaled)):
#                     X.append(scaled[i-sequence_length:i])
#                     y.append(scaled[i, target_idx])
#                 return np.array(X), np.array(y)

#             X_train, y_train = build_seq(train_scaled)
#             X_test, y_test = build_seq(test_scaled)

#             # Quick LSTM
#             model = Sequential([
#                 LSTM(32, return_sequences=False, input_shape=(X_train.shape[1], X_train.shape[2])),
#                 Dropout(0.2),
#                 Dense(1)
#             ])
#             model.compile(optimizer="adam", loss="mae")
#             model.fit(X_train, y_train, epochs=5, batch_size=32, verbose=0)

#             preds = model.predict(X_test)
#             high_mean = scaler.data_min_[target_idx]
#             high_range = scaler.data_max_[target_idx] - scaler.data_min_[target_idx]
#             preds = preds * high_range + high_mean
#             y_true = y_test * high_range + high_mean

#             mae = mean_absolute_error(y_true, preds)
#             results.append((subset, mae))

#             if mae < best_mae:
#                 best_mae = mae
#                 best_features = subset
#                 print(f"🔥 New Best MAE: ${mae:.2f} using {subset}")

#         except Exception as e:
#             print(f"⚠️ Skipped {subset}: {e}")
#             continue

# # Final output
# print(f"\n✅ Best feature set: {best_features}")
# print(f"📏 Best MAE: ${best_mae:.2f}")
