- parquet dataset支持从parquet文件中读取数据
- parquet dataset支持从本地以及S3/OSS/HDFS文件系统中读取对应parquet文件
# If ARROW_NUM_THREADS > 0, specified number of threads will be used.
# If ARROW_NUM_THREADS = 0, no threads will be used.
# If ARROW_NUM_THREADS < 0, all threads will be used.
os.environ['ARROW_NUM_THREADS'] = '2'
class ParquetDataset(dataset_ops.DatasetV2):
def __init__(
self, filenames,
batch_size=1,
fields=None,
partition_count=1,
partition_index=0,
drop_remainder=False,
num_parallel_reads=None,
num_sequential_reads=1):
# Create a `ParquetDataset` from filenames dataset.
def read_parquet(
batch_size,
fields=None,
partition_count=1,
partition_index=0,
drop_remainder=False,
num_parallel_reads=None,
num_sequential_reads=1):
-
filenames
: 文件名,可以接收以下类型的参数。- 0-D 或者 1-D 的
tf.string
类型Tensor
string
类型string
类型的list
或tuple
- 包含一个或多个文件名的
Dataset
- 0-D 或者 1-D 的
-
batch_size
: (可选) 一个输出batch中最大样本数量。 -
fields
: (可选) 需要读取的column。filenames 参数类型 fields 参数要求 fields 参数类型要求 Tensor
/Dataset
必须传入 DataFrame.Field
/DataFrame.Field
类型的list
或tuple
string
/string
类型的list
或tuple
可选, 不传入时默认读取所有column DataFrame.Field
/DataFrame.Field
类型的list
或tuple
/string
/string
类型的list
或tuple
-
partition_count
: (可选) row group partitions的数量。 -
partition_index
: (可选) row group partitions的索引。 -
drop_remainder
: (可选) 如果为True
, ParquetDataset只会返回大小为batch_size
的batch,小于batch_size
的batch将会被丢弃。 -
num_parallel_reads
: (可选)tf.int64
类型的标量,用于设定同时读取的parquet file文件数量。默认逐个依次读取。 -
num_sequential_reads
: (可选)tf.int64
类型的标量,代表按顺序读取的batch数量,默认是1。
DataFrame是一个包含多个命名的column的表。每一个命名的column都具有一种逻辑类型和一种存储类型。
逻辑类型 | 输出类型 |
---|---|
标量(Scalar) | tf.Tensor /DataFrame.Value |
定长List(Fixed-Length List) | tf.Tensor /DataFrame.Value |
变长List(Variable-Length List) | tf.SparseTensor /DataFrame.Value |
变长嵌套List(Variable-Length Nested List) | tf.SparseTensor /DataFrame.Value |
数据分类 | 存储类型 |
---|---|
整数 | int64 uint64 int32 uint32 int8 uint8 |
浮点数 | float64 float32 float16 |
文本 | string |
class DataFrame(object):
class Field(object):
def __init__(self, name,
type=None,
ragged_rank=None,
shape=None):
class Value(collections.namedtuple(
'DataFrameValue', ['values', 'nested_row_splits'])):
def to_sparse(self, name=None):
# Convert values to tensors or sparse tensors from input dataset.
def to_sparse(num_parallel_calls=None):
name
: column 名称type
: 指定元素数据类型,如tf.int64
ragged_rank
: (可选) column为list类型时,用于指定嵌套层数shape
: (可选) column为固定shape的list时,用于指定column的shape
注:对于固定shape的list (Fix-Length List),只需要指定shape即可,无需指定ragged_rank。
由于ParquetDataset的输出中可能会存在DataFrame.Value, 无法直接接入模型,需要将DataFrame.Value转换为SparseTensor。使用dataset.apply调用to_sparse接口即可完成转换。
import tensorflow as tf
from tensorflow.python.data.experimental.ops import parquet_dataset_ops
from tensorflow.python.data.experimental.ops import dataframe
ds = parquet_dataset_ops.ParquetDataset(...)
ds.apply(dataframe.to_sparse())
...
import tensorflow as tf
from tensorflow.python.data.experimental.ops import parquet_dataset_ops
# Read from a parquet file.
ds = parquet_dataset_ops.ParquetDataset('/path/to/f1.parquet',
batch_size=1024)
ds = ds.prefetch(4)
it = tf.data.make_one_shot_iterator(ds)
batch = it.get_next()
# {'a': tensora, 'c': tensorc}
import tensorflow as tf
from tensorflow.python.data.experimental.ops import parquet_dataset_ops
filenames = tf.data.Dataset.from_generator(func, tf.string, tf.TensorShape([]))
# Define data frame fields.
fields = [
parquet_dataset_ops.DataFrame.Field('A', tf.int64),
parquet_dataset_ops.DataFrame.Field('C', tf.int64, ragged_rank=1)]
# Read from parquet files by reading upstream filename dataset.
ds = filenames.apply(parquet_dataset_ops.read_parquet(1024, fields=fields))
ds = ds.prefetch(4)
it = tf.data.make_one_shot_iterator(ds)
batch = it.get_next()
# {'a': tensora, 'c': tensorc}
...
import tensorflow as tf
from tensorflow.python.data.experimental.ops import parquet_dataset_ops
# Read from parquet files on remote services for selected fields.
ds = parquet_dataset_ops.ParquetDataset(
['hdfs://host:port/path/to/f3.parquet'],
batch_size=1024,
fields=['a', 'c'])
ds = ds.prefetch(4)
it = tf.data.make_one_shot_iterator(ds)
batch = it.get_next()
# {'a': tensora, 'c': tensorc}
...