This notebook parses the SGF game records written by the `match` command
(`.sgfs` files, one game per line) and loads them into a pandas DataFrame.

### Load libraries

In [19]:
import dataclasses
import html
import os
import random
import re
from typing import List, Optional

import matplotlib.pyplot as plt
import pandas as pd
from sgfmill import sgf
from tqdm.auto import tqdm
from tqdm.contrib.concurrent import process_map

In [None]:
MATCH_DIR = "../tests/testdata/visits-truncated/"
# MATCH_DIR = "/nas/ucb/tony/go-attack/matches/visit-exp3"

# One SGF game record per entry, gathered from every `.sgfs` file that sits
# directly inside MATCH_DIR (sub-directories are not searched).
RAW_SGF_STRS: List[str] = []

# Close the directory iterator promptly, and visit the files in name order so
# the order of RAW_SGF_STRS does not depend on the filesystem's listing order.
with os.scandir(MATCH_DIR) as dir_entries:
    sgfs_files = sorted(
        (
            entry
            for entry in dir_entries
            if entry.is_file() and entry.name.endswith(".sgfs")
        ),
        key=lambda entry: entry.name,
    )

for sgf_file in tqdm(sgfs_files):
    with open(sgf_file.path) as f:
        for line in f:
            record = line.strip()
            # Blank lines (e.g. a trailing newline) are not game records and
            # would only produce unparseable empty SGF strings.
            if record:
                RAW_SGF_STRS.append(record)

len(RAW_SGF_STRS)

In [None]:
@dataclasses.dataclass
class GameInfo:
    """Flat summary of one game parsed from an SGF record.

    Instances are built by `get_game_info`; the field order here is the order
    in which `dataclasses.asdict` emits the keys.
    """

    board_size: int
    # `gtype` entry of the root comment; None when the comment has no such entry.
    gtype: Optional[str]
    start_turn_idx: int  # `startTurnIdx` entry of the root comment
    init_turn_num: int  # `initTurnNum` entry of the root comment
    # True iff the root comment contains `usedInitialPosition=1`.
    used_initial_position: bool

    b_name: str
    w_name: str

    # Winner information; all three are None when the game has no winner.
    win_color: Optional[str]  # "b" or "w"
    win_name: Optional[str]
    lose_name: Optional[str]

    komi: float  # Positive if white has the advantage

    # Number of extra stones black places at start of game,
    # equivalent to the number of white passes at start of game.
    handicap: int

    is_continuation: bool  # Whether game is continuation of previous game

    # Total number of moves (including passes)
    num_moves: int

    # How many times each player passed
    num_b_pass: int
    num_w_pass: int

    # Components of the SGF `RU` (rules) string.
    ko_rule: str
    score_rule: str
    tax_rule: str
    sui_legal: bool
    has_button: bool
    whb: str  # whiteHandicapBonus
    fpok: bool  # friendly pass ok

    sgf_str: str  # raw sgf string


def get_game_info(sgf_str: str) -> GameInfo:
    """Parse one SGF game record into a `GameInfo`.

    Args:
        sgf_str: A single SGF game as a string.

    Returns:
        A `GameInfo` filled from the root node (player names, winner, komi,
        `HA`, `RU`), from the `key=value` entries of the root comment (`C`),
        and from the main sequence of moves.

    Raises:
        TypeError: If the root comment has no `startTurnIdx` or `initTurnNum`
            entry (the missing value is None and cannot be converted to int).
        AttributeError: If the `RU` string lacks a `ko`, `score`, `tax` or
            `sui` component.
        KeyError: Presumably when the root node has no `HA`, `C` or `RU`
            property (behaviour of sgfmill's node `get`; confirm in its docs).
    """
    sgf_game = sgf.Sgf_game.from_string(sgf_str)
    root = sgf_game.get_root()

    b_name = sgf_game.get_player_name("b")
    w_name = sgf_game.get_player_name("w")

    # win_color is "b", "w" or None; None (no winner) maps to None names.
    # The margin after "+" in RE is intentionally not parsed: GameInfo has no
    # field for it, and float() fails on non-numeric results such as "B+R".
    win_color = sgf_game.get_winner()
    win_name = {"b": b_name, "w": w_name, None: None}[win_color]
    lose_name = {"b": w_name, "w": b_name, None: None}[win_color]

    komi = sgf_game.get_komi()
    handicap = int(root.get("HA"))

    comments = root.get("C")

    def comment_prop(
        prop_name: str,
        default: Optional[str] = None,
    ) -> Optional[str]:
        """Return the text after `prop_name=` up to the next comma, else `default`."""
        # \b keeps a key from matching as the tail of a longer key, and a bare
        # mention of the name without "=" falls through to the default.
        found = re.search(rf"\b{re.escape(prop_name)}=([^,]*)", comments)
        return found.group(1) if found else default

    rule_str = root.get("RU")

    # Walk the main line once and reuse it for the pass counts and move count.
    main_sequence = sgf_game.get_main_sequence()
    moves = [node.get_move() for node in main_sequence]
    num_b_pass: int = moves.count(("b", None))
    num_w_pass: int = moves.count(("w", None))

    # whiteHandicapBonus is optional in the rules string; "0" when absent.
    if "whb" in rule_str:
        whb = re.search(r"whb([A-Z0-9\-]+)", rule_str).group(1)
    else:
        whb = "0"

    return GameInfo(
        board_size=sgf_game.get_size(),
        gtype=comment_prop("gtype"),
        start_turn_idx=int(comment_prop("startTurnIdx")),
        init_turn_num=int(comment_prop("initTurnNum")),
        used_initial_position=comment_prop("usedInitialPosition") == "1",
        b_name=b_name,
        w_name=w_name,
        win_color=win_color,
        win_name=win_name,
        lose_name=lose_name,
        komi=komi,
        handicap=handicap,
        is_continuation=root.has_setup_stones(),
        # The first node of the main sequence is the root, not a move.
        num_moves=len(main_sequence) - 1,
        num_b_pass=num_b_pass,
        num_w_pass=num_w_pass,
        ko_rule=re.search(r"ko([A-Z]+)", rule_str).group(1),
        score_rule=re.search(r"score([A-Z]+)", rule_str).group(1),
        tax_rule=re.search(r"tax([A-Z]+)", rule_str).group(1),
        sui_legal=re.search(r"sui([0-9])", rule_str).group(1) == "1",
        has_button="button1" in rule_str,
        whb=whb,
        fpok="fpok" in rule_str,
        sgf_str=sgf_str,
    )


# Seed the stdlib RNG before the parallel parse below.
random.seed(42)

# Apply get_game_info to every raw SGF string through tqdm's process_map,
# passing max_workers=64 and handing work out in chunks of 50 records.
GAME_INFOS: List[GameInfo] = process_map(
    get_game_info, RAW_SGF_STRS, chunksize=50, max_workers=64
)

In [None]:
%%time
# One row per parsed game, one column per GameInfo field.
df = pd.DataFrame(list(map(dataclasses.asdict, GAME_INFOS)))
print("gtypes:", df.gtype.unique())
print("Number of cleanup games:", df.gtype.eq("cleanuptraining").sum())

# Keep only the games whose gtype is "normal"; everything below uses this subset.
df = df.loc[df.gtype.eq("normal")]
print("Fraction continuation:    ", df.is_continuation.mean())
print("Fraction used_initial_pos:", df.used_initial_position.mean())
print("max(init_turn_num)       :", df.init_turn_num.max())

df.head()

### Helper functions

In [24]:
@dataclasses.dataclass
class SGFUrl:
    """A link to a game in the online SGF viewer, shown as a hyperlink by Jupyter.

    Attributes:
        sgf: Raw SGF string of the game to link to.
        text: Plain-text label of the link.
        url: Viewer URL built from `sgf`. It is assigned in `__post_init__`
            rather than declared as a dataclass field, so it is not an
            `__init__` argument.
    """

    sgf: str
    text: str

    def sgf_str_to_url(self, sgf_str: str) -> str:
        """Return the sgf-viewer URL carrying `sgf_str` after `#sgf=`."""
        return f"https://humancompatibleai.github.io/sgf-viewer/#sgf={sgf_str}"

    def __post_init__(self):
        self.url = self.sgf_str_to_url(self.sgf)

    def _repr_html_(self):
        """HTML link to this URL."""
        # Escape both pieces: an SGF or label containing `"`, `&` or `<` would
        # otherwise terminate the href attribute or inject markup.
        href = html.escape(self.url, quote=True)
        label = html.escape(self.text, quote=False)
        return f'<a href="{href}">{label}</a>'

    def __str__(self):
        """Return the underlying string."""
        return self.url

### Analyze data

In [None]:
# Histogram of the winning color for each assignment of colors between
# cp127-v1 and cp63-v1024: left panel cp127-v1 as black, right panel as white.
fig, axes = plt.subplots(1, 2)
matchups = [("cp127-v1", "cp63-v1024"), ("cp63-v1024", "cp127-v1")]

for ax, (black, white) in zip(axes, matchups):
    games = df[(df.b_name == black) & (df.w_name == white)]
    games.win_color.hist(ax=ax)
    # Label every panel so the figure can be read without the code.
    ax.set_title(f"B: {black}\nW: {white}")
    ax.set_xlabel("winning color")
    ax.set_ylabel("number of games")

fig.tight_layout()

In [None]:
# Games that cp63-v1024 won against cp127-v1, and how many there are.
cur_df = df.loc[df.win_name.eq("cp63-v1024") & df.lose_name.eq("cp127-v1")]
len(cur_df)

In [None]:
# Link to the first game in cur_df; the cell output renders it as a hyperlink.
SGFUrl(
    text="cp63-v1024 beats cp127-v1 (game2)",
    sgf=cur_df["sgf_str"].iloc[0],
)