In [1]:
# https://github.com/tensorflow/tensorflow/blob/r1.3/tensorflow/contrib/resampler/python/ops/resampler_ops_test.py
"""Tests for contrib.resampler.python.ops.resampler_ops"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
from six.moves import xrange
# In Python 2, range builds a full list, whereas xrange yields a lazily evaluated sequence.

import tensorflow
from tensorflow.contrib import resampler
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import errors_impl
from tensorflow.python.ops import array_ops
from tensorflow.python.platform import test

In [2]:
# Experiment configuration.
np.random.seed(0)  # fix numpy's global RNG so the random warp/data are reproducible

# Input batch: `batch_size` images of `data_height` x `data_width` pixels
# with `data_channels` channels each.
batch_size = 10
data_height, data_width, data_channels = 9, 9, 3

# Resolution of the sampling (warp) grid.
warp_height, warp_width = 4, 4

# TensorFlow dtype shared by the placeholders and the generated numpy arrays.
dtype = dtypes.float32

In [3]:
def _make_warp(batch_size, warp_height, warp_width, dtype):
    """Creates batch of warping coordinates."""
    x, y = np.meshgrid(np.linspace(0, warp_width - 1, warp_width), np.linspace(0, warp_height - 1, warp_height))
    warp = np.concatenate((x.reshape([warp_height, warp_width, 1]), y.reshape([warp_height, warp_width, 1])), 2)
    warp = np.tile(warp.reshape([1, warp_height, warp_width, 2]), [batch_size, 1, 1, 1])
    warp += np.random.randn(*warp.shape)
    return warp.astype(dtype)

In [194]:
def _bilinearly_interpolate(data, x, y):
    """
    Performs bilinenar interpolation of grid data at user defined coordinates. This interpolation function:
        a) implicitly pads the input data with 0s.
        b) returns 0 when sampling outside the (padded) image.
    The effect is that the sampled signal smoothly goes to 0 outside the original input domain, rather than producing a jump discontinuity at the image
    boundaries.
    Args:
        data: numpy array of shape `[data_height, data_width]` containing data 
            samples assumed to be defined at the corresponding pixel coordinates.
        x: numpy array of shape `[warp_height, warp_width]` containing x coordinates 
            at which interpolation will be performed.
        y: numpy array of shape `[warp_height, warp_width]` containing y coordinates
            at which interpolation will be performed.
    Returns:
        Numpy array of shape `[warp_height, warp_width]` containing interpolated values.
  """
    shape = x.shape
    x = np.asarray(x) + 1
    y = np.asarray(y) + 1
    data = np.lib.pad(data, 1, "constant", constant_values=0)

    x_0 = np.floor(x).astype(int)
    x_1 = x_0 + 1
    y_0 = np.floor(y).astype(int)
    y_1 = y_0 + 1

    x_0 = np.clip(x_0, 0, data.shape[1] - 1)
    x_1 = np.clip(x_1, 0, data.shape[1] - 1)
    y_0 = np.clip(y_0, 0, data.shape[0] - 1)
    y_1 = np.clip(y_1, 0, data.shape[0] - 1)

    i_a = data[y_0, x_0]
    i_b = data[y_1, x_0]
    i_c = data[y_0, x_1]
    i_d = data[y_1, x_1]
    
    w_a = (x_1 - x) * (y_1 - y)
    w_b = (x_1 - x) * (y - y_0)
    w_c = (x - x_0) * (y_1 - y)
    w_d = (x - x_0) * (y - y_0)

    samples = (w_a * i_a + w_b * i_b + w_c * i_c + w_d * i_d)
    samples.reshape(shape)
    return samples

In [4]:
# Draw the random sampling coordinates first, then the random image batch.
# Both come from numpy's global RNG, so the order of these statements decides
# which random values each array receives for a given seed.
warp = _make_warp(batch_size, warp_height, warp_width, dtype.as_numpy_dtype) # (10, 4, 4, 2)
# Image batch laid out as (batch, height, width, channels).
data_shape = (batch_size, data_height, data_width, data_channels)
# Uniform values in [0, 1), cast to the numpy dtype matching `dtype`.
data = np.random.rand(*data_shape).astype(dtype.as_numpy_dtype)

In [196]:
# Graph construction: placeholders keep the batch dimension dynamic (None)
# and take the remaining dimensions from the numpy inputs.
data_ph = array_ops.placeholder(dtype, shape=(None,) + data.shape[1:])
warp_ph = array_ops.placeholder(dtype, shape=(None,) + warp.shape[1:])
outputs = resampler.resampler(data=data_ph, warp=warp_ph)

# The two printed shapes should be equal: inferred static shape vs. expectation.
print(outputs.get_shape().as_list())
print([None, warp_height, warp_width, data_channels])

# Evaluate the resampler on the generated inputs.
with tensorflow.Session() as sess:
    out = sess.run(outputs, feed_dict={data_ph: data, warp_ph: warp})

[None, 4, 4, 3]
[None, 4, 4, 3]


In [197]:
# Generate the reference output via bilinear interpolation in numpy, one
# (batch element, channel) image at a time.
reference_output = np.zeros_like(out)
for batch in xrange(batch_size):
    for c in xrange(data_channels):
        reference_output[batch, :, :, c] = _bilinearly_interpolate(
            data[batch, :, :, c],
            warp[batch, :, :, 0],   # x coordinates
            warp[batch, :, :, 1])   # y coordinates

# Compare on the absolute error: a signed difference would also "pass" for
# arbitrarily large negative deviations.
np.all(np.abs(reference_output - out) < 1e-4) # True

True

In [198]:
# cf) Example illustrating numpy integer-array ("fancy") indexing.
test_shape = [5, 3]
test_data = np.random.rand(*test_shape)
x_test = [[2, 1], [1, 0]]
y_test = [[1, 1], [2, 2]]

# A nested list used directly as an index is ambiguous: older NumPy treated it
# as a tuple of per-axis index lists, newer NumPy treats it as one integer
# array indexing axis 0. Converting to a tuple states the per-axis meaning
# explicitly: the first inner list indexes rows, the second indexes columns.
picked_by_y = test_data[tuple(y_test)]      # elements (1, 2) and (1, 2)
picked_by_x = test_data[tuple(x_test)]      # elements (2, 1) and (1, 0)
# Two separate index arrays: element [i, j] is test_data[y_test[i][j], x_test[i][j]].
picked_pairwise = test_data[y_test, x_test]

print(test_data)
print(picked_by_y)
print(picked_by_x)
print(picked_pairwise)

[[ 0.22848166  0.73045607  0.91852663]
 [ 0.28141611  0.69065354  0.40065741]
 [ 0.29022665  0.96962347  0.34863323]
 [ 0.10784968  0.38858921  0.44767829]
 [ 0.75221083  0.94991427  0.81705698]]
[ 0.40065741  0.40065741]
[ 0.96962347  0.28141611]
[[ 0.40065741  0.69065354]
 [ 0.96962347  0.29022665]]


In [202]:
# cf) np.mgrid vs np.meshgrid
# Reference: http://louistiao.me/posts/numpy-mgrid-vs-meshgrid/
# If warp_width == warp_height (2 in this example), the two calls below build the same grids:
# x, y = np.meshgrid(np.linspace(0, warp_width - 1, warp_width), np.linspace(0, warp_height - 1, warp_height))
# y, x = np.mgrid[0:1:2j,0:1:2j]

In [201]:
# TensorFlow implementation of bilinear sampling
# https://github.com/iwyoo/tf-bilinear_sampler/blob/master/bilinear_sampler.py

def bilinear_sampler(x, v, resize=False, normalize=False, crop=None, out="CONSTANT"):
    """Bilinearly samples `x` at each pixel position displaced by the flow `v`.

    For output pixel (n, i, j) the value is interpolated from `x[n]` at row
    `h0 + i + vy` and column `w0 + j + vx`, where `(vy, vx)` are the two
    channels of `v[n, i, j]` (vertical displacement first) and `(h0, w0)` is
    the crop origin (0, 0 when `crop` is None).

    Args:
      x - Input tensor [N, H, W, C]. Its static shape is read, so N, H and W
          must be known at graph-construction time.
      v - Vector flow tensor [N, H, W, 2], tf.float32
      (optional)
      resize - Whether to resize v to the size of the sampled region. A
               callable is invoked as `resize(v, [H, W])`; any other truthy
               value uses `tf.image.resize_bilinear`.
      normalize - Whether to rescale v from normalized units to pixels.
                  h : [-1, 1] -> [-H/2, H/2]
                  w : [-1, 1] -> [-W/2, W/2]
      crop - Setting the region to sample. 4-d list [h0, h1, w0, w1]
      out  - Handling out of boundary value.
             Zero value is used if out="CONSTANT".
             Boundary values are used if out="EDGE".

    Returns:
      Tensor [N, H, W, C] of sampled values, where H and W are the size of
      the crop region (the full image when `crop` is None).

    Raises:
      ValueError: if `out` is neither "CONSTANT" nor "EDGE".
    """
    # Without padding the +1 index offset and the clipping range used below
    # would be wrong, so an unknown mode is rejected instead of ignored.
    if out not in ("CONSTANT", "EDGE"):
        raise ValueError('out must be "CONSTANT" or "EDGE", got %r' % (out,))

    # Only `import tensorflow` (without alias) is visible at the top of this
    # notebook, so the `tf` name used throughout this function is bound here.
    import tensorflow as tf

    def _get_grid_array(N, H, W, h, w):
        """Returns integer index grids (batch, row, col), each [N, H, W, 1]."""
        N_i = np.arange(N)
        # +1 shifts the indices into the padded image (1 pixel on each side).
        H_i = np.arange(h + 1, h + H + 1)
        W_i = np.arange(w + 1, w + W + 1)
        grid_n, grid_h, grid_w = np.meshgrid(N_i, H_i, W_i, indexing='ij')
        grid_n = np.expand_dims(grid_n, axis=3)  # [N, H, W, 1]
        grid_h = np.expand_dims(grid_h, axis=3)  # [N, H, W, 1]
        grid_w = np.expand_dims(grid_w, axis=3)  # [N, H, W, 1]
        return grid_n, grid_h, grid_w

    shape = x.get_shape().as_list()  # static shape, must be fully defined
    N = shape[0]
    H_, W_ = shape[1], shape[2]      # size of the full (unpadded) image
    if crop is None:
        H, W = H_, W_
        h = w = 0
    else:
        H = crop[1] - crop[0]
        W = crop[3] - crop[2]
        h = crop[0]
        w = crop[2]

    # Resize the flow only when requested; a callable overrides the default
    # bilinear resize.
    if resize:
        if callable(resize):
            v = resize(v, [H, W])
        else:
            v = tf.image.resize_bilinear(v, [H, W])

    paddings = ((0, 0), (1, 1), (1, 1), (0, 0))
    if out == "CONSTANT":
        x = tf.pad(x, paddings, mode='CONSTANT')
    else:  # out == "EDGE"
        # SYMMETRIC mirrors including the border, so a 1-pixel pad repeats the
        # boundary pixel; REFLECT would copy the second row/column instead.
        x = tf.pad(x, paddings, mode='SYMMETRIC')

    vy, vx = tf.split(v, 2, axis=3)
    if normalize:
        vy *= (H / 2.0)
        vx *= (W / 2.0)

    n, h, w = _get_grid_array(N, H, W, h, w)  # three grids, each [N, H, W, 1]

    # Integer corners surrounding each displaced sample position.
    vx0 = tf.floor(vx)
    vy0 = tf.floor(vy)
    vx1 = vx0 + 1
    vy1 = vy0 + 1  # [N, H, W, 1]

    # Absolute indices into the padded image, clamped to its valid range
    # [0, size + 1].
    iy0 = tf.clip_by_value(vy0 + h, 0., H_ + 1)
    iy1 = tf.clip_by_value(vy1 + h, 0., H_ + 1)
    ix0 = tf.clip_by_value(vx0 + w, 0., W_ + 1)
    ix1 = tf.clip_by_value(vx1 + w, 0., W_ + 1)

    # (batch, row, col) index triples for the four corners, [N, H, W, 3].
    i00 = tf.cast(tf.concat([n, iy0, ix0], 3), tf.int32)
    i01 = tf.cast(tf.concat([n, iy1, ix0], 3), tf.int32)
    i10 = tf.cast(tf.concat([n, iy0, ix1], 3), tf.int32)
    i11 = tf.cast(tf.concat([n, iy1, ix1], 3), tf.int32)

    x00 = tf.gather_nd(x, i00)
    x01 = tf.gather_nd(x, i01)
    x10 = tf.gather_nd(x, i10)
    x11 = tf.gather_nd(x, i11)

    # Bilinear weights: each corner is weighted by the opposite sub-area.
    w00 = tf.cast((vx1 - vx) * (vy1 - vy), tf.float32)
    w01 = tf.cast((vx1 - vx) * (vy - vy0), tf.float32)
    w10 = tf.cast((vx - vx0) * (vy1 - vy), tf.float32)
    w11 = tf.cast((vx - vx0) * (vy - vy0), tf.float32)
    output = tf.add_n([w00 * x00, w01 * x01, w10 * x10, w11 * x11])

    return output