Скрипт для обработки кмл-файлов энергосистемы с созданием мультигеометрии на основе исходной костыльной геометрии отпаек.
**Оптимизированная версия**

In [4]:
from lxml import etree
from shapely.geometry import LineString, Point
from scipy.spatial import KDTree
from scipy.spatial import distance
import numpy as np
import math

# Функция для вычисления расстояния между двумя точками (в метрах)
def haversine(lon1, lat1, lon2, lat2):
    R = 6371000  # Радиус Земли в метрах
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlambda = math.radians(lon2 - lon1)

    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return R * c

# Функция для парсинга координат из строки KML
def parse_coordinates(coord_string):
    coords = []
    for coord in coord_string.strip().split():
        try:
            lon, lat, *_ = map(float, coord.split(","))
            coords.append((lon, lat))
        except ValueError:
            print(f"Warning: Skipping invalid coordinate string: {coord}")  # Добавлено для отладки
            # Или можно поднять исключение, если хотите остановить выполнение при ошибке
            # raise ValueError(f"Invalid coordinate string: {coord}")
    return coords

# Функция для форматирования координат обратно в строку
def format_coordinates(coords):
    return " ".join(f"{lon},{lat},0" for lon, lat in coords)


def snap_vertices_to_each_other(coords, tolerance, max_iterations=10):
    if len(coords) < 2:  # Check if coords has at least 2 points
        print("Warning: Not enough points to perform snapping ", coords)
        return coords  # Return if not enough points for KDTree
    coords = np.array(coords)  # Преобразуем список координат в массив numpy
    changes_made = True  # Флаг для повторных итераций
    iteration_count = 0  # Счётчик итераций

    while changes_made and iteration_count < max_iterations:
        changes_made = False
        iteration_count += 1
        tree = KDTree(coords)  # Создаём KDTree для текущих координат
        processed = set()  # Набор обработанных точек

        for i, point in enumerate(coords):
            if i in processed:
                continue  # Пропускаем, если точка уже обработана

            # Найдём все точки в пределах толеранса
            indices = tree.query_ball_point(point, r=tolerance / 111320)

            if len(indices) > 1:  # Если есть совпадения
                # Собираем координаты кластера
                cluster_coords = coords[indices]

                # Вычисляем усреднённые координаты
                avg_lon = np.mean(cluster_coords[:, 0])
                avg_lat = np.mean(cluster_coords[:, 1])
                avg_coords = np.array([avg_lon, avg_lat])

                # Обновляем координаты всех точек в кластере
                for idx in indices:
                    coords[idx] = avg_coords  # Обновляем координаты
                    processed.add(idx)  # Отмечаем точку как обработанную
                    changes_made = True  # Устанавливаем флаг изменений

    return [tuple(pt) for pt in coords]


def point_to_segment_distance(node, segment_start, segment_end):
    # Длина сегмента
    segment_length = haversine(segment_start[0], segment_start[1], segment_end[0], segment_end[1])
    if segment_length == 0:
        # Если сегмент вырожден (точка), возвращаем расстояние до начала сегмента
        return haversine(node[0], node[1], segment_start[0], segment_start[1]), segment_start

    # Вычисляем параметр u для проекции точки на прямую сегмента
    u = (
        ((node[0] - segment_start[0]) * (segment_end[0] - segment_start[0]) +
         (node[1] - segment_start[1]) * (segment_end[1] - segment_start[1]))
        / ((segment_end[0] - segment_start[0]) ** 2 + (segment_end[1] - segment_start[1]) ** 2)
    )

    # Если проекция лежит за пределами сегмента, ближайшая точка — один из концов
    if u < 0:  # Ближе к началу сегмента
        nearest_lon, nearest_lat = segment_start
        nearest_dist = haversine(node[0], node[1], nearest_lon, nearest_lat)
    elif u > 1:  # Ближе к концу сегмента
        nearest_lon, nearest_lat = segment_end
        nearest_dist = haversine(node[0], node[1], nearest_lon, nearest_lat)
    else:
        # Проекция находится внутри сегмента
        nearest_lon = segment_start[0] + u * (segment_end[0] - segment_start[0])
        nearest_lat = segment_start[1] + u * (segment_end[1] - segment_start[1])
        nearest_dist = haversine(node[0], node[1], nearest_lon, nearest_lat)

    return nearest_dist, (nearest_lon, nearest_lat)


def preprocess_line_with_node_segment_overlap(coords, tolerance):
    updated_coords = coords[:]
    insertions = []  # Список для хранения новых вершин

    for i, node in enumerate(coords):
        for j in range(len(updated_coords) - 1):
            segment_start = updated_coords[j]
            segment_end = updated_coords[j + 1]

            # Вычисляем расстояние от точки до сегмента
            nearest_dist, nearest_coords = point_to_segment_distance(node, segment_start, segment_end)
            #print(f"Node {i}: Distance to Segment [{j}-{j+1}] is {nearest_dist} meters")

            # Если расстояние до ближайшей точки меньше или равно толерансу
            if nearest_dist <= tolerance:
                if nearest_coords not in updated_coords and nearest_coords not in [coord for _, coord in insertions]:
                    #print(f"Adding new vertex at {nearest_coords} for Node {i} on Segment [{j}-{j+1}]")
                    insertions.append((j + 1, nearest_coords))  # Запоминаем индекс вставки и координаты

    # Применяем вставки
    for index, new_coord in sorted(insertions, reverse=True):  # Сортируем в обратном порядке, чтобы не нарушать индексацию
        updated_coords.insert(index, new_coord)

    return updated_coords


def remove_duplicate_vertices(coords, tolerance):
    if not coords:  # Check if the list is empty
        return []  # Return an empty list if it is
    cleaned_coords = [coords[0]]  # Начинаем с первой точки
    for i in range(1, len(coords)):
        prev_lon, prev_lat = cleaned_coords[-1]
        curr_lon, curr_lat = coords[i]
        distance = haversine(prev_lon, prev_lat, curr_lon, curr_lat)
        if distance > tolerance:
            cleaned_coords.append(coords[i])
    return cleaned_coords

# Функция для поиска наложений вершин с допуском (в метрах)
def find_overlapping_vertices(coords, tolerance):
    points = [Point(lon, lat) for lon, lat in coords]
    overlaps = {}
    multiple_overlaps = {}
    for i, point1 in enumerate(points):
        for j, point2 in enumerate(points):
            if i != j and haversine(point1.x, point1.y, point2.x, point2.y) <= tolerance:
                if j not in overlaps:
                    if i in overlaps:
                        multiple_overlaps[i] = []
                        multiple_overlaps[i].append(overlaps[i])
                        multiple_overlaps[i].append(j)
                    else:
                        overlaps[i] = j
    return overlaps, multiple_overlaps

# Функция для разрезания линии с учётом наложений вершин
def split_line_by_overlaps(coords, overlaps, m_overlaps):
    segments = []
    split_points = [0]

    idx_list = list(overlaps.keys())
    idx_list.sort()
    for idx in idx_list:
        if idx in m_overlaps:
            split_points.append(idx)
            for v in m_overlaps[idx]:
                split_points.append(v)
        if idx in split_points:
            continue
        if idx < split_points[-1]:
            continue
        split_points.append(idx)
        split_points.append(overlaps[idx])

    split_points.append(len(coords) - 1)
    split_points.sort()
    split_points = list(set(split_points))  # Remove duplicate split points
    split_points.sort()

    # Добавляем сегменты
    for i in range(len(split_points) - 1):
        # Ensure split_points[i + 1] is within bounds
        end_point = min(split_points[i + 1], len(coords) - 1)
        idxs = [v for v in range(split_points[i], end_point + 1) if (v not in overlaps.values() or v == split_points[i])]

        # Check if idxs are valid before accessing coords
        if all(0 <= idx < len(coords) for idx in idxs):
            segment = [coords[idx] for idx in idxs]
            segments.append(segment)
        else:
            print(f"Warning: Invalid indices found: {idxs}, skipping segment.")  # Add warning for debugging

    return segments

# Основная функция обработки KML
def create_multigeometry(input_file, output_file, tolerance):
    tree = etree.parse(input_file)
    root = tree.getroot()

    nsmap = {"kml": "http://www.opengis.net/kml/2.2"}
    placemarks = root.xpath(".//kml:Placemark", namespaces=nsmap)

    for placemark in placemarks:
        # Пропускаем объекты, содержащие MultiGeometry
        if placemark.find(".//kml:MultiGeometry", namespaces=nsmap) is not None:
            continue

        line_string = placemark.find(".//kml:LineString", namespaces=nsmap)
        if line_string is not None:
            coords_text = line_string.find(".//kml:coordinates", namespaces=nsmap).text
            coords = parse_coordinates(coords_text)

            # Предобработка: добавляем узлы на сегменты
            coords = preprocess_line_with_node_segment_overlap(coords, tolerance)

            # Убираем избыточные вершины
            coords = remove_duplicate_vertices(coords, tolerance)

            # Находим наложения вершин
            overlaps, mult_overlaps = find_overlapping_vertices(coords, tolerance)

            snapped_coords = snap_vertices_to_each_other(coords, tolerance)

            # Разрезаем линию на сегменты с учётом наложений
            segments = split_line_by_overlaps(snapped_coords, overlaps, mult_overlaps)

            if len(segments) == 1:
                # Если линия состоит из одной части, оставляем её простой
                coords_elem = line_string.find(".//kml:coordinates", namespaces=nsmap)
                coords_elem.text = format_coordinates(segments[0])
            else:
                # Создаём MultiGeometry для нескольких сегментов
                multi_geometry = etree.SubElement(placemark, "MultiGeometry")
                for segment in segments:
                    # Проверка на вырожденную геометрию (сегмент должен содержать >= 2 вершин)
                    if len(segment) > 1:
                        line_elem = etree.SubElement(multi_geometry, "LineString")
                        coords_elem = etree.SubElement(line_elem, "coordinates")
                        coords_elem.text = format_coordinates(segment)

                # Удаляем исходный LineString
                if line_string.getparent() is placemark:
                    placemark.remove(line_string)

    tree.write(output_file, pretty_print=True, xml_declaration=True, encoding="UTF-8")



# Параметры вызова
input_kml = "2025_01_02_Энергосистема.kml"  # Исходный файл
output_kml = "2025_01_02_Энергосистема_multi.kml"  # Выходной файл
tolerance = 1  # Допуск на совпадение (в метрах)

create_multigeometry(input_kml, output_kml, tolerance)
print(f"Файл сохранён как {output_kml}")

Файл сохранён как 2025_01_02_Энергосистема_multi.kml
