In [2]:
import gym
import numpy as np
from gym import spaces
import pandas as pd
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
import torch
import torch.nn as nn

class BitcoinTradingEnv(gym.Env):
    def __init__(self, df, lookback_window_size=50):
        super(BitcoinTradingEnv, self).__init__()

        self.df = df.dropna().reset_index(drop=True)
        self.lookback_window_size = lookback_window_size
        self.initial_balance = 10000  # Starting balance 

        # Actions of the format Buy 1/10, Sell 1/10, Hold.
        self.action_space = spaces.Discrete(3)  # Updated this line

        # Observes the price and volume
        self.observation_space = spaces.Box(low=0, high=np.inf, shape=(lookback_window_size, 2))

        self.reward_range = (0, 10000) # Update according to your requirement

    def reset(self):
        self.balance = self.initial_balance
        self.net_worth = self.initial_balance
        self.btc_held = 0

        self.current_step = 0

        self.start = np.random.randint(0, len(self.df) - self.lookback_window_size)

        return self._next_observation()

    def step(self, action):
        self.current_step += 1

        if action == 0:  # Buy
            btc_bought = self.balance / self.df.loc[self.current_step, 'price']
            self.btc_held += btc_bought
            self.balance -= self.df.loc[self.current_step, 'price'] * btc_bought

        elif action == 1:  # Sell
            btc_sold = self.btc_held
            self.balance += self.df.loc[self.current_step, 'price'] * btc_sold
            self.btc_held -= btc_sold

        self.net_worth = self.balance + self.df.loc[self.current_step, 'price'] * self.btc_held

        if self.net_worth <= 0:
            return self._next_observation(), -10000, True, {}

        # Update this line
        return self._next_observation(), max(0, self.balance - self.initial_balance), False, {}

    def _next_observation(self):
        end = self.start + self.current_step
        obs = self.df.loc[end-self.lookback_window_size+1:end, ['price', 'total_volume']].to_numpy()

        return obs

    def render(self):
        print(f'Step: {self.current_step}, Balance: {self.balance}, Bitcoins held: {self.btc_held}, Net worth: {self.net_worth}')

# Initialize environment with training data
btc_data = pd.read_csv('C:/Users/user/Desktop/btc-usd-max.csv', parse_dates=['snapped_at'], index_col='snapped_at')
env = DummyVecEnv([lambda: BitcoinTradingEnv(df=btc_data)])

# Create and train the agent
model = PPO("MlpPolicy", env, verbose=1, device='cpu')
model.learn(total_timesteps=5000)

# Initialize environment with testing data
test_data = pd.read_csv('C:/Users/user/Desktop/btc-usd-max.csv', parse_dates=['snapped_at'], index_col='snapped_at')
test_env = DummyVecEnv([lambda: BitcoinTradingEnv(df=test_data)])

obs = test_env.reset()
for _ in range(len(test_data)):
    action, _ = model.predict(obs)
    obs, reward, done, info = test_env.step(action)
    test_env.render()

# You'll have to implement the 'get_trades' method if you want to visualize the trades.


ModuleNotFoundError: No module named 'stable_baselines3'

In [2]:
env = DummyVecEnv([lambda: BitcoinTradingEnv(df=btc_data)])

# Create and train the agent
model = PPO("MlpPolicy", env, verbose=1, device='cpu')

model.learn(total_timesteps=5000)


test_data = pd.read_csv('C:/Users/user/Desktop/btc-usd-max.csv', parse_dates=['snapped_at'], index_col='snapped_at')
test_env = DummyVecEnv([lambda: BitcoinTradingEnv(df=test_data)])

obs = test_env.reset()
for _ in range(len(test_data)):
    action, _ = model.predict(obs)
    obs, reward, done, info = test_env.step(action)
    test_env.render()

NameError: name 'DummyVecEnv' is not defined

In [None]:
import matplotlib.pyplot as plt

# Prepare the data
trades = pd.DataFrame(model.env.envs[0].env_method('get_trades'), columns=['portfolio_value', 'trade_type'])
trades['price'] = test_data['price'][:len(trades)]

# Plot the price and trades over time
plt.figure(figsize=(12,6))
plt.plot(trades['price'], label='Price')

buy_signals = trades[trades['trade_type'] == 'Buy']
sell_signals = trades[trades['trade_type'] == 'Sell']

plt.plot(buy_signals['price'], 'go', markersize=10, label='Buy')
plt.plot(sell_signals['price'], 'ro', markersize=10, label='Sell')

plt.title('Agent trading behavior')
plt.legend()
plt.show()