In [1]:
import pandas as pd
import numpy as np
import scipy.stats as st
from sklearn.preprocessing import MinMaxScaler
from scipy.stats import chi2
import os

In [2]:
class Get_investment_portfolio():
    """
    Класс генерирующий инвестиционный портфель, на основании переданных бумаг 
    и заданных параметров
    """
    
    def __init__(self,data_dir_path,portfolio_income,unrisk_income,confidence_level_normal = 0.05,
                 confidence_level_interval = 0.95,sharp = False):
        """
        Инициализация параметров класса
        """
        self.sharp = sharp
        self.data_dir_path = data_dir_path
        self.confidence_level_normal = confidence_level_normal
        self.income_data = 0
        self.confidence_level_interval = confidence_level_interval
        self.portfolio_income = portfolio_income
        self.unrisk_income = unrisk_income
        self.__basic_load()
        self.__merge()
        self.__calculate_income()
        self.__check_normaltest()
        if len(self.fail_shares) == 0:
            pass
#             self.__get_confidence_interval()
#             self.__get_corr_mask()
#             self.__format_cov_matrix()
#             self.__solve_Tobin()
        else:
            print("Не все акции прошли проверку, попрубой заменить акции, которые не прошли проверку")
            self.__get_confidence_interval()
            self.__get_corr_mask()
            self.__format_cov_matrix()
            self.__solve_Tobin()
        
    def __basic_load(self):
        """
        Метод отвечающий за загрузку данных из указанной директории
        """
        # Получаем имена всех фалов, находщихся в указанной директории
        all_files = [name for _,_,name in os.walk(self.data_dir_path)][0]
        # Генерируем словарь имен и значений полученных из фалов
        self.all_data = {name[:-4]:pd.read_csv(self.data_dir_path+"/"+name)[["Дата","Цена"]] for name in all_files}
    
    def __check_normaltest(self):
        """
        Метод отвечающий за провекру гипотеза на нормальность выборки
        """
        # Инициализируем словарь для акций, прошедших проверку
        self.pass_shares = {}
        # Инициализируем словарь для акций, не прошедших проверку
        self.fail_shares = {}
        # Переберем все акции из нашей таблицы доходности 
        for label in self.income_data.columns:
            # Получим для каждой выборки p-value
            k2, p = st.normaltest(self.income_data[label])

            # Сравним найденное p-value с уровнем значимости 
            if p < self.confidence_level_normal:  
                self.fail_shares[label] = p
            else:
                self.pass_shares[label] = p
        # Проверим все ли акции прошли проверку
        if len(self.pass_shares) == len(self.income_data.columns):
            print("Все отлично, все акции прошли проверку")
        else:
            # Выведем акции и их p-value, которые не прошли проверку 
            print("Проверка не пройдена. Вот неподходящие акции")
            print(self.fail_shares)
    
    def __merge(self):
        """
        Метод отвечающий за объединение таблиц в одну
        """
        count = 0
        for dataset in self.all_data.values():
            if count == 0:
                self.total = dataset.copy()
            else:
                self.total = self.total.merge(dataset, on='Дата')
            count+= 1
        # Установим признак "Дата" как индекс всей таблице
        self.total = self.total.set_index("Дата")
        self.total.columns = self.all_data.keys()
        
    def __format_data(self,replaced_value):
        """
        Метод отвечающий за приведение значений к формату, необходимому для обработки
        Метод заменяет разделитель "," на "." и формирует корректное число
        Принцип работы:  Заменим все запятые на точки, затем начиная со второй точки будем 
        
        """
        # Инициализируем счетчик точек
        count_dot = 0
        # Переберем все значения
        for letter_num in range(1,len(replaced_value)):
                count_dot += replaced_value[-letter_num] == '.'
                if count_dot >= 2:
                        end = len(replaced_value)-letter_num
                        replaced_value = replaced_value[:end] + "" + replaced_value[end + 1:]
                        count_dot-=1              
        return replaced_value
        
        
    def __calculate_income(self):
        """
        Метод вычисляющий еженедельную доходность в годовых процентах
        """
        count_weeks = self.total.shape[0]
        # Инициализир пустой датафрейм
        self.income_data = pd.DataFrame()
#         start_price = self.total.iloc[count_weeks-1]
        # Сгенерируем  имена столбцов 
        new_labels_income = ["Income"+"_"+name for name in self.total.columns]
        for new_label,old_label in zip(new_labels_income,self.total.columns):
            # Отформатируем стартовое значение
#             main = start_price[old_label].replace(",", ".")
#             main = self.format_data(main)

            for row in range(1,count_weeks):
                # Отформатируем текущее значение
                cur_val = self.total.iloc[row-1][old_label].replace(",", ".")
                cur_val = self.__format_data(cur_val)

                #Отформатируем следующее значение
                next_val = self.total.iloc[row][old_label].replace(",", ".")
                next_val = self.__format_data(next_val)

                # Вычислим недельную доходность
                self.income_data.loc[row-1,new_label] = ((float(cur_val)- float(next_val))/float(next_val))*52

                
    def __get_confidence_interval(self):
        # Инициализируем индексы, которые в дальнейше преобразуем в мультииндекс
        pre_index = [("estimate_mean","left_side"),
                      ("estimate_mean","right_side"),
                      ("estimate_std","left_side"),
                      ("estimate_std","right_side")]
        
        list_index = np.empty(len(pre_index), dtype=object)
        list_index[:] = pre_index

        index = pd.MultiIndex.from_tuples(list_index)
        # Инициализируем фрейм, в который будем записывать найденные оценки 
        self.period_estimate_table = pd.DataFrame(index = index)
        for label in self.income_data.columns:
            # Вычислим оценку среднего
            mean_estimate =  st.t.interval(self.confidence_level_interval,
                                           len(self.income_data[label])-1,
                                           self.income_data[label].mean(),
                                           self.income_data[label].sem())
            # Добавим найденные данные в таблицу
            self.period_estimate_table.loc[list_index[0],label] = mean_estimate[0]
            self.period_estimate_table.loc[list_index[1],label] = mean_estimate[1]


            # Вычислим оценку отклонения
            # Определим уровни
            p1 = (1 + self.confidence_level_interval)/2
            p2 = (1 - self.confidence_level_interval)/2 
            # Найдем, интересующие нас значения
            value1 = chi2.ppf(p1, self.income_data.shape[0])
            value2 = chi2.ppf(p2, self.income_data.shape[0])
            pre_culc = self.income_data[label].std()*(self.income_data.shape[0])


            # Дополним таблицу, найденными знаениями
            self.period_estimate_table.loc[list_index[2],label] = round((((pre_culc)/value1)**0.5),2)
            self.period_estimate_table.loc[list_index[3],label] = round((((pre_culc)/value2)**0.5),2)

            
    def __get_corr_mask(self):
        """
        Метод, который проверят гипотезу на равенство коэффициента коррелции равном 0
        """
        # Инициализируем пустой массив, а котором будет хранить маску
        self.corr_mask = []
        n = self.income_data.describe().loc["count"][0]
        # Переберем все коэффициенты
        for array, i in zip(self.income_data.corr().values,range(0,len(self.income_data.corr()))):
                pre_corr = []
                for ind in range(len(array)):
                    if ind != i:
                        # Найдем наблюдаемое значение
                        t_nabl = (array[ind]*(n-2)**0.5)/(1-array[ind]**2)**0.5
                        # Сравним наблюдаемое значение с критическим 
                        if st.t.ppf(1-0.05/2, n-2) < abs(t_nabl):
                            pre_corr.append(0)
                        else:
                            pre_corr.append(1)
                    else:
                        pre_corr.append(1)
                self.corr_mask.append(pre_corr)
                
                
    def __format_cov_matrix(self):
        """
        Метод, обрабатывающий матрицу ковариации, согласно найденной маске корреляции 
        """
        self.total_cov = []
        for array_corr,array_cov in zip(self.corr_mask,self.income_data.cov().values):
            pre_cov = []
            for i in range(len(array_corr)):
                if array_corr[i] == 1:
                    pre_cov.append(array_cov[i])
                else:
                    pre_cov.append(0)
            self.total_cov.append(pre_cov)
            
    def __solve_Tobin(self):
        """
        Метод, решиающий задачу Тобина
        """
        # Получаем обратную матрицу ковариаций 
        V_reverse = np.linalg.inv(self.total_cov)
        # Получаем вектор доходностей активов
        incomes_vector = self.income_data.describe().loc["mean",:].values
        # Преобразуем вектор доходностей с учетом безрискового актива
        ready_vector = incomes_vector - self.unrisk_income*np.ones(len(incomes_vector))
        # Решаем уравнение Тобина
        D = (ready_vector.T@V_reverse)@ready_vector
        self.portfolio_frac = ((self.portfolio_income-self.unrisk_income)/D).reshape(1,)*V_reverse@ready_vector
        print(self.portfolio_frac)

In [4]:
obj = Get_investment_portfolio("./data",portfolio_income=0.3,unrisk_income=0.12)

Проверка не пройдена. Вот неподходящие акции
{'Income_GOOGL': 0.010357994048990992, 'Income_YNDX': 0.026907142422766123}
Не все акции прошли проверку, попрубой заменить акции, которые не прошли проверку
[ 0.19478199  0.12610868  0.08029701 -0.00176981  0.22603568]
