<a href="https://colab.research.google.com/github/PritamMoitra/Learning_Python_for_Finance/blob/main/Stochastics_strategy.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np
import yfinance as yf
import matplotlib.pyplot as plt
import os
!pip install openpyxl
import matplotlib.pyplot as plt
from datetime import datetime, timedelta





In [None]:
symbols = ['TATAMOTORS.NS']  # Add your symbols here
index_symbol = "^NSEI"
start_date = '2025-06-04'
end_date = '2025-06-05'
inter='1m'
starting_capital = 50000
target_pct = 0.03
stop_pct = 0.015
stoch_k = 14
stoch_d = 3
df=yf.download(symbols, start=start_date, end=end_date, interval=inter)
df.tail()

[*********************100%***********************]  1 of 1 completed


Price,Close,High,Low,Open,Volume
Ticker,TATAMOTORS.NS,TATAMOTORS.NS,TATAMOTORS.NS,TATAMOTORS.NS,TATAMOTORS.NS
Datetime,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2
2025-06-04 09:55:00+00:00,708.799988,708.799988,708.450012,708.450012,71263
2025-06-04 09:56:00+00:00,709.099976,709.099976,708.75,708.799988,26163
2025-06-04 09:57:00+00:00,709.099976,709.200012,709.0,709.150024,32723
2025-06-04 09:58:00+00:00,710.0,710.0,709.25,709.25,82487
2025-06-04 09:59:00+00:00,710.25,710.5,710.0,710.0,69692


In [None]:
# FUNCTION TO CHECK IF MARKET IS IN UPTREND (based on 15m trend)
def get_nifty_trend():
    nifty = fetch_and_prepare_data(index_symbol, start=start_date, end=end_date, interval='1d')
    nifty = fetch_and_prepare_data(index_symbol, start=start_date-1, end=end_date-1, interval='1d')
    print(nifty.head())
    nifty = nifty.reset_index()  # This ensures 'Date' is a column
    nifty["Trend"] = np.where(nifty["Close"] > nifty["Open"], "UP", "DOWN")
    return nifty[["Datetime", "Trend"]]

In [None]:
def fetch_and_prepare_data(symbol, start=start_date, end=end_date, interval=inter):
    """
    Downloads and prepares OHLCV data for a symbol.

    Args:
        symbol (str): Ticker symbol (e.g., "RELIANCE.NS")
        start (str): Start date (e.g., "2024-06-01")
        end (str): End date (e.g., "2024-06-10")
        interval (str): Interval (e.g., "5m", "15m", "1d")

    Returns:
        pd.DataFrame: Cleaned DataFrame with standardized columns
    """
    try:
        df = yf.download(symbol, start=start, end=end, interval=inter)
        df=flatten_ohlcv_dataframe(df)
        # Flatten MultiIndex columns if present
        if isinstance(df.columns, pd.MultiIndex):
            df.columns = [col[0] for col in df.columns]  # e.g., ('Close', 'RELIANCE.NS') → 'Close'

        # Standardize column names
        df.columns = [col.capitalize() for col in df.columns]

        # Ensure required columns are present
        required_cols = {'Open', 'High', 'Low', 'Close', 'Volume'}
        if not required_cols.issubset(df.columns):
            print(f"Skipping {symbol} — missing required columns: {required_cols - set(df.columns)}")
            return pd.DataFrame()

        df.dropna(subset=['Close', 'High', 'Low'], inplace=True)
        return df

    except Exception as e:
        print(f"Error downloading {symbol}: {e}")
        return pd.DataFrame()

In [None]:
def flatten_ohlcv_dataframe(df):
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = [col[0] for col in df.columns]  # Use only the OHLCV field name
    df.columns = [col.capitalize() for col in df.columns]  # Capitalize to match expected names
    return df


In [None]:
# Output directory for CSVs (optional)
os.makedirs("trade_logs", exist_ok=True)
os.makedirs("plots", exist_ok=True)

In [None]:
# ----------------------------------------------
# TECHNICAL INDICATOR FUNCTION
# ----------------------------------------------
def generate_stochastic_signals(df, k_period=14, d_period=3):
    df = df.copy()
    low_min = df['Low'].rolling(window=k_period).min()
    high_max = df['High'].rolling(window=k_period).max()
    df['%K'] = 100 * ((df['Close'] - low_min) / (high_max - low_min))
    df['%D'] = df['%K'].rolling(window=d_period).mean()

    df['Signal'] = 0
    df.loc[
        (df['%K'].shift(1) < df['%D'].shift(1)) & (df['%K'] > df['%D']) & (df['%K'] < 20),
        'Signal'
    ] = 1  #buy
    df.loc[
        (df['%K'].shift(1) > df['%D'].shift(1)) & (df['%K'] < df['%D']) & (df['%K'] > 80),
        'Signal'
    ] = -1 #sell
    return df


In [None]:
# Signal generation function

def generate_trading_log_with_target_stoploss_exit_eod(df, symbol, index_trend_df, target_pct, stop_pct, starting_capital=50000):
    capital = starting_capital
    position = 0  # 1 for long, -1 for short
    entry_price = 0
    shares = 0
    log = []

    df = df.reset_index()
    df['DateTime'] = pd.to_datetime(df['Datetime']) if 'Datetime' in df.columns else df['Date']
    df = pd.merge_asof(df.sort_values('DateTime'),
                   trend_df.sort_values('DateTime'),
                   on='DateTime',
                   direction='backward')

    for i in range(len(df)):
        row = df.iloc[i]
        price = row['Close']
        signal = row.get('Signal', 0)
        current_date = row['DateOnly']
        next_date = row['NextDate'] if i < len(df) - 1 else None

        # Determine index trend for the current date
        trend_row = index_trend_df[index_trend_df['Date'] == pd.to_datetime(current_date)]
        index_trend = trend_row['Trend'].values[0] if not trend_row.empty else 'UP'


        # ENTRY CONDITIONS
        if position == 0 and price > 0:
            if index_trend == 'UP' and signal == 1:
                shares = int(capital // price)
                if shares == 0:
                    continue
                entry_price = price
                target_price = entry_price * (1 + target_pct)
                stop_price = entry_price * (1 - stop_pct)
                capital -= shares * entry_price
                position = 1  # long
                log.append({
                    'Date': row.name,
                    'Symbol': symbol,
                    'Action': 'BUY',
                    'Price': round(price, 2),
                    'Shares': shares,
                    'Capital Before': round(capital + shares * entry_price, 2),
                    'Capital After': round(capital, 2),
                    'Note': f'Target: {round(target_price,2)}, Stop: {round(stop_price,2)}'
                })

            elif index_trend == 'DOWN':
                shares = int(capital // price)
                if shares == 0:
                    continue
                entry_price = price
                target_price = entry_price * (1 - target_pct)
                stop_price = entry_price * (1 + stop_pct)
                capital += shares * entry_price
                position = -1  # short
                log.append({
                    'Date': row.name,
                    'Symbol': symbol,
                    'Action': 'SHORT SELL',
                    'Price': round(price, 2),
                    'Shares': shares,
                    'Capital Before': round(capital - shares * entry_price, 2),
                    'Capital After': round(capital, 2),
                    'Note': f'Target: {round(target_price,2)}, Stop: {round(stop_price,2)}'
                })

        # EXIT CONDITIONS
        elif position != 0:
            exit_reason = None
            sell_price = None
            cap_before = capital

            if position == 1:  # long
                if price >= target_price:
                    exit_reason = 'SELL (Target Hit)'
                    sell_price = target_price
                elif price <= stop_price:
                    exit_reason = 'SELL (Stop Loss Hit)'
                    sell_price = stop_price
                elif next_date != current_date:
                    exit_reason = 'SELL (End of Day Exit)'
                    sell_price = price

                if exit_reason:
                    capital += shares * sell_price
                    log.append({
                        'Date': row.name,
                        'Symbol': symbol,
                        'Action': exit_reason,
                        'Price': round(sell_price, 2),
                        'Shares': shares,
                        'Capital Before': round(cap_before, 2),
                        'Capital After': round(capital, 2),
                        'Note': f'Exit reason: {exit_reason.split()[1]}'
                    })
                    position = 0
                    shares = 0

            elif position == -1:  # short
                if price <= target_price:
                    exit_reason = 'BUY TO COVER (Target Hit)'
                    sell_price = target_price
                elif price >= stop_price:
                    exit_reason = 'BUY TO COVER (Stop Loss Hit)'
                    sell_price = stop_price
                elif next_date != current_date:
                    exit_reason = 'BUY TO COVER (End of Day Exit)'
                    sell_price = price

                if exit_reason:
                    capital -= shares * sell_price
                    log.append({
                        'Date': row.name,
                        'Symbol': symbol,
                        'Action': exit_reason,
                        'Price': round(sell_price, 2),
                        'Shares': shares,
                        'Capital Before': round(cap_before, 2),
                        'Capital After': round(capital, 2),
                        'Note': f'Exit reason: {exit_reason.split()[2]}'
                    })
                    position = 0
                    shares = 0

    return pd.DataFrame(log), float(capital)


In [None]:
# ----------------------------------------------
# PLOT FUNCTION
# ----------------------------------------------
def plot_trades(df, trade_log, symbol):
    plt.figure(figsize=(14, 6))
    plt.plot(df['Close'], label='Close Price', color='blue')

    buys = trade_log[trade_log['Action'].str.startswith('BUY')]
    sells = trade_log[trade_log['Action'].str.startswith('SELL')]

    plt.scatter(buys['Date'], buys['Price'], marker='^', color='green', label='Buy', s=100)
    plt.scatter(sells['Date'], sells['Price'], marker='v', color='red', label='Sell', s=100)

    plt.title(f'{symbol} - Trade Signals')
    plt.xlabel('Date')
    plt.ylabel('Price')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(f"plots/{symbol}_trades.png")
    plt.close()

In [None]:
# ----------------------------------------------
# SUMMARY FUNCTION
# ----------------------------------------------
def summarize_trades(trade_log):
    long_entries = trade_log[trade_log['Action'].isin(['BUY'])]
    long_exits = trade_log[trade_log['Action'].str.startswith('SELL')]

    short_entries = trade_log[trade_log['Action'].isin(['SHORT SELL'])]
    short_exits = trade_log[trade_log['Action'].str.startswith('BUY TO COVER')]

    trades = []

    # Pair long entries and exits
    for buy, sell in zip(long_entries.itertuples(), long_exits.itertuples()):
        pnl = (sell.Price - buy.Price) * buy.Shares
        trades.append(pnl)

    # Pair short entries and exits
    for short, cover in zip(short_entries.itertuples(), short_exits.itertuples()):
        pnl = (short.Price - cover.Price) * short.Shares
        trades.append(pnl)

    wins = sum(1 for pnl in trades if pnl > 0)
    losses = sum(1 for pnl in trades if pnl <= 0)
    total_profit = sum(trades)
    total_trades = len(trades)

    print(f'\n📊 Trade Summary:')
    print(f'Total Trades: {total_trades}')
    print(f'Wins: {wins}, Losses: {losses}')
    if total_trades > 0:
        print(f'Win Rate: {wins / total_trades:.2%}')
    print(f'Total Profit/Loss: ₹{total_profit:.2f}')



In [None]:
def get_detailed_trades(trade_log):
    trades = []
    i = 0
    summarize_trades(trade_log)
    while i < len(trade_log) - 1:
        entry = trade_log.iloc[i]
        exit_ = trade_log.iloc[i + 1]

        if (
            (entry['Action'] == 'BUY' and exit_['Action'].startswith('SELL'))
            or
            (entry['Action'] == 'SHORT SELL' and exit_['Action'].startswith('BUY TO COVER'))
        ):
            trade_type = 'LONG' if entry['Action'] == 'BUY' else 'SHORT'
            entry_price = entry['Price']
            exit_price = exit_['Price']
            shares = entry['Shares']
            pnl = (exit_price - entry_price) * shares if trade_type == 'LONG' else (entry_price - exit_price) * shares

            trades.append({
                'Type': trade_type,
                'Entry Time': entry['Date'],
                'Entry Price': entry_price,
                'Exit Time': exit_['Date'],
                'Exit Price': exit_price,
                'Shares': shares,
                'P&L': round(pnl, 2),
                'Exit Reason': exit_['Note']
            })
            i += 2
        else:
            i += 1

    return pd.DataFrame(trades)


In [None]:
# --------------------------------------------------------
# MAIN LOOP
# --------------------------------------------------------
trend_df = get_nifty_trend()
print(trend_df)
all_logs = []
final_capital = starting_capital

for symbol in symbols:
    print(f"Processing: {symbol}")
    df = fetch_and_prepare_data(symbol, start=start_date, end=end_date, interval=inter)
    trade_log, final_capital = generate_trading_log_with_target_stoploss_exit_eod(df, symbol, trend_df, target_pct,stop_pct)


    all_logs.append(trade_log)
    print(f"Completed {symbol}. Capital: ₹{final_capital:.2f}")

get_detailed_trades(trade_log)


[*********************100%***********************]  1 of 1 completed


TypeError: unsupported operand type(s) for -: 'str' and 'int'