In [None]:
!pip install git+https://github.com/facebookresearch/segment-anything.git

Collecting git+https://github.com/facebookresearch/segment-anything.git
  Cloning https://github.com/facebookresearch/segment-anything.git to /tmp/pip-req-build-t454xm9_
  Running command git clone --filter=blob:none --quiet https://github.com/facebookresearch/segment-anything.git /tmp/pip-req-build-t454xm9_
  Resolved https://github.com/facebookresearch/segment-anything.git to commit 6fdee8f2727f4506cfbbe553e23b895e27956588
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: segment-anything
  Building wheel for segment-anything (setup.py) ... [?25l[?25hdone
  Created wheel for segment-anything: filename=segment_anything-1.0-py3-none-any.whl size=36587 sha256=421bb5637726ca2ce97909392380f1b5c12f6f824f8e4e33157c71d34b4eab3f
  Stored in directory: /tmp/pip-ephem-wheel-cache-0hwi46p6/wheels/10/cf/59/9ccb2f0a1bcc81d4fbd0e501680b5d088d690c6cfbc02dc99d
Successfully built segment-anything
Installing collected packages: segment-anything
Successfully 

In [None]:
!pip install opencv-python



In [None]:
!pip install git+https://github.com/openai/CLIP.git

Collecting git+https://github.com/openai/CLIP.git
  Cloning https://github.com/openai/CLIP.git to /tmp/pip-req-build-vhntsqs3
  Running command git clone --filter=blob:none --quiet https://github.com/openai/CLIP.git /tmp/pip-req-build-vhntsqs3
  Resolved https://github.com/openai/CLIP.git to commit a1d071733d7111c9c014f024669f959182114e33
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting ftfy (from clip==1.0)
  Downloading ftfy-6.1.3-py3-none-any.whl (53 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m53.4/53.4 kB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
Building wheels for collected packages: clip
  Building wheel for clip (setup.py) ... [?25l[?25hdone
  Created wheel for clip: filename=clip-1.0-py3-none-any.whl size=1369497 sha256=3887fe6a0f7fde8182ae72c4ac4d17bf88c629e61927c3d95d46957244578f72
  Stored in directory: /tmp/pip-ephem-wheel-cache-0a58led8/wheels/da/2b/4c/d6691fa9597aac8bb85d2ac13b112deb897d5b50f5ad9a37e4
Successfully built clip
Inst

In [None]:
# Colab only: mount Google Drive at /content/drive; the demo image and video paths
# used by later cells live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import os
import urllib
from functools import lru_cache
from random import randint
from typing import Any, Callable, Dict, List, Tuple

import clip
import cv2
import numpy as np
import PIL
import torch
from segment_anything import SamAutomaticMaskGenerator, sam_model_registry

# Local directory and file name under which the SAM checkpoint is stored.
CHECKPOINT_PATH = os.path.join(os.path.expanduser("~"), ".cache", "SAM")
CHECKPOINT_NAME = "sam_vit_h_4b8939.pth"
# URL the checkpoint is downloaded from when the local file is missing.
CHECKPOINT_URL = "https://dl.fbaipublicfiles.com/segment_anything/sam_vit_h_4b8939.pth"
# Key into `sam_model_registry` that selects which SAM model is built.
MODEL_TYPE = "default"
# Upper bounds applied by adjust_image_size before an image is segmented.
MAX_WIDTH = MAX_HEIGHT = 1024
# filter_masks only considers this many of the largest masks (by area).
TOP_K_OBJ = 100
# NOTE(review): not referenced by any code visible in this notebook.
THRESHOLD = 0.85
# Run on the GPU when CUDA is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


@lru_cache
def load_mask_generator() -> SamAutomaticMaskGenerator:
    """Build the SAM automatic mask generator, downloading the checkpoint if needed.

    The checkpoint is stored as CHECKPOINT_NAME inside CHECKPOINT_PATH and fetched
    from CHECKPOINT_URL when it is not present. The result is memoized by
    ``lru_cache``, so the model is only built once per kernel.

    Returns:
        A ``SamAutomaticMaskGenerator`` wrapping the model moved to ``device``.
    """
    # `import urllib` alone does not guarantee that the `urllib.request`
    # submodule is loaded, so import it explicitly where it is used.
    import urllib.request

    os.makedirs(CHECKPOINT_PATH, exist_ok=True)
    checkpoint = os.path.join(CHECKPOINT_PATH, CHECKPOINT_NAME)
    if not os.path.exists(checkpoint):
        # Download to a temporary name first: an interrupted download must not
        # leave a truncated file that later runs would mistake for a valid checkpoint.
        partial = checkpoint + ".part"
        try:
            urllib.request.urlretrieve(CHECKPOINT_URL, partial)
            os.replace(partial, checkpoint)
        finally:
            if os.path.exists(partial):
                os.remove(partial)
    sam = sam_model_registry[MODEL_TYPE](checkpoint=checkpoint).to(device)
    return SamAutomaticMaskGenerator(sam)


@lru_cache
def load_clip(
    name: str = "ViT-B/32",
) -> Tuple[torch.nn.Module, Callable[[PIL.Image.Image], torch.Tensor]]:
    """Load a CLIP model and its image preprocessing function.

    Results are memoized per ``name`` by ``lru_cache``.

    Args:
        name: CLIP model name passed to ``clip.load``.

    Returns:
        A ``(model, preprocess)`` pair with the model placed on ``device``.
    """
    loaded = clip.load(name, device=device)
    clip_model, transform = loaded
    clip_model = clip_model.to(device)
    return clip_model, transform


def adjust_image_size(image: np.ndarray) -> np.ndarray:
    """Downscale an image so its longer side does not exceed the configured maximum.

    Portrait images (height > width) are limited by MAX_HEIGHT, all others by
    MAX_WIDTH; the aspect ratio is kept. Images already within the limit are
    passed through ``cv2.resize`` at their current size.

    Args:
        image: Array whose first two dimensions are (height, width).

    Returns:
        The resized image as returned by ``cv2.resize``.
    """
    rows, cols = image.shape[:2]
    portrait = rows > cols
    # cv2.resize takes the target size as (width, height).
    if portrait and rows > MAX_HEIGHT:
        target = (int(MAX_HEIGHT / rows * cols), MAX_HEIGHT)
    elif not portrait and cols > MAX_WIDTH:
        target = (MAX_WIDTH, int(MAX_WIDTH / cols * rows))
    else:
        target = (cols, rows)
    return cv2.resize(image, target)


@torch.no_grad()
def get_score(crop: PIL.Image.Image, texts: List[str]) -> torch.Tensor:
    """Score an image crop against a list of text prompts with CLIP.

    Args:
        crop: Image to score.
        texts: Prompts to compare against; the score of the first one is returned.

    Returns:
        A scalar tensor: the softmax probability (over ``texts``) that the crop
        matches ``texts[0]``.
    """
    clip_model, transform = load_clip()
    image_batch = transform(crop).unsqueeze(0).to(device)
    text_tokens = clip.tokenize(texts).to(device)
    image_logits, _text_logits = clip_model(image_batch, text_tokens)
    probabilities = image_logits.softmax(-1).cpu()
    # Row 0 is the single image in the batch, column 0 the first prompt.
    return probabilities[0, 0]


def crop_image(image: np.ndarray, mask: Dict[str, Any]) -> PIL.Image.Image:
    """Cut the masked object out of ``image`` and pad it to a square.

    Pixels outside ``mask["segmentation"]`` are zeroed, the result is cropped to
    ``mask["bbox"]`` (x, y, width, height) and then padded with black so that
    both sides have the same length.

    Args:
        image: Source image of shape (height, width, channels).
        mask: Mask record providing the ``"bbox"`` and ``"segmentation"`` entries.

    Returns:
        The square, black-padded crop as a PIL image.
    """
    # The recorded video run failed on some frames with "slice indices must be
    # integers", which points to non-integer bbox values; coerce them so the
    # slicing below always works.
    x, y, w, h = (int(round(float(value))) for value in mask["bbox"])
    masked = image * np.expand_dims(mask["segmentation"], -1)
    crop = masked[y : y + h, x : x + w]

    # Pad the shorter side on both ends; when the difference is odd the extra
    # pixel goes to the right/bottom so the result is exactly square.
    crop_h, crop_w = crop.shape[:2]
    side = max(crop_h, crop_w)
    pad_x, pad_y = side - crop_w, side - crop_h
    left, top = pad_x // 2, pad_y // 2
    right, bottom = pad_x - left, pad_y - top
    crop = cv2.copyMakeBorder(
        crop,
        top,
        bottom,
        left,
        right,
        cv2.BORDER_CONSTANT,
        value=(0, 0, 0),
    )
    return PIL.Image.fromarray(crop)


def get_texts(query: str) -> List[str]:
    """Build the prompt pair used for scoring: the query first, then a background prompt."""
    target_prompt = f"a picture of {query}"
    background_prompt = "a picture of background"
    return [target_prompt, background_prompt]


def filter_masks(
    image: np.ndarray,
    masks: List[Dict[str, Any]],
    predicted_iou_threshold: float,
    stability_score_threshold: float,
    query: str,
    clip_threshold: float,
) -> List[Dict[str, Any]]:
    """Keep only the masks that pass the quality and (optional) text-match checks.

    Only the TOP_K_OBJ largest masks by ``"area"`` are considered, in ascending
    order of area. A mask is dropped when its predicted IoU or stability score is
    below the given threshold, when its segmentation does not have the same
    height/width as ``image``, or - if ``query`` is non-empty - when its CLIP
    score for the query is below ``clip_threshold``.

    Args:
        image: Image the masks were generated for.
        masks: Mask records with ``"area"``, ``"predicted_iou"``,
            ``"stability_score"``, ``"segmentation"`` and ``"bbox"`` entries.
        predicted_iou_threshold: Minimum accepted ``"predicted_iou"``.
        stability_score_threshold: Minimum accepted ``"stability_score"``.
        query: Text describing the wanted object; an empty query skips the CLIP check.
        clip_threshold: Minimum accepted CLIP score for the query.

    Returns:
        The accepted masks, smallest area first.
    """

    def _accept(candidate: Dict[str, Any]) -> bool:
        if candidate["predicted_iou"] < predicted_iou_threshold:
            return False
        if candidate["stability_score"] < stability_score_threshold:
            return False
        if image.shape[:2] != candidate["segmentation"].shape[:2]:
            return False
        # The CLIP score is only computed when a query was given.
        if query:
            score = get_score(crop_image(image, candidate), get_texts(query))
            if score < clip_threshold:
                return False
        return True

    largest = sorted(masks, key=lambda candidate: candidate["area"])[-TOP_K_OBJ:]
    return [candidate for candidate in largest if _accept(candidate)]


def remove_small_segments(segmentation: np.ndarray) -> np.ndarray:
    """Keep only the largest connected foreground component of a boolean mask.

    Args:
        segmentation: 2-D boolean mask.

    Returns:
        A boolean mask of the same shape that is True only on the largest
        connected component. When the input has no foreground at all, an
        all-False mask is returned.
    """
    # OpenCV cannot work on boolean arrays directly: True -> 1, False -> 0.
    seg_u8 = segmentation.astype(np.uint8)

    # Label every connected component; label 0 is the background.
    num_labels, labels, stats, _centroids = cv2.connectedComponentsWithStats(seg_u8)

    # Only the background was found: there is no component to keep, and
    # np.argmax on the empty statistics slice below would raise.
    if num_labels <= 1:
        return np.zeros(segmentation.shape, dtype=bool)

    # Column CC_STAT_AREA (index 4) holds the pixel count of each component;
    # skip row 0 (background) and shift the index back to a label.
    largest_label = 1 + int(np.argmax(stats[1:, cv2.CC_STAT_AREA]))
    return labels == largest_label

def remove_contained_masks(masks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Drop every mask whose segmentation lies completely inside another mask.

    Args:
        masks: Mask records; each provides a boolean ``"segmentation"`` array,
            all of the same shape.

    Returns:
        The masks that are not fully contained in any other mask, in their
        original order. Of several identical masks only the first is kept
        (previously identical masks eliminated each other and all were dropped).
    """
    remaining_masks = []
    for i, mask_i in enumerate(masks):
        seg_i = mask_i["segmentation"]
        fully_contained = False
        for j, mask_j in enumerate(masks):
            if i == j:
                continue
            seg_j = mask_j["segmentation"]
            # Element-wise `<=` on booleans means "wherever i is set, j is set too".
            if not np.all(seg_i <= seg_j):
                continue
            # Mutual containment means the masks are identical: the earliest
            # one survives so the region is not lost altogether.
            if i < j and np.all(seg_j <= seg_i):
                continue
            fully_contained = True
            break
        if not fully_contained:
            remaining_masks.append(mask_i)
    return remaining_masks

def remove_overlapping_masks(masks: List[Dict[str, Any]], overlap_threshold: float = 0.8) -> List[Dict[str, Any]]:
    """Remove the smaller mask of every pair that overlaps heavily.

    Two masks overlap heavily when their intersection covers more than
    ``overlap_threshold`` of the smaller mask's area. On equal areas the mask
    that comes later in the list is removed.

    Args:
        masks: Mask records; each provides a ``"segmentation"`` array, all of
            the same shape.
        overlap_threshold: Fraction of the smaller mask that must be covered by
            the intersection for the pair to count as overlapping.

    Returns:
        The surviving masks in their original order. Masks with zero area are
        never compared (the ratio is undefined for them) and are kept.
    """
    # Each area is needed for many pairs; compute it once instead of per pair.
    areas = [int(np.sum(mask["segmentation"])) for mask in masks]
    removed_indices = set()  # indices of masks that have been dropped

    for i, mask_i in enumerate(masks):
        if i in removed_indices:
            continue  # already dropped while processing an earlier mask

        for j, mask_j in enumerate(masks):
            if i == j or j in removed_indices:
                continue

            min_area = min(areas[i], areas[j])
            if min_area == 0:
                continue  # avoid 0/0; an empty mask cannot overlap anything

            intersection_area = int(
                np.sum(np.logical_and(mask_i["segmentation"], mask_j["segmentation"]))
            )
            if intersection_area / min_area > overlap_threshold:
                if areas[i] < areas[j]:
                    removed_indices.add(i)
                    break  # mask i is gone; move on to the next mask
                # Mask j is gone, but mask i may still overlap with others.
                removed_indices.add(j)

    return [mask for i, mask in enumerate(masks) if i not in removed_indices]


def draw_masks(
    image: np.ndarray,
    masks: List[Dict[str, Any]],
    alpha: float = 0.7,
    min_area_ratio: float = 0.01,
) -> Tuple[np.ndarray, List[np.ndarray]]:
    """Draw mask outlines and highlight the masks that look like quadrilaterals.

    Heavily overlapping masks are removed first. For every remaining mask only
    its largest connected component is used; components smaller than
    ``min_area_ratio`` of the image are skipped. The outer contours are drawn
    with color (0, 0, 255). Contours whose polygon approximation (tolerance: 2%
    of the perimeter) has exactly four vertices are outlined and filled with
    color (255, 0, 0) and collected as surfaces.

    Args:
        image: Image to annotate. It is not modified; drawing happens on a copy.
        masks: Mask records providing a boolean ``"segmentation"`` array.
        alpha: Weight of the annotated image in the final blend; the filled
            quadrilaterals get weight ``1 - alpha``.
        min_area_ratio: Minimum component area as a fraction of the number of
            pixels in the segmentation.

    Returns:
        ``(annotated_image, surfaces)`` where every surface is a ``(4, 2)``
        array of (x, y) vertices of one detected quadrilateral.
    """
    masks = remove_overlapping_masks(masks)
    canvas = image.copy()  # keep the caller's array untouched
    quad_fill = np.zeros_like(canvas)
    surfaces = []

    for mask in masks:
        segmentation = remove_small_segments(mask["segmentation"])
        if np.sum(segmentation) < mask["segmentation"].size * min_area_ratio:
            continue

        contours, _ = cv2.findContours(
            np.uint8(segmentation), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
        )
        cv2.drawContours(canvas, contours, -1, (0, 0, 255), 2)

        for contour in contours:
            # Approximate the contour by a polygon, tolerating 2% of its perimeter.
            epsilon = 0.02 * cv2.arcLength(contour, True)
            approx = cv2.approxPolyDP(contour, epsilon, True)
            if len(approx) != 4:
                continue
            # Four vertices: treat the shape as a (roughly rectangular) surface.
            cv2.drawContours(canvas, [approx], 0, (255, 0, 0), 2)
            cv2.drawContours(quad_fill, [approx], 0, (255, 0, 0), -1)
            # approxPolyDP yields shape (4, 1, 2); drop the middle axis.
            surfaces.append(np.squeeze(approx, axis=1))

    blended = cv2.addWeighted(quad_fill, 1 - alpha, canvas, alpha, 0)
    return blended, surfaces

def crop_and_affine_transform_quadrilateral(original_image: np.ndarray, src_pts: np.ndarray) -> np.ndarray:
    """Rectify a parallelogram-shaped region of an image with an affine transform.

    The four corners are taken to be in cyclic order (each point adjacent to the
    next), which is what the side-length computation relies on: sides 0-1 and 2-3
    are measured as widths, sides 0-3 and 1-2 as heights.
    NOTE(review): the original mapped the third corner to the bottom-left of the
    output, which contradicts that ordering; it is now mapped to the
    bottom-right. Confirm against callers outside this notebook, if any.

    Args:
        original_image: Image to sample from.
        src_pts: Four corner points, shaped ``(4, 1, 2)`` or ``(4, 2)``.

    Returns:
        The warped region of size ``(max_height, max_width)``.

    Raises:
        ValueError: If ``src_pts`` does not contain exactly four points.
    """
    corners = np.asarray(src_pts, dtype=np.float64).reshape(-1, 2)
    if corners.shape[0] != 4:
        raise ValueError(f"expected 4 corner points, got {corners.shape[0]}")

    def _side_length(a: int, b: int) -> float:
        return float(np.sqrt(((corners[a] - corners[b]) ** 2).sum()))

    max_width = max(int(_side_length(0, 1)), int(_side_length(2, 3)))
    max_height = max(int(_side_length(0, 3)), int(_side_length(1, 2)))

    # Destinations of the first three corners: top-left, top-right, bottom-right.
    dst_pts = np.array(
        [[0, 0], [max_width - 1, 0], [max_width - 1, max_height - 1]],
        dtype='float32',
    )

    # Three point pairs fully determine the affine transform.
    M = cv2.getAffineTransform(np.float32(corners[:3]), dst_pts)
    return cv2.warpAffine(original_image, M, (max_width, max_height))

def crop_test(img, points):
    """Warp the quadrilateral given by four points into an axis-aligned rectangle.

    The points are reordered to top-left, top-right, bottom-right, bottom-left:
    the two with the smallest y form the top edge (sorted left to right), the
    other two the bottom edge (sorted right to left).

    Args:
        img: Image to sample from.
        points: Four (x, y) points in any order.

    Returns:
        The perspective-corrected crop.
    """
    by_row = sorted(points, key=lambda pt: pt[1])
    upper = sorted(by_row[:2], key=lambda pt: pt[0])
    lower = sorted(by_row[2:], key=lambda pt: pt[0], reverse=True)
    quad = np.array(upper + lower, dtype='float32')

    def _span(first, second, axis):
        # NOTE(review): sqrt(2 * d**2) is sqrt(2) * |d| along a single axis between
        # opposite corners, not a Euclidean side length - confirm this is intended.
        return np.sqrt(((quad[first][axis] - quad[second][axis]) ** 2) * 2)

    width = max(_span(0, 2, 0), _span(1, 3, 0))
    height = max(_span(0, 2, 1), _span(1, 3, 1))

    # Target corners, in the same order as `quad`.
    dst = np.array(
        [
            [0, 0],
            [width - 1, 0],
            [width - 1, height - 1],
            [0, height - 1],
        ],
        np.float32,
    )

    # Four point correspondences determine the perspective transform.
    matrix = cv2.getPerspectiveTransform(quad, dst)
    return cv2.warpPerspective(img, matrix, (int(width), int(height)))

def normalize_surface(surface, image_width, image_height):
    """Scale vertex coordinates by the image size.

    Column 0 (x) is divided by ``image_width`` and column 1 (y) by
    ``image_height``; any further columns are left at zero.

    Returns:
        A float32 array with the same shape as ``surface``.
    """
    scaled = np.zeros_like(surface, dtype=np.float32)
    for column, extent in enumerate((image_width, image_height)):
        scaled[:, column] = surface[:, column] / extent
    return scaled

def denormalize_approx(surface, image_width, image_height):
    """Convert coordinates scaled by the image size back to integer pixels.

    Column 0 (x) is multiplied by ``image_width`` and column 1 (y) by
    ``image_height``, then rounded; any further columns are left at zero.

    Returns:
        An int32 array with the same shape as ``surface``.
    """
    pixels = np.zeros_like(surface, dtype=np.int32)
    for column, extent in enumerate((image_width, image_height)):
        pixels[:, column] = np.round(surface[:, column] * extent).astype(np.int32)
    return pixels

def segment_frame(
    predicted_iou_threshold: float,
    stability_score_threshold: float,
    clip_threshold: float,
    frame: np.ndarray,
    query: str,
    mask_generator,
) -> np.ndarray:
    """Segment one video frame and return it with the detections drawn in.

    The frame is expected in OpenCV's BGR channel order (as read from
    ``cv2.VideoCapture``). Like ``segment`` does for still images, it is
    converted to RGB before mask generation and CLIP scoring, so the models see
    the same channel order in both code paths. The annotated result is converted
    back to BGR so it can be handed to ``cv2.VideoWriter``; the overlay colors
    therefore look the same as in the still-image path.

    Args:
        predicted_iou_threshold: Minimum accepted predicted IoU of a mask.
        stability_score_threshold: Minimum accepted stability score of a mask.
        clip_threshold: Minimum accepted CLIP score for ``query``.
        frame: BGR frame as a ``(height, width, 3)`` array.
        query: Text describing the wanted object.
        mask_generator: Object whose ``generate(image)`` returns the mask records.

    Returns:
        The annotated BGR frame, downscaled by ``adjust_image_size``. If anything
        fails, the error is printed and the downscaled, unannotated frame is
        returned, so one bad frame does not abort a long video run.
    """
    # Reduce the size to save GPU memory.
    resized_bgr = adjust_image_size(frame)
    try:
        rgb = cv2.cvtColor(resized_bgr, cv2.COLOR_BGR2RGB)
        masks = mask_generator.generate(rgb)
        masks = filter_masks(
            rgb,
            masks,
            predicted_iou_threshold,
            stability_score_threshold,
            query,
            clip_threshold,
        )
        annotated_rgb, _ = draw_masks(rgb, masks)
        return cv2.cvtColor(annotated_rgb, cv2.COLOR_RGB2BGR)
    except Exception as e:
        # Deliberate best effort: report the problem and fall back to the plain frame.
        print(e)
        print("error occur")
        return resized_bgr

def segment(
    predicted_iou_threshold: float,
    stability_score_threshold: float,
    clip_threshold: float,
    image_path: str,
    query: str,
) -> Tuple[PIL.Image.Image, List[np.ndarray]]:
    """Segment an image file and locate quadrilateral surfaces matching ``query``.

    Args:
        predicted_iou_threshold: Minimum accepted predicted IoU of a mask.
        stability_score_threshold: Minimum accepted stability score of a mask.
        clip_threshold: Minimum accepted CLIP score for ``query``.
        image_path: Path of the image to read.
        query: Text describing the wanted object.

    Returns:
        ``(annotated_image, surfaces)``. ``annotated_image`` is the downscaled
        image with the detections drawn in. Every surface is a ``(4, 2)`` float32
        array of (x, y) vertices divided by the width and height of the
        downscaled image, so it can be mapped onto the image at any resolution
        with ``denormalize_approx``. (The normalized surfaces used to be computed
        and then discarded in favor of the pixel coordinates.)

    Raises:
        FileNotFoundError: If the image cannot be read from ``image_path``.
    """
    mask_generator = load_mask_generator()
    ori_image = cv2.imread(image_path, cv2.IMREAD_COLOR)
    if ori_image is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise FileNotFoundError(f"Could not read image: {image_path}")
    ori_image = cv2.cvtColor(ori_image, cv2.COLOR_BGR2RGB)

    # Reduce the size to save GPU memory.
    image = adjust_image_size(ori_image)
    masks = mask_generator.generate(image)
    masks = filter_masks(
        image,
        masks,
        predicted_iou_threshold,
        stability_score_threshold,
        query,
        clip_threshold,
    )
    masked_image, surfaces = draw_masks(image, masks)

    height, width = image.shape[:2]
    normalized_surfaces = [
        normalize_surface(surface, width, height) for surface in surfaces
    ]

    return PIL.Image.fromarray(masked_image), normalized_surfaces

In [None]:
# Demo image on the mounted Google Drive.
filename = "/content/drive/MyDrive/未踏/prototype/demo.jpg"

# Positional arguments: predicted-IoU threshold, stability-score threshold,
# CLIP threshold, image path, text query.
image, surfaces = segment(0.8, 0.8, 0.96, filename, "screen")

# Last expression of the cell: display the annotated image.
image

In [None]:
# Reload the demo image in RGB so the detected surface can be cropped from it.
ori_image = cv2.imread(filename, cv2.IMREAD_COLOR)
ori_image = cv2.cvtColor(ori_image, cv2.COLOR_BGR2RGB)
# Use the first detected surface; this raises IndexError when none was found.
surface = surfaces[0]
ori_h, ori_w, _ = ori_image.shape
# Downscale to 80% of the original resolution, then re-read the new size.
ori_image = cv2.resize(ori_image, (int(ori_w * 0.8), int(ori_h * 0.8)))
ori_h, ori_w, _ = ori_image.shape
# denormalize_approx multiplies x by the width and y by the height, so `surface`
# has to hold coordinates normalized to [0, 1].
# NOTE(review): confirm that `segment` returns normalized surfaces.
sca_surface = denormalize_approx(surface, ori_w, ori_h)
cropped_img = crop_test(ori_image, sca_surface)
PIL.Image.fromarray(cropped_img)

In [None]:
# Second demo image; note that this rebinds `filename`, `image` and `surfaces`.
filename = "/content/drive/MyDrive/未踏/prototype/demo2.jpg"

# Same thresholds as for the first image, with the text query "display".
image, surfaces = segment(0.8, 0.8, 0.96, filename, "display")

# Last expression of the cell: display the annotated image.
image

### 動画を処理

In [None]:
from tqdm import tqdm

video_path = "/content/drive/MyDrive/未踏/prototype/demo.mp4"

# Seconds skipped at the start of the video, and seconds processed after that.
SKIP_SECONDS = 5
PROCESS_SECONDS = 10

cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    raise IOError(f"Could not open video: {video_path}")

out = None
try:
    # Frame rate of the input; also used for the output video.
    fps = cap.get(cv2.CAP_PROP_FPS)

    # The first frame only serves to determine the size of the output frames.
    ret, first_frame = cap.read()
    if not ret:
        raise IOError(f"Could not read a frame from: {video_path}")

    first_frame = adjust_image_size(first_frame.copy())
    height, width = first_frame.shape[:2]

    # Number of frames that corresponds to PROCESS_SECONDS.
    frames_to_process = int(PROCESS_SECONDS * fps)

    # Prepare the output video file.
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter('output_video.mp4', fourcc, fps, (width, height))

    # Number of frames processed so far.
    frame_count = 0

    mask_generator = load_mask_generator()

    # Skip the first SKIP_SECONDS of the video.
    for _ in range(int(SKIP_SECONDS * fps)):
        cap.read()

    # Count whole frames so the progress bar shows "n/total" instead of fractions.
    with tqdm(total=frames_to_process) as pbar:
        while frame_count < frames_to_process:
            ret, frame = cap.read()
            if not ret:
                break

            # Run the detection on every frame.
            detected_frame = segment_frame(0.8, 0.8, 0.96, frame.copy(), "display", mask_generator)

            # Append the annotated frame to the output video.
            out.write(detected_frame)

            pbar.update(1)
            frame_count += 1
finally:
    # Release the capture and the writer even when processing fails, so the
    # output file is finalized and the handles are not leaked.
    cap.release()
    if out is not None:
        out.release()

 27%|██▋       | 26.66666666666664/100 [12:47<32:07, 26.28s/it] 

slice indices must be integers or None or have an __index__ method
error occur


 32%|███▏      | 31.999999999999954/100 [15:14<29:24, 25.96s/it]

slice indices must be integers or None or have an __index__ method
error occur


100%|█████████▉| 99.99999999999966/100 [48:21<00:00, 29.01s/it]


In [None]:
# Debug output: types and total element counts of the last raw and processed frames.
print(frame.__class__)
print(detected_frame.__class__)
print(frame.size, detected_frame.size)

# NOTE(review): detected_frame comes out of the OpenCV video pipeline, so it is
# presumably in BGR order; PIL would then show red and blue swapped - confirm
# before relying on the displayed colors.
masked_image = PIL.Image.fromarray(detected_frame)

# masked_image

<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
6220800 1769472
