In [46]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import keras
from keras import layers, models
from keras.models import Sequential, load_model
from keras.layers import Dense, Dropout, Conv1D
from keras.optimizers import Adam
import datetime
import yfinance as yf
from collections import deque, Counter
import random
import math



import warnings
warnings.filterwarnings('ignore')


In [79]:
class DataReader:
    """Fetches one ticker's price history through yfinance and keeps it in memory."""

    def __init__(self, ticker = 'SPY', start_date = '2010-03-03', end_date = '2023-01-01'):
        """Download the history of ``ticker`` for the given date range.

        Args:
            ticker: symbol passed to ``yf.download``.
            start_date: first argument after the ticker (range start).
            end_date: second argument after the ticker (range end).

        Any column containing a missing value is dropped from the stored data.
        """
        self.start_date, self.end_date = start_date, end_date

        downloaded = yf.download(ticker, start_date, end_date)
        self.data = downloaded.dropna(axis = 1)

    def get_dates(self):
        """Return the requested range as ``{'start_date': ..., 'end_date': ...}``."""
        return dict(start_date = self.start_date, end_date = self.end_date)

    def get_data(self):
        """Return the stored (column-filtered) download."""
        return self.data





In [154]:
class Agent:
    """Epsilon-greedy Q-learning agent backed by a small dense Keras network.

    Attributes:
        state_size: number of input features of the network.
        states_dict: maps an action index (0, 1, 2) to its label.
        action_size: number of actions (network outputs).
        testing_mode: when True the model is loaded from ``model_name`` and
            ``act`` never explores.
        replay_memory: bounded buffer of
            ``(state, action, reward, next_state, done)`` tuples.
        inventory: prices of the positions currently held.
        gamma: discount factor applied to the best next-state value.
        epsilon / epsilon_min / epsilon_decay: exploration rate, its floor and
            the multiplicative decay applied once per ``experience_replay``.
    """

    def __init__(self, state_size, testing_mode = False, model_name = ""):
        """Create the agent, building a fresh model or loading ``model_name``."""
        self.state_size = state_size
        # Action index -> label. A Counter of the labels would map label -> count,
        # so indexing it with an integer action would always yield 0.
        self.states_dict = {0: 'Buy', 1: 'Sell', 2: 'Hold'}
        self.action_size = 3
        self.testing_mode = testing_mode # True if the testing has been initiated
        self.replay_memory = deque(maxlen = 1000)
        self.model_name = model_name
        self.inventory = []
        self.gamma = 0.95
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.99

        if self.testing_mode:
            self.model = load_model(model_name)
        else:
            self.model = self.build_model()

    def build_model(self, hidden_dim=64):
        """Build and compile the Q-network.

        Three ReLU Dense layers of ``hidden_dim``, ``hidden_dim//2`` and
        ``hidden_dim//4`` units, followed by a linear layer with one output
        per action.
        """
        model = Sequential([
            Dense(hidden_dim, input_dim = self.state_size, activation = 'relu'),
            Dense(hidden_dim//2, activation = 'relu'),
            Dense(hidden_dim//4, activation = 'relu'),
            Dense(self.action_size, activation = 'linear')
        ])

        self.compile_model(model)

        return model

    def compile_model(self, model, lr = 0.001):
        """Compile ``model`` in place with MAE loss and an Adam optimizer."""
        # `lr` is the deprecated spelling of this argument and is rejected by
        # recent Keras releases; `learning_rate` is the supported name.
        opt = Adam(learning_rate = lr)
        model.compile(loss ='mae', optimizer = opt)
        return

    def act(self, state):
        """Pick an action index for ``state`` (epsilon-greedy outside testing mode)."""
        if not self.testing_mode and random.random() <= self.epsilon:
            return random.randrange(self.action_size)

        q_values = self.model.predict(state, verbose = 0)

        return np.argmax(q_values[0])

    def experience_replay(self, batch_size):
        """Fit the model on the ``batch_size`` most recent transitions.

        For every transition the target of the taken action is the reward, plus
        the discounted best predicted value of the next state when the episode
        is not finished. Epsilon is decayed once per call.
        """
        if batch_size <= 0 or not self.replay_memory:
            return

        # Copy to a list first: the default deque cannot be sliced, and this also
        # works when a caller swapped the buffer for a plain list.
        mini_batch = list(self.replay_memory)[-batch_size:]

        for state, action, reward, next_state, done in mini_batch:
            target = reward
            if not done:
                next_q = self.model.predict(next_state, verbose = 0)[0]
                target = reward + self.gamma * np.amax(next_q)

            target_f = self.model.predict(state, verbose = 0)
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs = 1, verbose = 0)

        # Decay once per replay, not once per sample; decaying inside the loop
        # drives epsilon to its floor after only a handful of time steps.
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay






In [155]:
class HelperFuncs:
    """Formatting, feature-window and plotting helpers bound to one price series."""

    def __init__(self, data):
        """Keep a reference to ``data``, the series ``get_states`` reads from."""
        self.data = data

    def price_string (self, n):
        """Format ``n`` as a dollar amount with two decimals, e.g. ``-$3.46``."""
        sign = "-" if n < 0 else ""

        return sign + "$" + "{0:.2f}".format(abs(n))

    def sigmoid(self, x):
        """Return the logistic function ``1 / (1 + e**-x)``.

        Evaluated in two branches so that large-magnitude inputs saturate to
        0.0 / 1.0 instead of raising ``OverflowError``.
        """
        if x >= 0:
            return 1 / (1 + math.exp(-x))
        z = math.exp(x)
        return z / (1 + z)

    def get_states(self, t, n):
        """Return the ``n - 1`` consecutive differences of the window ending at ``t``.

        The window is ``data[t-n+1 : t+1]``. When it would start before index 0
        it is left-padded with copies of the first element, so the result always
        has ``n - 1`` entries (the padded ones being 0).
        """
        prices = np.asarray(self.data)
        start = t - n + 1

        if start >= 0:
            block = prices[start:t + 1]
        else:
            # Concatenate explicitly: `list + ndarray` is element-wise addition
            # in NumPy, not concatenation, which would corrupt the window.
            padding = np.repeat(prices[:1], -start, axis = 0)
            block = np.concatenate((padding, prices[:t + 1]))

        return block[1:] - block[:-1]

    def plot(self, data, states_buy, states_sell, profit):
        """Plot ``data`` with markers at the buy / sell indices and the profit as title.

        Args:
            data: price series to draw.
            states_buy: indices into ``data`` where a buy happened.
            states_sell: indices into ``data`` where a sell happened.
            profit: total profit, shown in the title via ``price_string``.
        """
        fig, ax = plt.subplots(figsize = (15, 5))
        ax.plot(data, color = 'r', lw = 2., label = 'Price')
        ax.plot(states_buy, [data[i] for i in states_buy], '^',
                markersize = 10, color = 'm', label = 'Buying signal')
        ax.plot(states_sell, [data[i] for i in states_sell], 'v',
                markersize = 10, color = 'k', label = 'Selling signal')
        ax.set_title('Total gains: ' + self.price_string(profit))
        ax.set_xlabel('Time step')
        ax.set_ylabel('Price')
        ax.legend()
        plt.show()

In [158]:
class Training:
    """Splits a price series and trains an ``Agent`` on the leading part."""

    def __init__(self,data,splitting_size = 0.8):
        """Store ``data`` and split it into training / validating parts.

        Args:
            data: sliceable price series.
            splitting_size: fraction of ``data`` assigned to the training part.
        """
        self.data = data
        self.agent = None
        self.splitting_size = splitting_size

        self.training_data, self.validating_data = self.train_test_split(self.data,self.splitting_size)

    def train_test_split (self, data, splitting_size):
        """Return ``(head, tail)`` of ``data`` cut at ``int(len(data) * splitting_size)``."""
        split_at = int(len(data) * splitting_size)

        return data[ : split_at], data[split_at : ]

    @staticmethod
    def _action_label(labels, action):
        """Resolve an action index to its label.

        Accepts an index -> label mapping as well as a mapping whose keys are
        the labels in action order (e.g. a Counter built from the label list).
        """
        if action in labels and isinstance(labels[action], str):
            return labels[action]
        return list(labels)[action]

    def train_agent(self,  batch_size = 32, ep_count = 10):
        """Train a fresh ``Agent`` on the training data.

        Runs ``ep_count + 1`` episodes (numbered 0..ep_count). Each step asks the
        agent for an action, books buys / sells against the agent's inventory,
        stores the transition and replays once more than ``batch_size``
        transitions are available. The model is saved after every even episode.

        Args:
            batch_size: number of transitions handed to ``experience_replay``.
            ep_count: index of the last episode.
        """
        window_size = 1
        self.agent = Agent(window_size)

        data = self.training_data
        step_count = len(data) - 1
        helper = HelperFuncs(self.training_data)

        def observe(t):
            # One sample of `window_size` features, shaped (1, features) so the
            # model receives an explicit batch dimension.
            return np.asarray(helper.get_states(t, window_size + 1)).reshape(1, -1)

        for ep in range(ep_count + 1):

            print(f'Episode {ep}/{ep_count}:')
            state = observe(0)
            total_profit = 0
            # Start every episode flat; otherwise positions bought in the previous
            # episode would be sold against this episode's prices.
            self.agent.inventory = []
            # A plain list (rather than a deque) so the buffer supports slicing.
            self.agent.replay_memory = []
            states_sell = []
            states_buy = []

            for t in range(step_count):
                action = self.agent.act(state)
                next_state = observe(t + 1)
                reward = 0
                label = self._action_label(self.agent.states_dict, action)

                if label == 'Buy':

                    self.agent.inventory.append(data[t])
                    states_buy.append(t)

                elif label == 'Sell' and self.agent.inventory:
                    # Only sell when something is held; popping an empty
                    # inventory would raise IndexError.
                    bought_price = self.agent.inventory.pop(0)
                    reward = max(data[t] - bought_price, 0 )
                    total_profit += data[t] - bought_price
                    states_sell.append(t)

                done = t == step_count - 1
                self.agent.replay_memory.append((state, action, reward, next_state, done))
                state = next_state

                if done:
                    print("--------------------------------")
                    print("Total Profit: " + helper.price_string(total_profit))
                    print("--------------------------------")

                    # Plotting is optional: skip it when the helper has no `plot`.
                    plot = getattr(helper, 'plot', None)
                    if callable(plot):
                        plot(data, states_buy, states_sell, total_profit)

                if len(self.agent.replay_memory) > batch_size:
                    self.agent.experience_replay(batch_size)

            if ep % 2 == 0:
                # NOTE(review): recent Keras versions may require a '.keras' or
                # '.h5' suffix here - confirm against the installed version.
                self.agent.model.save('model_ep' + str(ep))




In [131]:
# Download with DataReader's defaults: ticker 'SPY', 2010-03-03 to 2023-01-01.
reader = DataReader()

[*********************100%%**********************]  1 of 1 completed


In [113]:
# Use only the 'Close' column, converted to a NumPy array.
data = np.array(reader.get_data()['Close'])
# Training splits the series on construction (default splitting_size = 0.8).
train = Training(data)
# Run the training loop with its defaults (batch_size = 32, ep_count = 10).
train.train_agent()

array([112.30000305, 112.63999939, 114.25      , ..., 376.66000366,
       383.44000244, 382.42999268])