<a href="https://colab.research.google.com/github/harnack/Leakage/blob/main/%E6%BB%9A%E5%8A%A8%E7%BA%BF%E6%80%A7%E5%9B%9E%E5%BD%92.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# 此处只列出主要代码
#%% 导入数据

# 数据所在目录
os.chdir('D:/Dropbox/AliouLee/Research/Fund/中石油管道泄漏项目/')

# 读取数据
# https://www.cnblogs.com/songzhenhua/p/13236794.html
day = '20171108'
out_pressure = open_csv(site=r'铁锦线\20171108新民-黑山\新民', sensor='出站压力', plot=True, day=day)
in_pressure = open_csv(site=r'铁锦线\20171108新民-黑山\黑山', sensor='进站压力', plot=True, day=day)

# 实际开关阀时间
# https://www.gairuo.com/p/pandas-read-excel
records_path = '历次放油测试结果.xlsx'
records = pd.read_excel(records_path, sheet_name=0, header=0, index_col=0, engine='openpyxl', skiprows=range(1,2))
records_rows = records[records['日期'] == pd.to_datetime(day)]
records_time_open = [dt.datetime.combine(row['日期'], row['开阀时刻']) for index, row in records_rows.iterrows()]
records_time_close = [dt.datetime.combine(row['日期'], row['关阀时刻']) for index, row in records_rows.iterrows()]

# 进出站之间总距离(km)
total_length = records_rows['管长\n(km)'].iloc[0]

#%% 局部线性回归(Rolling Regression)
# Identifying trends Using Regression
# https://medium.com/vortechsa/detecting-trends-in-time-series-data-using-python-2752be7d1172
# https://digfir-published.macmillanusa.com/psbe4e/psbe4e_ch13_8.html

# 每个小时 3600*20 个数据点
freq = 3600*20

# 切片
hour = 10 # 起始时间(小时)
minute = 26 # 起始时间(分钟)
time_range = 3 # 考察的时间段长度
i = hour + minute/60

# 切片
ahead = 1 # 起始点最好比开发时刻稍微向前一点
span = np.arange(i*freq - ahead*freq/60, i*freq + time_range * freq/60, dtype=int)
x_out = out_pressure[span]
x_in = in_pressure[span]

# (重要参数)窗口大小(1秒20个数据)
seconds = 30
window = seconds*20

# 切片对应的时间
slice_time = pd.date_range(start = time[span][0], end = time[span][-1], periods = len(span))

# 局部线性回归的系数
slope_out = rolling_lm(x_out, window, slice_time)
slope_in = rolling_lm(x_in, window, slice_time)

# 进出站结果合并
# https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html
slopes = pd.DataFrame({'时间': slice_time[:len(slope_out)], '出站回归斜率(标准化)': slope_out['回归斜率(标准化)'], '进站回归斜率(标准化)': slope_in['回归斜率(标准化)']})

# 根据回归斜率寻找拐点
duration = 60 # 要求斜率 < threshold 持续多少秒(亦是重要参数)
threshold = 0 # 判断是否算是下降的斜率阈值(和波形有关, 不太好调 >.<)
time_out = slice_time[drop_point(slope_out['回归斜率(标准化)'], threshold=threshold, duration=duration)]
time_in = slice_time[drop_point(slope_in['回归斜率(标准化)'], threshold=threshold, duration=duration)]

# 泄漏定位
position(time_out, time_in, total_length, velocity=1.24, consecutive=False)

# 以下为作图部分, 图形既可以用以展示结果, 也可以在结果不理想时帮助分析症结所在.
# 绘制结果(静态图)
title = fr'({pd.to_datetime(day).strftime("%Y-%m-%d")}){plot_title}'
rolling_lm_plot(x_out, x_in, window, slice_time, threshold=threshold, duration=duration, title=title)

# 绘制动图
ani = rolling_lm_movie(x_out, x_in, window, slice_time, title)

# 注: 为了正确显示/保存动画, 要在 Spyder 里将绘图的后端设为"自动"而非默认的"行内".
# https://stackoverflow.com/a/35856940/13863685
ani.save(fr'C:\Users\aliou\Desktop\({hour}点{minute})滚动线性回归({int(window/20)}s).gif')

# fps = 1000/interval
ani.save(fr'C:\Users\aliou\Desktop\({hour}点{minute})滚动线性回归({int(window/20)}s).mp4', fps=20)


引用到的主要函数

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import matplotlib.gridspec as gridspec
import matplotlib.animation as animation
import datetime as dt
import os
import math
import collections # 检验数据是否 iterable
from functools import reduce
from scipy.interpolate import interp1d
from scipy import signal


# 下面两行代码在图中有中文时必须使用
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

#%% 导入数据

# 定义从目录中读取所有数据的函数(文件类型为.csv)
# 注: 文件名必须按照时间排序！
# site: 站点
# sensor: 进(出)站压力(流量)
# day_path: 最后一层(含日期的)目录名
# plot: 是否画图
# day: 画图用的日期

# 输出: 包含目录中所有文件中数据组成的 np.array

# 注意 Python 中下标 i:j 表示 [i,j), 左闭右开, 从 i (默认为 0)取到 j-1
# 下标 -i 则表示取倒数第 i 个
# 所以 x[:-i] 表示从 0 取到倒数第 i+1 个
# range(H) 相当于 0:H

def open_csv(site, sensor, plot=True, day=''):
    data = []

    # 目录名
    # https://stackoverflow.com/q/3752240/13863685
    # folder = '\\'.join(filter(None, [site, sensor]))
    folder = f'{site}\\{sensor}' # use f-string formatting

    # 读取目录中所有文件
    files = os.listdir(folder)
    # print(files)

    for i, e in enumerate(files):
        file_data = pd.read_csv('\\'.join([folder, e]), names=[''])
        data.append(file_data)

        # 检查数据量(正常是 3600*20 = 72000)
        if len(file_data) < 72000:
            print(f'{i}:00→{i+1}:00 缺失数据量: {72000-len(file_data)}')
        elif len(file_data) > 72000:
            print(f'{i}:00→{i+1}:00 多余数据量: {len(file_data)-72000}')

    # 转换为一维序列(np.ndarray)(保留缺失值)
    data_array = np.array(pd.concat(data).stack(dropna=False))

    # 缺失值(nan)填充
    # https://stackoverflow.com/q/13166914/13863685
    not_nan = np.logical_not(np.isnan(data_array))
    indices = np.arange(len(data_array))

    # 线性插值(返回的是一个函数)(线性插值也可以用 np.interp)
    interp = interp1d(indices[not_nan], data_array[not_nan], kind='linear')
    data_array_interp = interp(indices)

    # 画图(横轴为时间)
    if plot:
        time = pd.date_range(start = day, end = pd.to_datetime(day) + dt.timedelta(hours=len(files)), periods = len(data_array_interp))

        # 流量蓝色, 压力黑色
        if '流量' in sensor:
            color = 'blue'
        elif '压力' in sensor:
            color = 'black'
        else:
            color = 'red'

        fig, ax = plt.subplots()
        plt.plot(time, data_array_interp, color=color)

        # 横轴的时间格式
        ax.xaxis.set_major_locator(mdates.HourLocator(interval = 2))
        ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
        fig.autofmt_xdate()

        plt.title(site.split('\\')[-1] + f'{sensor}({day})')
        plt.show()

    return data_array_interp

#%% 寻找连续片段
# https://stackoverflow.com/q/2154249/13863685

# x: 待处理数据
# gap: 数据间隔 ≤ gap 视为连续片段
# 输出: 各个连续片段的 range 组成的 list

def group(x, gap=1):
    result = []
    first = last = x[0] # range's bounds

    for n in x[1:]:
        if n - last <= gap: # range grows
            last = n
        else: # range ended
            if isinstance(gap, int):
                result.append(range(first, last+1)) # 注意 range 左闭右开
            elif isinstance(gap, dt.timedelta):
                result.append(pd.date_range(first, last, freq=gap))
            first = last = n # start again with a single

    # corner case for last range
    if isinstance(gap, int):
        result.append(range(first,last+1))
    elif isinstance(gap, dt.timedelta):
        result.append(pd.date_range(first, last, freq=gap))

    return result

#%% Iterate over a single value
# https://stackoverflow.com/q/6710834/13863685
# https://stackoverflow.com/q/59809785/13863685

def make_iterable(x):
    if isinstance(x, collections.abc.Iterable):
        return x
    else:
        return [x,] # 把单个元素转化成列表


#%% 泄漏定位

# 公式: x = (L+vΔt)/2, Δt = (负压波)传到首站(出站)时间 - 传到末站(进站)时间

# out_warnings: 出站报警时间
# in_warnings: 进站报警时间
# total_length: 管道总长 L
# velocity: 负压波波速v(单位: km/s)
#           水: 1.44, 原油: 1.1, 柴油: 1.2
# consecutive: 是否将报警点分组
# gap: consecutive=True 时相差 ≤gap 的报警点视为一组, 此时只考虑第一次报警时刻

def position(out_warnings, in_warnings, total_length, velocity=1.1, consecutive=True, gap=dt.timedelta(seconds=1)):
    alarms = []

    if consecutive: # 连续报警点分组
        if isinstance(gap, dt.timedelta):
            out_warnings = [i[0] for i in group(out_warnings, gap)]
            in_warnings = [i[0] for i in group(in_warnings, gap)]
        else:
            raise ValueError('参数 gap 有误!')

    # 和站点距离较小时, 认为是在站点内发生了泄漏
    err = 0.5

    # 要求 0 ≤ x ≤ L
    for t_1 in make_iterable(out_warnings):
        for t_2 in make_iterable(in_warnings):
            time_difference = (t_1 - t_2).total_seconds()  # 以秒为单位
            x = (total_length + velocity * time_difference) / 2

            if abs(x) <= err:
                alarms.append((t_1, t_2, '出站'))
            elif abs(x-total_length) <= err:
                alarms.append((t_1, t_2, '进站'))
            elif 0 <= x <= total_length:
                alarms.append((t_1, t_2, round(x,2)))

    # 转换成 DataFrame
    alarms = pd.DataFrame(alarms, columns =['出站报警', '进站报警', '泄漏定位(km)'])
    return alarms

#%% 局部线性回归(Rolling Regression)

# 输入:
# window: 窗口大小
# time: x 对应的时间

# 输出: 回归直线的斜率
# 注意回归斜率 β = cov(X,Y)/D(X)
# 所以这里改用协方差代替斜率, 并且最后除以最大斜率以限制在 [-1,1] 之间

def rolling_lm(x, window, time):
    # https://stackoverflow.com/q/75072435/13863685
    # https://numpy.org/devdocs/reference/generated/numpy.lib.stride_tricks.sliding_window_view.html
    # https://numpy.org/doc/stable/reference/generated/numpy.tile.html
    # https://numpy.org/doc/stable/reference/generated/numpy.corrcoef.html
    X = np.lib.stride_tricks.sliding_window_view(x, window)
    n = len(X)
    t = np.tile(np.arange(window), (n,1))
    covariances = np.cov(X, t)[:n,-1]
    slopes = covariances / max(abs(covariances)) # 标准化

    # 结果转换成 DataFrame
    # https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html
    result = pd.DataFrame({'时间': time[:n], '回归斜率(标准化)': slopes})
    # result.set_index('时间', inplace = True)

    return result

#%% 找到下降的开始时刻
# 用于侦测局部线性回归斜率的变化
# threshold: 判断是否下降的斜率阈值
# duration: 连续下降时长(单位:秒)

def drop_point(x, threshold=0, duration=60):
    # 先找出满足 x < threshold 的区间
    decrease = group(np.where(x < threshold)[0]) # np.where 返回的是个 tuple

    # 筛选出长度大于 duration*20 的区间(1秒20个数据)
    # https://www.liaoxuefeng.com/wiki/1016959663602400/1017317609699776
    long = [i for i in decrease if len(i) > duration*20]

    # 返回上面找到的区间的起点
    if len(long) > 0:
        return [i[0] for i in long]
    else:
        raise Exception('找不到满足条件的拐点!')

    # 找到所有 >= threshold 的元素下标
    # positive = np.where(x >= threshold)[0]

    # if len(positive) == 0: # 曲线一直在下降
    #     return 0
    # elif positive[-1] == len(x): # 曲线最后时刻没有下降
    #     return None
    # else:
    #     return positive[-1] + 1

#%% 绘制局部线性回归的图形

# 输入:
# x_out: 出站压力
# x_in: 进站压力
# window: 窗口大小
# time: x 对应的时间
# threshold: 判断下降的阈值
# duration: 连续下降时长(单位:秒)
# title: 图形标题

def rolling_lm_plot(x_out, x_in, window, time, threshold, duration, title=''):
    fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True, figsize=(18, 6))

    # 局部线性回归的系数
    slope_out = rolling_lm(x_out, window, time)
    slope_in = rolling_lm(x_in, window, time)

    # 绘图出站图形
    ax1.plot(time, x_out, label='原始信号', zorder=0)

    # 绘制拐点处的局部回归直线
    # 相比 sm.OlS, 用 Numpy 计算速度更快
    # https://numpy.org/doc/stable/reference/generated/numpy.polyfit.html
    drop_out = drop_point(slope_out['回归斜率(标准化)'], threshold=threshold, duration=duration)

    if len(drop_out)>0:
        for i in drop_out:
            Y_out = x_out[i:i+window]
            X = range(window)
            fit_out = np.polyfit(X, Y_out, 1)
            p_out = np.poly1d(fit_out)
            pred_out = p_out(X)
            ax1.plot(time[i:i+window], pred_out, 'r', lw=2.5, zorder=5, label='局部线性拟合')

            # 绘制拐点
            x_out_mid = i + int(window/2)
            y_out_mid = p_out(int(window/2))
            ax1.scatter(time[x_out_mid], y_out_mid, s=70, marker='o', color='darkviolet', zorder=10, label='拐点')
            ax1.vlines(time[x_out_mid], min(x_out), y_out_mid, color = 'darkviolet', ls='--')

    # 去除重复的图例
    # https://stackoverflow.com/q/13588920/13863685
    handles, labels = ax1.get_legend_handles_labels()
    by_label = dict(zip(labels, handles))
    ax1.legend(by_label.values(), by_label.keys(), loc='best')

    ax1.set_title('出站压力')

    # 绘图进站图形
    ax2.plot(time, x_in, label='原始信号', zorder=0)

    # 绘制拐点处的局部回归直线
    drop_in = drop_point(slope_in['回归斜率(标准化)'], threshold=threshold, duration=duration)
    if len(drop_in)>0:
        for j in drop_in:
            Y_in = x_in[j:j+window]
            fit_in = np.polyfit(X, Y_in, 1)
            p_in = np.poly1d(fit_in)
            pred_in = p_in(X)
            ax2.plot(time[j:j+window], pred_in, 'r', lw=2.5, zorder=5, label='局部线性拟合')

            # 绘制拐点
            x_in_mid = j + int(window/2)
            y_in_mid = p_in(int(window/2))
            ax2.scatter(time[x_in_mid], y_in_mid, s=70, marker='o', color='b', zorder=10, label='拐点')
            ax2.vlines(time[x_in_mid], min(x_in), y_in_mid, color = 'b', ls='--')

    # 仅有一组报警时绘制 Δt
    if len(drop_out) == 1 and len(drop_in) == 1:
        ax2.axvline(time[x_out_mid], color = 'darkviolet', ls='--')
        ax2.text(time[int((x_out_mid + x_in_mid)/2 - 20)], (y_in_mid + min(x_in))/2, r'$\bf{{\Delta t}}$', fontsize=12, color='r')

    # 去除重复的图例
    handles, labels = ax2.get_legend_handles_labels()
    by_label = dict(zip(labels, handles))
    ax2.legend(by_label.values(), by_label.keys(), loc='best')

    ax2.set_title('进站压力')

    # 坐标轴
    ax2.set_xlim(time[0], time[-1])
    ax2.xaxis.set_major_locator(mdates.SecondLocator(interval = 10))
    ax2.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))
    fig.autofmt_xdate()

    fig.suptitle(fr'{title}滚动线性回归(window: {window/20}秒)', y=1, fontsize='xx-large', fontweight='extra bold')
    # fig.tight_layout()
    plt.show()

#%% 绘制局部线性回归的动图
# https://zhuanlan.zhihu.com/p/442932579

# 参数见 rolling_lm_plot

def rolling_lm_movie(x_out, x_in, window, time, title=''):
    fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True, figsize=(18, 6))

    ax1.plot(time, x_out, label='原始信号')
    line_out, = ax1.plot([], [], 'r', lw=2.5, label='局部线性拟合')
    ax1.legend(loc='best')
    ax1.set_title('出站压力')

    ax2.plot(time, x_in, 'g', label='原始信号')
    line_in, = ax2.plot([], [], 'r-.', lw=2.5, label='局部线性拟合')
    ax2.legend(loc='best')
    ax2.set_title('进站压力')

    ax2.set_xlim(time[0], time[-1])
    ax2.xaxis.set_major_locator(mdates.SecondLocator(interval = 10))
    ax2.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))
    fig.autofmt_xdate()

    fig.suptitle(fr'{title}滚动线性回归(window: {window/20}秒)', y=1, fontsize='xx-large', fontweight='extra bold')
    fig.tight_layout()

    def animate(i):
        # 线性回归(用 Numpy 计算速度快)
        Y_out = x_out[i:i+window]
        Y_in = x_in[i:i+window]
        X = range(window)

        # https://numpy.org/doc/stable/reference/generated/numpy.polyfit.html
        fit_out = np.polyfit(X, Y_out, 1)
        fit_in = np.polyfit(X, Y_in, 1)

        p_out = np.poly1d(fit_out)
        p_in = np.poly1d(fit_in)
        pred_out = p_out(X)
        pred_in = p_in(X)

        line_out.set_data(time[i:i+window], pred_out)
        line_in.set_data(time[i:i+window], pred_in)

        # https://stackoverflow.com/q/16037494/13863685
        return line_out, line_in

    # https://matplotlib.org/stable/api/_as_gen/matplotlib.animation.FuncAnimation.html
    ani = animation.FuncAnimation(fig, animate,
                                  frames=range(0, len(x_out)-window, 20),
                                  interval=50, blit=True)

    return ani
