# Config
---

In [None]:
import json
import os

# Location of the training configuration, relative to the working directory.
config_path = "./config/train.json"
if not os.path.exists(config_path):
  raise FileNotFoundError(f"Config file not found: {config_path}")

# JSON text is UTF-8 by specification, so do not depend on the platform's
# locale encoding when reading it.
with open(config_path, "r", encoding="utf-8") as f:
  try:
    config = json.load(f)
  except json.JSONDecodeError as e:
    # Chain explicitly so the traceback marks the parser error as the cause.
    raise ValueError(f"Error parsing config file: {e}") from e

# Model Class
---

In [None]:
import torch
import torch.nn as nn
import os

class DuelingDQN(nn.Module):
  """LSTM + self-attention network with separate value and advantage heads.

  Only the constructor lives in the class body. ``self._InitWeights`` is
  looked up at construction time, so it must have been attached to the class
  before the first instance is created.
  """

  def __init__(self, input_size, hidden_size=config["hidden_size"], output_size=2, num_layers=config["num_layers"]):
    """Build the layers and run weight initialisation over them.

    The ``hidden_size`` and ``num_layers`` defaults are read from ``config``
    once, when the class is defined.

    Args:
      input_size: Feature count per time step fed to the LSTM.
      hidden_size: LSTM hidden width; also used as the LayerNorm and
        attention embedding size.
      output_size: Output width of the advantage head.
      num_layers: Number of stacked LSTM layers.
    """
    super().__init__()
    self.input_size = input_size
    self.hidden_size = hidden_size
    # Stored so the architecture can be recovered from an instance.
    self.output_size = output_size
    self.num_layers = num_layers

    # nn.LSTM only applies dropout between stacked layers and warns when a
    # non-zero rate is combined with a single layer, so pass 0 in that case.
    lstm_dropout = config["dropout_rate"] if num_layers > 1 else 0.0
    self.lstm = nn.LSTM(
      input_size=input_size,
      hidden_size=hidden_size,
      num_layers=num_layers,
      batch_first=True,
      dropout=lstm_dropout,
      bidirectional=False
    )
    self.ln = nn.LayerNorm(hidden_size)
    self.attention = nn.MultiheadAttention(hidden_size, num_heads=config["attention_heads"], batch_first=True)

    # Value head: hidden_size -> 128 -> 64 -> 1.
    self.value_stream = nn.Sequential(
      nn.Linear(hidden_size, 128),
      nn.ReLU(),
      nn.Dropout(0.3),
      nn.Linear(128, 64),
      nn.ReLU(),
      nn.Linear(64, 1)
    )
    # Advantage head: hidden_size -> 128 -> 64 -> output_size.
    self.advantage_stream = nn.Sequential(
      nn.Linear(hidden_size, 128),
      nn.ReLU(),
      nn.Dropout(0.3),
      nn.Linear(128, 64),
      nn.ReLU(),
      nn.Linear(64, output_size)
    )
    # nn.Module.apply calls the hook once per sub-module (and on self).
    self.apply(self._InitWeights)


### Initialize Model Weights

In [None]:
def modelInitWeights(self, module):
  """Initialise a single sub-module; intended as an ``nn.Module.apply`` hook.

  ``nn.Linear`` and ``nn.LSTM`` modules get Xavier-uniform weights and zero
  biases. Every other module type is left untouched.

  Args:
    module: The sub-module currently being visited.
  """
  if isinstance(module, nn.Linear):
    nn.init.xavier_uniform_(module.weight)
    # Linear layers created with bias=False carry no bias tensor to fill.
    if module.bias is not None:
      nn.init.constant_(module.bias, 0)
  elif isinstance(module, nn.LSTM):
    # LSTM parameters are told apart by name (weight_* / bias_*).
    for name, param in module.named_parameters():
      if 'weight' in name:
        nn.init.xavier_uniform_(param)
      elif 'bias' in name:
        nn.init.constant_(param, 0)

### Model Forward Pass

In [None]:
def modelforward(self, x):
  """Run the network on ``x`` and return the combined Q-values.

  Pipeline: LSTM -> LayerNorm -> self-attention -> take the last position
  along dim 1 -> value and advantage heads -> dueling combination.

  Args:
    x: Input tensor for ``self.lstm``. Dim 1 is indexed as the sequence
      axis below, which presumably matches a batch-first
      ``(batch, seq, features)`` layout -- confirm against the constructor.

  Returns:
    ``value + advantage - mean(advantage)``, with the mean taken over dim 1
    of the advantage head's output.
  """
  # Recurrent encoding, normalised before attention.
  lstm_out, _ = self.lstm(x)
  lstm_out = self.ln(lstm_out)

  # Self-attention: the same tensor is used as query, key and value.
  attended_out, _ = self.attention(lstm_out, lstm_out, lstm_out)

  # Only the final position feeds the two heads.
  final_hidden = attended_out[:, -1, :]

  value = self.value_stream(final_hidden)
  advantage = self.advantage_stream(final_hidden)

  # Subtracting the mean advantage centres the advantages around zero.
  return value + advantage - advantage.mean(dim=1, keepdim=True)

### Export Model

In [None]:
def modelSave(self, file_path):
  """Write the model's weights and basic sizes to ``file_path``.

  The checkpoint is a dict with the keys ``model_state_dict``,
  ``input_size`` and ``hidden_size``. Missing parent directories are
  created first.

  Args:
    file_path: Destination path; may be a bare file name.
  """
  parent_dir = os.path.dirname(file_path)
  # A bare file name has an empty dirname, and os.makedirs("") raises.
  if parent_dir:
    os.makedirs(parent_dir, exist_ok=True)
  torch.save({
      'model_state_dict': self.state_dict(),
      'input_size': self.input_size,
      'hidden_size': self.hidden_size
  }, file_path)

### Model Method Linking

In [None]:
# nn.Module.__call__ dispatches to ``forward``; without this alias calling
# ``model(x)`` hits the base class's unimplemented forward.
DuelingDQN.forward = modelforward
# Original name, kept for existing callers.
DuelingDQN.forwardpass = modelforward
DuelingDQN._InitWeights = modelInitWeights
DuelingDQN.save = modelSave

# Environment Class
---

In [None]:
import numpy as np

class MetaFilterEnvironment:
  """Step-by-step environment state over a drawdown series and a trade log.

  Only the constructor is defined in the class body; it relies on
  ``_extract_pnl_values`` and ``_calculateFeatures`` being available on the
  class by the time an instance is created.
  """

  def __init__(self, drawdown_series: np.ndarray, trade_log, window_size=config["window_size"]):
    """Store the inputs and put every counter in its starting state.

    Args:
      drawdown_series: Array kept as a float32 copy; its length becomes
        ``total_steps``.
      trade_log: Stored as-is and passed to ``_extract_pnl_values``.
      window_size: Stored, and also used as the initial ``current_step``.
    """
    # --- inputs ---
    self.drawdown = drawdown_series.astype(np.float32)
    self.trade_log = trade_log
    self.window_size = window_size
    self.total_steps = len(drawdown_series)

    # --- cursor and on/off bookkeeping ---
    self.current_step = window_size
    self.strategy_active = False
    self.state_history = []
    self.activity_flags = np.zeros(self.total_steps, dtype=bool)

    # --- equity curves (two separate list objects) ---
    self.native_equity_curve = []
    self.ai_equity_curve = []

    # --- action statistics ---
    self.switch_count = self.last_action = self.consecutive_same_action = 0

    # --- running totals and penalties ---
    self.total_native_pnl = self.total_ai_pnl = 0.0
    self.max_drawdown_native = self.max_drawdown_ai = 0.0
    self.volatility_penalty = 0.0

    # --- derived data; computed last, once the fields above exist ---
    self.pnl_values = self._extract_pnl_values(trade_log)
    self.additional_features = self._calculateFeatures()

### Step Forward in Environment

In [None]:
def envforward(self, action: int):
  """Apply ``action`` at the current step and advance by one step.

  Args:
    action: 1 switches the strategy on, 0 switches it off; any other value
      leaves ``strategy_active`` unchanged.

  Returns:
    A ``(next_state, reward, done)`` tuple. ``done`` is True once the step
    just processed was at or past index ``total_steps - 1``.
  """
  # Track streaks of identical actions and count switches.
  if action == self.last_action:
    self.consecutive_same_action += 1
  else:
    self.consecutive_same_action = 0
    # A change on the very first step is not counted as a switch.
    if self.current_step > self.window_size:
      self.switch_count += 1
  self.last_action = action

  if action == 1:
    self.strategy_active = True
  elif action == 0:
    self.strategy_active = False

  self.activity_flags[self.current_step] = self.strategy_active

  # The native curve takes every PnL value; the AI curve only takes it
  # while the strategy is active.
  current_pnl = self.pnl_values[self.current_step]
  self.total_native_pnl += current_pnl
  self.native_equity_curve.append(self.total_native_pnl)

  if self.strategy_active:
    self.total_ai_pnl += current_pnl
  self.ai_equity_curve.append(self.total_ai_pnl)

  # Drawdown is measured from the running peak; the denominator is floored
  # at 1 to avoid dividing by a zero or tiny peak.
  if len(self.native_equity_curve) > 1:
    peak_native = max(self.native_equity_curve)
    current_dd_native = (peak_native - self.total_native_pnl) / max(abs(peak_native), 1)
    self.max_drawdown_native = max(self.max_drawdown_native, current_dd_native)

  if len(self.ai_equity_curve) > 1:
    peak_ai = max(self.ai_equity_curve)
    # Uses total_ai_pnl, the running AI total maintained above.
    current_dd_ai = (peak_ai - self.total_ai_pnl) / max(abs(peak_ai), 1)
    self.max_drawdown_ai = max(self.max_drawdown_ai, current_dd_ai)

  reward = self._calculateReward(current_pnl, action)
  done = self.current_step >= self.total_steps - 1
  self.current_step += 1
  # The state is fetched after the cursor has moved, whether or not the
  # episode is finished.
  next_state = self._get_enhanced_state()

  return next_state, reward, done

### Reset Environment

In [None]:
def envreset(self):
  """Put all per-episode state back to its starting values.

  The cursor returns to ``window_size``, the strategy is switched off, and
  every curve, flag array, counter and running total is cleared. Returns
  nothing.
  """
  # Cursor and on/off state.
  self.current_step = self.window_size
  self.strategy_active = False
  self.activity_flags = np.zeros(self.total_steps, dtype=bool)

  # Fresh list objects; each attribute gets its own list.
  self.native_equity_curve = []
  self.ai_equity_curve = []
  self.state_history = []

  # Integer counters.
  for counter_name in ("switch_count", "last_action", "consecutive_same_action"):
    setattr(self, counter_name, 0)

  # Floating-point accumulators.
  for total_name in (
    "total_native_pnl",
    "total_ai_pnl",
    "max_drawdown_native",
    "max_drawdown_ai",
    "volatility_penalty",
  ):
    setattr(self, total_name, 0.0)

### Calculate Reward

In [None]:
def envcalculateReward(self, current_pnl=0.0, action=0):
  """Placeholder reward hook -- not implemented yet.

  The step function calls this hook as
  ``self._calculateReward(current_pnl, action)``, so both values are
  accepted here. The defaults keep the previous zero-argument call form
  working.

  Args:
    current_pnl: PnL value of the step being scored (currently unused).
    action: Action taken at that step (currently unused).

  Returns:
    None until a reward formula is implemented.
  """
  # TODO: implement the reward computation.
  return None

### Calculate Features

In [None]:
def envcalculateFeatures(self):
  """Placeholder feature hook -- not implemented yet; returns None."""
  return None

### Environment Method Linking

In [None]:
# Attach the free functions defined above as MetaFilterEnvironment methods.
for _method_name, _method_impl in (
  ("forward", envforward),
  ("reset", envreset),
  ("_calculateReward", envcalculateReward),
  ("_calculateFeatures", envcalculateFeatures),
):
  setattr(MetaFilterEnvironment, _method_name, _method_impl)
# Do not leave the loop variables behind in the module namespace.
del _method_name, _method_impl

# Agent Class
---

In [None]:
class MetaFilterAgent:
  """Agent shell; its methods are attached to the class after its definition."""

  def __init__(self, drawdown_data, trade_log, model_path="", window_size=config["window_size"]):
    """Placeholder constructor -- accepts its arguments and stores nothing.

    The ``window_size`` default is read from ``config`` once, when the class
    is defined.
    """
    return None

### Model Forward Pass Method

In [None]:
def get_action(self, state, training=True):
  """Placeholder -- not implemented yet; ignores its arguments and returns None."""
  return None

### Train Single Episode Method

In [None]:
def train_episode(self):
  """Placeholder -- not implemented yet; returns None."""
  return None

### Train All Episodes Method

In [None]:
def train(self, episodes=config["episodes"], eval_every=config["eval_every"]):
  """Placeholder -- not implemented yet; ignores its arguments and returns None.

  Both defaults are read from ``config`` once, when the function is defined.
  """
  return None

### Test the Model

In [None]:
def test(self):
  """Placeholder -- not implemented yet; returns None."""
  return None

### Evaluate Episode

In [None]:
def evaluate_episode(self):
  """Placeholder -- not implemented yet; returns None."""
  return None

### Save Model

In [None]:
def save_model(self, path):
  """Placeholder -- not implemented yet; ignores ``path`` and returns None."""
  return None

### Agent Method Linking

In [None]:
# Attach the free functions defined above as MetaFilterAgent methods.
MetaFilterAgent.get_action = get_action
MetaFilterAgent.train_episode = train_episode
MetaFilterAgent.train = train
MetaFilterAgent.test = test
MetaFilterAgent.evaluate_episode = evaluate_episode
# save_model is defined alongside the other agent functions and needs to be
# attached as well to be callable on an instance.
MetaFilterAgent.save_model = save_model