# LSeg (2022)

## download model

In [1]:
%cd /content

/content


In [2]:
# Model download: ViT-L/16 image backbone, ViT-B/32 text encoder
!gdown 1ayk6NXURI_vIPlym16f_RG3ffxBWHxvb

Access denied with the following error:

 	Too many users have viewed or downloaded this file recently. Please
	try accessing the file again later. If the file you are trying to
	access is particularly large or is shared with many people, it may
	take up to 24 hours to be able to view or download the file. If you
	still can't access a file after 24 hours, contact your domain
	administrator. 

You may still be able to access the file from the browser:

	 https://drive.google.com/uc?id=1ayk6NXURI_vIPlym16f_RG3ffxBWHxvb 



In [3]:
!git clone https://github.com/vlmaps/vlmaps
%cd vlmaps

Cloning into 'vlmaps'...
remote: Enumerating objects: 131, done.[K
remote: Counting objects: 100% (29/29), done.[K
remote: Compressing objects: 100% (22/22), done.[K
remote: Total 131 (delta 17), reused 12 (delta 7), pack-reused 102[K
Receiving objects: 100% (131/131), 61.87 MiB | 17.81 MiB/s, done.
Resolving deltas: 100% (57/57), done.
/content/vlmaps


In [4]:
%%capture
# CLIP
!pip install ftfy regex tqdm
!pip install git+https://github.com/openai/CLIP.git

In [5]:
%%capture
!pip install timm

In [6]:
%%capture
!pip install pytorch_lightning

In [7]:
import argparse
import math

import clip
import cv2
from matplotlib import pyplot as plt
import numpy as np
import torch
import torchvision.transforms as transforms
from tqdm import tqdm

from lseg.additional_utils.models import crop_image, pad_image, resize_image
from lseg.modules.models.lseg_net import LSegEncNet
from utils.clip_mapping_utils import *

__file__:  /content/vlmaps/examples/context.py
imported path: /content/vlmaps


## functions

In [8]:
from google.colab.patches import cv2_imshow
from skimage.transform import resize as my_resize

def get_bbox(pred):
  """Return the bounding box of the non-background region of a label map.

  A row (column) counts as background when its sum equals the image width
  (height), i.e. when every pixel in it carries label 1. This assumes `pred`
  only holds the labels 0 (object) and 1 (background).

  Args:
    pred: 2-D array of shape (H, W).

  Returns:
    `[left, top, width, height]` as Python ints. `left`/`top` are the first
    non-background column/row; `width`/`height` count the columns/rows from
    the first to the last non-background one, both ends included, so a single
    object pixel gives a 1x1 box. `[0, 0, 0, 0]` is returned when no
    non-background row or column exists.
  """
  H, W = pred.shape
  rows = np.sum(pred, axis=1)
  columns = np.sum(pred, axis=0)

  object_rows = [i for i in range(H) if rows[i] != W]
  object_cols = [j for j in range(W) if columns[j] != H]
  if not object_rows or not object_cols:
    return [0, 0, 0, 0]

  top, bottom = object_rows[0], object_rows[-1]
  left, right = object_cols[0], object_cols[-1]
  # The indices are inclusive, so the extent is (last - first + 1). Without the
  # +1 a one-pixel object at the origin would be indistinguishable from the
  # "nothing found" sentinel above.
  return [left, top, right - left + 1, bottom - top + 1]

def get_lseg_feat(
    model: LSegEncNet,
    image: np.array,
    labels,
    transform,
    crop_size=480,
    base_size=520,
    norm_mean=[0.5, 0.5, 0.5],
    norm_std=[0.5, 0.5, 0.5],
    vis=False,
):
    """Run the model on one image and return features, a bounding box and a label map.

    The image is resized so that its longer side equals `base_size`. If that
    fits into one `crop_size` window the model is evaluated once on the padded
    image; otherwise it is evaluated on overlapping `crop_size` windows
    (stride = 2/3 of `crop_size`) and overlapping predictions are averaged.

    Args:
        model: network called as `model(batch, labels)` and returning
            `(features, logits)`; `model.out_c` gives the feature channels.
        image: input image handed to `transform`.
        labels: sequence of label strings; the logits have `len(labels)` channels.
        transform: callable turning `image` into a tensor (a batch dimension
            is added here and the result is moved to CUDA).
        crop_size: side length of the evaluation window.
        base_size: target length of the longer image side.
        norm_mean, norm_std: forwarded to `pad_image`.
        vis: when True, show the predicted label map with matplotlib.

    Returns:
        `(outputs, bbox, pred)`:
        `outputs` is a numpy array of features (B, D, H, W) at the resized
        resolution; `bbox` is `get_bbox(pred)`; `pred` is an int array with the
        per-pixel argmax label index of the first batch element, resized to
        the resolution of the transformed input.
    """
    image = transform(image).unsqueeze(0).cuda()

    batch, _, h, w = image.size()
    stride_rate = 2.0 / 3.0
    stride = int(crop_size * stride_rate)

    # Scale so that the longer side becomes base_size, keeping the aspect ratio.
    long_size = base_size
    if h > w:
        height = long_size
        width = int(1.0 * w * long_size / h + 0.5)
        short_size = width
    else:
        width = long_size
        height = int(1.0 * h * long_size / w + 0.5)
        short_size = height

    cur_img = resize_image(image, height, width, **{"mode": "bilinear", "align_corners": True})

    if long_size <= crop_size:
        # Single-window case. The logits must be bound to `logits_outputs` and
        # cropped like the features, otherwise the argmax below has nothing to
        # read (this branch previously raised NameError).
        pad_img = pad_image(cur_img, norm_mean, norm_std, crop_size)
        with torch.no_grad():
            outputs, logits_outputs = model(pad_img, labels)
        outputs = crop_image(outputs, 0, height, 0, width)
        logits_outputs = crop_image(logits_outputs, 0, height, 0, width)
    else:
        if short_size < crop_size:
            # pad if needed
            pad_img = pad_image(cur_img, norm_mean, norm_std, crop_size)
        else:
            pad_img = cur_img
        _, _, ph, pw = pad_img.shape  # .size()
        assert ph >= height and pw >= width
        h_grids = int(math.ceil(1.0 * (ph - crop_size) / stride)) + 1
        w_grids = int(math.ceil(1.0 * (pw - crop_size) / stride)) + 1
        with torch.cuda.device_of(image):
            with torch.no_grad():
                outputs = image.new().resize_(batch, model.out_c, ph, pw).zero_().cuda()
                logits_outputs = image.new().resize_(batch, len(labels), ph, pw).zero_().cuda()
            count_norm = image.new().resize_(batch, 1, ph, pw).zero_().cuda()
        # grid evaluation: accumulate every window and count how often each pixel was covered
        for idh in range(h_grids):
            for idw in range(w_grids):
                h0 = idh * stride
                w0 = idw * stride
                h1 = min(h0 + crop_size, ph)
                w1 = min(w0 + crop_size, pw)
                crop_img = crop_image(pad_img, h0, h1, w0, w1)
                # pad if needed
                pad_crop_img = pad_image(crop_img, norm_mean, norm_std, crop_size)
                with torch.no_grad():
                    output, logits = model(pad_crop_img, labels)
                cropped = crop_image(output, 0, h1 - h0, 0, w1 - w0)
                cropped_logits = crop_image(logits, 0, h1 - h0, 0, w1 - w0)
                outputs[:, :, h0:h1, w0:w1] += cropped
                logits_outputs[:, :, h0:h1, w0:w1] += cropped_logits
                count_norm[:, :, h0:h1, w0:w1] += 1
        assert (count_norm == 0).sum() == 0
        # Average the overlapping windows, then drop the padding.
        outputs = outputs / count_norm
        logits_outputs = logits_outputs / count_norm
        outputs = outputs[:, :, :height, :width]
        logits_outputs = logits_outputs[:, :, :height, :width]

    outputs = outputs.cpu()
    outputs = outputs.numpy()  # B, D, H, W
    # Per-pixel argmax over the label channel; index i corresponds to labels[i].
    predicts = [torch.max(logit, 0)[1].cpu().numpy() for logit in logits_outputs]
    pred = predicts[0]
    # NOTE(review): cubic interpolation of a label map followed by an int cast can
    # yield values that are not the nearest label; cv2.INTER_NEAREST is the usual
    # choice. Left unchanged here so existing results do not shift.
    pred = np.array(cv2.resize(np.array(pred, dtype=float), dsize=(w, h), interpolation=cv2.INTER_CUBIC), dtype=int)
    bbox = get_bbox(pred)

    if vis:
        new_palette = get_new_pallete(len(labels))
        mask, patches = get_new_mask_pallete(pred, new_palette, out_label_flag=True, labels=labels)
        seg = mask.convert("RGBA")
        plt.figure()
        plt.imshow(seg)
        plt.legend(handles=patches, loc="upper left", bbox_to_anchor=(1.0, 1), prop={"size": 20})
        plt.axis("off")
        # Mark the two opposite corners of the bounding box.
        plt.scatter([bbox[0], bbox[0]+bbox[2]], [ bbox[1],bbox[1]+bbox[3]], color='white')
        plt.tight_layout()
        plt.show()

    return outputs, bbox, pred

In [9]:
#pix_feats, bbox, pred = get_lseg_feat(model, rgb, ['wall', 'other'], transform, crop_size, base_size, norm_mean, norm_std, vis=True)
#print(pix_feats)
#pix_feats.shape

In [10]:
%matplotlib inline
from pycocotools.coco import COCO
import numpy as np
import skimage.io as io
import matplotlib.pyplot as plt
import pylab
pylab.rcParams['figure.figsize'] = (8.0, 10.0)

import os
import zipfile

In [11]:
from pprint import pprint
import PIL

def SaveArchive(folder_to_save, where_to_save):
  """Zip everything under `folder_to_save` into `<where_to_save>.zip`.

  Files are stored with paths relative to `folder_to_save` and compressed with
  DEFLATE. The folder is created (including missing parents) if it does not
  exist yet, in which case the resulting archive is empty.

  Args:
    folder_to_save: directory whose files are archived.
    where_to_save: archive path without the `.zip` suffix.
  """
  os.makedirs(folder_to_save, exist_ok=True)
  # The context manager closes the archive even if adding a file fails.
  with zipfile.ZipFile('{}.zip'.format(where_to_save), 'w') as archive:
    for folder, _subfolders, files in os.walk(folder_to_save):
      for file in files:
        full_path = os.path.join(folder, file)
        archive.write(full_path,
                      os.path.relpath(full_path, folder_to_save),
                      compress_type=zipfile.ZIP_DEFLATED)

def ExtractArchive(filename, where_to_extract):
  """Extract the zip archive `filename` into the directory `where_to_extract`.

  The target directory is created (including missing parents) when it does
  not exist, so it is present afterwards even for an empty archive.

  Args:
    filename: path of the zip file to read.
    where_to_extract: destination directory.
  """
  os.makedirs(where_to_extract, exist_ok=True)
  # `archive` instead of `zip` so the builtin zip() is not shadowed.
  with zipfile.ZipFile(filename, 'r') as archive:
    archive.extractall(where_to_extract)

def ApplyFunction(original_folder, function, dir_to_save, dir_for_images):
  """Call `function` once for every file found under `original_folder`.

  The directory tree is walked recursively. For each file, `function` receives
  the file path (directory and file name joined with '/'), followed by
  `dir_to_save` and `dir_for_images` unchanged.
  """
  for current_dir, _, filenames in os.walk(original_folder):
    for name in filenames:
      file_path = '/'.join((current_dir, name))
      function(file_path, dir_to_save, dir_for_images)

def CropImages(ann_file, dir_to_save, dir_for_images):
  """Save one cropped JPEG per annotation of a COCO annotation file.

  For every annotation, the referenced image is opened from `dir_for_images`,
  cropped to the annotation's `bbox` (`[x, y, width, height]`, truncated to
  ints) and written to `dir_to_save` as `<image name without extension>_<annotation key>.jpg`.

  Args:
    ann_file: path of the annotation file, loaded with `COCO`.
    dir_to_save: output directory; created if missing.
    dir_for_images: directory containing the images named in the annotations.
  """
  # `import PIL` alone does not guarantee that the Image submodule is loaded.
  from PIL import Image

  os.makedirs(dir_to_save, exist_ok=True)
  coco = COCO(ann_file)
  for key, ann in coco.anns.items():
    image_filename = coco.imgs[ann['image_id']]['file_name']
    box = [int(v) for v in ann['bbox']]
    # splitext handles extensions of any length (".jpeg", ".png", ...),
    # unlike slicing off the last four characters.
    stem = os.path.splitext(image_filename)[0]
    # The context manager releases the file handle after each crop.
    with Image.open("{}/{}".format(dir_for_images, image_filename)) as im:
      new_im = im.crop((box[0], box[1], box[0] + box[2], box[1] + box[3])) # (left, top, right, bottom)
      new_im.save('{}/{}_{}.jpg'.format(dir_to_save, stem, key))

In [22]:
from prompt_toolkit.shortcuts.progress_bar.base import E
import json
import pycocotools._mask as mask_tool
from matplotlib import image as mpimg


def SaveJson(coco, out_path='/content/new_instances.json'):
  """Dump the annotation, image and category tables of `coco` to a JSON file.

  Args:
    coco: object exposing `anns`, `imgs` and `cats` attributes.
    out_path: destination file; defaults to the previously hard-coded location.

  NOTE(review): `anns`, `imgs` and `cats` are written exactly as stored on
  `coco`. If they are id-keyed dicts, the file holds JSON objects (with string
  keys) rather than lists; confirm this is what the consumer expects.
  """
  d = {"annotations": coco.anns, "images": coco.imgs, "categories": coco.cats}
  with open(out_path, 'w') as f:
    json.dump(d, f)

def convert_mask(mask):
    """Extract the outer contours of a mask and the polygon of the largest one.

    Args:
        mask: array convertible to uint8; non-zero pixels are the region whose
            contours are traced.

    Returns:
        `(segmentation, contours)`. `segmentation` is the largest contour (by
        area) flattened to a list `[x0, y0, x1, y1, ...]`; `contours` holds all
        external contours found. When nothing is found, `({}, ())` is returned.
    """
    binary_mask = mask.astype(np.uint8)
    # Find the contours of the mask
    contours, _hierarchy = cv2.findContours(binary_mask,
                                            cv2.RETR_EXTERNAL,
                                            cv2.CHAIN_APPROX_SIMPLE)
    # Test the length rather than comparing with `()`: depending on the OpenCV
    # build the contours arrive as a list or a tuple, and `[] != ()` is True,
    # which would send an empty sequence into max() below.
    if len(contours) == 0:
        # Always hand back an empty tuple so callers comparing with `()` keep working.
        return {}, ()
    # Get the largest contour based on area
    largest_contour = max(contours, key=cv2.contourArea)
    # Flatten the (N, 1, 2) point array into a polygon coordinate list
    segmentation = largest_contour.flatten().tolist()
    return segmentation, contours

def get_annotation(img_id, category_id, segmentation, annotation_data, contours):
    """Build one prediction entry with an RLE-encoded segmentation.

    Args:
        img_id: id of the image the prediction belongs to.
        category_id: predicted category id.
        segmentation: polygon as a flat list `[x0, y0, x1, y1, ...]`.
        annotation_data: parsed annotation JSON; its `images` entries provide
            `height` and `width` (and normally `id`).
        contours: non-empty sequence of OpenCV contours for the object.

    Returns:
        A dict with `image_id`, `category_id`, `segmentation`, `area`, `bbox`,
        `iscrowd` and `score`. `segmentation` is `{"size", "counts"}` (RLE) on
        success; if the polygon is not a list or cannot be encoded, the
        segmentation is returned as passed in.

    Raises:
        KeyError: if no image with `img_id` exists in `annotation_data`.
    """
    # Area and box are taken from the largest contour so they describe the same
    # shape as the polygon (convert_mask builds the polygon from the largest
    # contour, which is not necessarily contours[0]).
    largest_contour = max(contours, key=cv2.contourArea)
    annotation = {
        "image_id": img_id,
        "category_id": category_id,
        "segmentation": segmentation,
        "area": int(cv2.contourArea(largest_contour)),
        "bbox": [int(x) for x in cv2.boundingRect(largest_contour)],
        "iscrowd": 0,
        "score": 1
    }

    images = annotation_data['images']
    # Fast path: ids are 1-based and stored in order. Otherwise search by id, so
    # the height/width of a different image is never used.
    photo = images[img_id - 1] if 1 <= img_id <= len(images) else None
    if photo is None or photo.get('id', img_id) != img_id:
        photo = next((im for im in images if im.get('id') == img_id), None)
    if photo is None:
        raise KeyError("image id {} not found in annotation_data['images']".format(img_id))
    h, w = photo['height'], photo['width']

    segm = annotation['segmentation']
    if not isinstance(segm, list):
        # Nothing to encode; previously this path hit an undefined `rles`.
        return annotation
    # polygon -- a single object might consist of multiple parts
    # we merge all parts into one mask rle code
    try:
        rles = mask_tool.frPyObjects([segm], h, w)
    except Exception:
        # Keep the raw polygon and report what could not be encoded.
        print(type(segm), type(h), type(w))
        print(segm)
        return annotation

    annotation['segmentation'] = {
        "size": rles[0]['size'],
        "counts": rles[0]['counts'].decode("utf-8")
    }

    return annotation

# Mount Google Drive at /content/drive so files stored there can be read from this runtime.
from google.colab import drive
drive.mount('/content/drive')

def MakeMyModel(ckpt_path="/content/drive/MyDrive/demo_e200.ckpt"):
  """Build the LSegEncNet model, load checkpoint weights and create the input transform.

  Args:
    ckpt_path: checkpoint file containing a `state_dict` entry whose keys may
      carry a leading `net.` prefix. Defaults to the previously hard-coded path.

  Returns:
    `(model, transform)`: the model in eval mode on CUDA when available
    (otherwise CPU), and a torchvision transform that converts an image to a
    tensor and normalises it with mean 0.5 and std 0.5 per channel.
  """
  device = "cuda" if torch.cuda.is_available() else "cpu"
  print(device)
  crop_size = 480

  model = LSegEncNet("something", arch_option=0, block_depth=0, activation="lrelu", crop_size=crop_size)

  # NOTE: torch.load unpickles the file; only load checkpoints from a trusted source.
  # map_location lets a checkpoint saved on GPU load on a CPU-only runtime.
  checkpoint = torch.load(ckpt_path, map_location=device)
  # Remove the literal "net." prefix. str.lstrip("net.") strips any leading run
  # of the characters n/e/t/. and would mangle keys such as "net.text...".
  prefix = "net."
  pretrained_state_dict = {
      (k[len(prefix):] if k.startswith(prefix) else k): v
      for k, v in checkpoint["state_dict"].items()
  }
  model.load_state_dict(pretrained_state_dict)

  model.eval()
  model = model.to(device)

  norm_mean = [0.5, 0.5, 0.5]
  norm_std = [0.5, 0.5, 0.5]
  transform = transforms.Compose(
      [
          transforms.ToTensor(),
          transforms.Normalize(norm_mean, norm_std),
      ]
  )
  return model, transform


def ApplyModel(model, transform, ann_file, images_dir, show_img=False): #folder_with_annotation,
  """Run the model for every annotation and write bbox and segmentation predictions.

  For each annotation the image is segmented with the two labels
  `[<category name with '.' replaced by ' '>, 'other']`. Annotations whose
  image cannot be read, or for which no object box is found, are skipped.

  Results are written to `/content/bbox_prediction_result_lseg.json` and
  `/content/segm_predicted_result_lseg.json`.

  Args:
    model: segmentation model passed to `get_lseg_feat`.
    transform: image transform passed to `get_lseg_feat`.
    ann_file: path of the COCO annotation file.
    images_dir: directory containing the annotated images.
    show_img: forwarded as `vis` to `get_lseg_feat`.
  """
  coco = COCO(ann_file)
  segm_pred_res = []
  bbox_pred_res = []
  # Close the annotation file as soon as it has been parsed.
  with open(ann_file) as jfile:
    annotation_data = json.load(jfile)
  for ann in coco.anns.values():
    category_id = ann['category_id']
    category = coco.cats[category_id]['name']
    labels = [' '.join(category.split('.')), 'other']
    img_num = ann['image_id']
    image_path = "{}/{}".format(images_dir, coco.imgs[img_num]['file_name'])
    bgr = cv2.imread(image_path)
    if bgr is None:
      # cv2.imread signals a missing or unreadable file by returning None;
      # skip it explicitly instead of swallowing every exception.
      continue
    rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
    _, lseg_bbox, pred  = get_lseg_feat(model, rgb, labels, transform,crop_size=480,
                            base_size=520,norm_mean=[0.5, 0.5, 0.5],
                            norm_std=[0.5, 0.5, 0.5],vis=show_img)
    if lseg_bbox ==[0, 0, 0, 0]:
      continue
    # `pred` holds label indices: 0 is the category, 1 is 'other'. convert_mask
    # traces the non-zero pixels, so pass the category mask; passing `pred`
    # itself would outline the 'other' region.
    segmentation, contours = convert_mask(pred == 0)
    bbox_pred_res.append({"image_id": img_num, "category_id": category_id,
                          "bbox": lseg_bbox, "score": 1})
    # len() works whether the contours come back as a list or a tuple.
    if len(contours) > 0:
      segm_pred_res.append(get_annotation(img_num, category_id,
                                        segmentation, annotation_data, contours))
  with open('/content/segm_predicted_result_lseg.json', 'w') as f:
    json.dump(segm_pred_res, f)
  with open('/content/bbox_prediction_result_lseg.json', 'w') as f:
    json.dump(bbox_pred_res, f)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## inference model

In [13]:
# Base name of the zip archive (expected under /content) with the cropped images.
images = 'rosbag_for_ann_final' # archive name without the .zip suffix

# Build the full archive path and unpack it into /content/rosbag_for_ann_final.
images = '/content/{}.zip'.format(images)
ExtractArchive(images, '/content/rosbag_for_ann_final')

In [14]:
ann_file = '/content/indoor.json' # path to the annotation file (loaded with pycocotools COCO in ApplyModel)

In [15]:
# Build the network with the checkpoint weights and the matching input transform.
model, transform = MakeMyModel()

100%|████████████████████████████████████████| 338M/338M [00:02<00:00, 168MiB/s]


Downloading model.safetensors:   0%|          | 0.00/1.22G [00:00<?, ?B/s]

In [23]:
# Run inference for every annotation; ApplyModel writes the bbox and segmentation
# predictions to JSON files under /content.
ApplyModel(model, transform, ann_file,
                '/content/rosbag_for_ann_final/rosbag_for_ann_final', show_img=False)

loading annotations into memory...
Done (t=0.11s)
creating index...
index created!
