# In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats
from typing import Tuple, List, Dict
import warnings
warnings.filterwarnings('ignore')

class LinearRegressionChannelStrategy:
    """
    Linear Regression Channel Trading Strategy

    A rolling linear regression is fitted to the most recent ``window`` prices.
    The channel is the regression value at the latest bar plus/minus
    ``std_multiplier`` times the standard deviation of the in-window residuals.
    Positions are opened when price reaches a channel bound and closed when
    price gets back to the regression line.
    """

    def __init__(self, window: int = 20, std_multiplier: float = 2.0):
        """
        Initialize the strategy

        Parameters:
        - window: Number of periods for linear regression calculation
        - std_multiplier: Standard deviation multiplier for channel width
        """
        self.window = window
        self.std_multiplier = std_multiplier
        # Populated by backtest(): the per-bar signal frame and the metrics dict.
        self.signals = None
        self.results = None

    def calculate_linear_regression_channel(self, prices: pd.Series) -> pd.DataFrame:
        """
        Calculate linear regression channel with upper and lower bounds

        Parameters:
        - prices: Price series; its index becomes the index of the result

        Returns:
        - DataFrame with columns 'price', 'lr_line', 'upper_channel',
          'lower_channel' and 'residual_std'. The first ``window - 1`` rows of
          the computed columns are NaN because no full window exists yet.

        Raises:
        - ValueError: if ``window`` is smaller than 2 (a line cannot be fitted)
        """
        if self.window < 2:
            raise ValueError(
                f"window must be at least 2 to fit a regression line, got {self.window}"
            )

        n = len(prices)
        y_all = prices.to_numpy()

        # Results are collected in plain arrays and attached to the frame once;
        # this avoids a label-based DataFrame write for every cell.
        lr_line = np.full(n, np.nan)
        upper_channel = np.full(n, np.nan)
        lower_channel = np.full(n, np.nan)
        residual_std = np.full(n, np.nan)

        # The regressor is the same 0..window-1 ramp for every window.
        x = np.arange(self.window)

        for end in range(self.window - 1, n):
            y = y_all[end - self.window + 1:end + 1]
            fit = stats.linregress(x, y)

            # Regression value at the most recent bar of the window.
            current_lr_value = fit.slope * (self.window - 1) + fit.intercept

            # Population standard deviation (np.std default, ddof=0) of the
            # in-window residuals sets the channel half-width.
            spread = np.std(y - (fit.slope * x + fit.intercept))

            lr_line[end] = current_lr_value
            residual_std[end] = spread
            upper_channel[end] = current_lr_value + (self.std_multiplier * spread)
            lower_channel[end] = current_lr_value - (self.std_multiplier * spread)

        df = pd.DataFrame({'price': prices})
        df['lr_line'] = lr_line
        df['upper_channel'] = upper_channel
        df['lower_channel'] = lower_channel
        df['residual_std'] = residual_std
        return df

    def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Generate trading signals based on channel touches

        Signals:
        - Buy when price touches or goes below lower channel
        - Sell when price touches or goes above upper channel
        - Exit when price crosses back through regression line

        Parameters:
        - data: Frame with 'price', 'lr_line', 'upper_channel' and
          'lower_channel' columns (as produced by
          calculate_linear_regression_channel)

        Returns:
        - Copy of ``data`` with added columns 'signal' (1 buy / -1 sell / 0),
          'position' (1 long / -1 short / 0 flat after the bar), 'entry_price',
          'exit_price' and 'trade_return' (fractional, set on the exit bar).
        """
        signals = data.copy()
        n = len(signals)

        price = signals['price'].to_numpy()
        upper = signals['upper_channel'].to_numpy()
        lower = signals['lower_channel'].to_numpy()
        mid = signals['lr_line'].to_numpy()
        channel_ready = ~(pd.isna(upper) | pd.isna(lower) | pd.isna(mid))

        # Outputs are built as arrays and assigned as whole columns at the end.
        # Element-wise chained writes (frame['col'].iloc[i] = v) do not reach
        # the frame when pandas Copy-on-Write is active.
        signal_col = np.zeros(n, dtype=np.int64)
        position_col = np.zeros(n, dtype=np.int64)
        entry_col = np.full(n, np.nan)
        exit_col = np.full(n, np.nan)
        return_col = np.full(n, np.nan)

        position = 0
        entry_price = 0

        # The first row is never evaluated; scanning starts at the second bar.
        for i in range(1, n):
            if not channel_ready[i]:
                position_col[i] = position
                continue

            current_price = price[i]

            if position == 0:
                if current_price <= lower[i]:
                    # Price at/below the lower bound: open a long.
                    position = 1
                    entry_price = current_price
                    signal_col[i] = 1
                    entry_col[i] = entry_price
                elif current_price >= upper[i]:
                    # Price at/above the upper bound: open a short.
                    position = -1
                    entry_price = current_price
                    signal_col[i] = -1
                    entry_col[i] = entry_price

            elif position == 1:
                if current_price >= mid[i] or current_price >= upper[i]:
                    # Long is closed once price is back at the regression line.
                    signal_col[i] = -1
                    exit_col[i] = current_price
                    return_col[i] = (current_price - entry_price) / entry_price
                    position = 0
                    entry_price = 0

            elif position == -1:
                if current_price <= mid[i] or current_price <= lower[i]:
                    # Short is covered once price is back at the regression line.
                    signal_col[i] = 1
                    exit_col[i] = current_price
                    return_col[i] = (entry_price - current_price) / entry_price
                    position = 0
                    entry_price = 0

            position_col[i] = position

        signals['signal'] = signal_col
        signals['position'] = position_col
        signals['entry_price'] = entry_col
        signals['exit_price'] = exit_col
        signals['trade_return'] = return_col
        return signals

    def backtest(self, prices: pd.Series, initial_capital: float = 10000) -> Dict:
        """
        Backtest the strategy and return performance metrics

        Parameters:
        - prices: Price series to trade
        - initial_capital: Accepted for interface compatibility; not used by
          the calculation (all metrics are fractional returns)

        Returns:
        - Dict with 'total_return', 'num_trades', 'win_rate', 'avg_return',
          'sharpe_ratio', 'max_drawdown' and 'returns' (per-trade returns).
          The same dict is stored on ``self.results``; the signal frame is
          stored on ``self.signals``.
        """
        channel_data = self.calculate_linear_regression_channel(prices)
        self.signals = self.generate_signals(channel_data)

        # Only closed trades carry a return; a position still open at the end
        # of the series is not counted.
        returns = self.signals['trade_return'].dropna()

        if returns.empty:
            # Store the result here too, so print_performance() reports zeros
            # instead of claiming that no backtest was run.
            self.results = {
                'total_return': 0,
                'num_trades': 0,
                'win_rate': 0,
                'avg_return': 0,
                'sharpe_ratio': 0,
                'max_drawdown': 0,
                'returns': returns
            }
            return self.results

        total_return = (1 + returns).prod() - 1
        num_trades = len(returns)
        win_rate = (returns > 0).sum() / num_trades
        avg_return = returns.mean()

        # NOTE(review): the sqrt(252) factor is applied to per-trade returns,
        # not daily returns -- confirm this annualization is intended.
        # A single trade gives a NaN std, which falls through to 0.
        returns_std = returns.std()
        sharpe_ratio = avg_return / returns_std * np.sqrt(252) if returns_std > 0 else 0

        # Drawdown of the compounded per-trade equity curve.
        cumulative_returns = (1 + returns).cumprod()
        rolling_max = cumulative_returns.expanding().max()
        drawdown = (cumulative_returns - rolling_max) / rolling_max
        max_drawdown = drawdown.min()

        self.results = {
            'total_return': total_return,
            'num_trades': num_trades,
            'win_rate': win_rate,
            'avg_return': avg_return,
            'sharpe_ratio': sharpe_ratio,
            'max_drawdown': max_drawdown,
            'returns': returns
        }

        return self.results

    def plot_strategy(self, figsize: Tuple[int, int] = (15, 10)):
        """
        Plot the price chart with linear regression channel and signals

        Parameters:
        - figsize: Figure size passed to matplotlib

        Prints a hint and returns without plotting when no backtest has been
        run yet.
        """
        if self.signals is None:
            print("No signals generated. Run backtest first.")
            return

        # gridspec_kw is used for the panel heights so this also works on
        # matplotlib versions that lack the height_ratios shortcut argument.
        fig, (ax1, ax2) = plt.subplots(
            2, 1, figsize=figsize, gridspec_kw={'height_ratios': [3, 1]}
        )

        ax1.plot(self.signals.index, self.signals['price'], label='Price', color='black', linewidth=1)
        ax1.plot(self.signals.index, self.signals['lr_line'], label='Linear Regression Line',
                 color='blue', linewidth=1.5)
        ax1.plot(self.signals.index, self.signals['upper_channel'], label='Upper Channel',
                 color='red', linewidth=1, linestyle='--')
        ax1.plot(self.signals.index, self.signals['lower_channel'], label='Lower Channel',
                 color='green', linewidth=1, linestyle='--')

        ax1.fill_between(self.signals.index, self.signals['upper_channel'],
                         self.signals['lower_channel'], alpha=0.1, color='gray')

        buy_signals = self.signals[self.signals['signal'] == 1]
        sell_signals = self.signals[self.signals['signal'] == -1]

        ax1.scatter(buy_signals.index, buy_signals['price'],
                    marker='^', color='green', s=100, label='Buy Signal', zorder=5)
        ax1.scatter(sell_signals.index, sell_signals['price'],
                    marker='v', color='red', s=100, label='Sell Signal', zorder=5)

        ax1.set_title('Linear Regression Channel Trading Strategy')
        ax1.set_ylabel('Price')
        ax1.legend()
        ax1.grid(True, alpha=0.3)

        ax2.plot(self.signals.index, self.signals['position'], label='Position',
                 color='purple', linewidth=2)
        ax2.set_ylabel('Position')
        ax2.set_xlabel('Time')
        ax2.set_ylim(-1.5, 1.5)
        ax2.grid(True, alpha=0.3)
        ax2.legend()

        plt.tight_layout()
        plt.show()

    def print_performance(self, epoch=None):
        """
        Print strategy performance metrics

        Parameters:
        - epoch: Optional label; when given it is printed before the report
        """
        if epoch is not None:
            print("Epoch: ", epoch)

        if self.results is None:
            print("No results available. Run backtest first.")
            return

        print("=" * 50)
        print("LINEAR REGRESSION CHANNEL STRATEGY PERFORMANCE")
        print("=" * 50)
        print(f"Total Return: {self.results['total_return']:.2%}")
        print(f"Number of Trades: {self.results['num_trades']}")
        print(f"Win Rate: {self.results['win_rate']:.2%}")
        print(f"Average Return per Trade: {self.results['avg_return']:.2%}")
        print(f"Sharpe Ratio: {self.results['sharpe_ratio']:.2f}")
        print(f"Maximum Drawdown: {self.results['max_drawdown']:.2%}")
        print("=" * 50)

def generate_sample_data(n_points: int = 1000, seed: int = 42) -> pd.Series:
    """
    Generate sample price data for testing

    The series is the sum of a linear trend from 100 to 150, Gaussian noise
    (standard deviation 2) and a sine wave of amplitude 5 spanning two full
    cycles, indexed by consecutive calendar days starting 2023-01-01.

    Parameters:
    - n_points: Number of data points to generate
    - seed: Seed for the noise generator; the default reproduces the series
      this function has always returned

    Returns:
    - pd.Series of synthetic prices with a daily DatetimeIndex
    """
    # A local legacy generator yields the same noise stream that
    # np.random.seed(seed) would, without resetting the process-wide RNG
    # state that other code may be relying on.
    rng = np.random.RandomState(seed)

    trend = np.linspace(100, 150, n_points)
    noise = rng.normal(0, 2, n_points)
    cyclical = 5 * np.sin(np.linspace(0, 4 * np.pi, n_points))

    price_data = trend + noise + cyclical

    dates = pd.date_range(start='2023-01-01', periods=n_points, freq='D')

    return pd.Series(price_data, index=dates)

if __name__ == "__main__":
    # --- Part 1: single backtest on synthetic prices, with report and chart ---
    print("Generating sample price data...")
    sample_prices = generate_sample_data(500)

    strategy = LinearRegressionChannelStrategy(window=20, std_multiplier=2.0)

    print("Running backtest...")
    results = strategy.backtest(sample_prices)

    strategy.print_performance()

    print("\nPlotting strategy results...")
    strategy.plot_strategy()

    # --- Part 2: small grid search, keeping the first combination that ---
    # --- achieves the highest Sharpe ratio                             ---
    print("\nTesting different parameters...")

    windows = [10, 20, 30]
    std_multipliers = [1.5, 2.0, 2.5]

    best_sharpe = -np.inf
    best_params = None

    # Window-major ordering: every multiplier is tried for a window before
    # moving on to the next window.
    parameter_grid = [(w, m) for w in windows for m in std_multipliers]

    for window, std_mult in parameter_grid:
        test_strategy = LinearRegressionChannelStrategy(window=window, std_multiplier=std_mult)
        test_results = test_strategy.backtest(sample_prices)

        # Only a strictly better score replaces the incumbent.
        if not (test_results['sharpe_ratio'] > best_sharpe):
            continue

        best_sharpe = test_results['sharpe_ratio']
        best_params = (window, std_mult)

    print(f"Best parameters: Window={best_params[0]}, Std Multiplier={best_params[1]}")
    print(f"Best Sharpe Ratio: {best_sharpe:.2f}")

# In [None]:
"""RL model"""
import gymnasium
from gymnasium import spaces
import numpy as np
import pandas as pd
from typing import Tuple

class LRChannelEnv(gymnasium.Env):
    """
    Single-step environment for tuning LinearRegressionChannelStrategy.

    An action is a pair ``[window, std_multiplier]``. ``step`` backtests the
    strategy with those parameters on the stored price series and returns the
    resulting Sharpe ratio as the reward. Every episode ends after one step.
    The observation is always the full price series as float32.
    """

    def __init__(self, price_series: pd.Series, window_range: Tuple[int, int], std_range: Tuple[float, float]):
        """
        Parameters:
        - price_series: Prices the strategy is backtested on
        - window_range: (low, high) bounds for the regression window
        - std_range: (low, high) bounds for the channel std multiplier
        """
        super().__init__()
        self.prices = price_series
        self.window_range = window_range
        self.std_range = std_range

        # Bounds are created as float32 so they match the dtype declared for
        # the space instead of being down-cast from float64 by the Box.
        self.action_space = spaces.Box(
            low=np.array([window_range[0], std_range[0]], dtype=np.float32),
            high=np.array([window_range[1], std_range[1]], dtype=np.float32),
            dtype=np.float32
        )

        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(len(price_series),), dtype=np.float32
        )

    def _observation(self) -> np.ndarray:
        """Return the stored price series as a float32 array."""
        return self.prices.values.astype(np.float32)

    def reset(self, seed=None, options=None):
        """
        Start a new episode.

        Returns:
        - (observation, info): the float32 price array and an empty dict
        """
        super().reset(seed=seed)
        self.current_step = 0
        return self._observation(), {}

    def step(self, action):
        """
        Backtest the strategy with the parameters encoded in ``action``.

        Parameters:
        - action: Array-like ``[window, std_multiplier]``

        Returns:
        - (observation, reward, terminated, truncated, info) where reward is
          the backtest Sharpe ratio (-1.0 if it is missing or not finite),
          terminated is always True, truncated is always False, and info is
          the metrics dict returned by the backtest.
        """
        # Keep the parameters inside the configured ranges even if the caller
        # hands in an out-of-bounds action.
        bounded = np.clip(
            np.asarray(action, dtype=np.float64),
            self.action_space.low,
            self.action_space.high,
        )
        window = int(bounded[0])  # fractional windows are truncated
        std_mult = float(bounded[1])

        strategy = LinearRegressionChannelStrategy(window=window, std_multiplier=std_mult)
        results = strategy.backtest(self.prices)

        sharpe = results['sharpe_ratio']
        # Hand back a plain Python float; NaN/inf would poison training, so
        # they fall back to the same -1.0 penalty used for a missing value.
        reward = float(sharpe) if sharpe is not None and np.isfinite(sharpe) else -1.0

        terminated = True
        truncated = False
        info = results

        return self._observation(), reward, terminated, truncated, info

from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env

# Synthetic prices the agent's parameter choices are scored on.
price_data = generate_sample_data(500)

# The agent chooses a regression window in [10, 40] and a channel
# multiplier in [1.0, 3.0].
env = LRChannelEnv(
    price_series=price_data,
    window_range=(10, 40),
    std_range=(1.0, 3.0),
)

# Validate the environment against the Gym interface before training on it.
check_env(env)

# Train a PPO agent with the default MLP policy.
model = PPO("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=10000)

# In [None]:
"""Tester"""

# reset() returns an (observation, info) pair; only the observation is fed
# to the policy.
obs, _info = env.reset()

# Use the policy's deterministic action so the reported parameters are the
# agent's actual choice rather than a random sample around it.
action, _states = model.predict(obs, deterministic=True)
print("Agent’s chosen parameters:", action)

# Apply those parameters to strategy (window is truncated to an integer,
# mirroring what the environment does in step()).
best_window = int(action[0])
best_std = float(action[1])

strategy = LinearRegressionChannelStrategy(window=best_window, std_multiplier=best_std)
strategy.backtest(price_data)
strategy.print_performance()
strategy.plot_strategy()
