In [1]:
import datetime
import pandas as pd
from IPython.display import display

# Предполагается, что ваш ноутбук запущен из корня проекта
from utils.downloader import download_and_process_ticks_to_df, SourceType, Frequency

# --- Параметры для теста (используем месячный файл для наглядности) ---
test_date = datetime.date(2023, 10, 1) # День не важен для monthly
test_ticker = "BTCUSDT"
test_source: SourceType = "BINANCE-FUT"
test_frequency: Frequency = "monthly"

print(f"▶️  Запускаем тест: скачивание {test_frequency} архива для {test_ticker} ({test_source}) за {test_date.strftime('%Y-%m')}...")
print("Это может занять несколько минут...")

# --- 1. Определяем callback-функцию для отображения прогресса ---
def report_progress(rows_count: int):
    # Используем \r и end='' для обновления строки на месте, создавая эффект "живого" счетчика
    # Выводим в мега-строках (M) для лучшей читаемости
    print(f"  ⏳ Обработано строк: {rows_count / 1_000_000:.2f} M", end='\r')

# --- 2. Вызов основной функции с callback ---
data_df = None
try:
    data_df = download_and_process_ticks_to_df(
        trade_date=test_date,
        ticker=test_ticker,
        source=test_source,
        frequency=test_frequency,
        progress_callback=report_progress # <-- Передаем нашу функцию
    )
    
    # После завершения цикла выводим итоговое сообщение на новой строке
    print("\n\n✅  Успешно! Данные скачаны и обработаны.")
    
except Exception as e:
    print(f"\n❌  Произошла ошибка при выполнении:")
    print(f"Тип ошибки: {type(e).__name__}")
    print(f"Детали: {e}")

# --- 3. Отображение статистики (если данные получены) ---
if data_df is not None and not data_df.empty:
    print(f"\n--- Итоговая информация ---")
    print(f"Форма DataFrame (shape): {data_df.shape[0]:,} строк, {data_df.shape[1]} колонок")
    
    print("\n--- Первые 5 строк (head) ---")
    display(data_df.head())
    
    print("\n--- Последние 5 строк (tail) ---")
    display(data_df.tail())

    total_ticks = data_df['Ticks'].sum()
    print(f"\nПроверка: общее количество тиков в месяце: {total_ticks:,}")

▶️  Запускаем тест: скачивание monthly архива для BTCUSDT (BINANCE-FUT) за 2023-10...
Это может занять несколько минут...
  ⏳ Обработано строк: 113.69 M

✅  Успешно! Данные скачаны и обработаны.

--- Итоговая информация ---
Форма DataFrame (shape): 535,668 строк, 7 колонок

--- Первые 5 строк (head) ---


Unnamed: 0,Timestamp,H,L,V,Ticks,Source,Ticker
0,2023-10-01 00:00:05,26951.0,26950.9,71931,41,BINANCE-FUT,BTCUSDT
1,2023-10-01 00:00:10,26951.0,26946.4,754970,341,BINANCE-FUT,BTCUSDT
2,2023-10-01 00:00:15,26946.5,26946.4,150765,49,BINANCE-FUT,BTCUSDT
3,2023-10-01 00:00:20,26946.5,26946.4,97977,47,BINANCE-FUT,BTCUSDT
4,2023-10-01 00:00:25,26946.5,26946.4,21045,27,BINANCE-FUT,BTCUSDT



--- Последние 5 строк (tail) ---


Unnamed: 0,Timestamp,H,L,V,Ticks,Source,Ticker
535663,2023-10-31 23:59:40,34662.6,34662.5,74767,34,BINANCE-FUT,BTCUSDT
535664,2023-10-31 23:59:45,34662.6,34656.7,201926,216,BINANCE-FUT,BTCUSDT
535665,2023-10-31 23:59:50,34660.7,34660.6,62319,39,BINANCE-FUT,BTCUSDT
535666,2023-10-31 23:59:55,34660.7,34656.2,29459,104,BINANCE-FUT,BTCUSDT
535667,2023-11-01 00:00:00,34656.3,34651.3,217684,197,BINANCE-FUT,BTCUSDT



Проверка: общее количество тиков в месяце: 113,686,101


In [1]:
import sys
from datetime import date

try:
    # Импортируем обновленную функцию
    from utils.discovery import get_ticker_files, SourceType, Frequency
    print("✅  Модуль 'discovery' успешно импортирован.")
except ImportError:
    print("❌  Ошибка импорта.")
    sys.exit()

ticker_to_check = "BTCUSDT"
source_to_check: SourceType = "BINANCE-FUT"

# --- Тест 1: Получение ДНЕВНЫХ файлов (как раньше) ---
print(f"\n--- Получение DAILY файлов для {ticker_to_check} ---")
try:
    daily_files = get_ticker_files(
        source=source_to_check,
        ticker=ticker_to_check,
        frequency="daily"
    )
    print(f"✅  Найдено {len(daily_files)} дневных файлов.")
    if daily_files:
        first_day = sorted(daily_files.keys())[0]
        print(f"  Пример: {first_day} -> {daily_files[first_day]:,} байт")
except Exception as e:
    print(f"❌  Произошла ошибка: {e}")


# --- Тест 2: Получение МЕСЯЧНЫХ файлов ---
print(f"\n--- Получение MONTHLY файлов для {ticker_to_check} ---")
try:
    def report_progress(count: int):
        print(f"  ⏳ Найдено файлов: {count}", end='\r')

    monthly_files = get_ticker_files(
        source=source_to_check,
        ticker=ticker_to_check,
        frequency="monthly",
        progress_callback=report_progress
    )
    
    print(f"\n✅  Готово! Всего найдено {len(monthly_files)} месячных файлов.")
    if monthly_files:
        print("\n--- Все найденные месячные файлы ---")
        for month_date, size in sorted(monthly_files.items()):
            # Дата будет первым днем месяца
            print(f"  Дата: {month_date} (Месяц: {month_date.strftime('%Y-%m')}), Размер: {size:,} байт")

except Exception as e:
    print(f"\n❌  Произошла ошибка: {e}")

✅  Модуль 'discovery' успешно импортирован.

--- Получение DAILY файлов для BTCUSDT ---
✅  Найдено 2148 дневных файлов.
  Пример: 2019-09-08 -> 55,253 байт

--- Получение MONTHLY файлов для BTCUSDT ---
  ⏳ Найдено файлов: 70
✅  Готово! Всего найдено 70 месячных файлов.

--- Все найденные месячные файлы ---
  Дата: 2019-09-01 (Месяц: 2019-09), Размер: 14,243,543 байт
  Дата: 2019-10-01 (Месяц: 2019-10), Размер: 53,264,789 байт
  Дата: 2019-11-01 (Месяц: 2019-11), Размер: 119,661,317 байт
  Дата: 2019-12-01 (Месяц: 2019-12), Размер: 99,771,382 байт
  Дата: 2020-01-01 (Месяц: 2020-01), Размер: 128,838,558 байт
  Дата: 2020-02-01 (Месяц: 2020-02), Размер: 119,153,913 байт
  Дата: 2020-03-01 (Месяц: 2020-03), Размер: 298,086,305 байт
  Дата: 2020-04-01 (Месяц: 2020-04), Размер: 317,101,072 байт
  Дата: 2020-05-01 (Месяц: 2020-05), Размер: 370,923,781 байт
  Дата: 2020-06-01 (Месяц: 2020-06), Размер: 196,314,591 байт
  Дата: 2020-07-01 (Месяц: 2020-07), Размер: 169,202,475 байт
  Дата: 2020-

In [1]:
import sys
from datetime import date
from IPython.display import display

try:
    # Импортируем обе функции из нашего модуля
    from utils.discovery import get_available_tickers, get_ticker_daily_files, SourceType
    print("✅  Модуль 'discovery' успешно импортирован.")
except ImportError:
    print("❌  Ошибка импорта. Проверьте путь к utils.discovery.")
    sys.exit()

# --- Тест новой функции get_ticker_daily_files ---
ticker_to_check = "BTCUSDT"
source_to_check: SourceType = "BINANCE-SPOT"

print(f"\n--- Получение списка дневных файлов для {ticker_to_check} ({source_to_check}) ---")

try:
    # Определяем callback-функцию для прогресса
    def report_progress(count: int):
        print(f"  ⏳ Найдено файлов: {count}", end='\r')

    # Вызываем новую функцию
    daily_files = get_ticker_daily_files(
        source=source_to_check,
        ticker=ticker_to_check,
        progress_callback=report_progress
    )

    print(f"\n✅  Готово! Всего найдено {len(daily_files)} дневных файлов.")

    if daily_files:
        # Сортируем даты для красивого вывода
        sorted_dates = sorted(daily_files.keys())
        
        print("\n--- Первые 5 найденных файлов ---")
        for d in sorted_dates[:5]:
            print(f"  Дата: {d}, Размер: {daily_files[d]:,} байт")
            
        print("\n--- Последние 5 найденных файлов ---")
        for d in sorted_dates[-5:]:
            print(f"  Дата: {d}, Размер: {daily_files[d]:,} байт")

except Exception as e:
    print(f"\n❌  Произошла ошибка: {e}")

✅  Модуль 'discovery' успешно импортирован.

--- Получение списка дневных файлов для BTCUSDT (BINANCE-SPOT) ---
  ⏳ Найдено файлов: 2900
✅  Готово! Всего найдено 2900 дневных файлов.

--- Первые 5 найденных файлов ---
  Дата: 2017-08-17, Размер: 63,039 байт
  Дата: 2017-08-18, Размер: 95,776 байт
  Дата: 2017-08-19, Размер: 40,219 байт
  Дата: 2017-08-20, Размер: 43,413 байт
  Дата: 2017-08-21, Размер: 70,078 байт

--- Последние 5 найденных файлов ---
  Дата: 2025-07-21, Размер: 23,233,971 байт
  Дата: 2025-07-22, Размер: 25,369,818 байт
  Дата: 2025-07-23, Размер: 19,131,415 байт
  Дата: 2025-07-24, Размер: 19,069,511 байт
  Дата: 2025-07-25, Размер: 29,753,237 байт


In [2]:
import sys
from IPython.display import display

try:
    # Import the updated function and types
    from utils.discovery import get_available_tickers, SourceType, Frequency
    print("✅  Module 'discovery' successfully imported.")
except ImportError:
    print("❌  Import Error. Please check the path to utils.discovery.")
    sys.exit()

source_to_check: SourceType = "BINANCE-SPOT"

# --- Test 1: Get tickers for DAILY frequency ---
print(f"\n--- Test 1: Discovering tickers for DAILY data ({source_to_check}) ---")
try:
    def report_daily_progress(count: int):
        print(f"  ⏳ Found daily tickers: {count}", end='\r')

    daily_tickers = get_available_tickers(
        source=source_to_check,
        frequency="daily",
        ticker_filter=None, # Get all tickers to compare fairly
        progress_callback=report_daily_progress
    )
    print(f"\n✅  Done! Found {len(daily_tickers)} tickers with DAILY data.")
    print("Sample:", sorted(list(daily_tickers))[:5])
    
except Exception as e:
    print(f"\n❌  An error occurred: {e}")
    daily_tickers = set() # Initialize as empty set on error to allow comparison later


# --- Test 2: Get tickers for MONTHLY frequency ---
print(f"\n--- Test 2: Discovering tickers for MONTHLY data ({source_to_check}) ---")
try:
    def report_monthly_progress(count: int):
        print(f"  ⏳ Found monthly tickers: {count}", end='\r')

    monthly_tickers = get_available_tickers(
        source=source_to_check,
        frequency="monthly",
        ticker_filter=None, # Get all tickers
        progress_callback=report_monthly_progress
    )
    print(f"\n✅  Done! Found {len(monthly_tickers)} tickers with MONTHLY data.")
    print("Sample:", sorted(list(monthly_tickers))[:5])

except Exception as e:
    print(f"\n❌  An error occurred: {e}")
    monthly_tickers = set()


# --- Test 3: Compare the two sets of tickers ---
print("\n--- Test 3: Comparing DAILY vs MONTHLY ticker sets ---")

if daily_tickers and monthly_tickers:
    # Check if the sets are identical
    if daily_tickers == monthly_tickers:
        print("\n✅  The set of tickers for daily and monthly data is IDENTICAL.")
    else:
        print("\n⚠️  The sets of tickers are DIFFERENT.")

        # Find tickers that only have daily data
        only_daily = daily_tickers - monthly_tickers
        if only_daily:
            print(f"\n  - Found {len(only_daily)} tickers that ONLY have DAILY data.")
            display(f"Sample: {sorted(list(only_daily))[:5]}")

        # Find tickers that only have monthly data
        only_monthly = monthly_tickers - daily_tickers
        if only_monthly:
            print(f"\n  - Found {len(only_monthly)} tickers that ONLY have MONTHLY data.")
            display(f"Sample: {sorted(list(only_monthly))[:5]}")
else:
    print("\nCould not perform comparison due to an error in one of the previous steps.")

✅  Module 'discovery' successfully imported.

--- Test 1: Discovering tickers for DAILY data (BINANCE-SPOT) ---
  ⏳ Found daily tickers: 3205
✅  Done! Found 3205 tickers with DAILY data.
Sample: ['1000CATBNB', '1000CATFDUSD', '1000CATTRY', '1000CATUSDC', '1000CATUSDT']

--- Test 2: Discovering tickers for MONTHLY data (BINANCE-SPOT) ---
  ⏳ Found monthly tickers: 3182
✅  Done! Found 3182 tickers with MONTHLY data.
Sample: ['1000CATBNB', '1000CATFDUSD', '1000CATTRY', '1000CATUSDC', '1000CATUSDT']

--- Test 3: Comparing DAILY vs MONTHLY ticker sets ---

⚠️  The sets of tickers are DIFFERENT.

  - Found 23 tickers that ONLY have DAILY data.


"Sample: ['AXSUSDC', 'CBNB', 'CFDUSD', 'COMPUSDC', 'CTRY']"

In [3]:
import sys
import requests
from IPython.display import display

try:
    # Импортируем наш новый декоратор
    from utils.decorators import retry_on_io_error
    print("✅  Декоратор 'retry_on_io_error' успешно импортирован.")
except ImportError:
    print("❌  Ошибка импорта. Убедитесь, что существует файл utils/decorators.py.")
    sys.exit()

# --- Тест 1: Демонстрация успешного повтора ---
print("\n--- Тест 1: Функция, которая срабатывает на 3-й попытке ---")

# Используем nonlocal для изменения переменной из внешней области видимости
call_count_success = 0
@retry_on_io_error(retries=4, delay=2)
def mock_flaky_function():
    """Эта функция падает 2 раза, а на 3-й срабатывает."""
    global call_count_success
    call_count_success += 1
    print(f"Вызов mock_flaky_function, попытка №{call_count_success}...")
    if call_count_success < 3:
        raise requests.exceptions.ConnectionError("Симуляция обрыва сети")
    return f"Успех на попытке №{call_count_success}!"

try:
    result = mock_flaky_function()
    print(f"\n✅  Итоговый результат: {result}")
except Exception as e:
    print(f"\n❌  Неожиданная ошибка: {e}")


# --- Тест 2: Демонстрация полного провала и re-raise ---
print("\n\n--- Тест 2: Функция, которая падает всегда ---")

call_count_fail = 0
@retry_on_io_error(retries=3, delay=1)
def mock_always_failing_function():
    """Эта функция падает всегда."""
    global call_count_fail
    call_count_fail += 1
    print(f"Вызов mock_always_failing_function, попытка №{call_count_fail}...")
    raise requests.exceptions.Timeout("Сервер не отвечает (симуляция)")

try:
    mock_always_failing_function()
except requests.exceptions.Timeout as e:
    print(f"\n✅  Успешно перехвачена последняя ошибка, как и ожидалось:")
    print(f"   Тип: {type(e).__name__}, Сообщение: {e}")

✅  Декоратор 'retry_on_io_error' успешно импортирован.

--- Тест 1: Функция, которая срабатывает на 3-й попытке ---
Вызов mock_flaky_function, попытка №1...
Вызов mock_flaky_function, попытка №2...
Вызов mock_flaky_function, попытка №3...

✅  Итоговый результат: Успех на попытке №3!


--- Тест 2: Функция, которая падает всегда ---
Вызов mock_always_failing_function, попытка №1...
Вызов mock_always_failing_function, попытка №2...
Вызов mock_always_failing_function, попытка №3...
Error: All 3 retries for 'mock_always_failing_function' failed.

✅  Успешно перехвачена последняя ошибка, как и ожидалось:
   Тип: Timeout, Сообщение: Сервер не отвечает (симуляция)


In [1]:
import logging
import sys
import datetime
from clickhouse_driver import Client

# --- Настройка ---
# Убедитесь, что Jupyter может найти ваши модули.
# Раскомментируйте и настройте путь, если ваш ноутбук находится не в корне проекта.
# sys.path.append('../') 

try:
    # Импортируем "внутреннюю" функцию, которую хотим протестировать
    from sync_orchestrator import _monthly_bulk_load
    from utils.types import SourceType
    print("✅  Модули успешно импортированы.")
except ImportError as e:
    print(f"❌ Ошибка импорта: {e}")
    print("   Убедитесь, что ноутбук запущен из корневой папки или sys.path настроен правильно.")
    sys.exit() # Прерываем выполнение, если импорт не удался

# --- Конфигурация теста ---
CLICKHOUSE_HOST = '172.16.0.9'
TICKER_TO_TEST = "BTCUSDT"
SOURCE_TO_TEST: SourceType = "BINANCE-FUT"

DATA_TABLE = 'hl5s_test'
CONTROL_TABLE = 'hl5s_daily_downloaded'

# Настраиваем логирование прямо в ячейке для наглядности
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    stream=sys.stdout  # Выводим логи в вывод ячейки
)


# --- 1. Подключение к ClickHouse и очистка перед тестом ---
client = None
try:
    logging.info(f"Подключаемся к ClickHouse по адресу: {CLICKHOUSE_HOST}...")
    client = Client(CLICKHOUSE_HOST, settings={'use_numpy': True}, password='123' )
    
    logging.warning(f"--- 1. ОЧИСТКА ПЕРЕД ТЕСТОМ ---")
    logging.warning(f"Удаляем все предыдущие записи для {TICKER_TO_TEST} ({SOURCE_TO_TEST})...")
    
    params = {'source': SOURCE_TO_TEST, 'ticker': TICKER_TO_TEST}
    
    # Очищаем основную таблицу данных
    client.execute(f"ALTER TABLE {DATA_TABLE} DELETE WHERE Source = %(source)s AND Ticker = %(ticker)s", params)
    logging.info(f"Таблица '{DATA_TABLE}' очищена.")
    
    # Очищаем контрольную таблицу
    client.execute(f"ALTER TABLE {CONTROL_TABLE} DELETE WHERE Source = %(source)s AND Ticker = %(ticker)s", params)
    logging.info(f"Таблица '{CONTROL_TABLE}' очищена.")
    
    logging.warning("--- Очистка завершена. Начинаем тест. ---")

except Exception as e:
    logging.critical(f"Ошибка на этапе подготовки (подключение или очистка): {e}")
    # Если не удалось даже подготовиться, прерываем выполнение
    client = None


# --- 2. Запуск основной функции _monthly_bulk_load ---
if client:
    try:
        logging.info(f"\n--- 2. ЗАПУСК _monthly_bulk_load для {TICKER_TO_TEST} ---")
        logging.info("Этот процесс может занять значительное время...")
        
        # Вызываем функцию, которую тестируем
        _monthly_bulk_load(
            source=SOURCE_TO_TEST,
            ticker=TICKER_TO_TEST,
            client=client
        )
        
        logging.info(f"--- Тест _monthly_bulk_load для {TICKER_TO_TEST} УСПЕШНО ЗАВЕРШЕН ---")

    except Exception as e:
        logging.critical(f"--- ТЕСТ ПРЕРВАН С ОШИБКОЙ ---")
        logging.critical(f"Ошибка во время выполнения _monthly_bulk_load: {e}", exc_info=True)


# --- 3. Верификация результата ---
if client:
    try:
        logging.info("\n--- 3. ВЕРИФИКАЦИЯ РЕЗУЛЬТАТА ---")
        params = {'source': SOURCE_TO_TEST, 'ticker': TICKER_TO_TEST}

        # Проверяем, сколько строк вставилось в основную таблицу
        count_data = client.execute(f"SELECT count() FROM {DATA_TABLE} WHERE Source = %(source)s AND Ticker = %(ticker)s", params)[0][0]
        logging.info(f"Итого строк в '{DATA_TABLE}' для {TICKER_TO_TEST}: {count_data:,}")

        # Проверяем, сколько строк вставилось в контрольную таблицу
        count_control = client.execute(f"SELECT count() FROM {CONTROL_TABLE} WHERE Source = %(source)s AND Ticker = %(ticker)s", params)[0][0]
        logging.info(f"Итого строк в '{CONTROL_TABLE}' для {TICKER_TO_TEST}: {count_control:,} (количество дней)")

    except Exception as e:
        logging.error(f"Ошибка на этапе верификации: {e}")
        


2025-07-27 03:18:16 - INFO - Подключаемся к ClickHouse по адресу: 172.16.0.9...
2025-07-27 03:18:16 - INFO - Таблица 'hl5s_test' очищена.
2025-07-27 03:18:16 - INFO - Таблица 'hl5s_daily_downloaded' очищена.
2025-07-27 03:18:16 - INFO - 
--- 2. ЗАПУСК _monthly_bulk_load для BTCUSDT ---
2025-07-27 03:18:16 - INFO - Этот процесс может занять значительное время...
2025-07-27 03:18:16 - INFO - [BTCUSDT] Начальная загрузка: ищем месячные архивы...


✅  Модули успешно импортированы.


2025-07-27 03:18:18 - INFO - [BTCUSDT] Найдено 70 месячных архивов. Начинаем обработку...
2025-07-27 03:18:18 - INFO - [BTCUSDT] Обрабатываем месяц 2019-09 (размер: 14.24 MB)...
2025-07-27 03:18:18 - INFO - [BTCUSDT] Скачивание основного файла за 2019-09...
2025-07-27 03:18:21 - INFO - [BTCUSDT] Основной файл за 2019-09 успешно скачан (14.24 MB).
2025-07-27 03:18:21 - INFO - [BTCUSDT] Скачивание файла контрольной суммы за 2019-09...
2025-07-27 03:18:22 - INFO - [BTCUSDT] Файл контрольной суммы успешно скачан.
2025-07-27 03:18:22 - INFO - [BTCUSDT] Начало проверки SHA-256 для файла за 2019-09...
2025-07-27 03:18:22 - INFO - [BTCUSDT] Контрольная сумма совпала. Данные целостны.
2025-07-27 03:18:22 - INFO - [BTCUSDT] Начало обработки данных из файла за 2019-09...
2025-07-27 03:18:23 - INFO - [BTCUSDT] Данные из файла за 2019-09 успешно обработаны.
2025-07-27 03:18:23 - INFO - [BTCUSDT] Диапазон времени для вставки: с 2019-09-08 17:57:55 по 2019-10-01 00:00:00
2025-07-27 03:18:23 - INFO - 

KeyboardInterrupt: 

2025-07-27 05:05:44 - INFO - --- Начинаем синхронизацию для BTCUSDT (BINANCE-FUT) ---
2025-07-27 05:05:44 - INFO - [BTCUSDT][BINANCE-FUT] Записи не найдены. Запускаем начальную массовую загрузку (monthly)...
2025-07-27 05:05:44 - INFO - [BTCUSDT][BINANCE-FUT] Начальная загрузка: ищем месячные архивы...


2025-07-27 05:05:46 - INFO - [BTCUSDT][BINANCE-FUT] Найдено 70 месячных архивов в диапазоне с 2019-09 по 2025-06. Начинаем обработку...
2025-07-27 05:05:46 - INFO - [BTCUSDT][BINANCE-FUT] Обрабатываем месяц 2019-09 (размер: 14.24 MB)...
2025-07-27 05:05:46 - INFO - [BTCUSDT][BINANCE-FUT] Скачивание основного файла за 2019-09...
2025-07-27 05:05:49 - INFO - [BTCUSDT][BINANCE-FUT] Основной файл за 2019-09 успешно скачан (14.24 MB).
2025-07-27 05:05:49 - INFO - [BTCUSDT][BINANCE-FUT] Скачивание файла контрольной суммы за 2019-09...
2025-07-27 05:05:50 - INFO - [BTCUSDT][BINANCE-FUT] Файл контрольной суммы успешно скачан.
2025-07-27 05:05:50 - INFO - [BTCUSDT][BINANCE-FUT] Начало проверки SHA-256 для файла за 2019-09...
2025-07-27 05:05:50 - INFO - [BTCUSDT][BINANCE-FUT] Контрольная сумма совпала. Данные целостны.
2025-07-27 05:05:50 - INFO - [BTCUSDT][BINANCE-FUT] Начало обработки данных из файла за 2019-09...
2025-07-27 05:05:52 - INFO - [BTCUSDT][BINANCE-FUT] Данные из файла за 2019-09 

In [None]:
import logging
import sys
import datetime
import pandas as pd
from IPython.display import display
from clickhouse_driver import Client

# --- Настройка ---
try:
    # Импортируем "внутреннюю" функцию, которую хотим протестировать
    from sync_orchestrator import _daily_incremental_sync
    from utils.types import SourceType
    print("✅  Модули успешно импортированы.")
except ImportError as e:
    print(f"❌ Ошибка импорта: {e}")
    sys.exit()

# --- Конфигурация теста ---



DATA_TABLE = 'hl5s_test'
CONTROL_TABLE = 'hl5s_daily_downloaded'

# Настраиваем логирование
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', stream=sys.stdout)


# --- 1. Подключение к ClickHouse и подготовка данных ---
client = None
try:
    logging.info(f"Подключаемся к ClickHouse по адресу: {CLICKHOUSE_HOST}...")
    client = Client(CLICKHOUSE_HOST, settings={'use_numpy': True}, password = '123')
    
    
    logging.warning("--- Начинаем тест инкрементальной загрузки. ---")

except Exception as e:
    logging.critical(f"Ошибка на этапе подготовки: {e}")
    client = None


# --- 2. Запуск основной функции _daily_incremental_sync ---
if client:
    try:
        logging.info(f"\n--- 2. ЗАПУСК _daily_incremental_sync для {TICKER_TO_TEST} ---")
        
        # Вызываем функцию, которую тестируем
        _daily_incremental_sync(
            source=SOURCE_TO_TEST,
            ticker=TICKER_TO_TEST,
            client=client
        )
        
        logging.info(f"--- Тест _daily_incremental_sync для {TICKER_TO_TEST} УСПЕШНО ЗАВЕРШЕН ---")

    except Exception as e:
        logging.critical(f"--- ТЕСТ ПРЕРВАН С ОШИБКОЙ ---")
        logging.critical(f"Ошибка во время выполнения _daily_incremental_sync: {e}", exc_info=True)


# --- 3. Верификация результата ---
if client:
    try:
        logging.info("\n--- 3. ВЕРИФИКАЦИЯ РЕЗУЛЬТАТА ---")
        params = {'source': SOURCE_TO_TEST, 'ticker': TICKER_TO_TEST, 'dates': dates_to_remove}

        # Проверяем, что записи для "удаленных" дат снова появились в контрольной таблице
        count_control = client.execute(f"SELECT count() FROM {CONTROL_TABLE} WHERE Source = %(source)s AND Ticker = %(ticker)s AND Date IN %(dates)s", params)[0][0]
        
        if count_control == DAYS_TO_REMOVE:
             logging.info(f"✅  Проверка пройдена: В '{CONTROL_TABLE}' снова появились {count_control} записи за недостающие дни.")
        else:
             logging.error(f"❌  ПРОВЕРКА НЕ ПРОЙДЕНА: Ожидалось {DAYS_TO_REMOVE} записей, а найдено {count_control}.")

        # Проверяем, что данные для "удаленных" дат снова появились в основной таблице
        count_data = client.execute(f"SELECT count() FROM {DATA_TABLE} WHERE Source = %(source)s AND Ticker = %(ticker)s AND toDate(Timestamp) IN %(dates)s", params)[0][0]
        
        if count_data > 0:
            logging.info(f"✅  Проверка пройдена: В '{DATA_TABLE}' появились данные за недостающие дни ({count_data:,} строк).")
        else:
            logging.error(f"❌  ПРОВЕРКА НЕ ПРОЙДЕНА: Данные в '{DATA_TABLE}' за недостающие дни не были вставлены.")

    except Exception as e:
        logging.error(f"Ошибка на этапе верификации: {e}")

2025-07-27 04:18:31 - INFO - Подключаемся к ClickHouse по адресу: 172.16.0.9...
2025-07-27 04:18:31 - INFO - 
--- 2. ЗАПУСК _daily_incremental_sync для BTCUSDT ---
2025-07-27 04:18:31 - INFO - [BTCUSDT] Инкрементальное обновление: ищем доступные дневные файлы...


✅  Модули успешно импортированы.


2025-07-27 04:18:41 - INFO - [BTCUSDT] Требуется скачать 1647 новых дневных файлов...
2025-07-27 04:18:41 - INFO - [BTCUSDT] Обрабатываем день 2021-01-21...
2025-07-27 04:18:41 - INFO - [BTCUSDT] Скачивание основного файла за 2021-01-21...
2025-07-27 04:18:46 - INFO - [BTCUSDT] Основной файл за 2021-01-21 успешно скачан (47.39 MB).
2025-07-27 04:18:46 - INFO - [BTCUSDT] Скачивание файла контрольной суммы за 2021-01-21...
2025-07-27 04:18:47 - INFO - [BTCUSDT] Файл контрольной суммы успешно скачан.
2025-07-27 04:18:47 - INFO - [BTCUSDT] Начало проверки SHA-256 для файла за 2021-01-21...
2025-07-27 04:18:47 - INFO - [BTCUSDT] Контрольная сумма совпала. Данные целостны.
2025-07-27 04:18:47 - INFO - [BTCUSDT] Начало обработки данных из файла за 2021-01-21...


KeyboardInterrupt: 

In [1]:
from clickhouse_driver import Client
CLICKHOUSE_HOST = '172.16.0.9'
client = Client(CLICKHOUSE_HOST, settings={'use_numpy': True}, password = '123')

DATA_TABLE = 'hl5s_test'
CONTROL_TABLE = 'hl5s_daily_downloaded'

In [2]:

client.execute(f'TRUNCATE TABLE {DATA_TABLE}')
client.execute(f'TRUNCATE TABLE {CONTROL_TABLE}')

[]

In [None]:
#ONE-THREAD!
SOURCE_TO_SYNC='BINANCE-FUT'
from utils.discovery import get_available_tickers, SourceType, Frequency

from sync_orchestrator import sync_ticker_data
from utils.types import SourceType

import logging
import sys
from tqdm import tqdm


# Настраиваем логирование
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', 
                    #stream=sys.stdout,
                    filename='./sync_process.log',
                    filemode='a'
)

daily_tickers = get_available_tickers(
    source=SOURCE_TO_SYNC,
        frequency="daily",
        ticker_filter=lambda ticker: ticker.endswith( ('USDT','USDC') ),
        progress_callback=None
    )

for ticker in tqdm(daily_tickers):
    sync_ticker_data(SOURCE_TO_SYNC, ticker, DATA_TABLE, CONTROL_TABLE, client)




  4%|▍         | 25/561 [6:13:27<133:27:02, 896.31s/it] 


KeyboardInterrupt: 

In [None]:
#MULTI-THREAD
import os
from multiprocessing import Pool
from tqdm.notebook import tqdm


from utils.discovery import get_available_tickers
from sync_orchestrator import run_sync_in_process, ENV_CH_HOST, ENV_CH_PASSWORD, ENV_LOG_LEVEL


SOURCE_TO_SYNC='BINANCE-FUT'
DATA_TABLE_NAME = 'hl5s_test'
CONTROL_TABLE_NAME = 'hl5s_daily_downloaded'
NUM_PROCESSES = 3
DRY_RUN_MODE = True

LOG_LEVEL = 'INFO'
CLICKHOUSE_HOST = '172.16.0.9'
CLICKHOUSE_PASSWORD = '123'

os.environ[ENV_CH_HOST] = CLICKHOUSE_HOST
os.environ[ENV_LOG_LEVEL] = LOG_LEVEL
os.environ[ENV_CH_PASSWORD] = CLICKHOUSE_PASSWORD

ticker_filter = lambda ticker: ticker.endswith( ('USDT','USDC') )

tickers = get_available_tickers(
    source=SOURCE_TO_SYNC,
        frequency="daily",
        ticker_filter = ticker_filter,
        progress_callback=None
    )
params = {
    "source": SOURCE_TO_SYNC,
    "data_table": DATA_TABLE_NAME,
    "control_table": CONTROL_TABLE_NAME,
    "dry_run": DRY_RUN_MODE
}

tasks = [  {**params, "ticker": _}  for _ in tickers ]
print(f"Найдено {len(tasks)} тикеров. Запускаем параллельную обработку на {NUM_PROCESSES} процессах...")
print(f"Режим 'Dry Run': {DRY_RUN_MODE}")

with Pool(processes=NUM_PROCESSES) as pool:
    results = list(
        tqdm(pool.imap_unordered(run_sync_in_process, tasks), total=len(tasks))
    )

print("\n--- Обработка завершена. Итоги: ---")
success_count = sum(1 for r in results if r['status'] == "OK")
fail_count = len(results) - success_count
print(f"Успешно: {success_count}, С ошибками: {fail_count}")

if fail_count > 0:
    print("\nТикеры с ошибками:")
    for res in results:
        if res['status'] == "FAILED":
            print(f" - {res['ticker']}: {res['error']}")

Найдено 561 тикеров. Запускаем параллельную обработку на 3 процессах...
Режим 'Dry Run': True


  0%|          | 0/561 [00:00<?, ?it/s]

Process ForkPoolWorker-6:
Process ForkPoolWorker-5:


KeyboardInterrupt: 