In [1]:
import math
import numpy as np
from scipy import signal

In [None]:
def get_heatmaps(keypoints, boxes, width, height, downsample):
    """
    Render one heatmap per body part, with a gaussian blob at every
    visible keypoint. The blob size depends on the size of the person's box.

    Arguments:
        keypoints: a numpy array with shape [num_persons, 17, 3].
            It is in format (y, x, visibility),
            where coordinates `y, x` are in the ranges
            [0, height - 1] and [0, width - 1].
            And a keypoint is visible if `visibility > 0`.
            The array is not modified.
        boxes: a numpy float array with shape [num_persons, 4],
            person bounding boxes (ymin, xmin, ymax, xmax)
            in absolute coordinates.
        width, height: integers, size of the original image.
        downsample: an integer.
    Returns:
        a numpy float32 array with shape [h, w, 17], where
        (h, w) = (ceil(height/downsample), ceil(width/downsample)).
    """
    min_sigma, max_sigma = 1.0, 3.0
    num_parts = 17

    h = math.ceil(height / downsample)
    w = math.ceil(width / downsample)

    # Keypoints live in ORIGINAL image coordinates, so they are normalized
    # by the original size; `create_heatmap` maps [0, 1] back onto the
    # downsampled grid. The lower bound avoids dividing by zero for
    # one-pixel-wide images.
    scaler = np.array([height - 1.0, width - 1.0], dtype=np.float32)
    scaler = np.maximum(scaler, 1.0)

    keypoints = np.asarray(keypoints)
    boxes = np.asarray(boxes, dtype=np.float32).reshape(-1, 4)

    ymin, xmin, ymax, xmax = np.split(boxes, 4, axis=1)
    # they have shape [num_persons, 1]

    # each person has a different blob size, proportional to the box scale
    area = np.maximum((ymax - ymin) * (xmax - xmin), 0.0)
    sigmas = np.squeeze(np.sqrt(area) * 0.1, axis=1)
    sigmas = np.clip(sigmas, min_sigma, max_sigma)  # shape [num_persons]

    heatmaps = []
    for part in range(num_parts):

        is_visible = keypoints[:, part, 2] > 0
        person_ids = np.where(is_visible)[0]

        if len(person_ids) == 0:
            heatmaps.append(np.zeros([h, w], dtype=np.float32))
            continue

        # to the [0, 1] range; `astype` makes a float copy, so integer
        # keypoints work and the caller's array is left untouched
        body_part = keypoints[person_ids, part, :2].astype(np.float32) / scaler
        # it has shape [num_visible, 2]

        heatmaps_for_part = [
            create_heatmap(y, x, sigmas[person], w, h)
            for (y, x), person in zip(body_part, person_ids)
        ]

        # overlapping blobs of different persons are merged with a maximum
        heatmaps.append(np.stack(heatmaps_for_part, axis=0).max(0))

    return np.stack(heatmaps, axis=2)


def get_kernel(std=3):
    """
    Build a square 2D Gaussian kernel whose side is derived from `std`.

    The half-width is the smallest integer radius at which a 1D gaussian
    with the given `std` has decayed to 0.01 of its peak.

    Arguments:
        std: a number, standard deviation of the gaussian.
    Returns:
        a numpy float32 array with shape [2*k + 1, 2*k + 1],
        the outer product of a 1D gaussian window with itself.
    """
    half_width = int(np.ceil(np.sqrt(-2.0 * std**2 * np.log(0.01))))
    side = 2 * half_width + 1
    window = signal.windows.gaussian(side, std=std)
    kernel = np.outer(window, window)
    return kernel.astype(np.float32)



In [23]:
# Scratch inputs for sizing a Gaussian kernel.
# NOTE(review): the expression that produced the `4.0` output of this cell
# is missing; presumably np.ceil(np.sqrt(-2.0 * std**2 * np.log(alpha))) - confirm.
std = 1
alpha = 0.01

4.0

In [27]:
# Build a kernel with side 33 and std 5.
# NOTE(review): the `size` keyword is only accepted by the
# `get_kernel(size=21, std=3)` definition that appears further down in this
# notebook, so this cell depends on that cell having been run first.
k = get_kernel(size=2 * 16 + 1, std=5)

In [28]:
# Inspect the first row of the kernel, columns 16 through 28
# (column 16 is the center column of a 33-wide kernel).
k[0, 16:29]

array([0.00597602, 0.00585769, 0.00551656, 0.00499159, 0.00433948,
       0.00362464, 0.00290884, 0.00224287, 0.00166156, 0.00118265,
       0.00080877, 0.0005314 , 0.00033546], dtype=float32)

In [None]:


def get_kernel(size=21, std=3):
    """
    Returns a 2D Gaussian kernel array.

    Arguments:
        size: an integer, side length of the square kernel.
        std: a number, standard deviation of the gaussian.
    Returns:
        a numpy float32 array with shape [size, size],
        the outer product of a 1D gaussian window with itself.
    """
    # `scipy.signal.gaussian` was removed in SciPy 1.13;
    # the window functions live in `scipy.signal.windows`.
    window = signal.windows.gaussian(size, std=std)
    return np.outer(window, window).astype(np.float32)





In [None]:
def create_heatmap(y, x, sigma, width, height):
    """
    Render a single gaussian blob on an otherwise empty map.

    Arguments:
        y, x: floats, normalized coordinates of the blob center,
            where 0 maps to the first row/column
            and 1 maps to the last row/column of the map.
        sigma: a float number, standard deviation of the blob
            measured in heatmap pixels.
        width, height: integers, size of the heatmap.
    Returns:
        a numpy float32 array with shape [height, width].
        Pixels have value exp(-d^2 / (2 * sigma^2)), where d is the distance
        to the center; values that would be below 0.01 are left at zero.
    """
    theta = 4.6052  # -ln(0.01), cutoff for the exponent

    heatmap = np.zeros([height, width], dtype=np.float32)
    if height <= 0 or width <= 0 or not sigma > 0:
        return heatmap

    # distance at which the blob falls to 0.01 of its peak
    radius = float(np.sqrt(theta * 2.0)) * float(sigma)

    # from the [0, 1] range to pixel coordinates of the map
    center_y = float(y) * (float(height) - 1.0)
    center_x = float(x) * (float(width) - 1.0)

    # Bounding window of the blob. The upper bounds are exclusive, so
    # `floor(...) + 1` keeps the last pixel that is still inside the radius
    # (otherwise the blob is cut on the bottom/right side).
    ymin = int(max(0.0, center_y - radius))
    xmin = int(max(0.0, center_x - radius))
    ymax = int(min(float(height), np.floor(center_y + radius) + 1.0))
    xmax = int(min(float(width), np.floor(center_x + radius) + 1.0))

    if ymin >= ymax or xmin >= xmax:
        # the blob lies completely outside of the map
        return heatmap

    rows = np.arange(ymin, ymax, dtype=np.float64)[:, np.newaxis]
    cols = np.arange(xmin, xmax, dtype=np.float64)[np.newaxis, :]
    value = ((cols - center_x) ** 2 + (rows - center_y) ** 2) / (2.0 * sigma * sigma)

    # corners of the window are farther than the radius, they stay zero
    heatmap[ymin:ymax, xmin:xmax] = np.where(value > theta, 0.0, np.exp(-value))
    return heatmap

In [None]:
import numpy as np
cimport cython
cimport numpy as np


cdef inline float max(float a, float b):
    # Larger of two C floats; `b` is returned whenever `a >= b` does not hold.
    if a >= b:
        return a
    return b


cdef inline float min(float a, float b):
    # Smaller of two C floats; `b` is returned whenever `a <= b` does not hold.
    if a <= b:
        return a
    return b


@cython.boundscheck(False)
@cython.wraparound(False)
def get_heatmaps(
        np.ndarray[float, ndim=3] keypoints,
        float sigma,
        unsigned int width,
        unsigned int height,
        unsigned int downsample):
    """
    Render one heatmap per body part, with a gaussian blob
    of the same size at every visible keypoint.

    Arguments:
        keypoints: a numpy float array with shape [num_persons, 17, 3].
            It is in format (y, x, visibility),
            where coordinates `y, x` are in the ranges
            [0, height - 1] and [0, width - 1].
            And a keypoint is visible if `visibility > 0`.
        sigma: a float number, size of the gaussian blobs.
        width, height: integers, size of the original image.
        downsample: an integer.
    Returns:
        heatmaps: a numpy float array with shape [maps_height, maps_width, 17],
        where (maps_height, maps_width) = (ceil(height/downsample), ceil(width/downsample)).
    """

    cdef float w = float(width)
    cdef float h = float(height)
    cdef unsigned int maps_width = int(np.ceil(w/float(downsample)))
    cdef unsigned int maps_height = int(np.ceil(h/float(downsample)))
    # The buffer holds float32 data (see `.astype('float32')` below), so it is
    # declared with C `float`, like `keypoints` and `scaler`.
    cdef np.ndarray[float, ndim=2] body_part
    cdef np.ndarray[float, ndim=1] scaler = np.array([h - 1.0, w - 1.0], dtype='float32')

    heatmaps = []
    for part in range(17):

        # take a particular body part
        body_part = keypoints[keypoints[:, part, 2] > 0, part, :2].astype('float32')  # shape [num_visible, 2]

        if len(body_part) == 0:
            heatmaps.append(np.zeros((maps_height, maps_width), dtype='float32'))
            continue

        # to the [0, 1] range
        body_part /= scaler

        # a separate index name, so the body part index is never overwritten
        heatmaps_for_part = []
        for j in range(len(body_part)):
            y, x = body_part[j]
            heatmaps_for_part.append(create_heatmap(y, x, sigma, maps_width, maps_height))

        # overlapping blobs of different persons are merged with a maximum
        heatmaps.append(np.stack(heatmaps_for_part, axis=2).max(2))

    heatmaps = np.stack(heatmaps, axis=2)
    return heatmaps


# NOTE(review): these decorators are not followed by any definition in this
# cell. Presumably they belonged to a Cython `create_heatmap` that was moved
# out of the cell - confirm, and either restore the function or drop them.
@cython.boundscheck(False)
@cython.wraparound(False)




In [1]:
import os
import sys

import numpy as np
import tensorflow.compat.v1 as tf

sys.path.append('..')
from detector.constants import DOWNSAMPLE
from detector.input_pipeline.heatmap_creation import get_heatmaps

In [2]:
# Spatial size of every per-person crop; it is used as `crop_size` for
# `tf.image.crop_and_resize` and as the spatial shape of the binary label maps.
CROP_SIZE = [56, 36]  # height and width
# Number of keypoints per person, used when reshaping the parsed keypoints.
NUM_KEYPOINTS = 17

In [None]:
def parse_and_preprocess(self, example_proto):
    """
    Parse one serialized example and build per-person crops and
    binary keypoint maps.

    NOTE(review): `self` is never used in the body; the function looks like
    a method that was copied out of a class - confirm before passing it
    to `tf.data.Dataset.map`, which calls its function with one argument.

    Arguments:
        example_proto: a serialized example with features
            'image', 'num_persons', 'boxes', 'keypoints'.
    Returns:
        crops: the output of `tf.image.crop_and_resize`, one crop
            of spatial size [56, 36] per person box.
        labels: a float tensor with shape [num_persons, 56, 36, 17].
    """
    features = {
        'image': tf.FixedLenFeature([], tf.string),
        'num_persons': tf.FixedLenFeature([], tf.int64),
        'boxes': tf.FixedLenSequenceFeature([], tf.float32, allow_missing=True),
        'keypoints': tf.FixedLenSequenceFeature([], tf.int64, allow_missing=True)
    }
    parsed_features = tf.parse_single_example(example_proto, features)

    # get size of the image (read from the jpeg header, the image is not decoded)
    shape = tf.image.extract_jpeg_shape(parsed_features['image'])
    height, width = shape[0], shape[1]

    # get number of people on the image
    num_persons = tf.to_int32(parsed_features['num_persons'])
    # it is assumed that num_persons > 0

    # get groundtruth boxes, they are in absolute coordinates
    boxes = tf.reshape(parsed_features['boxes'], [num_persons, 4])

    # get keypoints, they are in absolute coordinates
    keypoints = tf.to_int32(parsed_features['keypoints'])
    keypoints = tf.reshape(keypoints, [num_persons, NUM_KEYPOINTS, 3])

    # get the number of visible keypoints in each box
    # NOTE(review): `is_visible` and `num_visible` are computed
    # but not used anywhere below and are not returned.
    is_visible = tf.to_int32(keypoints[:, :, 2] > 0)  # shape [num_persons, 17]
    num_visible = tf.reduce_sum(is_visible, axis=1)  # shape [num_persons]

    # all boxes refer to the single image with index 0 in the batch
    box_ind = tf.zeros([num_persons], dtype=tf.int32)
    # boxes are divided by this to get normalized coordinates
    scaler = tf.to_float(tf.stack([height, width, height, width]))

    # NOTE(review): `heatmaps` is not defined anywhere in this function.
    # Presumably it should be produced by the imported `get_heatmaps`
    # (with `DOWNSAMPLE`), which is otherwise unused - confirm and add the call.
    crops = tf.image.crop_and_resize(
        tf.expand_dims(heatmaps, 0),
        boxes/scaler, box_ind=box_ind,
        crop_size=CROP_SIZE
    )

    def fn(x):
        """
        Build a binary map with ones at the visible keypoints of one person.

        Arguments:
            x: a tuple (keypoints, box), where
                keypoints: a float tensor with shape [17, 3].
                box: a float tensor with shape [4].
        Returns:
            a float tensor with shape [56, 36, 17].
        """
        keypoints, box = x
        ymin, xmin, ymax, xmax = tf.unstack(box, axis=0)
        y, x, v = tf.unstack(keypoints, axis=1)
        keypoints = tf.stack([y, x], axis=1)

        # indices of the visible body parts
        part_id = tf.where(v > 0.0)  # shape [num_visible, 1]
        part_id = tf.to_int32(part_id)
        num_visible = tf.shape(part_id)[0]
        keypoints = tf.gather(keypoints, tf.squeeze(part_id, 1))
        # it has shape [num_visible, 2]

        # move to coordinates relative to the box
        # and rescale them to the size of the crop
        keypoints -= tf.stack([ymin, xmin])
        h, w = ymax - ymin, xmax - xmin
        scaler = tf.to_float(tf.stack([CROP_SIZE[0]/h, CROP_SIZE[1]/w], axis=0))
        keypoints *= scaler
        keypoints = tf.to_int32(tf.floor(keypoints))  # shape [num_visible, 2]

        # keypoints outside of the box are moved to the border of the crop
        y, x = tf.unstack(keypoints, axis=1)
        y = tf.clip_by_value(y, 0, CROP_SIZE[0] - 1)
        x = tf.clip_by_value(x, 0, CROP_SIZE[1] - 1)
        keypoints = tf.stack([y, x], axis=1)

        # each visible keypoint becomes a single one at (y, x, part_id)
        indices = tf.to_int64(tf.concat([keypoints, part_id], axis=1))
        values = tf.ones([num_visible], dtype=tf.float32)
        binary_map = tf.sparse.SparseTensor(indices, values, dense_shape=CROP_SIZE + [17])
        binary_map = tf.sparse.to_dense(binary_map, default_value=0, validate_indices=False)
        return binary_map

    # apply `fn` to every (keypoints, box) pair, one pair per person
    labels = tf.map_fn(
        fn, (tf.to_float(keypoints), boxes),
        dtype=tf.float32, back_prop=False,
    )

    return crops, labels

In [None]:
tf.reset_default_graph()

# NOTE: machine-specific location of the training shards, adjust it for your setup.
dataset_path = '/home/dan/datasets/COCO/multiposenet/train/'

# collect all shards in a deterministic (sorted) order
shard_names = sorted(n for n in os.listdir(dataset_path) if n.endswith('.tfrecords'))
if not shard_names:
    raise FileNotFoundError('no .tfrecords files found in ' + dataset_path)
filenames = [os.path.join(dataset_path, n) for n in shard_names]

dataset = tf.data.Dataset.from_tensor_slices(filenames)
dataset = dataset.flat_map(tf.data.TFRecordDataset)

# `parse_and_preprocess` has an unused leading `self` parameter, while `map`
# calls its function with the dataset element only, so the element
# is passed explicitly as `example_proto`.
dataset = dataset.map(lambda example_proto: parse_and_preprocess(None, example_proto))