# PokerStars ‚Äî Data Cleaning (version clean √† rendre)

Ce notebook reconstruit **uniquement si besoin** les artefacts de cleaning utilis√©s ensuite par l'EDA et le Machine Learning (ML) :

1. **Raw PHH/PHHS ‚Üí Parquet**  
   - `hands_parquet/`  
   - `player_hands_parquet/`  (filtrage ~20% joueurs via hash)

2. **Timeline (DuckDB) ‚Üí Parquet**  
   - `timeline_parquet/` (partitionn√© par `stake_label` et `player_bucket`)

3. **√âchantillon √©quilibr√©** (ex: 300 joueurs / stake)  
   - `sample_players_balanced.csv`

4. **Exports bulk** (ex: 200 mains / joueur)  
   - `player_transcripts_bulk_300p_200h/`

5. **Structuration TXT ‚Üí Parquet**  
   - `_structured/hands_parquet/`  
   - `_structured/actions_parquet/`

## Important (anti-duplication dans Drive)
Par d√©faut, ce notebook est **idempotent** :  
- si un dossier de sortie existe d√©j√† et contient des fichiers, l‚Äô√©tape correspondante est **skip** ;  
- pour forcer une reconstruction, mets le flag `FORCE_... = True` au d√©but de l‚Äô√©tape.

In [1]:
# --- Mount Drive (Colab) ---
from google.colab import drive
drive.mount("/content/drive")

# --- Imports ---
import os, glob, re, json, hashlib, shutil, random, time
import pandas as pd
import numpy as np

# --- Paths (Drive) ---
OUT_ROOT_FULL = "/content/drive/MyDrive/pokerstars_clean/follow_20pct_full"

HANDS_DIR_FULL = f"{OUT_ROOT_FULL}/hands_parquet"
PH_DIR_FULL    = f"{OUT_ROOT_FULL}/player_hands_parquet"
TIMELINE_DIR_FULL = f"{OUT_ROOT_FULL}/timeline_parquet"

EXPORT_BULK_DIR = f"{OUT_ROOT_FULL}/player_transcripts_bulk_300p_200h"   # 300 players / stake, 200 hands / player
STRUCT_DIR      = os.path.join(EXPORT_BULK_DIR, "_structured")
STRUCT_HANDS_DIR = os.path.join(STRUCT_DIR, "hands_parquet")
STRUCT_ACT_DIR   = os.path.join(STRUCT_DIR, "actions_parquet")

SAMPLE_CSV = os.path.join(EXPORT_BULK_DIR, "sample_players_balanced.csv")

def dir_has_files(p: str) -> bool:
    return os.path.isdir(p) and any(os.scandir(p))

print("‚úÖ OUT_ROOT_FULL:", OUT_ROOT_FULL)

Mounted at /content/drive
‚úÖ OUT_ROOT_FULL: /content/drive/MyDrive/pokerstars_clean/follow_20pct_full


In [2]:
# --- Raw PHH/PHHS directories (Drive) ---
RAW_DIRS = {
    25:   "/content/drive/MyDrive/projet adv data/0.25",
    50:   "/content/drive/MyDrive/projet adv data/0.5",
    200:  "/content/drive/MyDrive/projet adv data/2",
    400:  "/content/drive/MyDrive/projet adv data/4",
    600:  "/content/drive/MyDrive/projet adv data/6",
    1000: "/content/drive/MyDrive/projet adv data/10",
}

# quick sanity check (does not scan recursively)
for stake, root in RAW_DIRS.items():
    print(f"Stake {stake:>4} | exists={os.path.exists(root)} | path={root}")

Stake   25 | exists=True | path=/content/drive/MyDrive/projet adv data/0.25
Stake   50 | exists=True | path=/content/drive/MyDrive/projet adv data/0.5
Stake  200 | exists=True | path=/content/drive/MyDrive/projet adv data/2
Stake  400 | exists=True | path=/content/drive/MyDrive/projet adv data/4
Stake  600 | exists=True | path=/content/drive/MyDrive/projet adv data/6
Stake 1000 | exists=True | path=/content/drive/MyDrive/projet adv data/10


In [3]:
import re, glob, hashlib, json
import pandas as pd

# R√©glages
KEEP_PCT = 20          # garder ~20% des joueurs
N_BUCKETS = 256        # partition joueurs
FLUSH_HANDS = 20_000   # √©criture mains (tu peux augmenter apr√®s)
FLUSH_PH    = 100_000  # √©criture player-hands (tu peux augmenter apr√®s)

SEC_RE   = re.compile(r'^\[(\d+)\]\s*$', re.MULTILINE)
TABLE_RE = re.compile(r'table\s*=\s*["\']([^"\']+)["\']')

def md5_int(s: str) -> int:
    return int(hashlib.md5(s.encode("utf-8")).hexdigest()[:8], 16)

def keep_player(pid: str, keep_pct: int = KEEP_PCT) -> bool:
    return (md5_int(pid) % 100) < keep_pct

def player_bucket(pid: str, n_buckets: int = N_BUCKETS) -> int:
    return md5_int(pid) % n_buckets

def iter_sections(text: str):
    ms = list(SEC_RE.finditer(text))
    if not ms:
        return
    starts = [m.start() for m in ms] + [len(text)]
    ids = [m.group(1) for m in ms]
    for i, sec_no in enumerate(ids):
        yield int(sec_no), text[starts[i]:starts[i+1]]

def get_str(pat: str, s: str):
    m = re.search(pat, s)
    return m.group(1) if m else None

def get_list_numbers(pat: str, s: str):
    m = re.search(pat, s, flags=re.DOTALL)
    if not m:
        return []
    return [float(x) for x in re.findall(r"[0-9]+\.[0-9]+|[0-9]+", m.group(1))]

def get_list_strings(pat: str, s: str):
    m = re.search(pat, s, flags=re.DOTALL)
    if not m:
        return []
    return re.findall(r"'([^']+)'", m.group(1))

def get_table_id(sec: str):
    m = TABLE_RE.search(sec)
    return m.group(1) if m else None

def split_actions(actions):
    streets = {"preflop": [], "flop": [], "turn": [], "river": [], "showdown": []}
    board   = {"flop": None, "turn": None, "river": None}
    current = "preflop"
    for a in actions:
        if a.startswith("d dh"):
            continue
        if a.startswith("d db "):
            cards = a.split(" ", 2)[2]
            if board["flop"] is None:
                board["flop"] = cards; current="flop";  continue
            if board["turn"] is None:
                board["turn"] = cards; current="turn";  continue
            if board["river"] is None:
                board["river"] = cards; current="river"; continue
        if a.startswith("p") and " sm" in a:
            streets["showdown"].append(a); continue
        streets[current].append(a)
    return streets, board

def player_token(a: str):
    m = re.match(r"p(\d+)\s+(.*)", a)
    return (int(m.group(1)), m.group(2).strip()) if m else (None, a)

def position_labels(n: int):
    if n<=0: return []
    if n==1: return ["UTG"]
    if n==2: return ["SB","BB"]
    if n==3: return ["BTN","SB","BB"]
    utg = max(0, n-4)
    names=[]
    if utg>0:
        names.append("UTG")
        for i in range(1, utg):
            names.append(f"UTG+{i}")
    names += ["CO","BTN","SB","BB"]
    return names

def infer_positions_from_preflop(actions):
    streets, _ = split_actions(actions)
    order=[]
    for a in streets["preflop"]:
        pid,_ = player_token(a)
        if pid and pid not in order:
            order.append(pid)
    pos = position_labels(len(order))
    return {pid: pos[i] for i, pid in enumerate(order)}

print("‚úÖ Helpers charg√©s (KEEP_PCT=20, N_BUCKETS=256)")

‚úÖ Helpers charg√©s (KEEP_PCT=20, N_BUCKETS=256)


In [4]:
# =========================
# STEP 1 ‚Äî Raw PHH/PHHS -> Parquet (hands + player_hands)
# =========================

FORCE_REBUILD_HANDS = False  # <- mets True si tu veux √©craser l'existant

# Output dirs
os.makedirs(HANDS_DIR_FULL, exist_ok=True)
os.makedirs(PH_DIR_FULL, exist_ok=True)

if dir_has_files(HANDS_DIR_FULL) and dir_has_files(PH_DIR_FULL) and not FORCE_REBUILD_HANDS:
    print("‚è≠Ô∏è  STEP 1 skipped: hands_parquet + player_hands_parquet d√©j√† pr√©sents.")
else:
    if FORCE_REBUILD_HANDS:
        print("‚ö†Ô∏è FORCE_REBUILD_HANDS=True -> suppression des anciens dossiers de sortie...")
        if os.path.exists(HANDS_DIR_FULL):
            shutil.rmtree(HANDS_DIR_FULL)
        if os.path.exists(PH_DIR_FULL):
            shutil.rmtree(PH_DIR_FULL)
        os.makedirs(HANDS_DIR_FULL, exist_ok=True)
        os.makedirs(PH_DIR_FULL, exist_ok=True)

    # Buffers
    hands_buf = {}   # stake_label -> list(rows)
    ph_buf    = {}   # (stake_label, bucket) -> list(rows)
    hands_part = 0
    ph_part    = 0

    def flush_hands(stake_label: int):
        global hands_part
        rows = hands_buf.get(stake_label, [])
        if not rows:
            return
        df = pd.DataFrame(rows)
        out_path = os.path.join(HANDS_DIR_FULL, f"stake_label={stake_label}")
        os.makedirs(out_path, exist_ok=True)
        hands_part += 1
        file_path = os.path.join(out_path, f"hands-part-{hands_part:06d}.parquet")
        df.to_parquet(file_path, index=False)
        hands_buf[stake_label] = []

    def flush_ph(stake_label: int, bucket_id: int):
        global ph_part
        key = (stake_label, bucket_id)
        rows = ph_buf.get(key, [])
        if not rows:
            return
        df = pd.DataFrame(rows)
        out_path = os.path.join(PH_DIR_FULL, f"stake_label={stake_label}", f"player_bucket={bucket_id:03d}")
        os.makedirs(out_path, exist_ok=True)
        ph_part += 1
        file_path = os.path.join(out_path, f"ph-part-{ph_part:06d}.parquet")
        df.to_parquet(file_path, index=False)
        ph_buf[key] = []

    def flush_all():
        for s in list(hands_buf.keys()):
            flush_hands(s)
        for (s, b) in list(ph_buf.keys()):
            flush_ph(s, b)

    total_files = 0
    total_hands_seen = 0
    hands_kept = 0
    ph_rows_kept = 0

    for stake_label, root in RAW_DIRS.items():
        paths = list(glob.iglob(os.path.join(root, "**", "*.phh*"), recursive=True))
        paths.sort()
        print(f"\n=== Stake {stake_label} | fichiers: {len(paths)} ===")

        for path in paths:
            total_files += 1

            with open(path, "r", encoding="utf-8", errors="ignore") as f:
                txt = f.read()

            for _, sec in iter_sections(txt):
                venue = get_str(r"venue\s*=\s*['\"]([^'\"]+)['\"]", sec)
                if venue != "PokerStars":
                    continue

                table_id = get_table_id(sec)
                ts = get_str(r"ts\s*=\s*['\"]([^'\"]+)['\"]", sec)

                sb = get_str(r"sb\s*=\s*([0-9.]+)", sec)
                bb = get_str(r"bb\s*=\s*([0-9.]+)", sec)

                hand_uid = md5_int(f"{os.path.basename(path)}|{sec[:200]}")

                players = get_list_strings(r"players\s*=\s*\[(.*?)\]", sec)
                stacks  = get_list_strings(r"stacks\s*=\s*\[(.*?)\]", sec)
                seats   = get_list_strings(r"seats\s*=\s*\[(.*?)\]", sec)
                actions = get_list_strings(r"actions\s*=\s*\[(.*?)\]", sec)

                n = len(players)
                if n == 0 or len(stacks) != n:
                    continue

                total_hands_seen += 1

                kept_mask = [keep_player(pid, KEEP_PCT) for pid in players]
                if not any(kept_mask):
                    continue

                hands_kept += 1
                seats = (seats + [None]*n)[:n]
                pos_map = infer_positions_from_preflop(actions)

                # HAND row (1 par main)
                hands_buf.setdefault(stake_label, []).append({
                    "stake_label": stake_label,
                    "ts": ts,
                    "hand_uid": int(hand_uid),
                    "table_id": table_id,
                    "sb": float(sb) if sb is not None else None,
                    "bb": float(bb) if bb is not None else None,
                    "players_json": json.dumps(players),
                    "starting_stacks_json": json.dumps([float(x) for x in stacks]),
                    "seats_json": json.dumps(seats),
                    "actions_json": json.dumps(actions),
                    "source_file": os.path.basename(path),
                })
                if len(hands_buf[stake_label]) >= FLUSH_HANDS:
                    flush_hands(stake_label)

                # PLAYER_HAND rows (1 par joueur gard√©)
                for i, pid in enumerate(players):
                    if not kept_mask[i]:
                        continue
                    ph_rows_kept += 1
                    b = md5_int(pid) % N_BUCKETS
                    key = (stake_label, b)
                    ph_buf.setdefault(key, []).append({
                        "stake_label": stake_label,
                        "player_bucket": b,
                        "player_id": pid,
                        "table_id": table_id,
                        "hand_uid": int(hand_uid),
                        "ts": ts,
                        "seat_idx": i+1,
                        "pos_label": pos_map.get(i+1, f"P{i+1}"),
                        "starting_stack": float(stacks[i]),
                        "source_file": os.path.basename(path),
                    })
                    if len(ph_buf[key]) >= FLUSH_PH:
                        flush_ph(stake_label, b)

            if total_files % 50 == 0:
                print(f"‚Ä¶ fichiers {total_files} | mains vues {total_hands_seen} | mains gard√©es {hands_kept} | ph_rows {ph_rows_kept}")

    flush_all()

    print("\n‚úÖ STEP 1 termin√©")
    print("Total fichiers lus:", total_files)
    print("Mains vues:", total_hands_seen)
    print("Mains gard√©es (>=1 joueur dans 20%):", hands_kept)
    print("Lignes PLAYER_HAND gard√©es:", ph_rows_kept)
    print("Sorties:")
    print(" -", HANDS_DIR_FULL)
    print(" -", PH_DIR_FULL)

‚è≠Ô∏è  STEP 1 skipped: hands_parquet + player_hands_parquet d√©j√† pr√©sents.


In [5]:
# =========================
# STEP 2 ‚Äî Build timeline_parquet (DuckDB)
# =========================

FORCE_REBUILD_TIMELINE = False  # <- True pour √©craser timeline_parquet

try:
    import duckdb
except ImportError:
    !pip -q install duckdb
    import duckdb

os.makedirs(TIMELINE_DIR_FULL, exist_ok=True)

if dir_has_files(TIMELINE_DIR_FULL) and not FORCE_REBUILD_TIMELINE:
    print("‚è≠Ô∏è  STEP 2 skipped: timeline_parquet d√©j√† pr√©sent.")
else:
    if FORCE_REBUILD_TIMELINE and os.path.exists(TIMELINE_DIR_FULL):
        print("‚ö†Ô∏è FORCE_REBUILD_TIMELINE=True -> suppression timeline_parquet existant...")
        shutil.rmtree(TIMELINE_DIR_FULL)
        os.makedirs(TIMELINE_DIR_FULL, exist_ok=True)

    con = duckdb.connect()
    con.execute("PRAGMA threads=4;")
    con.execute("PRAGMA enable_progress_bar=true;")

    con.execute(f'''
    COPY (
      WITH base AS (
        SELECT
          stake_label,
          player_bucket,
          player_id,
          table_id,
          hand_uid,
          ts,
          seat_idx,
          pos_label,
          starting_stack,
          source_file,
          ROW_NUMBER() OVER w AS rn
        FROM parquet_scan('{PH_DIR_FULL}/**/*.parquet')
        WINDOW w AS (
          PARTITION BY stake_label, table_id, player_id
          ORDER BY ts, hand_uid
        )
      )
      SELECT * FROM base
    )
    TO '{TIMELINE_DIR_FULL}'
    (FORMAT PARQUET, PARTITION_BY (stake_label, player_bucket), COMPRESSION ZSTD);
    ''')

    print("‚úÖ Timeline √©crite dans :", TIMELINE_DIR_FULL)

‚è≠Ô∏è  STEP 2 skipped: timeline_parquet d√©j√† pr√©sent.


In [6]:
# =========================
# STEP 3 ‚Äî Build sample_players_balanced.csv (300 joueurs / stake)
# =========================

FORCE_RESAMPLE_PLAYERS = False
PLAYERS_PER_STAKE = 300
STAKE_LABELS = [25, 50, 200, 400, 600, 1000]

try:
    import duckdb
except ImportError:
    !pip -q install duckdb
    import duckdb

os.makedirs(EXPORT_BULK_DIR, exist_ok=True)

if os.path.exists(SAMPLE_CSV) and (os.path.getsize(SAMPLE_CSV) > 0) and not FORCE_RESAMPLE_PLAYERS:
    print("‚è≠Ô∏è  STEP 3 skipped: sample_players_balanced.csv d√©j√† pr√©sent.")
    print("CSV:", SAMPLE_CSV)
else:
    con = duckdb.connect()
    con.execute("PRAGMA threads=4;")
    con.execute("PRAGMA enable_progress_bar=true;")

    df_samples = con.execute(f'''
    WITH players AS (
        SELECT stake_label, player_id
        FROM parquet_scan('{TIMELINE_DIR_FULL}/**/*.parquet')
        GROUP BY stake_label, player_id
    ),
    ranked AS (
        SELECT
            stake_label,
            player_id,
            ROW_NUMBER() OVER (PARTITION BY stake_label ORDER BY md5(player_id)) AS rn
        FROM players
        WHERE stake_label IN ({",".join(map(str, STAKE_LABELS))})
    )
    SELECT stake_label, player_id
    FROM ranked
    WHERE rn <= {PLAYERS_PER_STAKE}
    ORDER BY stake_label, rn
    ''').df()

    df_samples.to_csv(SAMPLE_CSV, index=False)
    print("‚úÖ sample √©crit :", SAMPLE_CSV)
    print(df_samples.groupby("stake_label")["player_id"].nunique())

‚è≠Ô∏è  STEP 3 skipped: sample_players_balanced.csv d√©j√† pr√©sent.
CSV: /content/drive/MyDrive/pokerstars_clean/follow_20pct_full/player_transcripts_bulk_300p_200h/sample_players_balanced.csv


In [7]:
import json, re

def split_actions_to_streets(actions):
    streets = {"preflop": [], "flop": [], "turn": [], "river": [], "showdown": []}
    board   = {"flop": None, "turn": None, "river": None}
    current = "preflop"
    for a in actions:
        if a.startswith("d dh"):
            continue
        if a.startswith("d db "):
            cards = a.split(" ", 2)[2]
            if board["flop"] is None:
                board["flop"] = cards; current="flop";  continue
            if board["turn"] is None:
                board["turn"] = cards; current="turn";  continue
            if board["river"] is None:
                board["river"] = cards; current="river"; continue
        if a.startswith("p") and " sm" in a:
            streets["showdown"].append(a); continue
        streets[current].append(a)
    return streets, board

def parse_action(act: str, street: str):
    m = re.match(r"p(\d+)\s+(.*)", act)
    if not m:
        return None, act
    pid = int(m.group(1))
    rest = m.group(2).strip()

    if rest == "f":
        txt = "fold"
    elif rest == "cc":
        txt = "call/check"
    else:
        m2 = re.match(r"cbr\s+([0-9.]+)", rest)
        if m2:
            amt = m2.group(1)
            txt = f"bet/raise {amt}"
        else:
            txt = rest
    return pid, txt

def make_hand_transcript(row):
    actions = json.loads(row["actions_json"])
    players = json.loads(row["players_json"])
    stacks  = json.loads(row["starting_stacks_json"])

    streets, board = split_actions_to_streets(actions)

    # rep√®re le seat_idx de notre joueur (pX)
    hero_p = int(row["seat_idx"])

    header = (
        f"{row['ts']} | hand_uid={row['hand_uid']} | table={row['table_id'][:10]}... | "
        f"hero=p{hero_p} ({row['pos_label']}) | start={row['starting_stack']} | "
        f"net_from_stacks={row['net_from_stacks']}"
    )

    def fmt_street(st):
        out = []
        for a in streets[st]:
            pid, txt = parse_action(a, st)
            if pid is None:
                continue
            tag = "HERO" if pid == hero_p else "VIL "
            out.append(f"{tag} p{pid}: {txt}")
        return out

    lines = [header]
    lines.append("  Preflop:")
    lines += ["   - " + x for x in fmt_street("preflop")] or ["   - (‚Äî)"]

    if board["flop"]:
        lines.append(f"  Flop {board['flop']}:")
        lines += ["   - " + x for x in fmt_street("flop")] or ["   - (‚Äî)"]
    if board["turn"]:
        lines.append(f"  Turn {board['turn']}:")
        lines += ["   - " + x for x in fmt_street("turn")] or ["   - (‚Äî)"]
    if board["river"]:
        lines.append(f"  River {board['river']}:")
        lines += ["   - " + x for x in fmt_street("river")] or ["   - (‚Äî)"]

    if streets["showdown"]:
        lines.append("  Showdown:")
        for a in streets["showdown"]:
            pid, rest = parse_action(a, "showdown")
            if pid is None:
                continue
            tag = "HERO" if pid == hero_p else "VIL "
            lines.append(f"   - {tag} p{pid}: shows {a.split('sm',1)[1].strip()}")

    return "\n".join(lines)

print("‚úÖ Fonctions transcript pr√™tes")


# === PATCH : injecter player_id dans le header du transcript ===
# (sans toucher au reste : on wrappe l'ancienne fonction)

make_hand_transcript_old = make_hand_transcript  # on garde l'ancienne

def make_hand_transcript(row):
    txt = make_hand_transcript_old(row)

    pid = row.get("player_id", None) if hasattr(row, "get") else None
    if pid is None:
        return txt

    pid_short = str(pid)[:16]  # tronqu√© (tu peux mettre [:32] si tu veux)

    lines = txt.splitlines()
    if not lines:
        return txt

    # √©vite doublon si d√©j√† pr√©sent
    if "player_id=" in lines[0]:
        return txt

    # injection propre
    if " | hero=" in lines[0]:
        lines[0] = lines[0].replace(" | hero=", f" | player_id={pid_short} | hero=", 1)
    else:
        lines[0] = lines[0] + f" | player_id={pid_short}"

    return "\n".join(lines)

print("‚úÖ Patch appliqu√© : le player_id sera affich√© dans la 1√®re ligne de chaque main export√©e.")

‚úÖ Fonctions transcript pr√™tes
‚úÖ Patch appliqu√© : le player_id sera affich√© dans la 1√®re ligne de chaque main export√©e.


In [8]:
import os, duckdb, pandas as pd, hashlib

OUT_ROOT_FULL  = "/content/drive/MyDrive/pokerstars_clean/follow_20pct_full"
TIMELINE_DIR_FULL = f"{OUT_ROOT_FULL}/timeline_parquet"
HANDS_DIR_FULL    = f"{OUT_ROOT_FULL}/hands_parquet"

EXPORT_DIR = f"{OUT_ROOT_FULL}/player_transcripts"
os.makedirs(EXPORT_DIR, exist_ok=True)

def bucket(pid: str, n=256):
    return int(hashlib.md5(pid.encode("utf-8")).hexdigest()[:8], 16) % n

def build_player_df(player_id: str, stake_label: int, limit_hands: int = 200):
    """
    Renvoie un DF (timeline + actions) pour un joueur donn√© sur un stake.
    """
    b = bucket(player_id)
    con = duckdb.connect()
    con.execute("PRAGMA threads=4;")

    tl = con.execute(f"""
    SELECT *
    FROM parquet_scan('{TIMELINE_DIR_FULL}/stake_label={stake_label}/player_bucket={b:03d}/*.parquet')
    WHERE player_id = '{player_id}'
    ORDER BY table_id, ts, hand_uid
    LIMIT {int(limit_hands)}
    """).df()

    if len(tl) == 0:
        return tl

    hands = con.execute(f"""
    SELECT ts, hand_uid, table_id, actions_json, players_json, starting_stacks_json
    FROM parquet_scan('{HANDS_DIR_FULL}/stake_label={stake_label}/*.parquet')
    WHERE (table_id, hand_uid) IN (SELECT table_id, hand_uid FROM tl)
    """).df()

    df = tl.merge(hands, on=["ts","hand_uid","table_id"], how="left") \
           .sort_values(["table_id","ts","hand_uid"]) \
           .reset_index(drop=True)
    return df

def export_player_transcript(player_id: str, stake_label: int, n_hands: int = 50, filename=None):
    dfp = build_player_df(player_id, stake_label, limit_hands=n_hands)
    dfp = dfp[dfp["actions_json"].notna()].copy()

    if filename is None:
        filename = f"stake{stake_label}_player_{player_id[:12]}_first{n_hands}.txt"
    out_path = os.path.join(EXPORT_DIR, filename)

    with open(out_path, "w", encoding="utf-8") as f:
        for k in range(min(n_hands, len(dfp))):
            f.write("="*120 + "\n")
            f.write(make_hand_transcript(dfp.iloc[k]) + "\n\n")

    print("‚úÖ Export √©crit:", out_path)
    print("   mains √©crites:", min(n_hands, len(dfp)))
    return out_path

print("‚úÖ Fonctions build_player_df / export_player_transcript pr√™tes")
print("üìÅ Export dir:", EXPORT_DIR)



# --- Override: export_player_transcript SAFE (atomic write + mkdir) ---
import inspect

def export_player_transcript(player_id: str, stake_label: int, n_hands: int = 200, filename=None, player_bucket=None):
    # build_player_df peut (ou non) accepter player_bucket selon ta version
    sig = inspect.signature(build_player_df)
    kwargs = {"player_id": player_id, "stake_label": int(stake_label), "limit_hands": int(n_hands)}
    if "player_bucket" in sig.parameters and player_bucket is not None:
        kwargs["player_bucket"] = int(player_bucket)

    dfp = build_player_df(**kwargs)

    # Filtre actions_json si la colonne existe
    if "actions_json" in dfp.columns:
        dfp2 = dfp[dfp["actions_json"].notna()].copy()
    else:
        dfp2 = dfp.copy()

    if len(dfp2) == 0:
        raise ValueError("Aucune main exploitable (df vide apr√®s filtrage actions_json).")

    # Chemin de sortie
    if filename is None:
        base_dir = globals().get("EXPORT_DIR", ".")
        filename = f"stake{stake_label}_player_{str(player_id)[:12]}_first{min(int(n_hands), len(dfp2))}.txt"
        out_path = os.path.join(base_dir, filename)
    else:
        out_path = filename if str(filename).startswith("/") else os.path.join(globals().get("EXPORT_DIR", "."), filename)

    os.makedirs(os.path.dirname(out_path), exist_ok=True)

    # √âcriture safe : tmp puis rename (jamais de fichier final vide)
    tmp_path = out_path + ".tmp"
    n = min(int(n_hands), len(dfp2))

    with open(tmp_path, "w", encoding="utf-8") as f:
        for k in range(n):
            f.write("=" * 120 + "\n")
            f.write(make_hand_transcript(dfp2.iloc[k]) + "\n\n")

    os.replace(tmp_path, out_path)
    return out_path

‚úÖ Fonctions build_player_df / export_player_transcript pr√™tes
üìÅ Export dir: /content/drive/MyDrive/pokerstars_clean/follow_20pct_full/player_transcripts


In [9]:
# =========================
# STEP 4 ‚Äî Export BULK transcripts TXT (300 joueurs / stake, 200 mains / joueur)
# =========================

FORCE_REEXPORT_TRANSCRIPTS = False
N_HANDS_PER_PLAYER = 200

if not os.path.exists(SAMPLE_CSV):
    raise FileNotFoundError(f"‚ùå sample_players_balanced.csv introuvable : {SAMPLE_CSV}")

existing_txt = glob.glob(os.path.join(EXPORT_BULK_DIR, "stake_label=*", "*.txt"))
manifest_path = os.path.join(EXPORT_BULK_DIR, "exports_manifest.csv")

if existing_txt and os.path.exists(manifest_path) and not FORCE_REEXPORT_TRANSCRIPTS:
    print("‚è≠Ô∏è  STEP 4 skipped: des transcripts TXT existent d√©j√† + manifest pr√©sent.")
    print("TXT count:", len(existing_txt))
    print("Manifest:", manifest_path)
else:
    os.makedirs(EXPORT_BULK_DIR, exist_ok=True)

    df_samples = pd.read_csv(SAMPLE_CSV)

    need_cols = {"stake_label", "player_id"}
    missing = need_cols - set(df_samples.columns)
    if missing:
        raise ValueError(f"‚ùå Colonnes manquantes dans sample_players_balanced.csv: {missing}")

    manifest = []
    errors = []

    for stake, sub in df_samples.groupby("stake_label"):
        out_dir_stake = os.path.join(EXPORT_BULK_DIR, f"stake_label={int(stake)}")
        os.makedirs(out_dir_stake, exist_ok=True)

        players = sub["player_id"].astype(str).tolist()
        print(f"\nStake {int(stake)} | joueurs: {len(players)} | out={out_dir_stake}")

        for i, pid in enumerate(players, start=1):
            fname = f"stake{int(stake)}_player_{pid[:12]}_first{N_HANDS_PER_PLAYER}.txt"
            out_path = os.path.join(out_dir_stake, fname)

            # Pour pouvoir relancer sans tout refaire
            if os.path.exists(out_path) and os.path.getsize(out_path) > 0 and not FORCE_REEXPORT_TRANSCRIPTS:
                manifest.append({"stake_label": int(stake), "player_id": pid, "txt_path": out_path, "status": "skipped_exists"})
                continue

            try:
                export_player_transcript(
                    player_id=pid,
                    stake_label=int(stake),
                    n_hands=int(N_HANDS_PER_PLAYER),
                    filename=out_path,
                )
                manifest.append({"stake_label": int(stake), "player_id": pid, "txt_path": out_path, "status": "ok"})
            except Exception as e:
                errors.append({"stake_label": int(stake), "player_id": pid, "error": repr(e)})
                manifest.append({"stake_label": int(stake), "player_id": pid, "txt_path": out_path, "status": "error"})

            if i % 25 == 0:
                print(f"  ... {i}/{len(players)} joueurs trait√©s")

    # Sauvegarder manifest + erreurs (overwrite)
    pd.DataFrame(manifest).to_csv(manifest_path, index=False)

    errors_path = os.path.join(EXPORT_BULK_DIR, "exports_errors.csv")
    pd.DataFrame(errors).to_csv(errors_path, index=False)

    print("\n‚úÖ STEP 4 termin√©.")
    print("Manifest :", manifest_path)
    print("Erreurs  :", errors_path)
    print("Nb OK/skip:", sum(r.get("status") in ["ok", "skipped_exists"] for r in manifest))
    print("Nb erreurs:", len(errors))

‚è≠Ô∏è  STEP 4 skipped: des transcripts TXT existent d√©j√† + manifest pr√©sent.
TXT count: 1468
Manifest: /content/drive/MyDrive/pokerstars_clean/follow_20pct_full/player_transcripts_bulk_300p_200h/exports_manifest.csv


In [10]:
# =========================
# STEP 5 ‚Äî Structuration des transcripts TXT -> Parquet (_structured/hands_parquet + actions_parquet)
# =========================

FORCE_RESTRUCTURE_TRANSCRIPTS = False  # <- True pour supprimer _structured et tout reconstruire

EXPORT_BULK_DIR_PATH = EXPORT_BULK_DIR

# Si le dataset structur√© existe d√©j√†, on skip (par d√©faut)
if dir_has_files(STRUCT_HANDS_DIR) and dir_has_files(STRUCT_ACT_DIR) and not FORCE_RESTRUCTURE_TRANSCRIPTS:
    print("‚è≠Ô∏è  STEP 5 skipped: dataset structur√© d√©j√† pr√©sent.")
    print(" -", STRUCT_HANDS_DIR)
    print(" -", STRUCT_ACT_DIR)
else:
    # =========================
    # PokerStars TRANSCRIPTS TXT -> Parquet
    # (hands + actions) with stake_label + player_id
    # =========================

    import os, re, glob, json, time, shutil
    import pandas as pd

    try:
        from tqdm import tqdm
    except Exception:
        def tqdm(x, **kwargs):  # fallback
            return x

    # ---------
    # CONFIG
    # ---------
    EXPORT_BULK_DIR = EXPORT_BULK_DIR_PATH

    STRUCT_DIR = os.path.join(EXPORT_BULK_DIR, "_structured")
    HANDS_DIR  = os.path.join(STRUCT_DIR, "hands_parquet")
    ACT_DIR    = os.path.join(STRUCT_DIR, "actions_parquet")

    MANIFEST_PATH = os.path.join(STRUCT_DIR, "_manifest_processed.txt")
    ERRORS_PATH   = os.path.join(STRUCT_DIR, "_errors.log")

    # si tu veux forcer un rerun complet (efface manifest + dossiers parquet) -> mets True
    FORCE_RERUN = FORCE_RESTRUCTURE_TRANSCRIPTS

    CHUNK_HANDS   = 10_000
    CHUNK_ACTIONS = 200_000

    # ---------
    # SETUP
    # ---------
    os.makedirs(STRUCT_DIR, exist_ok=True)
    os.makedirs(HANDS_DIR, exist_ok=True)
    os.makedirs(ACT_DIR, exist_ok=True)

    if FORCE_RERUN:
        for p in [MANIFEST_PATH, ERRORS_PATH]:
            if os.path.exists(p):
                os.remove(p)
        for d in [HANDS_DIR, ACT_DIR]:
            if os.path.exists(d):
                shutil.rmtree(d)
            os.makedirs(d, exist_ok=True)
        print("FORCE_RERUN: reset manifest + parquet dirs ‚úÖ")

    # ---------
    # HELPERS
    # ---------
    # Chaque main commence par une ligne type:
    # 2009-07-14 18:28:13 | hand_uid=... | table=... | hero=p4 (BTN) | start=... | net_from_stacks=...
    HAND_START_RE = re.compile(r"(?m)^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\s*\|\s*hand_uid=")

    def _log_error(msg: str):
        with open(ERRORS_PATH, "a", encoding="utf-8") as f:
            f.write(msg.rstrip() + "\n")

    def parse_stake_label(path: str) -> int:
        m = re.search(r"stake_label=(\d+)", path)
        return int(m.group(1)) if m else -1

    def parse_player_id_from_filename(path: str) -> str:
        base = os.path.basename(path)
        m = re.search(r"player_(.+?)_first\d+\.txt$", base)
        if m:
            return m.group(1)
        m2 = re.search(r"player_(.+?)\.txt$", base)
        return m2.group(1) if m2 else base.replace(".txt", "")

    def split_hand_blocks(text: str):
        starts = [m.start() for m in HAND_START_RE.finditer(text)]
        if not starts:
            return []
        blocks = []
        for i, s in enumerate(starts):
            e = starts[i+1] if i+1 < len(starts) else len(text)
            blk = text[s:e].strip()
            if blk:
                blocks.append(blk)
        return blocks

    def parse_action_line(rest: str):
        """
        Transcript actions examples:
          "fold"
          "call/check"
          "bet/raise 30"
          "bet/raise 98.50"
        """
        r = (rest or "").strip()
        low = r.lower()

        nums = re.findall(r"-?\d+(?:\.\d+)?", r)
        amt = float(nums[-1]) if nums else None

        if "fold" in low:
            return {"action_type": "fold", "amount": 0.0, "is_all_in": False}

        if low in {"call/check", "check/call"} or "call/check" in low or "check/call" in low:
            return {"action_type": "call_or_check", "amount": None, "is_all_in": False}

        if low.startswith("bet/raise"):
            return {"action_type": "bet_or_raise", "amount": amt, "is_all_in": False}

        if low.startswith("bet "):
            return {"action_type": "bet", "amount": amt, "is_all_in": False}

        if low.startswith("raise "):
            return {"action_type": "raise", "amount": amt, "is_all_in": False}

        if "check" in low and "call" not in low:
            return {"action_type": "check", "amount": 0.0, "is_all_in": False}

        if "call" in low and "check" not in low:
            return {"action_type": "call", "amount": amt, "is_all_in": False}

        return {"action_type": "other", "amount": amt, "is_all_in": False}

    def parse_hand_block(block: str, stake_label: int, player_id: str, source_file: str, hand_index_in_file: int):
        """
        Header example:
        2009-07-14 18:28:13 | hand_uid=60931459206 | table=... | hero=p4 (BTN) | start=261.0 | net_from_stacks=18.5

        Streets:
          Preflop:
          Flop Jc9c4c:
          Turn 3d:
          River 6c:

        Actions:
          - VIL p3: fold
          - HERO p4: call/check
          - VIL p5: bet/raise 98.50
        """
        lines = [ln.rstrip("\n") for ln in block.splitlines() if ln.strip() != ""]
        if not lines:
            return None, []

        header = lines[0].strip()

        # parsing header robuste
        hm = re.match(
            r"^(?P<dt>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s*\|\s*"
            r"hand_uid=(?P<hand_uid>\d+)\s*\|\s*"
            r"table=(?P<table>[^|]+?)\s*\|\s*"
            r"hero=(?P<hero_player>p\d+)\s*\((?P<hero_pos>[^)]+)\)\s*\|\s*"
            r"start=(?P<start>-?\d+(?:\.\d+)?)\s*\|\s*"
            r"net_from_stacks=(?P<net>-?\d+(?:\.\d+)?)\s*$",
            header
        )

        ts = hm.group("dt") if hm else None
        hand_uid = int(hm.group("hand_uid")) if hm else None
        table_id = hm.group("table").strip() if hm else None
        hero_player = hm.group("hero_player").strip() if hm else None
        hero_pos = hm.group("hero_pos").strip() if hm else None
        start_val = float(hm.group("start")) if hm else None
        net_from_stacks = float(hm.group("net")) if hm else None

        board_flop = None
        board_turn = None
        board_river = None

        current_street = None
        action_rows = []
        action_no = 0

        for ln in lines[1:]:
            s = ln.strip()

            # street headers
            if s.lower().startswith("preflop"):
                current_street = "PREFLOP"
                continue

            if s.lower().startswith("flop"):
                current_street = "FLOP"
                cards = re.findall(r"[2-9TJQKA][cdhs]", s)
                if cards:
                    board_flop = " ".join(cards[:3])
                continue

            if s.lower().startswith("turn"):
                current_street = "TURN"
                cards = re.findall(r"[2-9TJQKA][cdhs]", s)
                if cards:
                    board_turn = cards[-1]
                continue

            if s.lower().startswith("river"):
                current_street = "RIVER"
                cards = re.findall(r"[2-9TJQKA][cdhs]", s)
                if cards:
                    board_river = cards[-1]
                continue

            # action lines: "- VIL p2: fold" / "- HERO p4: bet/raise 30"
            am = re.match(r"^-+\s*(?P<role>HERO|VIL)\s+(?P<p>p\d+)\s*:\s*(?P<rest>.+)$", s, flags=re.IGNORECASE)
            if not am:
                continue

            role = am.group("role").upper()
            actor_p = am.group("p")
            rest = am.group("rest").strip()

            parsed = parse_action_line(rest)

            action_no += 1
            action_rows.append({
                "stake_label": stake_label,
                "player_id": player_id,
                "source_file": source_file,
                "ts": ts,
                "hand_uid": hand_uid,
                "hand_id": hand_uid,   # alias pratique
                "table_id": table_id,
                "street": current_street,
                "action_no": action_no,
                "actor_role": role,     # HERO / VIL
                "actor_player": actor_p,# p4, p3, ...
                "is_hero": (role == "HERO"),
                "action_type": parsed["action_type"],
                "amount": parsed["amount"],
                "raw_action": rest,
            })

        hand_row = {
            "stake_label": stake_label,
            "player_id": player_id,
            "source_file": source_file,
            "hand_index_in_file": hand_index_in_file,
            "ts": ts,
            "hand_uid": hand_uid,
            "hand_id": hand_uid,   # alias pratique
            "table_id": table_id,
            "hero_player": hero_player,
            "hero_pos": hero_pos,
            "start": start_val,
            "net_from_stacks": net_from_stacks,
            "board_flop": board_flop,
            "board_turn": board_turn,
            "board_river": board_river,
        }

        return hand_row, action_rows

    def write_part(df: pd.DataFrame, out_dir: str, stake_label: int, part_idx: int, kind: str):
        stake_dir = os.path.join(out_dir, f"stake_label={stake_label}")
        os.makedirs(stake_dir, exist_ok=True)
        out_path = os.path.join(stake_dir, f"{kind}_part-{part_idx:05d}.parquet")
        df.to_parquet(out_path, index=False, engine="pyarrow")
        return out_path

    # ---------
    # RESUME MANIFEST
    # ---------
    processed = set()
    if os.path.exists(MANIFEST_PATH):
        with open(MANIFEST_PATH, "r", encoding="utf-8") as f:
            for ln in f:
                p = ln.strip()
                if p:
                    processed.add(p)

    # ---------
    # MAIN
    # ---------
    txt_files = glob.glob(os.path.join(EXPORT_BULK_DIR, "stake_label=*", "*.txt"))
    txt_files = sorted(txt_files, key=lambda p: (parse_stake_label(p), p))

    print("EXPORT_BULK_DIR:", EXPORT_BULK_DIR, "| exists:", os.path.exists(EXPORT_BULK_DIR))
    print("TXT files found:", len(txt_files))
    print("Already processed:", len(processed))
    print("Output STRUCT_DIR:", STRUCT_DIR)

    hands_buffer = []
    actions_buffer = []

    current_stake = None
    hands_part_idx = 0
    actions_part_idx = 0

    total_hands = 0
    total_actions = 0
    t0 = time.time()

    for path in tqdm(txt_files, desc="Parsing TXT"):
        if path in processed:
            continue

        stake_label = parse_stake_label(path)
        player_id = parse_player_id_from_filename(path)

        # switch stake -> flush previous stake buffers + reset part index
        if current_stake is None:
            current_stake = stake_label

        if stake_label != current_stake:
            if hands_buffer:
                dfh = pd.DataFrame(hands_buffer)
                write_part(dfh, HANDS_DIR, current_stake, hands_part_idx, kind="hands")
                hands_part_idx += 1
                hands_buffer = []
            if actions_buffer:
                dfa = pd.DataFrame(actions_buffer)
                write_part(dfa, ACT_DIR, current_stake, actions_part_idx, kind="actions")
                actions_part_idx += 1
                actions_buffer = []

            current_stake = stake_label
            hands_part_idx = 0
            actions_part_idx = 0

        try:
            with open(path, "r", encoding="utf-8", errors="replace") as f:
                text = f.read()
        except Exception as e:
            _log_error(f"[READ_FAIL] {path} | {repr(e)}")
            continue

        blocks = split_hand_blocks(text)
        if not blocks:
            _log_error(f"[NO_HANDS_FOUND] {path}")
            with open(MANIFEST_PATH, "a", encoding="utf-8") as f:
                f.write(path + "\n")
            processed.add(path)
            continue

        for hi, blk in enumerate(blocks):
            try:
                hand_row, action_rows = parse_hand_block(
                    blk,
                    stake_label=stake_label,
                    player_id=player_id,
                    source_file=path,
                    hand_index_in_file=hi
                )

                if hand_row is not None:
                    hands_buffer.append(hand_row)
                    total_hands += 1

                if action_rows:
                    actions_buffer.extend(action_rows)
                    total_actions += len(action_rows)

            except Exception as e:
                _log_error(f"[PARSE_FAIL] {path} | hand_idx={hi} | {repr(e)}")
                continue

            # chunk flush
            if len(hands_buffer) >= CHUNK_HANDS:
                dfh = pd.DataFrame(hands_buffer)
                write_part(dfh, HANDS_DIR, current_stake, hands_part_idx, kind="hands")
                hands_part_idx += 1
                hands_buffer = []

            if len(actions_buffer) >= CHUNK_ACTIONS:
                dfa = pd.DataFrame(actions_buffer)
                write_part(dfa, ACT_DIR, current_stake, actions_part_idx, kind="actions")
                actions_part_idx += 1
                actions_buffer = []

        # mark processed file
        with open(MANIFEST_PATH, "a", encoding="utf-8") as f:
            f.write(path + "\n")
        processed.add(path)

    # final flush
    if current_stake is not None:
        if hands_buffer:
            dfh = pd.DataFrame(hands_buffer)
            write_part(dfh, HANDS_DIR, current_stake, hands_part_idx, kind="hands")
            hands_part_idx += 1
            hands_buffer = []
        if actions_buffer:
            dfa = pd.DataFrame(actions_buffer)
            write_part(dfa, ACT_DIR, current_stake, actions_part_idx, kind="actions")
            actions_part_idx += 1
            actions_buffer = []

    dt = time.time() - t0
    print("\n‚úÖ DONE")
    print("Hands parsed:", total_hands)
    print("Actions parsed:", total_actions)
    print(f"Elapsed: {dt:.1f}s")
    print("Hands output dir:", HANDS_DIR)
    print("Actions output dir:", ACT_DIR)
    print("Manifest:", MANIFEST_PATH)
    print("Errors (if any):", ERRORS_PATH)

‚è≠Ô∏è  STEP 5 skipped: dataset structur√© d√©j√† pr√©sent.
 - /content/drive/MyDrive/pokerstars_clean/follow_20pct_full/player_transcripts_bulk_300p_200h/_structured/hands_parquet
 - /content/drive/MyDrive/pokerstars_clean/follow_20pct_full/player_transcripts_bulk_300p_200h/_structured/actions_parquet


In [11]:
# =========================
# FINAL ‚Äî Sanity checks (light)
# =========================

def count_parquet(root: str) -> int:
    if not os.path.exists(root):
        return 0
    return len(glob.glob(os.path.join(root, "**", "*.parquet"), recursive=True))

def count_txt(root: str) -> int:
    if not os.path.exists(root):
        return 0
    return len(glob.glob(os.path.join(root, "**", "*.txt"), recursive=True))

print("\n--- OUTPUT SUMMARY ---")
print("hands_parquet:", HANDS_DIR_FULL, "| parquet:", count_parquet(HANDS_DIR_FULL))
print("player_hands_parquet:", PH_DIR_FULL, "| parquet:", count_parquet(PH_DIR_FULL))
print("timeline_parquet:", TIMELINE_DIR_FULL, "| parquet:", count_parquet(TIMELINE_DIR_FULL))
print("sample_csv:", SAMPLE_CSV, "| exists:", os.path.exists(SAMPLE_CSV))
print("bulk_txt:", EXPORT_BULK_DIR, "| txt:", count_txt(EXPORT_BULK_DIR))
print("structured hands:", STRUCT_HANDS_DIR, "| parquet:", count_parquet(STRUCT_HANDS_DIR))
print("structured actions:", STRUCT_ACT_DIR, "| parquet:", count_parquet(STRUCT_ACT_DIR))


--- OUTPUT SUMMARY ---
hands_parquet: /content/drive/MyDrive/pokerstars_clean/follow_20pct_full/hands_parquet | parquet: 279
player_hands_parquet: /content/drive/MyDrive/pokerstars_clean/follow_20pct_full/player_hands_parquet | parquet: 1528
timeline_parquet: /content/drive/MyDrive/pokerstars_clean/follow_20pct_full/timeline_parquet | parquet: 19290
sample_csv: /content/drive/MyDrive/pokerstars_clean/follow_20pct_full/player_transcripts_bulk_300p_200h/sample_players_balanced.csv | exists: True
bulk_txt: /content/drive/MyDrive/pokerstars_clean/follow_20pct_full/player_transcripts_bulk_300p_200h | txt: 1469
structured hands: /content/drive/MyDrive/pokerstars_clean/follow_20pct_full/player_transcripts_bulk_300p_200h/_structured/hands_parquet | parquet: 13
structured actions: /content/drive/MyDrive/pokerstars_clean/follow_20pct_full/player_transcripts_bulk_300p_200h/_structured/actions_parquet | parquet: 7
