Python      3.12.2

In [None]:
%pip freeze

In [None]:
import pandas as pd
import requests as rq
import numpy as np
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor
import plotly.express as px
from pathlib import Path
import plotly.graph_objects as go
from tqdm import tqdm

# Преобразование данных мосбиржи

## Пример получения данных по акции Лукойл с помощью API мосбиржи

In [None]:
url = f"https://iss.moex.com/iss/engines/stock/markets/shares/securities/LKOH/candles.json?from=2024-04-01"

# Отправляем GET-запрос по указанной ссылке
response = rq.get(url)

# Проверяем статус ответа
if response.status_code == 200:
    # Загружаем данные из ответа в переменную data в случае успеха
    data = response.json()
else:
    print("Ошибка при получении данных")
    
# Выбираем датасет с ценами    
columns = response.json()["candles"]['columns']
data = response.json()["candles"]["data"]

df = pd.DataFrame(data, columns=columns)
df

## Парсинг данных цен

### Сначала составим список акций, которые будут участвовать в нашем портфеле

Выберем акции с market cap > 1e+9 рублей

In [None]:
url = "https://iss.moex.com/iss/engines/stock/markets/shares/securities.json"

# Отправляем GET-запрос по указанной ссылке
response = rq.get(url)

# Проверяем статус ответа
if response.status_code == 200:
    # Загружаем данные из ответа в переменную data в случае успеха
    data = response.json()
else:
    print("Ошибка при получении данных")
    
columns = response.json()["securities"]['columns']
data = response.json()["securities"]["data"]

df = pd.DataFrame(data, columns=columns)
df = df[df["BOARDID"]=="TQBR"]
df = df[df["ISSUESIZE"]*df["PREVWAPRICE"]>1e+9].reset_index().drop('index', axis=1)
ticker_names = df["SECID"].values.tolist()
ticker_names

Выберем те, которые появлись позже 17.06.2023

In [None]:
tickers = []
date_today = (datetime.now()-timedelta(days=1)).strftime("%Y-%m-%d")


for ticker in ticker_names:
    url = f"https://iss.moex.com/iss/engines/stock/markets/shares/securities/{ticker}/candles.json?from={date_today}"

    # Отправляем GET-запрос по указанной ссылке
    response = rq.get(url)

    # Проверяем статус ответа
    if response.status_code == 200:
        # Загружаем данные из ответа в переменную data в случае успеха
        data = response.json()
        
        columns = data["candles"]['columns']
        data = data["candles"]["data"]

        df = pd.DataFrame(data, columns=columns)
        #print(df)

        if not df.empty:
            url = f"https://iss.moex.com/iss/engines/stock/markets/shares/securities/{ticker}/candles.json?from={datetime(2023, 6, 1).strftime("%Y-%m-%d")}"

            # Отправляем GET-запрос по указанной ссылке
            response = rq.get(url)

            # Проверяем статус ответа
            if response.status_code == 200:
                # Загружаем данные из ответа в переменную data в случае успеха
                data = response.json()
                
                columns = data["candles"]['columns']
                data = data["candles"]["data"]

                df = pd.DataFrame(data, columns=columns)
                #print(df)

                # Преобразуем столбец "begin" в формат datetime
                df['begin'] = pd.to_datetime(df['begin'])
                min_begin_date = df['begin'].min()
                if min_begin_date <= datetime(2023, 6, 1, 9 , 50, 0):
                    tickers.extend([ticker])
    else:
        print(f"Ошибка при получении данных для акции {ticker}")

print(tickers)

### Теперь перейдём непосредственно к пасрингу

Сначала мы создаём массив, который содержить все даты, за которые мы хотим получить данные

In [None]:
# Дата такая, потому что начиная с неё появляется информация
start_date = datetime(2023, 6, 1)
end_date = datetime.now()

current_date = start_date
array_date = []
while current_date <= end_date:
    array_date += [current_date.strftime("%Y-%m-%d")]
    current_date += timedelta(days=1)

Тут мы формируем массив из дней, а затем, если сервер работает хорошо, то возвращаем данные за весь промежуток от 8-го декабря 2011 года до сегодняшнего дня. 

Если сервер перестаёт отвечать, тогда полученные данные мы собираем в файл и при повторном запуске программа проверить наши файлы и начнёт с последнего записанного дня.

В качестве отладки печатается дата дня, который мы сейчас получаем. 

In [None]:
def add_new_day(date_today: str, ticker: str): 
    url = f"https://iss.moex.com/iss/engines/stock/markets/shares/securities/{ticker}/candles.json?from={date_today}&till={date_today}" 

    while True: 
        try: 
            response = rq.get(url) 
        except Exception as e: 
            # Обработка других исключений 
            print(f"Произошла ошибка: {e}") 
        else: 
            break 
    
    # Проверяем статус ответа 
    if response.status_code != 200: 
        print("Ошибка при получении данных") 
        add_new_day(date_today, ticker) 
 
    # Создание датафрейма
    df_for_add = pd.DataFrame(response.json()["candles"]["data"], columns=response.json()["candles"]['columns']) 
    df_for_add.reset_index().drop('index', axis=1) 
    return df_for_add 
 
def process_ticker(ticker: str):
    # Проверяем существование файла '{ticker}_full_date_price.csv'
    if Path(f'{ticker}_full_date_price.csv').exists():
        print(f"Файл '{ticker}_full_date_price.csv' существует.")

        # Загружаем данные из файла
        df = pd.read_csv(f'{ticker}_full_date_price.csv')

        # Преобразование столбца 'begin' в формат даты
        df['begin'] = pd.to_datetime(df['begin'])

        # Находим индекс последней даты с данными
        last_date_str = df['begin'].iloc[-1].strftime('%Y-%m-%d')
        
        if last_date_str in array_date:
            df_array = [df]
            # Добавляем данные для каждого дня из списка array_date, начиная со следующего дня после date_tomorrow
            for date in array_date[array_date.index(last_date_str):]:
                df_array += [add_new_day(date, ticker)]

            # Объединяем все DataFrame из df_array и сбрасываем индексы
            df = pd.concat(df_array).reset_index(drop=True)

            df = df.drop_duplicates()

            # Сохраняем обновленные данные в файл
            df.to_csv(f'{ticker}_full_date_price.csv', sep=',', index=False, encoding='utf-8')
    else:
        print(f"Файл '{ticker}_full_date_price.csv' не существует.")
        df_array = []

        # Добавляем данные для каждого дня из списка array_date
        for date in array_date:
            df_array += [add_new_day(date, ticker)]
            # print(date)

        # Объединяем все DataFrame из df_array и сбрасываем индексы
        df = pd.concat(df_array).reset_index().drop('index', axis=1)

        # Сохраняем данные в файл '{ticker}_full_date_price.csv'
        filename = f'{ticker}_full_date_price.csv'
        df.to_csv(filename, sep=',', index=False, encoding='utf-8')


# При работе с несколькими акциями используется многопоточность
with ThreadPoolExecutor() as executor: 
    executor.map(process_ticker, tickers)
#process_ticker()

## Работа с данными

### Нарисуем график цен всех акций

In [None]:
# Создаем пустую фигуру
fig = go.Figure()

# Проходимся по каждому тикеру
for ticker in tickers:
    # Загрузка данных из CSV файла в DataFrame
    df = pd.read_csv(f'{ticker}_full_date_price.csv')

    # Преобразование столбца 'begin' в формат даты
    df['begin'] = pd.to_datetime(df['begin'])
    
    # Находим начальное значение цены
    initial_price = df['close'].iloc[0]

    # Нормируем цены
    normalized_close = df['close'] / initial_price

    # Добавляем нормированный график цены для текущего тикера
    fig.add_trace(go.Scatter(x=df['begin'], y=normalized_close, mode='lines', name=f'{ticker}'))

# Настраиваем макет графика
fig.update_layout(title='Нормированные графики цен для нескольких тикеров', xaxis_title='begin', yaxis_title='Normalized Close Price')

# Отображаем рисунок
fig.show()

### Разобьём данные на 9 месяцев (тренировочные данные), 10-ый месяц (тестовые данные)

In [None]:
grouped_data = []
start_date = datetime(2023, 6, 1)
min_length_train = float('inf')
min_length_test =  float('inf') # Инициализируем минимальную длину как бесконечность

# Загрузка данных для каждого тикера и создание групп данных
for ticker in tickers:
    # Загрузка данных из CSV файла
    df = pd.read_csv(f'{ticker}_full_date_price.csv')

    # Преобразование столбца 'begin' в формат даты
    df['begin'] = pd.to_datetime(df['begin'])

    # Определяем дату окончания тренировочного периода (9 месяцев)
    end_train_date = start_date + pd.DateOffset(months=9)
    
    # Определяем дату окончания тестового периода (1 месяц после окончания тренировочного периода)
    end_test_date = end_train_date + pd.DateOffset(months=1)
    
    # Выбираем данные для тренировочного набора
    train_data = df[(df['begin'] >= start_date) & (df['begin'] < end_train_date)]
    
    # Выбираем данные для тестового набора
    test_data = df[(df['begin'] >= end_train_date) & (df['begin'] < end_test_date)]
    
    # Обновляем минимальную длину
    min_length_train = min(min_length_train, len(train_data))
    min_length_test = min(min_length_test, len(test_data))

    # Добавляем тренировочные и тестовые данные в список
    grouped_data.append((train_data, test_data))
    
# Обрезаем данные до минимальной длины для каждой группы
for i, (train_data, test_data) in enumerate(grouped_data):
    grouped_data[i] = (train_data[:min_length_train], test_data[:min_length_test])

### Преобразуем данные к нужному виду и запишем их в свой файл

Для данных о цене закрытия и объёме

In [None]:
# Определяем функцию generate_thresholds для создания пороговых значений
def generate_thresholds(data):
    min_val = np.min(data)
    max_val = np.max(data)
    thresholds = np.array([])
    for i in range(15):
        thresholds = np.append(thresholds, min_val + (i + 1) * (max_val - min_val) / 15)
    return thresholds

# Создаём матрицу из 0 и 1
def generate_matrix(data):
    threshold_matrix = np.zeros((15, 32))
    thresholds = generate_thresholds(data)

    for i, value in enumerate(data):
        index = np.searchsorted(thresholds, value)
        if index < 15:  # Первые 15 строк - данные о цене закрытия
            threshold_matrix[14-index, i] = 1

    return threshold_matrix

# Определяем функцию generate_sequence для создания последовательности матриц
def generate_sequence(price_data, volume_data):
    matrix_sequence = []
    for i in range(len(price_data)-31):
        price_matrix = generate_matrix(price_data[i:i+32])
        volume_matrix = generate_matrix(volume_data[i:i+32])
        matrix_sequence.append(price_matrix)
        matrix_sequence.append(np.zeros((2, 32)))  # Добавляем две пустые строки
        matrix_sequence.append(volume_matrix)
    return matrix_sequence

# Запись матриц в файл
def write_data_X(train_or_test, matrices):
    name = 'test' if train_or_test % 2 == 1 else 'train'  # Определение train или test     

    mode = 'a' if Path(f'input_{name}_X.txt').exists() else 'w'

    with open(f'input_{name}_X.txt', mode) as f:
        for i, matrix in enumerate(matrices):
            if i >= (len(matrices) - 3):
                pass
            else:
                for row in matrix:
                    row_str = ' '.join(map(str, row.astype(int)))  # Преобразование к целочисленному типу
                    f.write(row_str + '\n')
                if i % 3 == 2:  # Добавляем 'E' после каждой третьей матрицы
                    f.write('E\n')

        # Заменяем последнюю строку на строку с символом 'F'
        f.seek(0, 2)  # Переходим в конец файла
        f.seek(f.tell() - 3, 0)  # Переходим к последнему символу перед EOF
        f.write('F\n')  # Заменяем 'E' на 'F'

# Определяем функцию для записи данных в файл для одной группы
def process_group_X(group):
    # Находим минимальную длину данных среди всех групп
    for i in range(2):
        price_data = group[i]['close']
        volume_data = group[i]['volume']

        # Создание последовательности матриц
        matrices = generate_sequence(price_data, volume_data)

        # Запись данных в файл
        write_data_X(i, matrices)

for group in tqdm(grouped_data, desc='Processing groups'):
    process_group_X(group)

Для данных о разнице цены

In [None]:
# Запись изменений цены в файл
def write_data_Y(train_or_test, diff_data):
    name = 'test' if train_or_test % 2 == 1 else 'train'  # Определение train или test        

    mode = 'a' if Path(f'input_{name}_Y.txt').exists() else 'w'

    with open(f'input_{name}_Y.txt', mode) as f:
        for value in diff_data[31:]:
            f.write(str(value) + '\n')


# Определяем функцию для записи данных в файл для одной группы
def process_group_Y(group):
    for i in range(2):
        # Добавление столбца с процентной разностью между последовательными значениями столбца 'close'
        df_new = group[i].copy()
        df_new['close_diff_percent'] = df_new['close'].pct_change() * 100  # Вычисляем процентное изменение
        df_new = df_new.dropna()

        # Округление значений до четвертого знака после запятой
        df_new['close_diff_percent'] = df_new['close_diff_percent'].round(4)

        # Запись столбца в файл inputY.txt, начиная с разницы между 32 и 33 элементами
        diff_data = df_new['close_diff_percent'].values
        # Запись данных в файл
        write_data_Y(i, diff_data)

for group in tqdm(grouped_data, desc='Processing groups'):
    process_group_Y(group)

### Сравнение доходности

In [None]:
# Создаем пустую фигуру
fig = go.Figure()

# Создаем пустой DataFrame для хранения нормализованных цен
normalized_prices = pd.DataFrame()

# Проходимся по каждому тикеру
for i, group in enumerate(grouped_data):
    group_reset = group[1].reset_index(drop=True)

    # Находим начальное значение цены
    initial_price = group_reset['close'].iloc[0]

    # Нормируем цены
    normalized_close = group_reset['close'] / initial_price
    
    # Добавляем нормированные цены в DataFrame
    normalized_prices = pd.concat([normalized_prices, normalized_close], axis=1)
# Вычисляем среднюю доходность за каждый промежуток
average_returns = normalized_prices.mean(axis=1)

# Добавляем график средней доходности
fig.add_trace(go.Scatter(x=list(range(len(average_returns))), y=average_returns,
                         mode='lines',
                         name='Средняя доходность',
                         line=dict(color='red')))

# Загрузка данных из файла
with open('avgDailyR.txt', 'r') as file:
    lines = file.readlines()
    daily_returns = [float(line.strip()) for line in lines]

# Вычисление накопленной прибыли
cumulative_returns = [1]  # Начальное значение 100, предполагая начальный капитал в размере 100%
for daily_return in daily_returns:
    cumulative_returns.append(cumulative_returns[-1] * (1 + daily_return / 100))

# Добавляем график накопленной прибыли на ту же фигуру
fig.add_trace(go.Scatter(x=list(range(len(cumulative_returns))), y=cumulative_returns,
                         mode='lines',
                         name='Накопленная Прибыль',
                         line=dict(color='green')))

# Обновляем макет графика
fig.update_layout(title='Сравнение средней доходности акций и нашей нейронной сети',
                  xaxis_title='Время',
                  yaxis_title='Доходность')

# Отображаем рисунок с обновленными данными
fig.show()