In [1]:
import pandas as pd
import json
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import numpy as np
from math import pi
from matplotlib.lines import Line2D
import tomllib
from sklearn.ensemble import ExtraTreesRegressor
from sklearn.preprocessing import StandardScaler

from sklearn.model_selection import LeaveOneOut
from sklearn.metrics import r2_score
import random
import snoop
import warnings
from PIL import Image
import matplotlib as mpl

from mpl_toolkits.mplot3d.art3d import Poly3DCollection

In [2]:
def dew_point_cacl(t, h):
    """t- температура в градусах цельсия, h - влажность воздуха в виде 0.ХХ"""
    a, b = 17.27, 237.7
    g = a * t / (b + t)
    g += np.log(h / 100)
    dp = (b * g) / (a - g)
    return dp

In [4]:
ugv = pd.read_feather("ugv.feather")
meteo = pd.read_feather("meteo.feather")
meteo["dew_point"] = dew_point_cacl(
    meteo["temp_air"].to_numpy(), meteo["hum_air"].to_numpy()
)
alldata = meteo.copy()

In [4]:
def plot_cube(ax, x_range, y_range, z_range, color="cyan", alpha=0.2):
    """
    Рисует закрашенный куб в 3D по заданным границам по осям x, y, z.

    Параметры:
        x_range: (x_min, x_max)
        y_range: (y_min, y_max)
        z_range: (z_min, z_max)
        color: цвет заливки (по умолчанию 'cyan')
        alpha: прозрачность (по умолчанию 0.4)
    """
    x_min, x_max = x_range
    y_min, y_max = y_range
    z_min, z_max = z_range

    # 8 вершин куба в порядке [x, y, z]
    vertices = np.array(
        [
            [x_min, y_min, z_min],
            [x_max, y_min, z_min],
            [x_max, y_max, z_min],
            [x_min, y_max, z_min],
            [x_min, y_min, z_max],
            [x_max, y_min, z_max],
            [x_max, y_max, z_max],
            [x_min, y_max, z_max],
        ]
    )

    # Грани (по 4 вершины каждая)
    faces = [
        [0, 1, 2, 3],  # z = z_min (нижняя)
        [4, 5, 6, 7],  # z = z_max (верхняя)
        [0, 1, 5, 4],  # y = y_min
        [2, 3, 7, 6],  # y = y_max
        [0, 3, 7, 4],  # x = x_min
        [1, 2, 6, 5],  # x = x_max
    ]

    # Формируем координаты граней
    face_coords = [vertices[face] for face in faces]
    # Добавляем куб
    cube = Poly3DCollection(
        face_coords, facecolors=color, alpha=alpha, edgecolors="k", linewidths=1
    )
    ax.add_collection3d(cube)

    # Настраиваем оси
    ax.set_xlabel("X")
    ax.set_ylabel("Y")
    ax.set_zlabel("Z")

In [5]:
curent_time = alldata["datetime"].max() - pd.Timedelta(days=2)
data_timesort = alldata[alldata["datetime"] > curent_time]

In [None]:
%matplotlib widget
fig = plt.figure(figsize=(9, 7))
ax = fig.add_subplot(111, projection='3d')
plot_cube(ax,
    (20, 25),
    (80, 100),
    (6,10)
)
# ax.scatter(10,1,1)

for id in data_timesort['id_s'].unique()[1]:
    print(id)
    ax.scatter(data_last['temp_air'],data_last['hum_air'],np.zeros(len(data_last)))
ax.set_xlabel('температура')
ax.set_ylabel('влажность')
ax.set_zlabel('время')

In [32]:
data = data_timesort[data_timesort["id_s"] == "5"].reset_index()
data["temp_air"] = data["temp_air"] + 9

работа с toml

In [33]:
with open("haze.toml", "rb") as f:
    diseases = tomllib.load(f)

In [None]:
# list(diseases.keys())[0]
time_last = data["datetime"].max()
data_last = data[data["datetime"] == time_last]
diseases_check={}
# первая стадия, проверка всех болезней
for diseas in list(diseases.keys()):
    # print(diseas)
    parameters = list(diseases[diseas]["param"])
    parameters.remove("time")
    # print(parameters)
    count=0
    for param in parameters:
        pmin = diseases[diseas]["param"][param]["min"]
        pmax = diseases[diseas]["param"][param]["max"]
        boud=(pmax-pmin)/20
        if pmin-boud <= data_last.iloc[-1][param] <= pmax+boud:
            # print(param)
            count+=1
        # print(param, pmin, pmax, data_last.iloc[-1][param])
    if count==len(parameters):
        # print("         ",diseas, "ALARM")
        diseases_check[diseas]=True
    else:
        diseases_check[diseas]=False
print(diseases_check)
diseases_current = [key for key, value in diseases_check.items() if value]
# подтвержденные болезни на текущий момент
print(diseases_current)


In [None]:
for diseas in diseases_current:
    time_range = diseases[diseas]["param"]["time"]["min"]
    time_last = data["datetime"].max()-pd.Timedelta(hours=time_range)
    data_last = data[data["datetime"] >= time_last]
    parameters = list(diseases[diseas]["param"])
    parameters.remove("time")
    # print(parameters)
    count=0
    for i in range(-1,-len(data_last),-1):
        count=0
        for param in parameters:
            pmin = diseases[diseas]["param"][param]["min"]
            pmax = diseases[diseas]["param"][param]["max"]
            boud=(pmax-pmin)/10
            if pmin-boud <= data_last.iloc[i][param] <= pmax+boud:
                count+=1
                print(param, pmin, pmax, data_last.iloc[i][param])
        if count==len(parameters):
            
            time_start=data_last.iloc[i]['datetime']
            print(i, time_start)


In [None]:
import time
time.ctime()

In [None]:
%matplotlib widget
fig = plt.figure(figsize=(9, 7))
ax = fig.add_subplot(111, projection='3d')
plot_cube(ax,
    (20, 25),
    (80, 100),
    (6,10)
)
# ax.scatter(10,1,1)


ax.scatter(data_last['temp_air'],data_last['hum_air'],np.zeros(len(data_last)))
ax.set_xlabel('температура')
ax.set_ylabel('влажность')
ax.set_zlabel('время')

In [28]:
time_range = diseases[diseas]["param"]["time"]["min"]
time_last = data["datetime"].max()-pd.Timedelta(hours=time_range)
data_last = data[data["datetime"] >= time_last]

In [None]:
for i in range(-1,-len(data_last),-1):
    print(i,data_last.iloc[i]['datetime'])

In [None]:
for disease_name, disease_data in diseases.items():
    if "info" in disease_data:
        print(disease_data["info"]["name"])

построение карт

In [2]:
def loocv(x, y, model):
    warnings.filterwarnings("ignore")
    """х - массив точек.
    у - массив значений.
    возвращаяет R2 LOOCV модели"""
    pred = []
    loo = LeaveOneOut()
    if type(x) is not np.array:
        x = np.array(x)
    if type(y) is not np.array:
        y = np.array(y)
    for i, (train_index, test_index) in enumerate(loo.split(x)):
        model.fit(x[train_index], y[train_index])
        res = model.predict(np.array(x[test_index]))
        pred.append(float(res))
    return r2_score(y, pred)

In [3]:
def gradient(x, y):
    """x.у - массивы с координатами по x и y осям.\n
    возвращает максимальный градиент функции"""
    q, t = [], []
    for s in np.argsort(x):
        q.append(x[s])
        t.append(y[s])
    grad = []
    for i in range(len(q) - 1):
        if q[i + 1] - q[i] == 0:
            grad.append(0)
        else:
            grad.append((t[i + 1] - t[i]) / (q[i + 1] - q[i]))
    return np.mean(np.abs(grad))

In [4]:
def custom_cmap(c1, c2, n):
    """Return object cmap.
    :c1: array [r,g,b,alpha] 0-1 or 0-255. or color name from matplotlib
    :c2: array [r,g,b,alpha] 0-1 or 0-255. or color name from matplotlib
    :n: number of gradation
    """
    if type(c1) != str:
        c1 = np.array(c1)
        c2 = np.array(c2)
        if np.logical_or(c1 > 1, c2 > 1).any():
            c1 = c1 / 255
            c2 = c2 / 255
    else:
        c1 = mpl.colors.to_rgb(c1)
        c2 = mpl.colors.to_rgb(c2)
    q = np.array(
        [
            np.linspace(c1[0], c2[0], n),
            np.linspace(c1[1], c2[1], n),
            np.linspace(c1[2], c2[2], n),
            np.linspace(c1[3], c2[3], n),
        ]
    )
    colorMap = mpl.colors.ListedColormap(q.T)
    return colorMap

In [37]:
def generate_surface(
    nums, x_range=(-6, 6), y_range=(-6, 6), grid_resolution=1000
):
    """
    Упрощенная функция: генерирует поверхность и возвращает только nums точек.

    Возвращает:
    - points: массив формы (nums, 3) с координатами [x, y, z]
    """

    # Создаем плотную сетку для генерации поверхности
    x = np.linspace(x_range[0], x_range[1], grid_resolution)
    y = np.linspace(y_range[0], y_range[1], grid_resolution)
    X, Y = np.meshgrid(x, y)

    # Базовая поверхность
    Z = np.full_like(X, 0)

    # Три максимума 
    # (45.03, 45.039), (38.82,38.83)(45.03, 38.825, 1)
    peaks=[]
    for i in range(3):
        peaks.append([random.uniform(x_range[0], x_range[1]),random.uniform(y_range[0], y_range[1]),40])
    # peaks = [ (45.032, 38.835, 1),()]


    for px, py, height in peaks:
        sigma = (x_range[1]-x_range[0])/10  # ширина пика (подберите под ваш масштаб!)
        # Расстояние в квадрате (гаусс использует квадрат!)
        distance_squared = (X - px)**2 + (Y - py)**2
        peak = height * np.exp(-distance_squared / (2 * sigma**2))
        Z += peak

    # Выбираем 50 случайных точек
    all_points = np.column_stack([X.ravel(), Y.ravel(), Z.ravel()])
    indices = np.random.choice(len(all_points), nums, replace=False)
    selected_points = all_points[indices]

    return selected_points

In [58]:
def calc_surface(
    data: dict,
    coordinates: dict,
    resolution=100,
    model=RBF(),
    validation=False,
    normalize=True,
    gap=10,
):
    """
    Создает интерполированную карту значений на основе заданных данных и координат.
    Функция использует машинное обучение для интерполяции значений между известными точками
    и генерирует регулярную сетку значений, которая может быть использована для визуализации
    карты изолиний, тепловой карты или других типов пространственного анализа.
    Параметры:
    -----------
    data : dict
        Словарь, где ключи - идентификаторы точек, значения - числовые значения в этих точках
    coordinates : dict
        Словарь, где ключи - те же идентификаторы точек, значения - кортежи координат (x, y)
    resolution : int, optional, default=100
        Разрешение выходной сетки (количество точек по каждой оси)
    model : sklearn estimator, optional, default=RBF()
        Модель машинного обучения для интерполяции значений
    normalize : bool, optional, default=True
        Флаг нормализации координат перед обучением модели
    gap : int, optional, default=10
        Процент расширения границ карты относительно крайних точек (для отступа)

    Возвращает:
    -----------

    aa : numpy.ndarray
        2D массив координат X сетки (meshgrid)
    bb : numpy.ndarray
        2D массив координат Y сетки (meshgrid)
    z_map : numpy.ndarray
        2D массив интерполированных значений Z, соответствующих координатам сетки

    Пример использования:
    --------------------

    >>> data = {'point1': 10, 'point2': 20, 'point3': 15}

    >>> coordinates = {'point1': (0, 0), 'point2': (10, 10), 'point3': (5, 3)}

    >>> xx, yy, zz = calc_surface(data, coordinates, resolution=50)

    """

    X = np.array(
        list(coordinates.values())
    )  # X- массив координат [[x1.y1].[x2.y2]...]       

    x, y = X.T[0], X.T[1]  # координаты X и Y отдельно

    bounds_x = [min(x), max(x)]
    bounds_y = [min(y), max(y)]
    delta_x = (bounds_x[1] - bounds_x[0]) * gap / 100
    delta_y = (bounds_y[1] - bounds_y[0]) * gap / 100

    resolution = int(resolution)

    aa, bb = np.meshgrid(
        np.linspace(bounds_x[0] - delta_x, bounds_x[1] + delta_x, resolution),
        np.linspace(bounds_y[0] - delta_y, bounds_y[1] + delta_y, resolution),
    )

    xx = np.vstack([aa.ravel(), bb.ravel()]).T

    if normalize:
        scaler = StandardScaler()
        learn_X = scaler.fit_transform(np.array(list(coordinates.values())))
        pred_xx = scaler.fit_transform(xx)

    else:
        learn_X = X
        pred_xx = xx

    learn_Z = np.array(list(data.values())).reshape(-1, 1)  # массив значений
    model.fit(learn_X, learn_Z)
    z_map = model.predict(pred_xx).reshape(resolution, resolution)
    if validation:
        r2 = loocv(X, learn_Z, model)
        print("R2-score: ", r2)

    return aa, bb, z_map

In [7]:
def create_map_plotter(
    xx, yy, zz, points={}, points_value={}, img=None, z_norm_coef=False,levels=10, cmap="custom"
):
    plt.figure(figsize=(12, 10))
    if img != None:
        alpha_map = 0.5
        plt.imshow(img, extent=[np.min(xx), np.max(xx), np.min(yy), np.max(yy)])
    else:
        alpha_map = 1

    if cmap == "custom":
        cmap = custom_cmap([0, 1, 0, 0.6], [1, 0, 0, 0.6], levels)

    if z_norm_coef:
        z_min = zz.min()
        z_max = zz.max()
        if z_max - z_min == 0:
            zz_norm = np.zeros_like(zz)  
        else:
            zz_norm = 100*(zz - z_min) / (z_max - z_min)
    else:
        zz_norm=zz


    plotparam = {"levels": levels, "cmap": cmap}
    plotparam2 = {"levels": levels, "colors": "black", "linewidths": 0.5}
#  extend='both'
    countr = plt.contourf(xx, yy, zz_norm, **plotparam)
    plt.contour(xx, yy, zz_norm, **plotparam2)
    plt.colorbar(countr, shrink=0.75)

    lev = countr.levels
    cm = plt.get_cmap(cmap)
    colors = cm(np.linspace(0, 1, len(lev) + 1))
    if len(points) != 0:
        poi = np.array(list(points.values()))
        names = list(points.keys())
        x, y = poi.T[0], poi.T[1]
        poi_v = list(points_value.values())
        size = 500
        color_index = np.searchsorted(lev, poi_v, side="left")
        color_index[color_index == max(color_index)] = max(color_index) - 1
        for i in range(len(names)):
            plt.scatter(
                x[i], y[i], c=colors[color_index[i]], s=size, edgecolors="black"
            )
            plt.text(x[i], y[i], names[i], ha="center", va="center")

    plt.gca().set_aspect("equal")  #'
    plt.tight_layout()
    plt.savefig("map.png")
    # plt.show()
    plt.close()
 

    polygons = []
    for i, level_segs in enumerate(countr.allsegs):
        z_val = float(countr.cvalues[i])
        # Замена "extend"-значений
        if z_val < -1e+200:
            z_val = countr.levels[0]
        elif z_val > 1e+200:
            z_val = countr.levels[-1]
        for seg in level_segs:
            seg_3d = np.column_stack([seg, np.full(len(seg), z_val)])
            polygons.append(seg_3d)


    return polygons, z_val

In [172]:
def normalize(mas, coef):
    '''Возвращает нормализированный массив от 0 до 1, умноженный на коэффициент coef'''
    mas=np.array(mas)
    mas_min = mas.min()
    mas_max = mas.max()
    if mas_max - mas_min == 0:
        mas_norm = np.zeros_like(mas)  
    else:
        mas_norm = coef*(mas - mas_min) / (mas_max - mas_min)
    return mas_norm

def denormalize(mas_norm, mas, coef):
    '''Возвращает ДЕнормализированный массив, mas_norm - нормализированный от 0 до 1 и умноженный на коэффициент coef'''
    mas_norm=np.array(mas_norm)/coef
    mas_min = mas.min()
    mas_max = mas.max()
    if mas_max - mas_min == 0:
        return np.full_like(mas_norm, mas_min)
    else:
        mas_denorm = mas_min + mas_norm * (mas_max - mas_min)
        return mas_denorm

In [175]:
def create_map(
    xx, yy, zz, z_norm_coef=0, levels=10, save_pic=False, points={}, points_value={}
):
    """Создаёт контурную карту на основе двумерных данных и возвращает координаты полигонов контуров.

    Функция строит изолинии (контуры) по заданным сеточным данным (xx, yy, zz),
    опционально нормализует значения по оси Z, отображает карту с возможностью
    сохранения изображения и нанесения дополнительных точек с подписями.
    Возвращает список полигонов контуров в 3D-формате (x, y, z) и соответствующие
    значения z для каждого полигона.

    Args:
        xx (array-like): Двумерный массив координат X сетки.
        yy (array-like): Двумерный массив координат Y сетки.
        zz (array-like): Двумерный массив значений Z.
        z_norm_coef (float, optional): Коэффициент для нормализации zz. Если не равен 0,
            применяется функция normalize. По умолчанию 0 (нормализация отключена).
        levels (int, optional): Количество уровней контуров. По умолчанию 10.
        save_pic (bool, optional): Если True — сохраняет изображение контурной карты
            в файл 'map.png'. По умолчанию False.
        points (dict, optional): Словарь с именами точек в качестве ключей и их
            координатами [x, y] в качестве значений. Используется только при save_pic=True.
        points_value (dict, optional): Словарь с теми же ключами, что и в points,
            содержащий значения Z для каждой точки (для определения цвета на карте).

    Returns:
        tuple: Кортеж из двух элементов:
            - polygons (list of ndarray): Список массивов формы (N, 3), где каждая строка —
              точка контура в формате [x, y, z].
            - z_values (list of float): Список значений z, соответствующих каждому полигону.
    Example:
        >>> import numpy as np
        >>> x = np.linspace(0, 10, 100)
        >>> y = np.linspace(0, 10, 100)
        >>> xx, yy = np.meshgrid(x, y)
        >>> zz = np.sin(xx) * np.cos(yy)
        >>> polygons, z_vals = create_map(xx, yy, zz, levels=8, save_pic=True,
        ...                               points={"A": [2.5, 3.0], "B": [7.0, 6.5]},
        ...                               points_value={"A": 0.4, "B": -0.3})
        >>> print(f"Создано {len(polygons)} контурных сегментов")
        Создано <число> контурных сегментов
        >>> print(f"Пример значения z: {z_vals[0]:.2f}")
        Пример значения z: <значение>
    """
    if z_norm_coef != 0:
        zz_norm = normalize(zz, z_norm_coef)
    else:
        zz_norm = zz

    if save_pic:
        plt.figure(figsize=(12, 10))
        countr = plt.contourf(xx, yy, zz_norm, levels=levels)
        plt.contour(xx, yy, zz_norm, levels=levels, colors="black", linewidths=0.5)
        # plt.colorbar(countr, shrink=0.75)

        lev = countr.levels
        cm = plt.get_cmap("viridis")
        colors = cm(np.linspace(0, 1, len(lev) + 1))
        if len(points) != 0:
            poi = np.array(list(points.values()))
            names = list(points.keys())
            x, y = poi.T[0], poi.T[1]
            poi_v = list(points_value.values())
            size = 500
            color_index = np.searchsorted(lev, poi_v, side="left")
            color_index[color_index == max(color_index)] = max(color_index) - 1
            for i in range(len(names)):
                plt.scatter(
                    x[i], y[i], c=colors[color_index[i]], s=size, edgecolors="black"
                )
                plt.text(x[i], y[i], names[i], ha="center", va="center")
        plt.colorbar(countr, shrink=0.95)
        plt.tight_layout()
        plt.savefig("map.png")
    else:
        countr = plt.contourf(xx, yy, zz_norm, levels=levels)
    plt.close()

    polygons = []
    z_values = []
    for i, level_segs in enumerate(countr.allsegs):
        z_val = float(countr.cvalues[i])
        # Замена "extend"-значений
        if z_val < -1e200:
            z_val = countr.levels[0]
        elif z_val > 1e200:
            z_val = countr.levels[-1]
        for seg in level_segs:
            seg_3d = np.column_stack([seg, np.full(len(seg), z_val)])
            z_values.append(z_val)
            polygons.append(seg_3d)
    if z_norm_coef != 0:
        z_values = denormalize(z_values, zz_norm, z_norm_coef)
    return polygons, z_values

In [123]:
surface_points = generate_surface(50, (0, 140), (20,120))

coordinates, values = {}, {}
for i in range(len(surface_points)):
    coordinates[str(i)] = surface_points[i][0:2]
    values[str(i)] = surface_points[i][2]

In [178]:
from RBFlib import RBF

x, y, z = calc_surface(
    values,
    coordinates,
    model=RBF(),
    resolution=200,
    normalize=True,
    validation=True,
    gap=10,
)
polygons, z_values=create_map(x, y, z,levels=7,z_norm_coef=100,save_pic=True,points=coordinates,points_value=values)

R2-score:  0.9003101346762024


In [180]:
def create_json_polygons(polygons, values, description="контур", postfix=""):
    '''
    Формирует список словарей в формате JSON для передачи или сохранения контурных полигонов.
    Каждый элемент списка представляет собой объект с идентификатором, названием,
    координатами полигона и описанием, содержащим соответствующее значение из списка `values`.

    Args:
        polygons (list of array-like): Список полигонов, где каждый полигон — это массив
            координат (обычно в формате [[x1, y1, z1], [x2, y2, z2], ...]).
        values (list of float): Список числовых значений, соответствующих каждому полигону.
            Используется в описании контура.
        description (str, optional): Базовое текстовое описание, вставляемое в поле
            "description". По умолчанию "контур".
        postfix (str, optional): Дополнительный суффикс (например, единица измерения),
            добавляемый к значению в описании. По умолчанию пустая строка.

    Returns:
        list of dict: Список словарей, каждый из которых содержит поля:
            - "id": уникальный идентификатор вида "contour-0", "contour-1", ...
            - "name": название вида "Контур 0", "Контур 1", ...
            - "coordinates": исходный массив координат полигона
            - "description": строка вида "{description} {value:.2f} {postfix}"
    '''
    json_list = []
    for i in range(len(polygons)):
        json_list.append(
            {
                "id": f"contour-{i}",
                "name": f"Контур {i}",
                "coordinates": polygons[i],
                "description": f"{description} {values[i]:.2f} {postfix}",
            }
        )
    return json_list


create_json_polygons(polygons, z_values, "Температура")

[{'id': 'contour-0',
  'name': 'Контур 0',
  'coordinates': array([[-13.03725836,  10.64064064,   7.5       ],
         [-12.20064285,  10.64064064,   7.5       ],
         [-11.36402734,  10.64064064,   7.5       ],
         ...,
         [-13.87387387,  11.20502412,   7.5       ],
         [-13.87387387,  10.64064064,   7.5       ],
         [-13.03725836,  10.64064064,   7.5       ]]),
  'description': 'Температура 7.50 '},
 {'id': 'contour-1',
  'name': 'Контур 1',
  'coordinates': array([[89.86644936, 10.64064064, 22.5       ],
         [90.70306487, 10.64064064, 22.5       ],
         [91.53968038, 10.64064064, 22.5       ],
         ...,
         [89.43015581, 11.20502412, 22.5       ],
         [89.39265438, 10.64064064, 22.5       ],
         [89.86644936, 10.64064064, 22.5       ]]),
  'description': 'Температура 22.50 '},
 {'id': 'contour-2',
  'name': 'Контур 2',
  'coordinates': array([[-13.36591772,  41.1173485 ,  22.5       ],
         [-13.03725836,  41.38709226,  22.5 