# APS1052 Final Project - LSTM Bitcoin

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# Data
The dataset has already been downloaded from Glassnode.

In [None]:
# Read the raw BTC dataset; 'Date' is parsed as datetimes and becomes the index
file_path = "data/btc_dataset.csv"  # Replace with your dataset path
df = pd.read_csv(file_path, index_col='Date', parse_dates=['Date'])

# Show the first rows as a quick sanity check
df.head()


Check whether the dataset contains any missing values.

In [None]:
# Count missing entries per column and keep only the columns that have any
missing_values = df.isna().sum()
missing_columns = missing_values.loc[missing_values.gt(0)]

# Report the outcome; the bar chart is drawn only when something is missing
if not missing_columns.empty:
    print("Missing values in each column:")
    print(missing_columns)

    # One bar per affected column
    plt.figure(figsize=(12, 6))
    missing_columns.plot.bar(title="Count of Missing Values by Column")
    plt.xlabel("Columns")
    plt.ylabel("Number of Missing Values")
    plt.show()
else:
    print("No missing values in the dataset.")


# Core

## Loading Data

### Feature Engineering

#### 1. Realized Volatility (14-day Rolling)
- **How to Derive**: Compute the rolling standard deviation of the "Closing Price (USD)" log returns over the past 14 days.
- **Implementation**:
    $$
    \text{Realized Volatility}_t = \sqrt{\frac{1}{N-1} \sum_{i=0}^{N-1} \left(r_{t-i} - \bar{r}_t\right)^2}, \quad r_t = \log\left(\frac{P_t}{P_{t-1}}\right), \quad N = 14
    $$
- **Relevant Columns**: `"Closing Price (USD)"`

In [None]:
# Daily log returns of the closing price, kept in a temporary helper column
df['log_return'] = np.log(df['Closing Price (USD)'].div(df['Closing Price (USD)'].shift(1)))

# Realized volatility: standard deviation of the log returns over a 14-row rolling window.
# The leading rows are NaN because the window is not yet full.
df['realized_volatility'] = df['log_return'].rolling(14).std()

# The helper column is no longer needed
del df['log_return']

# Plot the new feature
fig, ax = plt.subplots(figsize=(10, 6))
df['realized_volatility'].plot(ax=ax, title="Realized Volatility (14-day Rolling)")
plt.show()


In [None]:
# NOTE(review): this cell repeats the realized-volatility computation of the cell above it;
# running it again leaves `df` unchanged.

# 14-row rolling standard deviation of the daily log returns, computed in one chain
# so that no temporary column has to be created and dropped afterwards
df['realized_volatility'] = (
    np.log(df['Closing Price (USD)'] / df['Closing Price (USD)'].shift(1))
    .rolling(window=14)
    .std()
)

# Plot the feature
plt.figure(figsize=(10, 6))
volatility_axes = plt.gca()
df['realized_volatility'].plot(ax=volatility_axes, title="Realized Volatility (14-day Rolling)")
plt.show()


#### 2. Exchange Netflow Proxy
- **How to Derive**: Use `"active_addresses"` as a proxy for activity. Calculate the daily change in `"active_addresses"` or the ratio of `"active_addresses"` to `"total_addresses"`. This can act as a proxy for inflow/outflow activity.
- **Implementation**:
$$
     \text{Netflow Proxy} = \frac{\text{active\_addresses}}{\text{total\_addresses}}
     $$
- **Relevant Columns**: `"active_addresses"`, `"total_addresses"`


In [None]:
# Share of all addresses that were active, used as a stand-in for exchange netflow
df['exchange_netflow_proxy'] = df['active_addresses'].div(df['total_addresses'])

# Plot the new feature
fig, ax = plt.subplots(figsize=(10, 6))
df['exchange_netflow_proxy'].plot(ax=ax, title="Exchange Netflow Proxy")
plt.show()


#### 3. Social Sentiment Proxy (Google Popularity Change)
- **How to Derive**: Use the percentage change or rolling average of `"Google_popularity"` to capture shifts in public interest over time.
- **Implementation**:
     $$
     \text{Google Sentiment} = \frac{\text{Google\_popularity}_t - \text{Google\_popularity}_{t-1}}{\text{Google\_popularity}_{t-1}}
     $$
- **Relevant Columns**: `"Google_popularity"`

In [None]:
# Day-over-day relative change of the Google popularity score.
# The first row has no predecessor and is therefore NaN.
df['google_sentiment_proxy'] = df['Google_popularity'].pct_change(periods=1)

# Plot the new feature
fig, ax = plt.subplots(figsize=(10, 6))
df['google_sentiment_proxy'].plot(ax=ax, title="Google Sentiment Proxy")
plt.show()


#### 4. Funding Rates Proxy (Mining Cost Ratio)
   - **How to Derive**: Calculate the ratio of `"hash_rate"` to `"difficulty"`, which can act as a proxy for mining costs and potential supply-side pressures.
   - **Implementation**:
     $$
     \text{Mining Cost Ratio} = \frac{\text{hash\_rate}}{\text{difficulty}}
     $$
   - **Relevant Columns**: `"hash_rate"`, `"difficulty"`


In [None]:
# Force both inputs to numeric dtype; entries that cannot be parsed become NaN
for numeric_column in ('hash_rate', 'difficulty'):
    df[numeric_column] = pd.to_numeric(df[numeric_column], errors='coerce')

# Hash rate relative to difficulty, used as a stand-in for funding rates
df['funding_rate_proxy'] = df['hash_rate'].div(df['difficulty'])

# Plot the new feature
fig, ax = plt.subplots(figsize=(10, 6))
df['funding_rate_proxy'].plot(ax=ax, title="Funding Rates Proxy")
plt.show()


#### 5. Macro Interaction Proxy (GLD-SPY Interaction)
   - **How to Derive**: Create interaction terms between `"GLD"` (gold prices) and `"SPY"` (S&P 500 index) to capture broader macroeconomic influences. This could be the product or difference between the two.
   - **Implementation**:
     $$
     \text{GLD-SPY Interaction} = \text{GLD} \times \text{SPY}
     $$
   - **Relevant Columns**: `"GLD"`, `"SPY"`

Compute the interaction feature, then check the dataset again for missing values.

In [None]:
# Element-wise product of the gold (GLD) and S&P 500 (SPY) series
df['macro_interaction_proxy'] = df['GLD'].mul(df['SPY'])

# Plot the new feature
fig, ax = plt.subplots(figsize=(10, 6))
df['macro_interaction_proxy'].plot(ax=ax, title="Macro Interaction Proxy (GLD * SPY)")
plt.show()


In [None]:
# Re-count missing values per column now that the engineered features exist
missing_values = df.isnull().sum()

# Counts are non-negative, so "not zero" selects exactly the affected columns
missing_columns = missing_values[missing_values != 0]

if len(missing_columns) == 0:
    print("No missing values in the dataset.")
else:
    print("Missing values in each column:")
    print(missing_columns)

    # Bar chart of the per-column counts
    plt.figure(figsize=(12, 6))
    ax = missing_columns.plot(kind='bar', title="Count of Missing Values by Column")
    ax.set_xlabel("Columns")
    ax.set_ylabel("Number of Missing Values")
    plt.show()


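This is caused by the way the two features are calculated: the 14-day rolling window and the one-day percentage change have no value for the first rows of the series.
We have two options for handling these rows:
1. Keep them as `NaN`
2. Fill them with `0`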

Which Option to Choose?
- **For Long-Term Models or Sparse Data:** Use **Option 1** (`NaN`) and drop rows with missing values. This maintains the integrity of the calculations.
- **For Time Series Models with Long Sequences:** Use **Option 2** (fill with `0`) to avoid breaking sequence continuity.

Since we are working with LSTMs and sequences, Option 2 (setting to `0`) is more practical:
- It avoids disrupting sequences, which is critical for time series models.
- The missing data period (**14** days for volatility, **1** day for sentiment) is short, so the impact of filling with `0` is minimal.


In [None]:
# Replace the NaN rows of the two engineered features with 0
for filled_column in ('realized_volatility', 'google_sentiment_proxy'):
    df[filled_column] = df[filled_column].fillna(0)

In [None]:
# Persist the feature-augmented frame, index included, as CSV
file_path = "data/BTC_data.csv"
df.to_csv(path_or_buf=file_path, index=True)

### Dataset Preparation

#### Dataset Split

In [None]:
# Fractions of the rows assigned to each partition
train_ratio = 0.64  # 64% for training (80% of Train + Validation)
validation_ratio = 0.16  # 16% for validation (20% of Train + Validation)
test_ratio = 0.20  # 20% for testing (the remainder; not used in the index arithmetic below)

# Row positions at which the training and validation partitions end
train_end_index = int(len(df) * train_ratio)
validation_end_index = int(len(df) * (train_ratio + validation_ratio))

# Slice in row order (no shuffling): consecutive boundaries delimit the three partitions
split_bounds = [0, train_end_index, validation_end_index, len(df)]
train_data, validation_data, test_data = (
    df.iloc[start:stop] for start, stop in zip(split_bounds[:-1], split_bounds[1:])
)

# Report the size of each partition
print(f"Training data shape: {train_data.shape}")
print(f"Validation data shape: {validation_data.shape}")
print(f"Testing data shape: {test_data.shape}")


#### Data Normalization

In [None]:
# Function to normalize sequences for training (with leading zero handling)
def normalize_sequences_train(data, columns_with_leading_zeros, window=None):
    """Slice `data` into overlapping windows and express each as change relative to a baseline.

    Each window holds `window` consecutive rows and is transformed column-wise as
    ``value / baseline - 1``. The baseline is the window's first row, except for the
    columns listed in `columns_with_leading_zeros`: there the first non-zero value
    found inside the window is used, so a zero in the first row does not cause a
    division by zero.

    Parameters
    ----------
    data : pandas.DataFrame
        Rows to slice into windows, in their existing order.
    columns_with_leading_zeros : iterable
        Column labels whose baseline is taken from the first non-zero value.
    window : int, optional
        Number of rows per window. When omitted, the notebook-level
        `lookback_window` is used, which keeps existing two-argument calls working.

    Returns
    -------
    numpy.ndarray
        ``len(data) - window`` windows stacked along the first axis; each window is
        the ``.values`` of the normalized slice (rows x columns).

    Notes
    -----
    A listed column that is zero across an entire window keeps its zero baseline,
    so that window holds NaN (0 / 0) for the column. Columns that are not listed
    get no zero handling at all.
    """
    if window is None:
        window = lookback_window  # fall back to the notebook-level setting

    normalized_data = []
    for start in range(len(data) - window):
        sequence = data.iloc[start:start + window]
        base = sequence.iloc[0].copy()  # copy: the baseline is adjusted below

        for col in columns_with_leading_zeros:
            non_zero = sequence[col].to_numpy() != 0
            # Only override the baseline when a non-zero value actually exists.
            # The lookup is positional, so repeated index labels cannot turn the
            # baseline into a Series.
            if non_zero.any():
                base[col] = sequence[col].iloc[non_zero.argmax()]

        normalized_data.append(((sequence / base) - 1).values)

    return np.array(normalized_data)

# Function to normalize sequences for validation and test (no leading zero handling)
def normalize_sequences(data, window=None):
    """Slice `data` into overlapping windows and express each as change relative to its first row.

    Each window holds `window` consecutive rows and is transformed column-wise as
    ``value / first_row - 1``.

    Parameters
    ----------
    data : pandas.DataFrame
        Rows to slice into windows, in their existing order.
    window : int, optional
        Number of rows per window. When omitted, the notebook-level
        `lookback_window` is used, which keeps existing one-argument calls working.

    Returns
    -------
    numpy.ndarray
        ``len(data) - window`` windows stacked along the first axis; each window is
        the ``.values`` of the normalized slice (rows x columns).

    Notes
    -----
    No zero handling is applied: a column whose first value in a window is 0
    produces inf (non-zero / 0) or NaN (0 / 0) for that window.
    """
    if window is None:
        window = lookback_window  # fall back to the notebook-level setting

    normalized_data = []
    for start in range(len(data) - window):
        sequence = data.iloc[start:start + window]
        base = sequence.iloc[0]  # first row of the window is the baseline
        normalized_data.append(((sequence / base) - 1).values)

    return np.array(normalized_data)

# NOTE(review): `leading_zeros` (used here) and `lookback_window` (read inside the
# normalizers) are not assigned in any cell above this one; both must be defined
# before this cell can run on a fresh kernel.

# Training windows: baselines of the `leading_zeros` columns skip leading zeros
x_train = normalize_sequences_train(data=train_data, columns_with_leading_zeros=leading_zeros)

# Validation and test windows: plain first-row baseline
x_validation, x_test = (normalize_sequences(partition) for partition in (validation_data, test_data))

# Report the resulting array shapes
print(f"Normalized training data shape: {x_train.shape}")
print(f"Normalized validation data shape: {x_validation.shape}")
print(f"Normalized testing data shape: {x_test.shape}")


Check for any `NaN`, `inf`, or anomalous values in the normalized datasets.

In [None]:
# Report every training window that contains at least one NaN
for i, seq in enumerate(x_train):
    if not np.isnan(seq).any():
        continue  # clean window, nothing to report
    print(f"NaN detected in sequence index {i}")
    print(seq)  # Show the offending window for inspection


In [None]:
# Reproduce the normalization of the first training window step by step
sequence = train_data.iloc[:lookback_window].copy()
base = sequence.iloc[0]  # First row of the window serves as the baseline
print("Base values (used for normalization):")
print(base)

# Flag a baseline that itself contains NaN
if base.isna().any():
    print("NaN detected in baseline!")

# Relative change of every row with respect to the baseline
normalized_sequence = sequence.div(base).sub(1)
print("Normalized sequence:")
print(normalized_sequence)


In [None]:
# Function to validate normalized data
def validate_normalized_data(normalized_data):
    """Zero out NaN and Inf entries of `normalized_data` in place and return the same array.

    A warning with the number of affected entries is printed for NaN and for Inf
    values before they are overwritten with 0. A further warning is printed when
    any remaining value is below -1. A completion message is always printed.

    Parameters
    ----------
    normalized_data : numpy.ndarray
        Float array to check; it is modified in place.

    Returns
    -------
    numpy.ndarray
        The same array object that was passed in.
    """
    # NaN entries: count, warn, overwrite
    nan_mask = np.isnan(normalized_data)
    nan_count = nan_mask.sum()
    if nan_count > 0:
        print(f"Warning: Found {nan_count} NaN values in the normalized data.")
        normalized_data[nan_mask] = 0

    # Inf entries (either sign): count, warn, overwrite
    inf_mask = np.isinf(normalized_data)
    inf_count = inf_mask.sum()
    if inf_count > 0:
        print(f"Warning: Found {inf_count} Inf values in the normalized data.")
        normalized_data[inf_mask] = 0

    # Values below -1 are reported but left untouched
    if (normalized_data < -1).any():
        print("Warning: Found values less than -1. Review the normalization process.")

    print("Validation complete. Normalized data is clean.")
    return normalized_data


In [None]:
# Run the NaN/Inf clean-up on the three normalized arrays, in train/validation/test order
x_train, x_validation, x_test = map(
    validate_normalized_data, (x_train, x_validation, x_test)
)
