### 画像の前処理
* KaggleからCubiCasa5kダウンロード
* いただいたスクリプトでマスク画像生成
* 元画像としてsvg→png変換
- 処理に時間はかかるがリサイズなし
- 複数のフロアがある場合にマスクを取得できていなかった（display=None の要素をスキップしないように変更）
- マスクとオリジナル画像のサイズと位置がずれていた（左上を基準にcropすることで一致）


### 訓練中
- 格子なし

In [None]:
!pip install -q git+https://github.com/huggingface/transformers.git

In [None]:
!pip install -q datasets albumentations

### 必要に応じて実行

In [None]:
!unzip -q resize_without_noise.zip

In [None]:
!unzip -q homes_2.zip

In [None]:
rm -r processed_data/

In [None]:
!unzip -q processed_data_grid.zip -d grid

In [None]:
!unzip -qo processed_data3.zip

In [None]:
!zip -r processed_data.zip processed_data

In [None]:
!mv processed_data/annotations/validation/* processed_data/annotations/train/
!mv processed_data/annotations/test/* processed_data/annotations/train/
!mv processed_data/images/validation/* processed_data/images/train/
!mv processed_data/images/test/* processed_data/images/train/

In [None]:
!mv grid/processed_data/images/test/* processed_data/images/test/
!mv grid/processed_data/images/validation/* processed_data/images/validation/
!mv grid/processed_data/annotations/test/* processed_data/annotations/test/
!mv grid/processed_data/annotations/validation/* processed_data/annotations/validation/

In [None]:
!rm processed_data/annotations/train/2ldk_5.png
!cp processed_data/annotations/train/original_floorplan_1268.png processed_data/annotations/test/
!mv processed_data/annotations/train/original_floorplan_1268.png processed_data/annotations/validation/
!rm processed_data/images/train/2ldk_5.png
!cp processed_data/images/train/original_floorplan_1268.png processed_data/images/test/
!mv processed_data/images/train/original_floorplan_1268.png processed_data/images/validation/

In [None]:
import os
import json
from PIL import Image
import numpy as np
import torch
import random
import cv2
from datasets import Dataset, DatasetDict
from tqdm import tqdm

# Input and output directory settings.
# input_dir is the folder that holds the "original_floorplan_*.png" and
# "room_mask_color_*.png" files.
input_dir = "/content/mask2former_resize/"
output_dir = "./processed_data"

# Create the images/annotations folders for every split up front.
os.makedirs(f"{output_dir}/images/train", exist_ok=True)
os.makedirs(f"{output_dir}/images/validation", exist_ok=True)
os.makedirs(f"{output_dir}/images/test", exist_ok=True)
os.makedirs(f"{output_dir}/annotations/train", exist_ok=True)
os.makedirs(f"{output_dir}/annotations/validation", exist_ok=True)
os.makedirs(f"{output_dir}/annotations/test", exist_ok=True)

# id_to_label: class id -> class name (only 1: "room" is defined here).
id_to_label = {
    1: "room",
}
print("Generated id_to_label mapping:", id_to_label)


In [None]:
# For cubicasa5k

# (floor plan file name, colour mask file name) pairs for indices 0..1268.
files = [(f"original_floorplan_{i}.png", f"room_mask_color_{i}.png") for i in range(1269)]

# Shuffling is currently disabled.
# random.shuffle(files)


# Everything except the last pair is used for training.
# NOTE: validation and test both receive the same single (last) pair.
train_files = files[:-1]
validation_files = files[-1:]
test_files = files[-1:]


# カラーインスタンスマスクを読み込んで、annotation_mask に変換する処理
def convert_color_mask_to_annotation(color_mask_bgr):
    """Convert a colour-coded instance mask into a 3-channel annotation mask.

    Every distinct non-black colour in ``color_mask_bgr`` is treated as one
    instance. Instance ids are assigned 1, 2, ... in the order in which
    ``np.unique(..., axis=0)`` returns the colours (lexicographic order of the
    channel triplets). Black ``(0, 0, 0)`` is background.

    Args:
        color_mask_bgr: array of shape (height, width, 3), e.g. a colour mask
            loaded with ``cv2.imread(path, cv2.IMREAD_COLOR)``.

    Returns:
        A uint8 array of shape (height, width, 3). In array channel order an
        instance pixel holds ``[0, instance_id, 1]`` and a background pixel
        holds ``[0, 0, 0]``. When this array is written with ``cv2.imwrite``
        (which stores channels as B, G, R) and read back as RGB, the red
        channel carries the class id (1) and the green channel carries the
        instance id.

    Raises:
        ValueError: if the mask contains more than 255 distinct non-black
            colours; such ids do not fit into one uint8 channel and would
            otherwise wrap around and collide with other instances.
    """
    height, width, _ = color_mask_bgr.shape
    annotation_mask = np.zeros((height, width, 3), dtype=np.uint8)
    if height == 0 or width == 0:
        return annotation_mask

    # One pass over the pixels: `inverse` maps every pixel to the index of its
    # colour in `unique_colors`, so no per-colour scan of the image is needed.
    unique_colors, inverse = np.unique(
        color_mask_bgr.reshape(-1, 3), axis=0, return_inverse=True
    )
    # The shape of `inverse` differs between NumPy releases when `axis` is
    # given; flatten it so indexing below is version independent.
    inverse = inverse.reshape(-1)

    # A colour is background when all of its channels are zero.
    is_instance_color = unique_colors.any(axis=1)
    num_instances = int(is_instance_color.sum())
    if num_instances > 255:
        raise ValueError(
            f"color mask contains {num_instances} distinct colors; "
            "at most 255 instances fit into a uint8 annotation channel"
        )

    # Running count over the sorted colours gives ids 1..N; background stays 0.
    color_to_instance_id = np.cumsum(is_instance_color) * is_instance_color
    instance_ids = color_to_instance_id[inverse].reshape(height, width)

    # Channel 0 stays 0, channel 1 is the instance id, channel 2 is the class
    # id (always 1) on instance pixels.
    annotation_mask[..., 1] = instance_ids.astype(np.uint8)
    annotation_mask[..., 2] = (instance_ids > 0).astype(np.uint8)

    return annotation_mask

# 読み込んで、train/validation/test に分けて保存する関数
def process_floorplan_and_mask(floorplan_png, mask_png, split):
    """Convert one floor plan / colour mask pair and save it for a split.

    The floor plan is saved as ``{output_dir}/images/{split}/<name>.jpg`` and
    the annotation produced by ``convert_color_mask_to_annotation`` as
    ``{output_dir}/annotations/{split}/<name>.png``, where ``<name>`` is the
    floor plan's file name without extension.

    Read and write failures are reported with ``print`` and the pair is
    skipped; nothing is raised for them.

    Args:
        floorplan_png: path of the original floor plan image.
        mask_png: path of the colour-coded room instance mask.
        split: "train", "validation" or "test".
    """
    # Load both images (OpenCV returns None when a file cannot be read).
    floorplan_bgr = cv2.imread(floorplan_png, cv2.IMREAD_COLOR)  # BGR
    color_mask_bgr = cv2.imread(mask_png, cv2.IMREAD_COLOR)      # BGR

    if floorplan_bgr is None:
        print(f"Error: cannot read {floorplan_png}")
        return
    if color_mask_bgr is None:
        print(f"Error: cannot read {mask_png}")
        return

    # A size mismatch means the mask is not pixel-aligned with the image.
    # Only warn: the pair is still written so existing workflows keep running.
    if floorplan_bgr.shape[:2] != color_mask_bgr.shape[:2]:
        print(
            f"Warning: size mismatch between {floorplan_png} "
            f"{floorplan_bgr.shape[:2]} and {mask_png} {color_mask_bgr.shape[:2]}"
        )

    # Build the annotation mask from the colour mask.
    annotation_mask = convert_color_mask_to_annotation(color_mask_bgr)

    # Output name: the floor plan's base name without extension.
    image_id = os.path.splitext(os.path.basename(floorplan_png))[0]
    image_save_path = f"{output_dir}/images/{split}/{image_id}.jpg"
    annotation_save_path = f"{output_dir}/annotations/{split}/{image_id}.png"

    # cv2.imwrite signals failure through its return value, so check it.
    # The annotation is written first: an annotation without an image is
    # harmless, whereas an image without an annotation cannot be paired later.
    if not cv2.imwrite(annotation_save_path, annotation_mask):
        print(f"Error: cannot write {annotation_save_path}")
        return
    if not cv2.imwrite(image_save_path, floorplan_bgr):
        print(f"Error: cannot write {image_save_path}")


# Convert and save every (floor plan, mask) pair of each split.
for split, file_list in [("train", train_files), ("validation", validation_files), ("test", test_files)]:
    for (floorplan_png, mask_png) in tqdm(file_list):
        process_floorplan_and_mask(os.path.join(input_dir, floorplan_png),
                                   os.path.join(input_dir, mask_png),
                                   split)

In [None]:
# For homes

input_dir = "./homes"  # directory holding the Labelme-format JSON files and their images
output_dir = "./processed_data"  # output folder

# Collect the JSON annotation files.
json_files = [f for f in os.listdir(input_dir) if f.endswith(".json")]

# Shuffling is currently disabled.
#random.shuffle(json_files)

# Split: first 48 files -> train, the 49th -> validation, the rest -> test.
train_files = json_files[:48]
validation_files = json_files[48:49]
test_files = json_files[49:]

# Class id -> class name mapping.
id_to_label = {
    1:"room",
}

print("Generated id_to_label mapping:")
print(id_to_label)

# JSONファイルの処理関数
def process_json_file(json_path, split):
    """Rasterise one Labelme JSON file and save image + annotation for a split.

    The image named by the JSON's "imagePath" entry is loaded from
    ``input_dir`` and saved as ``{output_dir}/images/{split}/<name>.jpg``.
    Each entry of "shapes" is filled into an RGB annotation mask with the
    colour ``(1, instance_id, 0)``, instance ids counting up from 1 in file
    order, and the mask is saved as
    ``{output_dir}/annotations/{split}/<name>.png``. ``<name>`` is the JSON
    file name without extension.

    Args:
        json_path: path of the Labelme JSON file.
        split: "train", "validation" or "test".
    """
    with open(json_path, "r") as fp:
        labelme = json.load(fp)

    # Load the image the annotation refers to.
    source_image = Image.open(os.path.join(input_dir, labelme["imagePath"])).convert("RGB")

    # Blank (all background) annotation canvas with the image's size.
    canvas = np.zeros((source_image.height, source_image.width, 3), dtype=np.uint8)

    # Fill every polygon: R = class id 1, G = running instance id, B = 0.
    for number, shape in enumerate(labelme["shapes"], start=1):
        polygon = np.array(shape["points"], dtype=np.int32)
        cv2.fillPoly(canvas, [polygon], (1, number, 0))

    # Both outputs share the JSON file's base name.
    stem = os.path.splitext(os.path.basename(json_path))[0]

    source_image.save(f"{output_dir}/images/{split}/{stem}.jpg")
    Image.fromarray(canvas).save(f"{output_dir}/annotations/{split}/{stem}.png")

# Convert every JSON file of each split.
for split, files in [("train", train_files), ("validation", validation_files), ("test", test_files)]:
    for json_file in files:
        json_path = os.path.join(input_dir, json_file)
        process_json_file(json_path, split)

### ここからは共通の処理

In [None]:
import os
import json
from PIL import Image
import numpy as np
import torch
import random
import cv2
from datasets import Dataset, DatasetDict
from tqdm import tqdm

output_dir = "./processed_data"

id_to_label = {
    1: "room",
}

# DatasetDictを作成
def load_data(split):
    """Load all image / annotation pairs of one split into memory.

    Images are read from ``{output_dir}/images/{split}`` and the annotation
    with the same base name and a ``.png`` extension is read from
    ``{output_dir}/annotations/{split}``. Both are converted to RGB.

    Args:
        split: "train", "validation" or "test".

    Returns:
        ``{"image": [...], "annotation": [...]}`` with PIL images, paired by
        position. Entries are in sorted file-name order, except for "train",
        whose pairs are shuffled with ``random.shuffle``.
    """
    images_dir = os.path.join(output_dir, f"images/{split}")
    annotations_dir = os.path.join(output_dir, f"annotations/{split}")

    images = []
    annotations = []

    # os.listdir returns entries in arbitrary order; sort so that indices into
    # the validation/test splits are reproducible between runs.
    for image_file in tqdm(sorted(os.listdir(images_dir))):
        image_path = os.path.join(images_dir, image_file)
        # Skip sub-directories (e.g. notebook checkpoint folders).
        if not os.path.isfile(image_path):
            continue

        # Swap only the extension, not a ".jpg" occurring inside the name.
        annotation_file = os.path.splitext(image_file)[0] + ".png"
        annotation_path = os.path.join(annotations_dir, annotation_file)

        # Load the pair.
        image = Image.open(image_path).convert("RGB")
        annotation = Image.open(annotation_path).convert("RGB")

        images.append(image)
        annotations.append(annotation)

    # Shuffle the training pairs only; `images` is checked because unpacking
    # zip(*[]) would fail on an empty split.
    if split == "train" and images:
        # Shuffle image and annotation together so the pairs stay aligned.
        combined = list(zip(images, annotations))
        random.shuffle(combined)
        images, annotations = (list(column) for column in zip(*combined))

    return {"image": images, "annotation": annotations}

# Load every split and wrap the results in a DatasetDict.
datasets = {}
for split in ["train", "validation", "test"]:
    data = load_data(split)
    datasets[split] = Dataset.from_dict(data)

dataset_dict = DatasetDict(datasets)

print(dataset_dict)

In [None]:
import numpy as np

example = datasets['train'][1]
seg = np.array(example['annotation'])
# Print arrays in full (no truncation) when inspecting them below.
np.set_printoptions(threshold=np.inf)


# NOTE: despite the name, this takes channel 0 (red), which later cells treat
# as the semantic class id; the instance ids are read from channel 1 (green).
instance_seg = seg[:, :, 0]

In [None]:
np.unique(instance_seg)

In [None]:
instance_seg = np.array(example["annotation"])[:,:,1] # green channel encodes instances
class_id_map = np.array(example["annotation"])[:,:,0] # red channel encodes semantic category
class_labels = np.unique(class_id_map)

# create mapping between instance IDs and semantic category IDs
inst2class = {}
for label in class_labels:
    instance_ids = np.unique(instance_seg[class_id_map == label])
    inst2class.update({i: label for i in instance_ids})
print(inst2class)

In [None]:
from PIL import Image


# let's visualize the first instance (ignoring background)
mask = (instance_seg == 1)
visual_mask = (mask * 255).astype(np.uint8)
Image.fromarray(visual_mask)

In [None]:
import numpy as np

seg = np.array(example['annotation'])
# get green channel
instance_seg = seg[:, :, 1]

In [None]:
R = seg[:, :, 0]
G = seg[:, :, 1]
masks = (R / 10).astype(np.int32) * 256 + (G.astype(np.int32))

In [None]:
visual_mask = (masks * 255).astype(np.uint8)
Image.fromarray(visual_mask)

In [None]:
from transformers import Mask2FormerImageProcessor

processor = Mask2FormerImageProcessor(reduce_labels=True, ignore_index=255, do_resize=False, do_rescale=False, do_normalize=False)

In [None]:
import numpy as np
from torch.utils.data import Dataset

class ImageSegmentationDataset(Dataset):
    """Wraps an indexable dataset of image / annotation pairs for training.

    Each item of ``dataset`` must provide an "image" (an object with a
    ``convert("RGB")`` method, e.g. a PIL image) and an "annotation" whose
    channel 0 holds the semantic class id and channel 1 the instance id.
    """

    def __init__(self, dataset, processor, transform=None):
        """
        Args:
            dataset: indexable collection of {"image", "annotation"} items.
            processor: callable image processor that builds the model inputs.
            transform: optional callable invoked as
                ``transform(image=..., mask=...)`` returning a dict with the
                keys "image" and "mask" (e.g. an Albumentations pipeline).
        """
        self.dataset = dataset
        self.processor = processor
        self.transform = transform

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        # Fetch the sample once: every access to self.dataset[idx] may decode
        # the images again, so reuse the result for image and annotation.
        sample = self.dataset[idx]
        image = np.array(sample["image"].convert("RGB"))

        annotation = np.array(sample["annotation"])
        instance_seg = annotation[:, :, 1]
        class_id_map = annotation[:, :, 0]
        class_labels = np.unique(class_id_map)

        # Map every instance id to the class id found at its pixels.
        inst2class = {}
        for label in class_labels:
            instance_ids = np.unique(instance_seg[class_id_map == label])
            inst2class.update({i: label for i in instance_ids})

        # apply transforms
        if self.transform is not None:
            transformed = self.transform(image=image, mask=instance_seg)
            image, instance_seg = transformed['image'], transformed['mask']
            # convert to C, H, W
            image = image.transpose(2,0,1)

        if class_labels.shape[0] == 1 and class_labels[0] == 0:
            # Some image does not have annotation (all ignored)
            inputs = self.processor([image], return_tensors="pt")
            inputs = {k:v.squeeze() for k,v in inputs.items()}
            # NOTE(review): class_labels has one entry while mask_labels has
            # zero masks; confirm the model's loss accepts this combination.
            inputs["class_labels"] = torch.tensor([0])
            inputs["mask_labels"] = torch.zeros((0, inputs["pixel_values"].shape[-2], inputs["pixel_values"].shape[-1]))
        else:
            inputs = self.processor([image], [instance_seg], instance_id_to_semantic_id=inst2class, return_tensors="pt")
            # Drop the batch dimension: tensors are squeezed, list-valued
            # entries are replaced by their single element.
            inputs = {k: v.squeeze() if isinstance(v, torch.Tensor) else v[0] for k,v in inputs.items()}

        return inputs

In [None]:
import albumentations as A

# GridNoiseクラス
class GridNoise(A.ImageOnlyTransform):
    """Image-only transform that draws a regular black grid over the image.

    Horizontal and vertical lines are drawn every ``grid_spacing`` pixels,
    starting at row 0 / column 0.

    Args:
        p: probability of applying the transform.
        grid_spacing: distance in pixels between neighbouring grid lines;
            must be positive.
        line_thickness: line thickness in pixels, passed to ``cv2.line``.
        always_apply: when True the transform is applied on every call
            (implemented as ``p=1.0``).

    Raises:
        ValueError: if ``grid_spacing`` is not positive.
    """
    def __init__(self, p=0.5, grid_spacing=50, line_thickness=1, always_apply=False):
        if grid_spacing <= 0:
            raise ValueError("grid_spacing must be a positive integer")
        # `always_apply` is deprecated in the Albumentations base constructor
        # and rejected by newer releases, so it is expressed through `p`,
        # which every release accepts.
        super().__init__(p=1.0 if always_apply else p)
        self.grid_spacing = grid_spacing
        self.line_thickness = line_thickness

    def apply(self, img, **params):
        # img is a numpy array, (H, W, C) or (H, W).
        height, width = img.shape[:2]

        # cv2.line draws in place; work on a copy so the caller's array is
        # left untouched.
        canvas = img.copy()

        # Draw the grid lines in black (0, 0, 0).
        for y in range(0, height, self.grid_spacing):
            cv2.line(canvas, (0, y), (width-1, y), (0, 0, 0), self.line_thickness)
        for x in range(0, width, self.grid_spacing):
            cv2.line(canvas, (x, 0), (x, height-1), (0, 0, 0), self.line_thickness)

        return canvas

# Per-channel mean / std passed to A.Normalize, kept as plain lists.
ADE_MEAN = [0.485, 0.456, 0.406]
ADE_STD = [0.229, 0.224, 0.225]

# note that you can include more fancy data augmentation methods here
# Training pipeline: resize to 512x512, then normalise.
# GridNoise is currently disabled (its line below is commented out).
train_transform = A.Compose([
    A.Resize(width=512, height=512),
    #GridNoise(p=0.0, grid_spacing=50, line_thickness=1),
    A.Normalize(mean=ADE_MEAN, std=ADE_STD),
], is_check_shapes=False)

# Validation pipeline: resize and normalise only, no noise.
val_transform = A.Compose([
    A.Resize(width=512, height=512),
    A.Normalize(mean=ADE_MEAN, std=ADE_STD),
], is_check_shapes=False)

train_dataset = ImageSegmentationDataset(datasets["train"], processor=processor, transform=train_transform)
val_dataset = ImageSegmentationDataset(datasets["validation"], processor=processor, transform=val_transform)

In [None]:
from torch.utils.data import DataLoader

def collate_fn(batch):
    """Merge a list of per-sample dicts into one batch dict.

    "pixel_values" and "pixel_mask" are stacked into single tensors along a
    new leading dimension; "class_labels" and "mask_labels" are kept as lists
    with one entry per sample.
    """
    # Entries that are stacked into one tensor.
    stacked = {
        key: torch.stack([sample[key] for sample in batch])
        for key in ("pixel_values", "pixel_mask")
    }
    # Entries that stay as per-sample lists.
    per_sample = {
        key: [sample[key] for sample in batch]
        for key in ("class_labels", "mask_labels")
    }
    return {**stacked, **per_sample}

train_dataloader = DataLoader(train_dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)
val_dataloader = DataLoader(val_dataset, batch_size=1, shuffle=False, collate_fn=collate_fn)

In [None]:
from transformers import Mask2FormerForUniversalSegmentation

# Replace the head of the pre-trained model
# We specify ignore_mismatched_sizes=True to replace the already fine-tuned classification head by a new one
model = Mask2FormerForUniversalSegmentation.from_pretrained("facebook/mask2former-swin-base-coco-instance",
                                                          id2label=id_to_label,
                                                          ignore_mismatched_sizes=True)


In [None]:
# すべてのクラスラベルを1つのマスクに統合する関数
def combine_all_classes(results):
    """Merge every predicted segment into one binary mask image.

    Args:
        results: dict with a 'segmentation' tensor of per-pixel segment ids
            and a 'segments_info' list whose entries carry an 'id'.

    Returns:
        A PIL image that is 255 where the pixel belongs to any listed
        segment and 0 elsewhere.
    """
    segmentation = results['segmentation'].cpu().numpy()

    # A pixel is foreground when its id is one of the reported segment ids.
    segment_ids = [segment['id'] for segment in results['segments_info']]
    foreground = np.isin(segmentation, segment_ids)

    # Boolean mask -> 8-bit image (0 / 255).
    return Image.fromarray(np.where(foreground, 255, 0).astype(np.uint8))

In [None]:
example = datasets['test'][0]
image_test1 = example['image']
example = datasets['test'][1]
image_test2 = example['image']
example = datasets['test'][2]
image_test3 = example['image']
display(image_test1)
display(image_test2)
display(image_test3)


In [None]:
import torch
from tqdm.auto import tqdm
from transformers import MaskFormerImageProcessor

# Processor with default settings, used for the per-epoch preview inference
# and for post-processing the predictions.
processor_test = MaskFormerImageProcessor()

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)

# running_loss / num_samples accumulate over all epochs (never reset).
running_loss = 0.0
num_samples = 0
best_val_loss = float("inf")
best_model_path = "mask2former_best"
num_epochs = 20

for epoch in range(num_epochs):
    print("Epoch:", epoch)
    model.train()

    for idx, batch in enumerate(tqdm(train_dataloader)):
        # Reset the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(
            pixel_values=batch["pixel_values"].to(device),
            mask_labels=[labels.to(device) for labels in batch["mask_labels"]],
            class_labels=[labels.to(device) for labels in batch["class_labels"]],
        )

        # Backward propagation
        loss = outputs.loss
        loss.backward()

        # Weight the batch loss by the batch size, exactly as the validation
        # loop does, so running_loss / num_samples is a consistent
        # per-sample average.
        batch_size = batch["pixel_values"].size(0)
        running_loss += loss.item() * batch_size
        num_samples += batch_size

        if idx % 300 == 0:
            print("Loss:", running_loss / num_samples)

        # Optimization
        optimizer.step()

    # ==== Validation ====
    model.eval()
    val_loss = 0.0
    val_samples = 0

    with torch.no_grad():
        for val_batch in tqdm(val_dataloader, desc="Validation"):
            outputs_val = model(
                pixel_values=val_batch["pixel_values"].to(device),
                mask_labels=[labels.to(device) for labels in val_batch["mask_labels"]],
                class_labels=[labels.to(device) for labels in val_batch["class_labels"]],
            )

            loss_val = outputs_val.loss

            batch_size_val = val_batch["pixel_values"].size(0)
            val_loss += loss_val.item() * batch_size_val
            val_samples += batch_size_val

    if val_samples == 0:
        # Nothing to average; avoid dividing by zero and keep the best model.
        print(f"[Epoch {epoch}] Validation set is empty; skipping validation loss")
    else:
        epoch_val_loss = val_loss / val_samples
        print(f"[Epoch {epoch}] Validation Loss: {epoch_val_loss:.4f}")

        # Save the model only when the validation loss improved.
        if epoch_val_loss < best_val_loss:
            best_val_loss = epoch_val_loss
            print(f"=> New best model found! Saving model (val_loss: {best_val_loss:.4f})")
            model.save_pretrained(best_model_path)
            processor.save_pretrained(best_model_path)

    # Preview: run the current model on the three test images and show the
    # union of all predicted segments for each of them.
    with torch.no_grad():
        for preview_image in (image_test1, image_test2, image_test3):
            inputs_preview = processor_test(preview_image, return_tensors="pt").to(device)
            outputs_preview = model(**inputs_preview)
            # PIL size is (width, height); target_sizes expects (height, width).
            results_preview = processor_test.post_process_instance_segmentation(
                outputs_preview, target_sizes=[preview_image.size[::-1]]
            )[0]
            display(combine_all_classes(results_preview))


In [None]:
# Same normalisation constants as used for training (example).
ADE_MEAN = [0.485, 0.456, 0.406]
ADE_STD = [0.229, 0.224, 0.225]

import albumentations as A

# Inference pipeline: resize to 512x512, then normalise.
test_transform_512 = A.Compose([
    A.Resize(width=512, height=512),
    A.Normalize(mean=ADE_MEAN, std=ADE_STD),
], is_check_shapes=False)


import numpy as np
import torch

def preprocess_like_training(pil_image, transform, device):
    """Turn one image into model inputs using the given transform.

    Steps:
      1) convert the image to RGB when it offers ``convert`` (the training
         dataset does the same before applying its transform),
      2) run ``transform(image=...)`` and take its "image" entry,
      3) reorder (H, W, C) -> (C, H, W),
      4) convert to a float tensor with a leading batch dimension on ``device``,
      5) build an all-True ``pixel_mask`` of shape (1, H, W).

    Args:
        pil_image: PIL image (or an array already shaped (H, W, 3)).
        transform: callable invoked as ``transform(image=array)`` returning a
            dict with an "image" array of shape (H, W, C).
        device: torch device (or device string) for the returned tensors.

    Returns:
        dict with "pixel_values" of shape (1, C, H, W) and "pixel_mask" of
        shape (1, H, W), which can be passed to the model as keyword arguments.
    """
    # Force three channels so RGBA / grayscale / palette inputs do not reach
    # the transform with a different channel layout than training images.
    if hasattr(pil_image, "convert"):
        pil_image = pil_image.convert("RGB")

    # PIL -> NumPy (H, W, C)
    image_np = np.array(pil_image, dtype=np.uint8)

    # Apply the transform (resize + normalisation in the training pipeline).
    transformed = transform(image=image_np)
    image_processed = transformed["image"]     # shape (H, W, C)

    # Move the channel dimension to the front: (C, H, W).
    image_processed = np.transpose(image_processed, (2, 0, 1))

    # To a float tensor with a batch dimension.
    pixel_values = torch.from_numpy(image_processed).float().unsqueeze(0).to(device)

    # pixel_mask: every pixel is valid, shape (B=1, H, W).
    _, _, h, w = pixel_values.shape
    pixel_mask = torch.ones((1, h, w), dtype=torch.bool, device=device)

    return {
        "pixel_values": pixel_values,
        "pixel_mask": pixel_mask,
    }


In [None]:
from PIL import Image
import torch


# Run inference again, this time with the training-style preprocessing.
test_image_path = "test1.png"
image_test4 = Image.open(test_image_path).convert("RGB")
test_image_path = "test2.png"
image_test5 = Image.open(test_image_path).convert("RGB")

for test_image in [image_test1, image_test2, image_test3, image_test4, image_test5]:
    # Preprocess with the Albumentations pipeline (512x512 resize + normalisation).
    inputs_test = preprocess_like_training(
        pil_image=test_image,
        transform=test_transform_512,  # the Albumentations pipeline defined in an earlier cell
        device=device
    )

    # Inference
    model.eval()
    with torch.no_grad():
        outputs_test = model(**inputs_test)

    # Extract the masks with post_process_instance_segmentation.
    # target_sizes=[(height, width)] is the size of the returned mask:
    # use (512, 512) to visualise at the resized resolution, or
    # test_image.size[::-1] (H, W) to get back to the original image size.
    results_test = processor_test.post_process_instance_segmentation(
        outputs_test,
        target_sizes=[test_image.size[::-1]]
    )[0]

    # Merge all segment masks into a single mask.
    unified_mask = combine_all_classes(results_test)

    # Show the result.
    display(unified_mask)
