# 03. 타 맵 적용 및 비교 분석

에란겔에서 학습한 클러스터 모델을 Miramar, Taego, Rondo에 적용합니다.

**분석 목표**
- 동일 전략이 맵별로 승률에 미치는 영향 차이
- 맵 특성에 최적화된 전략 클러스터 발견
- 에란겔 vs 타 맵 전략 분포 비교

In [1]:
import os
import glob
import pickle
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

import umap
from sklearn.cluster import HDBSCAN          # sklearn 1.3+ 내장
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.linear_model import RANSACRegressor
from tqdm import tqdm

plt.rcParams['font.family'] = 'Malgun Gothic'
plt.rcParams['axes.unicode_minus'] = False

BASE_DIR   = r'C:\배그분석'
OUTPUT_DIR = os.path.join(BASE_DIR, 'analysis_output')
MODEL_DIR  = os.path.join(OUTPUT_DIR, 'models')
DATE_START = '20260211'
DATE_END   = '20260220'

# 에란겔 모델 로드
with open(os.path.join(MODEL_DIR, 'erangel_model.pkl'), 'rb') as f:
    model = pickle.load(f)

reducer       = model['reducer']
clusterer_ref = model['clusterer']   # 참조용 (중심점 활용)
scaler        = model['scaler']
imputer       = model['imputer']
FEATURE_COLS  = model['feature_cols']
cluster_names = model['cluster_names']
PERSONA_LABELS= model['persona_labels']
profile       = model['profile']

print('에란겔 모델 로드 완료')
print(f'피처: {FEATURE_COLS}')


Mon Feb 23 22:50:56 2026 Building and compiling search function
에란겔 모델 로드 완료
피처: ['drop_distance_from_path', 'early_enemy_density', 'rotation_timing_score', 'vehicle_use_ratio', 'bluezone_exposure_ratio', 'safezone_proximity_mean', 'safezone_edge_ratio', 'altitude_variance']


In [2]:
# STEP 1: 헬퍼 함수 (01_data_pipeline과 동일)
import duckdb

def get_files(map_name, date_start, date_end):
    pattern = os.path.join(BASE_DIR, 'final_telemetry_*',
                           f'{map_name}_telemetry_*.parquet')
    all_files = sorted(glob.glob(pattern))
    return [
        f for f in all_files
        if date_start <= os.path.basename(f).replace('.parquet','').split('_')[-1] <= date_end
    ]

def make_sql(files):
    escaped = [f.replace('\\', '/') for f in files]
    return '[' + ','.join([f"'{f}'" for f in escaped]) + ']'

def fit_path(grp):
    x, y = grp['x'].values, grp['y'].values
    if len(x) < 10: return (None, None, None)
    try:
        from sklearn.linear_model import RANSACRegressor
        if x.std() < y.std():
            r = RANSACRegressor(min_samples=0.5, residual_threshold=500).fit(y.reshape(-1,1), x)
            return ('yx', r.estimator_.coef_[0], r.estimator_.intercept_)
        else:
            r = RANSACRegressor(min_samples=0.5, residual_threshold=500).fit(x.reshape(-1,1), y)
            return ('xy', r.estimator_.coef_[0], r.estimator_.intercept_)
    except: return (None, None, None)

def pt_dist(px, py, fp):
    if fp[0] is None: return np.nan
    mode, a, b = fp
    if mode == 'xy': return abs(a*px - py + b) / np.sqrt(a**2 + 1)
    return abs(-px + a*py + b) / np.sqrt(a**2 + 1)

print('헬퍼 함수 정의 완료')


헬퍼 함수 정의 완료


In [3]:
def extract_features(map_name):
    """특정 맵의 피처 테이블 추출"""
    files = get_files(map_name, DATE_START, DATE_END)
    if not files:
        print(f'  {map_name} 파일 없음')
        return None
    print(f'\n{map_name}: {len(files)}개 파일 처리 중...')

    con = duckdb.connect(':memory:')
    con.execute('PRAGMA threads=6; PRAGMA memory_limit="8GB";')
    sql = make_sql(files)

    # 낙하 지점
    df_drop = con.execute(f"""
        WITH r AS (
            SELECT MatchId, \"character.accountId\" AS accountId,
                   \"character.location.x\" AS x, \"character.location.y\" AS y,
                   elapsedTime,
                   ROW_NUMBER() OVER (PARTITION BY MatchId, \"character.accountId\"
                                      ORDER BY elapsedTime) AS rn
            FROM read_parquet({sql})
            WHERE _T='LogPlayerPosition' AND \"common.isGame\">=1.0
              AND \"character.accountId\" IS NOT NULL
              AND \"character.location.x\" IS NOT NULL
        )
        SELECT MatchId, accountId, x, y, elapsedTime AS drop_time FROM r WHERE rn=1
    """).fetchdf()

    # 비행기 경로
    df_air = con.execute(f"""
        SELECT MatchId, \"character.location.x\" AS x, \"character.location.y\" AS y
        FROM read_parquet({sql})
        WHERE _T='LogPlayerPosition' AND \"common.isGame\"=0.1
          AND \"character.location.x\" IS NOT NULL
    """).fetchdf()

    paths = {mid: fit_path(g) for mid, g in df_air.groupby('MatchId')}
    df_drop['drop_distance_from_path'] = df_drop.apply(
        lambda r: pt_dist(r['x'], r['y'], paths.get(r['MatchId'], (None,None,None))), axis=1
    )

    # 위치 전체
    df_pos = con.execute(f"""
        SELECT MatchId, \"character.accountId\" AS accountId,
               \"character.location.x\" AS x, \"character.location.y\" AS y,
               \"character.location.z\" AS z,
               \"character.isInBlueZone\" AS isInBlueZone,
               \"character.isInVehicle\" AS isInVehicle,
               \"common.isGame\" AS isGame,
               \"gameState.safetyZonePosition.x\" AS safe_x,
               \"gameState.safetyZonePosition.y\" AS safe_y,
               \"gameState.safetyZoneRadius\" AS safe_radius,
               elapsedTime
        FROM read_parquet({sql})
        WHERE _T='LogPlayerPosition' AND \"common.isGame\">=1.0
          AND \"character.accountId\" IS NOT NULL
          AND \"character.location.x\" IS NOT NULL
    """).fetchdf()

    df_pos['isInBlueZone'] = df_pos['isInBlueZone'].astype(str).str.lower() == 'true'
    df_pos['isInVehicle']  = df_pos['isInVehicle'].astype(str).str.lower() == 'true'
    df_pos['dist_to_safe'] = np.sqrt(
        (df_pos['x'] - df_pos['safe_x'])**2 + (df_pos['y'] - df_pos['safe_y'])**2
    )

    shrink = [1.5, 2.5, 3.5, 4.5, 5.5, 6.5]
    feats = []
    for (mid, aid), grp in tqdm(df_pos.groupby(['MatchId','accountId']),
                                 desc=f'{map_name} 집계'):
        grp = grp.sort_values('elapsedTime')
        pre  = grp[grp['isGame'].isin([1.0,2.0,3.0,4.0,5.0])]['dist_to_safe'].mean()
        post = grp[grp['isGame'].isin(shrink)]['dist_to_safe'].mean()
        rot  = pre/(pre+post+1e-6) if pd.notna(pre) and pd.notna(post) else np.nan
        dr   = df_drop[(df_drop['MatchId']==mid)&(df_drop['accountId']==aid)]
        if len(dr) > 0:
            dpx, dpy = dr.iloc[0]['x'], dr.iloc[0]['y']
            all_d = df_drop[df_drop['MatchId']==mid]
            density = (np.sqrt((all_d['x']-dpx)**2+(all_d['y']-dpy)**2)<=500).sum()-1
        else:
            density = np.nan
        feats.append({
            'MatchId': mid, 'accountId': aid,
            'rotation_timing_score':   rot,
            'vehicle_use_ratio':       grp['isInVehicle'].mean(),
            'bluezone_exposure_ratio': grp['isInBlueZone'].mean(),
            'safezone_proximity_mean': grp['dist_to_safe'].mean(),
            'safezone_edge_ratio':     (grp['dist_to_safe']/(grp['safe_radius']+1e-6)).mean(),
            'altitude_variance':       grp['z'].std(),
            'early_enemy_density':     density,
            'survival_time':           grp['elapsedTime'].max()-grp['elapsedTime'].min(),
        })

    df_feat = pd.DataFrame(feats).merge(
        df_drop[['MatchId','accountId','drop_distance_from_path']],
        on=['MatchId','accountId'], how='left'
    )

    # matches 조인
    mat_all = [f for f in sorted(glob.glob(os.path.join(BASE_DIR,'matches_*.csv')))
               if DATE_START <= os.path.basename(f).replace('.csv','').split('_')[-1] <= DATE_END]
    if mat_all:
        df_mat = pd.concat([pd.read_csv(f) for f in mat_all], ignore_index=True)
        df_mat = df_mat[df_mat['mapName'].str.contains(map_name, case=False, na=False)]
        df_mat['win_flag']   = (df_mat['winPlace']==1).astype(int)
        df_mat['top10_flag'] = (df_mat['winPlace']<=10).astype(int)
        agg = df_mat.groupby(['matchId','playerId']).agg(
            kills=('kills','sum'), damageDealt=('damageDealt','sum'),
            winPlace=('winPlace','min'), win_flag=('win_flag','max'),
            top10_flag=('top10_flag','max')
        ).reset_index().rename(columns={'matchId':'MatchId','playerId':'accountId'})
        df_feat = df_feat.merge(agg, on=['MatchId','accountId'], how='left')

    print(f'  → {len(df_feat):,}명 완료')
    return df_feat

print('피처 추출 함수 정의 완료')


피처 추출 함수 정의 완료


In [4]:
# STEP 2: 타 맵 피처 추출 + 에란겔 모델로 클러스터 예측
# sklearn HDBSCAN은 transform() 미지원
# → 에란겔 클러스터 중심(centroid)과 각 포인트의 거리로 가장 가까운 클러스터 배정

OTHER_MAPS = ['Miramar', 'Taego', 'Rondo']
map_results = {}

# 에란겔 결과 포함
df_erangel = pd.read_parquet(os.path.join(OUTPUT_DIR, 'erangel_clustered.parquet'))
map_results['Erangel'] = df_erangel

# 에란겔 클러스터 중심 (UMAP 공간)
centroids = df_erangel[df_erangel['cluster']>=0].groupby('cluster')[['umap_x','umap_y']].mean()
print('에란겔 클러스터 중심:')
print(centroids)

def predict_cluster_by_centroid(umap_x, umap_y, centroids):
    """UMAP 좌표에서 가장 가까운 에란겔 클러스터 중심을 찾아 레이블 반환"""
    dists = np.sqrt(
        (centroids['umap_x'].values - umap_x)**2 +
        (centroids['umap_y'].values - umap_y)**2
    )
    return centroids.index[np.argmin(dists)]

for map_name in OTHER_MAPS:
    df_map = extract_features(map_name)
    if df_map is None:
        continue

    df_valid = df_map.dropna(subset=FEATURE_COLS, thresh=int(len(FEATURE_COLS)*0.5)).copy()
    df_valid = df_valid[df_valid['survival_time']>=120].copy()

    X = scaler.transform(imputer.transform(df_valid[FEATURE_COLS].values))
    emb = reducer.transform(X)   # 학습된 UMAP에 새 데이터 투영

    df_valid['umap_x'] = emb[:,0]
    df_valid['umap_y'] = emb[:,1]

    # 중심 거리 기반 클러스터 배정
    df_valid['cluster'] = [
        predict_cluster_by_centroid(ux, uy, centroids)
        for ux, uy in zip(df_valid['umap_x'], df_valid['umap_y'])
    ]
    df_valid['persona'] = df_valid['cluster'].map(cluster_names).map(PERSONA_LABELS)

    out = os.path.join(OUTPUT_DIR, f'{map_name.lower()}_clustered.parquet')
    df_valid.to_parquet(out, index=False)
    map_results[map_name] = df_valid
    print(f'  {map_name} 저장 완료: {out}')

print(f'\n처리된 맵: {list(map_results.keys())}')


에란겔 클러스터 중심:
            umap_x    umap_y
cluster                     
0        13.028237  6.643206
1         0.478802  6.506626
2         9.863125  9.525183

Miramar: 20개 파일 처리 중...


BinderException: Binder Error: Referenced column "common.isGame" not found in FROM clause!
Candidate bindings: "common_isGame", "victim_name", "isCustomGame", "finisher_name", "attacker_name"

LINE 9:             WHERE _T='LogPlayerPosition' AND "common.isGame">=1.0
                                                     ^

In [None]:
# STEP 3: 맵별 클러스터 분포 및 승률 비교 

dist_rows = []
for map_name, df_map in map_results.items():
    total = len(df_map[df_map['cluster']>=0])
    for persona, grp in df_map[df_map['cluster']>=0].groupby('persona'):
        dist_rows.append({
            'map': map_name, 'persona': persona,
            'count': len(grp),
            'pct': len(grp)/total if total>0 else 0,
            'win_rate':   grp['win_flag'].mean()   if 'win_flag'   in grp.columns else np.nan,
            'top10_rate': grp['top10_flag'].mean() if 'top10_flag' in grp.columns else np.nan,
            'avg_kills':  grp['kills'].mean()      if 'kills'      in grp.columns else np.nan,
        })

df_dist = pd.DataFrame(dist_rows)
df_dist.to_csv(os.path.join(OUTPUT_DIR, 'map_persona_summary.csv'),
               index=False, encoding='utf-8-sig')

print('맵별 페르소나 분포 (비율):')
display(df_dist.pivot_table(index='persona', columns='map', values='pct').round(3))
print()
print('맵별 페르소나 우승률:')
display(df_dist.pivot_table(index='persona', columns='map', values='win_rate').round(4))


In [None]:
# STEP 4: 맵별 × 페르소나별 승률 히트맵

win_pivot   = df_dist.pivot_table(index='persona', columns='map', values='win_rate').round(4)
top10_pivot = df_dist.pivot_table(index='persona', columns='map', values='top10_rate').round(4)

fig, axes = plt.subplots(1, 2, figsize=(16, 5))

for ax, pivot, title, cmap in zip(
    axes,
    [win_pivot, top10_pivot],
    ['맵별 x 페르소나별 우승률', '맵별 x 페르소나별 Top10 비율'],
    ['RdYlGn', 'Blues']
):
    im = ax.imshow(pivot.values, cmap=cmap,
                   vmin=0, vmax=float(np.nanmax(pivot.values))*1.4)
    ax.set_xticks(range(len(pivot.columns)))
    ax.set_yticks(range(len(pivot.index)))
    ax.set_xticklabels(pivot.columns, fontsize=11)
    ax.set_yticklabels(pivot.index, fontsize=9)
    plt.colorbar(im, ax=ax)
    for i in range(len(pivot.index)):
        for j in range(len(pivot.columns)):
            val = pivot.values[i,j]
            if not np.isnan(val):
                ax.text(j, i, f'{val:.1%}', ha='center', va='center',
                        fontsize=10, fontweight='bold')
    ax.set_title(title, fontsize=13, fontweight='bold')

plt.tight_layout()
plt.savefig(os.path.join(OUTPUT_DIR, 'map_persona_winrate.png'), dpi=150, bbox_inches='tight')
plt.show()
print('저장: map_persona_winrate.png')


In [None]:
# STEP 5: 맵별 전략 피처 레이더 차트

colors_map = {'Erangel':'#E74C3C','Miramar':'#F39C12','Taego':'#3498DB','Rondo':'#2ECC71'}
feature_labels = ['낙하거리','초기적밀도','로테이션\n타이밍','차량의존도',
                  '존타기','안전구역\n거리','외곽비율','고도변화']
N = len(FEATURE_COLS)
angles = np.linspace(0, 2*np.pi, N, endpoint=False).tolist() + [0]

df_means = pd.DataFrame(
    {m: df[FEATURE_COLS].median() for m, df in map_results.items()}
).T
df_norm = (df_means - df_means.min()) / (df_means.max() - df_means.min() + 1e-6)

fig, ax = plt.subplots(figsize=(8,8), subplot_kw={'polar':True})
for map_name in map_results:
    if map_name not in df_norm.index: continue
    vals = df_norm.loc[map_name, FEATURE_COLS].tolist() + [df_norm.loc[map_name, FEATURE_COLS[0]]]
    color = colors_map.get(map_name, 'gray')
    ax.plot(angles, vals, 'o-', linewidth=2.5, color=color, label=map_name)
    ax.fill(angles, vals, alpha=0.12, color=color)

ax.set_xticks(angles[:-1])
ax.set_xticklabels(feature_labels, fontsize=10)
ax.set_ylim(0,1)
ax.legend(loc='upper right', bbox_to_anchor=(1.3,1.1), fontsize=11)
ax.set_title('맵별 전략 피처 비교', fontsize=14, fontweight='bold', pad=20)
plt.tight_layout()
plt.savefig(os.path.join(OUTPUT_DIR, 'map_feature_radar.png'), dpi=150, bbox_inches='tight')
plt.show()
print('저장: map_feature_radar.png')


In [None]:
print('전체 분석 완료')
print(f'저장 위치: {OUTPUT_DIR}')
print()
print('생성된 파일:')
for fn in sorted(os.listdir(OUTPUT_DIR)):
    fp = os.path.join(OUTPUT_DIR, fn)
    if os.path.isfile(fp):
        mb = os.path.getsize(fp)/(1024*1024)
        print(f'  {fn:<45} ({mb:.1f} MB)')
print()
print('  04_visualization.ipynb 로 이동')
