Skip to content
This repository was archived by the owner on Jun 10, 2024. It is now read-only.
This repository was archived by the owner on Jun 10, 2024. It is now read-only.

Multi threading with Pytorch #416

@NguyenVanThanhHust

Description

@NguyenVanThanhHust

Hi. I want to use multi threading decoded and classify with pytorch.

I tried SampleDecode Multi Threading and SamplePytorchh Resnet seperately successfully.

I tried to push decoded imgs to queue and pytorch read from queue.
I created threads to decode and 1 thread to clasiffy
But the thread that process video doesn't start after all previous decoded thread finish

Can you make an example how to do that? That would be very help full.

Below is my code when I tried to use mentioned approach

#
# Copyright 2021 NVIDIA Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Starting from Python 3.8 DLL search policy has changed.
# We need to add path to CUDA DLLs explicitly.
import sys
import os

import torch
import torchvision

if os.name == 'nt':
    # Add CUDA_PATH env variable
    cuda_path = os.environ["CUDA_PATH"]
    if cuda_path:
        os.add_dll_directory(cuda_path)
    else:
        print("CUDA_PATH environment variable is not set.", file=sys.stderr)
        print("Can't set CUDA DLLs search path.", file=sys.stderr)
        exit(1)

    # Add PATH as well for minor CUDA releases
    sys_path = os.environ["PATH"]
    if sys_path:
        paths = sys_path.split(';')
        for path in paths:
            if os.path.isdir(path):
                os.add_dll_directory(path)
    else:
        print("PATH environment variable is not set.", file=sys.stderr)
        exit(1)

import pycuda.driver as cuda
import PyNvCodec as nvc
import PytorchNvCodec as pnvc
import numpy as np
from queue import Queue

main_queue = Queue()

from category import categories

from threading import Thread
import time

class Worker(Thread):
    def __init__(self, gpuID, encFile, threadId):
        Thread.__init__(self)
        # Retain primary CUDA device context and create separate stream per thread.
        self.ctx = cuda.Device(gpuID).retain_primary_context()
        self.ctx.push()
        self.str = cuda.Stream()
        self.ctx.pop()
        self.gpuId = gpuID
        self.threadId = threadId

        # Resnet expects images to be 3 channel planar RGB of 224x244 size at least.
        self.target_w, self.target_h = 224, 224


        # Create Decoder with given CUDA context & stream.
        self.nvDec = nvc.PyNvDecoder(encFile, self.ctx.handle, self.str.handle)
        
        width, height = self.nvDec.Width(), self.nvDec.Height()
        hwidth, hheight = int(width / 2), int(height / 2)

        # Determine colorspace conversion parameters.
        # Some video streams don't specify these parameters so default values
        # are most widespread bt601 and mpeg.
        cspace, crange = self.nvDec.ColorSpace(), self.nvDec.ColorRange()
        if nvc.ColorSpace.UNSPEC == cspace:
            cspace = nvc.ColorSpace.BT_601
        if nvc.ColorRange.UDEF == crange:
            crange = nvc.ColorRange.MPEG
        self.cc_ctx = nvc.ColorspaceConversionContext(cspace, crange)
        print('Color space: ', str(cspace))
        print('Color range: ', str(crange))

        # Initialize colorspace conversion chain
        if self.nvDec.ColorSpace() != nvc.ColorSpace.BT_709:
            self.nvYuv = nvc.PySurfaceConverter(width, height, self.nvDec.Format(), nvc.PixelFormat.YUV420, self.ctx.handle, self.str.handle)
        else:
            self.nvYuv = None

        self.to_dim = nvc.PySurfaceResizer(self.target_w, self.target_h, nvc.PixelFormat.YUV420,
                                  self.gpuId)

        self.to_rgb = nvc.PySurfaceConverter(self.target_w, self.target_h,
                                        nvc.PixelFormat.YUV420, nvc.PixelFormat.RGB,
                                        self.gpuId)

        self.to_pln = nvc.PySurfaceConverter(self.target_w, self.target_h, nvc.PixelFormat.RGB,
                                        nvc.PixelFormat.RGB_PLANAR, self.gpuId)

        # Use most widespread bt601 and mpeg just for illustration purposes.
        self.cc_ctx = nvc.ColorspaceConversionContext(nvc.ColorSpace.BT_601,
                                             nvc.ColorRange.MPEG)
        self.num_frame = 0

    def run(self):
        try:
            while True:
                try:
                    # Decode 1 compressed video frame to CUDA memory.
                    self.rawSurface = self.nvDec.DecodeSingleSurface()
                    if (self.rawSurface.Empty()):
                        print('No more video frames')
                        break
                except nvc.HwResetException:
                    print('Continue after HW decoder was reset')
                    continue
 
                yuvSurface = self.nvYuv.Execute(self.rawSurface, self.cc_ctx)
                if (yuvSurface.Empty()):
                    print('Failed to do yuv conversion')
                    break

                # Downscale YUV420.
                yuv_small = self.to_dim.Execute(yuvSurface)
                if yuv_small.Empty():
                    print('Can not downscale yuv420 surface')
                    break

                # Convert from YUV420 to interleaved RGB.
                rgb24_small = self.to_rgb.Execute(yuv_small, self.cc_ctx)
                if rgb24_small.Empty():
                    print('Can not convert yuv420 -> rgb')
                    break

                # Convert to planar RGB.
                rgb24_planar = self.to_pln.Execute(rgb24_small, self.cc_ctx)
                if rgb24_planar.Empty():
                    print('Can not convert rgb -> rgb planar')
                    break

                # Export to PyTorch tensor
                surf_plane = rgb24_planar.PlanePtr()
                img_tensor = pnvc.makefromDevicePtrUint8(surf_plane.GpuMem(),
                                                        surf_plane.Width(),
                                                        surf_plane.Height(),
                                                        surf_plane.Pitch(),
                                                        surf_plane.ElemSize())

                img_tensor.resize_(3, self.target_h, self.target_w)
                img_np = img_tensor.detach().cpu().numpy()
                # print(type(img_tensor), img_tensor.device)
                # img_tensor = img_tensor.type(dtype=torch.cuda.FloatTensor)
                img_tensor = torch.divide(img_tensor, 255.0)

                # data_transforms = torchvision.transforms.Normalize(mean=[0.485, 0.456, 0.406],
                #                                                 std=[0.229, 0.224, 0.225])
                # surface_tensor = data_transforms(img_tensor)
                main_queue.put(img_np)
                print("main_queue.qsize(): ", main_queue.qsize())
                
                self.num_frame += 1
                if(0 == self.num_frame % 10):
                    print(self.num_frame)
                    print(self.threadId, main_queue.qsize())

        except Exception as e:
            print(getattr(e, 'message', str(e)))
            fout.close()

class MainThread(Thread):
    def __init__(self, ):
        Thread.__init__(self)
        self.model = torchvision.models.resnet50(pretrained=True)
        self.model.eval()
        self.model.to('cuda')
        print("run here")

    def run(self):
        print("run here too")
        try:
            print("run here too")
            while not main_queue.empty():
                # surface_tensor = main_queue.get()
                surface_np = main_queue.get()
                surface_tensor = torch.from_numpy(surface_np)
                surface_tensor = surface_tensor.cuda().float()
                input_batch = surface_tensor.unsqueeze(0).to('cuda')

                # Run inference.
                with torch.no_grad():
                    output = self.model(input_batch)

                probabilities = torch.nn.functional.softmax(output[0], dim=0)

                top5_prob, top5_catid = torch.topk(probabilities, 5)
                for i in range(top5_prob.size(0)):
                    print(categories[top5_catid[i]], top5_prob[i].item())
        except Exception as e:
            print("Exception: ", e)
            time.sleep(1)

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print('Provide gpu ID, paths to input video file.')
        exit

    gpu_id = int(sys.argv[1])
    input_video = sys.argv[2]


    cuda.init()
    num_threads = 2
    thread_pool = []
    for i in range(0, num_threads):
        thread = Worker(gpu_id, input_video, i+1)
        thread.start()
        thread_pool.append(thread)
    mainThread = MainThread()
    mainThread.start()


    # run_inference_on_video(gpu_id, input_video)

    for thread in thread_pool:
        thread.join()
    mainThread.join()

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions