In [None]:
import pandas as pd

# Tickers that carry a `name` label (read back by `make_env` when it builds an
# environment). The label is a plain Python attribute set on the frame object.
df_shares_INTC, df_shares_IBM, df_shares_NVDA = map(
    pd.read_csv, ("INTC.csv", "IBM.csv", "NVDA.csv")
)
df_shares_INTC.name, df_shares_IBM.name, df_shares_NVDA.name = "intel", "ibm", "nvidia"

# Remaining tickers are loaded without a label.
df_shares_AMD, df_shares_META, df_shares_GOOGLE, df_shares_CISCO = map(
    pd.read_csv, ("AMD.csv", "META.csv", "alphabet.csv", "CSCO.csv")
)

In [None]:
# Install the trading environment and the RL library into the kernel's environment.
# NOTE(review): neither package is version-pinned, so results may differ between runs.
%pip install gym-anytrading
%pip install stable-baselines3

# Download prebuilt conda-forge archives and unpack them directly into system paths:
# first the native TA-Lib library (libta-lib 0.4.0, linux-64) ...
url = 'https://anaconda.org/conda-forge/libta-lib/0.4.0/download/linux-64/libta-lib-0.4.0-h166bdaf_1.tar.bz2'
!curl -L $url | tar xj -C /usr/lib/x86_64-linux-gnu/ lib --strip-components=1
# ... then the Python bindings (ta-lib 0.4.19, built for Python 3.10).
# NOTE(review): the target directories are hard-coded for a Python 3.10 dist-packages
# layout — presumably Colab; confirm before running on another machine.
url = 'https://anaconda.org/conda-forge/ta-lib/0.4.19/download/linux-64/ta-lib-0.4.19-py310hde88566_4.tar.bz2'
!curl -L $url | tar xj -C /usr/local/lib/python3.10/dist-packages/ lib/python3.10/site-packages/talib --strip-components=3
import talib

import gymnasium as gym

from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env

In [None]:
import math
import numpy as np
import matplotlib.pyplot as plt

from gym_anytrading.envs import StocksEnv, Actions, Positions

def reshape_obs(state, action, actual_ws):
    """Build the flat observation vector handed to the policy.

    Args:
        state: 2-D array; only its first column is read and treated as the
            price series (oldest first, most recent last).
            NOTE(review): presumably it must hold at least 200 rows for the
            200-period SMA to be defined — confirm against the caller.
        action: Last action taken; appended to the observation unchanged.
        actual_ws: Number of most recent prices kept in the output.

    Returns:
        1-D ``float32`` array of length ``actual_ws + 3``: the z-scored price
        window, then ``action``, the 9-period RSI divided by 100, and the
        relative distance of the last price from its 200-period SMA.
    """
    # Convert once to float64 (the dtype the indicator calls were fed originally).
    prices = np.array(state[:, 0], dtype=np.float64)
    ws = len(prices)

    rsi_timeperiod = 9
    # RSI is computed on the last `rsi_timeperiod + 1` prices only.
    rsi_9 = talib.RSI(prices[ws - rsi_timeperiod - 1:ws], timeperiod=rsi_timeperiod)[-1] / 100

    sma_200d = prices[-1] / talib.SMA(prices, timeperiod=200)[-1] - 1

    window = prices[ws - actual_ws:ws]
    spread = np.std(window)
    if spread == 0:
        # A perfectly flat window has no dispersion; emit zeros instead of 0/0 -> NaN.
        window = np.zeros_like(window)
    else:
        window = (window - np.mean(window)) / spread

    # np.ravel accepts both plain ints and 0-d arrays for `action`.
    features = np.concatenate((window, np.ravel(action), [rsi_9, sma_200d]))
    # Match the float32 dtype declared by the environment's observation space.
    return features.astype(np.float32)

class MyStocksEnv(StocksEnv):
    """``StocksEnv`` variant with an engineered observation and sell-only rewards.

    Every observation is the flat vector built by ``reshape_obs``: ``actual_ws``
    z-scored prices followed by three extra scalars. A reward is paid only when
    a long position is sold, or is still open on the final tick of the episode.
    """

    def __init__(self, actual_ws, **kwargs):
        """Initialise the environment.

        Args:
            actual_ws: Number of price points kept in each observation.
            **kwargs: Passed through unchanged to ``StocksEnv.__init__``.
        """
        super().__init__(**kwargs)

        self.actual_ws = actual_ws
        # Price window plus the three scalars appended by reshape_obs.
        self.shape = (self.actual_ws + 3, )
        INF = 1e10
        self.observation_space = gym.spaces.Box(
            low=-INF, high=INF, shape=self.shape, dtype=np.float32,
        )

        # Bookkeeping reported by get_stats().
        self.last_action = 0
        self.number_of_trades = 0
        self.days_holding_position = 1
        self.history_days_holding_position = []

        self._total_reward = 0.
        # Fees are zeroed out after the parent constructor has run.
        self.trade_fee_bid_percent = 0
        self.trade_fee_ask_percent = 0


    def is_trade(self, action):
        """Return True when ``action`` is a Sell issued while the position is Long."""
        return bool(action == Actions.Sell.value and self._position == Positions.Long)


    def _calculate_reward(self, action):
        """Return the relative price change since the last trade, or 0.

        A non-zero reward is produced only when a long position is sold or is
        still open on the final tick; each such event also increments
        ``number_of_trades``.
        """
        step_reward = 0

        if self.is_trade(action) or (self._truncated and self._position == Positions.Long):
            self.number_of_trades += 1
            current_price = self.prices[self._current_tick]
            last_trade_price = self.prices[self._last_trade_tick]
            step_reward = (current_price / last_trade_price) - 1

        return step_reward


    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``.

        All per-episode counters are cleared, the position starts as Short and
        the first observation is built with action ``0``.
        """
        super().reset(seed=seed, options=options)
        self.action_space.seed(int((self.np_random.uniform(0, seed if seed is not None else 1))))

        # Clear the statistics, including last_action so that holding-period
        # counts from a previous episode cannot leak into this one.
        self.last_action = 0
        self.number_of_trades = 0
        self.days_holding_position = 1
        self.history_days_holding_position = []

        self._truncated = False
        self._current_tick = self._start_tick
        self._last_trade_tick = self._current_tick - 1
        self._position = Positions.Short
        self._position_history = (self.window_size * [None]) + [self._position]
        self._total_reward = 0.
        self._total_profit = 1.  # unit
        self._first_rendering = True
        self.history = {}

        observation = self._get_observation()
        observation = reshape_obs(observation, 0, self.actual_ws)
        info = self._get_info()

        if self.render_mode == 'human':
            self._render_frame()

        return observation, info


    def step(self, action):
        """Advance one tick.

        Returns:
            ``(observation, step_reward, terminated, truncated, info)``. Both
            flags carry the same value: True once the last tick is reached.
        """
        self._truncated = False
        self._current_tick += 1

        if self._current_tick == self._end_tick:
            self.history_days_holding_position.append(self.days_holding_position)
            self._truncated = True

        # Reward and profit are computed before the position is flipped below.
        step_reward = self._calculate_reward(action)
        self._total_reward += step_reward

        self._update_profit(action)

        switches_position = (
            (action == Actions.Buy.value and self._position == Positions.Short) or
            (action == Actions.Sell.value and self._position == Positions.Long)
        )
        if switches_position:
            self._position = self._position.opposite()
            self._last_trade_tick = self._current_tick

        self._position_history.append(self._position)
        observation = self._get_observation()
        observation = reshape_obs(observation, action, self.actual_ws)
        info = self._get_info()
        self._update_history(info)

        # Holding periods are tracked per repeated *action*, not per position.
        if self.last_action == action:
            self.days_holding_position += 1
        else:
            self.history_days_holding_position.append(self.days_holding_position)
            self.days_holding_position = 1
            self.last_action = action

        if self.render_mode == 'human':
            self._render_frame()

        return observation, step_reward, self._truncated, self._truncated, info

    def get_stats(self):
        """Return ``(number_of_trades, mean holding period, holding-period list)``."""
        return self.number_of_trades, np.mean(self.history_days_holding_position), self.history_days_holding_position

    def get_trend(self):
        """Return ``"decreasing"`` if the price at index ``window_size`` exceeds the last price, else ``"rising"``."""
        if self.prices[self.window_size] - self.prices[-1] > 0:
            return "decreasing"
        return "rising"

    def render_all(self, max_days, title=None):
        """Plot the price series and mark every change of position.

        Args:
            max_days: Episode length; only used to scale the figure width.
            title: A string is used as the plot title verbatim; any other
                truthy value selects the default "Stocks from day N" title;
                a falsy value omits the title.
        """
        figsize_x = math.ceil((max_days / 100) * 3)
        plt.figure(figsize=(figsize_x, 6))
        plt.plot(self.prices)

        # Collect only the ticks where the position differs from the previous one.
        short_ticks = []
        long_ticks = []
        last_pos = None
        for tick, current_pos in enumerate(self._position_history):
            if current_pos == last_pos:
                continue

            if current_pos == Positions.Short:
                short_ticks.append(tick)
            elif current_pos == Positions.Long:
                long_ticks.append(tick)

            last_pos = current_pos

        plt.plot(short_ticks, self.prices[short_ticks], 'ro', label="Sprzedaj")
        plt.plot(long_ticks, self.prices[long_ticks], 'go', label="Kup")

        plt.legend()

        if title:
            plt.title(title if isinstance(title, str) else f"Stocks from day {self.frame_bound[0]}")

        plt.suptitle(
            "Total Reward: %.6f" % self._total_reward + ' ~ ' +
            "Total Profit: %.6f" % self._total_profit
        )
        plt.show()

In [None]:
def make_my_env(dataframe, window_size, start, period, actual_ws):
    """Create a ``MyStocksEnv`` over ``period`` ticks of ``dataframe`` starting at ``start``.

    Args:
        dataframe: Price data passed to the environment as ``df``.
        window_size: Number of past ticks available to every observation.
        start: Index of the first tick of the episode.
        period: Episode length; the frame bound is ``(start, start + period + 1)``.
        actual_ws: Number of prices kept in each observation.

    Raises:
        ValueError: If ``start`` leaves fewer than ``window_size`` ticks of
            history, or if ``actual_ws`` exceeds ``window_size``.
    """
    if start < window_size:
        # A smaller start would make the history slice begin at a negative index.
        raise ValueError(
            f"start ({start}) must be at least window_size ({window_size})"
        )
    if actual_ws > window_size:
        # The observation cannot contain more prices than the window provides.
        raise ValueError(
            f"actual_ws ({actual_ws}) must not exceed window_size ({window_size})"
        )

    return MyStocksEnv(
        actual_ws,
        df=dataframe,
        window_size=window_size,
        frame_bound=(start, start + period + 1),
    )

def make_env():
    """Zero-argument environment factory for ``make_vec_env``.

    Reads the notebook globals ``df_shares``, ``window_size``, ``start``,
    ``period`` and ``actual_ws`` at call time and prints the frame's label.
    """
    # Not every frame is given a `name` attribute, so fall back to a placeholder
    # instead of raising AttributeError.
    print(getattr(df_shares, "name", "<unnamed>"))
    return make_my_env(df_shares, window_size, start, period, actual_ws)

# Test RSI 9 dni oraz SMA 200 dni

In [None]:
# Environment configuration shared by training and evaluation.
window_size = 200
actual_ws = 100
start = 200
period = len(df_shares_INTC)

# Stage 1: train a fresh policy on INTC.
# `make_env` reads `df_shares` / `period` from the notebook globals when the
# vectorised environment is built, so they must be set before `make_vec_env`.
df_shares = df_shares_INTC
vec_env = make_vec_env(make_env, n_envs=4,)
model = PPO("MlpPolicy", vec_env, verbose=1,)
model.learn(total_timesteps=250000)

# Stage 2: continue training the SAME policy on IBM. Building a new PPO here
# would throw away everything learned in stage 1.
period = len(df_shares_IBM)
df_shares = df_shares_IBM
vec_env = make_vec_env(make_env, n_envs=4,)
model.set_env(vec_env)
model.learn(total_timesteps=250000, reset_num_timesteps=False)

model.save("ppo_z_score_index_4")

In [None]:
# Tickers used for evaluation, keyed by the label printed for each run.
shares_storage = dict(
    amd=df_shares_AMD,
    meta=df_shares_META,
    google=df_shares_GOOGLE,
    cisco=df_shares_CISCO,
)

# Accumulators for the evaluation results. Each trend bucket gets its own
# independent counters and list.
stats = {
    "profits": [],
    "num_of_trades": [],
    "avg_days_holding_positions": [],
    "trends": {
        trend_name: {"counter": 0, "num_of_profits": 0, "profit_values": []}
        for trend_name in ("decreasing", "rising")
    },
}


eval_period = 365  # ticks per evaluation episode; also scales the plot width

for key, t_shares in shares_storage.items():
    print(f"--------------------------{key}--------------------------")
    # Slide the evaluation window over the series in steps of 100 ticks. The
    # first start index must leave `window_size` ticks of history before it.
    for i in range(window_size, len(t_shares) - eval_period, 100):
        env = make_my_env(t_shares, window_size, i, eval_period, actual_ws)
        obs, info = env.reset()
        done = False

        while not done:
            action, _states = model.predict(obs, deterministic=True)
            # step() returns (obs, reward, terminated, truncated, info);
            # either flag ends the episode.
            obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
        env.render_all(eval_period, True)

        trend = env.get_trend()
        num_of_trades, avg_days_holding_positions, _ = env.get_stats()
        profit = info["total_profit"]

        stats['profits'].append(profit)
        stats["num_of_trades"].append(num_of_trades)
        stats["avg_days_holding_positions"].append(avg_days_holding_positions)

        trend_bucket = stats["trends"][trend]
        trend_bucket["counter"] += 1
        # A final profit factor of exactly 1.0 (break-even) counts as profitable.
        if profit >= 1.0:
            trend_bucket["num_of_profits"] += 1
        trend_bucket["profit_values"].append(profit)

print("--------------------------STATS--------------------------")
if not stats['profits']:
    # np.min / np.max raise on an empty sequence, so report the situation instead.
    print("No evaluation episodes were run.")
else:
    print(f"Median profit: {np.median(stats['profits'])}")
    # Profits are stored as multiplicative factors; (factor - 1) * 100 is the percentage.
    print(f"Lowest profit achived: {(np.min(stats['profits']) - 1) * 100}")
    print(f"Highest profit achived: {(np.max(stats['profits']) - 1) * 100}")
    print(f"Average num_of_trades: {np.mean(stats['num_of_trades'])}")
    print(f"Average avg_days_holding_positions: {np.mean(stats['avg_days_holding_positions'])}")

    for trend_name in ("decreasing", "rising"):
        bucket = stats['trends'][trend_name]
        wins, total = bucket['num_of_profits'], bucket['counter']
        # A trend that never occurred has no win rate or median: show NaN
        # rather than dividing by zero.
        win_rate = wins / total * 100 if total else float("nan")
        median_profit = np.median(bucket['profit_values']) if total else float("nan")
        print(f"Profits achvied on {trend_name} trend: {wins} / {total}, {win_rate}%")
        print(f"Median profit achvied on {trend_name} trend: {median_profit}, {(median_profit - 1) * 100}%")


# **Dodanie do stanu SMA 100 dni oraz SMA 50 dni**

In [None]:
import math
import numpy as np
import matplotlib.pyplot as plt

from gym_anytrading.envs import StocksEnv, Actions, Positions

def reshape_obs(state, action, actual_ws):
    """Build the flat observation vector handed to the policy.

    Args:
        state: 2-D array; only its first column is read and treated as the
            price series (oldest first, most recent last).
            NOTE(review): presumably it must hold at least 200 rows for the
            200-period SMA to be defined — confirm against the caller.
        action: Last action taken; appended to the observation unchanged.
        actual_ws: Number of most recent prices kept in the output.

    Returns:
        1-D ``float32`` array of length ``actual_ws + 5``: the z-scored price
        window, then ``action``, the 9-period RSI divided by 100, and the
        relative distance of the last price from its 200-, 100- and 50-period
        SMA (in that order).
    """
    # Convert once to float64 (the dtype the indicator calls were fed originally).
    prices = np.array(state[:, 0], dtype=np.float64)
    ws = len(prices)

    rsi_timeperiod = 9
    # RSI is computed on the last `rsi_timeperiod + 1` prices only.
    rsi_9 = talib.RSI(prices[ws - rsi_timeperiod - 1:ws], timeperiod=rsi_timeperiod)[-1] / 100

    # Relative distance of the latest price from each moving average.
    sma_ratios = [
        prices[-1] / talib.SMA(prices, timeperiod=sma_period)[-1] - 1
        for sma_period in (200, 100, 50)
    ]

    window = prices[ws - actual_ws:ws]
    spread = np.std(window)
    if spread == 0:
        # A perfectly flat window has no dispersion; emit zeros instead of 0/0 -> NaN.
        window = np.zeros_like(window)
    else:
        window = (window - np.mean(window)) / spread

    # np.ravel accepts both plain ints and 0-d arrays for `action`.
    features = np.concatenate((window, np.ravel(action), [rsi_9], sma_ratios))
    # Match the float32 dtype declared by the environment's observation space.
    return features.astype(np.float32)

class MyStocksEnv(StocksEnv):
    """``StocksEnv`` variant with an engineered observation and sell-only rewards.

    Every observation is the flat vector built by ``reshape_obs``: ``actual_ws``
    z-scored prices followed by five extra scalars. A reward is paid only when
    a long position is sold, or is still open on the final tick of the episode.
    """

    def __init__(self, actual_ws, **kwargs):
        """Initialise the environment.

        Args:
            actual_ws: Number of price points kept in each observation.
            **kwargs: Passed through unchanged to ``StocksEnv.__init__``.
        """
        super().__init__(**kwargs)

        self.actual_ws = actual_ws
        # Price window plus the five scalars appended by reshape_obs.
        self.shape = (self.actual_ws + 5, )
        INF = 1e10
        self.observation_space = gym.spaces.Box(
            low=-INF, high=INF, shape=self.shape, dtype=np.float32,
        )

        # Bookkeeping reported by get_stats().
        self.last_action = 0
        self.number_of_trades = 0
        self.days_holding_position = 1
        self.history_days_holding_position = []

        self._total_reward = 0.
        # Fees are zeroed out after the parent constructor has run.
        self.trade_fee_bid_percent = 0
        self.trade_fee_ask_percent = 0


    def is_trade(self, action):
        """Return True when ``action`` is a Sell issued while the position is Long."""
        return bool(action == Actions.Sell.value and self._position == Positions.Long)


    def _calculate_reward(self, action):
        """Return the relative price change since the last trade, or 0.

        A non-zero reward is produced only when a long position is sold or is
        still open on the final tick; each such event also increments
        ``number_of_trades``.
        """
        step_reward = 0

        if self.is_trade(action) or (self._truncated and self._position == Positions.Long):
            self.number_of_trades += 1
            current_price = self.prices[self._current_tick]
            last_trade_price = self.prices[self._last_trade_tick]
            step_reward = (current_price / last_trade_price) - 1

        return step_reward


    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``.

        All per-episode counters are cleared, the position starts as Short and
        the first observation is built with action ``0``.
        """
        super().reset(seed=seed, options=options)
        self.action_space.seed(int((self.np_random.uniform(0, seed if seed is not None else 1))))

        # Clear the statistics, including last_action so that holding-period
        # counts from a previous episode cannot leak into this one.
        self.last_action = 0
        self.number_of_trades = 0
        self.days_holding_position = 1
        self.history_days_holding_position = []

        self._truncated = False
        self._current_tick = self._start_tick
        self._last_trade_tick = self._current_tick - 1
        self._position = Positions.Short
        self._position_history = (self.window_size * [None]) + [self._position]
        self._total_reward = 0.
        self._total_profit = 1.  # unit
        self._first_rendering = True
        self.history = {}

        observation = self._get_observation()
        observation = reshape_obs(observation, 0, self.actual_ws)
        info = self._get_info()

        if self.render_mode == 'human':
            self._render_frame()

        return observation, info


    def step(self, action):
        """Advance one tick.

        Returns:
            ``(observation, step_reward, terminated, truncated, info)``. Both
            flags carry the same value: True once the last tick is reached.
        """
        self._truncated = False
        self._current_tick += 1

        if self._current_tick == self._end_tick:
            self.history_days_holding_position.append(self.days_holding_position)
            self._truncated = True

        # Reward and profit are computed before the position is flipped below.
        step_reward = self._calculate_reward(action)
        self._total_reward += step_reward

        self._update_profit(action)

        switches_position = (
            (action == Actions.Buy.value and self._position == Positions.Short) or
            (action == Actions.Sell.value and self._position == Positions.Long)
        )
        if switches_position:
            self._position = self._position.opposite()
            self._last_trade_tick = self._current_tick

        self._position_history.append(self._position)
        observation = self._get_observation()
        observation = reshape_obs(observation, action, self.actual_ws)
        info = self._get_info()
        self._update_history(info)

        # Holding periods are tracked per repeated *action*, not per position.
        if self.last_action == action:
            self.days_holding_position += 1
        else:
            self.history_days_holding_position.append(self.days_holding_position)
            self.days_holding_position = 1
            self.last_action = action

        if self.render_mode == 'human':
            self._render_frame()

        return observation, step_reward, self._truncated, self._truncated, info

    def get_stats(self):
        """Return ``(number_of_trades, mean holding period, holding-period list)``."""
        return self.number_of_trades, np.mean(self.history_days_holding_position), self.history_days_holding_position

    def get_trend(self):
        """Return ``"decreasing"`` if the price at index ``window_size`` exceeds the last price, else ``"rising"``."""
        if self.prices[self.window_size] - self.prices[-1] > 0:
            return "decreasing"
        return "rising"

    def render_all(self, max_days, title=None):
        """Plot the price series and mark every change of position.

        Args:
            max_days: Episode length; only used to scale the figure width.
            title: A string is used as the plot title verbatim; any other
                truthy value selects the default "Stocks from day N" title;
                a falsy value omits the title.
        """
        figsize_x = math.ceil((max_days / 100) * 3)
        plt.figure(figsize=(figsize_x, 6))
        plt.plot(self.prices)

        # Collect only the ticks where the position differs from the previous one.
        short_ticks = []
        long_ticks = []
        last_pos = None
        for tick, current_pos in enumerate(self._position_history):
            if current_pos == last_pos:
                continue

            if current_pos == Positions.Short:
                short_ticks.append(tick)
            elif current_pos == Positions.Long:
                long_ticks.append(tick)

            last_pos = current_pos

        plt.plot(short_ticks, self.prices[short_ticks], 'ro', label="Sprzedaj")
        plt.plot(long_ticks, self.prices[long_ticks], 'go', label="Kup")

        plt.legend()

        if title:
            plt.title(title if isinstance(title, str) else f"Stocks from day {self.frame_bound[0]}")

        plt.suptitle(
            "Total Reward: %.6f" % self._total_reward + ' ~ ' +
            "Total Profit: %.6f" % self._total_profit
        )
        plt.show()

# Test RSI 9 dni oraz SMA 200, 100 oraz 50 dni

In [None]:
# Environment configuration shared by training and evaluation.
window_size = 200
actual_ws = 100
start = 200
period = len(df_shares_INTC)

# Stage 1: train a fresh policy on INTC.
# `make_env` reads `df_shares` / `period` from the notebook globals when the
# vectorised environment is built, so they must be set before `make_vec_env`.
df_shares = df_shares_INTC
vec_env = make_vec_env(make_env, n_envs=4,)
model = PPO("MlpPolicy", vec_env, verbose=1,)
model.learn(total_timesteps=250000)

# Stage 2: continue training the SAME policy on IBM. Building a new PPO here
# would throw away everything learned in stage 1.
period = len(df_shares_IBM)
df_shares = df_shares_IBM
vec_env = make_vec_env(make_env, n_envs=4,)
model.set_env(vec_env)
model.learn(total_timesteps=250000, reset_num_timesteps=False)

model.save("ppo_z_score_index_8")

In [None]:
# Tickers used for evaluation, keyed by the label printed for each run.
shares_storage = dict(
    amd=df_shares_AMD,
    meta=df_shares_META,
    google=df_shares_GOOGLE,
    cisco=df_shares_CISCO,
)

# Accumulators for the evaluation results. Each trend bucket gets its own
# independent counters and list.
stats = {
    "profits": [],
    "num_of_trades": [],
    "avg_days_holding_positions": [],
    "trends": {
        trend_name: {"counter": 0, "num_of_profits": 0, "profit_values": []}
        for trend_name in ("decreasing", "rising")
    },
}


eval_period = 365  # ticks per evaluation episode; also scales the plot width

for key, t_shares in shares_storage.items():
    print(f"--------------------------{key}--------------------------")
    # Slide the evaluation window over the series in steps of 100 ticks. The
    # first start index must leave `window_size` ticks of history before it.
    for i in range(window_size, len(t_shares) - eval_period, 100):
        env = make_my_env(t_shares, window_size, i, eval_period, actual_ws)
        obs, info = env.reset()
        done = False

        while not done:
            action, _states = model.predict(obs, deterministic=True)
            # step() returns (obs, reward, terminated, truncated, info);
            # either flag ends the episode.
            obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
        env.render_all(eval_period, True)

        trend = env.get_trend()
        num_of_trades, avg_days_holding_positions, _ = env.get_stats()
        profit = info["total_profit"]

        stats['profits'].append(profit)
        stats["num_of_trades"].append(num_of_trades)
        stats["avg_days_holding_positions"].append(avg_days_holding_positions)

        trend_bucket = stats["trends"][trend]
        trend_bucket["counter"] += 1
        # A final profit factor of exactly 1.0 (break-even) counts as profitable.
        if profit >= 1.0:
            trend_bucket["num_of_profits"] += 1
        trend_bucket["profit_values"].append(profit)

print("--------------------------STATS--------------------------")
if not stats['profits']:
    # np.min / np.max raise on an empty sequence, so report the situation instead.
    print("No evaluation episodes were run.")
else:
    print(f"Median profit: {np.median(stats['profits'])}")
    # Profits are stored as multiplicative factors; (factor - 1) * 100 is the percentage.
    print(f"Lowest profit achived: {(np.min(stats['profits']) - 1) * 100}")
    print(f"Highest profit achived: {(np.max(stats['profits']) - 1) * 100}")
    print(f"Average num_of_trades: {np.mean(stats['num_of_trades'])}")
    print(f"Average avg_days_holding_positions: {np.mean(stats['avg_days_holding_positions'])}")

    for trend_name in ("decreasing", "rising"):
        bucket = stats['trends'][trend_name]
        wins, total = bucket['num_of_profits'], bucket['counter']
        # A trend that never occurred has no win rate or median: show NaN
        # rather than dividing by zero.
        win_rate = wins / total * 100 if total else float("nan")
        median_profit = np.median(bucket['profit_values']) if total else float("nan")
        print(f"Profits achvied on {trend_name} trend: {wins} / {total}, {win_rate}%")
        print(f"Median profit achvied on {trend_name} trend: {median_profit}, {(median_profit - 1) * 100}%")
