In [3]:
import gym
from gym import spaces
import numpy as np
import random as rnd
import matplotlib.pyplot as plt

In [6]:
class EnvTest(gym.Env):
    def __init__(self, balance=100, spread=0.001, max_variation=0.00005, simulation_duration="10", nb_stock=10):
        self.spread = spread # valeur du spread
        self.max_variation = max_variation # variation maximale de la courbe en pourcentage
        self.simulation_duration = simulation_duration # durée de la simulation demandées
        self.nb_stock = 10
        self.action_spaces = spaces.Discrete(3) # 0 = achat, 1 = nothing, 2 = vendre
        self.observation_spaces = spaces.Box(low=-np.inf,
                                           high=np.inf,
                                           shape=(2,),
                                           dtype=np.float32)
        
        self.starting_balance = balance # valeur du porte feuille
        self.timestep = 1.0 # simulation sur un pas de 1 seconde
        self.max_time = self.timeConverter(self.simulation_duration) # temps maximum d'un épisode
        
        np.set_printoptions(precision=5)
        self.reset()
        
        
    def step(self, action):
        """effectue un 'step' dans la simulation"""
        
        if self.current_time >= self.max_time:
            self.done = True
        self.current_time += self.timestep
        
        self.checkPos(action)
                
            
            
        """
        traitement de l'action à faire
        gestion des rewards en fonction de l'action
        
        """
        self.generateCurveData()
        self.setObservations()       
        print(f" **** done : {self.done} - current_time : {self.current_time} - max_time : {self.max_time}****")
        return self.next_state, self.reward, self.done, self.info
    
                      
    def reset(self):
        """Reinitialise les variables modifiable"""
        self.current_balance = self.starting_balance
        self.curve_values = [] 
        self.current_curve_value = self.initializeCurve()
        self.profit_loss = 0.0
        
        self.current_time = 0.0
        self.reward = 0.0
        self.done = False
        self.info = {}
        
        self.state = np.zeros(self.observation_spaces.shape[0])
        self.next_state = self.state
        last_action = 1 # derniere action effectué par l'agent, initialisé à 1 "nothing"
        self.last_buy_or_sell_value = False # valeur de la courbe lors de l'achat ou de la vente
        
        self.pos = "Pas de position"
        
        
        return self.next_state, self.reward, self.done, self.info
    
        
    def render(self):
        """Affiche l'environnement, dans notre cas la courbe"""
        pass
        
    def initializeCurve(self):
        """Initialise une courbe entre les valeurs min et max"""
        minimum_value = 1.053
        maximum_value = 1.062
        value = rnd.uniform(minimum_value, maximum_value)
        self.curve_values.append(value)
        return np.array([value])
    
    def generateCurveData(self):
        """génère la prochaine valeur de la courbe en fonction du taux de variation
        sauvegarde la valeur dans l'historique"""
        variation = rnd.uniform(-self.max_variation, self.max_variation)
        self.current_curve_value += self.current_curve_value * variation
        self.saveCurveData()
    
    def saveCurveData(self):
        """Sauvegarde les données de la courbe dans une liste"""
        self.curve_values.append(self.current_curve_value)
    
    def updateProfitAndLoss(self, action):
        if self.last_buy_or_sell == False: # Si l'agent n'a jamais acheté ou vendu
            return 0.0
        """
        gestion des profit et des pertes en fonction :
        - valeur de la courbe lors de l'achat ou vente
        - spread
        - valeur d'un pips ?
        - nb de stock acheté ou vendu ?
        """     
        pass
    
    def setObservations(self):
        """Gestion des observations"""
        self.next_state[0] = self.curve_values[-1] ## Valeur actuelle de la courbe/last value de l'histo
        self.next_state[1] = self.profit_loss ## Valeur actuelle des profits/pertes
              
    def setReward(self):
        """Gestion des rewards"""
        pass
    
    def checkPos(self, action):
        if action == 0:
            if self.pos == "Pas de position":
                print(f"Achat de {self.nb_stock} stocks à {self.current_curve_value}")
                self.pos = "Achat"
                self.last_action = 0
                self.last_buy_or_sell_value = np.array([self.current_curve_value])
            elif self.pos == "Vente":
                print(f"Fin de la position de vente de {self.nb_stock} stocks à {self.last_buy_or_sell_value}")
                self.pos = "Pas de position"
                self.last_action = 0
                self.last_buy_or_sell_value = False
            else:
                self.last_action = 0

        if action == 2:
            if self.pos == "Pas de position":
                print(f"Vente de {self.nb_stock} stocks à {self.current_curve_value}")
                self.pos = "Vente"
                self.last_action = 2
                self.last_buy_or_sell_value = np.array([self.current_curve_value])
            elif self.pos == "Achat":
                print(f"Fin de la position d'achat de {self.nb_stock} stocks à {self.last_buy_or_sell_value}")
                self.pos = "Pas de position"
                self.last_action = 2
                self.last_buy_or_sell_value = False
            else:
                self.last_action = 2

        if action == 1:
            self.last_action = 1
    
    def timeConverter(self, time):
        """Convertie la durée entrée manuellement
        Renvois la durée maximale en seconde (ou en fonction du time step)"""
        number = ""
        value = ""
        for string in time:
            if string.isdigit():
                number += string
            else:
                value += string

        if not self.translate(value):
            print("Durée de simulation incorrect...")
            exit() # ajouter la gestion de mauvaises données
        time_converted = int(number) * self.translate(value)
        return time_converted
    
    def translate(self, value):
        """fonction qui permet de traduire et la renvoie en seconde"""
        
        ## Constantes de temps en secondes
        SECONDE = 1.0
        MINUTE = 60.0
        HOUR = 60.0 * MINUTE
        DAY = 24.0 * HOUR
        WEEK = 7.0 * DAY
        MONTH = 30.0 * DAY
        YEAR = 365.0 * DAY
        
        d = {"seconde" : ["s", "seconde", "secondes", "sec", "secs"],
            "minute" : ["m", "min", "minute", "minutes", "mins", "minutes"],
            "hour" : ["h", "heure", "hour", "hours", "heures"],
            "day" : ["d", "j", "jour", "day", "jours"],
            "week" : ["w", "week", "semaine", "weeks", "semaines"],
            "year" : ["y", "year", "année", "annee", "an", "années", "years"]
            }

        for key in d.keys():
            if value.lower() in key:
                if key == "seconde":
                    return SECONDE
                elif key == "minute":
                    return MINUTE
                elif key == "hour":
                    return HOUR
                elif key == "day":
                    return DAY
                elif key == "week":
                    return WEEK
                elif key == "year":
                    return YEAR
                
        return False

    
class Agent():
    def __init__(self):
        pass
    
    def act(self):
        pass

In [8]:
env = EnvTest(simulation_duration="10")
agent = Agent()
print(f"env observation spaces : {env.observation_spaces.shape}")
print(f"env action spaces : {env.action_spaces.n}")
print()
num_action = env.action_spaces.n
num_observation = env.observation_spaces.shape[0]

nb_max_episode = 1

for episode in range(nb_max_episode):
    state = env.reset()
    done = False
    score = 0
    
    while not done:
        action = env.action_spaces.sample()
        next_state, reward, done, _ = env.step(action)
        #print(f"episode : {episode+1} - score total : {score}")
        
        score += reward
        state = next_state
        
        if done:
            #env.render()
            #print(f"episode : {episode+1} - score total : {score}")
            pass

env observation spaces : (2,)
env action spaces : 3

 **** done : False - current_time : 1.0 - max_time : 10.0****
Achat de 10 stocks à [1.05482]
 **** done : False - current_time : 2.0 - max_time : 10.0****
Fin de la position d'achat de 10 stocks à [[1.05482]]
 **** done : False - current_time : 3.0 - max_time : 10.0****
Achat de 10 stocks à [1.05491]
 **** done : False - current_time : 4.0 - max_time : 10.0****
 **** done : False - current_time : 5.0 - max_time : 10.0****
Fin de la position d'achat de 10 stocks à [[1.05491]]
 **** done : False - current_time : 6.0 - max_time : 10.0****
Vente de 10 stocks à [1.05501]
 **** done : False - current_time : 7.0 - max_time : 10.0****
 **** done : False - current_time : 8.0 - max_time : 10.0****
 **** done : False - current_time : 9.0 - max_time : 10.0****
 **** done : False - current_time : 10.0 - max_time : 10.0****
 **** done : True - current_time : 11.0 - max_time : 10.0****


In [46]:
class OrderImbalanceEnv(gym.Env):
    def __init__(self, data, lookback, fee):
        self.data = data
        self.lookback = lookback
        self.fee = fee
        self.action_space = spaces.Discrete(3) # 0 = sell, 1 = hold, 2 = buy
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(lookback, 3), dtype=np.float32)
        self.reset()
        
        self.temp_data = []

    def step(self, action):
        if action == 0:
            self.balance *= 1 + reward
            self.shares -= self.shares * 2
        elif action == 2:
            self.balance *= 1 + reward
            self.shares += self.balance * 0.5 / self.data[self.current_step]
        
        self.get_rewards(action)
        print(f"action : {action} - balance : {self.balance} - shares : {self.shares}")
        
        self.current_step += 1
        done = self.current_step >= len(self.data) - 1
        obs = self._next_observation()
        print(f"observation : {obs}")
        return obs, self.reward, done, {}

    def reset(self):
        self.current_step = 0
        self.balance = 1.0
        self.shares = 0.0
        
        self.reward = 0.0
        return self._next_observation()

    def _next_observation(self):
        obs = np.array([
            self.data[self.current_step - self.lookback:self.current_step, 0],
            self.data[self.current_step - self.lookback:self.current_step, 1],
            self.data[self.current_step - self.lookback:self.current_step, 2]
        ]).T
        return obs
    
    def get_rewards(self, action):
        # récompense lorsque l'agent achète ou vend
        if action == 0 or action == 2:
            self.reward = (self.data[self.current_step+1] - self.data[self.current_step]) / self.data[self.current_step] - self.fee
        else:
            reward = (self.data[self.current_step+1] - self.data[self.current_step]) / self.data[self.current_step]
            

In [3]:


# generate sample data
data = np.random.normal(100, 10, size=(1000, 3))

# create environment
env = OrderImbalanceEnv(data, lookback=10, fee=0.01)

# reset environment and get initial observation
obs = env.reset()

# take random actions for 100 time steps
for i in range(100):
    action = env.action_space.sample()
    obs, reward, done, info = env.step(action)
    print(f"Step {i+1}: Action={action}, Reward={reward:.4f}, Balance={env.balance:.4f}, Shares={env.shares:.4f}")

    if done:
        print("Episode finished after {} timesteps".format(i+1))
        break

NameError: name 'reward' is not defined