In [4]:
import numpy as np
import pandas as pd
import gymnasium as gym
import pandas_ta as ta
from tensorly.decomposition import Tucker
import torch
from portfolio_env_with_tcost import AbstractPortfolioEnvWithTCost
from typing import Tuple, Optional
import numpy.typing as npt

In [5]:
class MPTWithTCost(AbstractPortfolioEnvWithTCost):

    def get_obs_space(self) -> gym.spaces.Box:
        self.state_shape = (self.universe_size + 1, 31)
        self.get_indicators()
        return gym.spaces.Box(low=-np.inf, high=np.inf, shape=self.state_shape, dtype=np.float64)
    
    def get_data(self) -> Tuple[int, int]:
        # read SNP data
        df = pd.read_csv('crsp_snp100_2010_to_2024.csv', dtype='string')
    
        # convert datatypes
        df = df[['date', 'TICKER', 'PRC', 'VOL', 'ASKHI', 'BIDLO', 'FACPR']]
        df.date = pd.to_datetime(df.date)
        df.FACPR = df.FACPR.fillna('0.0')
        df.astype({
            'PRC': float,
            'VOL': float,
            'ASKHI': float,
            'BIDLO': float,
            'FACPR': float
        })
    
        # drop duplicates and nans
        df = df.drop_duplicates(subset=['date', 'TICKER'])
        df.dropna(inplace=True)
    
        # only include stocks that are present in all dates
        ticker_ok = df.TICKER.value_counts() == df.TICKER.value_counts().max()

        def is_max_val_count(ticker: str) -> bool:
          return ticker_ok[ticker]
        
        ok = df.apply(lambda row: is_max_val_count(row['TICKER']), axis=1)
        df = df[ok]
        df = df[(df.date.dt.year >= 2010) & (df.date.dt.year <= 2019)]
    
        # create stock array
        self.stock_df = df.pivot(index='date', columns='TICKER', values='PRC').astype(float)
        
        # adjust for stock splits
        facpr_df = df.pivot(index='date', columns='TICKER', values='FACPR').astype(float)
        self.stock_df = self.stock_df * (1+facpr_df).cumprod(axis=0)
        self.ret = np.log(self.stock_df.pct_change().iloc[1:, :] + 1)
        self.times = df.date.unique()[1:]
        self.tickers = df.TICKER.unique()

        return len(self.times)-100-1, len(self.tickers)

    def get_indicators(self):
        self.conv3d = torch.nn.Conv3d(in_channels=4, out_channels=32, kernel_size=(1, 3, 1), padding="same")
        self.relu = torch.nn.ReLU()
        self.tucker = Tucker(rank=self.state_shape, init="random")
        self.m = 28
        self.t = self.m
        self.w1, self.w2, self.w3 = 28, 14, 9
        
        df = (pd.DataFrame(self.stock_df, columns=self.tickers))
        df = df.dropna()
        mp = {ticker: pd.DataFrame(df[ticker]).rename(columns={ticker: "close"}) for ticker in self.tickers}
        # SMA df
        sma = {ticker: pd.DataFrame(mp[ticker].ta.sma(self.w1)).rename(columns={"SMA_28": ticker}) for ticker in self.tickers}
        sma_df = (pd.concat(sma.values(), axis=1))
        # RSI df
        rsi = {ticker: pd.DataFrame(mp[ticker].ta.rsi(self.w2)).rename(columns={"RSI_14": ticker}) for ticker in self.tickers}
        rsi_df = (pd.concat(rsi.values(), axis=1))
        # MACD df
        macd = {ticker: pd.DataFrame(mp[ticker].ta.macd(self.w3, 26, 12)["MACD_9_26_12"]).rename(columns={"MACD_9_26_12": ticker}) for ticker in self.tickers}
        macd_df = (pd.concat(macd.values(), axis=1))

        # Compute F = V @ Corr
        V = np.array([np.array(x.T) for x in [df, sma_df, rsi_df, macd_df]])
        Corr = np.array([np.corrcoef(x) for x in V])
        self.F = np.einsum('aki,akj->akij', V, Corr)

    def get_state(self) -> npt.NDArray[np.float64]:
        f = self.F[:, :, self.t - self.m : self.t, :]
        f = torch.Tensor(np.expand_dims(f, axis=0))
        f = self.relu(self.conv3d(f)).detach().numpy().squeeze(axis=0)
        core, _ = self.tucker.fit_transform(f)
        return core 

    def get_prices(self) -> npt.NDArray[np.float64]:
        return np.append(self.stock_df.loc[self.times[self.t+100], :].to_numpy().flatten(), 1.0)
    

In [6]:
env = MPTWithTCost()

In [19]:
env.F.shape

(4, 84, 2516, 84)