In [None]:
!pip install backtrader

Collecting backtrader
  Downloading backtrader-1.9.78.123-py2.py3-none-any.whl.metadata (6.8 kB)
Downloading backtrader-1.9.78.123-py2.py3-none-any.whl (419 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m419.5/419.5 kB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: backtrader
Successfully installed backtrader-1.9.78.123


In [None]:
#!pip install gensim

In [None]:
import numpy as np
import pandas as pd
import yfinance as yf
import backtrader as bt
import torch
from collections import Counter, defaultdict
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from scipy.linalg import eigh
from typing import List, Dict, Tuple, Optional
import matplotlib.pyplot as plt
import random
import os

In [None]:
# Если GPU доступны, то, если возможно, они используются в качестве рабочего устройства
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using {device} device")
if (device == "cuda"):
    print('There are %d GPU(s) available.' % torch.cuda.device_count())
    print('We will use the GPU:', torch.cuda.get_device_name(0))

Using cpu device


In [None]:
# Фиксация случайных переменных
def seed_everything_tf(seed: int = 42) -> None:
    #os.environ['PYTHONHASHSEED'] = str(seed)
    torch.manual_seed(seed)
    #tf.random.set_seed(seed)
    np.random.seed(seed)
    random.seed(seed)
    print(f"Using {seed} seed")
seed_everything_tf(42)

Using 42 seed


In [None]:
# Загрузка данных из файла
def load_data_from_file(file_path, separator=','):
  try:
    data = pd.read_csv(file_path, sep=separator)
    # Удаление символов '<' и '>' из названий колонок
    data.columns = data.columns.str.replace(r'[<>]', '', regex=True)
    # Названия колонок в верхний регистр
    data.columns = data.columns.str.upper()
    # Переименование колонок
    data = data.rename(columns={'VOL': 'VOLUME'})
    # Преобразование DATE и TIME в строки, добавление ведущих нулей при необходимости
    data['DATE'] = data['DATE'].astype(str).str.zfill(6)
    data['TIME'] = data['TIME'].astype(str).str.zfill(6)
    # Создание строки datetime (формат: 'YYMMDD HHMMSS')
    datetime_str = data['DATE'] + ' ' + data['TIME']
    # Преобразование в pandas datetime
    data['DATETIME'] = pd.to_datetime(datetime_str, format='%Y%m%d %H%M%S')
    # Удаление исходных колонок, если нужно
    data = data.drop(['DATE', 'TIME'], axis=1)
    # Переупорядочивание колонок, если нужно
    cols = ['TICKER', 'PER', 'DATETIME'] + [col for col in data.columns if col not in ['TICKER', 'PER', 'DATETIME']]
    data = data[cols]
    return data
  except FileNotFoundError:
      print(f"Файл '{file_path}' не найден.")
      return None

# Загрузка данных из yaho finance
def load_data_from_yf(ticker, timeframe='1d'):
    data = yf.download(ticker, interval=timeframe)
    data = data.reset_index()
    if isinstance(data.columns, pd.MultiIndex):
        data.columns = data.columns.droplevel(level=1)
    # Названия колонок в верхний регистр
    data.columns = data.columns.str.upper()
    if 'DATE' in data.columns:
        data['DATE'] = pd.to_datetime(data['DATE'], format='%Y%m%d')
        data = data.rename(columns={'DATE': 'DATETIME'})
    else:
        data['DATETIME'] = pd.to_datetime(data['DATETIME'], format='%Y%m%d %H%M%S')
    return data

In [None]:
# Проверка наличия пропусков и ошибок
def check_outliers(data, column="Close"):
    """
    Определение выбросов в данных с использованием межквартильного размаха.

    :param data: Датафрейм с данными.
    :param column: Название колонки, по которой проводится анализ.
    :return: Датафрейм с метками выбросов.
    """
    q1 = data[column].quantile(0.25)
    q3 = data[column].quantile(0.75)
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr
    outliers = (data[column] < lower_bound) | (data[column] > upper_bound)
    return outliers


def analyze_outliers(data, outliers, threshold=0.05):
    """
    Анализ выбросов и определение их природы.

    :param data: Датафрейм с данными.
    :param outliers: Множество индексов выбросов.
    :param threshold: Порог для определения значимости выброса.
    :return: Датафрейм с пометкой реальных данных и выбросов.
    """
    analyzed_data = data.copy()
    analyzed_data["Outlier"] = False
    analyzed_data.loc[outliers, "Outlier"] = True
    real_data = analyzed_data.query("Outlier == False")
    outlier_data = analyzed_data.query("Outlier == True")

    IsRealValues = len(outlier_data) / len(analyzed_data) <= threshold
    '''if IsRealValues == True:
        print("Выбросы скорее всего являются реальными данными.")
    else:
        print("Выбросы могут быть ошибочными данными.")
    '''
    return IsRealValues


def check_dataFrames(dataFrames):
    # Проверка пропусков
    for ticker, ticker_data in dataFrames.items():
        missing_data = ticker_data.isna().sum().sum()

        if missing_data > 0:
            print(f"{ticker}: Обнаружены пропуски в данных.")
        else:
            print(f"{ticker}: Пропусков в данных нет.")

        # Проверка выбросов
        outliers_open = check_outliers(ticker_data, "Open")
        if (outliers_open.sum() > 0):
            if (analyze_outliers(ticker_data, outliers_open) == False):
                print(f"{ticker}: обнаружены выбросы в Open")
        outliers_high = check_outliers(ticker_data, "High")
        if (outliers_high.sum() > 0):
            if (analyze_outliers(ticker_data, outliers_high) == False):
                print(f"{ticker}: обнаружены выбросы в High")
        outliers_low = check_outliers(ticker_data, "Low")
        if (outliers_low.sum() > 0):
            if (analyze_outliers(ticker_data, outliers_low) == False):
                print(f"{ticker}: обнаружены выбросы в Low")
        outliers_close = check_outliers(ticker_data, "Close")
        if (outliers_close.sum() > 0):
            if (analyze_outliers(ticker_data, outliers_close) == False):
                print(f"{ticker}: обнаружены выбросы в Close")
        outliers_volume = check_outliers(ticker_data, "Volume")
        if (outliers_volume.sum() > 0):
            if (analyze_outliers(ticker_data, outliers_volume) == False):
                print(f"{ticker}: обнаружены выбросы в Volume")

In [None]:
# Возвращает позицию (индекс), после которой идёт ровно `need_days` уникальных дней
def get_recent_data_positions(data, need_days):
    if not isinstance(data, pd.DataFrame) or 'DATETIME' not in data.columns:
        raise ValueError("Данные должны быть DataFrame с колонкой 'DATETIME'")

    if len(data) == 0:
        return 0

    # Извлекаем даты (без времени) и находим уникальные дни
    dates = pd.to_datetime(data['DATETIME'].dt.date)
    unique_dates = dates.unique()

    # Если нужно больше дней, чем есть в данных — возвращаем 0
    if len(unique_dates) <= need_days:
        return 0

    # Находим N-ю уникальную дату с конца (need_days дней назад)
    threshold_date = unique_dates[-need_days]

    # Находим первую строку, где дата >= threshold_date
    mask = dates >= threshold_date
    position = mask.idxmax() if mask.any() else len(data)

    return position

In [203]:
# Strategy parameters
s_ticker = "AAPL"                  # Рабочий инструмент
#s_file_path = 'SBER_1H_250420.csv' # Рабочий файл
s_file_path = 'GAZP_1H_250420.csv' # Рабочий файл
#s_file_path = 'SBER_1D_250420.csv' # Рабочий файл
#s_file_path = 'GAZP_1D_250420.csv' # Рабочий файл
#s_file_path = 'LKOH_1D_250420.csv' # Рабочий файл
#s_file_path = 'RTKM_1D_250420.csv' # Рабочий файл
#s_file_path = 'MAGN_1D_250420.csv' # Рабочий файл
#s_file_path = 'GMKN_1D_250420.csv' # Рабочий файл
#s_file_path = 'NVTK_1D_250420.csv' # Рабочий файл
#s_file_path = 'AFLT_1D_250420.csv' # Рабочий файл
s_start_date = "1990-01-01"
s_end_date = "2025-04-08"
i_start_capital = 10000
d_trader_marging = 0.0005
i_test_trend_days = 250            # Количество дней для тестирования
i_window_size = 4
i_predict_amount = 1
i_buffer_size = 9
i_ranges_amount = 5
i_amount_excluded = 6
i_polinom_order_model = 1
b_is_using_const_model = True
d_rmse_koef = 3.0
d_probably_min = 1e-10
i_density_amount = 10
b_is_short_enabled = False
d_zero_zone_koef = 0.01
d_trend_koef_up = 0.001
d_trend_koef_dn = 0.001
d_dither_koef = 0.7
b_is_gap_strategy = False
i_amount_random_discrims = 0      # Количество случайных плоскостей

In [204]:
# Загрузка данных из файла
data = load_data_from_file(s_file_path, separator=';')
# Load data using yfinance
#data = yf.download(s_ticker, start=s_start_date, end=s_end_date)
#data = load_data_from_yf(s_ticker, timeframe='1h')
if data.empty:
    raise ValueError("No data downloaded for the given ticker and date range")

In [205]:
data

Unnamed: 0,TICKER,PER,DATETIME,OPEN,HIGH,LOW,CLOSE,VOLUME
0,GAZP,60,2006-01-23 10:00:00,239.00,239.00,219.33,220.50,1380238
1,GAZP,60,2006-01-23 11:00:00,220.50,220.97,219.71,220.45,438153
2,GAZP,60,2006-01-23 12:00:00,220.70,221.33,220.20,220.85,520057
3,GAZP,60,2006-01-23 13:00:00,220.85,220.90,220.00,220.85,188593
4,GAZP,60,2006-01-23 14:00:00,220.85,221.33,220.52,221.33,326584
...,...,...,...,...,...,...,...,...
48789,GAZP,60,2025-04-18 19:00:00,135.40,136.00,134.60,135.91,4247520
48790,GAZP,60,2025-04-18 20:00:00,135.91,136.00,134.63,134.76,2825200
48791,GAZP,60,2025-04-18 21:00:00,134.75,135.30,134.68,134.87,1112980
48792,GAZP,60,2025-04-18 22:00:00,134.87,135.07,134.72,134.84,1181930


In [206]:
# Добавление новых признаков
data['MEAN'] = (data['OPEN'] + data['HIGH'] + data['LOW'] + data['CLOSE']) / 4  # Средняя цена
data['VALUE'] = data['VOLUME'] * data['MEAN']  # Объём в деньгах
# Prepare trend data
p_trend_datetime = data['DATETIME'].values
p_trend_mean = data['MEAN'].values
p_trend_open = data['OPEN'].values
p_trend_hi = data['HIGH'].values
p_trend_lo = data['LOW'].values
p_trend_close = data['CLOSE'].values
p_trend_volume = data['VOLUME'].values
p_trend_value = data['VALUE'].values

In [207]:
# Length of trend for training
i_prep_trend_length = get_recent_data_positions(data, i_test_trend_days)
#i_prep_trend_length2 = max(len(p_trend_mean) - i_test_trend_days, 0)
i_test_trend_size = len(data) - i_prep_trend_length
#i_test_trend_size

In [208]:
#i_prep_trend_length
i_test_trend_size

3630

In [209]:
# Normalization function
def norming_e(p_trend, i_target_position, i_vector_size, i_samples_pos, i_amount_samples, b_is_minus_offset):
    p_result_vector = np.zeros(i_vector_size)
    d_e = np.mean(p_trend[i_target_position - i_vector_size + i_samples_pos :
                          i_target_position - i_vector_size + i_samples_pos + i_amount_samples])
    d_offset = 1 if b_is_minus_offset else 0
    p_result_vector = (p_trend[i_target_position - i_vector_size : i_target_position] / d_e) - d_offset
    return p_result_vector

In [210]:
# Calculate dot product of two vectors
def get_multip(vector1: np.ndarray, vector2: np.ndarray, i_vector_size: int) -> float:
    return np.dot(vector1[-i_vector_size:], vector2[-i_vector_size:])

In [211]:
# Calculate covariance matrix
def kovariation_matrix(trends: List[np.ndarray], matrix_size: int) -> np.ndarray:
    kov = np.zeros((matrix_size, matrix_size))
    e = np.zeros(matrix_size)
    i_trend_cols = 0

    for d_line in trends:
        if d_line is None:
            continue

        i_trend_cols += 1
        for i1 in range(matrix_size):
            d_tmp1 = d_line[i1]
            e[i1] += d_tmp1
            for i2 in range(i1, matrix_size):
                d_tmp2 = d_line[i2]
                kov[i1, i2] += (d_tmp1 * d_tmp2)

    e /= i_trend_cols
    for i1 in range(matrix_size):
        d_tmp1 = e[i1]
        for i2 in range(i1, matrix_size):
            d_tmp2 = e[i2]
            kov[i1, i2] = ((kov[i1, i2] / i_trend_cols) - (d_tmp1 * d_tmp2))
            kov[i2, i1] = kov[i1, i2]

    return kov

In [212]:
# Find eigenvectors of symmetric matrix
def get_optim_matrix_by_kov(kov: np.ndarray, i_matrix_size: int) -> Tuple[np.ndarray, np.ndarray]:
    # Get sum of diagonal elements of covariance matrix
    d_norma_vals = np.trace(kov)
    if d_norma_vals == 0:
        # Covariance matrix is degenerate
        return None, None

    # Find eigenmatrix
    try:
        params, v = eigh(kov)
    except Exception as e:
        print(f"Error finding eigenvectors: {e}")
        return None, None

    # Check and normalize eigenvalues of covariance matrix
    params /= d_norma_vals
    if np.sum(params) == 0:
        return None, None

    # Copy values to matrix
    res_matrix = np.zeros((i_matrix_size, i_matrix_size))
    for w in range(i_matrix_size):
        i_multi = 1 if v[0, w] > 0 else -1
        for i in range(i_matrix_size):
            res_matrix[w, i] = i_multi * v[i, w]

    # Sort matrix rows in descending order of eigenvalues
    sorted_indices = np.argsort(-np.abs(params))
    params = params[sorted_indices]
    res_matrix = res_matrix[sorted_indices]

    return res_matrix, params

In [213]:
# Find optimal orthogonal matrix composed of eigenvectors
def get_optim_matrix(trends: List[np.ndarray], i_matrix_size: Optional[int] = None) -> Tuple[np.ndarray, np.ndarray]:
    if i_matrix_size is None:
        i_matrix_size = len(trends[0])

    # Find covariance matrix
    kov = kovariation_matrix(trends, i_matrix_size)

    # Find eigenvectors of symmetric matrix
    return get_optim_matrix_by_kov(kov, i_matrix_size)

In [214]:
# Create random linear models
def create_random_surface(p_discrims, i_amount_logic_discrims, i_dimensions, d_point_min=0, d_point_max=1):
    rnd_fabric = random.Random()

    while len(p_discrims) < i_amount_logic_discrims:
        p_new_surface = np.zeros(i_dimensions + 1)

        # Формирование компонент единичного вектора нормали
        d_scale = 0
        for i in range(i_dimensions):
            # Генерируется компонента случайного вектора нормали
            d_normal = rnd_fabric.random() - 0.5
            p_new_surface[i] = d_normal
            d_scale += (d_normal * d_normal)

        d_scale = np.sqrt(d_scale)

        for i in range(i_dimensions):
            # Значения нормали масштабируются на длину вектора
            p_new_surface[i] /= d_scale

            # Генерируется координата случайной точки
            d_point = d_point_min + rnd_fabric.random() * (d_point_max - d_point_min)

            # Формируется свободный член
            p_new_surface[i_dimensions] -= (p_new_surface[i] * d_point)

        p_discrims.append(p_new_surface)

In [215]:
# Class representing probability density function of random variable
class DensityProbFunc:

    class DataCont:
        def __init__(self):
            self.count = 0
            self.avg = 0
            self.min = float('inf')
            self.max = float('-inf')
            self.value_p = 0

    def __init__(self, p_trend: np.ndarray, i_max_amount_distribution_points: int = 100,
                 d_min_value: Optional[float] = None, d_max_value: Optional[float] = None):
        self.d_area_min = d_min_value if d_min_value is not None else np.min(p_trend)
        self.d_area_max = d_max_value if d_max_value is not None else np.max(p_trend)
        self.m_distribution = {}

        self.create_density(p_trend, i_max_amount_distribution_points, d_min_value, d_max_value)

    def create_density(self, p_trend: np.ndarray, i_max_amount_distribution_points: int = 100,
                      d_min_value: Optional[float] = None, d_max_value: Optional[float] = None):
        if i_max_amount_distribution_points <= 0:
            raise ValueError("Invalid number of distribution function ranges")

        self.m_distribution.clear()
        self.d_area_min = d_min_value if d_min_value is not None else np.min(p_trend)
        self.d_area_max = d_max_value if d_max_value is not None else np.max(p_trend)

        d_inc_val = (self.d_area_max - self.d_area_min) / i_max_amount_distribution_points
        if d_inc_val == 0:
            self.m_distribution[0] = self.DataCont()
            self.m_distribution[0].count = len(p_trend)
            self.m_distribution[0].avg = self.d_area_min
            self.m_distribution[0].min = self.d_area_min
            self.m_distribution[0].max = self.d_area_max
            self.m_distribution[0].value_p = 1
            return

        for i_index in range(i_max_amount_distribution_points):
            p_data_cont = self.DataCont()
            p_data_cont.min = self.d_area_min + (d_inc_val * i_index)
            p_data_cont.max = p_data_cont.min + d_inc_val
            self.m_distribution[i_index] = p_data_cont

        for d_value in p_trend:
            i_index = int((d_value - self.d_area_min) / d_inc_val)
            if i_index < 0 or i_index >= i_max_amount_distribution_points:
                continue
            p_data_cont = self.m_distribution[i_index]
            p_data_cont.avg += d_value
            p_data_cont.count += 1

        # Set values for non-zero clusters
        for i_index in range(i_max_amount_distribution_points):
            p_data_cont = self.m_distribution.get(i_index)
            if p_data_cont is None or p_data_cont.count == 0:
                continue
            p_data_cont.avg /= p_data_cont.count
            p_data_cont.value_p = p_data_cont.count / len(p_trend)

    # Get probability density value
    def get_density_value(self, d_trend_val: float) -> float:
        if not self.m_distribution:
            return 0

        d_inc_val = (self.d_area_max - self.d_area_min) / len(self.m_distribution)
        if d_inc_val == 0:
            d_inc_val = float('inf')

        i_index = int((d_trend_val - self.d_area_min) / d_inc_val)
        if i_index < 0 or i_index >= len(self.m_distribution):
            return 0

        return self.m_distribution.get(i_index, self.DataCont()).value_p

In [216]:
# Building a plane based on target and feature data
class GMDH:

    def __init__(self):
        self.m_mgua_k = None
        self.m_mgua_max_context = 0
        self.m_mgua_max_order = 0
        self.m_mgua_total_arg_numb = 0
        self.m_mgua_is_const_used = True

    # Create model from linear vector
    def create_from_linear_vector(self, p_vector_data: np.ndarray, i_max_context: int = 0,
                                is_const_used: bool = False, d_const_val: float = 0):
        # Save context size
        if i_max_context <= 0:
            i_max_context = len(p_vector_data)
        self.m_mgua_max_context = i_max_context

        # Save model order
        self.m_mgua_max_order = 1

        # Save use of constant offset
        self.m_mgua_is_const_used = is_const_used

        # Determine number of model arguments
        i_offset = 1 if self.m_mgua_is_const_used else 0
        self.m_mgua_total_arg_numb = self.m_mgua_max_context + i_offset

        # Create vector and copy data to it
        self.m_mgua_k = np.zeros(self.m_mgua_total_arg_numb)
        for i in range(self.m_mgua_max_context):
            self.m_mgua_k[i + i_offset] = p_vector_data[i]
        if self.m_mgua_is_const_used:
            self.m_mgua_k[0] = d_const_val

    # Group method of data handling with order up to maximum
    def create_model(self, vals: np.ndarray, trend: np.ndarray, i_max_context: int,
                    i_max_order: int, i_target_position: int = 0, i_start_position: int = 0,
                    is_const_used: bool = True) -> bool:
        # Save context size
        self.m_mgua_max_context = i_max_context
        if self.m_mgua_max_context < 2:
            return False

        # Save model order
        self.m_mgua_max_order = i_max_order
        if self.m_mgua_max_order < 1:
            return False

        # Save use of constant offset
        self.m_mgua_is_const_used = is_const_used

        # If end index is not set, take whole sample length
        if i_target_position <= 0:
            i_target_position = len(vals)

        # Determine number of model arguments
        self.m_mgua_total_arg_numb = self.get_param_numb(i_max_context, self.m_mgua_max_order, is_const_used)

        # Create buffers if they haven't been created or their size is less than needed
        if self.m_mgua_k is None or len(self.m_mgua_k) < self.m_mgua_total_arg_numb:
            self.m_mgua_k = np.zeros(self.m_mgua_total_arg_numb)

        # Initialize buffers
        a_matrix = np.zeros((self.m_mgua_total_arg_numb, self.m_mgua_total_arg_numb))
        b_vector = np.zeros(self.m_mgua_total_arg_numb)

        # For all input vectors
        for t in range(i_start_position, i_target_position):
            # Set input parameter values to vector K
            k_vector = np.zeros(self.m_mgua_total_arg_numb)
            self.set_param_buf(self.m_mgua_max_context, self.m_mgua_max_order, vals[t], k_vector, is_const_used)

            # Value from training vector
            d_curr_value = trend[t]

            # Update statistics
            for i0 in range(self.m_mgua_total_arg_numb):
                k_i0 = k_vector[i0]
                for i1 in range(i0 + 1):
                    a_matrix[i0, i1] += (k_i0 * k_vector[i1])
                b_vector[i0] += (k_i0 * d_curr_value)

        # Find model coefficients vector using least squares
        try:
            self.m_mgua_k = np.linalg.lstsq(a_matrix, b_vector, rcond=None)[0]
        except np.linalg.LinAlgError:
            return False

        return (not np.isnan(self.m_mgua_k).any()) and (not np.isinf(self.m_mgua_k).any()) and (np.sum(np.abs(self.m_mgua_k)) != 0)

    #  Calculate predicted value using existing model
    def calculate_prognos(self, sample: np.ndarray) -> float:
        if self.m_mgua_max_order < 1:
            return 0

        # Calculate predicted value using found model
        k_vector = np.zeros(self.m_mgua_total_arg_numb)
        self.set_param_buf(self.m_mgua_max_context, self.m_mgua_max_order, sample, k_vector, self.m_mgua_is_const_used)
        return np.dot(k_vector, self.m_mgua_k)

    # Return number of variables for given context size
    @staticmethod
    def get_param_numb(i_max_context: int, i_max_order: int, is_const_used: bool) -> int:
        i_count = 1
        i_total = 1 if is_const_used else 0  # Account for constant component

        for i_curr_order in range(1, i_max_order + 1):
            j = i_max_context + i_curr_order - 1
            i_count = i_count * j // i_curr_order
            i_total += i_count

        return i_total

    # Set input vector of length i_max_context to vector that is argument for GMDH method
    @staticmethod
    def set_param_buf(i_max_context: int, i_max_order: int, inp_buf: np.ndarray,
                     out_buf: np.ndarray, is_const_used: bool):
        i_count = 0

        for i_curr_order in range(0 if is_const_used else 1, i_max_order + 1):
            i_count = GMDH.order_param_create(i_count, i_curr_order, 0, i_max_context, 1, inp_buf, out_buf)

    # Recursive function for setting buffer
    @staticmethod
    def order_param_create(i_count: int, i_level: int, i_context: int, i_max_context: int,
                          d_upper: float, inp_buf: np.ndarray, out_buf: np.ndarray) -> int:
        if i_level > 0:
            # This function is called recursively
            for i_context in range(i_context, i_max_context):
                i_count = GMDH.order_param_create(i_count, i_level - 1, i_context, i_max_context,
                                                d_upper * inp_buf[i_context], inp_buf, out_buf)
        else:
            out_buf[i_count] = d_upper
            i_count += 1

        return i_count

In [217]:
# Forming list of normalized trend changes
p_delta_filter_trends = {}
for tt in range(i_window_size, i_prep_trend_length - i_predict_amount + 1):
    if b_is_gap_strategy:
        p_delta_filter_trends[tt] = (p_trend_open[tt] / p_trend_close[tt - 1]) - 1
    else:
        p_norm_trend = norming_e(p_trend_mean, tt + i_predict_amount, i_window_size + i_predict_amount, 0, i_window_size, True)
        p_delta_filter_trends[tt] = p_norm_trend[i_window_size] - p_norm_trend[i_window_size - 1]

In [218]:
# Forming list of normalized trends for all positions
p_buffer_trends = {}
for tt in range(i_buffer_size, len(p_trend_mean) + 1):
    p_buffer = np.concatenate([
        norming_e(p_trend_open, tt, i_buffer_size, 0, i_buffer_size, True),
        norming_e(p_trend_lo, tt, i_buffer_size, 0, i_buffer_size, True),
        norming_e(p_trend_hi, tt, i_buffer_size, 0, i_buffer_size, True),
        norming_e(p_trend_close, tt, i_buffer_size, 0, i_buffer_size, True),
        norming_e(p_trend_volume, tt, i_buffer_size, 0, i_buffer_size, True),
        norming_e(p_trend_value, tt, i_buffer_size, 0, i_buffer_size, True)
    ])
    if ((not np.isinf(p_buffer).any()) and (not np.isnan(p_buffer).any())):
        p_buffer_trends[tt] = p_buffer

In [219]:
# Forming trend arrays grouped by change value
d_delta = (len(p_delta_filter_trends) - (i_buffer_size - i_window_size if i_buffer_size > i_window_size else 0)) / i_ranges_amount
if d_delta < 2:
    raise ValueError("Minimum 2 elements per range required")

i_counter = 0
i_zero_index = 0
p_ranges_trends = [set() for _ in range(i_ranges_amount)]

for tt, val in sorted(p_delta_filter_trends.items(), key=lambda x: x[1]):
    if tt not in p_buffer_trends:
        continue
    i_index = min(int(i_counter / d_delta), i_ranges_amount - 1)
    p_ranges_trends[i_index].add(tt)
    if val < 0 and i_zero_index < i_index:
        i_zero_index = i_index
    i_counter += 1

# Refine zero position
i_pos_count = sum(1 for tt in p_ranges_trends[i_zero_index] if p_delta_filter_trends[tt] > 0)
i_neg_count = sum(1 for tt in p_ranges_trends[i_zero_index] if p_delta_filter_trends[tt] < 0)
d_zero_value = i_neg_count / (i_neg_count + i_pos_count) if (i_neg_count + i_pos_count) > 0 else 0.5

In [220]:
# Orthogonal matrices for ranges
p_ort_matrix = [None] * i_ranges_amount
p_eigen_vals = [None] * i_ranges_amount
p_range_models = [None] * i_ranges_amount
p_models_distrib_funcs = [None] * i_ranges_amount

for i_index1 in range(i_ranges_amount):
    # Form orthogonal matrix
    p_curr_data_set_src = [p_buffer_trends[tt] for tt in p_ranges_trends[i_index1]]
    if not p_curr_data_set_src:
        continue

    # Calculate optimal orthogonal matrix
    p_ort_matrix[i_index1], p_eigen_vals[i_index1] = get_optim_matrix(p_curr_data_set_src)

    # Recalculate for entire buffer
    p_buffer_trends2 = {}
    for tt, values in p_buffer_trends.items():
        p_matrix_spektr = np.zeros(len(values) - i_amount_excluded)
        for w in range(len(p_matrix_spektr)):
            d_val = np.dot(p_ort_matrix[i_index1][w], values[:len(p_ort_matrix[i_index1][w])])
            p_matrix_spektr[w] = d_val
        p_buffer_trends2[tt] = p_matrix_spektr

    # Add to list of unit orthogonal functions planes
    p_models_curr = []
    p_range_models[i_index1] = p_models_curr

    # Добавление в список случайных плоскостей
    p_logic_discrims = []
    create_random_surface(p_logic_discrims, i_amount_random_discrims, len(p_buffer_trends2[next(iter(p_buffer_trends2))]))
    for item in p_logic_discrims:
        p_ort_matrix_model = GMDH()
        p_ort_matrix_model.create_from_linear_vector(item[:-1], len(item)-1, True, item[-1])
        p_models_curr.append(p_ort_matrix_model)

    # Добавление в список плоскостей единичных ортогональных функций
    # для оценки диапазона изменений по кажой координате в отдельности
    p_base_vector = np.zeros(len(p_buffer_trends2[next(iter(p_buffer_trends2))]))
    for w in range(len(p_base_vector)):
        p_base_vector[w] = 1
        p_ort_matrix_model = GMDH()
        p_ort_matrix_model.create_from_linear_vector(p_base_vector, len(p_base_vector))
        p_models_curr.append(p_ort_matrix_model)
        p_base_vector[w] = 0

    # Calculate a GMDH model to define the boundary between positive and negative samples
    p_curr_model = GMDH()
    p_src_vals_neg = [p_buffer_trends2[key] for key, value in p_delta_filter_trends.items()
                    if value < 0 and key in p_buffer_trends2]
    p_src_vals_pos = [p_buffer_trends2[key] for key, value in p_delta_filter_trends.items()
                    if value > 0 and key in p_buffer_trends2]
    p_src_list = np.array(p_src_vals_pos + p_src_vals_neg)
    p_dst_list = np.array([1.0] * len(p_src_vals_pos) + [-1.0] * len(p_src_vals_neg))
    if p_curr_model.create_model(
        p_src_list,
        p_dst_list,
        len(p_src_list[0]),   # Length of the feature vector
        i_polinom_order_model,# Polinom order
        len(p_src_list),      # Amount of samples
        0,                    # Initial position
        b_is_using_const_model# Using of const offset
    ):
        p_models_curr.append(p_curr_model)

    # Forming mutual range boundary models
    p_curr_data_set1 = [p_buffer_trends2[tt] for tt in p_ranges_trends[i_index1]]
    if not p_curr_data_set1:
        continue

    # Создаем модель GMDH с плоскостью с нормалью в центре кластера точек
    p_base_vector = np.mean(p_curr_data_set1, axis=0)
    p_curr_model = GMDH()
    p_curr_model.create_from_linear_vector(p_base_vector, len(p_base_vector))
    p_models_curr.append(p_curr_model)

    # Создаем новую модель GMDH для целевых значений
    p_curr_model = GMDH()
    p_dst_list = np.array([p_delta_filter_trends[i] for i in p_ranges_trends[i_index1]])
    if p_curr_model.create_model(
        p_curr_data_set1,
        p_dst_list,
        len(p_curr_data_set1[0]),# Длина вектора признаков
        i_polinom_order_model,   # Порядок модели
        len(p_curr_data_set1),   # Количество образцов
        0,                       # Начальная позиция
        b_is_using_const_model   # Использование постоянной составляющей
    ):
        p_models_curr.append(p_curr_model)

    for i_index2 in range(i_ranges_amount):
        if i_index2 == i_index1:
            continue

        p_curr_data_set2 = [p_buffer_trends2[tt] for tt in p_ranges_trends[i_index2]]
        if not p_curr_data_set2:
            continue

        # Calculate a GMDH model to define the boundary between p_curr_data_set1 and p_curr_data_set2
        p_curr_model = GMDH()
        p_src_list = np.array(p_curr_data_set1 + p_curr_data_set2)
        p_dst_list = np.array([1.0] * len(p_curr_data_set1) + [-1.0] * len(p_curr_data_set2))
        if p_curr_model.create_model(
            p_src_list,
            p_dst_list,
            len(p_src_list[0]),   # Length of the feature vector
            i_polinom_order_model,# Polinom order
            len(p_src_list),      # Amount of samples
            0,                    # Initial position
            b_is_using_const_model# Using of const offset
        ):
            p_models_curr.append(p_curr_model)

    # Calculate probability densities for mutual GMDH range boundary models
    p_curr_distrib_funcs = []
    for w in range(len(p_models_curr)):
        p_matrix_spektr = np.array([p_models_curr[w].calculate_prognos(sample) for sample in p_curr_data_set1])
        d_e = np.mean(p_matrix_spektr)
        d_ee = np.mean(p_matrix_spektr ** 2)
        d_sigma = np.sqrt(d_ee - (d_e * d_e))
        p_min_value = max(d_e - (d_rmse_koef * d_sigma), np.min(p_matrix_spektr))
        p_max_value = min(d_e + (d_rmse_koef * d_sigma), np.max(p_matrix_spektr))
        p_curr_distrib_funcs.append(DensityProbFunc(p_matrix_spektr, i_density_amount, p_min_value, p_max_value))
    p_models_distrib_funcs[i_index1] = p_curr_distrib_funcs

In [221]:
# Recalculation of output buffer trends considering distribution function
p_out_result_trends = {}
for tt, values in p_buffer_trends.items():
    d_sum_vals = 0
    p_result = np.zeros(i_ranges_amount)

    for i_index in range(i_ranges_amount):
        d_val = 0
        # Recalculate buffer by optimal orthogonal matrix for range
        if p_ort_matrix[i_index] is None:
            continue

        p_buf_curr = np.zeros(len(values) - i_amount_excluded)
        for w in range(len(p_buf_curr)):
            p_buf_curr[w] = np.dot(p_ort_matrix[i_index][w], values[:len(p_ort_matrix[i_index][w])])

        # Calculate probabilities of divergence by mutual range boundary models vectors
        p_models_curr = p_range_models[i_index]
        if p_models_curr is not None:
            p_curr_distrib_funcs = p_models_distrib_funcs[i_index]
            for w in range(len(p_curr_distrib_funcs)):
                d_sp = p_models_curr[w].calculate_prognos(p_buf_curr)
                d_sp = p_curr_distrib_funcs[w].get_density_value(d_sp)
                if d_sp == 0:
                    d_sp = d_probably_min
                d_val += np.log(d_sp)

        d_val = np.exp(d_val)
        d_sum_vals += d_val
        p_result[i_index] = d_val

    p_result /= d_sum_vals
    if ((not np.isinf(p_result).any()) and (not np.isnan(p_result).any())):
        d_probs0 = np.sum(p_result[:i_zero_index]) + d_zero_value * p_result[i_zero_index]
        d_probs1 = np.sum(p_result[i_zero_index+1:]) + (1.0 - d_zero_value) * p_result[i_zero_index]
        p_out_result_trends[tt] = (d_probs1 - d_probs0) / (d_probs0 + d_probs1)

  p_result /= d_sum_vals


In [222]:
# Оценка точности направления движения цены
sorted_keys = sorted(p_out_result_trends.keys())
real_vals = [(p_trend_close[tt]/p_trend_close[tt - 1]) - 1 for tt in sorted_keys[:-1]]
pred_vals = [p_out_result_trends[tt] for tt in sorted_keys[:-1]]
pred_sign = np.sign(pred_vals)
real_sign = np.sign(real_vals)
direction_accuracy = accuracy_score(real_sign, pred_sign)
print(f"Точность направления движения цены: {direction_accuracy:.4f}")

Точность направления движения цены: 0.4863


In [223]:
sorted_keys = sorted(set(p_delta_filter_trends.keys()) & set(p_out_result_trends.keys()))
delta_vals = [p_delta_filter_trends[tt] for tt in sorted_keys]
real_vals = [(p_trend_close[tt]/p_trend_close[tt - 1]) - 1 for tt in sorted_keys]
pred_vals = [p_out_result_trends[tt] for tt in sorted_keys]
pred_sign = np.sign(pred_vals)
real_sign = np.sign(real_vals)
sum_delta_accuracy = np.dot((pred_sign * real_sign), np.abs(delta_vals))
sum_delta_max = np.sum(np.abs(delta_vals))
print(f"Точность направления движения цены суммой по амплитуде изменения: {sum_delta_accuracy:.4f}")
print(f"Максимальная точность направления движения цены суммой по амплитуде: {sum_delta_max:.4f}")

Точность направления движения цены суммой по амплитуде изменения: 38.1173
Максимальная точность направления движения цены суммой по амплитуде: 140.1626


In [224]:
# Статистика по величинам
count_less_than_0 = sum(1 for value in p_out_result_trends.values() if value < 0)
count_great_than_0 = sum(1 for value in p_out_result_trends.values() if value > 0)
count_not_value = sum(1 for value in p_out_result_trends.values() if np.isnan(value) or np.isinf(value))
print(f"Количество элементов меньше 0: {count_less_than_0}")
print(f"Количество элементов больше 0: {count_great_than_0}")
print(f"Количество элементов не чисел: {count_not_value}")

Количество элементов меньше 0: 24387
Количество элементов больше 0: 24370
Количество элементов не чисел: 0


In [225]:
# Подготовка данных для тестирования
test_data_datetime = []
test_data_open = []
test_data_lo = []
test_data_hi = []
test_data_close = []
test_data_mean = []
test_data_volume = []
test_data_predict = []

# Получаем ключи словаря и сортируем их
sorted_keys = sorted(p_out_result_trends.keys())

# Берем последние i_test_trend_size элементов
for tt in sorted_keys[-i_test_trend_size:]:
    test_data_datetime.append(p_trend_datetime[tt - 1])
    test_data_open.append(p_trend_open[tt - 1])
    test_data_lo.append(p_trend_lo[tt - 1])
    test_data_hi.append(p_trend_hi[tt - 1])
    test_data_close.append(p_trend_close[tt - 1])
    test_data_mean.append(p_trend_mean[tt - 1])
    test_data_volume.append(p_trend_volume[tt - 1])
    test_data_predict.append(p_out_result_trends[tt])

test_data = pd.DataFrame({#'Date':test_data_date, 'Time':test_data_time,
    'OPEN':test_data_open, 'LOW':test_data_lo, 'HIGH':test_data_hi, 'CLOSE':test_data_close,
    'MEAN':test_data_mean, 'VOLUME':test_data_volume, 'SIGNAL':test_data_predict
}, index=test_data_datetime)
#}, index=pd.to_datetime([f"{d} {t}" for d, t in zip(test_data_date, test_data_time)], format='%Y%m%d %H%M%S'))

In [226]:
# Простая реализация процесса торговли для тестирования стратегии
# с экспоненциальным сглаживанием вероятности предсказания и порогом
def TradingSimple(strategy_data, commission = 0.0, start_money = 1.0):
    result_money = start_money
    max_position = start_money
    result_instr = 0.0
    trades_count = 0
    commission_money = 0.0
    max_drop_down = 0
    d_avg_index_prev = 0

    # Для расчета дневных доходностей
    daily_values = [start_money]
    daily_returns = []

    # Для расчета экстремальных цен
    d_extreme_price_min = strategy_data.iloc[0]['OPEN']
    d_extreme_price_max = strategy_data.iloc[0]['OPEN']

    # Проход по всем позициям
    for index, row in strategy_data.iterrows():
        # Экспоненциальное сглаживание сигнала
        d_avg_index = row['SIGNAL']
        if (d_avg_index_prev != 0):
            d_avg_index = (d_dither_koef * d_avg_index) + (1 - d_dither_koef) * d_avg_index_prev
        d_avg_index_prev = d_avg_index

        # Торговля в конце дня
        if (d_avg_index >= d_zero_zone_koef): # Предсказание повышения
            d_th_price = d_extreme_price_min * (1.0 + d_trend_koef_up);
            if ((result_instr == 0) and (d_th_price <= row['HIGH'])):
                #if (row['LOW'] <= d_th_price):
                #    d_th_price = (row['HIGH'] + d_th_price) / 2
                #else:
                #    d_th_price = row['MEAN']
                d_th_price = row['CLOSE']
                # Выход в LONG
                result_money /= (commission + 1.0)
                result_instr = result_money / d_th_price
                commission_money += result_money * commission
                result_money = 0.0
                trades_count += 1
                d_extreme_price_max = d_extreme_price_min = d_th_price;
        elif (d_avg_index <= -d_zero_zone_koef): # Предсказание понижения
            d_th_price = d_extreme_price_max * (1.0 - d_trend_koef_dn);
            if ((result_instr > 0) and (row['LOW'] <= d_th_price)):
                #if (d_th_price <= row['HIGH']):
                #    d_th_price = (row['LOW'] + d_th_price) / 2
                #else:
                #    d_th_price = row['MEAN']
                d_th_price = row['CLOSE']
                # Выход в CASHE
                money_val = result_instr * d_th_price
                commission_val = abs(money_val) * commission
                result_money = (money_val - commission_val)
                commission_money += commission_val
                result_instr = 0.0
                trades_count += 1
                d_extreme_price_max = d_extreme_price_min = d_th_price;

        # Обновление экстремальных цен
        d_extreme_price_min = min(d_extreme_price_min, row['LOW'])
        d_extreme_price_max = max(d_extreme_price_max, row['HIGH'])

        # Расчет текущей позиции
        curr_position = result_money + (result_instr * row['CLOSE'])
        max_position = max(curr_position, max_position)

        # Определение максимальной просадки
        curr_drop_down = 1 - (curr_position / max_position)
        max_drop_down = max(curr_drop_down, max_drop_down)

        # Сохранение дневных значений для расчета доходностей
        daily_values.append(curr_position)
        if len(daily_values) > 1:
            daily_returns.append((daily_values[-1] / daily_values[-2]) - 1)

    # Завершение торговли (выход в кэш если остались позиции)
    if (result_instr != 0):
        money_val = result_instr * strategy_data['CLOSE'].iloc[-1]
        commission_val = abs(money_val) * commission
        result_money += (money_val - commission_val)
        commission_money += commission_val
        result_instr = 0.0
        trades_count += 1

        # Обновление дневного значения
        daily_values[-1] = curr_position
        if len(daily_values) > 1:
            daily_returns[-1] = (daily_values[-1] / daily_values[-2]) - 1

    # Расчет метрик эффективности
    total_return = ((result_money / start_money) - 1) * 100
    annualized_return = ((result_money / start_money) ** (252 / len(strategy_data))) - 1

    # Расчет волатильности и критерия Шарпа
    daily_returns = np.array(daily_returns)
    avg_daily_return = np.mean(daily_returns)
    volatility = np.std(daily_returns)

    # Безрисковая ставка (можно настроить)
    risk_free_rate = 0.0

    if volatility > 0:
        sharpe_ratio = (avg_daily_return - risk_free_rate/252) / volatility * np.sqrt(252)

        # Расчет Sortino Ratio (учитывает только отрицательную волатильность)
        negative_returns = daily_returns[daily_returns < 0]
        downside_volatility = np.std(negative_returns) if len(negative_returns) > 0 else 0
        sortino_ratio = (avg_daily_return - risk_free_rate/252) / downside_volatility * np.sqrt(252) if downside_volatility > 0 else 0
    else:
        sharpe_ratio = 0
        sortino_ratio = 0

    # Расчет profit factor
    gross_profit = sum(r for r in daily_returns if r > 0)
    gross_loss = abs(sum(r for r in daily_returns if r < 0))
    profit_factor = gross_profit / gross_loss if gross_loss > 0 else np.inf

    # Доходность только по индексу бумвги
    index_profit = ((strategy_data.iloc[-1]['CLOSE'] / strategy_data.iloc[0]['OPEN']) - 1) * 100

    return {
        'initial_capital': start_money,
        'final_capital': result_money,
        'total_return_%': total_return,
        'annualized_return_%': annualized_return * 100,
        'trades_count': trades_count,
        'commission_paid': commission_money,
        'max_drop_down': max_drop_down * 100,
        'sharpe_ratio': sharpe_ratio,
        'sortino_ratio': sortino_ratio,
        'profit_factor': profit_factor,
        'volatility_annual_%': volatility * np.sqrt(252) * 100,
        'avg_daily_return_%': avg_daily_return * 100,
        'win_rate_%': len(daily_returns[daily_returns > 0]) / len(daily_returns) * 100 if len(daily_returns) > 0 else 0,
        'money_per_candle': (result_money ** (1/len(strategy_data))) - 1,
        'money_per_trade': (result_money ** (1/max(trades_count, 1))) - 1,
        'index_profit_%': index_profit
    }

In [227]:
# Проверка результата торговли
sim_result = TradingSimple(test_data, commission=0.0005, start_money=1000)
print(f"Результат работы стратегии:")
print(f"Начальные ДС: {sim_result['initial_capital']}")
print(f"Конечные ДС: {sim_result['final_capital']}")
print(f"Доходность стратегии %: {sim_result['total_return_%']}")
print(f"Доходность индекса %: {sim_result['index_profit_%']}")
print(f"Макс. просадка %: {sim_result['max_drop_down']}")
print(f"Количество сделок: {sim_result['trades_count']}")
print(f"Сумма комиссий: {sim_result['commission_paid']}")
print(f"Коэффициент Шарпа: {sim_result['sharpe_ratio']}")
print(f"Коэффициент Сортино: {sim_result['sortino_ratio']}")
print(f"Фактор прибыли: {sim_result['profit_factor']}")
print(f"Годовая волатильность %: {sim_result['volatility_annual_%']}")
print(f"Годовая доходность %: {sim_result['avg_daily_return_%']}")
print(f"Процент прибыльных дней %: {sim_result['win_rate_%']}")
print(f"Доходность на одну сделку: {sim_result['money_per_trade']}")
print(f"Доходность на одну свечку: {sim_result['money_per_candle']}")

Результат работы стратегии:
Начальные ДС: 1000
Конечные ДС: 400.7663221010978
Доходность стратегии %: -59.92336778989022
Доходность индекса %: -14.69173313077603
Макс. просадка %: 60.245262763844146
Количество сделок: 1730
Сумма комиссий: 580.5332136173321
Коэффициент Шарпа: -0.8636226479923919
Коэффициент Сортино: -1.0684517764453036
Фактор прибыли: 0.7941699189901062
Годовая волатильность %: 7.0614753048171055
Годовая доходность %: -0.024200198418567583
Процент прибыльных дней %: 21.349862258953166
Доходность на одну сделку: 0.003470388549654313
Доходность на одну свечку: 0.001652432226671774


In [None]:
# ************* Trading simulation on historical data *************
# Создание стратегии для backtrader
class SimulationStrategySimple(bt.Strategy):
    def __init__(self):
        self.prediction_index = 0  # Индекс для отслеживания текущего предсказания

    def next(self):
        print(f"next() вызван для бара {len(self.data)}")
        if self.prediction_index < len(test_data_predict):
            prediction = test_data_predict[self.prediction_index]
            print(f"position.size = {self.position.size}")
            print(f"prediction = {prediction}")
            print(f"close price = {test_data_close[self.prediction_index]}")
            if prediction > 0:  # Если предсказание роста цены
                if not self.position: #abs(self.position.size) <= 1e-10:
                    self.order = self.buy(price=test_data_close[self.prediction_index],
                             exectype=bt.Order.Limit)  # Открываем длинную позицию
                    #self.buy(price=1)
            elif prediction < 0:  # Если предсказание падения цены
                if self.position: #abs(self.position.size) > 1e-10:
                    self.order = self.sell(price=test_data_close[self.prediction_index],
                              exectype=bt.Order.Limit)  # Закрываем позицию
                    #self.sell(price=10000)
            self.prediction_index += 1
        print(f"-----------------------------------")
    def notify_order(self, order):
        if order.status in [order.Completed]:
            action = 'Покупка' if order.isbuy() else 'Продажа'
            print(f"Исполнено: {action} по {order.executed.price:.2f}")
            self.order = None
        elif order.status in [order.Rejected, order.Canceled]:
            action = 'Покупка' if order.isbuy() else 'Продажа'
            print(f"Ордер отменён: {action} с ценой {order.price:.2f}")
            self.order = None

    def stop(self):
            if self.position: #abs(self.position.size) > 1e-10:
                self.close()  # Закрываем позицию

In [None]:
# ************* Trading simulation on historical data *************
# Backtrader strategy implementation
class BacktestStrategyComplex(bt.Strategy):
    params = (
        ('out_result_trends', p_out_result_trends),
        ('prep_trend_length', i_prep_trend_length),
        ('zero_zone_koef', d_zero_zone_koef),
        ('trend_koef_up', d_trend_koef_up),
        ('trend_koef_dn', d_trend_koef_dn),
        ('dither_koef', d_dither_koef),
        ('is_short_enabled', b_is_short_enabled)
    )

    def __init__(self):
        self.extreme_price_min = self.data.close[0]
        self.extreme_price_max = self.data.close[0]
        self.avg_index_prev = 0
        self.order = None
        self.money_count_trend = []

    def next(self):
        current_idx = len(self.data) - 1
        t = current_idx + self.p.prep_trend_length

        if t not in self.p.out_result_trends:
            return

        # Get complex indicator value for this position
        d_avg_index = self.p.out_result_trends[t]

        # Exponential smoothing
        if self.avg_index_prev != 0:
            d_avg_index = (self.p.dither_koef * d_avg_index) + (1 - self.p.dither_koef) * self.avg_index_prev
        self.avg_index_prev = d_avg_index

        # Check if in cash (no stocks)
        if abs(self.position.size) <= 1e-10:
            # Check for Long entry
            d_th_price = self.extreme_price_min * (1.0 + self.p.trend_koef_up)
            if (d_th_price <= self.data.high[0]) and (d_avg_index > self.p.zero_zone_koef):
                if d_th_price < self.data.open[0]:
                    d_th_price = self.data.open[0]
                # Need to buy
                self.order = self.buy(price=d_th_price)
                self.extreme_price_max = self.extreme_price_min = d_th_price
            elif self.p.is_short_enabled:
                # Check for Short entry
                d_th_price = self.extreme_price_max * (1.0 - self.p.trend_koef_dn)
                if (self.data.low[0] <= d_th_price) and (d_avg_index < -self.p.zero_zone_koef):
                    if d_th_price > self.data.open[0]:
                        d_th_price = self.data.open[0]
                    # Need to sell
                    self.order = self.sell(price=d_th_price)
                    self.extreme_price_min = self.extreme_price_max = d_th_price

        # In Long position
        elif self.position.size > 0:
            # Check for exit to Cash
            d_th_price = self.extreme_price_max * (1.0 - self.p.trend_koef_dn)
            if (self.data.low[0] <= d_th_price) and (d_avg_index < -self.p.zero_zone_koef):
                if d_th_price > self.data.open[0]:
                    d_th_price = self.data.open[0]
                # Need to sell
                self.order = self.close(price=d_th_price)
                self.extreme_price_min = self.extreme_price_max = d_th_price

        # In Short position
        elif self.position.size < 0:
            # Check for exit to Cash
            d_th_price = self.extreme_price_min * (1.0 + self.p.trend_koef_up)
            if (d_th_price <= self.data.high[0]) and (d_avg_index > self.p.zero_zone_koef):
                if d_th_price < self.data.open[0]:
                    d_th_price = self.data.open[0]
                self.order = self.close(price=d_th_price)
                self.extreme_price_max = self.extreme_price_min = d_th_price

        # Search for minimum for position entry
        self.extreme_price_min = min(self.extreme_price_min, self.data.low[0])
        # Search for maximum for position entry
        self.extreme_price_max = max(self.extreme_price_max, self.data.high[0])

        # Save portfolio value
        self.money_count_trend.append(self.broker.getvalue())

    def stop(self):
        # Final portfolio value
        self.money_count_trend.append(self.broker.getvalue())

        # Output results
        print(f"Maximum result value = {self.broker.getvalue()}")
        print(f"Optimal data window size = {i_window_size}")
        print(f"Optimal prediction count = {i_predict_amount}")
        print(f"Optimal buffer window size = {i_buffer_size}")
        print(f"Optimal range count = {i_ranges_amount}")
        print(f"Optimal probability density intervals = {i_density_amount}")
        print(f"Optimal smoothing coefficient = {d_dither_koef}")
        print(f"Optimal Up trade cutoff coefficient = {d_trend_koef_up}")
        print(f"Optimal Dn trade cutoff coefficient = {d_trend_koef_dn}")
        print(f"Optimal 'zero zone' coefficient = {d_zero_zone_koef}")

        # Plot results
        plt.figure(figsize=(12, 6))
        plt.plot(self.money_count_trend)
        plt.title("Portfolio Value Over Time")
        plt.xlabel("Time")
        plt.ylabel("Portfolio Value")

        # Save plot
        output_dir = "results"
        os.makedirs(output_dir, exist_ok=True)
        output_file = os.path.join(output_dir,
            f"{s_ticker}_Result={self.broker.getvalue():.5f}_WndSize={i_window_size}_Predict={i_predict_amount}.png")
        plt.savefig(output_file)
        plt.close()

In [None]:
# Create cerebro engine
cerebro = bt.Cerebro()

# Add strategy
#cerebro.addstrategy(BacktestStrategy)
cerebro.addstrategy(SimulationStrategySimple)

# Prepare data feed
#test_data = data.iloc[-i_test_trend_size:]  # Выбираем тестовую часть данных
data_feed = bt.feeds.PandasData(dataname=test_data, datetime=None, openinterest=None)
cerebro.adddata(data_feed)

# Add analyzers
cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name="ta")
cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name="sharpe", timeframe=bt.TimeFrame.Days)
cerebro.addanalyzer(bt.analyzers.DrawDown, _name="drawdown")
cerebro.addanalyzer(bt.analyzers.Returns, _name="returns")
cerebro.addanalyzer(bt.analyzers.PyFolio, _name="pyfolio")
cerebro.addanalyzer(bt.analyzers.TimeReturn, _name='timereturn')

# Set initial capital
cerebro.broker.setcash(i_start_capital)

# Set commission
cerebro.broker.setcommission(commission=d_trader_marging)

# Run backtest
print('Starting Portfolio Value: %.2f' % cerebro.broker.getvalue())
analysis = cerebro.run()
print('Final Portfolio Value: %.2f' % cerebro.broker.getvalue())

# Визуализация
cerebro.plot(style='candlestick', volume=False)

Starting Portfolio Value: 10000.00
next() вызван для бара 1
position.size = 0
prediction = 0.9984461272899947
close price = 313.49
-----------------------------------
next() вызван для бара 2
position.size = 0
prediction = 0.9949221147041093
close price = 314.85
-----------------------------------
Исполнено: Покупка по 314.85
next() вызван для бара 3
position.size = 1
prediction = 0.9985773175804629
close price = 318.12
-----------------------------------
next() вызван для бара 4
position.size = 1
prediction = 0.40602677016183747
close price = 319.7
-----------------------------------
next() вызван для бара 5
position.size = 1
prediction = 0.9999327328489582
close price = 322.91
-----------------------------------
next() вызван для бара 6
position.size = 1
prediction = 0.9304834521907501
close price = 323.16
-----------------------------------
next() вызван для бара 7
position.size = 1
prediction = -0.9962000386965084
close price = 321.08
-----------------------------------
Исполнено: 

<IPython.core.display.Javascript object>

[[<Figure size 640x480 with 3 Axes>]]

In [None]:
# Вывод дополнительных характеристик
def print_trade_analysis(analyzer):
    print("\n--- Trade Analysis ---")
    print(f"Total Trades: {analyzer.total.total}")
    print(f"Total Closed Trades: {analyzer.total.closed}")
    print(f"Total Open Trades: {analyzer.total.open}")
    print(f"Total Won: {analyzer.won.total}")
    print(f"Total Lost: {analyzer.lost.total}")
    print(f"Win Rate: {analyzer.won.total / analyzer.total.closed * 100:.2f}%")
    print(f"Average Win: {analyzer.won.pnl.average:.2f}")
    print(f"Average Loss: {analyzer.lost.pnl.average:.2f}")
    #print(f"Profit Factor: {analyzer.pnl.net.profit_factor:.2f}")

def print_sharpe_ratio(analyzer):
    print("\n--- Sharpe Ratio ---")
    print(f"Sharpe Ratio: {analyzer.get('sharperatio', 'N/A'):.2f}")

def print_drawdown(analyzer):
    print("\n--- Drawdown ---")
    print(f"Max Drawdown: {analyzer.max.drawdown:.2f}%")
    print(f"Max Money Drawdown: {analyzer.max.moneydown:.2f}")

# Вывод результатов анализа
print_trade_analysis(analysis[0].analyzers.ta.get_analysis())
print_sharpe_ratio(analysis[0].analyzers.sharpe.get_analysis())
print_drawdown(analysis[0].analyzers.drawdown.get_analysis())
returns = analysis[0].analyzers.returns.get_analysis()
capital_history = analysis[0].analyzers.timereturn.get_analysis().items()

print(f"Общая доходность: {returns['rtot']:.2%}")

# Визуализация результатов
cerebro.plot()


--- Trade Analysis ---
Total Trades: 47
Total Closed Trades: 46
Total Open Trades: 1
Total Won: 18
Total Lost: 28
Win Rate: 39.13%
Average Win: 5.63
Average Loss: -2.71

--- Sharpe Ratio ---
Sharpe Ratio: 0.01

--- Drawdown ---
Max Drawdown: 4.65%
Max Money Drawdown: 496.17
Общая доходность: 1.90%


[[<Figure size 640x480 with 6 Axes>]]