# Температурный пайплайн (модульный)
*Последнее обновление: 2025-09-07*


In [None]:
# === 1) Импорт и конфигурация ===
import os, io, csv, fnmatch, glob
from typing import List, Tuple, Optional, Dict

import numpy as np
import pandas as pd

try:
    from google.colab import files, drive  # type: ignore
    IN_COLAB = True
except Exception:
    IN_COLAB = False

from IPython.display import display

# --- Configuration (edit as needed) ---
DATE_FORMAT = ''          # explicit strptime format, e.g. '%Y-%m-%d %H:%M:%S'; empty -> automatic parsing (dayfirst=True)
REF_IDX = 8               # reference channel: column T{REF_IDX}
N_FOLLOW = 3              # number of channels after the reference (T{REF_IDX+1}, ...); indices above 15 are skipped
WINDOW_N = 50             # rolling-window length in rows (seconds if data is sampled at 1 Hz — TODO confirm)
STD_THR = 1e-3            # a window counts as stable when its rolling std <= this threshold
MIN_LEN = 50              # minimum stable-interval length in rows
GROUP_BY_FILE = True      # detect intervals per source file (start_idx/end_idx are file-local)
TRIM_COLUMNS = True       # drop T-columns other than the reference and its followers
FILE_PATTERNS = ['*.csv', '*.txt']  # glob patterns used for the Google Drive search (cell 5)
SAMPLE_K = 1              # keep every SAMPLE_K-th point when building pairs
INTERVAL_MODE = 'by_sensor'  # 'by_sensor' | 'joint'

# Globals filled in by later cells
DATA = None
STABLE_JOINT = None
STABLE_BY_SENSOR = None
PAIRS_DF = None


In [None]:
# === 2) Загрузка/парсинг ===
def sniff_sep(sample: bytes) -> str:
    """Guess the column delimiter of a delimited-text sample.

    Only ',', ';', TAB and '|' are considered. ``csv.Sniffer`` is tried first.
    If it cannot decide, the fallback picks the candidate that occurs most often
    in the first non-empty line (ties go to the earlier candidate in the list
    above). If none of them occurs, ',' is returned.

    Args:
        sample: raw leading bytes of the file; decoded as UTF-8 with
            undecodable bytes ignored.

    Returns:
        The delimiter character. Never raises on empty or undecodable input.
    """
    candidates = [',', ';', '\t', '|']
    text = sample.decode('utf-8', errors='ignore') if sample else ''
    try:
        return csv.Sniffer().sniff(text, delimiters=candidates).delimiter
    except csv.Error:
        pass  # Sniffer could not decide; use the frequency heuristic below.

    # The sample may decode to '' or start with blank lines, so do not index
    # splitlines()[0] blindly.
    first_line = next((ln for ln in text.splitlines() if ln.strip()), '')
    counts = {c: first_line.count(c) for c in candidates}
    best = max(candidates, key=counts.__getitem__)  # first maximum wins ties
    return best if counts[best] > 0 else ','

def read_one_table(name: str, stream: io.BytesIO, date_format: Optional[str]=None) -> pd.DataFrame:
    """Parse one delimited table made of a date column plus 16 temperature columns.

    The delimiter is sniffed from the first 8 KiB, then the stream is rewound
    and parsed from offset 0. The first row is read as a header, and its text
    is discarded: the first 17 columns are renamed to ``date, T0..T15`` and
    any further columns are dropped.

    Args:
        name: label stored in the ``source_file`` column and used in messages.
        stream: binary stream holding the file contents.
        date_format: explicit strftime format for the date column. If it is
            None or blank, ``pandas.to_datetime(..., dayfirst=True)`` infers
            the format.

    Returns:
        A DataFrame with ``date``, numeric ``T0``..``T15`` and ``source_file``.
        Rows whose date cannot be parsed are dropped (with a printed warning).
        Unparsable temperature values become NaN.

    Raises:
        ValueError: if the table has fewer than 17 columns.
    """
    head = stream.read(8192)
    stream.seek(0)
    sep = sniff_sep(head)
    # The header text is discarded below, so undecodable bytes (e.g. a cp1251
    # header) are replaced instead of failing the whole file.
    df = pd.read_csv(stream, sep=sep, engine='python', encoding_errors='replace')
    if df.shape[1] < 17:
        raise ValueError(f"{name}: найдено {df.shape[1]} столбцов, требуется >= 17 (1 дата + 16 температур).")
    temp_cols = [f'T{i}' for i in range(16)]
    df = df.iloc[:, :17].copy()
    df.columns = ['date'] + temp_cols

    if date_format and date_format.strip():
        df['date'] = pd.to_datetime(df['date'], format=date_format, errors='coerce')
    else:
        # `infer_datetime_format` is deprecated since pandas 2.0; format
        # inference is now the default behaviour of to_datetime.
        df['date'] = pd.to_datetime(df['date'], errors='coerce', dayfirst=True)
    if df['date'].isna().any():
        bad = int(df['date'].isna().sum())
        print(f"[Предупреждение] {name}: {bad} строк с нераспознанной датой отброшены.")
        df = df.dropna(subset=['date'])

    for c in temp_cols:
        col = df[c]
        if not pd.api.types.is_numeric_dtype(col):
            # Decimal commas ('12,5') are common with ';'-separated exports;
            # normalise them so to_numeric does not turn them into NaN.
            col = col.astype(str).str.strip().str.replace(',', '.', regex=False)
        df[c] = pd.to_numeric(col, errors='coerce')
    df['source_file'] = name
    return df


In [None]:
# === 3) Поиск стабильности ===
from typing import Tuple

def rolling_std_mask(series: pd.Series, window: int, threshold: float) -> pd.Series:
    """Flag rows whose trailing `window`-row standard deviation is <= `threshold`.

    Windows with fewer than `window` non-NaN observations yield a NaN std,
    and NaN compares as False, so those rows are never flagged.
    """
    trailing_std = series.rolling(window=window, min_periods=window).std()
    return trailing_std.le(threshold)

def _segments_from_stable_mask(stable_mask: pd.Series, window: int) -> List[Tuple[int,int]]:
    segs = []; cur = None
    arr = stable_mask.to_numpy()
    for i, ok in enumerate(arr):
        if ok:
            s = max(0, i - window + 1); e = i
            if cur is None: cur = (s,e)
            else:
                cs, ce = cur
                if s <= ce + 1: cur = (cs, max(ce, e))
                else: segs.append(cur); cur = (s,e)
        else:
            if cur is not None: segs.append(cur); cur = None
    if cur is not None: segs.append(cur)
    return segs

def summarize_interval(df: pd.DataFrame, cols: List[str], s: int, e: int) -> Dict:
    """Summarize rows ``s..e`` (inclusive, positional) of `df`.

    Returns a dict with ``start_idx``, ``end_idx`` and ``length``, the
    ``start_date``/``end_date`` taken from the ``date`` column (NaT when the
    position is outside the frame), and for each column in `cols` a
    NaN-ignoring ``mean_<col>`` and sample std ``std_<col>`` (ddof=1). The
    mean is NaN if no value is present; the std is NaN if fewer than two are.
    """
    n = len(df)
    row = {'start_idx': int(s), 'end_idx': int(e), 'length': int(e - s + 1)}
    # Positional lookups (iloc) so the dates refer to the same rows as the
    # positional value slices below, even without a 0..n-1 RangeIndex.
    dates = df['date']
    row['start_date'] = pd.to_datetime(dates.iloc[s]) if 0 <= s < n else pd.NaT
    row['end_date'] = pd.to_datetime(dates.iloc[e]) if 0 <= e < n else pd.NaT
    for c in cols:
        vals = df[c].to_numpy()[s:e + 1]
        good = ~np.isnan(vals)
        row[f'mean_{c}'] = float(np.nanmean(vals)) if good.any() else np.nan
        row[f'std_{c}'] = float(np.nanstd(vals, ddof=1)) if good.sum() > 1 else np.nan
    return row

def _long_segments(mask: pd.Series, window: int, min_len: int) -> List[Tuple[int, int]]:
    """Return the stable segments of `mask` that are at least `min_len` rows long."""
    return [(s, e) for (s, e) in _segments_from_stable_mask(mask, window) if (e - s + 1) >= min_len]


def detect_stability(data: pd.DataFrame, ref_idx: int, follow_idxs: List[int],
                     window: int, threshold: float, min_len: int, group_by_file: bool=True) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Find stable intervals of the reference channel and its follower channels.

    A row is stable for a channel when that channel's trailing `window`-row
    std is <= `threshold`. Two tables are built:

    * joint: the reference AND all followers are stable at the same time;
    * by_sensor: each follower is stable together with the reference.

    Only intervals with at least `min_len` rows are kept. With
    `group_by_file` (and a ``source_file`` column), detection runs separately
    per file, and ``start_idx``/``end_idx`` are positions inside that file's
    rows. ``start_idx_abs``/``end_idx_abs`` are the matching row positions
    in `data`. These hold even when a file's rows are not contiguous in
    `data`, e.g. after sorting several files by date.

    Raises:
        ValueError: if a required T-column is missing.
    """
    ref_col = f'T{ref_idx}'
    follow_cols = [f'T{i}' for i in follow_idxs]
    for c in [ref_col] + follow_cols:
        if c not in data.columns:
            raise ValueError(f"Нет колонки: {c}")
    min_len = int(min_len)

    if group_by_file and 'source_file' in data.columns:
        grouped = data.groupby('source_file', sort=False)
        # Positional row numbers of each group inside `data`.
        positions = grouped.indices
        groups = [(src, g, positions[src]) for src, g in grouped]
    else:
        groups = [('ALL', data, np.arange(len(data)))]

    joint_rows, by_sensor_rows = [], []
    for src, g, pos in groups:
        g = g.reset_index(drop=True)
        mask_ref = rolling_std_mask(g[ref_col], window=window, threshold=threshold)
        masks_others = {c: rolling_std_mask(g[c], window=window, threshold=threshold) for c in follow_cols}

        joint_mask = mask_ref.copy()
        for c in follow_cols:
            joint_mask &= masks_others[c]
        for s, e in _long_segments(joint_mask, window, min_len):
            row = {'source_file': src, 'ref': ref_col, 'followers': ','.join(follow_cols)}
            row.update(summarize_interval(g, [ref_col] + follow_cols, s, e))
            row['start_idx_abs'] = int(pos[s])
            row['end_idx_abs'] = int(pos[e])
            joint_rows.append(row)

        for c in follow_cols:
            for s, e in _long_segments(masks_others[c] & mask_ref, window, min_len):
                row = {'source_file': src, 'sensor': c, 'ref': ref_col}
                row.update(summarize_interval(g, [c, ref_col], s, e))
                row['start_idx_abs'] = int(pos[s])
                row['end_idx_abs'] = int(pos[e])
                by_sensor_rows.append(row)

    joint = pd.DataFrame(joint_rows).sort_values(['source_file', 'start_idx']) if joint_rows else pd.DataFrame()
    by_sensor = pd.DataFrame(by_sensor_rows).sort_values(['source_file', 'sensor', 'start_idx']) if by_sensor_rows else pd.DataFrame()
    return joint, by_sensor


In [None]:
# === 4) Option A: upload files through the browser dialog (Colab only) ===
if IN_COLAB:
    print('Выберите один или несколько .csv/.txt файлов…')
    uploads = files.upload()
    # (label, raw bytes) tuples; cell 6 parses them from memory.
    SELECTED_FILES = [(f'uploaded:{fname}', payload) for fname, payload in uploads.items()]
    IS_BYTES_INPUT = True
    print('Загружено файлов:', len(SELECTED_FILES))
else:
    print('Диалог загрузки доступен только в Google Colab.')


In [None]:
# === 5) Option B: collect files from a Google Drive folder (Colab only) ===
if not IN_COLAB:
    print('Доступ к Google Drive возможен только в Colab.')
else:
    drive.mount('/content/drive')
    ROOT_DIR = '/content/drive/MyDrive'  # change if needed; the search below is recursive
    # A set drops duplicates when FILE_PATTERNS overlap (e.g. '*.csv' and '*'),
    # which would otherwise make cell 6 read the same file twice. Directories
    # whose names match a pattern are skipped.
    found = set()
    for pat in FILE_PATTERNS:
        found.update(p for p in glob.glob(os.path.join(ROOT_DIR, '**', pat), recursive=True)
                     if os.path.isfile(p))
    found = sorted(found)
    print(f'Найдено файлов: {len(found)}')
    SELECTED_FILES = found
    IS_BYTES_INPUT = False


In [None]:
# === 6) Read and merge the selected files ===
from collections import Counter

assert 'SELECTED_FILES' in globals(), 'Сначала выполните ячейку 4 или 5.'

# Recursive Drive searches can return several files with the same basename
# (e.g. 2024/data.csv and 2025/data.csv). Those get their full path as
# source_file so they are not merged into one group.
_basename_counts = (Counter() if IS_BYTES_INPUT
                    else Counter(os.path.basename(p) for p in SELECTED_FILES))

frames, errors = [], []
for item in SELECTED_FILES:
    # Label for error messages; never stringify the raw uploaded bytes.
    label = item[0] if IS_BYTES_INPUT else str(item)
    try:
        if IS_BYTES_INPUT:
            name, content = item
            df = read_one_table(name, io.BytesIO(content), date_format=DATE_FORMAT or None)
        else:
            path = item
            base = os.path.basename(path)
            name = base if _basename_counts[base] == 1 else path
            with open(path, 'rb') as f:
                df = read_one_table(name, io.BytesIO(f.read()), date_format=DATE_FORMAT or None)
        frames.append(df)
    except Exception as e:  # best effort: collect the error and keep going
        errors.append((label, str(e)))

if not frames:
    # Put the reasons in the exception; the error report below is never reached.
    details = '; '.join(f'{p}: {msg}' for p, msg in errors[:10])
    raise RuntimeError(f'Не удалось прочитать ни один файл. {details}'.strip())

# Stable sort: rows with equal timestamps keep their file order and in-file order.
data = pd.concat(frames, ignore_index=True).sort_values('date', kind='mergesort').reset_index(drop=True)

follow_idxs = [REF_IDX + i for i in range(1, N_FOLLOW + 1) if REF_IDX + i <= 15]
cols_keep = ['date', f'T{REF_IDX}'] + [f'T{i}' for i in follow_idxs] + ['source_file']
if TRIM_COLUMNS:
    data = data[[c for c in cols_keep if c in data.columns]]

DATA = data
print('Итоговые размеры набора:', data.shape)
print(f'Эталон: T{REF_IDX}; следующие:', [f"T{i}" for i in follow_idxs])
display(data.head(10))

if errors:
    print('\nОшибки чтения (первые 10):')
    for p, msg in errors[:10]:
        print(' -', p, ':', msg)


In [None]:
# === 7) Detect stable intervals ===
assert DATA is not None, 'Нет данных DATA.'
follow_idxs = [idx for idx in range(REF_IDX + 1, REF_IDX + N_FOLLOW + 1) if idx <= 15]

STABLE_JOINT, STABLE_BY_SENSOR = detect_stability(
    DATA,
    ref_idx=REF_IDX,
    follow_idxs=follow_idxs,
    window=WINDOW_N,
    threshold=STD_THR,
    min_len=MIN_LEN,
    group_by_file=GROUP_BY_FILE,
)

# Both counts first, then previews of the non-empty tables.
print('Совместные интервалы:', len(STABLE_JOINT) if STABLE_JOINT is not None else 0)
print('По каналам:', len(STABLE_BY_SENSOR) if STABLE_BY_SENSOR is not None else 0)
for _preview in (STABLE_JOINT, STABLE_BY_SENSOR):
    if _preview is not None and not _preview.empty:
        display(_preview.head(10))


## 7.1 Отбор плато: самое длинное в каждом диапазоне шириной DEG_TOL °C по уровню эталона


In [None]:
# Selection parameters (values already defined in the notebook namespace take precedence)
DEG_TOL = globals().get('DEG_TOL', 1.0)                  # tolerance on the reference level (°C): width of one cluster/bin
SELECT_LONGEST_PER_DEGREE = globals().get('SELECT_LONGEST_PER_DEGREE', True)  # enable/disable this selection step
CLUSTER_STRATEGY = globals().get('CLUSTER_STRATEGY', 'sliding')  # 'sliding' | 'bucket'

def _interval_ref_level(df, ref_col, s, e):
    vals = df[ref_col].to_numpy()[int(s):int(e)+1]
    return float(np.nanmedian(vals))

def _filter_longest_per_degree(table: pd.DataFrame, data: pd.DataFrame, ref_idx: int, mode: str,
                               deg_tol: float = 1.0, strategy: str = 'sliding') -> pd.DataFrame:
    """Keep one (the longest) interval per reference-temperature cluster.

    Each interval gets a ``ref_level``: the NaN-ignoring median of
    ``T{ref_idx}`` over its rows. Intervals are grouped by ``source_file``,
    and also by ``sensor`` in ``'by_sensor'`` mode when that column exists.
    Inside each group they are clustered by ref_level:

    * ``'bucket'``: fixed bins ``floor(ref_level / deg_tol)``; the longest
      interval of each bin is kept.
    * any other value (default ``'sliding'``): levels are scanned in ascending
      order, and a cluster collects every level within ``deg_tol`` of its
      lowest member. The longest interval is kept, with ties broken by the
      larger ``end_idx``.

    Returns a new DataFrame that includes the ``ref_level`` column. A None or
    empty `table` is returned unchanged.
    """
    if table is None or table.empty:
        return table
    ref_col = f"T{ref_idx}"
    tbl = table.copy()

    # Slice each source file once instead of once per interval.
    # start_idx/end_idx are positions inside that file's rows.
    frames: Dict = {}

    def _frame_for(src):
        if src not in frames:
            frames[src] = data[data['source_file'] == src].reset_index(drop=True) if src != 'ALL' else data
        return frames[src]

    tbl['ref_level'] = [
        _interval_ref_level(_frame_for(src), ref_col, s, e)
        for src, s, e in zip(tbl['source_file'], tbl['start_idx'], tbl['end_idx'])
    ]

    group_keys = ['source_file', 'sensor'] if (mode == 'by_sensor' and 'sensor' in tbl.columns) else ['source_file']
    out = []
    for _, grp in tbl.sort_values('ref_level').groupby(group_keys, as_index=False):
        g = grp.sort_values('ref_level').reset_index(drop=True)
        if strategy == 'bucket':
            bins = np.floor(g['ref_level'] / deg_tol).astype(int)
            keep = (g.assign(_bin=bins)
                     .sort_values('length', ascending=False)
                     .groupby('_bin', as_index=False).head(1))
            out.append(keep.drop(columns=['_bin']))
        else:
            i, n = 0, len(g)
            while i < n:
                start_val = g.loc[i, 'ref_level']
                j = i
                # Grow the cluster while levels stay within deg_tol of its lowest member.
                while j + 1 < n and (g.loc[j + 1, 'ref_level'] - start_val) <= deg_tol:
                    j += 1
                cluster = g.loc[i:j]
                keep = cluster.sort_values(['length', 'end_idx'], ascending=[False, False]).iloc[0:1]
                out.append(keep)
                i = j + 1
    return pd.concat(out, ignore_index=True) if out else tbl

# Apply the selection to STABLE_* (after step 7).
# A table that already has a 'ref_level' column was filtered by an earlier run
# of this cell. Filtering it again would re-cluster the survivors and could
# drop more intervals, so it is left as-is until step 7 is re-run.
try:
    follow_idxs = [REF_IDX + i for i in range(1, N_FOLLOW + 1) if REF_IDX + i <= 15]
    if SELECT_LONGEST_PER_DEGREE and DATA is not None:
        if 'STABLE_BY_SENSOR' in globals() and STABLE_BY_SENSOR is not None and not STABLE_BY_SENSOR.empty:
            if 'ref_level' in STABLE_BY_SENSOR.columns:
                print("STABLE_BY_SENSOR уже отобран ранее — повторный отбор пропущен (перезапустите шаг 7).")
            else:
                STABLE_BY_SENSOR = _filter_longest_per_degree(STABLE_BY_SENSOR, DATA, REF_IDX, mode='by_sensor',
                                                              deg_tol=DEG_TOL, strategy=CLUSTER_STRATEGY)
                print(f"STABLE_BY_SENSOR после отбора → {len(STABLE_BY_SENSOR)} интервалов")
                display(STABLE_BY_SENSOR.head(10))
        if 'STABLE_JOINT' in globals() and STABLE_JOINT is not None and not STABLE_JOINT.empty:
            if 'ref_level' in STABLE_JOINT.columns:
                print("STABLE_JOINT уже отобран ранее — повторный отбор пропущен (перезапустите шаг 7).")
            else:
                STABLE_JOINT = _filter_longest_per_degree(STABLE_JOINT, DATA, REF_IDX, mode='joint',
                                                          deg_tol=DEG_TOL, strategy=CLUSTER_STRATEGY)
                print(f"STABLE_JOINT после отбора → {len(STABLE_JOINT)} интервалов")
                display(STABLE_JOINT.head(10))
except NameError:
    print("Сначала выполните шаг 7, затем 7.1.")


In [None]:
# === 8) Формирование пар (X=датчик, Y=эталон) ===
def _pairs_for_interval(g: pd.DataFrame, src, sensor: str, ref_col: str,
                        s: int, e: int, sample_k: int) -> pd.DataFrame:
    """Build (sensor, reference) pairs for positional rows s..e of `g`.

    Rows where either value is NaN are dropped, then every `sample_k`-th pair
    is kept. Columns: source_file, sensor, ref, x_sensor, y_ref, date.
    """
    x = g[sensor].to_numpy(dtype=float, na_value=np.nan)[s:e + 1]
    y = g[ref_col].to_numpy(dtype=float, na_value=np.nan)[s:e + 1]
    dates = g['date'].to_numpy()[s:e + 1]
    keep = ~np.isnan(x) & ~np.isnan(y)
    x, y, dates = x[keep], y[keep], dates[keep]
    if sample_k > 1:
        x, y, dates = x[::sample_k], y[::sample_k], dates[::sample_k]
    return pd.DataFrame({'source_file': src, 'sensor': sensor, 'ref': ref_col,
                         'x_sensor': x, 'y_ref': y, 'date': pd.to_datetime(dates)})


def build_pairs(data: pd.DataFrame,
                stable_joint: Optional[pd.DataFrame],
                stable_by_sensor: Optional[pd.DataFrame],
                ref_idx: int,
                follow_idxs: List[int],
                mode: str = 'by_sensor',
                sample_k: int = 1) -> pd.DataFrame:
    """Collect (X = sensor, Y = reference) point pairs from stable intervals.

    Args:
        data: combined data with ``date``, T-columns and ``source_file``.
        stable_joint: intervals from detect_stability (joint table); used when
            ``mode == 'joint'``, where every follower in `follow_idxs` is paired.
        stable_by_sensor: per-sensor intervals; used when
            ``mode == 'by_sensor'``, where the row's ``sensor`` is paired.
        ref_idx: reference channel index (column ``T{ref_idx}``).
        follow_idxs: follower channel indices (joint mode only).
        mode: ``'by_sensor'`` or ``'joint'``.
        sample_k: keep every k-th valid pair of each interval (k > 1).

    Returns:
        A DataFrame with columns source_file, sensor, ref, x_sensor, y_ref,
        date. It is an empty DataFrame with no columns when nothing is produced.

    Raises:
        AssertionError: if the table required by `mode` is None or empty.
        ValueError: if `mode` is not one of the two supported values.
    """
    ref_col = f'T{ref_idx}'
    if mode == 'by_sensor':
        assert stable_by_sensor is not None and not stable_by_sensor.empty, 'Нет STABLE_BY_SENSOR.'
        table = stable_by_sensor
        follow_cols = None
    elif mode == 'joint':
        assert stable_joint is not None and not stable_joint.empty, 'Нет STABLE_JOINT.'
        table = stable_joint
        follow_cols = [f'T{i}' for i in follow_idxs]
    else:
        raise ValueError(f"Unknown mode {mode!r}; expected 'by_sensor' or 'joint'.")

    chunks = []
    for src in table['source_file'].unique():
        tsub = table[table['source_file'] == src]
        # start_idx/end_idx are positions inside the file's own rows.
        g = data[data['source_file'] == src].reset_index(drop=True) if src != 'ALL' else data
        for _, row in tsub.iterrows():
            s, e = int(row['start_idx']), int(row['end_idx'])
            sensors = [row['sensor']] if follow_cols is None else follow_cols
            for sensor in sensors:
                chunk = _pairs_for_interval(g, src, sensor, ref_col, s, e, sample_k)
                if not chunk.empty:
                    chunks.append(chunk)
    return pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame()

assert DATA is not None, 'Нет данных DATA.'
follow_idxs = [idx for idx in range(REF_IDX + 1, REF_IDX + N_FOLLOW + 1) if idx <= 15]
PAIRS_DF = build_pairs(
    DATA, STABLE_JOINT, STABLE_BY_SENSOR, REF_IDX, follow_idxs,
    mode=INTERVAL_MODE, sample_k=SAMPLE_K,
)

print('Сформировано пар:', len(PAIRS_DF) if PAIRS_DF is not None else 0)
if PAIRS_DF is not None and not PAIRS_DF.empty:
    # Preview at most the first two sensors.
    for sensor in list(PAIRS_DF['sensor'].unique())[:2]:
        print(f'\n{sensor} (пример):')
        display(PAIRS_DF.loc[PAIRS_DF['sensor'].eq(sensor)].head(8))


In [None]:
# === 9) Save results as CSV ===
assert DATA is not None, 'Нет DATA.'
out_dir = '/content' if IN_COLAB else os.getcwd()

DATA.to_csv(os.path.join(out_dir, 'combined_temperatures.csv'), index=False)
# Interval tables are written only when present and non-empty.
for _table, _fname in ((STABLE_JOINT, 'stable_intervals_joint.csv'),
                       (STABLE_BY_SENSOR, 'stable_intervals_by_sensor.csv')):
    if _table is not None and not _table.empty:
        _table.to_csv(os.path.join(out_dir, _fname), index=False)
if PAIRS_DF is not None and not PAIRS_DF.empty:
    PAIRS_DF.to_csv(os.path.join(out_dir, 'ref_pairs_by_sensor.csv'), index=False)
    # One extra file per sensor.
    per_sensor = list(PAIRS_DF.groupby('sensor'))
    for sensor, g in per_sensor:
        g.to_csv(os.path.join(out_dir, f'ref_pairs_{sensor}.csv'), index=False)
    cnt = len(per_sensor)
    print(f'Сохранено файлов пар по датчикам: {cnt}')
print('Файлы записаны в:', out_dir)


In [None]:
# === 10) (optional) Quick scatter plot of the first sensor ===
try:
    import matplotlib.pyplot as plt
    if PAIRS_DF is None or PAIRS_DF.empty:
        print('Нет данных PAIRS_DF для графика.')
    else:
        first_sensor = PAIRS_DF['sensor'].unique()[0]
        one = PAIRS_DF.loc[PAIRS_DF['sensor'] == first_sensor].head(1000)
        plt.figure()
        plt.scatter(one['x_sensor'], one['y_ref'], s=10)
        plt.xlabel('Sensor (X)')
        plt.ylabel('Reference (Y)')
        plt.title(f"Scatter: {one['sensor'].iloc[0]} vs ref")
        plt.show()
except Exception as e:  # plotting is optional; report and continue
    print('График пропущен:', e)
