In [1]:
import numpy as np
import pandas as pd

from Utils.data_proccessing import get_historical_ohlc_by
from datetime import datetime, UTC
from pybit.unified_trading import HTTP
from constants import API_KEY, API_SECRET
import csv

In [2]:
session = HTTP(demo=True, api_key=API_KEY, api_secret=API_SECRET)
start_date_str = "2024-10-20" 

ohlc_m15 = get_historical_ohlc_by(session, 'BTCUSDT', '15', start_date_str, max_batches=80)
ohlc_m5 = get_historical_ohlc_by(session, 'BTCUSDT', '5', start_date_str, max_batches=220)

Досягнуто вказаної дати. Завершуємо завантаження.
Завантаження (BTCUSDT, 15) завершено. Отримано 35197 свічок.
Досягнуто вказаної дати. Завершуємо завантаження.
Завантаження (BTCUSDT, 5) завершено. Отримано 105591 свічок.


In [3]:
class Indicators:
    """
    Клас для розрахунку технічних індикаторів без pandas.
    Методи є статичними, оскільки їх результат залежить лише від вхідних даних.
    """
    @staticmethod
    def detect_fvg(three_candles):
        """
        Визначає Fair Value Gap (FVG) на основі 3 свічок.
        :param three_candles: Список з 3 кортежів (свічок).
        :return: Кортеж (fvg_signal, fvg_top, fvg_bottom).
                 fvg_signal: 1 (бичачий), -1 (ведмежий), 0 (немає).
        """
        if three_candles[2][3] > three_candles[0][2]:
            # Бичачий FVG
            return (1, three_candles[2][3], three_candles[0][2])

        elif three_candles[2][2] < three_candles[0][3]:
            # Ведмежий FVG
            return (-1, three_candles[0][3], three_candles[2][2])

        return (0, 0, 0)

    @staticmethod
    def detect_fractal(three_candles):
        """
        Визначає фрактал на основі 3 свічок.
        :param three_candles: Список з 3 кортежів (свічок).
        :return: Кортеж (fractal_signal, fractal_level).
                 fractal_signal: 1 (верхній), -1 (нижній), 0 (немає), 2 (обидва).
        """
        fractal_signal = 0
        fractal_level = 0
        
        is_high_fractal = three_candles[1][2] > three_candles[0][2] and three_candles[1][2] > three_candles[2][2]
        is_low_fractal = three_candles[1][3] < three_candles[0][3] and three_candles[1][3] < three_candles[2][3]

        if is_high_fractal:
            fractal_signal = 1
            fractal_level = three_candles[1][2]
        
        if is_low_fractal:
            fractal_signal = 2 if fractal_signal == 1 else -1
            fractal_level = three_candles[1][3]
            
        return (fractal_signal, fractal_level)

In [4]:
class SignalGenerator:
    """Знаходить торгові сетапи (сигнали) на основі даних M15 і M5."""
    def __init__(self, m15_data, m5_data, max_lookahead=100):
        self.m15_data = m15_data
        self.m5_data = m5_data
        self.max_lookahead = max_lookahead

    def _precompute_indicators(self, data):
        """
        ОНОВЛЕНО: Тепер також зберігає день тижня.
        """
        indicators = []
        for i in range(2, len(data)):
            data_slice = data[i-2 : i+1]
            fvg_signal, fvg_top, fvg_bottom = Indicators.detect_fvg(data_slice)
            fractal_signal, fractal_level = Indicators.detect_fractal(data_slice)
            
            current_timestamp = data_slice[2][0]
            
            day_name = datetime.fromtimestamp(current_timestamp / 1000.0, UTC).strftime('%A')
            
            indicators.append({
                "timestamp": current_timestamp,
                "day_of_week": day_name,  
                "fvg": fvg_signal, "fvg_top": fvg_top, "fvg_bottom": fvg_bottom,
                "fractal": fractal_signal, "fractal_level": fractal_level
            })
        return indicators

    def generate(self):
        """
        ОНОВЛЕНО: Додано фільтр по днях тижня.
        """
        signals = []
        m15_indicators = self._precompute_indicators(self.m15_data)
        m5_indicators = self._precompute_indicators(self.m5_data)
        
        m5_fractals = [(i, ind) for i, ind in enumerate(m5_indicators) if ind['fractal'] != 0]

        # Дні, в які НЕ ТОРГУЮ
        forbidden_days = ['Friday', 'Saturday', 'Sunday']

        for fvg_index, fvg_ind in enumerate(m15_indicators):
            if fvg_ind['fvg'] == 0:
                continue
                
            if fvg_ind['day_of_week'] in forbidden_days:
                continue 

            mitigation_time = float('inf') 
            start_check_index = fvg_index + 3
            
            for i in range(start_check_index, len(self.m15_data)):
                candle_close = self.m15_data[i][4]
                is_long_mitigated = fvg_ind['fvg'] == 1 and candle_close < fvg_ind['fvg_top']
                is_short_mitigated = fvg_ind['fvg'] == -1 and candle_close > fvg_ind['fvg_bottom']
                
                if is_long_mitigated or is_short_mitigated:
                    mitigation_time = self.m15_data[i][0]
                    break 

            relevant_fractals = [(i, frac) for i, frac in m5_fractals if frac['timestamp'] > fvg_ind['timestamp']]
            if len(relevant_fractals) < 2:
                continue

            for i in range(len(relevant_fractals) - 1):
                f1_pos, f1 = relevant_fractals[i]
                f2_pos, f2 = relevant_fractals[i+1]
                
                is_long_pair = (fvg_ind['fvg'] == 1 and f1['fractal'] == 1 and f1['fractal_level'] > fvg_ind['fvg_top'] and
                                f2['fractal'] == -1 and fvg_ind['fvg_bottom'] < f2['fractal_level'] < fvg_ind['fvg_top'])
                is_short_pair = (fvg_ind['fvg'] == -1 and f1['fractal'] == -1 and f1['fractal_level'] < fvg_ind['fvg_bottom'] and
                                 f2['fractal'] == 1 and fvg_ind['fvg_bottom'] < f2['fractal_level'] < fvg_ind['fvg_top'])

                if not (is_long_pair or is_short_pair):
                    continue

                bos_pos = None
                end_pos = min(len(self.m5_data), f2_pos + self.max_lookahead)
                for pos in range(f2_pos + 1, end_pos):
                    close_price = self.m5_data[pos][4]
                    if (is_long_pair and close_price > f1['fractal_level']) or \
                       (is_short_pair and close_price < f1['fractal_level']):
                        bos_pos = pos
                        break
                
                if bos_pos:
                    bos_time = self.m5_data[bos_pos][0]
                    
                    # 4.Сигнал валідний, тільки якщо BOS стався ДО мітигації
                    if bos_time < mitigation_time:
                        signals.append({
                            "fvg_dir": fvg_ind['fvg'], "fvg_time": fvg_ind['timestamp'],
                            "fvg_top": fvg_ind['fvg_top'], "fvg_bottom": fvg_ind['fvg_bottom'],
                            "f1_price": f1['fractal_level'], "f2_price": f2['fractal_level'],
                            "bos_time": bos_time, "bos_pos": bos_pos,
                            "bos_close": self.m5_data[bos_pos][4]
                        })
                        break 
            
        return signals

In [5]:
class Portfolio:
    """Керує капіталом, угодами та статистикою."""
    def __init__(self, risk_usd=1000):
        self.risk_usd = risk_usd
        self.trades = []

    def _qty_from_risk(self, entry_price, stop_loss):
        dist = abs(entry_price - stop_loss)
        return self.risk_usd / dist if dist > 0 else 0

    def record_trade(self, signal, result, exit_time, exit_price):
        direction = 1 if signal['fvg_dir'] == 1 else -1
        entry_price = signal['f1_price']
        sl = signal['f2_price']
        
        qty = self._qty_from_risk(entry_price, sl)
        pnl = (exit_price - entry_price) * direction * qty

        self.trades.append({
            "direction": direction,
            "result": result,
            "pnl": round(pnl, 2),
            "fvg_time": signal['fvg_time'],
            "fvg_top": signal['fvg_top'],
            "fvg_bottom": signal['fvg_bottom'],
            "f1_price": entry_price,
            "f2_price": sl,
            "bos_time": signal['bos_time'],
            "bos_close": signal['bos_close'],
            "entry_time": signal['entry_time'],
            "entry_price": entry_price,
            "sl": sl,
            "tp": signal['tp'],
            "exit_time": exit_time,
            "exit_price": exit_price,
        })

    def calculate_statistics(self):
        """Розраховує ключові метрики, коректно використовуючи UTC."""
        if not self.trades:
            return {"Message": "No trades to analyze."}

        days = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
        pnl_by_day = {day: {'pnl': 0, 'wins': 0, 'losses': 0} for day in days}
        pnl_by_hour = {hour: {'pnl': 0, 'wins': 0, 'losses': 0} for hour in range(24)}
        pnl_by_month = {} 
        trades_per_day_count = {}

        for trade in self.trades:
            trade_date = datetime.fromtimestamp(trade['entry_time'] / 1000.0, UTC) 
            day_name = trade_date.strftime('%A')
            month_key = trade_date.strftime('%Y-%m')
            day_key = trade_date.strftime('%Y-%m-%d')
            hour_key = trade_date.hour
            
            trades_per_day_count[day_key] = trades_per_day_count.get(day_key, 0) + 1
            pnl = trade['pnl']
            
            if month_key not in pnl_by_month:
                pnl_by_month[month_key] = {'pnl': 0, 'wins': 0, 'losses': 0}

            pnl_by_day[day_name]['pnl'] += pnl
            if pnl > 0: pnl_by_day[day_name]['wins'] += 1
            else: pnl_by_day[day_name]['losses'] += 1

            pnl_by_month[month_key]['pnl'] += pnl
            if pnl > 0: pnl_by_month[month_key]['wins'] += 1
            else: pnl_by_month[month_key]['losses'] += 1

            pnl_by_hour[hour_key]['pnl'] += pnl
            if pnl > 0: pnl_by_hour[hour_key]['wins'] += 1
            else: pnl_by_hour[hour_key]['losses'] += 1
        
        profits = [trade['pnl'] for trade in self.trades]
        total_trades = len(profits); net_profit = sum(profits)
        wins = [p for p in profits if p > 0]; losses = [p for p in profits if p < 0]
        win_rate = len(wins) / total_trades if total_trades > 0 else 0
        avg_win = sum(wins) / len(wins) if wins else 0
        avg_loss = sum(losses) / len(losses) if losses else 0
        profit_factor = abs(sum(wins) / sum(losses)) if sum(losses) != 0 else float('inf')
        max_consecutive_losses, current_losses = 0, 0
        for p in profits:
            if p < 0: current_losses += 1
            else: max_consecutive_losses = max(max_consecutive_losses, current_losses); current_losses = 0
        max_consecutive_losses = max(max_consecutive_losses, current_losses)
        equity_curve = [0]; peak = -float('inf'); max_drawdown = 0
        for p in profits: equity_curve.append(equity_curve[-1] + p)
        for equity in equity_curve:
            if equity > peak: peak = equity
            drawdown = peak - equity
            if drawdown > max_drawdown: max_drawdown = drawdown
        
        first_trade_date = datetime.fromtimestamp(self.trades[0]['entry_time'] / 1000.0, UTC)
        last_trade_date = datetime.fromtimestamp(self.trades[-1]['exit_time'] / 1000.0, UTC)
        
        total_days = (last_trade_date - first_trade_date).days + 1
        avg_trades_per_day = total_trades / total_days if total_days > 0 else 0
        
        max_trades_per_day = 0
        if trades_per_day_count:
            max_trades_per_day = max(trades_per_day_count.values())

        stats = {
            'Total Trades': total_trades, 'Total Net Profit': net_profit,
            'Avg. Trades per Day': avg_trades_per_day,
            'Max Trades in a Single Day': max_trades_per_day,
            'Trading Period Days': total_days, 'Win Rate (%)': win_rate * 100,
            'Profit Factor': profit_factor, 'Average Win': avg_win,
            'Average Loss': avg_loss, 'Max Drawdown': max_drawdown,
            'Max Consecutive Losses': max_consecutive_losses,
            'PnL by Day of Week': pnl_by_day, 'PnL by Month': pnl_by_month,
            'PnL by Hour (UTC)': pnl_by_hour
        }
        
        for key, value in stats.items():
            if isinstance(value, float):
                stats[key] = round(value, 2)
                
        return stats

class Backtester:
    """Оркестратор процесу бектесту."""
    def __init__(self, m5_data, m15_data, config):
        self.m5_data = m5_data
        self.m15_data = m15_data
        self.config = config
        self.portfolio = Portfolio(risk_usd=config.get('risk_usd', 1000))
        self.forbidden_entry_days = config.get('forbidden_entry_days', [])
        self.signal_generator = SignalGenerator(m15_data, m5_data, max_lookahead=config.get('max_lookahead', 100))
        self.forbidden_entry_days = config.get('forbidden_entry_days', []) 
        self.forbidden_entry_hours = config.get('forbidden_entry_hours', []) 

    def run(self):
        print("1. Генеруємо всі можливі торгові сигнали...")
        signals = self.signal_generator.generate()
        print(f"Знайдено {len(signals)} сигналів.")
        
        trades_per_day_count = {}
        max_trades = self.config.get('max_trades_per_day', 999) 
        print(f"2. Симулюємо торгівлю по сигналах (ліміт: {max_trades} угоди на день)...")
        print(f"   Заборонені дні: {self.forbidden_entry_days}")
        print(f"   Заборонені години: {self.forbidden_entry_hours}")
        
        for sig in signals:
            direction = 1 if sig['fvg_dir'] == 1 else -1
            entry_price, sl = sig['f1_price'], sig['f2_price']
            risk = abs(entry_price - sl)
            if risk == 0: continue
            
            tp = entry_price + risk * self.config['rr'] if direction == 1 else entry_price - risk * self.config['rr']
            
            search_entry_start = sig['bos_pos'] + 1
            search_entry_end = min(len(self.m5_data), search_entry_start + self.config['max_lookahead'])
            
            entry_pos = None
            for i in range(search_entry_start, search_entry_end):
                low, high = self.m5_data[i][3], self.m5_data[i][2]
                
                # 1. Перевіряємо, чи свічка торкнулась ціни входу
                if low <= entry_price <= high:
                    entry_timestamp = self.m5_data[i][0]
                    entry_date = datetime.fromtimestamp(entry_timestamp / 1000.0, UTC)
                    
                    day_name = entry_date.strftime('%A')
                    if day_name in self.forbidden_entry_days:
                        continue  
                    
                    hour = entry_date.hour
                    if hour in self.forbidden_entry_hours:
                        continue  

                    day_key = entry_date.strftime('%Y-%m-%d')
                    current_day_trades = trades_per_day_count.get(day_key, 0)
                    
                    if current_day_trades < max_trades:
                        trades_per_day_count[day_key] = current_day_trades + 1
                        entry_pos = i
                        break 
                    else:
                        continue 

            if not entry_pos:
                continue 

            search_exit_start = entry_pos
            search_exit_end = min(len(self.m5_data), search_exit_start + self.config['max_lookahead'])
            exit_pos = None; result = 'timeout'; exit_price = self.m5_data[search_exit_end - 1][4] 

            for i in range(search_exit_start, search_exit_end):
                low, high = self.m5_data[i][3], self.m5_data[i][2]
                touched_sl = (low <= sl) if direction == 1 else (high >= sl)
                touched_tp = (high >= tp) if direction == 1 else (low <= tp)

                if touched_sl:
                    exit_pos, result, exit_price = i, 'loss', sl; break
                if touched_tp:
                    exit_pos, result, exit_price = i, 'win', tp; break
            
            sig.update({'entry_time': self.m5_data[entry_pos][0], 'tp': tp})
            final_exit_pos = exit_pos if exit_pos is not None else search_exit_end - 1
            self.portfolio.record_trade(sig, result, self.m5_data[final_exit_pos][0], exit_price)

        print("Бектест завершено.")
        return self.portfolio

In [6]:
def convert_df_to_tuples(df):
    """
    Конвертує DataFrame у список кортежів.
    ВИПРАВЛЕНО: Використовує .values для прямої конвертації з pandas datetime.
    """
    timestamps_ms = (pd.to_datetime(df['time']).values.astype(np.int64) // 10**6)

    data_values = df[['open', 'high', 'low', 'close', 'volume']].values

    data_list = [
        (int(ts), row[0], row[1], row[2], row[3], row[4])
        for ts, row in zip(timestamps_ms, data_values)
    ]

    return data_list

def export_to_csv(trades, filename="backtest_trades.csv"):
    """
    Зберігає угоди у CSV, примусово конвертуючи час у UTC, щоб уникнути плутанини з часовими поясами.
    """
    if not trades:
        print("Немає угод для експорту.")
        return
    
    time_keys = ['fvg_time', 'bos_time', 'entry_time', 'exit_time']
    formatted_trades = []
    
    headers = trades[0].keys()

    for trade in trades:
        trade_copy = trade.copy()
        for key in time_keys:
            if key in trade_copy:
                timestamp_ms = trade_copy[key]
                if isinstance(timestamp_ms, (int, float)) and timestamp_ms > 0:
                    trade_copy[key] = datetime.fromtimestamp(timestamp_ms / 1000.0, UTC).strftime('%Y-%m-%d %H:%M:%S')
        formatted_trades.append(trade_copy)

    try:
        with open(filename, 'w', newline='', encoding='utf-8') as output_file:
            dict_writer = csv.DictWriter(output_file, fieldnames=headers)
            dict_writer.writeheader()
            dict_writer.writerows(formatted_trades)
        print(f"Угоди успішно експортовано у файл: {filename}")
    except Exception as e:
        print(f"Сталася помилка під час експорту в CSV: {e}")


def print_detailed_stats(stats_dict, title, period_order):
    """Виводить детальну статистику по періодах у відсортованому порядку."""
    print(f"\n--- {title.upper()} ---")
    
    for period in period_order:
        stats = stats_dict.get(period)
        if not stats: continue

        total_trades = stats['wins'] + stats['losses']
        if total_trades == 0:
            continue 

        pnl = round(stats['pnl'], 2)
        win_rate = (stats['wins'] / total_trades) * 100 if total_trades > 0 else 0
        
        indicator = "🟩" if pnl > 0 else "🟥"
        
        if isinstance(period, int):  
            period_str = f"{period:02}:00 - {period:02}:59"
        else:  
            period_str = period
        
        print(f"{indicator} {period_str:<18}: ${pnl:9,.2f}  (WR: {win_rate:5.1f}%, Угод: {total_trades})")

# --- 3. ГОЛОВНИЙ БЛОК ЗАПУСКУ ---
if __name__ == '__main__':

    print("Запускаємо завантаження даних...")
    
    if ohlc_m15.empty or ohlc_m5.empty:
        print("Не вдалося завантажити дані. Бектест неможливий.")
        exit()

    print("Конвертуємо дані для бекстера...")
    ohlc_m15 = convert_df_to_tuples(ohlc_m15)
    ohlc_m5 = convert_df_to_tuples(ohlc_m5)

    forbidden_hours_list = [0, 1, 2, 3, 4, 8, 9, 12, 14, 16, 19, 20, 21, 22, 23]
    
    BACKTEST_CONFIG = {
        "rr": 2.5,
        "risk_usd": 1000,
        "max_lookahead": 200,
        "max_trades_per_day": 2,
        "forbidden_entry_days": ['Wednesday', 'Friday', 'Saturday', 'Sunday'],
        "forbidden_entry_hours": forbidden_hours_list
    }
    
    backtester = Backtester(ohlc_m5, ohlc_m15, BACKTEST_CONFIG) 
    final_portfolio = backtester.run()
    
    final_stats = final_portfolio.calculate_statistics()
    
    pnl_by_day = final_stats.pop('PnL by Day of Week', {})
    pnl_by_month = final_stats.pop('PnL by Month', {})
    pnl_by_hour = final_stats.pop('PnL by Hour (UTC)', {}) 
    
    print(f"\n--- ОСНОВНІ РЕЗУЛЬТАТИ БЕКТЕСТУ ---")
    if 'Message' in final_stats:
        print(final_stats['Message'])
    else:
        for key, value in final_stats.items():
            print(f"{key:<25}: {value}")

    days_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
    months_order = sorted(pnl_by_month.keys())
    hours_order = list(range(24)) 

    print_detailed_stats(pnl_by_day, "PnL за днями тижня", days_order)
    print_detailed_stats(pnl_by_month, "PnL за місяцями", months_order)
    print_detailed_stats(pnl_by_hour, "PnL за годинами (UTC)", hours_order)
    
    export_to_csv(final_portfolio.trades)

Запускаємо завантаження даних...
Конвертуємо дані для бекстера...
1. Генеруємо всі можливі торгові сигнали...
Знайдено 617 сигналів.
2. Симулюємо торгівлю по сигналах (ліміт: 2 угоди на день)...
   Заборонені дні: ['Wednesday', 'Friday', 'Saturday', 'Sunday']
   Заборонені години: [0, 1, 2, 3, 4, 8, 9, 12, 14, 16, 19, 20, 21, 22, 23]
Бектест завершено.

--- ОСНОВНІ РЕЗУЛЬТАТИ БЕКТЕСТУ ---
Total Trades             : 235
Total Net Profit         : 63709.57
Avg. Trades per Day      : 0.66
Max Trades in a Single Day: 2
Trading Period Days      : 358
Win Rate (%)             : 37.87
Profit Factor            : 1.44
Average Win              : 2342.51
Average Loss             : -991.6
Max Drawdown             : 9488.06
Max Consecutive Losses   : 6

--- PNL ЗА ДНЯМИ ТИЖНЯ ---
🟩 Monday            : $34,251.62  (WR:  43.2%, Угод: 74)
🟩 Tuesday           : $14,640.12  (WR:  34.6%, Угод: 81)
🟩 Thursday          : $14,817.83  (WR:  36.2%, Угод: 80)

--- PNL ЗА МІСЯЦЯМИ ---
🟥 2024-10           : $-4,

In [7]:
trades_df = pd.read_csv('backtest_trades.csv')

# Виводимо у вигляді красивої таблиці
trades_df.tail(5)

Unnamed: 0,direction,result,pnl,fvg_time,fvg_top,fvg_bottom,f1_price,f2_price,bos_time,bos_close,entry_time,entry_price,sl,tp,exit_time,exit_price
230,1,loss,-1000.0,2025-10-13 06:30:00,114947.1,114816.9,115147.5,114864.0,2025-10-13 06:50:00,115181.3,2025-10-13 07:00:00,115147.5,114864.0,115856.25,2025-10-13 09:10:00,114864.0
231,-1,loss,-1000.0,2025-10-14 03:00:00,113716.5,113584.4,113188.1,113639.0,2025-10-14 04:35:00,113051.8,2025-10-14 05:05:00,113188.1,113639.0,112060.85,2025-10-14 05:10:00,113639.0
232,-1,win,2500.0,2025-10-14 05:45:00,112929.5,112660.1,112333.0,112670.1,2025-10-14 06:20:00,112290.9,2025-10-14 06:25:00,112333.0,112670.1,111490.25,2025-10-14 06:45:00,111490.25
233,1,loss,-1000.0,2025-10-16 01:15:00,110647.4,110498.4,111432.8,110646.0,2025-10-16 06:50:00,111621.6,2025-10-16 07:05:00,111432.8,110646.0,113399.8,2025-10-16 07:55:00,110646.0
234,1,loss,-1000.0,2025-10-20 06:00:00,110950.0,110763.0,111200.0,110913.1,2025-10-20 08:25:00,111235.3,2025-10-20 15:00:00,111200.0,110913.1,111917.25,2025-10-20 16:00:00,110913.1
