<a href="https://colab.research.google.com/github/Rodrigoradzinski/Segmenta-o-de-videos/blob/main/Segmenta%C3%A7%C3%A3o_Sem%C3%A2ntica_em_V%C3%ADdeos.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Segmentação Semântica em Vídeos usando DeepLab

In [1]:
!wget -O 'people6.mp4' 'https://github.com/Rodrigoradzinski/Segmenta-o-de-videos/raw/main/people6.mp4'

--2023-05-18 23:56:58--  https://github.com/Rodrigoradzinski/Segmenta-o-de-videos/raw/main/people6.mp4
Resolving github.com (github.com)... 140.82.112.4
Connecting to github.com (github.com)|140.82.112.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/Rodrigoradzinski/Segmenta-o-de-videos/main/people6.mp4 [following]
--2023-05-18 23:56:59--  https://raw.githubusercontent.com/Rodrigoradzinski/Segmenta-o-de-videos/main/people6.mp4
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 24492547 (23M) [application/octet-stream]
Saving to: ‘people6.mp4’


2023-05-18 23:56:59 (139 MB/s) - ‘people6.mp4’ saved [24492547/24492547]



In [2]:
import os
import tarfile                   
import tempfile                  
from six.moves import urllib     
from io import BytesIO           

from matplotlib import gridspec
from matplotlib import pyplot as plt
import numpy as np
import cv2 
import IPython
from PIL import Image
from tqdm import tqdm
from tabulate import tabulate 
from sklearn.metrics import confusion_matrix

import tensorflow as tf
print(tf.__version__)

import warnings
warnings.simplefilter("ignore", DeprecationWarning)

2.12.0


# Construção da rede neural

modelo foi treinado com base na base de dados [Cityscapes](https://www.cityscapes-dataset.com/).

In [3]:
class ModeloDeepLab(object):
    """Classe para carregar o modelo deeplab e fazer a inferência."""

    FROZEN_GRAPH_NAME = 'frozen_inference_graph'

    def __init__(self, tarball_path):
        """ Cria e carrega o model deeplab pré-treinado. """
        self.graph = tf.Graph()
        graph_def = None

        # Extrai os frozen graph do arquivo tar.
        tar_file = tarfile.open(tarball_path)
        for tar_info in tar_file.getmembers():
            if self.FROZEN_GRAPH_NAME in os.path.basename(tar_info.name):
                file_handle = tar_file.extractfile(tar_info)
                graph_def = tf.compat.v1.GraphDef.FromString(file_handle.read())
                break
        tar_file.close()

        if graph_def is None:
            raise RuntimeError('Nao foi possivel encontrar o inference graph no arquivo tar.')

        with self.graph.as_default():
            tf.import_graph_def(graph_def, name='')

        self.sess = tf.compat.v1.Session(graph=self.graph)

    def run(self, image, INPUT_TENSOR_NAME = 'ImageTensor:0', OUTPUT_TENSOR_NAME = 'SemanticPredictions:0'):
        """Roda a inferência em uma imagem. 

        Parâmetros: 
            image: Objeto PIL.Image que contém a imagem de input. 
            INPUT_TENSOR_NAME: O nome do tensor de entrada. padrão=ImageTensor
            OUTPUT_TENSOR_NAME: O name do tensor de saída. padrão=SemanticPredictions

        Returns:
            resized_image: imagem de entrada RGB redimensionada  
            seg_map: Mapa de segmentação do `resized_image` 
        """
        width, height = image.size
        target_size = (2049,1025)  # tamanho das imagens do dataset Cityscapes
        resized_image = image.convert('RGB').resize(target_size, Image.ANTIALIAS)

        batch_seg_map = self.sess.run(
            OUTPUT_TENSOR_NAME,
            feed_dict={INPUT_TENSOR_NAME: [np.asarray(resized_image)]})
        
        seg_map = batch_seg_map[0]  # espera o batch size = 1
        if len(seg_map.shape) == 2:
            seg_map = np.expand_dims(seg_map,-1)  # adiciona uma dimensão extra, necessária pro cv2.resize

        seg_map = cv2.resize(seg_map, (width,height), interpolation=cv2.INTER_NEAREST)
        return seg_map

# Funções para visualização



In [4]:
def criar_colormap():
    """
    Retorna:
        Array numpy com o mapa de cores para visualizar resultados da segmentação. 
    """
    colormap = np.array([
        [128,  64, 128],
        [244,  35, 232],
        [ 70,  70,  70],
        [102, 102, 156],
        [190, 153, 153],
        [153, 153, 153],
        [250, 170,  30],
        [220, 220,   0],
        [120, 155,  42],
        [152, 251, 152],
        [ 93, 165, 227],
        [220,  20,  60],
        [255,   0,   0],
        [ 34,  34, 142],
        [  0,   0,  70],
        [  0,  60, 100],
        [  0,  80, 100],
        [  0,   0, 230],
        [119,  11,  32],
        [  0,   0,   0]], dtype=np.uint8)
    return colormap

In [5]:
def label_to_color_image(label):
    """
    Parâmetros:
        label: array 2D que armazena o rótulo da segmentação.

    Retorna:
        mapa de segmentação: array 2D com valores do tipo float. O elemento da array é a cor indexada pelo elemento correspondente no rótulo de input. 
        Ou seja, vai retornar uma imagem que corresponde à original, porém vai pintar os pixels com aquilo que o algoritmo previu
    """
    
    # retorna erro se o rótulo de entrada não possui 2 dimensões
    if label.ndim != 2:
        raise ValueError('Expect 2-D input label')

    colormap = criar_colormap()

    # retorna erro se o valor/índice do rótulo é maior que o valor de indice maximo permitida para a lista de cores.
    if np.max(label) >= len(colormap):
        raise ValueError('label value too large.')

    return colormap[label]

In [6]:
def vis_segmentacao(image, seg_map):
  
    seg_image = label_to_color_image(seg_map).astype(np.uint8)
    plt.figure(figsize=(20, 4))
    # cria os subplots no formato de grid
    grid_spec = gridspec.GridSpec(1, 4, width_ratios=[6, 6, 6, 1])

    # subplot para exibir a imagem original (imagem de entrada) 
    plt.subplot(grid_spec[0])
    plt.imshow(image)
    plt.axis('off')
    plt.title('Imagem original')

    # subplot para exibir o mapa de segmentação
    plt.subplot(grid_spec[1])
    plt.imshow(seg_image)
    plt.axis('off')
    plt.title('Mapa de segmentação')

    # subplot para exibir a sobreposição da segmentação 
    plt.subplot(grid_spec[2])
    plt.imshow(image)
    plt.imshow(seg_image, alpha=0.7)
    plt.axis('off')
    plt.title('Sobreposição da segmentação') 

    # subplot para exibir os rótulos, em forma de legendas (cor e nome lado a lado)
    unique_labels = np.unique(seg_map)
    ax = plt.subplot(grid_spec[3])
    plt.imshow(FULL_COLOR_MAP[unique_labels].astype(np.uint8), interpolation='nearest')
    ax.yaxis.tick_right()
    plt.yticks(range(len(unique_labels)), LABEL_NAMES[unique_labels])
    plt.xticks([], [])
    ax.tick_params(width=0.0)
    plt.grid('off')
    plt.show()

    # salva a sobreposição da imagem em uma variável, que será retornada pela função (para depois podermos salvar a imagem em um arquivo)
    img_final = cv2.addWeighted(np.uint8(image), 0.3, np.uint8(seg_image), 0.7, 0)

    return img_final

In [7]:
LABEL_NAMES = np.asarray([
    'rua', 'faixa de pedestres', 'prédios', 'muro', 'cerca', 'poste', 'luz de trânsito',
    'sinal de trânsito', 'mato/ vegetação', 'terreno', 'céu', 'pessoa', 'motorista', 'carro', 'caminhão',
    'ônibus', 'tren', 'moto', 'bicicleta', 'vazio'])

len(LABEL_NAMES)

20

In [8]:
FULL_LABEL_MAP = np.arange(len(LABEL_NAMES)).reshape(len(LABEL_NAMES), 1) # IDs das classes  
FULL_COLOR_MAP = label_to_color_image(FULL_LABEL_MAP)                     # cores associadas aos IDs

print(FULL_LABEL_MAP, FULL_COLOR_MAP)

[[ 0]
 [ 1]
 [ 2]
 [ 3]
 [ 4]
 [ 5]
 [ 6]
 [ 7]
 [ 8]
 [ 9]
 [10]
 [11]
 [12]
 [13]
 [14]
 [15]
 [16]
 [17]
 [18]
 [19]] [[[128  64 128]]

 [[244  35 232]]

 [[ 70  70  70]]

 [[102 102 156]]

 [[190 153 153]]

 [[153 153 153]]

 [[250 170  30]]

 [[220 220   0]]

 [[120 155  42]]

 [[152 251 152]]

 [[ 93 165 227]]

 [[220  20  60]]

 [[255   0   0]]

 [[ 34  34 142]]

 [[  0   0  70]]

 [[  0  60 100]]

 [[  0  80 100]]

 [[  0   0 230]]

 [[119  11  32]]

 [[  0   0   0]]]


# Carregamento da rede neural

Há dois modelos pré-treinados (*frozen graph*) que podemos usar, com dois backbones distintos: MobileNetV2 and Xception65. 
* **MobileNetV2** - permite fazer a inferência rapidamente, portanto processará imagem/vídeo em menos tempo. Seu backbone MobileNet foi projeto para rodar mais rápido em dispositivos mobile ou dispositivos que não possuem um hardware tão potente.

* **Xception65** - é mais custoso computacionalmente em relação ao MobileNet, porém oferece resultados mais precisos e consequentemente uma segmentação mais próxima da real.

Para esse dataset que veremos logo em seguida o MobileNetV2 obteve no geral bons resultados e muitas vezes até melhores, porém em outros tipos de imagens é provável que o Xception65 garanta resultados mais robustos no geral.

In [9]:
MODEL_NAME = 'mobilenetv2_coco_cityscapes_trainfine'
#MODEL_NAME = 'xception65_cityscapes_trainfine'

In [10]:
DOWNLOAD_URL_PREFIX = 'http://download.tensorflow.org/models/'
MODEL_URLS = {
    'mobilenetv2_coco_cityscapes_trainfine' : 'deeplabv3_mnv2_cityscapes_train_2018_02_05.tar.gz',
    'xception65_cityscapes_trainfine'       : 'deeplabv3_cityscapes_train_2018_02_06.tar.gz',
}

In [11]:
TARBALL_NAME = 'deeplab_model.tar.gz'
model_dir = tempfile.mkdtemp()
tf.io.gfile.makedirs(model_dir)

In [12]:
download_path = os.path.join(model_dir, TARBALL_NAME)
download_path

'/tmp/tmp9snv29x2/deeplab_model.tar.gz'

In [14]:
urllib.request.urlretrieve(DOWNLOAD_URL_PREFIX + MODEL_URLS[MODEL_NAME], download_path)

('/tmp/tmp9snv29x2/deeplab_model.tar.gz',
 <http.client.HTTPMessage at 0x7fd9bddba7d0>)

In [15]:
model = ModeloDeepLab(download_path)

In [17]:
def mostrar(imagem, imagem_cinza=False):
	fig = plt.gcf()
	fig.set_size_inches(18,6)
	if imagem_cinza:
		plt.imshow(imagem, cmap='gray')
	else:
		plt.imshow(cv2.cvtColor(imagem, cv2.COLOR_BGR2RGB), cmap='gray')
	plt.axis('off')
	plt.show()


# Segmentação em vídeo

In [18]:
def vis_segmentacao_video(image, seg_map, index):
    plt.figure(figsize=(12, 7))

    seg_image = label_to_color_image(seg_map).astype(np.uint8)
    plt.imshow(image)
    plt.imshow(seg_image, alpha=0.7)
    plt.axis('off')
    plt.title('segmentação | frame #%d'%index)
    plt.grid('off')
    plt.tight_layout()

    # Mostra os resultados em tempo real, conforme é executado cada frame do vídeo (como se fosse um streaming mesmo) 
    f = BytesIO()
    plt.savefig(f, format='jpeg')
    IPython.display.display(IPython.display.Image(data=f.getvalue()))
    f.close()
    plt.close()

    img_final = cv2.addWeighted(np.uint8(image), 0.3, np.uint8(seg_image), 0.7, 0)
    return img_final

In [67]:
def executar_visualizacao_video(frame, index):
    original_im = Image.fromarray(frame[..., ::-1])
    seg_map = model.run(original_im)
    frame_processado = vis_segmentacao_video(original_im, seg_map, index)
    img_processada = cv2.cvtColor(frame_processado, cv2.COLOR_RGB2BGR)
    return processed_frames 

In [70]:
def executar_visualizacao_video(frames, index):
    processed_frames = []
    for frame in frames:
        original_im = Image.fromarray(frame[..., ::-1])
        seg_map = model.run(original_im)
        frame_processado = vis_segmentacao_video(original_im, seg_map, index)
        img_processada = cv2.cvtColor(frame_processado, cv2.COLOR_RGB2BGR)
        processed_frames.append(img_processada)
    return processed_frames

In [36]:
import numpy as np

def show_frames_side_by_side(frames):
    fig, axes = plt.subplots(3, 2, figsize=(10, 15))
    axes = axes.flatten()

    for i, frame in enumerate(frames):
        axes[i].imshow(frame)
        axes[i].axis('off')

    plt.tight_layout()
    plt.show()



In [20]:
def configs_video(video_largura, video_altura, nome_arquivo='resultado.avi'): 
  fourcc = cv2.VideoWriter_fourcc(*'XVID') 
  fps = 24
  saida_video = cv2.VideoWriter(nome_arquivo, fourcc, fps, (video_largura, video_altura))
  return saida_video 

In [84]:
import numpy as np

def executar_visualizacao_video(frames, index):
    processed_frames = []
    for frame in frames:
        original_im = Image.fromarray(frame[..., ::-1])
        seg_map = model.run(original_im)
        frame_processado = vis_segmentacao_video(original_im, seg_map, index)
        img_processada = cv2.cvtColor(frame_processado, cv2.COLOR_RGB2BGR)
        processed_frames.append(img_processada)
    return processed_frames


def show_frames_side_by_side(processed_frames):
    fig, axes = plt.subplots(3, 2, figsize=(10, 10))
    axes = axes.flatten()

    for i, processed_frames in enumerate(processed_frames):
        axes[i].imshow(processed_frames)
        axes[i].axis('off')

    # Para casos em que não há frames suficientes para preencher a grade completa
    for j in range(len(processed_frames), 6):
        axes[j].axis('off')

    plt.tight_layout()
    plt.show()


def vis_segmentacao_video(image, seg_map, index):
    plt.figure(figsize=(12, 7))

    seg_image = label_to_color_image(seg_map).astype(np.uint8)
    plt.imshow(image)
    plt.imshow(seg_image, alpha=0.7)
    plt.axis('off')
    plt.title('segmentação | frame #%d' % index)
    plt.grid('off')
    plt.tight_layout()

    # Mostra os resultados em tempo real, conforme é executado cada frame do vídeo (como se fosse um streaming mesmo) 
    f = BytesIO()
    plt.savefig(f, format='jpeg')
    IPython.display.display(IPython.display.Image(data=f.getvalue()))
    f.close()
    plt.close()

    img_final = cv2.addWeighted(np.uint8(image), 0.3, np.uint8(seg_image), 0.7, 0)
    return img_final


video_teste = '/content/people6.mp4'

cap = cv2.VideoCapture(video_teste)
num_frames = 200  
frame_inicio = 0  
frame_atual = 0   

conectado, video = cap.read()
video_largura, video_altura = video.shape[1], video.shape[0]
saida_video = configs_video(video_largura, video_altura)

frames = []

try:
    for i in range(num_frames):
        _, frame = cap.read()
        if not _:
            break

        if i < frame_inicio:
            continue

        frames.append(frame)

        if len(frames) == 6:
            show_frames_side_by_side(frames)
            IPython.display.clear_output(wait=True)
            frames = []

        frame_atual += 1
        saida_video.write(frame) 

except KeyboardInterrupt:
    plt.close()
    print("Stream stopped.")

print("Finalizado")
saida_video.release()


Finalizado


In [22]:
!ffmpeg -y -loglevel panic -i resultado.avi resultado_driveseg.mp4

In [23]:
def exibir_video(nome_arquivo, largura=720, altura=480):
  import io
  import base64
  from IPython.display import HTML
  video_encoded = base64.b64encode(io.open(nome_arquivo, 'rb').read())
  return HTML(data='''<video width="{0}" height="{1}" alt="Video teste" controls>
                        <source src="data:video/mp4;base64,{2}" type="video/mp4" />
                      </video>'''.format(largura, altura, video_encoded.decode('ascii')))

In [24]:
exibir_video('resultado_driveseg.mp4')