### Imports
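The *crypto_env* and *gym_anytrading* modules are never referenced by name below, but they must still be imported: the trading environments are created by id through `gym.make`, and that only works once these modules have been imported.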

In [None]:
import crypto_env

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import gymnasium as gym
import gym_anytrading
import quantstats as qs

from stable_baselines3 import A2C, PPO
from stable_baselines3.common.callbacks import BaseCallback
from tqdm import tqdm

### Defining the DataFrame

In [None]:
# CSV file with the price history, and the id of the environment to build on it.
dataset_path = "data/stocks/aapl.csv"
dataset_type = "stocks-v0"  # "stocks-v0", "forex-v0", "crypto-v0"

# The first row holds the column names; the "Date" column is parsed as
# datetimes and becomes the index of the frame.
df = pd.read_csv(dataset_path, index_col="Date", parse_dates=["Date"], header=0)

# Last expression of the cell: shows the first rows when run in a notebook.
df.head()

### Creating the environment

In [None]:
# Experiment settings shared by the cells below.
seed = 69  # passed to the environment's reset()
total_num_episodes = 50  # evaluation episodes per agent
total_learning_timesteps = 100_000  # training budget given to model.learn()
window_size = 15  # also used as the lower frame bound

# The frame runs from `window_size` up to the end of the data set.
env = gym.make(
    dataset_type, df=df, window_size=window_size, frame_bound=(window_size, len(df))
)

# Matplotlib: per-series settings and data, filled in while the agents run.
# "x" holds the 1-based episode numbers shared by every series.
plot_settings = {}
plot_data = {"x": list(range(1, total_num_episodes + 1))}

### Defining utility functions

In [None]:
def train_test_model(model, gym_env):
    """Train ``model`` (if given) and evaluate it for ``total_num_episodes`` episodes.

    When ``model`` is ``None`` no training happens and the episodes are played
    with actions sampled from ``gym_env.action_space``.  Otherwise the model is
    trained for ``total_learning_timesteps`` steps and then evaluated on the
    vectorized environment returned by ``model.get_env()``.

    Args:
        model: An object exposing ``learn``, ``get_env`` and ``predict`` (as
            the stable-baselines3 algorithms do), or ``None`` for a random agent.
        gym_env: The gymnasium environment used for seeding and for the
            random agent.

    Returns:
        A list with one scalar per episode: the sum of the rewards collected
        during that episode.
    """
    # Seed the environment once; the returned (obs, info) pair is not needed
    # because every episode below starts with its own reset.
    gym_env.reset(seed=seed)
    vec_env = None

    if model is not None:
        # `model.learn(..., progress_bar=True)` is deliberately not used: it
        # raises ImportError unless tqdm and rich are installed
        # (`pip install stable-baselines3[extra]`), so a custom callback
        # drives the progress bar instead.
        model.learn(total_timesteps=total_learning_timesteps, callback=ProgressBar(100))
        vec_env = model.get_env()

    reward_over_episodes = []

    # The context manager closes the bar even if an episode raises.
    with tqdm(range(total_num_episodes)) as tbar:
        # Iterating over `tbar` already advances it by one per episode, so no
        # manual `tbar.update()` is issued (that would count episodes twice).
        for episode in tbar:
            if model is not None:
                obs = vec_env.reset()
            else:
                obs, _info = gym_env.reset()

            total_reward = 0
            done = False

            while not done:
                if model is not None:
                    action, _states = model.predict(obs)
                    obs, step_rewards, step_dones, _infos = vec_env.step(action)
                    # The vectorized env returns one entry per wrapped env;
                    # a single env is assumed here, so unwrap to plain scalars
                    # to keep the episode totals as floats (not 1-element arrays).
                    current_reward = float(step_rewards[0])
                    done = bool(step_dones[0])
                else:  # random
                    action = gym_env.action_space.sample()
                    obs, current_reward, terminated, truncated, _info = gym_env.step(action)
                    done = terminated or truncated

                total_reward += current_reward

            reward_over_episodes.append(total_reward)
            if episode % 10 == 0:
                avg_reward = np.mean(reward_over_episodes)
                tbar.set_description(f"Episode: {episode}, Avg. Reward: {avg_reward:.3f}")

    return reward_over_episodes


def get_results(reward_over_episodes, model_name, print_results=False):
    """Summarise the per-episode rewards of one agent.

    Args:
        reward_over_episodes: Sequence of per-episode rewards.
        model_name: Name shown in the printed report.
        print_results: When true, print the minimum, maximum and average.

    Returns:
        The tuple ``(min_reward, max_reward, avg_reward)``.
    """
    mean_value = np.mean(reward_over_episodes)
    lowest = np.min(reward_over_episodes)
    highest = np.max(reward_over_episodes)
    summary = (lowest, highest, mean_value)

    if not print_results:
        return summary

    report = (
        f"\nResults for {model_name} model:",
        f"Minimum reward: {lowest:.3f}",
        f"Maximum reward: {highest:.3f}",
        f"Average reward: {mean_value:.3f}\n",
    )
    for text in report:
        print(text)

    return summary


# Progress bar for model.learn()
class ProgressBar(BaseCallback):
    """tqdm progress bar for ``model.learn()``.

    The bar is refreshed every ``check_freq`` callback calls and is synced to
    the callback's ``num_timesteps`` counter, so it reports the timesteps
    actually consumed rather than the number of callback calls.

    Args:
        check_freq: Number of callback calls between two bar refreshes
            (must be at least 1).
        verbose: Verbosity level forwarded to ``BaseCallback``.

    Raises:
        ValueError: If ``check_freq`` is smaller than 1.
    """

    def __init__(self, check_freq: int, verbose: int = 1):
        super().__init__(verbose)
        if check_freq < 1:
            # A zero frequency would otherwise fail later with a
            # ZeroDivisionError in the middle of training.
            raise ValueError("check_freq must be >= 1")
        self.check_freq = check_freq
        self.progress_bar = None  # created when training starts

    def _on_training_start(self) -> None:
        # Start from the current counter so that a resumed training run
        # (non-zero `num_timesteps`) does not begin at an empty bar.
        self.progress_bar = tqdm(
            total=self.model._total_timesteps,
            initial=self.num_timesteps,
            desc="model.learn()",
        )

    def _sync_bar(self) -> None:
        """Advance the bar up to the current number of timesteps."""
        pending = self.num_timesteps - self.progress_bar.n
        if pending > 0:
            self.progress_bar.update(pending)

    def _on_step(self) -> bool:
        if self.n_calls % self.check_freq == 0:
            self._sync_bar()
        return True  # never interrupt training

    def _on_training_end(self) -> None:
        # Flush the steps taken since the last refresh before closing.
        self._sync_bar()
        self.progress_bar.close()

### Training and testing the model

1. With Advantage Actor-Critic algorithm
2. With Proximal Policy Optimization algorithm
3. With random actions

In [None]:
def cose(model_name):
    """Train and evaluate one agent, then record its rewards for plotting.

    ``"A2C"`` and ``"PPO"`` build the matching algorithm with an ``MlpPolicy``
    on ``env``; any other name runs without a model, i.e. with random actions.
    The rewards are printed through ``get_results`` and stored in
    ``plot_data`` / ``plot_settings`` under ``"<model_name>_rewards"``.
    """
    print(f"Training {model_name} model…")

    algorithms = {"A2C": A2C, "PPO": PPO}
    algorithm = algorithms.get(model_name)
    model = algorithm("MlpPolicy", env) if algorithm is not None else None

    rewards = train_test_model(model, env)
    get_results(rewards, model_name, print_results=True)

    series_key = f"{model_name}_rewards"
    plot_data[series_key] = rewards
    plot_settings[series_key] = {"label": model_name}


# Run the two learning agents first, then the random baseline.
for agent_name in ("A2C", "PPO", "random"):
    cose(agent_name)

### Plotting the results

In [None]:
# One column per recorded series, plus the shared "x" column of episode numbers.
data = pd.DataFrame(plot_data)
series_keys = [name for name in plot_data if name != "x"]

plt.figure(figsize=(12, 6))
for key in series_keys:
    # Columns are addressed by name through the `data` argument.
    plt.plot("x", key, data=data, linewidth=1, label=plot_settings[key]["label"])

plt.title("Random vs Agents")
plt.xlabel("episode")
plt.ylabel("reward")
plt.legend()
plt.show()