In [19]:
import os
import sys
import warnings

# Add the parent directory to the import path. Presumably this is what lets the
# `src.*` imports used by this notebook resolve when the kernel's working
# directory is the notebook's own folder -- TODO confirm `src/` lives one level up.
sys.path.append("..")

In [21]:
import gymnasium as Env
import numpy as np
import pandas as pd
import ray
import torch

from tqdm import trange
from copy import deepcopy
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.models import ModelCatalog

from ray.tune.registry import register_env

In [24]:
from ray.rllib.utils.metrics import (
    ENV_RUNNER_RESULTS,
    EPISODE_RETURN_MEAN,
    EPISODE_LEN_MEAN,
)

In [6]:
%load_ext autoreload
%autoreload 2

from src.environments.fx_environment import FxTradingEnv
from src.models.fx_model import FXModel

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


Todo 

- Select hourly, minutely or daily data
- Truncation in days
- Debug code
- Add features

- Write tests:

1. Unknown currency in the initial portfolio
2. Missing currency in the initial portfolio
3. Portfolio value is expressed in the base currency
4. Portfolio weights sum to 1
5. `reset` works
6. etc.

In [7]:
# Holdings handed to the environment as its initial portfolio: every listed
# currency starts with the same amount.
# SGD is currently left out ("sgd": 100_000).
current_porfolio = dict.fromkeys(("usd", "eur", "jpy"), 100_000)

In [8]:
# Load the FX data set from the parquet file in the sibling `data/` directory
# (the path is relative to the kernel's working directory).
historical_data = pd.read_parquet(path="../data/FX_data.parquet.gzip")

In [9]:
# Keyword arguments for the trading environment: the three price columns to
# use, the starting holdings, and the timestamp at which to start.
env_config = dict(
    historical_prices=historical_data[["eurjpy", "eurusd", "usdjpy"]],
    initial_portfolio=current_porfolio,
    start_datetime=pd.Timestamp("2011-01-03 09:00:00"),
)

In [10]:
def env_creator(env_config):
    """Build a preprocessed ``FxTradingEnv`` from a config mapping.

    Args:
        env_config: Mapping whose items are forwarded unchanged as keyword
            arguments to the ``FxTradingEnv`` constructor.

    Returns:
        The newly constructed environment, after ``preprocess_data()`` has
        been called on it once.
    """
    environment = FxTradingEnv(**env_config)
    environment.preprocess_data()
    return environment

In [11]:
# Register the custom model class and the environment factory under the string
# keys ("fx_model", "fx_trading_env") by which they are looked up later.
ModelCatalog.register_custom_model("fx_model", FXModel)
register_env("fx_trading_env", env_creator)

In [12]:
# Start Ray with a runtime environment built from the local checkout:
#   * working_dir -> parent of the current directory,
#   * py_modules  -> the current directory itself,
#   * excludes    -> patterns left out of the uploaded packages.
# `ignore_reinit_error=True` is set so that re-running this cell does not fail
# on an already initialised Ray.
# NOTE(review): the recorded run still pushed a 97.59MiB package for the
# working directory despite the excludes -- worth checking what else is uploaded.
ray.init(
    runtime_env=dict(
        working_dir=os.path.dirname(os.path.abspath(os.curdir)),
        py_modules=[os.path.abspath(os.curdir)],
        excludes=["*.pyc", "__pycache__", "*.parquet.gzip", "data/", "notebooks/"],
    ),
    ignore_reinit_error=True,
)

2026-02-06 10:38:21,503	INFO worker.py:2023 -- Started a local Ray instance.
2026-02-06 10:38:21,577	INFO packaging.py:588 -- Creating a file package for local module 'C:\Users\Ivan\rl_trading\notebooks'.
2026-02-06 10:38:21,602	INFO packaging.py:380 -- Pushing file package 'gcs://_ray_pkg_bef089bc5f91c955.zip' (0.20MiB) to Ray cluster...
2026-02-06 10:38:21,609	INFO packaging.py:393 -- Successfully pushed file package 'gcs://_ray_pkg_bef089bc5f91c955.zip'.
2026-02-06 10:38:23,308	INFO packaging.py:588 -- Creating a file package for local module 'C:\Users\Ivan\rl_trading'.
2026-02-06 10:38:25,362	INFO packaging.py:380 -- Pushing file package 'gcs://_ray_pkg_00932b458b147865.zip' (97.59MiB) to Ray cluster...
2026-02-06 10:38:26,178	INFO packaging.py:393 -- Successfully pushed file package 'gcs://_ray_pkg_00932b458b147865.zip'.


0,1
Python version:,3.12.10
Ray version:,2.52.1


[36m(pid=gcs_server)[0m [2026-02-06 10:38:47,720 E 5968 10912] (gcs_server.exe) gcs_server.cc:303: Failed to establish connection to the event+metrics exporter agent. Events and metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14
[33m(raylet)[0m [2026-02-06 10:38:55,543 E 6896 9108] (raylet.exe) main.cc:979: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14


In [14]:
# Number of env-runner workers requested from RLlib.
suggested_workers = 1

# Use every CUDA device torch can see, or none when CUDA is unavailable.
num_gpus = torch.cuda.device_count() if torch.cuda.is_available() else 0

In [15]:
# PPO configuration for the "fx_trading_env" environment with the "fx_model"
# custom model, using torch. Both new-API-stack switches are turned off.
config = (
    PPOConfig()
    .environment(env="fx_trading_env", env_config=env_config)
    .framework("torch")
    .training(
        lr=1e-3,
        train_batch_size=2048,
        model={"custom_model": "fx_model"},
    )
    # Optional: restrict logging to errors.
    # .debugging(log_level="ERROR")
    .resources(num_gpus=num_gpus)
    .env_runners(
        num_env_runners=suggested_workers,
        num_cpus_per_env_runner=1,
        rollout_fragment_length="auto",
    )
    .api_stack(
        enable_rl_module_and_learner=False,
        enable_env_runner_and_connector_v2=False,
    )
)

In [16]:
# Build the algorithm instance from `config`; the recorded run reports roughly
# 31 s for this setup step.
trainer = config.build_algo()

`UnifiedLogger` will be removed in Ray 2.7.
  return UnifiedLogger(config, logdir, loggers=None)
The `JsonLogger interface is deprecated in favor of the `ray.tune.json.JsonLoggerCallback` interface and will be removed in Ray 2.7.
  self._loggers.append(cls(self.config, self.logdir, self.trial))
The `CSVLogger interface is deprecated in favor of the `ray.tune.csv.CSVLoggerCallback` interface and will be removed in Ray 2.7.
  self._loggers.append(cls(self.config, self.logdir, self.trial))
The `TBXLogger interface is deprecated in favor of the `ray.tune.tensorboardx.TBXLoggerCallback` interface and will be removed in Ray 2.7.
  self._loggers.append(cls(self.config, self.logdir, self.trial))
2026-02-06 10:39:08,980	INFO trainable.py:161 -- Trainable.setup took 31.054 seconds. If your trainable is slow to initialize, consider setting reuse_actors=True to reduce actor creation overheads.


In [17]:
# Despite the name, this is the number of `trainer.train()` calls made by the
# training loop, not a count of environment episodes.
num_episodes = 1

In [22]:
# Per-iteration statistics: one entry is appended for every `trainer.train()` call.
episode_lengths = []
episode_rewards = []

for i in trange(num_episodes):

    result = trainer.train()
    hist_stats = result[ENV_RUNNER_RESULTS]["hist_stats"]

    # `np.mean([])` returns nan *and* emits a "Mean of empty slice"
    # RuntimeWarning. When `hist_stats` holds no episodes, record nan
    # explicitly instead of silencing warnings globally.
    reward_history = hist_stats["episode_reward"]
    length_history = hist_stats["episode_lengths"]

    average_episode_reward = (
        np.mean(reward_history) if len(reward_history) else np.nan
    )
    average_episode_length = (
        np.mean(length_history) if len(length_history) else np.nan
    )

    episode_lengths.append(average_episode_length)
    episode_rewards.append(average_episode_reward)

100%|████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:32<00:00, 32.47s/it]
