# Experiment 1 - Prompt Refinement Loop with Writer-Judge

In this notebook we run the Writer-Judge loop. We assume the baseline prompt was already constructed using the `writer_trainer_v1` prompt in the Baseline Prompt construction and EDA notebook.

## Prepare Notebook

In [None]:
import os
import sys
import logging
import warnings
import re

warnings.filterwarnings("ignore")

%load_ext dotenv

FUNDAMENTALS_PATH = os.getenv("FUNDAMENTALS_PATH")
LLM_PROMPTS_PATH = os.getenv("LLM_PROMPTS_PATH")
FUNDAMENTALS_PATH = os.getenv("FUNDAMENTALS_PATH")
HISTORIC_PATH = os.getenv("HISTORIC_PATH")
MACRO_PATH = os.getenv("MACRO_PATH")
OPTIONS_PATH = os.getenv("OPTIONS_PATH")
LLM_OUTPUT_PATH = os.getenv("LLM_OUTPUT_PATH")
LOGS_PATH = os.getenv("LOGS_PATH")
paths = [LLM_OUTPUT_PATH, LOGS_PATH]
for path in paths:
    if path and not os.path.exists(path):
        os.makedirs(path)

In [None]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from pandas.tseries.offsets import BDay
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
from openai import OpenAI
from pydantic import BaseModel

module_path = os.path.abspath(os.path.join(os.getcwd(), 'utils'))
if module_path not in sys.path:
    sys.path.append(module_path)

from data_utils import *
from rl_agent_utils import *


def enum_to_str_representer(dumper, data):
    """YAML representer that emits an enum member as the string in its ``.value``.

    Args:
        dumper: Object exposing ``represent_str`` (a YAML dumper).
        data: Object whose ``.value`` attribute is the text to emit.

    Returns:
        Whatever ``dumper.represent_str`` returns for ``data.value``.
    """
    enum_value = data.value
    return dumper.represent_str(enum_value)

# Import PyYAML explicitly: this cell runs before the later `import yaml` cell,
# so without it the call only works if one of the wildcard imports above
# happens to re-export the name.
import yaml

# Dump `Action` members (presumably an enum provided by the wildcard imports)
# as plain strings via the representer defined above.
yaml.add_representer(Action, enum_to_str_representer)

## Environment and Constants

In [None]:
# Ticker whose engineered data set is loaded below.
TARGET = "META"
OPENAI_MODEL = os.getenv("OPENAI_MODEL")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# START_DATE / END_DATE bound the slice of the data set used for sampling.
START_DATE = '2012-01-01'
SPLITDATE = '2018-01-01'
END_DATE = '2020-01-01'
RISK_EXPERIMENT = 'r'
# NOTE(review): PROMPT_VERSION is 'v2' while both prompt files below are `_v1`;
# confirm which one is intended.
PROMPT_VERSION = 'v2'

NUM_SAMPLES = 2
SAMPLE_HORIZON = 30  # days

# Prompt templates for the writer and the judge (each defined once).
WRITER_PROMPT_YML = f'{LLM_PROMPTS_PATH}/writer_generator_v1.yml'
JUDGE_PROMPT_YML = f'{LLM_PROMPTS_PATH}/judge_prompt_v1.yml'

OPENAI_CLIENT = OpenAI(api_key=OPENAI_API_KEY)



In [None]:
# Load the engineered data set for TARGET, index it by UTC timestamp and keep
# only the rows between START_DATE and END_DATE.
output_file = f"{HISTORIC_PATH}/engineered_{TARGET}_data.parquet"
sample_start_date = pd.Timestamp(START_DATE, tz='UTC')
sample_end_date = pd.Timestamp(END_DATE, tz='UTC')

stock_aug_data = pd.read_parquet(output_file)
stock_aug_data = (
    stock_aug_data
    .assign(Date=pd.to_datetime(stock_aug_data['Date'], utc=True))
    .set_index('Date')
    .loc[sample_start_date:sample_end_date]
)
stock_aug_data.tail(3)


# Writer Judge Loop

In [None]:
# Structured-output schema passed to the judge call as `response_format`.
# Documented with comments rather than a docstring so the schema is unchanged.
class JudgeResponse(BaseModel):
    features: str  # returned by call_judge under the "Selected Features" key
    judge_critique: str  # returned by call_judge under the "strategy Rationale" key


In [None]:
def generate_random_sample_dates(dataframe, num_samples=15, seed=None):
    """Draw ``num_samples`` distinct timestamps from ``dataframe``'s index.

    Args:
        dataframe: Frame whose index can be converted by ``pd.to_datetime``.
        num_samples: Number of index entries to draw without replacement.
        seed: Optional seed. When given, a dedicated ``np.random.default_rng``
            generator is used so the draw is reproducible; when ``None`` the
            global NumPy random state is used, as before.

    Returns:
        A UTC ``DatetimeIndex`` holding the sampled timestamps in draw order.

    Raises:
        ValueError: If ``num_samples`` exceeds the number of index entries.
    """
    timestamps = pd.to_datetime(dataframe.index, utc=True)
    rng = np.random if seed is None else np.random.default_rng(seed)
    # Sample positions rather than values so the tz-aware index never has to
    # round-trip through an object array.
    positions = rng.choice(len(timestamps), size=num_samples, replace=False)
    return timestamps[positions]


## Optimization Loop

We use a cost-effective LLM for writing and an LLM with stronger reasoning for judging.
The judging output will be used to distil the former LLM.

In [None]:
# YAML feature template with `{placeholder}` slots. In this notebook it is
# parsed with yaml.safe_load to enumerate feature paths and is also passed
# verbatim as the feature message of the backtest LLM call.
# NOTE(review): `{Market_Beta}`, `{classification}` and `{MACD_Strength}` are
# unquoted, so before substitution YAML reads them as flow mappings
# (e.g. Beta -> {"Market_Beta": None}) rather than strings; the flattened paths
# for those three therefore end in the placeholder name. Confirm whether they
# should be quoted like the other slots.
FEATURES_SEED = r"""
  Last_Strategy_Used_Data:
    Cumulative_Returns: "{Last_LLM_Strat_Cum_Returns}"
    Peak_Returns: "{Last_LLM_Strat_Best_Returns}"
    Worst_Losses: "{Last_LLM_Strat_Worse_Returns}"
    Rationale: |
       "{Last_LLM_Strat}"

  Stock_Data:
    General:
      Beta: {Market_Beta}
      Classification: {classification}

    Last_Weeks_Price:
      Close: "{Close}"
      Volume: "{Volume}"

    Weekly_Past_Returns: "{Weekly_Past_Returns}"

    Historical_Volatility:
      HV_Close: "{HV_Close}"

    Implied_Volatility:
      IV_Close: "{IV_Close}"

  Fundamental_Data:
    Ratios:
      Current_Ratio: "{Current_Ratio}"
      Quick_Ratio: "{Quick_Ratio}"
      Debt_to_Equity_Ratio: "{Debt_to_Equity_Ratio}"
      PE_Ratio: "{PE_Ratio}"
    Margins:
      Gross_Margin: "{Gross_Margin}"
      Operating_Margin: "{Operating_Margin}"
      Net_Profit_Margin: "{Net_Profit_Margin}"
    Growth Metrics:
      EPS_YoY: "{EPS_YoY_Growth}"
      Net_Income_YoY: "{Net_Income_YoY_Growth}"
      Free_Cash_Flow_YoY: "{Free_Cash_Flow_Per_Share_YoY_Growth}"

  Technical_Analysis:
    Moving_Averages:
      20MA: "{20MA}"
      50MA: "{50MA}"
      200MA: "{200MA}"
    MACD:
      Value: "{MACD}"
      Signal_Line: "{Signal_Line}"
      MACD_Strength: {MACD_Strength}
    RSI:
      Value: "{RSI}"
    ATR: "{ATR}"

  Macro_Data:
    Macro_Indices:
      SPX:
        Close: "{SPX_Close}"
        Close_20MA: "{SPX_Close_MA}"
        Close_Slope: "{SPX_Close_Slope}"
      VIX:
        Close: "{VIX_Close}"
        Close_20MA: "{VIX_Close_MA}"
        Close_Slope: "{VIX_Close_Slope}"
    Economic_Data:
      GDP_QoQ: "{GDP_QoQ}"
      PMI: "{PMI}"
      Consumer_Confidence_QoQ: "{Consumer_Confidence_QoQ}"
      M2_Money_Supply_QoQ: "{M2_Money_Supply_QoQ}"
      PPI_YoY: "{PPI_YoY}"
      Treasury_Yields_YoY: "{Treasury_Yields_YoY}"

  Options_Data:
    Put_IV_Skews:
      OTM_Skew: "{OTM_Skew}"
      ATM_Skew: "{ATM_Skew}"
      ITM_Skew: "{ITM_Skew}"
    20Day_Moving_Averages:
      OTM_Skew_MA: "{MA_OTM_Skew}"
      ATM_Skew_MA: "{MA_ATM_Skew}"
      ITM_Skew_MA: "{MA_ITM_Skew}"
"""

In [None]:
# Seed instruction prompt (YAML text with `{persona}`, `{risk_profile}` and
# `{portfolio_objectives}` slots). It is not referenced by any other cell shown
# in this notebook.
# NOTE(review): several feature paths named here do not exist in FEATURES_SEED:
# `Options_Data.ATM_IV.Call` has no counterpart, `Options_Data.ITM_Skew` lives
# under `Options_Data.Put_IV_Skews`, and `Stock_Data.Price.Close` is
# `Stock_Data.Last_Weeks_Price.Close` there. Confirm which naming is intended.
INSTRUCTIONS_SEED = r"""
persona: {persona}
risk_profile: {risk_profile}
portfolio_objectives: {portfolio_objectives}
instructions: |
  Develop a trading strategy for the next month based on the given context and aligned with the specified `portfolio_objectives` and `risk_profile`. Use the following process:

  1. Comprehensive Data Analysis:
     - Stock Data: Examine price trends, volume, and HV/IV metrics for momentum or risk signals.
     - Fundamental Data: Focus on profitability margins and ratios based on risk tolerance.
     - Technical Analysis: Use RSI for overbought/oversold conditions, MAs for trend confirmation, and MACD for momentum analysis.
     - Macro Data: Evaluate GDP, PMI, and VIX trends to assess broader sentiment.
     - Options Data: Prioritize implied volatility metrics to capture sentiment shifts.
     - Dynamic Feature Weighting by Risk Profile:
      - High-Risk Profile:
        - Prioritize volatility and momentum indicators such as RSI, MACD, ATR, and Options_Data.ATM_IV.Call.
        - Weigh macroeconomic indicators (e.g., VIX, GDP_QoQ) for risk-on sentiment.
      - Low-Risk Profile:
        - Focus on stability metrics like Debt-to-Equity Ratio, Operating Margin, and Current Ratio.
        - Analyze implied volatility skews (Options_Data.ITM_Skew) for downside risk mitigation.
        - Use macroeconomic stability indicators such as Consumer Confidence QoQ and Treasury Yields.

  2. Reflection and Iterative Learning:
      - If not None, evaluate `Last_Strategy_Used_Data` for performance gaps and reflect on your chosen `action`.
      - If not None, use CoT reasoning to adjust mismatches between past assumptions and actual market behavior.
      - Weigh the feature importance in your rationale using a Likert scale of 3:
        - level: 1
          description: The feature has minimal relevance or impact; it is not necessary and can be ignored.
        - level: 2
          description: The feature has some relevance and contributes to the strategy but is not crucial.
        - level: 3
          description: The feature is important and significantly contributes to achieving a successful strategy.

Output:
  action: str. LONG or SHORT.
  explanation: >
    A clear, concise rationale (max 350 words) including the top 5 weighted features with the news as a factor used in decision-making (ICL Example: "Stock_Data.Price.Close, Weight 3, Technical_Analysis.RSI.Value, Weight 1, Options_Data.ATM_IV.Call, Weight 2"), and if `news_factors` was provided, the top 3 ranked news factors, weighted the same using the Likert scale (ICL Example: "Earning Call next month, with positive analyst expectations, Weight 3.")
"""


In [None]:
from datetime import timedelta
import yaml
from jinja2 import Template

def flatten_yaml_keys(d, parent_key='', sep='.'):
    """Collapse a nested mapping into a single-level dict keyed by joined paths.

    Args:
        d: Mapping to flatten; nested ``dict`` values are descended into.
        parent_key: Path prefix for every key of ``d``; a falsy prefix leaves
            top-level keys untouched.
        sep: Separator placed between path segments.

    Returns:
        Dict mapping each leaf's path to its value, in traversal order.
        Empty nested dicts contribute no entries.
    """
    flat = {}
    for name, value in d.items():
        path = name if not parent_key else f"{parent_key}{sep}{name}"
        if not isinstance(value, dict):
            flat[path] = value
            continue
        # Descend and merge the nested leaves, keeping their order.
        for sub_path, leaf in flatten_yaml_keys(value, path, sep=sep).items():
            flat[sub_path] = leaf
    return flat


def fill_yaml_template(context, template_path):
    """Render the Jinja2 template stored at ``template_path`` with ``context``.

    Args:
        context: Mapping whose items are passed to the template as keyword
            variables.
        template_path: Path of the template file to read.

    Returns:
        The rendered template text.
    """
    # Read as UTF-8 explicitly so prompt rendering does not depend on the
    # platform's locale encoding.
    with open(template_path, "r", encoding="utf-8") as f:
        raw_template = f.read()
    return Template(raw_template).render(**context)


def call_writer(strategy_context, template_path, model, client, temperature=0.7, max_tokens=2300):
    """Render the writer prompt and return the model's reply text.

    Args:
        strategy_context: Mapping used to fill the writer template.
        template_path: Path of the writer prompt template.
        model: Model name passed to the chat completions endpoint.
        client: Client exposing ``chat.completions.create``.
        temperature: Sampling temperature (default matches the previous
            hard-coded value).
        max_tokens: Completion token limit (default matches the previous
            hard-coded value).

    Returns:
        The first choice's message content, stripped of surrounding whitespace.

    Raises:
        ValueError: If the first choice carries no text content.
    """
    prompt_filled = fill_yaml_template(strategy_context, template_path)
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt_filled}],
        temperature=temperature,
        max_tokens=max_tokens,
    )
    content = response.choices[0].message.content
    if content is None:
        # Fail with a descriptive error instead of an AttributeError on `.strip()`.
        raise ValueError("Writer LLM returned no text content.")
    return content.strip()


def call_judge(train_template, template_path, model, client, distil=False, temperature=0.7, max_tokens=2300):
    """Render the judge prompt and return the judge's structured feedback.

    Args:
        train_template: Text exposed to the judge template as ``train_template``.
        template_path: Path of the judge prompt template.
        model: Model name passed to the parse endpoint.
        client: Client exposing ``beta.chat.completions.parse``.
        distil: Forwarded as the ``store`` flag of the request.
        temperature: Sampling temperature (default matches the previous
            hard-coded value).
        max_tokens: Completion token limit (default matches the previous
            hard-coded value).

    Returns:
        Dict with keys ``"Selected Features"`` and ``"strategy Rationale"``
        taken from the parsed ``JudgeResponse``.

    Raises:
        ValueError: If the reply could not be parsed into a ``JudgeResponse``
            (for example when the model refuses).
    """
    # `shared_memory` is always rendered as an empty string here.
    context = {'train_template': train_template, 'shared_memory': ""}
    prompt_filled = fill_yaml_template(context, template_path)
    response = client.beta.chat.completions.parse(
        model=model,
        messages=[{"role": "user", "content": prompt_filled}],
        temperature=temperature,
        store=distil,
        response_format=JudgeResponse,
        max_tokens=max_tokens,
    )
    message = response.choices[0].message
    parsed = message.parsed
    if parsed is None:
        # Surface the refusal text (if any) instead of an AttributeError below.
        refusal = getattr(message, "refusal", None)
        raise ValueError(f"Judge LLM returned no parsed response (refusal: {refusal!r}).")
    return {"Selected Features": parsed.features, "strategy Rationale": parsed.judge_critique}


# Explicit `action` field in the upper-cased reply, e.g. `ACTION: LONG` or `"ACTION": "SHORT"`.
_ACTION_FIELD_RE = re.compile(r'\bACTION\b\W{0,10}(LONG|SHORT)\b')
# Fallback: first stand-alone LONG/SHORT that is not part of "LONG-TERM" / "SHORT TERM".
_DIRECTION_WORD_RE = re.compile(r'\b(LONG|SHORT)\b(?![-\s]?TERM)')


def _parse_llm_direction(llm_reply):
    """Return "LONG" or "SHORT" extracted from an upper-cased reply, else None."""
    match = _ACTION_FIELD_RE.search(llm_reply) or _DIRECTION_WORD_RE.search(llm_reply)
    return match.group(1) if match else None


def backtest_strategy_from_llm(instructions, data_slice, client, model, features_input=None):
    """Ask the LLM for a LONG/SHORT call and score it on ``data_slice``.

    The direction is read from the reply's ``action`` field when present,
    otherwise from the first stand-alone LONG/SHORT word. Plain substring
    matching is avoided because explanations that mention "short-term" or
    "long-term" would otherwise override the stated action.

    Args:
        instructions: Prompt text sent as the first user message.
        data_slice: Frame with a ``Close`` column; the first and last rows give
            the entry and exit prices.
        client: Client exposing ``chat.completions.create``.
        model: Model name passed to the endpoint.
        features_input: Optional text sent as a second user message.

    Returns:
        The simple return of the chosen direction between the first and last
        ``Close`` values, or ``np.nan`` when the slice has fewer than two rows,
        has no ``Close`` column, the LLM call fails, or no direction is found.
    """
    if data_slice.shape[0] < 2 or 'Close' not in data_slice.columns:
        return np.nan
    messages = [{"role": "user", "content": instructions}]
    if features_input:
        messages.append({"role": "user", "content": features_input})
    try:
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=0.0,
            max_tokens=800,
        )
        llm_reply = response.choices[0].message.content.strip().upper()
    except Exception as e:
        # Best effort: a failed call (or an empty reply) scores as "no result".
        print(f"[!] LLM call failed: {e}")
        return np.nan

    direction = _parse_llm_direction(llm_reply)
    if direction is None:
        return np.nan

    start_price = data_slice['Close'].iloc[0]
    end_price = data_slice['Close'].iloc[-1]
    if direction == "LONG":
        return (end_price - start_price) / start_price
    return (start_price - end_price) / start_price


def run_iterative_writer_judge_loop(
    stock_data,
    sampled_dates,
    writer_prompt_path,
    judge_prompt_path,
    risk_profile,
    portfolio_objectives,
    model,
    client,
    features_seed_yaml,
    sample_horizon=30,
    distil=False,
    regret_threshold=0.01,
    max_iterations=3
):
    """Run the writer -> backtest -> judge refinement loop for each sampled date.

    For every date the writer produces instructions, the instructions are
    backtested on the ``sample_horizon``-day window starting at that date, and
    the judge critiques them. Critiques are accumulated and fed back to the
    writer until the regret falls to ``regret_threshold`` or
    ``max_iterations`` judge rounds have run.

    Args:
        stock_data: Frame indexed by timestamp; sliced with ``.loc`` per date.
        sampled_dates: Iterable of window start dates.
        writer_prompt_path: Path of the writer prompt template.
        judge_prompt_path: Path of the judge prompt template.
        risk_profile: Value rendered into the writer prompt.
        portfolio_objectives: Value rendered into the writer prompt.
        model: Model name used for the writer, backtest and judge calls alike.
        client: OpenAI-style client passed to the three calls.
        features_seed_yaml: YAML text; parsed to enumerate feature paths and
            also sent unchanged as the backtest's feature message.
        sample_horizon: Window length in calendar days.
        distil: Forwarded to ``call_judge``.
        regret_threshold: Regret at or below which iteration stops.
        max_iterations: Maximum number of judge rounds per date.

    Returns:
        DataFrame with one row per successfully processed date and columns
        ``sample_start``, ``sample_end``, ``strategy_text``,
        ``selected_features``, ``judge_explanation``, ``return``, ``regret``
        and ``iterations``. ``return`` is NaN when no backtest produced a
        valid value. Dates that raise are printed and skipped.
    """
    results = []
    features_seed_dict = yaml.safe_load(features_seed_yaml)
    # Every flattened feature path starts at weight 2 (presumably the mid-point
    # of the 1-3 scale used in the prompts).
    # NOTE(review): this dict is never updated from the judge's feedback.
    features_reranked_dict = {k: 2 for k in flatten_yaml_keys(features_seed_dict)}

    for dt in sampled_dates:
        try:
            start_dt = pd.to_datetime(dt)
            end_dt = start_dt + timedelta(days=sample_horizon)
            sliced_data = stock_data.loc[start_dt:end_dt]
            previous_critiques = ""
            regret = float("inf")
            iteration = 0
            # NaN (not -inf) so a date without any valid backtest reports a
            # missing return instead of negative infinity.
            best_return = np.nan
            best_instruction = None
            best_features = None
            best_critique = None

            while regret > regret_threshold and iteration < max_iterations:
                context = {
                    'Features_Reranked': features_reranked_dict,
                    'judge_critique': previous_critiques if previous_critiques else "",
                    'risk_profile': risk_profile,
                    'portfolio_objectives': portfolio_objectives
                }

                instructions = call_writer(context, writer_prompt_path, model, client)
                realized_return = backtest_strategy_from_llm(
                    instructions=instructions,
                    data_slice=sliced_data,
                    client=client,
                    model=model,
                    features_input=features_seed_yaml
                )

                if not np.isnan(realized_return) and (
                    np.isnan(best_return) or realized_return > best_return
                ):
                    best_return = realized_return
                    best_instruction = instructions

                # NOTE(review): regret is the distance of the realized return
                # from 0.0, so iteration stops when the return is close to zero
                # and continues for large gains as well as large losses.
                # Confirm the intended target. A NaN return yields a NaN
                # regret, which ends the loop after one judge round.
                regret = abs(0.0 - realized_return)
                if regret <= regret_threshold:
                    break

                judge_feedback = call_judge(instructions, judge_prompt_path, model, client, distil)
                previous_critiques += "\n" + judge_feedback["strategy Rationale"]
                # These hold the most recent judge feedback, which is not
                # necessarily the feedback for `best_instruction`.
                best_features = judge_feedback["Selected Features"]
                best_critique = judge_feedback["strategy Rationale"]
                iteration += 1

            results.append({
                'sample_start': start_dt,
                'sample_end': end_dt,
                'strategy_text': best_instruction,
                'selected_features': best_features,
                'judge_explanation': best_critique,
                'return': best_return,
                'regret': regret,
                'iterations': iteration
            })

        except Exception as e:
            # Best effort per date: report the failure and move on.
            print(f"[!] Error processing date {dt}: {e}")
            continue

    return pd.DataFrame(results)


In [None]:
# Draw the window start dates, then run the refinement loop on them.
sampled_dates = generate_random_sample_dates(stock_aug_data, NUM_SAMPLES)

# SAMPLE_HORIZON is passed explicitly so the configured window length is the
# one actually used, rather than the function's own default.
final_df = run_iterative_writer_judge_loop(
    stock_data=stock_aug_data,
    sampled_dates=sampled_dates,
    writer_prompt_path=WRITER_PROMPT_YML,
    judge_prompt_path=JUDGE_PROMPT_YML,
    risk_profile=HIGH_RISK_PROFILE,
    portfolio_objectives=HIGH_OBJECTIVES,
    model=OPENAI_MODEL,
    client=OPENAI_CLIENT,
    features_seed_yaml=FEATURES_SEED,
    sample_horizon=SAMPLE_HORIZON,
)


In [None]:
# Preview the first result row of the writer-judge run.
final_df.head(1)
