In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np
from collections import deque
import random
from backtesting import Backtest, Strategy
from backtesting.lib import crossover
import os
import glob

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [8]:
# Define the DQN Network
class DQNetwork(nn.Module):
    def __init__(self, input_dim, action_dim):
        super(DQNetwork, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 64)
        self.fc4 = nn.Linear(64, action_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.relu(self.fc3(x))
        return self.fc4(x)

# Define a simple stock trading environment
class TradingEnv:
    def __init__(self, data_list):
        self.data_list = data_list
        self.initial_balance = 10000  # Define initial balance here
        self.reset_stock()  # Call reset_stock after defining initial_balance

    def reset_stock(self):
        self.current_data = random.choice(self.data_list)
        self.current_step = 0
        self.balance = self.initial_balance  # Now this works because initial_balance is defined
        self.position = 0
        self.total_value = self.initial_balance  # Reset total value
        return self.get_state()

    def reset(self):
        return self.reset_stock()

    def step(self, action):
        if self.current_step >= len(self.current_data) - 1:
            done = True
            return self.get_state(), 0, done, {}

        current_price = self.current_data['Close'][self.current_step]
        reward = 0
        done = False

        if action == 0:  # Buy
            if self.balance >= current_price:
                self.position += 1
                self.balance -= current_price
                reward = 1  # Incentivize buying

        elif action == 1:  # Sell
            if self.position > 0:
                self.position -= 1
                self.balance += current_price
                reward = 1  # Incentivize selling

        self.total_value = self.balance + self.position * current_price
        self.current_step += 1
        done = self.current_step >= len(self.current_data) - 1

        return self.get_state(), reward, done, {}

    def get_state(self):
        return np.array([
            self.balance,
            self.position,
            self.current_data['Open'][self.current_step],
            self.current_data['High'][self.current_step],
            self.current_data['Low'][self.current_step],
            self.current_data['Close'][self.current_step]
        ])


In [3]:
# Load stock data from CSV files in a folder using *
def get_stock_data_from_folder(folder_path):
    csv_files = glob.glob(os.path.join(folder_path, "*.csv"))
    data_list = []
    for file in csv_files:
        df = pd.read_csv(file)
        df = df[['Open', 'High', 'Low', 'Close', 'Volume']].dropna()
        data_list.append(df)
    return data_list


In [None]:
class DQNBacktestStrategy(Strategy):
    def __init__(self, model, lookback=50):
        self.model = model
        self.lookback = lookback

    def init(self):
        self.data_len = len(self.data)
        self.last_trade = 0

    def next(self):
        # Your logic for the strategy
        # For example, using the model to make predictions on the current state
        state = self.data[-self.lookback:].to_numpy()  # Get the last `lookback` data points
        action = self.model.predict(state)  # Use the model to predict the next action

        if action == 0:  # Buy
            if self.position == 0:
                self.buy()
        elif action == 1:  # Sell
            if self.position > 0:
                self.sell()

In [13]:
class DQNBacktestStrategy(Strategy):
    def __init__(self, model, lookback=50):
        self.model = model
        self.lookback = lookback

    def init(self):
        self.data_len = len(self.data)
        self.last_trade = 0
        
    def next(self):
        state = np.array([
            self.balance,
            self.position,
            self.data.Open[-1],
            self.data.High[-1],
            self.data.Low[-1],
            self.data.Close[-1]
        ])
        state = self.data[-self.lookback:].to_numpy()  # Get the last `lookback` data points
        action = self.model.predict(state)  # Use the model to predict the next action

        if action == 0:  # Buy
            if self.position == 0:
                self.buy()
        elif action == 1:  # Sell
            if self.position > 0:
                self.sell()

In [9]:
# Train the model
def train_dqn(data_list, model, target_model, episodes=100, gamma=0.99, epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.995):
    env = TradingEnv(data_list)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    replay_buffer = deque(maxlen=2000)
    batch_size = 32

    for episode in range(episodes):
        state = torch.FloatTensor(env.reset()).to(device)
        done = False
        total_reward = 0

        while not done:
            if random.random() < epsilon:
                action = random.choice([0, 1])  # Random action
            else:
                with torch.no_grad():
                    action = torch.argmax(model(state)).item()

            next_state, reward, done, _ = env.step(action)
            next_state = torch.FloatTensor(next_state).to(device)
            reward = torch.tensor(reward, device=device, dtype=torch.float)
            total_reward += reward.item()

            replay_buffer.append((state, action, reward, next_state, done))
            state = next_state

            if len(replay_buffer) >= batch_size:
                batch = random.sample(replay_buffer, batch_size)
                states, actions, rewards, next_states, dones = zip(*batch)

                states = torch.stack(states).to(device)
                actions = torch.LongTensor(actions).to(device)
                rewards = torch.stack(rewards).to(device)
                next_states = torch.stack(next_states).to(device)
                dones = torch.BoolTensor(dones).to(device)

                with torch.no_grad():
                    q_next = target_model(next_states).max(1)[0]
                    q_targets = rewards + gamma * q_next * (~dones)

                q_values = model(states).gather(1, actions.view(-1, 1)).squeeze()

                loss = nn.MSELoss()(q_values, q_targets)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        epsilon = max(epsilon * epsilon_decay, epsilon_min)

        if episode % 10 == 0:
            target_model.load_state_dict(model.state_dict())

        print(f"Episode {episode + 1}/{episodes} - Total Reward: {total_reward}, Epsilon: {epsilon}")


In [10]:

# Save and load the model
def save_model(model, path="dqn_trading_model.pth"):
    torch.save(model.state_dict(), path)
    print(f"Model saved to {path}")

def load_model(path="dqn_trading_model.pth", input_dim=6, action_dim=2):
    model = DQNetwork(input_dim=input_dim, action_dim=action_dim)
    model.load_state_dict(torch.load(path, map_location=torch.device('cpu')))
    model.eval()
    return model


In [11]:

# Load the stock data and train the model
folder_path = "/home/artzuros/Documents/CS/Trader/trading/data/SCRIP/train"
data_list = get_stock_data_from_folder(folder_path)

input_dim = 6
action_dim = 2
model = DQNetwork(input_dim=input_dim, action_dim=action_dim).to(device)
target_model = DQNetwork(input_dim=input_dim, action_dim=action_dim).to(device)
target_model.load_state_dict(model.state_dict())

# Train the model (you can train for more episodes)
train_dqn(data_list, model, target_model, episodes=100)

# Save the trained model
save_model(model)

Episode 1/100 - Total Reward: 3250.0, Epsilon: 0.995
Episode 2/100 - Total Reward: 3154.0, Epsilon: 0.990025
Episode 3/100 - Total Reward: 3240.0, Epsilon: 0.985074875
Episode 4/100 - Total Reward: 2906.0, Epsilon: 0.9801495006250001
Episode 5/100 - Total Reward: 3192.0, Epsilon: 0.9752487531218751
Episode 6/100 - Total Reward: 2813.0, Epsilon: 0.9703725093562657
Episode 7/100 - Total Reward: 5982.0, Epsilon: 0.9655206468094844
Episode 8/100 - Total Reward: 459.0, Epsilon: 0.960693043575437
Episode 9/100 - Total Reward: 1397.0, Epsilon: 0.9558895783575597
Episode 10/100 - Total Reward: 2841.0, Epsilon: 0.9511101304657719
Episode 11/100 - Total Reward: 3132.0, Epsilon: 0.946354579813443
Episode 12/100 - Total Reward: 2883.0, Epsilon: 0.9416228069143757
Episode 13/100 - Total Reward: 5874.0, Epsilon: 0.9369146928798039
Episode 14/100 - Total Reward: 1277.0, Epsilon: 0.9322301194154049
Episode 15/100 - Total Reward: 3087.0, Epsilon: 0.9275689688183278
Episode 16/100 - Total Reward: 2703.0

In [None]:
# Get data for backtesting from a single stock (you can use any dataset)
data = pd.read_csv('/home/artzuros/Documents/CS/Trader/trading/data/SCRIP/test/ADANIGREEN.csv')

# Run the backtest with the DQN strategy
backtest = Backtest(data, DQNBacktestStrategy, cash=10000, commission=0.002)
backtest.run()


# Print results of the backtest
backtest.plot()

  backtest = Backtest(data, DQNBacktestStrategy, cash=10000, commission=0.002)


TypeError: DQNBacktestStrategy.__init__() takes from 2 to 3 positional arguments but 4 were given