# Loading tensor data to Tensorflow

Currently ZmqOp only accepts a list of tensors as valid input, e.g. `[np.array1, np.array2, np.array3, ...]`,
and the `types` parameter has to be `[dtype of np.array1, dtype of np.array2, dtype of np.array3, ...]`, in the same order.
It outputs `[tensor1, tensor2, tensor3, ...]`, where the data in `tensor[i]` equals `np.array[i]`.

In [1]:
import time
import numpy as np
import zmq
import msgpack

import tensorflow as tf
import multiprocessing as mp
from nvzmq_ops import ZmqOp

Set up some constants.

In [2]:
# Address ZmqOp pulls from; points at the same host/port the sender binds.
ADDR_EXAMPLE = 'zrpull://127.0.0.1:5555'
# Address the sender subprocess binds its PUSH socket to.
SENDER_ADDR = 'tcp://127.0.0.1:5555'
# Number of arrays generated by example(); all of them are packed into ONE message.
NUM_MSGS = 4

# Dtype names np2dict will serialize (list taken from tensorcom).
allowable_dtypes = set([
    "uint8", "uint16", "int16", "int32",
    "float16", "float32", "float64",
])

Simple sender function: sends each message separately (no multipart support yet).

In [3]:
def send_msgs(addr, msgs):
    """Bind a ZMQ PUSH socket at `addr` and send each item of `msgs` as its own message.

    Intended to run in a subprocess. Errors are printed rather than raised
    (best-effort sender), and the socket and context are always released.

    Args:
        addr: endpoint to bind, e.g. 'tcp://127.0.0.1:5555'.
        msgs: iterable of bytes-like payloads; each is sent as a single-part message.
    """
    # Pre-bind so the cleanup below is safe even if Context/socket creation fails;
    # otherwise `finally` would raise NameError and hide the original error.
    ctx = None
    socket = None
    try:
        ctx = zmq.Context(1)
        socket = ctx.socket(zmq.PUSH)
        socket.bind(addr)
        for msg in msgs:
            socket.send(msg)
            # Pause 1 ms per payload byte -- presumably to give the receiver time
            # to pull before the next send / shutdown. TODO confirm intent.
            time.sleep(len(msg) / 1000)
    except Exception as e:
        print(e)
    finally:
        if socket is not None:
            socket.close()
        if ctx is not None:
            ctx.term()

Upstream input pipeline: convert numpy arrays into msgpack-serializable dicts.

In [4]:
#from tensorcom
def np2dict(a, parts=None, allow_float64=False):
    """Recursively convert numpy arrays in a nested structure into plain dicts.

    Each `np.ndarray` becomes a dict with `_shape` (list of ints) and
    `_dtype` (dtype name). The array's raw bytes are stored in one of two ways:
    - embedded inline under `_data` when `parts` is None;
    - otherwise appended to `parts` and referenced by index under `_part`.

    Lists and dicts are traversed recursively with the same `parts` and
    `allow_float64` settings. Any other value is returned unchanged.

    Args:
        a: array, list, dict, or any other value to convert.
        parts: optional list that collects array bytes out-of-line.
        allow_float64: if False, float64 arrays are rejected.

    Returns:
        A structure with the same nesting as `a`, arrays replaced by dicts.

    Raises:
        AssertionError: if an array is float64 and `allow_float64` is False,
            or its dtype name is not in `allowable_dtypes`.
    """
    if isinstance(a, np.ndarray):
        assert allow_float64 or a.dtype != np.dtype("float64")
        dtype = str(a.dtype)
        assert dtype in allowable_dtypes, dtype
        if parts is None:
            return dict(_shape=list(a.shape),
                        _dtype=dtype,
                        _data=a.tobytes())
        index = len(parts)
        parts.append(a.tobytes())
        return dict(_shape=list(a.shape),
                    _dtype=dtype,
                    _part=index)
    elif isinstance(a, list):
        # Forward allow_float64 so nested arrays follow the same policy as top-level ones.
        return [np2dict(x, parts, allow_float64) for x in a]
    elif isinstance(a, dict):
        return {k: np2dict(v, parts, allow_float64) for k, v in a.items()}
    else:
        return a

Start a subprocess that sends the ZMQ message, then call ZmqOp to pull the data.

In [5]:
def example():
    """Send NUM_MSGS uint8 arrays over ZMQ, pull them with ZmqOp, and print them.

    Builds square arrays of sizes 1x1 .. NUM_MSGS x NUM_MSGS and packs them
    into a single msgpack message via np2dict. A subprocess running
    send_msgs sends that message. The message is then pulled back with
    ZmqOp and the resulting tensors are evaluated in a TF session. The
    sender process is always reaped before returning.
    """
    tensor_data_list, types = [], []
    for i in range(1, NUM_MSGS + 1):
        data = np.arange(i * i, dtype=np.uint8).reshape((i, i))
        # ZmqOp.pull needs one TF dtype per array, in the same order as the list.
        types.append(tf.dtypes.as_dtype(data.dtype))
        tensor_data_list.append(data)
    packed = msgpack.dumps(np2dict(tensor_data_list))
    # All arrays travel in ONE message; send_msgs takes a list of messages.
    p = mp.Process(target=send_msgs, args=(SENDER_ADDR, [packed]))
    p.start()
    try:
        print('types: ', types)
        zmq_op = ZmqOp(address=ADDR_EXAMPLE)

        tensors = zmq_op.pull(types=types)
        with tf.Session() as sess:
            output = sess.run(tensors)
            print(output)
    finally:
        # Always reap the sender. If it is still alive after a grace period
        # (e.g. the pull failed and its send is blocked), stop it instead of hanging.
        p.join(timeout=5)
        if p.is_alive():
            p.terminate()
            p.join()

# Guard the entry point: multiprocessing may re-import this module in the child
# (e.g. with the "spawn" start method), and that must not re-run example().
if __name__ == '__main__':
    example()

types:  [tf.uint8, tf.uint8, tf.uint8, tf.uint8]
[array([[0]], dtype=uint8), array([[0, 1],
       [2, 3]], dtype=uint8), array([[0, 1, 2],
       [3, 4, 5],
       [6, 7, 8]], dtype=uint8), array([[ 0,  1,  2,  3],
       [ 4,  5,  6,  7],
       [ 8,  9, 10, 11],
       [12, 13, 14, 15]], dtype=uint8)]
