## GStreamer Basics - basic pipeline read buffer

### Resources:

Code adapted from:
https://paulbridger.com/posts/video-analytics-pytorch-pipeline/


Gstreamer References:
https://developer.download.nvidia.cn/embedded/L4T/r32_Release_v1.0/Docs/Accelerated_GStreamer_User_Guide.pdf

DeepStream:
https://docs.nvidia.com/metropolis/deepstream/dev-guide/text/DS_plugin_gst-nvinfer.html

In [1]:
import os
import sys
import time
sys.path.append(os.path.abspath("../Video-Swin-Transformer"))


import numpy as np
import torch, torchvision
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

import mmcv
from mmcv import Config, DictAction
from mmcv.cnn import fuse_conv_bn
from mmcv.fileio.io import file_handlers
from mmcv.runner.fp16_utils import wrap_fp16_model
from mmcv.parallel import MMDataParallel
from mmcv.runner import get_dist_info, init_dist, load_checkpoint

from mmaction.models import build_model
from mmaction.utils import register_module_hooks

import contextlib

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using: {device}')

# Setup the cofiguration and data file
config_file = '../configs/bsl_config.py'
input_video_path = '../notebooks/source_video.mp4'
check_point_file = '../configs/best_model.pth'
frames_processed = 0
pixel_bytes = 4

Using: cuda


In [2]:
# Stream for my device - no display on screen  - extra nvvidconv needed since 
# gst buffer cannot read from memory otherwise. 
v7 = """
v4l2src device=/dev/video0 ! nvvidconv ! \
video/x-raw(memory:NVMM),framerate=(fraction)30/1,width=320,height=240 ! \
nvvidconv top=0 bottom=240 left = 90 right=320 ! \
video/x-raw(memory:NVMM),width=224,height=224 ! nvvidconv ! video/x-raw,format=RGBA ! \
fakesink name=webcam_stream
"""

v8 = """
v4l2src num-buffers=128 device=/dev/video0 ! nvvidconv ! \
video/x-raw(memory:NVMM),framerate=(fraction)30/1,width=320,height=240 ! \
nvvidconv top=0 bottom=240 left = 90 right=320 ! \
video/x-raw(memory:NVMM),format=RGBA,width=224,height=224 ! nvvidconv ! video/x-raw ! \
queue ! tee name=t t. ! queue ! fakesink name=webcam_stream sync=true t. ! \
queue ! nvvidconv ! nvegltransform ! nveglglessink sync=true
"""

stream_string = v8

### Do inference every 32 frames 

In [7]:
def get_model(config_file, check_point_file, device='cuda:0'):
# Create the configuration from the file 
# Customization for training the BSL data set
    cfg = Config.fromfile(config_file)
    cfg.model.cls_head.num_classes = 5
    cfg.data.test.test_mode = True

    # The flag is used to register module's hooks
    cfg.setdefault('module_hooks', [])

    # remove redundant pretrain steps for testing
    turn_off_pretrained(cfg.model)

    # build the model and load checkpoint
    model = build_model(cfg.model, train_cfg=None, test_cfg=cfg.get('test_cfg'))

    if len(cfg.module_hooks) > 0:
        register_module_hooks(model, cfg.module_hooks)

    fp16_cfg = cfg.get('fp16', None)
    if fp16_cfg is not None:
        wrap_fp16_model(model)

    load_checkpoint(model, check_point_file, map_location=device)
        
    return model # model = MMDataParallel(model, device_ids=[0])

def turn_off_pretrained(cfg):
    # recursively find all pretrained in the model config,
    # and set them None to avoid redundant pretrain steps for testing
    if 'pretrained' in cfg:
        cfg.pretrained = None

    # recursively turn off pretrained value
    for sub_cfg in cfg.values():
        if isinstance(sub_cfg, dict):
            turn_off_pretrained(sub_cfg)

def preprocess_batch(image_batch):
    
    preproc_time = time.time()
    mu_vec = np.reshape([123.675, 116.28, 103.53], (1, 1, 3)) 
    sigma_vec = np.reshape([58.395, 57.12, 57.375], (1, 1, 3))
    
    image_batch = np.divide((image_batch - mu_vec), sigma_vec)
    
    image_batch = image_batch.transpose(3,0,1,2)
    
    image_batch = image_batch[np.newaxis, np.newaxis, :, :, :, :]
    
    print(f"Preprocessing duration: {time.time()-preproc_time:.4f} sec")
    return image_batch

def preprocess_tensor_batch(image_batch):
    
    preproc_time = time.time()
    mu_vec = torch.from_numpy(np.reshape([123.675, 116.28, 103.53], (1, 1, 3))) 
    sigma_vec = torch.from_numpy(np.reshape([58.395, 57.12, 57.375], (1, 1, 3)))
    
    image_batch = torch.div((image_batch - mu_vec), sigma_vec)
    
    image_batch = image_batch.transpose(3,0,1,2)
    
    image_batch = image_batch[None, None, :, :, :, :]
    
    print(f"Preprocessing duration: {time.time()-preproc_time:.4f} sec")
    return image_batch

def interpret(outputs):
    text_form = ('all_done', 'water', 'poop', 'dad', 'mom')
    
    index = [np.argmax(x) for x in outputs][0]
    predicted_prob = outputs[0][index]
    action = text_form[index]
    
    if predicted_prob < 0.1:
        action = '..'  
    else:
        # Add the probability of that action and inference number
        action = f'{action} - prob {predicted_prob:.2f}'
    print(f'Action: {action}')

def on_frame_probe(pad, info):
    
    global t_start, frames_processed, image_batch
    t_start = time.time()
    
    buffer = info.get_buffer()
    print(f'[{buffer.pts / Gst.SECOND:6.2f}]', end='\x1b[2K\r')

    image_tensor = buffer_to_image_tensor(buffer, pad.get_current_caps())
    image_batch.append(image_tensor)

    # If batch not yet full, keep accumulating frames.
    if len(image_batch) < batch_size:
        return Gst.PadProbeReturn.OK
    
    #Do preprocessing on all 32 images at once. 
    image_batch = preprocess_batch(np.stack(image_batch, axis = 0))
    frames_processed += image_batch.shape[3]

    batch_tensors = torch.from_numpy(image_batch).to(torch.float32)
    with torch.no_grad():
        inf_timer = time.time()
        outputs = infer(model, {"imgs": batch_tensors,"label":0})
        interpret(outputs)
        image_batch = []
        print(f"Inference time: {time.time() - inf_timer:.4f} sec")
    return Gst.PadProbeReturn.OK

def buffer_to_image_tensor(buffer, caps):
    
    caps_structure = caps.get_structure(0)
    height = caps_structure.get_value('height')  
    width = caps_structure.get_value('width')

    is_mapped, map_info = buffer.map(Gst.MapFlags.READ)
    if is_mapped:
        try:
            # map buffer to numpy 
            image_array = np.ndarray(
                (height, width, pixel_bytes),
                dtype=np.uint8,
                buffer=map_info.data
            )
            # Return a copy of that array as a numpy array. 
            return image_array[:,:,:3].copy() 
        finally:
            #Clean up the buffer mapping
            buffer.unmap(map_info)

def infer(model, batch):
    """Test model with a single gpu.
    This method tests model with a single gpu and displays test progress bar.
    Args:
        model (nn.Module): Model to be tested.
        data_loader (nn.Dataloader): Pytorch data loader.
    Returns:
        list: The prediction results.
    """
    results = []
    result = model(return_loss=False, **batch)
    results.extend(result)
    return results

In [8]:
# Instantiate Pipeline
Gst.init()

v_timer = time.time()
frames_processed = 0
image_batch = []
batch_size = 32
batch_dim = (32,224,224)

#Load initial Constants
#global mu_vec, sigma_vec
#mu_vec, sigma_vec = instantiate_vectors(batch_dim)

print(f'Constants loaded in {time.time()-v_timer:.4f} seconds') 

m_timer = time.time() 
# Instantiate Model
model = get_model(config_file, check_point_file, device=device).eval()
print(f'Model loaded in {time.time()-m_timer:.4f} seconds') 

# Launch Pipeline
pipeline = Gst.parse_launch(f"{stream_string}")
print(f'Pipeline launched') 

# Probe read buffer and apply callback 'on_frame_probe'
pipeline.get_by_name('webcam_stream').get_static_pad('sink').add_probe(
    Gst.PadProbeType.BUFFER,
    on_frame_probe
)

# Set Pipeline to PLAYING
pipeline.set_state(Gst.State.PLAYING)

# Scan the pipeline bus for errors and other messages.
try:
    while True:
        msg = pipeline.get_bus().timed_pop_filtered(
            Gst.SECOND,
            Gst.MessageType.EOS | Gst.MessageType.ERROR
        )
        if msg:
            text = msg.get_structure().to_string() if msg.get_structure() else ''
            msg_type = Gst.message_type_get_name(msg.type)
            print(f'{msg.src.name}: [{msg_type}] {text}')
            break
            
# Break

except KeyboardInterrupt:
    print ('KeyboardInterrupt exception is caught')
    
# Write pipeline Graph
finally:
    print('Writing graph...')
    open(f'logs/logs.txt', 'w', encoding="utf8").write(
        Gst.debug_bin_to_dot_data(
            pipeline, Gst.DebugGraphDetails.ALL
        )
    )
    pipeline.set_state(Gst.State.NULL)
t_finish = time.time()
print(f'FPS: {frames_processed / (t_finish - t_start):.2f}')
print('done.')

Constants loaded in 0.0006 seconds
../Video-Swin-Transformer/configs/_base_/models/swin/swin_tiny.py
../Video-Swin-Transformer/configs/_base_/default_runtime.py
load checkpoint from local path: ../configs/best_model.pth
Model loaded in 18.3397 seconds
Pipeline launched
Preprocessing duration: 2.1786 sec
Action: dad - prob 0.32
Inference time: 31.2310 sec
Preprocessing duration: 0.4571 sec
Action: dad - prob 0.32
Inference time: 23.7572 sec
Preprocessing duration: 0.2045 sec
Action: dad - prob 0.33
Inference time: 22.3872 sec
Preprocessing duration: 0.2006 sec
Action: dad - prob 0.32
Inference time: 22.3723 sec
pipeline0: [eos] 
Writing graph...
FPS: 5.45
done.


### Code Graveyard

In [None]:
### Code Graveyard...
for words in ["these", "are", "my", "words", "!"]:
    print(words, end='\x1b[2K\r')
    time.sleep(1)
    print(" "*20, end="\r")

In [None]:
def get_model(config_file, check_point_file, device='cuda:0'):
# Create the configuration from the file 
# Customization for training the BSL data set
    cfg = Config.fromfile(config_file)
    cfg.model.cls_head.num_classes = 5
    cfg.data.test.test_mode = True

    # The flag is used to register module's hooks
    cfg.setdefault('module_hooks', [])

    # remove redundant pretrain steps for testing
    turn_off_pretrained(cfg.model)

    # build the model and load checkpoint
    model = build_model(cfg.model, train_cfg=None, test_cfg=cfg.get('test_cfg'))

    if len(cfg.module_hooks) > 0:
        register_module_hooks(model, cfg.module_hooks)

    fp16_cfg = cfg.get('fp16', None)
    if fp16_cfg is not None:
        wrap_fp16_model(model)

    load_checkpoint(model, check_point_file, map_location=device)
        
    return model # model = MMDataParallel(model, device_ids=[0])

def turn_off_pretrained(cfg):
    # recursively find all pretrained in the model config,
    # and set them None to avoid redundant pretrain steps for testing
    if 'pretrained' in cfg:
        cfg.pretrained = None

    # recursively turn off pretrained value
    for sub_cfg in cfg.values():
        if isinstance(sub_cfg, dict):
            turn_off_pretrained(sub_cfg)

def single_gpu_predictor(model, data_loader):
    """Test model with a single gpu.
    This method tests model with a single gpu and displays test progress bar.
    Args:
        model (nn.Module): Model to be tested.
        data_loader (nn.Dataloader): Pytorch data loader.
    Returns:
        list: The prediction results.
    """
    results = []
    #for data in data_loader:
    with torch.no_grad():
        result = model(return_loss=False, **data)
    results.extend(result)
    return results

def normalize(frame):
    mean = np.reshape([123.675, 116.28, 103.53], (1, 1, 3)) 
    std = np.reshape([58.395, 57.12, 57.375], (1, 1, 3))
    
    # Mean normalize then stdev normalize
    frame = (frame - mean) / std   
    return frame


def interpret(index):
    text_form = ['all_done', 'water', 'poop', 'dad', 'mom']

    return text_form[index]


def on_frame_probe(pad, info):
    
    global t_start, frames_processed
    t_start = time.time()
    
    buffer = info.get_buffer()
    print(f'[{buffer.pts / Gst.SECOND:6.2f}]')

    image_tensor = buffer_to_image_tensor(buffer, pad.get_current_caps())
    image_batch = image_tensor.unsqueeze(0).to(device)
    frames_processed += image_batch.size(0)

    with torch.no_grad():
    #detections = detector(image_batch)[0]
        #predictions = model(image_batch)[0]
        print('Inference Call Placeholder')
    return Gst.PadProbeReturn.OK

def buffer_to_image_tensor(buffer, caps):
    
    caps_structure = caps.get_structure(0)
    height = caps_structure.get_value('height')  
    width = caps_structure.get_value('width')

    is_mapped, map_info = buffer.map(Gst.MapFlags.READ)
    if is_mapped:
        try:
            image_array = np.ndarray(
                (height, width, pixel_bytes),
                dtype=np.uint8,
                buffer=map_info.data
            ).copy() # extend array lifetime beyond subsequent unmap
            return preprocess(image_array[:,:,:3]) # RGBA -> RGB
        finally:
            
            #Clean up the buffer mapping
            buffer.unmap(map_info)

In [None]:
# Instantiate Pipeline
Gst.init()

# Define Preprocess function. 
preprocess = torchvision.transforms.ToTensor()
m_timer = time.time()
# Instantiate Model
model = get_model(config_file, check_point_file, device=device).eval()
print(f'Model loaded in {time.time()-m_timer:.2f} seconds') 

# Launch Pipeline
pipeline = Gst.parse_launch(f"{stream_string}")

# Probe read buffer and apply callback 'on_frame_probe'
pipeline.get_by_name('webcam_stream').get_static_pad('sink').add_probe(
    Gst.PadProbeType.BUFFER,
    on_frame_probe
)

# Set Pipeline to PLAYING
pipeline.set_state(Gst.State.PLAYING)

# Scan the pipeline bus for errors and other messages.
try:
    while True:
        msg = pipeline.get_bus().timed_pop_filtered(
            Gst.SECOND,
            Gst.MessageType.EOS | Gst.MessageType.ERROR
        )
        if msg:
            text = msg.get_structure().to_string() if msg.get_structure() else ''
            msg_type = Gst.message_type_get_name(msg.type)
            print(f'{msg.src.name}: [{msg_type}] {text}')
            break
            
# Break

except KeyboardInterrupt:
    print ('KeyboardInterrupt exception is caught')
    
# Write pipeline Graph
finally:
    print('Writing graph...')
    open(f'logs/logs.txt', 'w', encoding="utf8").write(
        Gst.debug_bin_to_dot_data(
            pipeline, Gst.DebugGraphDetails.ALL
        )
    )
    pipeline.set_state(Gst.State.NULL)
t_finish = time.time()
print(f'FPS: {frames_processed / (t_finish - t_start):.2f}')
print('done.')