# Импортирую библиотеки

In [None]:
import pandas as pd
import numpy as np
import requests
import geopandas as gpd
import gpxpy
import os
import matplotlib.pyplot as plt
import contextily as ctx
import time
import psycopg2
import time
from geopy.distance import geodesic
from shapely.geometry import box, LineString
from typing import List, Tuple, Optional, Dict, Set
from datetime import date, datetime
from sklearn.preprocessing import LabelEncoder

# Создаю список с ссылками для скачивания

In [23]:
# Read the download URLs: one per line, surrounding whitespace stripped.
links = []

with open("./Links.txt", mode="r", encoding="UTF-8") as f:
    links.extend(line.strip() for line in f)

# Загружаю gpx файлы из ссылок


In [24]:
# Download every URL from `links` and store it as data/gpx/track<N>.gpx,
# where N is the position of the URL in the list.
os.makedirs("data/gpx", exist_ok=True)  # the target folder may not exist yet

for num, url in enumerate(links):
    filename = f"track{num}.gpx"
    try:
        # A timeout keeps one unresponsive server from blocking the whole run.
        response = requests.get(url, timeout=30)
        # Fail on HTTP errors so that an error page is never saved as a .gpx file.
        response.raise_for_status()

        with open(f"data/gpx/{filename}", mode="wb") as f:
            f.write(response.content)
    except (requests.RequestException, OSError) as e:
        # Best effort: report the failing link and continue with the next one.
        print(f"Ошибка при скачивании {url}: {e}")


In [25]:
# Names of the downloaded files in lexicographic order; the bare expression
# on the last line displays the list as the cell output.
gpx_list = sorted(os.listdir("data/gpx"))
gpx_list

['track0.gpx',
 'track1.gpx',
 'track2.gpx',
 'track3.gpx',
 'track4.gpx',
 'track5.gpx',
 'track6.gpx',
 'track7.gpx',
 'track8.gpx']

# Создаю изображения карты с маршрутом по трекам

In [26]:
# For every GPX file: collect all track points into `result` (one dict per
# point) and save a map image of the route to data/image/<track name>.
margin = 0.02  # padding, in degrees, added on each side of the track's bounding box
result = []

os.makedirs("data/image", exist_ok=True)  # created once, not on every iteration

for file in gpx_list:
    with open(f"data/gpx/{file}", mode="r", encoding="UTF-8") as f:
        gpx = gpxpy.parse(f)

    lats, lons, alts = [], [], []

    for track in gpx.tracks:
        for segment in track.segments:
            for point in segment.points:
                lats.append(point.latitude)
                lons.append(point.longitude)
                alts.append(point.elevation)
                result.append({
                    "track_id": file,
                    # A point may have no timestamp; store None instead of crashing.
                    "analysis_date": point.time.date() if point.time else None,
                    "latitude": point.latitude,
                    "longitude": point.longitude,
                    "altitude": point.elevation,
                })

    # A bounding box and a LineString both need at least two points.
    if len(lons) < 2:
        print(f"{file}: недостаточно точек для построения маршрута, пропускаю")
        continue

    bbox = box(
        min(lons) - margin,
        min(lats) - margin,
        max(lons) + margin,
        max(lats) + margin,
    )
    track_line = LineString(zip(lons, lats))

    # Build the geometries in EPSG:4326 and reproject to EPSG:3857 before
    # plotting them together with the basemap.
    gdf_bbox = gpd.GeoDataFrame(geometry=[bbox], crs="EPSG:4326")
    gdf_bbox_web = gdf_bbox.to_crs(epsg=3857)

    gdf_track = gpd.GeoDataFrame(geometry=[track_line], crs="EPSG:4326")
    gdf_track_web = gdf_track.to_crs(epsg=3857)

    fig, ax = plt.subplots(figsize=(10, 8))

    # The fully transparent bbox only fixes the plot extent (track + margin).
    gdf_bbox_web.plot(ax=ax, alpha=0)
    gdf_track_web.plot(ax=ax, color="red", linewidth=2)

    ctx.add_basemap(ax, crs=gdf_bbox_web.crs, source=ctx.providers.OpenStreetMap.Mapnik)

    ax.set_axis_off()

    fig.savefig(f"data/image/{os.path.splitext(file)[0]}", dpi=150, bbox_inches="tight", pad_inches=0)
    plt.close(fig)  # release the figure so memory does not grow with every track


In [27]:
# Smoke-test the Nominatim reverse-geocoding endpoint on a few points.
# `lats`/`lons` still hold the coordinates of the last track processed by the
# rendering loop.
ll = list(zip(lats, lons))

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'}

# Guard against an empty track: indexing an empty list would raise IndexError.
sample_points = (ll[0], ll[len(ll)//5], ll[len(ll)//2], ll[len(ll)//3], ll[-1]) if ll else ()

for lat, lon in sample_points:
    try:
        response = requests.get(
            "https://nominatim.openstreetmap.org/reverse",
            params={"lat": lat, "lon": lon, "format": "json"},
            headers=headers,
            timeout=30,  # never hang forever on an unresponsive server
        )
        print(response)
        response.json()  # parsed only to verify that the body is valid JSON
    except (requests.RequestException, ValueError) as e:
        print(f"Возникла ошибка при получении данных с API: {e}")
    finally:
        # Pause after every attempt, including failed ones, so that errors
        # do not turn into a burst of back-to-back requests.
        time.sleep(1.5)

<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>


# Создаю базу данных

In [28]:
def connection():
    """Open and return a new psycopg2 connection to the ``db_arthur`` database.

    Every call creates a fresh connection; closing it is the caller's job.
    """
    # NOTE(review): the credentials are hard-coded here; consider moving them
    # to configuration or environment variables.
    settings = {
        "database": "db_arthur",
        "user": "arthur",
        "password": "146a",
        "host": "localhost",
        "port": 5430,
    }
    return psycopg2.connect(**settings)

In [29]:
# Create the track_analysis table (if it does not exist yet) and insert one
# row per collected track point. Points that are already stored are skipped.
connectt = None
cursor = None
try:
    connectt = connection()
    cursor = connectt.cursor()

    # `id` is a surrogate key so that individual rows can be addressed by number.
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS track_analysis (
            id SERIAL PRIMARY KEY,
            track_id VARCHAR,
            analysis_date TIMESTAMP,
            region VARCHAR,
            latitude DOUBLE PRECISION,
            longitude DOUBLE PRECISION,
            altitude DOUBLE PRECISION,
            step_frequency DOUBLE PRECISION,
            temperature DOUBLE PRECISION,
            terrain_type VARCHAR,
            key_objects VARCHAR,
            UNIQUE (track_id, latitude, longitude)
        )
    """)

    rows = [
        (
            i["track_id"],
            i["analysis_date"],
            i["latitude"],
            i["longitude"],
            i["altitude"],
        )
        for i in result
    ]
    cursor.executemany("""
        INSERT INTO track_analysis
        (track_id, analysis_date, latitude, longitude, altitude)
        VALUES (%s, %s, %s, %s, %s)
        ON CONFLICT (track_id, latitude, longitude) DO NOTHING;
    """, rows)

    connectt.commit()
except Exception as e:
    print(f"Ошибка при создании базы данных: {e}")
finally:
    # Both names are pre-initialised to None, so this cleanup is safe even
    # when connection() itself failed.
    if cursor is not None:
        cursor.close()
    if connectt is not None:
        connectt.close()

Создаю датасет и привожу колонку analysis_date к строковому формату даты ГГГГ-ММ-ДД

In [30]:
# Build the per-point frame and normalise analysis_date to 'YYYY-MM-DD' strings.
df_test = pd.DataFrame(result)
parsed_dates = pd.to_datetime(df_test["analysis_date"])
df_test["analysis_date"] = parsed_dates.dt.strftime("%Y-%m-%d")

# Получаю температуру

Функция для получения температуры

In [31]:
def temp(lat, lon, date, hour=12):
    """Fetch one hourly ``temperature_2m`` value from the Open-Meteo archive API.

    The request covers a single day (``start_date == end_date == date``) with
    ``timezone`` set to ``auto``; the value at position ``hour`` of the
    returned hourly series is used.

    Args:
        lat: Latitude of the location.
        lon: Longitude of the location.
        date: Day to query, passed to the API as both start and end date
            (callers in this file pass a ``YYYY-MM-DD`` string).
        hour: Index into the hourly series. Defaults to 12, presumably noon
            local time, to be confirmed against the API documentation.

    Returns:
        The temperature reported by the API, or ``None`` when the request
        fails or the response does not contain the expected value.
    """
    url = "https://archive-api.open-meteo.com/v1/archive"

    params = {
        'latitude': lat,
        'longitude': lon,
        'start_date': date,
        'end_date': date,
        'hourly': 'temperature_2m',
        "timezone": "auto",
    }
    headers = {'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0'}

    try:
        # A timeout keeps one stalled request from blocking the whole analysis.
        response = requests.get(url, params=params, headers=headers, timeout=30)
        response.raise_for_status()
        return response.json()["hourly"]['temperature_2m'][hour]
    except (requests.RequestException, KeyError, IndexError, TypeError, ValueError) as e:
        # Report and return None: the interpolation step treats None as
        # "no data for this reference point".
        print(f"Данные не получены: {e}")
        return None

Функция для интерполяции температур между опорными точками.

In [32]:
def analysis_weather(df_test):
    """Return a copy of ``df_test`` with an interpolated ``temperature`` column.

    Temperatures are requested with ``temp`` for up to five reference rows
    (first, quarter, middle, three-quarter and last row). Every other row gets
    a value linearly interpolated by row position between the reference rows
    that returned data; rows outside the range of known reference rows take
    the nearest known value.

    Args:
        df_test: Frame with ``latitude``, ``longitude`` and ``analysis_date``
            columns. It is not modified.

    Returns:
        A copy of the frame with a ``temperature`` column. The column is
        ``None`` for every row when the frame is empty or when no reference
        row returned a temperature.
    """
    df_test = df_test.copy()  # never write into the caller's frame (often a slice)
    n = len(df_test)
    if n == 0:
        df_test["temperature"] = None
        return df_test

    # A set removes the duplicate indexes that short tracks produce, so the
    # same row is never requested from the API twice.
    key_indexes = sorted({0, n // 4, n // 2, 3 * n // 4, n - 1})

    temperatures_at_key_points = {}
    for idx in key_indexes:
        row = df_test.iloc[idx]
        temperatures_at_key_points[idx] = temp(
            row["latitude"], row["longitude"], row["analysis_date"]
        )

    # Only reference rows that actually returned a value can anchor the interpolation.
    known = {
        idx: value
        for idx, value in temperatures_at_key_points.items()
        if value is not None
    }
    if not known:
        print("Недостаточно данных для интерполяции, проверьте работоспособность API")
        df_test["temperature"] = None
        return df_test

    known_rows = sorted(known)
    known_values = [known[idx] for idx in known_rows]
    # np.interp interpolates linearly between the anchors and holds the edge
    # value outside of them; a single anchor yields a constant column.
    df_test["temperature"] = np.interp(np.arange(n), known_rows, known_values)

    return df_test

In [33]:
# Add temperatures track by track and combine the results into `df`.
# Tracks are taken from the data itself (in order of first appearance)
# instead of assuming exactly nine files named track0..track8.
weather_frames = []
try:
    for track_id, track_data in df_test.groupby("track_id", sort=False):
        weather_frames.append(analysis_weather(track_data))
        print(f"{track_id.removesuffix('.gpx')} добавлен")
except Exception as e:
    print(f"Ошибка вызова функции: analysis_weather {e}")

# One concat at the end avoids re-copying the growing frame on every
# iteration; tracks finished before an error are still kept.
df = pd.concat(weather_frames, ignore_index=True) if weather_frames else pd.DataFrame()


track0 добавлен
track1 добавлен
track2 добавлен
track3 добавлен
track4 добавлен
track5 добавлен
track6 добавлен
track7 добавлен
track8 добавлен


Добавляю температуру в базу данных

In [34]:
# Write the interpolated temperatures back to track_analysis.
connect_db = None
cursor = None
try:
    connect_db = connection()
    cursor = connect_db.cursor()

    # Rows are matched on the table's UNIQUE (track_id, latitude, longitude)
    # key rather than on a positional number, so the update does not depend
    # on the order in which rows were inserted.
    updates = [
        (
            # Missing temperatures are stored as NULL, not as NaN.
            None if pd.isna(row.temperature) else float(row.temperature),
            row.track_id,
            float(row.latitude),
            float(row.longitude),
        )
        for row in df.itertuples(index=False)
    ]
    cursor.executemany("""
        UPDATE track_analysis
        SET temperature = %s
        WHERE track_id = %s AND latitude = %s AND longitude = %s
    """, updates)

    connect_db.commit()
    print(f"Обновлено {len(df)} записей в таблице track_analysis")

except Exception as e:
    print(f"Ошибка при обновлении базы данных: {e}")
finally:
    # Both names start as None, so cleanup is safe even if connecting failed.
    if cursor is not None:
        cursor.close()
    if connect_db is not None:
        connect_db.close()

Обновлено 32006 записей в таблице track_analysis


# Получаю регион

Функция для получения региона по координатам

In [35]:
def extract_map_region(lat: float, lon: float) -> str | None:
    """Reverse-geocode a coordinate with Nominatim and return its region name.

    The first available field of the response's ``address`` object is used,
    in this order: ``county``, ``state``, ``country``.

    Args:
        lat: Latitude of the point.
        lon: Longitude of the point.

    Returns:
        The region name, or ``None`` when the request fails or the response
        contains none of the three fields.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'}
    try:
        response = requests.get(
            "https://nominatim.openstreetmap.org/reverse",
            params={"lat": lat, "lon": lon, "format": "json"},
            headers=headers,
            timeout=30,  # never hang forever on an unresponsive server
        )
        response.raise_for_status()
        address = response.json().get("address", {})
        for field in ("county", "state", "country"):
            if field in address:
                return address[field]
        print(f"Error: no region found for ({lat}, {lon})")
        return None
    except (requests.RequestException, ValueError, AttributeError) as e:
        print(f"Error {e}")
        return None
    finally:
        # Pause after every call, including failed ones, so that repeated
        # errors do not turn into a burst of requests.
        time.sleep(1.5)

Функция для определения региона трека

In [36]:
def analysis_region(df):
    """Return a copy of ``df`` with a ``region`` column.

    The region is looked up once, with ``extract_map_region``, for the
    coordinates of the first row and assigned to every row of the copy.

    Args:
        df: Frame with ``latitude`` and ``longitude`` columns. It is not
            modified.

    Returns:
        The copy with the added ``region`` column, or ``None`` (after printing
        a message) when the frame is empty or lacks the coordinate columns.
    """
    try:
        first_row = df.iloc[0]
        lat = first_row["latitude"]
        lon = first_row["longitude"]
    except (IndexError, KeyError) as e:
        # Only the lookups above are expected to fail: empty frame or missing column.
        print(f"Ошибка вызова функции analysis_region {e}")
        return None

    df = df.copy()
    df["region"] = extract_map_region(lat, lon)
    return df

In [37]:
# Determine the region track by track and combine the results into `df_region`.
# Tracks are taken from the data itself (in order of first appearance)
# instead of assuming exactly nine files named track0..track8.
region_frames = []
try:
    for _, track_data in df_test.groupby("track_id", sort=False):
        track_data_region = analysis_region(track_data)
        # analysis_region returns None when it could not process a track.
        if track_data_region is not None:
            region_frames.append(track_data_region)
except Exception as e:
    print(f"Ошибка вызова функции: analysis_region {e}")

# One concat at the end avoids re-copying the growing frame on every iteration.
df_region = pd.concat(region_frames, ignore_index=True) if region_frames else pd.DataFrame()


In [38]:
# Write the detected regions back to track_analysis.
connect_db = None
cursor = None
try:
    connect_db = connection()
    cursor = connect_db.cursor()

    # Rows are matched on the table's UNIQUE (track_id, latitude, longitude)
    # key rather than on a positional number, so the update does not depend
    # on the order in which rows were inserted.
    updates = [
        (
            # A failed lookup is stored as NULL.
            None if pd.isna(row.region) else row.region,
            row.track_id,
            float(row.latitude),
            float(row.longitude),
        )
        for row in df_region.itertuples(index=False)
    ]
    cursor.executemany("""
        UPDATE track_analysis
        SET region = %s
        WHERE track_id = %s AND latitude = %s AND longitude = %s
    """, updates)

    connect_db.commit()
    print(f"Обновлено {len(df_region)} записей в таблице track_analysis")

except Exception as e:
    print(f"Ошибка при обновлении базы данных: {e}")
finally:
    # Both names start as None, so cleanup is safe even if connecting failed.
    if cursor is not None:
        cursor.close()
    if connect_db is not None:
        connect_db.close()

Обновлено 32006 записей в таблице track_analysis


# Получаю частоту шагов

In [43]:
# Display the combined per-point frame, including the region column, as the cell output.
df_region

Unnamed: 0,track_id,analysis_date,latitude,longitude,altitude,region
0,track0.gpx,2025-11-17,44.589199,38.400221,187.926320,городской округ Геленджик
1,track0.gpx,2025-11-17,44.589160,38.400190,188.159584,городской округ Геленджик
2,track0.gpx,2025-11-17,44.589230,38.400040,188.616480,городской округ Геленджик
3,track0.gpx,2025-11-17,44.589560,38.399690,189.506496,городской округ Геленджик
4,track0.gpx,2025-11-17,44.590050,38.399120,188.224640,городской округ Геленджик
...,...,...,...,...,...,...
32001,track8.gpx,2006-11-26,44.736280,37.772490,0.610000,городской округ Новороссийск
32002,track8.gpx,2006-11-26,44.736330,37.771890,43.920000,городской округ Новороссийск
32003,track8.gpx,2006-11-26,44.736140,37.773240,5.420000,городской округ Новороссийск
32004,track8.gpx,2006-11-26,44.736260,37.771970,5.420000,городской округ Новороссийск


In [40]:
def step_frequency(p1, p2, freqs, step_length=0.75):
    """Record the estimated step count between two points and return the running mean.

    The geodesic distance between ``p1`` and ``p2`` (in metres) is divided by
    ``step_length``; the result is appended to ``freqs`` and the mean of all
    values collected in ``freqs`` so far is returned. Despite the name, the
    value is a number of steps per leg; no time information is involved.

    Args:
        p1: Start point as a ``(latitude, longitude)`` pair.
        p2: End point as a ``(latitude, longitude)`` pair.
        freqs: List of per-leg step counts; modified in place.
        step_length: Assumed length of one step in metres (default 0.75).

    Returns:
        The mean of ``freqs`` after the new value has been appended.

    Raises:
        ValueError: If ``step_length`` is not positive.
    """
    if step_length <= 0:
        raise ValueError("step_length must be positive")
    dist = geodesic(p1, p2).meters
    freqs.append(dist / step_length)
    return sum(freqs) / len(freqs)

In [41]:
# Estimate the average number of steps per sampled leg for every track and
# store it in track_analysis.step_frequency for all rows of that track.
path = "./data/gpx"
connect = connection()
cursor = connect.cursor()
try:
    for file in gpx_list:
        points = []
        with open(f"{path}/{file}", mode="r", encoding="UTF-8") as f:
            gpx = gpxpy.parse(f)

        print(gpx)

        for track in gpx.tracks:
            for segment in track.segments:
                for point in segment.points:
                    points.append((point.latitude, point.longitude))

        sampled = points[::5]  # every fifth point
        freqs = []

        # Reset per track: with fewer than two sampled points the loop below
        # never runs, and a value left over from the previous track must not
        # be written to this one.
        avg_freq = None
        for p1, p2 in zip(sampled, sampled[1:]):
            avg_freq = step_frequency(p1, p2, freqs)

        if avg_freq is None:
            print(f"{file}: недостаточно точек для расчёта частоты шагов, пропускаю")
            continue

        cursor.execute("""
            UPDATE track_analysis
            SET step_frequency = %s
            WHERE track_id = %s
        """, (avg_freq, file))

        connect.commit()
finally:
    # Release the connection even when a file or a query fails.
    cursor.close()
    connect.close()

GPX(tracks=[GPXTrack(segments=[GPXTrackSegment(points=[...]), GPXTrackSegment(points=[...])])])
GPX(tracks=[GPXTrack(segments=[GPXTrackSegment(points=[...]), GPXTrackSegment(points=[...]), GPXTrackSegment(points=[...]), GPXTrackSegment(points=[...]), GPXTrackSegment(points=[...]), GPXTrackSegment(points=[...]), GPXTrackSegment(points=[...]), GPXTrackSegment(points=[...])])])
GPX(tracks=[GPXTrack(segments=[GPXTrackSegment(points=[...])])])
GPX(tracks=[GPXTrack(segments=[GPXTrackSegment(points=[...]), GPXTrackSegment(points=[...]), GPXTrackSegment(points=[...]), GPXTrackSegment(points=[...]), GPXTrackSegment(points=[...])])])
GPX(tracks=[GPXTrack(name='activity_20855511589', segments=[GPXTrackSegment(points=[...])])])
GPX(tracks=[GPXTrack(segments=[GPXTrackSegment(points=[...])])])
GPX(tracks=[GPXTrack(name='2021-01-16 17:12:01', segments=[GPXTrackSegment(points=[...])])])
GPX(tracks=[GPXTrack(segments=[GPXTrackSegment(points=[...])])])
GPX(tracks=[GPXTrack(segments=[GPXTrackSegment(poin

In [42]:
# Classify the terrain around every track with the Overpass API and collect
# named key objects (mountains, rivers, settlements, lakes); the result is
# written to all rows of the track in track_analysis.
from collections import Counter

overpass_endpoints = [
    "https://overpass-api.de/api/interpreter",
    "https://overpass.kumi.systems/api/interpreter",
    "https://overpass.openstreetmap.ru/cgi/interpreter"
]


def fetch_overpass(query):
    """Return the decoded JSON answer of the first mirror that replies with HTTP 200.

    Mirrors are tried in the order of ``overpass_endpoints``. Returns ``None``
    when every mirror fails, so a previous answer is never reused by accident.
    """
    for endpoint in overpass_endpoints:
        try:
            response = requests.get(endpoint, params={'data': query}, timeout=60)
            if response.status_code == 200:
                return response.json()
        except (requests.RequestException, ValueError) as e:
            print(f"Overpass: запрос к {endpoint} не удался: {e}")
    return None


connect = connection()
cursor = connect.cursor()

try:
    for filename in os.listdir("./data/gpx"):
        if not filename.endswith('.gpx'):
            continue

        with open(f"./data/gpx/{filename}", "r", encoding="UTF-8") as f:
            gpx = gpxpy.parse(f)

        all_points = []
        for track in gpx.tracks:
            for segment in track.segments:
                for p in segment.points:
                    all_points.append((p.latitude, p.longitude))

        if not all_points:
            # Nothing to sample; indexing an empty list would raise IndexError.
            continue

        # Representative points: start, middle and end of the track.
        rep_points = [all_points[0], all_points[len(all_points)//2], all_points[-1]]

        all_landuse, all_natural, all_key_objects = [], [], set()

        for lat, lon in rep_points:
            overpass_query = f"""
            [out:json][timeout:45];
            (
                way(around:500,{lat},{lon})["landuse"];
                way(around:500,{lat},{lon})["natural"];
                way(around:500,{lat},{lon})["leisure"];
                way(around:500,{lat},{lon})["waterway"="river"];
                way(around:500,{lat},{lon})["waterway"="stream"];
                way(around:500,{lat},{lon})["natural"="water"];
                node(around:500,{lat},{lon})["place"="city"];
                node(around:500,{lat},{lon})["place"="town"];
                node(around:500,{lat},{lon})["place"="village"];
                node(around:500,{lat},{lon})["natural"="peak"];
                node(around:500,{lat},{lon})["natural"="mountain"];
            );
            out tags center;
            """

            data = fetch_overpass(overpass_query)
            if data is None:
                print(f"Overpass: нет данных для точки ({lat}, {lon})")
            elements = data.get('elements', []) if data else []

            for element in elements:
                tags = element.get('tags', {})
                has_name = 'name' in tags

                if 'landuse' in tags:
                    all_landuse.append(tags['landuse'])
                elif 'natural' in tags:
                    all_natural.append(tags['natural'])
                    if tags['natural'] in ('peak', 'mountain') and has_name:
                        all_key_objects.add(f"Mountain: {tags['name']}")

                if tags.get('waterway') in ('river', 'stream') and has_name:
                    all_key_objects.add(f"River: {tags['name']}")

                if tags.get('place') in ('city', 'town', 'village') and has_name:
                    all_key_objects.add(f"Settlement: {tags['name']} ({tags['place']})")

                if element.get('type') == 'way' and tags.get('natural') == 'water' and has_name:
                    all_key_objects.add(f"Lake: {tags['name']}")

            time.sleep(1.5)  # pause between points to stay polite to the public API

        # The most frequent landuse value wins; natural is the fallback.
        # Counter keeps this linear and resolves ties by first occurrence.
        terrain_type = "unknown"
        if all_landuse:
            terrain_type = Counter(all_landuse).most_common(1)[0][0]
        elif all_natural:
            terrain_type = Counter(all_natural).most_common(1)[0][0]

        key_objects_str = "; ".join(sorted(all_key_objects)) if all_key_objects else None

        # NOTE(review): analysis_date is overwritten with today's date here,
        # replacing the date taken from the GPX timestamps; confirm this is intended.
        cursor.execute("""
            UPDATE track_analysis
            SET terrain_type = %s, key_objects = %s, analysis_date = %s
            WHERE track_id = %s
        """, (terrain_type, key_objects_str, date.today(), filename))

        connect.commit()
finally:
    # Release the connection even when a file, a request or a query fails.
    cursor.close()
    connect.close()

In [None]:
# Encode the categorical columns as integer labels in new `<column>_encoded` columns.
# NOTE(review): `df` must already contain terrain_type, key_objects and region;
# confirm that it is reloaded from track_analysis before this cell runs.
encoders = {}  # one fitted encoder per column, so labels can be decoded later
for column in ("terrain_type", "key_objects", "region"):
    le = LabelEncoder()
    # LabelEncoder cannot sort a mix of strings and missing values, so missing
    # entries get the same "unknown" label that the terrain step uses as default.
    values = df[column].fillna("unknown").astype(str)
    df[f"{column}_encoded"] = le.fit_transform(values)
    encoders[column] = le