In [None]:
from AITrader_Funcs import *

# ----------------- CONFIGURATION ---------------------
WINDOW_SIZE = 20  # Passed as window_size to every MyStocksEnv and as the start of frame_bound
DATA_SAVE_DIRECTORY = "Data"
MODEL_LOGS_PATH = os.path.join("Training", "Logs")
MODEL_SAVE_PATH = os.path.join("Training", "Saved_Models", "DQN_Model")

# ----------------- DATA ---------------------
# 2018 is the initial training period; 2019-2020 is the period that gets traded.
training_data = get_data(
    start_date=datetime(2018, 1, 1),
    end_date=datetime(2019, 1, 1),
    interval='1d',
    output_type=2,
    window_sample=WINDOW_SIZE,
)
time.sleep(1)  # Pause between the two get_data calls (presumably to avoid rate limiting -- confirm)
sample_data = get_data(
    start_date=datetime(2019, 1, 1),
    end_date=datetime(2021, 1, 1),
    interval='1d',
    output_type=2,
    window_sample=0,
)
# Pad the sample with a dummy day so the last position and trade show up when plotting.
# NOTE(review): the previous comment said "do twice", but the padding is applied once -- confirm intent.
sample_data = pad_dummy_day(sample_data)

# Create the output directory; exist_ok avoids the check-then-create race
os.makedirs(DATA_SAVE_DIRECTORY, exist_ok=True)

# Save data
training_data.to_csv(f"{DATA_SAVE_DIRECTORY}/training_data.csv")
sample_data.to_csv(f"{DATA_SAVE_DIRECTORY}/sample_data.csv")

# ----------------- ENVIRONMENTS ---------------------
# Both environments start at WINDOW_SIZE and run to the end of their frame.
training_env = MyStocksEnv(
    df=training_data,
    window_size=WINDOW_SIZE,
    frame_bound=(WINDOW_SIZE, len(training_data)),
)
sample_env = MyStocksEnv(
    df=sample_data,
    window_size=WINDOW_SIZE,
    frame_bound=(WINDOW_SIZE, len(sample_data)),
)

def Make_Model (logs_path):
    """Build a fresh, untrained DQN agent.

    The agent is bound to whatever the module-level `training_env` is at call
    time, so callers must (re)assign `training_env` before calling this.

    Args:
        logs_path: Directory handed to DQN as `tensorboard_log`.

    Returns:
        A new DQN model using the "MlpPolicy" policy and a fixed seed.
    """
    dqn_settings = dict(
        verbose=0,
        tensorboard_log=logs_path,
        batch_size=32,
        buffer_size=20000,
        gamma=0.99,
        learning_rate=0.001,
        exploration_initial_eps=1.0,
        exploration_fraction=0.70,
        exploration_final_eps=0.05,
        seed=29,
    )
    return DQN("MlpPolicy", training_env, **dqn_settings)

In [None]:
# ----------------- TRAIN AND TEST IN SAMPLE ---------------------
# Walk-forward run in which every model is trained on a slice of `sample_data`
# that overlaps the bars it is then asked to trade.

action_list = []         # One predicted action per environment step; read by the backtesting cell
retrain_period = 60      # Retrain whenever (tick - WINDOW_SIZE) is a positive multiple of this
training_data_size = 90  # Rows of sample data taken beyond the window start for each training frame
obs, info = sample_env.reset()

# Predict action for each timestep in data
for tick in trange(0, len(sample_data)):

    tick_window_adjusted = tick - WINDOW_SIZE
    is_first_tick = tick == 0
    is_retrain_tick = tick_window_adjusted > 0 and tick_window_adjusted % retrain_period == 0

    # Train once on start, then at every retrain interval after WINDOW_SIZE
    if is_first_tick or is_retrain_tick:

        # The frame lives in a cell-local name on purpose: reassigning the global
        # `training_data` here would make a re-run of this cell (or a later run of
        # the out-of-sample cell) start from sample data instead of the original
        # training period.
        if is_first_tick:
            # Last WINDOW_SIZE rows of the training data followed by the
            # training_data_size rows of sample data that come after the window.
            in_sample_train_df = pd.concat(
                [
                    training_data.iloc[-WINDOW_SIZE:].reset_index(drop=True),
                    sample_data.iloc[WINDOW_SIZE:(WINDOW_SIZE + training_data_size)],
                ],
                ignore_index=True,
            )
        else:
            # WINDOW_SIZE rows before `tick` up to training_data_size rows after
            # it, clipped to the end of the sample.
            slice_end = min(tick + training_data_size, len(sample_data))
            in_sample_train_df = sample_data.iloc[(tick - WINDOW_SIZE):slice_end].reset_index(drop=True)

        # Make_Model reads the global `training_env`, so it has to be rebound first
        training_env = MyStocksEnv(
            df=in_sample_train_df,
            window_size=WINDOW_SIZE,
            frame_bound=(WINDOW_SIZE, len(in_sample_train_df)),
        )

        # Train model, then continue with the best checkpoint saved for this tick
        model = Make_Model(MODEL_LOGS_PATH)
        model_unique_day_path = f'{MODEL_SAVE_PATH}_{tick}_days'
        train_model(
            model,
            training_env,
            training_timesteps=30000,
            model_save_path=model_unique_day_path,
            n_evaluations=30,
            reward_threshold=10,
            verbose=0,
        )
        model = DQN.load(os.path.join(model_unique_day_path, 'best_model.zip'))

    # Make prediction, next step
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, info = sample_env.step(action)
    done = terminated or truncated

    action_list.append(action)

    if done:
        # Get the action of the last index (current day); it is passed to the plot below
        obs = sample_env._get_observation()
        action, _ = model.predict(obs, deterministic=True)
        break

# Plot results
plt.cla()
sample_env.unwrapped.render_all(action = action)
plt.show()
# ---------------------------------------------------

In [None]:
# ----------------- TRAIN AND TEST OUT OF SAMPLE ---------------------
# Walk-forward run in which each model is trained only on rows that precede the
# bars it trades. Expects the global `training_data` to still be the frame
# built in the setup cell; re-run that cell if anything has reassigned it.

action_list = []         # One predicted action per environment step; read by the backtesting cell
retrain_period = 60      # Retrain whenever (tick - WINDOW_SIZE) is a positive multiple of this
training_data_size = 90  # Rows kept in the rolling training frame, in addition to the window
obs, info = sample_env.reset()

# Predict action for each timestep in data
for tick in trange(0, len(sample_data)):

    tick_window_adjusted = tick - WINDOW_SIZE
    is_first_tick = tick == 0
    is_retrain_tick = tick_window_adjusted > 0 and tick_window_adjusted % retrain_period == 0

    # Train once on start, then at every retrain interval after WINDOW_SIZE
    if is_first_tick or is_retrain_tick:

        # The rolling frame lives in a cell-local name so that re-running this
        # cell starts again from the tail of `training_data` rather than from an
        # already-rolled window.
        if is_first_tick:
            # Most recent WINDOW_SIZE + training_data_size rows of the training data
            oos_train_df = training_data.iloc[-(WINDOW_SIZE + training_data_size):].reset_index(drop=True)
        else:
            # Append the retrain_period sample rows that end at tick_window_adjusted ...
            newest_rows = sample_data.iloc[tick_window_adjusted - retrain_period:tick_window_adjusted]
            oos_train_df = pd.concat([oos_train_df, newest_rows], ignore_index=True)
            # ... and drop the same number of oldest rows, so the frame keeps its
            # length. The index is reset so every frame is 0-based like the first one.
            oos_train_df = oos_train_df.iloc[retrain_period:].reset_index(drop=True)

        # Make_Model reads the global `training_env`, so it has to be rebound first
        training_env = MyStocksEnv(
            df=oos_train_df,
            window_size=WINDOW_SIZE,
            frame_bound=(WINDOW_SIZE, len(oos_train_df)),
        )

        # Train model, then continue with the best checkpoint saved for this tick
        model = Make_Model(MODEL_LOGS_PATH)
        model_unique_day_path = f'{MODEL_SAVE_PATH}_{tick}_days'
        train_model(
            model,
            training_env,
            training_timesteps=30000,
            model_save_path=model_unique_day_path,
            n_evaluations=30,
            reward_threshold=10,
            verbose=0,
        )
        model = DQN.load(os.path.join(model_unique_day_path, 'best_model.zip'))

    # Make prediction, next step
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, info = sample_env.step(action)
    done = terminated or truncated

    action_list.append(action)

    if done:
        # Get the action of the last index (current day); it is passed to the plot below
        obs = sample_env._get_observation()
        action, _ = model.predict(obs, deterministic=True)
        break

# Plot results
plt.cla()
sample_env.unwrapped.render_all(action = action)
plt.show()
# ---------------------------------------------------

In [None]:
# ------------------ BACKTESTING PLOT -----------------------
from backtesting import Backtest, Strategy
import math

# Read back the sample data written by the setup cell. The path is built from
# DATA_SAVE_DIRECTORY so the read and the write cannot drift apart.
sample_data = pd.read_csv(f"{DATA_SAVE_DIRECTORY}/sample_data.csv", index_col=0)
# The index is parsed as UTC datetimes before the frame is handed to Backtest
sample_data.index = pd.to_datetime(sample_data.index, utc = True)
sample_env.reset()

# Load initial model (the checkpoint saved at tick 0 of the last training run)
model_unique_day_path = f'{MODEL_SAVE_PATH}_0_days'
model = DQN.load(os.path.join(model_unique_day_path, 'best_model.zip'))

class DQN_Strategy(Strategy):
    """Replay the pre-computed DQN actions in `action_list` as a backtesting.py strategy.

    Reads the notebook globals `sample_env`, `model`, `action_list`,
    `sample_data` and `WINDOW_SIZE`. Action 0 goes short and action 1 goes
    long, each with as many whole units as current equity buys at the
    latest close. A position that loses more than 10% against its recorded
    entry price is closed, and trading is locked out until the action
    differs from the last traded one.
    """

    def init(self):
        self.env = sample_env
        self.obs = self.env.reset()[0]
        self.model = model
        self.entry_price = None  # Close price recorded when the current position was ordered
        self.tick = 0  # Number of next() calls so far
        self.last_trade_action = None  # Action (0 or 1) of the most recent order
        self.lockout = False  # True after a stop loss until the action flips

    def next(self):

        self.tick += 1
        tick_window_adjusted = self.tick - WINDOW_SIZE
        current_price = self.data.Close[-1]

        # Skip the early window data for sampling
        if len(self.data) <= self.env.window_size + 1:
            return

        # Once the end of the data is reached, close any open position and stop.
        # `>=` rather than `==`: with `==` the bar after this one fell through to
        # the trading logic below and could open a new position.
        if self.tick >= len(sample_data) - 2:
            if self.position:
                self.position.close()
            return

        action = action_list[tick_window_adjusted - 1]

        # ----------- Lockout Trades -----------------
        if self.lockout:
            # Still locked out while the action matches the last trade
            if action == self.last_trade_action:
                return
            # Action is opposite of last trade: reset lockout
            self.lockout = False

        # Stop loss and lockout at -10% loss
        if self.position.is_long:
            perc_return = (current_price - self.entry_price) / self.entry_price
        elif self.position.is_short:
            perc_return = (self.entry_price - current_price) / self.entry_price
        else:
            perc_return = None

        if perc_return is not None and perc_return < -0.1:
            self.position.close()
            self.lockout = True
            self.entry_price = None
        # ------------------------------------------------

        if self.lockout:
            return

        # Investing / snowballing: whole units that current equity buys at the latest close
        size = math.floor(self.equity / current_price)

        # Sell action: go short when flat or long
        if action == 0:
            if self.position.is_long or not self.position:
                self.position.close()
                self.sell(size = size)
                self.entry_price = current_price  # Set new entry
                self.last_trade_action = 0

        # Buy action: go long when flat or short
        elif action == 1:
            if self.position.is_short or not self.position:
                self.position.close()
                self.buy(size = size)
                self.entry_price = current_price  # Set new entry
                self.last_trade_action = 1

                
def commission_calc(size, price):
    """Estimate the total cost of one order, mimicking Trading-212 style fees.

    The result is the sum of three non-negative parts, each proportional to
    the absolute order value `size * price`: an fx fee, an overnight holding
    fee (a higher rate when `size` is negative, i.e. a sell) and a small
    slippage allowance.

    Args:
        size: Order size; negative values select the overnight sell rate.
        price: Price per unit.

    Returns:
        The estimated commission as a non-negative number.
    """
    # Trade duration is estimated, as backtesting.py gives the commission
    # function only the order size and price
    holding_days = 2

    order_value = abs(size * price)

    fx_fee = order_value * 0.005 * 0.0025
    overnight_rate = 0.000139 if size < 0 else 0.00011
    overnight_fee = overnight_rate * order_value * holding_days
    slippage = order_value * 0.0001

    return fx_fee + overnight_fee + slippage

# Run the strategy over the sample data with the custom commission model,
# 1/5 margin, 100000 starting cash and orders filled on the close.
backtest_settings = dict(
    commission=commission_calc,
    margin=1/5,
    cash=100000,
    trade_on_close=True,
)
bt = Backtest(sample_data, DQN_Strategy, **backtest_settings)
stats = bt.run()
bt.plot()

# Per-trade records from the run
trades = stats._trades

print(trades.tail())
print(stats)
# ---------------------------------------------------------