#CLA 1 - instalacje:

In [None]:
!pip install func-timeout
!pip install PyPortfolioOpt

#CLA 2 - kod:

In [None]:
from google.colab import drive
from pypfopt.cla import CLA
from gym import Env, spaces
import pandas as pd
import numpy as np
from typing import List
import matplotlib.pyplot as plt
from datetime import datetime
import os
from func_timeout import func_timeout
from func_timeout.exceptions import FunctionTimedOut
# Mount Google Drive (Colab only) at /content/gdrive.
# NOTE(review): the relative "gdrive/MyDrive/..." paths used later in this file
# presumably rely on the working directory being /content - confirm.
drive.mount('/content/gdrive')

def read_file(fname, v, rolling_len, offset):
    """Load a window of rows from an Excel sheet and optionally split it.

    Rows ``offset`` to ``offset + rolling_len`` (exclusive) are taken from the
    sheet and the first column is dropped.

    Args:
        fname: Path of the Excel file, passed to ``pandas.read_excel``.
        v: ``"train"`` keeps the first 80 % of the window, ``"test"`` keeps
            the remaining rows; any other value keeps the whole window.
        rolling_len: Number of rows in the window.
        offset: Index of the first row of the window.

    Returns:
        A ``(column_count, values)`` tuple, where ``values`` is the selected
        2-D array and ``column_count`` is its second dimension.
    """
    sheet = pd.read_excel(fname)
    window = sheet.iloc[offset:offset + rolling_len, 1:].values

    # Row index separating the training part from the test part.
    cut = int(len(window) * 8 / 10)

    if v == "train":
        window = window[:cut]
    elif v == "test":
        window = window[cut:]

    return window.shape[1], window

class FinanceEnv(Env):
    """Gym environment that replays the sequences loaded by ``read_file``.

    An observation is a window of ``observation_length`` consecutive rows of
    the loaded data. An action holds one weight per column. The reward of a
    step is the weighted relative change between the last two rows of the
    following window, and it is accumulated in ``total_profit``.
    """

    def __init__(
        self,
        fname,
        v,
        rolling_len,
        offset
    ):
        """Load the data and set up the observation and action spaces.

        Args:
            fname: Path of the Excel file, forwarded to ``read_file``.
            v: Split selector forwarded to ``read_file``.
            rolling_len: Window length forwarded to ``read_file``.
            offset: Start row forwarded to ``read_file``.
        """
        super().__init__()

        self.observation_length = 20
        self.sequence_count, self.raw_sequences = read_file(fname, v, rolling_len, offset)

        self.total_profit = 0
        # Final total_profit of every finished episode (appended in step()).
        self.total_profits = []

        self.curr_step = 0

        first_state = self.read_frame(0)
        # NOTE(review): the declared shape carries an extra leading axis of
        # size 1, while reset()/step() return frames without it - confirm
        # that consumers of observation_space expect this.
        self.observation_space = spaces.Box(
            low=-100, high=100, shape=(np.expand_dims(first_state, axis=0).shape)
        )
        self.action_space = spaces.Box(low=-1, high=1, shape=(self.sequence_count,))

        # Per-step profits of the current episode.
        self.profit_history = []

    def read_frame(self, index):
        """Return ``observation_length`` rows starting at ``index``.

        Returns ``None`` when ``index + observation_length`` is not strictly
        below the index of the last row, i.e. when the data is exhausted.
        """
        if index + self.observation_length < self.raw_sequences.shape[-2] - 1:
            return self.raw_sequences[index: index + self.observation_length, :]
        return None

    def step(self, actions: List[float]):
        """Apply ``actions`` for one step and advance the window by one row.

        Args:
            actions: One weight per column; any sequence or array is accepted
                and flattened to one dimension.

        Returns:
            ``(next_state, reward, done, info)``. ``info`` contains
            ``"curr_step"``, ``"rewards"`` (a one-element list holding the
            reward) and ``"wallet"`` (the accumulated profit). When the data
            is exhausted ``done`` is ``True``, ``reward`` is ``0`` and
            ``next_state`` is an uninitialised object array of the
            observation-space shape.
        """
        # Coerce first so that plain lists (as the annotation promises) work too.
        actions = np.asarray(actions).flatten()

        shift = 1
        prices = self.read_frame(self.curr_step + shift)

        if prices is None:
            # No further frame: close the episode and record its total.
            done = True
            self.total_profits.append(self.total_profit)
            next_state = np.empty(self.observation_space.shape, dtype=object)
            reward = 0
        else:
            done = False
            # Relative change from the second-to-last to the last row of the
            # shifted window.
            previous = prices[-shift - 1, :]
            price_change = (prices[-1, :] - previous) / previous
            profit = float((actions * price_change).sum())
            self.total_profit += profit
            self.profit_history.append(profit)

            reward = profit

            self.curr_step += 1

            next_state = self.read_frame(self.curr_step)

        return (
            next_state,
            reward,
            done,
            {
                "curr_step": self.curr_step,
                "rewards": [reward],
                "wallet": self.total_profit,
            },
        )

    def reset(self):
        """Clear the episode state and return the first observation."""
        self.total_profit = 0
        self.curr_step = 0

        self.profit_history = []

        return self.read_frame(0)

    def render(self, mode="human"):
        """Rendering is not implemented; this is a no-op."""
        pass

# Evaluate CLA max-Sharpe weights against an equal-weight baseline on six
# windows of the data set and store the resulting plots and final wallets.

def _equal_weights(count):
    """Return an equal-weight allocation over ``count`` assets."""
    return np.full((count,), 1 / count)


# Results go to a timestamped directory; makedirs also creates a missing parent.
now = datetime.now().strftime("%d-%m-%Y_%H-%M-%S")
res_dir = "gdrive/MyDrive/PracaMag/results/{}_CLA".format(now)
os.makedirs(res_dir, exist_ok=True)
wallets = []
naive_wallets = []
rolling_len = 500
offset = 0

for i in range(6):
    test_env = FinanceEnv('gdrive/MyDrive/PracaMag/Dane.xlsx', "test", rolling_len, offset)

    obs = test_env.reset()
    wallet = []
    actions = []

    # --- CLA strategy -----------------------------------------------------
    done = False
    while not done:
        # Put assets on the first axis, as np.cov treats rows as variables.
        obs = obs.transpose((1, 0))
        # NOTE(review): mean/cov are taken over the raw observation window,
        # while the environment treats the same values as prices - confirm
        # this is the intended input for CLA.
        mean = np.mean(obs, axis=1)
        cov = np.cov(obs)
        try:
            # Give the optimiser at most 60 seconds per step.
            weights = func_timeout(60, CLA(mean, cov).max_sharpe)
            action = np.array(list(weights.values()))
        except (FunctionTimedOut, Exception):
            # FunctionTimedOut derives from BaseException, so it has to be
            # listed explicitly. Any timeout or optimiser failure falls back
            # to equal weights. (The previous fallback used np.full_like on a
            # tuple, which produced a single integer zero instead.)
            action = _equal_weights(test_env.sequence_count)
        obs, reward, done, info = test_env.step(action)
        wallet += [info["wallet"]]
        actions += [action]

    # --- Equal-weight baseline on the same window -------------------------
    naive_score = 0
    done = False
    obs = test_env.reset()

    while not done:
        action = _equal_weights(test_env.sequence_count)
        obs, reward, done, info = test_env.step(action)
        naive_score = info["wallet"]
        actions += [action]

    naive_wallets.append(naive_score)

    fig = plt.figure()
    plt.title(label="Test wallet through days")
    plt.ylabel("Final wallet")
    plt.xlabel('day of trading')
    plt.plot(wallet)
    fig.savefig("{}/{}_b".format(res_dir, i))
    plt.close()

    wallets.append(wallet[-1])
    # Move the window forward by 200 rows for the next run.
    offset += 200
    test_env = None

# Compare the final wallets of both strategies across all windows.
fig = plt.figure()
plt.title(label="Final wallet")
plt.plot(list(range(1, len(naive_wallets)+1)), wallets, label="cla")
plt.plot(list(range(1, len(naive_wallets)+1)), naive_wallets, label="naive")
plt.legend()
fig.savefig("{}/wallet".format(res_dir))
plt.close()
np.savetxt("{}/walletarr".format(res_dir), wallets)