In [24]:
import os
import sys
# 取得目前執行 Notebook 的工作目錄
project_root = os.path.abspath(os.path.join(os.getcwd(), '..'))
print(project_root)
if project_root not in sys.path:
    sys.path.append(project_root)
#強制重新載入 dbcon 模組（確保用到的是正確的 env）
import utils.dbcon
import importlib
importlib.reload(utils.dbcon)
from pathlib import Path
import json

/Users/apple/Desktop/Git/myworkspace/allpass/backend
DB_URL: postgresql+psycopg2://zoe:aipe-tester@localhost:25432/allpass_test


In [25]:
import gpxpy
import gpxpy.gpx
from shapely.geometry import LineString, Point, mapping
from geoalchemy2.shape import from_shape, to_shape
from datetime import datetime,timedelta
from utils.dbcon import SessionLocal
from models import TrailModel, POIModel

In [26]:
from models import POIModel

In [27]:
# gpx檔名-trail_id對應

keyword_to_trail_id ={
    "桃山瀑布": "taoshan-waterfall",
    "合歡山北" : "hehuan-north",
    "合歡南峰": "hehuan-main",
    "陽明山東": "yangmingshan-east",
    "玉山": "mountjade"
}

mapping = {}

gpx_folder = Path(".")

for gpx_file in gpx_folder.glob("*.gpx"):
    filename = gpx_file.name
    matched =False
    for keyword, trail_id in keyword_to_trail_id.items():
        if keyword in filename:
            mapping[filename] = trail_id
            matched = True
            break
    if not matched:
        mapping[filename] = "unknown"

print(mapping)


{'玉山主峰單攻.gpx': 'mountjade', '桃山瀑布.gpx': 'taoshan-waterfall', '合歡山北峰.gpx': 'hehuan-north', '合歡南峰.gpx': 'hehuan-main', '陽明山東段縱走.gpx': 'yangmingshan-east'}


In [28]:
#每 1 公里取一點（固定距離抽樣）
def extract_comm_points_every_km(points: gpxpy.gpx.GPXTrackPoint, interval_km: float=1.0) -> list: 
    comm_points = []
    acc_dist = 0.0  # 累積距離
    next_mark = interval_km * 1000  # 下一個標記點（公尺）

    for i in range(1, len(points)):  #從第 1 個點開始（第 0 點沒有前一點無法計算距離）
        prev_point = points[i - 1]
        curr_point = points[i]

        segment_dist = prev_point.distance_3d(curr_point) #使用 gpxpy 提供的 distance_3d() 計算兩點間的距離
        acc_dist += segment_dist #累加到總距離 acc_dist

        if acc_dist >= next_mark:  #如果累積距離超過目標（例如 1000 公尺），就把目前的點當作通訊點加入
            comm_points.append(curr_point)
            next_mark += interval_km * 1000 #更新下一個目標距離（例如下一個是 2000 公尺）

    return comm_points

#平均切成 3 段取點（等距抽樣）
def extract_comm_points_by_segments(points: gpxpy.gpx.GPXTrackPoint, num_points: int=3) -> list:
    # 計算整條路徑長度
    total_distance = 0.0 #累加所有路段的總距離
    distances = [0.0]  # 距離累積陣列,  紀錄從起點到每個點的累積距離，例如 [0, 123, 328, 550, ...]

    for i in range(1, len(points)):
        d = points[i - 1].distance_3d(points[i])
        total_distance += d
        distances.append(total_distance)

    # 分段目標距離
    segment_length = total_distance / (num_points + 1) #把路徑分成 num_points + 1 段（例如 3 點會切成 4 段），計算每段長度（等距切分）
    comm_points = []

    # 尋找最接近每個段點的 index
    for i in range(1, num_points + 1):
        target_dist = i * segment_length  #依序決定每個「目標距離」(1/4, 2/4, 3/4), 第i個點在 i/4 總長度
        # 從 distances 中找「最接近目標距離」的點（最靠近那個切分位置）
        # range(len(distances)) -> [0, 1, 2, 3..., (num_of_ele -1)]
        # lambda j（自定義比較準則):第 j 個累積距離和 target_dist差距
        closest_idx = min(range(len(distances)), key=lambda j: abs(distances[j] - target_dist)
                    )
        comm_points.append(points[closest_idx])

    return comm_points

In [29]:
# 讀取gpx完整路徑與隨機選擇通訊點
for gpx_file in gpx_folder.glob("*.gpx"):
    trail_id = mapping[str(gpx_file)]
    print(f"處理檔案:{gpx_file}-trail_id: {trail_id}")
    try:
        #讀檔
        with open (gpx_file, "r", encoding="utf-8") as f:
            gpx_data = gpxpy.parse(f)

        # 假設只取第一個 track/segment 的點
        segment = gpx_data.tracks[0].segments[0]
        points = segment.points

        if not points:
            print(f"  -> 檔案 {gpx_file.name} 沒有點，略過")
            continue

        #組成LineString 幾何格式
        coords = [(p.longitude, p.latitude) for p in points]
        linestring_new = LineString(coords)
        
        #寫入資料庫: paths.trails
        with SessionLocal() as session:
            trail = session.query(TrailModel).filter_by(trail_id=trail_id).first()
            if trail:
                trail.route_geometry = from_shape(linestring_new, srid=4326)
            session.commit()
            print(f"Update 成功！{trail_id}" )

        #comm_points_1km = extract_comm_points_every_km(points, interval_km=1)

        #以等距法切出三個通訊點
        comm_points_3cut = extract_comm_points_by_segments(points, num_points=3)
        with SessionLocal() as session:
            for idx, pt in enumerate(comm_points_3cut):
                Shapely_pt = Point(pt.longitude, pt.latitude)
                new_comm_point = POIModel(
                    trail_id = trail_id,
                    name = f"cp{idx + 1}",
                    poi_type = "comm_point",
                    location = from_shape(Shapely_pt, srid=4326),
                    description = f"{trail_id} - cp{idx + 1}"
                )
                session.add(new_comm_point)
            session.commit()
            print(f"Insert 成功！{trail_id} - 共 {len(comm_points_3cut)} 筆通訊點")


        # 印出結果
        # for p in comm_points_1km:
        #     print(p.latitude, p.longitude)

        # for p in comm_points_3cut:
        #     print(p.latitude, p.longitude)
                
        # # 依照時間排序，取得最新一點（時間最大）
        # latest = max(points, key=lambda p: p.time)

        # # 組成 POINTZ 幾何格式（注意順序：lon, lat, ele）
        # latest_point = Point(latest.longitude, latest.latitude, latest.elevation)



        #寫入資料庫: paths.points_of_interests



        # with SessionLocal() as session:
        #     new_gpx = GpxTestModel(
        #             trail_id = trail_id,
        #             # from_shape: 轉換為 PostGIS 欄位可接受格式
        #             route=from_shape(linestring_new, srid=4326),
        #             location = from_shape(latest_point, srid=4326),
        #             recorded_at = datetime.now()
        #         )

        #     session.add(new_gpx)
        #     session.commit()
        #     print("Insert 成功！", new_gpx.id)

         



    except Exception as e:
        print(f"❌ 錯誤處理 {gpx_file.name}：{e}")

處理檔案:玉山主峰單攻.gpx-trail_id: mountjade
Update 成功！mountjade
Insert 成功！mountjade - 共 3 筆通訊點
處理檔案:桃山瀑布.gpx-trail_id: taoshan-waterfall
Update 成功！taoshan-waterfall
Insert 成功！taoshan-waterfall - 共 3 筆通訊點
處理檔案:合歡山北峰.gpx-trail_id: hehuan-north
Update 成功！hehuan-north
Insert 成功！hehuan-north - 共 3 筆通訊點
處理檔案:合歡南峰.gpx-trail_id: hehuan-main
Update 成功！hehuan-main
Insert 成功！hehuan-main - 共 3 筆通訊點
處理檔案:陽明山東段縱走.gpx-trail_id: yangmingshan-east
Update 成功！yangmingshan-east
Insert 成功！yangmingshan-east - 共 3 筆通訊點


In [30]:
#包裝成Feature collection
id = "taoshan-waterfall"
features = []

with SessionLocal() as session:
    trail = session.query(TrailModel).filter_by(trail_id=id).first()
    trail_geom = mapping(to_shape(trail.route_geometry)) if trail else None
    trail_id = trail.trail_id if trail else None

    point_records = session.query(POIModel).filter_by(trail_id=id).all()
    point_geoms = [(pt.name, mapping(to_shape(pt.location))) for pt in point_records]
# 到這邊 session 已關閉，下面就只處理純 Python 資料了

features = []

if trail_geom:
    features.append({
        "type": "Feature",
        "geometry": trail_geom,
        "properties": {
            "type": "route",
            "id": trail_id
        }
    })

for name, geom in point_geoms:
    features.append({
        "type": "Feature",
        "geometry": geom,
        "properties": {
            "type": "point",
            "name": name
        }
    })

feature_collection = {
    "type": "FeatureCollection",
    "features" :features
}

print(json.dumps(feature_collection))
    

TypeError: 'dict' object is not callable

In [None]:

#優化寫法
from sqlalchemy.orm import Session
from shapely.geometry import mapping
from shapely.wkb import loads as wkb_loads
from models import TrailModel, POIModel
from utils.dbcon import SessionLocal

def get_trail_geojson(trail_id: str) -> dict:
    features = []

    with SessionLocal() as session:
        # 取得 trail 路線
        trail = session.query(TrailModel).filter_by(trail_id=trail_id).first()
        if trail and trail.route_geometry is not None:
            trail_geom = mapping(wkb_loads(bytes(trail.route_geometry.data)))
            features.append({
                "type": "Feature",
                "geometry": trail_geom,
                "properties": {
                    "type": "route",
                    "id": trail.trail_id,
                    "name": trail.name,
                    "location": trail.location,
                    "difficulty": trail.difficulty,
                    "length_km": trail.length_km,
                    "elevation_gain_m": trail.elevation_gain_m,
                    # 可按需增加其他欄位
                }
            })

        # 取得該 trail 所有通訊點
        point_records = session.query(POIModel).filter_by(trail_id=trail_id).all()
        for pt in point_records:
            if pt.location is not None:
                pt_geom = mapping(wkb_loads(bytes(pt.location.data)))
                features.append({
                    "type": "Feature",
                    "geometry": pt_geom,
                    "properties": {
                        "type": "point",
                        "name": pt.name
                        # 可加入其他欄位
                    }
                })

    return {
        "type": "FeatureCollection",
        "features": features
    }
"""
✅ 安全與效能面重點解釋：
Session 一定要用 with 管理：

自動處理 session 的關閉與 rollback（即使中途有錯誤也不會遺留連線）。

效能差異極小，但錯誤風險差異極大。

mapping(to_shape(...)) → mapping(wkb_loads(...)) 更安全

to_shape() 是 GeoAlchemy 提供的，但實際上內部就是呼叫 wkb.loads()，我們可以直接轉換為更直觀明確的做法。

用 bytes(geom.data) 處理 PostGIS 的 WKB 格式，避免資料格式錯誤。

欄位是否為 None 需檢查：

route_geometry、pt.location 若為空值，直接轉換會 raise exception。

結構更清晰：

將查詢與資料轉換封裝在單一 function，便於測試與維護。
"""

