## 1. Import Required Libraries

First, we import all the libraries we need for data analysis and machine learning.

In [1]:
# Import all necessary libraries for data analysis and machine learning
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
import pickle
import warnings
# NOTE(review): this silences every warning category for the rest of the session,
# so deprecation and numerical warnings raised by later cells will not be shown.
warnings.filterwarnings('ignore')
# Use matplotlib's 'default' style sheet for all figures.
plt.style.use('default')
# Display options: never truncate the column list, and format floats with two decimals.
pd.set_option('display.max_columns', None)
pd.set_option('display.float_format', '{:.2f}'.format)


## 2. Data Loading Class

This class loads the CSV file and prints basic information about the data: its shape, column names, data types, and first rows.

In [2]:
class DataLoader:
    """Reads a CSV file into a DataFrame and prints a short overview of it."""

    def __init__(self, file_path):
        # Path handed to pandas.read_csv; `data` stays None until a load succeeds.
        self.file_path = file_path
        self.data = None

    def load_data(self):
        """
        Read the CSV at ``self.file_path`` into ``self.data``.

        Returns the loaded DataFrame. If reading raises any exception, the
        error is printed (not re-raised) and None is returned.
        """
        try:
            frame = pd.read_csv(self.file_path)
        except Exception as e:
            print(f"Error loading data: {e}")
            return None
        else:
            self.data = frame
            print(f"Data loaded successfully! Shape: {frame.shape}")
            return frame

    def basic_info(self):
        """Print row/column counts, column names, dtypes and the first five rows."""
        frame = self.data
        if frame is None:
            print("No data loaded yet. Please load data first.")
            return

        row_count, column_count = frame.shape
        print("Basic Information about the Dataset:")
        print(f"Number of rows: {row_count}")
        print(f"Number of columns: {column_count}")
        print("Column names:")
        print(frame.columns.tolist())
        print("Data types:")
        print(frame.dtypes)
        print("First 5 rows:")
        print(frame.head())

# Load the raw crypto CSV and print an overview of it.
# load_data() returns None (after printing the error) when the file cannot be read;
# in that case basic_info() only reports that no data has been loaded.
crypto_loader = DataLoader(r'crypto_data_updated_29_november.csv')
crypto_data = crypto_loader.load_data()
crypto_loader.basic_info()

Data loaded successfully! Shape: (1827, 9)
Basic Information about the Dataset:
Number of rows: 1827
Number of columns: 9
Column names:
['Date', 'Close (BTC)', 'Volume (BTC)', 'Close (ETH)', 'Volume (ETH)', 'Close (USDT)', 'Volume (USDT)', 'Close (BNB)', 'Volume (BNB)']
Data types:
Date              object
Close (BTC)      float64
Volume (BTC)       int64
Close (ETH)      float64
Volume (ETH)       int64
Close (USDT)     float64
Volume (USDT)      int64
Close (BNB)      float64
Volume (BNB)       int64
dtype: object
First 5 rows:
                        Date  Close (BTC)  Volume (BTC)  Close (ETH)  \
0  2017-11-13 00:00:00+00:00      6559.49    6263249920       316.72   
1  2017-11-14 00:00:00+00:00      6635.75    3197110016       337.63   
2  2017-11-15 00:00:00+00:00      7315.54    4200880128       333.36   
3  2017-11-16 00:00:00+00:00      7871.69    5123809792       330.92   
4  2017-11-17 00:00:00+00:00      7708.99    4651670016       332.39   

   Volume (ETH)  Close (USDT)  

## 3. Data Cleaning Class

This class cleans our data by handling missing values, removing duplicate rows, and optionally filtering out outliers.

In [None]:
class DataCleaner:
    """
    Cleans a DataFrame: reports and handles missing values, removes duplicate
    rows, and optionally filters outliers with the IQR rule.

    The cleaner works on a copy of the frame it is given, so the caller's
    DataFrame is never modified.
    """

    def __init__(self, data):
        """Copy `data` and remember its original shape for the final report."""
        self.data = data.copy()
        self.original_shape = data.shape

    def check_missing_values(self):
        """
        Print and return the missing-value count and percentage per column.

        Returns a DataFrame indexed by column name with the columns
        'Missing Count' and 'Missing Percentage'. Only columns that actually
        have missing values are printed; the returned frame lists all columns.
        """
        missing_values = self.data.isnull().sum()
        # Guard the denominator so an empty frame reports 0% instead of NaN.
        row_count = max(len(self.data), 1)
        missing_percent = (missing_values / row_count) * 100

        missing_info = pd.DataFrame({
            'Missing Count': missing_values,
            'Missing Percentage': missing_percent
        })

        print("Missing Values Information:")
        print(missing_info[missing_info['Missing Count'] > 0])

        if missing_values.sum() == 0:
            print("Great! No missing values found in the data.")

        return missing_info

    def handle_missing_values(self, method='drop'):
        """
        Remove or fill missing values.

        method: 'drop' removes every row that contains a missing value;
                'fill' replaces missing values in numeric columns with that
                column's mean (non-numeric columns are left untouched).

        Returns the updated DataFrame.
        Raises ValueError for any other `method`, instead of silently doing nothing.
        """
        if method == 'drop':
            self.data = self.data.dropna()
            print(f"Dropped rows with missing values. New shape: {self.data.shape}")
        elif method == 'fill':
            numeric_columns = self.data.select_dtypes(include=[np.number]).columns
            self.data[numeric_columns] = self.data[numeric_columns].fillna(self.data[numeric_columns].mean())
            print("Filled missing values with column averages.")
        else:
            raise ValueError(f"Unknown method {method!r}; expected 'drop' or 'fill'.")

        return self.data

    def remove_duplicates(self):
        """Drop fully duplicated rows, print how many were removed, return the data."""
        before_count = len(self.data)
        self.data = self.data.drop_duplicates()
        removed_count = before_count - len(self.data)

        print(f"Removed {removed_count} duplicate rows.")
        return self.data

    def handle_outliers(self, columns=None, method='iqr'):
        """
        Remove rows whose value lies outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR].

        columns: a column name or an iterable of column names to check;
                 defaults to every numeric column. Names not present in the
                 data are skipped.
        method:  only 'iqr' is implemented; anything else raises ValueError.

        Columns are filtered one after another, so the quartiles of a later
        column are computed on the rows that survived the earlier columns.
        Missing values are not treated as outliers and are kept; use
        handle_missing_values() to deal with them.

        Returns the updated DataFrame.
        """
        if method != 'iqr':
            raise ValueError(f"Unknown method {method!r}; only 'iqr' is supported.")

        if columns is None:
            columns = self.data.select_dtypes(include=[np.number]).columns
        elif isinstance(columns, str):
            # A bare string would otherwise be iterated character by character.
            columns = [columns]

        before_count = len(self.data)

        for column in columns:
            if column not in self.data.columns:
                continue

            values = self.data[column]
            q1 = values.quantile(0.25)
            q3 = values.quantile(0.75)
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr

            # between() is inclusive on both ends; NaN compares False, so keep
            # missing values explicitly rather than counting them as outliers.
            keep = values.between(lower_bound, upper_bound) | values.isna()
            self.data = self.data[keep]

        removed_count = before_count - len(self.data)
        print(f"Removed {removed_count} outlier rows using IQR method.")

        return self.data

    def get_clean_data(self):
        """Print the original and final shapes and return the cleaned DataFrame."""
        print("Data cleaning completed!")
        print(f"Original shape: {self.original_shape}")
        print(f"Final shape: {self.data.shape}")
        return self.data

# Clean the loaded data: report missing values, drop rows that contain any,
# then remove duplicate rows. handle_outliers() is not called here, so no
# outlier filtering is applied to the frame that goes on to preprocessing.
cleaner = DataCleaner(crypto_data)
cleaner.check_missing_values()
cleaner.handle_missing_values(method='drop')
cleaner.remove_duplicates()
clean_crypto_data = cleaner.get_clean_data()

Missing Values Information:
Empty DataFrame
Columns: [Missing Count, Missing Percentage]
Index: []
Great! No missing values found in the data.
Dropped rows with missing values. New shape: (1827, 9)
Removed 0 duplicate rows.
Data cleaning completed!
Original shape: (1827, 9)
Final shape: (1827, 9)


## 4. Data Preprocessing Class

This class prepares our data for machine learning by extracting date parts, creating ratio features, and standardizing the numeric columns.

In [None]:
class DataPreprocessor:
    """
    Prepares data for machine learning: converts the date column into numeric
    date parts, creates ratio features, standardizes numeric columns and
    splits the frame into features and target.

    The preprocessor works on a copy of the frame it is given.
    """

    # Date-part columns created by process_date_column(); left unscaled by default.
    _DATE_PART_COLUMNS = ('Year', 'Month', 'Day', 'DayOfWeek')

    def __init__(self, data):
        self.data = data.copy()
        self.scaler = StandardScaler()
        self.scaled_columns = []

    def process_date_column(self, date_column='Date'):
        """
        Parse `date_column` as datetime and add Year, Month, Day and DayOfWeek columns.

        If the column does not exist a message is printed and the data is
        returned unchanged. Returns the DataFrame.
        """
        if date_column in self.data.columns:
            self.data[date_column] = pd.to_datetime(self.data[date_column])

            # Extract date features
            date_parts = self.data[date_column].dt
            self.data['Year'] = date_parts.year
            self.data['Month'] = date_parts.month
            self.data['Day'] = date_parts.day
            self.data['DayOfWeek'] = date_parts.dayofweek

            print("Processed date column and created new date features.")
        else:
            print(f"Column {date_column} not found in data.")

        return self.data

    def create_features(self):
        """
        Add ratio features for whichever of the required source columns exist:

        - BTC_ETH_Ratio          = Close (BTC) / Close (ETH)
        - BTC_Volume_Price_Ratio = Volume (BTC) / Close (BTC)
        - ETH_Volume_Price_Ratio = Volume (ETH) / Close (ETH)

        Two of these are computed from Close (BTC); see the `exclude_columns`
        argument of prepare_for_modeling() when that column is the target.
        Returns the DataFrame.
        """
        # Create price ratios between different cryptocurrencies
        if 'Close (BTC)' in self.data.columns and 'Close (ETH)' in self.data.columns:
            self.data['BTC_ETH_Ratio'] = self.data['Close (BTC)'] / self.data['Close (ETH)']

        # Create volume to price ratios
        if 'Volume (BTC)' in self.data.columns and 'Close (BTC)' in self.data.columns:
            self.data['BTC_Volume_Price_Ratio'] = self.data['Volume (BTC)'] / self.data['Close (BTC)']

        if 'Volume (ETH)' in self.data.columns and 'Close (ETH)' in self.data.columns:
            self.data['ETH_Volume_Price_Ratio'] = self.data['Volume (ETH)'] / self.data['Close (ETH)']

        print("Created new features from existing data.")
        return self.data

    def scale_numerical_features(self, columns_to_scale=None):
        """
        Standardize columns in place with the instance's scaler.

        columns_to_scale: a column name or iterable of names; defaults to all
                          numeric columns except Year, Month, Day and DayOfWeek.

        The scaler is fitted on every row of the data. If a train/test split
        is made afterwards, the test rows have already influenced the scaling
        statistics. With the default column selection a numeric target column
        is scaled as well.

        When there is nothing to scale, a message is printed and the data is
        returned without calling the scaler. Returns the DataFrame.
        """
        if columns_to_scale is None:
            numeric_columns = self.data.select_dtypes(include=[np.number]).columns
            # Remove date-related columns from scaling
            columns_to_scale = [col for col in numeric_columns if col not in self._DATE_PART_COLUMNS]
        elif isinstance(columns_to_scale, str):
            columns_to_scale = [columns_to_scale]
        else:
            # Copy so later changes to the caller's list do not alter scaled_columns.
            columns_to_scale = list(columns_to_scale)

        if not columns_to_scale:
            self.scaled_columns = []
            print("No numerical columns to scale.")
            return self.data

        self.scaled_columns = columns_to_scale
        self.data[columns_to_scale] = self.scaler.fit_transform(self.data[columns_to_scale])

        print(f"Scaled {len(columns_to_scale)} numerical columns.")
        return self.data

    def prepare_for_modeling(self, target_column='Close (BTC)', exclude_columns=None):
        """
        Split the data into a feature frame X and a target series y.

        target_column:   column to predict; never included in X.
        exclude_columns: optional column name or iterable of names to leave
                         out of X as well, e.g. features derived from the
                         target such as BTC_ETH_Ratio and
                         BTC_Volume_Price_Ratio when predicting Close (BTC).
                         Defaults to None (exclude only the target).

        X contains every remaining numeric column. Returns (X, y).
        """
        excluded = {target_column}
        if exclude_columns is not None:
            if isinstance(exclude_columns, str):
                exclude_columns = [exclude_columns]
            excluded.update(exclude_columns)

        # Remove non-numeric columns and the excluded columns from features
        numeric_columns = self.data.select_dtypes(include=[np.number]).columns
        feature_columns = [col for col in numeric_columns if col not in excluded]

        X = self.data[feature_columns]
        y = self.data[target_column]

        print(f"Prepared {len(feature_columns)} features to predict {target_column}")
        print(f"Feature columns: {feature_columns}")

        return X, y

    def get_processed_data(self):
        """
        Return the fully processed data.
        """
        return self.data

# Run the preprocessing pipeline on the cleaned data:
# date parts -> ratio features -> standardization.
# Every step returns the preprocessor's own frame, so only the last result is kept.
preprocessor = DataPreprocessor(clean_crypto_data)
preprocessor.process_date_column()
preprocessor.create_features()
processed_data = preprocessor.scale_numerical_features()

# Report the shape and column list of the processed frame
print("Processed data shape:", processed_data.shape)
print("New columns created:")
print(processed_data.columns.tolist())

Processed date column and created new date features.
Created new features from existing data.

Processed data shape: (1827, 16)

New columns created:
['Date', 'Close (BTC)', 'Volume (BTC)', 'Close (ETH)', 'Volume (ETH)', 'Close (USDT)', 'Volume (USDT)', 'Close (BNB)', 'Volume (BNB)', 'Year', 'Month', 'Day', 'DayOfWeek', 'BTC_ETH_Ratio', 'BTC_Volume_Price_Ratio', 'ETH_Volume_Price_Ratio']


## 5. Save Clean CSV Class

This class saves our cleaned and processed data to a new CSV file, and writes a short text summary of it, for future use.

In [None]:
class DataSaver:
    """
    Saves cleaned and processed data to CSV files, plus a plain-text summary,
    so the cleaning steps do not have to be repeated later.
    """

    def __init__(self, data):
        # The DataFrame is stored by reference (not copied).
        self.data = data

    def save_to_csv(self, filename, include_index=False):
        """
        Write the data to `filename` as CSV.

        include_index: whether to write the DataFrame index as a column.

        Any error is printed rather than raised.
        """
        try:
            self.data.to_csv(filename, index=include_index)
            print(f"Data saved successfully to {filename}")
            print(f"Saved {len(self.data)} rows and {len(self.data.columns)} columns")
        except Exception as e:
            print(f"Error saving data: {e}")

    def save_summary(self, filename):
        """
        Write a text report to `filename`: row and column counts, one line per
        column name, and the output of DataFrame.describe().

        Any error is printed rather than raised.
        """
        try:
            # Build the whole report first so a failure (e.g. in describe())
            # does not leave a half-written file behind.
            report_lines = [
                "Data Summary Report",
                f"Total rows: {len(self.data)}",
                f"Total columns: {len(self.data.columns)}",
                "Column names:",
            ]
            report_lines.extend(f"- {col}" for col in self.data.columns)
            report_lines.append("")
            report_lines.append("Basic statistics:")
            report_lines.append(str(self.data.describe()))

            # One entry per line; the original wrote everything without line breaks.
            with open(filename, 'w', encoding='utf-8') as f:
                f.write("\n".join(report_lines) + "\n")

            print(f"Summary saved to {filename}")
        except Exception as e:
            print(f"Error saving summary: {e}")

# Save our processed data
# processed_data is the frame returned by scale_numerical_features(), so the
# values written to the CSV are the scaled ones, not the raw prices and volumes.
data_saver = DataSaver(processed_data)
data_saver.save_to_csv('clean_crypto_data.csv')
data_saver.save_summary('data_summary.txt')

Data saved successfully to clean_crypto_data.csv
Saved 1827 rows and 16 columns
Summary saved to data_summary.txt


## 6. Model Training Class

This class trains Linear Regression and Random Forest models to predict the Bitcoin closing price (`Close (BTC)`) from the other numeric columns.

In [None]:
class ModelTrainer:
    """
    Trains and compares regression models on one feature frame / target pair.

    Intended order of use: split_data() -> train_models() -> evaluate_models(),
    after which plot_predictions(), get_best_model() and feature_importance()
    can be called.
    """

    def __init__(self, X, y):
        """
        Store the modelling data and create the untrained estimators.

        X: feature variables (what we use to predict)
        y: target variable (what we want to predict)
        """
        self.X = X
        self.y = y
        self.trained_models = {}
        # Filled by evaluate_models(): {model name: {metric name: value}}
        self.results = {}

        # Initialize different regression models
        self.models = {
            'Linear Regression': LinearRegression(),
            'Random Forest': RandomForestRegressor(n_estimators=100, random_state=42)
        }

    def split_data(self, test_size=0.2, random_state=42):
        """
        Split X and y into training and testing sets with train_test_split.

        test_size:    fraction of rows held out for testing.
        random_state: seed passed to train_test_split so the split is repeatable.

        Returns (X_train, X_test, y_train, y_test); the same four objects are
        stored on the instance.
        """
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            self.X, self.y, test_size=test_size, random_state=random_state
        )

        print("Data split completed!")
        print(f"Training set: {len(self.X_train)} samples")
        print(f"Testing set: {len(self.X_test)} samples")

        return self.X_train, self.X_test, self.y_train, self.y_test

    def train_models(self):
        """
        Fit every model in self.models on the training set and record it in
        self.trained_models.

        If split_data() has not been called yet, it is called first with its
        default arguments instead of failing on the missing training set.
        """
        if getattr(self, 'X_train', None) is None:
            self.split_data()

        print("Training models...")
        print("=" * 30)

        for model_name, model in self.models.items():
            print(f"Training {model_name}...")

            # Train the model
            model.fit(self.X_train, self.y_train)
            self.trained_models[model_name] = model

            print(f"{model_name} training completed!")

        print("All models trained successfully!")

    def evaluate_models(self):
        """
        Score every trained model on the test set.

        For each model the R2 score, mean squared error, root mean squared
        error and mean absolute error are stored in self.results under the
        keys 'R2 Score', 'MSE', 'RMSE' and 'MAE'.

        Returns self.results. If no model has been trained, a message is
        printed and the (unchanged) results dict is returned.
        """
        if not self.trained_models:
            print("No models trained yet. Please run train_models() first.")
            return self.results

        print("Evaluating models...")
        print("=" * 30)

        for model_name, model in self.trained_models.items():
            y_pred = model.predict(self.X_test)
            mse = mean_squared_error(self.y_test, y_pred)

            metrics = {
                'R2 Score': r2_score(self.y_test, y_pred),
                'MSE': mse,
                'RMSE': float(np.sqrt(mse)),
                'MAE': mean_absolute_error(self.y_test, y_pred),
            }
            self.results[model_name] = metrics

            print(f"{model_name}:")
            for metric_name, value in metrics.items():
                print(f"  {metric_name}: {value:.4f}")

        return self.results

    def plot_predictions(self, model_name):
        """
        Scatter-plot actual vs. predicted test values for one trained model,
        with the y = x reference line and the R2 score shown in a text box.

        The R2 score is taken from self.results when the model has been
        evaluated and is computed on the spot otherwise.
        """
        if model_name not in self.trained_models:
            print(f"Model {model_name} not found or not trained yet.")
            return

        model = self.trained_models[model_name]
        y_pred = model.predict(self.X_test)

        plt.figure(figsize=(10, 6))
        plt.scatter(self.y_test, y_pred, alpha=0.6)
        # Reference line: points on it are predicted exactly.
        value_range = [self.y_test.min(), self.y_test.max()]
        plt.plot(value_range, value_range, 'r--', lw=2)
        plt.xlabel('Actual Values')
        plt.ylabel('Predicted Values')
        plt.title(f'{model_name}: Actual vs Predicted Values')

        # Add R2 score to the plot
        if model_name in self.results:
            r2 = self.results[model_name]['R2 Score']
        else:
            r2 = r2_score(self.y_test, y_pred)
        # Escape sequence for the superscript two keeps the label independent
        # of the file's encoding.
        plt.text(0.05, 0.95, f'R\u00b2 = {r2:.3f}',
                 transform=plt.gca().transAxes,
                 bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8))

        plt.tight_layout()
        plt.show()

    def get_best_model(self):
        """
        Return (name, fitted model) for the model with the highest R2 score.

        Returns None (after printing a hint) when evaluate_models() has not
        produced any results yet.
        """
        if not self.results:
            print("No models evaluated yet. Please run evaluate_models() first.")
            return None

        best_model_name = max(self.results, key=lambda name: self.results[name]['R2 Score'])
        best_score = self.results[best_model_name]['R2 Score']

        print(f"Best performing model: {best_model_name}")
        print(f"Best R2 Score: {best_score:.4f}")

        return best_model_name, self.trained_models[best_model_name]

    def feature_importance(self, model_name):
        """
        Print and plot the feature importances of a trained model.

        Returns a DataFrame with the columns 'Feature' and 'Importance',
        sorted from most to least important; the bar chart shows the top ten.
        Returns None (after printing a message) when the model is unknown or
        has no `feature_importances_` attribute.
        """
        if model_name not in self.trained_models:
            print(f"Model {model_name} not found.")
            return None

        model = self.trained_models[model_name]

        if not hasattr(model, 'feature_importances_'):
            print(f"Model {model_name} does not provide feature importance.")
            return None

        importance_df = pd.DataFrame({
            'Feature': self.X.columns,
            'Importance': model.feature_importances_
        }).sort_values('Importance', ascending=False)

        print(f"Feature Importance for {model_name}:")
        print(importance_df)
        plt.figure(figsize=(10, 6))
        plt.barh(importance_df['Feature'][:10], importance_df['Importance'][:10])
        plt.xlabel('Importance')
        plt.title(f'Top 10 Feature Importance - {model_name}')
        # Largest importance at the top of the chart.
        plt.gca().invert_yaxis()
        plt.tight_layout()
        plt.show()

        return importance_df

# Build the feature frame and target, hold out 20% of the rows for testing,
# and fit both regression models on the remaining rows.
# NOTE(review): the features include BTC_ETH_Ratio and BTC_Volume_Price_Ratio, which
# create_features() computes from Close (BTC) itself, so the target leaks into X —
# confirm this is intended before trusting any scores.
# NOTE(review): train_test_split shuffles rows by default, so for this date-ordered
# data the test rows are presumably interleaved in time with the training rows — verify.
X, y = preprocessor.prepare_for_modeling(target_column='Close (BTC)')
model_trainer = ModelTrainer(X, y)
X_train, X_test, y_train, y_test = model_trainer.split_data(test_size=0.2)
model_trainer.train_models()

Prepared 14 features to predict Close (BTC)
Feature columns: ['Volume (BTC)', 'Close (ETH)', 'Volume (ETH)', 'Close (USDT)', 'Volume (USDT)', 'Close (BNB)', 'Volume (BNB)', 'Year', 'Month', 'Day', 'DayOfWeek', 'BTC_ETH_Ratio', 'BTC_Volume_Price_Ratio', 'ETH_Volume_Price_Ratio']
Data split completed!
Training set: 1461 samples
Testing set: 366 samples
Training models...
Training Linear Regression...
Linear Regression training completed!
Training Random Forest...
Random Forest training completed!

All models trained successfully!
Random Forest training completed!

All models trained successfully!
