Converts TFRecord data into numpy. Feel free to modify based on your needs.

Data is saved into pickle files. Every file contains a list of samples. The number of samples per file can be set via config['num_samples_in_numpy_list'].

Loading:
data = pickle.load(open(<i>path-to-pkl-file</i>, 'rb'))

Each sample is a dictionary with the following fields:
<ol>
  <li>'label': label of the gesture. A unique ID in {0,1,..,19},</li>
  <li>'length': length of the gesture sequence, i.e., # of frames,</li>
  <li>'depth': tensor of depth images (length, height, width, 1),</li>
  <li>'skeleton': tensor of skeleton joints (length, 180),</li>
  <li>'rgb': tensor of rgb images (length, height, width, 3),</li>
  <li>'segmentation': tensor of segmentation masks (length, height, width, 3).</li>
</ol>


Note that the samples have different numbers of frames.

In [1]:
import tensorflow as tf
import numpy as np
import os
import time
import datetime
%matplotlib inline
import matplotlib.pyplot as plt
import pickle

In [2]:


def preprocessing_op(image_op, shape):
    """
    Builds the preprocessing operation applied to a single frame.

    Currently the only preprocessing step is a reshape.

    Args:
        image_op: tensor holding the data of one frame.
        shape: target shape handed to tf.reshape.

    Returns:
        The reshaped tensor, created under the "preprocessing" name scope.
    """
    with tf.name_scope("preprocessing"):
        reshaped_frame = tf.reshape(image_op, shape)
    return reshaped_frame

def read_and_decode_sequence(filename_queue, config):
    """
    Builds the ops that read and decode one sequence sample from GZIP-compressed TFRecord files.

    Every record is parsed as a SequenceExample with:
      - context features: "length" (int64) and, when the global flag `train` is truthy,
        "label" (int64);
      - sequence features: "depth", "rgb", "segmentation" and "skeleton", each stored as
        one byte string per frame.

    NOTE(review): the notebook header states that labels are in {0,..,19}, whereas the
    original comment in this function said they take values between 1 and 20 - confirm
    against the data.

    Args:
        filename_queue: queue of TFRecord file names consumed by the reader.
        config: dictionary providing 'img_height', 'img_width' and 'img_num_channels'.

    Returns:
        A list [seq_rgb, seq_depth, seq_segmentation, seq_skeleton, seq_label, seq_len]:
          - seq_rgb, seq_segmentation: uint8, every frame reshaped to
            (img_height, img_width, img_num_channels);
          - seq_depth: uint8, every frame reshaped to (img_height, img_width, 1);
          - seq_skeleton: float32 values decoded from the raw "skeleton" bytes of every frame;
          - seq_label: the "label" context feature if `train` is truthy, otherwise the
            Python int 0 (the label is not parsed in that case);
          - seq_len: the "length" context feature cast to int32.
    """
    # The records are GZIP compressed, so the reader has to be configured accordingly.
    gzip_options = tf.python_io.TFRecordOptions(
        compression_type=tf.python_io.TFRecordCompressionType.GZIP)
    record_reader = tf.TFRecordReader(options=gzip_options)
    _, serialized_example = record_reader.read(filename_queue)

    height = config['img_height']
    width = config['img_width']
    num_channels = config['img_num_channels']

    def reshape_frames(flat_frames, frame_shape):
        # tf.map_fn applies the preprocessing function on every frame of the sequence.
        return tf.map_fn(lambda frame: preprocessing_op(frame, frame_shape),
                         elems=flat_frames,
                         dtype=tf.uint8,
                         back_prop=False)

    with tf.name_scope("TFRecordDecoding"):
        # "length" is always a context feature; "label" is only parsed when `train` is set.
        context_spec = {"length": tf.FixedLenFeature([], dtype=tf.int64)}
        if train:
            context_spec["label"] = tf.FixedLenFeature([], dtype=tf.int64)

        # All per-frame modalities are serialized as byte strings.
        sequence_spec = {
            modality: tf.FixedLenSequenceFeature([], dtype=tf.string)
            for modality in ("depth", "rgb", "segmentation", "skeleton")
        }

        context_encoded, sequence_encoded = tf.parse_single_sequence_example(
            serialized_example,
            context_features=context_spec,
            sequence_features=sequence_spec)

        # Decode the raw bytes of the image modalities.
        flat_rgb = tf.decode_raw(sequence_encoded['rgb'], tf.uint8)
        flat_depth = tf.decode_raw(sequence_encoded['depth'], tf.uint8)
        flat_segmentation = tf.decode_raw(sequence_encoded['segmentation'], tf.uint8)

        # Output dimensionality: [seq_len, height, width, numChannels]
        seq_rgb = reshape_frames(flat_rgb, (height, width, num_channels))
        seq_depth = reshape_frames(flat_depth, (height, width, 1))
        seq_segmentation = reshape_frames(flat_segmentation, (height, width, num_channels))

        seq_len = tf.to_int32(context_encoded['length'])
        seq_skeleton = tf.decode_raw(sequence_encoded['skeleton'], tf.float32)

        # Without a parsed label, a constant 0 is emitted as a placeholder label.
        seq_label = context_encoded['label'] if train else 0

        return [seq_rgb, seq_depth, seq_segmentation, seq_skeleton, seq_label, seq_len]
    
def input_pipeline(filenames, config):
    """
    Builds the data loading part of the graph: file name queue -> readers -> padded batches.

    Args:
        filenames: list of TFRecord file paths.
        config: dictionary providing 'num_epochs', 'ip_num_read_threads', 'batch_size' and
            'ip_queue_capacity' (plus the fields needed by read_and_decode_sequence).

    Returns:
        Tuple (batch_rgb, batch_depth, batch_segmentation, batch_skeleton, batch_labels,
        batch_lens) of batched tensors produced by tf.train.batch_join.
    """
    with tf.name_scope("input_pipeline"):
        # Queue of TFRecord input files, visited in the given order (no shuffling).
        filename_queue = tf.train.string_input_producer(
            filenames,
            num_epochs=config['num_epochs'],
            shuffle=False)

        # One reader/decoder per requested read thread.
        decoded_samples = []
        for _ in range(config['ip_num_read_threads']):
            decoded_samples.append(read_and_decode_sequence(filename_queue, config))

        # Join the readers into batches; dynamic_pad pads the samples of a batch to a
        # common shape, and the final batch may be smaller than batch_size.
        batched = tf.train.batch_join(
            decoded_samples,
            batch_size=config['batch_size'],
            capacity=config['ip_queue_capacity'],
            enqueue_many=False,
            dynamic_pad=True,
            allow_smaller_final_batch=True,
            name="batch_join_and_pad")

        batch_rgb, batch_depth, batch_segmentation, batch_skeleton, batch_labels, batch_lens = batched
        return batch_rgb, batch_depth, batch_segmentation, batch_skeleton, batch_labels, batch_lens
      

In [4]:
# Whether the train (True) or the test (False) split is converted. This flag is read by
# read_and_decode_sequence: for test data no label is parsed and 0 is written as the label.
# Make sure to restart the kernel after switching, because otherwise the old settings are
# still used.
train = True

# TODO: You can change these fields.
config = {
    # Directory of the tfrecords. Test alternative: "./test/"
    'input_dir': "./train/",
    # File naming. Test alternative: "dataTest_%d.tfrecords"
    'input_file_format': "dataTrain_%d.tfrecords",
    # File IDs to be used. Test alternative: list(range(1,16))
    # Andres' tip: use 16 if validation.
    'input_file_ids': list(range(1, 41)),
    # Put 100 samples in a pickle data file. You can put everything in a single file as well.
    'num_samples_in_numpy_list': 100,
    'output_file_start_id': 1,
}
# The pickle files are written next to the input files, reusing the input file name
# (everything before the first ".") with a ".pkl" extension.
config['output_dir'] = config['input_dir']
config['output_file_format'] = config['input_file_format'].partition(".")[0] + ".pkl"

# Keep these fields fixed.
config.update({
    'img_height': 80,
    'img_width': 80,
    'img_num_channels': 3,
    'num_epochs': 1,
    'batch_size': 1,
    'ip_num_read_threads': 1,
})
# Capacity of the queue which contains the samples read by data readers.
# Make sure that it has enough capacity.
config['ip_queue_capacity'] = config['batch_size'] * 10

# Create a list of TFRecord input files.
filenames = [
    os.path.join(config['input_dir'], config['input_file_format'] % file_id)
    for file_id in config['input_file_ids']
]

# Create data loading operators. These are represented as nodes in the computational graph.
(rgb_op, depth_op, segmentation_op,
 skeleton_op, label_op, seq_len_op) = input_pipeline(filenames, config)

# Create tensorflow session and initialize the global and local variables (if any).
sess = tf.Session()
init_op = tf.group(tf.global_variables_initializer(),
                   tf.local_variables_initializer())
sess.run(init_op)

# Create threads to prefetch the data.
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)


In [5]:
def _save_samples(samples, file_id):
    """Pickles `samples` into the output file with number `file_id`."""
    output_path = os.path.join(config['output_dir'], config['output_file_format'] % file_id)
    # The context manager guarantees that the file is flushed and closed, even if
    # pickling fails; passing a bare open(...) to pickle.dump leaves that to the
    # garbage collector.
    with open(output_path, 'wb') as output_file:
        pickle.dump(samples, output_file)


# Read the samples one by one and write them out in chunks of
# config['num_samples_in_numpy_list'] samples per pickle file.
np_file_id = config['output_file_start_id']
output_list = []
num_samples_read = 0
try:
    while not coord.should_stop():
        rgb, depth, segmentation, skeleton, label, seq_len = sess.run(
            [rgb_op, depth_op, segmentation_op, skeleton_op, label_op, seq_len_op])
        num_samples_read += 1

        # Data is in batch format. Get rid of the first (batch) dimension.
        output_list.append({
            'rgb': rgb[0],
            'depth': depth[0],
            'segmentation': segmentation[0],
            'skeleton': skeleton[0],
            'label': label[0],
            'length': seq_len[0],
        })

        # A chunk is complete: write it out and start collecting the next one.
        if num_samples_read % config['num_samples_in_numpy_list'] == 0:
            _save_samples(output_list, np_file_id)
            np_file_id += 1
            output_list = []

except tf.errors.OutOfRangeError:
    # The input queue is exhausted. Save the last, possibly incomplete, chunk.
    if output_list:
        print(len(output_list))
        _save_samples(output_list, np_file_id)
        output_list = []
    print('Done.')
finally:
    # When done, ask the threads to stop.
    coord.request_stop()

# Wait for threads to finish.
coord.join(threads)

KeyboardInterrupt: 