In [18]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from typing import Tuple, Dict
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
from tensorflow.keras.models import Sequential # type: ignore
from tensorflow.keras.layers import LSTM, Dense, Conv1D, MaxPooling1D, Flatten, Dropout # type: ignore # type: ignore
from typing import Dict

#ingor warnings
import warnings
warnings.filterwarnings("ignore")

class ChartPatternDetector:
    """CNN model for detecting chart patterns (H1)"""
    def __init__(self, sequence_length: int):
        self.sequence_length = sequence_length
        self.model = self._build_cnn()
        
    def _build_cnn(self) -> Sequential:
        model = Sequential([
            Conv1D(32, 3, activation='relu', input_shape=(self.sequence_length, 1)),
            MaxPooling1D(2),
            Conv1D(64, 3, activation='relu'),
            MaxPooling1D(2),
            Conv1D(64, 3, activation='relu'),
            Flatten(),
            Dense(64, activation='relu'),
            Dense(4, activation='softmax')  # head&shoulders, double top/bottom, no pattern
        ])
        model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
        return model
    
    def train(self, X_train: np.ndarray, y_train: np.ndarray, epochs: int = 10, batch_size: int = 32):
        """Train the CNN model"""
        self.model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, validation_split=0.2)
    
    def prepare_pattern_data(self, prices: np.array) -> np.array:
        """Prepare data for pattern detection"""
        windows = []
        for i in range(len(prices) - self.sequence_length):
            window = prices[i:(i + self.sequence_length)]
            normalized_window = (window - window.min()) / (window.max() - window.min())
            windows.append(normalized_window)
        return np.array(windows)
    
    def evaluate_patterns(self, y_true, y_pred) -> Dict:
        """Calculate pattern recognition metrics"""
        return {
            'accuracy': accuracy_score(y_true.argmax(axis=1), y_pred.argmax(axis=1)),
            'precision': precision_score(y_true.argmax(axis=1), y_pred.argmax(axis=1), average='weighted'),
            'recall': recall_score(y_true.argmax(axis=1), y_pred.argmax(axis=1), average='weighted'),
            'f1': f1_score(y_true.argmax(axis=1), y_pred.argmax(axis=1), average='weighted')
        }

class PricePredictor:
    """RNN model for price prediction compared to traditional methods (H2)"""
    def __init__(self, sequence_length: int, n_features: int):
        self.sequence_length = sequence_length
        self.n_features = n_features
        self.rnn_model = self._build_rnn()
        
    def _build_rnn(self) -> Sequential:
        model = Sequential([
            LSTM(100, return_sequences=True, input_shape=(self.sequence_length, self.n_features)),
            Dropout(0.2),
            LSTM(100, return_sequences=False),
            Dropout(0.2),
            Dense(1)
        ])
        model.compile(optimizer='adam', loss='mse')
        return model
    
    def moving_average_strategy(self, prices: pd.Series, short_window=20, long_window=50) -> pd.Series:
        """Traditional moving average crossover strategy"""
        short_ma = prices.rolling(window=short_window).mean()
        long_ma = prices.rolling(window=long_window).mean()
        signals = pd.Series(0, index=prices.index)
        signals[short_ma > long_ma] = 1  # Buy signal
        signals[short_ma < long_ma] = -1  # Sell signal
        return signals

    def compare_performance(self, rnn_predictions: np.array, ma_signals: pd.Series, 
                          actual_returns: pd.Series) -> Dict:
        """Compare RNN vs Moving Average performance"""
        rnn_returns = pd.Series(rnn_predictions).pct_change()
        ma_returns = ma_signals.shift(1) * actual_returns
        
        return {
            'rnn_sharpe': self.calculate_sharpe_ratio(rnn_returns),
            'ma_sharpe': self.calculate_sharpe_ratio(ma_returns),
            'rnn_max_drawdown': self.calculate_max_drawdown(rnn_returns),
            'ma_max_drawdown': self.calculate_max_drawdown(ma_returns)
        }

class TradingSystem:
    """Combined trading system using multiple indicators (H3)"""
    def __init__(self, pattern_detector: ChartPatternDetector, price_predictor: PricePredictor):
        self.pattern_detector = pattern_detector
        self.price_predictor = price_predictor
        
    def generate_signals(self, data: pd.DataFrame) -> pd.Series:
        """Generate trading signals combining all indicators"""
        pattern_signals = self.pattern_detector.predict(data['Close'].values)
        price_predictions = self.price_predictor.predict(data)
        technical_signals = self.analyze_technical_indicators(data)
        
        # Combine signals using weighted approach
        combined_signals = (
            0.4 * pattern_signals +
            0.4 * np.sign(price_predictions - data['Close'].values) +
            0.2 * technical_signals
        )
        
        return pd.Series(combined_signals, index=data.index)
    
    def analyze_technical_indicators(self, data: pd.DataFrame) -> np.array:
        """Analyze technical indicators for signal generation"""
        signals = np.zeros(len(data))
        
        # RSI signals
        signals[data['RSI'] < 30] += 1  # Oversold
        signals[data['RSI'] > 70] -= 1  # Overbought
        
        # MACD signals
        signals[data['MACD'] > data['Signal_Line']] += 1
        signals[data['MACD'] < data['Signal_Line']] -= 1
        
        # Bollinger Bands signals
        signals[data['Close'] < data['Lower_Band']] += 1
        signals[data['Close'] > data['Upper_Band']] -= 1
        
        return signals
    
    def calculate_performance_metrics(self, signals: pd.Series, 
                                   returns: pd.Series) -> Dict:
        """Calculate trading performance metrics"""
        strategy_returns = signals.shift(1) * returns
        
        return {
            'sharpe_ratio': self.calculate_sharpe_ratio(strategy_returns),
            'max_drawdown': self.calculate_max_drawdown(strategy_returns),
            'total_return': (1 + strategy_returns).prod() - 1,
            'annualized_return': (1 + strategy_returns).prod() ** (252/len(returns)) - 1
        }
    
    @staticmethod
    def calculate_sharpe_ratio(returns: pd.Series, risk_free_rate=0.02) -> float:
        """Calculate Sharpe Ratio"""
        excess_returns = returns - risk_free_rate/252
        return np.sqrt(252) * excess_returns.mean() / returns.std()
    
    @staticmethod
    def calculate_max_drawdown(returns: pd.Series) -> float:
        """Calculate Maximum Drawdown"""
        cumulative = (1 + returns).cumprod()
        running_max = cumulative.expanding().max()
        drawdowns = cumulative / running_max - 1
        return drawdowns.min()

class DataLoader:
    def __init__(self, data_path: str):
        self.data_path = data_path
        self.scaler = MinMaxScaler()
        
    def load_and_prepare_data(self) -> Tuple[np.ndarray, np.ndarray]:
        # Load master data
        df = pd.read_csv(self.data_path)
        df['Date'] = pd.to_datetime(df['Date'])
        
        # Sort by date
        df = df.sort_values('Date')
        
        # Prepare features
        feature_columns = ['Close', 'Volume', 'RSI', 'MACD', 'Signal_Line', 
                         'Upper_Band', 'Lower_Band', 'Returns', 'Volatility', 'Volume_Ratio']
        
        # Scale features
        scaled_data = self.scaler.fit_transform(df[feature_columns])
        
        # Prepare sequences
        X, y = self._prepare_sequences(scaled_data)
        
        # Split into train/test
        split_idx = int(len(X) * 0.8)
        X_train, X_test = X[:split_idx], X[split_idx:]
        y_train, y_test = y[:split_idx], y[split_idx:]
        
        return {'train': X_train, 'test': X_test}, {'train': y_train, 'test': y_test}
    
    def _prepare_sequences(self, data: np.ndarray, sequence_length: int = 60) -> Tuple[np.ndarray, np.ndarray]:
        X, y = [], []
        for i in range(len(data) - sequence_length):
            X.append(data[i:(i + sequence_length)])
            y.append(data[i + sequence_length, 0])
        return np.array(X), np.array(y)

def main():
    # Initialize data loader
    data_loader = DataLoader('stock_data/sp500_master_data.csv')
    X_dict, y_dict = data_loader.load_and_prepare_data()
    
    # Initialize models from previous implementation
    pattern_detector = ChartPatternDetector(sequence_length=60)
    price_predictor = PricePredictor(sequence_length=60, n_features=10)
    trading_system = TradingSystem(pattern_detector, price_predictor)
    
    # Dictionary to store results
    results = {}
    
    # Process each stock
    for symbol in X_dict.keys():
        try:
            # Train models
            pattern_detector.train(X_dict[symbol]['train'], y_dict[symbol]['train'])
            price_predictor.train(X_dict[symbol]['train'], y_dict[symbol]['train'])
            
            # Generate predictions
            signals = trading_system.generate_signals(X_dict[symbol]['test'])
            
            # Calculate performance metrics
            performance = trading_system.calculate_performance_metrics(
                signals, y_dict[symbol]['test']
            )
            
            results[symbol] = performance
            
        except Exception as e:
            print(f"Error processing {symbol}: {str(e)}")
            continue
    
    # Save results
    results_df = pd.DataFrame.from_dict(results, orient='index')
    results_df.to_csv('stock_data/trading_results.csv')
    
    # Calculate and display aggregate metrics
    print("\nAggregate Performance Metrics:")
    print(results_df.mean())

if __name__ == "__main__":
    main()

Error processing train: only integers, slices (`:`), ellipsis (`...`), numpy.newaxis (`None`) and integer or boolean arrays are valid indices
Error processing test: only integers, slices (`:`), ellipsis (`...`), numpy.newaxis (`None`) and integer or boolean arrays are valid indices

Aggregate Performance Metrics:
Series([], dtype: float64)
