-
Notifications
You must be signed in to change notification settings - Fork 26.4k
/
text_generation.py
260 lines (223 loc) · 12.2 KB
/
text_generation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
import enum
from transformers import MODEL_FOR_CAUSAL_LM_MAPPING, TF_MODEL_FOR_CAUSAL_LM_MAPPING
from ..utils import add_end_docstrings, is_tf_available
from .base import PIPELINE_INIT_ARGS, Pipeline
if is_tf_available():
import tensorflow as tf
class ReturnType(enum.Enum):
TENSORS = 0
NEW_TEXT = 1
FULL_TEXT = 2
@add_end_docstrings(PIPELINE_INIT_ARGS)
class TextGenerationPipeline(Pipeline):
"""
Language generation pipeline using any `ModelWithLMHead`. This pipeline predicts the words that will follow a
specified text prompt.
This language generation pipeline can currently be loaded from [`pipeline`] using the following task identifier:
`"text-generation"`.
The models that this pipeline can use are models that have been trained with an autoregressive language modeling
objective, which includes the uni-directional models in the library (e.g. gpt2). See the list of available models
on [huggingface.co/models](https://huggingface.co/models?filter=text-generation).
"""
# Prefix text to help Transformer-XL and XLNet with short prompts as proposed by Aman Rusia
# in https://github.com/rusiaaman/XLNet-gen#methodology
# and https://medium.com/@amanrusia/xlnet-speaks-comparison-to-gpt-2-ea1a4e9ba39e
XL_PREFIX = """
In 1991, the remains of Russian Tsar Nicholas II and his family (except for Alexei and Maria) are discovered. The
voice of Nicholas's young son, Tsarevich Alexei Nikolaevich, narrates the remainder of the story. 1883 Western
Siberia, a young Grigori Rasputin is asked by his father and a group of men to perform magic. Rasputin has a vision
and denounces one of the men as a horse thief. Although his father initially slaps him for making such an
accusation, Rasputin watches as the man is chased outside and beaten. Twenty years later, Rasputin sees a vision of
the Virgin Mary, prompting him to become a priest. Rasputin quickly becomes famous, with people, even a bishop,
begging for his blessing. <eod> </s> <eos>
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.check_model_type(
TF_MODEL_FOR_CAUSAL_LM_MAPPING if self.framework == "tf" else MODEL_FOR_CAUSAL_LM_MAPPING
)
if "prefix" not in self._preprocess_params:
# This is very specific. The logic is quite complex and needs to be done
# as a "default".
# It also defines both some preprocess_kwargs and generate_kwargs
# which is why we cannot put them in their respective methods.
prefix = None
if self.model.config.prefix is not None:
prefix = self.model.config.prefix
if prefix is None and self.model.__class__.__name__ in [
"XLNetLMHeadModel",
"TransfoXLLMHeadModel",
"TFXLNetLMHeadModel",
"TFTransfoXLLMHeadModel",
]:
# For XLNet and TransformerXL we add an article to the prompt to give more state to the model.
prefix = self.XL_PREFIX
if prefix is not None:
# Recalculate some generate_kwargs linked to prefix.
preprocess_params, forward_params, _ = self._sanitize_parameters(prefix=prefix, **self._forward_params)
self._preprocess_params = {**self._preprocess_params, **preprocess_params}
self._forward_params = {**self._forward_params, **forward_params}
def _sanitize_parameters(
self,
return_full_text=None,
return_tensors=None,
return_text=None,
return_type=None,
clean_up_tokenization_spaces=None,
prefix=None,
handle_long_generation=None,
**generate_kwargs
):
preprocess_params = {}
if prefix is not None:
preprocess_params["prefix"] = prefix
if prefix:
prefix_inputs = self.tokenizer(
prefix, padding=False, add_special_tokens=False, return_tensors=self.framework
)
prefix_length = prefix_inputs["input_ids"].shape[-1]
if "max_new_tokens" in generate_kwargs:
pass
elif "max_length" in generate_kwargs:
generate_kwargs["max_length"] += prefix_length
else:
generate_kwargs["max_length"] = self.model.config.max_length + prefix_length
if "min_length" in generate_kwargs:
generate_kwargs["min_length"] += prefix_length
if handle_long_generation is not None:
if handle_long_generation not in {"hole"}:
raise ValueError(
f"{handle_long_generation} is not a valid value for `handle_long_generation` parameter expected"
" [None, 'hole']"
)
preprocess_params["handle_long_generation"] = handle_long_generation
preprocess_params.update(generate_kwargs)
forward_params = generate_kwargs
postprocess_params = {}
if return_full_text is not None and return_type is None:
return_type = ReturnType.FULL_TEXT if return_full_text else ReturnType.NEW_TEXT
if return_tensors is not None and return_type is None:
return_type = ReturnType.TENSORS
if return_type is not None:
postprocess_params["return_type"] = return_type
if clean_up_tokenization_spaces is not None:
postprocess_params["clean_up_tokenization_spaces"] = clean_up_tokenization_spaces
return preprocess_params, forward_params, postprocess_params
# overriding _parse_and_tokenize to allow for unusual language-modeling tokenizer arguments
def _parse_and_tokenize(self, *args, **kwargs):
"""
Parse arguments and tokenize
"""
# Parse arguments
if self.model.__class__.__name__ in ["TransfoXLLMHeadModel"]:
kwargs.update({"add_space_before_punct_symbol": True})
return super()._parse_and_tokenize(*args, **kwargs)
def __call__(self, text_inputs, **kwargs):
"""
Complete the prompt(s) given as inputs.
Args:
args (`str` or `List[str]`):
One or several prompts (or one list of prompts) to complete.
return_tensors (`bool`, *optional*, defaults to `False`):
Whether or not to include the tensors of predictions (as token indices) in the outputs.
return_text (`bool`, *optional*, defaults to `True`):
Whether or not to include the decoded texts in the outputs.
return_full_text (`bool`, *optional*, defaults to `True`):
If set to `False` only added text is returned, otherwise the full text is returned Only meaningful if
*return_text* is set to True.
clean_up_tokenization_spaces (`bool`, *optional*, defaults to `False`):
Whether or not to clean up the potential extra spaces in the text output.
prefix (`str`, *optional*):
Prefix added to prompt.
handle_long_generation (`str`, *optional*):
By default, this pipelines does not handle long generation (ones that exceed in one form or the other
the model maximum length). There is no perfect way to adress this (more info
:https://github.com/huggingface/transformers/issues/14033#issuecomment-948385227). This provides common
strategies to work around that problem depending on your use case.
- `None` : default strategy where nothing in particular happens
- `"hole"`: Truncates left of input, and leaves a gap wide enough to let generation happen (might
truncate a lot of the prompt and not suitable when generation exceed the model capacity)
generate_kwargs:
Additional keyword arguments to pass along to the generate method of the model (see the generate method
corresponding to your framework [here](./model#generative-models)).
Return:
A list or a list of list of `dict`: Each result comes as a dictionary with the following keys:
- **generated_text** (`str`, present when `return_text=True`) -- The generated text.
- **generated_token_ids** (`torch.Tensor` or `tf.Tensor`, present when `return_tensors=True`) -- The token
ids of the generated text.
"""
return super().__call__(text_inputs, **kwargs)
def preprocess(self, prompt_text, prefix="", handle_long_generation=None, **generate_kwargs):
inputs = self.tokenizer(
prefix + prompt_text, padding=False, add_special_tokens=False, return_tensors=self.framework
)
inputs["prompt_text"] = prompt_text
if handle_long_generation == "hole":
cur_len = inputs["input_ids"].shape[-1]
if "max_new_tokens" in generate_kwargs:
new_tokens = generate_kwargs["max_new_tokens"]
else:
new_tokens = generate_kwargs.get("max_length", self.model.config.max_length) - cur_len
if new_tokens < 0:
raise ValueError("We cannot infer how many new tokens are expected")
if cur_len + new_tokens > self.tokenizer.model_max_length:
keep_length = self.tokenizer.model_max_length - new_tokens
if keep_length <= 0:
raise ValueError(
"We cannot use `hole` to handle this generation the number of desired tokens exceeds the"
" models max length"
)
inputs["input_ids"] = inputs["input_ids"][:, -keep_length:]
if "attention_mask" in inputs:
inputs["attention_mask"] = inputs["attention_mask"][:, -keep_length:]
return inputs
def _forward(self, model_inputs, **generate_kwargs):
input_ids = model_inputs["input_ids"]
# Allow empty prompts
if input_ids.shape[1] == 0:
input_ids = None
in_b = 1
else:
in_b = input_ids.shape[0]
prompt_text = model_inputs.pop("prompt_text")
generated_sequence = self.model.generate(input_ids=input_ids, **generate_kwargs) # BS x SL
out_b = generated_sequence.shape[0]
if self.framework == "pt":
generated_sequence = generated_sequence.reshape(in_b, out_b // in_b, *generated_sequence.shape[1:])
elif self.framework == "tf":
generated_sequence = tf.reshape(generated_sequence, (in_b, out_b // in_b, *generated_sequence.shape[1:]))
return {"generated_sequence": generated_sequence, "input_ids": input_ids, "prompt_text": prompt_text}
def postprocess(self, model_outputs, return_type=ReturnType.FULL_TEXT, clean_up_tokenization_spaces=True):
generated_sequence = model_outputs["generated_sequence"][0]
input_ids = model_outputs["input_ids"]
prompt_text = model_outputs["prompt_text"]
generated_sequence = generated_sequence.numpy().tolist()
records = []
for sequence in generated_sequence:
if return_type == ReturnType.TENSORS:
record = {"generated_token_ids": sequence}
elif return_type in {ReturnType.NEW_TEXT, ReturnType.FULL_TEXT}:
# Decode text
text = self.tokenizer.decode(
sequence,
skip_special_tokens=True,
clean_up_tokenization_spaces=clean_up_tokenization_spaces,
)
# Remove PADDING prompt of the sequence if XLNet or Transfo-XL model is used
if input_ids is None:
prompt_length = 0
else:
prompt_length = len(
self.tokenizer.decode(
input_ids[0],
skip_special_tokens=True,
clean_up_tokenization_spaces=clean_up_tokenization_spaces,
)
)
if return_type == ReturnType.FULL_TEXT:
all_text = prompt_text + text[prompt_length:]
else:
all_text = text[prompt_length:]
record = {"generated_text": all_text}
records.append(record)
return records