In [16]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import scipy.stats as sps
import warnings
from math import isclose
sns.set()

plt.rc('font', size=30)
plt.rc('axes', titlesize=30)
plt.rc('axes', labelsize=30)
plt.rc('xtick', labelsize=30)
plt.rc('ytick', labelsize=30)
plt.rc('legend', fontsize=30)
plt.rc('figure', titlesize=30)

from sklearn.base import BaseEstimator
from sklearn.linear_model import LinearRegression
from sklearn.decomposition import PCA
from sklearn.decomposition import KernelPCA
from sklearn.covariance import MinCovDet
import cvxpy as cvx

In [17]:
class NewPortfolioOptimizer(BaseEstimator):
    '''
    Класс, оптимизирующий ковариационную матрицу Марковица нужным способом, находящий оптимальные веса 
    для различных компонент портфеля, считающий по полученным весам прибыль и метрики. 
    Интерфейс класса схож с интерфейсом класса со sklearn, при этом PortfolioOptimizer является наследником
    класса BaseEstimator, что позволяет использовать написанный класс в оптимизаторах гиперпараметров.
    '''
    
    def __init__(self, R=2e-3, R_quantille=None, method=None, n_top_companies=20,
                 n_components=1, kernel='poly', kernelgamma=None, kerneldegree=3, kernelcoef0=1, kernelparams=None, 
                 size_of_window=None, risk_free_value=1, smooth_function=None, period_change_portfolio=360,
                 metric='sharp', VAR_quantile=0.05, verbosity=False, threshold=1e-5):
        '''
        Функция инициализации.
        
        Параметры:
        ----------
            1) Параметры портфеля:
                R : float, default=2e-3. Ожидаемый доход портфеля (за один блок). Если R_quantille != None, 
                    то используется R_quantile.
                R_quantille : float/None, default=none. Квантиль ожидаемого дохода портфеля среди топ-компаний.
                              Должен быть в промежутке [0, 1]. Если None, то используется R.
            **********
            
            2) Параметры оптимизаторов матрицы ковариации:
                +++ Метод оптимизации ковариационной матрицы +++
                    method : str/None, default=None. Метод оптимизации ковариационной матрицы.
                
                +++ Параметры PCA/KernelPCA +++
                    n_components : int, default=1. Сколько компонент брать в PCA/KernelPCA. 
                                   Используется, если is_PCA или is_kernel_PCA == True;
                    kernel : str, default='poly'. Какое ядро использовать в KernelPCA. 
                                  Используется, если is_kernel_PCA == True;
                    kernelgamma : int/None, default=None. Параметр gamma в KernelPCA.
                                  Используется, если is_kernel_PCA == True;
                    kerneldegree : int, default=3. Степень полиномиального ядра в KernelPCA.
                                   Используется, если is_kernel_PCA == True и kernel=='poly';
                    kernelcoef0 : float, dedault=1. Параметр coef0 в KernelPCA.
                                  Используется, если is_kernel_PCA == True;
                    kernelparams : dict/None, default=None. Параметр kernel_params в KernelPCA.
                                   Пока нигде не используется, но может полезен при работе с другими ядрами.
            **********
            
            3) Параметры окна, периода и числа топ-компаний:
                size_of_window : int/None, default=None. Размер окна (в строчках), по которому используем данные.
                period_change_portfolio : int/None, default=360. Период, с которым меняем портфель.
                n_top_companies : int, default=20. 
                                  Количество топ-компаний по средней доходности, которые мы рассматриваем.
            **********
            
            4) Параметры подсчета метрики:
                metric : str, default='sharp'. Если 'sortino', то используем в качестве метрики коэффициент Сортино,
                         если 'sharp', то используем в качестве метрики коэффициент Шарпа, 
                         если 'VAR', то используем в качестве метрики Value at Risk.
                risk_free_value : float, default=1. Value безрискового дохода.
                VAR_quantille : float, default=0.05. Кватниль, которую мы берем в качестве оценки метрики Value at Risk.
            **********
            
            5) Параметры сглаживания и :
                smooth_function : function/None, default=None. Функция сглаживания.
            **********
            
            6) Параметры логов:
                verbosity : bool, default=False. Стоит ли выводить логи (пока что выводятся только логи CVXPY).
            **********
            
            7) Трешхолды:
                threshold : float, default=1e-5. Трешхолд, по которому обрезаются веса компаний.
            **********
        ----------
        
        Возвращает: 
        ----------
        '''
        
        # сохраняем гиперпараметры и инициализируем пустыми массивами массивы весов и тикеров
        self.R = R
        self.R_quantille = R_quantille
        self.method = method
        self.n_components = n_components
        self.n_top_companies = n_top_companies
        self.kernel = kernel
        self.kernelgamma = kernelgamma
        self.kerneldegree = kerneldegree
        self.kernelcoef0 = kernelcoef0
        self.kernelparams = kernelparams        
        self.size_of_window = size_of_window            
        self.risk_free_value = risk_free_value
        self.smooth_function = smooth_function
        self.metric = metric
        self.VAR_quantile = VAR_quantile
        self.period_change_portfolio = period_change_portfolio
        self.verbosity = verbosity
        self.threshold = threshold
        self.w_ = []
        self.arg_top_returns_ = []
    
    
    def _decrease_risk(self):
        '''
        Функция решения задачи оптимизации портфельной теории Марковица.
        
        Параметры: 
        ----------
        
        Возвращает:
        ----------
            opt_weights : np.array(float). Оптимальные веса по портфельной теории Марковица.
            
        '''
        
        p = len(self.optimized_mu_)
        # объявляем переменную оптимизации
        w = cvx.Variable(p)
        # объявляем задачу минимизации
        obj = cvx.Minimize(1/2 * cvx.quad_form(w, self.optimized_Sigma_))

        # объявляем ограничения
        equal_constraints_1 = [self.optimized_mu_.T @ w == self.R]
        equal_constraints_2 = [np.ones(p) @ w == 1]
        eyes = np.eye(p)
        nonequal_constraints = [eye @ w >= 0 for eye in eyes]
        constraints = equal_constraints_1 + equal_constraints_2 + nonequal_constraints
        
        # решаем задачу минимизации
        if np.all(np.linalg.eigvals(self.optimized_Sigma_) > 0):
            try:
                problem = cvx.Problem(obj, constraints=constraints)
                result = problem.solve(verbose=self.verbosity, solver='SCS')
            except:
                warnings.warn('''SolverError: solver can't solve this task. 
                Trying to solve with another solver''')
                problem = cvx.Problem(obj, constraints=constraints)
                result = problem.solve(verbose=self.verbosity, solver='CVXOPT')  
            
        else:
            warnings.warn("Covariance matrix is not a positive-definite")
            problem = cvx.Problem(obj, constraints=constraints)
            result = problem.solve(verbose=self.verbosity, solver='CVXOPT')       
          
        # сохраняем найденные оптимальные веса и сохраняем их
        opt_weights = w.value
        return opt_weights
    
    
    def _count_portfolio_value(self, block, cumulative=True, t=0):
        '''
        Функция подсчета value портфеля.
        
        Параметры: 
        ----------
            block : pd.DataFrame. Блок, за который считаем value.
        ----------
        
        Возвращает:
        ----------
            portfolio_value: pd.Series. Таблица из 2 столбцов: индексом служит время,
                              во втором столбце кумулятивные value, соответствующие этому времени.
        '''
        
        portfolio_value = None
        # если веса не определены, проинициализируем их по дефолту
        if self.w_[t] is None:
            warnings.warn(f'w in count value is None; w was set by default')
            self.w_[t] = np.ones(shape=(self.n_top_companies)) / self.n_top_companies
        
        # проходимся по всем компаниям, считаем их value и взвешенно суммируем
        for w, col in zip(self.w_[t], self.arg_top_returns_[t]):
            if cumulative:
                one_company_value = w * (block[col] + 1).cumprod()
            else:
                one_company_value = w * (block[col] + 1)
            if portfolio_value is None:
                portfolio_value = one_company_value
            else:
                portfolio_value += one_company_value
        
        return portfolio_value

    def _count_portfolio_value_by_day(self, X_test):
        '''
        Функция подсчета value портфеля по дням.
        
        Параметры: 
        ----------
            X_test : pd.DataFrame. Тестовый датасет, за который считаем value.
        ----------
        
        Возвращает:
        ----------
            all_value_by_day: pd.Series. Таблица из 2 столбцов: индексом служит время,
                              во втором столбце value, соответствующие этому времени.
        '''
        # если period_change_portfolio None, то поставим его больше, чем длина теста
        if self.period_change_portfolio is None:
            self.period_change_portfolio = len(X_test) + 1
        
        # series, который будем возвращать
        all_value_by_day = pd.Series(dtype='float64')
        
        # проходимся по периодам
        for t in range(len(self.w_)):
            # вырежем период
            if t != len(self.w_) - 1:
                period = X_test.iloc[t * self.period_change_portfolio: (t + 1) * self.period_change_portfolio]
            else:
                period = X_test.iloc[t * self.period_change_portfolio:]
                
            # найдем некумулятивные value за период
            period_value = self._count_portfolio_value(period, cumulative=False, t=t)
            
            # добавим найденные value в датафрейм из предсказаний
            all_value_by_day = pd.concat([all_value_by_day, period_value])
        
        return all_value_by_day
        
        
    def refit(self, X_train, Y_train=None):
        '''
        Функция переобучения под новые данные. Очищает все веса и тикеры и обучается под новые данные.
        
        Параметры: 
        ----------
            X_train : pd.DataFrame. Датасет, на котором обучаемся.
            Y_train : pd.Series/None, default=None. 
            Параметр, который не используется. Нужен, чтоб некоторые функции принимали реализуемый класс.
        ----------
        
        Возвращает:
        ----------
            self : NewPortfolioOptimizer class. Обученный объект класса.
        
        '''
        
        self.w_ = []
        self.arg_top_returns_ = []
        self.fit(X_train)
       
    
    def _optimize_cov_PCA_OR_Kernel_PCA(self):
        '''
        Функция выделения главных компонент и подсчета среднего дохода и ковариационной матрицы дохода компонент
        с помощью PCA/KernelPCA.
        
        Параметры: 
        ----------
        
        Возвращает:
        ----------
        '''
        
        # если PCA, то обучим PCA и сохраним компоненты
        if self.method == 'PCA':
            pca = PCA(n_components=min(self.n_components, self.n_top_companies))
            pca.fit(self.top_returns_.cov())
            self.components_ = pca.components_
        # если KernelPCA, то обучим KernelPCA и сохраним компоненты
        else:
            kpca = KernelPCA(n_components=min(self.n_components, self.n_top_companies), 
            kernel=self.kernel, degree=self.kerneldegree, gamma=self.kernelgamma, 
            coef0=self.kernelcoef0, kernel_params=self.kernelparams)
            kpca.fit(self.top_returns_.cov())
            self.components_ = kpca.eigenvectors_.T
        self.components_ = pd.DataFrame(self.components_)
        # уберем компоненты, которые < 0
        self.components_[self.components_ < 0] = 0
        # сбалансируем компоненты, чтоб сумма весов в компонентах была равна 1
        self.components_ = self.components_.div(self.components_.sum(axis=1), axis=0)
        assert isclose(self.components_.sum(axis='columns').iloc[0], 1, rel_tol=1e-3), \
        f'sum of weights of the first component {self.components_.sum(axis=0).iloc[0]} is not close with 1'
        # сделаем датасет ретернов по компонентам
        new_data = self.top_returns_.to_numpy() @ self.components_.to_numpy().T
        self.optimized_data_ = pd.DataFrame(columns=np.linspace(1, new_data.shape[1], new_data.shape[1], dtype=int), 
                                            index=self.top_returns_.index, data=new_data)
        # посчитаем средние доходов компонент и ковариационную матрицу компонент
        self.optimized_mu_ = self.optimized_data_[self.optimized_data_.columns].mean().to_numpy()
        self.optimized_Sigma_ = self.optimized_data_[self.optimized_data_.columns].cov()
        
        # проверка на положительную определенность и симметричность ковариационной матрицы
        if not np.all(np.linalg.eigvals(self.optimized_Sigma_) > 0) & \
                np.all(self.optimized_Sigma_ == self.optimized_Sigma_.T):
            warnings.warn("Covariance matrix after PCA/KernelPCA is not a positive-definite or symmetric")
            self.optimized_data_ = self.X_train_window_
            self.optimized_Sigma_ = self.Sigma_
            self.optimized_mu_ = self.mu_
                
                
    def _optimize_cov_MCD(self):
        '''
        Функция оптимизации матрицы с помощью MinCovDet.
        
        Параметры: 
        ----------
        
        Возвращает:
        ----------
        '''
            
        # обучим MinCovDet и сохраним оптимизированную матрицу ковариаций
        mcd = MinCovDet()
        mcd.fit(self.top_returns_)
        self.optimized_mu_ = self.mu_
        self.optimized_Sigma_ = mcd.covariance_
        
        
    def _set_weights(self, weights=None, tickers=None):
        '''
        
        Функция добавления оптимальных или установленных весов и тикеров в массивы весов и тикеров соответственно.
        
        Параметры:
        weights : np.array(float)/None, default=None. Массив весов, которые надо добавить.
        tickers : np.array(str)/None, default=None. Массив тикеров, которые надо добавить.
        ----------
        
        Возвращает:
        ----------
        '''
        # если веса и тикеры даны, то просто добавим их в соответствующие массивы и выйдем из функции
        if weights is not None:
            self.w_.append(weights)
            tickers = pd.Index(tickers)
            self.arg_top_returns_.append(tickers)
            return
        
        # решение задачи минимизации ковариационной матрицы портфеля
        w = self._decrease_risk()
        
        # если PCA или KernelPCA, то надо восстановить веса с учетом компонент
        if self.method == 'PCA' or self.method == 'KernelPCA':
            if w is None:
                warnings.warn(f'w after PCA/KernelPCA is None; w was set by default')
                w = np.ones(shape=(self.n_components)) / self.n_components
            w = self.components_.T @ w
        
        if w is None:
            warnings.warn(f'w is None; w was set by default')
            w = np.ones(shape=(self.n_top_companies)) / self.n_top_companies
            
        # обрезание весов по трешхолду 
        w[w < self.threshold] = 0
        # после обрезания весов их сумма может быть не близка к единице. Если так, то перебалансируем
        if not isclose(np.sum(w), 1, rel_tol=1e-3):
            warnings.warn(f'sum of weights {np.sum(w)} is not close with 1; the weights will be rebalanced')
            w = w / np.sum(w)
        # добавляем новые веса и тикеры в массивы
        self.w_.append(w)
        self.arg_top_returns_.append(self.top_returns_.columns)
    
        
    def fit(self, X_train, Y_train=None):
        '''
        Функция для обучения модели.
        1) Выбирает последние элементы по размеру окна.
        2) Вычисляет вектор ожидаемой доходности и ковариационной матрицы доходностей.
        3) Оптимизирует коваривационную матрицу, если это требуется.
        4) Решает задачу минимизации ковариационной матрицы портфеля. Находит оптимальные веса активов.
        
        Параметры: 
        ----------
            X_train : pd.DataFrame. Датасет, на котором обучаемся.
            Y_train : pd.Series/None, default=None. 
            Параметр, который не используется. Нужен, чтоб некоторые функции принимали реализуемый класс.
        ----------
        
        Возвращает:
        ----------
            self : PortfolioOptimizer class. Обученный объект класса.
        '''
        
        # размерность данных должна быть больше, чем число компонент
        if self.n_components > self.n_top_companies:
            warnings.warn('''self.n_components > self.n_top_companies;
            self.n_components is set equal to self.n_top_companies''')
            self.n_components = self.n_top_companies
        
        # размер окна должен быть больше, чем период смены портфеля 
        if self.size_of_window is not None and self.period_change_portfolio is not None \
            and self.size_of_window < self.period_change_portfolio:
            warnings.warn('''size_of_window less than period_change_portfolio;
                          size_of_window was set the same as period_change_portfolio''')
            self.size_of_window = self.period_change_portfolio
        
        self.X_train_ = X_train
        
        # выбор последних значений по размеру окна
        if self.size_of_window is not None:
            self.X_train_window_ = X_train.iloc[-self.size_of_window:]
            
        else:
            self.X_train_window_ =  X_train

        # вычисление вектора ожидаемой доходности и ковариационной матрицы доходностей
        mean = self.X_train_window_[self.X_train_window_.columns].mean()
        self.top_returns_ = self.X_train_window_[mean.sort_values(ascending=False)[:self.n_top_companies].index]
        self.mu_ = mean.sort_values(ascending=False)[:self.n_top_companies].to_numpy()

        self.Sigma_ = (self.top_returns_[self.top_returns_.columns]).cov()
                        
        # оптимизация ковариационной матрицы
        if self.method == 'PCA' or self.method == 'KernelPCA':
            self._optimize_cov_PCA_OR_Kernel_PCA()
                
        elif self.method == 'MCD':
            self._optimize_cov_MCD()

        else:
            self.optimized_mu_ = self.mu_
            self.optimized_Sigma_ = self.Sigma_
        
        if not np.all(np.linalg.eigvals(self.optimized_Sigma_) > 0) & \
                    np.all(self.optimized_Sigma_ == self.optimized_Sigma_.T):
            warnings.warn("Covariance matrix is not a positive-definite or symmetric")

        # установка значения R
        if self.R_quantille is not None:
            self.R = np.quantile(self.optimized_mu_, self.R_quantille)
        
        if np.sort(self.optimized_mu_)[0] > self.R:
            warnings.warn(f'R = {self.R} less than the least of expecting returns = {self.optimized_mu_[0]}')
            self.R = min(np.quantile(self.optimized_mu_, 0.75), self.optimized_mu_[-1])
        
        if np.sort(self.optimized_mu_)[-1] < self.R:
            warnings.warn(f'R = {self.R} greater than the greatest of expecting returns = {self.optimized_mu_[-1]}')
            self.R = min(np.quantile(self.optimized_mu_, 0.75), self.optimized_mu_[-1])
        
        self._set_weights()
        return self

    
    def _rebalance_fit(self, period):
        '''
        Функция для обучения с учетом новых данных и перебалансировки весов.
        
        Параметры:
        ----------
            period : pd.DataFrame. Датасет за определенный период, который добавляем в обучение (новые данные).
        ----------
        
        Возвращает:
        ----------
            self : PortfolioOptimizer class. Обученный с учетом новых данных объект класса.
        '''
        
        # добавляем новый период в трейн и обучаемся на новом трейне
        X_train = pd.concat([self.X_train_, period])
        self.fit(X_train)
        return self
    
    
    def predict(self, X_test, Y_test=None):
        '''
        Функция для предсказания ретернов на исторических данных.
        
        Параметры: 
        ----------
            X_test : pd.DataFrame. Тестовый датасет.
            Y_test : pd.Series/None, default=None. 
            Параметр, который не используется. Нужен, чтоб некоторые функции принимали реализуемый класс.
        ----------
        
        Возвращает:
        ----------
            block_return : np.array(float). Массив ретернов за блоки.
            all_return : pd.Series. Таблица из 2 столбцов: индексом служит время,
                         во втором столбце кумулятивные ретёрны, соответствующие этому времени.
        '''
        
        # сохраняем старые трейн и массивы весов и тикеров, чтобы после предикта их восстановить
        old_X_train = self.X_train_
        old_w = self.w_
        old_arg_top_returns = self.arg_top_returns_
        
        # если period_change_portfolio None, то поставим его больше, чем длина теста
        if self.period_change_portfolio is None:
            self.period_change_portfolio = len(X_test) + 1
        
        all_value = pd.Series(dtype='float64')
        
        # разбиваем данные на периоды
        num_periods = 1 + int(len(X_test) / self.period_change_portfolio)
        
        # пройдемся по периодам
        for t in range(num_periods):
            # вырежем период
            if t != num_periods - 1:
                period = X_test.iloc[t * self.period_change_portfolio: (t + 1) * self.period_change_portfolio]
            else:
                period = X_test.iloc[t * self.period_change_portfolio:]
            
            # сделаем предсказание на период
            period_value = self._count_portfolio_value(period, t=t)
            if len(all_value) > 0:
                period_value.iloc[:] = np.array(period_value.iloc[:]) * all_value[-1]
            all_value = pd.concat([all_value, period_value])
            
            # сделаем фит с учетом новых данных
            if t != num_periods - 1:
                self._rebalance_fit(period)
        
        # вернем старые трейн и массивы весов и тикеров
        self.X_train_ = old_X_train
        self.w_ = old_w
        self.arg_top_returns_ = old_arg_top_returns
            
        return all_value
    
    
    def score(self, X_test, Y_test=None, how=None, was_predicted=False):
        '''
        Функция для подсчета метрики.
        
        Параметры: 
        ----------
            X_test : pd.DataFrame. Тестовый датасет.
            Y_test : pd.Series/None, default=None. 
            Параметр, который не используется. Нужен, чтоб некоторые функции принимали реализуемый класс.
            how : str/None, default=None. Какую метрику использовать. Если 'sortino', 
            то используется коэффициент Сортино, если None или 'sharp', то используется коэффициет Шарпа.
        ----------
        
        Возвращает:
        ----------
            sc : нужный коэффициент (Шарпа или Сортино)
        '''
        
        # если how None, используем метрику класса
        if how is None:
            how = self.metric

        # если не предсказывали на этих данных, то предскажем и сохраним предсказание
        if not was_predicted:
            if len(self.w_) != 0:
                self.w_ = [self.w_[0]]
            self.value_ = self._count_portfolio_value_by_day(X_test).to_numpy()
        
        # если метрика VaR, то считаем его
        if how == 'VAR':
            sc = np.quantile(self.value_, self.VAR_quantile)
            self.score_ = sc
            return sc
        
        # считаем среднюю избыточную доходность 
        avg_reduntant_return = np.mean(self.value_ - self.risk_free_value)
        
        # считаем стандартное отклонение портфеля
        if how == 'sharp':
            std_deviation_portfolio = np.std(self.value_ - self.risk_free_value, ddof=1)
        else:
            std_deviation_portfolio = np.std(np.minimum(0, self.value_ - \
                                                        self.risk_free_value), ddof=1)
        
        # считаем нужный коэффициент
        sc = avg_reduntant_return / std_deviation_portfolio
        self.score_ = sc
        
        return sc