In [1]:

import tensorflow as tf

In [3]:
# 可以把dataset看做一个java里的列表容器对象，其中的shapes指的就是这个ds持有的每一个对象的shape
# 在这个例子里，传入的是一个列表，这个列表里每一个元素都是2*3的数组
test_ds=tf.data.Dataset.from_tensor_slices([[[1,2,3],[1,23,3]],
                                            [[2,3,4],[12,233,13]],
                                            [[2,3,4],[12,233,13]],
                                            [[2,3,4],[12,233,13]]])
print("test ds:",test_ds)

print("==================")
for row in test_ds:
  print(row)

print("==================")
print(test_ds.batch(2))
print("==================")
for b in test_ds.batch(2):
  print(b)

# test_ds 原本的形状是(2,3)，里面持有多少个元素暂时不知道
# batch之后，会把两个元素打包成1个batch，得到了一个BatchDataset shapes: (None, 2, 3)
# 这个None实际上代表的就是那个2，我猜tensorflow由于懒加载机制，所以暂时这个产生的batchdataset不知道具体的batch_size

test ds: <TensorSliceDataset shapes: (2, 3), types: tf.int32>
tf.Tensor(
[[ 1  2  3]
 [ 1 23  3]], shape=(2, 3), dtype=int32)
tf.Tensor(
[[  2   3   4]
 [ 12 233  13]], shape=(2, 3), dtype=int32)
tf.Tensor(
[[  2   3   4]
 [ 12 233  13]], shape=(2, 3), dtype=int32)
tf.Tensor(
[[  2   3   4]
 [ 12 233  13]], shape=(2, 3), dtype=int32)
<BatchDataset shapes: (None, 2, 3), types: tf.int32>
tf.Tensor(
[[[  1   2   3]
  [  1  23   3]]

 [[  2   3   4]
  [ 12 233  13]]], shape=(2, 2, 3), dtype=int32)
tf.Tensor(
[[[  2   3   4]
  [ 12 233  13]]

 [[  2   3   4]
  [ 12 233  13]]], shape=(2, 2, 3), dtype=int32)


In [3]:
# 如何让dataset同时持有多个list，答案就是使用tuple
# 下面这个例子里，这个dataset通过tuple，持有了多个list
test_ds=tf.data.Dataset.from_tensor_slices(([[1,2,3],[4,5,6]],
                                            [[[1,2],[1,2]],
                                             [[4,5],[6,7]]]))
# 通过按顺序打印结果发现，这个dataset里的第一个item，是第一个列表的第一个元素与第二个列表的第一个元素组成的tuple
# 第二个item是第一个列表的第二个元素与第二个列表的第二个元素组成的tuple
# 进而可以得知，输入的各个列表的第0轴的维度应该相同
print(test_ds)
for item in test_ds:
  print(item)

print("================")
test_ds=tf.data.Dataset.from_tensor_slices(
  [tf.constant(1.),tf.constant(2.),
   tf.constant(3.),tf.constant(4.)])
print(test_ds)
print("=========================")
tuple_ds=test_ds.map(lambda i:(i,tf.add(i,0.5)))
print(tuple_ds)
# 这样就获得了一个持有元素为tuple，并且每个tuple里的形状是((),())，即俩数的tuple
# 也就是[(1,1.5),(2,2.5),(3,3.5),(4,4.5)]
print("=========================")
batch_ds=tuple_ds.batch(3)
print(batch_ds)
print("=========================")
for b in batch_ds:
  print(b)
# 注意，这种情况下，由于tuple_ds持有的是多个list，
# 所以对这类对象进行batch的时候，是对所有的list进行batch
# 通过观察batch_ds的形状可知，他是对每个tuple的相同位置的数字进行了batch
# 所以生成的是BatchDataset shapes: ((None,), (None,))这种形状的
# 也就是这个batch持有的仍然是多个list，但是其中的每个元素不再是一个数字了
# 而是batch_size大小的列表，也就是([[1,2,3],[1.5,2.5,3.5]],[[4],[4.5]])

print("=========================")
for b in tuple_ds.take(1):
  print(b)


<TensorSliceDataset shapes: ((3,), (2, 2)), types: (tf.int32, tf.int32)>
(<tf.Tensor: shape=(3,), dtype=int32, numpy=array([1, 2, 3], dtype=int32)>, <tf.Tensor: shape=(2, 2), dtype=int32, numpy=
array([[1, 2],
       [1, 2]], dtype=int32)>)
(<tf.Tensor: shape=(3,), dtype=int32, numpy=array([4, 5, 6], dtype=int32)>, <tf.Tensor: shape=(2, 2), dtype=int32, numpy=
array([[4, 5],
       [6, 7]], dtype=int32)>)
<TensorSliceDataset shapes: (), types: tf.float32>
<MapDataset shapes: ((), ()), types: (tf.float32, tf.float32)>
<BatchDataset shapes: ((None,), (None,)), types: (tf.float32, tf.float32)>
(<tf.Tensor: shape=(3,), dtype=float32, numpy=array([1., 2., 3.], dtype=float32)>, <tf.Tensor: shape=(3,), dtype=float32, numpy=array([1.5, 2.5, 3.5], dtype=float32)>)
(<tf.Tensor: shape=(1,), dtype=float32, numpy=array([4.], dtype=float32)>, <tf.Tensor: shape=(1,), dtype=float32, numpy=array([4.5], dtype=float32)>)
(<tf.Tensor: shape=(), dtype=float32, numpy=1.0>, <tf.Tensor: shape=(), dtype=float3

In [3]:
#py_function与map

"""
https://www.tensorflow.org/tutorials/load_data/text
You want to use Dataset.map to apply this function to each element of the dataset. Dataset.map runs in graph mode.
Graph tensors do not have a value.
In graph mode you can only use TensorFlow Ops and functions.
So you can't .map this function directly: You need to wrap it in a tf.py_function. The tf.py_function will pass regular tensors (with a value and a .numpy() method to access it), to the wrapped python function.
"""

test_ds=tf.data.Dataset.range(1,10)
# print(test_ds)
def py_function(tensor):
  print(type(tensor)) #这一句函数只会在next(iter(test_ds.map(op).take(1)))真正地执行，理解这个很重要
  # 而且类型是tensorflow.python.framework.ops.EagerTensor
  return tensor.numpy()+1

def show(tensor):
  print(type(tensor))
  # Tensor("args_0:0", shape=(), dtype=int64)
  # 可以看出这个tensor是没有值的，这是一个graph的tensor。没法取出相应的值
  # 个人理解
  # 最重要的点是，这个map也是一个op，并不是实际的计算
  # 如果对map输入了这个show，相当于在graph上放置了一个op，放置这个操作会触发这个show执行
  # 所以上面的这个print方法会在放置的时候执行
  # 而此时，在这个graph里，这个op只知道这个tensor的类型（基于dataset但注意dataset也是不知道具体的数据的）
  # 这个tensor指的就是只是指向某个数据的张量
  # 所以没法去执行numpy()
  return 1

def op(tensor):
  # py_function可以在运行时将graph tensor转化成常规的tensor
  res=tf.py_function(py_function,
                     inp=[tensor],
                     Tout=tf.int8)
  res.set_shape([])#由于这个方法被用在map阶段，所以最好还是设置好tensor的形状，以便后续的方法操作
  return res

print(test_ds.map(show))
print(test_ds.map(op))
# 在take阶段，真正地出发了op的执行
print(next(iter(test_ds.map(op).take(1))))

import numpy as np
import tensorflow as tf
print("==========================")
from collections import OrderedDict
import numpy as np
def get_dict(i):
  d=OrderedDict()
  d["a"]=tf.cast(i,tf.float32)
  d["b"]=np.random.normal()
  return (d,i)

test_ds=tf.data.Dataset.range(1,4).map(lambda x:get_dict(x))
print(test_ds)

def pack(features, label):
  print("print once only",features)# 可以看到这个feature直接就是order dict，他的values才是tensor，所以不用挂py_function
  values=list(features.values())
  return tf.add(values,1.1),label
packed_dataset = test_ds.map(pack)

# 所以说，map的时候，pack函数就会执行，但实际上只会执行一次，但这个执行并不是真正意义上的计算，本质是将一个op放在graph上。
# 所以对张量取值是取不到的
# 所以什么时候需要包py_function呢，只要涉及对tensor的具体值的处理，就需要包
# 但就像这个例子里，temp_dataset里持有的是orderdict，并不是tensor，所以不用包
# 再例如如果一个dataset里持有的是tuple，只要不涉及对tensor具体值的处理，也不用包
# 就像ds.map(lambda x,y:(1,2))，你看，这也不用py_function
for features, labels in packed_dataset.take(1):
  print(features.numpy())
  print()
  print(labels.numpy())


<class 'tensorflow.python.framework.ops.Tensor'>
<MapDataset shapes: (), types: tf.int32>
<MapDataset shapes: (1,), types: tf.int8>
<class 'tensorflow.python.framework.ops.EagerTensor'>


InvalidArgumentError: Incompatible shapes at component 0: expected [1] but got [].

In [11]:
import tensorflow as tf

dataset=tf.data.Dataset.range(1,10)
def map_func(x):
  print("exec")
  if tf.less(x,5):
    return "A_class",[1,0]
  return "B_class",[0,1]

for x in dataset.map(map_func): # 仅仅在map处进行了map操作
  print(x)
dataset.map(map_func)

exec
(<tf.Tensor: shape=(), dtype=string, numpy=b'A_class'>, <tf.Tensor: shape=(2,), dtype=int32, numpy=array([1, 0], dtype=int32)>)
(<tf.Tensor: shape=(), dtype=string, numpy=b'A_class'>, <tf.Tensor: shape=(2,), dtype=int32, numpy=array([1, 0], dtype=int32)>)
(<tf.Tensor: shape=(), dtype=string, numpy=b'A_class'>, <tf.Tensor: shape=(2,), dtype=int32, numpy=array([1, 0], dtype=int32)>)
(<tf.Tensor: shape=(), dtype=string, numpy=b'A_class'>, <tf.Tensor: shape=(2,), dtype=int32, numpy=array([1, 0], dtype=int32)>)
(<tf.Tensor: shape=(), dtype=string, numpy=b'B_class'>, <tf.Tensor: shape=(2,), dtype=int32, numpy=array([0, 1], dtype=int32)>)
(<tf.Tensor: shape=(), dtype=string, numpy=b'B_class'>, <tf.Tensor: shape=(2,), dtype=int32, numpy=array([0, 1], dtype=int32)>)
(<tf.Tensor: shape=(), dtype=string, numpy=b'B_class'>, <tf.Tensor: shape=(2,), dtype=int32, numpy=array([0, 1], dtype=int32)>)
(<tf.Tensor: shape=(), dtype=string, numpy=b'B_class'>, <tf.Tensor: shape=(2,), dtype=int32, numpy=

<MapDataset shapes: ((), (2,)), types: (tf.string, tf.int32)>

In [2]:
# 处理SparseTensor
import tensorflow as tf
test_ds=tf.data.Dataset.from_tensor_slices(["1:1 2:3;4","1:2 2:5"])
def input_parser(line):
    res=tf.sparse.SparseTensor(indices=[[0,1],[1,2]],values=[0,2],dense_shape=[2,2])
    return res

parsed_ds=test_ds.map(input_parser)

for row in parsed_ds:
    print(row)

SparseTensor(indices=tf.Tensor(
[[0 1]
 [1 2]], shape=(2, 2), dtype=int64), values=tf.Tensor([0 2], shape=(2,), dtype=int32), dense_shape=tf.Tensor([2 2], shape=(2,), dtype=int64))
SparseTensor(indices=tf.Tensor(
[[0 1]
 [1 2]], shape=(2, 2), dtype=int64), values=tf.Tensor([0 2], shape=(2,), dtype=int32), dense_shape=tf.Tensor([2 2], shape=(2,), dtype=int64))


In [17]:
num_fields=3
num_features=8

# 处理multihot的样本，输入的是格式是string field_id:feature_id1;feature_id2
# 希望的输出是num_fields个shape为[]的string，例如["1","3;4"]
# 由于需要具体的取值，因此需要借助py_function对结果进行截断与搜集
# map返回的一定是一个tensor，而不能是list之类的对象。因为tensorflow是图计算模式的，python代码的map只会执行一次，而真正执行的是底层的图计算
# 注意featureId一定要从0开始，否则转稀疏矩阵时会丢到该值

import tensorflow as tf
test_ds=tf.data.Dataset.from_tensor_slices(["1:1 2:3;4 3:6","1:2 2:5 3:7","1:1 2:4 3:7"])

def input_parser_func(line_tensor):
    # 返回1 3;4这种数据
    real_line=line_tensor
    # 将line.numpy转换为string类型更便于处理一些，但是本例中还是使用了tf提供的api直接处理二进制字符串
    print(str(real_line.numpy(), encoding = "utf-8"))

    row=tf.strings.split(real_line," ")

    feature_strs=list()
    for item in row:
        field_feature=tf.strings.split([item],":").to_sparse()
        print("field_feature",field_feature)
        field_id=tf.strings.to_number(field_feature.values[0], tf.int64)
        print("field_id",field_id)
        feature_strs.append(field_feature.values[1])
    return feature_strs

def input_parser(line):
    """

    :param line: tensor in string
    :return: 返回的是一个list，每个元素都是一个field的feature_id字符串，用;连接
         希望的输出是num_fields个shape为[]的string，例如["1","3;4"]
         注意由于使用了py_function，此处返回的是一个列表，这个列表里有num_fields个tensor，而不是一个shape为[num_fields]的tensor
         如果不使用py_function是可以实现后者的，见tensorflow1中的实现
    """
    py_res=tf.py_function(func=input_parser_func,inp=[line],Tout=[tf.string]*num_fields)

    return py_res

parsed_ds=test_ds.map(input_parser) # 每个item都是num_fields个shape为[]的string
# print(parsed_ds)
for row in parsed_ds:
    print("new row")
    print(row)

batched_ds=parsed_ds.batch(2) # 每个item都是: field_size个 [batch_size,] 的string
# print(batched_ds)

for row in batched_ds:
    print("new batch")
    print(row)

1:1 2:3;4 3:6
field_feature SparseTensor(indices=tf.Tensor(
[[0 0]
 [0 1]], shape=(2, 2), dtype=int64), values=tf.Tensor([b'1' b'1'], shape=(2,), dtype=string), dense_shape=tf.Tensor([1 2], shape=(2,), dtype=int64))
field_id tf.Tensor(1, shape=(), dtype=int64)
field_feature SparseTensor(indices=tf.Tensor(
[[0 0]
 [0 1]], shape=(2, 2), dtype=int64), values=tf.Tensor([b'2' b'3;4'], shape=(2,), dtype=string), dense_shape=tf.Tensor([1 2], shape=(2,), dtype=int64))
field_id tf.Tensor(2, shape=(), dtype=int64)
field_feature SparseTensor(indices=tf.Tensor(
[[0 0]
 [0 1]], shape=(2, 2), dtype=int64), values=tf.Tensor([b'3' b'6'], shape=(2,), dtype=string), dense_shape=tf.Tensor([1 2], shape=(2,), dtype=int64))
field_id tf.Tensor(3, shape=(), dtype=int64)
new row
(<tf.Tensor: shape=(), dtype=string, numpy=b'1'>, <tf.Tensor: shape=(), dtype=string, numpy=b'3;4'>, <tf.Tensor: shape=(), dtype=string, numpy=b'6'>)
1:2 2:5 3:7
field_feature SparseTensor(indices=tf.Tensor(
[[0 0]
 [0 1]], shape=(2, 2

In [9]:

embedding_param=tf.random.truncated_normal(shape=(num_features,4))

def embedding_layer(batched):
    # num_fields个[batch_size,]的str

    embs=list()
    for i in range(num_fields):

        field_i_str=batched[i]
        # print("field_{}_str".format(i))
        # print(field_i_str)

        field_i_strs=tf.strings.split(field_i_str,";") # RaggedTensor
        # print("field_{}_strs".format(i))
        # print(field_i_strs)

        field_i=tf.strings.to_number(field_i_strs,tf.int64).to_sparse() # tf2之中可以对RaggedTensor直接调用to_number
        # print("field_{}".format(i))
        # print(field_i)
        sparse_tensor=tf.SparseTensor(indices=field_i.indices,values=field_i.values,dense_shape=field_i.dense_shape)
        field_i_emb=tf.nn.embedding_lookup_sparse(params=embedding_param,sp_ids=field_i,sp_weights=None)
        embs.append(field_i_emb)

    emb=tf.stack(embs,axis=1)
    print(emb)


for batch in batched_ds:
    embedding_layer(batch)


<tf.RaggedTensor [[b'1'], [b'2']]>
SparseTensor(indices=tf.Tensor(
[[0 0]
 [1 0]], shape=(2, 2), dtype=int64), values=tf.Tensor([1 2], shape=(2,), dtype=int64), dense_shape=tf.Tensor([2 1], shape=(2,), dtype=int64))
<tf.RaggedTensor [[b'3', b'4'], [b'5']]>
SparseTensor(indices=tf.Tensor(
[[0 0]
 [0 1]
 [1 0]], shape=(3, 2), dtype=int64), values=tf.Tensor([3 4 5], shape=(3,), dtype=int64), dense_shape=tf.Tensor([2 2], shape=(2,), dtype=int64))
<tf.RaggedTensor [[b'6'], [b'7']]>
SparseTensor(indices=tf.Tensor(
[[0 0]
 [1 0]], shape=(2, 2), dtype=int64), values=tf.Tensor([6 7], shape=(2,), dtype=int64), dense_shape=tf.Tensor([2 1], shape=(2,), dtype=int64))
tf.Tensor(
[[[-0.02719916 -0.7736385   0.00539994  0.15242586]
  [ 0.7670817   0.9565993   0.4649043  -0.3597867 ]
  [ 0.9922279   1.4511844   0.34339067  0.22595775]]

 [[ 0.07129501  0.58460325  0.44938353  0.91817623]
  [ 1.0877714  -1.2422072   0.0800534   0.48739117]
  [-0.4828232  -1.5605565  -0.00997701  0.01435741]]], shape=(2,