In [6]:
import os
from clickhouse_driver import Client
import pandas as pd
import numpy as np
from datetime import datetime, timedelta, date

# Параметры подключения
clickhouse_host = os.getenv('CLICKHOUSE_HOST', '10.95.19.132')
clickhouse_user = os.getenv('CLICKHOUSE_USER', 'default')
clickhouse_password = os.getenv('CLICKHOUSE_PASSWORD', 'quie1ahpoo5Su0wohpaedae8keeph6bi')
database_name = os.getenv('CLICKHOUSE_DB', 'default')

# Установить количество дней для расчёта
number_of_days = 2

# Подключение к ClickHouse
client = Client(
    host=clickhouse_host,
    user=clickhouse_user,
    password=clickhouse_password,
    database=database_name
)

# Шаг 0: Создание структуры промежуточной таблицы
def initialize_intermediate_table():
    columns = [
        "serialno", "Status_prev", "Status_P_prev", "sne_prev", "ppr_prev", "repair_days_prev",
        "Status", "Status_P", "sne", "ppr", "repair_days",
        "daily_flight_hours_prev", "ll_prev", "oh_prev", "BR_prev", "RepairTime_prev", "ac_typ_prev", 
        "mi8t_count_prev", "mi17_count_prev",
        "balance_total", "balance_mi8t", "balance_mi17", "balance_empty",
        "stock_mi8t", "stock_mi17", "stock_empty", "stock_total"
    ]
    return pd.DataFrame(columns=columns)

# Промежуточная таблица в памяти
intermediate_table = initialize_intermediate_table()

# Шаг 1: Определение первых дат
current_date_query = "SELECT MIN(Dates) FROM OlapCube_VNV"
result = client.execute(current_date_query)
if not result or not result[0][0]:
    raise ValueError("Нет данных в OlapCube_VNV.")
target_date_prev = result[0][0]

for day in range(number_of_days):
    target_date = target_date_prev + timedelta(days=1)
    print(f"Обработка данных для дня {day + 1}, target_date-1: {target_date_prev}, target_date: {target_date}")
    
    # Шаг 2: Загрузка данных из ClickHouse
    query_prev = f"""
        SELECT 
            serialno, 
            Status AS Status_prev, 
            Status_P AS Status_P_prev, 
            sne AS sne_prev, 
            ppr AS ppr_prev, 
            repair_days AS repair_days_prev,
            daily_flight_hours AS daily_flight_hours_prev,
            ll AS ll_prev, 
            oh AS oh_prev, 
            BR AS BR_prev, 
            RepairTime AS RepairTime_prev, 
            ac_typ AS ac_typ_prev, 
            mi8t_count AS mi8t_count_prev, 
            mi17_count AS mi17_count_prev
        FROM OlapCube_VNV
        WHERE Dates = '{target_date_prev.strftime('%Y-%m-%d')}'
    """
    data_prev = client.execute(query_prev)
    print(f"Количество записей для target_date_prev ({target_date_prev}): {len(data_prev)}")
    
    query_target = f"""
        SELECT 
            serialno, 
            daily_flight_hours AS daily_flight_hours_current, 
            ll AS ll_current, 
            oh AS oh_current, 
            BR AS BR_current, 
            RepairTime AS RepairTime_current, 
            ac_typ AS ac_typ_current, 
            mi8t_count AS mi8t_count_current, 
            mi17_count AS mi17_count_current
        FROM OlapCube_VNV
        WHERE Dates = '{target_date.strftime('%Y-%m-%d')}'
    """
    data_target = client.execute(query_target)
    print(f"Количество записей для target_date ({target_date}): {len(data_target)}")
    
    # Проверка наличия данных
    if not data_prev or not data_target:
        print(f"Нет данных для target_date-1: {target_date_prev} или target_date: {target_date}")
        target_date_prev = target_date
        continue
    
    # Создание промежуточной таблицы с суффиксами
    data_prev_df = pd.DataFrame(data_prev, columns=[
        "serialno", "Status_prev", "Status_P_prev", "sne_prev", "ppr_prev", "repair_days_prev",
        "daily_flight_hours_prev", "ll_prev", "oh_prev", "BR_prev", "RepairTime_prev", "ac_typ_prev", 
        "mi8t_count_prev", "mi17_count_prev"
    ])
    data_target_df = pd.DataFrame(data_target, columns=[
        "serialno", "daily_flight_hours_current", "ll_current", "oh_current", "BR_current", 
        "RepairTime_current", "ac_typ_current", 
        "mi8t_count_current", "mi17_count_current"
    ])
    intermediate_table = pd.merge(
        data_prev_df,
        data_target_df,
        on="serialno",
        how="outer"
    )
    
    # Проверка наличия необходимых столбцов и их создание при отсутствии
    required_columns = [
        "ll_prev", "oh_prev", "BR_prev", "RepairTime_prev",
        "ll_current", "oh_current", "BR_current", "RepairTime_current"
    ]
    for column in required_columns:
        if column not in intermediate_table.columns:
            intermediate_table[column] = np.nan
    
    # **Преобразование и заполнение NaN значений**
    intermediate_table['repair_days_prev'] = pd.to_numeric(intermediate_table['repair_days_prev'], errors='coerce').fillna(0)
    intermediate_table['RepairTime_prev'] = pd.to_numeric(intermediate_table['RepairTime_prev'], errors='coerce').fillna(0)
    intermediate_table['sne_prev'] = pd.to_numeric(intermediate_table['sne_prev'], errors='coerce').fillna(0)
    intermediate_table['ppr_prev'] = pd.to_numeric(intermediate_table['ppr_prev'], errors='coerce').fillna(0)
    intermediate_table['ll_prev'] = pd.to_numeric(intermediate_table['ll_prev'], errors='coerce').fillna(0)
    intermediate_table['oh_prev'] = pd.to_numeric(intermediate_table['oh_prev'], errors='coerce').fillna(0)
    intermediate_table['BR_prev'] = pd.to_numeric(intermediate_table['BR_prev'], errors='coerce').fillna(0)
    
    # Шаг I: Определение Status_P
    intermediate_table['Status_P'] = None
    
    # Статус "Неактивно" и "Хранение" без условий (прямое копирование)
    intermediate_table.loc[intermediate_table['Status_prev'] == 'Неактивно', 'Status_P'] = 'Неактивно'
    intermediate_table.loc[intermediate_table['Status_prev'] == 'Хранение', 'Status_P'] = 'Хранение'
    
    # Статус "Исправен"
    intermediate_table.loc[intermediate_table['Status_prev'] == 'Исправен', 'Status_P'] = 'Исправен'
    
    # Статус "Ремонт"
    intermediate_table.loc[
        (intermediate_table['Status_prev'] == 'Ремонт') & 
        (intermediate_table['repair_days_prev'] < intermediate_table['RepairTime_prev']),
        'Status_P'
    ] = 'Ремонт'
    
    # Статус "Ремонт" -> "Исправен", если превышен RepairTime
    intermediate_table.loc[
        (intermediate_table['Status_prev'] == 'Ремонт') & 
        (intermediate_table['repair_days_prev'] >= intermediate_table['RepairTime_prev']),
        'Status_P'
    ] = 'Исправен'
    
    # Сложная логика для "Эксплуатация"
    # 1. Эксплуатация -> Эксплуатация
    intermediate_table.loc[
        (intermediate_table['Status_prev'] == 'Эксплуатация') &
        (intermediate_table['sne_prev'] < (intermediate_table['ll_prev'] - intermediate_table['daily_flight_hours_prev'])) &
        (intermediate_table['ppr_prev'] < (intermediate_table['oh_prev'] - intermediate_table['daily_flight_hours_prev'])),
        'Status_P'
    ] = 'Эксплуатация'
    
    # 2. Эксплуатация -> Хранение
    intermediate_table.loc[
        (intermediate_table['Status_prev'] == 'Эксплуатация') &
        (intermediate_table['sne_prev'] >= (intermediate_table['ll_prev'] - intermediate_table['daily_flight_hours_prev'])),
        'Status_P'
    ] = 'Хранение'
    
    # 3. Эксплуатация -> Ремонт или Хранение
    intermediate_table.loc[
        (intermediate_table['Status_prev'] == 'Эксплуатация') &
        (intermediate_table['ppr_prev'] >= (intermediate_table['oh_prev'] - intermediate_table['daily_flight_hours_prev'])),
        'Status_P'
    ] = intermediate_table.apply(
        lambda row: 'Ремонт' if row['sne_prev'] < row['BR_prev'] else 'Хранение',
        axis=1
    )
    
    # Шаг II: Расчёт балансов и запасов
    intermediate_table['balance_mi8t'] = (
        intermediate_table[
            (intermediate_table['Status_P'] == 'Эксплуатация') & 
            (intermediate_table['ac_typ_prev'] == 'Ми-8Т')
        ].groupby('ac_typ_prev')['serialno'].transform('count')
        - intermediate_table['mi8t_count_current']
    )
    intermediate_table['balance_mi17'] = (
        intermediate_table[
            (intermediate_table['Status_P'] == 'Эксплуатация') & 
            (intermediate_table['ac_typ_prev'] == 'Ми-17')
        ].groupby('ac_typ_prev')['serialno'].transform('count')
        - intermediate_table['mi17_count_current']
    )
    intermediate_table['balance_empty'] = (
        intermediate_table[
            (intermediate_table['Status_P'] == 'Эксплуатация') & 
            (intermediate_table['ac_typ_prev'].isnull())
        ].groupby('ac_typ_prev')['serialno'].transform('count')
    )
    intermediate_table['balance_total'] = (
        intermediate_table['balance_mi8t'] + 
        intermediate_table['balance_mi17'] + 
        intermediate_table['balance_empty']
    )
    intermediate_table['stock_mi8t'] = (
        intermediate_table[
            (intermediate_table['Status_P'] == 'Исправен') & 
            (intermediate_table['ac_typ_prev'] == 'Ми-8Т')
        ].groupby('ac_typ_prev')['serialno'].transform('count')
    )
    intermediate_table['stock_mi17'] = (
        intermediate_table[
            (intermediate_table['Status_P'] == 'Исправен') & 
            (intermediate_table['ac_typ_prev'] == 'Ми-17')
        ].groupby('ac_typ_prev')['serialno'].transform('count')
    )
    intermediate_table['stock_empty'] = (
        intermediate_table[
            (intermediate_table['Status_P'] == 'Исправен') & 
            (intermediate_table['ac_typ_prev'].isnull())
        ].groupby('ac_typ_prev')['serialno'].transform('count')
    )
    intermediate_table['stock_total'] = (
        intermediate_table['stock_mi8t'] + 
        intermediate_table['stock_mi17'] + 
        intermediate_table['stock_empty']
    )
    
    # Шаг III: Балансировка статусов
    def balance_status(intermediate_table):
        if intermediate_table['balance_total'].iloc[0] > 0:
            balance_to_fix = intermediate_table.loc[
                (intermediate_table['Status_P'] == 'Эксплуатация')
            ].head(int(intermediate_table['balance_total'].iloc[0])).index
            intermediate_table.loc[balance_to_fix, 'Status_P'] = 'Исправен'
        elif intermediate_table['balance_total'].iloc[0] < 0:
            balance_to_operate = intermediate_table.loc[
                (intermediate_table['Status_P'] == 'Исправен')
            ].head(int(abs(intermediate_table['balance_total'].iloc[0]))).index
            intermediate_table.loc[balance_to_operate, 'Status_P'] = 'Эксплуатация'
            remaining_balance = int(abs(intermediate_table['balance_total'].iloc[0])) - len(balance_to_operate)
            if remaining_balance > 0:
                balance_to_operate_inactive = intermediate_table.loc[
                    (intermediate_table['Status_P'] == 'Неактивно')
                ].head(int(remaining_balance)).index
                intermediate_table.loc[balance_to_operate_inactive, 'Status_P'] = 'Эксплуатация'
        return intermediate_table
    
    intermediate_table = balance_status(intermediate_table)
    
    # Шаг IV: Обновление счётчиков sne, ppr и repair_days
    def update_counters(intermediate_table):
        # Эксплуатация: увеличиваем sne и ppr
        intermediate_table.loc[
            intermediate_table['Status_P'] == 'Эксплуатация',
            ['sne', 'ppr']
        ] = intermediate_table.loc[
            intermediate_table['Status_P'] == 'Эксплуатация',
            ['sne_prev', 'ppr_prev']
        ] + intermediate_table['daily_flight_hours_prev']
    
        # Исправен: sne остаётся прежним, ppr зависит от предыдущего статуса
        intermediate_table.loc[
            intermediate_table['Status_P'] == 'Исправен',
            'sne'
        ] = intermediate_table.loc[
            intermediate_table['Status_P'] == 'Исправен',
            'sne_prev'
        ]
        intermediate_table.loc[
            (intermediate_table['Status_P'] == 'Исправен') & 
            (intermediate_table['Status_P_prev'] == 'Ремонт'),
            'ppr'
        ] = 0
        intermediate_table.loc[
            (intermediate_table['Status_P'] == 'Исправен') & 
            (intermediate_table['Status_P_prev'] != 'Ремонт'),
            'ppr'
        ] = intermediate_table.loc[
            (intermediate_table['Status_P'] == 'Исправен') & 
            (intermediate_table['Status_P_prev'] != 'Ремонт'),
            'ppr_prev'
        ]
    
        # Ремонт: значения sne и ppr остаются прежними
        intermediate_table.loc[
            intermediate_table['Status_P'] == 'Ремонт',
            ['sne', 'ppr']
        ] = intermediate_table.loc[
            intermediate_table['Status_P'] == 'Ремонт',
            ['sne_prev', 'ppr_prev']
        ]
    
        # Обновление repair_days
        intermediate_table.loc[
            (intermediate_table['Status_P'] == 'Ремонт') & 
            (intermediate_table['Status_P_prev'] == 'Эксплуатация'),
            'repair_days'
        ] = 1
        intermediate_table.loc[
            (intermediate_table['Status_P'] == 'Ремонт') & 
            (intermediate_table['Status_P_prev'] != 'Эксплуатация'),
            'repair_days'
        ] = intermediate_table.loc[
            (intermediate_table['Status_P'] == 'Ремонт') & 
            (intermediate_table['Status_P_prev'] != 'Эксплуатация'),
            'repair_days_prev'
        ] + 1
    
        # Хранение и Неактивно: значения sne и ppr остаются прежними
        intermediate_table.loc[
            intermediate_table['Status_P'].isin(['Хранение', 'Неактивно']),
            ['sne', 'ppr']
        ] = intermediate_table.loc[
            intermediate_table['Status_P'].isin(['Хранение', 'Неактивно']),
            ['sne_prev', 'ppr_prev']
        ]
        intermediate_table.loc[
            intermediate_table['Status_P'].isin(['Хранение', 'Неактивно']),
            'repair_days'
        ] = None
    
        return intermediate_table
    
    intermediate_table = update_counters(intermediate_table)
    
    # Шаг 3: Запись данных обратно в ClickHouse
    # **Удаление операции удаления**
    # delete_query = f"ALTER TABLE OlapCube_VNV DELETE WHERE Dates = '{target_date.strftime('%Y-%m-%d')}'"
    # client.execute(delete_query)
    
    # Убедитесь, что 'Status' установлен корректно
    intermediate_table['Status'] = intermediate_table['Status_prev']  # или используйте другую логику при необходимости
    
    # **Оптимизированная массовая вставка**
    # Корректируем порядок столбцов, чтобы он соответствовал INSERT запросу
    insert_data = intermediate_table[['serialno', 'Status', 'Status_P', 'sne', 'ppr', 'repair_days']].copy()
    insert_data['Dates'] = target_date  # Устанавливаем как объект date
    
    # Переставляем столбцы в нужном порядке
    insert_data = insert_data[['serialno', 'Dates', 'Status', 'Status_P', 'sne', 'ppr', 'repair_days']]
    
    # Добавление поля 'version'
    current_timestamp = int(datetime.now().timestamp())  # Используем текущую временную метку
    insert_data['version'] = current_timestamp  # Можно использовать другой механизм для генерации версии
    
    # Переставляем столбцы, включая 'version', если необходимо
    insert_data = insert_data[['serialno', 'Dates', 'Status', 'Status_P', 'sne', 'ppr', 'repair_days', 'version']]
    
    # Преобразование DataFrame в список кортежей
    data_tuples = list(insert_data.itertuples(index=False, name=None))
    
    # Проверка типов данных в столбце 'Dates'
    dates_types = insert_data['Dates'].apply(type).value_counts()
    print(f"Типы данных в столбце 'Dates':\n{dates_types}")
    
    # Убедимся, что все 'Dates' являются объектами типа date
    if dates_types.index[0] != date:
        print("Некоторые значения в 'Dates' не являются объектами типа datetime.date. Преобразуем их.")
        insert_data['Dates'] = pd.to_datetime(insert_data['Dates'], errors='coerce').dt.date
        # Проверка после преобразования
        dates_types_after = insert_data['Dates'].apply(type).value_counts()
        print(f"Типы данных в столбце 'Dates' после преобразования:\n{dates_types_after}")
        # Заполнение NaT значениями по умолчанию, если необходимо
        if insert_data['Dates'].isnull().any():
            print("Некоторые даты не были корректно преобразованы. Заполняем их текущей датой.")
            insert_data['Dates'] = insert_data['Dates'].fillna(target_date)
    
    try:
        # Выполнение массовой вставки
        client.execute(
            "INSERT INTO OlapCube_VNV (serialno, Dates, Status, Status_P, sne, ppr, repair_days, version) VALUES",
            data_tuples
        )
        print(f"Вставлены данные для даты {target_date}")
    except Exception as e:
        print(f"Ошибка при вставке данных для даты {target_date}: {e}")
        # Решите, продолжать ли цикл или остановиться
        target_date_prev = target_date
        continue
    
    # Запуск OPTIMIZE TABLE для ReplacingMergeTree
    try:
        client.execute("OPTIMIZE TABLE OlapCube_VNV FINAL")
        print(f"OPTIMIZE TABLE выполнен для даты {target_date}")
    except Exception as e:
        print(f"Ошибка при выполнении OPTIMIZE TABLE для даты {target_date}: {e}")
    
    # Переход на следующий день
    target_date_prev = target_date

print("Обработка завершена.")


Обработка данных для дня 1, target_date-1: 2024-11-25, target_date: 2024-11-26
Количество записей для target_date_prev (2024-11-25): 420
Количество записей для target_date (2024-11-26): 420
Типы данных в столбце 'Dates':
Dates
<class 'datetime.date'>    420
Name: count, dtype: int64
Ошибка при вставке данных для даты 2024-11-26: Code: 16.
DB::Exception: No such column version in table default.OlapCube_VNV (0bf23fa5-6248-4251-8edc-1d7d04887613). Stack trace:

0. DB::Exception::Exception(DB::Exception::MessageMasked&&, int, bool) @ 0x000000000cf720fb
1. DB::Exception::Exception(PreformattedMessage&&, int) @ 0x0000000007ea888c
2. DB::Exception::Exception<String const&, String>(int, FormatStringHelperImpl<std::type_identity<String const&>::type, std::type_identity<String>::type>, String const&, String&&) @ 0x00000000084d02ab
3. DB::InterpreterInsertQuery::getSampleBlockImpl(std::vector<String, std::allocator<String>> const&, std::shared_ptr<DB::IStorage> const&, std::shared_ptr<DB::Storage