# Bitcoin Price Prediction Using Deep Learning Techniques

## Part 4: CNN Models with Gramian Angular Field Representations

In this notebook, we explore Convolutional Neural Network (CNN) models for Bitcoin price prediction using Gramian Angular Field (GAF) image representations of time series data.

In [None]:
# Import Libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import pickle
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error

# Deep Learning Libraries
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Conv2D, MaxPooling2D, Flatten
from tensorflow.keras.callbacks import EarlyStopping

# GAF Transformation
from pyts.image import GramianAngularField

# Visualization Settings
import matplotlib as mpl
mpl.style.use('seaborn-v0_8-whitegrid')
plt.rcParams['figure.figsize'] = (12, 6)
plt.rcParams['axes.grid'] = True
plt.rcParams['axes.spines.top'] = False
plt.rcParams['axes.spines.right'] = False

# Suppress Warnings
import warnings
warnings.filterwarnings("ignore")

# Set random seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)

# Load prepared data
try:
    with open('btc_prepared_data.pkl', 'rb') as f:
        data_dict = pickle.load(f)
    
    # Extract data
    btc_data = data_dict['btc_data']
    optimal_d = data_dict['optimal_d']
    X_train_price = data_dict['X_train_price']
    X_test_price = data_dict['X_test_price']
    y_train_price = data_dict['y_train_price']
    y_test_price = data_dict['y_test_price']
    X_train_frac = data_dict['X_train_frac']
    X_test_frac = data_dict['X_test_frac']
    y_train_frac = data_dict['y_train_frac']
    y_test_frac = data_dict['y_test_frac']
    window_size = data_dict['window_size']
    
    print("Data loaded successfully.")
    print(f"Window size: {window_size}")
except FileNotFoundError:
    print("Prepared data not found. Please run Part 2 first.")
    raise  # Stop here: every later cell depends on the variables loaded above

## Gramian Angular Field (GAF) Transformation

### Theoretical Background

The Gramian Angular Field (GAF) is a method to encode time series data as images, allowing us to leverage powerful CNN architectures for time series analysis. There are two types of GAF:

1. **Gramian Angular Summation Field (GASF)**: Encodes the temporal correlation between each pair of observations as the cosine of the sum of their angles.
2. **Gramian Angular Difference Field (GADF)**: Encodes the temporal correlation between each pair of observations as the sine of the difference of their angles.

The transformation involves these steps:
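1. Rescale the time series to the range [-1, 1] or [0, 1], so that every value is a valid cosine
2. Represent each rescaled value as an angle in polar coordinates by taking its arccosine
3. Compute the trigonometric sum (GASF) or difference (GADF) for each pair of angles, which yields a square matrix with one row and one column per time step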
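
The three steps above can be written directly in NumPy. The following is a minimal sketch for a single 1D window, not the `pyts` implementation; the function name `gaf_from_definition` and the toy values are made up for illustration, and the series is assumed not to be constant.

```python
import numpy as np

def gaf_from_definition(series, method="summation"):
    """Build a GASF or GADF matrix for one 1D series from the definition."""
    x = np.asarray(series, dtype=float)
    # Step 1: rescale to [-1, 1] (assumes the series is not constant)
    x = 2.0 * (x - x.min()) / (x.max() - x.min()) - 1.0
    x = np.clip(x, -1.0, 1.0)  # guard against rounding outside the arccos domain
    # Step 2: each value becomes an angle phi = arccos(x) in [0, pi]
    phi = np.arccos(x)
    # Step 3: pairwise trigonometric sum or difference of the angles
    if method == "summation":
        return np.cos(phi[:, None] + phi[None, :])  # GASF
    if method == "difference":
        return np.sin(phi[:, None] - phi[None, :])  # GADF
    raise ValueError("method must be 'summation' or 'difference'")

# Toy window (made-up numbers, not Bitcoin data)
window = np.array([10.0, 12.0, 11.0, 15.0, 14.0])
gasf = gaf_from_definition(window, "summation")
gadf = gaf_from_definition(window, "difference")
print(gasf.shape, gadf.shape)  # (5, 5) (5, 5)
```

Two properties follow from the definition: the GASF is symmetric with diagonal entries cos(2φ) = 2x² − 1, so the magnitudes of the rescaled values can be read off the diagonal, while the GADF is antisymmetric with a zero diagonal.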

Let's implement this transformation for our Bitcoin price data.

In [None]:
def create_gaf_images(data, method='summation'):
    """
    Create Gramian Angular Field images from time series data.
    
    Parameters:
    -----------
    data : numpy.ndarray
        Time series data with shape (n_samples, n_timestamps)
    method : str
        'summation' for GASF or 'difference' for GADF
        
    Returns:
    --------
    numpy.ndarray
        GAF images with shape (n_samples, n_timestamps, n_timestamps, 1)
    """
    # GAF rescales each sample independently (default sample_range=(-1, 1)),
    # so the whole batch can be transformed in a single call.
    gaf = GramianAngularField(method=method)

    # pyts expects a 2D array of shape (n_samples, n_timestamps)
    data_2d = np.asarray(data).reshape(len(data), -1)
    gaf_images = gaf.fit_transform(data_2d)

    # Add a trailing channel dimension for the CNN
    return gaf_images[..., np.newaxis]

# Create GAF images for raw price data
X_train_price_gaf = create_gaf_images(X_train_price)
X_test_price_gaf = create_gaf_images(X_test_price)

# Create GAF images for fractionally differenced data
X_train_frac_gaf = create_gaf_images(X_train_frac)
X_test_frac_gaf = create_gaf_images(X_test_frac)

# Print shapes
print("GAF Image Shapes:")
print(f"X_train_price_gaf: {X_train_price_gaf.shape}")
print(f"X_test_price_gaf: {X_test_price_gaf.shape}")
print(f"X_train_frac_gaf: {X_train_frac_gaf.shape}")
print(f"X_test_frac_gaf: {X_test_frac_gaf.shape}")

## Visualize GAF Images

Let's visualize some GAF images to understand how time series data is represented in this format.

In [None]:
def plot_time_series_and_gaf(time_series, gaf_image, title=""):
    """
    Plot a time series and its corresponding GAF image side by side.
    
    Parameters:
    -----------
    time_series : numpy.ndarray
        Time series data
    gaf_image : numpy.ndarray
        GAF image representation
    title : str
        Title for the plot
    """
    fig, axes = plt.subplots(1, 2, figsize=(16, 6))
    
    # Plot time series
    axes[0].plot(time_series, color='#1f77b4')
    axes[0].set_title(f'{title} - Time Series', fontsize=14, fontweight='bold')
    axes[0].set_xlabel('Time Step')
    axes[0].set_ylabel('Value')
    axes[0].grid(True, alpha=0.3)
    
    # Plot GAF image
    im = axes[1].imshow(gaf_image[:, :, 0], cmap='viridis')
    axes[1].set_title(f'{title} - GAF Image', fontsize=14, fontweight='bold')
    axes[1].set_xlabel('Time Step')
    axes[1].set_ylabel('Time Step')
    fig.colorbar(im, ax=axes[1])
    
    plt.tight_layout()
    plt.show()

# Plot examples of time series and their GAF representations
# Raw price data
plot_time_series_and_gaf(
    X_train_price[0], 
    X_train_price_gaf[0], 
    "Bitcoin Price"
)

# Fractionally differenced data
plot_time_series_and_gaf(
    X_train_frac[0], 
    X_train_frac_gaf[0], 
    "Fractionally Differenced Bitcoin Price"
)

## CNN Model for GAF Images

Now, let's build a CNN model to process the GAF images for time series prediction.
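
Because each GAF image is `window_size` × `window_size`, the window length determines how many features reach the dense layers. The sketch below does that arithmetic for a stack of three `'same'`-padded convolutions, each followed by 2×2 max pooling with the Keras default `'valid'` padding. The helper name `flattened_features` and the window sizes tried are hypothetical; the notebook's actual `window_size` comes from Part 2.

```python
def flattened_features(window_size, n_pool=3, last_filters=128):
    """Size of the Flatten output for square inputs of side `window_size`.

    Assumes 'same'-padded convolutions (spatial size unchanged) and 2x2 max
    pooling with 'valid' padding, which maps a side of n to n // 2.
    """
    side = window_size
    for _ in range(n_pool):
        side //= 2
    return side * side * last_filters

# Hypothetical window sizes, for illustration only
for w in (7, 8, 30, 60):
    print(w, flattened_features(w))
# 7 0
# 8 128
# 30 1152
# 60 6272
```

A result of 0 means the spatial dimensions collapse before the `Flatten` layer, so with three pooling stages the window needs at least 8 time steps.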

In [None]:
def build_cnn_model(input_shape, learning_rate=0.001):
    """
    Build a CNN model for GAF image processing.
    
    Parameters:
    -----------
    input_shape : tuple
        Shape of the input GAF images
    learning_rate : float
        Learning rate for the optimizer
        
    Returns:
    --------
    tensorflow.keras.models.Sequential
        Compiled CNN model
    """
    model = Sequential([
        # Convolutional layers
        Conv2D(32, (3, 3), activation='relu', padding='same', input_shape=input_shape),
        MaxPooling2D((2, 2)),
        Conv2D(64, (3, 3), activation='relu', padding='same'),
        MaxPooling2D((2, 2)),
        Conv2D(128, (3, 3), activation='relu', padding='same'),
        MaxPooling2D((2, 2)),
        
        # Flatten and dense layers
        Flatten(),
        Dense(128, activation='relu'),
        Dropout(0.3),
        Dense(64, activation='relu'),
        Dropout(0.3),
        Dense(1)
    ])
    
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
        loss='mean_squared_error'
    )
    
    return model

# Get input shape from GAF images
input_shape = X_train_price_gaf.shape[1:]

# Build CNN models
cnn_price_model = build_cnn_model(input_shape)
cnn_frac_model = build_cnn_model(input_shape)

# Define early stopping callback
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True
)

# Normalize target values for training
price_scaler = MinMaxScaler()
y_train_price_scaled = price_scaler.fit_transform(y_train_price.reshape(-1, 1)).flatten()
y_test_price_scaled = price_scaler.transform(y_test_price.reshape(-1, 1)).flatten()

frac_scaler = MinMaxScaler()
y_train_frac_scaled = frac_scaler.fit_transform(y_train_frac.reshape(-1, 1)).flatten()
y_test_frac_scaled = frac_scaler.transform(y_test_frac.reshape(-1, 1)).flatten()
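
The scalers above are fit on the training targets only, so test targets outside the training range map outside [0, 1]. This can happen when prices trend beyond the levels seen during training. The sketch below reproduces the min-max arithmetic in plain NumPy to show the effect; the helper names and numbers are made up, and it is not a replacement for `MinMaxScaler`.

```python
import numpy as np

def fit_min_max(train):
    """Return the (min, max) learned from the training targets only."""
    train = np.asarray(train, dtype=float)
    return train.min(), train.max()

def min_max_transform(values, lo, hi):
    """Scale with the training range; values outside it are not clipped."""
    return (np.asarray(values, dtype=float) - lo) / (hi - lo)

def min_max_inverse(scaled, lo, hi):
    """Map scaled values back to the original units."""
    return np.asarray(scaled, dtype=float) * (hi - lo) + lo

train_targets = np.array([100.0, 150.0, 200.0])  # made-up values
test_targets = np.array([180.0, 250.0])          # 250 exceeds the training max
lo, hi = fit_min_max(train_targets)
test_scaled = min_max_transform(test_targets, lo, hi)
print(test_scaled)  # [0.8 1.5]
```

The inverse transform still recovers the original values, so the evaluation metrics are unaffected; the point is only that the network may be asked to predict scaled targets it never saw during training.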

## Train CNN Model for Raw Price Prediction
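
Keras's `validation_split=0.2` holds out the last 20% of the samples passed to `fit`, taken before any shuffling. If the windows from Part 2 are in chronological order (an assumption; this notebook does not state it), the validation set is therefore the most recent slice of the training period. A minimal sketch of the equivalent manual split, with a hypothetical helper name and placeholder arrays:

```python
import numpy as np

def chronological_split(X, y, validation_split=0.2):
    """Split off the last fraction of samples, mirroring validation_split."""
    split_at = int(len(X) * (1.0 - validation_split))
    return (X[:split_at], y[:split_at]), (X[split_at:], y[split_at:])

# Placeholder arrays standing in for GAF images and scaled targets
X_demo = np.zeros((100, 30, 30, 1))
y_demo = np.arange(100, dtype=float)
(X_tr, y_tr), (X_val, y_val) = chronological_split(X_demo, y_demo)
print(X_tr.shape, X_val.shape)  # (80, 30, 30, 1) (20, 30, 30, 1)
```

Only the remaining training portion is shuffled between epochs, so the held-out samples stay fixed for the whole run.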

In [None]:
# Train CNN model for raw price prediction
history_price = cnn_price_model.fit(
    X_train_price_gaf, y_train_price_scaled,
    epochs=50,
    batch_size=32,
    validation_split=0.2,
    callbacks=[early_stopping],
    verbose=1
)

# Plot training history
plt.figure(figsize=(10, 6))
plt.plot(history_price.history['loss'], label='Training Loss')
plt.plot(history_price.history['val_loss'], label='Validation Loss')
plt.title('CNN Model Training History (Raw Price Prediction)', fontsize=14, fontweight='bold')
plt.xlabel('Epoch')
plt.ylabel('Loss (MSE)')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

## Train CNN Model for Fractionally Differenced Series Prediction

In [None]:
# Train CNN model for fractionally differenced series prediction
history_frac = cnn_frac_model.fit(
    X_train_frac_gaf, y_train_frac_scaled,
    epochs=50,
    batch_size=32,
    validation_split=0.2,
    callbacks=[early_stopping],
    verbose=1
)

# Plot training history
plt.figure(figsize=(10, 6))
plt.plot(history_frac.history['loss'], label='Training Loss')
plt.plot(history_frac.history['val_loss'], label='Validation Loss')
plt.title('CNN Model Training History (Fractionally Differenced Prediction)', fontsize=14, fontweight='bold')
plt.xlabel('Epoch')
plt.ylabel('Loss (MSE)')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

## Model Evaluation

Let's evaluate the performance of our CNN models on the test data.
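
The evaluation reports MAE, RMSE, and MAPE. As a reference for how these are defined, here is a small NumPy sketch with made-up numbers. Unlike the `evaluate_model` function in this notebook, it skips points where the true value is (near) zero when computing MAPE; that masking, the `eps` threshold, and the helper name `regression_metrics` are choices made for this sketch.

```python
import numpy as np

def regression_metrics(y_true, y_pred, eps=1e-8):
    """MAE, RMSE and MAPE (%) computed directly from their definitions."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    err = y_true - y_pred
    mae = np.mean(np.abs(err))
    rmse = np.sqrt(np.mean(err ** 2))
    # MAPE is undefined where y_true == 0, so those points are skipped here
    mask = np.abs(y_true) > eps
    mape = np.mean(np.abs(err[mask] / y_true[mask])) * 100 if mask.any() else np.nan
    return {"mae": mae, "rmse": rmse, "mape": mape}

# Made-up numbers for illustration
m = regression_metrics([100.0, 200.0, 400.0], [110.0, 190.0, 400.0])
print(m)
```

For these values the errors are -10, 10, and 0, giving an MAE of 20/3, an RMSE of sqrt(200/3), and a MAPE of 5%.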

In [None]:
def evaluate_model(model, X_test, y_test_scaled, scaler, model_name):
    """
    Evaluate a model's performance on test data.
    
    Parameters:
    -----------
    model : tensorflow.keras.models.Model
        Trained model to evaluate
    X_test : numpy.ndarray
        Test input data
    y_test_scaled : numpy.ndarray
        Scaled test target data
    scaler : sklearn.preprocessing.MinMaxScaler
        Scaler used to normalize the target data
    model_name : str
        Name of the model for display purposes
        
    Returns:
    --------
    dict
        Dictionary containing evaluation metrics
    """
    # Make predictions
    y_pred_scaled = model.predict(X_test)
    
    # Inverse transform predictions and actual values
    y_pred = scaler.inverse_transform(y_pred_scaled).flatten()
    y_true = scaler.inverse_transform(y_test_scaled.reshape(-1, 1)).flatten()
    
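    # Calculate metrics
    mae = mean_absolute_error(y_true, y_pred)
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    # MAPE divides by y_true, so it is unstable when targets are close to zero
    # (which can happen for the fractionally differenced series)
    mape = np.mean(np.abs((y_true - y_pred) / y_true)) * 100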
    
    # Print metrics
    print(f"Evaluation Metrics for {model_name}:")
    print(f"MAE: {mae:.4f}")
    print(f"RMSE: {rmse:.4f}")
    print(f"MAPE: {mape:.4f}%")
    
    # Plot actual vs. predicted values
    plt.figure(figsize=(14, 7))
    plt.plot(y_true, label='Actual', color='blue', alpha=0.7)
    plt.plot(y_pred, label='Predicted', color='red', alpha=0.7)
    plt.title(f'{model_name}: Actual vs. Predicted Values', fontsize=14, fontweight='bold')
    plt.xlabel('Time Step')
    plt.ylabel('Value')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()
    
    # Plot prediction error
    error = y_true - y_pred
    plt.figure(figsize=(14, 7))
    plt.plot(error, color='green', alpha=0.7)
    plt.title(f'{model_name}: Prediction Error', fontsize=14, fontweight='bold')
    plt.xlabel('Time Step')
    plt.ylabel('Error')
    plt.axhline(y=0, color='r', linestyle='--')
    plt.grid(True, alpha=0.3)
    plt.show()
    
    return {
        'model_name': model_name,
        'mae': mae,
        'rmse': rmse,
        'mape': mape,
        'y_true': y_true,
        'y_pred': y_pred
    }

# Evaluate CNN model for raw price prediction
cnn_price_results = evaluate_model(
    cnn_price_model, 
    X_test_price_gaf, 
    y_test_price_scaled, 
    price_scaler, 
    "CNN Model (Raw Price Prediction)"
)

# Evaluate CNN model for fractionally differenced series prediction
cnn_frac_results = evaluate_model(
    cnn_frac_model, 
    X_test_frac_gaf, 
    y_test_frac_scaled, 
    frac_scaler, 
    "CNN Model (Fractionally Differenced Prediction)"
)

## Performance Comparison

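Let's compare the performance of our CNN models on the raw price data and the fractionally differenced data. Note that MAE and RMSE are reported in the units of each target series, so they are not directly comparable between the two models. MAPE is scale-free, but it becomes unstable when the true values are close to zero.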

In [None]:
# Create a DataFrame for performance comparison
performance_df = pd.DataFrame([
    {
        'Model': 'CNN with GAF (Raw Price)',
        'MAE': cnn_price_results['mae'],
        'RMSE': cnn_price_results['rmse'],
        'MAPE (%)': cnn_price_results['mape']
    },
    {
        'Model': 'CNN with GAF (Fractionally Differenced)',
        'MAE': cnn_frac_results['mae'],
        'RMSE': cnn_frac_results['rmse'],
        'MAPE (%)': cnn_frac_results['mape']
    }
])

display(performance_df)

# Save models and results
cnn_price_model.save('cnn_price_model.h5')
cnn_frac_model.save('cnn_frac_model.h5')

results_dict = {
    'cnn_price_results': cnn_price_results,
    'cnn_frac_results': cnn_frac_results,
    'performance_df': performance_df
}

with open('cnn_results.pkl', 'wb') as f:
    pickle.dump(results_dict, f)

print("Models and results saved successfully.")

## Summary of Part 4

In this part of our analysis, we've:

1. Transformed time series data into Gramian Angular Field (GAF) images
2. Visualized the GAF representations of both raw price and fractionally differenced data
3. Built and trained CNN models to process these GAF images for price prediction
4. Evaluated and compared the performance of the CNN models

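Key findings:
- The GAF transformation represents each time series window as a square image, with one pixel per pair of time steps
- CNN models can be trained directly on these images; how well they capture predictive patterns should be judged from the test metrics reported above
- Choosing between raw price data and fractionally differenced data changes both the inputs and the target, and therefore the model performance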

In the next part, we'll compare all our models (MLP and CNN) and draw final conclusions about the best approach for Bitcoin price prediction.