# Диагностика скважин по аномалиям

Ноутбук формирует PDF-отчёты по скважинам, указанным в файле `../alma/Общая таблица.xlsx`.
Для каждой скважины строятся графики параметров по целевым интервалам нормальной и ненормальной работы,
а также добавляются интерпретационные комментарии и сравнение с фоновыми значениями.


In [None]:
import os
import re
from datetime import datetime, timedelta
from textwrap import fill
from pathlib import Path

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.backends.backend_pdf as pdf_backend

plt.rcParams['font.family'] = 'DejaVu Sans'
plt.style.use('seaborn-v0_8')
plt.rcParams.update({
    'figure.figsize': (12, 7),
    'figure.dpi': 120,
    'savefig.dpi': 300,
    'font.size': 10,
    'axes.titlesize': 14,
    'axes.labelsize': 11,
    'legend.fontsize': 9
})
sns.set_palette('deep')


In [None]:
CURRENT_DIR = Path.cwd()
if (CURRENT_DIR / 'alma' / 'Общая таблица.xlsx').exists():
    PROJECT_ROOT = CURRENT_DIR
elif (CURRENT_DIR.parent / 'alma' / 'Общая таблица.xlsx').exists():
    PROJECT_ROOT = CURRENT_DIR.parent
else:
    raise FileNotFoundError('Не удалось найти файл alma/Общая таблица.xlsx')

EXCEL_PATH = PROJECT_ROOT / 'alma' / 'Общая таблица.xlsx'
DATA_PATH = PROJECT_ROOT / 'data' / 'processed' / 'merged_hourly.parquet'
if CURRENT_DIR.name == 'notebooks':
    OUTPUT_DIR = CURRENT_DIR / 'well_anom'
else:
    OUTPUT_DIR = PROJECT_ROOT / 'notebooks' / 'well_anom'

CONTEXT_HOURS = 12  # дополнительные часы до и после интервала для контекста

PARAM_GROUPS = [
    ('Электропараметры', ['Ток фазы A', 'Выходная частота', 'Температура двигателя']),
    ('Давления', ['Давление на приеме насоса', 'Устьевое давление', 'Давление в коллекторе ГЗУ при замере']),
    ('Производительность', ['Объемный дебит жидкости, м3/сут', 'Объемный дебит газа, м3/сут']),
    ('Операционные показатели', ['Планируемая продолжительность, мин', 'Фактическая продолжительность???, мин'])
]

DATE_PATTERNS = [
    '%d.%m.%y %H:%M', '%d.%m.%Y %H:%M', '%d.%m.%y %H:%M:%S', '%d.%m.%Y %H:%M:%S',
    '%d.%m.%y', '%d.%m.%Y'
]

OUTPUT_DIR.mkdir(parents=True, exist_ok=True)


In [None]:
def normalize_spaces(text: str) -> str:
    return re.sub(' +', ' ', text.replace('\xa0', ' ').strip())


def parse_datetime(text: str) -> datetime:
    cleaned = normalize_spaces(text)
    for pattern in DATE_PATTERNS:
        try:
            return datetime.strptime(cleaned, pattern)
        except ValueError:
            continue
    raise ValueError(f'Не удалось распознать дату: {text!r}')


def extract_interval(cell_value):
    if pd.isna(cell_value):
        return None, None
    if isinstance(cell_value, (datetime, pd.Timestamp)):
        center = pd.to_datetime(cell_value).to_pydatetime()
        return center - timedelta(hours=1), center + timedelta(hours=1)
    text = str(cell_value).replace('\u2014', '-').replace('\u2013', '-')
    text = normalize_spaces(text)
    parts = [p for p in [seg.strip() for seg in text.split('-')] if p]
    if len(parts) == 2:
        return parse_datetime(parts[0]), parse_datetime(parts[1])
    if len(parts) == 1:
        center = parse_datetime(parts[0])
        return center - timedelta(hours=1), center + timedelta(hours=1)
    return None, None


def collect_intervals(excel_path):
    sheet_data = pd.read_excel(excel_path, sheet_name=None)
    records = []
    for sheet_name, df in sheet_data.items():
        status = 'Ненормальная работа' if 'Ненормальная' in sheet_name else 'Нормальная работа'
        for _, row in df.iterrows():
            start, end = extract_interval(row.get('Дата и время'))
            if start is None or end is None:
                continue
            if end <= start:
                end = start + timedelta(minutes=30)
            well = str(row.get('Скважина')).strip()
            if not well or well.lower() == 'nan':
                continue
            record = {
                'well_number': well,
                'start': start,
                'end': end,
                'status': status,
                'warning': normalize_spaces(str(row.get('Название предупреждения')))
                           if 'Название предупреждения' in df.columns and pd.notna(row.get('Название предупреждения')) else None,
                'parameter': normalize_spaces(str(row.get('Параметр по которому была отмечена аномалия')))
                            if pd.notna(row.get('Параметр по которому была отмечена аномалия')) else None,
                'overview': normalize_spaces(str(row.get('Общая картина')))
                            if pd.notna(row.get('Общая картина')) else None,
                'comment': normalize_spaces(str(row.get('Комментарий')))
                           if pd.notna(row.get('Комментарий')) else None
            }
            records.append(record)
    records.sort(key=lambda x: (x['well_number'], x['start']))
    return records


In [None]:
def compute_parameter_summary(param, core_series, window_series):
    if core_series is None or core_series.empty:
        return f"   – {param}: нет валидных значений в указанном интервале"

    core_mean = core_series.mean()
    core_min = core_series.min()
    core_max = core_series.max()

    baseline_mask = ~window_series.index.isin(core_series.index)
    baseline_series = window_series[baseline_mask].dropna()
    if not baseline_series.empty:
        baseline_mean = baseline_series.mean()
        delta = core_mean - baseline_mean
        delta_pct = (delta / baseline_mean * 100) if baseline_mean != 0 else np.nan
        delta_text = f", отклонение от фона {delta:+.2f} ({delta_pct:+.1f}%)"
    else:
        delta_text = ''

    return (f"   – {param}: среднее {core_mean:.2f}, диапазон [{core_min:.2f}; {core_max:.2f}]"
            f"{delta_text}")


def format_notes(record, stats_lines):
    notes = []
    if record.get('warning'):
        notes.append(f"• Обнаружено предупреждение: {record['warning']}")
    if record.get('parameter'):
        notes.append(f"• Ключевой параметр по данным мониторинга: {record['parameter']}")
    if record.get('comment'):
        notes.append('• Описание инженера: ' + fill(record['comment'], width=110))
    elif record.get('overview'):
        notes.append('• Контекст периода: ' + fill(record['overview'], width=110))
    if stats_lines:
        notes.append('• Итог по параметрам:')
        notes.extend(stats_lines)
    return '\n'.join(notes)


In [None]:
def render_interval_figure(well_df, record):
    start, end = record['start'], record['end']
    window_start = start - timedelta(hours=CONTEXT_HOURS)
    window_end = end + timedelta(hours=CONTEXT_HOURS)

    window_df = well_df[(well_df['timestamp'] >= window_start) & (well_df['timestamp'] <= window_end)].copy()
    if window_df.empty:
        return None

    core_mask = (window_df['timestamp'] >= start) & (window_df['timestamp'] <= end)
    core_df = window_df[core_mask].copy()

    fig, axes = plt.subplots(len(PARAM_GROUPS), 1, figsize=(12, 3.2 * len(PARAM_GROUPS)), sharex=True)
    if len(PARAM_GROUPS) == 1:
        axes = [axes]

    fig.suptitle(
        f"Скважина {record['well_number']} · {record['status']}"
        f"Целевой период: {start:%d.%m.%Y %H:%M} – {end:%d.%m.%Y %H:%M}",
        fontsize=16, fontweight='bold'
    )

    stats_lines = []

    for ax, (group_name, params) in zip(axes, PARAM_GROUPS):
        ax.set_ylabel(group_name)
        group_has_data = False
        for param in params:
            if param not in window_df.columns:
                continue
            series = window_df[['timestamp', param]].dropna()
            if series.empty:
                continue
            group_has_data = True
            line, = ax.plot(series['timestamp'], series[param], linewidth=1.6, label=param)
            core_series = core_df[['timestamp', param]].dropna()
            if not core_series.empty:
                ax.plot(core_series['timestamp'], core_series[param], linewidth=2.4,
                        color=line.get_color(), alpha=0.9, label='_nolegend_')
                stats_lines.append(
                    compute_parameter_summary(param, core_series[param], series[param])
                )
        if group_has_data:
            ax.legend(loc='upper left', fontsize=8)
        else:
            ax.text(0.5, 0.5, 'Нет валидных данных для этой группы',
                    transform=ax.transAxes, ha='center', va='center', fontsize=9, color='gray')
        ax.axvspan(start, end, color='#ff9896' if 'Ненорм' in record['status'] else '#98df8a', alpha=0.2)
        ax.axvline(start, linestyle='--', color='#d62728' if 'Ненорм' in record['status'] else '#2ca02c', linewidth=1)
        ax.axvline(end, linestyle='--', color='#d62728' if 'Ненорм' in record['status'] else '#2ca02c', linewidth=1)
        ax.grid(True, alpha=0.25)

    axes[-1].set_xlabel('Дата и время')
    fig.autofmt_xdate(rotation=25)

    notes_text = format_notes(record, stats_lines)
    fig.text(0.01, 0.01, notes_text, ha='left', va='bottom', fontsize=9)

    return fig


def create_cover_page(well, well_records):
    fig = plt.figure(figsize=(11.7, 8.3))
    ax = fig.add_subplot(111)
    ax.axis('off')
    fig.suptitle(f"Скважина {well}: сводка по отмеченным интервалам", fontsize=20, fontweight='bold')

    lines = []
    for idx, rec in enumerate(well_records, 1):
        period = f"{rec['start']:%d.%m.%Y %H:%M} – {rec['end']:%d.%m.%Y %H:%M}"
        status = rec['status']
        param = rec.get('parameter') or 'не указан'
        summary = rec.get('comment') or rec.get('overview') or 'Комментарий отсутствует.'
        summary = fill(summary, width=100)
        lines.append(
            f"{idx}. {status}\n"
            f"   Период: {period}\n"
            f"   Ключевой параметр: {param}\n"
            f"   Описание: {summary}"
        )

    text_body = '\n\n'.join(lines)

    ax.text(0.02, 0.94, text_body, transform=ax.transAxes,
            va='top', ha='left', fontsize=11, linespacing=1.4)
    return fig


def generate_well_reports(data_path, excel_path, output_dir):
    output_dir = Path(output_dir)
    records = collect_intervals(excel_path)
    print(f"Интервалы из Excel: {len(records)}")

    df = pd.read_parquet(data_path)
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df['well_number'] = df['well_number'].astype(str)

    print(f"Доступно скважин в данных: {df['well_number'].nunique()}")

    grouped_records = {}
    for record in records:
        grouped_records.setdefault(record['well_number'], []).append(record)

    generated = []
    skipped = []

    for well, well_records in grouped_records.items():
        well_df = df[df['well_number'] == well].copy()
        if well_df.empty:
            skipped.append(well)
            continue
        well_records.sort(key=lambda r: r['start'])
        pdf_path = output_dir / f'скважина_{well}_диагностика.pdf'
        with pdf_backend.PdfPages(pdf_path) as pdf:
            cover = create_cover_page(well, well_records)
            pdf.savefig(cover)
            plt.close(cover)
            for record in well_records:
                fig = render_interval_figure(well_df, record)
                if fig is None:
                    continue
                pdf.savefig(fig)
                plt.close(fig)
        generated.append(pdf_path)
        print(f"Сохранён отчёт: {pdf_path}")

    return generated, skipped


def list_generated_pdfs(folder):
    folder = Path(folder)
    pdf_paths = sorted(folder.glob('*.pdf'))
    for pdf_path in pdf_paths:
        size_mb = pdf_path.stat().st_size / (1024 * 1024)
        print(f"{pdf_path.name} — {size_mb:.2f} МБ")
    return [p.name for p in pdf_paths]


In [None]:
generated, skipped = generate_well_reports(DATA_PATH, EXCEL_PATH, OUTPUT_DIR)

print('\nГотово. Сводка:')
print(f"  сформировано файлов: {len(generated)}")
if skipped:
    print(f"  пропущено (нет данных): {', '.join(sorted(skipped))}")

list_generated_pdfs(OUTPUT_DIR)
