In [2]:
import pandas as pd
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.ensemble import VotingClassifier, RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score, f1_score
import joblib  # 모델 저장용
import warnings
warnings.filterwarnings('ignore')

# 최종 !!!

In [51]:
import numpy as np
import joblib
import warnings
warnings.filterwarnings('ignore')

# -----------------------------
# 군집별 모델 경로 및 인코더 정의
# -----------------------------
cluster_model_paths = {
    1: {"model": "model/port_c1_17hours.joblib", "encoder": "model/encoder_c1_17hours.joblib"},
    2: {"model": "model/port_c2_17hours.joblib", "encoder": "model/encoder_c2_17hours.joblib"},
    3: {"model": "model/port_c3_17hours.joblib", "encoder": "model/encoder_c3_17hours.joblib"},
    4: {"model": "model/port_c4_17hours.joblib", "encoder": "model/encoder_c4_17hours.joblib"},
    6: {"model": "model/port_c6_17hours.joblib", "encoder": "model/encoder_c6_17hours.joblib"},
    7: {"model": "model/port_c7_17hours.joblib", "encoder": "model/encoder_c7_17hours.joblib"}
}

# -----------------------------
# 군집별 고정 단일 항구 매핑
# -----------------------------
fixed_cluster_ports = {
    0: "PHMNL",
    5: "VNHPH"
}

# -----------------------------
# 예측 함수
# -----------------------------
def predict_top_ports_from_input():
    # 1. 사용자 입력
    print("🚢 AIS 데이터 입력 (5시간 시점 기준)")
    try:
        lat = float(input("LAT (위도): "))
        lon = float(input("LON (경도): "))
        cog = float(input("COG (침로): "))
        heading = float(input("HEADING (방위각): "))
    except ValueError:
        print("⚠️ 숫자만 입력해주세요.")
        return

    user_input = np.array([[lat, lon, cog, heading]]) # (1, 4) 크기의 numpy array로 저장

    # 2. 1차 군집 예측 모델 불러오기
    try:
        cluster_model = joblib.load("./model/cluster_17hours.joblib") #1차 soft voting 분류기
    except FileNotFoundError:
        print("❌ 1차 분류기 모델이 없습니다.")
        return

    # 3. 1차 군집 예측 및 확률
    cluster_probs = cluster_model.predict_proba(user_input)[0] #각 군집에 속할 확률 반환환
    cluster_labels = cluster_model.classes_

    # 4. 확률 높은 Top-2 군집 추출
    top2_idx = np.argsort(cluster_probs)[-2:][::-1]
    print("\n🔍 [1차 분류기] Top-2 군집:")
    for i in top2_idx:
        print(f" - CLUSTER_{cluster_labels[i]}: {cluster_probs[i]:.2%}")
        
    final_probs_joint = {}    # joint 확률 기반 (1차 군집의 확률 X 2차 군집의 확률)
    
    # 5. 각 Top-2 군집에 대해 2차 항구 예측 수행
    for i in top2_idx:
        prob_cluster = cluster_probs[i] #해당 군집이 선택될 확률
        cluster_id = cluster_labels[i] #해당 군집의 번호호

        # 5-1. 고정 단일 항구 처리 (모델 없이 고정된 항구)
        if cluster_id in fixed_cluster_ports:
            fixed_port = fixed_cluster_ports[cluster_id]
            final_probs_joint[fixed_port] = final_probs_joint.get(fixed_port, 0) + prob_cluster
            print(f"📦 군집 {cluster_id} → 단일 항구 '{fixed_port}' 확률 적용: {prob_cluster:.2%}")
            continue

        # 5-2. 모델 불러오기 실패 시 skip
        if cluster_id not in cluster_model_paths:
            print(f"⚠️ 군집 {cluster_id} 모델 경로 미정의")
            continue

        try:
            model = joblib.load(cluster_model_paths[cluster_id]["model"])
            le = joblib.load(cluster_model_paths[cluster_id]["encoder"])
        except:
            print(f"⚠️ 군집 {cluster_id}의 모델 또는 인코더 로딩 실패")
            continue

        # 5-3. 항구별 예측 확률 계산
        port_probs = model.predict_proba(user_input)[0] #해당 군집의 2차 항구 분류기
        try:
            port_names = le.inverse_transform(np.arange(len(port_probs))) #그 군집에서 학습된 LabelEncoder
        except:
            port_names = model.classes_

        # 5-4. Top-2 항구만 추출 JOINT 확률 (조건부 확률)
        top2_ports_idx = np.argsort(port_probs)[-2:][::-1]
        print(f"\n📍 CLUSTER_{cluster_id} 내 Top-2 항구:")
        for j in top2_ports_idx:
            port = port_names[j]
            prob = port_probs[j]
            print(f" - {port}: {prob:.2%}")
        

            # Joint 확률 = 1차 군집 확률 × 2차 항구 확률
            joint_prob = prob_cluster * prob #1차 분류기 확률 X 2차 분류기 확률 = 해당 군집이 선택되고, 그 안에서 해당 항구가 나올 확률
            final_probs_joint[port] = joint_prob #final_probs_joint라는 딕셔너리 {port:joint_port}
            
    # 6. 결과 출력
    if not final_probs_joint:
        print("❗ 예측 가능한 항구가 없습니다.")
        return

    # 6-1. Joint 확률 기반 Top-3
    # 최종 joint 확률 기준 top-3 항구 출력
    sorted_joint = sorted(final_probs_joint.items(), key=lambda x: x[1], reverse=True)[:3]
    print("\n🔮 [Joint 확률 기반] Top-3 항구 예측 결과:")
    for port, prob in sorted_joint:
       print(f"📦 {port}: {prob:.2%}")

In [22]:
predict_top_ports_from_input()

🚢 AIS 데이터 입력 (5시간 시점 기준)


LAT (위도):  33.93432833
LON (경도):  132.6795133
COG (침로):  57
HEADING (방위각):  55



🔍 [1차 분류기] Top-2 군집:
 - CLUSTER_3: 81.64%
 - CLUSTER_7: 10.36%

📍 CLUSTER_3 내 Top-2 항구:
 - West Japan (JPUKB, JPOSA, JPWAK, JPYKK, JPNGO, JPMKX): 98.18%
 - East Japan (JPKIJ, JPTYO, JPYOK, JPSMZ): 1.82%

📍 CLUSTER_7 내 Top-2 항구:
 - RUVVO: 97.26%
 - RUNJK: 2.74%

🔮 [Joint 확률 기반] Top-3 항구 예측 결과:
📦 West Japan (JPUKB, JPOSA, JPWAK, JPYKK, JPNGO, JPMKX): 80.16%
📦 RUVVO: 10.08%
📦 East Japan (JPKIJ, JPTYO, JPYOK, JPSMZ): 1.48%


# 테스트 데이터셋 병합

In [53]:
import numpy as np
import pandas as pd
import os
import joblib

# 📁 경로 및 설정
base_path = "datasets"
clusters = [1, 2, 3, 4, 6, 7]
columns = ["LAT", "LON", "COG", "HEADING"]

merged_all = []

for cluster_id in clusters:
    x_path = os.path.join(base_path, f"X_test_c{cluster_id}_17hours.npy")
    y_path = os.path.join(base_path, f"y_test_c{cluster_id}_17hours.npy")
    encoder_path = f"model/encoder_c{cluster_id}_17hours.joblib"
   
    if os.path.exists(x_path) and os.path.exists(y_path):
        X = np.load(x_path)
        y = np.load(y_path, allow_pickle=True)

        # 🔁 군집별 인코더 로드 (숫자라면 역변환)
        if isinstance(y[0], (int, np.integer)):
            if os.path.exists(encoder_path):
                le = joblib.load(encoder_path)
                y = le.inverse_transform(y)
            else:
                print(f"⚠️ 인코더 없음: {encoder_path}")
        
        # 🔧 DataFrame 구성
        df = pd.DataFrame(X, columns=columns)
        df["CLUSTER_ID"] = cluster_id
        df["PORT_NAME"] = y  # 문자열 항구명으로 복원됨

        merged_all.append(df)
    else:
        print(f"⚠️ cluster {cluster_id} 파일 누락: {x_path} or {y_path}")

# 🔄 전체 병합
final_df = pd.concat(merged_all, ignore_index=True)

# ✅ 저장
final_df.to_csv("test_17h_datasets.csv", index=False)

print("✅ merged_dataset_full.csv 저장 완료")
print(f"📏 전체 샘플 수: {final_df.shape[0]}, 컬럼 수: {final_df.shape[1]}")

✅ merged_dataset_full.csv 저장 완료
📏 전체 샘플 수: 101, 컬럼 수: 6


# top-1 정확도 + top-3 포함률

In [55]:
import pandas as pd
import numpy as np

# 🚢 테스트 데이터 불러오기
df = pd.read_csv("test_17h_datasets.csv")  # 컬럼: LAT, LON, COG, HEADING, CLUSTER_ID, PORT_NAME

# 🎯 평가 지표 저장
top1_correct = 0 #top-1 정확도 카운트
top3_correct = 0 #top-3 포함률 카운트
total = len(df) #전체 샘플 수

for i, row in df.iterrows():
    # 입력값 준비 (5시간 시점의 위치, 코그, 헤딩)
    user_input = np.array([[row["LAT"], row["LON"], row["COG"], row["HEADING"]]])
    # 실제 정답 레이블 추
    true_cluster = row["CLUSTER_ID"]
    true_port = row["PORT_NAME"]

    # 예측 수행 - 최종 항구별 joint 확률 저장용 딕셔너리
    final_probs_joint = {}

    # 1차 분류기 로딩
    try:
        cluster_model = joblib.load("./model/cluster_17hours.joblib")
    except:
        print("❌ 1차 군집 분류기 로딩 실패")
        continue

    # 1차 군집 예측 및 Top-2 군집 추출
    cluster_probs = cluster_model.predict_proba(user_input)[0]
    cluster_labels = cluster_model.classes_
    top2_idx = np.argsort(cluster_probs)[-2:][::-1]

    # 각 Top-2 군집에 대해 2차 예측 수행
    for idx in top2_idx:
        prob_cluster = cluster_probs[idx] # 1차 분류기에서의 군집 확률
        cluster_id = cluster_labels[idx] # 해당 군집 번호

        if cluster_id in fixed_cluster_ports:
            fixed_port = fixed_cluster_ports[cluster_id]
            final_probs_joint[fixed_port] = final_probs_joint.get(fixed_port, 0) + prob_cluster
            continue

        if cluster_id not in cluster_model_paths:
            continue

        try:
            model = joblib.load(cluster_model_paths[cluster_id]["model"])
            le = joblib.load(cluster_model_paths[cluster_id]["encoder"])
        except:
            continue

        try:
            port_probs = model.predict_proba(user_input)[0]
        except AttributeError:
            pred_label = model.predict(user_input)[0]
            port_probs = np.zeros(len(le.classes_))
            port_probs[pred_label] = 1.0
        
        port_probs = model.predict_proba(user_input)[0]
        try:
            port_names = le.inverse_transform(np.arange(len(port_probs)))
        except:
            port_names = model.classes_

        # 각 군집 내에서 Top-2 항구만 사용
        top2_ports_idx = np.argsort(port_probs)[-2:][::-1]
        for j in top2_ports_idx:
            port = port_names[j]
            prob = port_probs[j]
            joint_prob = prob_cluster * prob # 🔗 Joint 확률 = 군집확률 × 항구확률
            final_probs_joint[port] = joint_prob

    # 예측 평가 (joint 확률로 정렬된 top-3)
    if final_probs_joint:
        sorted_ports = sorted(final_probs_joint.items(), key=lambda x: x[1], reverse=True)
        top3_ports = [p for p, _ in sorted_ports[:3]]

        if true_port == top3_ports[0]:
            top1_correct += 1
        if true_port in top3_ports:
            top3_correct += 1

# ✅ 최종 결과 출력
print("\n📊 17시간 평가 결과")
print(f"전체 샘플 수: {total}")
print(f"🎯 Top-1 정확도: {top1_correct / total:.2%}")
print(f"🎯 Top-3 포함률: {top3_correct / total:.2%}")


📊 17시간 평가 결과
전체 샘플 수: 101
🎯 Top-1 정확도: 41.58%
🎯 Top-3 포함률: 63.37%


## 1차 분류기의 정확도 + top-1 정확도 + top-3 포함률

In [56]:
import numpy as np
import pandas as pd
import joblib
import warnings
warnings.filterwarnings("ignore")

from sklearn.metrics import accuracy_score

# ✅ 1차/2차 모델 경로
cluster_model_paths = {
    1: {"model": "model/port_c1_17hours.joblib", "encoder": "model/encoder_c1_17hours.joblib"},
    2: {"model": "model/port_c2_17hours.joblib", "encoder": "model/encoder_c2_17hours.joblib"},
    3: {"model": "model/port_c3_17hours.joblib", "encoder": "model/encoder_c3_17hours.joblib"},
    4: {"model": "model/port_c4_17hours.joblib", "encoder": "model/encoder_c4_17hours.joblib"},
    6: {"model": "model/port_c6_17hours.joblib", "encoder": "model/encoder_c6_17hours.joblib"},
    7: {"model": "model/port_c7_17hours.joblib", "encoder": "model/encoder_c7_17hours.joblib"}
}
fixed_cluster_ports = { 0: "PHMNL", 5: "VNHPH" }

# ✅ cluster_model 불러오기 (1차 soft voting 모델)
cluster_model = joblib.load("model/cluster_17hours.joblib")

# ✅ 데이터 로딩 (CSV: LAT, LON, COG, HEADING, CLUSTER_ID, PORT_NAME)
df = pd.read_csv("test_17h_datasets.csv")

# ✅ 저장용 리스트
top1_correct, top3_correct, total = 0, 0, 0
cluster_correct = 0  # 1차 분류기 정확도용

for i, row in df.iterrows():
    x_input = np.array([[row["LAT"], row["LON"], row["COG"], row["HEADING"]]])
    true_cluster = row["CLUSTER_ID"]
    true_port = row["PORT_NAME"]

    # 1차 군집 예측
    cluster_probs = cluster_model.predict_proba(x_input)[0]
    cluster_labels = cluster_model.classes_
    pred_cluster = cluster_labels[np.argmax(cluster_probs)]

    if pred_cluster == true_cluster:
        cluster_correct += 1

    # Top-2 군집 추출
    top2_idx = np.argsort(cluster_probs)[-2:][::-1]
    final_probs_joint = {}

    for idx in top2_idx:
        cluster_id = cluster_labels[idx]
        p_cluster = cluster_probs[idx]

        # 고정 항구 처리
        if cluster_id in fixed_cluster_ports:
            port = fixed_cluster_ports[cluster_id]
            final_probs_joint[port] = final_probs_joint.get(port, 0) + p_cluster
            continue

        # 모델 불러오기
        if cluster_id not in cluster_model_paths:
            continue
        try:
            model = joblib.load(cluster_model_paths[cluster_id]["model"])
            le = joblib.load(cluster_model_paths[cluster_id]["encoder"])
        except:
            continue

        # 2차 항구 예측 확률
        port_probs = model.predict_proba(x_input)[0]
        try:
            port_names = le.inverse_transform(np.arange(len(port_probs)))
        except:
            port_names = model.classes_

        top2_ports_idx = np.argsort(port_probs)[-2:][::-1]
        for j in top2_ports_idx:
            port = port_names[j]
            prob = port_probs[j]
            joint_prob = p_cluster * prob
            final_probs_joint[port] = joint_prob

    if not final_probs_joint:
        continue

    # Top-3 항구 예측 결과
    sorted_ports = sorted(final_probs_joint.items(), key=lambda x: x[1], reverse=True)
    top3_ports = [port for port, _ in sorted_ports[:3]]
    top1_port = top3_ports[0]

    # 정확도 계산
    total += 1
    if true_port == top1_port:
        top1_correct += 1
    if true_port in top3_ports:
        top3_correct += 1

# ✅ 최종 결과 출력
print("\n📊 평가 결과")
print(f"전체 샘플 수: {total}")
print(f"🎯 1차 군집 예측 정확도 : {cluster_correct/total:.2%}")
print(f"🎯 2차 항구 Top-1 정확도: {top1_correct/total:.2%}")
print(f"🎯 2차 항구 Top-3 포함률: {top3_correct/total:.2%}")


📊 평가 결과
전체 샘플 수: 101
🎯 1차 군집 예측 정확도 : 70.30%
🎯 2차 항구 Top-1 정확도: 41.58%
🎯 2차 항구 Top-3 포함률: 63.37%


# 테스트 데이터로 군집별 정확도 확인하기

In [59]:
import numpy as np
import pandas as pd
import joblib
from collections import defaultdict
import warnings
warnings.filterwarnings("ignore")

# ✅ 1차/2차 모델 경로
cluster_model_paths = {
    1: {"model": "model/port_c1_17hours.joblib", "encoder": "model/encoder_c1_17hours.joblib"},
    2: {"model": "model/port_c2_17hours.joblib", "encoder": "model/encoder_c2_17hours.joblib"},
    3: {"model": "model/port_c3_17hours.joblib", "encoder": "model/encoder_c3_17hours.joblib"},
    4: {"model": "model/port_c4_17hours.joblib", "encoder": "model/encoder_c4_17hours.joblib"},
    6: {"model": "model/port_c6_17hours.joblib", "encoder": "model/encoder_c6_17hours.joblib"},
    7: {"model": "model/port_c7_17hours.joblib", "encoder": "model/encoder_c7_17hours.joblib"}
}
fixed_cluster_ports = { 0: "PHMNL", 5: "VNHPH" }

# ✅ 모델 & 데이터 불러오기
cluster_model = joblib.load("model/cluster_17hours.joblib")
df = pd.read_csv("test_17h_datasets.csv")

# ✅ 전체 정확도 변수
top1_correct, top3_correct, total = 0, 0, 0
cluster_correct = 0

# ✅ 군집별 정확도 저장용
cluster_stats = defaultdict(lambda: {
    "total": 0,
    "cluster_correct": 0,
    "top1_correct": 0,
    "top3_correct": 0
})

# ✅ 예측 루프
for _, row in df.iterrows():
    x_input = np.array([[row["LAT"], row["LON"], row["COG"], row["HEADING"]]])
    true_cluster = row["CLUSTER_ID"]
    true_port = row["PORT_NAME"]

    cluster_probs = cluster_model.predict_proba(x_input)[0]
    cluster_labels = cluster_model.classes_
    pred_cluster = cluster_labels[np.argmax(cluster_probs)]

    total += 1
    cluster_stats[true_cluster]["total"] += 1

    if pred_cluster == true_cluster:
        cluster_correct += 1
        cluster_stats[true_cluster]["cluster_correct"] += 1

    top2_idx = np.argsort(cluster_probs)[-2:][::-1]
    final_probs_joint = {}

    for idx in top2_idx:
        cluster_id = cluster_labels[idx]
        p_cluster = cluster_probs[idx]

        if cluster_id in fixed_cluster_ports:
            port = fixed_cluster_ports[cluster_id]
            final_probs_joint[port] = final_probs_joint.get(port, 0) + p_cluster
            continue

        if cluster_id not in cluster_model_paths:
            continue

        model = joblib.load(cluster_model_paths[cluster_id]["model"])
        le = joblib.load(cluster_model_paths[cluster_id]["encoder"])
        port_probs = model.predict_proba(x_input)[0]

        try:
            port_names = le.inverse_transform(np.arange(len(port_probs)))
        except:
            port_names = model.classes_

        top2_ports_idx = np.argsort(port_probs)[-2:][::-1]
        for j in top2_ports_idx:
            port = port_names[j]
            prob = port_probs[j]
            joint_prob = p_cluster * prob
            final_probs_joint[port] = joint_prob

    if not final_probs_joint:
        continue

    sorted_ports = sorted(final_probs_joint.items(), key=lambda x: x[1], reverse=True)
    top3_ports = [port for port, _ in sorted_ports[:3]]
    top1_port = top3_ports[0]

    if true_port == top1_port:
        top1_correct += 1
        cluster_stats[true_cluster]["top1_correct"] += 1
    if true_port in top3_ports:
        top3_correct += 1
        cluster_stats[true_cluster]["top3_correct"] += 1

# ✅ 군집별 정확도 정리
result = []
for cluster_id in sorted(cluster_stats.keys()):
    stats = cluster_stats[cluster_id]
    total_samples = stats["total"]
    result.append({
        "CLUSTER_ID": cluster_id,
        "샘플 수": total_samples,
        "1차 군집 정확도": stats["cluster_correct"] / total_samples if total_samples else 0,
        "Top-1 항구 정확도": stats["top1_correct"] / total_samples if total_samples else 0,
        "Top-3 항구 포함률": stats["top3_correct"] / total_samples if total_samples else 0,
    })

# ✅ 출력
result_df = pd.DataFrame(result)
print(result_df.to_string(index=False))

 CLUSTER_ID  샘플 수  1차 군집 정확도  Top-1 항구 정확도  Top-3 항구 포함률
          1    24   0.000000      0.000000      0.000000
          2    19   0.842105      0.473684      0.842105
          3    28   1.000000      0.750000      0.964286
          4     7   1.000000      0.000000      0.857143
          6    18   0.833333      0.444444      0.555556
          7     5   1.000000      0.800000      1.000000


In [17]:
import numpy as np

num = 3 #이걸 수정
# 1. npy 파일 불러오기 (군집 1 예시)
X_test = np.load(f"./datasets/X_test_c{num}_14hours.npy")
y_test = np.load(f"./datasets/y_test_c{num}_14hours.npy", allow_pickle=True)
#cluster_ids = np.load(f"./test datasets/cluster_ids_cluster{num}.npy")

# 2. 사용할 샘플 선택 (예: 0번째)
# 샘플 선택
i = 10
input_sample = X_test[i].reshape(1, -1)
true_port = y_test[i]
#true_cluster = cluster_ids[i]

# 예측 실행
pred_result, pred_cluster = predict_top_ports_from_array(input_sample)

# 1. 예측 결과가 존재할 경우만 출력
if pred_result:
    print(f"\n📌 [샘플 {i}] 예측 결과 요약")

    # 실제 군집 vs 예측 군집
    print(f"✅ 실제 선박이 속한 군집 번호 (정답): {num}")
    print(f"✅ 예측된 군집 번호 (1차 분류기 결과): {pred_cluster}")

    # 예측된 항구 Top-3 출력
    print("\n🔮 [2차 분류기] 예측된 항구 Top-3 (joint 확률 기준):")
    for rank, (port, prob) in enumerate(pred_result.items(), 1):
        print(f"  {rank}. 📦 {port} → {prob:.2%}")

    # Top-3 항구 후보 목록
    top3 = list(pred_result.keys())

    # 평가 결과
    print("\n🎯 평가 결과:")

    # 1차 군집 예측이 맞았는가?
    if pred_cluster == num:
        print("✅ [1차 군집 예측] 정확하게 맞췄습니다! 🎯")
    else:
        print("❌ [1차 군집 예측] 군집 번호가 틀렸습니다. 💥")

    # 2차 항구 예측이 Top-3 안에 실제값 포함되었는가?
    if num in top3:
        print("✅ [2차 항구 예측] 실제 항구가 Top-3 안에 포함되어 있습니다! 🛳️")
    else:
        print("❌ [2차 항구 예측] Top-3 안에 실제 항구가 없습니다. ❌")

else:
    print("❗ 예측 실패: joint 확률 결과가 없습니다 (모델 예측 실패)")

NameError: name 'predict_top_ports_from_array' is not defined