In [28]:
import json
import math
import re
import requests
from dataclasses import dataclass, field
from typing import List, Tuple, Dict, Optional, Set

In [29]:
Coordinate = Tuple[float, float]
API_TIMEOUT = 60

@dataclass
class MapEntity:
    title: str
    category: str
    points: List[Coordinate] = field(default_factory=list)

print("Библиотеки импортированы, структуры данных определены.")

Библиотеки импортированы, структуры данных определены.


Данный класс отвечает за построение газетира. Он:
    •Загружает станции метро через фильтры  public_transport=station,  railway=station,  railway=subway_entrance 
	•Загружает дороги через тег highway  с геометрией
	•Нормализует названия (приведение к нижнему регистру, разворачивание аббревиатур)

In [30]:
class Gazetteer:
    ENDPOINT = "http://overpass-api.de/api/interpreter"
    
    def __init__(self) -> None:
        self.subway_stations: Dict[str, MapEntity] = {}
        self.street_network: Dict[str, MapEntity] = {}
        self._initialize_database()

    def _initialize_database(self):
        print("Начало загрузки геоданных (SPB)...")
        self._fetch_transport_nodes()
        self._fetch_street_ways()
        print(f"База готова: Метро — {len(self.subway_stations)}, Улиц — {len(self.street_network)}")

    def _execute_query(self, osm_ql: str):
        try:
            response = requests.get(self.ENDPOINT, params={"data": osm_ql}, timeout=API_TIMEOUT)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f"Ошибка соединения с OSM: {e}")
            return {"elements": []}

    def _fetch_transport_nodes(self):
        ql_query = """
        [out:json][timeout:60];
        area[name="Санкт-Петербург"]->.searchArea;
        (
          node["public_transport"="station"]["station"="subway"](area.searchArea);
          node["railway"="station"]["station"="subway"](area.searchArea);
          node["railway"="subway_entrance"](area.searchArea);
        );
        out center;
        """
        raw_data = self._execute_query(ql_query)

        for item in raw_data.get("elements", []):
            if item.get("type") != "node": 
                continue
            
            lbl = item.get("tags", {}).get("name", "").strip()
            if not lbl: 
                continue

            clean_lbl = self.sanitize_name(lbl)
            geo_pt = (item.get("lat"), item.get("lon"))

            if None in geo_pt: 
                continue

            if clean_lbl not in self.subway_stations:
                self.subway_stations[clean_lbl] = MapEntity(
                    title=lbl,
                    category="metro",
                    points=[geo_pt]
                )

    def _fetch_street_ways(self):
        ql_query = """
        [out:json][timeout:120];
        area[name="Санкт-Петербург"]->.searchArea;
        (
          way["highway"]["name"](area.searchArea);
        );
        out geom;
        """
        raw_data = self._execute_query(ql_query)

        for item in raw_data.get("elements", []):
            if item.get("type") != "way": 
                continue
            
            tags = item.get("tags", {})
            lbl = tags.get("name", "").strip()
            if not lbl: 
                continue

            geom = item.get("geometry", [])
            if not geom: 
                continue

            path = [(p["lat"], p["lon"]) for p in geom]
            clean_lbl = self.sanitize_name(lbl)

            if clean_lbl in self.street_network:
                self.street_network[clean_lbl].points.extend(path)
            else:
                self.street_network[clean_lbl] = MapEntity(
                    title=lbl,
                    category="road",
                    points=path
                )

    def sanitize_name(self, raw_text: str):
        text = raw_text.lower().strip()
        replacements = [
            ("ул.", "улица"), ("пр.", "проспект"), ("пр-т", "проспект"),
            ("наб.", "набережная"), ("пл.", "площадь"), ("б-р", "бульвар"),
            ("ш.", "шоссе"), ("пер.", "переулок")
        ]
        for old, new in replacements:
            text = text.replace(old, new)
        return text

Данный класс реализует алгоритм извлечения координат:

Шаг 1: Извлечение топонимов
	•Метод: регулярные выражения для русского языка
	•Паттерны для улиц:  улица + название ,  проспект + название ,  набережная + название 
	•Паттерны для метро:  метро «название» ,  станция метро название 
	•Паттерны для пересечений:  на пересечении X с Y ,  у перекрестка X и Y 
Шаг 2: Сопоставление с газетиром
	Нечёткое сопоставление через:
		•Словарь алиасов для распространённых вариантов написания
		•Точное совпадение нормализованных названий
		•Токенизация и подсчёт пересечений слов
		•Поиск длинных общих подстрок (>3 символа)
Шаг 3: Определение пространственных отношений
	•Поиск ключевых слов: “пересечение”, “перекресток”, “съезд”
	•Извлечение пар улиц из контекста
	•Расчёт точки пересечения: перебор точек двух полилиний, поиск минимального расстояния по формуле Haversine
Шаг 4: Агрегация координат
	•Если найдено пересечение → возвращаем его координаты
	•Иначе — вычисляем взвешенный центроид:
	•Станции метро весят ×10 (подбирал в ручную)
	•Улицы усредняются по всем точкам
	•Финальная координата = центр масс всех найденных объектов

In [31]:
class IncidentGeoParser:
    def __init__(self, verbose_mode: bool = False):
        self.map_provider = Gazetteer()
        self.verbose = verbose_mode
        self._compile_patterns()

    def _compile_patterns(self):
        self.markers_intersection = {
            "пересечение", "перекресток", "пересечения", "перекрестке",
            "на пересечении", "у пересечения", "на перекрестке", "у перекрестка"
        }

        self.regex_streets = [
            r"(улиц[аеиыё]?\s+[а-яё][а-яё\s\-\.№]+?)(?:\s|,|\.|\d)",
            r"(проспект[еаиы]?\s+[а-яё][а-яё\s\-\.№]+?)(?:\s|,|\.|\d)",
            r"(шоссе\s+[а-яё][а-яё\s\-\.№]+?)(?:\s|,|\.|\d)",
            r"(набережн[аояыеи]+\s+[а-яё][а-яё\s\-\.№]+?)(?:\s|,|\.|\d)",
            r"(площад[иьья]+\s+[а-яё][а-яё\s\-\.№]+?)(?:\s|,|\.|\d)",
            r"(бульвар[еаиы]?\s+[а-яё][а-яё\s\-\.№]+?)(?:\s|,|\.|\d)",
            r"(переулок[еаиы]?\s+[а-яё][а-яё\s\-\.№]+?)(?:\s|,|\.|\d)",
            r"([а-яё]+ской)\s+улиц[еи]",
        ]

        self.regex_metro = [
            r"(станци[ияйе]\s+метро\s+[«\"„][а-яё\s\-\.]+[»\"”])",
            r"(метро\s+[«\"„][а-яё\s\-\.]+[»\"”])",
            r"у\s+метро\s+[«\"„]?([а-яё\s\-\.]+)[»\"”]?",
            r"станци[ияйе]\s+[«\"„]?([а-яё\s\-\.]+)[»\"”]?",
            r"([а-яё\s\-\.]+)\s+метро(?!\s+[а-яё])",
        ]
        
        self.regex_crossings = [
            r"(на\s+пересечени[ииеё]\s+[а-яё\s\-\.]+\s+с\s+[а-яё\s\-\.]+)",
            r"(у\s+пересечени[ияя]\s+[а-яё\s\-\.]+\s+с\s+[а-яё\s\-\.]+)",
            r"(на\s+перекрестк[еёа]\s+[а-яё\s\-\.]+\s+с\s+[а-яё\s\-\.]+)",
            r"(у\s+перекрестк[аеё]\s+[а-яё\s\-\.]+\s+с\s+[а-яё\s\-\.]+)",
            r"(пересечени[ея]\s+[а-яё\s\-\.]+\s+и\s+[а-яё\s\-\.]+)",
            r"(перекресток\s+[а-яё\s\-\.]+\s+и\s+[а-яё\s\-\.]+)",
        ]

    def solve_coordinates(self, content: str):
        if self.verbose:
            print(f"Анализ текста: {content[:300]}...")

        found_entities = self._scan_text(content)

        if self.verbose:
            print(f"Обнаружено объектов: {len(found_entities)}")
            for ent in found_entities:
                print(f" -> {ent.title} [{ent.category}]")

        if not found_entities:
            return 0.0, 0.0

        cross_points = self._analyze_intersections(content, found_entities)

        if cross_points:
            res = self._calculate_intersection_center(cross_points[0])
            if self.verbose:
                print(f"Координаты (пересечение): {res}")
            return res

        res = self._calculate_centroid(found_entities)
        if self.verbose:
            print(f"Координаты (центроид): {res}")
        return res

    def _scan_text(self, text: str) -> List[MapEntity]:
        buffer_lower = text.lower()
        unique_results: List[MapEntity] = []

        for r_pat in self.regex_streets:
            for m in re.finditer(r_pat, buffer_lower, re.IGNORECASE):
                raw_fragment = m.group(1).strip()
                match = self._fuzzy_lookup(raw_fragment, "road")
                if match and match not in unique_results:
                    unique_results.append(match)

        for r_pat in self.regex_metro:
            for m in re.finditer(r_pat, buffer_lower, re.IGNORECASE):
                raw_fragment = m.group(1).strip()
                cleaned = re.sub(r"(станци[ияйе]\s+)?(метро\s+)?[«\"„»”]?", "", raw_fragment).strip()
                if not cleaned: 
                    continue
                match = self._fuzzy_lookup(cleaned, "metro")
                if match and match not in unique_results:
                    unique_results.append(match)

        return unique_results

    def _fuzzy_lookup(self, query_str: str, mode: str):
        normalized_q = self.map_provider.sanitize_name(query_str)
        
        aliases = {
            "кантемировской": "кантемировская улица", "красина": "улица красина",
            "андреевской": "андреевская улица", "казакова": "маршала казакова",
            "героев": "проспект героев", "авиаконструкторов": "проспект авиаконструкторов",
            "шуваловского": "шуваловский проспект", "приморского": "приморское шоссе",
            "мосина": "улица мосина", "ополчения": "проспект народного ополчения",
            "голикова": "улица ленина голикова", "конюшенной": "большая конюшенная улица",
            "большевиков": "проспект большевиков", "дыбенко": "улица дыбенко",
            "коллонтай": "улица коллонтай",
        }

        for k, v in aliases.items():
            if k in normalized_q:
                mapped_norm = self.map_provider.sanitize_name(v)
                target_dict = self.map_provider.subway_stations if mode == "metro" else self.map_provider.street_network
                if mapped_norm in target_dict:
                    return target_dict[mapped_norm]

        dataset = self.map_provider.subway_stations if mode == "metro" else self.map_provider.street_network

        if normalized_q in dataset:
            return dataset[normalized_q]

        best_candidate: Optional[MapEntity] = None
        max_overlap = 0
        q_tokens = set(normalized_q.split())

        for db_name, entity in dataset.items():
            db_tokens = set(db_name.split())
            overlap_cnt = len(q_tokens & db_tokens)
            
            has_long_substr = any(token for token in q_tokens if len(token) > 3 and token in db_name)

            if overlap_cnt > max_overlap and overlap_cnt >= 1:
                max_overlap = overlap_cnt
                best_candidate = entity
            elif has_long_substr and max_overlap == 0 and best_candidate is None:
                best_candidate = entity

        return best_candidate

    def _analyze_intersections(self, text: str, entities: List[MapEntity]):
        buffer_lower = text.lower()
        found_pairs = []
        
        roads_only = [e for e in entities if e.category == "road"]

        for pat in self.regex_crossings:
            for m in re.finditer(pat, buffer_lower, re.IGNORECASE):
                phrase = m.group(1)
                r1, r2 = self._extract_roads_from_phrase(phrase, roads_only)
                if r1 and r2:
                    found_pairs.append((r1, r2))
                    break

        if not found_pairs and len(roads_only) >= 2:
            if any(k in buffer_lower for k in self.markers_intersection):
                found_pairs.append((roads_only[0], roads_only[1]))

        return found_pairs

    def _extract_roads_from_phrase(self, phrase: str, road_list: List[MapEntity]):
        candidates = []
        phrase_tokens = set(phrase.lower().split())

        for r in road_list:
            r_tokens = set(r.title.lower().split())
            intersection_size = len(phrase_tokens & r_tokens)
            if intersection_size > 0:
                candidates.append((intersection_size, r))

        candidates.sort(key=lambda x: x[0], reverse=True)

        if len(candidates) >= 2:
            return candidates[0][1], candidates[1][1]
        
        if len(road_list) >= 2:
            return road_list[0], road_list[1]
            
        return None, None

    def _calculate_intersection_center(self, pair: Tuple[MapEntity, MapEntity]) -> Coordinate:
        obj_a, obj_b = pair
        closest_pt = (0.0, 0.0)
        min_distance = float("inf")

        points_a_subset = obj_a.points[::2] if len(obj_a.points) > 10 else obj_a.points
        points_b_subset = obj_b.points[::2] if len(obj_b.points) > 10 else obj_b.points

        for p_a in points_a_subset:
            for p_b in points_b_subset:
                dist = self.haversine_distance(p_a, p_b)
                if dist < min_distance:
                    min_distance = dist
                    mid_lat = (p_a[0] + p_b[0]) / 2
                    mid_lon = (p_a[1] + p_b[1]) / 2
                    closest_pt = (mid_lat, mid_lon)

        return closest_pt

    def _calculate_centroid(self, entities: List[MapEntity]) -> Coordinate:
        if not entities:
            return 0.0, 0.0

        collection_pts: List[Coordinate] = []
        
        for e in entities:
            if e.category == "metro":
                collection_pts.extend(e.points * 10)
            elif e.points:
                avg_lat = sum(p[0] for p in e.points) / len(e.points)
                avg_lon = sum(p[1] for p in e.points) / len(e.points)
                collection_pts.append((avg_lat, avg_lon))

        if not collection_pts:
            return 0.0, 0.0

        final_lat = sum(p[0] for p in collection_pts) / len(collection_pts)
        final_lon = sum(p[1] for p in collection_pts) / len(collection_pts)
        return final_lat, final_lon

    @staticmethod
    def haversine_distance(pt1: Coordinate, pt2: Coordinate):
        R_EARTH = 6371000
        lat1, lon1 = map(math.radians, pt1)
        lat2, lon2 = map(math.radians, pt2)

        delta_phi = lat2 - lat1
        delta_lambda = lon2 - lon1

        a = (math.sin(delta_phi / 2) ** 2 +
             math.cos(lat1) * math.cos(lat2) * math.sin(delta_lambda / 2) ** 2)
        c = 2 * math.asin(math.sqrt(a))
        return R_EARTH * c

Реализованы три метрики сравнения предсказанных и реальных координат:
	1.	Haversine error — реальное расстояние по геодезической линии
	2.	RMSE — среднеквадратическая ошибка в метрах
	3.	Manhattan distance — сумма отклонений по широте и долготе

In [32]:
def compute_detailed_metrics(real: Coordinate, predicted: Coordinate):
    dist = IncidentGeoParser.haversine_distance(real, predicted)

    d_lat_m = (real[0] - predicted[0]) * 111000
    d_lon_m = (real[1] - predicted[1]) * 111000 * math.cos(math.radians(real[0]))

    return {
        "haversine_error": dist,
        "rmse_error": math.sqrt(d_lat_m**2 + d_lon_m**2),
        "manhattan_error": abs(d_lat_m) + abs(d_lon_m)
    }

def run_validation_loop(dataset: List[Dict], engine: IncidentGeoParser):
    total_dev = 0.0
    valid_counts = 0
    error_log: List[float] = []

    print(f"Запуск валидации на {len(dataset)} примерах...\n")

    for i, entry in enumerate(dataset, 1):
        txt = entry["text"]
        truth = tuple(entry["rta_coords"])

        print(f"--- Case #{i} ---")
        print(f"Text snippet: {txt[:100]}...")
        print(f"GT: {truth}")

        guess = engine.solve_coordinates(txt)
        print(f"Prediction: {guess}")

        if guess != (0.0, 0.0):
            deviation = engine.haversine_distance(truth, guess)
            print(f"Deviation: {deviation:.2f} m")
            
            total_dev += deviation
            valid_counts += 1
            error_log.append(deviation)
        else:
            print("[WARN] Координаты не извлечены.")
            error_log.append(float("inf"))
        print("")

    print("=== Результаты тестирования ===")
    print(f"Успешно обработано: {valid_counts} / {len(dataset)}")

    if valid_counts > 0:
        finite_errs = [x for x in error_log if x != float("inf")]
        avg_val = total_dev / valid_counts
        min_val = min(finite_errs) if finite_errs else float("nan")
        max_val = max(finite_errs) if finite_errs else float("nan")
        
        print(f"Mean Error: {avg_val:.2f} m")
        print(f"Min Error: {min_val:.2f} m")
        print(f"Max Error: {max_val:.2f} m")
        
        accurate = sum(1 for x in finite_errs if x < 500)
        print(f"High precision (<500m): {accurate} cases")
    else:
        print("Нет успешных предсказаний.")

In [33]:
if __name__ == "__main__":
    try:
        with open("rta_texts.json", "r", encoding="utf-8") as f:
            raw_json = json.load(f)
            validation_set = raw_json["text_list"]
            
        processor = IncidentGeoParser(verbose_mode=False)

        run_validation_loop(validation_set, processor)
        
    except FileNotFoundError:
        print("Файл 'rta_texts.json' не найден. Проверьте путь.")

Начало загрузки геоданных (SPB)...
База готова: Метро — 91, Улиц — 4017
Запуск валидации на 10 примерах...

--- Case #1 ---
Text snippet: Авария с участием спорткара, грузовика и велосипедиста произошла утром 29 августа в Выборгском район...
GT: (59.984386, 30.335297)
Prediction: (59.98432879935487, 30.340133430322577)
Deviation: 269.10 m

--- Case #2 ---
Text snippet: В Красногвардейском районе Петербурга водитель не справился с управлением машиной на закруглении дор...
GT: (59.97056, 30.496609)
Prediction: (59.95078308194901, 30.477721136964433)
Deviation: 2437.49 m

--- Case #3 ---
Text snippet: Петербургская полиция разбирается в обстоятельствах ДТП в Красносельском районе города. В ночь на 16...
GT: (59.868634, 30.168545)
Prediction: (60.017582454717, 30.270708954716987)
Deviation: 17512.40 m

--- Case #4 ---
Text snippet: Водителя на «Киа Рио» сегодня около трех часов ночи пытались остановить сотрудники отдельного специа...
GT: (60.025162, 30.22853)
Prediction: (60.0259612747788,