# Демонстрация использования класса `SPEFAnalyzer`

Этот ноутбук показывает, как использовать класс `SPEFAnalyzer` из `analysis.spef` для обнаружения однофазных замыканий на землю (ОЗЗ) в осциллограммах.

Процесс включает:
- Загрузку осциллограмм с использованием `Oscillogram`.
- Нормализацию данных с помощью `OscillogramNormalizer`.
- Разделение данных по шинам/системам с помощью `OscillogramToCsvConverter` (используется его метод `split_buses`).
- Применение логики `SPEFAnalyzer` для детектирования ОЗЗ.

In [None]:
import os
import shutil
import pandas as pd
import numpy as np
import json
import sys
import csv

# Настройка путей для импорта модулей проекта
module_path = os.path.abspath(os.path.join(os.getcwd(), '..')) 
if module_path not in sys.path:
    sys.path.append(module_path)

from core.oscillogram import Oscillogram
from normalization.normalization import OscillogramNormalizer
from raw_to_csv.raw_to_csv import OscillogramToCsvConverter 
from analysis.spef import SPEFAnalyzer

# --- Создание временных директорий ---
base_sample_dir_spef = "temp_sample_data_spef"
source_dir_spef = os.path.join(base_sample_dir_spef, "source_oscillograms_spef")
output_dir_spef = os.path.join(base_sample_dir_spef, "spef_analysis_output")

if os.path.exists(base_sample_dir_spef):
    shutil.rmtree(base_sample_dir_spef)
os.makedirs(source_dir_spef)
os.makedirs(output_dir_spef)

# --- Вспомогательная функция для создания CFG/DAT ---
def create_spef_sample_comtrade(path, name, analog_channels_data, freq=50.0, samp_rate=1000.0, dat_rows=200):
    cfg_content = f"SampleStation,SampleDevice,1999\n"
    cfg_content += f"{len(analog_channels_data)},{len(analog_channels_data)}A,0D\n"
    for idx, (ch_name, data_series) in enumerate(analog_channels_data):
        # primary_val=1, secondary_val=1, a=1, b=0 means DAT values are physical
        cfg_content += f"{idx+1},{ch_name},A,,V,1.0,0.0,0,-32767,32767,1,1,P\n"
    cfg_content += f"{freq}\n1\n{samp_rate},{dat_rows}\n"
    cfg_content += "01/01/2023,00:00:00.000000\n01/01/2023,00:00:00.000000\nASCII\n1.0\n"
    with open(os.path.join(path, f"{name}.cfg"), "w", encoding="utf-8") as f: f.write(cfg_content)

    dat_content = ""
    for i in range(dat_rows):
        dat_content += f"{i+1},{int(i*(1000000/samp_rate))}"
        for _, data_series in analog_channels_data:
            dat_content += f",{data_series[i % len(data_series)]:.6f}" # Format to float
        dat_content += "\n"
    with open(os.path.join(path, f"{name}.dat"), "w", encoding="utf-8") as f: f.write(dat_content)

# --- Создание файлов осциллограмм ---
time_points = np.linspace(0, 2*np.pi*10, 200) # 10 periods for 200 points (samp_rate=1000, freq=50 -> 20 points/period)
nominal_phase_peak = 10000.0 * np.sqrt(2) / np.sqrt(3) # Example: 10kV line-to-line

# Файл 1: Явное ОЗЗ по UN BB (U0)
# Нормализованные данные должны быть около 1.0 PU. SPEFAnalyzer ожидает нормализованные данные.
# SPEF_THRESHOLD_Un (e.g. 0.05/3 = 0.016 PU). So, un_spef_signal should be in PU.
un_spef_signal_pu = np.concatenate([np.zeros(50), np.sin(time_points[:100])*0.2, np.zeros(50)]) # 0.2 PU U0
create_spef_sample_comtrade(source_dir_spef, "osc_spef_un", [
    ("U | BusBar-1 | phase: A", np.sin(time_points)*1.0), # Assume these are already 1 PU peak
    ("U | BusBar-1 | phase: B", np.sin(time_points - 2*np.pi/3)*1.0),
    ("U | BusBar-1 | phase: C", np.sin(time_points + 2*np.pi/3)*1.0),
    ("U | BusBar-1 | phase: N", un_spef_signal_pu) 
])
# Файл 2: ОЗЗ по фазным напряжениям (Ua повышено, Ub, Uc понижены) - PU values
ua_spef_pu = np.concatenate([np.sin(time_points[:50])*1.0, np.sin(time_points[50:150])*1.5, np.sin(time_points[150:])*1.0])
ub_spef_pu = np.concatenate([np.sin(time_points[:50] - 2*np.pi/3)*1.0, np.sin(time_points[50:150] - 2*np.pi/3)*0.5, np.sin(time_points[150:] - 2*np.pi/3)*1.0])
uc_spef_pu = np.concatenate([np.sin(time_points[:50] + 2*np.pi/3)*1.0, np.sin(time_points[50:150] + 2*np.pi/3)*0.5, np.sin(time_points[150:] + 2*np.pi/3)*1.0])
create_spef_sample_comtrade(source_dir_spef, "osc_spef_phases", [
    ("U | CableLine-1 | phase: A", ua_spef_pu),
    ("U | CableLine-1 | phase: B", ub_spef_pu),
    ("U | CableLine-1 | phase: C", uc_spef_pu)
])
# Файл 3: Нормальный режим (без ОЗЗ) - PU values
create_spef_sample_comtrade(source_dir_spef, "osc_normal", [
    ("U | BusBar-1 | phase: A", np.sin(time_points)*1.0),
    ("U | BusBar-1 | phase: B", np.sin(time_points - 2*np.pi/3)*1.0),
    ("U | BusBar-1 | phase: C", np.sin(time_points + 2*np.pi/3)*1.0)
])

# --- Создание sample_norm_coef.csv ---
# Values in _base columns should be physical phase RMS for VALID_NOMINAL_VOLTAGES check
# OscillogramNormalizer will use these to make PU values (e.g., by dividing by them if factor is 1)
physical_phase_nominal_rms = 10000.0 / np.sqrt(3)
norm_data = [
    ['name', 'norm', '1Ub_PS', '1Ub_base', '1Uc_PS', '1Uc_base'],
    ['osc_spef_un', 'YES', 's', physical_phase_nominal_rms, 's', physical_phase_nominal_rms],
    ['osc_spef_phases', 'YES', 's', physical_phase_nominal_rms, 's', physical_phase_nominal_rms],
    ['osc_normal', 'YES', 's', physical_phase_nominal_rms, 's', physical_phase_nominal_rms]
]
sample_norm_coef_path = os.path.join(output_dir_spef, "sample_norm_coef.csv")
with open(sample_norm_coef_path, "w", newline="", encoding="utf-8") as f: csv.writer(f).writerows(norm_data)

# --- Создание JSON для OscillogramToCsvConverter ---
analog_names_json_path = os.path.join(base_sample_dir_spef, "dict_analog_names.json")
discrete_names_json_path = os.path.join(base_sample_dir_spef, "dict_discrete_names.json")
analog_content = {
    "bus1": { 
        "U_BusBar_A": ["U | BusBar-1 | phase: A"], "U_BusBar_B": ["U | BusBar-1 | phase: B"],
        "U_BusBar_C": ["U | BusBar-1 | phase: C"], "U_BusBar_N": ["U | BusBar-1 | phase: N"],
        "U_CableLine_A": ["U | CableLine-1 | phase: A"], "U_CableLine_B": ["U | CableLine-1 | phase: B"],
        "U_CableLine_C": ["U | CableLine-1 | phase: C"]
    }
}
with open(analog_names_json_path, "w", encoding="utf-8") as f: json.dump(analog_content, f)
with open(discrete_names_json_path, "w", encoding="utf-8") as f: json.dump({}, f) 

print(f"Создана директория с примерами: {source_dir_spef}")
print(f"Создан файл коэффициентов нормализации: {sample_norm_coef_path}")
print(f"Созданы JSON-файлы для имен сигналов.")

## 1. Инициализация компонентов

In [None]:
spef_config = {
    'VALID_NOMINAL_VOLTAGES': {round(10000.0/np.sqrt(3), 2)}, # Phase RMS voltage in Volts
    'SPEF_THRESHOLD_U0': 0.06,  # PU (based on phase nominal RMS = 1 PU)
    'SPEF_THRESHOLD_Un': 0.05, # PU
    'SPEF_MIN_DURATION_PERIODS': 1, 
    'SIMILAR_AMPLITUDES_FILTER_ENABLED': True,
    'SIMILAR_AMPLITUDES_MAX_RELATIVE_DIFFERENCE': 0.15, 
    'verbose': True, # Enable verbose output from SPEFAnalyzer methods
    'norm_yes_phrase': 'YES' # To match 'norm' column in sample_norm_coef.csv
}

normalizer = OscillogramNormalizer(norm_coef_file_path=sample_norm_coef_path, is_print_message=True)

bus_splitter = OscillogramToCsvConverter(
    normalizer=normalizer, 
    raw_path=source_dir_spef, 
    csv_path=output_dir_spef,
    dict_analog_names_path=analog_names_json_path,
    dict_discrete_names_path=discrete_names_json_path,
    is_print_message=True
)

norm_coeffs_df_loaded = pd.read_csv(sample_norm_coef_path)
analyzer = SPEFAnalyzer(
    config=spef_config,
    normalizer=normalizer,
    bus_splitter=bus_splitter,
    norm_coef_df=norm_coeffs_df_loaded
)
print("\nSPEFAnalyzer и зависимости инициализированы.")

## 2. Анализ директории на ОЗЗ (`analyze_directory`)

In [None]:
output_spef_report_csv = os.path.join(output_dir_spef, "spef_report.csv")
error_log_spef_path = os.path.join(output_dir_spef, "spef_errors.log")

print(f"\nЗапуск анализа директории: {source_dir_spef}")
analyzer.analyze_directory(
    osc_folder_path=source_dir_spef,
    output_csv_path=output_spef_report_csv,
    log_path=error_log_spef_path
)

if os.path.exists(output_spef_report_csv):
    print(f"\nСодержимое отчета о ОЗЗ ({output_spef_report_csv}):")
    try:
        from IPython.display import display
        report_df = pd.read_csv(output_spef_report_csv)
        display(report_df)
    except ImportError:
        print(pd.read_csv(output_spef_report_csv))
    except Exception as e:
        print(f"Ошибка чтения CSV отчета: {e}")
else:
    print(f"Отчет о ОЗЗ не был создан.")

if os.path.exists(error_log_spef_path) and os.path.getsize(error_log_spef_path) > 0:
    print(f"\nСодержимое лога ошибок ({error_log_spef_path}):")
    with open(error_log_spef_path, "r", encoding='utf-8') as f: print(f.read())
else:
    print(f"\nЛог ошибок ({error_log_spef_path}) пуст или не создан.")

## 3. Демонстрация `detect_spef_on_bus_dataframe` (выборочно)

Покажем, как можно использовать внутренний метод `detect_spef_on_bus_dataframe` для анализа данных конкретной шины после их подготовки.

In [None]:
try:
    osc_example_path = os.path.join(source_dir_spef, "osc_spef_un.cfg")
    osc_example = Oscillogram(osc_example_path)
    print(f"Загружен {osc_example.file_hash} для детального анализа.")

    normalized_df_example = normalizer.normalize_bus_signals(
        osc_example.data_frame.copy(), 
        osc_example.file_hash,
        is_print_error=True
    )

    if normalized_df_example is not None:
        all_buses_example_df = bus_splitter.split_buses(
            normalized_df_example, # Normalizer returns df with time as column already
            os.path.basename(osc_example.filepath)
        )
        
        if all_buses_example_df is not None and not all_buses_example_df.empty:
            example_bus_group_name = all_buses_example_df['file_name'].unique()[0]
            example_bus_df = all_buses_example_df[all_buses_example_df['file_name'] == example_bus_group_name]
            
            print(f"Анализ группы шин: {example_bus_group_name}")
            samples_per_period_example = int(osc_example.cfg.sample_rates[0][0] / osc_example.frequency)
            
            is_spef = analyzer.detect_spef_on_bus_dataframe(
                example_bus_df, 
                samples_per_period_example,
                is_print_message=True
            )
            print(f"Результат detect_spef_on_bus_dataframe для {example_bus_group_name}: {is_spef}")
        else:
            print("Не удалось разделить данные по шинам для примера.")
    else:
        print("Не удалось нормализовать данные для примера.")
except Exception as e:
    print(f"Ошибка при детальном анализе: {e}")

## 4. Демонстрация `copy_spef_oscillograms`

In [None]:
copied_spef_dir = os.path.join(output_dir_spef, "copied_spef_oscillograms")

if os.path.exists(output_spef_report_csv):
    print(f"\nКопирование осциллограмм из отчета {output_spef_report_csv} в {copied_spef_dir}...")
    OvervoltageDetector.copy_spef_oscillograms(
        report_csv_path=output_spef_report_csv,
        source_osc_folder_path=source_dir_spef,
        destination_folder_path=copied_spef_dir,
        is_print_message=True
    )
    
    if os.path.exists(copied_spef_dir):
        print("\nСодержимое директории с скопированными ОЗЗ:")
        for item in os.listdir(copied_spef_dir):
            print(f"- {item}")
else:
    print(f"Отчет о ОЗЗ ({output_spef_report_csv}) не найден, копирование не будет выполнено.")

In [None]:
# Очистка временной директории
try:
    if os.path.exists(base_sample_dir_spef):
        shutil.rmtree(base_sample_dir_spef)
        print(f"\nВременная директория {base_sample_dir_spef} удалена.")
except Exception as e:
    print(f"Ошибка при удалении временной директории {base_sample_dir_spef}: {e}")