### Dataset preparation

In [None]:
import pandas as pd

# Load the two input tables from CSV.
# NOTE(review): the sentiment path is an absolute "/content/..." path while the
# market path is relative to the working directory — confirm both resolve in the
# environment this notebook is run in.
sentiment = pd.read_csv("/content/btc_sentiment_ohlcv (1).csv")
market = pd.read_csv("market_data.csv")

In [None]:
# Display the sentiment table as loaded.
sentiment

In [None]:
# Display the market table as loaded.
market

In [None]:
# Sentiment table: delete the unnecessary "Unnamed: 0" column, parse the
# "date" column and use it as the index.
sentiment = (
    sentiment
    .drop(columns="Unnamed: 0")
    .assign(date=lambda frame: pd.to_datetime(frame["date"]))
    .set_index("date")
)

# Market table: here "Unnamed: 0" is renamed to "date", parsed and used as
# the index.
market = (
    market
    .rename(columns={"Unnamed: 0": "date"})
    .assign(date=lambda frame: pd.to_datetime(frame["date"]))
    .set_index("date")
)

In [None]:
# Column dtypes and non-null counts of the market table after indexing by date.
market.info()

In [None]:
# Column dtypes and non-null counts of the sentiment table after indexing by date.
sentiment.info()

Technical indicators

In [None]:
!pip install ta

In [None]:
import pandas as pd
from ta.trend import EMAIndicator, MACD, ADXIndicator, PSARIndicator
from ta.volatility import BollingerBands, AverageTrueRange, KeltnerChannel
from ta.volume import OnBalanceVolumeIndicator, ChaikinMoneyFlowIndicator, AccDistIndexIndicator


def add_technical_indicators(df):
    """Return a copy of `df` with trend, volatility and volume indicators added.

    The indicators are written to a copy, so the caller's frame is left
    untouched and the function can be called repeatedly on the same input.

    Parameters
    ----------
    df : pandas.DataFrame
        Must provide the 'close', 'high', 'low' and 'volume' columns.

    Returns
    -------
    pandas.DataFrame
        Copy of `df` with these columns added: ema_5, ema_20, ema_50, macd,
        signal, histogram, bb_upper, bb_lower, bb_middle, ATR, donchian_upper,
        donchian_lower, donchian_middle, obv, vwap.
    """
    # Work on a copy so the input frame does not silently gain columns.
    df = df.copy()

    # Trend Indicators
    df['ema_5'] = EMAIndicator(close=df['close'], window=5).ema_indicator()
    df['ema_20'] = EMAIndicator(close=df['close'], window=20).ema_indicator()
    df['ema_50'] = EMAIndicator(close=df['close'], window=50).ema_indicator()

    # MACD line, its signal line and their difference (library default windows).
    macd = MACD(close=df['close'])
    df['macd'] = macd.macd()
    df['signal'] = macd.macd_signal()
    df['histogram'] = macd.macd_diff()

    # Volatility Indicators
    bb = BollingerBands(close=df['close'], window=20, window_dev=2)
    df['bb_upper'] = bb.bollinger_hband()
    df['bb_lower'] = bb.bollinger_lband()
    df['bb_middle'] = bb.bollinger_mavg()

    atr = AverageTrueRange(high=df['high'], low=df['low'], close=df['close'], window=14)
    df['ATR'] = atr.average_true_range()

    # Donchian Channels (manual): 20-row rolling high/low and their midpoint.
    # The rolling windows leave NaN on the rows before a full window exists.
    df['donchian_upper'] = df['high'].rolling(window=20).max()
    df['donchian_lower'] = df['low'].rolling(window=20).min()
    df['donchian_middle'] = (df['donchian_upper'] + df['donchian_lower']) / 2

    # Volume Indicators
    df['obv'] = OnBalanceVolumeIndicator(close=df['close'], volume=df['volume']).on_balance_volume()

    # Running volume-weighted average price accumulated from the first row of
    # the frame; it is never reset.
    df['vwap'] = (df['close'] * df['volume']).cumsum() / df['volume'].cumsum()

    return df

In [None]:
# Compute the indicator columns on the sentiment/OHLCV table and preview the
# first rows of the result.
df = add_technical_indicators(sentiment)
df.head()

In [None]:
# Drop, in place, every row that has a NaN in any column.
# NOTE(review): presumably this removes the indicator warm-up rows — confirm
# no other column contributes NaNs here.
df.dropna(inplace=True)

In [None]:
# Column dtypes and non-null counts after the NaN rows were dropped.
df.info()

In [None]:
# Calendar features taken from the datetime index: the weekday as a number
# (pandas counts Monday as 0) and the month as a number.
df['day_number'] = df.index.weekday
df["month"] = df.index.month

In [None]:
# Left-join the market table on the index: every row of df is kept, and rows
# without a matching market date get NaN in the market columns.
df = df.join(market, how="left")
df

In [None]:
# Adding target variable: flag rows whose close is above the previous row's
# close (1) or not (0), then move every flag one row earlier so that each row
# carries the direction of the following row. The last row ends up as NaN.
df["target"] = (df["close"].diff() > 0).astype("int64").shift(-1)

In [None]:
# Drop, in place, every row that has a NaN in any column (this includes the
# last row, whose shifted target is NaN), then display the result.
df.dropna(inplace=True)
df

In [None]:
# Summary statistics of the numeric columns.
df.describe()

Profiling report for EDA

In [None]:
!pip install ydata-profiling

In [None]:
# The profiling code used to sit inside a bare triple-quoted string, so it never
# ran and the cell only echoed the text. It is real code now, behind a switch
# that is off by default so a normal run still does not build the report.
RUN_PROFILING = False

if RUN_PROFILING:
    from ydata_profiling import ProfileReport

    profile = ProfileReport(df, title="Multimodal dataset for bitcoin direction prediction", explorative=True)

    # Save the report to an HTML file
    profile.to_file("multimodal_report.html")

In [None]:
# Persist the assembled dataset; with to_csv's defaults the index is written
# as the first CSV column.
df.to_csv("multimodal_dataset.csv")

### Scaling and Normalization

**Creating different versions of the dataset**
- Scaled dataset
- Normalized dataset
- Log transformation dataset
- Log and Normalized dataset

In [None]:
from sklearn.preprocessing import StandardScaler, MinMaxScaler
import numpy as np

target_feature = 'target'  # Name of the label column; it is kept out of every transformation below

# Separate features and target
X = df.drop(columns=[target_feature])
y = df[target_feature]

# Initialize StandardScaler and MinMaxScaler
# NOTE(review): both scalers are fitted below on every row of X, i.e. before
# the train/test split that `split` performs later, so the test rows
# contribute to the scaling statistics — confirm this is acceptable.
standard_scaler = StandardScaler()
minmax_scaler = MinMaxScaler()

features_log = X.copy()
for column in features_log.columns:
    # Applying log transformation with a +1 shift, i.e. log(x + 1).
    # NOTE(review): values <= -1 produce NaN or -inf here — confirm which
    # feature columns can go that low.
    features_log[column] = np.log(features_log[column] + 1)

# Log transformed dataset
df_log_transformed = features_log.copy()
df_log_transformed[target_feature] = y

# Log normalized dataset: min-max scaling applied on top of the log features
log_normalized_features = minmax_scaler.fit_transform(features_log)
log_norm_features_df = pd.DataFrame(log_normalized_features, index=features_log.index, columns=features_log.columns)
df_log_norm = log_norm_features_df.copy()
df_log_norm[target_feature] = y

# Fit the scalers to the untransformed features and transform them.
# minmax_scaler is fitted again here, this time on X instead of features_log.
scaled_features = standard_scaler.fit_transform(X)
normalized_features = minmax_scaler.fit_transform(X)

# Convert the scaled features back to DataFrames with the original index and columns
scaled_features_df = pd.DataFrame(scaled_features, index=X.index, columns=X.columns)
norm_features_df = pd.DataFrame(normalized_features, index=X.index, columns=X.columns)


# Reattach the target feature back to the DataFrame
df_scaled = scaled_features_df.copy()
df_scaled[target_feature] = y

df_norm = norm_features_df.copy()
df_norm[target_feature] = y

Collection of dataset versions

In [None]:
# Dataset variants keyed by name. The values are the frames built in the
# scaling cell themselves (not copies), so in-place changes made through this
# dict also change those frames.
dfs = {
    "scaled": df_scaled,
    "normalized": df_norm,
    "log_transformed": df_log_transformed,
    "log_normalized": df_log_norm
}

Saving transformed datasets

In [None]:
# Write each dataset variant to "<name>_dataset.csv". The loop variable is
# deliberately not called `df`, so the notebook's main `df` frame is not
# rebound to one of the variants by this loop.
# index=False: the index is not written to these files.
for df_name, dataset in dfs.items():
    dataset.to_csv(f"{df_name}_dataset.csv", index=False)

### LSTM training and evaluation

Function to create sequence according to the window size

In [None]:
import numpy as np

def create_sequences(data, feature_cols, target_col, window_size=60):
    """Turn a row-ordered frame into sliding windows and their labels.

    Window ``i`` holds the feature rows ``i .. i + window_size - 1``; its label
    is the ``target_col`` value of row ``i + window_size``, i.e. the row right
    after the window.

    NOTE(review): in this notebook ``target`` was already moved one row earlier
    with ``shift(-1)``. Combined with taking the label from the row after the
    window, the predicted move starts one row after the window ends — confirm
    this horizon is intended.

    Parameters
    ----------
    data : pandas.DataFrame
        Rows in chronological order.
    feature_cols : list of str
        Columns placed in the windows.
    target_col : str
        Column the labels are read from.
    window_size : int, default 60
        Number of consecutive rows per window.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        ``X`` with shape (n_windows, window_size, n_features) and ``y`` with
        shape (n_windows,), where n_windows = len(data) - window_size. When the
        frame is too short for a single window, both arrays are empty but keep
        these shapes (n_windows = 0).
    """
    # Convert once, outside the loop: selecting the columns from the frame on
    # every iteration would copy the data for each window.
    features = data[feature_cols].to_numpy()
    targets = data[target_col].to_numpy()

    n_windows = len(data) - window_size
    if n_windows <= 0:
        # Keep the trailing dimensions so callers can still read the shape.
        empty_X = np.empty((0, window_size) + features.shape[1:], dtype=features.dtype)
        empty_y = np.empty((0,), dtype=targets.dtype)
        return empty_X, empty_y

    X = np.stack([features[start:start + window_size] for start in range(n_windows)])
    # Target is the "day after the window"; copy so the result does not alias `data`.
    y = targets[window_size:].copy()

    return X, y

Function to split data to train and test

In [None]:
def split(df):
  """Cut `df` by position into its leading 80% and the remaining 20%.

  The order of the rows is kept and nothing is shuffled. Returns the pair
  (train_data, test_data).
  """
  cutoff = int(len(df) * 0.8)
  return df[:cutoff], df[cutoff:]

Function to create a model

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM, Dropout

def create_model(X_train):
  """Build and compile the stacked-LSTM binary classifier.

  Only the shape of `X_train` is used: X_train.shape[1] and X_train.shape[2]
  become the input shape of the first LSTM layer.

  Returns the compiled, untrained model (adam optimizer, binary
  cross-entropy loss, accuracy metric).
  """
  window_length = X_train.shape[1]
  feature_count = X_train.shape[2]

  # Model layers: three LSTM layers, three dense layers, one sigmoid output unit
  network = Sequential([
      LSTM(units=128, return_sequences=True, input_shape=(window_length, feature_count)),
      LSTM(units=64, return_sequences=True),
      LSTM(units=32, return_sequences=False),
      Dense(units=50, activation="relu"),
      Dense(units=30, activation="relu"),
      Dense(units=30, activation="relu"),
      Dense(units=1, activation="sigmoid"),
  ])

  network.compile(optimizer="adam", loss="binary_crossentropy", metrics=["accuracy"])
  return network

In [None]:
# Plot model to see visual architecture
from tensorflow.keras.utils import plot_model
# Every column except "target" is treated as a feature.
feature_cols = [col for col in dfs["log_normalized"].columns if col != "target"]
target_col = "target"
# Only the feature windows ([0]) are kept: create_model reads just their shape.
X_train = create_sequences(dfs["log_normalized"], feature_cols, target_col, window_size=60)[0]
model = create_model(X_train)
# Writes the architecture diagram to model.png.
plot_model(model, to_file='model.png', show_shapes=True, show_layer_names=True)

Main training loop

In [None]:
from tensorflow.keras.callbacks import EarlyStopping

def training_loop(df, window_sizes, batch_sizes):
  """Train one model per (window size, batch size) pair and collect the outcomes.

  Parameters
  ----------
  df : pandas.DataFrame
      Must contain a "target" column; every other column is used as a feature.
  window_sizes : iterable of int
      Sequence lengths to try.
  batch_sizes : iterable of int
      Batch sizes to try for each window size.

  Returns
  -------
  pandas.DataFrame
      One row per combination with the columns "model", "window_size",
      "batch_size", "train_accuracy", "val_accuracy", "X_test" and "y_test".
      Rows that share a window size reference the same X_test / y_test arrays.
  """
  # Extracting features and target column
  feature_cols = [col for col in df.columns if col != "target"]
  target_col = "target"

  result_columns = ["model", "window_size", "batch_size", "train_accuracy", "val_accuracy", "X_test", "y_test"]

  # Split dataframe into train and test
  train_data, test_data = split(df)

  # Rows are collected in a list and turned into a DataFrame once at the end,
  # instead of concatenating a one-row frame on every iteration.
  records = []

  for window_size in window_sizes:
    # The sequences depend only on the window size, so they are built once
    # per window size and reused for every batch size.
    X_train, y_train = create_sequences(
        data=train_data,
        feature_cols=feature_cols,
        target_col=target_col,
        window_size=window_size
    )
    X_test, y_test = create_sequences(
        data=test_data,
        feature_cols=feature_cols,
        target_col=target_col,
        window_size=window_size
    )

    for batch_size in batch_sizes:

      # Stop early when the validation loss has not improved for 15 epochs
      early_stop = EarlyStopping(
        monitor='val_loss',
        patience=15,
        restore_best_weights=True  # Restore model weights from the best epoch
      )

      # A fresh model for every combination
      model = create_model(X_train)

      # shuffle=False keeps the samples in their original order during training
      history = model.fit(
          X_train, y_train,
          epochs=30,
          batch_size=batch_size,
          shuffle=False,
          validation_split=0.1,
          callbacks=[early_stop]
      )

      # The stored accuracies are the best per-epoch values of the run; the
      # two maxima need not come from the same epoch.
      records.append({
          "model": model,
          "window_size": window_size,
          "batch_size": batch_size,
          "train_accuracy": max(history.history["accuracy"]),
          "val_accuracy": max(history.history["val_accuracy"]),
          "X_test": X_test,
          "y_test": y_test
      })

  return pd.DataFrame(records, columns=result_columns)

In [None]:
# Initial set up: window sizes 10, 20, ..., 100 and the batch sizes to try
window_sizes = list(range(10, 101, 10))
batch_sizes = [64, 128]

In [None]:
# The MACD-derived columns are removed from the two log-based datasets only.
# NOTE(review): presumably because these columns can be negative and therefore
# turn into NaN under the log transform — confirm.
columns_to_drop = ["macd", "signal", "histogram"]
dfs_to_drop = ["log_transformed", "log_normalized"]

# errors="ignore" keeps this cell re-runnable once the columns are gone. The
# loop variable holds a dataset name, so it is not called `df`.
for dataset_name in dfs_to_drop:
  dfs[dataset_name].drop(columns=columns_to_drop, inplace=True, errors="ignore")

In [None]:
# Drop, in place, every row of the two log-based datasets that still holds a
# NaN in any column.
dfs["log_normalized"].dropna(inplace=True)
dfs["log_transformed"].dropna(inplace=True)

Evaluation function

In [None]:
import numpy as np
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.metrics import accuracy_score

def compute_accuracy(model, X_test, y_test):
    """Return the accuracy of `model` on the given test windows.

    The values returned by `model.predict` are turned into class labels with
    a 0.5 cut-off (>= 0.5 becomes 1, anything lower becomes 0) and compared
    with `y_test`.
    """
    # Predicted probabilities, one per sample
    probabilities = model.predict(X_test)

    # Class labels (0 or 1) from the probabilities
    predicted_labels = (probabilities >= 0.5).astype(int)

    return accuracy_score(y_test, predicted_labels)

def compute_best_results(results):
  """Score every trained model on its test data and return the best row.

  `results` is changed in place: a "final_accuracy" column is added and
  filled with the accuracy of each row's model on that row's X_test / y_test.
  The returned value is the row with the highest "final_accuracy".
  """
  results["final_accuracy"] = None

  for idx in results.index:
    # Model and test data all come from the same row
    acc = compute_accuracy(
        results.at[idx, "model"],
        results.at[idx, "X_test"],
        results.at[idx, "y_test"],
    )

    # Save to 'final_accuracy' column
    results.at[idx, "final_accuracy"] = acc

  ranked = results.sort_values(by="final_accuracy", ascending=False)
  return ranked.iloc[0]

#### Log transformed data

In [None]:
# Train on the log-transformed dataset for every window-size / batch-size pair.
result_log = training_loop(dfs["log_transformed"], window_sizes, batch_sizes)

In [None]:
# Score every trained model on its test windows and show the best row.
result_log_test = compute_best_results(result_log)
result_log_test

### Log transform with normalization

In [None]:
# Same procedure on the log + min-max normalized dataset: train every
# combination, score on the test windows, show the best row.
result_log_normalized = training_loop(dfs["log_normalized"], window_sizes, batch_sizes)
result_log_normalized_test = compute_best_results(result_log_normalized)
result_log_normalized_test

### Min max normalization

In [None]:
# Same procedure on the min-max normalized dataset: train every combination,
# score on the test windows, show the best row.
result_normalized = training_loop(dfs["normalized"], window_sizes, batch_sizes)
result_normalized_test = compute_best_results(result_normalized)
result_normalized_test

### Standard scaling

In [None]:
# Same procedure on the standard-scaled dataset: train every combination,
# score on the test windows, show the best row.
result_scaled = training_loop(dfs["scaled"], window_sizes, batch_sizes)
result_scaled_test = compute_best_results(result_scaled)
result_scaled_test

# XGBoost

Model creation and training function

In [None]:
# Import necessary libraries
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import xgboost as xgb

def xgboost_model_training(df):
  """Grid-search an XGBoost classifier on the first 80% of `df`.

  Parameters
  ----------
  df : pandas.DataFrame
      Rows in chronological order with a "target" column; every other column
      is used as a feature.

  Returns
  -------
  (best_model, X_test, y_test)
      The best estimator found by the grid search, refitted on the training
      part, plus the held-out last 20% of the rows.
  """
  # Local import so this block does not depend on a change to another cell.
  from sklearn.model_selection import TimeSeriesSplit

  X = df.drop(columns=["target"])
  y = df["target"]

  # Split the data chronologically: shuffle=False keeps the last 20% as the test set.
  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, shuffle=False)

  model = xgb.XGBClassifier(
      objective='binary:logistic',   # Use this objective if you have binary classes (e.g., up/down).
      eval_metric='logloss',         # Evaluation metric can be logloss, error, etc.
      # NOTE(review): recent XGBoost releases deprecate or ignore this
      # argument — confirm against the installed version.
      use_label_encoder=False,
      random_state=42
  )

  # Parameter grid for the grid search
  param_grid = {
      'max_depth': [3, 5, 7],
      'learning_rate': [0.01, 0.1, 0.2],
      'n_estimators': [100, 200, 300],
      'subsample': [0.8, 1.0]
  }

  # The rows are time-ordered, so every validation fold must come after the
  # rows it is trained on. A plain cv=5 would validate on folds that precede
  # part of the training data.
  time_ordered_cv = TimeSeriesSplit(n_splits=5)

  grid_search = GridSearchCV(model, param_grid, scoring='accuracy', cv=time_ordered_cv, verbose=1, n_jobs=-1)
  grid_search.fit(X_train, y_train)

  # Retrieve the best model after grid search
  best_model = grid_search.best_estimator_
  return best_model, X_test, y_test


Evaluation function

In [None]:
def evaluate_xgboost_model(best_model, X_test, y_test):
  """Print test accuracy, classification report and confusion matrix.

  The predictions of `best_model` on `X_test` are compared with `y_test`.
  Nothing is returned.
  """
  predictions = best_model.predict(X_test)

  # Overall accuracy
  print("Test Accuracy:", accuracy_score(y_test, predictions))

  # Per-class precision / recall / f1
  print("Classification Report:\n", classification_report(y_test, predictions))

  # Distribution of the predictions against the true labels
  print("Confusion Matrix:\n", confusion_matrix(y_test, predictions))

Results

In [None]:
for df_name, dataset in dfs.items():
  print(f"{df_name}")
  print(50*"-")
  # XGBoost is given the calendar month read from the index. It is set on a
  # copy (assign) so the frames stored in `dfs` keep their own "month" values,
  # and the loop variable is not called `df` so the main `df` is not rebound.
  xgb_input = dataset.assign(month=dataset.index.month)
  best_model, X_test, y_test = xgboost_model_training(xgb_input)
  evaluate_xgboost_model(best_model, X_test, y_test)
  print(50*"-")
  print()