In [None]:
import numpy as np
import pandas as pd
from typing import List
from detecta import detect_onset
import statsmodels.api as sm
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Функции

## Сегментация сигнала

In [None]:
def segmentation_internal(dataframe: pd.DataFrame) -> List[pd.DataFrame]:
    """
    Функция для внутренней сегментации в define_start_end_time().
    
    Функция сгементирует входной датафрейм с помощью detect_onset() по заданному порогу. 
    detect_onset() выдает индексы значений, больше заданного порога.
    В list_with_segments добавляются рабочие сегменты (датафреймы с рабочими значениями параметра и первой нулевой точкой).
    
    :param dataframe: датафрейм телеметрии управляющего параметра
    
    :return: список с сегментами работы
    """
    list_with_segments = [] # Список с рабочими сегментами работы скважины в режиме ПКВ
    
    list_with_indexes_of_segm = detect_onset(dataframe.val, 1)
    
    for segment in list_with_indexes_of_segm:
        list_with_segments.append(dataframe.iloc[segment[0]-1:segment[1]+1])
    
    return list_with_segments

In [None]:
def define_start_end_time(df_one_well_freq: pd.DataFrame) -> List[List[pd.Timestamp]]:
    """
    Функция для определения начала сегмента, начала и конца рабочего инетварала по управляющему параметру на сегменте.
    
    Функция сегментирует управляющий параметр на сегменты работы в segmentation_internal().
    
    По полученной разметке выбирается три времени:
    1. Время начала сегмента (нулевая точка).
    2. Время начала рабочего интервала в сегменте (начало ненулевой "полки" по упраляющему параметр)
    3. Время конца рабочего интервала в сегменте (начало ненулевой "полки" по упраляющему параметр)
    
    :param df_one_well_freq: датафрейм с парметрами телеметрии частоты с одной скважины за заданный временной интервал
    
    :return: список с временной разметкой для каждого рабочего сегмента [start_time, start_work_time, end_work_time]     
    """
    list_start_end_time = []
    
    list_with_segments_freq = segmentation_internal(df_one_well_freq)
    
    for segm_freq in list_with_segments_freq:
        if len(segm_freq) != 0:
            work_median = segm_freq['val'].median()
            start_time = segm_freq.index[0]
            start_work_time = segm_freq[segm_freq['val'] > work_median * 0.95].index[0]
            end_work_time = segm_freq[segm_freq['val'] > work_median * 0.95].index[-1]
            list_start_end_time.append([start_time, 
                                        start_work_time, 
                                        end_work_time])
    
    return list_start_end_time

In [None]:
def segmentation_val(dataframe: pd.DataFrame, list_start_end_time: list) -> pd.DataFrame:
    """
    Функция для сегемнтации анализируемого параметра телеметрии по разметке времени.
    Функция позволяет отсегментировать любой параметр телеметрии скважины в режиме ПКВ на рабочие участки.
    Далее каждый рабочий сегмент для трендирования заменяется на одну точку (медианное сегмента или максимальное сегмента).
    
    :param dataframe: датафрейм с данными телеметрии анализируемого параметра
    :param list_start_end_time: список с временной разметкой для каждого рабочего сегмента [start_time, start_work_time, end_work_time]

    :return: датафрейм с точками для трендиров
    """
    df_result_val_for_trend = pd.DataFrame()
    dataframe.set_index("dt", inplace=True)
    dataframe.index = pd.to_datetime(dataframe.index)
    dataframe = dataframe.resample("1s").mean().interpolate(method="linear")
    
    for i, segment in enumerate(list_start_end_time):
        if i in [0, len(list_start_end_time) - 1]:
            continue
        if segment[1] in dataframe.index and segment[2] in dataframe.index:
            temp_df = dataframe.loc[segment[1]:segment[2]]
            temp_df = temp_df[int(0.2 * len(temp_df)):] # Избавляемся от пускового эффекта
            # Если одна точка в цикле или каждая следующая точка больше предыдущей, то берем максимальную точку
            if all(temp_df.val.diff().dropna() >= 0):
                result_point = temp_df.val.max()
            # Если точек > 1 и следующая точка не факт, что больше предыдущей, то берем медианную точку
            else:
                result_point = temp_df.val.median()
            if result_point == 0:
                continue
            temp_df = temp_df[:1]
            temp_df.val = result_point
            df_result_val_for_trend = pd.concat([df_result_val_for_trend,temp_df])
    return df_result_val_for_trend

## Валидация данных

In [None]:
def validate_values(data: pd.DataFrame, 
                    param_id: int, 
                    well_id: int, 
                    dt_from: str, 
                    dt_to: str, 
                    df_regime: pd.DataFrame, 
                    up_limit: int = 250, 
                    low_limit: int = 10
                   ):
    """
    Функция производит подготовку данных для алгоритма трендирования.
    Если скважина работает в ПКВ, то производится сегментация рабочих участков.
    Если скважина работает в ПДФ, то фильтруются нефизичные значения параметра.
    
    :param data: датафрейм с данными телеметрии
    :param param_id: параметр телеметрии для анализа
    :param well_id: номер скважины для анализа
    :param dt_from: время начала анализа (включительно)
    :param dt_to: время конца анализа (не включительно)
    :param df_regime: датафрейм с данными о режиме работы скважины
    :param up_limit: верхняя граница
    :param low_limit: нижняя граница
    
    :return df_val, regime: отвалидированные данные, режим работы скважины
    """
    
    # Из всего датафрейма отбираем одну скважину и один параметр для анализа за указанный временной интервал
    df = data.copy()
    df = df[df.well_id == well_id]
    df_from_to = df[(df.dt > dt_from) & (df.dt < dt_to)]
    df_val = df_from_to.loc[(df_from_to["param_id"] == param_id)].sort_values("dt")
    df_val = df_val.dropna(subset=["val"])
    
    # Определение режима работы скважины (ПКВ, ПДФ и тд.)
    if df_regime[(df_regime.well_id == well_id) & (df_regime.dt_from == dt_from)]['well_regime'].empty:
        if df_regime[(df_regime.well_id == well_id)]['well_regime'].empty:
            regime = 0
        else:
            regime = int(df_regime[(df_regime.well_id == well_id)]['well_regime'].median())
    else:
        regime = int(df_regime[(df_regime.well_id == well_id) & (df_regime.dt_from == dt_from)]['well_regime'].values)
    regime = regime_dict[regime]
    
    # Валидация в зависимости от режима и параметра телеметрии
    if regime in ['ПКВ'] and param_ids[param_id] in ['Загрузка', 'Частота']:
        # Получение датафрейма по частоте
        df_freq = df_from_to[df_from_to.param_id == 220]
        df_freq = df_freq.dropna(subset=["val"])
        df_freq.set_index('dt',inplace=True)
        df_freq = df_freq.sort_index()
        list_start_end_time = define_start_end_time(df_freq) # Список работы
        df_val = segmentation_val(df_val, list_start_end_time) # Сегментация анализируемого параметра
    else:
        df_val.set_index("dt", inplace=True)
        if param_ids[param_id] == 'Давление на приеме':
            df_val['val'].where(~((df_val.val > up_limit) | (df_val.val < low_limit)), other=None, inplace=True)
        if param_ids[param_id] == 'Температура двигателя ПЭД':
             df_val['val'].where(~((df_val.val > up_limit) | (df_val.val < low_limit)), other=None, inplace=True)
    return df_val, regime

## Нахождение значений тренда для анализируемого параметра

In [None]:
def preprocessing_trends(df: pd.DataFrame, 
                         window: int, 
                         regime: str, 
                         lambda_HP: int, 
                         param_id: int
                        ):
    """
    Функция находит значения тренда для анализируемого параметра телметрии.
    В функции используется два метода:
    1. Оконный медианный фильтр
    2. Фильтр Ходрика-Прескотта
    
    :param df: датафрейм с отвалидироваными данными телеметрии (данные после validate_values())
    :param window: скользящее окно для метода скользящего медианного
    :param regime: режим работы скважины
    :param lambda_HP: значение параметра для фильтра Ходрика-Прескотта (43200,129600)
    :param param_id: параметр телеметрии для анализа
    
    :return df, x_trend: отвалидированные данные, данные тренда
    """
    df['tm_value_median'] = df.val.rolling(window = window).median() # Rolling median

    df = df.dropna() # Убираем None значения
    
    # Если рассматриваем скважины в режиме ПКВ (ПКВ с ЧЧ) и параметры загрузка, частота, то ресэмплим
    if regime == 'ПКВ' and param_ids[param_id] in ['Загрузка', 'Частота']:
        df = df.resample('10s').mean().interpolate(method='linear')
    
    cycle, target_med = sm.tsa.filters.hpfilter(df['tm_value_median'], lamb = lambda_HP) # Hodrick-Prescott filter
    df.insert(4, 'HP_filter_med', target_med.values) 
    x_trend = df['HP_filter_med'] # Полученный тренд
    return df, x_trend

## Кумулятивные суммы

In [None]:
def detect_cusum(list_for_analize: list, threshold: float = 1, drift: float = 0, ending: bool = False):
    """
    Метод для выявления резких изменений данных с использованием метода кумулятивных сумм.
    
    drift - величина позволяющая корректировать длину тренда (его начало временное начало и конец).
    drift - может быть нулем или любым положительным числом.
    По дефолту drift = 0,  с увеличением drift длина тренда уменьшается (так как кумулятивная сумма доходит до threshold медленнее)
    

    :param list_for_analize: данные для анализа
    :param threshold: минимальный порог амплитуды изменения данных
    :param drift: величина, предотвращающая попадание незначительных изменений в результат
    :param ending: признак, отвечающий за выявление завершения изменений
    
    :return time_alarm_init_trend, time_alarm_start, time_alarm_final, amplitude: 
    список из индексов, c какого индекса задетектировался тренд,
    список из индексов, соответствующих началу изменения,
    список из индексов, соответствующих завершению изменения,
    амплитуда изменений
    
    :Example
    ---------------------------------------------------------------------------------------------------------------
    time_alarm_init_trend, time_alarm_start, time_alarm_final, amplitude = 
    (
    array([ 1,  2,  4, 10, 11, 13, 15]),
    array([ 0,  1,  3,  4, 10, 11, 14]),
    array([ 1,  3,  4, 10, 11, 14, 17]),
    array([ -98.,  121., -116.,  305., -306.,  114., 9.])
     )
     
     Например начало тренда (time_alarm_start) было на 1 индексе, а конец тренда (time_alarm_final) был на 3 индексе,
     но первое детектирование тренда (time_alarm_init_trend) произошло на индексе 2. При этом изменение параметра считается,
     как значение на time_alarm_final - значение на time_alarm_start (это изменение является амлитудой (amplitude) и равно 121)
    """
    
    # Числовой ряд для анализа
    list_for_analize = np.array(list_for_analize)
    list_for_analize = np.atleast_1d(list_for_analize).astype("float64")
    
    # Инициализация масивов с позитивными и негативными изменениями (массивы с кумулятивными суммами)
    positive_changes, negative_changes = np.zeros(list_for_analize.size), np.zeros(list_for_analize.size)
    
    # Инициализация массивов временами изменений
    time_alarm_init_trend, time_alarm_start, time_alarm_final = np.array([[], [], []], dtype=int)
    time_alarm_positive, time_alarm_negative = 0, 0
    # Амплитуда изменения
    amplitude = np.array([])
    
    # Цикл по индексам анализируемого числового ряда
    for i in range(1, list_for_analize.size):
        difference = list_for_analize[i] - list_for_analize[i - 1] # Находим разницу между двумя соседними точками
        positive_changes[i] = positive_changes[i - 1] + difference - drift
        negative_changes[i] = negative_changes[i - 1] - difference - drift
        # Инициализация трендов
        if positive_changes[i] < 0:
            positive_changes[i], time_alarm_positive = 0, i
        if negative_changes[i] < 0:
            negative_changes[i], time_alarm_negative = 0, i
        if positive_changes[i] > threshold or negative_changes[i] > threshold:
            time_alarm_init_trend = np.append(time_alarm_init_trend, i)
            time_alarm_start = np.append(time_alarm_start, time_alarm_positive if positive_changes[i] > threshold else time_alarm_negative)
            
            positive_changes[i], negative_changes[i] = 0, 0
            
    # Ищем время конца тренда и амплитуду
    if time_alarm_start.size and ending:
        _, time_alarm_start_2, _, _ = detect_cusum(list_for_analize[::-1], threshold, drift)
        time_alarm_final = list_for_analize.size - time_alarm_start_2[::-1] - 1
        time_alarm_start, ind = np.unique(time_alarm_start, return_index=True)
        time_alarm_init_trend = time_alarm_init_trend[ind]
        if time_alarm_start.size != time_alarm_final.size:
            if time_alarm_start.size < time_alarm_final.size:
                time_alarm_final = time_alarm_final[[np.argmax(time_alarm_final >= i) for i in time_alarm_init_trend]]
            else:
                ind = [np.argmax(i >= time_alarm_start[::-1]) - 1 for i in time_alarm_final]
                time_alarm_init_trend = time_alarm_init_trend[ind]
                time_alarm_start = time_alarm_start[ind]
        ind = time_alarm_final[:-1] - time_alarm_start[1:] > 0
        if ind.any():
            time_alarm_init_trend = time_alarm_init_trend[~np.append(False, ind)]
            time_alarm_start = time_alarm_start[~np.append(False, ind)]
            time_alarm_final = time_alarm_final[~np.append(ind, False)]
        amplitude = list_for_analize[time_alarm_final] - list_for_analize[time_alarm_start]
        
    return time_alarm_init_trend, time_alarm_start, time_alarm_final, amplitude

In [None]:
def use_cusum(df: pd.DataFrame,
    well_id: int,
    param_id: int,
    dt_from: str,
    dt_to: str,
    regime: str,
    drift: float,
    threshold: int = 5,
    range_med: int = 3,
    median_window: int = 3,
    lambda_HP: int = 43200,
    ) -> list[dict] and pd.DataFrame:
    """
    Основная функция по анализу тренда параметра на приеме с помощью Метода накопленных сумм CUSUM с
    дополнительными обертками

    Данная функция позволяет определять тренда для скважин, работающих в режиме ПКВ и ПДФ.
    Можно находить тренды и направление трендов по следующим параметрыам:
    1. Загрузка
    2. Частота
    3. Давление на приеме
    4. Тепература двигателя ПЭД
    5. Сопротивление изоляции
    
    :param input_dataframe: данные для анализа
    :param well_id: номер скважины
    :param param_id: номер параметра
    :param threshold: порог изменения параметра
    :param range_med: максимальная медианная дискретность данных по величине
    :param median_window: величина медианного окна (5,7,9)
    :param lambda_HP: значение параметра для фильтра Ходрика-Прескотта (43200,129600)

    :return: список трендов
    """

    # Проверка на количество None в данных и дискретность данных
    if df.empty or int(df[['val']].isnull().sum()) == len(df[['val']]) or len(df['val']) < (median_window + 2):
        return [], df

    if regime in ['ПКВ'] and param_ids[param_id] in ['Частота', 'Загрузка']:
        df, x = preprocessing_trends(df, 3, regime, 43200, param_id)
    else:
        df, x = preprocessing_trends(df, 5, regime, 43200, param_id)

    ta, tai, taf, amp = detect_cusum(x, threshold, drift, True)
    result_trend = np.zeros(shape=len(df['HP_filter_med']))

    for left_border, right_border, cusum in zip(tai, taf, amp):
        if cusum > 0:
            trend_direction = 1
        else:
            trend_direction = -1
        for i in range(left_border, right_border - 1):
            result_trend[i] = trend_direction

    trend_periods = np.stack([df['HP_filter_med'].index.values[tai],df['HP_filter_med'].index.values[taf]], axis=1)
    positive_trend_periods = trend_periods[amp > 0]
    negative_trend_periods = trend_periods[amp < 0]

    result = [
        {
        "well_id": well_id,
        "param_id_well": param_id,
        "param_id": 'positive',
        "dt_from": pd.Timestamp(i[0]).to_pydatetime(),
        "dt_to": pd.Timestamp(i[1]).to_pydatetime(),
        }
    for i in positive_trend_periods
    ]

    result += [
        {
        "well_id": well_id,
        "param_id_well": param_id,
        "param_id": 'negative',
        "dt_from": pd.Timestamp(i[0]).to_pydatetime(),
        "dt_to": pd.Timestamp(i[1]).to_pydatetime(),
        }
    for i in negative_trend_periods
    ]

    return result, df

# Основая функция трендирования

In [None]:
def start_analysis(data: pd.DataFrame, 
                   param_id: int, 
                   well_id: int, 
                   dt_from: str, 
                   dt_to: str, 
                   df_regime: pd.DataFrame
                  ) -> list and pd.DataFrame:
    """
    Функция для определения трендов в данных параметров телеметрии скважин, работающих в режимах ПКВ и ПДФ.
    
    :param data: датафрейм с данными телеметрии
    :param param_id: параметр телеметрии для анализа
    :param well_id: номер скважины для анализа
    :param dt_from: время начала анализа (включительно)
    :param dt_to: время конца анализа (не включительно)
    :param df_regime: датафрейм с данными о режиме работы скважины
    
    :return trends, df: список трендов, датафрейм с данными тренда
    """

    data_for_trend, regime = validate_values(data, param_id, well_id, dt_from, dt_to, df_regime) # Валидация данных

    threshold = settings_threshold[param_id][regime] # Установка гиперпараметра для use_cusum()

    drift = drifts[param_id] # Установка гиперпараметра для use_cusum()
    
    trends, df = use_cusum(data_for_trend, well_id, param_id, dt_from, dt_to, regime, drift=drift, threshold=threshold)
    
    # Вывод информации для себя (потом удалить)
    print(f"""Скважина: {well_id}\nВремя анализа: {dt_from} - {dt_to}
Режим работы: {regime}\nПараметр: {param_ids[param_id]} ({param_id})
Threshold: {threshold}\nDrift: {drift}""")
    
    return trends, df

# Анализ трендов

In [None]:
def analize_trend(trends: list, 
                  trends_freq: list
                 ) -> list[dict]:
    """
    Функция анализирует полученные тренды с помощью сравненения с трендом по управляющему параметру.
    
    :param trends: тренд по анализируемому параметру
    :param trends_freq: тренд по управляющему параметру
    
    :return trends, df: список трендов, датафрейм с данными тренда
    """
    all_times = []
    
    final_trends = []
    final_trends_freq = []
    
    for trend in trends:
        trend_copy = trend.copy()
        final_trends.append(trend_copy)
    
    for trend in trends_freq:
        trend_copy = trend.copy()
        final_trends_freq.append(trend_copy)
        
    if not final_trends:
        return final_trends
        
    # Добавляем для каждого тренда список со всем временем в интервале тренда в формате timestamp
    for trend in final_trends:
        trend['time_interval'] = [x for x in range(int(trend['dt_from'].timestamp()), (int(trend['dt_to'].timestamp()) +1))]
    for trend_freq in final_trends_freq:
        trend_freq['time_interval'] = [x for x in range(int(trend_freq['dt_from'].timestamp()), (int(trend_freq['dt_to'].timestamp()) +1))]
    
    # Все время анализа трендов по управляющему параметру
    for time in final_trends_freq:
        all_times += time['time_interval']    
        
    if final_trends[0]['param_id_well'] == 200:
    
        for trend in final_trends:

            analize_one_trend = []
            start_time = int(trend['dt_from'].timestamp()) # Время начала тренда
            stop_time = int(trend['dt_to'].timestamp()) # Время конца тренда
            direction = trend['param_id'] # Направление тренда

            # Если тренд по анализируемому параметру нигде не пересекается с любым трендом по управляющему
            # Или трендов по управдляющему параметру нет
            if start_time not in all_times and stop_time not in all_times:
                trend['real_trend'] = 'yes'
                continue

            for trend_freq in final_trends_freq:

                time_interval = trend_freq['time_interval'] # Временной интервал тренда по управляющему параметру

                # Если время начала тренда и конца не лежат в этом тренде, то пропускаем
                if (set(trend['time_interval']) & set(time_interval)) == set():    
                    continue

                # Если время начала и время конца лежат в одном тренде по управляющему параметру
                elif (set(trend['time_interval']) & set(time_interval)) != set():
                    analize_one_trend.append(trend_freq)

            # Если в списке только один тренд (то есть только какая-то часть тренда анализируемого параметра пересекается с трендом по управ. параметру)
            if len(analize_one_trend) == 1:
                if direction == analize_one_trend[0]['param_id']:
                    trend['real_trend'] = 'no'
                elif direction != analize_one_trend[0]['param_id']:
                    trend['real_trend'] = 'yes'
            # Если в списке больше одного тренда
            elif len(analize_one_trend) > 1:

                list_final = []

                for trend_freq in analize_one_trend:
                    dict_final = {}
                    intersection = set(trend['time_interval']) & set(trend_freq['time_interval'])
                    dict_final['dt_from'] = trend_freq['dt_from']
                    dict_final['dt_to'] = trend_freq['dt_to']
                    dict_final['param_id'] = trend_freq['param_id']
                    dict_final['intersection'] = max(intersection)- min(intersection)
                    list_final.append(dict_final)

                sum_positive = []
                sum_negative = []

                for insert in list_final:
                    if insert['param_id'] == 'positive':
                        sum_positive.append(insert['intersection'])
                    if insert['param_id'] == 'negative':
                        sum_negative.append(insert['intersection'])
                if  sum(sum_positive) > sum(sum_negative):
                    final_direction = 'positive'
                elif  sum(sum_negative) > sum(sum_positive):
                    final_direction = 'negative'
                if direction == final_direction:
                    trend['real_trend'] = 'no'
                elif direction != final_direction:
                    trend['real_trend'] = 'yes'
        for trend in final_trends:
            trend.pop('time_interval')
    
    elif final_trends[0]['param_id_well'] == 188:
        
        for trend in final_trends:

            analize_one_trend = []
            start_time = int(trend['dt_from'].timestamp()) # Время начала тренда
            stop_time = int(trend['dt_to'].timestamp()) # Время конца тренда
            direction = trend['param_id'] # Направление тренда

            # Если тренд по анализируемому параметру нигде не пересекается с любым трендом по управляющему
            # Или трендов по управдляющему параметру нет
            if start_time not in all_times and stop_time not in all_times:
                if trend['param_id'] == 'positive':
                    trend['real_trend'] = 'yes'
                elif trend['param_id'] == 'negative':
                    trend['real_trend'] = 'no'    
                continue

            for trend_freq in final_trends_freq:

                time_interval = trend_freq['time_interval'] # Временной интервал тренда по управляющему параметру

                # Если время начала тренда и конца не лежат в этом тренде, то пропускаем
                if (set(trend['time_interval']) & set(time_interval)) == set():    
                    continue

                # Если время начала и время конца лежат в одном тренде по управляющему параметру
                elif (set(trend['time_interval']) & set(time_interval)) != set():
                    analize_one_trend.append(trend_freq)

            # Если в списке только один тренд (то есть только какая-то часть тренда анализируемого параметра пересекается с трендом по управ. параметру)
            if len(analize_one_trend) == 1:
                if direction != analize_one_trend[0]['param_id']:
                    trend['real_trend'] = 'no'
                elif direction == analize_one_trend[0]['param_id']:
                    trend['real_trend'] = 'yes'
            # Если в списке больше одного тренда
            elif len(analize_one_trend) > 1:

                list_final = []

                for trend_freq in analize_one_trend:
                    dict_final = {}
                    intersection = set(trend['time_interval']) & set(trend_freq['time_interval'])
                    dict_final['dt_from'] = trend_freq['dt_from']
                    dict_final['dt_to'] = trend_freq['dt_to']
                    dict_final['param_id'] = trend_freq['param_id']
                    dict_final['intersection'] = max(intersection)- min(intersection)
                    list_final.append(dict_final)

                sum_positive = []
                sum_negative = []

                for insert in list_final:
                    if insert['param_id'] == 'positive':
                        sum_positive.append(insert['intersection'])
                    if insert['param_id'] == 'negative':
                        sum_negative.append(insert['intersection'])
                if  sum(sum_positive) > sum(sum_negative):
                    final_direction = 'positive'
                elif  sum(sum_negative) > sum(sum_positive):
                    final_direction = 'negative'
                if direction != final_direction:
                    trend['real_trend'] = 'no'
                elif direction == final_direction:
                    trend['real_trend'] = 'yes'
        for trend in final_trends:
            trend.pop('time_interval')
    else:
        for trend in final_trends:
            trend['real_trend'] = 'yes'
    return final_trends

# Загрузка данных для примера работы алгоритма трендирования

Пример данных телеметрии со скважины:

![image.png](attachment:image.png)

Пример данных по режиму работы скважины:

![image.png](attachment:image.png)

In [None]:
# Данные телеметрии скважин
df = pd.read_csv('data_tm.csv')
df = df.rename(columns={"tm_time": "dt", "tm_value": "val"})

In [None]:
# Данные по режимам работы скважин
df_regime = pd.read_csv('data_regime.csv')
df_regime[['dt_from', 'dt_to']] = df_regime[['dt_from', 'dt_to']].apply(pd.to_datetime)

In [None]:
# Вид исходного датафрейма с данными по телеметрии по разным параметрам
df.head()

In [None]:
# Вид исходного датафрейма с режимами работы скважин
df_regime.head()

# Словари с параметрами и гиперпараметрами

In [None]:
# Режимы работы скважин
regime_dict = {0 : 'ПДФ',
               1 : 'АПВ',
               2 : 'ПКВ',
               3 : 'ЧЧ',
               4 : 'ПДД'}

In [None]:
# Параметры телеметрии
param_ids = {200: 'Загрузка',
             220: 'Частота',
             188: 'Давление на приеме',
             187: 'Температура двигателя ПЭД',
             219: 'Сопротивление изоляции'
            }

In [None]:
# Гиперпараметр кумулятивной суммы - параметр отвечает за размах (длину) тренда (чем выше параметр, тем меньше рахмах)
drifts = {
    187: 0,
    188: 0,
    200: 0.01,
    220: 0,
    219: 0
}

In [None]:
# Гиперпараметр кумулятивной суммы - порог изменения параметра телеметрии, при котором детектируется тренд
settings_threshold = {
        200: {'ПКВ': 20, 'ПДФ': 10},
        187: {'ПКВ': 5, 'ПДФ': 5},
        188: {'ПКВ': 10, 'ПДФ': 5},
        219: {'ПКВ': 100, 'ПДФ': 100},
        220: {'ПКВ': 5, 'ПДФ': 5}
    }

# Пример работы алгоритма

## Входные данные

In [None]:
# Скважины для анализа
df.well_id.unique()

In [None]:
print(f'Возможный временной интервал для анализа: {df.dt.min()} - {df.dt.max()}')

In [None]:
print(f'Количество скважин: {len(df.well_id.unique())}')

## Ввод данных

In [None]:
well_id = 110001500
param_id = 219
dt_from = '2022-05-02'
dt_to = '2022-05-03'

## Поиск трендов

In [None]:
trends, df_for_trend = start_analysis(df, param_id, well_id, dt_from, dt_to, df_regime)

In [None]:
# Нахождегте трендов по управляюшему параметру (частота)
param_id_freq = 220
trends_freq, df_for_trend_freq = start_analysis(df, param_id_freq, well_id, dt_from, dt_to, df_regime)

In [None]:
# Датафрейм на выходе алгоритма по анализу трендов
df_for_trend

In [None]:
# Вывод трендов анализируемого параметра
# Если трендов нет, то выводится пустой список
trends

## Подготовка данных для визуализации

### Исходные данные анализируемого параметра телеметрии

In [None]:
df_init = df.copy()
df_init = df_init[df_init.well_id == well_id]
df_init = df_init[(df_init.dt > dt_from) & (df_init.dt < dt_to)]
df_init = df_init.loc[(df_init["param_id"] == param_id)].sort_values("dt")
df_init = df_init.dropna(subset=["val"])

In [None]:
df_init

### Исходные данные управляющего параметра (частоты)

In [None]:
df_init_freq = df.copy()
df_init_freq = df_init_freq[df_init_freq.well_id == well_id]
df_init_freq = df_init_freq[(df_init_freq.dt > dt_from) & (df_init_freq.dt < dt_to)]
df_init_freq = df_init_freq.loc[(df_init_freq["param_id"] == 220)].sort_values("dt")
df_init_freq = df_init_freq.dropna(subset=["val"])

# Визуализация

## Визуализация анализируемого параметра

In [None]:
if df_regime[(df_regime.well_id == well_id) & (df_regime.dt_from == dt_from)]['well_regime'].empty:
    if df_regime[(df_regime.well_id == well_id)]['well_regime'].empty:
        regime = 0
    else:
        if len(df_regime[(df_regime.well_id == well_id)]['well_regime'].values) == 1:
            regime = int(df_regime[(df_regime.well_id == well_id)]['well_regime'].values)
        else:
            regime = int(df_regime[(df_regime.well_id == well_id)]['well_regime'].median())
else:
    regime = int(df_regime[(df_regime.well_id == well_id) & (df_regime.dt_from == dt_from)]['well_regime'].values)
regime = regime_dict[regime]

In [None]:
fig = go.Figure(layout_title_text=f"ID скважины: {int(df_init.well_id.unique())}"\
                f" Дата: {df_init.dt.min()} - {df_init.dt.max()}"\
               f" {param_ids[int(df_init.param_id.unique())]}: {int(df_init.param_id.unique())}"\
                f" Режим: {regime}"\
               f" Threshold: {settings_threshold[int(df_init.param_id.unique())][regime]}")
fig.update_layout(title_font_size=10)
fig.add_trace(go.Scatter(x=df_init.dt, y=df_init.val, mode='lines+markers', name = 'Исходные данные'))
if 'HP_filter_med' in df_for_trend.columns:
    fig.add_trace(go.Scatter(x=df_for_trend.index, y=df_for_trend.HP_filter_med, mode='lines+markers', name = 'Тренд'))
for trend in trends:
    if trend['param_id'] == 'positive':
        fig.add_trace(go.Scatter(x=[trend['dt_from'], trend['dt_to']], y=[int(df_init.param_id.unique())]*2, 
                                 line_color="green",
                                 name = 'Тренд вверх'))
    if trend['param_id'] == 'negative':
        fig.add_trace(go.Scatter(x=[trend['dt_from'], trend['dt_to']], y=[int(df_init.param_id.unique())]*2,
                                 line_color="red",
                                 name = 'Тренд вниз'))       
fig.show()

## Визуализация анализируемого параметра с частотой

In [None]:
fig = make_subplots(rows=3, cols=1)

fig.update_layout(title_text=f"ID скважины: {int(df_init.well_id.unique())}"\
                f" Дата: {df_init.dt.min()} - {df_init.dt.max()}"\
               f" {param_ids[int(df_init.param_id.unique())]}: {int(df_init.param_id.unique())}"\
                f" Режим: {regime}"\
               f" Threshold: {settings_threshold[int(df_init.param_id.unique())][regime]}")
                        
fig.update_layout(height=900, title_font_size=10)

fig.add_trace(go.Scatter(x=df_init.dt, y=df_init.val, mode='lines+markers', name = f'{param_ids[int(df_init.param_id.unique())]}'), row=2, col=1)
fig.add_trace(go.Scatter(x=df_init_freq.dt, y=df_init_freq.val, mode='lines+markers', name = 'Частота'), row=3, col=1)
if 'HP_filter_med' in df_for_trend.columns:
    fig.add_trace(go.Scatter(x=df_for_trend.index, y=df_for_trend.HP_filter_med, mode='lines+markers', name = 'Тренд анализируемый'), row=2, col=1)

fig.add_trace(go.Scatter(x=[list(df_for_trend.index)[0], list(df_for_trend.index)[-1]], y=[int(df_init.param_id.unique())]*2, mode='markers'), row=1, col=1)
for trend in trends:
    if trend['param_id'] == 'positive':
        fig.add_trace(go.Scatter(x=[trend['dt_from'], trend['dt_to']], y=[int(df_init.param_id.unique())]*2, 
                                 line_color="green",
                                 name = 'Тренд вверх'), row=1, col=1)
    if trend['param_id'] == 'negative':
        fig.add_trace(go.Scatter(x=[trend['dt_from'], trend['dt_to']], y=[int(df_init.param_id.unique())]*2,
                                 line_color="red",
                                 name = 'Тренд вниз'), row=1, col=1)
if 'HP_filter_med' in df_for_trend_freq.columns:
    fig.add_trace(go.Scatter(x=df_for_trend_freq.index, y=df_for_trend_freq.HP_filter_med, mode='lines+markers', name = 'Тренд по частоте'), row=3, col=1)

fig.add_trace(go.Scatter(x=[list(df_for_trend_freq.index)[0], list(df_for_trend_freq.index)[-1]], y=[int(df_init.param_id.unique())]*2, mode='markers'), row=1, col=1)
for trend in trends_freq:
    if trend['param_id'] == 'positive':
        fig.add_trace(go.Scatter(x=[trend['dt_from'], trend['dt_to']], y=[int(df_init_freq.param_id.unique())]*2, 
                                 line_color="green",
                                 name = 'Тренд вверх'), row=1, col=1)
    if trend['param_id'] == 'negative':
        fig.add_trace(go.Scatter(x=[trend['dt_from'], trend['dt_to']], y=[int(df_init_freq.param_id.unique())]*2,
                                 line_color="red",
                                 name = 'Тренд вниз'), row=1, col=1) 
fig.show()

## Анализ трендов (сравнение с трендов управляющего параметра)

In [None]:
# Анализ тренда
trends_after_analize = analize_trend(trends, trends_freq)

In [None]:
trends_after_analize

## Визуализация анализируемого параметра с частотой (подсветка аномальных трендов)

In [None]:
fig = make_subplots(rows=3, cols=1)

fig.update_layout(title_text=f"ID скважины: {int(df_init.well_id.unique())}"\
                f" Дата: {df_init.dt.min()} - {df_init.dt.max()}"\
               f" {param_ids[int(df_init.param_id.unique())]}: {int(df_init.param_id.unique())}"\
                f" Режим: {regime}"\
               f" Threshold: {settings_threshold[int(df_init.param_id.unique())][regime]}")
                        
fig.update_layout(height=900, title_font_size=10)
# fig.update_layout(xaxis_range=[list(df_init_freq.dt)[0], list(df_init_freq.dt)[-1]])

fig.add_trace(go.Scatter(x=df_init.dt, y=df_init.val, mode='lines+markers', name = f'{param_ids[int(df_init.param_id.unique())]}'), row=2, col=1)
fig.add_trace(go.Scatter(x=df_init_freq.dt, y=df_init_freq.val, mode='lines+markers', name = 'Частота'), row=3, col=1)
if 'HP_filter_med' in df_for_trend.columns:
    fig.add_trace(go.Scatter(x=df_for_trend.index, y=df_for_trend.HP_filter_med, mode='lines+markers', name = 'Тренд анализируемый'), row=2, col=1)

fig.add_trace(go.Scatter(x=[list(df_for_trend.index)[0], list(df_for_trend.index)[-1]], y=[int(df_init.param_id.unique())]*2, mode='markers'), row=1, col=1)
for trend in trends_after_analize:
    if trend['param_id'] == 'positive':
        fig.add_trace(go.Scatter(x=[trend['dt_from'], trend['dt_to']], y=[int(df_init.param_id.unique())]*2, 
                                 line_color="green",
                                 name = 'Тренд вверх'))
    if trend['param_id'] == 'negative':
        fig.add_trace(go.Scatter(x=[trend['dt_from'], trend['dt_to']], y=[int(df_init.param_id.unique())]*2,
                                 line_color="red",
                                 name = 'Тренд вниз'))
    if trend['real_trend'] == 'yes':
        fig.add_vrect(x0=trend['dt_from'], x1=trend['dt_to'], fillcolor="red", opacity=0.15, line_width=0)
if 'HP_filter_med' in df_for_trend_freq.columns:
    fig.add_trace(go.Scatter(x=df_for_trend_freq.index, y=df_for_trend_freq.HP_filter_med, mode='lines+markers', name = 'Тренд по частоте'), row=3, col=1)

fig.add_trace(go.Scatter(x=[list(df_for_trend_freq.index)[0], list(df_for_trend_freq.index)[-1]], y=[int(df_init.param_id.unique())]*2, mode='markers'), row=1, col=1)
for trend in trends_freq:
    if trend['param_id'] == 'positive':
        fig.add_trace(go.Scatter(x=[trend['dt_from'], trend['dt_to']], y=[int(df_init_freq.param_id.unique())]*2, 
                                 line_color="green",
                                 name = 'Тренд вверх'))
    if trend['param_id'] == 'negative':
        fig.add_trace(go.Scatter(x=[trend['dt_from'], trend['dt_to']], y=[int(df_init_freq.param_id.unique())]*2,
                                 line_color="red",
                                 name = 'Тренд вниз')) 
fig.show()

In [None]:
list(df_init_freq.dt)[0]

In [None]:
df_for_trend

In [None]:
print(f'Возможный временной интервал для анализа: {df.dt.min()} - {df.dt.max()}')

# Цикл по всем скважинам

In [None]:
%%time
final_list = []
len_list = []
for well_id in list(df.well_id.unique()):
    len_list.append(well_id)
    print(len(len_list))
    final_list_one_well = []
    param_id = 219
    param_id_freq = 220
    dt_from = '2022-05-05'
    dt_to = '2022-05-06'
    try:
        trends, df_for_trend = start_analysis(df, param_id, well_id, dt_from, dt_to, df_regime)    
        trends_freq, df_for_trend_freq = start_analysis(df, param_id_freq, well_id, dt_from, dt_to, df_regime)
        trends_after_analize = analize_trend(trends, trends_freq)
    except:
        print('---------------Ошибка---------------')
        print(f'Скважина: {well_id}')
        print('------------------------------------')
        continue
    for trend in trends_after_analize:
        if trend['real_trend'] == 'yes':
            final_list_one_well.append(trend)
    if final_list_one_well:
        final_list.append(final_list_one_well)

In [None]:
# Число скважин с аномальными трендами
len(final_list)

In [None]:
# Номера скважин с аномальными трендами
for well in final_list:
    print(well[0]['well_id'])