# nf_loto_platform 運用確認ノートブック
（ops / 検証 / ログ・生成物チェック用）

このノートブックは、`nf_loto_platform` 一式がローカル環境で正しく動作しているかを、
**運用視点・機能視点・テストケース視点・生成物／ログ視点**から段階的に確認するためのものです。

## このノートブックでできること

- DB 接続／メタデータテーブルの存在確認・初期化
- 特徴量生成パイプラインのスモークテスト
- AutoML / モデル実行バックエンドの最小実行
- ログ・メトリクス・HTML レポートなどの生成物確認
- pytest を使った回帰テストの一部実行

**重要:**  
このノートブックは *設計上* すべてのセルを順番に実行できるようにしてありますが、
実際の DB 接続先や GPU/ライブラリなどの環境依存があるため、
必要に応じてセルをスキップ・編集しながら使ってください。

## 0. 環境セットアップ

まず、プロジェクトルートと `src` を Python パスに通し、バージョン情報を確認します。

In [1]:
from pathlib import Path
import sys
import os

# プロジェクトルート（このノートブックが置かれている notebooks/ の 1 つ上）
NOTEBOOK_PATH = Path.cwd()
PROJECT_ROOT = NOTEBOOK_PATH.parent
SRC_ROOT = PROJECT_ROOT / "src"

print("NOTEBOOK_PATH:", NOTEBOOK_PATH)
print("PROJECT_ROOT :", PROJECT_ROOT)
print("SRC_ROOT     :", SRC_ROOT)

if str(SRC_ROOT) not in sys.path:
    sys.path.insert(0, str(SRC_ROOT))
    print("=> sys.path に SRC_ROOT を追加しました")

# バージョン情報など（必要に応じて追加）
import platform
print("Python :", sys.version)
print("Platform:", platform.platform())

try:
    import nf_loto_platform
    print("nf_loto_platform version: package import OK")
except Exception as e:  # noqa: BLE001
    print("nf_loto_platform import error:", e)


NOTEBOOK_PATH: c:\nf\nf_loto_workspace_ws4_tsfm_ai_scientist_augmented_v4\notebooks
PROJECT_ROOT : c:\nf\nf_loto_workspace_ws4_tsfm_ai_scientist_augmented_v4
SRC_ROOT     : c:\nf\nf_loto_workspace_ws4_tsfm_ai_scientist_augmented_v4\src
=> sys.path に SRC_ROOT を追加しました
Python : 3.11.13 | packaged by Anaconda, Inc. | (main, Jun  5 2025, 13:03:15) [MSC v.1929 64 bit (AMD64)]
Platform: Windows-10-10.0.26200-SP0
nf_loto_platform version: package import OK


## A. DB・テーブルの確認／初期化／セットアップ

このセクションでは、以下を順番に行います。

1. DB 設定ファイル（`config/db.yaml` or `db.yaml.template`）の確認
2. `nf_loto_platform.db.postgres_manager` を使った接続テスト
3. `sql/001_create_nf_model_run_tables.sql` などの DDL の確認
4. 必要に応じて `setup_postgres` を使ったテーブル初期化

実際の接続情報は、あなたのローカル環境の設定に合わせて編集してください。

### A-1. DB 設定ファイルの読み込みと確認

In [2]:
from nf_loto_platform.core import settings

db_cfg = settings.load_db_config()
print("DB 設定:", db_cfg)

if not db_cfg:
    print("""[注意] config/db.yaml または db.yaml.template から有効な設定が読み込めていません。
      このノートブックを進める前に、DB 設定を更新してください。
""")


DB 設定: {'host': 'localhost', 'port': 5432, 'database': 'postgres', 'user': 'postgres', 'password': 'z'}


### A-2. PostgreSQL 接続テスト

In [3]:
from nf_loto_platform.db import postgres_manager

conn = None
try:
    conn = postgres_manager.get_connection()
    print("接続成功:", conn)
    with conn.cursor() as cur:
        cur.execute("SELECT version();")
        ver = cur.fetchone()
        print("PostgreSQL version:", ver)
except Exception as e:  # noqa: BLE001
    print("[接続エラー] DB に接続できませんでした:", e)
finally:
    if conn is not None:
        conn.close()


接続成功: <connection object at 0x0000025F6AD21360; dsn: 'user=postgres password=xxx dbname=postgres host=127.0.0.1 port=5432', closed: 0>
PostgreSQL version: ('PostgreSQL 17.6 on x86_64-windows, compiled by msvc-19.44.35217, 64-bit',)


### A-3. nf_* メタデータテーブルの存在確認

In [4]:
from nf_loto_platform.db_metadata import schema_definitions

NF_TABLES = [
    schema_definitions.NF_MODEL_RUNS_TABLE,
    schema_definitions.NF_MODEL_REGISTRY_TABLE,
    schema_definitions.NF_FEATURE_SETS_TABLE,
    schema_definitions.NF_DATASETS_TABLE,
    schema_definitions.NF_DRIFT_METRICS_TABLE,
    schema_definitions.NF_RESIDUAL_ANOMALIES_TABLE,
    schema_definitions.NF_REPORTS_TABLE,
]

print("チェック対象テーブル:", NF_TABLES)

def list_tables():
    from nf_loto_platform.db import postgres_manager
    with postgres_manager.get_connection() as conn:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT tablename
                FROM pg_catalog.pg_tables
                WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
                ORDER BY tablename;
            """)
            return [r[0] for r in cur.fetchall()]

try:
    existing = list_tables()
    print("既存テーブル一覧:", existing)
    missing = [t for t in NF_TABLES if t not in existing]
    if missing:
        print("[注意] 足りないテーブル:", missing)
    else:
        print("すべてのメタデータテーブルが存在します。")
except Exception as e:  # noqa: BLE001
    print("[エラー] テーブル一覧取得に失敗:", e)


チェック対象テーブル: ['nf_model_runs', 'nf_model_registry', 'nf_feature_sets', 'nf_datasets', 'nf_drift_metrics', 'nf_residual_anomalies', 'nf_reports']
既存テーブル一覧: ['accuracy_metrics', 'datasets', 'directional_metrics', 'experiments', 'generalization_analysis', 'logs', 'model_artifacts', 'model_metadata', 'nf_calibration', 'nf_ckpt', 'nf_config', 'nf_dataset_profile', 'nf_dm_test', 'nf_drift', 'nf_eval_interval', 'nf_eval_point', 'nf_forecasts', 'nf_hparams', 'nf_loto_final', 'nf_metrics', 'nf_model', 'nf_model_complexity', 'nf_model_diagnosis', 'nf_model_profile', 'nf_model_runs', 'nf_models', 'nf_optimization_suggestions', 'nf_parameter_sensitivity', 'nf_pkl', 'nf_predictions', 'nf_quantiles', 'nf_residual_stats', 'nf_run', 'nf_runs', 'nf_series', 'nf_training_state', 'nf_weight_statistics', 'notifications', 'opsmon_db_sizes', 'opsmon_hba_rules', 'opsmon_indexes_unused', 'opsmon_locks_wait', 'opsmon_nf_params_ckpt', 'opsmon_nf_params_class_default', 'opsmon_nf_params_config', 'opsmon_nf_params

### A-4. DDL の実行によるメタデータテーブル初期化

必要であれば、`nf_loto_platform.db.setup_postgres` を経由して
`sql/001_create_nf_model_run_tables.sql` などの DDL を実行し、メタデータテーブルを作成します。

> ⚠ **注意:** 本番環境で実行する場合は、必ず事前にバックアップを取得してから実行してください。

In [5]:
from nf_loto_platform.db import setup_postgres

# 実行前に本当に良いかどうかのフラグ
EXECUTE_DDL = False  # True にすると DDL を実行

if EXECUTE_DDL:
    try:
        setup_postgres.create_metadata_tables()
        print("メタデータテーブルの作成を完了しました。")
    except Exception as e:  # noqa: BLE001
        print("[エラー] DDL 実行に失敗:", e)
else:
    print("EXECUTE_DDL=False のため、DDL は実行していません。")
    print("DDL を実行したい場合は、このセル内のフラグを True に変更してください。")


EXECUTE_DDL=False のため、DDL は実行していません。
DDL を実行したい場合は、このセル内のフラグを True に変更してください。


## B. 特徴量生成パイプラインの確認

このセクションでは、ロト系のベーステーブルから特徴量テーブルを作るまでの流れを確認します。

- B-1. ベーステーブル（例: `nf_loto_final`）の行数・カラム確認
- B-2. `loto_etl` / `loto_etl_updated` による前処理スモークテスト
- B-3. 特徴量テーブル（futr/hist/stat_*）のスキーマ確認

※ 実際にどのテーブルから特徴量を生成するかは、あなたの DB のスキーマに合わせて変更してください。

### B-1. ベーステーブルの確認

In [6]:
import pandas as pd
from nf_loto_platform.db import postgres_manager

BASE_TABLE = "nf_loto_final"  # 必要に応じて変更

try:
    with postgres_manager.get_connection() as conn:
        q = f"SELECT * FROM {BASE_TABLE} ORDER BY 1 LIMIT 100;"
        df_base = pd.read_sql(q, conn)
    print("ベーステーブル:", BASE_TABLE)
    print("shape:", df_base.shape)
    display(df_base.head())
except Exception as e:  # noqa: BLE001
    print("[エラー] ベーステーブル読み込みに失敗:", e)
    df_base = None


ベーステーブル: nf_loto_final
shape: (100, 20)


  df_base = pd.read_sql(q, conn)


Unnamed: 0,loto,num,ds,unique_id,y,co,n1nu,n1pm,n2nu,n2pm,n3nu,n3pm,n4nu,n4pm,n5nu,n5pm,n6nu,n6pm,n7nu,n7pm
0,bingo5,1,2017-04-05,N1,1,0,2.0,8578800.0,51.0,265900.0,133.0,45900.0,593.0,16600.0,3797.0,2300.0,19682.0,600.0,183059.0,200.0
1,bingo5,1,2017-04-05,N2,10,0,2.0,8578800.0,51.0,265900.0,133.0,45900.0,593.0,16600.0,3797.0,2300.0,19682.0,600.0,183059.0,200.0
2,bingo5,1,2017-04-05,N3,13,0,2.0,8578800.0,51.0,265900.0,133.0,45900.0,593.0,16600.0,3797.0,2300.0,19682.0,600.0,183059.0,200.0
3,bingo5,1,2017-04-05,N4,19,0,2.0,8578800.0,51.0,265900.0,133.0,45900.0,593.0,16600.0,3797.0,2300.0,19682.0,600.0,183059.0,200.0
4,bingo5,1,2017-04-05,N5,23,0,2.0,8578800.0,51.0,265900.0,133.0,45900.0,593.0,16600.0,3797.0,2300.0,19682.0,600.0,183059.0,200.0


### B-2. ETL（loto_etl / loto_etl_updated）のスモークテスト

In [7]:
from nf_loto_platform.db import loto_etl, loto_etl_updated

# ここでは「関数が呼べるか／最低限の戻り値が得られるか」を確認するイメージです。
# 実際のシグネチャは実装に合わせて編集してください。

try:
    print("loto_etl モジュール:", loto_etl)
    print("loto_etl_updated モジュール:", loto_etl_updated)
    # 例:
    # etl_df = loto_etl.run_etl(limit=1000)
    # print("etl_df shape:", etl_df.shape)
except Exception as e:  # noqa: BLE001
    print("[エラー] ETL スモークテストに失敗:", e)


loto_etl モジュール: <module 'nf_loto_platform.db.loto_etl' from 'c:\\nf\\nf_loto_workspace_ws4_tsfm_ai_scientist_augmented_v4\\src\\nf_loto_platform\\db\\loto_etl.py'>
loto_etl_updated モジュール: <module 'nf_loto_platform.db.loto_etl_updated' from 'c:\\nf\\nf_loto_workspace_ws4_tsfm_ai_scientist_augmented_v4\\src\\nf_loto_platform\\db\\loto_etl_updated.py'>


### B-3. 特徴量テーブルのスキーマ確認

In [8]:
# 特徴量テーブル名は、あなたの環境の命名に合わせて変更してください。
FEATURE_TABLE = "nf_loto_features"  # 仮の例

try:
    from nf_loto_platform.db import postgres_manager
    import pandas as pd

    with postgres_manager.get_connection() as conn:
        q = f"SELECT * FROM {FEATURE_TABLE} ORDER BY 1 LIMIT 100;"
        df_feat = pd.read_sql(q, conn)
    print("特徴量テーブル:", FEATURE_TABLE)
    print("shape:", df_feat.shape)
    print("カラム一覧:", df_feat.columns.tolist())
    display(df_feat.head())
except Exception as e:  # noqa: BLE001
    print("[注意] 特徴量テーブルの読み込みに失敗:", e)
    print("まだ特徴量生成が未実行、またはテーブル名が環境と合っていない可能性があります。")


[注意] 特徴量テーブルの読み込みに失敗: Execution failed on sql 'SELECT * FROM nf_loto_features ORDER BY 1 LIMIT 100;': リレーション"nf_loto_features"は存在しません
LINE 1: SELECT * FROM nf_loto_features ORDER BY 1 LIMIT 100;
                      ^

まだ特徴量生成が未実行、またはテーブル名が環境と合っていない可能性があります。


  df_feat = pd.read_sql(q, conn)


## C. AutoML / モデル実行バックエンドの確認

このセクションでは、NeuralForecast AutoModels ベースの学習パイプラインが
最低限動作するかどうかを確認します。

- C-1. AutoModelBuilder のインポートと登録モデル一覧の確認
- C-2. 極小データセットでのテスト実行（オプション）

### C-1. AutoModelBuilder の確認

In [9]:
from nf_loto_platform.ml import automodel_builder

print("AutoModelBuilder モジュール:", automodel_builder)

# 登録済みモデル一覧を出すヘルパーがあれば利用し、無ければ dir() ベースで確認
candidates = [name for name in dir(automodel_builder) if "Auto" in name or "create" in name.lower()]
print("AutoModel 関連と思しきシンボル:", candidates)


AutoModelBuilder モジュール: <module 'nf_loto_platform.ml.automodel_builder' from 'c:\\nf\\nf_loto_workspace_ws4_tsfm_ai_scientist_augmented_v4\\src\\nf_loto_platform\\ml\\automodel_builder.py'>
AutoModel 関連と思しきシンボル: ['AutoLSTM', 'AutoMLP', 'AutoMLPMultivariate', 'AutoNBEATS', 'AutoNHITS', 'AutoPatchTST', 'AutoRNN', 'AutoTFT', 'AutoTimeMixer']


### C-2. 極小データセットでのテスト実行（オプション）

In [10]:
import pandas as pd
from datetime import datetime, timedelta

from nf_loto_platform.ml import model_runner

# 極小サンプルデータセットを構築（NeuralForecast 用の基本形式）
dates = [datetime(2020, 1, 1) + timedelta(days=i) for i in range(30)]
df_small = pd.DataFrame({
    "unique_id": ["series_1"] * len(dates),
    "ds": dates,
    "y": [i % 10 for i in range(30)],
})

print("サンプルデータ:")
display(df_small.head())

# 実際の model_runner の API に合わせて設定を組む必要があります。
# ここでは疑似コード／テンプレ的な例として示します。
RUN_TINY_EXPERIMENT = False  # True にすると実際に学習を試みる

if RUN_TINY_EXPERIMENT:
    try:
        result = model_runner.run_single_model_experiment(
            data=df_small,
            model_name="AutoNHITS",   # 実際にインストールされている Auto* 名に合わせて変更
            horizon=7,
            max_trials=1,
            timeout_s=60,
        )
        print("実験結果:", result)
    except Exception as e:  # noqa: BLE001
        print("[エラー] 極小実験の実行に失敗:", e)
else:
    print("RUN_TINY_EXPERIMENT=False のため、実際の学習は実行していません。")


サンプルデータ:


Unnamed: 0,unique_id,ds,y
0,series_1,2020-01-01,0
1,series_1,2020-01-02,1
2,series_1,2020-01-03,2
3,series_1,2020-01-04,3
4,series_1,2020-01-05,4


RUN_TINY_EXPERIMENT=False のため、実際の学習は実行していません。


## D. ログ・モニタリング・リソース利用の確認

ここでは、MLflow / W&B ロガー、および Prometheus / resource_monitor など
モニタリング系のコンポーネントを軽く叩いてみます。

### D-1. MLflow / W&B ロガーのインポート確認

In [11]:
from nf_loto_platform.logging_ext import mlflow_logger, wandb_logger

print("mlflow_logger モジュール:", mlflow_logger)
print("wandb_logger  モジュール:", wandb_logger)


mlflow_logger モジュール: <module 'nf_loto_platform.logging_ext.mlflow_logger' from 'c:\\nf\\nf_loto_workspace_ws4_tsfm_ai_scientist_augmented_v4\\src\\nf_loto_platform\\logging_ext\\mlflow_logger.py'>
wandb_logger  モジュール: <module 'nf_loto_platform.logging_ext.wandb_logger' from 'c:\\nf\\nf_loto_workspace_ws4_tsfm_ai_scientist_augmented_v4\\src\\nf_loto_platform\\logging_ext\\wandb_logger.py'>


### D-2. リソース利用スナップショットの取得

In [12]:
from nf_loto_platform.monitoring import resource_monitor

info = resource_monitor.collect_resource_usage()
print("resource_monitor.collect_resource_usage() ->")
for k, v in info.items():
    print(f"  {k}: {v}")


resource_monitor.collect_resource_usage() ->
  platform: Windows
  platform_release: 10
  timestamp: 1763361557.7006433
  cpu_percent: 28.8
  memory_total: 8258199552
  memory_used: 7382155264
  memory_percent: 89.4
  process_rss: 610942976
  process_vms: 1372717056


## E. WebUI / Streamlit 連携の確認

Streamlit WebUI 自体はノートブック内では直接起動しませんが、
少なくとも Python から import できるか／エントリポイントが見つかるかを確認します。

In [13]:
from pathlib import Path

streamlit_app_path = PROJECT_ROOT / "apps" / "webui_streamlit" / "streamlit_app.py"
runner_path = PROJECT_ROOT / "apps" / "webui_streamlit" / "nf_auto_runner_full.py"

print("streamlit_app.py :", streamlit_app_path, "exists:", streamlit_app_path.exists())
print("nf_auto_runner_full.py:", runner_path, "exists:", runner_path.exists())

print("""ローカルのシェルからは、例えば以下のように起動できます:
  streamlit run apps/webui_streamlit/streamlit_app.py
""")


streamlit_app.py : c:\nf\nf_loto_workspace_ws4_tsfm_ai_scientist_augmented_v4\apps\webui_streamlit\streamlit_app.py exists: True
nf_auto_runner_full.py: c:\nf\nf_loto_workspace_ws4_tsfm_ai_scientist_augmented_v4\apps\webui_streamlit\nf_auto_runner_full.py exists: True
ローカルのシェルからは、例えば以下のように起動できます:
  streamlit run apps/webui_streamlit/streamlit_app.py



## F. 生成物・ログ・メタデータの確認

このセクションでは、モデル実行後に蓄積されることを想定しているテーブルや
レポートファイルなどを確認します。

- F-1. `nf_model_runs` / `nf_model_metrics` などのメタデータテーブル
- F-2. HTML レポートの例


### F-1. nf_* メタデータテーブル中身の確認

In [14]:
import pandas as pd
from nf_loto_platform.db_metadata import schema_definitions
from nf_loto_platform.db import postgres_manager

tables_to_inspect = [
    schema_definitions.NF_MODEL_RUNS_TABLE,
    schema_definitions.NF_MODEL_METRICS_TABLE,
    schema_definitions.NF_DRIFT_METRICS_TABLE,
]

for tbl in tables_to_inspect:
    try:
        with postgres_manager.get_connection() as conn:
            q = f"SELECT * FROM {tbl} ORDER BY 1 LIMIT 20;"
            df = pd.read_sql(q, conn)
        print(f"テーブル {tbl}: shape={df.shape}")
        display(df.head())
    except Exception as e:  # noqa: BLE001
        print(f"[注意] テーブル {tbl} の読み込みに失敗:", e)


AttributeError: module 'nf_loto_platform.db_metadata.schema_definitions' has no attribute 'NF_MODEL_METRICS_TABLE'

### F-2. HTML レポートの表示例

In [15]:
from nf_loto_platform.reports import html_reporter

# シンプルなレポート文字列を作成
html = html_reporter.render_simple_report(title="nf_loto_platform レポート例", body="これはテスト用のレポートです。")

# 一時ファイルに書き出し
from pathlib import Path
tmp_report = PROJECT_ROOT / "tmp_nf_loto_report_example.html"
html_reporter.write_report(tmp_report, html)
print("HTML レポートを書き出しました:", tmp_report)

# Jupyter 上で簡易表示（本格的なレンダリングはブラウザでファイルを開いて確認）
from IPython.display import HTML
HTML(html)


HTML レポートを書き出しました: c:\nf\nf_loto_workspace_ws4_tsfm_ai_scientist_augmented_v4\tmp_nf_loto_report_example.html


## G. pytest による回帰テストの一部実行

最後に、ノートブックから pytest を呼び出して、
一部または全テストを実行するサンプルを示します。

> ⚠ 注意:  
> フルテストは時間がかかる場合があります。まずは静的テストや core 層など、スコープを絞って実行することを推奨します。

In [16]:
import subprocess
import sys

def run_pytest(args):
    cmd = [sys.executable, "-m", "pytest"] + args
    print("実行コマンド:", " ".join(cmd))
    result = subprocess.run(cmd, cwd=PROJECT_ROOT, text=True)
    return result.returncode

# 例1: 静的チェックのみ
# rc = run_pytest(["tests/static"])

# 例2: core / db / ml のみ
# rc = run_pytest(["tests/core", "tests/db", "tests/ml"])

print("必要なスコープを選んでコメントアウトを外し、実行してください。")


必要なスコープを選んでコメントアウトを外し、実行してください。


In [17]:
from urllib.parse import urlparse, parse_qs
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound


def extract_video_id(url: str) -> str:
    """
    YouTubeのURLからvideo_idを取り出す簡易関数。
    """
    parsed = urlparse(url)

    # 通常の https://www.youtube.com/watch?v=... パターン
    if parsed.hostname in ("www.youtube.com", "youtube.com", "m.youtube.com"):
        qs = parse_qs(parsed.query)
        if "v" in qs:
            return qs["v"][0]

    # 短縮URL https://youtu.be/xxxx のパターン
    if parsed.hostname in ("youtu.be",):
        return parsed.path.lstrip("/")

    raise ValueError(f"サポートしていないURL形式です: {url}")


def get_subtitles(url: str, languages=None):
    """
    指定したYouTube URLの字幕情報を取得する。

    Parameters
    ----------
    url : str
        YouTube動画のURL
    languages : list[str] | None
        優先して取得したい字幕の言語リスト（例: ['ja', 'ja-JP', 'en']）。
        Noneの場合、利用可能な字幕から自動選択される。

    Returns
    -------
    dict
        {
          "video_id": str,
          "language": str,
          "subtitles": [
              {"start": float, "duration": float, "text": str},
              ...
          ]
        }
    """
    video_id = extract_video_id(url)
    try:
        if languages is None:
            # 利用可能な字幕のうち、デフォルトを取得
            transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
            # 手動で優先順を決めたいなら、ここで選ぶ
            transcript = transcript_list.find_manually_created_transcript(
                [t.language_code for t in transcript_list]
            )
        else:
            # 指定言語の中から最初に見つかったものを取得
            transcript = YouTubeTranscriptApi.get_transcript(
                video_id, languages=languages
            )
            # get_transcriptはリストを返すので、そのままreturn用に整形する
            return {
                "video_id": video_id,
                "language": languages[0],  # 実際は返却された言語コードを使うのが安全
                "subtitles": transcript,
            }

        # transcript.fetch() でリストを取得
        data = transcript.fetch()
        return {
            "video_id": video_id,
            "language": transcript.language_code,
            "subtitles": data,
        }

    except TranscriptsDisabled:
        print("この動画では字幕が無効化されています。")
        return None
    except NoTranscriptFound:
        print("利用可能な字幕が見つかりませんでした。")
        return None


if __name__ == "__main__":
    url = "https://www.youtube.com/watch?v=D3DHSD2Wg2E&utm_source=chatgpt.com"
    result = get_subtitles(url, languages=["ja", "ja-JP", "en"])

    if result is not None:
        print(f"video id: {result['video_id']}")
        print(f"language: {result['language']}")
        print("---- subtitles ----")
        for item in result["subtitles"][:10]:  # 先頭10件だけ表示
            print(f"{item['start']:.2f}s ({item['duration']:.2f}s): {item['text']}")


AttributeError: type object 'YouTubeTranscriptApi' has no attribute 'get_transcript'

In [None]:
from urllib.parse import urlparse, parse_qs
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound


def extract_video_id(url: str) -> str:
    """
    YouTubeのURLからvideo_idを取り出す簡易関数。
    """
    parsed = urlparse(url)

    # 通常の https://www.youtube.com/watch?v=... パターン
    if parsed.hostname in ("www.youtube.com", "youtube.com", "m.youtube.com"):
        qs = parse_qs(parsed.query)
        if "v" in qs:
            return qs["v"][0]

    # 短縮URL https://youtu.be/xxxx のパターン
    if parsed.hostname in ("youtu.be",):
        return parsed.path.lstrip("/")

    raise ValueError(f"サポートしていないURL形式です: {url}")


def get_subtitles(url: str, languages=None):
    """
    指定したYouTube URLの字幕情報を取得する。

    Parameters
    ----------
    url : str
        YouTube動画のURL
    languages : list[str] | None
        優先して取得したい字幕の言語リスト（例: ['ja', 'ja-JP', 'en']）。
        Noneの場合、利用可能な字幕から自動選択される。

    Returns
    -------
    dict
        {
          "video_id": str,
          "language": str,
          "subtitles": [
              {"start": float, "duration": float, "text": str},
              ...
          ]
        }
    """
    video_id = extract_video_id(url)
    try:
        if languages is None:
            # 利用可能な字幕のうち、デフォルトを取得
            transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
            # 手動で優先順を決めたいなら、ここで選ぶ
            transcript = transcript_list.find_manually_created_transcript(
                [t.language_code for t in transcript_list]
            )
        else:
            # 指定言語の中から最初に見つかったものを取得
            transcript = YouTubeTranscriptApi.get_transcript(
                video_id, languages=languages
            )
            # get_transcriptはリストを返すので、そのままreturn用に整形する
            return {
                "video_id": video_id,
                "language": languages[0],  # 実際は返却された言語コードを使うのが安全
                "subtitles": transcript,
            }

        # transcript.fetch() でリストを取得
        data = transcript.fetch()
        return {
            "video_id": video_id,
            "language": transcript.language_code,
            "subtitles": data,
        }

    except TranscriptsDisabled:
        print("この動画では字幕が無効化されています。")
        return None
    except NoTranscriptFound:
        print("利用可能な字幕が見つかりませんでした。")
        return None


if __name__ == "__main__":
    url = "https://www.youtube.com/watch?v=D3DHSD2Wg2E&utm_source=chatgpt.com"
    result = get_subtitles(url, languages=["ja", "ja-JP", "en"])

    if result is not None:
        print(f"video id: {result['video_id']}")
        print(f"language: {result['language']}")
        print("---- subtitles ----")
        for item in result["subtitles"][:10]:  # 先頭10件だけ表示
            print(f"{item['start']:.2f}s ({item['duration']:.2f}s): {item['text']}")


AttributeError: type object 'YouTubeTranscriptApi' has no attribute 'get_transcript'

In [None]:
# 今の環境（kaiseki）をアクティブにした上で
!pip uninstall youtube-transcript-api -y
!pip install -U youtube-transcript-api


Found existing installation: youtube-transcript-api 1.2.3
Uninstalling youtube-transcript-api-1.2.3:
  Successfully uninstalled youtube-transcript-api-1.2.3
Collecting youtube-transcript-api
  Using cached youtube_transcript_api-1.2.3-py3-none-any.whl.metadata (24 kB)
Using cached youtube_transcript_api-1.2.3-py3-none-any.whl (485 kB)
Installing collected packages: youtube-transcript-api
Successfully installed youtube-transcript-api-1.2.3


In [None]:
import youtube_transcript_api
print(youtube_transcript_api.__version__)


AttributeError: module 'youtube_transcript_api' has no attribute '__version__'

In [None]:
from urllib.parse import urlparse, parse_qs
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound


def extract_video_id(url: str) -> str:
    """
    YouTubeのURLからvideo_idを取り出す簡易関数。
    """
    parsed = urlparse(url)

    # 通常の https://www.youtube.com/watch?v=... パターン
    if parsed.hostname in ("www.youtube.com", "youtube.com", "m.youtube.com"):
        qs = parse_qs(parsed.query)
        if "v" in qs:
            return qs["v"][0]

    # 短縮URL https://youtu.be/xxxx のパターン
    if parsed.hostname in ("youtu.be",):
        return parsed.path.lstrip("/")

    raise ValueError(f"サポートしていないURL形式です: {url}")


def get_subtitles(url: str, languages=None):
    """
    指定したYouTube URLの字幕情報を取得する。

    Parameters
    ----------
    url : str
        YouTube動画のURL
    languages : list[str] | None
        優先して取得したい字幕の言語リスト（例: ['ja', 'ja-JP', 'en']）。
        Noneの場合、利用可能な字幕から自動選択される。

    Returns
    -------
    dict | None
        {
          "video_id": str,
          "language": str,
          "subtitles": [
              {"start": float, "duration": float, "text": str},
              ...
          ]
        }
        字幕が無い場合は None。
    """
    video_id = extract_video_id(url)

    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)

        if languages:
            # 指定言語から最初に見つかったもの
            transcript = transcript_list.find_transcript(languages)
        else:
            # 利用可能なうち、手動作成字幕を優先して取る例
            transcript = transcript_list.find_manually_created_transcript(
                [t.language_code for t in transcript_list]
            )

        data = transcript.fetch()
        return {
            "video_id": video_id,
            "language": transcript.language_code,
            "subtitles": data,
        }

    except TranscriptsDisabled:
        print("この動画では字幕が無効化されています。")
        return None
    except NoTranscriptFound:
        print("利用可能な字幕が見つかりませんでした。")
        return None
    except Exception as e:
        # デバッグ用に例外内容を出す
        print(f"予期せぬエラー: {repr(e)}")
        return None


if __name__ == "__main__":
    url = "https://www.youtube.com/watch?v=D3DHSD2Wg2E&utm_source=chatgpt.com"
    result = get_subtitles(url, languages=["ja", "ja-JP", "en"])

    if result is not None:
        print(f"video id: {result['video_id']}")
        print(f"language: {result['language']}")
        print("---- subtitles (first 10) ----")
        for item in result["subtitles"][:10]:
            print(f"{item['start']:.2f}s ({item['duration']:.2f}s): {item['text']}")


予期せぬエラー: AttributeError("type object 'YouTubeTranscriptApi' has no attribute 'list_transcripts'")


In [None]:
from urllib.parse import urlparse, parse_qs
from pathlib import Path

from youtube_transcript_api import (
    YouTubeTranscriptApi,
    TranscriptsDisabled,
    NoTranscriptFound,
    # VideoUnavailable なども必要に応じて追加できる
)


def extract_video_id(url: str) -> str:
    """
    YouTubeのURLから video_id を取り出す。
    """
    parsed = urlparse(url)

    # 通常の https://www.youtube.com/watch?v=... パターン
    if parsed.hostname in ("www.youtube.com", "youtube.com", "m.youtube.com"):
        qs = parse_qs(parsed.query)
        if "v" in qs:
            return qs["v"][0]

    # 短縮URL https://youtu.be/xxxx のパターン
    if parsed.hostname in ("youtu.be",):
        return parsed.path.lstrip("/")

    raise ValueError(f"サポートしていないURL形式です: {url}")


def get_subtitles(url: str, languages=None):
    """
    指定した YouTube URL の字幕情報を取得する（youtube-transcript-api v1 系対応版）

    Parameters
    ----------
    url : str
        YouTube 動画の URL
    languages : list[str] | None
        優先して取得したい字幕の言語リスト
        例: ['ja', 'ja-JP', 'en']
        None の場合はデフォルト（英語）が使われる

    Returns
    -------
    dict | None
        {
          "video_id": str,
          "language": str,
          "subtitles": [
              {"text": str, "start": float, "duration": float},
              ...
          ]
        }
        字幕を取得できなかった場合は None
    """
    video_id = extract_video_id(url)
    ytt_api = YouTubeTranscriptApi()

    try:
        # 新API: インスタンスメソッド fetch を使う
        fetched = ytt_api.fetch(video_id, languages=languages)
        # fetched は FetchedTranscript オブジェクトなので、
        # 生の list[dict] が欲しければ to_raw_data() を呼ぶ
        raw = fetched.to_raw_data()

        return {
            "video_id": video_id,
            "language": fetched.language_code,
            "subtitles": raw,  # [{"text":..., "start":..., "duration":...}, ...]
        }

    except TranscriptsDisabled:
        print("この動画では字幕が無効化されています。")
        return None
    except NoTranscriptFound:
        print("指定した言語で利用可能な字幕が見つかりませんでした。")
        return None
    except Exception as e:
        print(f"予期せぬエラーが発生しました: {repr(e)}")
        return None



def format_srt_timestamp(t: float) -> str:
    """
    秒(float) → SRT形式 HH:MM:SS,mmm に変換
    """
    hours = int(t // 3600)
    minutes = int((t % 3600) // 60)
    seconds = int(t % 60)
    milliseconds = int(round((t - int(t)) * 1000))
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{milliseconds:03d}"


def save_as_srt(result: dict, out_dir: str = "subtitles") -> Path:
    """
    get_subtitles の結果 dict を .srt ファイルとして保存する。
    """
    video_id = result["video_id"]
    lang = result["language"]
    subtitles = result["subtitles"]

    out_path = Path(out_dir)
    out_path.mkdir(exist_ok=True)

    srt_path = out_path / f"{video_id}_{lang}.srt"

    with srt_path.open("w", encoding="utf-8") as f:
        for idx, item in enumerate(subtitles, start=1):
            start = item["start"]
            end = item["start"] + item["duration"]

            start_ts = format_srt_timestamp(start)
            end_ts = format_srt_timestamp(end)

            text = item["text"]

            f.write(f"{idx}\n")
            f.write(f"{start_ts} --> {end_ts}\n")
            f.write(f"{text}\n\n")

    return srt_path


if __name__ == "__main__":
    url = "https://www.youtube.com/watch?v=D3DHSD2Wg2E&utm_source=chatgpt.com"
    result = get_subtitles(url, languages=["ja", "ja-JP", "en"])

    if result is not None:
        print(f"video id: {result['video_id']}")
        print(f"language: {result['language']}")
        print("---- subtitles (first 10) ----")
        for item in result["subtitles"][:10]:
            print(f"{item['start']:.2f}s ({item['duration']:.2f}s): {item['text']}")

        srt_path = save_as_srt(result)
        print(f"SRTファイルとして保存しました: {srt_path}")


video id: D3DHSD2Wg2E
language: ja
---- subtitles (first 10) ----
0.00s (4.52s): インターネットを鍋だとするでネット上に
2.60s (4.08s): あるコンテンツをカレーだとするカレーは
4.52s (4.72s): 大きな鍋に収まっており我々ユーザーは
6.68s (5.12s): この鍋から具材をよそって味を楽しむ普段
9.24s (4.32s): 多くの人は新鮮で味が良いカレーを楽しむ
11.80s (4.44s): 今はいろんな人がいろんな味のカレーを
13.56s (5.40s): 提供してくれるからなだがその鍋を細かく
16.24s (4.56s): 観察していると鍋の奥底に何やら変な色を
18.96s (4.16s): した独特の匂いのカレーを見つけることが
20.80s (4.36s): あるその独特の匂いと色をしたカレーは
SRTファイルとして保存しました: subtitles\D3DHSD2Wg2E_ja.srt


In [None]:
import scrapetube

CHANNEL_ID = "UCQBxe2pLyaVGylZpck_WkOw"

def get_all_video_urls_scrapetube(channel_id: str):
    videos = scrapetube.get_channel(channel_id)
    urls = []

    for video in videos:
        video_id = video["videoId"]
        urls.append(f"https://www.youtube.com/watch?v={video_id}")

    return urls

if __name__ == "__main__":
    urls = get_all_video_urls_scrapetube(CHANNEL_ID)
    print(f"総件数: {len(urls)}")
    for url in urls:
        print(url)


総件数: 302
https://www.youtube.com/watch?v=lYpbcQAkJlI
https://www.youtube.com/watch?v=rLmwDHeiryQ
https://www.youtube.com/watch?v=-Mdg2zRXZ7g
https://www.youtube.com/watch?v=hp-hdi2NQRc
https://www.youtube.com/watch?v=mpFDEMOq_As
https://www.youtube.com/watch?v=1SauUV9GcTs
https://www.youtube.com/watch?v=rLCqiX7M9zU
https://www.youtube.com/watch?v=PEbRlW4TX0Y
https://www.youtube.com/watch?v=cRCS3P0NcQk
https://www.youtube.com/watch?v=rXheK50dTz0
https://www.youtube.com/watch?v=7qVhk1R8YZk
https://www.youtube.com/watch?v=Y4jgOrOZi90
https://www.youtube.com/watch?v=g-IRlNUEWSk
https://www.youtube.com/watch?v=jWDlvIsksgo
https://www.youtube.com/watch?v=8jMnJC5tlv4
https://www.youtube.com/watch?v=049OUztcJJo
https://www.youtube.com/watch?v=xufX05ns_sE
https://www.youtube.com/watch?v=D3DHSD2Wg2E
https://www.youtube.com/watch?v=8LMGasPDg38
https://www.youtube.com/watch?v=T7jklDwkB5E
https://www.youtube.com/watch?v=CRy_UJEwTrQ
https://www.youtube.com/watch?v=aUtAyvh6biE
https://www.youtube.com

In [None]:
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api._errors import (
    NoTranscriptFound,
    TranscriptsDisabled,
    VideoUnavailable,
    IpBlocked,
)

def get_subtitles(url: str, languages=None):
    """
    URL から字幕を取得して dict を返す（0.x / 1.x の両方に対応）

    返り値の形式:
      {
        "video_id": str,
        "language": str,
        "subtitles": [
          {"start": float, "duration": float, "text": str},
          ...
        ]
      }

    字幕がない/取れない場合は None を返す。
    IpBlocked は呼び出し元で処理したいので、そのまま再raiseする。
    """
    if languages is None:
        languages = ["ja", "ja-JP", "en"]

    video_id = extract_video_id(url)

    try:
        # --- バージョン差異をここで吸収する ---
        # 新API (1.x): インスタンスメソッド fetch() が存在する
        api = YouTubeTranscriptApi()
        if hasattr(api, "fetch"):
            # 1.x 系
            fetched = api.fetch(video_id, languages=languages)
            raw = fetched.to_raw_data()
            language_code = getattr(fetched, "language_code", languages[0])
        # 旧API (0.x): クラスメソッド get_transcript() が存在する
        elif hasattr(YouTubeTranscriptApi, "get_transcript"):
            raw = YouTubeTranscriptApi.get_transcript(video_id, languages=languages)
            language_code = languages[0]
        else:
            # どちらも無い謎バージョン
            raise RuntimeError(
                "対応していない youtube-transcript-api バージョンです。"
            )

    except IpBlocked:
        # IpBlocked は上位で捌きたいのでそのまま投げ直す
        raise
    except NoTranscriptFound as e:
        print(f"[{video_id}] 字幕が存在しません: {e}")
        return None
    except TranscriptsDisabled as e:
        print(f"[{video_id}] 字幕が無効化されています: {e}")
        return None
    except VideoUnavailable as e:
        print(f"[{video_id}] 動画が利用できません: {e}")
        return None
    except Exception as e:
        # ここに今出ている「type object 'YouTubeTranscriptApi' has no attribute 'get_transcript'」
        # も入っていましたが、新実装では発生しなくなります
        print(f"[{video_id}] 字幕取得中に予期せぬエラー: {e}")
        return None

    # ここまで来ていれば raw は「text, start, duration」を持つ dict のリスト
    subtitles = [
        {
            "start": float(item.get("start", 0.0)),
            "duration": float(item.get("duration", 0.0)),
            "text": item.get("text", ""),
        }
        for item in raw
    ]

    return {
        "video_id": video_id,
        "language": language_code,
        "subtitles": subtitles,
    }


In [None]:
import youtube_transcript_api
from youtube_transcript_api import YouTubeTranscriptApi
import inspect

print("module path:", youtube_transcript_api.__file__)
print("version   :", getattr(youtube_transcript_api, '__version__', 'unknown'))

attrs = [a for a in dir(YouTubeTranscriptApi) if 'transcript' in a.lower() or 'fetch' in a.lower()]
print("YouTubeTranscriptApi attrs:", attrs)

module path: c:\Users\hashimoto.ryohei\miniconda3\envs\kaiseki\Lib\site-packages\youtube_transcript_api\__init__.py
version   : unknown
YouTubeTranscriptApi attrs: ['fetch']


In [None]:
from tqdm.auto import tqdm
import time
import random
from youtube_transcript_api._errors import IpBlocked
import os
import time
import random
from tqdm.auto import tqdm

from requests.exceptions import ProxyError
from youtube_transcript_api._errors import IpBlocked

if __name__ == "__main__":
    # すでに処理済みの video_id を読み込み
    processed_ids = load_processed_ids()
    print(f"既に処理済みの動画本数: {len(processed_ids)}")

    ip_blocked = False

    # tqdm で全体の進捗を表示
    for url in tqdm(urls, desc="Downloading subtitles"):
        if ip_blocked:
            break

        video_id = extract_video_id(url)

        # 一度処理したものはスキップ
        if video_id in processed_ids:
            # print(f"[{video_id}] 既に処理済みのためスキップ: {url}")
            continue

        print(f"[{video_id}] 字幕取得開始: {url}")

        try:
            result = get_subtitles(url, languages=["ja", "ja-JP", "en"])
        except IpBlocked as e:
            # ここが IpBlocked 対応：長時間スリープせず、安全に中断
            print(f"[{video_id}] IpBlocked エラー発生: {e}")
            print(
                "YouTube 側でこの IP からのアクセスがブロック/強レート制限されています。\n"
                "時間をおく、ネットワークを変える、あるいはプロキシ利用などを検討してください。"
            )
            ip_blocked = True
            break

        if result is not None:
            print(f"video id: {result['video_id']}")
            print(f"language: {result['language']}")
            print("---- subtitles (first 10) ----")
            for item in result["subtitles"][:10]:
                print(
                    f"{item['start']:.2f}s ({item['duration']:.2f}s): {item['text']}"
                )

            srt_path = save_as_srt(result)
            print(f"SRTファイルとして保存しました: {srt_path}")

            # 正常終了したものは処理済みとして保存
            save_processed_id(video_id)
            processed_ids.add(video_id)
        else:
            print(f"[{video_id}] 字幕を取得できませんでした（スキップ）")
            # 何度も同じ動画でリトライしないよう、失敗も処理済みにしておく
            save_processed_id(video_id)
            processed_ids.add(video_id)

        # 連続アクセスによるブロックを避けるため、1〜3秒ランダムに待つ
        sleep_sec = random.uniform(1.0, 3.0)
        print(f"次の動画まで {sleep_sec:.2f} 秒待機します...")
        time.sleep(sleep_sec)

    if ip_blocked:
        print("IpBlocked により処理を中断しました。")
    else:
        print("すべての処理が完了しました。")


既に処理済みの動画本数: 25


Downloading subtitles:   0%|          | 0/302 [00:00<?, ?it/s]

[VfkugqpWuQk] 字幕取得開始: https://www.youtube.com/watch?v=VfkugqpWuQk
[VfkugqpWuQk] 字幕取得中に予期せぬエラー: HTTPSConnectionPool(host='www.youtube.com', port=443): Max retries exceeded with url: /watch?v=VfkugqpWuQk (Caused by ProxyError('Cannot connect to proxy.', OSError('Tunnel connection failed: 407 Proxy Authentication Required')))
[VfkugqpWuQk] 字幕を取得できませんでした（スキップ）
次の動画まで 2.28 秒待機します...
[ivDDg4pNVaw] 字幕取得開始: https://www.youtube.com/watch?v=ivDDg4pNVaw
[ivDDg4pNVaw] 字幕取得中に予期せぬエラー: HTTPSConnectionPool(host='www.youtube.com', port=443): Max retries exceeded with url: /watch?v=ivDDg4pNVaw (Caused by ProxyError('Cannot connect to proxy.', OSError('Tunnel connection failed: 407 Proxy Authentication Required')))
[ivDDg4pNVaw] 字幕を取得できませんでした（スキップ）
次の動画まで 1.26 秒待機します...
[vC51_eUUw80] 字幕取得開始: https://www.youtube.com/watch?v=vC51_eUUw80
[vC51_eUUw80] 字幕取得中に予期せぬエラー: HTTPSConnectionPool(host='www.youtube.com', port=443): Max retries exceeded with url: /watch?v=vC51_eUUw80 (Caused by ProxyError('Cannot connec

KeyboardInterrupt: 

In [None]:
if __name__ == "__main__":
    # ===== プロキシ環境変数をこのスクリプト内だけ無効化する =====
    for k in ["HTTP_PROXY", "HTTPS_PROXY", "http_proxy", "https_proxy"]:
        if k in os.environ:
            print(f"{k} を一時的に無効化します（値: {os.environ[k]!r}）")
            os.environ.pop(k, None)

    # すでに処理済みの video_id を読み込み
    processed_ids = load_processed_ids()
    print(f"既に処理済みの動画本数: {len(processed_ids)}")

    ip_blocked = False

    # tqdm で全体の進捗を表示
    for url in tqdm(urls, desc="Downloading subtitles"):
        if ip_blocked:
            break

        video_id = extract_video_id(url)

        # 一度処理したものはスキップ
        if video_id in processed_ids:
            # print(f"[{video_id}] 既に処理済みのためスキップ: {url}")
            continue

        print(f"[{video_id}] 字幕取得開始: {url}")

        try:
            result = get_subtitles(url, languages=["ja", "ja-JP", "en"])
        except IpBlocked as e:
            print(f"[{video_id}] IpBlocked エラー発生: {e}")
            print(
                "YouTube 側でこの IP からのアクセスがブロック/強レート制限されています。\n"
                "・短時間に大量リクエスト\n"
                "・クラウド系IP など\n"
                "が原因です。時間をおいてやり直してください。"
            )
            ip_blocked = True
            break
        except ProxyError as e:
            # もし get_subtitles が ProxyError を投げてくるようにした場合の保険
            print(f"[{video_id}] プロキシ経由の接続に失敗しました: {e}")
            print(
                "社内プロキシなどで 407 Proxy Authentication Required が返されています。\n"
                "このスクリプト内ではプロキシを無効化しましたが、それでも発生する場合は\n"
                "ネットワーク側の制約で YouTube への直接接続が許可されていない可能性があります。"
            )
            break

        if result is not None:
            print(f"video id: {result['video_id']}")
            print(f"language: {result['language']}")
            print("---- subtitles (first 10) ----")
            for item in result["subtitles"][:10]:
                print(
                    f"{item['start']:.2f}s ({item['duration']:.2f}s): {item['text']}"
                )

            srt_path = save_as_srt(result)
            print(f"SRTファイルとして保存しました: {srt_path}")

            # 正常終了したものは処理済みとして保存
            save_processed_id(video_id)
            processed_ids.add(video_id)
        else:
            print(f"[{video_id}] 字幕を取得できませんでした（スキップ）")
            # 何度も同じ動画でリトライしないよう、失敗も処理済みにしておく
            save_processed_id(video_id)
            processed_ids.add(video_id)

        # 連続アクセスによるブロックを避けるため、1〜3秒ランダムに待つ
        sleep_sec = random.uniform(1.0, 3.0)
        print(f"次の動画まで {sleep_sec:.2f} 秒待機します...")
        time.sleep(sleep_sec)

    if ip_blocked:
        print("IpBlocked により処理を中断しました。")
    else:
        print("すべての処理が完了しました。")



既に処理済みの動画本数: 33


Downloading subtitles:   0%|          | 0/302 [00:00<?, ?it/s]

[Et-w3m_zJZw] 字幕取得開始: https://www.youtube.com/watch?v=Et-w3m_zJZw
[Et-w3m_zJZw] 字幕取得中に予期せぬエラー: HTTPSConnectionPool(host='www.youtube.com', port=443): Max retries exceeded with url: /watch?v=Et-w3m_zJZw (Caused by ProxyError('Cannot connect to proxy.', OSError('Tunnel connection failed: 407 Proxy Authentication Required')))
[Et-w3m_zJZw] 字幕を取得できませんでした（スキップ）
次の動画まで 1.64 秒待機します...
[idk0dkRh7dM] 字幕取得開始: https://www.youtube.com/watch?v=idk0dkRh7dM
[idk0dkRh7dM] 字幕取得中に予期せぬエラー: HTTPSConnectionPool(host='www.youtube.com', port=443): Max retries exceeded with url: /watch?v=idk0dkRh7dM (Caused by ProxyError('Cannot connect to proxy.', OSError('Tunnel connection failed: 407 Proxy Authentication Required')))
[idk0dkRh7dM] 字幕を取得できませんでした（スキップ）
次の動画まで 2.09 秒待機します...
[P5M4UYN-eP8] 字幕取得開始: https://www.youtube.com/watch?v=P5M4UYN-eP8
[P5M4UYN-eP8] 字幕取得中に予期せぬエラー: HTTPSConnectionPool(host='www.youtube.com', port=443): Max retries exceeded with url: /watch?v=P5M4UYN-eP8 (Caused by ProxyError('Cannot connec

KeyboardInterrupt: 

In [None]:
import os
import zipfile

# ZIPにしたいフォルダ
folder_to_zip = r"C:\nf\nf_loto_workspace_ws4_patched\notebooks\subtitles"
# 出力するZIPファイルのパス
zip_path = r"C:\nf\nf_loto_workspace_ws4_patched\notebooks\subtitles.zip"

def zip_folder(folder_path: str, zip_path: str) -> None:
    """
    folder_path 配下のファイルをすべて zip_path にZIP圧縮する
    """
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for root, dirs, files in os.walk(folder_path):
            for file in files:
                full_path = os.path.join(root, file)
                # ZIP内のパスをフォルダからの相対パスにする
                arcname = os.path.relpath(full_path, folder_path)
                zf.write(full_path, arcname)

if __name__ == "__main__":
    zip_folder(folder_to_zip, zip_path)
    print(f"ZIP を作成しました: {zip_path}")


ZIP を作成しました: C:\nf\nf_loto_workspace_ws4_patched\notebooks\subtitles.zip


In [None]:
!ls C:\nf\nf_loto_workspace_ws4_patched\notebooks\subtitles

-Mdg2zRXZ7g_ja.srt
049OUztcJJo_ja.srt
1SauUV9GcTs_ja.srt
2H2_ECUUkXs_ja.srt
5B86tKuO5HA_ja.srt
7qVhk1R8YZk_ja.srt
8LMGasPDg38_ja.srt
8jMnJC5tlv4_ja.srt
AiymWztVWu4_ja.srt
CRy_UJEwTrQ_ja.srt
D3DHSD2Wg2E_ja.srt
Et-w3m_zJZw_ja.srt
JKJnPwvybC0_ja.srt
ML-YNovq4cU_ja.srt
Nec_2Pe2cqg_ja.srt
P5M4UYN-eP8_ja.srt
PEbRlW4TX0Y_ja.srt
Pq__eEt7CtI_ja.srt
T7jklDwkB5E_ja.srt
VSuNR-SIX2A_ja.srt
VfkugqpWuQk_ja.srt
WmviqlWkIUg_ja.srt
Y4jgOrOZi90_ja.srt
a9kcdwR9U94_ja.srt
aUtAyvh6biE_ja.srt
cRCS3P0NcQk_ja.srt
eRRZkoQN9II_ja.srt
g-IRlNUEWSk_ja.srt
g7e-k1s0N2U_ja.srt
hPVlVeNl41U_ja.srt
hp-hdi2NQRc_ja.srt
iPAO3z6cmIE_ja.srt
idk0dkRh7dM_ja.srt
ivDDg4pNVaw_ja.srt
jGE8-lr3lAU_ja.srt
jWDlvIsksgo_ja.srt
jlD3OTSO3DE_ja.srt
kkSRKeIm_U4_ja.srt
lYpbcQAkJlI_ja.srt
mpFDEMOq_As_ja.srt
rLCqiX7M9zU_ja.srt
rLmwDHeiryQ_ja.srt
rXheK50dTz0_ja.srt
trx0X3jrKvY_ja.srt
vC51_eUUw80_ja.srt
wFHeNch87f0_ja.srt
wf4hh6CuSA0_ja.srt
xufX05ns_sE_ja.srt
