In [49]:
from typing import List, Tuple, Union

import uuid
import time

from datetime import datetime, timedelta
import re

import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from detecta import detect_onset
import enum

In [50]:
import warnings
warnings.filterwarnings('ignore')

## Функции

In [51]:
def make_data_prepocessing(df_one_param: pd.DataFrame, param_id: int) -> pd.DataFrame:
    """
    Функция выполняет препроцессинг данных:
     - Фильтрация по верхней и нижней границе.
     - Реземпл данных и интерполяция с дискретностью 1 сек.
     
    :param df_one_param: Датафрейм с валидными значениями ТМ по одному param_id
    :param param_id: Идентификатор параметра ТМ
    
    :return: Датафрейм с валидными интерполироваными значениями по 1 сек ТМ по одному param_id
    """
    if not df_one_param.empty:
        df_one_param = make_filtration_by_below_above_borders(df_one_param, param_id)
        df_one_param.set_index('tm_time', inplace=True)
        df_one_param_interp = df_one_param.resample('1S').mean().interpolate(method='linear')
    else:
        return df_one_param
    return df_one_param_interp

In [52]:
def make_filtration_by_below_above_borders(df_one_param: pd.DataFrame, param_id: int) -> pd.DataFrame:
    """
    Функция выполняет фильтрацию по верхнему и нижнему допустимому значению параметра.
    
    :param df_one_param: Датафрейм с параметрами телеметрии
    :param param_id: Идентификатор параметра ТМ
    
    :return: Датафрейм с валидными значениями ТМ по одному param_id
    """
    
    if not df_one_param.empty:
        if param_id in settings_gas_dict.keys():
            df_one_param = df_one_param.loc[
            (
                (settings_gas_dict.get(param_id).get(SettingsGas.ABOVE_BORDER) > df_one_param['tm_value']) 
            &
                (df_one_param['tm_value'] > settings_gas_dict.get(param_id).get(SettingsGas.BELOW_BORDER)) 
            &
                (df_one_param['param_id'] == param_id)
            )
            ]
        
    return df_one_param

In [53]:
def get_data_long_period(
    well_id: int,
    dt_from: datetime,
    dt_to: datetime, 
    params: list,
    day_delta: int,
    data_all: pd.DataFrame,
) -> pd.DataFrame:
    """
    Функция по получению ТМ за интервал [dt_from - day_delta: dt_to].
    
    :param well_id: Идентификатор скважины
    :param dt_from: Дата начала интервала анализа
    :param dt_to: Дата конца интервала анализа
    :param params: Идентификторы параметров ТМ для получения
    :param day_delta: Кол-во дней (горизонт дней влево от dt_from)
    
    :return: Датафрейм с ТМ за интервал [dt_from - day_delta: dt_to]
    """
    
    data_long_period = data_all[
        (df_all_data_wells.well_id == well_id)
        &
        (df_all_data_wells.tm_time >= dt_from-timedelta(days=day_delta))
        &
        (df_all_data_wells.tm_time <= dt_to)
        &
        (df_all_data_wells.param_id.isin(params))
    ]
    
    
    return data_long_period

In [54]:
def check_condition_of_pressure(data_long_period: pd.DataFrame, data_to_return: pd.DataFrame, param_id: int) -> pd.DataFrame:
    """
    Метод по расчету доверительных интервалов для параметров газовых скважин.
    
    :param data_long_period: Датафрейм с ТМ за длительный интервал (больше 24 часов)
    :param data_to_return: Датафрейм для преобразования
    :param param_id: Идентификтор параметра ТМ для анализа
    
    :return: Датафрейм с информаицией о состояние временного ряда по параметру param_id (Стационарное/Рост/Снижение/Нет данных)
    """
    
    upper_quantile = 0.9
    lower_quantile = 0.1
    count_std = 2
    percentage = 1
    enough_vals = 5
    
    if param_id == P_LIN:
        prefix = '_lin'
    elif param_id == P_CAS:
        prefix = '_cas'
    elif param_id == P_BUF:
        prefix = '_buf'
        
    if data_long_period.empty:
        data_to_return[f'condition_pressure{prefix}'] = 'Нет данных'
        return data_to_return
    
    if len(data_long_period.tm_value.unique()) < enough_vals:
        data_to_return[f'condition_pressure{prefix}'] = 'Нет данных'
        return data_to_return    
    
    data_long_period = data_long_period[(data_long_period.tm_value > 0)]

    current_data = data_long_period[(data_long_period.tm_time >= dt_from) & (data_long_period.tm_time <= dt_to)]
    past_data = data_long_period[(data_long_period.tm_time >= dt_from - timedelta(days=3)) & (data_long_period.tm_time < dt_from)]

    q3 = round(past_data.tm_value.quantile(upper_quantile), 2)
    q1 = round(past_data.tm_value.quantile(lower_quantile), 2)
    mn = round(past_data.tm_value.mean(), 2)
    std = round(past_data.tm_value.std(), 2)
    upper_border = round(mn + count_std * std, 2)
    lower_border = round(mn - count_std * std, 2)
    hour = 1 if std < 0.5 else 2
    curr_mean = round(current_data[current_data.tm_time > dt_to - timedelta(hours=hour)].tm_value.mean(), 2)
    one_percent = mn / 100

    max_delta = min(
        [
            abs(q3 - curr_mean),
            abs(q1 - curr_mean),
            abs(upper_border - curr_mean),
            abs(lower_border - curr_mean),
        ]
    )
    if mn == 0:
        data_to_return[f'condition_pressure{prefix}'] = 'Стационарное'
        return data_to_return
    if any([curr_mean > q3, curr_mean > upper_border]) and max_delta > percentage * one_percent:
        data_to_return[f'condition_pressure{prefix}'] = 'Рост'
        return data_to_return
    elif any([curr_mean < q1, curr_mean < lower_border]) and max_delta > percentage * one_percent:
        data_to_return[f'condition_pressure{prefix}'] = 'Снижение'
        return data_to_return
    else:
        data_to_return[f'condition_pressure{prefix}'] = 'Стационарное'
        return data_to_return

In [55]:
def calculate_delta_p_lin_buf(
    df_p_lin: pd.DataFrame,
    df_p_buf: pd.DataFrame,
    df_fin: pd.DataFrame,
):
    """
    Функция по совместному анализу линейного и буферного давления.
    
    :param df_p_lin: Датафрейм с параметрами по линейному давлению
    :param df_p_buf: Датафрейм с параметрами по буфрерному давлению
    :param df_fin:  Датафрейм для преобразования
    
    :return: Датафрейм с информаицией с информацией о дельтах линейного, буферного давления и интервалами
    
    """
    
    is_anomaly = 'Нет'
    
    list_with_intervals_1 = []
    list_with_intervals = [list_with_intervals_1]
    
    if df_p_lin.empty or df_p_buf.empty:
        df_fin['lin_more_buf'] = 'Нет данных'
        df_fin['intervals_lin_more_buf'] = list_with_intervals
        return df_fin
    
    df = pd.concat([df_p_lin, df_p_buf])
    df.reset_index(inplace=True)
    df_pt = pd.pivot_table(df, values='tm_value', index='tm_time', columns='param_id').fillna(method='ffill')
    df_pt['delta_p_lin_p_buf'] = df_pt[P_LIN] - df_pt[P_BUF]
    df_pt['lin_more_buf'] = np.where(df_pt['delta_p_lin_p_buf'] > 0, 'Да', 'Нет')
    
    if 'Да' in df_pt['lin_more_buf'].unique():
        
        is_anomaly = 'Да'
        df_pt['binary'] = np.where(df_pt['delta_p_lin_p_buf'] > 0, 1, 0)
        df_pt = df_pt.reset_index(drop=False)
        if not df_pt.empty:
            list_anom = list(detect_onset(df_pt.binary, 0.0001))
            if list_anom:
                for time_segm in list_anom:
                    dt_from = (
                        df_pt.tm_time.iat[time_segm[0] - 1] if time_segm[0] != 0 else df_pt.tm_time.iat[time_segm[0]]
                    )
                    dt_to = df_pt.tm_time.iat[time_segm[-1]]
                    list_with_intervals_1.append({"dt_from": dt_from, "dt_to": dt_to})
           
    df_fin['lin_more_buf'] = is_anomaly
    df_fin['intervals_lin_more_buf'] = list_with_intervals
    
    
    return df_fin

In [56]:
def calculate_delta_p_lin_cas(
    df_p_lin: pd.DataFrame,
    df_p_cas: pd.DataFrame,
    df_fin: pd.DataFrame,
):
    """
    Функция по совместному анализу линейного и затрубного давления.
    
    :param df_p_lin: Датафрейм с параметрами по линейному давлению
    :param df_p_cas: Датафрейм с параметрами по затрубное давлению
    :param df_fin:  Датафрейм для преобразования
    
    :return: Датафрейм с информаицией с информацией о дельтах линейного, затрубного давления и интервалами
    
    """
    
    is_anomaly = 'Нет'
    
    list_with_intervals_1 = []
    list_with_intervals = [list_with_intervals_1]
    
    if df_p_lin.empty or df_p_cas.empty:
        df_fin['cas_more_lin'] = 'Нет данных'
        df_fin['intervals_cas_more_lin'] = list_with_intervals
        return df_fin
    
    df = pd.concat([df_p_lin, df_p_cas])
    df.reset_index(inplace=True)
    df_pt = pd.pivot_table(df, values='tm_value', index='tm_time', columns='param_id').fillna(method='ffill')
    df_pt['delta_p_cas_p_lin'] = df_pt[P_CAS] - df_pt[P_LIN]

    df_pt['cas_more_lin'] = np.where(df_pt['delta_p_cas_p_lin'] > 0, 'Да', 'Нет')
    if 'Да' in df_pt['cas_more_lin'].unique():
        
        is_anomaly = 'Да'
        df_pt['binary'] = np.where(df_pt['delta_p_cas_p_lin'] > 0, 1, 0)
        df_pt = df_pt.reset_index(drop=False)
        if not df_pt.empty:
            list_anom = list(detect_onset(df_pt.binary, 0.0001))
            if list_anom:
                for time_segm in list_anom:
                    dt_from = (
                        df_pt.tm_time.iat[time_segm[0] - 1] if time_segm[0] != 0 else df_pt.tm_time.iat[time_segm[0]]
                    )
                    dt_to = df_pt.tm_time.iat[time_segm[-1]]
                    list_with_intervals_1.append({"dt_from": dt_from, "dt_to": dt_to})
                    
    df_fin['cas_more_lin'] = is_anomaly
    df_fin['intervals_cas_more_lin'] = list_with_intervals
    return df_fin

## Гиперпараметра для запуска алгоритма

In [57]:
# Интервал анализа
dt_to = datetime(2022, 12, 17, 18)
dt_from = dt_to - timedelta(days=1)

# Характер работы
GAS_XR = "XR0012"
CAS_COND_XR = "XR0015"

#ДО для анализа
list_company_ids = ['orb', 'msh', 'nng', 'mrng', 'yamal', 'zap']

#Параметры для анализа
P_LIN = 142
P_CAS = 6001
P_BUF = 135
PARAMS_GAS = (P_LIN, P_CAS, P_BUF)

class SettingsGas(enum.Enum):
    """Класс настроек для алгоритмов анализа газовых скважин"""
        
    BELOW_BORDER = 'Нижняя граница значения'
    ABOVE_BORDER = 'Верхняя граница значения'
    DELTA_DAYS = 'Кол-во дней в прошлом для сравенения с текущем состоянием параметра'
    ADD_HDIN = 'Булевый флаг валидировать или нет режим со значением H дин'
        
# Словарь со всеми настройками для анализа ФЧЗ   
settings_gas_dict = {
    P_LIN: {
        SettingsGas.BELOW_BORDER: 0.1,
        SettingsGas.ABOVE_BORDER: 500,
        SettingsGas.DELTA_DAYS: 3,
    },
    P_CAS: {
        SettingsGas.BELOW_BORDER: 0.1,
        SettingsGas.ABOVE_BORDER: 500,
        SettingsGas.DELTA_DAYS: 3,
    },
    P_BUF: {
        SettingsGas.BELOW_BORDER: 0.1,
        SettingsGas.ABOVE_BORDER: 500,
        SettingsGas.DELTA_DAYS: 3,
    },
    
}

## Получение данных по газовым скважинам

In [58]:
df_all_data_wells = pd.read_csv(r"data_anomaly_calc\all_data.csv").sort_values(by="tm_value")
df_all_data_wells["tm_time"] = pd.to_datetime(df_all_data_wells["tm_time"])
# df_all_data_wells = df_all_data_wells[['well_id', 'param_id', 'tm_time', 'tm_value', 'ext_data']]
df_all_data_wells

Unnamed: 0,well_id,param_id,tm_time,tm_value,ext_data,inserted
0,2455,142,2022-12-15 12:13:19,-1.1618,0.0,2023-12-15 07:23:13
14,2455,142,2022-12-13 09:01:12,-1.1618,0.0,2023-12-13 04:06:09
12,2455,142,2022-12-13 12:36:58,-1.1618,0.0,2023-12-13 07:43:50
11,2455,142,2022-12-13 13:13:00,-1.1618,0.0,2023-12-13 08:20:15
10,2455,142,2022-12-15 05:40:15,-1.1618,0.0,2023-12-15 00:50:40
...,...,...,...,...,...,...
470453,248,6001,2022-12-15 15:57:31,150.3800,0.0,2023-12-15 10:59:06
470452,248,6001,2022-12-15 16:00:00,150.3800,0.0,2023-12-15 11:04:42
470454,248,6001,2022-12-15 16:30:00,150.3800,0.0,2023-12-15 11:32:41
470455,562,6001,2022-12-15 13:33:49,160.0000,0.0,2023-12-15 08:35:03


## Анализ газовых скважин

In [59]:
df_final_all_wells = pd.DataFrame()

for well_id in df_all_data_wells.well_id.unique()[:50]:
    print(well_id)
    df_one_well = df_all_data_wells[
        (df_all_data_wells.well_id == well_id)
        &
        (df_all_data_wells.tm_time >= dt_from)
        &
        (df_all_data_wells.tm_time <= dt_to)
    ]
    
    if not df_one_well.empty:
        unique_param_id = df_one_well.param_id.unique()
        if (P_LIN in unique_param_id) or (P_CAS in unique_param_id) or (P_BUF in unique_param_id):
            df_one_well_prep = df_one_well.sort_values(by='tm_time')
            
            df_p_cas = df_one_well[df_one_well['param_id'] == P_CAS]
            df_p_lin = df_one_well[df_one_well['param_id'] == P_LIN]
            df_p_buf = df_one_well[df_one_well['param_id'] == P_BUF]
            
            
            df_p_cas_interp = make_data_prepocessing(df_p_cas, param_id=P_CAS)
            df_p_lin_interp = make_data_prepocessing(df_p_lin, param_id=P_LIN)
            df_p_buf_interp = make_data_prepocessing(df_p_buf, param_id=P_BUF)
            
            # Получение данных по параметру с горизонтом в DELTA_DAYS
            
            df_p_cas_long_period = get_data_long_period(
                well_id=well_id,
                dt_from=dt_from,
                dt_to=dt_to,
                params=[P_CAS],
                day_delta=settings_gas_dict.get(P_CAS).get(SettingsGas.DELTA_DAYS),
                data_all=df_all_data_wells,
            )
            
            df_p_lin_long_period = get_data_long_period(
                well_id=well_id,
                dt_from=dt_from,
                dt_to=dt_to,
                params=[P_LIN],
                day_delta=settings_gas_dict.get(P_LIN).get(SettingsGas.DELTA_DAYS),
                data_all=df_all_data_wells,
            )
            
            df_p_buf_long_period = get_data_long_period(
                well_id=well_id,
                dt_from=dt_from,
                dt_to=dt_to,
                params=[P_BUF],
                day_delta=settings_gas_dict.get(P_BUF).get(SettingsGas.DELTA_DAYS),
                data_all=df_all_data_wells,
            )
            
            # Анализ выхода параметра за доверительный интервал (Стационарное/Рост/Снижение)
            
            df_final = pd.DataFrame({"well_id": [well_id], "dt_from": [dt_from], "dt_to": [dt_to]})
            
            df_final = check_condition_of_pressure(df_p_cas_long_period, df_final, P_CAS)
            df_final = check_condition_of_pressure(df_p_lin_long_period, df_final, P_LIN)
            df_final = check_condition_of_pressure(df_p_buf_long_period, df_final, P_BUF)
            
            df_final_1 = calculate_delta_p_lin_buf(df_p_lin_interp, df_p_buf_interp, df_final)
            df_final_2 = calculate_delta_p_lin_cas(df_p_lin_interp, df_p_cas_interp, df_final_1)
            
            df_final_all_wells = pd.concat([df_final_all_wells, df_final_2])

2455
1797
740
319
1399
258
830
3168
810
562
1500
137
1578
3597
2840
3766
248
1815
1200
307
3490
3819
2396
3046
1834
3187
3874
2049
3032
2825
3455
3926
2140
3529
3515
184
1257
1783
809
560
3181
1949
763
1966
1687
2157
2551
3633
3567
3382


## Результаты расчета

In [60]:
df_final_all_wells

Unnamed: 0,well_id,dt_from,dt_to,condition_pressure_cas,condition_pressure_lin,condition_pressure_buf,lin_more_buf,intervals_lin_more_buf,cas_more_lin,intervals_cas_more_lin
0,2455,2022-12-16 18:00:00,2022-12-17 18:00:00,Нет данных,Стационарное,Нет данных,Нет данных,[],Нет данных,[]
0,1797,2022-12-16 18:00:00,2022-12-17 18:00:00,Нет данных,Стационарное,Нет данных,Нет данных,[],Нет данных,[]
0,740,2022-12-16 18:00:00,2022-12-17 18:00:00,Стационарное,Стационарное,Рост,Да,"[{'dt_from': 2022-12-16 18:04:54, 'dt_to': 202...",Да,"[{'dt_from': 2022-12-16 18:04:54, 'dt_to': 202..."
0,319,2022-12-16 18:00:00,2022-12-17 18:00:00,Стационарное,Стационарное,Снижение,Да,"[{'dt_from': 2022-12-16 18:00:00, 'dt_to': 202...",Нет,[]
0,1399,2022-12-16 18:00:00,2022-12-17 18:00:00,Стационарное,Стационарное,Стационарное,Нет,[],Нет,[]
0,258,2022-12-16 18:00:00,2022-12-17 18:00:00,Стационарное,Нет данных,Рост,Нет данных,[],Нет данных,[]
0,830,2022-12-16 18:00:00,2022-12-17 18:00:00,Нет данных,Снижение,Нет данных,Нет данных,[],Нет данных,[]
0,3168,2022-12-16 18:00:00,2022-12-17 18:00:00,Нет данных,Стационарное,Нет данных,Нет данных,[],Нет данных,[]
0,810,2022-12-16 18:00:00,2022-12-17 18:00:00,Стационарное,Рост,Стационарное,Нет,[],Да,"[{'dt_from': 2022-12-16 18:15:00, 'dt_to': 202..."
0,562,2022-12-16 18:00:00,2022-12-17 18:00:00,Стационарное,Стационарное,Стационарное,Да,"[{'dt_from': 2022-12-16 18:00:37, 'dt_to': 202...",Да,"[{'dt_from': 2022-12-16 18:00:37, 'dt_to': 202..."


In [61]:
list_wells_for_fig = [3819, 2825, 1783, 3187, 3046]

In [62]:
for well_id_fig in list_wells_for_fig:
    
    df_one_well_fig =  df_all_data_wells[
        (df_all_data_wells.well_id == well_id_fig)
        &
        (df_all_data_wells.tm_time >= dt_from)
        &
        (df_all_data_wells.tm_time <= dt_to)
    ]

    df_one_well_fig.sort_values(by='tm_time', inplace=True)
    df_one_well_fig.param_id.mask(df_one_well_fig.param_id == 142, 'Линейное давление, атм.', inplace=True)
    df_one_well_fig.param_id.mask(df_one_well_fig.param_id == 135, 'Буферное давление, атм.', inplace=True)
    df_one_well_fig.param_id.mask(df_one_well_fig.param_id == 6001, 'Затрубное давление, атм.', inplace=True)
    
    fig = px.line(df_one_well_fig, x='tm_time', y='tm_value', color='param_id', markers=True)

    fig.update_layout(
        xaxis_title='Время',
        yaxis_title='Параметры',
        legend_title='Параметры'
    )

    df_final_one_well = df_final_all_wells[df_final_all_wells.well_id == well_id_fig]

    for index, value in df_final_one_well.iterrows():
        if value.intervals_lin_more_buf:
            for interval_lb in value.intervals_lin_more_buf:
                fig.add_vrect(
                    x0=interval_lb["dt_from"],
                    x1=interval_lb["dt_to"],
                    annotation_text='Pлин > Pбуф',
                    annotation_position="top left",
                    fillcolor="red",
                    opacity=0.15,
                    line_width=0
                )
        if value.intervals_cas_more_lin:
            for interval_lb in value.intervals_cas_more_lin:
                fig.add_vrect(
                    x0=interval_lb["dt_from"],
                    x1=interval_lb["dt_to"],
                    annotation_text='Pзат > Pлин',
                    annotation_position="top right",
                    fillcolor="green",
                    opacity=0.15,
                    line_width=0
                )


        fig.update_layout(
            title=f'Скважина {well_id_fig} <br><sup>Затрубное давление: {value.condition_pressure_cas}; Линейное давление: {value.condition_pressure_lin}; Буферное давление: {value.condition_pressure_buf}</sup>'
        )



    fig.show()