In [None]:
from google import genai
from google.genai import types
import pandas as pd

In [None]:
def load_and_prepare_data(filepath):
    df = pd.read_csv(filepath)
    df.drop(columns=['Datetime'], inplace=True)
    FEATURES = df.columns.tolist()
    df = df[FEATURES]
    return df, FEATURES

In [None]:
import torch
import numpy as np
from model import CTTS

df, features = load_and_prepare_data("data\Data_RELIANCE.NS.csv")

In [None]:
import os
import torch
import numpy as np
from model import CTTS 
from sklearn.preprocessing import MinMaxScaler
import warnings
warnings.filterwarnings("ignore")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def load_and_prepare_data(filepath):
    df = pd.read_csv(filepath)
    df.drop(columns=['Datetime'], inplace=True)
    FEATURES = df.columns.tolist()
    df = df[FEATURES]
    return df, FEATURES

def normalize_data(df):
    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(df.astype(float))
    return scaled_data, scaler

def load_model(path, input_dim):
    model = CTTS(input_dim=input_dim).to(device)
    model.load_state_dict(torch.load(path))
    model.eval()
    return model

def predict_price(ticker, steps_ahead=1, seq_length=300, model_dir="models", data_dir="data"):
    print(f"\nPredicting {steps_ahead} future step(s) for: {ticker}")

    filepath = os.path.join(data_dir, f"Data_{ticker}.csv")
    model_path = os.path.join(model_dir, f"model.pth") # "model_{ticker}.pth"

    try:
        # Load and normalize data
        df, features = load_and_prepare_data(filepath)
        data, scaler = normalize_data(df)

        if len(data) < seq_length:
            raise ValueError("Not enough data to form a prediction window.")

        input_dim = len(features)
        model = load_model(model_path, input_dim=input_dim)

        # Start with the last known sequence
        last_sequence = data[-seq_length:]
        model_input = torch.tensor(last_sequence, dtype=torch.float32).unsqueeze(0).to(device)

        predictions = []

        model.eval()
        for _ in range(steps_ahead):
            with torch.no_grad():
                output = model(model_input).cpu().numpy()[0][0]

            predictions.append(output)

            # Prepare next input by sliding the window and adding new predicted value
            last_sequence = np.vstack([last_sequence[1:], last_sequence[-1]])  # copy last row
            last_sequence[-1, 0] = output  # update target feature only
            model_input = torch.tensor(last_sequence, dtype=torch.float32).unsqueeze(0).to(device)

        # Inverse transform predictions
        dummy_array = np.zeros((steps_ahead, len(features)))
        dummy_array[:, 0] = predictions
        future_actuals = scaler.inverse_transform(dummy_array)[:, 0]

        print(f"\n Future Predictions ({ticker}):")
        for i, value in enumerate(future_actuals, 1):
            print(f"Step {i}: {value:.2f}")

        return future_actuals

    except Exception as e:
        print(f" Future prediction failed for {ticker}: {e}")
        return None

In [None]:
predict_price("RELIANCE.NS", steps_ahead=1)

In [None]:
def get_previous_prices(ticker, steps_ahead):
    data = pd.read_csv("data/Data_" + ticker + ".csv")
    return data.head(steps_ahead)['Close'].values

def get_future_prices(ticker, steps_ahead):
    return predict_price(ticker=ticker, steps_ahead=steps_ahead, seq_length=300)


In [58]:
from google import genai
from google.genai import types

# Function schema definition remains unchanged
predict_price_function = {
    "name": "predict_price",
    "description": "Predicts the stock price after a given number of timestamps using a trained CNN+Transformer (CTTS) model.",
    "parameters": {
        "type": "object",
        "properties": {
            "ticker": {
                "type": "string",
                "description": "The stock ticker symbol (e.g., 'RELIANCE.NS')"
            },
            "seq_length": {
                "type": "integer",
                "description": "The sequence length to use for forecasting"
            }
        },
        "required": ["ticker"]
    }
}

# Placeholder functions for price data (to be implemented based on your data source)
def get_previous_prices(ticker, steps):
    return 100.0  # Replace with actual price fetching logic

def get_future_prices(ticker, steps):
    return 105.0  # Replace with actual prediction logic

def simulate_stock(steps_ahead, tickers, balance=10000, model_dir="models", data_dir="data", sim_steps=15):
    if len(tickers) > 3:
        raise ValueError("Our bot only supports up to 3 tickers")
    
    # Initialize Gemini client
    gemini_api_key = "AIzaSyC8TrtizS97mIfgFasT0UQV2OkqROIgX8A"
    client = genai.Client(api_key=gemini_api_key)

    # Set up Gemini tool and config
    tools = types.Tool(function_declarations=[predict_price_function])
    config = types.GenerateContentConfig(tools=[tools])

    # Initialize portfolio variables
    ticker_map = {ticker: i for i, ticker in enumerate(tickers)}
    shares = [0] * len(tickers)
    short_positions = [0] * len(tickers)
    current_balance = balance

    def buy_stock(ticker, number_of_shares, price):
        nonlocal current_balance
        if number_of_shares <= 0:
            print("Cannot buy 0 or fewer shares, something is wrong")
            return

        total_cost = number_of_shares * price
        if total_cost > current_balance:
            print("Not enough balance, something is wrong")
            return

        idx = ticker_map[ticker]
        if short_positions[idx] > 0:
            if number_of_shares >= short_positions[idx]:
                covering = short_positions[idx]
                short_positions[idx] = 0
                number_of_shares -= covering
                current_balance -= covering * price
                print(f"{covering} shorted shares of {ticker} were covered at Rs.{price:.2f}")
            else:
                short_positions[idx] -= number_of_shares
                print(f"{number_of_shares} shorted shares of {ticker} were partially covered at Rs.{price:.2f}")
                current_balance -= total_cost
                return

        shares[idx] += number_of_shares
        current_balance -= total_cost
        print(f"{number_of_shares} shares of {ticker} were bought at Rs.{price:.2f}")

    def sell_stock(ticker, number_of_shares, price):
        nonlocal current_balance
        if number_of_shares <= 0:
            print("Cannot sell 0 or fewer shares, something is wrong")
            return

        idx = ticker_map[ticker]
        owned = shares[idx]

        if owned >= number_of_shares:
            shares[idx] -= number_of_shares
            current_balance += number_of_shares * price
            print(f"{number_of_shares} shares of {ticker} were sold at Rs.{price:.2f}")
        else:
            print(f"Not enough owned shares of {ticker}, initiating short for the remaining")
            if owned > 0:
                shares[idx] = 0
                current_balance += owned * price
                print(f"{owned} shares of {ticker} were sold at Rs.{price:.2f}")
            
            short_shares = number_of_shares - owned
            short_stock(ticker, short_shares, price)

    def short_stock(ticker, number_of_shares, price):
        nonlocal current_balance
        if number_of_shares <= 0:
            print("Cannot short 0 or fewer shares, something is wrong")
            return

        total_proceeds = number_of_shares * price
        current_balance += total_proceeds
        short_positions[ticker_map[ticker]] += number_of_shares
        print(f"{number_of_shares} shares of {ticker} were shorted at Rs.{price:.2f}")

    previous = [get_previous_prices(ticker, steps_ahead) for ticker in tickers]
    results = [get_future_prices(ticker, steps_ahead + sim_steps) for ticker in tickers]
    
    sys_prompt = f"Predict the price of {', '.join(tickers)} in the next {steps_ahead} minutes."

    def recur_gemini(previous, steps_ahead, amounts, short_positions, results, tickers):
        nonlocal current_balance
        try:
            resp = client.models.generate_content(
                model="gemini-2.0-flash",
                contents=sys_prompt,
                config=config,
            )
            function_call = resp.candidates[0].content.parts[0].function_call
            print(f"\n📍 Function to call: {function_call.name}")
            print(f"📦 Arguments: {function_call.args}")

            args = function_call.args
            if "seq_length" not in args:
                args["seq_length"] = 300

            trader_prompt = f"""You are an expert intraday trader, your quantitative trading assistent has reported that the
            price of 3 stocks, {tickers} which was previously {previous} for the previous {steps_ahead} minutes and predicted to be {results} for the next {steps_ahead} minutes. 
            You already have {amounts} of the stocks respectively in your portfolio. You have short positions of {short_positions} for the stocks respectively in your portfolio.
            You have {current_balance} rupees remaining in your account that you are allowed to spend further.
            Looking at the entire data, You have to chose to either Hold, Buy, Sell or Short one or more of the stocks, in order to maximize your profit.
            Only return the following: 1: Stock name, 2: Your choice of Hold, Buy, Sell, Short 3: Number of stocks bought, sold or shorted. 3: One line explaination of your decision, all in a python list format. You can return multiple actions as List of Lists Don't return anything else.
            Sample output: 
            [["RELIANCE.NS", "buy", 5, "The stock price is predicted to increase significantly over the next 120 minutes, presenting a buying opportunity."], ["INFY.NS", "sell", 4, "The stock price is predicted to decrease over the next 120 minutes."]]
            """
            
            trader_response = client.models.generate_content(
                model="gemini-2.0-flash-lite",
                contents=trader_prompt
            )
            responses = trader_response.candidates[0].content.parts[0].text
            responses = list(responses)
            for response in responses:
                if response[1].lower() == "buy":
                    buy_stock(response[0], response[2], results[ticker_map[response[0]]])
                elif response[1].lower() == "sell":
                    sell_stock(response[0], response[2], results[ticker_map[response[0]]])
                elif response[1].lower() == "short":
                    short_stock(response[0], response[2], results[ticker_map[response[0]]])
                print(f"\nReasoning: {response[-1]}")
                print(f"Available balance: {current_balance}")
                print(f"Number of stocks: {amounts}")
            
        except (AttributeError, IndexError) as e:
            print(f"❌ Error processing Gemini response: {str(e)}")
            print("Gemini response:", resp)
            return
        except Exception as e:
            print(f"❌ Unexpected error: {str(e)}")
            return

    remaining_steps = sim_steps
    while remaining_steps > 0:
        recur_gemini(previous, steps_ahead, shares, short_positions, results, tickers)
        remaining_steps -= 1

    return current_balance, shares, short_positions

# Example usage
if __name__ == "__main__":
    tickers = ["RELIANCE.NS", "INFY.NS", "TCS.NS"]
    final_balance, final_shares, final_shorts = simulate_stock(120, tickers)
    print(f"\nFinal balance: {final_balance}")
    print(f"Final shares: {final_shares}")
    print(f"Final short positions: {final_shorts}")


📍 Function to call: predict_price
📦 Arguments: {'ticker': 'RELIANCE.NS', 'seq_length': 120}
❌ Error processing Gemini response: string index out of range
Gemini response: candidates=[Candidate(content=Content(parts=[Part(video_metadata=None, thought=None, code_execution_result=None, executable_code=None, file_data=None, function_call=FunctionCall(id=None, args={'ticker': 'RELIANCE.NS', 'seq_length': 120}, name='predict_price'), function_response=None, inline_data=None, text=None), Part(video_metadata=None, thought=None, code_execution_result=None, executable_code=None, file_data=None, function_call=FunctionCall(id=None, args={'seq_length': 120, 'ticker': 'INFY.NS'}, name='predict_price'), function_response=None, inline_data=None, text=None), Part(video_metadata=None, thought=None, code_execution_result=None, executable_code=None, file_data=None, function_call=FunctionCall(id=None, args={'seq_length': 120, 'ticker': 'TCS.NS'}, name='predict_price'), function_response=None, inline_data=

KeyboardInterrupt: 

In [None]:
steps_ahead = 120
tickers = ["RELIANCE.NS", "INFY.NS", "ITC.NS"]
simulate_stock(steps_ahead ,tickers, balance=10000, model_dir="models", data_dir="data", sim_steps = 15)