In [1]:
from typing import Tuple, List
import numpy as np
import pandas as pd
from scipy.stats import ks_2samp, mannwhitneyu
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import TimeSeriesSplit
import warnings
warnings.filterwarnings('ignore')

# Seed NumPy's global random generator so NumPy-based randomness is repeatable.
np.random.seed(42)

# Load the price history and put it in chronological order.
# The file must provide the Date, Close, High, Low and Volume columns, all of
# which are read below.
# NOTE(review): "Date" is parsed twice - once by ``parse_dates`` at read time
# and once by the explicit ``to_datetime(format="%d/%m/%Y")``. Presumably the
# file stores day/month/year strings; confirm that the first parse does not
# already interpret ambiguous dates before the explicit format is applied.
df = pd.read_csv("nvidia_10yr_data.csv", parse_dates=["Date"])
df['Date'] = pd.to_datetime(df['Date'], format="%d/%m/%Y")
df = df.sort_values("Date")

# Feature engineering
# Return:     row-over-row fractional change of Close.
# Volatility: standard deviation of Close over a 10-row rolling window.
# Price_Diff: High minus Low of the same row.
# Volume_Log: log(1 + Volume).
df['Return'] = df['Close'].pct_change()
df['Volatility'] = df['Close'].rolling(10).std()
df['Price_Diff'] = df['High'] - df['Low']
df['Volume_Log'] = np.log1p(df['Volume'])

# Drop every row that contains a NaN in any column; this removes the leading
# rows for which pct_change / the rolling window have no value yet.
df.dropna(inplace=True)

# Feature matrix and regression target (the Close column itself).
X = df[['Return', 'Volatility', 'Price_Diff', 'Volume_Log']]
y = df['Close']


class LinearRegressionModel:
    """
    Least-squares regressor that standardizes both features and target.

    ``X`` and ``y`` each get their own ``StandardScaler``; predictions are
    mapped back to the original units of ``y`` before being returned.
    """

    def __init__(self, fit_intercept: bool = True):
        self.fit_intercept = fit_intercept
        self.scaler_X = StandardScaler()
        self.scaler_y = StandardScaler()
        self.model = LinearRegression(fit_intercept=fit_intercept)
        self.is_fitted = False

    def fit(self, X: pd.DataFrame, y: pd.Series):
        """
        Fit the two scalers and the regression on the given data.

        Returns ``self`` so that calls can be chained.
        """
        features_std = self.scaler_X.fit_transform(X)

        # StandardScaler works on 2-D input, so the target is handled as a
        # single column and flattened back to 1-D afterwards.
        target_column = y.values.reshape(-1, 1)
        target_std = self.scaler_y.fit_transform(target_column).flatten()

        self.model.fit(features_std, target_std)
        self.is_fitted = True
        return self

    def predict(self, X: pd.DataFrame):
        """
        Predict targets for ``X`` in the original (unscaled) target units.

        Raises ``ValueError`` when called before ``fit``.
        """
        if not self.is_fitted:
            raise ValueError("Model not fitted yet")

        standardized_pred = self.model.predict(self.scaler_X.transform(X))

        # Undo the target standardization (again through a 2-D column).
        restored = self.scaler_y.inverse_transform(standardized_pred.reshape(-1, 1))
        return restored.flatten()


class DriftPointDetector:
    """
    Detect concept-drift points in time-ordered tabular data.

    Two adjacent windows of ``window_size`` rows are slid over the data in
    steps of ``step_size``. For every column the windows are compared with a
    Kolmogorov-Smirnov test, a Mann-Whitney U test and an effect-size check
    (Cohen's d); a column signals drift when at least ``confirmation_tests``
    of those three checks agree. Candidates are skipped when the period
    right before them was itself unstable, and candidates that recur at
    suspiciously regular intervals are thinned out as a likely pattern
    rather than genuine drift.

    Parameters:
        window_size: number of rows in each of the two compared windows.
        threshold: p-value below which a statistical test counts as a hit.
        step_size: distance in rows between consecutive candidate positions.
        min_effect_size: Cohen's d above which the effect-size check is a hit.
        stability_window: number of rows examined by the stability pre-check.
        confirmation_tests: hits (out of three) a column needs to signal drift.

    Raises:
        ValueError: if ``window_size`` or ``step_size`` is not positive.
    """

    def __init__(self, window_size: int = 120, threshold: float = 0.001,
                 step_size: int = 30, min_effect_size: float = 0.3,
                 stability_window: int = 60, confirmation_tests: int = 2):
        # Fail early: a non-positive step would break the scan in detect()
        # and empty windows make every statistical test meaningless.
        if window_size <= 0:
            raise ValueError("window_size must be a positive integer")
        if step_size <= 0:
            raise ValueError("step_size must be a positive integer")

        self.window_size = window_size
        self.threshold = threshold
        self.step_size = step_size
        self.min_effect_size = min_effect_size
        self.stability_window = stability_window
        self.confirmation_tests = confirmation_tests
        self.drift_points_: List[int] = []

    @staticmethod
    def _p_value_or_none(test, sample1, sample2, **kwargs):
        """
        Run a two-sample ``test`` and return its p-value as a float.

        Returns ``None`` when the test rejects the input (``ValueError`` /
        ``TypeError``) or yields NaN, so callers can simply skip that test.
        Other exceptions - including KeyboardInterrupt - propagate.
        """
        try:
            _, p_value = test(sample1, sample2, **kwargs)
        except (ValueError, TypeError):
            return None
        if np.isnan(p_value):
            return None
        return float(p_value)

    def _calculate_effect_size(self, window1: pd.Series, window2: pd.Series) -> float:
        """
        Return Cohen's d (absolute mean difference over the pooled std).

        Returns 0.0 when the pooled standard deviation is zero or cannot be
        computed (for example when a window has fewer than two values).
        """
        n1, n2 = len(window1), len(window2)
        if n1 + n2 <= 2:
            return 0.0

        std1, std2 = window1.std(), window2.std()
        pooled_std = np.sqrt(((n1 - 1) * std1**2 + (n2 - 1) * std2**2) /
                             (n1 + n2 - 2))

        if not np.isfinite(pooled_std) or pooled_std == 0:
            return 0.0

        return float(abs(window1.mean() - window2.mean()) / pooled_std)

    def _test_multiple_statistics(self, window1: pd.DataFrame, window2: pd.DataFrame) -> Tuple[int, float]:
        """
        Compare two windows column by column.

        Returns a tuple ``(passed_columns, min_p_value)``: the number of
        columns in which at least ``confirmation_tests`` checks signalled
        drift, and the smallest p-value observed over all columns and tests
        (1.0 when no p-value could be computed).
        """
        passed_tests = 0
        min_p_value = 1.0

        for col in window1.columns:
            sample1, sample2 = window1[col], window2[col]

            # Tests 1 and 2: distribution tests; unusable results are dropped.
            candidates = (
                self._p_value_or_none(ks_2samp, sample1, sample2),
                self._p_value_or_none(mannwhitneyu, sample1, sample2,
                                      alternative='two-sided'),
            )
            p_values = [p for p in candidates if p is not None]
            hits = sum(p < self.threshold for p in p_values)

            # Test 3: practical significance via effect size.
            if self._calculate_effect_size(sample1, sample2) > self.min_effect_size:
                hits += 1

            if p_values:
                min_p_value = min(min_p_value, min(p_values))

            if hits >= self.confirmation_tests:
                passed_tests += 1

        return passed_tests, min_p_value

    def _check_stability_before_drift(self, X: pd.DataFrame, position: int) -> bool:
        """
        Check whether the data shortly before ``position`` was stable.

        The ``stability_window`` rows that end ``window_size`` rows before
        ``position`` are split in half and the halves are compared per column
        with a KS test, using a ten times looser threshold than the drift
        test. Returns True (treated as stable) when there is not enough
        history to perform the check.
        """
        if position < self.stability_window + self.window_size:
            return True

        stable_end = position - self.window_size
        stable_window = X.iloc[stable_end - self.stability_window:stable_end]

        mid_point = len(stable_window) // 2
        first_half = stable_window.iloc[:mid_point]
        second_half = stable_window.iloc[mid_point:]
        if len(first_half) == 0 or len(second_half) == 0:
            return True

        for col in X.columns:
            p_value = self._p_value_or_none(ks_2samp, first_half[col], second_half[col])
            if p_value is not None and p_value < self.threshold * 10:
                return False

        return True

    def _remove_pattern_drifts(self, drift_candidates: List[Tuple[int, float]]) -> List[int]:
        """
        Filter candidates that look like a recurring pattern.

        ``drift_candidates`` holds ``(position, p_value)`` pairs and is not
        modified. With fewer than three candidates the positions are returned
        unchanged. Otherwise, if the gaps between candidates are nearly
        constant (coefficient of variation below 0.3) only the third of the
        candidates with the smallest p-values is kept; finally, surviving
        positions closer than ``2 * window_size`` to the previously accepted
        one are dropped. The result is sorted in ascending order.
        """
        if len(drift_candidates) < 3:
            return [pos for pos, _ in drift_candidates]

        # Work on sorted copies so the caller's list keeps its order.
        by_position = sorted(drift_candidates, key=lambda cand: cand[0])
        positions = [pos for pos, _ in by_position]
        intervals = [b - a for a, b in zip(positions, positions[1:])]

        interval_mean = np.mean(intervals)
        is_regular = interval_mean > 0 and np.std(intervals) / interval_mean < 0.3

        if is_regular:
            # Stable sort: ties in p-value keep their positional order.
            by_significance = sorted(by_position, key=lambda cand: cand[1])
            keep_count = max(1, len(by_significance) // 3)
            kept = [pos for pos, _ in by_significance[:keep_count]]
        else:
            kept = positions

        final_drifts: List[int] = []
        min_distance = self.window_size * 2

        for pos in sorted(kept):
            if not final_drifts or pos - final_drifts[-1] >= min_distance:
                final_drifts.append(pos)

        return final_drifts

    def detect(self, X: pd.DataFrame) -> List[int]:
        """
        Scan ``X`` and return the row positions at which drift is detected.

        The result is also stored in ``drift_points_``. Data too short to
        hold two full windows yields an empty list.
        """
        self.drift_points_ = []
        n = len(X)
        drift_candidates: List[Tuple[int, float]] = []

        for i in range(self.window_size, n - self.window_size, self.step_size):
            if not self._check_stability_before_drift(X, i):
                continue

            window1 = X.iloc[i - self.window_size:i]
            window2 = X.iloc[i:i + self.window_size]

            passed_tests, min_p_value = self._test_multiple_statistics(window1, window2)

            # One drifting column is enough to make the position a candidate.
            if passed_tests >= 1:
                drift_candidates.append((i, min_p_value))

        self.drift_points_ = self._remove_pattern_drifts(drift_candidates)

        return self.drift_points_


class AdaptiveFoldGenerator:
    """
    Build train/test folds from the segments between detected drift points.

    Each segment between two consecutive drift points (plus the start and
    end of the data) becomes one fold whose leading part is used for
    training and whose trailing ``test_ratio`` share is used for testing.

    Parameters:
        min_fold_size: segments shorter than this many rows are skipped.
        test_ratio: fraction of each segment reserved for testing.
        min_train_size: a fold is kept only if it has MORE training rows
            than this value.
        min_test_size: a fold is kept only if it has MORE test rows than
            this value.
    """

    def __init__(self, min_fold_size: int = 200, test_ratio: float = 0.2,
                 min_train_size: int = 50, min_test_size: int = 20):
        self.min_fold_size = min_fold_size
        self.test_ratio = test_ratio
        self.min_train_size = min_train_size
        self.min_test_size = min_test_size

    def split(self, X: pd.DataFrame, drift_points: List[int]) -> List[Tuple[np.ndarray, np.ndarray]]:
        """
        Return a list of ``(train_idx, test_idx)`` positional index arrays.

        Only ``len(X)`` is used. ``drift_points`` may be unsorted or contain
        duplicates; points outside the open range ``(0, len(X))`` are ignored
        so that no fold can reference rows that do not exist.
        """
        n_samples = len(X)

        # Normalize the boundaries: unique, ascending, strictly inside the data.
        inner_points = sorted({int(p) for p in drift_points if 0 < p < n_samples})
        points = [0] + inner_points + [n_samples]

        folds = []
        for start, end in zip(points, points[1:]):
            fold_length = end - start
            if fold_length < self.min_fold_size:
                continue

            split = int(start + (1 - self.test_ratio) * fold_length)
            train_idx = np.arange(start, split)
            test_idx = np.arange(split, end)

            # Enforce minimum sizes (strictly greater than the limits).
            if len(train_idx) > self.min_train_size and len(test_idx) > self.min_test_size:
                folds.append((train_idx, test_idx))

        return folds


class DriftAdaptiveTimeSeriesCV:
    """
    Cross-validate a Linear Regression on folds derived from drift points.
    """

    def __init__(self, model_params: dict = None):
        self.model_params = model_params if model_params else {'fit_intercept': True}

    def run(self, X: pd.DataFrame, y: pd.Series, drift_points: List[int]) -> Tuple[List[float], List[float]]:
        """
        Train and score one model per adaptive fold.

        Returns two lists (RMSE values, MAE values) with one entry for each
        fold that was evaluated without error; both are empty when no fold
        could be generated.
        """
        rmse_scores, mae_scores = [], []

        folds = AdaptiveFoldGenerator().split(X, drift_points)
        if len(folds) == 0:
            print("Warning: No valid folds generated by AdaptiveFoldGenerator!")
            return [], []

        for fold_no, (train_idx, test_idx) in enumerate(folds, start=1):
            print(f"\n[Adaptive Fold {fold_no}] Training Linear Regression...")

            X_train = X.iloc[train_idx]
            X_test = X.iloc[test_idx]
            y_train = y.iloc[train_idx]
            y_test = y.iloc[test_idx]

            # A fresh model per fold, so nothing learned carries over.
            model = LinearRegressionModel(**self.model_params)

            try:
                y_pred = model.fit(X_train, y_train).predict(X_test)

                rmse = np.sqrt(mean_squared_error(y_test, y_pred))
                mae = mean_absolute_error(y_test, y_pred)

                print(f"[Adaptive Fold {fold_no}] RMSE={rmse:.4f}, MAE={mae:.4f}")

                rmse_scores.append(rmse)
                mae_scores.append(mae)
            except Exception as e:
                # A failing fold is reported and skipped; the others still run.
                print(f"[Adaptive Fold {fold_no}] Error: {e}")

        return rmse_scores, mae_scores


class BaselineTimeSeriesCV:
    """
    Cross-validate a Linear Regression with a standard TimeSeriesSplit.
    """

    def __init__(self, model_params: dict = None, n_splits: int = 5):
        self.model_params = model_params if model_params else {'fit_intercept': True}
        self.n_splits = n_splits

    def run(self, X: pd.DataFrame, y: pd.Series) -> Tuple[List[float], List[float]]:
        """
        Train and score one model per TimeSeriesSplit fold.

        Returns two lists (RMSE values, MAE values) with one entry for each
        fold that was evaluated without error.
        """
        splitter = TimeSeriesSplit(n_splits=self.n_splits)
        rmse_scores, mae_scores = [], []

        for fold_no, (train_idx, test_idx) in enumerate(splitter.split(X), start=1):
            print(f"\n[Baseline Fold {fold_no}] Training Linear Regression...")

            X_train = X.iloc[train_idx]
            X_test = X.iloc[test_idx]
            y_train = y.iloc[train_idx]
            y_test = y.iloc[test_idx]

            # A fresh model per fold, so nothing learned carries over.
            model = LinearRegressionModel(**self.model_params)

            try:
                y_pred = model.fit(X_train, y_train).predict(X_test)

                rmse = np.sqrt(mean_squared_error(y_test, y_pred))
                mae = mean_absolute_error(y_test, y_pred)

                print(f"[Baseline Fold {fold_no}] RMSE={rmse:.4f}, MAE={mae:.4f}")

                rmse_scores.append(rmse)
                mae_scores.append(mae)
            except Exception as e:
                # A failing fold is reported and skipped; the others still run.
                print(f"[Baseline Fold {fold_no}] Error: {e}")

        return rmse_scores, mae_scores


class LinearRegressionAnalysis:
    """
    Compare Linear Regression performance under drift-adaptive
    cross-validation and under a plain TimeSeriesSplit baseline.
    """

    def __init__(self, model_params: dict = None):
        self.model_params = model_params or {'fit_intercept': True}

    def analyze(self, X: pd.DataFrame, y: pd.Series, drift_points: List[int]):
        """
        Run the adaptive and the baseline cross-validation.

        Returns a dict with the per-fold score lists under the keys
        ``adaptive_rmse``, ``adaptive_mae``, ``baseline_rmse`` and
        ``baseline_mae``.
        """
        print(f"\n{'='*50}")
        print("Linear Regression Analysis")
        print(f"{'='*50}")

        # Adaptive CV
        drift_cv = DriftAdaptiveTimeSeriesCV(self.model_params)
        drift_rmse, drift_mae = drift_cv.run(X, y, drift_points)

        # Baseline CV
        baseline_cv = BaselineTimeSeriesCV(self.model_params, n_splits=5)
        base_rmse, base_mae = baseline_cv.run(X, y)

        return {
            'adaptive_rmse': drift_rmse,
            'adaptive_mae': drift_mae,
            'baseline_rmse': base_rmse,
            'baseline_mae': base_mae
        }

    @staticmethod
    def _print_cv_scores(label: str, rmse_scores, mae_scores) -> bool:
        """Print mean ± std of one CV strategy; return False if it has no scores."""
        if not (rmse_scores and mae_scores):
            print(f"{label} - No valid results")
            return False

        print(f"{label}:")
        print(f"  - RMSE: {np.mean(rmse_scores):.4f} ± {np.std(rmse_scores):.4f}")
        print(f"  - MAE:  {np.mean(mae_scores):.4f} ± {np.std(mae_scores):.4f}")
        print(f"  - Folds: {len(rmse_scores)}")
        return True

    @staticmethod
    def _format_improvement(baseline: float, adaptive: float) -> str:
        """Format the relative improvement over the baseline as a percentage."""
        # A zero baseline has no meaningful relative change; avoid inf/nan.
        if baseline == 0:
            return "n/a"
        return f"{((baseline - adaptive) / baseline * 100):.2f}%"

    def print_summary(self, results: dict):
        """
        Print a summary of the results returned by ``analyze``.

        Missing or empty score lists are reported as "No valid results"; the
        comparison section is printed only when both strategies have scores.
        """
        print("\n" + "="*80)
        print("LINEAR REGRESSION ANALYSIS SUMMARY")
        print("="*80)

        print("\nLinear Regression Results:")
        print("-" * 40)

        adaptive_rmse = results.get('adaptive_rmse') or []
        adaptive_mae = results.get('adaptive_mae') or []
        baseline_rmse = results.get('baseline_rmse') or []
        baseline_mae = results.get('baseline_mae') or []

        has_adaptive = self._print_cv_scores("Adaptive CV", adaptive_rmse, adaptive_mae)
        has_baseline = self._print_cv_scores("Baseline CV", baseline_rmse, baseline_mae)

        # Comparison needs scores from both strategies.
        if not (has_adaptive and has_baseline):
            return

        adaptive_avg_rmse = np.mean(adaptive_rmse)
        baseline_avg_rmse = np.mean(baseline_rmse)
        adaptive_avg_mae = np.mean(adaptive_mae)
        baseline_avg_mae = np.mean(baseline_mae)

        print("\nComparison:")
        print(f"  - RMSE improvement: {self._format_improvement(baseline_avg_rmse, adaptive_avg_rmse)}")
        print(f"  - MAE improvement: {self._format_improvement(baseline_avg_mae, adaptive_avg_mae)}")

        # The verdict is based on average RMSE only.
        if adaptive_avg_rmse < baseline_avg_rmse:
            print("  - Adaptive CV shows better performance!")
        elif adaptive_avg_rmse == baseline_avg_rmse:
            print("  - Adaptive CV and Baseline CV show equal performance.")
        else:
            print("  - Baseline CV shows better performance!")


# --------------------------------------------------------
# Usage example: detect drift points on the module-level feature matrix X,
# compare adaptive vs. baseline cross-validation, then fit one stand-alone
# model on a chronological 80/20 split.
if __name__ == "__main__":
    # 1) Detect drift points
    detector = DriftPointDetector(
        window_size=120, 
        threshold=0.001, 
        step_size=30,
        min_effect_size=0.3,
        stability_window=60,
        confirmation_tests=2
    )
    drift_points = detector.detect(X)
    print(f"Detected drift points at indices: {drift_points}")
    print(f"Total drift points found: {len(drift_points)}")

    # 2) Analyze Linear Regression with both cross-validation strategies
    model_params = {'fit_intercept': True}

    analyzer = LinearRegressionAnalysis(model_params)
    results = analyzer.analyze(X, y, drift_points)
    analyzer.print_summary(results)

    # 3) Example of using the Linear Regression model on its own
    print("\n" + "="*50)
    print("INDIVIDUAL LINEAR REGRESSION EXAMPLE")
    print("="*50)

    # Split the data into train/test by position: the first 80% of the rows
    # are used for training and the remaining rows for testing (no shuffling).
    split_idx = int(len(X) * 0.8)
    X_train, X_test = X.iloc[:split_idx], X.iloc[split_idx:]
    y_train, y_test = y.iloc[:split_idx], y.iloc[split_idx:]

    # Create and train the model
    model = LinearRegressionModel(**model_params)
    model.fit(X_train, y_train)

    # Predict on the held-out rows
    y_pred = model.predict(X_test)

    # Compute evaluation metrics on the held-out rows
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    mae = mean_absolute_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)

    print(f"Single Model Performance:")
    print(f"  - RMSE: {rmse:.4f}")
    print(f"  - MAE:  {mae:.4f}")
    print(f"  - R²:   {r2:.4f}")

    print("\nModel created successfully!")
    print("Use .fit(X_train, y_train) to train and .predict(X_test) to predict")

Detected drift points at indices: [120, 420, 660, 1110, 1350, 1710, 1950, 2190]
Total drift points found: 8

Linear Regression Analysis

[Adaptive Fold 1] Training Linear Regression...
[Adaptive Fold 1] RMSE=0.5575, MAE=0.5353

[Adaptive Fold 2] Training Linear Regression...
[Adaptive Fold 2] RMSE=1.3440, MAE=1.2984

[Adaptive Fold 3] Training Linear Regression...
[Adaptive Fold 3] RMSE=1.2679, MAE=1.1673

[Adaptive Fold 4] Training Linear Regression...
[Adaptive Fold 4] RMSE=1.8223, MAE=1.6010

[Adaptive Fold 5] Training Linear Regression...
[Adaptive Fold 5] RMSE=5.4170, MAE=5.0177

[Adaptive Fold 6] Training Linear Regression...
[Adaptive Fold 6] RMSE=2.9093, MAE=2.4371

[Adaptive Fold 7] Training Linear Regression...
[Adaptive Fold 7] RMSE=11.2985, MAE=9.8716

[Adaptive Fold 8] Training Linear Regression...
[Adaptive Fold 8] RMSE=24.7924, MAE=22.5382

[Baseline Fold 1] Training Linear Regression...
[Baseline Fold 1] RMSE=1.7622, MAE=1.3741

[Baseline Fold 2] Training Linear Regress