<a href="https://colab.research.google.com/github/aagamaar/Algo_Trading_Python/blob/main/AlgoPy_1ipynb.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

MOUNTING GOOGLE DRIVE

In [21]:
# Mount Google Drive at /content/drive so the cells below can create and
# write the project tree under /content/drive/MyDrive.

from google.colab import drive
# drive.mount prints its own notice when the drive is already mounted.
drive.mount('/content/drive')
print("Google Drive mounted successfully!")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Google Drive mounted successfully!


CREATING PROJECT DIRECTORY STRUCTURE

In [22]:
# Creating Project Directory Structure
# This cell is safe to re-run: directories are created with exist_ok=True and
# existing __init__.py files are left untouched.
import os

# Project root inside the mounted Google Drive
project_root = '/content/drive/MyDrive/algo_trading_prototype'

# Create the main project directory
os.makedirs(project_root, exist_ok=True)
print(f"Project root created: {project_root}")

# Subdirectories of the project
subdirectories = [
    'config',
    'data',
    'strategy',
    'backtester',
    'analytics',
    'sheets',
    'utils',
    'logs' # Directory for log files
]

for subdir in subdirectories:
    path = os.path.join(project_root, subdir)
    os.makedirs(path, exist_ok=True)
    print(f"Created directory: {path}")

# Add __init__.py files so the folders are regular Python packages.
# 'config' is included because the generated modules all do
# `from config import settings`; 'logs' holds no code and is skipped.
for pkg_dir in ['config', 'data', 'strategy', 'backtester', 'analytics', 'sheets', 'utils']:
    init_path = os.path.join(project_root, pkg_dir, '__init__.py')
    # Append mode creates the file when missing but never truncates an
    # existing one, so content added to __init__.py later survives a re-run.
    with open(init_path, 'a'):
        pass
    print(f"Created __init__.py in {pkg_dir}")

print("\nDirectory structure created successfully.")

Project root created: /content/drive/MyDrive/algo_trading_prototype
Created directory: /content/drive/MyDrive/algo_trading_prototype/config
Created directory: /content/drive/MyDrive/algo_trading_prototype/data
Created directory: /content/drive/MyDrive/algo_trading_prototype/strategy
Created directory: /content/drive/MyDrive/algo_trading_prototype/backtester
Created directory: /content/drive/MyDrive/algo_trading_prototype/analytics
Created directory: /content/drive/MyDrive/algo_trading_prototype/sheets
Created directory: /content/drive/MyDrive/algo_trading_prototype/utils
Created directory: /content/drive/MyDrive/algo_trading_prototype/logs
Created __init__.py in data
Created __init__.py in strategy
Created __init__.py in backtester
Created __init__.py in analytics
Created __init__.py in sheets
Created __init__.py in utils

Directory structure created successfully.


INSTALLING THE DEPENDENCIES

In [23]:
# Installing the Dependencies
# Used --quiet to suppress verbose output
!pip install pandas numpy ta gspread oauth2client scikit-learn requests yfinance python-telegram-bot --quiet
print("All required Python dependencies installed.")

All required Python dependencies installed.


HOLDS ALL CONFIGURATIONS

In [24]:
# Cell 4: config/settings.py
%%writefile /content/drive/MyDrive/algo_trading_prototype/config/settings.py
# config/settings.py
# Central configuration read by the data, strategy, analytics, sheets and
# alert modules.

import os

# Stock Data API (using yfinance)
# No API key is needed for basic yfinance usage
STOCK_SYMBOLS = ["RELIANCE.NS", "TCS.NS", "HDFCBANK.NS"] # Three NIFTY 50 stocks for NSE.

# Google Sheets
GOOGLE_SHEET_ID = "1W8nP1H7oIKwdy7m5dGii0lZcf_cKxn0mPSdzhAddm80"
TRADE_LOG_SHEET_NAME = "Trade Log"
SUMMARY_PL_SHEET_NAME = "Summary P&L"
WIN_RATIO_SHEET_NAME = "Win Ratio"

# Strategy Parameters
RSI_PERIOD = 14
RSI_BUY_THRESHOLD = 30
SHORT_MA_PERIOD = 20 # 20-Day Moving Average
LONG_MA_PERIOD = 50  # 50-Day Moving Average

# Backtesting Parameters
BACKTEST_DURATION_MONTHS = 6

# ML Model Parameters
FEATURES = ['RSI', 'MACD', 'Volume', 'Close'] # Features for ML model
TARGET = 'Next_Day_Movement' # Target column for ML model

# Telegram Alerts (Bonus)
# Secrets must not live in source control: both values are read from the
# environment (set them with os.environ[...] or %env before importing this
# module). When they are unset the values are empty strings, which
# utils/alerts.py treats as "alerts not configured" and skips sending.
# NOTE: a bot token that was previously committed in this file should be
# revoked and re-issued.
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "")

Overwriting /content/drive/MyDrive/algo_trading_prototype/config/settings.py


SETTING UP LOGGING TO BOTH CONSOLE AND A FILE IN LOGS DIRECTORY

In [25]:
%%writefile /content/drive/MyDrive/algo_trading_prototype/utils/logger.py
# utils/logger.py

import logging
import os
from datetime import datetime

def setup_logging(log_dir: str = '/content/drive/MyDrive/algo_trading_prototype/logs'):
    """
    Configures root logging to write to both a timestamped file and the console.
    Parameters:
        log_dir (str): Directory that receives the log file. Defaults to the
                       project's logs folder on the mounted Google Drive.
    Returns:
        None
    """
    # exist_ok avoids the check-then-create race and makes re-runs harmless.
    os.makedirs(log_dir, exist_ok=True)

    log_file = os.path.join(log_dir, f"algo_trading_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")

    logging.basicConfig(
        level=logging.INFO, # Log INFO, WARNING, ERROR, CRITICAL messages
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file), # Log to a file
            logging.StreamHandler()        # Also print to console
        ],
        # Without force=True basicConfig does nothing when the root logger
        # already has handlers (e.g. this function was called before, or the
        # hosting environment installed its own), so no log file would be
        # written. force=True removes and closes the old handlers first.
        force=True
    )
    # Suppressing verbose logging from libraries to keep console clean
    for noisy_logger in ('requests', 'urllib3', 'yfinance', 'gspread'):
        logging.getLogger(noisy_logger).setLevel(logging.WARNING)

Overwriting /content/drive/MyDrive/algo_trading_prototype/utils/logger.py


HANDLES FETCHING STOCK DATA USING YFINANCE

In [26]:
%%writefile /content/drive/MyDrive/algo_trading_prototype/data/data_fetcher.py
# data/data_fetcher.py

import yfinance as yf
import pandas as pd
import logging
from datetime import datetime, timedelta
# Import BACKTEST_DURATION_MONTHS from settings for use in get_historical_data
from config import settings # Import settings module directly

logger = logging.getLogger(__name__)

class DataFetcher:
    """Downloads daily OHLCV history for a single ticker through yfinance."""

    # Columns returned to callers, in this order.
    OHLCV_COLUMNS = ['Open', 'High', 'Low', 'Close', 'Volume']

    def __init__(self):
        # yfinance doesn't require an API key for basic usage
        pass

    def fetch_historical_data(self, symbol: str, start_date: str, end_date: str) -> pd.DataFrame:
        """
        Fetches historical stock data for a given symbol using yfinance.
        Parameters:
            symbol (str): Stock ticker symbol (e.g., "RELIANCE.NS").
            start_date (str): Start date in 'YYYY-MM-DD' format.
            end_date (str): End date in 'YYYY-MM-DD' format.
        Returns:
            pd.DataFrame: DataFrame with flat 'Open', 'High', 'Low', 'Close',
                          'Volume' columns and an index named 'Date', or an
                          empty DataFrame if fetching fails or yields no rows.
        """
        try:
            # progress=False to reduce console output during download
            df = yf.download(symbol, start=start_date, end=end_date, progress=False)
            if df is None or df.empty:
                logger.warning(f"No data fetched for {symbol} between {start_date} and {end_date}.")
                return pd.DataFrame()

            # yfinance may return two-level columns (price field, ticker) even
            # for one symbol. Left as-is, df['Close'] would be a DataFrame and
            # break the indicator functions, so collapse to the level that
            # carries the price field names.
            if isinstance(df.columns, pd.MultiIndex):
                for level in range(df.columns.nlevels):
                    level_names = df.columns.get_level_values(level)
                    if 'Close' in level_names:
                        df.columns = level_names
                        break

            # Keep only the standard OHLCV columns; copy so later column
            # assignments by callers do not act on a slice of the download.
            df = df[self.OHLCV_COLUMNS].copy()
            df.index.name = 'Date'
            logger.info(f"Successfully fetched historical data for {symbol}.")
            return df
        except Exception as e:
            # Best effort by design: any failure is logged and reported to the
            # caller as an empty frame.
            logger.error(f"Error fetching data for {symbol} using yfinance: {e}")
            return pd.DataFrame()

def get_historical_data(symbols: list, duration_months: int) -> dict:
    """
    Downloads recent history for several tickers in one call.

    The window ends now and starts `duration_months` * 30 days earlier
    (a month is approximated as 30 days).
    Parameters:
        symbols (list): Stock ticker symbols to download.
        duration_months (int): Length of the look-back window in months.
    Returns:
        dict: Maps each symbol to its DataFrame. Symbols whose download came
              back empty are left out.
    """
    fetcher = DataFetcher()

    window_end = datetime.now()
    window_start = window_end - timedelta(days=duration_months * 30)

    # yfinance takes 'YYYY-MM-DD' strings
    date_format = '%Y-%m-%d'
    start_text = window_start.strftime(date_format)
    end_text = window_end.strftime(date_format)

    downloads = (
        (ticker, fetcher.fetch_historical_data(ticker, start_text, end_text))
        for ticker in symbols
    )
    return {ticker: frame for ticker, frame in downloads if not frame.empty}

Overwriting /content/drive/MyDrive/algo_trading_prototype/data/data_fetcher.py


FUNCTIONS TO CALCULATE TECHNICAL INDICATORS LIKE RSI AND MOVING AVERAGES

In [27]:
# Cell 7: strategy/indicators.py
%%writefile /content/drive/MyDrive/algo_trading_prototype/strategy/indicators.py
# strategy/indicators.py

import pandas as pd
import ta # Technical Analysis library

def calculate_rsi(df: pd.DataFrame, window: int = 14) -> pd.Series:
    """
    Computes the Relative Strength Index (RSI) of the 'Close' column.
    Parameters:
        df (pd.DataFrame): Price data; must contain a 'Close' column.
        window (int): Look-back period passed to the RSI indicator.
    Returns:
        pd.Series: RSI values as produced by ta's RSIIndicator.
    """
    closing_prices = df["Close"]
    rsi_indicator = ta.momentum.RSIIndicator(closing_prices, window=window)
    return rsi_indicator.rsi()

def calculate_sma(df: pd.DataFrame, window: int) -> pd.Series:
    """
    Computes the Simple Moving Average (SMA) of the 'Close' column.
    Parameters:
        df (pd.DataFrame): Price data; must contain a 'Close' column.
        window (int): Number of periods in the moving average.
    Returns:
        pd.Series: SMA values as produced by ta's SMAIndicator.
    """
    closing_prices = df["Close"]
    sma_indicator = ta.trend.SMAIndicator(closing_prices, window=window)
    return sma_indicator.sma_indicator()

def calculate_macd(df: pd.DataFrame) -> pd.DataFrame:
    """
    Computes the MACD line, its signal line and the histogram from 'Close'.
    Parameters:
        df (pd.DataFrame): Price data; must contain a 'Close' column.
    Returns:
        pd.DataFrame: Columns 'MACD', 'MACD_Signal' and 'MACD_Hist', in that order.
    """
    indicator = ta.trend.MACD(df["Close"])
    # Output column name -> indicator method that produces it.
    column_sources = {
        'MACD': indicator.macd,
        'MACD_Signal': indicator.macd_signal,
        'MACD_Hist': indicator.macd_diff,
    }
    return pd.DataFrame({name: compute() for name, compute in column_sources.items()})

Overwriting /content/drive/MyDrive/algo_trading_prototype/strategy/indicators.py


IMPLEMENTS THE TRADING STRATEGY LOGIC (RSI + Moving Average crossover)

In [28]:
# Cell 8: strategy/trading_strategy.py
%%writefile /content/drive/MyDrive/algo_trading_prototype/strategy/trading_strategy.py
# strategy/trading_strategy.py

import pandas as pd
import logging
from strategy.indicators import calculate_rsi, calculate_sma
from config import settings # Import settings module directly

logger = logging.getLogger(__name__)

class TradingStrategy:
    """RSI + moving-average-crossover strategy configured from config.settings."""

    def __init__(self):
        # Copy the tunables once so the strategy does not re-read settings later.
        self.rsi_period = settings.RSI_PERIOD
        self.rsi_buy_threshold = settings.RSI_BUY_THRESHOLD
        self.short_ma_period = settings.SHORT_MA_PERIOD
        self.long_ma_period = settings.LONG_MA_PERIOD

    def generate_signals(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Adds indicator, signal and position columns to `df` and returns it.

        The frame is modified in place: 'RSI', 'SMA_<short>', 'SMA_<long>',
        'Signal' (1=Buy, -1=Sell, 0=Hold) and 'Position' (1=long, 0=flat) are
        added, and rows where any indicator is NaN are dropped.
        Parameters:
            df (pd.DataFrame): Historical OHLCV data.
        Returns:
            pd.DataFrame: The same frame with the added columns.
        """
        if df.empty:
            logger.warning("Empty DataFrame provided to generate_signals.")
            return df

        short_col = f'SMA_{self.short_ma_period}'
        long_col = f'SMA_{self.long_ma_period}'

        # Indicators
        df['RSI'] = calculate_rsi(df, self.rsi_period)
        df[short_col] = calculate_sma(df, self.short_ma_period)
        df[long_col] = calculate_sma(df, self.long_ma_period)

        # Defaults: hold, and no open position
        df['Signal'] = 0
        df['Position'] = 0

        # Indicator warm-up rows carry NaN; remove them before comparing.
        df.dropna(subset=['RSI', short_col, long_col], inplace=True)
        if df.empty:
            logger.warning("DataFrame became empty after dropping NaNs for indicator calculations.")
            return df

        fast_ma = df[short_col]
        slow_ma = df[long_col]
        prev_fast_ma = fast_ma.shift(1)
        prev_slow_ma = slow_ma.shift(1)

        # A crossover is "on the other side (or equal) yesterday, strictly across today".
        crossed_above = (fast_ma > slow_ma) & (prev_fast_ma <= prev_slow_ma)
        crossed_below = (fast_ma < slow_ma) & (prev_fast_ma >= prev_slow_ma)

        # Buy: RSI under the threshold on the day of an upward crossover.
        buy_condition = (df['RSI'] < self.rsi_buy_threshold) & crossed_above
        # Sell: downward crossover (simple exit).
        sell_condition = crossed_below

        df.loc[buy_condition, 'Signal'] = 1
        df.loc[sell_condition, 'Signal'] = -1

        # Position follows the most recent non-zero signal: long after a buy,
        # flat after a sell, flat before any signal. Blank out the holds,
        # carry the last signal forward, then map sell (-1) to flat (0).
        last_signal = df['Signal'].where(df['Signal'] != 0).ffill().fillna(0)
        df['Position'] = last_signal.clip(lower=0).astype('int64')

        logger.info("Signals generated successfully.")
        return df

Overwriting /content/drive/MyDrive/algo_trading_prototype/strategy/trading_strategy.py


SIMULATES TRADING STRATEGY BASED ON HISTORICAL DATA

In [29]:

# Cell 9: backtester/backtester.py
%%writefile /content/drive/MyDrive/algo_trading_prototype/backtester/backtester.py
# backtester/backtester.py

import pandas as pd
import logging
from strategy.trading_strategy import TradingStrategy

logger = logging.getLogger(__name__)

class Backtester:
    """Replays TradingStrategy signals over history with a single all-in long position."""

    def __init__(self):
        self.strategy = TradingStrategy()
        self.trade_log = [] # One dict per executed trade

    def _record_trade(self, symbol, day, trade_type, price, shares, capital_after, pnl):
        """Appends one trade entry (price, capital and P&L rounded to 2 decimals) to the log."""
        self.trade_log.append({
            'Symbol': symbol,
            'Date': day,
            'Type': trade_type,
            'Price': round(price, 2),
            'Shares': shares,
            'Capital_After_Trade': round(capital_after, 2),
            'P&L': pnl
        })

    def run_backtest(self, symbol: str, historical_data: pd.DataFrame) -> dict:
        """
        Simulates the strategy on `historical_data` and reports the outcome.

        Starts from 100000 of capital, buys as many whole shares as possible at
        the close on a buy signal, sells everything at the close on a sell
        signal, and closes any position still open on the last row.
        Parameters:
            symbol (str): Stock ticker symbol.
            historical_data (pd.DataFrame): Historical OHLCV data.
        Returns:
            dict: 'symbol', 'initial_capital', 'final_capital', 'total_pnl' and
                  'trade_log' (a DataFrame). Empty dict if there is nothing to
                  backtest.
        """
        if historical_data.empty:
            logger.warning(f"No historical data provided for backtesting {symbol}.")
            return {}

        # Work on a copy: the strategy adds columns and drops rows in place.
        df = self.strategy.generate_signals(historical_data.copy())

        df.dropna(subset=['RSI', 'Signal', 'Position'], inplace=True)
        if df.empty:
            logger.warning(f"Data for {symbol} became empty after dropping NaNs for signals. Cannot backtest.")
            return {}

        initial_capital = 100000 # Example initial capital
        current_capital = initial_capital
        position_open = False
        buy_price = 0
        shares_held = 0

        # Each run starts with a fresh log
        self.trade_log = []

        closes = df['Close']
        signals = df['Signal']

        for row in range(len(df)):
            close_price = closes.iloc[row]
            signal = signals.iloc[row]
            day = df.index[row].strftime('%Y-%m-%d')

            if signal == 1 and not position_open:
                # Enter: spend as much capital as buys whole shares
                shares_to_buy = int(current_capital / close_price)
                if shares_to_buy > 0:
                    buy_price = close_price
                    shares_held = shares_to_buy
                    current_capital -= (shares_held * buy_price)
                    position_open = True
                    # P&L is realized on sell, so a buy logs 0.0
                    self._record_trade(symbol, day, 'BUY', buy_price, shares_held, current_capital, 0.0)
                    logger.info(f"{symbol} - {day}: BUY at {buy_price:.2f} (Shares: {shares_held})")

            elif signal == -1 and position_open:
                # Exit: sell the whole holding at the close
                sell_price = close_price
                pnl = (sell_price - buy_price) * shares_held
                current_capital += (shares_held * sell_price)
                position_open = False
                self._record_trade(symbol, day, 'SELL', sell_price, shares_held, current_capital, round(pnl, 2))
                logger.info(f"{symbol} - {day}: SELL at {sell_price:.2f} (P&L: {pnl:.2f})")
                buy_price = 0
                shares_held = 0

        # A position still open on the last row is closed at the final close
        if position_open:
            sell_price = closes.iloc[-1]
            pnl = (sell_price - buy_price) * shares_held
            current_capital += (shares_held * sell_price)
            last_day = df.index[-1].strftime('%Y-%m-%d')
            self._record_trade(
                symbol, last_day, 'SELL (Forced Exit)', sell_price,
                shares_held, current_capital, round(pnl, 2)
            )
            logger.info(f"{symbol} - Forced SELL at {sell_price:.2f} (P&L: {pnl:.2f}) at end of backtest.")

        total_pnl = current_capital - initial_capital
        logger.info(f"Backtest for {symbol} completed. Total P&L: {total_pnl:.2f}")

        return {
            'symbol': symbol,
            'initial_capital': initial_capital,
            'final_capital': current_capital,
            'total_pnl': total_pnl,
            'trade_log': pd.DataFrame(self.trade_log) # DataFrame for easier handling
        }

Overwriting /content/drive/MyDrive/algo_trading_prototype/backtester/backtester.py


IMPLEMENTS THE MACHINE LEARNING MODEL FOR NEXT DAY MOVEMENT PREDICTION

In [30]:
# Cell 10: analytics/ml_predictor.py
%%writefile /content/drive/MyDrive/algo_trading_prototype/analytics/ml_predictor.py
# analytics/ml_predictor.py

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report
import logging
from strategy.indicators import calculate_rsi, calculate_macd # Assuming MACD calc is added
from config import settings # Import settings to access FEATURES and TARGET

logger = logging.getLogger(__name__)

class MLPredictor:
    """Trains a scikit-learn classifier to predict whether the next close is higher."""

    def __init__(self, model_type: str = 'decision_tree'):
        """
        Initializes the ML Predictor with a specified model type.
        Parameters:
            model_type (str): 'decision_tree' or 'logistic_regression'.
        Raises:
            ValueError: If `model_type` is not one of the supported values.
        """
        if model_type == 'decision_tree':
            self.model = DecisionTreeClassifier(random_state=42)
        elif model_type == 'logistic_regression':
            self.model = LogisticRegression(random_state=42, solver='liblinear')
        else:
            raise ValueError("Unsupported model_type. Choose 'decision_tree' or 'logistic_regression'.")
        self.trained = False # Flag to check if model has been trained

    def prepare_data_for_ml(self, df: pd.DataFrame, keep_unlabeled: bool = False) -> pd.DataFrame:
        """
        Builds the feature and target columns on a copy of `df`.

        Adds 'RSI', 'MACD', 'MACD_Signal', 'MACD_Hist', 'Next_Day_Close' and
        'Next_Day_Movement' (1 if the next close is higher, else 0). The input
        frame is not modified.
        Parameters:
            df (pd.DataFrame): DataFrame with historical OHLCV data.
            keep_unlabeled (bool): The last row has no next-day close, so its
                target is unknown. By default that row is dropped so it cannot
                enter training with a bogus label; pass True to keep it (its
                features are what a next-day prediction needs).
        Returns:
            pd.DataFrame: Prepared frame without NaN features, or an empty
                          DataFrame if the input is empty.
        """
        if df.empty:
            logger.warning("Empty DataFrame provided for ML data preparation.")
            return pd.DataFrame()

        # Work on a copy so the caller's price data keeps its original columns.
        df = df.copy()

        # Calculate features
        df['RSI'] = calculate_rsi(df, window=settings.RSI_PERIOD) # Use RSI period from settings
        macd_df = calculate_macd(df)
        # Drop stale MACD columns first so re-preparing an already prepared
        # frame does not fail on overlapping column names in join().
        stale_columns = [col for col in macd_df.columns if col in df.columns]
        df = df.drop(columns=stale_columns).join(macd_df)

        # shift(-1) brings the *next* row's close onto the current row
        df['Next_Day_Close'] = df['Close'].shift(-1)
        # Target: 1 if next day's close is higher, 0 otherwise.
        # A NaN next-day close compares as False, so the final row would get
        # label 0 here even though its real outcome is unknown.
        df['Next_Day_Movement'] = (df['Next_Day_Close'] > df['Close']).astype(int)

        if not keep_unlabeled:
            df = df[df['Next_Day_Close'].notna()]

        # Drop indicator warm-up rows (NaN features)
        return df.dropna(subset=settings.FEATURES + ['Next_Day_Movement'])

    def train_model(self, df: pd.DataFrame, features: list, target: str):
        """
        Trains the ML model and evaluates its performance on a held-out 20% split.
        Parameters:
            df (pd.DataFrame): Prepared DataFrame with features and target.
            features (list): List of column names to use as features.
            target (str): Name of the target column.
        Returns:
            tuple: (accuracy, classification_report_string). Returns 0.0 and an
                   empty or explanatory string if training cannot run.
        """
        if df.empty or not all(col in df.columns for col in features + [target]):
            logger.warning("Insufficient data or missing columns for ML training.")
            self.trained = False
            return 0.0, ""

        X = df[features]
        y = df[target]

        # A classifier cannot be fit or stratified on a single class
        if y.nunique() < 2:
            logger.warning(f"Only one class present in target variable for ML training (symbol has no 'Up' or 'Down' movements in data). Skipping training.")
            self.trained = False
            return 0.0, "Only one class in target"

        try:
            # Stratify keeps the class proportions equal in train and test.
            # NOTE(review): the split is shuffled, so test rows can precede
            # training rows in time; the reported accuracy may be optimistic
            # for time-series data.
            X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
        except ValueError as e:
            logger.warning(f"Could not split data for ML training due to: {e}. Check data balance or size.")
            self.trained = False
            return 0.0, str(e)

        self.model.fit(X_train, y_train)
        y_pred = self.model.predict(X_test)

        accuracy = accuracy_score(y_test, y_pred)
        # zero_division=0 prevents warnings when a class has no predicted samples
        report = classification_report(y_test, y_pred, zero_division=0)

        logger.info(f"ML Model Training Complete (Model: {type(self.model).__name__}):")
        logger.info(f"Accuracy: {accuracy:.4f}")
        logger.info(f"Classification Report:\n{report}")
        self.trained = True
        return accuracy, report

    def predict_next_day_movement(self, latest_data_df: pd.DataFrame, features: list) -> int:
        """
        Predicts the next day's movement from the most recent row of data.
        Parameters:
            latest_data_df (pd.DataFrame): Recent historical OHLCV data with
                enough rows to calculate every feature for the last data point.
            features (list): List of feature column names used during training.
        Returns:
            int: 1 for 'Up', 0 for 'Down/No Change', -1 if prediction cannot
                 be made (e.g., not trained, no data).
        """
        if not self.trained:
            logger.warning("ML model is not trained. Cannot make prediction.")
            return -1

        if latest_data_df.empty:
            logger.warning("Empty DataFrame provided for ML prediction.")
            return -1

        # Same feature engineering as training, but keep the newest row: it has
        # no known outcome yet and is exactly the row to predict from.
        processed_df = self.prepare_data_for_ml(latest_data_df, keep_unlabeled=True)
        if processed_df.empty:
            logger.warning("Could not process latest data for prediction after feature engineering.")
            return -1

        try:
            # iloc[[-1]] yields a one-row DataFrame (sklearn expects 2D input)
            # and keeps the column names and dtypes used during training.
            prediction_input = processed_df[features].iloc[[-1]]
            prediction = self.model.predict(prediction_input)[0]
            logger.info(f"Next day movement prediction: {'Up' if prediction == 1 else 'Down/No Change'}")
            return int(prediction)
        except IndexError:
            logger.warning("Not enough data points in processed_df for prediction after feature engineering.")
            return -1
        except KeyError as e:
            logger.warning(f"Missing feature for prediction: {e}. Check FEATURES in settings.py and data preparation.")
            return -1

Overwriting /content/drive/MyDrive/algo_trading_prototype/analytics/ml_predictor.py


MANAGES INTERACTIONS WITH GOOGLE SHEETS

In [31]:
# Cell 11: sheets/google_sheets_manager.py
%%writefile /content/drive/MyDrive/algo_trading_prototype/sheets/google_sheets_manager.py
# sheets/google_sheets_manager.py

import gspread
import pandas as pd
import logging
import os # Import os to set environment variable
from config import settings # Import settings module directly

logger = logging.getLogger(__name__)

class GoogleSheetsManager:
    """Writes trade logs and summary tables to the configured Google Sheet.

    All methods are best effort: connection or API failures are logged and the
    call returns without raising.
    """

    # Service-account key file uploaded to the project folder on Google Drive.
    DEFAULT_CREDENTIALS_PATH = "/content/drive/MyDrive/algo_trading_prototype/credentials.json"

    def __init__(self, credentials_path: str = DEFAULT_CREDENTIALS_PATH):
        """
        Connects to the spreadsheet identified by settings.GOOGLE_SHEET_ID.
        Parameters:
            credentials_path (str): Path to the service-account JSON key file.
        On failure `self.spreadsheet` stays None and an error is logged.
        """
        self.spreadsheet = None
        try:
            # Kept for other Google client libraries that read this variable.
            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_path
            # gspread.service_account() does not consult that variable; without
            # an explicit filename it looks in gspread's own config directory.
            self.gc = gspread.service_account(filename=credentials_path)

            # gspread opens a spreadsheet by its ID with open_by_key.
            self.spreadsheet = self.gc.open_by_key(settings.GOOGLE_SHEET_ID)
            logger.info(f"Successfully connected to Google Sheet: {self.spreadsheet.title}")
        except Exception as e:
            logger.error(f"Error connecting to Google Sheets. "
                         f"Please ensure:\n"
                         f"1. Google Sheets API is enabled in your GCP project.\n"
                         f"2. Your 'credentials.json' is correctly placed at "
                         f"'{credentials_path}'.\n"
                         f"3. The Google Sheet (ID: {settings.GOOGLE_SHEET_ID}) is shared with the service account email "
                         f"(found in 'credentials.json').\n"
                         f"Error: {e}")
            self.spreadsheet = None

    def _get_or_create_worksheet(self, sheet_name: str):
        """Returns the worksheet named `sheet_name`, creating it if missing; None on failure."""
        if not self.spreadsheet:
            logger.error(f"Cannot get/create worksheet '{sheet_name}'. Not connected to Google Sheets.")
            return None
        try:
            worksheet = self.spreadsheet.worksheet(sheet_name)
            logger.debug(f"Found worksheet: {sheet_name}")
            return worksheet
        except gspread.exceptions.WorksheetNotFound:
            logger.info(f"Worksheet '{sheet_name}' not found, creating it...")
            # Default rows/cols for new sheet. Adjust if your data is much larger.
            worksheet = self.spreadsheet.add_worksheet(title=sheet_name, rows=2000, cols=50)
            logger.info(f"Created new worksheet: {sheet_name}")
            return worksheet
        except Exception as e:
            logger.error(f"Error getting/creating worksheet '{sheet_name}': {e}")
            return None

    @staticmethod
    def _replace_contents(worksheet, rows: list):
        """Clears `worksheet` and writes `rows` starting at cell A1."""
        worksheet.clear()
        # Keyword arguments: the positional order of update() is not the same
        # in every gspread major version.
        worksheet.update(values=rows, range_name='A1')

    def log_trade_signals(self, trade_log_df: pd.DataFrame):
        """
        Appends the rows of `trade_log_df` to the 'Trade Log' tab.

        If the sheet has no header or a different one, the sheet is cleared and
        the header is rewritten before the rows are appended.
        Parameters:
            trade_log_df (pd.DataFrame): Trade log with a 'Date' column. It is
                                         not modified.
        """
        if trade_log_df.empty:
            logger.info("No trade logs to record.")
            return

        worksheet = self._get_or_create_worksheet(settings.TRADE_LOG_SHEET_NAME)
        if not worksheet:
            return

        # Format dates as strings on a copy so the caller's frame is untouched.
        upload_df = trade_log_df.copy()
        upload_df['Date'] = upload_df['Date'].astype(str)
        data_to_upload = upload_df.values.tolist()
        expected_header = upload_df.columns.tolist()

        try:
            existing_header = worksheet.row_values(1)

            # Missing or mismatching header: start the sheet over
            if not existing_header or existing_header != expected_header:
                self._replace_contents(worksheet, [expected_header])
                logger.info(f"Header updated/added to '{settings.TRADE_LOG_SHEET_NAME}' worksheet.")

            worksheet.append_rows(data_to_upload)
            logger.info(f"Logged {len(trade_log_df)} trade signals to '{settings.TRADE_LOG_SHEET_NAME}'.")
        except Exception as e:
            logger.error(f"Error logging trade signals to Google Sheets: {e}")

    def update_summary_pnl(self, symbol_pnl_data: dict):
        """
        Rewrites the 'Summary P&L' tab with one row per symbol.
        Parameters:
            symbol_pnl_data (dict): Maps symbol to a dict with
                'initial_capital', 'final_capital' and 'total_pnl'
                (missing keys are written as 0).
        """
        if not symbol_pnl_data:
            logger.info("No summary P&L data to record.")
            return

        worksheet = self._get_or_create_worksheet(settings.SUMMARY_PL_SHEET_NAME)
        if not worksheet:
            return

        summary_data = [
            ["Symbol", "Initial Capital", "Final Capital", "Total P&L"]
        ]
        for symbol, data in symbol_pnl_data.items():
            summary_data.append([
                symbol,
                data.get('initial_capital', 0),
                data.get('final_capital', 0),
                data.get('total_pnl', 0)
            ])

        try:
            self._replace_contents(worksheet, summary_data)
            logger.info(f"Updated '{settings.SUMMARY_PL_SHEET_NAME}' worksheet.")
        except Exception as e:
            logger.error(f"Error updating summary P&L to Google Sheets: {e}")

    def update_win_ratio(self, trade_log_df: pd.DataFrame):
        """
        Rewrites the 'Win Ratio' tab from the sell rows of `trade_log_df`.

        A sell row counts as winning when its 'P&L' is above zero. When there
        are no sell rows the tab is still rewritten, with zero values.
        Parameters:
            trade_log_df (pd.DataFrame): Trade log with 'Type' and 'P&L' columns.
        """
        if trade_log_df.empty:
            logger.info("No trade data to calculate win ratio.")
            return

        worksheet = self._get_or_create_worksheet(settings.WIN_RATIO_SHEET_NAME)
        if not worksheet:
            return

        # P&L is realized on sells (regular and forced exits)
        sell_trades = trade_log_df[trade_log_df['Type'].str.contains('SELL', na=False)]
        if sell_trades.empty:
            logger.info("No sell trades to calculate win ratio.")

        # len() keeps the counts as plain ints
        total_trades = len(sell_trades)
        winning_trades = len(sell_trades[sell_trades['P&L'] > 0])
        losing_trades = len(sell_trades[sell_trades['P&L'] <= 0])
        win_ratio = round((winning_trades / total_trades) * 100, 2) if total_trades > 0 else 0.0

        win_ratio_data = [
            ["Metric", "Value"],
            ["Total Trades (Sell)", total_trades],
            ["Winning Trades", winning_trades],
            ["Losing Trades", losing_trades],
            ["Win Ratio (%)", win_ratio]
        ]

        # Both the zero and the non-zero case write inside the same guard, so
        # an API failure is logged instead of propagating.
        try:
            self._replace_contents(worksheet, win_ratio_data)
            logger.info(f"Updated '{settings.WIN_RATIO_SHEET_NAME}' worksheet.")
        except Exception as e:
            logger.error(f"Error updating win ratio to Google Sheets: {e}")

    def get_signal_alerts(self, symbol: str, date: str, signal_type: str, price: float):
        """
        Generates a formatted string for a trade signal alert.
        Parameters:
            symbol (str): Stock ticker symbol.
            date (str): Trade date text.
            signal_type (str): Signal label shown in the alert.
            price (float): Trade price, shown with two decimals.
        Returns:
            str: Multi-line alert text.
        """
        return f"🚨 TRADE ALERT 🚨\nSymbol: {symbol}\nDate: {date}\nSignal: {signal_type}\nPrice: {price:.2f}"

Overwriting /content/drive/MyDrive/algo_trading_prototype/sheets/google_sheets_manager.py


HANDLES TELEGRAM MESSAGES

In [32]:
# Cell 12: utils/alerts.py
%%writefile /content/drive/MyDrive/algo_trading_prototype/utils/alerts.py
# utils/alerts.py

import requests
import logging
from config import settings # Import settings module directly

logger = logging.getLogger(__name__)

def send_telegram_message(message: str, timeout: float = 10.0):
    """
    Sends a message to a specified Telegram chat.
    Requires TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID from config/settings.py.

    This is best-effort: every failure (missing configuration, network error,
    non-2xx response) is logged and swallowed so that an alerting problem never
    aborts the caller.

    Parameters:
        message (str): The text message to send.
        timeout (float): Seconds to wait for the Telegram API before giving up.
            `requests` applies no timeout by default, so without this a stalled
            connection would block the caller indefinitely.
    Returns:
        None
    """
    try:
        bot_token = settings.TELEGRAM_BOT_TOKEN
        chat_id = settings.TELEGRAM_CHAT_ID
    except AttributeError:
        logger.error("Telegram bot token or chat ID not found in settings.py. Skipping Telegram alert.")
        return

    if not bot_token or not chat_id:
        logger.warning("Telegram BOT_TOKEN or CHAT_ID is not configured. Skipping alert.")
        return

    url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    payload = {
        "chat_id": chat_id,
        "text": message,
        # Allows basic markdown formatting in message.
        # NOTE(review): with this parse mode the API may reject text containing
        # unbalanced markdown characters such as `_` or `*` — confirm that
        # alert text can never contain them.
        "parse_mode": "Markdown"
    }
    try:
        response = requests.post(url, json=payload, timeout=timeout)
        response.raise_for_status() # Raise an HTTPError for bad responses (4xx or 5xx)
        logger.info("Telegram message sent successfully.")
    except requests.exceptions.RequestException as e:
        # The exception text usually embeds the request URL, and the URL embeds
        # the bot token. Strip the secret before it reaches the log files.
        safe_error = str(e).replace(str(bot_token), "<redacted>")
        logger.error(f"Error sending Telegram message: {safe_error}")

Overwriting /content/drive/MyDrive/algo_trading_prototype/utils/alerts.py


PORTFOLIO ANALYTICS MODULE (PLACEHOLDER FOR THIS PROTOTYPE)

In [33]:
# Cell 13: analytics/portfolio_analytics.py
%%writefile /content/drive/MyDrive/algo_trading_prototype/analytics/portfolio_analytics.py
# analytics/portfolio_analytics.py

# This file is a placeholder. Its functionalities (P&L, Win Ratio)
# are integrated directly into Backtester and GoogleSheetsManager for simplicity.
# For more complex analytics (e.g., Sharpe Ratio, Max Drawdown), you would
# implement them here.

Overwriting /content/drive/MyDrive/algo_trading_prototype/analytics/portfolio_analytics.py


In [34]:
# Cell 14: main.py
%%writefile /content/drive/MyDrive/algo_trading_prototype/main.py
# main.py

import logging
import pandas as pd
from datetime import datetime
import sys
import os

# Absolute location of the project on the mounted Google Drive.
# It is placed at the front of sys.path so that the package imports below
# resolve regardless of the working directory main.py is launched from.
project_root = '/content/drive/MyDrive/algo_trading_prototype'
if project_root not in sys.path:
    sys.path.insert(0, project_root)
    # Note: If you modify this or any other module, you might need to
    # restart your Colab runtime to ensure changes are picked up due to Python's module caching.

# Project-local imports. These must stay BELOW the sys.path setup above because
# they are resolved as top-level packages relative to project_root.
# NOTE(review): the directory-setup cell creates __init__.py for data, strategy,
# backtester, analytics, sheets and utils but not for config, so `config` would
# be resolved as an implicit namespace package — confirm this is intended.
from config import settings
from utils import logger
from data import data_fetcher
from backtester import backtester
from analytics import ml_predictor
from sheets import google_sheets_manager
from utils import alerts # Bonus: Telegram alerts

# Configure logging once, at import time, before any logger in this module is used.
# `logger` here is the project's utils.logger module, not a logging.Logger instance.
logger.setup_logging()
main_logger = logging.getLogger(__name__) # Get a specific logger for the main script

def run_algo_prototype():
    """
    Main function to run the algo-trading prototype.

    Orchestrates, in order:
      1. Fetching historical data and training one ML model per symbol.
      2. Backtesting the strategy on every symbol.
      3. Writing trade logs, win ratio and the P&L summary to Google Sheets.
      4. Computing the latest strategy signal and the next-day ML prediction
         for every symbol, and sending a Telegram alert when the strategy
         signal is not HOLD or a prediction was produced.

    Returns:
        None. Results are emitted through logging, Google Sheets and Telegram.
    """
    main_logger.info("Starting Algo-Trading Prototype...")

    # Initialize components
    sheets_manager = google_sheets_manager.GoogleSheetsManager()
    backtester_instance = backtester.Backtester()

    # One predictor per symbol. A single shared predictor would be re-trained
    # on every symbol in turn, so step 4 would predict every stock with the
    # model fitted on whichever symbol happened to be trained last.
    ml_predictors = {}

    trade_log_frames = [] # Per-symbol trade logs, concatenated once after the loop
    symbol_pnl_results = {} # To store P&L summary per symbol
    ml_accuracies = {} # To store ML model accuracies per symbol

    # --- 1. Data Ingestion & ML Model Training ---
    main_logger.info(f"Fetching historical data for {settings.STOCK_SYMBOLS} for {settings.BACKTEST_DURATION_MONTHS} months...")
    historical_data = data_fetcher.get_historical_data(settings.STOCK_SYMBOLS, settings.BACKTEST_DURATION_MONTHS)

    if not historical_data:
        main_logger.error("No historical data fetched. Exiting.")
        return

    # Train ML model for each stock using its historical data
    main_logger.info("Preparing data and training ML model for each stock...")
    for symbol, df in historical_data.items():
        if df.empty:
            main_logger.warning(f"No data for {symbol} to train ML model.")
            continue

        predictor = ml_predictor.MLPredictor(model_type='decision_tree') # Can be 'logistic_regression'
        ml_data = predictor.prepare_data_for_ml(df.copy())
        if ml_data.empty:
            main_logger.warning(f"Could not prepare ML data for {symbol}.")
            continue

        accuracy, _ = predictor.train_model(ml_data, settings.FEATURES, settings.TARGET)
        ml_accuracies[symbol] = accuracy
        ml_predictors[symbol] = predictor

    main_logger.info(f"ML Model Accuracies: {ml_accuracies}")

    # --- 2. Run Backtest for each stock ---
    for symbol, df in historical_data.items():
        if df.empty:
            main_logger.warning(f"Skipping backtest for {symbol} due to no data.")
            continue

        main_logger.info(f"Running backtest for {symbol}...")
        results = backtester_instance.run_backtest(symbol, df.copy())
        if not results:
            main_logger.warning(f"Backtest for {symbol} yielded no results.")
            continue

        symbol_pnl_results[symbol] = {
            'initial_capital': results['initial_capital'],
            'final_capital': results['final_capital'],
            'total_pnl': results['total_pnl']
        }
        if not results['trade_log'].empty:
            trade_log_frames.append(results['trade_log'])

    # Concatenate once instead of growing a DataFrame inside the loop.
    if trade_log_frames:
        all_trade_logs = pd.concat(trade_log_frames, ignore_index=True)
    else:
        all_trade_logs = pd.DataFrame()

    # --- 3. Google Sheets Automation ---
    # Log all aggregated trade signals
    if not all_trade_logs.empty:
        main_logger.info("Logging trade signals to Google Sheets...")
        sheets_manager.log_trade_signals(all_trade_logs)
        # Update win ratio based on all logs
        sheets_manager.update_win_ratio(all_trade_logs)
    else:
        main_logger.info("No trade logs to write to Google Sheets.")

    # Update summary P&L
    if symbol_pnl_results:
        main_logger.info("Updating summary P&L in Google Sheets...")
        sheets_manager.update_summary_pnl(symbol_pnl_results)
    else:
        main_logger.info("No summary P&L to write to Google Sheets.")

    # --- 4. Generate Current Buy/Sell Signals and ML Predictions ---
    main_logger.info("Generating current (latest date) buy/sell signals and ML predictions...")
    for symbol, df in historical_data.items():
        if df.empty:
            main_logger.warning(f"Cannot generate signal for {symbol}: no data.")
            continue

        # Get enough historical data points to calculate all indicators for the latest day
        required_rows_for_indicators = max(settings.RSI_PERIOD, settings.SHORT_MA_PERIOD, settings.LONG_MA_PERIOD) + 1 # +1 for current day's signal
        latest_data_for_signal = df.tail(required_rows_for_indicators).copy()

        if latest_data_for_signal.empty:
            main_logger.warning(f"Not enough recent data for {symbol} to generate current signal after indicator calculation.")
            continue

        # Re-run strategy on the latest data to get the current day's signal
        processed_latest_data = backtester_instance.strategy.generate_signals(latest_data_for_signal)

        # Check if the last row (current day) has valid signal/data
        if processed_latest_data.empty or processed_latest_data.iloc[-1].isnull().any():
            main_logger.warning(f"Could not generate clean signal for {symbol} for the latest date after processing.")
            current_signal = 0 # Default to hold if signal is unreliable
            # df is known to be non-empty here, so fall back to its last raw row.
            current_close = df['Close'].iloc[-1]
            current_date = df.index[-1].strftime('%Y-%m-%d')
        else:
            current_signal = processed_latest_data['Signal'].iloc[-1]
            current_close = processed_latest_data['Close'].iloc[-1]
            current_date = processed_latest_data.index[-1].strftime('%Y-%m-%d')

        signal_type = "HOLD"
        if current_signal == 1:
            signal_type = "BUY"
        elif current_signal == -1:
            signal_type = "SELL"

        main_logger.info(f"[{symbol}] Latest Strategy Signal ({current_date}): {signal_type} at {current_close:.2f}")

        # ML prediction for next day movement.
        # -1 is the "no valid prediction" sentinel tested by the alert condition
        # below. It must be reset on every iteration: otherwise the name is
        # undefined (NameError) when the first symbol has no trained model, or
        # it silently carries over the previous symbol's prediction.
        next_day_pred = -1
        predictor = ml_predictors.get(symbol)

        # Need enough data points for ML features (RSI, MACD, Volume) for the current day
        # `prepare_data_for_ml` handles dropping NaNs, so pass a sufficient window of recent data
        ml_prediction_input_data = df.tail(max(settings.RSI_PERIOD, 26) + 2).copy() # MACD needs up to 26 periods + 1 for next day shift
        if predictor is not None and predictor.trained and not ml_prediction_input_data.empty:
            next_day_pred = predictor.predict_next_day_movement(ml_prediction_input_data, settings.FEATURES)
            ml_pred_text = "Up" if next_day_pred == 1 else ("Down/No Change" if next_day_pred == 0 else "N/A - Prediction Failed")
            main_logger.info(f"[{symbol}] Next Day ML Prediction: {ml_pred_text}")
        else:
            ml_pred_text = "N/A - Model Not Trained or Insufficient Data"
            main_logger.warning(f"[{symbol}] {ml_pred_text}")

        # Bonus: Telegram Alert Integration
        if current_signal != 0 or next_day_pred != -1: # Alert for strategy signals or if ML made a valid prediction
            alert_message = sheets_manager.get_signal_alerts(symbol, current_date, signal_type, current_close)
            alert_message += f"\nML Prediction (Next Day): {ml_pred_text}"
            alerts.send_telegram_message(alert_message)

    main_logger.info("Algo-Trading Prototype finished.")

# Run the full pipeline only when this file is executed as a script;
# importing main.py as a module does not trigger a run.
if __name__ == "__main__":
    run_algo_prototype()

Overwriting /content/drive/MyDrive/algo_trading_prototype/main.py
