In [85]:
import os, glob, numpy as np, pandas as pd

# Root of the local open-data folder and the directory holding the per-match
# event pickles that are scanned below.
BASE_DIR = r"C:\analyst\StatsBomb Open Data\open-data-master\data"
EVT_DIR  = os.path.join(BASE_DIR, "analysis_events")
P = 0.15  # top-15% cut (adjust to 0.10, 0.12, ... if desired)

# Collect the xG value of every shot found in every event file.
xgs = []
for fp in glob.glob(os.path.join(EVT_DIR, "*.pkl")):
    # NOTE(review): read_pickle can execute code embedded in the file; only
    # point this at pickles you produced yourself.
    df = pd.read_pickle(fp)
    if "event_type" not in df.columns or "tags" not in df.columns:
        continue
    shot = df["event_type"].astype(str).str.lower().eq("shot")
    # Pull only shot.statsbomb_xg out of the tags dict of each shot row.
    for t in df.loc[shot, "tags"]:
        if not isinstance(t, dict):
            continue
        v = t.get("shot.statsbomb_xg")
        if v is None:
            continue
        try:
            xgs.append(float(v))
        except (TypeError, ValueError):
            # Best effort: a non-numeric xG entry is skipped, nothing else is hidden.
            pass

xgs = np.asarray(xgs, dtype=float)
# A single NaN/inf would turn the quantile itself into NaN, so drop them first.
xgs = xgs[np.isfinite(xgs)]
if xgs.size:
    thr = float(np.quantile(xgs, 1 - P))
    # round() instead of int(): P*100 can land just below the integer (e.g. 0.29*100).
    print(f"총 샷={xgs.size}, 상위 {round(P*100)}% 임계값 xG = {thr:.4f}")
else:
    print("xG 데이터를 찾지 못했습니다.")

총 샷=7600, 상위 15% 임계값 xG = 0.1878


In [1]:
# -*- coding: utf-8 -*-
import os, glob
import numpy as np
import pandas as pd
from tqdm import tqdm

# ===== Paths =====
BASE_DIR = r"C:\analyst\StatsBomb Open Data\open-data-master\data"
IN_DIR = os.path.join(BASE_DIR, "analysis_events")     # input pickles
OUT_DIR = os.path.join(BASE_DIR, "analysis_attack")    # output pickles
os.makedirs(OUT_DIR, exist_ok=True)

# ===== Constants =====
EPS = 1e-9
# Event types that are evaluated for line breaks.
BREAK_EVENT_TYPES = {"pass", "carry", "dribble"}
# Every event type the metrics below deal with.
EV_TYPES = BREAK_EVENT_TYPES | {"shot"}
# Big-chance xG cut (kept as a hook; at 0.0 it currently has no effect).
BIG_CHANCE = 0.0

# Zone 14: end x in [68, 88.4), end y in [20.4, 47.6], start x <= 87
Z14_X_MIN = 68.0
Z14_X_MAX = 88.4
Z14_Y_MIN = 20.4
Z14_Y_MAX = 47.6
Z14_X1_MAX = 87.0
# Box: end x >= 88.4, end y in [15.3, 52.7]
BOX_X_MIN = 88.4
BOX_Y_MIN = 15.3
BOX_Y_MAX = 52.7

# Metadata columns that lead every output table.
BASE_META_COLS = [
    "match_id",
    "event_id",
    "period",
    "time",
    "status",
    "player_name",
    "player_position",
    "sequence_id",
    "event_type",
]

# Event types left out of the running per-sequence event count.
maintain_only_types_exc_dispos = {
    "Duel",
    "Pressure",
    "Block",
    "Dribbled Past",
}

# ===== 유틸 =====
def tag_has(d, key):
    """Return True when *d* is a dict holding a non-missing value under *key*.

    Args:
        d: Candidate tags object; anything that is not a dict yields False.
        key: Key to look up.

    Returns:
        A plain ``bool``. Scalar values count as present unless pandas
        considers them missing (None, NaN, NaT, NA). Non-scalar values
        (lists, arrays, dicts) count as present: ``pd.notna`` would answer
        element-wise for them and the result could not be truth-tested.
    """
    if not isinstance(d, dict) or key not in d:
        return False
    value = d[key]
    if value is None:
        return False
    if pd.api.types.is_scalar(value):
        return bool(pd.notna(value))
    # A container stored under the key is itself a non-null value.
    return True

def tag_eq(d, key, val):
    """Tell whether dict *d* stores a value equal to *val* under *key*.

    Anything that is not a dict, or a dict without *key*, yields False.
    Otherwise the result of ``d[key] == val`` is returned unchanged.
    """
    if not isinstance(d, dict):
        return False
    if key not in d:
        return False
    return d[key] == val

def get_end_location(row):
    """Look up the end location of an event row in its tags.

    The key ``"<event_type lower-cased>.end_location"`` is tried first; when
    that key is absent the plain ``"end_location"`` entry is used instead.
    Returns None when the row's tags are not a dict or neither key exists.
    """
    tags = row.get("tags", {})
    if not isinstance(tags, dict):
        return None
    prefix = str(row.get("event_type", "")).lower()
    fallback = tags.get("end_location")
    return tags.get(prefix + ".end_location", fallback)

# ===== 기하 =====
def orient(A, B, C):
    """Return the 2-D cross product of vectors A->B and A->C.

    The sign tells on which side of the line through A and B the point C
    lies; the value is zero when the three points are collinear. Only the
    first two coordinates of each point are read.
    """
    ab_x = B[0] - A[0]
    ab_y = B[1] - A[1]
    ac_x = C[0] - A[0]
    ac_y = C[1] - A[1]
    return ab_x * ac_y - ab_y * ac_x

def segments_cross_strict(A, B, C, D, eps=1e-9):
    """Report whether segments AB and CD cross strictly.

    Each segment must have the other's endpoints on strictly opposite sides
    (orientation magnitude above *eps*), so touching endpoints and collinear
    overlaps do not count as a crossing.
    """
    side_c = orient(A, B, C)
    side_d = orient(A, B, D)
    side_a = orient(C, D, A)
    side_b = orient(C, D, B)

    def _strictly_opposite(u, v):
        # One value clearly positive, the other clearly negative.
        return (u > eps and v < -eps) or (u < -eps and v > eps)

    return _strictly_opposite(side_c, side_d) and _strictly_opposite(side_a, side_b)

# =======================
# ① BREAK LINE
# =======================
def _find_break_segment(row):
    """Find the first opponent-line segment that the event's path crosses.

    Only event types in BREAK_EVENT_TYPES that move forward (end x greater
    than start x by more than EPS) are considered. The path from
    ``row["location"]`` to the event's end location is tested against every
    pair of consecutive vertices of every polyline in ``row["opponent_line"]``.

    Args:
        row: Mapping-like event row (anything with ``.get``).

    Returns:
        ``((cx, cy), (dx, dy))`` with the crossed segment's endpoints
        truncated to int, or None when nothing qualifies. A missing or
        non-iterable ``opponent_line`` (for example the float NaN pandas
        stores for a missing value) yields None instead of raising.
    """
    et = str(row.get("event_type", "")).lower()
    if et not in BREAK_EVENT_TYPES:
        return None
    start = row.get("location")
    lines = row.get("opponent_line")
    if start is None or lines is None:
        return None
    end = get_end_location(row)
    if end is None:
        return None
    try:
        x1, y1 = float(start[0]), float(start[1])
        x2, y2 = float(end[0]), float(end[1])
    except Exception:
        # Best effort: malformed coordinates mean "no break", not a crash.
        return None
    if not (x2 > x1 + EPS):  # forward movement only
        return None
    try:
        polylines = list(lines)
    except TypeError:
        # opponent_line is not iterable (e.g. NaN placeholder): nothing to test.
        return None
    A, B = (x1, y1), (x2, y2)
    for poly in polylines:
        try:
            vertices = list(poly)
        except TypeError:
            continue  # entry is not a sequence of vertices
        # Fewer than two vertices produce no pairs, so the loop is skipped.
        for C, D in zip(vertices, vertices[1:]):
            if C is None or D is None or len(C) < 2 or len(D) < 2:
                continue
            if segments_cross_strict(A, B, (C[0], C[1]), (D[0], D[1])):
                return ((int(C[0]), int(C[1])), (int(D[0]), int(D[1])))
    return None

def compute_break_check(row):
    """Return the crossed opponent-line segment of a row, or NaN.

    NaN is returned when the row has no ``opponent_line`` value or when
    _find_break_segment finds no crossing; otherwise the segment comes back
    as ``[[cx, cy], [dx, dy]]`` with int coordinates.
    """
    has_line = "opponent_line" in row and row["opponent_line"] is not None
    if not has_line:
        return np.nan
    segment = _find_break_segment(row)
    if segment:
        (cx, cy), (dx, dy) = segment
        first_point = [int(cx), int(cy)]
        second_point = [int(dx), int(dy)]
        return [first_point, second_point]
    return np.nan

def _pass_failed(tags):
    """Treat a pass as failed when its tags carry a pass outcome name.

    NOTE(review): presumably the data records an outcome only for passes
    that were not completed -- confirm against the event specification.
    """
    outcome_key = "pass.outcome.name"
    return tag_has(tags, outcome_key)

def _dribble_incomplete(tags):
    """Tell whether the tags mark a dribble whose outcome is "Incomplete"."""
    outcome_key = "dribble.outcome.name"
    return tag_eq(tags, outcome_key, "Incomplete")

def metric_break_line_try_suc(row):
    """Classify a row as a failed or successful line break.

    Returns:
        ``(nan, nan)`` when the row has no ``opponent_line`` value,
        ``(0, 0)`` when ``break_check`` is not a two-element list/tuple,
        ``(1, 0)`` when the breaking action failed (failed pass or
        incomplete dribble) and ``(0, 1)`` otherwise.
    """
    has_line = "opponent_line" in row and row["opponent_line"] is not None
    if not has_line:
        return (np.nan, np.nan)

    crossed = row.get("break_check", np.nan)
    if not isinstance(crossed, (list, tuple)) or len(crossed) != 2:
        return (0, 0)

    tags = row.get("tags", {})
    if _pass_failed(tags) or _dribble_incomplete(tags):
        return (1, 0)
    return (0, 1)

# Δx 가중(멤버십 행에만)
def compute_break_dx(row):
    """Return the forward x distance of a line-breaking event, else 0.0.

    Rows whose ``break_check`` is not a two-element list/tuple get 0.0.
    For the others the result is end x minus start x, floored at 0.0.
    """
    crossed = row.get("break_check", np.nan)
    is_member = isinstance(crossed, (list, tuple)) and len(crossed) == 2
    if not is_member:
        return 0.0
    start = row["location"]
    end = get_end_location(row)
    dx = float(end[0]) - float(start[0])
    if dx > 0:
        return dx
    return 0.0

# =======================
# ② ZONE14 PASS
# =======================
def metric_zone14_pass_try_suc(row):
    """Classify a pass that ends in Zone 14 as failed or successful.

    A pass qualifies when it starts at x <= Z14_X1_MAX and ends with
    Z14_X_MIN <= x < Z14_X_MAX and Z14_Y_MIN <= y <= Z14_Y_MAX.

    Returns:
        ``(1, 0)`` for a qualifying failed pass, ``(0, 1)`` for a qualifying
        successful pass and ``(0, 0)`` for everything else (non-pass rows,
        missing or unparsable coordinates, passes ending elsewhere).
    """
    no_count = (0, 0)
    if str(row.get("event_type", "")).lower() != "pass":
        return no_count

    tags = row.get("tags", {})
    start = row.get("location")
    end = tags.get("pass.end_location") if isinstance(tags, dict) else None
    if start is None or end is None:
        return no_count

    try:
        x1 = float(start[0])
        x2 = float(end[0])
        y2 = float(end[1])
    except Exception:
        return no_count

    starts_before_limit = x1 <= Z14_X1_MAX
    ends_in_x_band = Z14_X_MIN <= x2 < Z14_X_MAX
    ends_in_y_band = Z14_Y_MIN <= y2 <= Z14_Y_MAX
    if not (starts_before_limit and ends_in_x_band and ends_in_y_band):
        return no_count

    if _pass_failed(tags):
        return (1, 0)
    return (0, 1)

# =======================
# ③ BOX PASS
# =======================
def metric_box_pass_try_suc(row):
    """Classify a pass that ends in the box as failed or successful.

    A pass qualifies when it ends with x >= BOX_X_MIN and
    BOX_Y_MIN <= y <= BOX_Y_MAX.

    Returns:
        ``(1, 0)`` for a qualifying failed pass, ``(0, 1)`` for a qualifying
        successful pass and ``(0, 0)`` for everything else.
    """
    no_count = (0, 0)
    if str(row.get("event_type", "")).lower() != "pass":
        return no_count

    tags = row.get("tags", {})
    end = tags.get("pass.end_location") if isinstance(tags, dict) else None
    if end is None:
        return no_count

    try:
        x2 = float(end[0])
        y2 = float(end[1])
    except Exception:
        return no_count

    deep_enough = x2 >= BOX_X_MIN
    inside_width = BOX_Y_MIN <= y2 <= BOX_Y_MAX
    if not (deep_enough and inside_width):
        return no_count

    if _pass_failed(tags):
        return (1, 0)
    return (0, 1)

# =======================
# ④ SHOT (xG 연속값, 빅찬스 컷 라인 유지)
# =======================
def metric_shot_split(row):
    """Split a shot's xG into (saved, blocked, off-target/post) buckets.

    Args:
        row: Mapping-like event row with ``event_type`` and ``tags``.

    Returns:
        ``(shot_saved, shot_blocked, shot_offpost)`` where exactly one slot
        holds the shot's xG, chosen by ``shot.outcome.name``. All zeros are
        returned for non-shot rows, rows without dict tags, a missing,
        non-numeric or NaN xG, an xG below BIG_CHANCE, and outcomes that
        belong to none of the three buckets.
    """
    zero = (0.0, 0.0, 0.0)
    if str(row.get("event_type", "")).lower() != "shot":
        return zero
    tags = row.get("tags", {})
    if not isinstance(tags, dict):
        return zero

    # Big-chance cut is kept as a hook; with BIG_CHANCE = 0.0 it has no real effect.
    raw_xg = tags.get("shot.statsbomb_xg")
    try:
        xg = float(raw_xg) if raw_xg is not None else None
    except (TypeError, ValueError, OverflowError):
        xg = None
    # NaN compares False against the cut, so it has to be rejected explicitly.
    if xg is None or np.isnan(xg) or xg < BIG_CHANCE:
        return zero

    outcome = tags.get("shot.outcome.name")
    SAVED = {
        "Saved", "Saved Off T", "Saved To Post",
        "Saved To Bar", "Saved To Center", "Saved To Left", "Saved To Right",
        # Spellings used by the StatsBomb events specification; without them
        # these saved shots silently landed in no bucket.
        "Saved Off Target", "Saved to Post",
    }
    OFFPOST = {"Off T", "Wayward", "Post"}

    if outcome in SAVED:
        return (xg, 0.0, 0.0)
    if outcome == "Blocked":
        return (0.0, xg, 0.0)
    if outcome in OFFPOST:
        return (0.0, 0.0, xg)
    return zero

# =======================
# 4.5 함수: 행(row) → (pass_miss, receipt_miss, dribble_miss, dispossessed, miscontrol)
# =======================
def atk_fail(row):
    """Flag the kind of attacking failure an event row represents.

    Args:
        row: Mapping-like event row providing ``event_type``, ``tags`` and
            the six ``break_line_*`` / ``zone14_pass_*`` / ``box_pass_*``
            counter columns.

    Returns:
        ``(pass_miss, receipt_miss, dribble_miss, dispossessed, miscontrol)``
        as 0/1 ints. ``pass_miss`` is set only for an "Incomplete"/"Out" pass
        that none of the break-line, Zone 14 or box metrics counted.
    """
    et = str(row["event_type"]).lower()
    tags = row["tags"]  # flattened tag keys are used as-is
    if not isinstance(tags, dict):
        # Same guard as the other metrics: rows without tag data carry no outcomes.
        tags = {}

    pass_out    = tags.get("pass.outcome.name")
    receipt_out = tags.get("ball_receipt.outcome.name")
    dribble_out = tags.get("dribble.outcome.name")

    # Missing counters (break_line_* are NaN when opponent_line is absent)
    # count as 0; a plain sum would turn NaN and never compare equal to 0.
    danger_total = 0
    for col in (
        "break_line_try", "break_line_suc",
        "zone14_pass_try", "zone14_pass_suc",
        "box_pass_try", "box_pass_suc",
    ):
        value = row[col]
        if pd.notna(value):
            danger_total += value
    danger_zero = danger_total == 0

    pass_miss    = int(et == "pass" and pass_out in ("Incomplete", "Out") and danger_zero)
    receipt_miss = int(et.startswith("ball receipt") and receipt_out == "Incomplete")
    dribble_miss = int(et == "dribble" and dribble_out == "Incomplete")
    dispossessed = int(et == "dispossessed")
    miscontrol   = int(et == "miscontrol")

    return (pass_miss, receipt_miss, dribble_miss, dispossessed, miscontrol)

# =======================
# ⑤ 파일 처리(시퀀스 포함)
# =======================
def process_file(df, match_id):
    """Build the per-event attack-metric table for one match.

    Args:
        df: Event DataFrame of a single match. ``sequence_id``, ``time`` and
            ``event_type`` are read from it directly; ``tags`` and, when
            present, ``opponent_line`` / ``location`` are read by the
            row-wise metric helpers.
        match_id: Value written to the ``match_id`` column when *df* has no
            such column.

    Returns:
        DataFrame with BASE_META_COLS followed by the break-line, Zone 14,
        box, shot, attack-failure and sequence columns, in that order.
    """
    # (A) Working copy; every row is kept (the event-type filter below is disabled)
    ev = df.copy()
    #ev = df[df["event_type"].astype(str).str.lower().isin(EV_TYPES)].copy()

    # (B) Fill in missing metadata columns
    if "match_id" not in ev.columns:
        ev["match_id"] = match_id
    for c in BASE_META_COLS:
        if c not in ev.columns:
            ev[c] = np.nan

    # (C) Sequence metrics (computed on the full df, then aligned to ev by index)
    # First "time" value of each sequence, broadcast to every row of that sequence.
    seq_first = df.groupby("sequence_id", sort=False)["time"].transform("first")
    #seq_num = df.groupby("sequence_id", sort=False).cumcount() + 1
    # Running count per sequence of events whose type is NOT in
    # maintain_only_types_exc_dispos; excluded events repeat the previous count.
    seq_num = (~df["event_type"].isin(maintain_only_types_exc_dispos)).groupby(df["sequence_id"], sort=False).cumsum()
    ev["seq_duration"] = ev["time"] - seq_first.reindex(ev.index)
    ev["seq_event_num"] = seq_num.reindex(ev.index)

    # (D) break_line -> try/suc -> dx weighting
    if "opponent_line" in ev.columns:
        ev["break_check"] = ev.apply(compute_break_check, axis=1)
        ev[["break_line_try","break_line_suc"]] = ev.apply(
            metric_break_line_try_suc, axis=1, result_type="expand"
        )
        ev["break_dx"] = ev.apply(compute_break_dx, axis=1)  # dx only for rows with a break_check segment
        # NaN try/suc values count as 0 for the weighted columns.
        ev["break_line_try_w"] = ev["break_line_try"].fillna(0) * ev["break_dx"]
        ev["break_line_suc_w"] = ev["break_line_suc"].fillna(0) * ev["break_dx"]
    else:
        # No opponent-line data at all: every break-line column is NaN.
        ev["break_check"] = np.nan
        ev["break_line_try"]  = np.nan
        ev["break_line_suc"]  = np.nan
        ev["break_line_try_w"] = np.nan
        ev["break_line_suc_w"] = np.nan

    # (E) zone14 / box
    ev[["zone14_pass_try","zone14_pass_suc"]] = ev.apply(
        metric_zone14_pass_try_suc, axis=1, result_type="expand"
    )
    ev[["box_pass_try","box_pass_suc"]] = ev.apply(
        metric_box_pass_try_suc, axis=1, result_type="expand"
    )

    # (F) Shot xG split into three buckets
    ev[["shot_saved","shot_blocked","shot_offpost"]] = ev.apply(
        metric_shot_split, axis=1, result_type="expand"
    )

    # (G) Attack-failure flags; atk_fail reads the try/suc columns created in (D) and (E),
    # so this step has to stay after them.
    ev[["pass_miss","receipt_miss","dribble_miss","dispossessed","miscontrol"]] = ev.apply(
    atk_fail, axis=1, result_type="expand"
    )

    # (H) Output column order (break_dx is an intermediate and is not exported)
    out_cols = BASE_META_COLS + [
        "break_line_try","break_line_try_w",
        "break_line_suc","break_line_suc_w",
        "break_check",
        "zone14_pass_try","zone14_pass_suc",
        "box_pass_try","box_pass_suc",
        "shot_saved","shot_blocked","shot_offpost",
        "pass_miss","receipt_miss","dribble_miss","dispossessed","miscontrol",
        "seq_duration","seq_event_num",
    ]
    return ev[out_cols]

# =======================
# ⑥ 메인 루프
# =======================
# Process every input pickle in sorted order and write one output pickle per
# match under the same file name.
files = sorted(glob.glob(os.path.join(IN_DIR, "*.pkl")))
for fp in tqdm(files, desc="analysis_attack 생성 (break→zone→box→shot + seq)"):
    # The file stem serves both as the match id and as the output file name.
    match_id, _ext = os.path.splitext(os.path.basename(fp))
    df = pd.read_pickle(fp)
    out = process_file(df, match_id)
    out_path = os.path.join(OUT_DIR, match_id + ".pkl")
    out.to_pickle(out_path)


analysis_attack 생성 (break→zone→box→shot + seq): 100%|██████████| 295/295 [04:47<00:00,  1.03it/s]


In [11]:
import os
import pandas as pd

BASE_DIR = r"C:\analyst\StatsBomb Open Data\open-data-master\data"
MATCH_ID = "3788741"  # id of the result file to inspect
path = os.path.join(BASE_DIR, "analysis_attack", f"{MATCH_ID}.pkl")

# Show every column and row without truncation.
pd.options.display.expand_frame_repr = False
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_rows', None)

# Preview row count. NOTE(review): not referenced by the code below.
N_ROWS = 3000

# Load the file once and relabel the index to 0..n-1 so the positions
# printed below can be used directly with df.loc.
df = pd.read_pickle(path).reset_index(drop=True)

# List the rows whose value in `col` is at least `val`.
col, val = "shot_saved", 0.01
print(df.index[df[col].ge(val)].tolist())
print(f"[로드 완료] {path}  shape={df.shape}")

# Last expression of the cell: the notebook renders this slice.
df.loc[1590:1650]

[934, 1592, 1908, 2329, 2439, 2771]
[로드 완료] C:\analyst\StatsBomb Open Data\open-data-master\data\analysis_attack\3788741.pkl  shape=(3803, 28)


Unnamed: 0,match_id,event_id,period,time,status,player_name,player_position,sequence_id,event_type,break_line_try,break_line_try_w,break_line_suc,break_line_suc_w,break_check,zone14_pass_try,zone14_pass_suc,box_pass_try,box_pass_suc,shot_saved,shot_blocked,shot_offpost,pass_miss,receipt_miss,dribble_miss,dispossessed,miscontrol,seq_duration,seq_event_num
1590,3788741,066a,1,2162.83,0,Lorenzo Insigne,Left Wing,A70,Ball Receipt*,0,0.0,0,0.0,,0,0,0,0,0.0,0.0,0.0,0,0,0,0,0,3.006,4.0
1591,3788741,1fde,1,2162.83,0,Lorenzo Insigne,Left Wing,A70,Carry,0,0.0,0,0.0,,0,0,0,0,0.0,0.0,0.0,0,0,0,0,0,3.006,5.0
1592,3788741,7156,1,2164.066,0,Lorenzo Insigne,Left Wing,A70,Shot,0,0.0,0,0.0,,0,0,0,0,0.042096,0.0,0.0,0,0,0,0,0,4.242,6.0
1593,3788741,6b69,1,2164.193,0,Okay Yokuşlu,Center Defensive Midfield,A70,Block,0,0.0,0,0.0,,0,0,0,0,0.0,0.0,0.0,0,0,0,0,0,4.369,6.0
1594,3788741,ae04,1,2164.922,0,Uğurcan Çakır,Goalkeeper,H79,Goal Keeper,0,0.0,0,0.0,,0,0,0,0,0.0,0.0,0.0,0,0,0,0,0,0.0,1.0
1595,3788741,013e,1,2177.721,0,Uğurcan Çakır,Goalkeeper,H79,Pass,0,0.0,0,0.0,,0,0,0,0,0.0,0.0,0.0,0,0,0,0,0,12.799,2.0
1596,3788741,c2fa,1,2179.892,0,Caglar Söyüncü,Left Center Back,H79,Ball Receipt*,0,0.0,0,0.0,,0,0,0,0,0.0,0.0,0.0,0,0,0,0,0,14.97,3.0
1597,3788741,a595,1,2181.467,0,Ciro Immobile,Center Forward,H79,Pressure,0,0.0,0,0.0,,0,0,0,0,0.0,0.0,0.0,0,0,0,0,0,16.545,3.0
1598,3788741,e18c,1,2181.753,0,Caglar Söyüncü,Left Center Back,H79,Pass,0,0.0,0,0.0,,0,0,0,0,0.0,0.0,0.0,0,0,0,0,0,16.831,4.0
1599,3788741,bb9d,1,2182.493,0,Hakan Çalhanoğlu,Left Midfield,H79,Ball Receipt*,0,0.0,0,0.0,,0,0,0,0,0.0,0.0,0.0,0,0,0,0,0,17.571,5.0
