https://www.tensorflow.org/lite/guide/inference

https://github.com/tensorflow/examples/blob/master/lite/examples/posenet/android/posenet/src/main/java/org/tensorflow/lite/examples/posenet/lib/Posenet.kt

# Pose Estimation @ Edge AI



In [None]:
import os
import numpy as np
from PIL import Image
from PIL import ImageDraw

In [None]:
import io
from IPython.display import display
from IPython.display import clear_output
import ipywidgets
from base64 import b64decode , b64encode
import cv2

In [None]:
cv2.__version__

In [None]:
# import tensorrt
# import tensorflow as tf
import tflite_runtime.interpreter as tflite

In [None]:
import numpy as np

In [None]:
import pathlib
import os

In [None]:
path = pathlib.Path("/home/unccv/drone_project")

In [None]:
#HEIGHT, WIDTH  = input_details[0]["shape"][1:3]
HEIGHT , WIDTH =353,257

In [None]:
test_img = Image.open(os.path.join(path , "test_img.jpg"))

In [None]:
test_img.resize((256,256))

In [None]:
# The full `tensorflow` import is commented out above; only the lightweight
# `tflite_runtime` package is imported (as `tflite`), so build the interpreter
# through that module instead of the undefined `tf` name.
interpreter = tflite.Interpreter(model_path=os.path.join(path , "posenet_mobilenet_float_075_1_default_1.tflite"))

In [None]:
!free -m

In [None]:
interpreter.allocate_tensors()

input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

In [None]:
input_details

In [None]:
output_details

In [None]:
input_data = np.expand_dims(test_img.resize((WIDTH ,HEIGHT)), axis=0)
input_data.shape

In [None]:
input_mean , input_std = 127.5  ,127.5
input_data = (np.float32(input_data) - input_mean) / input_std

In [None]:
interpreter.set_tensor(input_details[0]['index'], input_data)

In [None]:
interpreter.invoke()

In [None]:
output_data = interpreter.get_tensor(output_details[0]['index'])
results = np.squeeze(output_data)

In [None]:
results.shape

In [None]:
[None]*10

In [None]:
for idx,it in enumerate(BodyPart):
    print(idx,it)

In [None]:
import enum
import numpy as np
import tflite_runtime.interpreter as tflite

class BodyPart(enum.IntEnum):
    """Integer ids of the 17 pose keypoints, from NOSE (0) to RIGHT_ANKLE (16)."""

    # Member names listed in declaration order.
    __order__ = (
        "NOSE LEFT_EYE RIGHT_EYE LEFT_EAR RIGHT_EAR "
        "LEFT_SHOULDER RIGHT_SHOULDER LEFT_ELBOW RIGHT_ELBOW "
        "LEFT_WRIST RIGHT_WRIST LEFT_HIP RIGHT_HIP "
        "LEFT_KNEE RIGHT_KNEE LEFT_ANKLE RIGHT_ANKLE"
    )

    # Head
    NOSE = 0
    LEFT_EYE = 1
    RIGHT_EYE = 2
    LEFT_EAR = 3
    RIGHT_EAR = 4

    # Arms
    LEFT_SHOULDER = 5
    RIGHT_SHOULDER = 6
    LEFT_ELBOW = 7
    RIGHT_ELBOW = 8
    LEFT_WRIST = 9
    RIGHT_WRIST = 10

    # Legs
    LEFT_HIP = 11
    RIGHT_HIP = 12
    LEFT_KNEE = 13
    RIGHT_KNEE = 14
    LEFT_ANKLE = 15
    RIGHT_ANKLE = 16

class Position:
    """A 2-D point exposing ``x`` and ``y`` attributes (both default to 0)."""

    def __init__(self, x=0, y=0):
        self.x, self.y = x, y

class KeyPoint:
    """One detected body keypoint: which part it is, where it is, and its score.

    Args:
        bodypart: Identifier of the body part (defaults to ``BodyPart.NOSE``).
        position: Location of the keypoint. When omitted, a fresh
            ``Position()`` is created for this instance.
        score: Confidence value for the keypoint.
    """

    def __init__(self, bodypart=BodyPart.NOSE, position=None, score=0.0):
        self.bodyPart = bodypart
        # A default written as `position=Position()` is evaluated only once, so
        # every default-constructed KeyPoint would share (and mutate) the same
        # Position object. Build a new one per instance instead.
        self.position = Position() if position is None else position
        self.score = score

class Person:
    """A detected person: a list of keypoints plus an overall score.

    Args:
        keypoints: List of keypoints for this person. When omitted, a new
            empty list is created for this instance; a supplied list is stored
            by reference, not copied.
        score: Overall confidence for the person.
    """

    def __init__(self, keypoints=None, score=0.0):
        # A literal `[]` default would be shared by every Person created
        # without an explicit list, so appends on one would leak into others.
        self.keyPoints = [] if keypoints is None else keypoints
        self.score = score

class Posenet:
    """Single-person pose estimation on top of a PoseNet TFLite model.

    The interpreter is created lazily on first use. Output tensor 0 is treated
    as the keypoint heat maps and output tensor 1 as the offset vectors.
    """

    def __init__(self, model_path="posenet_model.tflite"):
        """Remember the model path; the interpreter is not loaded yet."""
        self.lastInferenceTimeNanos = -1
        self.interpreter = None
        self.gpuDelegate = None
        self.model_path = model_path
        self.NUM_LITE_THREADS = 4

    def getInterpreter(self):
        """Return the TFLite interpreter, creating and caching it on first call.

        Creating it also stores ``self.input_details`` and
        ``self.output_details``.
        """
        if self.interpreter is not None:
            return self.interpreter
        interpreter = tflite.Interpreter(
            model_path=self.model_path, num_threads=self.NUM_LITE_THREADS
        )
        interpreter.allocate_tensors()
        self.input_details = interpreter.get_input_details()
        self.output_details = interpreter.get_output_details()
        self.interpreter = interpreter
        return interpreter

    def close(self):
        """Release the cached interpreter; safe to call repeatedly.

        The Python TFLite interpreter has no ``close()`` method, so the only
        thing to do is drop the reference. A later ``getInterpreter()`` call
        builds a new interpreter.
        """
        self.interpreter = None

    def sigmoid(self, x):
        """Return the logistic function ``1 / (1 + exp(-x))`` of ``x``."""
        return (1 / (1 + np.exp(-x)))

    def getKeyPointLocations(self, heatmaps):
        """Find, per keypoint channel, the heat-map cell with the highest value.

        Args:
            heatmaps: Array-like of shape ``(height, width, numKeyPoints)``.

        Returns:
            A list with one ``(row, col)`` tuple of ints per keypoint. Ties are
            resolved in favour of the first cell in row-major order.
        """
        heatmaps = np.asarray(heatmaps)
        height, width, numKeyPoints = heatmaps.shape
        # argmax over the flattened spatial grid returns the first maximum in
        # row-major order, which is what a nested row/col scan with a strict
        # ">" comparison would find.
        flatBest = heatmaps.reshape(height * width, numKeyPoints).argmax(axis=0)
        rows, cols = np.unravel_index(flatBest, (height, width))
        return [(int(row), int(col)) for row, col in zip(rows, cols)]

    def getConfidenceScores(self, heatmaps, offsets, keypointPositions, height, width, HEIGHT, WIDTH):
        """Convert heat-map cells to image coordinates and confidence scores.

        Args:
            heatmaps: Heat maps indexed as ``[row][col][keypoint]``.
            offsets: Offset vectors indexed as ``[row][col][channel]``. Following
                the reference PoseNet example linked at the top of this
                notebook, channel ``k`` holds the y offset of keypoint ``k`` and
                channel ``k + numKeyPoints`` holds its x offset.
            keypointPositions: ``(row, col)`` heat-map cell per keypoint.
            height, width: Spatial size of the heat maps.
            HEIGHT, WIDTH: Size of the image the coordinates should refer to.

        Returns:
            ``(xCoords, yCoords, confidenceScores)`` as three 1-D float arrays.
        """
        numKeyPoints = len(keypointPositions)
        xCoords = np.zeros(numKeyPoints)
        yCoords = np.zeros(numKeyPoints)
        confidenceScores = np.zeros(numKeyPoints)

        for idx, (positionY, positionX) in enumerate(keypointPositions):
            cellOffsets = offsets[positionY][positionX]
            yOffset = cellOffsets[idx]
            # The x offsets live in the second half of the channel axis; reading
            # channel `idx` here would re-apply the y offset to x.
            xOffset = cellOffsets[idx + numKeyPoints]
            yCoords[idx] = int(positionY / float(height - 1) * HEIGHT + yOffset)
            xCoords[idx] = int(positionX / float(width - 1) * WIDTH + xOffset)
            confidenceScores[idx] = self.sigmoid(heatmaps[positionY][positionX][idx])

        return xCoords, yCoords, confidenceScores

    def getPersonDetails(self, numKeyPoints, xCoords, yCoords, confidenceScores):
        """Bundle coordinates and scores into a ``Person``.

        One ``KeyPoint`` is created per ``BodyPart`` member, in enum order. The
        person's score is the sum of those keypoint scores divided by
        ``numKeyPoints``.
        """
        keypointList = [
            KeyPoint(part, Position(xCoords[idx], yCoords[idx]), confidenceScores[idx])
            for idx, part in enumerate(BodyPart)
        ]
        totalScore = sum(kp.score for kp in keypointList)
        return Person(keypointList, totalScore / numKeyPoints)

    def estimateSinglePose(self, image):
        """Run the model on ``image`` and return the detected ``Person``.

        Args:
            image: An image object with a PIL-style ``resize((w, h))`` method.

        Returns:
            A ``Person`` whose keypoint positions are expressed in the model's
            input resolution (the size the image is resized to), not in the
            original image size.
        """
        self.getInterpreter()

        # Input tensor shape is read as (batch, height, width, ...).
        HEIGHT, WIDTH = (int(dim) for dim in self.input_details[0]["shape"][1:3])
        input_data = np.expand_dims(image.resize((WIDTH, HEIGHT)), axis=0)
        # Map pixel values from [0, 255] to [-1, 1].
        input_mean, input_std = 127.5, 127.5
        input_data = (np.float32(input_data) - input_mean) / input_std

        self.interpreter.set_tensor(self.input_details[0]['index'], input_data)
        self.interpreter.invoke()

        heatmaps = np.squeeze(self.interpreter.get_tensor(self.output_details[0]['index']))
        offsets = np.squeeze(self.interpreter.get_tensor(self.output_details[1]['index']))

        height, width, numKeyPoints = heatmaps.shape
        keypointPositions = self.getKeyPointLocations(heatmaps)
        xCoords, yCoords, confidenceScores = self.getConfidenceScores(
            heatmaps, offsets, keypointPositions, height, width, HEIGHT, WIDTH
        )

        return self.getPersonDetails(numKeyPoints, xCoords, yCoords, confidenceScores)

    def getDrawnImage(self, image):
        """Return ``image`` resized to the model input size with keypoints drawn.

        The canvas uses the model's own input resolution because that is the
        coordinate space ``estimateSinglePose`` reports keypoints in; relying
        on unrelated module-level WIDTH/HEIGHT values would misplace the dots.
        """
        person = self.estimateSinglePose(image)
        inputHeight, inputWidth = (int(dim) for dim in self.input_details[0]["shape"][1:3])
        out_img = np.array(image.resize((inputWidth, inputHeight)))
        for keypoint in person.keyPoints:
            center = (int(keypoint.position.x), int(keypoint.position.y))
            out_img = cv2.circle(out_img, center, 10, (42, 157, 143))
        return out_img

In [None]:
pnet = Posenet(os.path.join(path , "posenet_mobilenet_float_075_1_default_1.tflite"))

In [None]:
person = pnet.estimateSinglePose(test_img)

In [None]:
person.keyPoints[0].bodyPart

In [None]:
person.keyPoints[0].position.x

In [None]:
person.keyPoints[14].bodyPart

In [None]:
person.keyPoints[14].position.x

In [None]:
class StickMan:
    """Draws pose keypoints and limb segments on top of an image."""

    def lineBetweenPoints(self, image, pointA, pointB):
        """Draw a 5-pixel-thick line between two keypoints and return the image.

        Args:
            image: Array that ``cv2.line`` can draw on.
            pointA, pointB: Objects exposing ``position.x`` and ``position.y``.
        """
        start = (int(pointA.position.x), int(pointA.position.y))
        end = (int(pointB.position.x), int(pointB.position.y))
        return cv2.line(image, start, end, (42, 157, 143), 5)

    def draw(self, image, keypoints, size=None):
        """Return a copy of ``image`` with joints and limbs drawn on it.

        Args:
            image: Image object with a PIL-style ``resize((w, h))`` method.
            keypoints: Sequence of keypoints indexable by ``BodyPart`` value.
                Their positions must be expressed in the resized canvas space.
            size: Optional ``(width, height)`` of the canvas. Defaults to the
                module-level ``(WIDTH, HEIGHT)`` when not given.

        Returns:
            The resized image as an array with the overlay drawn.
        """
        if size is None:
            size = (WIDTH, HEIGHT)
        out_img = np.array(image.resize(size))

        # Draw the joints of the keypoints that were passed in, not those of
        # whatever global `person` happens to exist in the notebook.
        for keypoint in keypoints:
            center = (int(keypoint.position.x), int(keypoint.position.y))
            out_img = cv2.circle(out_img, center, 5, (42, 157, 143), -1)

        # Each pair is one limb segment; every segment is drawn exactly once.
        limbs = (
            (BodyPart.LEFT_WRIST, BodyPart.LEFT_ELBOW),
            (BodyPart.LEFT_ELBOW, BodyPart.LEFT_SHOULDER),
            (BodyPart.LEFT_SHOULDER, BodyPart.RIGHT_SHOULDER),
            (BodyPart.RIGHT_SHOULDER, BodyPart.RIGHT_ELBOW),
            (BodyPart.RIGHT_ELBOW, BodyPart.RIGHT_WRIST),
            (BodyPart.LEFT_HIP, BodyPart.LEFT_KNEE),
            (BodyPart.LEFT_KNEE, BodyPart.LEFT_ANKLE),
            (BodyPart.RIGHT_HIP, BodyPart.RIGHT_KNEE),
            (BodyPart.RIGHT_KNEE, BodyPart.RIGHT_ANKLE),
        )
        for partA, partB in limbs:
            out_img = self.lineBetweenPoints(out_img, keypoints[int(partA)], keypoints[int(partB)])

        return out_img

In [None]:
Image.fromarray(StickMan().draw(test_img  ,person.keyPoints))

In [None]:
Image.fromarray(out_img)

In [None]:
Image.fromarray(pnet.getDrawnImage(test_img))

In [None]:
#from jetcam.usb_camera import USBCamera
import cv2
from jetcam.csi_camera import CSICamera
from jetcam.utils import bgr8_to_jpeg

WIDTH = 224
HEIGHT = 224
#camera = USBCamera(width=WIDTH, height=HEIGHT, capture_fps=30)
camera = CSICamera(width=WIDTH, height=HEIGHT, capture_fps=10)

#camera.running = True

In [2]:
import cv2
HEIGHT , WIDTH =353,257

def gstreamer_pipeline(capture_width=1640, capture_height=1232, display_width=244,
                       display_height=244, framerate=21, flip_method=2):
    """Build the GStreamer pipeline description string for the camera.

    The pipeline starts at ``nvarguscamerasrc`` and ends in ``appsink``; the
    capture size, frame rate, flip method and display size are substituted
    into it as integers.

    Returns:
        The pipeline description as a single string.
    """
    segments = [
        'nvarguscamerasrc ! ',
        'video/x-raw(memory:NVMM), ',
        'width=(int)%d, height=(int)%d, ',
        'format=(string)NV12, framerate=(fraction)%d/1 ! ',
        'nvvidconv flip-method=%d ! ',
        'video/x-raw, width=(int)%d, height=(int)%d, format=(string)BGRx ! ',
        'videoconvert ! video/x-raw,format=BGR !',
        'appsink',
    ]
    template = ''.join(segments)
    values = (
        capture_width,
        capture_height,
        framerate,
        flip_method,
        display_width,
        display_height,
    )
    return template % values
    
camera2 = cv2.VideoCapture(gstreamer_pipeline(display_width = WIDTH , display_height=HEIGHT))#, cv2.CAP_GSTREAMER)
#camera2.set(cv2.CAP_PROP_BUFFERSIZE, 1)
#retval, im = camera2.read()

In [3]:
!free -m

              total        used        free      shared  buff/cache   available
Mem:           1971         869         513          10         588         997
Swap:          5081         398        4683


In [None]:
camera2.release()

In [None]:
camera2.read()

In [4]:
def cam_read():
    """Read one frame from ``camera2`` and return it in RGB channel order.

    Returns:
        ``(ret, img)`` where ``ret`` is the success flag reported by
        ``camera2.read()`` and ``img`` is the frame converted from BGR to RGB.
        When the read fails, ``(False, None)`` is returned so callers can test
        ``ret`` instead of hitting an error inside the colour conversion.
    """
    ret, img = camera2.read()
    if not ret or img is None:
        return False, None
    return ret, cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

In [None]:
#ret,img = cam_read()
ret,img = cam_read()

In [None]:
ret

In [None]:
img.shape

In [None]:
stick_man = StickMan()

In [5]:
image_widget = ipywidgets.Image(format='jpg' , height=256 ,width=256)
display(image_widget)

NameError: name 'ipywidgets' is not defined

In [None]:
try:
    while True:
        image_widget.value =  cv2.imencode('.jpg',cam_read()[1])[1].tobytes()
except KeyboardInterrupt:
    print("Breaking")

In [None]:
try:
    while True:
        img =cam_read()[1]
        pil_img = Image.fromarray(img)
        person = pnet.estimateSinglePose(pil_img)
        img = stick_man.draw(pil_img,person.keyPoints)
        image_widget.value =  cv2.imencode('.jpg',img)[1].tobytes()
        clear_output(wait=True)
except KeyboardInterrupt:
    print("Breaking")


In [None]:
input_data = np.expand_dims(test_img.resize((WIDTH ,HEIGHT)), axis=0)
input_data.shape

In [None]:
input_mean , input_std = 127.5  ,127.5
input_data = (np.float32(input_data) - input_mean) / input_std

In [None]:
interpreter.set_tensor(input_details[0]['index'], input_data)

In [None]:
output_data = interpreter.get_tensor(output_details[0]['index'])
results = np.squeeze(output_data)

In [None]:
results.shape

https://stackoverflow.com/questions/60032705/how-to-parse-the-heatmap-output-for-the-pose-estimation-tflite-model

https://github.com/google-coral/project-posenet

In [None]:
def get_keypoints(heatmaps, offsets, output_stride=32):
    """Decode heat maps and offsets into a list of ``KeyPoint`` objects.

    Args:
        heatmaps: Array of shape ``(rows, cols, num_keypoints)`` with raw
            (pre-sigmoid) scores.
        offsets: Array of shape ``(rows, cols, 2 * num_keypoints)``. Channel
            ``k`` is used as the y offset of keypoint ``k`` and channel
            ``num_keypoints + k`` as its x offset.
        output_stride: Factor that maps a heat-map cell index to image pixels.

    Returns:
        One ``KeyPoint`` per heat-map channel, with ``position`` set to a
        ``Position(x, y)`` in image pixels and ``score`` set to the sigmoid of
        the peak heat-map value.

    Raises:
        ValueError: If a channel index has no matching ``BodyPart`` member.
    """
    num_keypoints = heatmaps.shape[2]
    keypoints = []
    for ki in range(num_keypoints):
        channel = heatmaps[:, :, ki]
        # unravel_index yields (row, col), i.e. (y, x). The same (row, col)
        # pair must index both the heat map and the offsets.
        row, col = np.unravel_index(np.argmax(channel), channel.shape)
        # Sigmoid is monotonic, so the raw peak is also the peak score.
        confidence = 1.0 / (1.0 + np.exp(-channel[row, col]))
        y = row * output_stride + offsets[row, col, ki]
        x = col * output_stride + offsets[row, col, num_keypoints + ki]
        keypoints.append(KeyPoint(BodyPart(ki), Position(x, y), confidence))
    return keypoints

In [None]:
!free -m

First, let's load the JSON file which describes the human pose task.  This is in COCO format, it is the category descriptor pulled from the annotations file.  We modify the COCO category slightly, to add a neck keypoint.  We will use this task description JSON to create a topology tensor, which is an intermediate data structure that describes the part linkages, as well as which channels in the part affinity field each linkage corresponds to.

In [None]:
import json
import trt_pose.coco

with open('human_pose.json', 'r') as f:
    human_pose = json.load(f)

topology = trt_pose.coco.coco_category_to_topology(human_pose)

In [None]:
!free -m

In [None]:
import torch2trt
import torch
OPTIMIZED_MODEL = 'resnet18_baseline_att_224x224_A_epoch_249_trt.pth'
HEIGHT = 224
WIDTH = 224
data = torch.zeros((1, 3, HEIGHT, WIDTH)).cuda()

In [None]:
from torch2trt import TRTModule

model_trt = TRTModule()
model_trt.load_state_dict(torch.load(OPTIMIZED_MODEL))

In [None]:
!free -m

Next, let's define a function that will preprocess the image, which is originally in BGR8 / HWC format.

In [None]:
import cv2
import torchvision.transforms as transforms
import PIL.Image

mean = torch.Tensor([0.485, 0.456, 0.406]).cuda()
std = torch.Tensor([0.229, 0.224, 0.225]).cuda()
device = torch.device('cuda')

def preprocess(image):
    """Turn a BGR image array into a normalised, batched CUDA tensor.

    The image is converted from BGR to RGB, turned into a tensor on the CUDA
    device, normalised in place with the module-level ``mean`` and ``std``,
    and given a leading batch axis. The module-level ``device`` is rebound to
    the CUDA device on every call.
    """
    global device
    device = torch.device('cuda')
    rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    tensor = transforms.functional.to_tensor(PIL.Image.fromarray(rgb)).to(device)
    # Broadcast the per-channel statistics over the two spatial axes.
    tensor.sub_(mean[:, None, None])
    tensor.div_(std[:, None, None])
    return tensor[None, ...]

Next, we'll define two callable classes that will be used to parse the objects from the neural network, as well as draw the parsed objects on an image.

In [None]:
from trt_pose.draw_objects import DrawObjects
from trt_pose.parse_objects import ParseObjects

parse_objects = ParseObjects(topology)
draw_objects = DrawObjects(topology)

Assuming you're using NVIDIA Jetson, you can use the [jetcam](https://github.com/NVIDIA-AI-IOT/jetcam) package to create an easy to use camera that will produce images in BGR8/HWC format.

If you're not on Jetson, you may need to adapt the code below.

Next, we'll create a widget which will be used to display the camera feed with visualizations.

Finally, we'll define the main execution loop.  This will perform the following steps:

1.  Preprocess the camera image
2.  Execute the neural network
3.  Parse the objects from the neural network output
4.  Draw the objects onto the camera image
5.  Convert the image to JPEG format and stream to the display widget

In [None]:
def execute(image):
    """Run pose estimation on ``image`` and draw the result onto it.

    The image is preprocessed, passed through ``model_trt``, the two outputs
    are moved to the CPU and parsed, and the parsed objects are drawn on the
    input image.

    Returns:
        The same ``image`` object that was passed in, after drawing.
    """
    batch = preprocess(image)
    cmap, paf = model_trt(batch)
    cmap = cmap.detach().cpu()
    paf = paf.detach().cpu()
    parsed = parse_objects(cmap, paf)
    counts, objects, peaks = parsed
    draw_objects(image, counts, objects, peaks)
    return image

In [None]:
import os
import numpy
import time
import PIL.Image
outpath = os.path.abspath(os.path.join("." , ".." , ".." , ".." , ".." , "drone_project" , "outdump"))
outpath

In [None]:
image_widget = ipywidgets.Image(format='jpg' , height=256 ,width=256)
display(image_widget)

In [None]:
!free -m

In [None]:
# Streaming cell: keep pulling frames from `camera2` and refresh the image
# widget with a new JPEG only once the whole-second difference between now and
# the last refresh exceeds 2. Interrupt the kernel to stop.

# JPEG quality parameter handed to cv2.imencode.
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 50]
# Number of frames pushed to the widget so far.
i = 0
try:
    # t1: time of the last widget refresh (initially the loop start time).
    t1 = time.time()
    while(True):
        
        # Capture frame-by-frame
        #_ , frame = camera2.read()
        #ret, frame = cam_read()
        # grab() is called on every iteration, without decoding the frame.
        camera2.grab()
        t2 = time.time()
        
        
            
        # Convert the image from OpenCV BGR format to matplotlib RGB format
        # to display the image
        #frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        #exec_2(frame)
        # Only decode and display a frame when the refresh interval has passed.
        if int(t2 - t1) > 2:
            ret, frame = cam_read()
            if not ret:
                print("Failed")
                break
            t1 = t2
            #frame = execute(frame)
            image_widget.value = cv2.imencode('.jpg', frame , encode_param)[1].tobytes()
            #cv2.imwrite(os.path.join(outpath , f"{i}.jpg") , frame)
            i += 1
        #image_widget.value = cv2.imencode('.jpg', frame , encode_param)[1].tobytes()
        #frame = execute(frame)
        #show_local_img(frame , 256, 256)
        # Replace the previous status line instead of appending a new one.
        clear_output(wait=True)
        
        # Prints the time elapsed since the last refresh (0 right after one).
        print(f"waiting {t2 - t1}")
        
except KeyboardInterrupt:
    # Interrupting the kernel ends the stream; the camera is not released here.
    #cam.release()
    print("Stream stopped")

In [None]:
!free -m

In [None]:
exec_2({'new': camera.value})
camera.observe(exec_2, names='value')

In [None]:
def execute(image):
    """Run pose estimation on a frame and draw the result onto it.

    Args:
        image: Either the frame itself, or a change dictionary carrying the
            frame under the ``'new'`` key. The cells below call this function
            with ``{'new': camera.value}`` and register it through
            ``camera.observe``, so both forms are accepted.

    Returns:
        The frame with the parsed objects drawn on it.
    """
    # Unwrap the observer-style change dict into the actual frame.
    if isinstance(image, dict):
        image = image['new']
    data = preprocess(image)
    cmap, paf = model_trt(data)
    cmap, paf = cmap.detach().cpu(), paf.detach().cpu()
    counts, objects, peaks = parse_objects(cmap, paf)
    draw_objects(image, counts, objects, peaks)
    # Return the annotated frame, not the `draw_objects` callable.
    return image

If we call the cell below it will execute the function once on the current camera frame.

In [None]:
image_w

In [None]:
execute({'new': camera.value})

Call the cell below to attach the execution function to the camera's internal value.  This will cause the execute function to be called whenever a new camera frame is received.

In [None]:
camera.observe(execute, names='value')

Call the cell below to detach the camera frame callbacks.

In [None]:
camera.unobserve_all()