In [1]:
import aiohttp
import aiomoex
import random

from os.path import exists
from datetime import datetime
from typing import Callable

import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
import seaborn as sns
import mplfinance as mpf

from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.logger import configure
from finrl.agents.stablebaselines3.models import DRLAgent

import gymnasium as gym
from gymnasium import spaces
from gymnasium.utils import seeding

import os
import csv

%matplotlib inline

In [2]:
df = pd.read_csv('data.csv')
display(df.head())
print(df.info())

Unnamed: 0,date,open,high,low,close,volume
0,2022-05-04 09:00:00,129.1,129.1,129.1,129.1,111560
1,2022-05-04 10:00:00,129.1,131.5,125.77,127.0,17793280
2,2022-05-04 11:00:00,127.05,127.5,123.6,123.95,13508120
3,2022-05-04 12:00:00,123.89,126.23,123.6,125.44,6536410
4,2022-05-04 13:00:00,125.44,126.21,124.61,125.35,3540730


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10714 entries, 0 to 10713
Data columns (total 6 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   date    10714 non-null  object 
 1   open    10714 non-null  float64
 2   high    10714 non-null  float64
 3   low     10714 non-null  float64
 4   close   10714 non-null  float64
 5   volume  10714 non-null  int64  
dtypes: float64(4), int64(1), object(1)
memory usage: 502.3+ KB
None


In [3]:
### INDICATORS PARAMS ###

rsi_lenght = 9
ema_lenght = 21

def calc_rsi(over: pd.Series, fn_roll: Callable) -> pd.Series:

    delta = over.diff()
    up, down = delta.clip(lower=0), delta.clip(upper=0).abs()

    roll_up, roll_down = fn_roll(up), fn_roll(down)

    rs = roll_up / roll_down
    rsi = 100.0 - (100.0 / (1.0 + rs))

    rsi[:] = np.select([roll_down == 0, roll_up == 0, True], [100, 0, rsi])
    rsi.name = 'rsi'

    valid_rsi = rsi[rsi_lenght - 1:]
    assert ((0 <= valid_rsi) & (valid_rsi <= 100)).all()
    return rsi

df.index = df['date']
df = df.drop(['date'], axis=1)

df['feature_upper_chanel'] = df['high'].rolling(window=20).max()
df['feature_lower_chanel'] = df['low'].rolling(window=20).min()

df['feature_rsi'] = calc_rsi(df['close'], lambda s: s.ewm(span=rsi_lenght).mean())
df['feature_rsi_ema'] = df['feature_rsi'].ewm(span=ema_lenght).mean()
df['tic'] = 'SBER'
df = df.dropna()
df.reset_index(inplace=True)
display(df.head())

Unnamed: 0,date,open,high,low,close,volume,feature_upper_chanel,feature_lower_chanel,feature_rsi,feature_rsi_ema,tic
0,2022-05-05 18:00:00,124.3,124.8,124.25,124.8,2989720,131.5,122.5,65.137694,42.66972,SBER
1,2022-05-06 09:00:00,124.74,124.74,124.74,124.74,43600,131.5,122.5,62.626662,44.800752,SBER
2,2022-05-06 10:00:00,124.74,124.99,121.52,122.46,9430400,127.5,121.52,22.120918,42.416804,SBER
3,2022-05-06 11:00:00,122.42,123.27,121.9,122.53,3760250,126.23,121.52,24.007184,40.508814,SBER
4,2022-05-06 12:00:00,122.47,123.0,122.0,122.27,2522840,126.21,121.52,21.580421,38.57172,SBER


In [None]:
class StockTradingEnv(gym.Env):
    def __init__(
                    self,
                    df,
                    stock_dim,              # Размерность (количество) акций, доступных для торговли
                    hmax,                   # Максимальное количество акций, которые можно купить/продать за один шаг
                    initial_amount,         # Начальный баланс (сумма, с которой стартует агент)
                    buy_cost_pct,           # Процент комиссии при покупке
                    sell_cost_pct,          # Процент комиссии при продаже
                    reward_scaling,         # Коэффициент масштабирования вознаграждения            
                    state_space,            # Размерность пространства состояний
                    action_space,           # Размерность пространства действий (число акций, над которыми действуем)
                    tech_indicator_list,    # Список технических индикаторов для анализа акций
                    turbulence_threshold=None,          # Порог турбулентности рынка для изменения логики сделок (необязательно)
                    risk_indicator_col="turbulence",    # Название колонки с показателем риска/турбулентности
                    make_plots=True,       # Флаг: строить графики по результатам торговли или нет
                    print_verbosity=10,     # Частота вывода информации в консоль
                    timeframe=0,                  # Текущий день (индекс в данных)
                    initial=True,           # Флаг, определяющий, используется ли начальное состояние
                    previous_state=[],      # Предыдущее состояние (для передачи между эпизодами, если необходимо)
                    model_name="",          # Имя модели (используется при сохранении результатов)
                    mode="",                # Режим работы (например, 'train' или 'test')
                    iteration="",           # Итерация обучения или тестирования
                    rsi_on=False,           # Флаг, указывающий, используется ли RSI в торговле (необязательно)
                    bonus_buy_reward=100,   # Бонусная комиссия при покупке (необязательно)
                    bonus_sell_reward=100,  # Бонусная комиссия при продаже (необязательно)
                    reward_window=14,       # Размер окна для расчета вознаграждения (необязательно)
                    actions_dead_zone_threshold=0.25,
                    ema_alpha=0.1,         # Коэффициент сглаживания для EMA (необязательно)
                    # enable_trade_penalty = False,   # Добавлена возможность добавить штрафы за частую торговлю
                    # trade_penalty_window = 100     # Окно для расчета штрафов за частую торговлю
                 ):
        
        self.timeframe = timeframe                                      # Инициализация номера текущего таймфрейма
        self.df = df                                        # Сохраняем переданный DataFrame с данными        
        self.stock_dim = stock_dim                          # Размерность (количество) акций        
        self.hmax = hmax                                    # Максимальное количество акций для сделки       
        self.initial_amount = initial_amount                # Начальный баланс агента        
        self.buy_cost_pct = buy_cost_pct                    # Комиссия при покупке (в процентах)        
        self.sell_cost_pct = sell_cost_pct                  # Комиссия при продаже (в процентах)        
        self.reward_scaling = reward_scaling                # Коэффициент масштабирования вознаграждения        
        self.state_space = state_space                      # Размерность пространства состояний        
        self.action_space = action_space                    # Размерность пространства действий (количество активов)        
        self.tech_indicator_list = tech_indicator_list      # Список технических индикаторов           
        self.data = self.df.loc[self.timeframe, :]                # Инициализируем данные для текущего таймфрейма (выбираем строку по индексу timeframe)        
        self.terminal = False                               # Флаг терминального состояния (конец данных)        
        self.make_plots = make_plots                        # Флаг для построения графиков, если необходимо        
        self.print_verbosity = print_verbosity              # Частота вывода информации        
        self.turbulence_threshold = turbulence_threshold    # Порог турбулентности для изменения поведения торговли (если задан)        
        self.risk_indicator_col = risk_indicator_col        # Название колонки с показателем риска        
        self.initial = initial                              # Флаг, используемый для определения инициализации состояния        
        self.previous_state = previous_state                # Сохранение предыдущего состояния (если используется)        
        self.model_name = model_name                        # Имя модели для сохранения результатов        
        self.mode = mode                                    # Режим работы (например, 'train' или 'test')        
        self.iteration = iteration                          # Итерация обучения/тестирования        
        self.state = self._initiate_state()                 # Инициализируем состояние среды с помощью вспомогательного метода
        self.reward = 0                                     # Инициализация переменных для расчета вознаграждения
        self.turbulence = 0                                 # , турбулентности
        self.cost = 0                                       # , стоимости сделок
        self.trades = 0                                     # , количества сделок.
        self.episode = 0                                    # Счетчик эпизодов        
        self.asset_memory = [self.initial_amount]           # Список для хранения истории изменения баланса (активов)        
        self.rewards_memory = []                            # Список для хранения истории вознаграждений        
        self.actions_memory = []                            # Список для хранения истории действий
        self.reward_buffer = []                             # буфер для хранения недавних мгновенных вознаграждений        
        self.date_memory = [self._get_date()]               # Список для хранения дат (начинается с даты первого шага)        
        self._seed()                                        # Инициализация генератора случайных чисел
        self.reward_window = reward_window                  # количество шагов для скользящей средней вознаграждения
        self.reward_buffer = []                             # буфер для хранения недавних мгновенных вознаграждений
        self.actions_dead_zone_threshold = actions_dead_zone_threshold                              # порог активности, ниже которого действия будут игнорироваться.
        self.bonus_buy_reward = bonus_buy_reward
        self.bonus_sell_reward = bonus_sell_reward
        self.action_space = spaces.Box(low=-1, high=1, shape=(self.action_space,))                  # Определяем пространство действий как Box (все действия от -1 до 1)        
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(self.state_space,))    # Определяем пространство наблюдений (состояний) как Box с неограниченными значениями        
        # self.enable_trade_penalty = enable_trade_penalty    # Флаг включения штрафа за частые операции
        # self.trade_penalty_window = trade_penalty_window    # Количество шагов для расчёта штрафа
        self.trades_history = []                            # Буфер для количества сделок за каждый шаг
        self.last_trades_count = 0                          # Запоминаем, сколько сделок было до текущего шага
        self.rsi_on = rsi_on
        self.cumulative_reward_list = []                   # список для хранения накопленных вознаграждений за каждый эпизод
        self.random_n = random.randint(1000000, 9999999)   # случайное число для уникальности
        self.ema_alpha = ema_alpha                    # коэффициент сглаживания для EMA (необязательно)


    def render(self, mode="human", close=False):        
        return self.state
    

    def _seed(self, seed=None):       
        self.np_random, seed = seeding.np_random(seed)
        return [seed]
    
    
    def get_sb_env(self):        
        env = DummyVecEnv([lambda: self])
        obs = env.reset()
        return env, obs
    

    def _get_date(self):   
        return self.data['date']
    

    def save_asset_memory(self):  # Сохраняем историю баланса (активов) в DataFrame 
        return pd.DataFrame({"date": self.date_memory, "account_value": self.asset_memory})
    

    def save_action_memory(self): # Сохраняем историю действий в DataFrame
        date_list = self.date_memory[:-1]
        action_list = self.actions_memory
        df_actions = pd.DataFrame({"date": date_list, "actions": action_list})
        return df_actions
    
    def _initiate_state(self):
        if self.initial:
            state = (
                [self.initial_amount]  # Начальный капитал
                + [self.data['close']]    # Текущая цена актива 
                + [0]*self.stock_dim   # Список из нулей, длина которого равна количеству акций (или размерности актива) - На старте, если начальное состояние, агент не владеет никакими акциями, поэтому заполняется нулями.
                + sum([[self.data[tech]] for tech in self.tech_indicator_list], [])  # Значения всех технических индикаторов.
                )           
        else:
            state = (
                [self.previous_state[0]]    # Баланс, который агент имел на прошлом шаге
                + [self.data['close']]         # Текущая цена актива 
                + self.previous_state[ (self.stock_dim + 1) : (self.stock_dim * 1 + 1) ]  # Срез, который должен содержать данные о количестве акций агента - текущая позиция.
                + sum([[self.data[tech]] for tech in self.tech_indicator_list], [])       # Значения всех технических индикаторов.
                )
            
        return state
    
    
    def reset(self, seed=None, **kwargs):
        if seed:
            self._seed(seed)

        self.state = self._initiate_state()

        if self.initial:
            self.asset_memory = [self.initial_amount]               # История активов начинается с начального капитала
        else:
            previous_total_asset = (
                self.previous_state[0]                              # Баланс средств (деньги) агента на момент предыдущего шага
                + sum(
                    np.array(self.state[1 : (self.stock_dim + 1)])  # Текущие цены активов (например, закрытия) для каждого из активов
                    * np.array(self.previous_state[(self.stock_dim + 1) : (self.stock_dim * 2 + 1)])  # Данные о количестве акций, которыми владеет агент для каждого актива
                    ) # * Перемножаются - вычисляется рыночная стоимость активов (цена × количество).
                )     # sum суммирует стоимость активов с балансом портфеля
            self.asset_memory = [previous_total_asset] # Cохраняет начальное значение баланса данного эпизода (наличные средства + стоимость активов).

        # Сбрасываем счетчик таймфрейма, данные, турбулентность, затраты и сделки
        self.timeframe = 0
        self.data = self.df.loc[self.timeframe, :]
        self.turbulence = 0
        self.cost = 0
        self.trades = 0
        self.terminal = False
        # Сбрасываем историю вознаграждений, действий и дат
        self.rewards_memory = []
        self.actions_memory = []
        self.date_memory = [self._get_date()]

        # Увеличиваем счетчик эпизодов
        self.episode += 1

        return self.state, {}
    
    def _update_state(self):
        state = (
            [self.state[0]]                        # Баланс агента
            + [self.data['close']]                 # Оборачиваем цену закрытия в список
            + list(self.state[(self.stock_dim + 1):(self.stock_dim * 2 + 1)])  # Позиции по активам
            + sum([[self.data[tech]] for tech in self.tech_indicator_list], [])  # Технические индикаторы
        )
        return state
    

    def _buy_stock(self, index, action):

        available_amount = self.state[0] // self.state[index + 1]                       # Доступные средства для покупки акций (баланс / цена актива)
        buy_num_shares = min(available_amount, action)                                  # Количество акций, которые можно купить (доступные средства / цена актива)
        buy_amount = self.state[index + 1] * buy_num_shares * (1 + self.buy_cost_pct)   # Считаем стоимость покупки с учетом комиссии

        if buy_num_shares > 0:
            self.state[0] -= buy_amount                        # Уменьшаем баланс на сумму покупки
            self.state[index + self.stock_dim + 1] += buy_num_shares  # Увеличиваем количество акций в портфеле
            self.cost += self.state[index + 1] * buy_num_shares * self.buy_cost_pct  # Увеличиваем стоимость сделки на сумму комиссии
            self.state[0] -= self.cost                        # Уменьшаем баланс на сумму комиссии
            self.trades += 1

            return buy_num_shares
        
    def _sell_stock(self, index, action):

        # self.state[index + self.stock_dim + 1] = 100 #### !!!!!!!!!!!!! УДАЛИТЬ !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

        if self.state[index + self.stock_dim + 1] > 0:
            sell_num_shares = min(abs(action), self.state[index + self.stock_dim + 1])
            sell_amount = (self.state[index + 1] * sell_num_shares * (1 - self.sell_cost_pct))
            self.state[0] += sell_amount
            self.state[index + self.stock_dim + 1] -= sell_num_shares
            self.cost += (self.state[index + 1] * sell_num_shares * self.sell_cost_pct)
            self.state[0] -= self.cost

        else:
            sell_num_shares = 0

        return sell_num_shares
    

    def step(self, actions=np.array([0.8])):
        self.terminal = self.timeframe >= len(self.df.index.unique()) - 1

        if not self.terminal:
            current_price = self.data['close']                                                      # Текущая цена актива (например, закрытия)  : np.float64(124.8)
            current_pos = np.array( self.state[(self.stock_dim + 1) : (self.stock_dim * 2 + 1)] ) # Текущая позиция (количество акций) агента : array([0])

            actions = np.where(np.abs(actions) < self.actions_dead_zone_threshold, 0, actions)      # Игнорируем действия, которые меньше порога активности

            if current_pos == 0:
                if actions > 0:
                    actions = actions * self.hmax
                    actions = np.ceil(actions).astype(int)
                else:
                    actions = np.array([0])
            else:
                if actions < 0:
                    actions = np.array([current_pos]) * (-1)
                    actions = actions[0]
                else:
                    actions = np.array([0])

            begin_total_asset = self.state[0] + sum(current_price * current_pos)

            if actions[0] > 0:
                actions = self._buy_stock(0, actions[0])
            elif actions[0] < 0:
                actions = self._sell_stock(0, actions[0])
            else:
                pass

            self.actions_memory.append(actions)  # Сохраняем историю действий

            self.timeframe += 1 # Увеличиваем счетчик таймфрейма
            self.data = self.df.loc[self.timeframe, :] # Получаем данные для следующего таймфрейма

            self.state = self._update_state()

            end_total_asset = (
                self.state[0]
                + sum(
                    np.array( self.state[1 : (self.stock_dim + 1)] )
                    * np.array( self.state[(self.stock_dim + 1) : (self.stock_dim * 2 + 1)] )
                    )
                ) # Считаем итоговую стоимость активов (баланс + стоимость активов)
            
            self.asset_memory.append(end_total_asset) # Сохраняем историю активов (баланса)
            self.date_memory.append(self._get_date())

            # self.reward = end_total_asset - begin_total_asset
            # self.reward = self.reward * self.reward_scaling
            
            immediate_reward = end_total_asset - begin_total_asset                          # Награда за шаг (изменение стоимости активов)
            alpha = self.ema_alpha                                                          # коэффициент сглаживания    
            if not hasattr(self, 'ema_reward'):
                self.ema_reward = immediate_reward                                          # Инициализация EMA награды при первом шаге
            else:
                self.ema_reward = alpha * immediate_reward + (1 - alpha) * self.ema_reward  # EMA награда (экспоненциальное сглаживание)            
            self.reward = self.ema_reward * self.reward_scaling                             # Награда с учетом EMA и масштабирования

            self.rewards_memory.append(self.reward)

            return self.state, self.reward, self.terminal, False, {}
        
        else:
            



    
env = StockTradingEnv(df, stock_dim=1, hmax=100, initial_amount=100000, buy_cost_pct=0.001, sell_cost_pct=0.001, reward_scaling=1e2, state_space=10, action_space=1, tech_indicator_list=['feature_rsi', 'feature_rsi_ema', 'feature_upper_chanel', 'feature_lower_chanel'], turbulence_threshold=None, risk_indicator_col="turbulence", make_plots=True, print_verbosity=10)

env.step()


([np.float64(89996.032),
  np.float64(124.74),
  np.int64(80),
  np.float64(62.62666225406072),
  np.float64(44.8007516931252),
  np.float64(131.5),
  np.float64(122.5)],
 np.float64(-2476.799999999639),
 False,
 False,
 {})

In [None]:
"""
    Пример self.state

[np.float64(89996.032),           # Баланс агента
 np.float64(124.8),               # Цена актива (например, закрытия)
 np.int64(80),                    # Количество акций, которыми владеет агент
 np.float64(65.13769428846126),   # Значение RSI
 np.float64(42.66971991213428),   # Значение EMA RSI
 np.float64(131.5),               # Верхняя граница канала
 np.float64(122.5)]               # Нижняя граница канала
 
 """

In [None]:
np.

In [81]:
p = np.ndarray([5])
m = np.ndarray([-5])

p.where[: np.where(p < 0)[0].shape[0]]

ValueError: negative dimensions are not allowed

In [5]:
df.head(1)  # Проверяем, что данные загружены корректно

Unnamed: 0,date,open,high,low,close,volume,feature_upper_chanel,feature_lower_chanel,feature_rsi,feature_rsi_ema,tic
0,2022-05-05 18:00:00,124.3,124.8,124.25,124.8,2989720,131.5,122.5,65.137694,42.66972,SBER
