<a href="https://colab.research.google.com/github/Remorse-Code/Cryptocurrency-Technical-Analysis-Tool/blob/main/Cryptocurrency_Technical_Analysis_Tool.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount the user's Google Drive into the Colab filesystem at /content/drive.
# This only works inside a Google Colab runtime (the `google.colab` package
# is not available elsewhere).
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np
from scipy import stats

# [Previous functions remain the same, so keeping them for completeness]
def calculate_rsi(close_prices, window=14):
    """Relative Strength Index built from simple rolling means.

    Args:
        close_prices: pandas Series of closing prices.
        window: number of periods in the rolling averages (partial windows
            are allowed because ``min_periods=1``).

    Returns:
        Series of RSI values; NaN where both the average gain and the
        average loss are zero (e.g. the first row).
    """
    change = close_prices.diff()

    # Split the changes into up-moves and (positive) down-moves. The leading
    # NaN from diff() fails both comparisons and therefore becomes 0.
    ups = change.where(change > 0, 0)
    downs = -change.where(change < 0, 0)

    def _rolling_average(series):
        return series.rolling(window=window, min_periods=1).mean()

    relative_strength = _rolling_average(ups) / _rolling_average(downs)
    return 100 - (100 / (1 + relative_strength))

def calculate_macd(close_prices, fast=12, slow=26, signal=9):
    """Moving Average Convergence/Divergence.

    Args:
        close_prices: pandas Series of closing prices.
        fast: span of the fast EMA.
        slow: span of the slow EMA.
        signal: span of the EMA applied to the MACD line.

    Returns:
        Tuple ``(macd, signal_line, histogram)`` of Series, where the
        histogram is ``macd - signal_line``.
    """
    def _ema(series, span):
        # Recursive (adjust=False) exponential moving average.
        return series.ewm(span=span, adjust=False).mean()

    macd_line = _ema(close_prices, fast) - _ema(close_prices, slow)
    trigger = _ema(macd_line, signal)
    return macd_line, trigger, macd_line - trigger

def calculate_bollinger_bands(close_prices, window=20, num_std=2):
    """Bollinger Bands around a simple moving average.

    Args:
        close_prices: pandas Series of closing prices.
        window: length of the rolling window for the mean and std.
        num_std: number of rolling standard deviations between the middle
            band and each outer band.

    Returns:
        Tuple ``(upper_band, middle_band, lower_band)`` of Series; the first
        ``window - 1`` rows are NaN.
    """
    roller = close_prices.rolling(window=window)
    centre = roller.mean()
    half_width = roller.std() * num_std
    return centre + half_width, centre, centre - half_width

def calculate_high_low_spread(high_prices, low_prices, window=14):
    """High-low range as a percentage of the low, plus its rolling mean.

    Args:
        high_prices: pandas Series of period highs.
        low_prices: pandas Series of period lows.
        window: length of the rolling mean applied to the spread.

    Returns:
        Tuple ``(spread, avg_spread)`` of Series, both in percent.
    """
    pct_range = ((high_prices - low_prices) / low_prices) * 100
    return pct_range, pct_range.rolling(window=window).mean()

def calculate_momentum(close_prices, window=14):
    """Percentage price change over ``window`` periods.

    Args:
        close_prices: pandas Series of closing prices.
        window: number of periods to look back.

    Returns:
        Series of percentage changes; the first ``window`` rows are NaN.
    """
    earlier = close_prices.shift(window)
    change = close_prices.diff(window)
    return (change / earlier) * 100

def calculate_volatility(close_prices, window=14, periods_per_year=252 * 1440):
    """Annualised rolling volatility of log returns, in percent.

    Args:
        close_prices: pandas Series of closing prices.
        window: length of the rolling window used for the standard deviation.
        periods_per_year: number of bars per year used to annualise the
            per-bar standard deviation. The default (252 * 1440) keeps the
            original behaviour; callers working with other bar sizes or a
            different calendar can pass their own value (e.g. ``1`` to get
            the un-annualised figure).

    Returns:
        Series of volatility values in percent; the first ``window`` rows
        are NaN.
    """
    log_returns = np.log(close_prices / close_prices.shift(1))
    per_bar_std = log_returns.rolling(window=window).std()
    # Volatility scales with the square root of the number of periods.
    return per_bar_std * np.sqrt(periods_per_year) * 100

def calculate_stochastic_oscillator(high_prices, low_prices, close_prices, k_window=14, d_window=3):
    """Stochastic oscillator %K and its moving average %D.

    Args:
        high_prices: pandas Series of period highs.
        low_prices: pandas Series of period lows.
        close_prices: pandas Series of closing prices.
        k_window: look-back used for the rolling high/low range.
        d_window: length of the rolling mean applied to %K.

    Returns:
        Tuple ``(k_line, d_line)`` of Series. %K is where the close sits
        inside the rolling low-high range, expressed in percent.
    """
    range_floor = low_prices.rolling(window=k_window).min()
    range_ceiling = high_prices.rolling(window=k_window).max()
    percent_k = ((close_prices - range_floor) / (range_ceiling - range_floor)) * 100
    percent_d = percent_k.rolling(window=d_window).mean()
    return percent_k, percent_d

def calculate_atr(high_prices, low_prices, close_prices, window=14):
    """Average True Range as a simple rolling mean of the true range.

    Args:
        high_prices: pandas Series of period highs.
        low_prices: pandas Series of period lows.
        close_prices: pandas Series of closing prices.
        window: length of the rolling mean.

    Returns:
        Series of ATR values; the first ``window - 1`` rows are NaN.
    """
    previous_close = close_prices.shift()
    candidates = pd.concat(
        [
            high_prices - low_prices,
            np.abs(high_prices - previous_close),
            np.abs(low_prices - previous_close),
        ],
        axis=1,
    )
    # Row-wise max skips NaN, so the first row falls back to high - low.
    true_range = candidates.max(axis=1)
    return true_range.rolling(window=window).mean()

def calculate_statistical_features(close_prices, window=14):
    """Rolling distribution statistics of one-period log returns.

    Args:
        close_prices: pandas Series of closing prices.
        window: length of the rolling window.

    Returns:
        Tuple ``(rolling_mean, rolling_std, rolling_skew, rolling_kurt,
        z_score)`` of Series, where ``z_score`` is the current log return
        standardised by the rolling mean and std.
    """
    log_returns = np.log(close_prices / close_prices.shift(1))
    roller = log_returns.rolling(window=window)

    mean_ = roller.mean()
    std_ = roller.std()
    skew_ = roller.skew()
    kurt_ = roller.kurt()

    standardised = (log_returns - mean_) / std_
    return mean_, std_, skew_, kurt_, standardised

# Load and prepare data
file_path = 'drive/MyDrive/datasets/quant/ml/BTCUSDT_1m.csv'
df = pd.read_csv(file_path)
# Take an explicit copy of the first 5000 rows: the column assignments below
# must write to an independent frame, not to a slice of the full dataset
# (which triggers pandas' SettingWithCopyWarning).
df = df.iloc[0:5000].copy()

# Convert timestamp (the source column may be named timestamp, time or date)
if 'timestamp' in df.columns:
    df['timestamp'] = pd.to_datetime(df['timestamp'])
elif 'time' in df.columns:
    df['timestamp'] = pd.to_datetime(df['time'])
elif 'date' in df.columns:
    df['timestamp'] = pd.to_datetime(df['date'])
else:
    # Fail here with a clear message instead of a bare KeyError('timestamp')
    # when the plotting code first reads df['timestamp'].
    raise KeyError(
        "No 'timestamp', 'time' or 'date' column found in " + file_path
    )

# Calculate indicators (each helper returns Series aligned with df's index)
df['RSI'] = calculate_rsi(df['close'])
df['MA20'] = df['close'].rolling(window=20).mean()
df['MA50'] = df['close'].rolling(window=50).mean()
df['MA200'] = df['close'].rolling(window=200).mean()
df['MACD'], df['MACD_Signal'], df['MACD_Histogram'] = calculate_macd(df['close'])
df['BB_Upper'], df['BB_Middle'], df['BB_Lower'] = calculate_bollinger_bands(df['close'])
df['Stoch_K'], df['Stoch_D'] = calculate_stochastic_oscillator(df['high'], df['low'], df['close'])
df['Spread'], df['Avg_Spread'] = calculate_high_low_spread(df['high'], df['low'])
df['Momentum'] = calculate_momentum(df['close'])
df['Volatility'] = calculate_volatility(df['close'])
df['Returns_Mean'], df['Returns_Std'], df['Returns_Skew'], df['Returns_Kurt'], df['Returns_ZScore'] = calculate_statistical_features(df['close'])

# Create subplots: one tall price panel (row 1) followed by eight indicator
# panels of equal height, all sharing the same x axis.
fig = make_subplots(rows=9, cols=1,
                    shared_xaxes=True,
                    vertical_spacing=0.03,
                    row_heights=[0.3, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1])

# Main price chart
fig.add_trace(go.Candlestick(
    x=df['timestamp'],
    open=df['open'],
    high=df['high'],
    low=df['low'],
    close=df['close'],
    name='OHLC',
    increasing_line_color='#26A69A',
    decreasing_line_color='#EF5350'
), row=1, col=1)

# Moving Averages (overlaid on the price panel)
fig.add_trace(go.Scatter(x=df['timestamp'], y=df['MA20'], name='MA20',
                        line=dict(color='#FFB74D', width=1)), row=1, col=1)
fig.add_trace(go.Scatter(x=df['timestamp'], y=df['MA50'], name='MA50',
                        line=dict(color='#FF7043', width=1)), row=1, col=1)
fig.add_trace(go.Scatter(x=df['timestamp'], y=df['MA200'], name='MA200',
                        line=dict(color='#E53935', width=1)), row=1, col=1)

# Bollinger Bands
# NOTE: the lower band uses fill='tonexty', which shades towards the trace
# added just before it, so BB Upper must stay immediately ahead of BB Lower.
fig.add_trace(go.Scatter(x=df['timestamp'], y=df['BB_Upper'], name='BB Upper',
                        line=dict(color='rgba(128,128,128,0.7)', width=1, dash='dash')), row=1, col=1)
fig.add_trace(go.Scatter(x=df['timestamp'], y=df['BB_Lower'], name='BB Lower',
                        line=dict(color='rgba(128,128,128,0.7)', width=1, dash='dash'),
                        fill='tonexty', fillcolor='rgba(128,128,128,0.1)'), row=1, col=1)

# Volume
fig.add_trace(go.Bar(
    x=df['timestamp'],
    y=df['volume'],
    name='Volume',
    marker=dict(color='#90A4AE', opacity=0.7)
), row=2, col=1)

# MACD (line, signal line and histogram share row 3)
fig.add_trace(go.Scatter(x=df['timestamp'], y=df['MACD'], name='MACD',
                        line=dict(color='#42A5F5', width=1)), row=3, col=1)
fig.add_trace(go.Scatter(x=df['timestamp'], y=df['MACD_Signal'], name='Signal',
                        line=dict(color='#FF7043', width=1)), row=3, col=1)
fig.add_trace(go.Bar(x=df['timestamp'], y=df['MACD_Histogram'], name='MACD Histogram',
                    marker_color='#78909C'), row=3, col=1)

# RSI with reference lines at 70 and 30
fig.add_trace(go.Scatter(x=df['timestamp'], y=df['RSI'], name='RSI',
                        line=dict(color='#7E57C2', width=1)), row=4, col=1)
fig.add_hline(y=70, line_color="#EF5350", line_dash="dash", line_width=1,
              annotation_text="Overbought", annotation_position="right", row=4, col=1)
fig.add_hline(y=30, line_color="#26A69A", line_dash="dash", line_width=1,
              annotation_text="Oversold", annotation_position="right", row=4, col=1)

# High-Low Spread
fig.add_trace(go.Scatter(x=df['timestamp'], y=df['Spread'], name='HL Spread',
                        line=dict(color='#26C6DA', width=1)), row=5, col=1)
fig.add_trace(go.Scatter(x=df['timestamp'], y=df['Avg_Spread'], name='Avg Spread',
                        line=dict(color='#78909C', width=1)), row=5, col=1)

# Momentum with a zero reference line
fig.add_trace(go.Scatter(x=df['timestamp'], y=df['Momentum'], name='Momentum',
                        line=dict(color='#FFB74D', width=1)), row=6, col=1)
fig.add_hline(y=0, line_color="#78909C", line_dash="dash", line_width=1, row=6, col=1)

# Volatility
fig.add_trace(go.Scatter(x=df['timestamp'], y=df['Volatility'], name='Volatility',
                        line=dict(color='#FF7043', width=1)), row=7, col=1)

# Stochastic Oscillator
fig.add_trace(go.Scatter(x=df['timestamp'], y=df['Stoch_K'], name='Stoch %K',
                        line=dict(color='#42A5F5', width=1)), row=8, col=1)
fig.add_trace(go.Scatter(x=df['timestamp'], y=df['Stoch_D'], name='Stoch %D',
                        line=dict(color='#FF7043', width=1)), row=8, col=1)

# Statistical Features (z-score and skewness of log returns)
fig.add_trace(go.Scatter(x=df['timestamp'], y=df['Returns_ZScore'], name='Z-Score',
                        line=dict(color='#7E57C2', width=1)), row=9, col=1)
fig.add_trace(go.Scatter(x=df['timestamp'], y=df['Returns_Skew'], name='Skewness',
                        line=dict(color='#26A69A', width=1)), row=9, col=1)

# Update layout for light theme
fig.update_layout(
    title='BTC/USDT Advanced Technical Analysis',
    height=2000,
    showlegend=True,
    xaxis_rangeslider_visible=False,
    hovermode='x unified',
    dragmode='pan',
    template='plotly',  # Using default light template
    margin=dict(l=50, r=50, t=50, b=50),
    plot_bgcolor='white',
    paper_bgcolor='white',
    font=dict(color='#424242')
)

# Configure y-axes titles and ranges (RSI and Stochastic are pinned to 0-100)
fig.update_yaxes(title_text="Price", row=1, col=1)
fig.update_yaxes(title_text="Volume", row=2, col=1)
fig.update_yaxes(title_text="MACD", row=3, col=1)
fig.update_yaxes(title_text="RSI", range=[0, 100], row=4, col=1)
fig.update_yaxes(title_text="HL Spread %", row=5, col=1)
fig.update_yaxes(title_text="Momentum %", row=6, col=1)
fig.update_yaxes(title_text="Volatility %", row=7, col=1)
fig.update_yaxes(title_text="Stochastic", range=[0, 100], row=8, col=1)
fig.update_yaxes(title_text="Statistics", row=9, col=1)

# Update grid styling: apply the same light grid to both axes of rows 1-9
for i in range(1, 10):
    fig.update_yaxes(
        showgrid=True,
        gridwidth=1,
        gridcolor='rgba(128,128,128,0.2)',
        row=i, col=1
    )
    fig.update_xaxes(
        showgrid=True,
        gridwidth=1,
        gridcolor='rgba(128,128,128,0.2)',
        row=i, col=1
    )

# Show plot
fig.show()

In [None]:
import pandas as pd
import numpy as np
from scipy import stats
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from sklearn.preprocessing import MinMaxScaler, StandardScaler, RobustScaler
from sklearn.decomposition import PCA

class FeatureProcessor:
    """Keeps one scaler per feature name plus a single shared PCA model."""

    # Scaling methods accepted by normalize_feature.
    _METHODS = ('standard', 'minmax', 'robust')

    def __init__(self):
        # feature name -> scaler instance, created on first use
        self.scalers = {}
        # PCA model; created and fitted by the first apply_pca call
        self.pca = None

    def normalize_feature(self, data, feature_name, method='standard'):
        """
        Normalize a single feature and return it as a flat array.

        Args:
            data: pandas Series holding the feature values.
            feature_name: key under which the scaler is stored.
            method: 'standard', 'minmax' or 'robust'. Only consulted when no
                scaler exists yet for ``feature_name``; an existing scaler is
                reused regardless of ``method``.

        Returns:
            1-D numpy array of scaled values.

        Raises:
            ValueError: if a new scaler is needed and ``method`` is not one
                of the supported names (previously this surfaced as an
                unrelated KeyError on the scaler lookup).
        """
        if feature_name not in self.scalers:
            # Validate before touching any scaler class so the error is
            # explicit and nothing is registered for a bad request.
            if method not in self._METHODS:
                raise ValueError(
                    f"Unknown normalization method {method!r}; "
                    f"expected one of {self._METHODS}"
                )
            if method == 'standard':
                scaler = StandardScaler()
            elif method == 'minmax':
                scaler = MinMaxScaler()
            else:
                scaler = RobustScaler()
            self.scalers[feature_name] = scaler

        # Scalers expect a 2-D (n_samples, 1) array for a single feature.
        reshaped_data = data.values.reshape(-1, 1)
        # NOTE: the scaler is re-fitted on every call (fit_transform), so a
        # second call with the same name overwrites the earlier fit.
        normalized_data = self.scalers[feature_name].fit_transform(reshaped_data)
        return normalized_data.flatten()

    def apply_pca(self, features, n_components=0.95):
        """
        Apply PCA to reduce feature dimensionality while preserving variance.

        The first call creates and fits the PCA model with ``n_components``;
        later calls only transform with the already-fitted model and ignore
        ``n_components``.
        """
        if self.pca is None:
            self.pca = PCA(n_components=n_components)
            return self.pca.fit_transform(features)
        return self.pca.transform(features)

def calculate_technical_indicators(df):
    """Add return, RSI, MACD, Bollinger and moving-average columns to ``df``.

    The frame is modified in place and also returned. It must contain a
    'close' column.
    """
    close = df['close']

    # Basic price features
    df['returns'] = close.pct_change()
    df['log_returns'] = np.log(close / close.shift(1))

    # RSI: 14-period simple rolling means of gains and losses
    change = close.diff()
    mean_gain = change.where(change > 0, 0).rolling(window=14, min_periods=1).mean()
    mean_loss = (-change.where(change < 0, 0)).rolling(window=14, min_periods=1).mean()
    df['RSI'] = 100 - (100 / (1 + mean_gain / mean_loss))

    # MACD: 12/26 EMAs with a 9-period signal line
    macd_line = (
        close.ewm(span=12, adjust=False).mean()
        - close.ewm(span=26, adjust=False).mean()
    )
    macd_signal = macd_line.ewm(span=9, adjust=False).mean()
    df['MACD'] = macd_line
    df['MACD_Signal'] = macd_signal
    df['MACD_Hist'] = macd_line - macd_signal

    # Bollinger Bands: 20-period mean +/- 2 rolling standard deviations
    band_roller = close.rolling(window=20)
    band_mid = band_roller.mean()
    band_offset = band_roller.std() * 2
    df['BB_Upper'] = band_mid + band_offset
    df['BB_Middle'] = band_mid
    df['BB_Lower'] = band_mid - band_offset

    # Simple and exponential moving averages
    for span in (5, 10, 20, 50, 200):
        df[f'MA_{span}'] = close.rolling(window=span).mean()
        df[f'EMA_{span}'] = close.ewm(span=span, adjust=False).mean()

    return df

def calculate_statistical_features(df, windows=(5, 10, 20)):
    """Add rolling return, volume and range statistics to ``df``.

    For every window the following columns are added in place:
    ``rolling_mean_*``, ``rolling_std_*``, ``rolling_skew_*``,
    ``rolling_kurt_*`` (from 'returns'), ``volume_ma_*``, ``volume_std_*``
    (from 'volume') and ``high_low_range_*`` (from 'high' and 'low').

    Args:
        df: DataFrame with 'returns', 'volume', 'high' and 'low' columns.
        windows: iterable of rolling window lengths. The default is a tuple
            rather than a list so it cannot be mutated between calls.

    Returns:
        The same DataFrame object, with the new columns.
    """
    # Relative bar range does not depend on the window; compute it once.
    relative_range = (df['high'] - df['low']) / df['low']

    for window in windows:
        # Rolling statistics of returns
        returns_roll = df['returns'].rolling(window=window)
        df[f'rolling_mean_{window}'] = returns_roll.mean()
        df[f'rolling_std_{window}'] = returns_roll.std()
        df[f'rolling_skew_{window}'] = returns_roll.skew()
        df[f'rolling_kurt_{window}'] = returns_roll.kurt()

        # Volume analysis
        volume_roll = df['volume'].rolling(window=window)
        df[f'volume_ma_{window}'] = volume_roll.mean()
        df[f'volume_std_{window}'] = volume_roll.std()

        # Price ranges
        df[f'high_low_range_{window}'] = relative_range.rolling(window=window).mean()

    return df

def create_lagged_features(df, features, lags=(1, 2, 3, 5, 10)):
    """Add ``<feature>_lag_<k>`` columns holding each feature shifted by k rows.

    Args:
        df: DataFrame containing every column named in ``features``;
            modified in place.
        features: iterable of column names to lag.
        lags: iterable of positive row offsets. The default is a tuple
            rather than a list so it cannot be mutated between calls.

    Returns:
        The same DataFrame object, with the lag columns appended in
        feature-major order.
    """
    for feature in features:
        source = df[feature]
        for lag in lags:
            df[f'{feature}_lag_{lag}'] = source.shift(lag)

    return df

def prepare_features_for_ml(df, processor):
    """Normalize selected columns of ``df`` into a new ``*_norm`` DataFrame.

    Price columns and columns whose name contains 'volume_' are scaled with
    the 'robust' method; the technical indicators and columns whose name
    contains 'rolling_' use the 'standard' method. Scaling is delegated to
    ``processor.normalize_feature``.

    Returns:
        DataFrame indexed like ``df`` with one ``<column>_norm`` column per
        normalized feature.
    """
    # (columns, scaling method) in the order the output columns are created.
    scaling_plan = [
        (['close', 'high', 'low', 'open'], 'robust'),
        (['RSI', 'MACD', 'MACD_Signal', 'BB_Upper', 'BB_Lower'], 'standard'),
        ([name for name in df.columns if 'rolling_' in name], 'standard'),
        ([name for name in df.columns if 'volume_' in name], 'robust'),
    ]

    scaled = {}
    for names, scaling in scaling_plan:
        for name in names:
            scaled[f'{name}_norm'] = processor.normalize_feature(
                df[name], name, method=scaling
            )

    return pd.DataFrame(scaled, index=df.index)

def create_feature_matrix(df, normalized_df, target_column='returns'):
    """Build the ML table: normalized features plus a next-period target.

    Args:
        df: DataFrame that holds ``target_column``.
        normalized_df: DataFrame of normalized features (left unmodified).
        target_column: column of ``df`` whose next-row value is the target.

    Returns:
        New DataFrame with a 'target' column added and every row containing
        a NaN removed (this includes the final row, which has no next value).
    """
    # shift(-1) pulls the following row's value onto the current row.
    next_period = df[target_column].shift(-1)
    return normalized_df.assign(target=next_period).dropna()

# Main execution
def main():
    """Run the feature-engineering pipeline on the BTC/USDT 1-minute CSV.

    Loads the first 5000 rows, indexes them by timestamp, computes technical,
    statistical and lagged features, normalizes them, builds the feature
    matrix, prints summary information, fits PCA and writes the feature
    matrix to 'processed_features.csv'.

    Returns:
        Tuple ``(df, normalized_df, feature_matrix)``.

    Raises:
        KeyError: if the CSV has no 'timestamp', 'time' or 'date' column.
    """
    # Load data
    file_path = 'drive/MyDrive/datasets/quant/ml/BTCUSDT_1m.csv'
    df = pd.read_csv(file_path)

    # Copy the slice so the in-place column assignments below operate on an
    # independent frame (avoids pandas' SettingWithCopyWarning).
    df = df.iloc[0:5000].copy()

    # Convert timestamp: accept the first of the known column names.
    for candidate in ('timestamp', 'time', 'date'):
        if candidate in df.columns:
            df['timestamp'] = pd.to_datetime(df[candidate])
            break
    else:
        # Fail with a clear message rather than at set_index below.
        raise KeyError(
            "No 'timestamp', 'time' or 'date' column found in " + file_path
        )

    # Set timestamp as index
    df.set_index('timestamp', inplace=True)

    # Calculate all features
    df = calculate_technical_indicators(df)
    df = calculate_statistical_features(df)

    # Create lagged features for important indicators
    important_features = ['close', 'volume', 'RSI', 'MACD']
    df = create_lagged_features(df, important_features)

    # Initialize feature processor
    processor = FeatureProcessor()

    # Normalize features
    normalized_df = prepare_features_for_ml(df, processor)

    # Create feature matrix for ML
    feature_matrix = create_feature_matrix(df, normalized_df)

    # Print feature information
    print("\nFeature Matrix Info:")
    print(feature_matrix.info())

    # Calculate and print feature correlations
    correlation_matrix = feature_matrix.corr()
    print("\nTop Feature Correlations with Target:")
    print(correlation_matrix['target'].sort_values(ascending=False).head(10))

    # Apply PCA to reduce dimensionality. Only the fitted model is used
    # afterwards, so the transformed array is not kept.
    features_for_pca = feature_matrix.drop('target', axis=1)
    processor.apply_pca(features_for_pca)

    print("\nPCA Components Explained Variance Ratio:")
    print(processor.pca.explained_variance_ratio_)

    # Save processed features
    feature_matrix.to_csv('processed_features.csv')

    return df, normalized_df, feature_matrix

# Run the pipeline and keep its three results as module-level names; the
# plotting call below and the later modelling code read `df`,
# `normalized_df` and `feature_matrix` from here.
if __name__ == "__main__":
    df, normalized_df, feature_matrix = main()

# Visualization of normalized features
def plot_normalized_features(normalized_df, original_df):
    """Show normalized features in three stacked panels.

    Row 1 holds price features, row 2 technical indicators and row 3 the
    first five rolling statistics. Columns are selected by substring match
    on their names. ``original_df`` is accepted but not used.
    """
    panel_titles = ('Normalized Price Features',
                    'Normalized Technical Indicators',
                    'Normalized Statistical Features')
    fig = make_subplots(rows=3, cols=1,
                        subplot_titles=panel_titles,
                        vertical_spacing=0.1,
                        row_heights=[0.4, 0.3, 0.3])

    def _columns_containing(tokens):
        # Columns whose name contains at least one of the given substrings.
        return [col for col in normalized_df.columns
                if any(token in col for token in tokens)]

    panels = [
        _columns_containing(['close', 'high', 'low', 'open']),
        _columns_containing(['RSI', 'MACD', 'BB']),
        _columns_containing(['rolling_'])[:5],  # Plot first 5 statistical features
    ]

    for row_number, column_names in enumerate(panels, start=1):
        for column_name in column_names:
            fig.add_trace(
                go.Scatter(y=normalized_df[column_name], name=column_name),
                row=row_number, col=1
            )

    fig.update_layout(height=1200, title_text="Normalized Features Comparison")
    fig.show()

# Plot normalized features, using the module-level results assigned from
# main() above.
plot_normalized_features(normalized_df, df)


Feature Matrix Info:
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 4617 entries, 2017-08-17 04:20:00 to 2017-08-20 15:18:00
Data columns (total 33 columns):
 #   Column                Non-Null Count  Dtype  
---  ------                --------------  -----  
 0   close_norm            4617 non-null   float64
 1   high_norm             4617 non-null   float64
 2   low_norm              4617 non-null   float64
 3   open_norm             4617 non-null   float64
 4   RSI_norm              4617 non-null   float64
 5   MACD_norm             4617 non-null   float64
 6   MACD_Signal_norm      4617 non-null   float64
 7   BB_Upper_norm         4617 non-null   float64
 8   BB_Lower_norm         4617 non-null   float64
 9   rolling_mean_5_norm   4617 non-null   float64
 10  rolling_std_5_norm    4617 non-null   float64
 11  rolling_skew_5_norm   4617 non-null   float64
 12  rolling_kurt_5_norm   4617 non-null   float64
 13  rolling_mean_10_norm  4617 non-null   float64
 14  rolling_std_10

In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import TimeSeriesSplit
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, classification_report
from sklearn.preprocessing import StandardScaler
import xgboost as xgb
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, LSTM

class MLTradingStrategy:
    """Trains and scores direction classifiers on a feature matrix.

    ``lookback_period``, ``prediction_horizon`` and ``scaler`` are stored on
    the instance but are not read by any method of this class.
    """

    def __init__(self, feature_matrix, lookback_period=10, prediction_horizon=1):
        self.feature_matrix = feature_matrix
        self.lookback_period = lookback_period
        self.prediction_horizon = prediction_horizon
        self.model = None
        self.scaler = StandardScaler()

    def prepare_data(self):
        """Return ``(X, y, tscv)`` for time-series cross-validation.

        Adds a 'target_direction' column to ``self.feature_matrix`` in place
        (1 where 'target' is positive, else 0). ``X`` is every column except
        'target' and 'target_direction'; ``y`` is the new column; ``tscv`` is
        a 5-split ``TimeSeriesSplit``.
        """
        frame = self.feature_matrix

        # Binary label: 1 for a positive target, 0 otherwise.
        frame['target_direction'] = np.where(frame['target'] > 0, 1, 0)

        X = frame.drop(['target', 'target_direction'], axis=1)
        y = frame['target_direction']

        splitter = TimeSeriesSplit(n_splits=5)
        return X, y, splitter

    def train_random_forest(self, X, y):
        """Fit a RandomForestClassifier and store it in ``self.model``."""
        forest_params = dict(n_estimators=100, max_depth=10, random_state=42)
        self.model = RandomForestClassifier(**forest_params)
        self.model.fit(X, y)

    def train_xgboost(self, X, y):
        """Fit an XGBClassifier and store it in ``self.model``."""
        booster_params = dict(
            n_estimators=100,
            max_depth=5,
            learning_rate=0.1,
            random_state=42,
        )
        self.model = xgb.XGBClassifier(**booster_params)
        self.model.fit(X, y)

    def train_neural_network(self, X, y):
        """Fit a small dense binary classifier and store it in ``self.model``."""
        layers = [
            Dense(64, activation='relu', input_shape=(X.shape[1],)),
            Dropout(0.2),
            Dense(32, activation='relu'),
            Dropout(0.2),
            Dense(1, activation='sigmoid'),
        ]
        network = Sequential(layers)
        network.compile(optimizer='adam',
                        loss='binary_crossentropy',
                        metrics=['accuracy'])

        self.model = network
        self.model.fit(X, y, epochs=50, batch_size=32, verbose=0)

    def evaluate_model(self, X, y):
        """Return accuracy, precision and recall of ``self.model`` on ``(X, y)``."""
        is_keras = isinstance(self.model, tf.keras.Model)
        raw_output = self.model.predict(X)
        # Keras models output probabilities; threshold them at 0.5.
        y_pred = (raw_output > 0.5).astype(int) if is_keras else raw_output

        scorers = (
            ('accuracy', accuracy_score),
            ('precision', precision_score),
            ('recall', recall_score),
        )
        return {label: scorer(y, y_pred) for label, scorer in scorers}

class BacktestFramework:
    """Long/short backtest driven by binary predictions.

    A prediction of 1 takes a long position (+1) for that period, anything
    else a short position (-1). No costs or slippage are modelled.
    """

    def __init__(self, predictions, actual_returns, initial_capital=10000):
        # Array-like of class predictions, one per period.
        self.predictions = predictions
        # Realised per-period returns, aligned element-wise with predictions.
        self.actual_returns = actual_returns
        self.initial_capital = initial_capital

    def calculate_positions(self):
        """Calculate positions based on predictions.

        Returns:
            1-D numpy array of +1 (prediction == 1) or -1 (otherwise).
        """
        # Coerce to a flat array so lists and (n, 1) model outputs behave
        # like a 1-D ndarray instead of collapsing or mis-broadcasting.
        signals = np.asarray(self.predictions).ravel()
        return np.where(signals == 1, 1, -1)

    def calculate_strategy_returns(self):
        """Calculate strategy returns as position * actual return.

        Returns:
            pandas Series; keeps the index of ``actual_returns`` when that
            is already a Series.
        """
        returns = self.actual_returns
        if not isinstance(returns, pd.Series):
            # The metric code needs Series methods (expanding, iloc).
            returns = pd.Series(np.asarray(returns, dtype=float).ravel())
        return self.calculate_positions() * returns

    def calculate_metrics(self, periods_per_year=252):
        """Calculate performance metrics.

        Args:
            periods_per_year: number of return periods per year used to
                annualise the Sharpe ratio. The default of 252 keeps the
                original behaviour; pass the bar count that matches the
                data frequency for other sampling rates.

        Returns:
            Dict with 'sharpe_ratio', 'max_drawdown' (fraction, <= 0),
            'final_value' and 'total_return' (percent). The Sharpe ratio is
            inf or NaN when the strategy returns have zero variance.
        """
        strategy_returns = self.calculate_strategy_returns()
        cumulative_returns = (1 + strategy_returns).cumprod()

        # Sharpe Ratio (risk-free rate taken as zero)
        sharpe_ratio = np.sqrt(periods_per_year) * (
            strategy_returns.mean() / strategy_returns.std()
        )

        # Max Drawdown: worst decline from a running peak of the equity curve
        rolling_max = cumulative_returns.expanding().max()
        drawdowns = cumulative_returns / rolling_max - 1
        max_drawdown = drawdowns.min()

        # Final portfolio value
        final_value = self.initial_capital * cumulative_returns.iloc[-1]

        return {
            'sharpe_ratio': sharpe_ratio,
            'max_drawdown': max_drawdown,
            'final_value': final_value,
            'total_return': (final_value / self.initial_capital - 1) * 100
        }

    def plot_results(self):
        """Plot the cumulative strategy returns as a plotly line chart."""
        strategy_returns = self.calculate_strategy_returns()
        cumulative_returns = (1 + strategy_returns).cumprod()

        fig = go.Figure()

        fig.add_trace(
            go.Scatter(
                y=cumulative_returns,
                name='Strategy Returns'
            )
        )

        fig.update_layout(
            title='Trading Strategy Performance',
            yaxis_title='Cumulative Returns',
            xaxis_title='Time'
        )

        fig.show()

def main():
    """Cross-validate two classifiers and backtest each fold's predictions.

    Reads the module-level ``trading_strategy`` object for the data and the
    time-series splitter. For every fold a fresh random forest and a fresh
    XGBoost classifier are fitted on the training rows.

    NOTE(review): in this file the body of the inner loop is interrupted by
    pasted assignment text right after ``model.fit``; the prediction,
    backtest and reporting steps follow that text.
    """
    # Load and prepare data
    X, y, tscv = trading_strategy.prepare_data()

    # Initialize results storage (one entry per fold and model)
    results = []

    # Cross-validation
    for train_idx, test_idx in tscv.split(X):
        # The splitter yields positional indices, hence iloc.
        X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
        y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]

        # Train models (new, unfitted estimators for every fold)
        models = {
            'random_forest': RandomForestClassifier(n_estimators=100, random_state=42),
            'xgboost': xgb.XGBClassifier(n_estimators=100, random_state=42),
        }

        for name, model in models.items():
            # Train
            model.fit(X_train, y_train)
1. Feature Engineering:

    Task:
        You are provided with a dataset containing OHLC (Open, High, Low, Close) data for a given asset or set of assets.
        Engineer a comprehensive set of features (N features, where more is better) that capture various aspects of the data. These could include:
            Technical Indicators: Moving averages, RSI, MACD, Bollinger Bands, etc.
            Price Action Features: High-low spreads, momentum, volatility, etc.
            Statistical Features: Rolling means, variances, skewness, kurtosis, etc.
            Lagged Features: Previous price, volume, and other indicators.

2. Normalization and Standardization:

    Task:
        Normalize and standardize the features to ensure they are on the same scale and improve model performance. This step is critical for training machine learning models.

3. Machine Learning Model Development and Backtesting:

    Task:
        Train a machine learning model (e.g., Random Forest, Gradient Boosting, or Neural Networks) on the engineered features to predict future price movements or generate trading signals.
        Implement a backtesting framework to simulate trades based on the predictions and assess the performance of the strategy. Include metrics like:
            Sharpe Ratio
            Max Drawdown
            Accuracy
            Precision and Recall (if classification)
            Cumulative Returns
        Generate reports summarizing the model's performance and trading results.

4. Validation and Out-of-Sample Testing:

    Task:
        Apply the trained model to validation and out-of-sample datasets to ensure the strategy generalizes well and avoids overfitting.
        Compare the performance on the validation and out-of-sample datasets with the results on the training set.

5. Strategy Development:

    Task:
        Based on the model’s predictions and backtest results, design a final trading strategy. Define clear entry/exit rules, risk management techniques, and position sizing based on the model output.

Deliverables:

    Python code or Jupyter Notebook implementing the feature engineering, machine learning model, and backtesting.
    A detailed report including a summary of the features created, model performance (on training, validation, and out-of-sample datasets), and backtest results.
    A tear sheet that includes performance metrics, visualizations like equity curves, and drawdown charts.

            # Predict class labels for the held-out fold
            y_pred = model.predict(X_test)

            # Calculate returns: look up the 'target' values of the test rows
            # in the module-level feature_matrix by index label.
            actual_returns = feature_matrix.loc[X_test.index, 'target']

            # Backtest
            backtest = BacktestFramework(y_pred, actual_returns)
            metrics = backtest.calculate_metrics()

            # Store results
            results.append({
                'model': name,
                'metrics': metrics,
                'predictions': y_pred
            })

    # Print results (one block per fold and model, in training order)
    print("\nBacktest Results:")
    for result in results:
        print(f"\nModel: {result['model']}")
        print("Metrics:")
        for metric, value in result['metrics'].items():
            # Every metric is formatted as a float with four decimals.
            print(f"{metric}: {value:.4f}")

# Build the strategy from the module-level feature_matrix and run the
# cross-validated backtest; main() reads `trading_strategy` as a global.
if __name__ == "__main__":
    trading_strategy = MLTradingStrategy(feature_matrix)
    main()


Backtest Results:

Model: random_forest
Metrics:
sharpe_ratio: 1.1100
max_drawdown: -0.0431
final_value: 11438.6100
total_return: 14.3861

Model: xgboost
Metrics:
sharpe_ratio: 0.7989
max_drawdown: -0.0316
final_value: 11009.3233
total_return: 10.0932

Model: random_forest
Metrics:
sharpe_ratio: 0.2422
max_drawdown: -0.0354
final_value: 10284.8391
total_return: 2.8484

Model: xgboost
Metrics:
sharpe_ratio: 0.2015
max_drawdown: -0.0459
final_value: 10231.8955
total_return: 2.3190

Model: random_forest
Metrics:
sharpe_ratio: 0.2451
max_drawdown: -0.0589
final_value: 10348.6321
total_return: 3.4863

Model: xgboost
Metrics:
sharpe_ratio: 0.2265
max_drawdown: -0.0588
final_value: 10318.5599
total_return: 3.1856

Model: random_forest
Metrics:
sharpe_ratio: 0.7456
max_drawdown: -0.0382
final_value: 11261.1878
total_return: 12.6119

Model: xgboost
Metrics:
sharpe_ratio: 1.0435
max_drawdown: -0.0382
final_value: 11827.5991
total_return: 18.2760

Model: random_forest
Metrics:
sharpe_ratio: -0.4

In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import ta
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, accuracy_score
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime

# 1. Load and prepare the data
def load_data(file_path):
    """Load a CSV and index it by a parsed 'timestamp' column.

    The time column may be named 'timestamp', 'time' or 'date' (checked in
    that order, matching the other loaders in this file). Whichever is found
    is renamed to 'timestamp', parsed with ``pd.to_datetime`` and used as
    the index, so no raw time column is left among the data columns.

    Args:
        file_path: path to the CSV file.

    Returns:
        DataFrame indexed by 'timestamp'.

    Raises:
        KeyError: if none of the accepted time columns is present.
    """
    df = pd.read_csv(file_path)

    for candidate in ('timestamp', 'time', 'date'):
        if candidate in df.columns:
            if candidate != 'timestamp':
                df = df.rename(columns={candidate: 'timestamp'})
            break
    else:
        raise KeyError(
            f"No 'timestamp', 'time' or 'date' column found in {file_path}"
        )

    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df.set_index('timestamp', inplace=True)
    return df

# 2. Feature Engineering
def engineer_features(df):
    """Build the model's feature columns and the next-bar direction label.

    Adds indicator columns computed by the ``ta`` package (trend, momentum,
    volatility, volume), simple price-action and 5-bar rolling statistics,
    three lags of close and volume, and a binary ``target`` column.

    Args:
        df: Price frame with ``open``, ``high``, ``low``, ``close`` and
            ``volume`` columns. Rows are assumed to be in chronological order,
            because the lag and target columns are built with ``shift``.

    Returns:
        A new DataFrame with the original columns plus the engineered ones.
        The caller's frame is left untouched. The final input row is omitted
        because it has no following bar and therefore no valid label.
        Warm-up rows of the rolling/lagged columns still contain NaN.
    """
    # Work on a copy so adding columns is not a hidden side effect on the caller.
    df = df.copy()
    close = df['close']
    high = df['high']
    low = df['low']
    volume = df['volume']

    # Technical Indicators
    # Trend
    df['sma_7'] = ta.trend.sma_indicator(close, window=7)
    df['sma_21'] = ta.trend.sma_indicator(close, window=21)
    df['ema_9'] = ta.trend.ema_indicator(close, window=9)
    df['macd'] = ta.trend.macd_diff(close)

    # Momentum (library default windows)
    df['rsi'] = ta.momentum.rsi(close)
    df['stoch'] = ta.momentum.stoch(high, low, close)
    df['stoch_signal'] = ta.momentum.stoch_signal(high, low, close)

    # Volatility (library default windows)
    df['bb_high'] = ta.volatility.bollinger_hband(close)
    df['bb_low'] = ta.volatility.bollinger_lband(close)
    df['bb_mid'] = ta.volatility.bollinger_mavg(close)
    df['atr'] = ta.volatility.average_true_range(high, low, close)

    # Volume
    df['obv'] = ta.volume.on_balance_volume(close, volume)

    # Price Action Features
    df['hl_spread'] = high - low
    df['oc_spread'] = df['open'] - close
    df['daily_return'] = close.pct_change()

    # Statistical Features
    rolling_close = close.rolling(window=5)
    df['rolling_mean_5'] = rolling_close.mean()
    df['rolling_std_5'] = rolling_close.std()
    df['rolling_skew_5'] = rolling_close.skew()

    # Lagged Features
    for lag in range(1, 4):
        df[f'close_lag_{lag}'] = close.shift(lag)
        df[f'volume_lag_{lag}'] = volume.shift(lag)

    # Target Variable (1 if price goes up in next period, 0 if down)
    next_close = close.shift(-1)
    df['target'] = (next_close > close).astype(int)

    # The last row has no next close: `NaN > close` is False, so it would be
    # labelled 0 without any evidence and survive a later dropna(). Drop it
    # instead of feeding a fabricated label to the model.
    return df.iloc[:-1]

# 3. Data Preprocessing
def preprocess_data(df):
    """Turn the engineered frame into a standardised feature matrix and labels.

    Rows containing any NaN are discarded, the raw OHLCV columns and the
    ``target`` column are removed from the feature set, and the remaining
    columns are standardised with ``StandardScaler``.

    NOTE(review): the scaler is fitted on every remaining row, i.e. before the
    chronological train/test split done in ``train_and_evaluate_model``, so
    test-period statistics influence the scaled training features.

    Args:
        df: Frame containing ``target``, ``open``, ``high``, ``low``,
            ``close``, ``volume`` and the feature columns.

    Returns:
        ``(features_scaled, target)``: a DataFrame of scaled features with the
        same index and column names as the unscaled features, and the
        ``target`` Series of the retained rows.
    """
    complete_rows = df.dropna()

    non_feature_columns = ['target', 'open', 'high', 'low', 'close', 'volume']
    feature_frame = complete_rows.drop(columns=non_feature_columns)
    labels = complete_rows['target']

    # fit_transform returns a bare array; wrap it to restore index and column labels.
    scaled_values = StandardScaler().fit_transform(feature_frame)
    scaled_frame = pd.DataFrame(
        scaled_values,
        index=feature_frame.index,
        columns=feature_frame.columns,
    )
    return scaled_frame, labels

# 4. Model Training and Evaluation
def train_and_evaluate_model(X, y):
    """Fit a random forest on the leading rows and score it on the trailing ones.

    The split is not shuffled, so the first 80% of rows (in their given order)
    train the model and the last 20% are held out for evaluation.

    Args:
        X: Feature matrix.
        y: Labels aligned with ``X``.

    Returns:
        ``(model, X_test, y_test, y_pred, accuracy, classification_rep)`` where
        ``y_pred`` are the predictions for ``X_test``, ``accuracy`` is the
        accuracy score and ``classification_rep`` is the text report produced
        by ``classification_report``.
    """
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, shuffle=False
    )

    # Fixed seed keeps the forest reproducible between runs.
    forest = RandomForestClassifier(n_estimators=100, random_state=42)
    forest.fit(X_train, y_train)

    held_out_predictions = forest.predict(X_test)

    return (
        forest,
        X_test,
        y_test,
        held_out_predictions,
        accuracy_score(y_test, held_out_predictions),
        classification_report(y_test, held_out_predictions),
    )

# 5. Backtesting
def backtest_strategy(df, predictions, initial_capital=10000, periods_per_year=252):
    """Simulate trading on ``predictions`` and summarise the resulting equity curve.

    The position chosen on one bar is applied to the close-to-close return of
    the following bar (hence the one-bar shift), so a prediction never earns
    the return of the bar it was made on.

    Args:
        df: Frame with a ``close`` column; aligned to ``predictions`` by index.
        predictions: Series of position multipliers indexed like the bars to
            trade (``main`` passes the classifier's 0/1 output).
        initial_capital: Starting portfolio value.
        periods_per_year: Number of bars per year used to annualise the Sharpe
            ratio. The default of 252 matches daily bars and the previous
            behaviour; pass e.g. ``365 * 24 * 60`` for 24/7 one-minute bars.

    Returns:
        ``(portfolio, total_return, sharpe_ratio, max_drawdown)``. ``portfolio``
        holds the per-bar position, close, returns, strategy returns,
        cumulative return factor and cumulative value; ``total_return`` and
        ``max_drawdown`` are percentages.
    """
    portfolio = pd.DataFrame(index=predictions.index)
    portfolio['position'] = predictions
    portfolio['close'] = df['close']
    portfolio['returns'] = portfolio['close'].pct_change()

    # Calculate strategy returns
    portfolio['strategy_returns'] = portfolio['position'].shift(1) * portfolio['returns']

    # Bars without a defined return (always the first one) count as flat, so
    # the equity curve starts at exactly `initial_capital` instead of NaN.
    # Without this the running peak ignores the starting capital and the
    # drawdown is understated whenever the strategy loses from the first trade.
    growth_factors = 1 + portfolio['strategy_returns'].fillna(0)
    portfolio['cumulative_returns'] = growth_factors.cumprod()
    portfolio['cumulative_value'] = initial_capital * portfolio['cumulative_returns']

    # Calculate metrics
    total_return = (portfolio['cumulative_returns'].iloc[-1] - 1) * 100

    # Sharpe uses only bars with a real return (NaN rows are skipped by mean/std).
    strategy_returns = portfolio['strategy_returns']
    sharpe_ratio = strategy_returns.mean() / strategy_returns.std() * np.sqrt(periods_per_year)

    running_peak = portfolio['cumulative_value'].cummax()
    drawdown = (running_peak - portfolio['cumulative_value']) / running_peak
    max_drawdown = drawdown.max() * 100

    return portfolio, total_return, sharpe_ratio, max_drawdown

# 6. Plotting Results
def plot_results(portfolio):
    """Show a two-panel figure: cumulative returns on top, drawdown below.

    Args:
        portfolio: Frame with ``cumulative_returns`` and ``cumulative_value``
            columns, as produced by ``backtest_strategy``.
    """
    plt.figure(figsize=(15, 10))

    # Drawdown is the fractional distance of the equity curve below its running peak.
    equity = portfolio['cumulative_value']
    running_peak = equity.cummax()
    drawdown = (running_peak - equity) / running_peak

    # (series to draw, panel title, y-axis label), listed top to bottom.
    panels = (
        (portfolio['cumulative_returns'], 'Cumulative Returns', 'Returns'),
        (drawdown, 'Drawdown', 'Drawdown'),
    )
    for row, (series, title, y_label) in enumerate(panels, start=1):
        plt.subplot(2, 1, row)
        series.plot()
        plt.title(title)
        plt.xlabel('Date')
        plt.ylabel(y_label)

    plt.tight_layout()
    plt.show()

# Main execution
def main():
    """Run the full pipeline: load, featurise, train, backtest, report, plot.

    Reads the 1-minute BTCUSDT CSV from a path relative to the working
    directory, trains the classifier, backtests its held-out predictions with
    the backtester's default settings, prints the model and backtest metrics,
    and shows the result plots.
    """
    prices = load_data('drive/MyDrive/datasets/quant/ml/BTCUSDT_1m.csv')
    prices = engineer_features(prices)

    X, y = preprocess_data(prices)

    (model, X_test, y_test, y_pred,
     accuracy, classification_rep) = train_and_evaluate_model(X, y)

    # The model returns a bare array; re-attach the test index so the backtest
    # can align each prediction with its price bar.
    test_positions = pd.Series(y_pred, index=X_test.index)
    test_prices = prices.loc[X_test.index]
    portfolio, total_return, sharpe_ratio, max_drawdown = backtest_strategy(
        test_prices, test_positions
    )

    report_lines = (
        "\nModel Performance:",
        f"Accuracy: {accuracy:.2%}",
        "\nClassification Report:",
        classification_rep,
        "\nBacktest Results:",
        f"Total Return: {total_return:.2f}%",
        f"Sharpe Ratio: {sharpe_ratio:.2f}",
        f"Maximum Drawdown: {max_drawdown:.2f}%",
    )
    for line in report_lines:
        print(line)

    plot_results(portfolio)

# Run the pipeline only when this code is executed directly, not when imported.
if __name__ == "__main__":
    main()