# Swinging Door #

Реализация алгоритма Swinging Door на Python.

Литература:
 * [Патент US4669097](https://patents.google.com/patent/US4669097 "patents.google.com");
 * [Swinging Door Trending Compression Algorithm for IoT Environments](https://sol.sbc.org.br/index.php/sbesc_estendido/article/download/8650/8551/#:~:text=The%20Swing%20Door%20Trending%20(SDT,process%20information%20systems%20(PIMs). "sol.sbc.org.br");
 * [Компрессия данных в системах промышленной автоматизации. Алгоритм SwingingDoor](https://habr.com/ru/post/105652/ "habr.com");

Реализация:

In [1]:
def _sloping_calc(stretch, deviation):
    current, entrance = stretch

    _divide = current[0] - entrance[0]

    try:
        upper = (current[1] - (entrance[1] + deviation)) / _divide
        lower = (current[1] - (entrance[1] - deviation)) / _divide

    except ZeroDivisionError as err:
        raise ValueError from err

    return upper, lower


def _new_corridor(stretch, deviation, upper=True):
    past, current = stretch

    entrance = (
        (past[0] + current[0]) / 2,
        (
            (past[1] + current[1]) / 2 - (deviation / 2)
            if upper
            else (past[1] + current[1]) / 2 + (deviation / 2)
        ),
    )

    return entrance, _sloping_calc((current, entrance), deviation)


def swinging_door(source, deviation=0.1):
    if not deviation:
        yield from source
        return

    try:
        entrance = next(source)

    except StopIteration:
        return

    yield entrance

    try:
        current = next(source)

    except StopIteration:
        return

    sloping_upper, sloping_lower = _sloping_calc(
        (current, entrance), deviation
    )

    sloping_upper_max = sloping_upper
    sloping_lower_min = sloping_lower

    try:
        while True:
            past = current
            current = next(source)

            sloping_upper, sloping_lower = _sloping_calc(
                (current, entrance), deviation
            )

            if sloping_upper > sloping_upper_max:
                sloping_upper_max = sloping_upper

                if sloping_upper_max > sloping_lower_min:
                    entrance, (
                        sloping_upper_max,
                        sloping_lower_min,
                    ) = _new_corridor((past, current), deviation)

                    yield entrance

            elif sloping_lower < sloping_lower_min:
                sloping_lower_min = sloping_lower

                if sloping_upper_max > sloping_lower_min:
                    entrance, (
                        sloping_upper_max,
                        sloping_lower_min,
                    ) = _new_corridor((past, current), deviation, upper=False)

                    yield entrance

    except StopIteration:
        yield past

Тестирование:

In [2]:
assert list(swinging_door(iter([
    (1, 6), (2, 6.5), (3, 5.5),
    (4, 6.5), (5, 8), (6, 7.5),
    (7, 8), (8, 9.5),
]), 1)) == [(1, 6), (7.5, 8.25), (8, 9.5)]

In [3]:
assert list(swinging_door(iter([
    (1, 6), (2, 6.5), (3, 5.5),
    (4, 6.5), (5, 8), (6, 7.5),
    (7, 8), (8, 6),
]), 1)) == [(1, 6), (7.5, 7.5), (8, 6)]

Проверка:

In [4]:
from matplotlib.pyplot import plot, show
from ipywidgets import FloatSlider, interact

my_data = [
    (0., 5.0), (1., 5.5), (2., 4.2),
    (3., 5.8), (4., 5.2), (5., 6.8),
]

def re_show(deviation):
    o_data = list()
    o_index = list()
    
    for x, y in my_data:
        o_data.append(y)
        o_index.append(x)
    
    n_data = list()
    n_index = list()

    for x, y in swinging_door(iter(my_data), deviation):
        print(x, y)
        n_data.append(y)
        n_index.append(x)

    plot(o_index, o_data)
    plot(n_index, n_data)

    show()
    
interact(
    re_show,
    deviation=FloatSlider(min=.01, max=2, step=.01, value=1)
);

interactive(children=(FloatSlider(value=1.0, description='deviation', max=2.0, min=0.01, step=0.01), Output())…

---

## Применение

Постройка графика из сырых данных:

In [5]:
from datetime import datetime

from pandas import read_csv
from matplotlib.pyplot import figure, show
from ipywidgets import FloatSlider, interact

from swinging_door import swinging_door

data = [
    (datetime.strptime(date, "%Y-%m-%d").timestamp(), value)
    for date, value in read_csv(
        "https://raw.githubusercontent.com/datasets/oil-prices/refs/heads/main/data/wti-daily.csv"
    ).values.tolist()
]

def re_show(deviation):
    x = list()
    y = list()
    
    for k, v in data:
        x.append(datetime.fromtimestamp(k))
        y.append(v)

    s_x = list()
    s_y = list()

    s_data = list(
        swinging_door(iter(data), deviation)
    )

    for k, v in s_data:
        s_x.append(datetime.fromtimestamp(k))
        s_y.append(v)

    fig = figure()
    fig.set_dpi(100)
    fig.suptitle("The operation of the Swinging Door algorithm.")

    gs = fig.add_gridspec(2, hspace=.3)

    (ax_orig, ax_sd) = fig.add_gridspec(2, hspace=.3).subplots(sharex=True, sharey=True)

    ax_orig.set_title("Original data")
    ax_orig.plot(x, y, "tab:blue", label=f"count: {len(data)}")
    ax_orig.legend(loc="upper left")

    ax_sd.set_title(f"Compressed data use deviation {deviation}")
    ax_sd.plot(s_x, s_y, "tab:orange", label=f"count: {len(s_data)}")
    ax_sd.legend(loc="upper left")

    show()
    
interact(
    re_show,
    deviation=FloatSlider(min=.01, max=10, step=.01, value=1)
);

interactive(children=(FloatSlider(value=1.0, description='deviation', max=10.0, min=0.01, step=0.01), Output()…