# Импорт всех необходимых библиотек

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import scipy.stats
import math
import os

# Подсчёт количества торговых минут

In [None]:
directory = 'C:/Users/Mark/Desktop/Данные/'
files = os.listdir(directory)

# ['GAZP', 'SIBN', ...]
tickers = [i.split('.')[0] for i in files if i[-3:] == 'csv'] 

# {'GAZP': 0, 'SIBN': 1, 'LKOH': 2, ...}
tickers = {ticker: index for index, ticker in enumerate(tickers)}     

months = ['Январь', 'Февраль', 'Март', 'Апрель', 'Май', 'Июнь', 
          'Июль', 'Август', 'Сентябрь', 'Октябрь', 'Ноябрь', 'Декабрь']

months_dict = {'Январь': 0, 'Февраль': 1, 'Март': 2, 'Апрель': 3, 
               'Май': 4, 'Июнь': 5, 'Июль': 6, 'Август': 7, 'Сентябрь': 8, 
               'Октябрь': 9, 'Ноябрь': 10, 'Декабрь': 11}

# порядковые номера месяцев
month_numbers = ['01', '02', '03', '04', '05', '06', 
                 '07', '08', '09', '10', '11', '12']

# {'01': 0, '02': 1, '03': 2, ...}
month_numbers_dict = {name: i for i, name in enumerate(month_numbers)}     


In [None]:
# датафрейм для количества торговых минут
data_df = pd.DataFrame(columns=months, index=tickers.keys())

for company in tickers.keys():
    # проходимся в цикле по всем компаниям
    with open(directory + f'{company}.csv') as file:
        df = pd.read_csv(file, delimiter=';')
        
        # количество минут для каждого месяца
        counts = [0]*12      
        
        for i in month_numbers:     
            for date in df['<DATE>']:
                if date[3:5] == i:
                    # если месяц совпал, увеличиваем счётчик
                    counts[month_numbers_dict[i]] += 1
                elif i != '12' and date[3:5] == month_numbers[month_numbers_dict[i]+1]: 
                    # если попали на след месяц, выходим из цикла
                    break
                    
        for i in range(len(counts)):
            # заполняем датафрейм
            data_df[months[i]][company] = counts[i]
            
# после анализа датафрейма удаляем лишние компании            
data_df = data_df.drop(['SIBN'])
del tickers['SIBN']

# Максимальные отклонения цен

In [None]:
def calculate_diff_log(array):
    """
    Функция для расчёта логарифмической доходности
    Принимает на вход массив значений цен закрытия 
    
    """
    diff = []
    for i in range(1, len(array)):
        diff.append(math.log(array[i] / array[i-1])*100)
    
    return diff

In [None]:
# датафрейм для макс скачков вверх
up_df = pd.DataFrame(columns=months, index=tickers.keys())
# датафрейм для макс скачков вверх
down_df = pd.DataFrame(columns=months, index=tickers.keys())

for company in tickers.keys():
    with open('C:/Users/Mark/Desktop/Данные/'+f'{company}.csv') as file:        
        mylist = []
        # так как файл с данными очень велик, будем считывать 
        # с помощью особого метода
        for chunk in pd.read_csv(file, sep=';', chunksize=20_000):
            mylist.append(chunk)
        df = pd.concat(mylist, axis= 0)
        del mylist
        
        # макс взлеты и падения для каждого месяца
        diff_up = [0]*12      
        diff_down = [0]*12      
    
        for i in month_numbers:         
            count = -1
            value_array = []    # считываем сюда цены закрытия
            for date in df['<DATE>']:
                count += 1
                if date[3:5] == i:   
                    value_array.append(df['<CLOSE>'][count])
                elif i != '12' and date[3:5] == month_numbers[month_numbers_dict[i]+1]:  
                    break
            
            # вызываем функцию расчета лог доходности
            now_diff = calculate_diff_log(array=value_array)
            
            # определяем макс и мин значения
            diff_up[month_numbers_dict[i]] = max(now_diff)
            diff_down[month_numbers_dict[i]] = min(now_diff)
        
                    
        for i in range(len(diff_up)):
            # заполняем наши датафреймы полученными данными
            up_df[months[i]][company] = round(diff_up[i], 3)
            down_df[months[i]][company] = round(diff_down[i], 2)

# Построение графика наибольших отклонений цены

In [None]:
def get_max_diff(data_frame_up, data_frame_down):
    """
    Возвращает месяц и компанию с самым крупным дневным взлётом
    На вход принимает параметр data_frame_up - датафрейм из pandas с положительными скачками
    И data_frame_down - датафрейм из pandas с отрицательными скачками
    
    """
    maximum = -10**5
    minimum = 10**5
    
    month_max, month_min = 0, 0
    company_max, company_min = 0, 0
    
    # двойной цикл, перебираем каждый месяц и каждую компанию
    for month in months:
        for company in tickers.keys():
            if data_frame_up[month][company] > maximum:
                # проверка на максимум
                maximum = data_frame_up[month][company]
                month_max = month
                company_max = company
            if data_frame_down[month][company] < minimum:
                # проверка на минимум
                minimum = data_frame_down[month][company]
                month_min = month
                company_min = company
                
    return month_max, company_max, month_min, company_min

# сохраняем найденные значения в переменные
month_max, company_max, month_min, company_min = get_max_diff(data_frame_up=up_df, data_frame_down=down_df)

In [None]:
def draw_maximum(mon_max, com_max, mon_min, com_min):
    """
    Функцию для отрисовки графика максимального взлёта и падения цены.
    Получает на вход четыре параметра:
    mon_max - месяц с макс взлетом
    com_max - компания с макс взлетом
    mon_min - месяц с макс падением
    com_min -компания с макс падением
    
    """
    com = [com_max, com_min]
    mon = [mon_max, mon_min]
    titles = ['максимальный взлёт', 'максимальное падение']
    
    for j, company in enumerate(com):
        close_value_array = []

        with open('C:/Users/Mark/Desktop/Данные/' + f'{company}.csv') as file:        
            mylist = []
            for chunk in pd.read_csv(file, sep=';', chunksize=20_000):
                mylist.append(chunk)
            df = pd.concat(mylist, axis= 0)
            del mylist

            count = -1
            i = months_dict[mon[j]]
            number = month_numbers[i]

            for date in df['<DATE>']:
                count += 1
                if date[3:5] == number:   
                    close_value_array.append(df['<CLOSE>'][count])

                elif number != '12' and date[3:5] == month_numbers[i+1]:  
                    break

        close_value_array = np.array(close_value_array)
        
        # х и у координаты для графика
        x_coord = np.arange(0, len(close_value_array), 1)
        y_coord = close_value_array
        
        # рисуем график с помощью plt
        fig, ax = plt.subplots()
        ax.plot(x_coord, y_coord)
        ax.set_title(f'График {company} за {mon[j]}, {titles[j]}')
        ax.set_xlabel('Дни')
        ax.set_ylabel('Стоимость актива, руб')

        plt.show()
    
    
draw_maximum(mon_max=month_max, com_max=company_max, mon_min=month_min, com_min=company_min)

# Генерация модельных данных

In [None]:
def random_tetta(data):
    """
    Функция оценивает такие параметры рандомной выборки как: мат ожидание и дисперсия
    
    """
    data = sorted(data)
    n = len(data)             
    r = 1 + int(math.log2(n))   # находим кол-во интервалов разбиения  
    max_el = max(data)
    min_el = min(data)
    h = (max_el-min_el) / r   # шаг
    
    middle_points = [min_el]   # храним середины интервалов
    intervals = []   # разбиение data на интервалы
    intervals.append([el for el in data if el < middle_points[-1]+h/2])
    
    for i in range(2, r):
        middle_points.append(middle_points[0]+(i-1)*h)  
        intervals.append([el for el in data if middle_points[-1]-0.5*h < el < middle_points[-1]+0.5*h])
    
    middle_points.append(middle_points[0]+(r-1)*h)   
    intervals.append([el for el in data if el > middle_points[-1]-0.5*h])     
    
    interval_len = [len(interval) for interval in intervals]
    
    m = [] 
    # находим мат ожидание по формуле
    for i in range(len(interval_len)):
        m.append(interval_len[i]*middle_points[i])
    m_ = (1/n) * sum(m) 
    
    s = [] 
    # находим дисперсию по формуле
    for i in range(len(interval_len)):
        s.append(interval_len[i]*(middle_points[i]-m_)**2)
    sigma_ = (1/n) *sum(s) 
    
    return m_, sigma_, interval_len, middle_points

In [None]:
def pirsons(n=16000):
    """
    Статистика критерия Пирсона для выборки размера n
    
    """
    data = scipy.stats.norm(0, 1).rvs(n)   # генерируем выборку

    r = 1 + int(math.log2(n))   # кол-во интервалов
    min_el = min(data)
    max_el = max(data)
    h = (max_el-min_el) / r   # шаг

    function = random_tetta(data)  # находим параметры выборки

    m, sigma, interval_len, middle_points = function    
    expected = scipy.stats.norm(m, math.sqrt(sigma))  # ожидаемое распределение
    
    prob = [expected.cdf(middle_points[0]+0.5*h)]  # вероятности попасть в интервал
    prob += [expected.cdf(middle_points[x]+0.5*h)-expected.cdf(middle_points[x]-0.5*h) for x in range(1, len(middle_points)-1)]
    prob += [1-expected.cdf(middle_points[-1]-0.5*h)]

    z = []
    # находим хи-квадрат Пирсона
    for i in range(len(prob)):
        z.append( ((interval_len[i]-n*prob[i])**2)/(n*prob[i]) )

    chi_2 = sum(z)

    return chi_2
    

In [None]:
N = 1000   # количество испытаний
n = 16000   # размер выборки 

pirs_stat = [pirsons(n) for _ in range(N)]   # 1000 значений хи-квадрат

# квантили для статистики Пирсона
chi2_999 = np.quantile(pirs_stat, np.arange(0.001, 1, 0.001))
chi2_9 = np.quantile(pirs_stat, np.arange(0.1, 1, 0.1))

df_quant_999 = pd.DataFrame()
df_quant_9 = pd.DataFrame()

# заполняем датафреймы для квантилей
df_quant_999['Месяц'] = np.round(chi2_999, 6)
df_quant_9['Месяц'] = np.round(chi2_9, 6)
    
index_999 = [quantile for quantile in list(np.arange(0.001, 1, 0.001))]
df_quant_999.index = index_999

index_9 = [quantile for quantile in list(np.arange(0.1, 1, 0.1))]
df_quant_9.index = index_9

df_quant_9

In [None]:
n_years = 12 
n_tickers = len(tickers) 
n_vb = 16000 


def pvaluesVR(n, q): 
    """
    Расчёт p-values вручную
    
    """
    u0 = pirsons(n)  # статистика Пирсона
    k = 0
    for i in range(len(q)): # расчёт P-value
        if q[i] > u0: 
            k += 1
    p = round(k/len(q), 3)
    return p


pvalues_data_VR = pd.DataFrame()
pvalues_list_VR =[]

# двойной цикл, расчитываем p-values
for i in range(n_tickers):
    probs = []
    for j in range(n_years):
        probs.append(pvaluesVR(n_vb, chi2_999))
        pvalues_list_VR.append(int(probs[j]*10))
    probs = pd.DataFrame(probs).transpose()
    pvalues_data_VR = pvalues_data_VR.append(probs, ignore_index=True)
    
pvalues_data_VR.columns = months
pvalues_data_VR.index = tickers.keys()

VR_interval = [pvalues_list_VR.count(i) for i in range(11)]  # для гистограммы 

pvalues_data_VR

In [None]:
def pvaluesKS(n): 
    """
    p-values с помощью критерия Колмогорова-Смирнова
    
    """
    x = np.random.normal(loc=0, scale=1, size=n) # выборка
    p = round(scipy.stats.kstest(x,'norm')[1], 3) # p-value
    return p

pvalues_data_KS = pd.DataFrame()
pvalues_list_KS =[]

for i in range(n_tickers):
    probs = []
    for j in range(n_years):
        probs.append(pvaluesKS(n_vb))
        pvalues_list_KS.append(int(probs[j]*10))
    probs = pd.DataFrame(probs).transpose()
    pvalues_data_KS = pvalues_data_KS.append(probs, ignore_index=True)

pvalues_data_KS.columns = months
pvalues_data_KS.index = tickers.keys()

KS_interval = [pvalues_list_KS.count(i) for i in range(11)] # для гистограммы

pvalue = scipy.stats.ks_2samp(pvalues_list_VR, pvalues_list_KS) # значение критерия Колмогорова
print(pvalue)

# рисуем первую гистограмму - для значений, вычисленных вручную
plt.hist(pvalues_list_VR, bins=[_ for _ in range(0, 11)], color='#0504aa', alpha=0.7, rwidth=0.85)
plt.title('Гистограмма p-values, вычисленных вручную')
plt.xticks([_ for _ in range(0, 11)], [digit/10 for digit in range(0, 11)])
plt.show()

# рисуем вторую гистограмму - для значений, вычисленных критерием Колмогорова
plt.hist(pvalues_list_KS, bins=[_ for _ in range(0, 11)], color='#0504aa', alpha=0.7, rwidth=0.85)
plt.title('Гистограмма p-values, вычисленных с критерием Колмогорова')
plt.xticks([_ for _ in range(0, 11)], [digit/10 for digit in range(0, 11)])
plt.show()

pvalues_data_KS

# Проверка гипотезы для реальных данных

In [None]:
def pirsons_real(array):
    """
    Статистика критерия Пирсона для выборки размера n
    На вход подаётся выборка 
    
    """
    n = len(array)  # длина выборки
    data = array

    r = 1 + int(math.log2(n)) # интервалы
    min_el = min(data)
    max_el = max(data)
    h = (max_el-min_el) / r # шаг

    function = random_tetta(data) 

    m, sigma, interval_len, middle_points = function # параметры выборки
    expected = scipy.stats.norm(m, math.sqrt(sigma))    # ожидаемое распределение
    
    prob = [expected.cdf(middle_points[0]+0.5*h)]  # вероятности попадания в интервал
    prob += [expected.cdf(middle_points[x]+0.5*h)-expected.cdf(middle_points[x]-0.5*h) for x in range(1, len(middle_points)-1)]
    prob += [1-expected.cdf(middle_points[-1]-0.5*h)]

    z = []
    # расчёт хи-квадрат
    for i in range(len(prob)):
        if n*prob[i] == 0:
            z.append(0)
        else:
            z.append( ((interval_len[i]-n*prob[i])**2)/(n*prob[i]) )

    chi_2 = sum(z)

    return chi_2

In [None]:
def pvalue_real(x):
    """
    Нахождение значения p-value для месяца реальных данных
    
    """
    u0 = pirsons_real(array=x)
    k = 0
    for i in range(len(x)):
        if x[i] > u0: 
            k += 1
    p = round(k/len(x), 10) # p-value
    return p


pvalue_data = pd.DataFrame(columns=months, index=tickers.keys()) 
all_pvalues = []

for company in tickers.keys():
    with open('C:/Users/Mark/Desktop/Данные/' + f'{company}.csv') as file:        
        mylist = []
        for chunk in pd.read_csv(file, sep=';', chunksize=20_000):
            mylist.append(chunk)
        df = pd.concat(mylist, axis= 0)
        del mylist
        
        pvalue_list = []
        
        for i in month_numbers:         
            count = -1
            value_array = []
            
            for date in df['<DATE>']:
                count += 1
                if date[3:5] == i:   
                    value_array.append(df['<CLOSE>'][count])
                elif i != '12' and date[3:5] == month_numbers[month_numbers_dict[i]+1]:  
                    break
                    
            now_diff = calculate_diff_log(array=value_array)
            pvalue_list.append(pvalue_real(now_diff))
        
        all_pvalues.extend([i*10 for i in pvalue_list])
        
        for i in range(len(months)):
            pvalue_data[months[i]][company] = pvalue_list[i]


# кол-во значений в каждом интервале 
p_interval = [all_pvalues.count(i) for i in range(11)]

# гистограмма на реальных данных
plt.hist(all_pvalues, bins=[i for i in range(11)],color='#0504aa', alpha=0.7, rwidth=0.85)
plt.title('Гистограмма p-значений для реальных данных')
plt.xticks([i for i in range(11)], [i/10 for i in range(11)])
plt.show()


pvalue = scipy.stats.ks_2samp(all_pvalues, pvalues_list_KS)
print(pvalue)

pv001 = 0
pv005 = 0
pv010 = 0

for month in months:
    for ticker in tickers.keys():
        if pvalue_data[month][ticker] > 0.01: 
            pv001 += 1
        if pvalue_data[month][ticker] > 0.05: 
            pv005 += 1
        if pvalue_data[month][ticker] > 0.1: 
            pv010 += 1
            
pv001 = round(pv001/len(tickers)/len(months), 5)
pv005 = round(pv005/len(tickers)/len(months), 5)
pv010 = round(pv010/len(tickers)/len(months), 5)

print('1% уровень значимости:', pv001)
print('5% уровень значимости:', pv005)
print('10% уровень значимости:', pv010)
