In [1]:
# %% [markdown]
# # Spielberg Frenet Log 분석 & 최적 튜닝 솔버

# %% 
import os
import re
import glob
import math
import numpy as np
import pandas as pd

from math import hypot

# (선택) 머신러닝 / 최적화
from sklearn.ensemble import RandomForestRegressor
from scipy.optimize import minimize

# 로그 폴더 경로만 바꿔줘
LOG_DIR = "./logs/frenet"  # 예: "/home/xxx/f1_sim_ws/logs"
PATTERN = os.path.join(LOG_DIR, "Spielberg_*.csv")

print("Log dir:", LOG_DIR)
print("Num files:", len(glob.glob(PATTERN)))


Log dir: ./logs/frenet
Num files: 196


In [2]:
# %%
filename_re = re.compile(
    r"^(?P<map>\w+)_"
    r"SPD(?P<spd>[\d\.]+)_"
    r"TGT(?P<tgt>[\d\.]+)_"
    r"ACC(?P<acc>[\d\.]+)_"
    r"CRV(?P<crv>[\d\.]+)_"
    r"(?P<mode>\w+?)_"          # FRENET, FGM, MUX 등
    r"(?P<stamp>\d+)_"          # 타임스탬프
    r"(?P<result>\w+)_"
    r"dur_(?P<dur>[\d\.]+)s\.csv$"
)

def parse_filename(path):
    fname = os.path.basename(path)
    m = filename_re.match(fname)
    if not m:
        raise ValueError(f"Unexpected filename: {fname}")
    d = m.groupdict()
    return {
        "map": d["map"],
        "spd": float(d["spd"]),
        "tgt": float(d["tgt"]),
        "acc": float(d["acc"]),
        "crv": float(d["crv"]),
        "mode": d["mode"],
        "stamp": int(d["stamp"]),
        "file_result": d["result"],   # GOAL / ABORT / CRASH / STUCK ...
        "file_dur": float(d["dur"]),
        "filename": fname,
        "filepath": path,
    }


In [3]:
# %%
def split_timeseries_and_summary(df):
    # time column이 숫자로 캐스팅되는 행만 time-series라고 본다
    ts_mask = pd.to_numeric(df["t"], errors="coerce").notna()
    ts = df[ts_mask].copy()
    ts["t"] = ts["t"].astype(float)
    
    # 주요 numeric 컬럼 float 변환
    num_cols = ["x", "y", "yaw", "speed", "yaw_rate", "a_lat", "min_d", "track_error", "collision"]
    for c in num_cols:
        if c in ts.columns:
            ts[c] = pd.to_numeric(ts[c], errors="coerce")
    
    summary = df[~ts_mask].copy()
    return ts, summary


def get_summary_value(summary, label, to_float=True):
    row = summary[summary["t"] == label]
    if row.empty:
        return np.nan
    val = row["x"].iloc[0]
    if to_float:
        try:
            return float(val)
        except Exception:
            return np.nan
    else:
        return str(val)


def compute_total_distance(ts):
    dx = ts["x"].diff()
    dy = ts["y"].diff()
    dist = np.hypot(dx, dy)
    return float(dist.fillna(0).sum())


def estimate_lap_time_by_start_cross(ts, radius=1.0, min_time=10.0):
    """
    시작점 (x0,y0) 기준으로, 일정 시간(min_time) 이후
    다시 반경 radius 안에 들어오는 첫 시점을 랩완주로 봄.
    없으면 마지막 t 반환.
    """
    x0, y0 = ts["x"].iloc[0], ts["y"].iloc[0]
    for _, row in ts.iterrows():
        if row["t"] < min_time:
            continue
        if hypot(row["x"] - x0, row["y"] - y0) <= radius:
            return float(row["t"])
    return float(ts["t"].iloc[-1])


In [4]:
# %%
records = []

for path in glob.glob(PATTERN):
    meta = parse_filename(path)
    df = pd.read_csv(path)
    ts, summary = split_timeseries_and_summary(df)

    # --- Summary 값 가져오기 ---
    finish_reason = get_summary_value(summary, "Finish Reason", to_float=False)
    duration_s = get_summary_value(summary, "Duration (s)", to_float=True)
    total_dist_m = get_summary_value(summary, "Total Dist (m)", to_float=True)
    max_speed = get_summary_value(summary, "Max Speed (m/s)", to_float=True)
    max_lat_acc = get_summary_value(summary, "Max Lat Acc (m/s^2)", to_float=True)
    collision_str = get_summary_value(summary, "Collision", to_float=False)
    min_dist = get_summary_value(summary, "Min Dist (m)", to_float=True)
    unsafe_time = get_summary_value(summary, "Unsafe Time (s)", to_float=True)
    unsafe_ratio = get_summary_value(summary, "Unsafe Ratio", to_float=True)
    track_err_rms = get_summary_value(summary, "Track Error RMS (m)", to_float=True)
    lat_acc_rms = get_summary_value(summary, "Lat Acc RMS (m/s^2)", to_float=True)

    # 없으면 ts 기반으로라도 계산
    if np.isnan(total_dist_m):
        total_dist_m = compute_total_distance(ts)

    # collision → bool
    if isinstance(collision_str, str):
        collision_flag = (collision_str.strip().upper() == "YES")
    else:
        collision_flag = False

    # ---- abort / crash / stuck 보정용 기본값 ----
    # (일단 파일명에 있는 duration, summary duration, ts 마지막 t 다 받아 둠)
    file_dur = meta["file_dur"]

    # ts가 비었을 수도 있으니 방어 코드
    if ts is None or ts.empty:
        ts_last_t = np.nan
    else:
        ts_last_t = float(ts["t"].iloc[-1])
    # 크게 다르면 경고 찍어볼 수도 있음
    # print(meta["filename"], file_dur, duration_s, ts_last_t)

    # lap_time_raw: 기본은 summary duration이 있으면 그걸 쓰고, 없으면 file_dur, 또 없으면 ts_last_t
    lap_time_raw = duration_s if not np.isnan(duration_s) else file_dur
    if np.isnan(lap_time_raw):
        lap_time_raw = ts_last_t

    # 성공/실패 판단 초기값 (일단 파일 이름 기준)
    file_result = meta["file_result"].upper()
    is_success = (file_result == "GOAL")
    effective_lap_time = lap_time_raw  # 이후 abort 보정에서 바꿀 수 있음

    records.append({
        **meta,
        "finish_reason": finish_reason,
        "duration_s": lap_time_raw,
        "ts_last_t": ts_last_t,
        "total_dist_m": total_dist_m,
        "max_speed_run": max_speed,
        "max_lat_acc": max_lat_acc,
        "collision_flag": collision_flag,
        "min_dist": min_dist,
        "unsafe_time": unsafe_time,
        "unsafe_ratio": unsafe_ratio,
        "track_err_rms": track_err_rms,
        "lat_acc_rms": lat_acc_rms,
        "is_success_initial": is_success,
        "effective_lap_time": effective_lap_time,
        "ts": ts,  # 나중에 abort 보정용으로 쓰려고 같이 들고 있음
    })

len(records)


196

In [5]:
# %% [새로 수정된 셀]
import pandas as pd
import numpy as np
from math import hypot

# raceline 로드 (경로는 네 환경에 맞게)
RACELINE_PATH = "./raceline_Spielberg.csv"
raceline = pd.read_csv(RACELINE_PATH)
print(raceline.columns)
# 마지막 두 점 이용
x_last, y_last = raceline["# x"].iloc[-1], raceline["y"].iloc[-1]
x_prev, y_prev = raceline["# x"].iloc[-2], raceline["y"].iloc[-2]

# 방향 벡터 = 두 번째 마지막 → 마지막
dx, dy = x_last - x_prev, y_last - y_prev
seg_len = hypot(dx, dy)
if seg_len == 0:
    raise RuntimeError("raceline 마지막 두 점이 동일합니다. 결승선을 정의할 수 없음.")

# 결승선 법선 (트랙 방향에 수직)
nx, ny = -dy / seg_len, dx / seg_len

FINISH_ORIGIN = np.array([x_last, y_last], dtype=float)
FINISH_NORMAL = np.array([nx, ny], dtype=float)

print("Finish line origin:", FINISH_ORIGIN)
print("Finish line normal:", FINISH_NORMAL)


Index(['# x', 'y', 'yaw_rad', 'speed_mps', 'dist_to_wall'], dtype='object')
Finish line origin: [-75.7539  52.8974]
Finish line normal: [ 0.5999448 -0.8000414]


In [6]:
# %% [새 셀]
def signed_distance_to_finish_line(x, y,
                                   origin=FINISH_ORIGIN,
                                   normal=FINISH_NORMAL):
    """
    직선 (origin, normal) 기준 signed distance.
    (x - origin) · normal
    """
    vx, vy = x - origin[0], y - origin[1]
    return vx * normal[0] + vy * normal[1]


def estimate_lap_time_by_finish_line(ts,
                                     origin=FINISH_ORIGIN,
                                     normal=FINISH_NORMAL,
                                     min_time=10.0):
    """
    타임시리즈 ts에서, min_time 이후 처음으로
    '결승선'을 교차하는 시점(t)을 찾아서 반환.
    - 결승선: origin을 지나고, normal에 수직인 직선
    - 교차 조건: signed distance의 부호가 바뀌는 순간
    못 찾으면 NaN 반환.
    """
    if ts is None or ts.empty:
        return np.nan
    if not {"t", "x", "y"}.issubset(ts.columns):
        return np.nan

    t_vals = ts["t"].to_numpy(dtype=float)
    x_vals = ts["x"].to_numpy(dtype=float)
    y_vals = ts["y"].to_numpy(dtype=float)

    # signed distance 배열
    s_vals = np.array([
        signed_distance_to_finish_line(x, y, origin, normal)
        for x, y in zip(x_vals, y_vals)
    ], dtype=float)

    # min_time 이후 구간에서 sign change 찾기
    for i in range(1, len(t_vals)):
        t_prev, t_cur = t_vals[i-1], t_vals[i]
        if t_cur < min_time:
            continue

        s_prev, s_cur = s_vals[i-1], s_vals[i]

        # 0 통과 여부 (단, 둘 다 0 근처이면 노이즈라 무시)
        if np.sign(s_prev) == 0 and np.sign(s_cur) == 0:
            continue
        if np.sign(s_prev) == np.sign(s_cur):
            continue
        if np.isnan(s_prev) or np.isnan(s_cur):
            continue

        # 선형 보간으로 t_cross 추정
        # s(t) = s_prev + (s_cur - s_prev) * alpha, s(t)=0 되는 alpha
        denom = (s_cur - s_prev)
        if denom == 0:
            # 거의 동시에 0 근처라면 그냥 t_cur 사용
            t_cross = t_cur
        else:
            alpha = -s_prev / denom
            # alpha가 [0,1] 범위 안인지 체크
            if alpha < 0 or alpha > 1:
                t_cross = t_cur
            else:
                t_cross = t_prev + (t_cur - t_prev) * alpha

        return float(t_cross)

    # 교차 못 찾으면 NaN
    return np.nan


In [7]:
df_runs = pd.DataFrame([
    {k: v for k, v in rec.items() if k != "ts"} for rec in records
])

goal_mask = df_runs["is_success_initial"]
goal_dists = df_runs.loc[goal_mask, "total_dist_m"].dropna()

if len(goal_dists) == 0:
    raise RuntimeError("GOAL run이 하나도 없음... 최소 1개는 필요함")

ref_lap_dist = float(goal_dists.median())
print("Ref lap distance (median from GOALs):", ref_lap_dist)

# abort 보정 파라미터
ABORT_TIME_THRESHOLD = 80.0     # 80초 이상
MIN_CROSS_TIME = 10.0           # 출발 10초 이후부터 교차 인정

for rec in records:
    file_result = rec["file_result"].upper()
    ts = rec["ts"]
    duration_s = rec["duration_s"]
    total_dist_m = rec["total_dist_m"]

    is_success = rec["is_success_initial"]
    effective_lap_time = rec["effective_lap_time"]

    # ts 없으면 보정 불가 → 그냥 실패로 둔다
    if ts is None or ts.empty:
        rec["is_success"] = is_success
        rec["effective_lap_time"] = effective_lap_time
        continue

    # 결승선 교차시점 탐색
    t_finish = estimate_lap_time_by_finish_line(
        ts,
        origin=FINISH_ORIGIN,
        normal=FINISH_NORMAL,
        min_time=MIN_CROSS_TIME,
    )

    if not np.isnan(t_finish):
        is_success = True
        effective_lap_time = t_finish
        print(f"[Reclassify as SUCCESS] {rec['filename']}: "
                f"dur={duration_s:.2f}s, total_dist={total_dist_m:.1f}m -> lap={t_finish:.2f}s")

    rec["is_success"] = is_success
    rec["effective_lap_time"] = effective_lap_time

# 보정된 결과로 df 다시 생성
df_runs = pd.DataFrame([
    {k: v for k, v in rec.items() if k != "ts"} for rec in records
])

Ref lap distance (median from GOALs): 341.665
[Reclassify as SUCCESS] Spielberg_SPD5.5_TGT5.2_ACC5.0_CRV0.90_FRENET_1764862617_CRASH_dur_86.44s.csv: dur=86.44s, total_dist=343.4m -> lap=81.47s
[Reclassify as SUCCESS] Spielberg_SPD6.0_TGT4.6_ACC6.0_CRV1.00_FRENET_1764866733_ABORT_dur_89.80s.csv: dur=89.80s, total_dist=340.6m -> lap=88.82s
[Reclassify as SUCCESS] Spielberg_SPD5.5_TGT5.2_ACC5.0_CRV0.90_FRENET_1764865320_ABORT_dur_89.84s.csv: dur=89.84s, total_dist=343.6m -> lap=84.73s
[Reclassify as SUCCESS] Spielberg_SPD6.5_TGT5.0_ACC6.0_CRV0.95_FRENET_1764869964_ABORT_dur_89.83s.csv: dur=89.83s, total_dist=346.1m -> lap=82.33s
[Reclassify as SUCCESS] Spielberg_SPD5.5_TGT5.0_ACC6.0_CRV1.00_FRENET_1764862513_ABORT_dur_89.66s.csv: dur=89.66s, total_dist=362.4m -> lap=80.50s
[Reclassify as SUCCESS] Spielberg_SPD5.5_TGT4.8_ACC5.0_CRV0.90_FRENET_1764864083_ABORT_dur_89.77s.csv: dur=89.77s, total_dist=344.8m -> lap=83.23s
[Reclassify as SUCCESS] Spielberg_SPD5.5_TGT4.6_ACC5.0_CRV0.90_FRENET_17

In [8]:
# %%
def compute_objective_row(row,
                          T_penalty=100.0,
                          alpha=50.0,
                          beta=20.0,
                          ref_lap_dist=ref_lap_dist):
    """
    row: df_runs의 한 행
    """
    if row["is_success"]:
        return float(row["effective_lap_time"])
    else:
        # 실패 run
        fail_ratio = 1.0 - float(row["total_dist_m"]) / ref_lap_dist
        fail_ratio = max(0.0, min(1.0, fail_ratio))
        J = T_penalty + alpha * fail_ratio
        if row["collision_flag"]:
            J += beta
        return J

df_runs["J"] = df_runs.apply(compute_objective_row, axis=1)
df_runs[["filename", "file_result", "is_success", "effective_lap_time", "total_dist_m", "collision_flag", "J"]].head()


Unnamed: 0,filename,file_result,is_success,effective_lap_time,total_dist_m,collision_flag,J
0,Spielberg_SPD8.0_TGT8.0_ACC9.0_CRV1.2_FRENET_1...,CRASH,False,21.64,138.71,True,149.700877
1,Spielberg_SPD5.5_TGT5.2_ACC5.0_CRV0.90_FRENET_...,CRASH,True,81.467465,343.41,True,81.467465
2,Spielberg_SPD8.5_TGT8.5_ACC9.0_CRV1.0_FRENET_1...,CRASH,False,22.89,138.51,True,149.730145
3,Spielberg_SPD6.5_TGT6.5_ACC7.0_CRV1.2_FRENET_1...,CRASH,False,13.63,77.46,True,158.664335
4,Spielberg_SPD6.0_TGT5.0_ACC6.0_CRV0.90_FRENET_...,CRASH,False,3.89,6.38,True,169.066337


In [9]:
# %%
# 파라미터 조합별 평균/최소 J 구하기
group_cols = ["spd", "tgt", "acc", "crv", "effective_lap_time"]  # pp_max_speed까지 포함하고 싶으면 여기도 추가

summary = (
    df_runs
    .groupby(group_cols)
    .agg(
        mean_J=("J", "mean"),
        min_J=("J", "min"),
        runs=("J", "count")
    )
    .reset_index()
    .sort_values("mean_J")
)

print("=== 실험된 파라미터 중 평균 J 기준 TOP 10 ===")
print(summary.head(10))


=== 실험된 파라미터 중 평균 J 기준 TOP 10 ===
     spd  tgt  acc   crv  effective_lap_time     mean_J      min_J  runs
139  6.5  5.2  6.0  0.90           74.010000  74.010000  74.010000     1
73   5.5  5.5  9.0  1.00           75.570108  75.570108  75.570108     1
65   5.5  5.5  5.0  0.80           75.857684  75.857684  75.857684     1
70   5.5  5.5  7.0  1.00           76.947742  76.947742  76.947742     1
68   5.5  5.5  5.0  1.20           76.997965  76.997965  76.997965     1
66   5.5  5.5  5.0  1.00           77.238104  77.238104  77.238104     1
60   5.5  5.2  6.0  0.90           77.284717  77.284717  77.284717     1
58   5.5  5.2  5.0  0.95           77.856793  77.856793  77.856793     1
69   5.5  5.5  5.0  1.50           77.884860  77.884860  77.884860     1
71   5.5  5.5  7.0  1.00           78.089532  78.089532  78.089532     1


In [11]:
# %%
# (필요시) pp_max_speed도 같이 쓰고 싶으면 parse_filename에서 따로 뽑아서 df_runs에 넣어줘야 함.
# 여기서는 일단 freenet 4개 파라미터만 사용

param_cols = ["spd", "tgt", "acc", "crv"]

X = df_runs[param_cols].values
y = df_runs["J"].values

print("Num samples:", X.shape[0])

# RandomForest로 surrogate 모델 학습
rf = RandomForestRegressor(
    n_estimators=500,
    max_depth=None,
    random_state=42
)
rf.fit(X, y)

def surrogate_objective(w: np.ndarray):
    """
    w: [spd, tgt, acc, crv]
    """
    w = np.array(w).reshape(1, -1)
    J_pred = rf.predict(w)[0]
    return float(J_pred)

# 초기값: 지금까지 실험에서 mean_J가 가장 좋은 조합 사용
best_row = summary.iloc[0]
w0 = best_row[param_cols].values
print("Initial guess (best from experiments):", w0)

# 파라미터 범위 설정 (네 실험 범위 기준으로)
bounds = {
    "spd": (4.5, 9.0),
    "tgt": (4.0, 9.0),
    "acc": (5.0, 9.0),
    "crv": (0.8, 1.5),
}
lb = np.array([bounds[c][0] for c in param_cols])
ub = np.array([bounds[c][1] for c in param_cols])

def obj_with_clip(w):
    w = np.array(w)
    w = np.clip(w, lb, ub)
    return surrogate_objective(w)

res = minimize(
    obj_with_clip,
    x0=w0,
    method="Nelder-Mead",
    options={"maxiter": 300, "disp": True}
)

w_opt = np.clip(res.x, lb, ub)
print("\n=== Surrogate 기반 솔버 결과 ===")
for name, val in zip(param_cols, w_opt):
    print(f"{name}: {val:.3f}")
print("예측 J:", surrogate_objective(w_opt))

# 지금까지 실험 중 best와 비교
print("\n=== 실험에서 나온 best 조합 (mean_J 기준) ===")
print(best_row[param_cols].to_dict(), " -> mean_J =", best_row["mean_J"])


Num samples: 196
Initial guess (best from experiments): [6.5 5.2 6.  0.9]
Optimization terminated successfully.
         Current function value: 76.995396
         Iterations: 15
         Function evaluations: 80

=== Surrogate 기반 솔버 결과 ===
spd: 6.500
tgt: 5.200
acc: 6.000
crv: 0.900
예측 J: 76.99539569876308

=== 실험에서 나온 best 조합 (mean_J 기준) ===
{'spd': 6.5, 'tgt': 5.2, 'acc': 6.0, 'crv': 0.9}  -> mean_J = 74.01
