# TFRecord 简介

TensorFlow 提供了一种标准的文件格式来存储数据，这个格式就是 [TFRecord](https://www.tensorflow.org/tutorials/load_data/tf_records)。它是一种二进制文件格式，理论上可以保存任何格式的数据。它能更好地利用内存，在 TensorFlow 中快速地复制、移动、读取和存储等。

TFRecord 文件中的数据是通过 `tf.train.Example` [Protocol Buffer](https://www.ibm.com/developerworks/cn/linux/l-cn-gpb/) (简称 PB，谷歌开源的一种数据传输协议) 的格式存储的，数据格式定义在 [example.proto](https://github.com/tensorflow/tensorflow/blob/master/tensorflow/core/example/example.proto) 和 [feature.proto](https://github.com/tensorflow/tensorflow/blob/master/tensorflow/core/example/feature.proto) 中，如下:

```
message Example {
  Features features = 1;
};

message Features {
  // Map from feature name to feature.
  map<string, Feature> feature = 1;
};

message Feature {
  // Each feature can be exactly one kind.
  oneof kind {
    BytesList bytes_list = 1;
    FloatList float_list = 2;
    Int64List int64_list = 3;
  }
};


message BytesList {
  repeated bytes value = 1;
}
message FloatList {
  repeated float value = 1 [packed = true];
}
message Int64List {
  repeated int64 value = 1 [packed = true];
}
```

其中，`Example` 对应了 `tf.train.Example`，`Features` 对应了 `tf.train.Features`，`Feature` 对应了 `tf.train.Feature`。从 `Example` 的定义可以知道，`tf.train.Example` 包含了一个 `{"string": tf.train.Feature}` 的字典，其中属性名称为一个字符串，属性的取值可以为字符串 (BytesList)，实数列表 (FloatList) 或整数列表 (Int64List)。

## 创建 tf.train.Feature 对象

让我们先来看 `tf.train.Feature`，它的取值可以是 BytesList, FloatList 或 Int64List。

In [1]:
import tensorflow as tf


# The following functions can be used to convert value to a type compatible with tf.Example.
def _bytes_feature(value):
    """Returns a bytes_list from string / byte."""
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=value))

def _float_feature(value):
    """Returns a float_list from float / double."""
    return tf.train.Feature(float_list=tf.train.FloatList(value=value))

def _int64_feature(value):
    """Returns an int64_list from bool / enum / int / uint."""
    return tf.train.Feature(int64_list=tf.train.Int64List(value=value))

In [2]:
bytes_feat = _bytes_feature([b'Hello World', b'Hello TensorFlow'])
float_feat = _float_feature([3.14, 6.28])
int_feat   = _int64_feature([1024, 2048])

In [3]:
type(bytes_feat)

tensorflow.core.example.feature_pb2.Feature

In [4]:
print(bytes_feat)

bytes_list {
  value: "Hello World"
  value: "Hello TensorFlow"
}



In [5]:
print(float_feat)

float_list {
  value: 3.140000104904175
  value: 6.28000020980835
}



In [6]:
print(int_feat)

int64_list {
  value: 1024
  value: 2048
}



这些 proto messages 可以通过使用 `SerializeToString` 方法将它们序列化为二进制字符串:

In [7]:
serialized_feat = int_feat.SerializeToString()

In [8]:
type(serialized_feat)

bytes

In [9]:
print(serialized_feat)

b'\x1a\x06\n\x04\x80\x08\x80\x10'


## 创建 tf.train.Example message

根据 `Example` 的定义，我们可以对应地生成 `tf.train.Example` message:

In [10]:
def serialize_example():
    """
    Creates a tf.Example message ready to be written to a file.
    """
    
    # Create a dictionary mapping the feature name to the tf.Example-compatible data type.
    feature = {
        "user_id": _int64_feature([1]),
        "gender": _bytes_feature([b'male']),
        "age": _int64_feature([20]),
        "weight": _float_feature([45.6])
    }
    
    # Create a Features message using tf.train.Example.
    example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
    
    # proto messages can be serialized to a binary-string using the .SerializeToString method
    serialized_example = example_proto.SerializeToString()
    
    return example_proto, serialized_example

In [11]:
example_proto, serialized_example = serialize_example()

In [12]:
type(example_proto)

tensorflow.core.example.example_pb2.Example

In [13]:
type(serialized_example)

bytes

In [14]:
print(example_proto)

features {
  feature {
    key: "age"
    value {
      int64_list {
        value: 20
      }
    }
  }
  feature {
    key: "gender"
    value {
      bytes_list {
        value: "male"
      }
    }
  }
  feature {
    key: "user_id"
    value {
      int64_list {
        value: 1
      }
    }
  }
  feature {
    key: "weight"
    value {
      float_list {
        value: 45.599998474121094
      }
    }
  }
}



In [15]:
print(serialized_example)

b'\nH\n\x12\n\x06gender\x12\x08\n\x06\n\x04male\n\x12\n\x06weight\x12\x08\x12\x06\n\x04ff6B\n\x0c\n\x03age\x12\x05\x1a\x03\n\x01\x14\n\x10\n\x07user_id\x12\x05\x1a\x03\n\x01\x01'


为了解析 `serialized_example`，我们可以使用 `tf.train.Example.FromString` 方法:

In [16]:
decoded_serialized_example = tf.train.Example.FromString(serialized_example)

In [17]:
type(decoded_serialized_example)

tensorflow.core.example.example_pb2.Example

In [18]:
print(decoded_serialized_example)

features {
  feature {
    key: "age"
    value {
      int64_list {
        value: 20
      }
    }
  }
  feature {
    key: "gender"
    value {
      bytes_list {
        value: "male"
      }
    }
  }
  feature {
    key: "user_id"
    value {
      int64_list {
        value: 1
      }
    }
  }
  feature {
    key: "weight"
    value {
      float_list {
        value: 45.599998474121094
      }
    }
  }
}



可以看到, `decoded_serialized_example` 跟 `serialized_example` 序列化前的 `example_proto` 是相同的，说明正确解析。

另外，`example_proto` 是一个 `Example` 对象，我们如果想访问 `age` 这个属性的值，可以这样做:

In [19]:
example_proto.features.feature["age"].int64_list.value

[20]

# 生成 TFRecord 文件

TFRecord 文件的生成过程使用了前面介绍的几个类：`tf.train.Example`, `tf.train.Features`, `tf.train.Feature`，知道了这几个类的定义以及它们的嵌套关系，再去理解 TFRecord 的生成就容易多了。  

每个 TFRecord 文件的基本元素是 `tf.train.Example`，其对应的是一个样本数据，每个 Example 包含 Features，存储该样本的各个 feature，每个 feature 包含一个键值对，分别对应 feature 的特征名与实际值。

`tf.python_io` 这个模块提供了纯 Python 实现的用于读写 TFRecord 文件的函数:

- `tf.python_io.TFRecordWriter`: 用于将序列化为二进制字符串的 example 写入文件
- `tf.python_io.tf_record_iterator`: 返回一个读取二进制字符串的迭代器 (iterator)

## 创建 TFRecord 文件

创建 TFRecord 文件，主要通过 TF 中的 `tf.python_io.TFRecordWriter` 函数来实现，示例代码如下:

In [20]:
# 创建向 TFRecord 文件写记录的 writer
writer = tf.python_io.TFRecordWriter("test.tfrecord")

# 循环构造输入样例
for i in range(4):
    # 创建 example
    example = tf.train.Example(
        features=tf.train.Features(
            feature={
                "label": _int64_feature([i]),
                "index": _int64_feature([i, i+1, i+2]),
                "value": _float_feature([i*0.1, i*0.2, i*0.3])
            }
        )
    )
    # 将 example 序列化为二进制字符串后，写入 test.tfrecord
    writer.write(example.SerializeToString())
    
# 关闭输出流
writer.close()

## 读取 TFRecord 文件

上面我们创建了 TFRecord 文件，接着我们可以用 TFRecord 的 Python 接口来读取它们，主要是 `tf.python_io.tf_record_iterator` 函数，它输入 TFRecord 文件，得到一个迭代器，每个元素是一个已经序列化为二进制字符串的 Example，可以再通过 `ParseFromString` 方法来解析。代码如下:

In [21]:
record_iterator = tf.python_io.tf_record_iterator("test.tfrecord")

for string_record in record_iterator:
    print("[string_reocrd]: {}".format(string_record))
    
    example = tf.train.Example()
    example.ParseFromString(string_record)
    
    print("[example_proto]: {}".format(example))
    print("[value of example]: {}".format(example.features.feature["value"].float_list.value))
    
    # Exit after 1 iteration
    break

[string_reocrd]: b'\n=\n\x10\n\x05index\x12\x07\x1a\x05\n\x03\x00\x01\x02\n\x19\n\x05value\x12\x10\x12\x0e\n\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\n\x0e\n\x05label\x12\x05\x1a\x03\n\x01\x00'
[example_proto]: features {
  feature {
    key: "index"
    value {
      int64_list {
        value: 0
        value: 1
        value: 2
      }
    }
  }
  feature {
    key: "label"
    value {
      int64_list {
        value: 0
      }
    }
  }
  feature {
    key: "value"
    value {
      float_list {
        value: 0.0
        value: 0.0
        value: 0.0
      }
    }
  }
}

[value of example]: [0.0, 0.0, 0.0]


## 使用队列读取 TFRecord 文件

上面是纯 Python 的读取方式，但不是一种高效的方式，TF 提供了使用 TFRecords 文件建立输入流水线的方式。在 `tf.data` 出现之前，使用的是 `QueueRunner` 方式，即文件队列机制，这种方式目前已经不用了，这里仅给出示例代码:

In [22]:
def get_serialized_examples(filename_queue):
    num_records = 2000
    reader = tf.TFRecordReader()
    _, serialized_example = reader.read_up_to(filename_queue, num_records)
    
    return serialized_example

def input_pipeline(filenames, batch_size, num_epochs=None):
    filename_queue = tf.train.string_input_producer(
        tf.train.match_filenames_once(filenames),
        num_epochs=num_epochs, shuffle=False)
    
    serialized_example = get_serialized_examples(filename_queue)
    
    # min_after_dequeue 表示从样例队列中出队的样例个数
    # 值越大表示打乱顺序效果越好，同时意味着消耗更多内存
    min_after_dequeue = 1000
    
    # capacity 表示批数据队列的容量，推荐设置
    # min_after_dequeue + (num_threads + a small safety margin) * batch_size
    capacity = min_after_dequeue + 5 * batch_size
    
    num_threads = 4
    
    # 序列化为二进制字符串的 Example
    batch_serialized_example = tf.train.shuffle_batch(
        [serialized_example],
        batch_size=batch_size,
        num_threads=num_threads,
        capacity=capacity,
        enqueue_many=True,
        min_after_dequeue=min_after_dequeue,
        allow_smaller_final_batch=True)
    
#     features = {
#         "label": tf.FixedLenFeature([], tf.int64),
#         "index": tf.VarLenFeature(tf.int64),
#         "value": tf.VarLenFeature(tf.float32)
#     }
    
    features = {
        "label": tf.FixedLenFeature([], tf.int64),
        "index": tf.FixedLenFeature([3], tf.int64),
        "value": tf.FixedLenFeature([3], tf.float32)
    }
    
    # 解析 Example
    parsed_features = tf.parse_example(batch_serialized_example, features=features)
    
    batch_labels = parsed_features["label"]
    batch_index  = parsed_features["index"]
    batch_value  = parsed_features["value"]
    
    return batch_serialized_example, batch_labels, batch_index, batch_value

In [23]:
batch_serialized_example, batch_labels, batch_index, batch_value = input_pipeline("test.tfrecord", 2, num_epochs=1)

init_op = [tf.global_variables_initializer(), tf.local_variables_initializer()]
with tf.Session() as sess:
    sess.run(init_op)
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord, sess=sess)
    try:
        while not coord.should_stop():
            _batch_serialized_example, _batch_labels, _batch_index, _batch_value = sess.run(
                [batch_serialized_example, batch_labels, batch_index, batch_value])

            break
    except tf.errors.OutOfRangeError:
        print("Finish")
    finally:
        coord.request_stop()
    coord.join(threads)

In [24]:
_batch_serialized_example

array([b'\n=\n\x10\n\x05index\x12\x07\x1a\x05\n\x03\x01\x02\x03\n\x19\n\x05value\x12\x10\x12\x0e\n\x0c\xcd\xcc\xcc=\xcd\xccL>\x9a\x99\x99>\n\x0e\n\x05label\x12\x05\x1a\x03\n\x01\x01',
       b'\n=\n\x10\n\x05index\x12\x07\x1a\x05\n\x03\x00\x01\x02\n\x19\n\x05value\x12\x10\x12\x0e\n\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\n\x0e\n\x05label\x12\x05\x1a\x03\n\x01\x00'],
      dtype=object)

In [25]:
_batch_labels

array([1, 0])

In [26]:
_batch_index

array([[1, 2, 3],
       [0, 1, 2]])

In [27]:
_batch_value

array([[0.1, 0.2, 0.3],
       [0. , 0. , 0. ]], dtype=float32)

# 参考资料

- [Using TFRecords and tf.Example  |  TensorFlow Core  |  TensorFlow](https://www.tensorflow.org/tutorials/load_data/tf_records#tfrecord_files_using_tfpython_io)
- [tf.io.parse_example  |  TensorFlow Core 1.13  |  TensorFlow](https://www.tensorflow.org/api_docs/python/tf/io/parse_example)
- [十图详解tensorflow数据读取机制 - 知乎](https://zhuanlan.zhihu.com/p/27238630)
- [实例介绍TensorFlow的输入流水线](https://www.zybuluo.com/Team/note/1078850)