In [1]:
# ================================================================
# セル：ALL-IN-ONE  (依存パッケージ → ファイル取得 → 循環対策 → pytest)
# ================================================================
import os, sys, subprocess, shlex, textwrap, importlib, shutil
from google.colab import drive

# ---------- 0. 外部ライブラリをインストール ---------- #
subprocess.run(shlex.split(
    "pip -q install mahjong pytest requests beautifulsoup4"
), check=True)   # mahjong ライブラリが無いと shanten_calc が失敗する :contentReference[oaicite:0]{index=0}

# ---------- 1. Google Drive をマウント ---------- #
drive.mount("/content/drive")

# ---------- 2. パッケージ用ディレクトリを用意 ---------- #
package_dir = "/content/drive/MyDrive/mahjong_py"   # パッケージ名に合わせる
os.makedirs(package_dir, exist_ok=True)
if "/content/drive/MyDrive" not in sys.path:
    sys.path.append("/content/drive/MyDrive")

# ---------- 3. GitHub から実装 + テストを取得 ---------- #
files = [
    "__init__.py","analyzer.py","converters.py","display_agari_fixed.py",
    "display_call.py","display_discard_hand_at.py","display_dora_fixed.py",
    "display_handflow.py","display_reach_fixed.py","display_ryuukyoku.py",
    "parser.py","shanten_calc.py","splitter.py","utils.py",
    # --- tests ---
    "test_analyzer.py","test_converters.py","test_display_agari_fixed.py",
    "test_display_call.py","test_display_discard_hand_at.py","test_display_dora_fixed.py",
    "test_display_handflow.py","test_display_reach_fixed.py","test_display_ryuukyoku_info.py",
    "test_init.py","test_parser.py","test_shanten_calc.py","test_splitter.py"
]
base_url = "https://raw.githubusercontent.com/furapotedesu/tenhouhaihu/main/"
for f in files:
    dest = f"{package_dir}/{f}"
    subprocess.run(shlex.split(f"wget -q {base_url}{f} -O {dest}"), check=True)

# ---------- 4. 循環 import を防ぐ __init__.py を強制生成 ---------- #
init_src = textwrap.dedent("""
    # auto-generated to avoid circular imports
    from . import utils          # utils を最初に
    from . import converters
    from . import splitter
    from . import parser
    from . import analyzer       # utils の後
    from . import display_handflow
    from . import display_agari_fixed
    from . import display_reach_fixed
    from . import display_ryuukyoku
    from . import display_dora_fixed
    from . import display_call
    from . import display_discard_hand_at
    from . import shanten_calc
""").lstrip()
with open(f"{package_dir}/__init__.py", "w", encoding="utf-8") as f:
    f.write(init_src)

# utils.py が空ファイルなら 1 行 stub を入れておく
utils_path = f"{package_dir}/utils.py"
if os.path.exists(utils_path) and os.path.getsize(utils_path) == 0:
    with open(utils_path, "w", encoding="utf-8") as f:
        f.write('"""stub utils"""\n')

# ---------- 5. インポートキャッシュ & __pycache__ をクリア ---------- #
for m in list(sys.modules):
    if m.startswith("mahjong_py"):
        del sys.modules[m]
importlib.invalidate_caches()
for root, dirs, _ in os.walk(package_dir):
    for d in dirs:
        if d == "__pycache__":
            shutil.rmtree(os.path.join(root, d))

# ---------- 6. pytest を実行 ---------- #
subprocess.run(shlex.split(
    f'pytest -q "{package_dir}"'
), check=True)


Mounted at /content/drive


CompletedProcess(args=['pytest', '-q', '/content/drive/MyDrive/mahjong_py'], returncode=0)

In [3]:


# ================= 選択UI =================
print("どの方法で牌譜データを取得しますか？")
print("い：HTMLファイルをアップロードする")
print("ろ：年度別ZIPファイルをダウンロードする")
print("は：最近の牌譜（list.cgi）を取得する")

choice = ""
while choice not in {"い", "ろ", "は"}:
    choice = input("選択肢を入力してください（い・ろ・は）：").strip()

# ================= 「い」選択時：HTMLアップロード処理 =================
if choice == "い":
    print("📂 HTMLファイルをアップロードしてください（例：tenhou_logs.html）")
    uploaded = files.upload()
    html_filename = list(uploaded.keys())[0]

    with open(html_filename, 'r', encoding='utf-8') as f:
        soup = BeautifulSoup(f, 'html.parser')

    links = set()
    for a in soup.find_all('a', href=True):
        href = a['href']
        if href.startswith("https://tenhou.net/"):
            links.add(href)

    links = sorted(links)
    print(f"✅ tenhouリンク抽出完了：{len(links)} 件")

    with open("converted_links.txt", "w", encoding="utf-8") as f:
        for url in links:
            f.write(url + "\n")

    print("✅ converted_links.txt を保存しました（この後 STEP3〜5 に自動接続）")

# ================= 「ろ」選択時：年度別ZIP処理 =================
elif choice == "ろ":
    year = input("何年の牌譜をダウンロードしますか？（例：2023）：").strip()
    filename = f"scraw{year}.zip"
    url = f"https://tenhou.net/sc/raw/{filename}"

    save_dir = f"tenhou_logs_{year}"
    os.makedirs(save_dir, exist_ok=True)
    zip_path = os.path.join(save_dir, filename)

    print(f"⬇️ {filename} をダウンロードしています...")
    r = requests.get(url, headers={"User-Agent": "Mozilla/5.0"})

    if r.status_code == 200 and len(r.content) > 1000:
        with open(zip_path, "wb") as f:
            f.write(r.content)
        print(f"✅ ダウンロード完了：{zip_path}")
    else:
        raise Exception(f"❌ ダウンロード失敗 or 空ファイル（{r.status_code}）")

    extract_dir = os.path.join(save_dir, "unpacked")
    os.makedirs(extract_dir, exist_ok=True)
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_dir)
    print(f"✅ 解凍完了：{extract_dir}")

    html_dir = os.path.join(extract_dir, "htmls")
    os.makedirs(html_dir, exist_ok=True)
    count = 0

    for root, _, files_in_dir in os.walk(extract_dir):
        for file in files_in_dir:
            if file.startswith("scc") and file.endswith(".gz"):
                gz_path = os.path.join(root, file)
                out_path = os.path.join(html_dir, file[:-3])
                with gzip.open(gz_path, 'rb') as f_in, open(out_path, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
                    count += 1

    print(f"✅ {count} 件の .gz を .html に変換")

    raw_links = []
    pattern = r"https?://tenhou\.net/0/\?log=[\w\-]+"
    for file in os.listdir(html_dir):
        if file.endswith(".html"):
            path = os.path.join(html_dir, file)
            with open(path, encoding="utf-8", errors="ignore") as f:
                text = f.read()
                found = re.findall(pattern, text)
                raw_links.extend(found)

    print(f"✅ リンク抽出：{len(raw_links)} 件")

    with open("converted_links.txt", "w", encoding="utf-8") as f:
        for url in raw_links:
            f.write(url + "\n")

    print("✅ converted_links.txt を保存しました（この後 STEP3〜5 に自動接続）")

# ================= 「は」選択時：list.cgi から .gz を処理 =================
elif choice == "は":
    print("最近の牌譜のどちらを取得しますか？")
    print("1：直近7日間のリスト")
    print("2：全過去分（list.cgi?old）")
    list_choice = ""
    while list_choice not in {"1", "2"}:
        list_choice = input("選択肢を入力してください（1 または 2）：").strip()

    base_url = "https://tenhou.net/sc/raw/"
    if list_choice == "1":
        list_url = base_url + "list.cgi"
        list_name = "tenhou_list_current.txt"
    else:
        list_url = base_url + "list.cgi?old"
        list_name = "tenhou_list_old.txt"

    print(f"⬇️ {list_url} を取得中...")
    r = requests.get(list_url, headers={"User-Agent": "Mozilla/5.0"})
    if r.status_code != 200 or len(r.text) < 1000:
        raise Exception("❌ list取得失敗")

    with open(list_name, "w", encoding="utf-8") as f:
        f.write(r.text)

    print(f"✅ list保存完了：{list_name}")

    # ✅ 現在のlist形式に合った正規表現で抽出
    with open(list_name, "r", encoding="utf-8", errors="ignore") as f:
        text = f.read()

    matches = re.findall(r"file:'(scc\d{10}\.html\.gz)'", text)
    gz_files = sorted(set(matches))
    print(f"✅ sccファイル数：{len(gz_files)} 件（例：{gz_files[:3]})")

    # 保存ディレクトリ準備
    os.makedirs("list_mode/htmls", exist_ok=True)
    converted_links = []

    for filename in gz_files:                     # ← ★コロンを追加★
        gz_url = f"{base_url}dat/{filename}"      # ✅ 年ディレクトリなし
        local_gz = filename
        local_html = filename[:-3]

        try:
            r = requests.get(gz_url, headers={"User-Agent": "Mozilla/5.0"}, timeout=10)

            if r.status_code != 200:
                print(f"⚠️ スキップ（HTTP {r.status_code}）：{gz_url}")
                continue

            if len(r.content) < 500:
                print(f"⚠️ スキップ（小さすぎる）：{gz_url}（{len(r.content)} bytes）")
                continue

            with open(local_gz, "wb") as f:
                f.write(r.content)
            print(f"✅ ダウンロード成功：{gz_url}")

            with gzip.open(local_gz, "rb") as f_in, open(local_html, "wb") as f_out:
                shutil.copyfileobj(f_in, f_out)
            print(f"✅ 解凍成功：{local_html}")

            with open(local_html, encoding="utf-8", errors="ignore") as f:
                html = f.read()

            found = re.findall(r"https?://tenhou\.net/0/\?log=[\w\-]+", html)
            if found:
                print(f"✅ ログ抽出成功（{filename}）：{len(found)} 件")
                converted_links.extend(found)

            os.remove(local_gz)
            os.remove(local_html)

        except Exception as e:
            print(f"⚠️ 処理エラー: {filename} → {e}")

    # URL保存
    with open("converted_links.txt", "w", encoding="utf-8") as f:
        for url in converted_links:
            if url.startswith("http"):
                f.write(url + "\n")

    print("✅ converted_links.txt を保存しました（この後 STEP3〜5 に自動接続）")


# ================= STEP3：正規化 =================
with open("converted_links.txt", "r", encoding="utf-8") as f:
    raw_links = [
        line.strip()
        for line in f
        if line.strip().startswith("http") and "tenhou.net" in line
    ]

normalized_links = []
for url in raw_links:
    url = url.replace("/tenhou.net/3/", "/tenhou.net/0/")
    url = re.sub(r"\?log=", "log/?", url)
    url = re.sub(r"&tw=[0-3]", "", url)
    normalized_links.append(url)

print("✅ 正規化後リンク（例）：")
print("\n".join(normalized_links[:5]))
print(f"✅ 有効リンク数：{len(normalized_links)} 件（全件処理）")

# ================= STEP4：mjlog ダウンロード =================
output_filename = "mjlog_all.txt"

with open(output_filename, "w", encoding="utf-8") as out_file:
    for i, url in enumerate(normalized_links):
        match = re.search(r"(?:\?|/)log[\/=](.+)", url)
        log_id = match.group(1) if match else f"log_{i+1}"

        try:
            response = requests.get(url, headers={"User-Agent": "Mozilla/5.0"})
            response.raise_for_status()
            content = response.text.strip()

            out_file.write(content + "\n")
            out_file.write("=" * 80 + "\n")

            print(f"✅ [{i+1}/10] 保存成功：{log_id}")
        except Exception as e:
            print(f"⚠️ エラー：{log_id} → {e}")

        time.sleep(1)

print(f"✅ 保存完了：{output_filename}")

# ================= STEP5：UNタグ デコード =================
def multi_url_decode(s, max_times=5):
    for _ in range(max_times):
        decoded = urllib.parse.unquote(s)
        if decoded == s:
            break
        s = decoded
    return s

input_filename = output_filename
output_decoded = "mjlog_all_decoded.txt"

with open(input_filename, "r", encoding="utf-8") as f:
    content = f.read()

logs = content.split("=" * 80)
decoded_logs = []

for log in logs:
    if "<UN" not in log:
        decoded_logs.append(log)
        continue

    def replace_names(match):
        attr_str = match.group(1)
        for i in range(4):
            attr_str = re.sub(
                rf'n{i}="([^"]+)"',
                lambda m: f'n{i}="{multi_url_decode(m.group(1))}"',
                attr_str
            )
        return f"<UN {attr_str}>"

    log = re.sub(r"<UN\s+([^>]+)>", replace_names, log)
    decoded_logs.append(log)

with open(output_decoded, "w", encoding="utf-8") as f:
    f.write(("=" * 80 + "\n").join(decoded_logs))

print(f"🎉 復号完了：{output_decoded}")


どの方法で牌譜データを取得しますか？
い：HTMLファイルをアップロードする
ろ：年度別ZIPファイルをダウンロードする
は：最近の牌譜（list.cgi）を取得する


KeyboardInterrupt: Interrupted by user