In [151]:
import xmltodict
import pandas as pd
from shapely.geometry import LineString, Point
from shapely.ops import nearest_points
import glob
from pathlib import Path

with open("../data/roads.osm", "r", encoding="utf-8") as file:
    osm_data = xmltodict.parse(file.read()) # XML 문서를 파이썬 딕셔너리로 변환

# GPS 로그 Pandas DataFrame 객체 생성 - 행,열 구
df = pd.read_csv("../gps_logs/gps_straight01.csv")  # 경로가 맞는지 확인

# 모든 노드를 딕셔너리로 정리
nodes_raw = osm_data["osm"]["node"]          # 노드가 하나면 dict, 여러 개면 list
if isinstance(nodes_raw, dict):              # 노드가 1개뿐인 edge-case 대비 - 이경우 dict로 반환됨
    nodes_raw = [nodes_raw]

nodes = {}
for n in nodes_raw:
    nid = int(n["@id"])
    lat = float(n["@lat"])
    lon = float(n["@lon"])
    nodes[nid] = (lon, lat)                  # (경도, 위도) 순서로 저장

print("총 노드 개수:", len(nodes))
print("예시 3개:", list(nodes.items())[:3])

총 노드 개수: 113
예시 3개: [(436854331, (127.02421, 37.4969787)), (1906756396, (127.0259134, 37.4975028)), (2280678660, (127.0294169, 37.4984367))]


In [152]:
ways_raw = osm_data["osm"]["way"]
if isinstance(ways_raw, dict):                  # way가 1개일 가능성 대비
    ways_raw = [ways_raw]

print("총 way 개수:", len(ways_raw))

총 way 개수: 31


In [153]:
all_roads = {}
missing_report = []          # (way_id, missing_nids) 목록 기록용

for w in ways_raw:
    wid = int(w["@id"])
    node_refs = [int(nd["@ref"]) for nd in w["nd"]]

    # 존재하는 노드만 추출
    coords = []
    missing = []
    for nid in node_refs:
        if nid in nodes: # 정의되지 않은 노드 필터링.
            coords.append(nodes[nid])
        else:
            missing.append(nid)

    # 누락 노드 기록
    if missing:
        missing_report.append((wid, missing))

    # 좌표가 2개 이상이면 LineString 생성
    if len(coords) >= 2:
        all_roads[wid] = LineString(coords)

print(f"선 생성 완료: {len(all_roads)}개")
if missing_report:
    print(f"⚠️  누락 노드가 있는 way: {len(missing_report)}개 (아래 일부 예시)")
    for wid, miss in missing_report[:5]:
        print(f"  way {wid} → missing {len(miss)} nodes (ex: {miss[:3]} …)")
# 누락노드 제외한 way의 노드 개수 확인
for wid, line in all_roads.items():
    way_tag = next(w for w in ways_raw if int(w["@id"]) == wid)
    original_n   = len(way_tag["nd"])   # XML에 기록된 nd 개수
    actual_n     = len(line.coords)     # 존재하는 노드만으로 만든 개수
    if original_n != actual_n:
        print(f"way {wid}: XML {original_n}개 → 실제 {actual_n}개")
print(f"✅ 생성된 LineString 수: {len(all_roads)}")   

# 전부 출력해보기
for i, (wid, line) in enumerate(all_roads.items()):
    
    print(f"way {wid}: {len(line.coords)} pts, "
          f"첫점 {line.coords[0]} → 끝점 {line.coords[-1]}")

선 생성 완료: 31개
⚠️  누락 노드가 있는 way: 3개 (아래 일부 예시)
  way 472042763 → missing 1 nodes (ex: [11034387027] …)
  way 472042765 → missing 1 nodes (ex: [11034387028] …)
  way 504231011 → missing 1 nodes (ex: [11245176923] …)
way 472042763: XML 3개 → 실제 2개
way 472042765: XML 3개 → 실제 2개
way 504231011: XML 4개 → 실제 3개
✅ 생성된 LineString 수: 31
way 218864485: 4 pts, 첫점 (127.0276806, 37.4980478) → 끝점 (127.0274875, 37.4984961)
way 218864491: 15 pts, 첫점 (127.0294169, 37.4984367) → 끝점 (127.0369935, 37.5007121)
way 218873820: 5 pts, 첫점 (127.0280994, 37.4966908) → 끝점 (127.0289478, 37.4949431)
way 218877837: 5 pts, 첫점 (127.0260403, 37.5009804) → 끝점 (127.0267729, 37.4994695)
way 239954010: 10 pts, 첫점 (127.0284946, 37.4982952) → 끝점 (127.0274875, 37.4984961)
way 304445274: 2 pts, 첫점 (127.0274725, 37.4979843) → 끝점 (127.0269036, 37.497806)
way 305539406: 9 pts, 첫점 (127.0272447, 37.4984526) → 끝점 (127.0269036, 37.497806)
way 375049565: 14 pts, 첫점 (127.0369107, 37.5009049) → 끝점 (127.0284946, 37.4982952)
way 472042763: 2

In [154]:

DEG2M = 111_320    
# 위도 1° ≈ 111.32 km (서울 근사) 
# Shapely가 계산한 거리는 ‘도(degree)’ 단위이므로, m 단위로 바꿀 때 곱해 주기 위함

def find_closest_road(lon, lat, roads_dict):
    """하나의 GPS 점(lon, lat)을 roads_dict 중 가장 가까운 선으로 매칭."""
    p = Point(lon, lat)
    best_way, best_dist, best_proj = None, float("inf"), None

    for wid, line in roads_dict.items():
        d = p.distance(line)           # 단위: degree
        if d < best_dist:
            best_way, best_dist = wid, d
            best_proj = nearest_points(p, line)[1]   # 수직으로 내려서 구한 도로 위 점(투영점)
            # p와 line 사이에서 가장 가까운 두 점을 반환
            
    return best_way, best_dist * DEG2M, best_proj

# GPS의 첫 행만 테스트
test = df.iloc[0]
wid, dist_m, proj = find_closest_road(test.Longitude, test.Latitude, all_roads)

print("가장 가까운 way:", wid)
print(f"거리: {dist_m:.1f} m")
print("투영점:", (proj.x, proj.y))


가장 가까운 way: 218873820
거리: 2.3 m
투영점: (127.02813114879656, 37.496624035596895)


In [155]:
# 1) 각 행에 매칭 결과를 돌려주는 함수
def match_row(row):
    wid, dist_m, proj = find_closest_road(
        row.Longitude, row.Latitude, all_roads
    )
    return pd.Series({
        "matched_way":  wid,        # 가장 가까운 도로 ID
        "match_dist_m": dist_m,     # 거리 (m)
        "proj_lon":     proj.x,     # 도로 위 경도
        "proj_lat":     proj.y,     # 도로 위 위도
    })

# 2) DataFrame 전체에 적용해서 새 컬럼 붙이기
df[["matched_way", "match_dist_m","proj_lon","proj_lat"]] = (
    df.apply(match_row, axis=1) # axis=1 → DataFrame의 각 행(row) 를 순서대로 꺼냄
)
df["matched_way"] = df["matched_way"].astype("Int64")   # 대문자 I… 결측도 허용

df = df[[
    "Latitude", "Longitude", "Angle", "Speed (km/h)", "HDOP",
    "matched_way", "match_dist_m",
    "proj_lat", "proj_lon"          # 위도 → 경도 순서로 배치
]]

# 3) 확인
print(df)

     Latitude   Longitude  Angle  Speed (km/h)  HDOP  matched_way  \
0   37.496633  127.028150  336.0          54.1   1.0    218873820   
1   37.496750  127.028083  336.0          54.1   1.0   1229817205   
2   37.496883  127.028017  336.0          54.1   1.0   1229817205   
3   37.497000  127.027950  353.0          64.1   1.0   1229817205   
4   37.497167  127.027917  354.6          71.9   1.0   1229817205   
5   37.497333  127.027900  326.8          59.1   1.0    521766182   
6   37.497467  127.027800  348.7          52.0   1.0   1229817205   
7   37.497583  127.027767    0.0          71.5   1.0    521766174   
8   37.497767  127.027767   20.6          43.7   1.0    990628459   
9   37.497867  127.027817  298.0          61.7   1.0    990628459   
10  37.497950  127.027650   25.7          51.1   1.0    494952007   
11  37.498067  127.027717  326.3          67.6   1.0    520307303   
12  37.498200  127.027600  321.7          41.3   1.0    218864485   
13  37.498283  127.027517  336.5  

In [156]:
def is_noisy(row, dist_thr=30, hdop_thr=3):
    too_far  = row.match_dist_m > dist_thr
    bad_hdop = row.HDOP >= hdop_thr
    return too_far or bad_hdop

df["noisy"] = df.apply(is_noisy, axis=1)

print(df["noisy"].value_counts())
df

noisy
False    22
Name: count, dtype: int64


Unnamed: 0,Latitude,Longitude,Angle,Speed (km/h),HDOP,matched_way,match_dist_m,proj_lat,proj_lon,noisy
0,37.496633,127.02815,336.0,54.1,1.0,218873820,2.323706,37.496624,127.028131,False
1,37.49675,127.028083,336.0,54.1,1.0,1229817205,1.28191,37.496745,127.028073,False
2,37.496883,127.028017,336.0,54.1,1.0,1229817205,1.251976,37.496878,127.028007,False
3,37.497,127.02795,353.0,64.1,1.0,1229817205,0.333388,37.496999,127.027947,False
4,37.497167,127.027917,354.6,71.9,1.0,1229817205,5.271036,37.497146,127.027875,False
5,37.497333,127.0279,326.8,59.1,1.0,521766182,10.20954,37.497371,127.027983,False
6,37.497467,127.0278,348.7,52.0,1.0,1229817205,8.376283,37.497434,127.027733,False
7,37.497583,127.027767,0.0,71.5,1.0,521766174,10.611048,37.497542,127.027681,False
8,37.497767,127.027767,20.6,43.7,1.0,990628459,3.521129,37.49778,127.027796,False
9,37.497867,127.027817,298.0,61.7,1.0,990628459,6.172937,37.497844,127.027767,False


In [157]:
# ① gps_logs 폴더의 모든 CSV 경로 가져오기
paths = sorted(glob.glob("../gps_logs/*.csv"))
paths = [p for p in paths if "_matched" not in p]   # 이미 변환된 파일 제거

for path in paths:
    name = Path(path).stem            # 예: gps_straight01
    print(f"▶ 파일 처리 중: {name}")

    # ② 읽기
    df = pd.read_csv(path)

    # ③ 매칭 컬럼 붙이기
    df[["matched_way", "match_dist_m", "proj_lon", "proj_lat"]] = (
        df.apply(match_row, axis=1)
    )
    df["matched_way"] = df["matched_way"].astype("Int64")

    # ④ 보기 좋게 열 순서 재배치
    df = df[[
        "Latitude", "Longitude", "Angle", "Speed (km/h)", "HDOP",
        "matched_way", "match_dist_m",
        "proj_lat", "proj_lon"          # 위도 → 경도 순서로 배치
    ]]

    # ⑤ 새 파일로 저장 (덮어쓰기 싫으면 suffix 변경)
    out_path = f"../gps_logs/{name}_matched.csv"
    df.to_csv(out_path, index=False, encoding="utf-8")
    print("   → 저장 완료:", out_path)

print("*****모든 파일 매칭 종료*****")

▶ 파일 처리 중: gps_left02_turn
   → 저장 완료: ../gps_logs/gps_left02_turn_matched.csv
▶ 파일 처리 중: gps_left02_turn_final
   → 저장 완료: ../gps_logs/gps_left02_turn_final_matched.csv
▶ 파일 처리 중: gps_left_turn
   → 저장 완료: ../gps_logs/gps_left_turn_matched.csv
▶ 파일 처리 중: gps_left_turn_final
   → 저장 완료: ../gps_logs/gps_left_turn_final_matched.csv
▶ 파일 처리 중: gps_multipath
   → 저장 완료: ../gps_logs/gps_multipath_matched.csv
▶ 파일 처리 중: gps_multipath_final
   → 저장 완료: ../gps_logs/gps_multipath_final_matched.csv
▶ 파일 처리 중: gps_reverse_direction
   → 저장 완료: ../gps_logs/gps_reverse_direction_matched.csv
▶ 파일 처리 중: gps_reverse_direction_final
   → 저장 완료: ../gps_logs/gps_reverse_direction_final_matched.csv
▶ 파일 처리 중: gps_right02_turn.
   → 저장 완료: ../gps_logs/gps_right02_turn._matched.csv
▶ 파일 처리 중: gps_right02_turn._final
   → 저장 완료: ../gps_logs/gps_right02_turn._final_matched.csv
▶ 파일 처리 중: gps_right_turn_01
   → 저장 완료: ../gps_logs/gps_right_turn_01_matched.csv
▶ 파일 처리 중: gps_right_turn_01_final
   → 저장 완료: ../g

In [219]:
from shapely.ops import linemerge, unary_union

#경로를 이루는 5 개 way ID
route_set = {
    521766182, 990628459, 472042763, 218864485, 520307304
}

baseline  = linemerge(unary_union([all_roads[w] for w in route_set]))
# all_roads 딕셔너리에서 각 ID에 해당하는 LineString을 가져와서 
# 5개의 선을 한 번에 합집합 한다. - 선이 어긋나서 잘린 부분이 있으면 이어 붙이고, 중복 부분은 지운다.
# 이어서 가능한 한 긴 하나의 선으로 다시 합친다. 결과 → baseline : 기준 경로 전체를 대표하는 한 줄짜리 LineString.

buf_m = 40
buf40_deg = baseline.buffer(buf_m / DEG2M)      # 34 m → degree
# 선 주변에 너비를 주어 띠 모양(Polygon) 을 만드는 연산입니다.

print(baseline.coords[0], baseline.coords[-1])
# 1) 시작·끝 좌표
print("start :", baseline.coords[0])
print("end   :", baseline.coords[-1])

# 2) 버텍스 개수
print("vertex count :", len(baseline.coords))      # 좌표 배열 길이

# 3) 선 길이
length_deg = baseline.length                       # 단위: degree
length_m   = length_deg * DEG2M                    # 대략 m 로 환산
print(f"length : {length_deg:.6f}°  ≈  {length_m:,.1f} m")



(127.028743, 37.4958431) (127.0268379, 37.4998762)
start : (127.028743, 37.4958431)
end   : (127.0268379, 37.4998762)
vertex count : 14
length : 0.004462°  ≈  496.7 m


In [228]:
import math                       # seg_bearing 계산에 필요
from tabulate import tabulate

# 40 m버퍼 
buf40_deg = baseline.buffer(40 / DEG2M) # baseline·DEG2M은 앞서 계산된 그대로 사용

##### 역주행 각도 계산 필터링 #####
# 두 각도의 최소차(0~180°)
def ang_diff(a, b):
    return abs((a - b + 180) % 360 - 180)

# baseline에서 투영점 주변 세그먼트 진행각(방위각) 구하기
def seg_bearing(line, proj, delta=1e-6):
    s = line.project(proj)          # 투영점까지 누적 길이
    p1 = line.interpolate(max(0, s - delta))
    p2 = line.interpolate(min(line.length, s + delta))
    dx = (p2.x - p1.x) * math.cos(math.radians((p1.y + p2.y)/2))
    dy =  p2.y - p1.y
    return (math.degrees(math.atan2(dx, dy)) + 360) % 360

# 한 행(row) → 역주행 여부
def is_reverse_row(r, thr=45):
    p = Point(r.proj_lon, r.proj_lat)
    road_ang = seg_bearing(baseline, p)
    return ang_diff(r.Angle, road_ang) > (180 - thr)    # 135°↑ → 역방향
##### 역주행 각도 계산 필터링 endendend #####

def off_path_buffer(r):
    """
    1) 40 m 버퍼 밖 → off
    2) 버퍼 안이지만 '역주행' → off
    3) 그 외             → 정상
    """
    p = Point(r.proj_lon, r.proj_lat)
    in_buffer = buf40_deg.contains(p)

    if not in_buffer:
        return True            # 거리 이탈

    # 버퍼 안인데 역주행이면 이탈
    return is_reverse_row(r)


# 모든 *_matched.csv 파일 순회
paths = sorted(
    p for p in glob.glob("../gps_logs/*.csv")
    if p.endswith("_matched.csv")
)

summary = []

for path in paths:
    name = pathlib.Path(path).stem
    df   = pd.read_csv(path)

    # (A) 버퍼 판정
    df["raw_off"] = df.apply(off_path_buffer, axis=1)

    # (B) 3연속 이상 이탈만 확정
    #     .rolling(3, center=True) → 앞뒤 포함 3행 창
    df["off_path"] = df["raw_off"].rolling(3, center=True).sum() >= 3

    off_cnt = int(df["off_path"].sum())
    verdict = "경로이탈" if off_cnt else "경로이탈 없음"   # ← 새 컬럼
    summary.append([name, off_cnt, len(df), verdict]) # ← 판정 추가

# 결과 출력
result = pd.DataFrame(summary, columns=["파일", "이탈행수", "총행수", "판정"]).sort_values("파일")
print(tabulate(result, headers="keys", tablefmt="pretty", showindex=False))


+-------------------------------+----------+--------+---------------+
|             파일              | 이탈행수 | 총행수 |     판정      |
+-------------------------------+----------+--------+---------------+
|    gps_left02_turn_matched    |    2     |   15   |   경로이탈    |
|     gps_left_turn_matched     |    5     |   18   |   경로이탈    |
|     gps_multipath_matched     |    0     |   19   | 경로이탈 없음 |
| gps_reverse_direction_matched |    15    |   17   |   경로이탈    |
|   gps_right02_turn_matched    |    7     |   23   |   경로이탈    |
|   gps_right_turn_01_matched   |    4     |   16   |   경로이탈    |
|    gps_straight01_matched     |    0     |   22   | 경로이탈 없음 |
|    gps_straight02_matched     |    0     |   10   | 경로이탈 없음 |
|    gps_straight03_matched     |    0     |   19   | 경로이탈 없음 |
|    gps_straight04_matched     |    0     |   30   | 경로이탈 없음 |
+-------------------------------+----------+--------+---------------+


In [235]:
# gps_straight01_matched CSV 로드
df = pd.read_csv("../gps_logs/gps_right02_turn_matched.csv")

# (A) ±40 m 버퍼 (한 차선 더 여유)
buf40_deg = baseline.buffer(40 / DEG2M)

##### 역주행 각도 계산 필터링 #####
# 두 각도의 최소차(0~180°)
def ang_diff(a, b):
    return abs((a - b + 180) % 360 - 180)

# baseline에서 투영점 주변 세그먼트 진행각(방위각) 구하기
def seg_bearing(line, proj, delta=1e-6):
    s = line.project(proj)          # 투영점까지 누적 길이
    p1 = line.interpolate(max(0, s - delta))
    p2 = line.interpolate(min(line.length, s + delta))
    dx = (p2.x - p1.x) * math.cos(math.radians((p1.y + p2.y)/2))
    dy =  p2.y - p1.y
    return (math.degrees(math.atan2(dx, dy)) + 360) % 360

# 한 행(row) → 역주행 여부
def is_reverse_row(r, thr=45):
    p = Point(r.proj_lon, r.proj_lat)
    road_ang = seg_bearing(baseline, p)
    return ang_diff(r.Angle, road_ang) > (180 - thr)    # 135°↑ → 역방향
##### 역주행 각도 계산 필터링 endendend #####

def off_path_buffer(r):
    """
    1) 40 m 버퍼 밖 → off
    2) 버퍼 안이지만 '역주행' → off
    3) 그 외             → 정상
    """
    p = Point(r.proj_lon, r.proj_lat)
    in_buffer = buf40_deg.contains(p)

    if not in_buffer:
        return True            # 거리 이탈

    # 버퍼 안인데 역주행이면 이탈
    return is_reverse_row(r)

# (B) 3연속 이상 이탈 때만 확정
df["raw_off"] = df.apply(off_path_buffer, axis=1)

# rolling(3): 창 크기 3, center=True면 앞뒤 모두 포함
df["off_path"] = df["raw_off"].rolling(3, center=True).sum() >= 3

print("이탈 행 수 :", df["off_path"].sum(), "/", len(df))

### df가 gps_logs/gps_straight01_matched.csv일 경우
import folium
from shapely.geometry import mapping, Point

# 1) 지도 중심 = 기준 경로 중간점
coords_baseline = [(lat, lon) for lon, lat in baseline.coords]
center = coords_baseline[len(coords_baseline) // 2]

m = folium.Map(location=center, zoom_start=16, tiles="cartodbpositron")

# 2) 기준 경로(파랑) · 버퍼(주황)
folium.PolyLine(
    coords_baseline, color="blue", weight=6, opacity=0.8,
    tooltip="기준 경로"
).add_to(m)

folium.GeoJson(
    mapping(buf30_deg),
    style_function=lambda x: {
        "fillColor": "orange",
        "color": "orange",
        "weight": 1,
        "fillOpacity": 0.25
    },
    tooltip="±40 m 버퍼"
).add_to(m)

# 3) GPS 점 – 정상(검정), 이탈(빨강)
for _, r in df.iterrows():
    color = "red" if r.off_path else "black"
    folium.CircleMarker(
        location=(r.proj_lat, r.proj_lon),
        radius=3,
        color=color,
        fill=True, fillOpacity=0.7
    ).add_to(m)

# 4) 노트북에서 바로 보기
m

# 5) 파일로 저장하고 싶다면
# m.save("gps_straight01_map.html")



이탈 행 수 : 7 / 23
