# Decoding的方法
在自回归（auto-regressive）生成式语言模型中，第 K 步 token 的预测概率取决于前 K-1 步已经生成的 token。
这种decode模式中，有一些方法如：
1. Beam search
2. Greedy
3. Top-K
4. Top-P

Beam search：在每个时间步保留累计概率最高的 num_beams 条候选序列继续搜索，从而尽量找到整体概率最高的序列
Top-K sampling：top-k 采样仅在概率最高的 K 个 token 中进行采样生成
Top-P sampling：top-p 采样仅在累计概率之和达到 p 的最小 token 集合中进行采样生成

Beam Search适合机器翻译任务
Top-K Top-P适合类似于故事生成的任务

In [1]:
import os
import numpy as np
import matplotlib.pyplot as plt

import tensorflow as tf
import tensorflow.keras as keras

from official import nlp
from official.nlp.modeling.ops import sampling_module
from official.nlp.modeling.ops import beam_search

In [2]:
# Only let TensorFlow's logger emit messages at ERROR level.
tf.get_logger().setLevel('ERROR')
os.environ['CUDA_VISIBLE_DEVICES'] = '1'  # use GPU 1
os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'
physical_devices = tf.config.list_physical_devices("GPU")
# Enable memory growth on the first visible GPU. The emptiness check keeps
# this cell from raising IndexError when no GPU is visible to the process
# (CPU-only host, or a host without a device at CUDA index 1).
if physical_devices:
  tf.config.experimental.set_memory_growth(physical_devices[0], True)
logical_devices = tf.config.list_logical_devices("GPU")

In [3]:
# Hyperparameters of the toy transformer used by the cells below.
params = dict(
    num_heads=2,
    num_layers=2,
    batch_size=2,
    n_dims=256,
    max_decode_length=4,
)

In [4]:
# Allocate the decoding cache. It is meant to hold previously computed hidden
# states (the keys and values of self-attention and cross-attention).
# Every layer gets a zero-filled 'k' and 'v' tensor of shape
# [batch_size, max_decode_length, num_heads, n_dims // num_heads].
cache = {
    'layer_%d' % layer_idx: {
        slot: tf.zeros(
            [
                params['batch_size'],
                params['max_decode_length'],
                params['num_heads'],
                params['n_dims'] // params['num_heads'],
            ],
            dtype=tf.float32)
        for slot in ('k', 'v')
    }
    for layer_idx in range(params['num_layers'])
}
print("cache key shape for layer 1 :", cache['layer_1']['k'].shape)

cache key shape for layer 1 : (2, 4, 2, 128)


In [5]:
# Length normalization used to rescale the scores of generated sequences.
def length_norm(length, dtype, alpha=0.0):
  """Return the length normalization factor ((5 + length) / 6) ** alpha.

  Args:
    length: Sequence length; it is cast to `dtype` before use.
    dtype: dtype in which the factor is computed.
    alpha: Exponent controlling the strength of the normalization. The
      default of 0.0 preserves the previous hard-coded behavior: the factor
      is always 1, i.e. scores are effectively left unnormalized.

  Returns:
    The normalization factor as a tensor of type `dtype`.
  """
  return tf.pow(((5. + tf.cast(length, dtype)) / 6.), alpha)

In [7]:
# Stand-in for a transformer: fixed next-token probabilities laid out as
# [batch_size = 2, seq_len = 4, vocab_size = 3].
probabilities = tf.constant([
    # batch element 0
    [
        [0.3, 0.4, 0.3],
        [0.3, 0.3, 0.4],
        [0.1, 0.1, 0.8],
        [0.1, 0.1, 0.8],
    ],
    # batch element 1
    [
        [0.2, 0.5, 0.3],
        [0.2, 0.7, 0.1],
        [0.1, 0.1, 0.8],
        [0.1, 0.1, 0.8],
    ],
])
probabilities.shape

TensorShape([2, 4, 3])

In [8]:
def model_fn(i):
  """Toy decoder: return the next-token probabilities for decoding step `i`.

  Selects index `i` along the sequence axis of `probabilities`, so the result
  has shape [batch_size, vocab_size].
  """
  return probabilities[:, i]

In [9]:
def _symbols_to_logits_fn():
  """Calculates logits of the next tokens."""
  # 对概率求log?
  def symbols_to_logits_fn(ids, i, temp_cache):
    del ids
    logits = tf.cast(tf.math.log(model_fn(i)), tf.float32)
    return logits, temp_cache
  return symbols_to_logits_fn

# greedy search
greedy search选择概率最高的结果进行下一步推理
类似于argmax
方法是使用sampling_module.SamplingModule()


In [10]:
greedy_obj = sampling_module.SamplingModule(
    length_normalization_fn=None,
    dtype=tf.float32,
    symbols_to_logits_fn=_symbols_to_logits_fn(),
    vocab_size=3,
    max_decode_length=params['max_decode_length'],
    eos_id=10,
    padded_decode=False)
ids, _ = greedy_obj.generate(
    initial_ids=tf.constant([9, 1]), initial_cache=cache)
print("Greedy Decoded Ids:", ids)
# greedy的方法就是完全按照上面所给的概率进行计算的
# 即每次都选择了概率最高的token下标

Greedy Decoded Ids: tf.Tensor(
[[9 1 2 2 2]
 [1 1 1 2 2]], shape=(2, 5), dtype=int32)


In [11]:
# Top-k sampling: greedy selection is switched off and tokens are sampled at
# temperature 1.0.
# NOTE(review): top_k equals vocab_size (both 3), so presumably no token is
# filtered out in this demo -- confirm this is intended.
top_k_obj = sampling_module.SamplingModule(
    symbols_to_logits_fn=_symbols_to_logits_fn(),
    vocab_size=3,
    max_decode_length=params['max_decode_length'],
    eos_id=10,
    padded_decode=False,
    length_normalization_fn=length_norm,
    dtype=tf.float32,
    enable_greedy=False,
    sample_temperature=tf.constant(1.0),
    top_k=tf.constant(3),
)
ids, _ = top_k_obj.generate(
    initial_cache=cache,
    initial_ids=tf.constant([9, 1]),
)
print("top-k sampled Ids:", ids)

# Top-k keeps only the K most probable tokens and redistributes the
# probability mass among those K before sampling.

top-k sampled Ids: tf.Tensor(
[[9 1 0 0 2]
 [1 2 1 2 2]], shape=(2, 5), dtype=int32)


In [12]:
# Top-p sampling: greedy selection is switched off and tokens are sampled at
# temperature 1.0 with a cumulative-probability threshold of 0.9.
top_p_obj = sampling_module.SamplingModule(
    symbols_to_logits_fn=_symbols_to_logits_fn(),
    vocab_size=3,
    max_decode_length=params['max_decode_length'],
    eos_id=10,
    padded_decode=False,
    length_normalization_fn=length_norm,
    dtype=tf.float32,
    enable_greedy=False,
    sample_temperature=tf.constant(1.0),
    top_p=tf.constant(0.9),
)
ids, _ = top_p_obj.generate(
    initial_cache=cache,
    initial_ids=tf.constant([9, 1]),
)
print("top-p sampled Ids:", ids)
# Unlike top-k, top-p keeps the smallest set of most probable tokens whose
# cumulative probability reaches p, and samples from that set.

top-p sampled Ids: tf.Tensor(
[[9 2 1 2 1]
 [1 1 1 2 0]], shape=(2, 5), dtype=int32)


In [13]:
# Beam search over a single input sequence with 2 beams.
beam_size = 2
params['batch_size'] = 1
# NOTE(review): the last cache dimension here is n_dims (256), whereas the
# cache built for the sampling cells uses n_dims / num_heads (128). Left as
# is; confirm which shape is intended.
beam_cache = {
    'layer_%d' % layer_idx: {
        slot: tf.zeros(
            [
                params['batch_size'],
                params['max_decode_length'],
                params['num_heads'],
                params['n_dims'],
            ],
            dtype=tf.float32)
        for slot in ('k', 'v')
    }
    for layer_idx in range(params['num_layers'])
}
print("cache key shape for layer 1 :", beam_cache['layer_1']['k'].shape)
ids, _ = beam_search.sequence_beam_search(
    symbols_to_logits_fn=_symbols_to_logits_fn(),
    vocab_size=3,
    beam_size=beam_size,
    alpha=0.6,
    max_decode_length=params['max_decode_length'],
    eos_id=10,
    initial_ids=tf.constant([9], tf.int32),
    initial_cache=beam_cache,
    padded_decode=False,
    dtype=tf.float32,
)
print("Beam search ids:", ids)
# Beam search keeps the beam_size highest-scoring candidates at every step,
# which reduces the risk of discarding the sequence with the highest overall
# probability.

cache key shape for layer 1 : (1, 4, 2, 256)
Beam search ids: tf.Tensor(
[[[9 0 1 2 2]
  [9 1 2 2 2]]], shape=(1, 2, 5), dtype=int32)
