In [None]:
# default_exp inference.text_generation

# Text Generation
> Text Generation API

In [None]:
#hide
from fastcore.test import test_eq
from nbverbose.showdoc import *

In [None]:
#export
import logging
from typing import List, Dict, Union
from collections import defaultdict

import torch
from torch.utils.data import TensorDataset

from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    PreTrainedTokenizer,
    PreTrainedModel,
)

from fastprogress.fastprogress import progress_bar

from adaptnlp.model import AdaptiveModel, DataLoader
from adaptnlp.model_hub import HFModelResult

from fastai.torch_core import apply, default_device, to_device

In [None]:
#export
# Module-level logger named after this module; `predict` reports batch information through it
logger = logging.getLogger(__name__)

In [None]:
#export
class TransformersTextGenerator(AdaptiveModel):
    "Adaptive model for Transformer's Language Models"

    def __init__(
        self,
        tokenizer: PreTrainedTokenizer, # A tokenizer object from Huggingface's transformers (TODO)and tokenizers
        model: PreTrainedModel #  A transformers Language model
    ):
        # Load up model and tokenizer
        self.tokenizer = tokenizer
        super().__init__()

        # Sets internal model
        self.set_model(model)

        # Setup cuda and automatic allocation of model
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(self.device)

    @classmethod
    def load(
        cls,
        model_name_or_path: str # A key string of one of Transformer's pre-trained Language Model
    ) -> AdaptiveModel:
        "Class method for loading and constructing this Model"
        # NOTE(review): `<PAD>` is registered as the pad token here, but the model's embedding
        # matrix is not resized afterwards -- confirm that padded batches (and the pad tokens
        # appended after <EOS> during generation) stay inside the embedding range of the checkpoint.
        tokenizer = AutoTokenizer.from_pretrained(model_name_or_path, pad_token="<PAD>")
        model = AutoModelForCausalLM.from_pretrained(model_name_or_path)
        return cls(tokenizer, model)

    def predict(
        self,
        text: Union[List[str], str], # Sentences to run inference on
        mini_batch_size: int = 32, # Mini batch size
        num_tokens_to_produce: int = 50, # Number of tokens you want to generate
        **kwargs, # Accepted for call compatibility; currently ignored (a warning is logged)
    ) -> Dict[str, List[str]]: # A dictionary whose `generated_text` key holds the list of generated sentences
        "Predict method for running greedy text generation with the pre-trained language model. Returns `{'generated_text': [...]}` with one generated string per input sentence. Extra keyword arguments are accepted but are not forwarded to the model."
        if kwargs:
            # Generation below is a hand-written greedy loop, so there is nothing to forward these to
            logger.warning(f'Ignoring unsupported keyword arguments: {sorted(kwargs)}')

        # Make all inputs lists
        if isinstance(text, str):
            text = [text]

        # Nothing to tokenize or generate for an empty list of sentences
        if len(text) == 0:
            return {"generated_text": []}

        results = []
        # `eval()` only needs to be set once, not once per batch
        self.model.eval()
        with torch.no_grad():
            dataset = self._tokenize(text)
            dataloader = DataLoader(dataset, batch_size=mini_batch_size)

            logger.info(f'Running text generator on {len(dataset)} text sequences')
            logger.info(f'Batch size = {mini_batch_size}')
            for batch in progress_bar(dataloader):
                batch = apply(to_device, batch)

                inputs = {
                    'input_ids': batch[0],
                    'attention_masks': batch[1],
                }
                # Only present if the dataset yields a third tensor; `_tokenize` produces two
                if len(batch) == 3:
                    inputs['token_type_ids'] = batch[2]

                # model.generate() does not have batch inference implemented yet
                results += self._batch_generate(
                    inputs=inputs,
                    seq_len=batch[0].shape[1],
                    num_tokens_to_produce=num_tokens_to_produce,
                )

        return {"generated_text": results}

    def _tokenize(self, text: Union[List[str], str]) -> TensorDataset:
        """ Batch tokenizes text and produces a `TensorDataset` of (input ids, attention mask) padded to the longest sequence """

        tokenized_text = self.tokenizer.batch_encode_plus(
            text,
            return_tensors="pt",
            padding="longest",
        )

        dataset = TensorDataset(
            tokenized_text["input_ids"],
            tokenized_text["attention_mask"],
        )

        return dataset

    def _batch_generate(
        self, inputs: Dict, seq_len: int, num_tokens_to_produce: int
    ) -> List[str]:
        """Greedily appends `num_tokens_to_produce` tokens to every sequence of a padded batch and decodes the results"""
        input_ids = inputs["input_ids"]
        attn_mask = inputs["attention_masks"]
        batch_size = input_ids.shape[0]

        pad_token_id = self.tokenizer.pad_token_id
        eos_token_id = self.tokenizer.eos_token_id
        # 1 while a sentence is still being generated, 0 once it has produced <EOS>
        eos_not_in_sents = torch.ones(batch_size, dtype=torch.long, device=self.device)

        # we need to get the token ids of the last non-padded value
        # (assumes the padding sits on the right, so that index is `sum(mask) - 1`)
        last_non_masked_idx = torch.sum(attn_mask, dim=1) - 1
        # Expanded to (batch, 1, vocab) so it can be used as a `gather` index over the logits
        start_idx = (
            (last_non_masked_idx)
            .view(-1, 1)
            .repeat(1, self.tokenizer.vocab_size)
            .unsqueeze(1)
        )

        # get correct position ids: 0..seq_len-1 per row, then every position from the
        # last real token onward repeats that token's position
        position_ids = torch.arange(seq_len, device=self.device).repeat(batch_size, 1)
        for position_ids_slice, last_idx in zip(position_ids, last_non_masked_idx):
            position_ids_slice[last_idx:] = position_ids_slice[last_idx]

        for step in range(num_tokens_to_produce):
            outputs = self.model(
                input_ids, attention_mask=attn_mask, position_ids=position_ids
            )

            # in the first decoding step, we want to use the 'real' last position for each sentence
            if step == 0:
                next_token_logits = outputs[0].gather(1, start_idx).squeeze(1)
            else:
                next_token_logits = outputs[0][:, -1, :]

            # Greedy decoding: always pick the highest-scoring token
            next_tokens = torch.argmax(next_token_logits, dim=-1)

            # this updates which sentences have not seen an <EOS> token so far
            # if one <EOS> token was seen the sentence is finished
            eos_not_in_sents.mul_(next_tokens.ne(eos_token_id).long())

            # either append a padding token here if <EOS> has been seen or append next token
            tokens_to_add = next_tokens * (eos_not_in_sents) + pad_token_id * (
                1 - eos_not_in_sents
            )

            # Update input_ids, attn_mask and position_ids
            input_ids = torch.cat([input_ids, tokens_to_add.unsqueeze(-1)], dim=-1)
            attn_mask = torch.cat(
                [
                    attn_mask,
                    torch.ones((attn_mask.shape[0], 1), dtype=torch.long, device=self.device),
                ],
                dim=1,
            )
            position_ids = torch.cat(
                [position_ids, (position_ids[:, -1] + 1).unsqueeze(-1)], dim=1
            )

        # Special tokens (including the padding added above) are dropped from the decoded text
        return [
            self.tokenizer.decode(output, skip_special_tokens=True)
            for output in input_ids
        ]

In [None]:
show_doc(TransformersTextGenerator.load)  # render the API documentation for `load`

<h4 id="TransformersTextGenerator.load" class="doc_header"><code>TransformersTextGenerator.load</code><a href="__main__.py#L21" class="source_link" style="float:right">[source]</a></h4>

> <code>TransformersTextGenerator.load</code>(**`model_name_or_path`**:`str`)

Class method for loading and constructing this Model

**Parameters:**


 - **`model_name_or_path`** : *`<class 'str'>`*	<p>A key string of one of Transformer's pre-trained Language Model</p>



**Returns**:
	
 * *`<class 'adaptnlp.model.AdaptiveModel'>`*

In [None]:
# Example: build a generator from the `gpt2` checkpoint (the output below shows the files being downloaded)
o = TransformersTextGenerator.load('gpt2')

HBox(children=(FloatProgress(value=0.0, description='Downloading', max=665.0, style=ProgressStyle(description_…




HBox(children=(FloatProgress(value=0.0, description='Downloading', max=1042301.0, style=ProgressStyle(descript…




HBox(children=(FloatProgress(value=0.0, description='Downloading', max=456318.0, style=ProgressStyle(descripti…




HBox(children=(FloatProgress(value=0.0, description='Downloading', max=1355256.0, style=ProgressStyle(descript…




Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


HBox(children=(FloatProgress(value=0.0, description='Downloading', max=548118077.0, style=ProgressStyle(descri…




In [None]:
show_doc(TransformersTextGenerator.predict)  # render the API documentation for `predict`

<h4 id="TransformersTextGenerator.predict" class="doc_header"><code>TransformersTextGenerator.predict</code><a href="__main__.py#L32" class="source_link" style="float:right">[source]</a></h4>

> <code>TransformersTextGenerator.predict</code>(**`text`**:`Union`\[`List`\[`str`\], `str`\], **`mini_batch_size`**:`int`=*`32`*, **`num_tokens_to_produce`**:`int`=*`50`*, **\*\*`kwargs`**)

Predict method for running inference using the pre-trained sequence classifier model.  Keyword arguments for parameters of the method `Transformers.PreTrainedModel.generate()` can be used as well.

**Parameters:**


 - **`text`** : *`typing.Union[typing.List[str], str]`*	<p>Sentences to run inference on</p>


 - **`mini_batch_size`** : *`<class 'int'>`*, *optional*	<p>Mini batch size</p>


 - **`num_tokens_to_produce`** : *`<class 'int'>`*, *optional*	<p>Number of tokens you want to generate</p>


 - **`kwargs`** : *`<class 'inspect._empty'>`*


**Returns**:
	
 * *`typing.List[str]`*	<p>A list of predicted sentences</p>



In [None]:
#export
class EasyTextGenerator:
    "Text Generation Module"

    def __init__(self):
        # Cache of loaded generators keyed by model name, so each model is loaded only once.
        # Kept as a `defaultdict(bool)` so indexing an unknown name still yields `False`.
        self.generators: Dict[str, AdaptiveModel] = defaultdict(bool)

    def generate(
        self,
        text: Union[List[str], str], # List of sentences to run inference on
        model_name_or_path: Union[str, HFModelResult] = "gpt2", # A model id or path to a pre-trained model repository or custom trained model directory
        mini_batch_size: int = 32, # Mini batch size
        num_tokens_to_produce: int = 50, # Number of tokens you want to generate
        **kwargs, # Optional keyword arguments passed through to `TransformersTextGenerator.predict`
    ) -> Dict[str, List[str]]: # A dictionary whose `generated_text` key holds the list of generated sentences
        "Generate text with the requested pre-trained language model, loading and caching it on first use. Returns the dictionary produced by `TransformersTextGenerator.predict`."
        # An `HFModelResult` carries the model id in `.name`; a plain string is used as-is
        name = getattr(model_name_or_path, 'name', model_name_or_path)

        # `.get` avoids inserting a `False` placeholder that would linger if loading fails
        generator = self.generators.get(name)
        if not generator:
            generator = TransformersTextGenerator.load(name)
            self.generators[name] = generator

        return generator.predict(
            text=text,
            mini_batch_size=mini_batch_size,
            num_tokens_to_produce=num_tokens_to_produce,
            **kwargs,
        )

In [None]:
show_doc(EasyTextGenerator.generate)  # render the API documentation for `generate`

<h4 id="EasyTextGenerator.generate" class="doc_header"><code>EasyTextGenerator.generate</code><a href="__main__.py#L8" class="source_link" style="float:right">[source]</a></h4>

> <code>EasyTextGenerator.generate</code>(**`text`**:`Union`\[`List`\[`str`\], `str`\], **`model_name_or_path`**:`HFModelResult'>]`=*`'gpt2'`*, **`mini_batch_size`**:`int`=*`32`*, **`num_tokens_to_produce`**:`int`=*`50`*, **\*\*`kwargs`**)

Predict method for running inference using the pre-trained sequence classifier model. Keyword arguments for parameters of the method `Transformers.PreTrainedModel.generate()` can be used as well.

**Parameters:**


 - **`text`** : *`typing.Union[typing.List[str], str]`*	<p>List of sentences to run inference on</p>


 - **`model_name_or_path`** : *`[<class 'str'>, <class 'adaptnlp.model_hub.HFModelResult'>]`*, *optional*	<p>A model id or path to a pre-trained model repository or custom trained model directory</p>


 - **`mini_batch_size`** : *`<class 'int'>`*, *optional*	<p>Mini batch size</p>


 - **`num_tokens_to_produce`** : *`<class 'int'>`*, *optional*	<p>Number of tokens you want to generate</p>


 - **`kwargs`** : *`<class 'inspect._empty'>`*


**Returns**:
	
 * *`typing.List[str]`*	<p>A list of predicted sentences</p>



In [None]:
#hide
# Regression test: generate 50 tokens from a short prompt with `gpt2` (selected by name)
# and compare the result against the expected continuation
text = "What has happened?"

generator = EasyTextGenerator()
generated_text = generator.generate(text, model_name_or_path="gpt2", mini_batch_size=2, num_tokens_to_produce=50)
test_eq(generated_text['generated_text'], ['What has happened?\n\nThe first thing that happened was that I was in a room with a bunch of people who were all very nice and nice people. I was sitting in a chair and they were all talking about how they were going to get a job and how'])

Special tokens have been added in the vocabulary, make sure the associated word embedding are fine-tuned or trained.


In [None]:
#hide
# Same regression test, but the model is selected with a model-hub search result instead of a string
from adaptnlp.model_hub import HFModelHub
hub = HFModelHub()
# Take the last entry returned by the search for 'gpt2'; the expected text below is identical to the test above
model = hub.search_model_by_name('gpt2')[-1]
# Reuses `generator` and `text` defined in the previous test cell
generated_text = generator.generate(text, model_name_or_path=model, mini_batch_size=2, num_tokens_to_produce=50)
test_eq(generated_text['generated_text'], ['What has happened?\n\nThe first thing that happened was that I was in a room with a bunch of people who were all very nice and nice people. I was sitting in a chair and they were all talking about how they were going to get a job and how'])