# In this notebook we build and tune the LightGBM model


In [None]:
# Names of dataset columns collected for removal, kept in their original order.
# NOTE(review): no code visible in this notebook reads this module-level list;
# main() and the final-model cell each bind their own DROP_COLS. Confirm
# whether this cell is still needed.
DROP_COLS = """
    open high low high_low high_close low_close typical_price
    volume_breakout volume_breakdown break_upper_band break_lower_band
    vol_spike_1_5x rsi_oversold rsi_overbought stoch_overbought
    stoch_oversold cci_overbought cci_oversold near_upper_band
    near_lower_band overbought_reversal oversold_reversal
    ema_cross_up ema_cross_down macd_cross_up macd_cross_down
    trending_market trend_alignment ema7_above_ema21 macd_rising
    bollinger_upper bollinger_lower bullish_scenario_1
    bullish_scenario_5 bearish_scenario_1
""".split()

In [None]:
"""
Improved LightGBM hyperparameter optimization for BTC direction prediction
--------------------------------------------------------------------------
* Maintains chronological order with proper validation
* Optimizes weighted-F1 with threshold tuning
* Comprehensive parameter search space
* Robust error handling and cross-validation
* Enhanced evaluation metrics
"""

import numpy as np
import pandas as pd
import random
import time
import warnings
from pathlib import Path
from lightgbm import LGBMClassifier, early_stopping, log_evaluation
from sklearn.metrics import (precision_recall_fscore_support, roc_auc_score, 
                           precision_recall_curve, classification_report)
from sklearn.model_selection import TimeSeriesSplit
import optuna
import matplotlib.pyplot as plt
import seaborn as sns

warnings.filterwarnings('ignore')

class LightGBMOptimizer:
    def __init__(self, csv_path, drop_cols=None, val_frac=0.20, w_precision=2.0, 
                 random_state=42, use_optuna=True):
        """
        Initialize LightGBM Optimizer
        
        Parameters:
        -----------
        csv_path : str
            Path to the CSV dataset
        drop_cols : list
            Columns to drop from dataset
        val_frac : float
            Fraction of data for validation (chronological split)
        w_precision : float
            Weight for precision in weighted F1 calculation
        random_state : int
            Random state for reproducibility
        use_optuna : bool
            Whether to use Optuna for optimization (vs random search)
        """
        self.csv_path = csv_path
        self.drop_cols = drop_cols or []
        self.val_frac = val_frac
        self.w_precision = w_precision
        self.random_state = random_state
        self.use_optuna = use_optuna
        self.best_params = None
        self.best_score = -1
        self.best_threshold = 0.5
        
    def load_and_preprocess_data(self):
        """Load and preprocess the dataset"""
        print("Loading and preprocessing data...")
        
        # Load data
        df = pd.read_csv(self.csv_path, index_col=0, parse_dates=True)
        print(f"Original dataset shape: {df.shape}")
        
        # Drop specified columns
        existing_drop_cols = [c for c in self.drop_cols if c in df.columns]
        if existing_drop_cols:
            df.drop(columns=existing_drop_cols, inplace=True)
            print(f"Dropped columns: {existing_drop_cols}")
        
        # Log transform volume (if exists)
        if "Volume BTC" in df.columns:
            df["Volume BTC"] = np.log1p(df["Volume BTC"])
            print("Applied log1p transformation to Volume BTC")
        
        # Create target (next period direction)
        if "target" not in df.columns:
            df["target"] = (df["close"].shift(-1) > df["close"]).astype(int)
            print("Created target column (next period direction)")
        
        # Clean data
        df = df.dropna().select_dtypes(include=[np.number])
        print(f"Final dataset shape after cleaning: {df.shape}")
        
        # Separate features and target
        X = df.drop(columns=["target"])
        y = df["target"].astype(int)
        
        # Chronological split
        split_idx = int(len(df) * (1 - self.val_frac))
        self.X_train = X.iloc[:split_idx]
        self.X_val = X.iloc[split_idx:]
        self.y_train = y.iloc[:split_idx]
        self.y_val = y.iloc[split_idx:]
        
        print(f"Training set: {self.X_train.shape[0]} samples")
        print(f"Validation set: {self.X_val.shape[0]} samples")
        
        # Check class distribution
        train_dist = self.y_train.value_counts().sort_index()
        val_dist = self.y_val.value_counts().sort_index()
        print(f"Training class distribution: {dict(train_dist)}")
        print(f"Validation class distribution: {dict(val_dist)}")
        
        return self.X_train, self.X_val, self.y_train, self.y_val
    
    def weighted_f1_with_threshold_tuning(self, y_true, y_pred_prob, w=2.0):
        """
        Calculate weighted F1 with optimal threshold tuning
        
        Parameters:
        -----------
        y_true : array
            True binary labels
        y_pred_prob : array
            Predicted probabilities
        w : float
            Weight for precision in weighted F1
        
        Returns:
        --------
        best_score : float
            Best weighted F1 score
        best_threshold : float
            Optimal threshold
        """
        # Try different thresholds
        thresholds = np.arange(0.1, 0.9, 0.05)
        best_score = -1
        best_threshold = 0.5
        
        for thr in thresholds:
            y_pred = (y_pred_prob >= thr).astype(int)
            
            # Handle edge cases
            if len(np.unique(y_pred)) < 2:
                continue
                
            try:
                prec, rec, f1, _ = precision_recall_fscore_support(
                    y_true, y_pred, average="binary", pos_label=1, zero_division=0
                )
                
                # Weighted F1: emphasizes precision more
                if prec + rec > 0:
                    weighted_f1 = (1 + w) * prec * rec / (w * prec + rec)
                    if weighted_f1 > best_score:
                        best_score = weighted_f1
                        best_threshold = thr
            except:
                continue
        
        return best_score, best_threshold
    
    def evaluate_model(self, model, X_val, y_val):
        """
        Score a fitted classifier on a validation set.

        The positive-class probabilities from ``model.predict_proba`` are
        passed to ``weighted_f1_with_threshold_tuning`` (with
        ``self.w_precision``); precision, recall and F1 are then computed at
        the threshold it returns, and ROC-AUC on the raw probabilities.

        Each metric group is guarded on its own, so a ROC-AUC failure (for
        example a validation slice containing a single class) no longer wipes
        out the precision/recall/F1 values that were computed successfully.

        Parameters:
        -----------
        model : fitted classifier exposing ``predict_proba``
        X_val : validation features
        y_val : validation labels

        Returns:
        --------
        dict with keys 'weighted_f1', 'optimal_threshold', 'precision',
        'recall', 'f1' and 'roc_auc'. A metric that raised ValueError is
        reported as 0.0.
        """
        y_pred_prob = model.predict_proba(X_val)[:, 1]

        # Get weighted F1 with optimal threshold
        weighted_f1_score, optimal_threshold = self.weighted_f1_with_threshold_tuning(
            y_val, y_pred_prob, self.w_precision
        )

        # Calculate other metrics with optimal threshold
        y_pred = (y_pred_prob >= optimal_threshold).astype(int)

        try:
            prec, rec, f1, _ = precision_recall_fscore_support(
                y_val, y_pred, average="binary", pos_label=1, zero_division=0
            )
        except ValueError:
            prec = rec = f1 = 0.0

        try:
            roc_auc = roc_auc_score(y_val, y_pred_prob)
        except ValueError:
            # Kept at 0.0 (not NaN) to match the previous fallback value.
            roc_auc = 0.0

        return {
            'weighted_f1': weighted_f1_score,
            'optimal_threshold': optimal_threshold,
            'precision': prec,
            'recall': rec,
            'f1': f1,
            'roc_auc': roc_auc
        }
    
    def objective_optuna(self, trial):
        """Objective function for Optuna optimization"""
        try:
            # Suggest parameters with expanded search space
            params = {
                'objective': 'binary',
                'metric': 'binary_logloss',
                'boosting_type': 'gbdt',
                'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
                'num_leaves': trial.suggest_int('num_leaves', 10, 300),
                'max_depth': trial.suggest_int('max_depth', 3, 15),
                'feature_fraction': trial.suggest_float('feature_fraction', 0.6, 1.0),
                'bagging_fraction': trial.suggest_float('bagging_fraction', 0.6, 1.0),
                'bagging_freq': trial.suggest_int('bagging_freq', 1, 7),
                'min_child_samples': trial.suggest_int('min_child_samples', 5, 100),
                'min_child_weight': trial.suggest_float('min_child_weight', 1e-3, 10.0, log=True),
                'reg_alpha': trial.suggest_float('reg_alpha', 1e-8, 10.0, log=True),
                'reg_lambda': trial.suggest_float('reg_lambda', 1e-8, 10.0, log=True),
                'subsample_for_bin': trial.suggest_int('subsample_for_bin', 50000, 300000),
                'min_split_gain': trial.suggest_float('min_split_gain', 0.0, 15.0),
                'verbose': -1,
                'random_state': self.random_state,
                'n_jobs': -1
            }
            
            # Create and train model
            model = LGBMClassifier(
                n_estimators=2000,
                **params
            )
            
            model.fit(
                self.X_train, self.y_train,
                eval_set=[(self.X_val, self.y_val)],
                eval_metric="binary_logloss",
                callbacks=[
                    early_stopping(stopping_rounds=100),
                    log_evaluation(0)  # silent
                ]
            )
            
            # Evaluate model
            metrics = self.evaluate_model(model, self.X_val, self.y_val)
            return metrics['weighted_f1']
            
        except Exception as e:
            print(f"Trial failed: {e}")
            return 0.0
    
    def random_search(self, n_trials=50):
        """Random search optimization"""
        print(f"Starting random search with {n_trials} trials...")
        
        # Expanded search space
        space = {
            "learning_rate": [0.01, 0.02, 0.05, 0.1, 0.15, 0.2],
            "num_leaves": [15, 31, 63, 127, 255],
            "max_depth": [3, 5, 7, 10, 12, 15],
            "feature_fraction": [0.6, 0.7, 0.8, 0.9, 1.0],
            "bagging_fraction": [0.6, 0.7, 0.8, 0.9, 1.0],
            "bagging_freq": [1, 3, 5],
            "min_child_samples": [5, 10, 20, 40, 80],
            "reg_alpha": [0.0, 0.1, 0.5, 1.0, 2.0],
            "reg_lambda": [0.0, 0.1, 0.5, 1.0, 2.0]
        }
        
        def random_param():
            return {k: random.choice(v) for k, v in space.items()}
        
        best_score = -1
        best_params = None
        best_threshold = 0.5
        
        for t in range(1, n_trials + 1):
            try:
                params = random_param()
                params.update({
                    'objective': 'binary',
                    'metric': 'binary_logloss',
                    'verbose': -1,
                    'random_state': self.random_state + t
                })
                
                model = LGBMClassifier(
                    n_estimators=2000,
                    **params
                )
                
                model.fit(
                    self.X_train, self.y_train,
                    eval_set=[(self.X_val, self.y_val)],
                    eval_metric="binary_logloss",
                    callbacks=[
                        early_stopping(stopping_rounds=100),
                        log_evaluation(0)
                    ]
                )
                
                metrics = self.evaluate_model(model, self.X_val, self.y_val)
                score = metrics['weighted_f1']
                
                print(f"Trial {t:02d} → weighted-F1 = {score:.4f}, threshold = {metrics['optimal_threshold']:.3f}")
                
                if score > best_score:
                    best_score = score
                    best_params = params
                    best_threshold = metrics['optimal_threshold']
                    
            except Exception as e:
                print(f"Trial {t:02d} failed: {e}")
                continue
        
        self.best_score = best_score
        self.best_params = best_params
        self.best_threshold = best_threshold
        
        return best_params, best_score, best_threshold
    
    def optuna_search(self, n_trials=100, timeout=3600):
        """
        Tune hyperparameters with an Optuna study (TPE sampler, maximise).

        After the study finishes, the best sampled parameters are completed
        with the fixed LightGBM settings, one model is refit with them, and
        its weighted F1 and threshold are recorded on the instance.

        Returns:
        --------
        best_params, best_score, best_threshold
        """
        print(f"Starting Optuna optimization with {n_trials} trials...")

        # Seeded sampler so the sequence of suggestions is repeatable.
        tpe = optuna.samplers.TPESampler(seed=self.random_state)
        study = optuna.create_study(direction='maximize', sampler=tpe)
        study.optimize(self.objective_optuna, n_trials=n_trials, timeout=timeout)

        # Sampled values first, then the fixed settings appended to them.
        tuned = study.best_params
        tuned['objective'] = 'binary'
        tuned['metric'] = 'binary_logloss'
        tuned['verbose'] = -1
        tuned['random_state'] = self.random_state
        self.best_params = tuned

        # Refit once with the winning configuration to obtain its threshold.
        refit = LGBMClassifier(n_estimators=2000, **self.best_params)
        refit.fit(
            self.X_train, self.y_train,
            eval_set=[(self.X_val, self.y_val)],
            eval_metric="binary_logloss",
            callbacks=[early_stopping(stopping_rounds=100), log_evaluation(0)]
        )

        report = self.evaluate_model(refit, self.X_val, self.y_val)
        self.best_score = report['weighted_f1']
        self.best_threshold = report['optimal_threshold']

        return self.best_params, self.best_score, self.best_threshold
    
    def cross_validate(self, params, n_splits=5):
        """
        Evaluate ``params`` with forward-chaining time-series folds.

        The stored training and validation splits are concatenated and cut
        with ``TimeSeriesSplit``. Every fold fits a fresh model (early
        stopping on that fold's held-out part) and contributes its weighted
        F1. A fold that raises is reported with print() and left out.

        Returns:
        --------
        cv_mean, cv_std
            Mean and standard deviation of the fold scores; both 0.0 when no
            fold succeeded.
        """
        print(f"Performing time series cross-validation with {n_splits} splits...")

        splitter = TimeSeriesSplit(n_splits=n_splits)

        # Combine train and val for CV
        X_full = pd.concat([self.X_train, self.X_val])
        y_full = pd.concat([self.y_train, self.y_val])

        fold_scores = []
        for fold_no, (fit_rows, holdout_rows) in enumerate(splitter.split(X_full), start=1):
            try:
                X_fit, y_fit = X_full.iloc[fit_rows], y_full.iloc[fit_rows]
                X_hold, y_hold = X_full.iloc[holdout_rows], y_full.iloc[holdout_rows]

                fold_model = LGBMClassifier(n_estimators=1000, **params)
                fold_model.fit(
                    X_fit, y_fit,
                    eval_set=[(X_hold, y_hold)],
                    eval_metric="binary_logloss",
                    callbacks=[early_stopping(stopping_rounds=50), log_evaluation(0)]
                )

                fold_f1 = self.evaluate_model(fold_model, X_hold, y_hold)['weighted_f1']
                fold_scores.append(fold_f1)
                print(f"  Fold {fold_no}: {fold_f1:.4f}")

            except Exception as e:
                print(f"  Fold {fold_no} failed: {e}")
                continue

        if fold_scores:
            cv_mean = np.mean(fold_scores)
            cv_std = np.std(fold_scores)
        else:
            cv_mean = 0.0
            cv_std = 0.0

        print(f"CV weighted-F1: {cv_mean:.4f} ± {cv_std:.4f}")
        return cv_mean, cv_std
    
    def optimize(self, method='optuna', n_trials=100, timeout=3600):
        """
        Run hyperparameter optimization
        
        Parameters:
        -----------
        method : str
            'optuna' or 'random'
        n_trials : int
            Number of trials
        timeout : int
            Timeout in seconds (for Optuna)
        """
        start_time = time.time()
        
        if method == 'optuna':
            best_params, best_score, best_threshold = self.optuna_search(n_trials, timeout)
        else:
            best_params, best_score, best_threshold = self.random_search(n_trials)
        
        end_time = time.time()
        
        print(f"\\nOptimization completed in {end_time - start_time:.1f} seconds")
        print("\\n" + "="*50)
        print("BEST PARAMETERS")
        print("="*50)
        
        for k, v in best_params.items():
            if k not in ['objective', 'metric', 'verbose', 'random_state']:
                print(f"{k:<20}: {v}")
        
        print(f"\\nBest weighted-F1   : {best_score:.4f}")
        print(f"Optimal threshold   : {best_threshold:.3f}")
        
        # Cross-validation with best parameters
        cv_mean, cv_std = self.cross_validate(best_params)
        
        return {
            'best_params': best_params,
            'best_score': best_score,
            'best_threshold': best_threshold,
            'cv_mean': cv_mean,
            'cv_std': cv_std
        }

def main():
    """
    Load the dataset, run the Optuna search and report the outcome.

    Returns:
    --------
    (optimizer, results) on success, where ``results`` is the dict returned
    by ``LightGBMOptimizer.optimize``; ``(None, None)`` if loading or
    optimization raised, after printing the error and its traceback.
    """

    # Configuration
    # NOTE(review): this raw string contains doubled backslashes, so the path
    # itself has doubled separators — confirm that is intended.
    CSV_PATH = r"C:\\Users\\ADMIN\\Desktop\\Coding_projects\\stock_market_prediction\\Stock-Market-Prediction\\data\\processed\\gemini_btc_data_final_version_with_features_2016_final.csv"
    # Local list; it shadows any module-level DROP_COLS inside this function.
    DROP_COLS = ["vol_ratio_24h", "macd_diff", "macd_line", "upper_shadow", "lower_shadow"]
    VAL_FRAC = 0.20
    W_PRECISION = 2.0

    # Initialize optimizer
    optimizer = LightGBMOptimizer(
        csv_path=CSV_PATH,
        drop_cols=DROP_COLS,
        val_frac=VAL_FRAC,
        w_precision=W_PRECISION,
        random_state=42,
        use_optuna=True
    )

    try:
        # Load and preprocess data; the splits are kept on the optimizer, so
        # the returned tuple is not needed here.
        optimizer.load_and_preprocess_data()

        # Run optimization
        results = optimizer.optimize(
            method='optuna',  # or 'random'
            n_trials=100,
            timeout=3600  # 1 hour
        )

        # "\n" (not "\\n") so a blank line precedes the message.
        print("\n🎉 Optimization completed successfully!")
        return optimizer, results

    except Exception as e:
        print(f"❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return None, None

if __name__ == "__main__":
    optimizer, results = main()

Training until validation scores don't improve for 150 rounds
Early stopping, best iteration is:
[22]	valid_0's binary_logloss: 0.69157
Trial 01 → weighted-F1 = 0.4786  |  params: {'learning_rate': 0.02, 'num_leaves': 63, 'feature_fraction': 0.8, 'bagging_fraction': 0.7, 'bagging_freq': 1, 'min_child_samples': 40}
Training until validation scores don't improve for 150 rounds
Early stopping, best iteration is:
[5]	valid_0's binary_logloss: 0.692375
Trial 02 → weighted-F1 = 0.5135  |  params: {'learning_rate': 0.05, 'num_leaves': 127, 'feature_fraction': 0.9, 'bagging_fraction': 0.8, 'bagging_freq': 1, 'min_child_samples': 20}
Training until validation scores don't improve for 150 rounds
Early stopping, best iteration is:
[22]	valid_0's binary_logloss: 0.691437
Trial 03 → weighted-F1 = 0.4799  |  params: {'learning_rate': 0.02, 'num_leaves': 63, 'feature_fraction': 0.9, 'bagging_fraction': 0.8, 'bagging_freq': 1, 'min_child_samples': 40}
Training until validation scores don't improve for

In [9]:
"""
Final LightGBM model
--------------------
Uses the best mini-search parameters:

    learning_rate     = 0.05
    num_leaves        = 63
    feature_fraction  = 0.9
    bagging_fraction  = 0.8
    bagging_freq      = 1
    min_child_samples = 20

Prints accuracy, precision, recall and F1 for each class.
"""

# ───────────────────── imports ─────────────────────
import numpy as np
import pandas as pd
from lightgbm import LGBMClassifier, early_stopping, log_evaluation
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

# ────────── file path & columns to drop ────────────
CSV_PATH  = r"C:\Users\ADMIN\Desktop\Coding_projects\stock_market_prediction\Stock-Market-Prediction\data\processed\gemini_btc_data_final_version_with_features_2016_final.csv"
DROP_COLS = ["vol_ratio_24h", "macd_diff", "macd_line",
             "upper_shadow", "lower_shadow"]

VAL_FRAC = 0.20         # 80 % train · 20 % validation
PREC_W   = 2.0          # precision weight for weighted-F1 (not used in this cell)

# ───────────────── data preparation ────────────────
df = pd.read_csv(CSV_PATH, index_col=0, parse_dates=True)
df.drop(columns=[c for c in DROP_COLS if c in df.columns], inplace=True)
# Guarded like the optimizer class: only transform the column when it exists.
if "Volume BTC" in df.columns:
    df["Volume BTC"] = np.log1p(df["Volume BTC"])

# 1 = Up, 0 = Down. Rows whose next close is unknown (the last row, or a row
# followed by a missing close) become NaN and are removed by dropna() below;
# `NaN > close` is False and would otherwise label them "Down".
next_close = df["close"].shift(-1)
df["target"] = (next_close > df["close"]).astype(float).where(next_close.notna())
df = df.dropna().select_dtypes(include=[np.number])

X = df.drop(columns=["target"])
y = df["target"].astype(int)

# Positional split: the first (1 - VAL_FRAC) share of rows is training data.
split_idx = int(len(df) * (1 - VAL_FRAC))
X_train, X_val = X.iloc[:split_idx], X.iloc[split_idx:]
y_train, y_val = y.iloc[:split_idx], y.iloc[split_idx:]

# ─────────── best-parameter LightGBM model ─────────
best_params = dict(
    objective         = "binary",
    learning_rate     = 0.05,
    num_leaves        = 63,
    feature_fraction  = 0.9,
    bagging_fraction  = 0.8,
    bagging_freq      = 1,
    min_child_samples = 20,
    n_estimators      = 4000,   # large upper bound – early stop will trim
    verbose           = -1,
    random_state      = 42
)

model = LGBMClassifier(**best_params)

# NOTE(review): the validation split drives early stopping and is also the
# set scored below, so the printed metrics are likely optimistic.
model.fit(
    X_train, y_train,
    eval_set=[(X_val, y_val)],
    eval_metric="binary_logloss",
    callbacks=[
        early_stopping(stopping_rounds=300, first_metric_only=True),
        log_evaluation(100)
    ]
)

# ─────────────────── evaluation ────────────────────
y_prob = model.predict_proba(X_val)[:, 1]
y_pred = (y_prob >= 0.5).astype(int)

acc  = accuracy_score(y_val, y_pred)
# Without `average`, one value per label is returned, in `labels` order.
prec, rec, f1, _ = precision_recall_fscore_support(
    y_val, y_pred, labels=[0, 1], zero_division=0
)

print("\n──── Validation metrics (thr = 0.50) ────")
print(f"Accuracy          : {acc:6.3f}")
print(f"Class 0 (Down) →  Precision: {prec[0]:6.3f}  Recall: {rec[0]:6.3f}  F1: {f1[0]:6.3f}")
print(f"Class 1 (Up  ) →  Precision: {prec[1]:6.3f}  Recall: {rec[1]:6.3f}  F1: {f1[1]:6.3f}")
print(f"Macro-F1          : {f1.mean():6.3f}")


Training until validation scores don't improve for 300 rounds
[100]	valid_0's binary_logloss: 0.701439
[200]	valid_0's binary_logloss: 0.705638
[300]	valid_0's binary_logloss: 0.709782
Early stopping, best iteration is:
[6]	valid_0's binary_logloss: 0.691879
Evaluated only: binary_logloss

──── Validation metrics (thr = 0.50) ────
Accuracy          :  0.529
Class 0 (Down) →  Precision:  0.517  Recall:  0.580  F1:  0.547
Class 1 (Up  ) →  Precision:  0.544  Recall:  0.481  F1:  0.511
Macro-F1          :  0.529
