# 🎯 КЛАССИФИКАТОР РЫНОЧНЫХ РЕЖИМОВ

## 📊 НАЗНАЧЕНИЕ
Статичный классификатор для определения рыночных режимов без оптимизации параметров.

### 🎯 ПРИНЦИПЫ:
- ✅ **СТАТИЧНЫЕ НАСТРОЙКИ** - не оптимизируем под данные
- ✅ **ТЕОРЕТИЧЕСКИ ОБОСНОВАННЫЕ** - используем семантически значимые значения
- ✅ **ОБЪЕКТИВНАЯ КЛАССИФИКАЦИЯ** - избегаем порочного круга оптимизации

### 🔍 РЫНОЧНЫЕ РЕЖИМЫ:
1. **Сильный тренд вверх** - ADX > 30, Choppiness < 38.2, цена растет
2. **Сильный тренд вниз** - ADX > 30, Choppiness < 38.2, цена падает
3. **Флет** - ADX < 20, Choppiness > 61.8
4. **Высокая волатильность** - Bollinger Bands Width в верхних 20%
5. **Неопределенное состояние** - остальные случаи


In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from typing import Tuple, Dict, List

EPS = 1e-12
import warnings
warnings.filterwarnings('ignore')

print("✅ Импорты загружены!")


## 🏗️ СОЗДАНИЕ КЛАССИФИКАТОРА РЫНОЧНЫХ РЕЖИМОВ


In [None]:
class AdvancedMarketRegimeClassifier:
    """
    ПРОФЕССИОНАЛЬНЫЙ КЛАССИФИКАТОР РЫНОЧНЫХ РЕЖИМОВ (без ML)

    Ключевые характеристики:
    - Wilder RSI, ADX/DI (устойчивые формулы и защита делений)
    - Волатильность через безопасный BB width
    - Экзо-перцентили (ex-ante) для порогов волатильности (shift +1)
    - Композитная сила тренда: SMA cross + DI sign + объем
    - Гистерезис и минимальная длительность режима
    - Детальная статистика и визуализация
    """

    def __init__(self):
        self.parameters = {
            'adx': {'strong_trend': 30, 'weak_trend': 20, 'no_trend': 15, 'period': 14},
            'choppiness': {'trend_threshold': 38.2, 'flat_threshold': 61.8, 'period': 14},
            'volatility': {
                'lookback_percentile': 252,
                'low_percentile': 0.20,
                'high_percentile': 0.80,
                'bb_period': 20,
                'bb_std': 2.0
            },
            'rsi': {'period': 14, 'overbought': 70, 'oversold': 30},
            'statistical': {'z_score_threshold': 2.0, 'min_trend_bars': 5, 'regime_persistence': 3}
        }
        self.regime_hierarchy = [
            'high_volatility_crash', 'high_volatility_rally',
            'strong_trend_up', 'strong_trend_down',
            'weak_trend_up', 'weak_trend_down',
            'consolidation_high_vol', 'consolidation_low_vol',
            'ranging_market', 'uncertain'
        ]
        print("🎯 AdvancedMarketRegimeClassifier инициализирован!")

    # ---------- Индикаторы ----------
    def _rsi_wilder(self, close: pd.Series) -> pd.Series:
        period = self.parameters['rsi']['period']
        delta = close.diff()
        gain = delta.clip(lower=0.0)
        loss = -delta.clip(upper=0.0)
        avg_gain = gain.ewm(alpha=1/period, adjust=False, min_periods=period).mean()
        avg_loss = loss.ewm(alpha=1/period, adjust=False, min_periods=period).mean()
        rs = avg_gain / (avg_loss + EPS)
        rsi = 100 - (100 / (1 + rs))
        return rsi

    def _bb_width_relative(self, close: pd.Series) -> pd.Series:
        period = self.parameters['volatility']['bb_period']
        std_mult = self.parameters['volatility']['bb_std']
        sma = close.rolling(window=period, min_periods=period).mean()
        std = close.rolling(window=period, min_periods=period).std(ddof=0)
        upper = sma + std_mult * std
        lower = sma - std_mult * std
        width = (upper - lower) / (sma.abs() + EPS)
        return width

    def _adx_di_wilder(self, high: pd.Series, low: pd.Series, close: pd.Series) -> Tuple[pd.Series, pd.Series, pd.Series]:
        period = self.parameters['adx']['period']
        up_move = high.diff()
        down_move = -low.diff()
        plus_dm = np.where((up_move > down_move) & (up_move > 0), up_move, 0.0)
        minus_dm = np.where((down_move > up_move) & (down_move > 0), down_move, 0.0)
        plus_dm = pd.Series(plus_dm, index=high.index)
        minus_dm = pd.Series(minus_dm, index=high.index)
        prev_close = close.shift(1)
        tr = pd.concat([(high - low), (high - prev_close).abs(), (low - prev_close).abs()], axis=1).max(axis=1)
        atr = tr.ewm(alpha=1/period, adjust=False, min_periods=period).mean()
        plus_di = 100 * (plus_dm.ewm(alpha=1/period, adjust=False, min_periods=period).mean() / (atr + EPS))
        minus_di = 100 * (minus_dm.ewm(alpha=1/period, adjust=False, min_periods=period).mean() / (atr + EPS))
        dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + EPS)
        adx = dx.ewm(alpha=1/period, adjust=False, min_periods=period).mean()
        return adx, plus_di, minus_di

    def _choppiness_index(self, high: pd.Series, low: pd.Series, close: pd.Series) -> pd.Series:
        period = self.parameters['choppiness']['period']
        prev_close = close.shift(1)
        tr = pd.concat([(high - low), (high - prev_close).abs(), (low - prev_close).abs()], axis=1).max(axis=1)
        sum_tr = tr.rolling(window=period, min_periods=period).sum()
        highest_high = high.rolling(window=period, min_periods=period).max()
        lowest_low = low.rolling(window=period, min_periods=period).min()
        range_hl = (highest_high - lowest_low).replace(0, np.nan)
        chop = 100 * np.log10((sum_tr + EPS) / (range_hl + EPS)) / np.log10(period)
        return chop

    def _rolling_percentiles_ex_ante(self, series: pd.Series) -> Tuple[pd.Series, pd.Series, pd.Series]:
        lookback = self.parameters['volatility']['lookback_percentile']
        low_q = self.parameters['volatility']['low_percentile']
        high_q = self.parameters['volatility']['high_percentile']
        low_thr = series.rolling(window=lookback, min_periods=lookback).quantile(low_q).shift(1)
        high_thr = series.rolling(window=lookback, min_periods=lookback).quantile(high_q).shift(1)
        def ex_ante_pct(window: pd.Series) -> float:
            vals = window[:-1].values
            if len(vals) == 0:
                return np.nan
            current = window.iloc[-1]
            return (np.searchsorted(np.sort(vals), current, side='right') / len(vals))
        pct = series.rolling(window=lookback + 1, min_periods=lookback + 1).apply(ex_ante_pct, raw=False)
        return low_thr, high_thr, pct

    def _volume_profile(self, volume: pd.Series, lookback: int = 20) -> pd.Series:
        volume_ma = volume.rolling(window=lookback, min_periods=lookback).mean()
        return volume / (volume_ma + EPS)

    def _atr_pct(self, high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14) -> pd.Series:
        prev_close = close.shift(1)
        tr = pd.concat([(high - low), (high - prev_close).abs(), (low - prev_close).abs()], axis=1).max(axis=1)
        atr = tr.ewm(alpha=1/period, adjust=False, min_periods=period).mean()
        return atr / (close.abs() + EPS)

    def _return_zscore(self, close: pd.Series, lookback: int = 63) -> pd.Series:
        ret = close.pct_change()
        mean = ret.rolling(window=lookback, min_periods=lookback).mean().shift(1)
        std = ret.rolling(window=lookback, min_periods=lookback).std(ddof=0).shift(1)
        z = (ret - mean) / (std + EPS)
        return z

    # ---------- Композитная логика ----------
    def _trend_strength(self, close: pd.Series, adx: pd.Series, choppiness: pd.Series, volume_ratio: pd.Series, plus_di: pd.Series, minus_di: pd.Series) -> pd.Series:
        sma_short = close.rolling(window=20, min_periods=20).mean()
        sma_long = close.rolling(window=50, min_periods=50).mean()
        price_trend = np.where(sma_short > sma_long, 1, -1)
        adx_strength = np.select([
            adx > self.parameters['adx']['strong_trend'],
            adx > self.parameters['adx']['weak_trend'],
            adx > self.parameters['adx']['no_trend']
        ], [2, 1, 0], default=0)
        choppy_trend = np.where(choppiness < self.parameters['choppiness']['trend_threshold'], 1, 0)
        di_sign = np.where(plus_di > minus_di, 1, -1)
        volume_support = np.where(volume_ratio > 1.2, 1, 0)
        strength = (price_trend * 0.4 + di_sign * 0.3) * (adx_strength * 0.2 + choppy_trend * 0.3 + volume_support * 0.5)
        return pd.Series(strength, index=close.index)

    def _hysteresis(self, series: pd.Series, enter: float, exit: float) -> pd.Series:
        state = False
        out = []
        for v in series.fillna(0).values:
            if not state and v >= enter:
                state = True
            elif state and v <= exit:
                state = False
            out.append(1 if state else 0)
        return pd.Series(out, index=series.index)

    # ---------- Публичные методы ----------
    def classify_market_regime(self, data: pd.DataFrame) -> Tuple[List[str], pd.DataFrame]:
        print("🔍 Начинаем расширенную классификацию рыночных режимов...")
        adx, plus_di, minus_di = self._adx_di_wilder(data['high'], data['low'], data['close'])
        choppiness = self._choppiness_index(data['high'], data['low'], data['close'])
        bb_width = self._bb_width_relative(data['close'])
        rsi = self._rsi_wilder(data['close'])
        volume_ratio = self._volume_profile(data['volume']) if 'volume' in data.columns else pd.Series(np.nan, index=data.index)
        low_thr, high_thr, vol_pct = self._rolling_percentiles_ex_ante(bb_width)
        trend_strength = self._trend_strength(data['close'], adx, choppiness, volume_ratio, plus_di, minus_di)
        atr_pct = self._atr_pct(data['high'], data['low'], data['close'])
        ret_z = self._return_zscore(data['close'])

        regimes = []
        details = []
        for i in range(len(data)):
            if i < max(252, 50):
                regimes.append('insufficient_data')
                details.append({'reason': 'insufficient_data'})
                continue
            cur = {
                'adx': adx.iloc[i],
                'chop': choppiness.iloc[i],
                'bb': bb_width.iloc[i],
                'rsi': rsi.iloc[i],
                'vol_ratio': volume_ratio.iloc[i] if 'volume' in data.columns else np.nan,
                'ts': trend_strength.iloc[i],
                'low_thr': low_thr.iloc[i],
                'high_thr': high_thr.iloc[i],
                'vol_pct': vol_pct.iloc[i],
                'plus_di': plus_di.iloc[i],
                'minus_di': minus_di.iloc[i],
                'atr_pct': atr_pct.iloc[i],
                'ret_z': ret_z.iloc[i],
                'open': data['open'].iloc[i] if 'open' in data.columns else data['close'].iloc[i-1],
                'close': data['close'].iloc[i]
            }
            if any(pd.isna([cur['adx'], cur['chop'], cur['bb'], cur['ts'], cur['low_thr'], cur['high_thr'], cur['vol_pct']])):
                regimes.append('insufficient_data')
                details.append({'reason': 'nan_values'})
                continue
            is_high_vol = cur['bb'] > cur['high_thr']
            is_low_vol = cur['bb'] < cur['low_thr']
            is_strong_trend = cur['adx'] > self.parameters['adx']['strong_trend']
            is_weak_trend = self.parameters['adx']['weak_trend'] <= cur['adx'] <= self.parameters['adx']['strong_trend']
            is_choppy_trend = cur['chop'] < self.parameters['choppiness']['trend_threshold']
            is_choppy_flat = cur['chop'] > self.parameters['choppiness']['flat_threshold']
            price_up = cur['close'] > cur['open']

            # Crash / rally уточнение: высокий перцентиль волатильности + экстремальный ret_z и/или высокий ATR%
            if is_high_vol and (cur['ts'] < -1.5) and (cur['vol_pct'] > 0.9) and ((cur['ret_z'] < -2.0) or (cur['atr_pct'] > 0.03)):
                regimes.append('high_volatility_crash')
                details.append({'reason': 'high_volatility_crash', 'vol_pct': cur['vol_pct'], 'ts': cur['ts'], 'ret_z': cur['ret_z'], 'atr_pct': cur['atr_pct']})
            elif is_high_vol and (cur['ts'] > 1.5) and (cur['vol_pct'] > 0.9) and ((cur['ret_z'] > 2.0) or (cur['atr_pct'] > 0.03)):
                regimes.append('high_volatility_rally')
                details.append({'reason': 'high_volatility_rally', 'vol_pct': cur['vol_pct'], 'ts': cur['ts'], 'ret_z': cur['ret_z'], 'atr_pct': cur['atr_pct']})
            elif is_strong_trend and is_choppy_trend and cur['ts'] > 1.0 and price_up:
                regimes.append('strong_trend_up')
                details.append({'reason': 'strong_trend_up'})
            elif is_strong_trend and is_choppy_trend and cur['ts'] < -1.0 and not price_up:
                regimes.append('strong_trend_down')
                details.append({'reason': 'strong_trend_down'})
            elif is_weak_trend and is_choppy_trend and cur['ts'] > 0.5 and price_up:
                regimes.append('weak_trend_up')
                details.append({'reason': 'weak_trend_up'})
            elif is_weak_trend and is_choppy_trend and cur['ts'] < -0.5 and not price_up:
                regimes.append('weak_trend_down')
                details.append({'reason': 'weak_trend_down'})
            elif is_high_vol and is_choppy_flat and abs(cur['ts']) < 0.5:
                regimes.append('consolidation_high_vol')
                details.append({'reason': 'consolidation_high_vol'})
            elif is_low_vol and is_choppy_flat and abs(cur['ts']) < 0.5:
                regimes.append('consolidation_low_vol')
                details.append({'reason': 'consolidation_low_vol'})
            elif is_choppy_flat and abs(cur['ts']) < 1.0:
                regimes.append('ranging_market')
                details.append({'reason': 'ranging_market'})
            else:
                regimes.append('uncertain')
                details.append({'reason': 'uncertain'})

        regimes = self._apply_persistence(regimes)
        classification_data = pd.DataFrame({
            'adx': adx,
            'plus_di': plus_di,
            'minus_di': minus_di,
            'choppiness': choppiness,
            'bb_width': bb_width,
            'rsi': rsi,
            'volume_ratio': volume_ratio,
            'trend_strength': trend_strength,
            'volatility_percentile': vol_pct,
            'regime': regimes
        }, index=data.index)
        classification_data['classification_details'] = details
        self._print_regime_stats(regimes)
        return regimes, classification_data

    def _apply_persistence(self, regimes: List[str]) -> List[str]:
        if len(regimes) < 3:
            return regimes
        min_persist = self.parameters['statistical']['regime_persistence']
        smoothed = regimes.copy()
        i = 0
        while i < len(regimes):
            j = i
            while j < len(regimes) and regimes[j] == regimes[i]:
                j += 1
            run_len = j - i
            if run_len < min_persist:
                # заменяем короткий пробег на соседний доминирующий режим
                left = regimes[i-1] if i - 1 >= 0 else None
                right = regimes[j] if j < len(regimes) else None
                fill = right if right is not None else left
                for k in range(i, j):
                    smoothed[k] = fill if fill is not None else regimes[k]
            i = j
        return smoothed

    def _print_regime_stats(self, regimes: List[str]):
        counts = pd.Series(regimes).value_counts()
        total = len(regimes)
        print("\n📊 СТАТИСТИКА РЫНОЧНЫХ РЕЖИМОВ:")
        print("=" * 50)
        for regime in self.regime_hierarchy:
            if regime in counts:
                c = counts[regime]
                print(f"   • {regime:25s}: {c:4d} баров ({(c/total)*100:5.1f}%)")
        sufficient = [r for r in regimes if r != 'insufficient_data']
        if sufficient:
            print(f"\n   • Всего классифицировано: {len(sufficient)} баров")
            print(f"   • Недостаточно данных: {total - len(sufficient)} баров")

    # ---------- Статистика и визуализация ----------
    def get_detailed_statistics(self, classification_data: pd.DataFrame) -> Dict:
        valid = classification_data[classification_data['regime'] != 'insufficient_data']
        def transitions(series: pd.Series) -> pd.DataFrame:
            pairs = []
            prev = None
            for r in series:
                if prev and prev != r:
                    pairs.append((prev, r))
                prev = r
            if not pairs:
                return pd.DataFrame()
            df = pd.DataFrame(pairs, columns=['from', 'to'])
            return pd.crosstab(df['from'], df['to'], normalize='index')
        def durations(series: pd.Series) -> Dict[str, float]:
            dur = {}
            cur = None
            run = 0
            for r in series:
                if r == cur:
                    run += 1
                else:
                    if cur:
                        dur.setdefault(cur, []).append(run)
                    cur = r
                    run = 1
            if cur:
                dur.setdefault(cur, []).append(run)
            return {k: float(np.mean(v)) for k, v in dur.items()}
        stats = {
            'regime_counts': valid['regime'].value_counts(),
            'regime_percentages': valid['regime'].value_counts(normalize=True) * 100,
            'indicators_by_regime': valid.groupby('regime').agg({
                'adx': ['mean', 'std', 'min', 'max'],
                'choppiness': ['mean', 'std', 'min', 'max'],
                'bb_width': ['mean', 'std', 'min', 'max'],
                'rsi': ['mean', 'std', 'min', 'max'],
                'volume_ratio': ['mean', 'std', 'min', 'max'],
                'trend_strength': ['mean', 'std', 'min', 'max']
            }),
            'regime_transitions': transitions(valid['regime']),
            'regime_durations': durations(valid['regime'])
        }
        return stats

    def visualize_regime_analysis(self, data: pd.DataFrame, classification_data: pd.DataFrame, title: str = "Расширенный анализ рыночных режимов BTC"):
        fig, axes = plt.subplots(4, 1, figsize=(16, 14))
        regime_colors = {
            'strong_trend_up': '#00ff00',
            'strong_trend_down': '#ff0000',
            'weak_trend_up': '#90ee90',
            'weak_trend_down': '#ffcccb',
            'high_volatility_rally': '#ffa500',
            'high_volatility_crash': '#8b0000',
            'consolidation_high_vol': '#ff69b4',
            'consolidation_low_vol': '#9370db',
            'ranging_market': '#1e90ff',
            'uncertain': '#a9a9a9',
            'insufficient_data': '#000000'
        }
        axes[0].plot(data.index, data['close'], label='BTC Price', color='black', alpha=0.8, linewidth=1)
        for regime in classification_data['regime'].unique():
            if regime in regime_colors:
                mask = classification_data['regime'] == regime
                if mask.any():
                    axes[0].scatter(data.index[mask], data['close'][mask], c=regime_colors[regime], label=regime, alpha=0.7, s=12)
        axes[0].set_title(f'{title}\nЦена BTC с рыночными режимами', fontsize=14, fontweight='bold')
        axes[0].set_ylabel('Цена BTC', fontweight='bold')
        # Ограничиваем количество элементов в легенде, если их слишком много
        handles, labels = axes[0].get_legend_handles_labels()
        if len(labels) > 9:
            axes[0].legend(handles[:9], labels[:9], bbox_to_anchor=(1.05, 1), loc='upper left')
        else:
            axes[0].legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        axes[0].grid(True, alpha=0.3)

        axes[1].plot(classification_data.index, classification_data['adx'], label='ADX', color='purple', linewidth=1)
        axes[1].axhline(y=self.parameters['adx']['strong_trend'], color='red', linestyle='--', alpha=0.7, label='Сильный тренд')
        axes[1].axhline(y=self.parameters['adx']['weak_trend'], color='orange', linestyle='--', alpha=0.7, label='Слабый тренд')
        axes[1].set_ylabel('ADX', fontweight='bold')
        axes[1].legend()
        axes[1].grid(True, alpha=0.3)

        axes[2].plot(classification_data.index, classification_data['choppiness'], label='Choppiness Index', color='blue', linewidth=1)
        axes[2].axhline(y=self.parameters['choppiness']['flat_threshold'], color='red', linestyle='--', alpha=0.7, label='Флет')
        axes[2].axhline(y=self.parameters['choppiness']['trend_threshold'], color='green', linestyle='--', alpha=0.7, label='Тренд')
        axes[2].set_ylabel('Choppiness Index', fontweight='bold')
        axes[2].legend()
        axes[2].grid(True, alpha=0.3)

        ax4 = axes[3]
        ax4_twin = ax4.twinx()
        ax4.plot(classification_data.index, classification_data['bb_width'], label='Волатильность (BB Width)', color='brown', linewidth=1)
        ax4.set_ylabel('Волатильность', fontweight='bold', color='brown')
        ax4.tick_params(axis='y', labelcolor='brown')

        ax4_twin.plot(classification_data.index, classification_data['rsi'], label='RSI', color='gray', linewidth=1, alpha=0.7)
        ax4_twin.axhline(y=70, color='red', linestyle='--', alpha=0.5, label='Перекупленность')
        ax4_twin.axhline(y=30, color='green', linestyle='--', alpha=0.5, label='Перепроданность')
        ax4_twin.axhline(y=50, color='black', linestyle='-', alpha=0.3)
        ax4_twin.set_ylabel('RSI', fontweight='bold', color='gray')
        ax4_twin.tick_params(axis='y', labelcolor='gray')
        ax4_twin.set_ylim(0, 100)

        ax4.set_xlabel('Дата', fontweight='bold')
        # Стабилизируем легенды на последнем графике
        l1, l2 = ax4.get_legend_handles_labels()
        r1, r2 = ax4_twin.get_legend_handles_labels()
        ax4.legend(l1 + r1, r2 + r2, loc='upper left')
        ax4.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()
        return fig


## 🧪 ТЕСТИРОВАНИЕ КЛАССИФИКАТОРА


In [None]:
# Загружаем данные BTC для тестирования
print("📊 Загрузка данных BTC для тестирования...")
df = pd.read_csv('df_btc_1h.csv', index_col=0, parse_dates=True)
print(f"✅ Загружено {len(df)} записей")
print(f"📅 Период: {df.index[0]} - {df.index[-1]}")
df.head()


In [None]:
# Создаем классификатор и тестируем
print("🎯 СОЗДАНИЕ И ТЕСТИРОВАНИЕ КЛАССИФИКАТОРА")
print("=" * 60)

classifier = AdvancedMarketRegimeClassifier()
print("\n" + "=" * 60)


In [None]:
# Классифицируем рыночные режимы (расширенная версия)
print("🔍 КЛАССИФИКАЦИЯ РЫНОЧНЫХ РЕЖИМОВ")
print("=" * 50)

regimes, classification_data = classifier.classify_market_regime(df)

print("\n✅ Классификация завершена!")
print(f"📊 Обработано {len(regimes)} баров")
print(f"📈 Уникальных режимов: {len(set(regimes))}")


In [None]:
# Получаем статистику по режимам (расширенная)
print("📊 СТАТИСТИКА ПО РЫНОЧНЫМ РЕЖИМАМ")
print("=" * 50)

stats = classifier.get_detailed_statistics(classification_data)

print("\n📈 Распределение режимов:")
for regime, count in stats['regime_counts'].items():
    percentage = stats['regime_percentages'][regime]
    print(f"   • {regime}: {count} баров ({percentage:.1f}%)")

print("\n📊 Сводные индикаторы по режимам:")
print(stats['indicators_by_regime'])

print("\n🔁 Матрица переходов (норм.):")
print(stats['regime_transitions'])

print("\n⏱ Средняя длительность режимов (бары):")
print(stats['regime_durations'])


In [None]:
# Визуализируем результаты (расширенный анализ)
print("📊 ВИЗУАЛИЗАЦИЯ РЫНОЧНЫХ РЕЖИМОВ")
print("=" * 50)

# Берем последние 1000 баров для лучшей визуализации
recent_data = df.tail(1000)
recent_classification = classification_data.tail(1000)

fig = classifier.visualize_regime_analysis(recent_data, recent_classification, "BTC 1H - Последние 1000 баров")

print("✅ Визуализация завершена!")


## 📋 РЕЗУЛЬТАТЫ КЛАССИФИКАЦИИ


In [None]:
# Показываем первые 20 результатов классификации
print("📋 ПЕРВЫЕ 20 РЕЗУЛЬТАТОВ КЛАССИФИКАЦИИ")
print("=" * 60)

sample_data = classification_data.head(20)[['adx', 'choppiness', 'bb_width', 'regime']]
print(sample_data.to_string())

print("\n📊 ИНТЕРПРЕТАЦИЯ РЕЖИМОВ:")
print("   • strong_trend_up: Сильный восходящий тренд")
print("   • strong_trend_down: Сильный нисходящий тренд")
print("   • weak_trend_up: Слабый восходящий тренд")
print("   • weak_trend_down: Слабый нисходящий тренд")
print("   • flat_market: Боковой рынок (флет)")
print("   • volatile_market: Высокая волатильность")
print("   • uncertain: Неопределенное состояние")
print("   • insufficient_data: Недостаточно данных для анализа")


## 🎯 ЗАКЛЮЧЕНИЕ

### ✅ **ЧТО МЫ СОЗДАЛИ:**
1. **Статичный классификатор рыночных режимов** - без оптимизации параметров
2. **Объективную классификацию** - основанную на теоретически обоснованных значениях
3. **7 различных рыночных режимов** - от сильных трендов до флета
4. **Визуализацию результатов** - для понимания работы классификатора

### 🎯 **СЛЕДУЮЩИЕ ШАГИ:**
1. **Анализ эффективности сигнальных индикаторов** по выявленным режимам
2. **Целевая оптимизация** сигнальных индикаторов под конкретные режимы
3. **Создание адаптивной системы** - выбор индикаторов в зависимости от режима

### 💡 **КЛЮЧЕВЫЕ ПРИНЦИПЫ:**
- ✅ **НЕ ОПТИМИЗИРУЕМ** параметры классификаторов
- ✅ **ИСПОЛЬЗУЕМ** теоретически обоснованные значения
- ✅ **ИЗБЕГАЕМ** порочного круга оптимизации
- ✅ **СОЗДАЕМ** объективную основу для дальнейшей работы


In [None]:
# 📦 ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ ДЛЯ ОЦЕНКИ ЭФФЕКТИВНОСТИ
import numpy as np
import pandas as pd

FORWARD_HORIZONS = [1, 3, 5, 10]

def compute_forward_returns(close: pd.Series, horizons=FORWARD_HORIZONS) -> pd.DataFrame:
    out = {}
    for h in horizons:
        out[f'r_forward_{h}'] = close.shift(-h) / close - 1.0
    return pd.DataFrame(out, index=close.index)

def sharpe_ratio(returns: pd.Series) -> float:
    mu = returns.mean()
    sd = returns.std(ddof=0)
    if sd == 0 or np.isnan(sd):
        return np.nan
    return mu / sd

def sign_accuracy(returns: pd.Series, direction: int) -> float:
    if direction not in (-1, 1):
        return np.nan
    valid = returns.dropna()
    if len(valid) == 0:
        return np.nan
    return (np.sign(valid) == direction).mean()

def regime_run_lengths(regimes: pd.Series) -> pd.Series:
    lengths = []
    prev = None
    run = 0
    for r in regimes:
        if r == prev:
            run += 1
        else:
            if prev is not None:
                lengths.append(run)
            prev = r
            run = 1
    if prev is not None:
        lengths.append(run)
    return pd.Series(lengths, dtype=float)

def stability_metrics(regimes: pd.Series) -> dict:
    runs = regime_run_lengths(regimes)
    switches_per_1000 = 0.0 if len(regimes) == 0 else (len(runs) - 1) / max(len(regimes), 1) * 1000.0
    uncertain_share = (regimes == 'uncertain').mean()
    return {
        'avg_run_length': runs.mean() if len(runs) else np.nan,
        'switches_per_1000': switches_per_1000,
        'uncertain_share': uncertain_share,
    }

def simple_regime_position(regime: str) -> int:
    if regime in ('strong_trend_up', 'weak_trend_up'):
        return 1
    if regime in ('strong_trend_down', 'weak_trend_down'):
        return -1
    return 0

def backtest_regime_on_off(close: pd.Series, regimes: pd.Series) -> dict:
    ret = close.pct_change()
    pos = regimes.fillna('uncertain').map(simple_regime_position).astype(float)
    # Используем позицию со сдвигом, чтобы не было look-ahead
    strat_ret = pos.shift(1) * ret
    bh_ret = ret
    out = {
        'strat_return_mean': strat_ret.mean(),
        'strat_return_std': strat_ret.std(ddof=0),
        'strat_sharpe': sharpe_ratio(strat_ret),
        'bh_return_mean': bh_ret.mean(),
        'bh_return_std': bh_ret.std(ddof=0),
        'bh_sharpe': sharpe_ratio(bh_ret),
        'strat_cumret': float((1 + strat_ret.fillna(0)).prod() - 1),
        'bh_cumret': float((1 + bh_ret.fillna(0)).prod() - 1),
    }
    return out

def crash_rally_analysis(close: pd.Series, regimes: pd.Series, window: int = 10) -> dict:
    ret = close.pct_change()
    idx = close.index
    results = {}
    for tag in ('high_volatility_crash', 'high_volatility_rally'):
        starts = idx[regimes == tag]
        if len(starts) == 0:
            results[tag] = {'signals': 0}
            continue
        prof = []
        falses = 0
        for t in starts:
            if t not in idx:
                continue
            start_loc = idx.get_loc(t)
            end_loc = min(start_loc + window, len(idx) - 1)
            seg = ret.iloc[start_loc+1:end_loc+1]  # доходности после сигнала
            cum = (1 + seg.fillna(0)).prod() - 1
            prof.append(cum)
            if tag == 'high_volatility_crash' and cum > 0:
                falses += 1
            if tag == 'high_volatility_rally' and cum < 0:
                falses += 1
        prof = pd.Series(prof, dtype=float)
        results[tag] = {
            'signals': int(len(starts)),
            'mean_cum_return_w'+str(window): float(prof.mean()) if len(prof) else np.nan,
            'median_cum_return_w'+str(window): float(prof.median()) if len(prof) else np.nan,
            'false_rate': float(falses / len(starts)) if len(starts) else np.nan,
        }
    return results



In [None]:
# 🧮 ОЦЕНКА НА ОДНОМ ТАЙМФРЕЙМЕ (ПРИМЕР: 1H)
print("🔎 ОЦЕНКА НА 1H")

# 1) Получаем режимы уже рассчитанные выше: classification_data
fr = compute_forward_returns(df['close'])
joined = classification_data.join(fr)

# 2) Таблица метрик по режимам
rows = []
for regime, grp in joined.groupby('regime'):
    row = {'regime': regime}
    for h in FORWARD_HORIZONS:
        r = grp[f'r_forward_{h}']
        row[f'mean_{h}'] = r.mean()
        row[f'median_{h}'] = r.median()
        row[f'sharpe_{h}'] = sharpe_ratio(r)
    rows.append(row)
per_regime_metrics_1h = pd.DataFrame(rows).set_index('regime').sort_index()
print(per_regime_metrics_1h)

# 3) Стабильность режимов
stab = stability_metrics(classification_data['regime'])
print("\nСтабильность режимов:", stab)

# 4) Анализ crash/rally
cr = crash_rally_analysis(df['close'], classification_data['regime'], window=10)
print("\nCrash/Rally анализ (окно=10):", cr)

# 5) Простой бэктест режимной стратегии против buy&hold
bt = backtest_regime_on_off(df['close'], classification_data['regime'])
print("\nСравнение стратегии режимов vs buy&hold:")
print(bt)



In [None]:
# 🌐 МУЛЬТИ‑ТАЙМФРЕЙМ: СРАВНЕНИЕ МЕТРИК ПО ВСЕМ 5 ТФ
import os

files = [
    ('15m', 'df_btc_15m.csv'),
    ('30m', 'df_btc_30m.csv'),
    ('1h', 'df_btc_1h.csv'),
    ('4h', 'df_btc_4h.csv'),
    ('1d', 'df_btc_1d.csv'),
]

summary_rows = []
per_tf_tables = {}

for tf, fname in files:
    if not os.path.exists(fname):
        print(f"⚠️ Файл {fname} не найден, пропускаю {tf}")
        continue
    print(f"\n⏱ Обработка {tf} ({fname})")
    d = pd.read_csv(fname, index_col=0, parse_dates=True)
    clf = AdvancedMarketRegimeClassifier()
    regimes, cls = clf.classify_market_regime(d)

    fr = compute_forward_returns(d['close'])
    joined = cls.join(fr)

    # Таблица метрик
    rows = []
    for regime, grp in joined.groupby('regime'):
        row = {'regime': regime}
        for h in FORWARD_HORIZONS:
            r = grp[f'r_forward_{h}']
            row[f'mean_{h}'] = r.mean()
            row[f'median_{h}'] = r.median()
            row[f'sharpe_{h}'] = sharpe_ratio(r)
        rows.append(row)
    table = pd.DataFrame(rows).set_index('regime').sort_index()
    per_tf_tables[tf] = table

    # Стабильность и бэктест
    stab = stability_metrics(cls['regime'])
    bt = backtest_regime_on_off(d['close'], cls['regime'])

    summary_rows.append({
        'tf': tf,
        'avg_run_length': stab['avg_run_length'],
        'switches_per_1000': stab['switches_per_1000'],
        'uncertain_share': stab['uncertain_share'],
        'strat_sharpe': bt['strat_sharpe'],
        'bh_sharpe': bt['bh_sharpe'],
        'strat_cumret': bt['strat_cumret'],
        'bh_cumret': bt['bh_cumret']
    })

summary = pd.DataFrame(summary_rows).set_index('tf').sort_index()
print("\n📊 Сводка по ТФ:")
print(summary)

print("\n📋 Примеры таблиц метрик по режимам (1-2 ТФ):")
for tf in list(per_tf_tables.keys())[:2]:
    print(f"\n— {tf} —")
    print(per_tf_tables[tf])



In [None]:
# 🧭 TOP-DOWN АГРЕГАЦИЯ (HTF 1d/4h → базовый ТФ)
import os

def classify_df(file):
    d = pd.read_csv(file, index_col=0, parse_dates=True)
    clf = AdvancedMarketRegimeClassifier()
    regimes, cls = clf.classify_market_regime(d)
    return d, cls

# 1) Загружаем HTF: 1d и 4h
htf_files = [('1d', 'df_btc_1d.csv'), ('4h', 'df_btc_4h.csv')]
htf_cls = {}
for tf, fname in htf_files:
    if os.path.exists(fname):
        d_tf, cls_tf = classify_df(fname)
        htf_cls[tf] = (d_tf, cls_tf)
    else:
        print(f"⚠️ Нет файла {fname}, HTF {tf} пропущен")

# 2) Базовый ТФ (пример — 1h)
base_tf = ('1h', 'df_btc_1h.csv')
if not os.path.exists(base_tf[1]):
    print(f"⚠️ Нет файла {base_tf[1]}, пропуск top-down")
else:
    d_base, cls_base = classify_df(base_tf[1])

    # 3) Выравниваем HTF по индексу базового ТФ (forward-fill по времени)
    def align_htf_to_base(htf_df):
        htf_on_base = htf_df.reindex(d_base.index, method='ffill')
        return htf_on_base

    aligned = {}
    for tf, (d_tf, cls_tf) in htf_cls.items():
        aligned[tf] = align_htf_to_base(cls_tf[['regime']])

    # 4) Простейшее правило top-down
    # Если дневка трендовая — принимаем тренд; если дневка флет/консолидация — LTF тренды понижаем до ranging
    def gate_regime(htf_regime: str, ltf_regime: str) -> str:
        trend_up = ('strong_trend_up', 'weak_trend_up')
        trend_down = ('strong_trend_down', 'weak_trend_down')
        cons = ('ranging_market', 'consolidation_high_vol', 'consolidation_low_vol')
        if htf_regime in trend_up + trend_down:
            return ltf_regime
        if htf_regime in cons or htf_regime == 'uncertain':
            if ltf_regime in trend_up + trend_down:
                return 'ranging_market'
            return ltf_regime
        return ltf_regime

    # 5) Формируем агрегированный режим (используем 1d, при его отсутствии — 4h)
    htf_primary = aligned.get('1d', aligned.get('4h'))
    if htf_primary is None:
        print("⚠️ Нет HTF классификации — пропускаю top-down")
    else:
        htf_series = htf_primary['regime']
        ltf_series = cls_base['regime']
        agg_regime = []
        for t in d_base.index:
            r_htf = htf_series.loc[t] if t in htf_series.index else 'uncertain'
            r_ltf = ltf_series.loc[t] if t in ltf_series.index else 'uncertain'
            agg_regime.append(gate_regime(r_htf, r_ltf))
        agg_regime = pd.Series(agg_regime, index=d_base.index, name='regime_topdown')

        # 6) Метрики как и раньше
        fr = compute_forward_returns(d_base['close'])
        joined = pd.concat([cls_base, agg_regime], axis=1).join(fr)

        # Таблица метрик по агрегированному режиму
        rows = []
        for regime, grp in joined.groupby('regime_topdown'):
            row = {'regime': regime}
            for h in FORWARD_HORIZONS:
                r = grp[f'r_forward_{h}']
                row[f'mean_{h}'] = r.mean()
                row[f'median_{h}'] = r.median()
                row[f'sharpe_{h}'] = sharpe_ratio(r)
            rows.append(row)
        table_topdown = pd.DataFrame(rows).set_index('regime').sort_index()
        print("\n📊 Метрики по агрегированному режиму (top-down, базовый 1h):")
        print(table_topdown)

        # Стабильность и бэктест
        stab_td = stability_metrics(agg_regime)
        bt_td = backtest_regime_on_off(d_base['close'], agg_regime)
        print("\nСтабильность top-down:", stab_td)
        print("\nБэктест top-down vs buy&hold:")
        print(bt_td)

