In [5]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import torch
from torch import nn
from torch.utils.data import DataLoader
import torch.optim as optim
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from torch.utils.data import DataLoader, TensorDataset
import seaborn as sns
import matplotlib.pyplot as plt

In [2]:
class CryptoPricePredictionPipeline:
    def __init__(self, csv_path):
        # Load the data
        self.df = pd.read_csv(csv_path)
        
        # Feature selection (excluding time-based columns)
        features = ['Open', 'High', 'Low', 'Volume', 
                    'Quote_Asset_Volume', 'Number_of_Trades', 
                    'Taker_Buy_Base_Volume', 'Taker_Buy_Quote_Volume']
        target = 'Close'
        
        # Separate features and target
        self.X = self.df[features].values
        self.y = self.df[target].values
        
        # Initialize scaler
        self.scaler_X = StandardScaler()
        self.scaler_y = StandardScaler()
        
    def prepare_data(self, test_size=0.2, random_state=42):
        # Scale features and target
        X_scaled = self.scaler_X.fit_transform(self.X)
        y_scaled = self.scaler_y.fit_transform(self.y.reshape(-1, 1)).flatten()
        
        # Split the data
        X_train, X_test, y_train, y_test = train_test_split(
            X_scaled, y_scaled, 
            test_size=test_size, 
            random_state=random_state
        )
        
        # Convert to PyTorch tensors
        self.X_train = torch.FloatTensor(X_train)
        self.X_test = torch.FloatTensor(X_test)
        self.y_train = torch.FloatTensor(y_train)
        self.y_test = torch.FloatTensor(y_test)
        
        # Create DataLoaders
        train_dataset = TensorDataset(self.X_train, self.y_train)
        test_dataset = TensorDataset(self.X_test, self.y_test)
        
        self.train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
        self.test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


In [3]:
class CryptoPriceMLP(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(CryptoPriceMLP, self).__init__()
        
        # One hidden layer network
        self.layers = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, output_size)
        )
    
    def forward(self, x):
        return self.layers(x)


In [None]:
# 3. Training Pipeline
class ModelTrainer:
    def __init__(self, model, learning_rate=0.01):
        self.model = model
        self.criterion = nn.MSELoss()
        self.optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    
    def train(self, train_loader, epochs=100):
        self.model.train()
        
        for epoch in range(epochs):
            total_loss = 0
            
            for batch_X, batch_y in train_loader:
                
                self.optimizer.zero_grad()
                
                # Forward pass
                outputs = self.model(batch_X)
                
                # Compute loss
                loss = self.criterion(outputs.squeeze(), batch_y)
                
                # Backward pass
                loss.backward()
                
                # Optimize
                self.optimizer.step()
                
                total_loss += loss.item()
                
            
            # Print loss every 10 epochs
            if (epoch + 1) % 10 == 0:
                print(f'Epoch [{epoch+1}/{epochs}], Loss: {total_loss/len(train_loader):.4f}')
    
    def evaluate(self, test_loader):
        self.model.eval()
        total_loss = 0
        
        with torch.no_grad():
            for batch_X, batch_y in test_loader:
                outputs = self.model(batch_X)
                loss = nn.MSELoss()(outputs.squeeze(), batch_y)
                total_loss += loss.item()
        
        print(f'Test Loss: {total_loss/len(test_loader):.4f}')
    
    def predict(self, X):
        self.model.eval()
        with torch.no_grad():
            prediction = self.model(X)
        return prediction


In [6]:
class ModelEvaluator:
    def __init__(self, model, data_pipeline, trainer):
        self.model = model
        self.data_pipeline = data_pipeline
        self.trainer = trainer
        
    def plot_training_progress(self, training_losses):
        """
        Plot training loss over epochs
        """
        plt.figure(figsize=(10, 5))
        plt.plot(training_losses, label='Training Loss')
        plt.title('Training Loss Over Epochs')
        plt.xlabel('Epochs')
        plt.ylabel('Loss')
        plt.legend()
        plt.tight_layout()
        plt.savefig('training_loss.png')
        plt.close()
    
    def plot_actual_vs_predicted(self):
        """
        Create scatter plot of actual vs predicted values
        """
        # Predict on test data
        self.model.eval()
        with torch.no_grad():
            predictions = self.model(self.data_pipeline.X_test).numpy()
        
        # Inverse transform predictions and actual values
        y_pred = self.data_pipeline.scaler_y.inverse_transform(predictions)
        y_true = self.data_pipeline.scaler_y.inverse_transform(
            self.data_pipeline.y_test.numpy().reshape(-1, 1)
        )
        
        # Create scatter plot
        plt.figure(figsize=(10, 6))
        plt.scatter(y_true, y_pred, alpha=0.5)
        plt.plot([y_true.min(), y_true.max()], [y_true.min(), y_true.max()], 
                 'r--', lw=2)
        plt.title('Actual vs Predicted Prices')
        plt.xlabel('Actual Prices')
        plt.ylabel('Predicted Prices')
        plt.tight_layout()
        plt.savefig('actual_vs_predicted.png')
        plt.close()
    
    def plot_residuals(self):
        """
        Create residual plot to assess model performance
        """
        # Predict on test data
        self.model.eval()
        with torch.no_grad():
            predictions = self.model(self.data_pipeline.X_test).numpy()
        
        # Inverse transform predictions and actual values
        y_pred = self.data_pipeline.scaler_y.inverse_transform(predictions)
        y_true = self.data_pipeline.scaler_y.inverse_transform(
            self.data_pipeline.y_test.numpy().reshape(-1, 1)
        )
        
        # Calculate residuals
        residuals = y_true - y_pred
        
        plt.figure(figsize=(10, 6))
        plt.scatter(y_pred, residuals, alpha=0.5)
        plt.title('Residual Plot')
        plt.xlabel('Predicted Values')
        plt.ylabel('Residuals')
        plt.axhline(y=0, color='r', linestyle='--')
        plt.tight_layout()
        plt.savefig('residuals.png')
        plt.close()
    
    def generate_performance_metrics(self):
        """
        Calculate and print performance metrics
        """
        # Predict on test data
        self.model.eval()
        with torch.no_grad():
            predictions = self.model(self.data_pipeline.X_test).numpy()
        
        # Inverse transform predictions and actual values
        y_pred = self.data_pipeline.scaler_y.inverse_transform(predictions)
        y_true = self.data_pipeline.scaler_y.inverse_transform(
            self.data_pipeline.y_test.numpy().reshape(-1, 1)
        )
        
        # Calculate metrics
        mse = mean_squared_error(y_true, y_pred)
        mae = mean_absolute_error(y_true, y_pred)
        r2 = r2_score(y_true, y_pred)
        
        # Create metrics dataframe
        metrics_df = pd.DataFrame({
            'Metric': ['Mean Squared Error', 'Mean Absolute Error', 'R-squared'],
            'Value': [mse, mae, r2]
        })
        
        # Save metrics to CSV
        metrics_df.to_csv('model_performance_metrics.csv', index=False)
        
        # Print metrics
        print("Model Performance Metrics:")
        print(metrics_df.to_string(index=False))

In [7]:
csv_path = '../data/crypto_data.csv'
data_pipeline = CryptoPricePredictionPipeline(csv_path)
data_pipeline.prepare_data()

input_size = data_pipeline.X_train.shape[1]
hidden_size = 64  # You can tune this
output_size = 1

model = CryptoPriceMLP(input_size, hidden_size, output_size)


trainer = ModelTrainer(model)

# Train the Model
trainer.train(data_pipeline.train_loader, epochs=100)

# Evaluate Model
trainer.evaluate(data_pipeline.test_loader)


evaluator = ModelEvaluator(model, data_pipeline, trainer)

# Generate Visualizations
#evaluator.plot_training_progress(training_losses)
evaluator.plot_actual_vs_predicted()
evaluator.plot_residuals()

# Generate Performance Metrics
evaluator.generate_performance_metrics()

Epoch [10/100], Loss: 0.0002
Epoch [20/100], Loss: 0.0001
Epoch [30/100], Loss: 0.0002
Epoch [40/100], Loss: 0.0002
Epoch [50/100], Loss: 0.0001
Epoch [60/100], Loss: 0.0001
Epoch [70/100], Loss: 0.0001
Epoch [80/100], Loss: 0.0001
Epoch [90/100], Loss: 0.0002
Epoch [100/100], Loss: 0.0001
Test Loss: 0.0006
Model Performance Metrics:
             Metric      Value
 Mean Squared Error 953.453857
Mean Absolute Error  24.895945
          R-squared   0.999380
