In [1]:
#!share/d.deyneko/metric_forecaster/metrics_forecaster.ipynb

import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
import seaborn as sns
from statsmodels.tsa.arima_process import ArmaProcess
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error, r2_score
import statsmodels.api as sm
from scipy.stats import ttest_1samp
from tqdm.notebook import tqdm
import plotly.express as px
import plotly.graph_objs as go
import digger as dg
from scipy.stats import pearsonr
import sqlalchemy
import plotly.graph_objects as go
from scipy import stats
import os
import json
from sqlalchemy import create_engine
from clickhouse_sqlalchemy import make_session
from sqlalchemy.sql import text
from statsmodels.tsa.arima.model import ARIMA
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression
from statsmodels.tsa.statespace.sarimax import SARIMAX
from datetime import timedelta, datetime, date
from prophet import Prophet
from IPython.display import HTML, display, Markdown, clear_output
from ipywidgets import widgets, VBox, HBox, Output
from datetime import date, datetime, timedelta
from dateutil.relativedelta import relativedelta
from clickhouse_driver import Client


connection = {
    'host': 'http://click.c.test:8123',
    'password': '123123',
    'user': 'default',
    'database': 'metrics'
}


FRACTION_METRICS = {
    "DAU_with_friend_ratio_7d",
    "ratio_3plus_friends"
}


# Прогнозы DAU-метрики в разных разрезах


Этот микросервис позволяет получить пул прогнозов данных по метрике DAU в разных разрезах.
Пока код работает только с одной метрикой, но при необходимости возможно решение расширить на другие предрасчитанные метрики из `clickhouse.metrics` базы данных.

## Поля для ввода:
1. Разрезы для прогноза в формате Страна → Город.
   * Позволяет набить любой пул гео-разрезов, которые нужны для прогноза
   * Используйте `__ALL__` для выбора всех городов страны
2. Срезы данных для модели:
   * Начало периода обучения: От какой даты мы будем обучать модель
     * По умолчанию: 9 месяцев назад от текущей даты
   * Конец периода обучения: До какой даты мы будем обучать модель
     * По умолчанию: вчерашний день
   * Конец периода прогноза: До какой даты мы хотим построить прогноз
     * По умолчанию: 2 месяца вперед от текущей даты 
3. Количество строк в выводимой таблице: Под каждым графиком будет таблица с прогнозируемыми данными. Таблица строится от начала прогноза на количество дней, равному количеству строк из поля.
    
Программа выведет:
1. Сконфигурированный SQL-запрос
2. Метрики качества прогноза
3. Список прогнозных графиков на основе полученных данных
4. Таблицу с данными прогноза

In [None]:
from IPython.display import display, Markdown, clear_output
from ipywidgets import widgets, VBox, HBox, Output
from datetime import date, timedelta
from dateutil.relativedelta import relativedelta

# ========== БЛОК: МЕТРИКИ ==========

# Пять доступных метрик в формате (русское_название, machine_name).
METRICS_AVAILABLE = [
    ("DAU (дневная аудитория)", "DAU"),
    ("Доля DAU с ≥1 другом (7-дневное)", "DAU_with_friend_ratio_7d"),
    ("Доля пользователей с ≥3 друзьями", "ratio_3plus_friends"),
    ("Открытия карточек с друзьями (7-дневное)", "opens_with_friends_7d"),
    ("Discovery-открытия с друзьями (7-дневное)", "discovery_opens_with_friends_7d")
]

metrics_container = VBox()

def create_metric_row(rus_label, machine_name):
    """
    Создаёт строку «неизменяемая метрика + кнопка удаления (✕)».
    При удалении строка пропадает из контейнера metrics_container.
    """
    label_widget = widgets.Label(
        value=rus_label,
        layout=widgets.Layout(width='400px')
    )
    delete_btn = widgets.Button(
        description='✕',
        button_style='danger',
        layout=widgets.Layout(width='50px')
    )
    
    row = HBox([label_widget, delete_btn])
    row.metric_key = machine_name  # Запоминаем machine_name, чтобы собрать config["selected_metrics"].

    def on_delete_click(_):
        metrics_container.children = [
            child for child in metrics_container.children if child is not row
        ]
    delete_btn.on_click(on_delete_click)

    return row

# Изначально добавим все 5 метрик
for rus_name, machine_name in METRICS_AVAILABLE:
    metrics_container.children += (create_metric_row(rus_name, machine_name),)

# ========== БЛОК: СЕГМЕНТЫ ==========

segments_container = VBox()

def create_segment_row(country="", segment=""):
    country_input = widgets.Text(
        placeholder='Страна',
        value=country,
        layout=widgets.Layout(width='200px')
    )
    segment_input = widgets.Text(
        placeholder='Сегмент (или __ALL__)',
        value=segment,
        layout=widgets.Layout(width='200px')
    )
    delete_btn = widgets.Button(
        description='✕',
        button_style='danger',
        layout=widgets.Layout(width='50px')
    )
    row = HBox([country_input, segment_input, delete_btn])
    
    def delete_row(_):
        segments_container.children = [
            child for child in segments_container.children if child is not row
        ]
    delete_btn.on_click(delete_row)
    return row

add_segment_btn = widgets.Button(description='+ Добавить сегмент')

def on_add_segment(_):
    segments_container.children += (create_segment_row(),)

add_segment_btn.on_click(on_add_segment)

# Дефолтные сегменты
default_segments = [
    {"countryName": "__ALL__", "segment": "__ALL__"},
    {"countryName": "Россия", "segment": "__ALL__"},
    {"countryName": "Казахстан", "segment": "__ALL__"},
    {"countryName": "Россия", "segment": "Москва"},
    {"countryName": "Россия", "segment": "Санкт-Петербург"},
    {"countryName": "Россия", "segment": "Новосибирск"},
    {"countryName": "Россия", "segment": "Якутск"},
    {"countryName": "Казахстан", "segment": "Алматы"}
]
for seg in default_segments:
    segments_container.children += (create_segment_row(seg["countryName"], seg["segment"]),)

# ========== БЛОК: ДАТЫ, НАСТРОЙКИ И КНОПКА СТАРТА ==========

def get_default_dates():
    today = date.today()
    default_train_start = today - relativedelta(months=9)
    default_train_end = today - timedelta(days=1)
    default_forecast_end = today + relativedelta(months=2)
    return default_train_start, default_train_end, default_forecast_end

train_start_date, train_end_date, forecast_end_date = get_default_dates()

train_start = widgets.DatePicker(
    description='Начало периода обучения:',
    value=train_start_date,
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='500px')
)
train_end = widgets.DatePicker(
    description='Конец периода обучения:',
    value=train_end_date,
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='500px')
)
forecast_end = widgets.DatePicker(
    description='Конец периода прогноза:',
    value=forecast_end_date,
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='500px')
)

rows_to_display = widgets.IntText(
    description='Количество строк в таблице (прогнозные значения):',
    value=30,
    min=1,
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='750px')
)

start_button = widgets.Button(description='Начать прогноз', button_style='primary')
output = widgets.Output()

def collect_config():
    # Собираем сегменты
    segments = []
    for row in segments_container.children:
        country_input, segment_input, _ = row.children
        segments.append({
            "countryName": country_input.value,
            "segment": segment_input.value
        })
    
    # Собираем оставшиеся метрики (machine_name)
    selected_metrics = []
    for row in metrics_container.children:
        selected_metrics.append(row.metric_key)

    return {
        "segments": segments,
        "selected_metrics": selected_metrics,  # важный параметр!
        "train_period": [
            train_start.value.strftime("%Y-%m-%d"),
            train_end.value.strftime("%Y-%m-%d")
        ],
        "forecast_end_date": forecast_end.value.strftime("%Y-%m-%d"),
        "connection": connection,   
        "rows_to_display": rows_to_display.value
    }

def on_start_click(_):
    with output:
        output.clear_output()
        config = collect_config()
        try:
            results = forecast_manager(config)
        except Exception as e:
            print(f"Произошла ошибка: {e}")
            return
        
        for seg_item in results["segments_results"]:
            print(f"**Метрика**: {seg_item['metric_name']}")
            print(f"**Сегмент**: {seg_item['countryName']} - {seg_item['segment']}")
            if seg_item["metrics"]["mape"] is not None:
                print(f"**MAPE**: {seg_item['metrics']['mape']:.2%}")
                print(f"**MAE**: {seg_item['metrics']['mae']:.2f}")
                print(f"**R²**: {seg_item['metrics']['r2']:.4f}")
            else:
                print("Недостаточно исторических данных для расчёта метрик качества")
            display(seg_item["figure"])
            display(Markdown("**Прогнозные значения:**"))
            display(seg_item["forecast_table"])

start_button.on_click(on_start_click)

# Формируем финальный интерфейс и отображаем
interface = VBox([
    widgets.HTML("<h2>Прогнозирование метрик</h2>"),
    
    widgets.HTML("<h3>Сегменты</h3>"),
    segments_container,
    add_segment_btn,
    
    widgets.HTML("<h3>Метрики</h3>"),
    metrics_container,
    
    widgets.HTML("<h3>Периоды</h3>"),
    train_start,
    train_end,
    forecast_end,
    
    HBox([rows_to_display]),
    start_button,
    output
])

display(interface)

In [3]:
def build_segment_conditions(segments_config):
    """
    Принимает словарь вида:
    {
        "segments": [
            {"countryName": "Россия", "segment": "__ALL__"},
            {"countryName": "__ALL__", "segment": "__ALL__"},
            ...
        ]
    }
    Возвращает строку вида:
    (countryName = 'Россия' AND segment = '__ALL__') OR (countryName = 'Казахстан' AND segment = 'Алматы') ...
    """
    segment_conditions_list = []
    
    for segment in segments_config["segments"]:
        conditions = []
        if "countryName" in segment:
            conditions.append(f"countryName = '{segment['countryName']}'")
        if "segment" in segment:
            conditions.append(f"segment = '{segment['segment']}'")
        
        if conditions:
            segment_conditions_list.append(f"({' AND '.join(conditions)})")
    
    if segment_conditions_list:
        return ' OR '.join(segment_conditions_list)
    else:
        return '1 = 1'  # Если сегментов нет, чтобы WHERE не ломался

# Шаблоны для 5 метрик
METRICS_SQL_TEMPLATES = {
    "DAU": """
        SELECT
            toDate(time) AS dt,
            countryName,
            segment,
            platform,
            sum(value_users_friends) AS metric_value
        FROM metrics.zond_dashboard_auditory_daily
        WHERE platform = '__ALL__'
          AND ({segment_conditions})
        GROUP BY dt, countryName, segment, platform
        ORDER BY dt DESC
    """,
    "DAU_with_friend_ratio_7d": """
        SELECT
            dt,
            countryName,
            segment,
            platform,
            round(
                avg(ratio_friends) OVER (
                    PARTITION BY countryName, segment, platform
                    ORDER BY dt
                    ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
                ), 4
            ) AS metric_value
        FROM
        (
            SELECT
                toDate(time) AS dt,
                countryName,
                segment,
                platform,
                sum(value_users_friends) / sum(value_launch_users) AS ratio_friends
            FROM metrics.zond_dashboard_auditory_daily
            WHERE platform = '__ALL__'
              AND ({segment_conditions})
            GROUP BY dt, countryName, segment, platform
        ) sub
        ORDER BY dt DESC
    """,
    "ratio_3plus_friends": """
        SELECT
            toDate(time) AS dt,
            countryName,
            segment,
            platform,
            sumIf(value_users, CAST(cnt_friends AS UInt32) >= 3) / sum(value_users) AS metric_value
        FROM metrics.zond_dashboard_friends_count_distribution
        WHERE platform = '__ALL__'
          AND ({segment_conditions})
        GROUP BY dt, countryName, segment, platform
        ORDER BY dt DESC
    """,
    "opens_with_friends_7d": """
        SELECT
            dt,
            countryName,
            segment,
            platform,
            round(
                avg(cntWithFriends) OVER (
                    PARTITION BY countryName, segment, platform
                    ORDER BY dt
                    ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
                ), 0
            ) AS metric_value
        FROM
        (
            SELECT
                toDate(time) AS dt,
                countryName,
                segment,
                platform,
                sum(value_cntWithFriends) AS cntWithFriends
            FROM metrics.zond_dashboard_nsm_opens_daily
            WHERE platform = '__ALL__'
              AND ({segment_conditions})
            GROUP BY dt, countryName, segment, platform
        ) sub
        ORDER BY dt DESC
    """,
    "discovery_opens_with_friends_7d": """
        SELECT
            dt,
            countryName,
            segment,
            platform,
            round(
                avg(cntWithFriends_discovery) OVER (
                    PARTITION BY countryName, segment, platform
                    ORDER BY dt
                    ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
                ), 0
            ) AS metric_value
        FROM
        (
            SELECT
                toDate(time) AS dt,
                countryName,
                segment,
                platform,
                sum(value_cntWithFriends_discovery) AS cntWithFriends_discovery
            FROM metrics.zond_dashboard_nsm_opens_daily
            WHERE platform = '__ALL__'
              AND ({segment_conditions})
            GROUP BY dt, countryName, segment, platform
        ) sub
        ORDER BY dt DESC
    """
}

def configure_sql_queries(segments_config, selected_metrics=None):
    """
    Генерирует SQL-запросы только для указанных метрик (selected_metrics).
    Если selected_metrics не задан, используем все ключи из METRICS_SQL_TEMPLATES.
    Возвращает словарь { "metric_key": "sql_query" }.
    """
    sc = build_segment_conditions(segments_config)
    
    if selected_metrics is None:
        selected_metrics = list(METRICS_SQL_TEMPLATES.keys())
    
    queries = {}
    for metric_key in selected_metrics:
        if metric_key not in METRICS_SQL_TEMPLATES:
            # Если метрики нет в словаре, можно пропустить или выдать warning
            continue
        sql_template = METRICS_SQL_TEMPLATES[metric_key]
        query = sql_template.format(segment_conditions=sc)
        queries[metric_key] = query

    return queries


In [20]:
# # ===== Пример использования =====

# segments_config_example = {
#     "segments": [
#         {"countryName": "Россия", "segment": "__ALL__"},
#         {"countryName": "Казахстан", "segment": "__ALL__"},
#         {"countryName": "__ALL__", "segment": "__ALL__"}
#     ]
# }

# # Определяем список метрик, для которых нужно сформировать SQL-запросы
# selected_metrics_example = ["DAU", "opens_with_friends_7d"]

# # Передаём и segments_config, и selected_metrics в функцию
# result_queries = configure_sql_queries(segments_config_example, selected_metrics=selected_metrics_example)

# for metric, q in result_queries.items():
#     print(f"--- {metric} ---")
#     print(q)
#     print()

In [7]:
from clickhouse_driver import Client
import pandas as pd

def fetch_and_split_data(queries_dict, segments_config, connection):
    """
    Выполняет несколько SQL-запросов (queries_dict), отправляя их в ClickHouse,
    а затем «режет» каждый результат по сегментам.

    :param queries_dict: Словарь вида { "metric_name": "SQL-запрос", ... }
    :param segments_config: Словарь с ключом "segments", внутри список словарей:
        {
            "segments": [
                {"countryName": "Россия", "segment": "__ALL__"},
                ...
            ]
        }
    :param connection: Словарь вида {"host": ..., "user": ..., "password": ..., "database": ...}
    :return: Словарь формата:
        {
            "metrics_data": [
                {
                    "metric_name": <str>,
                    "sql_query": <str>,
                    "segments_data": [
                        {
                            # Копия сегмента
                            "countryName": ...,
                            "segment": ...,
                            # DataFrame с данными
                            "data": <pd.DataFrame>
                        },
                        ...
                    ]
                },
                ...
            ]
        }
    """
    # Инициализируем клиент ClickHouse
    client = Client(
        host=connection['host'].replace('http://', '').replace(':8123', ''),
        user=connection['user'],
        password=connection['password'],
        database=connection['database']
    )

    # Результирующая структура
    result = {
        "metrics_data": []
    }

    # Перебираем все метрики и их SQL-запросы
    for metric_name, sql_query in queries_dict.items():
        try:
            # Выполняем запрос
            query_result = client.execute(sql_query, with_column_types=True)
            columns = [col[0] for col in query_result[1]]
            df = pd.DataFrame(query_result[0], columns=columns)
            
            # Приведём поле dt к datetime, если оно есть в колонках
            if "dt" in df.columns:
                df["dt"] = pd.to_datetime(df["dt"])
        
        except Exception as e:
            print(f"Ошибка при выполнении запроса по метрике '{metric_name}': {str(e)}")
            # Если произошла ошибка, для этой метрики вернём пустой фрейм
            df = pd.DataFrame()

        # Создаём массив сегментов для текущей метрики
        segments_data_list = []

        for segment_config in segments_config.get("segments", []):
            # Начинаем с True, последовательно "AND" применяя фильтры
            segment_filter = pd.Series([True] * len(df), index=df.index)

            if "countryName" in segment_config:
                segment_filter &= (df['countryName'] == segment_config['countryName'])

            if "segment" in segment_config:
                segment_filter &= (df['segment'] == segment_config['segment'])

            # Фильтруем DataFrame
            segment_data = df[segment_filter].copy()

            # Добавляем запись в список
            segments_data_list.append({
                **segment_config,  # Копируем ключи из segment_config
                "data": segment_data
            })

        # Формируем структуру для текущей метрики
        metric_result = {
            "metric_name": metric_name,
            "sql_query": sql_query,
            "segments_data": segments_data_list
        }

        # Кладём в общий результат
        result["metrics_data"].append(metric_result)

    return result


In [8]:
# connection

In [19]:
# # Загружаем и «режем» данные по сегментам
# fetched_results = fetch_and_split_data(
#     queries_dict=result_queries,
#     segments_config=segments_config_example,
#     connection=connection
# )

# # Вывод структуры полученных данных
# for i, metric_item in enumerate(fetched_results["metrics_data"]):
#     metric_name = metric_item["metric_name"]
#     sql_query = metric_item["sql_query"]

#     print(f"Метрика {i}: '{metric_name}'")
#     print(f"SQL-запрос: {sql_query}")
#     print(f"  Всего сегментов: {len(metric_item['segments_data'])}")

#     # Перебор сегментов для данной метрики
#     for j, seg_item in enumerate(metric_item["segments_data"]):
#         seg_country = seg_item.get("countryName", "N/A")
#         seg_segment = seg_item.get("segment", "N/A")
#         print(f"    Сегмент {j}: countryName='{seg_country}', segment='{seg_segment}'")

#         # Получаем DataFrame с данными сегмента
#         df = seg_item["data"]
#         print(f"      Размер: {df.shape[0]} строк, {df.shape[1]} столбцов")
#         display(df.head(2))  # Показываем первые 2 строки

#     print("—" * 50)

In [14]:
def prepare_data_for_forecast(
    fetched_results,
    train_period,
    forecast_end_date,
    window_size=7,
    iqr_factor=1.5
):
    """
    Подготавливает данные для Prophet, дополнительно устраняя выбросы.
    
    :param fetched_results: словарь вида
      {
        "metrics_data": [
          {
            "metric_name": <str>,
            "sql_query": <str>,
            "segments_data": [
              {
                "countryName": <str>,
                "segment": <str>,
                "data": pd.DataFrame(columns=["dt", "metric_value", ...])
              },
              ...
            ]
          },
          ...
        ]
      }
    :param train_period: кортеж (start_date, end_date) вида ("YYYY-MM-DD", "YYYY-MM-DD")
    :param forecast_end_date: строка "YYYY-MM-DD", дата окончания прогноза
    :param window_size: ширина «окна» вокруг точки (по умолчанию 7, значит ±7 строк)
    :param iqr_factor: коэффициент для определения выбросов (по умолчанию 1.5)
    :return: словарь в формате:
      {
        "forecast_input": [
          {
            "metric_name": <str>,
            "segments_forecast_data": [
              {
                "countryName": <str>,
                "segment": <str>,
                "train_data": pd.DataFrame(columns=["ds", "y"]),
                "full_data": pd.DataFrame(columns=["ds", "y"]),
                "forecast_periods": <int>
              },
              ...
            ]
          },
          ...
        ]
      }
    """

    def smooth_outliers_windowed(df, window=7, factor=1.5):
        """
        Для каждой строки берём окрестность ±window, вычисляем IQR, 
        если y выходит за [Q1 - factor*IQR; Q3 + factor*IQR], 
        заменяем на среднее двух соседних значений (y_{i-1} и y_{i+1}).
        """
        # Копируем, чтобы не менять исходный df
        df = df.copy().reset_index(drop=True)
        
        for i in range(len(df)):
            left = max(0, i - window)
            right = min(len(df) - 1, i + window)

            window_vals = df.loc[left:right, "y"].dropna()
            if len(window_vals) < 2:
                continue  # Недостаточно данных для статистики

            Q1 = np.percentile(window_vals, 25)
            Q3 = np.percentile(window_vals, 75)
            iqr = Q3 - Q1
            lower_bound = Q1 - factor * iqr
            upper_bound = Q3 + factor * iqr

            current_val = df.at[i, "y"]
            if current_val < lower_bound or current_val > upper_bound:
                # Если не крайние точки, то усредняем с соседями
                if i > 0 and i < len(df) - 1:
                    prev_val = df.at[i - 1, "y"]
                    next_val = df.at[i + 1, "y"]
                    # На случай, если соседние - тоже выбросы, 
                    # можно усложнить логику, но пока просто среднее
                    df.at[i, "y"] = (prev_val + next_val) / 2
                # Иначе (крайние точки) - пропускаем
        return df

    train_start, train_end = pd.to_datetime(train_period[0]), pd.to_datetime(train_period[1])
    forecast_end = pd.to_datetime(forecast_end_date)
    results = {"forecast_input": []}

    for metric_item in fetched_results["metrics_data"]:
        metric_result = {
            "metric_name": metric_item["metric_name"],
            "segments_forecast_data": []
        }

        for segment_info in metric_item["segments_data"]:
            df_segment = segment_info["data"].copy()
            if "dt" not in df_segment.columns or "metric_value" not in df_segment.columns:
                continue

            # Формируем DataFrame для Prophet
            forecast_df = pd.DataFrame({
                "ds": df_segment["dt"],
                "y": df_segment["metric_value"]
            }).sort_values("ds").reset_index(drop=True)

            # Устраняем выбросы
            forecast_df = smooth_outliers_windowed(forecast_df, window=window_size, factor=iqr_factor)

            # Делим на train и full
            train_mask = (forecast_df["ds"] >= train_start) & (forecast_df["ds"] <= train_end)
            train_df = forecast_df[train_mask].copy()
            forecast_periods = (forecast_end - train_end).days

            metric_result["segments_forecast_data"].append({
                "countryName": segment_info.get("countryName"),
                "segment": segment_info.get("segment"),
                "train_data": train_df,
                "full_data": forecast_df,
                "forecast_periods": forecast_periods
            })

        results["forecast_input"].append(metric_result)

    return results

In [None]:
# # ===== Пример использования =====

# # 1. Получаем даты
# train_start_dt, train_end_dt, forecast_end_dt = get_default_dates()

# # 2. Вызываем prepare_data_for_forecast
# prepared_forecast_data = prepare_data_for_forecast(
#     fetched_results,
#     (train_start_dt.strftime("%Y-%m-%d"), train_end_dt.strftime("%Y-%m-%d")),
#     forecast_end_dt.strftime("%Y-%m-%d")
# )

# # 3. Пример просмотра результата
# for metric in prepared_forecast_data["forecast_input"]:
#     print(f"Метрика: {metric['metric_name']}")
#     for seg_data in metric["segments_forecast_data"]:
#         print(f"  Страна: {seg_data.get('countryName')}, Сегмент: {seg_data.get('segment')}")
#         print(f"  Тренировочных точек: {len(seg_data['train_data'])}")
#         print(f"  Всего точек во временном ряду: {len(seg_data['full_data'])}")
#         print(f"  Прогнозируем на {seg_data['forecast_periods']} дней вперёд")
#     print("-" * 60)

In [17]:
def make_prophet_forecast(prepared_forecast_data, actual_end_date=None):
    """
    Прогнозирование для нескольких метрик и сегментов с помощью Prophet.
    
    :param prepared_forecast_data: словарь вида
        {
            "forecast_input": [
                {
                    "metric_name": <str>,
                    "segments_forecast_data": [
                        {
                            "countryName": <str>,
                            "segment": <str>,
                            "train_data": pd.DataFrame(columns=["ds", "y"]),
                            "full_data": pd.DataFrame(columns=["ds", "y"]),
                            "forecast_periods": <int>
                        },
                        ...
                    ]
                },
                ...
            ]
        }
    :param actual_end_date: строка "YYYY-MM-DD" или None.
        Если указано, исторические данные будут обрезаны по эту дату.
    :return: словарь формата
        {
            "metrics_forecasts": [
                {
                    "metric_name": <str>,
                    "segments_forecasts": [
                        {
                            "countryName": <str>,
                            "segment": <str>,
                            "forecast_data": pd.DataFrame(columns=[
                                "ds", "realistic", "optimistic", "pessimistic", "historical"
                            ])
                        },
                        ...
                    ]
                },
                ...
            ]
        }
    """
    results = {"metrics_forecasts": []}

    for metric_item in prepared_forecast_data["forecast_input"]:
        metric_name = metric_item["metric_name"]
        metric_forecasts = {"metric_name": metric_name, "segments_forecasts": []}
        
        for segment_info in metric_item["segments_forecast_data"]:
            train_df = segment_info["train_data"]
            if train_df.empty:
                continue
            
            model = Prophet(interval_width=0.95, weekly_seasonality=True)
            model.add_seasonality(name='weekly', period=7, fourier_order=3)
            model.fit(train_df)
            
            future = model.make_future_dataframe(periods=segment_info["forecast_periods"], freq='D')
            forecast = model.predict(future)
            
            forecast_results = pd.DataFrame({
                'ds': forecast['ds'],
                'realistic': forecast['yhat'],
                'optimistic': forecast['yhat_upper'],
                'pessimistic': forecast['yhat_lower']
            })
            
            historical_data = segment_info["full_data"][['ds', 'y']].rename(columns={'y': 'historical'})
            if actual_end_date:
                actual_end_date_parsed = pd.to_datetime(actual_end_date)
                historical_data = historical_data[historical_data['ds'] <= actual_end_date_parsed]
            
            final_forecast = forecast_results.merge(historical_data, on='ds', how='left')
            
            metric_forecasts["segments_forecasts"].append({
                "countryName": segment_info.get("countryName"),
                "segment": segment_info.get("segment"),
                "forecast_data": final_forecast
            })
        
        results["metrics_forecasts"].append(metric_forecasts)

    return results

In [None]:
# actual_end_date_example = "2025-02-18"
# forecast_results = make_prophet_forecast(prepared_forecast_data, actual_end_date=actual_end_date_example)

# # Для просмотра:
# for metric_block in forecast_results["metrics_forecasts"]:
#     print(f"Метрика: {metric_block['metric_name']}")
#     for seg_forecast in metric_block["segments_forecasts"]:
#         print(f"  Страна: {seg_forecast.get('countryName')}, Сегмент: {seg_forecast.get('segment')}")
#         df_fcst = seg_forecast["forecast_data"]
#         print(f"  Количество строк в прогнозе: {len(df_fcst)}")
#         display(df_fcst.tail(5))
#     print("-" * 70)

In [47]:
# Словарь для русификации названий метрик
METRIC_RUS_NAMES = {
    "DAU": "DAU (дневная аудитория)",
    "DAU_with_friend_ratio_7d": "Доля DAU с хотя бы 1 другом (скользящее среднее)",
    "ratio_3plus_friends": "Доля пользователей с ≥3 друзьями",
    "opens_with_friends_7d": "Открытия карточек с друзьями (скользящее среднее)",
    "discovery_opens_with_friends_7d": "Discovery-открытия карточек с друзьями (скользящее среднее)",
}

def create_forecast_plots(forecasts_result, train_end_date):
    """
    Выводит графики в порядке: (countryName, segment) -> все метрики.
    :param forecasts_result: {
       "metrics_forecasts": [
         {
           "metric_name": <str>,
           "segments_forecasts": [
             {
               "countryName": <str>,
               "segment": <str>,
               "forecast_data": pd.DataFrame c колонками [ds, historical, realistic, optimistic, pessimistic]
             },
             ...
           ]
         },
         ...
       ]
    }
    :param train_end_date: строка "YYYY-MM-DD" (дата конца обучения / начало прогноза)
    :return: список Plotly Figure
    """

    # Шаг 1: Группируем данные по (countryName, segment), внутри — по метрике
    grouped_data = {}
    for metric_block in forecasts_result["metrics_forecasts"]:
        metric_name = metric_block["metric_name"]
        for seg_forecast in metric_block["segments_forecasts"]:
            c = seg_forecast.get("countryName", "N/A")
            s = seg_forecast.get("segment", "N/A")
            df = seg_forecast["forecast_data"]
            if df.empty:
                continue
            grouped_data.setdefault((c, s), {})[metric_name] = df

    figures = []
    forecast_start = pd.to_datetime(train_end_date)

    # Функция для форматирования значений (already scaled)
    def format_value(val, is_fraction):
        if pd.isna(val):
            return ""
        # Уже умножили на 100 или поделили на 1000
        # Для долей - теперь это проценты
        # Для абс. - это тысячи
        if is_fraction:
            return f"{val:.2f}%"
        else:
            # Если значение больше или равно 1000 (т.е. 1 миллион в исходных данных), 
            # отображаем в миллионах
            if val >= 1000:
                # Делим на 1000 еще раз (уже деленное на 1000) = миллионы
                return f"{val/1000:.2f}M"
            else:
                return f"{val:.1f}k"

    # Шаг 2: Идём по каждой паре (страна, сегмент)
    #        и строим графики по всем метрикам
    for (country, segment), metrics_dict in grouped_data.items():
        # Чтобы порядок метрик был стабильным, можно их отсортировать:
        for metric_name in sorted(metrics_dict.keys()):
            df = metrics_dict[metric_name].copy()
            if df.empty:
                continue

            # Находим максимальное значение, чтобы понять, доля это или нет
            y_cols = ["historical", "realistic", "optimistic", "pessimistic"]
            max_val = df[y_cols].max().max()
            is_fraction = (max_val <= 1.0)

            # Масштабируем весь ряд
            if is_fraction:
                for col in y_cols:
                    df[col] = df[col] * 100.0
            else:
                for col in y_cols:
                    df[col] = df[col] / 1000.0

            # Готовим дополнительные поля для аннотаций
            df["ds"] = pd.to_datetime(df["ds"])
            df["weekday"] = df["ds"].dt.dayofweek
            monday_mask = (df["weekday"] == 0)
            forecast_period_mask = (df["ds"] >= forecast_start)

            # Проставляем аннотации
            df["monday_labels_hist"] = ""
            df["monday_labels"] = ""

            historical_mondays = monday_mask & df["historical"].notna() & forecast_period_mask
            df.loc[historical_mondays, "monday_labels_hist"] = df.loc[historical_mondays, "historical"].apply(
                lambda x: format_value(x, is_fraction)
            )

            forecast_mondays = monday_mask & forecast_period_mask
            df.loc[forecast_mondays, "monday_labels"] = df.loc[forecast_mondays, "realistic"].apply(
                lambda x: format_value(x, is_fraction)
            )

            # Создаём Figure
            fig = go.Figure()

            # Добавляем аннотации для исторических понедельников
            for idx in df[historical_mondays].index:
                fig.add_annotation(
                    x=df.loc[idx, "ds"],
                    y=df.loc[idx, "historical"],
                    text=df.loc[idx, "monday_labels_hist"],
                    showarrow=False,
                    yshift=-40,
                    font=dict(size=10, color="#2E91E5")
                )

            # # Добавляем аннотации для прогнозных понедельников
            # for idx in df[forecast_mondays].index:
            #     fig.add_annotation(
            #         x=df.loc[idx, "ds"],
            #         y=df.loc[idx, "realistic"],
            #         text=df.loc[idx, "monday_labels"],
            #         showarrow=False,
            #         yshift=25,
            #         font=dict(size=10, color="#1616A7")
            #     )

            # Формируем поля customdata для hover
            df["custom_historical"]  = df["historical"].apply(lambda x: format_value(x, is_fraction))
            df["custom_realistic"]   = df["realistic"].apply(lambda x: format_value(x, is_fraction))
            df["custom_optimistic"]  = df["optimistic"].apply(lambda x: format_value(x, is_fraction))
            df["custom_pessimistic"] = df["pessimistic"].apply(lambda x: format_value(x, is_fraction))

            fig.add_trace(go.Scatter(
                x=df["ds"],
                y=df["historical"],
                mode="lines",
                name="Исторические данные",
                line=dict(color="#2E91E5", width=2),
                hovertemplate="Исторические данные: %{customdata}<extra></extra>",
                customdata=df["custom_historical"]
            ))
            fig.add_trace(go.Scatter(
                x=df["ds"],
                y=df["realistic"],
                mode="lines",
                name="Реалистичный прогноз",
                line=dict(color="#1616A7", width=2),
                hovertemplate="Реалистичный прогноз: %{customdata}<extra></extra>",
                customdata=df["custom_realistic"]
            ))
            fig.add_trace(go.Scatter(
                x=df["ds"],
                y=df["optimistic"],
                mode="lines",
                name="Оптимистичный прогноз",
                line=dict(color="#1CA71C", width=1, dash="dot"),
                hovertemplate="Оптимистичный прогноз: %{customdata}<extra></extra>",
                customdata=df["custom_optimistic"]
            ))
            fig.add_trace(go.Scatter(
                x=df["ds"],
                y=df["pessimistic"],
                mode="lines",
                name="Пессимистичный прогноз",
                line=dict(color="#AF0038", width=1, dash="dot"),
                hovertemplate="Пессимистичный прогноз: %{customdata}<extra></extra>",
                customdata=df["custom_pessimistic"]
            ))

            # Вертикальные линии для понедельников
            for idx in df[monday_mask & forecast_period_mask].index:
                fig.add_shape(
                    type="line",
                    x0=df.loc[idx, "ds"],
                    x1=df.loc[idx, "ds"],
                    y0=0,
                    y1=1,
                    yref="paper",
                    line=dict(color="gray", width=1, dash="dot"),
                    opacity=0.4
                )

            # Линия начала прогноза
            fig.add_shape(
                type="line",
                x0=forecast_start,
                x1=forecast_start,
                y0=0,
                y1=1,
                yref="paper",
                line=dict(color="grey", width=2, dash="dash")
            )
            fig.add_annotation(
                x=forecast_start,
                y=1,
                yref="paper",
                text="Начало прогноза",
                showarrow=False,
                yshift=10
            )

            # Русифицированное название метрики (если нет в словаре - берём исходное)
            rus_metric_name = METRIC_RUS_NAMES.get(metric_name, metric_name)

            # Заголовок и оси
            fig.update_layout(
                title=dict(
                    text=f"{rus_metric_name} — {country}, {segment}",
                    y=0.95,
                    x=0.5,
                    xanchor="center",
                    yanchor="top",
                    font=dict(size=20)
                ),
                xaxis_title="Дата",
                yaxis_title="Проценты" if is_fraction else "Тысячи",
                hovermode="x unified",
                template="plotly_white",
                showlegend=True,
                legend=dict(
                    y=1.15,
                    x=0.5,
                    xanchor="center",
                    yanchor="top",
                    orientation="h"
                ),
                margin=dict(t=150, l=50, r=50, b=90),
                height=700,
                width=1200,
                font=dict(size=14)
            )

            # Адекватные границы оси Y
            vals = df[y_cols].values.flatten()
            vals = vals[~np.isnan(vals)]
            if len(vals) > 0:
                ymin, ymax = np.min(vals), np.max(vals)
                yrange = ymax - ymin
                fig.update_yaxes(range=[
                    ymin - yrange * 0.15,
                    ymax + yrange * 0.1
                ])

            figures.append(fig)

    return figures


In [1]:
# train_end_date_str = "2025-02-18"

# # Создаём графики
# figures = create_forecast_plots(forecast_results, train_end_date_str)

# # figures — это список plotly.graph_objects.Figure. 
# # Каждая фигура соответствует одной (метрика + сегмент).
# # В Jupyter можно отобразить все:
# for i, fig in enumerate(figures):
#     print(f"Отображаем график {i+1} из {len(figures)}")
#     fig.show()

In [50]:
def forecast_manager(config):
    """
    Главная функция-менеджер процесса прогнозирования для нескольких метрик и сегментов.

    :param config: словарь настроек вида:
        {
            "segments": [
                {"countryName": "Россия", "segment": "__ALL__"},
                ...
            ],
            "train_period": ["YYYY-MM-DD", "YYYY-MM-DD"],
            "forecast_end_date": "YYYY-MM-DD",
            "actual_end_date": "YYYY-MM-DD" (опционально),
            "connection": {
                "host": "...",
                "user": "...",
                "password": "...",
                "database": "..."
            },
            "rows_to_display": 60 (опционально, кол-во строк прогноза),
            "selected_metrics": [...]  # список выбранных метрик
        }

    :return: словарь результатов:
        {
            "config": { ... },
            "segments_results": [
                {
                  "metric_name": <str>,
                  "segment": <str>,
                  "countryName": <str>,
                  "data": pd.DataFrame,   # полные данные прогноза (ds, historical, realistic, optimistic, pessimistic)
                  "metrics": {
                      "mape": float,
                      "mae": float,
                      "r2": float
                  },
                  "figure": plotly.graph_objs.Figure,
                  "forecast_table": pd.DataFrame   # упрощённая табличная версия прогноза для просмотра
                },
                ...
            ]
        }
    """
    from datetime import date, timedelta
    import pandas as pd
    # Предполагаем, что функции build_segment_conditions, configure_sql_queries,
    # fetch_and_split_data, prepare_data_for_forecast, make_prophet_forecast, create_forecast_plots,
    # а также метрики (mean_absolute_percentage_error, mean_absolute_error, r2_score) уже импортированы.

    # Проверка даты
    if not config.get('actual_end_date'):
        yesterday = date.today() - timedelta(days=1)
        config['actual_end_date'] = yesterday.strftime('%Y-%m-%d')

    # Готовим словарь SQL-запросов для всех метрик с учётом выбранных метрик
    segment_conditions = build_segment_conditions({"segments": config["segments"]})
    queries_dict = configure_sql_queries(
        {"segments": config["segments"]},
        selected_metrics=config["selected_metrics"]
    )

    # Загружаем данные из ClickHouse
    fetched_results = fetch_and_split_data(
        queries_dict=queries_dict,
        segments_config={"segments": config["segments"]},
        connection=config["connection"]
    )

    # Подготавливаем данные для Prophet
    forecast_data = prepare_data_for_forecast(
        fetched_results,
        config["train_period"],
        config["forecast_end_date"]
        # При необходимости можно передать и параметры сглаживания выбросов
    )

    # Строим прогнозы
    forecasts_result = make_prophet_forecast(
        prepared_forecast_data=forecast_data,
        actual_end_date=config["actual_end_date"]
    )

    # Визуализация – строим графики
    figures = create_forecast_plots(
        forecasts_result,
        train_end_date=config["train_period"][1]  # Дата конца обучения
    )

    # Формируем итоговую структуру результатов
    results = {
        "config": {
            "segments": config["segments"],
            "train_period": config["train_period"],
            "forecast_end_date": config["forecast_end_date"],
            "actual_end_date": config["actual_end_date"],
            "rows_to_display": config.get("rows_to_display", 60)
        },
        "segments_results": []
    }

    # Подготавливаем список для связывания метрики и сегмента с прогнозными данными
    # 0) Инициализируем словарь grouped_data
    grouped_data = {}

    # 1) Сначала группируем результаты forecasts_result по (country, segment), внутри — по metric_name
    for metric_block in forecasts_result["metrics_forecasts"]:
        metric_name = metric_block["metric_name"]
        for seg_data in metric_block["segments_forecasts"]:
            c = seg_data.get("countryName", "N/A")
            s = seg_data.get("segment", "N/A")
            df = seg_data["forecast_data"]

            # Пропускаем пустой DataFrame, если он не нужен
            if df.empty:
                continue

            # Записываем данные в структуру grouped_data[(c, s)][metric_name] = seg_data
            if (c, s) not in grouped_data:
                grouped_data[(c, s)] = {}
            grouped_data[(c, s)][metric_name] = seg_data

    # 2) Теперь идём по grouped_data в порядке, в котором (country, segment) были добавлены
    #    (начиная с Python 3.7 словари сохраняют порядок вставки).
    all_segment_items = []
    for (country, segment), metrics_dict in grouped_data.items():
        # Метрики сортируем, чтобы совпало с create_forecast_plots
        for metric_name in sorted(metrics_dict.keys()):
            seg_data = metrics_dict[metric_name]
            all_segment_items.append({
                "metric_name": metric_name,
                "countryName": seg_data.get("countryName"),
                "segment": seg_data.get("segment"),
                "forecast_df": seg_data["forecast_data"]
            })

    if len(all_segment_items) != len(figures):
        print(f"Предупреждение: число фигур ({len(figures)}) и сегментов ({len(all_segment_items)}) не совпадает!")

    # Считаем метрики для каждого ряда и готовим таблицу прогноза
    current_date = pd.to_datetime(date.today())

    for (seg_item, figure) in zip(all_segment_items, figures):
        df = seg_item["forecast_df"].copy()

        # Вычисляем метрики качества
        historical_mask = df["historical"].notna()
        historical_data = df[historical_mask]
        if len(historical_data) > 0:
            mape_val = mean_absolute_percentage_error(
                historical_data["historical"], historical_data["realistic"]
            )
            mae_val = mean_absolute_error(
                historical_data["historical"], historical_data["realistic"]
            )
            r2_val = r2_score(
                historical_data["historical"], historical_data["realistic"]
            )
        else:
            mape_val, mae_val, r2_val = None, None, None

        metrics_dict = {
            "mape": mape_val,
            "mae": mae_val,
            "r2": r2_val
        }

        # Подготовка таблицы прогноза для отображения
        # Определяем, является ли метрика долевой (фракционной) или абсолютной
        numeric_cols = ["realistic", "optimistic", "pessimistic", "historical"]
        
        # Определяем, является ли текущая метрика долевой по её имени
        is_fraction = (seg_item["metric_name"] in FRACTION_METRICS)

        # Делаем копию исходного DataFrame
        forecast_df_scaled = df.copy()

        if is_fraction:
            # Для долевых метрик умножаем на 100, чтобы получить «проценты»
            for col in numeric_cols:
                if col in forecast_df_scaled.columns:
                    forecast_df_scaled[col] = (forecast_df_scaled[col] * 100).round(2)
        else:
            # Для абсолютных — делим на 1000, чтобы выводить в «тысячах»
            for col in numeric_cols:
                if col in forecast_df_scaled.columns:
                    forecast_df_scaled[col] = (forecast_df_scaled[col] / 1000).round(1)

        # Выбираем диапазон дат для отображения прогноза
        start_forecast_date = current_date + pd.Timedelta(days=1)
        n_days = config.get("rows_to_display", 60)
        end_forecast_date = start_forecast_date + pd.Timedelta(days=n_days - 1)

        forecast_table = forecast_df_scaled[
            (forecast_df_scaled["ds"] >= start_forecast_date) &
            (forecast_df_scaled["ds"] <= end_forecast_date)
        ].copy()

        if forecast_table.empty:
            forecast_table = forecast_df_scaled[forecast_df_scaled["ds"] > current_date].copy()

        forecast_table = forecast_table[["ds", "optimistic", "realistic", "pessimistic"]]
        forecast_table.rename(columns={"ds": "date"}, inplace=True)
        forecast_table["date"] = forecast_table["date"].dt.strftime("%Y-%m-%d")

        # Функция форматирования для таблицы прогноза
        def format_value_table(x, is_fraction):
            if pd.isna(x):
                return ""
            return f"{x:.2f}%" if is_fraction else f"{x:.1f}k"

        forecast_table["optimistic"] = forecast_table["optimistic"].apply(lambda x: format_value_table(x, is_fraction))
        forecast_table["realistic"]  = forecast_table["realistic"].apply(lambda x: format_value_table(x, is_fraction))
        forecast_table["pessimistic"] = forecast_table["pessimistic"].apply(lambda x: format_value_table(x, is_fraction))

        # Добавляем данные по текущему сегменту в результирующую структуру
        results["segments_results"].append({
            "metric_name": seg_item["metric_name"],
            "countryName": seg_item["countryName"],
            "segment": seg_item["segment"],
            "data": df,           # Полные данные прогноза
            "metrics": metrics_dict,
            "figure": figure,
            "forecast_table": forecast_table
        })

    return results