<a href="https://colab.research.google.com/github/Price-Jack/bitcoin-digital-twin/blob/main/notebooks/bitcoin_digital_twin.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.decomposition import PCA
from sklearn.metrics import (
    mean_absolute_error, mean_squared_error, r2_score,
    accuracy_score, precision_score, recall_score, f1_score,
    confusion_matrix, ConfusionMatrixDisplay
)
from scipy.stats import ttest_ind
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping
import yfinance as yf

plt.style.use("seaborn-v0_8")

# project settings
TICKER = "BTC-USD"
PERIOD = "60d"
INTERVAL = "5m"
LOOKBACK = 60
HORIZON = 3


In [None]:
def download_data(ticker=TICKER, period=PERIOD, interval=INTERVAL):
    # pull stock data from yfinance
    df = yf.download(
        ticker,
        period=period,
        interval=interval,
        auto_adjust=False,
        progress=False
    )

    # check that all the price columns are there
    expected = ["Open", "High", "Low", "Close", "Adj Close", "Volume"]
    missing = [c for c in expected if c not in df.columns]
    if missing:
        raise ValueError(f"Missing columns: {missing}")

   # drop rows with missing values
    df = df.dropna()
    return df

df_raw = download_data()
print("Raw data shape:", df_raw.shape)
df_raw.head()

In [None]:
# calculate relative strength index
def rsi(series, window=14):
    delta = series.diff()
    gain = delta.clip(lower=0)
    loss = -delta.clip(upper=0)
    avg_gain = gain.rolling(window=window).mean()
    avg_loss = loss.rolling(window=window).mean()
    rs = avg_gain / (avg_loss + 1e-8)
    rsi = 100 - (100 / (1 + rs))
    return rsi


def build_features(df, horizon = 1):
    # create a copy
    data = df.copy()

    # if yfinance does multiindex, flatten it
    if isinstance(data.columns, pd.MultiIndex):
        data.columns = data.columns.get_level_values(0)

    # convert close to a series if it's somehow a dataframe
    if isinstance(data["Close"], pd.DataFrame):
        data["Close"] = data["Close"].iloc[:, 0]

    # basic 1-day return
    data["return_1"] = data["Close"].pct_change()

    # target variables for prediction
    data["future_close"] = data["Close"].shift(-horizon)
    data["future_return"] = (data["future_close"] - data["Close"]) / data["Close"]
    data["future_up"] = (data["future_return"] > 0).astype(int)

    # moving averages
    data["ma5"] = data["Close"].rolling(window=5).mean()
    data["ma10"] = data["Close"].rolling(window=10).mean()
    data["ma20"] = data["Close"].rolling(window=20).mean()

    # price volatility
    data["vol10"] = data["return_1"].rolling(window=10).std()
    data["vol20"] = data["return_1"].rolling(window=20).std()

    # volume metrics
    data["vol_mean_10"] = data["Volume"].rolling(window=10).mean()
    data["vol_mean_20"] = data["Volume"].rolling(window=20).mean()

    # momentum indicators
    data["mom5"] = data["return_1"].rolling(window=5).mean()
    data["mom10"] = data["return_1"].rolling(window=10).mean()

    # relative strength index
    data["rsi14"] = rsi(data["Close"], window=14)

    # MACD and its components
    ema_12 = data["Close"].ewm(span=12, adjust=False).mean()
    ema_26 = data["Close"].ewm(span=26, adjust=False).mean()
    data["MACD"] = ema_12 - ema_26
    data["MACD_signal"] = data["MACD"].ewm(span=9, adjust=False).mean()
    data["MACD_hist"] = data["MACD"] - data["MACD_signal"]

    # bollinger band width
    sma_20 = data["Close"].rolling(window=20).mean()
    std_20 = data["Close"].rolling(window=20).std()
    upper_bb = sma_20 + 2 * std_20
    lower_bb = sma_20 - 2 * std_20
    data["BB_width"] = (upper_bb - lower_bb) / (sma_20 + 1e-8)

    data = data.dropna()
    feature_cols = [
        "Close", "return_1",
        "ma5", "ma10", "ma20",
        "vol10", "vol20",
        "vol_mean_10", "vol_mean_20",
        "mom5", "mom10",
        "rsi14",
        "MACD", "MACD_signal", "MACD_hist",
        "BB_width",
    ]
    x = data[feature_cols].values
    y_reg = data["future_return"].values
    y_class = data["future_up"].values

    data.to_csv("btc_data.csv")

    return x, y_reg, y_class, data


x, y_reg, y_class, df_feat = build_features(df_raw, horizon=HORIZON)

# print("Feature matrix shape:", x.shape)
# print("Regression target shape:", y_reg.shape)
# print("Classification target shape:", y_class.shape)

df_feat.head()

In [None]:
try:
    from google.colab import files
    files.download("btc_data.csv")
except Exception:
    pass


In [None]:
plt.figure(figsize=(10, 4))
plt.plot(df_feat.index, df_feat["Close"])
plt.title(f"{TICKER} Close Price (5-min groupings)")
plt.xlabel("Time")
plt.ylabel("Price (USD)")
plt.grid(True)
plt.show()


In [None]:
n = len(df_feat)
train_size = int(n * 0.7)

# split to avoid lookahead
x_train = x[:train_size]
x_test = x[train_size:]
y_return_train = y_reg[:train_size]
y_return_test = y_reg[train_size:]
y_direction_train = y_class[:train_size]
y_direction_test = y_class[train_size:]

# normalize features
scaler = StandardScaler()
x_train_scaled = scaler.fit_transform(x_train)
x_test_scaled = scaler.transform(x_test)

print("Train:", x_train.shape, "| Test:", x_test.shape)

# train a linear regression model
return_model = LinearRegression()
return_model.fit(x_train_scaled, y_return_train)
y_return_pred = return_model.predict(x_test_scaled)

mae = mean_absolute_error(y_return_test, y_return_pred)
rmse = np.sqrt(mean_squared_error(y_return_test, y_return_pred))
r2 = r2_score(y_return_test, y_return_pred)

print("\nRegression Performance:")
print(f"MAE:  {mae:.6f}")
print(f"RMSE: {rmse:.6f}")
print(f"R²:   {r2:.4f}")

# train logistic regression for direction prediction
direction_model = LogisticRegression(max_iter=2000)
direction_model.fit(x_train_scaled, y_direction_train)
y_direction_pred = direction_model.predict(x_test_scaled)

accuracy = accuracy_score(y_direction_test, y_direction_pred)
precision = precision_score(y_direction_test, y_direction_pred)
recall = recall_score(y_direction_test, y_direction_pred)
f1 = f1_score(y_direction_test, y_direction_pred)

print("\nLogistic Regression Classification Performance:")
print(f"Accuracy:  {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")
print(f"F1 Score:  {f1:.4f}")

In [None]:
# reduce to 2D for plotting
pca = PCA(n_components=2)
x_reduced = pca.fit_transform(x_train_scaled)

# see how much info is lost
# print("Explained variance:", pca.explained_variance_ratio_)
# print("Total explained variance:", pca.explained_variance_ratio_.sum())

# plot the projection by whether price goes up or down
plt.figure(figsize=(6, 5))
plt.scatter(x_reduced[:, 0], x_reduced[:, 1], c=y_direction_train, cmap="coolwarm", alpha=0.4)
plt.title("PCA Projection")
plt.xlabel("PC1")
plt.ylabel("PC2")
plt.colorbar(label="direction")
plt.grid(True)
plt.show()

In [None]:
# scale all data
x_all_scaled = scaler.transform(x)

def make_seqs(x_arr, y_arr, lookback=LOOKBACK):
    seq_x, seq_y = [], []
    for i in range(lookback, len(x_arr)):
        seq_x.append(x_arr[i-lookback:i])
        seq_y.append(y_arr[i])
    return np.array(seq_x), np.array(seq_y)

x_class_seq, y_class_seq = make_seqs(x_all_scaled, y_class)

print("Sequenced features shape:", x_class_seq.shape)
print("Sequenced targets shape:", y_class_seq.shape)

n_seq = len(x_class_seq)
train_size_seq = int(n_seq * 0.7)

# train/test split for LSTM
x_class_train = x_class_seq[:train_size_seq]
x_class_test = x_class_seq[train_size_seq:]
y_class_train_seq = y_class_seq[:train_size_seq]
y_class_test_seq = y_class_seq[train_size_seq:]

print("Train sequences:", x_class_train.shape, "& Test sequences:", x_class_test.shape)


In [None]:
from sklearn.utils import class_weight

tf.keras.backend.clear_session()

# compute class weights
cls_weights = class_weight.compute_class_weight(
    class_weight="balanced",
    classes=np.unique(y_class_train_seq),
    y=y_class_train_seq
)
class_weights = {0: cls_weights[0], 1: cls_weights[1]}
# print("Class weights used for LSTM:", class_weights)

# LSTM model
model_class_lstm = Sequential([
    LSTM(64, return_sequences=True, input_shape=(LOOKBACK, x_all_scaled.shape[1])),
    Dropout(0.3),
    LSTM(32),
    Dropout(0.2),
    Dense(32, activation="relu"),
    Dense(1, activation="sigmoid")
])

model_class_lstm.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    loss="binary_crossentropy",
    metrics=["accuracy"]
)

early_stop = EarlyStopping(
    monitor="val_loss",
    patience=6,
    restore_best_weights=True
)

history = model_class_lstm.fit(
    x_class_train,
    y_class_train_seq,
    validation_split=0.2,
    epochs=50,
    batch_size=64,
    callbacks=[early_stop],
    class_weight=class_weights,
    verbose=1
)


In [None]:
# get predictions from the lstm (probabilities that price goes up)
y_proba = model_class_lstm.predict(x_class_test).flatten()
# make sure both arrays are the same length
n_aligned = min(len(y_class_test_seq), len(y_proba))

# trim to matching size
y_true = y_class_test_seq[:n_aligned]
y_pred_proba = y_proba[:n_aligned]

print("Aligned shapes -> y_true:", y_true.shape, "| y_pred_proba:", y_pred_proba.shape)


In [None]:
# test different cutoff points to see what gives the best predictions
thresholds = np.arange(0.40, 0.71, 0.01)

best_threshold = 0.5
best_f1 = 0.0

# find threshold that maximizes f1
for threshold in thresholds:
    predictions = (y_pred_proba > threshold).astype(int)
    f1 = f1_score(y_true, predictions)
    if f1 > best_f1:
        best_f1 = f1
        best_threshold = threshold

print(f"Best threshold for F1: {best_threshold:.2f} (F1 = {best_f1:.4f})")

In [None]:
# apply the best threshold we found
y_pred = (y_pred_proba > best_threshold).astype(int)

# calculate how well the lstm performed
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred)
recall = recall_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)

print(f"LSTM Classification Performance:")
print(f"Accuracy:  {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")
print(f"F1 Score:  {f1:.4f}")

In [None]:
# create confusion matrix
cm = confusion_matrix(y_true, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=[0, 1])

# print confusion matrix
plt.figure(figsize=(5, 5))
disp.plot(cmap="Blues", values_format="d")
plt.title("Confusion Matrix – LSTM (direction prediction)")
plt.xlabel("Predicted label")
plt.ylabel("True label")
plt.show()

In [None]:
# figure out which dates in our dataframe match the test predictions
n_seq = len(x_class_seq)
train_size_seq = int(n_seq * 0.7)
test_start_idx = LOOKBACK + train_size_seq

# grab the actual prices for those test dates
test_len = len(y_pred_proba)
prices = df_feat["Close"].iloc[test_start_idx : test_start_idx + test_len]
returns = prices.pct_change().fillna(0).values

# set up two confidence levels for trading decisions
weak_signal = best_threshold
strong_signal = min(best_threshold + 0.10, 0.95)

print(f"Weak buy threshold:   {weak_signal:.2f}")
print(f"Strong buy threshold: {strong_signal:.2f}")

# decide how much to invest based on model confidence
positions = np.zeros_like(y_pred_proba, dtype=float)
current_position = 0.0

for i, confidence in enumerate(y_pred_proba):
    if confidence >= strong_signal:
        current_position = 1.0
    elif confidence >= weak_signal:
        current_position = 0.5
    else:
        current_position = 0.0
    positions[i] = current_position

# shift positions forward a day (cant trade on todays prediction until tomorrow)
positions_shifted = np.roll(positions, 1)
positions_shifted[0] = 0.0

# calculate returns for both
strategy_returns = positions_shifted * returns
baseline_returns = returns

# turn returns into cumulative growth
strategy_equity = (1 + strategy_returns).cumprod()
baseline_equity = (1 + baseline_returns).cumprod()

# keep the dates attached to data
strategy_equity = pd.Series(strategy_equity, index=prices.index, name="LSTM Strategy")
baseline_equity = pd.Series(baseline_equity, index=prices.index, name="Buy & Hold")

# compare the two approaches
plt.figure(figsize=(10, 5))
plt.plot(strategy_equity, label="LSTM Strategy (confidence-weighted)")
plt.plot(baseline_equity, label="Buy & Hold")
plt.title(f"Cumulative Returns: LSTM Strategy vs Buy & Hold (threshold={best_threshold:.2f})")
plt.ylabel("Growth of $1")
plt.xlabel("Time")
plt.legend()
plt.grid(True)
plt.show()

In [None]:
# compare the two strategies
strategy_avg = strategy_returns.mean()
strategy_vol = strategy_returns.std()
baseline_avg = baseline_returns.mean()
baseline_vol = baseline_returns.std()

print(f"Strategy avg:  {strategy_avg:.6f}, volatility: {strategy_vol:.6f}")
print(f"Baseline avg:  {baseline_avg:.6f}, volatility: {baseline_vol:.6f}")

# check if the difference is statistically significant
t_stat, p_value = ttest_ind(strategy_returns, baseline_returns, equal_var=False)

print(f"t-statistic: {t_stat:.4f}")
print(f"p-value:     {p_value:.6f}")