In [None]:
# Copyright (c) Meta Platforms, Inc. and affiliates.

# SAM 3 ビデオセグメンテーションと追跡 (Video segmentation and tracking with SAM 3)

このノートブックでは、SAM 3を使用してインタラクティブなビデオセグメンテーションと高密度追跡（dense tracking）を行う方法を解説します。
以下の機能について学びます：

- **テキストプロンプト**: 自然言語（例：「person」、「shoe」）を使用してオブジェクトをセグメントします。
- **ポイントプロンプト**: ポジティブ/ネガティブクリックを追加して、オブジェクトをセグメントおよび修正します。

用語について：
- **セグメント (segment)** または **マスク (mask)**: 単一フレーム上のオブジェクトに対するモデルの予測結果を指します。
- **マスクレット (masklet)**: ビデオ全体にわたる時空間的なマスクを指します。

# <a target="_blank" href="https://colab.research.google.com/github/facebookresearch/sam3/blob/main/notebooks/sam3_video_predictor_example.ipynb">
#   <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
# </a>

## Google Colab セットアップ

Google Colabで実行する場合は、以下のセルを実行してください。
これにより、必要なライブラリのインストールと、データの読み込み準備が行われます。

In [None]:
# Google Colab環境のセットアップ
try:
    import google.colab
    IN_COLAB = True
    print("Google Colab環境で実行中")
except ImportError:
    IN_COLAB = False
    print("ローカル環境で実行中")

if IN_COLAB:
    # 1. Google Driveをマウント (推奨)
    # 自分のデータやコードを使用する場合は、Google Driveにアップロードしてマウントします
    from google.colab import drive
    drive.mount('/content/drive')
    
    # 2. SAM3のインストールとパス設定
    import os
    import sys
    
    # Google Drive内のSAM3ディレクトリのパス
    # ※ご自身の環境に合わせてパスを変更してください
    DRIVE_SAM3_PATH = "/content/drive/MyDrive/sam3"
    
    if os.path.exists(DRIVE_SAM3_PATH):
        print(f"Google Drive内のSAM3が見つかりました: {DRIVE_SAM3_PATH}")
        os.chdir(DRIVE_SAM3_PATH)
        print("カレントディレクトリを変更しました。")
        
        # 依存関係のインストール
        print("依存関係をインストールしています...")
        # Numpy 2.0との互換性問題を回避するためにバージョンを固定
        !pip install -q "numpy<2.0"
        !pip install -q -e .
        !pip install -q pycocotools
        
    else:
        print(f"Google Drive内に {DRIVE_SAM3_PATH} が見つかりませんでした。")
        print("GitHubからSAM3をクローンしてインストールします...")
        
        # GitHubからクローン
        if not os.path.exists("/content/sam3"):
            !git clone https://github.com/facebookresearch/sam3.git /content/sam3
            
        os.chdir("/content/sam3")
        print("カレントディレクトリを /content/sam3 に変更しました。")
        
        # 依存関係のインストール
        # Numpy 2.0との互換性問題を回避するためにバージョンを固定
        !pip install -q "numpy<2.0"
        !pip install -q -e .
        !pip install -q pycocotools
    
    print("セットアップが完了しました。")
    print(f"現在の作業ディレクトリ: {os.getcwd()}")

### データの使用について

- **Google Driveを使用する場合**: `GT_DIR` や `PRED_DIR` には `/content/drive/MyDrive/...` から始まるパスを指定してください。
- **GitHubからクローンした場合**: 左側のファイルブラウザから `/content/sam3` 内を確認できます。データは別途アップロードが必要です。

In [None]:
# Google Colab環境のセットアップ
try:
    import google.colab
    IN_COLAB = True
    print("Google Colab環境で実行中")
except ImportError:
    IN_COLAB = False
    print("ローカル環境で実行中")

if IN_COLAB:
    # 1. Google Driveをマウント (推奨)
    # 自分のデータやコードを使用する場合は、Google Driveにアップロードしてマウントします
    from google.colab import drive
    drive.mount('/content/drive')
    
    # 2. SAM3のインストールとパス設定
    import os
    import sys
    
    # Google Drive内のSAM3ディレクトリのパス
    # ※ご自身の環境に合わせてパスを変更してください
    DRIVE_SAM3_PATH = "/content/drive/MyDrive/sam3"
    
    if os.path.exists(DRIVE_SAM3_PATH):
        print(f"Google Drive内のSAM3が見つかりました: {DRIVE_SAM3_PATH}")
        os.chdir(DRIVE_SAM3_PATH)
        print("カレントディレクトリを変更しました。")
        
        # 依存関係のインストール
        print("依存関係をインストールしています...")
        !pip install -q -e .
        !pip install -q pycocotools
        
    else:
        print(f"Google Drive内に {DRIVE_SAM3_PATH} が見つかりませんでした。")
        print("GitHubからSAM3をクローンしてインストールします...")
        
        # GitHubからクローン
        if not os.path.exists("/content/sam3"):
            !git clone https://github.com/facebookresearch/sam3.git /content/sam3
            
        os.chdir("/content/sam3")
        print("カレントディレクトリを /content/sam3 に変更しました。")
        
        # 依存関係のインストール
        !pip install -q -e .
        !pip install -q pycocotools
    
    print("セットアップが完了しました。")
    print(f"現在の作業ディレクトリ: {os.getcwd()}")

### データの使用について

- **Google Driveを使用する場合**: `GT_DIR` や `PRED_DIR` には `/content/drive/MyDrive/...` から始まるパスを指定してください。
- **GitHubからクローンした場合**: 左側のファイルブラウザから `/content/sam3` 内を確認できます。データは別途アップロードが必要です。

In [None]:
using_colab = False

In [None]:
if using_colab:
    import torch
    import torchvision
    print("PyTorch version:", torch.__version__)
    print("Torchvision version:", torchvision.__version__)
    print("CUDA is available:", torch.cuda.is_available())
    import sys
    !{sys.executable} -m pip install opencv-python matplotlib scikit-learn
    !{sys.executable} -m pip install 'git+https://github.com/facebookresearch/sam3.git'

In [None]:
!nvidia-smi

## セットアップ (Set-up)

この例では、シングルGPUまたはマルチGPUでの推論が可能です。

In [None]:
import os
import sam3
import torch

sam3_root = os.path.join(os.path.dirname(sam3.__file__), "..")

# マシン上の利用可能なすべてのGPUを使用
gpus_to_use = range(torch.cuda.device_count())
# # シングルGPUのみを使用する場合
# gpus_to_use = [torch.cuda.current_device()]

In [None]:
from sam3.model_builder import build_sam3_video_predictor

predictor = build_sam3_video_predictor(gpus_to_use=gpus_to_use)

#### 推論と可視化のユーティリティ (Inference and visualization utils)

In [None]:
import glob
import os

import cv2
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
from sam3.visualization_utils import (
    load_frame,
    prepare_masks_for_visualization,
    visualize_formatted_frame_output,
)

# 軸タイトルのフォントサイズ
plt.rcParams["axes.titlesize"] = 12
plt.rcParams["figure.titlesize"] = 12


def propagate_in_video(predictor, session_id):
    # フレーム0からビデオの終わりまで伝播します
    outputs_per_frame = {}
    for response in predictor.handle_stream_request(
        request=dict(
            type="propagate_in_video",
            session_id=session_id,
        )
    ):
        outputs_per_frame[response["frame_index"]] = response["outputs"]

    return outputs_per_frame


def abs_to_rel_coords(coords, IMG_WIDTH, IMG_HEIGHT, coord_type="point"):
    """絶対座標を相対座標（0-1の範囲）に変換します

    Args:
        coords: 座標のリスト
        coord_type: [x, y] の場合は 'point'、[x, y, w, h] の場合は 'box'
    """
    if coord_type == "point":
        return [[x / IMG_WIDTH, y / IMG_HEIGHT] for x, y in coords]
    elif coord_type == "box":
        return [
            [x / IMG_WIDTH, y / IMG_HEIGHT, w / IMG_WIDTH, h / IMG_HEIGHT]
            for x, y, w, h in coords
        ]
    else:
        raise ValueError(f"Unknown coord_type: {coord_type}")

### 動画の読み込み (Loading an example video)

動画は、**`<frame_index>.jpg` というファイル名のJPEGフレームのリスト**、または **MP4ビデオ** として保存されていることを想定しています。

ffmpeg (https://ffmpeg.org/) を使用してJPEGフレームを抽出するには、以下のようにします：
```
ffmpeg -i <your_video>.mp4 -q:v 2 -start_number 0 <output_dir>/'%05d.jpg'
```
ここで、`-q:v` は高品質のJPEGフレームを生成し、`-start_number 0` はffmpegに `00000.jpg` からJPEGファイルを開始するように指示します。

In [None]:
# "video_path" はJPEGフォルダまたはMP4ビデオファイルである必要があります
video_path = f"{sam3_root}/assets/videos/0001"

In [None]:
# 可視化のために "video_frames_for_vis" を読み込みます（モデルでは使用されません）
if isinstance(video_path, str) and video_path.endswith(".mp4"):
    cap = cv2.VideoCapture(video_path)
    video_frames_for_vis = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        video_frames_for_vis.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    cap.release()
else:
    video_frames_for_vis = glob.glob(os.path.join(video_path, "*.jpg"))
    try:
        # 文字列ソートではなく整数ソートを行います（例："2.jpg" が "11.jpg" の前に来るように）
        video_frames_for_vis.sort(
            key=lambda p: int(os.path.splitext(os.path.basename(p))[0])
        )
    except ValueError:
        # 形式が "<frame_index>.jpg" でない場合は辞書順ソートにフォールバックします
        print(
            f'frame names are not in "<frame_index>.jpg" format: {video_frames_for_vis[:5]=}, '
            f"falling back to lexicographic sort."
        )
        video_frames_for_vis.sort()

### 推論セッションの開始 (Opening an inference session on this video)

SAM 3はインタラクティブなビデオセグメンテーションのためにステートフルな推論を必要とするため、このビデオに対する **推論セッション** を初期化する必要があります。

初期化中に、すべてのビデオフレームを読み込み、そのピクセルをセッション状態に保存します。

In [None]:
response = predictor.handle_request(
    request=dict(
        type="start_session",
        resource_path=video_path,
    )
)
session_id = response["session_id"]

### テキストによるビデオ概念セグメンテーション (Video promptable concept segmentation with text)

SAM 3を使用すると、自然言語でオブジェクトを記述でき、モデルはビデオ全体を通してそのオブジェクトのすべてのインスタンスを自動的に検出して追跡します。

以下の例では、フレーム0にテキストプロンプトを追加し、ビデオ全体に伝播させます。ここでは、ビデオ内のすべての人を検出するために "person"（人）というテキストプロンプトを使用します。SAM 3は自動的に複数の人物インスタンスを識別し、それぞれに一意のオブジェクトIDを割り当てます。

バッファの設定のため、最初の呼び出しは遅くなる可能性があることに注意してください。**速度を測定する場合は、以下のすべてのセルを再実行できます。**

In [None]:
# 注意：すでに1つのテキストプロンプトを実行していて、別のテキストプロンプトに切り替えたい場合、
# まずセッションをリセットする必要があります（そうしないと結果が間違ったものになります）
_ = predictor.handle_request(
    request=dict(
        type="reset_session",
        session_id=session_id,
    )
)

In [None]:
prompt_text_str = "person"
frame_idx = 0  # フレーム0にテキストプロンプトを追加
response = predictor.handle_request(
    request=dict(
        type="add_prompt",
        session_id=session_id,
        frame_index=frame_idx,
        text=prompt_text_str,
    )
)
out = response["outputs"]

plt.close("all")
visualize_formatted_frame_output(
    frame_idx,
    video_frames_for_vis,
    outputs_list=[prepare_masks_for_visualization({frame_idx: out})],
    titles=["SAM 3 Dense Tracking outputs"],
    figsize=(6, 4),
)

In [None]:
# フレーム0からビデオの終わりまで出力を伝播し、すべての出力を収集します
outputs_per_frame = propagate_in_video(predictor, session_id)

# 最後に、可視化のために出力を再フォーマットし、60フレームごとに出力をプロットします
outputs_per_frame = prepare_masks_for_visualization(outputs_per_frame)

vis_frame_stride = 60
plt.close("all")
for frame_idx in range(0, len(outputs_per_frame), vis_frame_stride):
    visualize_formatted_frame_output(
        frame_idx,
        video_frames_for_vis,
        outputs_list=[outputs_per_frame],
        titles=["SAM 3 Dense Tracking outputs"],
        figsize=(6, 4),
    )

### オブジェクトの削除 (Removing objects)

IDを使用して個々のオブジェクトを削除できます。

例として、オブジェクト2（手前のダンサー）を削除してみましょう。

In [None]:
# 手前のダンサーであるID 2を選択します
obj_id = 2
response = predictor.handle_request(
    request=dict(
        type="remove_object",
        session_id=session_id,
        obj_id=obj_id,
    )
)

In [None]:
# フレーム0からビデオの終わりまで出力を伝播し、すべての出力を収集します
outputs_per_frame = propagate_in_video(predictor, session_id)

# 最後に、可視化のために出力を再フォーマットし、60フレームごとに出力をプロットします
outputs_per_frame = prepare_masks_for_visualization(outputs_per_frame)

vis_frame_stride = 60
plt.close("all")
for frame_idx in range(0, len(outputs_per_frame), vis_frame_stride):
    visualize_formatted_frame_output(
        frame_idx,
        video_frames_for_vis,
        outputs_list=[outputs_per_frame],
        titles=["SAM 3 Dense Tracking outputs"],
        figsize=(6, 4),
    )

### ポイントプロンプトによる新しいオブジェクトの追加 (Adding new objects with point prompts)

ポイントプロンプトを使用して新しいオブジェクトを追加できます。

気が変わって、手前のダンサー（先ほど削除した人物）を戻したいとします。インタラクティブなクリックを使用して彼女を戻すことができます。

In [None]:
sample_img = Image.fromarray(load_frame(video_frames_for_vis[0]))

IMG_WIDTH, IMG_HEIGHT = sample_img.size

In [None]:
# ポイントプロンプトでダンサーを戻しましょう。
# 1回のポジティブクリックを使用してダンサーを戻します。

frame_idx = 0
obj_id = 2
points_abs = np.array(
    [
        [760, 550],  # ポジティブクリック
    ]
)
# ポジティブクリックはラベル1、ネガティブクリックはラベル0です
labels = np.array([1])

In [None]:
# ポイントとラベルをテンソルに変換し、相対座標にも変換します
points_tensor = torch.tensor(
    abs_to_rel_coords(points_abs, IMG_WIDTH, IMG_HEIGHT, coord_type="point"),
    dtype=torch.float32,
)
points_labels_tensor = torch.tensor(labels, dtype=torch.int32)

response = predictor.handle_request(
    request=dict(
        type="add_prompt",
        session_id=session_id,
        frame_index=frame_idx,
        points=points_tensor,
        point_labels=points_labels_tensor,
        obj_id=obj_id,
    )
)
out = response["outputs"]

plt.close("all")
visualize_formatted_frame_output(
    frame_idx,
    video_frames_for_vis,
    outputs_list=[prepare_masks_for_visualization({frame_idx: out})],
    titles=["SAM 3 Dense Tracking outputs"],
    figsize=(6, 4),
    points_list=[points_abs],
    points_labels_list=[labels],
)

In [None]:
# フレーム0からビデオの終わりまで出力を伝播し、すべての出力を収集します
outputs_per_frame = propagate_in_video(predictor, session_id)

# 最後に、可視化のために出力を再フォーマットし、60フレームごとに出力をプロットします
outputs_per_frame = prepare_masks_for_visualization(outputs_per_frame)

vis_frame_stride = 60
plt.close("all")
for frame_idx in range(0, len(outputs_per_frame), vis_frame_stride):
    visualize_formatted_frame_output(
        frame_idx,
        video_frames_for_vis,
        outputs_list=[outputs_per_frame],
        titles=["SAM 3 Dense Tracking outputs"],
        figsize=(6, 4),
    )

### ポイントプロンプトによる既存オブジェクトの修正 (Refining an existing object with point prompts)

ポイントプロンプトを使用して、既存のオブジェクトのセグメンテーションマスクを修正することもできます。

（また）気が変わったと仮定して、オブジェクトID 2（先ほど戻した手前のダンサー）について、全身ではなくTシャツだけをセグメントしたいとします。いくつかのポジティブクリックとネガティブクリックでセグメンテーションマスクを調整できます。

In [None]:
# 手前のダンサーについて、全身ではなくTシャツだけをセグメントしたいとします
# 2つのポジティブクリックと2つのネガティブクリックを使用してシャツを選択します

frame_idx = 0
obj_id = 2
points_abs = np.array(
    [
        [740, 450],  # ポジティブクリック
        [760, 630],  # ネガティブクリック
        [840, 640],  # ネガティブクリック
        [760, 550],  # ポジティブクリック
    ]
)
# ポジティブクリックはラベル1、ネガティブクリックはラベル0です
labels = np.array([1, 0, 0, 1])

In [None]:
# ポイントとラベルをテンソルに変換し、相対座標にも変換します
points_tensor = torch.tensor(
    abs_to_rel_coords(points_abs, IMG_WIDTH, IMG_HEIGHT, coord_type="point"),
    dtype=torch.float32,
)
points_labels_tensor = torch.tensor(labels, dtype=torch.int32)

response = predictor.handle_request(
    request=dict(
        type="add_prompt",
        session_id=session_id,
        frame_index=frame_idx,
        points=points_tensor,
        point_labels=points_labels_tensor,
        obj_id=obj_id,
    )
)
out = response["outputs"]

plt.close("all")
visualize_formatted_frame_output(
    frame_idx,
    video_frames_for_vis,
    outputs_list=[prepare_masks_for_visualization({frame_idx: out})],
    titles=["SAM 3 Dense Tracking outputs"],
    figsize=(6, 4),
    points_list=[points_abs],
    points_labels_list=[labels],
)

In [None]:
# フレーム0からビデオの終わりまで出力を伝播し、すべての出力を収集します
outputs_per_frame = propagate_in_video(predictor, session_id)

# 最後に、可視化のために出力を再フォーマットし、60フレームごとに出力をプロットします
outputs_per_frame = prepare_masks_for_visualization(outputs_per_frame)

vis_frame_stride = 60
plt.close("all")
for frame_idx in range(0, len(outputs_per_frame), vis_frame_stride):
    visualize_formatted_frame_output(
        frame_idx,
        video_frames_for_vis,
        outputs_list=[outputs_per_frame],
        titles=["SAM 3 Dense Tracking outputs"],
        figsize=(6, 4),
    )

### セッションを閉じる (Close session)

各セッションは単一のビデオに紐付いています。推論後にセッションを閉じてリソースを解放できます。

（その後、別のビデオで新しいセッションを開始できます。）

In [None]:
# 最後に、推論セッションを閉じてGPUリソースを解放します
# （別のビデオで新しいセッションを開始できます）
_ = predictor.handle_request(
    request=dict(
        type="close_session",
        session_id=session_id,
    )
)

### クリーンアップ (Clean-up)

すべての推論が終了したら、predictorをシャットダウンしてマルチGPUプロセスグループを解放できます。

In [None]:
# すべての推論が終了した後、predictorをシャットダウンして
# マルチGPUプロセスグループを解放できます
predictor.shutdown()