# Fit Logistic model

In [1]:
import pandas as pd
import numpy as np
import os
import joblib
from glob import glob
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, roc_auc_score, classification_report
from sklearn.preprocessing import StandardScaler

# Paths
input_folder = "data/processed/stock_data"
summary_folder = "summary"
model_folder = os.path.join(summary_folder, "logistic")
os.makedirs(model_folder, exist_ok=True)

summary_results = []

for file_path in glob(os.path.join(input_folder, "*.csv")):
    stock_name = os.path.basename(file_path).replace("_features.csv", "")
    print(f"\n=== Processing {stock_name} ===")

    # Load data
    df = pd.read_csv(file_path)
    df.dropna(inplace=True)

    # Create binary target: 1 if price goes up tomorrow, 0 if down
    df['Direction'] = (df['Close'].shift(-1) > df['Close']).astype(int)
    df.dropna(inplace=True)

    # Features
    X = df.drop(columns=[
        "Date","Close","Target","Direction",
        "High","Low","Open","High_lag1","Low_lag1","Open_lag1"
    ])
    y = df['Direction']

    # Split
    train_size = int(len(X) * 0.8)
    X_train, X_test = X.iloc[:train_size], X.iloc[train_size:]
    y_train, y_test = y.iloc[:train_size], y.iloc[train_size:]

    # Scale features
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # Logistic Regression
    log_model = LogisticRegression(max_iter=1000)
    log_model.fit(X_train_scaled, y_train)
    y_pred = log_model.predict(X_test_scaled)
    y_proba = log_model.predict_proba(X_test_scaled)[:, 1]

    # Metrics
    acc = accuracy_score(y_test, y_pred)
    auc = roc_auc_score(y_test, y_proba)
    print(f"Logistic Regression Accuracy: {acc:.4f}, AUC: {auc:.4f}")
    print(classification_report(y_test, y_pred))

    # Save model and scaler
    model_path = os.path.join(model_folder, f"{stock_name}_logistic_model.pkl")
    scaler_path = os.path.join(model_folder, f"{stock_name}_scaler.pkl")
    joblib.dump(log_model, model_path)
    joblib.dump(scaler, scaler_path)
    print(f"Saved model to {model_path} and scaler to {scaler_path}")

    # Append metrics to summary
    summary_results.append({
        "Stock": stock_name,
        "Accuracy": acc,
        "AUC": auc
    })

# Save summary
summary_df = pd.DataFrame(summary_results)
os.makedirs(summary_folder, exist_ok=True)
summary_file = os.path.join(summary_folder, "stock_data_model_summary_logistic.csv")
summary_df.to_csv(summary_file, index=False)
print(f"\nSummary saved to {summary_file}")



=== Processing AAPL_daily ===
Logistic Regression Accuracy: 0.5363, AUC: 0.4990
              precision    recall  f1-score   support

           0       0.50      0.10      0.16      1040
           1       0.54      0.91      0.68      1205

    accuracy                           0.54      2245
   macro avg       0.52      0.51      0.42      2245
weighted avg       0.52      0.54      0.44      2245

Saved model to summary\logistic\AAPL_daily_logistic_model.pkl and scaler to summary\logistic\AAPL_daily_scaler.pkl

=== Processing GE_daily ===
Logistic Regression Accuracy: 0.4822, AUC: 0.4880
              precision    recall  f1-score   support

           0       0.48      0.69      0.57      1582
           1       0.48      0.28      0.35      1614

    accuracy                           0.48      3196
   macro avg       0.48      0.48      0.46      3196
weighted avg       0.48      0.48      0.46      3196

Saved model to summary\logistic\GE_daily_logistic_model.pkl and scaler 

# Test logistic model 

In [None]:
import pandas as pd
import numpy as np
import os
import joblib
from glob import glob
import matplotlib.pyplot as plt
from scipy import stats
import warnings
warnings.filterwarnings('ignore')

class DonchianBreakoutBacktester:
    def __init__(self, initial_capital=10000, commission=0.001):
        self.initial_capital = initial_capital
        self.commission = commission
        
    def load_model_and_scaler(self, model_path, scaler_path):
        """Load the trained model and scaler"""
        model = joblib.load(model_path)
        scaler = joblib.load(scaler_path)
        return model, scaler
    
    def calculate_donchian_channels(self, df, period=20):
        """Calculate Donchian channel breakouts"""
        df = df.copy()
        df['DC_High'] = df['High'].rolling(window=period).max()
        df['DC_Low'] = df['Low'].rolling(window=period).min()
        df['DC_Mid'] = (df['DC_High'] + df['DC_Low']) / 2
        return df
    
    def prepare_features(self, df):
        """Prepare features matching the training data format"""
        # Remove columns that were excluded during training
        feature_cols = [col for col in df.columns if col not in [
            "Date", "Close", "Target", "Direction", "High", "Low", "Open", 
            "High_lag1", "Low_lag1", "Open_lag1", "DC_High", "DC_Low", "DC_Mid"
        ]]
        return df[feature_cols]
    
    def generate_signals(self, df, model, scaler, confidence_threshold=0.6, donchian_period=20):
        """Generate trading signals using ML model and Donchian breakouts"""
        df = df.copy()
        
        # Calculate Donchian channels
        df = self.calculate_donchian_channels(df, period=donchian_period)
        
        # Prepare features for ML model
        X = self.prepare_features(df)
        
        # Handle missing values
        X = X.fillna(method='ffill').fillna(0)
        
        # Scale features
        X_scaled = scaler.transform(X)
        
        # Get ML predictions
        ml_probabilities = model.predict_proba(X_scaled)[:, 1]
        
        # Generate combined signals
        df['ML_Signal'] = (ml_probabilities > confidence_threshold).astype(int)
        df['DC_Long_Signal'] = (df['Close'] > df['DC_High'].shift(1)).astype(int)
        df['DC_Short_Signal'] = (df['Close'] < df['DC_Low'].shift(1)).astype(int)
        
        # Combined strategy: ML confirms Donchian breakouts
        df['Long_Signal'] = (df['DC_Long_Signal'] & df['ML_Signal']).astype(int)
        df['Short_Signal'] = (df['DC_Short_Signal'] & (1 - df['ML_Signal'])).astype(int)
        
        return df
    
    def backtest_strategy(self, df):
        """Execute the backtest with simple position logic"""
        df = df.copy()
        df['Position'] = 0
        df['Portfolio_Value'] = self.initial_capital
        df['Cash'] = self.initial_capital
        df['Holdings'] = 0
        df['Shares'] = 0
        
        position = 0  # 1 for long, -1 for short, 0 for neutral
        cash = self.initial_capital
        shares = 0
        
        for i in range(1, len(df)):
            current_price = df.loc[df.index[i], 'Close']
            prev_price = df.loc[df.index[i-1], 'Close']
            
            # Exit on opposite signal
            if position != 0:
                if (position == 1 and df.loc[df.index[i], 'Short_Signal']) or \
                   (position == -1 and df.loc[df.index[i], 'Long_Signal']):
                    # Close position
                    if position == 1:  # Close long
                        cash = shares * current_price * (1 - self.commission)
                        shares = 0
                    else:  # Close short (simplified PnL)
                        cash = cash + (shares * (prev_price - current_price)) * (1 - self.commission)
                        shares = 0
                    position = 0
            
            # Entry
            if position == 0:
                if df.loc[df.index[i], 'Long_Signal']:
                    # Enter long
                    shares = (cash * 0.95) / current_price  # Use 95% of cash
                    cash = cash - (shares * current_price * (1 + self.commission))
                    position = 1
                elif df.loc[df.index[i], 'Short_Signal']:
                    # Enter short (simplified)
                    shares = (cash * 0.95) / current_price
                    position = -1
            
            # Update portfolio value
            if position == 1:
                portfolio_value = cash + (shares * current_price)
            elif position == -1:
                portfolio_value = cash - (shares * (current_price - prev_price))
            else:
                portfolio_value = cash
            
            df.loc[df.index[i], 'Position'] = position
            df.loc[df.index[i], 'Portfolio_Value'] = portfolio_value
            df.loc[df.index[i], 'Cash'] = cash
            df.loc[df.index[i], 'Shares'] = shares
        
        return df
    
    def calculate_returns(self, df):
        """Calculate returns and performance metrics"""
        df = df.copy()
        df['Strategy_Return'] = df['Portfolio_Value'].pct_change()
        df['Benchmark_Return'] = df['Close'].pct_change()
        df['Cumulative_Strategy'] = (1 + df['Strategy_Return']).cumprod()
        df['Cumulative_Benchmark'] = (1 + df['Benchmark_Return']).cumprod()
        return df
    
    def calculate_metrics(self, df):
        """Calculate comprehensive performance metrics"""
        strategy_returns = df['Strategy_Return'].dropna()
        benchmark_returns = df['Benchmark_Return'].dropna()
        
        # Basic metrics
        total_return = (df['Portfolio_Value'].iloc[-1] / self.initial_capital) - 1
        benchmark_total_return = (df['Close'].iloc[-1] / df['Close'].iloc[0]) - 1
        
        # Volatility metrics
        strategy_vol = strategy_returns.std() * np.sqrt(252)
        benchmark_vol = benchmark_returns.std() * np.sqrt(252)
        
        # Risk-free rate (assumed)
        risk_free_rate = 0.02
        
        # Sharpe ratio
        excess_returns = strategy_returns - (risk_free_rate / 252)
        sharpe_ratio = (excess_returns.mean() / strategy_returns.std() * np.sqrt(252)) if strategy_returns.std() > 0 else 0
        
        # Sortino ratio
        downside_returns = strategy_returns[strategy_returns < 0]
        downside_std = (downside_returns.std() * np.sqrt(252)) if len(downside_returns) > 0 else 0
        sortino_ratio = (excess_returns.mean() / downside_std * np.sqrt(252)) if downside_std > 0 else 0
        
        # Maximum drawdown
        cumulative = df['Cumulative_Strategy'].dropna()
        running_max = cumulative.expanding().max()
        drawdown = (cumulative - running_max) / running_max
        max_drawdown = drawdown.min()
        
        # Win rate and profit factor (very simplified: uses returns around position flips)
        trades = df[df['Position'] != df['Position'].shift()]['Strategy_Return'].dropna()
        if len(trades) > 0:
            win_rate = (trades > 0).sum() / len(trades)
            winning_trades = trades[trades > 0].sum()
            losing_trades = abs(trades[trades < 0].sum())
            profit_factor = winning_trades / losing_trades if losing_trades > 0 else float('inf')
        else:
            win_rate = 0
            profit_factor = 0
        
        return {
            'Total_Return': total_return,
            'Benchmark_Return': benchmark_total_return,
            'Annual_Volatility': strategy_vol,
            'Sharpe_Ratio': sharpe_ratio,
            'Sortino_Ratio': sortino_ratio,
            'Max_Drawdown': max_drawdown,
            'Win_Rate': win_rate,
            'Profit_Factor': profit_factor,
            'Total_Trades': int(len(trades)) if len(trades) > 0 else 0
        }
    
    def monte_carlo_permutation_test(self, df, n_permutations=1000, random_state=42):
        """
        Perform Monte Carlo permutation test on Sharpe ratio.
        Returns dict including the full array of permuted Sharpes for plotting.
        """
        strategy_returns = df['Strategy_Return'].dropna().values
        
        if len(strategy_returns) == 0:
            return {
                'p_value': 1.0, 'is_significant': False,
                'actual_sharpe': 0.0,
                'permuted_sharpes_mean': np.nan,
                'permuted_sharpes_std': np.nan,
                'permuted_sharpes': np.array([])
            }
        
        # Actual strategy performance
        actual_sharpe = self.calculate_sharpe_from_returns(strategy_returns)
        
        # Generate random permutations (reproducible)
        rng = np.random.default_rng(random_state)
        permuted_sharpes = np.empty(n_permutations, dtype=float)
        for i in range(n_permutations):
            permuted_returns = rng.permutation(strategy_returns)
            permuted_sharpes[i] = self.calculate_sharpe_from_returns(permuted_returns)
        
        # Calculate p-value (one-sided: permuted Sharpe >= actual Sharpe)
        p_value = np.mean(permuted_sharpes >= actual_sharpe)
        
        return {
            'p_value': float(p_value),
            'is_significant': bool(p_value < 0.05),
            'actual_sharpe': float(actual_sharpe),
            'permuted_sharpes_mean': float(np.mean(permuted_sharpes)),
            'permuted_sharpes_std': float(np.std(permuted_sharpes, ddof=1)),
            'permuted_sharpes': permuted_sharpes
        }
    
    def calculate_sharpe_from_returns(self, returns):
        """Helper function to calculate Sharpe ratio from returns"""
        if len(returns) == 0 or np.std(returns) == 0:
            return 0.0
        excess_returns = returns - (0.02 / 252)  # Risk-free rate (daily)
        return np.mean(excess_returns) / np.std(returns, ddof=1) * np.sqrt(252)

    @staticmethod
    def plot_permutation_distribution(stock_name, mc_results, save_dir):
        """
        Plot histogram of permuted Sharpe ratios with vertical line at actual Sharpe.
        """
        os.makedirs(save_dir, exist_ok=True)
        perm = mc_results['permuted_sharpes']
        if perm.size == 0:
            print(f"[{stock_name}] No returns to plot permutation distribution.")
            return None
        
        actual = mc_results['actual_sharpe']
        pval = mc_results['p_value']
        
        plt.figure(figsize=(10, 6))
        plt.hist(perm, bins=40, alpha=0.8, edgecolor='black')
        plt.axvline(actual, linestyle='--', linewidth=2, label=f'Actual Sharpe = {actual:.3f}')
        plt.title(f'{stock_name} — Monte Carlo Permutation Test (Sharpe)\n'
                  f'p-value = {pval:.4f}')
        plt.xlabel('Permuted Sharpe Ratios')
        plt.ylabel('Frequency')
        plt.legend(loc='upper right')
        plt.tight_layout()
        
        outpath = os.path.join(save_dir, f"{stock_name}_mc_permutation_sharpe.png")
        plt.savefig(outpath, dpi=300, bbox_inches='tight')
        plt.close()
        print(f"[{stock_name}] Permutation plot saved: {outpath}")
        return outpath


def run_backtests():
    """Main function to run backtests for all stocks"""
    # Paths
    input_folder = "data/processed/stock_data"
    model_folder = "summary/logistic"
    output_folder = "backtest_results_logistic"
    plots_folder = os.path.join(output_folder, "permutation_plots_logistic")
    os.makedirs(output_folder, exist_ok=True)
    os.makedirs(plots_folder, exist_ok=True)
    
    backtester = DonchianBreakoutBacktester(initial_capital=10000)
    results_summary = []
    
    # Get all available models
    model_files = glob(os.path.join(model_folder, "*_logistic_model.pkl"))
    
    for model_file in model_files:
        stock_name = os.path.basename(model_file).replace("_logistic_model.pkl", "")
        scaler_file = os.path.join(model_folder, f"{stock_name}_scaler.pkl")
        data_file = os.path.join(input_folder, f"{stock_name}_features.csv")
        
        if not os.path.exists(scaler_file) or not os.path.exists(data_file):
            print(f"Missing files for {stock_name}, skipping...")
            continue
        
        print(f"\n=== Backtesting {stock_name} ===")
        
        try:
            # Load model and data
            model, scaler = backtester.load_model_and_scaler(model_file, scaler_file)
            df = pd.read_csv(data_file)
            
            # Use last 500 points for out-of-sample backtesting
            df = df.tail(500).copy().reset_index(drop=True)
            df.dropna(inplace=True)
            
            if len(df) < 50:  # Minimum data requirement
                print(f"Insufficient data for {stock_name}")
                continue
            
            # Generate signals and run backtest
            df = backtester.generate_signals(df, model, scaler, confidence_threshold=0.6, donchian_period=20)
            df = backtester.backtest_strategy(df)
            df = backtester.calculate_returns(df)
            
            # Calculate performance metrics
            metrics = backtester.calculate_metrics(df)
            
            # Monte Carlo permutation test (return full distribution for plotting)
            mc_results = backtester.monte_carlo_permutation_test(df, n_permutations=1000, random_state=42)
            
            # Plot and save permutation distribution for this stock
            DonchianBreakoutBacktester.plot_permutation_distribution(
                stock_name, mc_results, plots_folder
            )
            
            # Combine results
            result = {
                'Stock': stock_name,
                **metrics,
                **{f'MC_{k}': v for k, v in mc_results.items() if k != 'permuted_sharpes'}  # exclude large array
            }
            results_summary.append(result)
            
            # Save individual backtest results
            df.to_csv(os.path.join(output_folder, f"{stock_name}_backtest.csv"), index=False)
            
            # Print summary
            print(f"Total Return: {metrics['Total_Return']:.2%}")
            print(f"Sharpe Ratio: {metrics['Sharpe_Ratio']:.3f}")
            print(f"Sortino Ratio: {metrics['Sortino_Ratio']:.3f}")
            print(f"Max Drawdown: {metrics['Max_Drawdown']:.2%}")
            print(f"Profit Factor: {metrics['Profit_Factor']:.3f}")
            print(f"Win Rate: {metrics['Win_Rate']:.2%}")
            print(f"P-value: {mc_results['p_value']:.4f}")
            print(f"Statistically Significant: {mc_results['is_significant']}")
            
        except Exception as e:
            print(f"Error processing {stock_name}: {str(e)}")
            continue
    
    # Save summary results
    if results_summary:
        results_df = pd.DataFrame(results_summary)
        results_df.to_csv(os.path.join(output_folder, "backtest_summary.csv"), index=False)
        
        # Overall summary
        print("\n" + "="*50)
        print("OVERALL BACKTEST SUMMARY")
        print("="*50)
        print(f"Total Stocks Backtested: {len(results_df)}")
        print(f"Average Return: {results_df['Total_Return'].mean():.2%}")
        print(f"Average Sharpe: {results_df['Sharpe_Ratio'].mean():.3f}")
        print(f"Average Sortino: {results_df['Sortino_Ratio'].mean():.3f}")
        print(f"Average Profit Factor: {results_df['Profit_Factor'].mean():.3f}")
        print(f"Profitable Strategies: {(results_df['Total_Return'] > 0).sum()}/{len(results_df)}")
        print(f"Statistically Significant: {results_df['MC_is_significant'].sum()}/{len(results_df)}")
        
        # Top performers
        print(f"\nTop 5 by Total Return:")
        top_returns = results_df.nlargest(5, 'Total_Return')[['Stock', 'Total_Return', 'Sharpe_Ratio', 'MC_p_value']]
        print(top_returns.to_string(index=False))
        
        print(f"\nTop 5 by Sharpe Ratio:")
        top_sharpe = results_df.nlargest(5, 'Sharpe_Ratio')[['Stock', 'Total_Return', 'Sharpe_Ratio', 'MC_p_value']]
        print(top_sharpe.to_string(index=False))
    
    return results_summary

if __name__ == "__main__":
    results = run_backtests()
