# LELA70331 Computational Linguistics Week 8

This week we are going to take a look at text generation and dialogue systems.

## Text Generation

We are going to look first at methods for creative generation of text.

### N-gram-based generation

The simplest way to perform generation is to learn and apply n-gram probabilities. Using bigrams, we first estimate, for each word in a corpus, the probability of every word that can follow it. We then pick a random word to start the sentence and select a next word that is probable given that word. We output that next word and then select another word that is probable given THAT word, repeating until we have generated N words.

This allows us to generate sentences that look a little like natural language, and to imitate particular genres and styles.
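
To make the procedure concrete before turning to a real corpus, here is a minimal sketch that uses only the standard library. The toy corpus and the helper names (`follower_counts`, `next_word_probabilities`, `generate`) are assumptions made for illustration; the NLTK cells below do the same job with `ConditionalFreqDist`.

```python
import random
from collections import Counter, defaultdict

# Toy corpus (an assumption for illustration; any tokenised text would do).
tokens = "the cat sat on the mat and the cat ate the fish".split()

# Count how often each word follows each other word.
follower_counts = defaultdict(Counter)
for previous, current in zip(tokens, tokens[1:]):
    follower_counts[previous][current] += 1

def next_word_probabilities(word):
    """Turn the raw follower counts for `word` into probabilities."""
    counts = follower_counts[word]
    total = sum(counts.values())
    return {follower: count / total for follower, count in counts.items()}

def generate(start, num_words=8, seed=0):
    """Generate up to `num_words` words, sampling each next word from the bigram counts."""
    rng = random.Random(seed)
    word, output = start, [start]
    for _ in range(num_words - 1):
        counts = follower_counts.get(word)
        if not counts:  # dead end: this word was never followed by anything
            break
        word = rng.choices(list(counts), weights=list(counts.values()), k=1)[0]
        output.append(word)
    return output

print(next_word_probabilities("the"))
print(" ".join(generate("the")))
```

In this toy corpus "the" is followed by "cat" twice and by "mat" and "fish" once each, so "cat" gets probability 0.5 and the other two get 0.25.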

In [None]:
import nltk
nltk.download('gutenberg')
nltk.corpus.gutenberg.fileids()

In [None]:
carroll = nltk.corpus.gutenberg.words('carroll-alice.txt')
kjv = nltk.corpus.gutenberg.words('bible-kjv.txt')
macbeth = nltk.corpus.gutenberg.words('shakespeare-macbeth.txt')
bigrams = nltk.bigrams(kjv)
cfd = nltk.ConditionalFreqDist(bigrams)

In [None]:
cfd['the']

In [None]:
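import random

def generate_argmax(cfdist, word, num=15):
    # Greedy generation: always follow each word with its most frequent successor
    for i in range(num):
        print(word, end=' ')
        word = cfdist[word].max()

def generate_random(cfdist, word, num=15):
    # Random generation: sample each successor in proportion to its bigram count.
    # Use the cfdist argument rather than the global cfd.
    for i in range(num):
        print(word, end=' ')
        word = random.choices(list(cfdist[word].keys()), weights=list(cfdist[word].values()), k=1)[0]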

In [None]:
generate_argmax(cfd,"the")

In [None]:
generate_random(cfd,"the")

### Unconditioned Generation with Recurrent Neural Networks

Generation with a recurrent neural network works very similarly: each element of the sequence is picked at random according to its probability in context. The difference is that the probabilities come from the softmax output layer of a neural network rather than from bigram counts.
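
As a rough illustration of that last step, the sketch below turns a vector of output-layer scores into probabilities with a softmax and samples from them. The four-item vocabulary and the scores are made up for the example; the `temperature` argument mirrors the one used by `sample_from_model` further down, where values below 1.0 make the distribution peakier and values above 1.0 flatten it.

```python
import math
import random

def softmax(scores, temperature=1.0):
    """Convert raw scores into probabilities; lower temperatures sharpen the distribution."""
    scaled = [s / temperature for s in scores]
    highest = max(scaled)  # subtract the maximum for numerical stability
    exps = [math.exp(s - highest) for s in scaled]
    total = sum(exps)
    return [e / total for e in exps]

# Hypothetical output-layer scores for a four-character vocabulary.
vocab = ["a", "e", "n", "<END>"]
scores = [2.0, 1.0, 0.5, 0.1]

for temperature in (0.5, 1.0, 2.0):
    probs = softmax(scores, temperature)
    print(temperature, [round(p, 3) for p in probs])

# Sampling picks a character at random in proportion to its probability.
rng = random.Random(0)
print(rng.choices(vocab, weights=softmax(scores), k=5))
```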

We are going to randomly generate surnames.

In [None]:
from generation_tools import *

In [None]:
class SurnameGenerationModel(nn.Module):
    def __init__(self, char_embedding_size, char_vocab_size, rnn_hidden_size,
                 batch_first=True, padding_idx=0, dropout_p=0.5):
        """
        Args:
            char_embedding_size (int): The size of the character embeddings
            char_vocab_size (int): The number of characters to embed
            rnn_hidden_size (int): The size of the RNN's hidden state
            batch_first (bool): Informs whether the input tensors will
                have batch or the sequence on the 0th dimension
            padding_idx (int): The index for the tensor padding;
                see torch.nn.Embedding
            dropout_p (float): the probability of zeroing activations using
                the dropout method.  higher means more likely to zero.
        """
        super(SurnameGenerationModel, self).__init__()

        self.char_emb = nn.Embedding(num_embeddings=char_vocab_size,
                                     embedding_dim=char_embedding_size,
                                     padding_idx=padding_idx)

        self.rnn = nn.GRU(input_size=char_embedding_size,
                          hidden_size=rnn_hidden_size,
                          batch_first=batch_first)

        self.fc = nn.Linear(in_features=rnn_hidden_size,
                            out_features=char_vocab_size)

        self._dropout_p = dropout_p

    def forward(self, x_in, apply_softmax=False):
        """The forward pass of the model

        Args:
            x_in (torch.Tensor): an input data tensor.
                x_in.shape should be (batch, max_seq_size)
            apply_softmax (bool): a flag for the softmax activation
                should be false if used with the Cross Entropy losses
        Returns:
            the resulting tensor. tensor.shape should be (batch, seq_size, char_vocab_size)
        """
        x_embedded = self.char_emb(x_in)

        y_out, _ = self.rnn(x_embedded)

        batch_size, seq_size, feat_size = y_out.shape
        y_out = y_out.contiguous().view(batch_size * seq_size, feat_size)

        # Only apply dropout while training (F.dropout defaults to training=True)
        y_out = self.fc(F.dropout(y_out, p=self._dropout_p, training=self.training))

        if apply_softmax:
            y_out = F.softmax(y_out, dim=1)

        new_feat_size = y_out.shape[-1]
        y_out = y_out.view(batch_size, seq_size, new_feat_size)

        return y_out

In [None]:
def sample_from_model(model, vectorizer, num_samples=1, sample_size=20,
                      temperature=1.0):
    """Sample a sequence of indices from the model

    Args:
        model (SurnameGenerationModel): the trained model
        vectorizer (SurnameVectorizer): the corresponding vectorizer
        num_samples (int): the number of samples
        sample_size (int): the max length of the samples
        temperature (float): accentuates or flattens
            the distribution.
            0.0 < temperature < 1.0 will make it peakier.
            temperature > 1.0 will make it more uniform
    Returns:
        indices (torch.Tensor): the matrix of indices;
        shape = (num_samples, sample_size)
    """
    begin_seq_index = [vectorizer.char_vocab.begin_seq_index
                       for _ in range(num_samples)]
    begin_seq_index = torch.tensor(begin_seq_index,
                                   dtype=torch.int64).unsqueeze(dim=1)
    indices = [begin_seq_index]
    h_t = None

    for time_step in range(sample_size):
        x_t = indices[time_step]
        x_emb_t = model.char_emb(x_t)
        rnn_out_t, h_t = model.rnn(x_emb_t, h_t)
        prediction_vector = model.fc(rnn_out_t.squeeze(dim=1))
        probability_vector = F.softmax(prediction_vector / temperature, dim=1)
        indices.append(torch.multinomial(probability_vector, num_samples=1))
    # Squeeze only the trailing dimension so that num_samples=1 still yields a 2-D tensor
    indices = torch.stack(indices).squeeze(dim=-1).permute(1, 0)
    return indices

def decode_samples(sampled_indices, vectorizer):
    """Transform indices into the string form of a surname

    Args:
        sampled_indices (torch.Tensor): the indices from `sample_from_model`
        vectorizer (SurnameVectorizer): the corresponding vectorizer
    """
    decoded_surnames = []
    vocab = vectorizer.char_vocab

    for sample_index in range(sampled_indices.shape[0]):
        surname = ""
        for time_step in range(sampled_indices.shape[1]):
            sample_item = sampled_indices[sample_index, time_step].item()
            if sample_item == vocab.begin_seq_index:
                continue
            elif sample_item == vocab.end_seq_index:
                break
            else:
                surname += vocab.lookup_index(sample_item)
        decoded_surnames.append(surname)
    return decoded_surnames

In [None]:
args = Namespace(
    # Data and Path information
    surname_csv="surnames_with_splits.csv",
    vectorizer_file="ugen_vectorizer.json",
    model_state_file="ugen_model.pth",
    save_dir="/content/gdrive/My Drive/CL_Week_5_Materials/",
    # Model hyper parameters
    char_embedding_size=32,
    rnn_hidden_size=32,
    # Training hyper parameters
    seed=1337,
    learning_rate=0.001,
    batch_size=128,
    num_epochs=100,
    early_stopping_criteria=5,
    # Runtime options
    catch_keyboard_interrupt=True,
    cuda=True,
    expand_filepaths_to_save_dir=True,
    reload_from_files=False,
)

if args.expand_filepaths_to_save_dir:
    args.vectorizer_file = os.path.join(args.save_dir,
                                        args.vectorizer_file)

    args.model_state_file = os.path.join(args.save_dir,
                                         args.model_state_file)

    print("Expanded filepaths: ")
    print("\t{}".format(args.vectorizer_file))
    print("\t{}".format(args.model_state_file))


# Check CUDA
if not torch.cuda.is_available():
    args.cuda = False

args.device = torch.device("cuda" if args.cuda else "cpu")

print("Using CUDA: {}".format(args.cuda))

# Set seed for reproducibility
set_seed_everywhere(args.seed, args.cuda)

# handle dirs
handle_dirs(args.save_dir)

In [None]:


if args.reload_from_files:
    # training from a checkpoint
    dataset = SurnameDataset.load_dataset_and_load_vectorizer(args.surname_csv,
                                                              args.vectorizer_file)
else:
    # create dataset and vectorizer
    dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
    dataset.save_vectorizer(args.vectorizer_file)

vectorizer = dataset.get_vectorizer()

model = SurnameGenerationModel(char_embedding_size=args.char_embedding_size,
                               char_vocab_size=len(vectorizer.char_vocab),
                               rnn_hidden_size=args.rnn_hidden_size,
                               padding_idx=vectorizer.char_vocab.mask_index)

train_state = make_train_state(args)

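# map_location lets a checkpoint saved on a GPU load on a CPU-only machine
model.load_state_dict(torch.load(train_state['model_filename'], map_location=args.device))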

model = model.to(args.device)

In [None]:
# number of names to generate
num_names = 10
model = model.cpu()
# Sample index sequences from the model and decode them into surnames
sampled_surnames = decode_samples(
    sample_from_model(model, vectorizer, num_samples=num_names),
    vectorizer)
# Show results
print("-" * 15)
for i in range(num_names):
    print(sampled_surnames[i])

### Conditioned Generation

We now repeat this, but we condition the output of the network on something that isn't in the sequence: a target nationality for the surname. The nationality is embedded as a vector and used as the initial hidden state of the RNN.

In [None]:
class SurnameGenerationModel(nn.Module):
    def __init__(self, char_embedding_size, char_vocab_size, num_nationalities,
                 rnn_hidden_size, batch_first=True, padding_idx=0, dropout_p=0.5):
        """
        Args:
            char_embedding_size (int): The size of the character embeddings
            char_vocab_size (int): The number of characters to embed
            num_nationalities (int): The number of nationalities to embed
            rnn_hidden_size (int): The size of the RNN's hidden state
            batch_first (bool): Informs whether the input tensors will
                have batch or the sequence on the 0th dimension
            padding_idx (int): The index for the tensor padding;
                see torch.nn.Embedding
            dropout_p (float): the probability of zeroing activations using
                the dropout method.  higher means more likely to zero.
        """
        super(SurnameGenerationModel, self).__init__()

        self.char_emb = nn.Embedding(num_embeddings=char_vocab_size,
                                     embedding_dim=char_embedding_size,
                                     padding_idx=padding_idx)

        self.nation_emb = nn.Embedding(num_embeddings=num_nationalities,
                                       embedding_dim=rnn_hidden_size)

        self.rnn = nn.GRU(input_size=char_embedding_size,
                          hidden_size=rnn_hidden_size,
                          batch_first=batch_first)

        self.fc = nn.Linear(in_features=rnn_hidden_size,
                            out_features=char_vocab_size)

        self._dropout_p = dropout_p

    def forward(self, x_in, nationality_index, apply_softmax=False):
        """The forward pass of the model

        Args:
            x_in (torch.Tensor): an input data tensor.
                x_in.shape should be (batch, max_seq_size)
            nationality_index (torch.Tensor): The index of the nationality for each data point
                Used to initialize the hidden state of the RNN
            apply_softmax (bool): a flag for the softmax activation
                should be false if used with the Cross Entropy losses
        Returns:
            the resulting tensor. tensor.shape should be (batch, seq_size, char_vocab_size)
        """
        x_embedded = self.char_emb(x_in)

        # hidden_size: (num_layers * num_directions, batch_size, rnn_hidden_size)
        nationality_embedded = self.nation_emb(nationality_index).unsqueeze(0)

        y_out, _ = self.rnn(x_embedded, nationality_embedded)

        batch_size, seq_size, feat_size = y_out.shape
        y_out = y_out.contiguous().view(batch_size * seq_size, feat_size)

        # Only apply dropout while training (F.dropout defaults to training=True)
        y_out = self.fc(F.dropout(y_out, p=self._dropout_p, training=self.training))

        if apply_softmax:
            y_out = F.softmax(y_out, dim=1)

        new_feat_size = y_out.shape[-1]
        y_out = y_out.view(batch_size, seq_size, new_feat_size)

        return y_out

In [None]:
def sample_from_model(model, vectorizer, nationalities, sample_size=20,
                      temperature=1.0):
    """Sample a sequence of indices from the model

    Args:
        model (SurnameGenerationModel): the trained model
        vectorizer (SurnameVectorizer): the corresponding vectorizer
        nationalities (list): a list of integers representing nationalities
        sample_size (int): the max length of the samples
        temperature (float): accentuates or flattens
            the distribution.
            0.0 < temperature < 1.0 will make it peakier.
            temperature > 1.0 will make it more uniform
    Returns:
        indices (torch.Tensor): the matrix of indices;
        shape = (num_samples, sample_size)
    """
    num_samples = len(nationalities)
    begin_seq_index = [vectorizer.char_vocab.begin_seq_index
                       for _ in range(num_samples)]
    begin_seq_index = torch.tensor(begin_seq_index,
                                   dtype=torch.int64).unsqueeze(dim=1)
    indices = [begin_seq_index]
    nationality_indices = torch.tensor(nationalities, dtype=torch.int64).unsqueeze(dim=0)
    h_t = model.nation_emb(nationality_indices)

    for time_step in range(sample_size):
        x_t = indices[time_step]
        x_emb_t = model.char_emb(x_t)
        rnn_out_t, h_t = model.rnn(x_emb_t, h_t)
        prediction_vector = model.fc(rnn_out_t.squeeze(dim=1))
        probability_vector = F.softmax(prediction_vector / temperature, dim=1)
        indices.append(torch.multinomial(probability_vector, num_samples=1))
    # Squeeze only the trailing dimension so that a single nationality still yields a 2-D tensor
    indices = torch.stack(indices).squeeze(dim=-1).permute(1, 0)
    return indices

def decode_samples(sampled_indices, vectorizer):
    """Transform indices into the string form of a surname

    Args:
        sampled_indices (torch.Tensor): the indices from `sample_from_model`
        vectorizer (SurnameVectorizer): the corresponding vectorizer
    """
    decoded_surnames = []
    vocab = vectorizer.char_vocab

    for sample_index in range(sampled_indices.shape[0]):
        surname = ""
        for time_step in range(sampled_indices.shape[1]):
            sample_item = sampled_indices[sample_index, time_step].item()
            if sample_item == vocab.begin_seq_index:
                continue
            elif sample_item == vocab.end_seq_index:
                break
            else:
                surname += vocab.lookup_index(sample_item)
        decoded_surnames.append(surname)
    return decoded_surnames

In [None]:
args = Namespace(
    # Data and Path information
    surname_csv="surnames_with_splits.csv",
    vectorizer_file="cgen_vectorizer.json",
    model_state_file="cgen_model.pth",
    save_dir="/content/gdrive/My Drive/CL_Week_5_Materials/",
    # Model hyper parameters
    char_embedding_size=32,
    rnn_hidden_size=32,
    # Training hyper parameters
    seed=1337,
    learning_rate=0.001,
    batch_size=128,
    num_epochs=100,
    early_stopping_criteria=5,
    # Runtime options
    catch_keyboard_interrupt=True,
    cuda=True,
    expand_filepaths_to_save_dir=True,
    reload_from_files=False,
)

if args.expand_filepaths_to_save_dir:
    args.vectorizer_file = os.path.join(args.save_dir,
                                        args.vectorizer_file)

    args.model_state_file = os.path.join(args.save_dir,
                                         args.model_state_file)

    print("Expanded filepaths: ")
    print("\t{}".format(args.vectorizer_file))
    print("\t{}".format(args.model_state_file))

# Check CUDA
if not torch.cuda.is_available():
    args.cuda = False

args.device = torch.device("cuda" if args.cuda else "cpu")

print("Using CUDA: {}".format(args.cuda))

# Set seed for reproducibility
set_seed_everywhere(args.seed, args.cuda)

# handle dirs
handle_dirs(args.save_dir)

In [None]:
if args.reload_from_files:
    # training from a checkpoint
    dataset = SurnameDataset.load_dataset_and_load_vectorizer(args.surname_csv,
                                                              args.vectorizer_file)
else:
    # create dataset and vectorizer
    dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
    dataset.save_vectorizer(args.vectorizer_file)

vectorizer = dataset.get_vectorizer()

model = SurnameGenerationModel(char_embedding_size=args.char_embedding_size,
                               char_vocab_size=len(vectorizer.char_vocab),
                               num_nationalities=len(vectorizer.nationality_vocab),
                               rnn_hidden_size=args.rnn_hidden_size,
                               padding_idx=vectorizer.char_vocab.mask_index,
                               dropout_p=0.5)

train_state = make_train_state(args)

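# map_location lets a checkpoint saved on a GPU load on a CPU-only machine
model.load_state_dict(torch.load(train_state['model_filename'], map_location=args.device))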

model = model.to(args.device)

In [None]:
model = model.cpu()
for index in range(len(vectorizer.nationality_vocab)):
    nationality = vectorizer.nationality_vocab.lookup_index(index)
    print("Sampled for {}: ".format(nationality))
    sampled_indices = sample_from_model(model, vectorizer,
                                        nationalities=[index] * 3,
                                        temperature=0.7)
    for sampled_surname in decode_samples(sampled_indices, vectorizer):
        print("-  " + sampled_surname)

### GPT-2

Before moving on, I want to bridge the gap between what we have been doing and the kinds of technology you see in real-world systems. Some of the earlier GPT models (GPT is the family of language models behind ChatGPT) can be downloaded and run on your own machine. We are going to use GPT-2 (https://openai.com/blog/better-language-models/). The basic principle by which generation works is the same as we have seen above: the model repeatedly samples the next token given the context so far. It just uses more data and a different model architecture (a Transformer rather than a recurrent network).

GPT-2 is also very similar in principle to later models such as GPT-4, except that these later models were trained on more data and have many more parameters (cf. "weights").
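
The generation cell below samples with a `top_k` setting, which restricts each sampling step to the k most probable next tokens. The following is a minimal standard-library sketch of that idea; the vocabulary, the probabilities and the helper name `top_k_sample` are assumptions for illustration and are not taken from the `transformers` library.

```python
import random

def top_k_sample(probabilities, k, rng=random):
    """Sample an index using only the k most probable entries."""
    # Rank the indices from most to least probable and keep the first k.
    ranked = sorted(range(len(probabilities)), key=lambda i: probabilities[i], reverse=True)
    kept = ranked[:k]
    # random.choices renormalises the weights, so they need not sum to 1.
    weights = [probabilities[i] for i in kept]
    return rng.choices(kept, weights=weights, k=1)[0]

# Hypothetical next-token distribution over a six-token vocabulary.
vocab = ["cat", "dog", "the", "ran", "zebra", "quark"]
probabilities = [0.40, 0.30, 0.15, 0.10, 0.04, 0.01]

rng = random.Random(0)
samples = [vocab[top_k_sample(probabilities, k=3, rng=rng)] for _ in range(10)]
print(samples)  # only "cat", "dog" and "the" can appear
```

Cutting off the low-probability tail in this way tends to avoid very unlikely continuations while still leaving some randomness in the output.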


In [None]:
!pip install transformers

In [None]:
from transformers import GPT2TokenizerFast, GPT2LMHeadModel, set_seed
import random

In [None]:
model_name = 'gpt2-medium'
num_samples = 1
max_length = 100
top_k = 10

In [None]:
#load the model
set_seed(42)
tokenizer = GPT2TokenizerFast.from_pretrained(model_name)
model = GPT2LMHeadModel.from_pretrained(model_name, pad_token_id=tokenizer.eos_token_id)

In [None]:
prompts = []
prompt = input()
prompts.append(prompt)

In [None]:
output = {}
for text in prompts:
    # Convert the prompt into a tensor of token ids
    input_ids = tokenizer.encode(text, return_tensors='pt')
    # Allow max_length new tokens on top of the prompt, without changing the global setting
    total_length = input_ids.shape[-1] + max_length
    responses = model.generate(input_ids, max_length=total_length, do_sample=True,
                               top_k=top_k, num_return_sequences=num_samples)
    # Strip the prompt tokens so that only the continuation remains
    responses = responses[:, input_ids.shape[-1]:]
    output[text] = []
    for r in responses:
        output[text].append(tokenizer.decode(r, skip_special_tokens=True))

In [None]:
for k in output:
    print('prompt: ' + k + '\n')
    for i, r in enumerate(output[k]):
        print('================================== Output ==================================')
        print(k + " " + r.strip() + '\n')
    print('********************************************************************************************************************')
    print('')

## Rule-based chatbot: Eliza

As described in the week 7 lecture, Eliza is a chatbot that simulates a Rogerian therapist, making use of a set of rules in the form of regular expressions. At the heart of Eliza are the substitution function (re.sub in Python) and grouping. We'll start with a quick recap as to what these are.

We need to import the Regular Expressions module in Python.

In [None]:
import re

This gives us access to the very useful function re.sub.

### re.sub()

This finds all occurrences of a given pattern in a string and replaces each of them with the replacement provided:

In [None]:
utt = 'walked'
re.sub('ed','ing',utt)

### Groups

Grouping is a technique for picking out substrings from a string that matches a specified pattern. Parentheses are used to indicate the start and end of the substring. It is very powerful when combined with substitution.

You can use parentheses to capture a particular substring within a pattern and then use it in your replacement string within sub. For example:


In [None]:
utt = "procrastinating"
re.sub('([a-z]+)ing','\\1ed',utt)

## A very simple Elizabot

The code below implements a very simple Eliza. The function respond takes an utterance as input and uses re.sub to generate a response. The loop below the function creates a simple interface that takes user input and prints the response.

We can extend Eliza's ability by adding additional rules.

In [None]:
def respond(utt):
  utt = re.sub('hello my name is (.+)','hello \\1 how are you feeling today', utt)
  return utt

In [None]:
utt = ""
while utt != 'goodbye':
    utt = input('> ')
    reply = respond(utt)
    if reply != utt:
        print(reply)
    else:
        if utt != "goodbye":
            print("Can you rephrase that?")
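
As the number of rules grows, one possible way to organise them is as a list of pattern and template pairs that is tried in order. The sketch below shows this design under stated assumptions: the function name `respond_with_rules` and the two extra rules are made up for illustration and are not the rules of the original Eliza. It returns `None` when nothing matches, whereas the `respond` function above returns the utterance unchanged.

```python
import re

# Each rule pairs a pattern with a response template; \1 refers to the captured group.
# These example rules are illustrative assumptions, not the rules of the original Eliza.
RULES = [
    (r'hello my name is (.+)', r'hello \1 how are you feeling today'),
    (r'i need (.+)', r'why do you need \1'),
    (r'i am worried about (.+)', r'what worries you about \1'),
]

def respond_with_rules(utt, rules=RULES):
    """Return the response for the first rule whose pattern matches, else None."""
    for pattern, template in rules:
        match = re.fullmatch(pattern, utt)
        if match:
            # expand() fills the template's backreferences from the match
            return match.expand(template)
    return None

print(respond_with_rules('i need a holiday'))
print(respond_with_rules('the weather is nice'))
```

Because the first matching rule wins, more specific patterns should be placed before more general ones.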

### Activity

Add patterns (using substitutions and grouping) to the respond function that will allow Eliza to conduct both of these conversations. Test your system by conducting the conversation with Eliza.

User: hello my name is emma <br>
Eliza: Hello emma, my name is Eliza. How are you feeling today? <br>
User: i am feeling very happy <br>
Eliza: Do you often feel happy? <br>
User: yes since I started my new job <br>
Eliza: Can you tell me about starting your new job? <br>

User: hello my name is john <br>
Eliza: Hello john, my name is Eliza. How are you feeling today? <br>
User: i am feeling pretty happy <br>
Eliza: Do you often feel happy? <br>
User: yes since I moved house <br>
Eliza: Can you tell me about moving house? <br>

### Reverse engineering the NLTK Eliza

There have been many implementations of Eliza over the years. One version is built into the NLTK toolkit. This can be run as follows:


In [None]:
import nltk

In [None]:
nltk.chat.eliza.demo()

Activity: Conduct a conversation of your own with the NLTK Eliza, with four turns each. Using the code below, add to your own chatbot the substitutions that you think Eliza is using to generate its responses. Where you find Eliza's response to be lacking, update the substitution to give a better response.

In [None]:
def respond(utt):
  utt = re.sub('hello my name is (.+)','hello \\1 how are you feeling today', utt)
  return utt

In [None]:
utt = ""
while utt != 'goodbye':
    utt = input('> ')
    reply = respond(utt)
    if reply != utt:
        print(reply)
    else:
        if utt != "goodbye":
            print("Can you rephrase that?")

## Corpus-based chatbots

Training and running a corpus-based chatbot takes more steps than we have time for today. If you want to have a go in your own time, you will find a tutorial for doing so in PyTorch here:

https://pytorch.org/tutorials/beginner/chatbot_tutorial.html

There is a link at the top of the page to open the notebook in Colab.

To run this easily in Colab, put the following code block at the top and run it before working through the rest of the notebook.

In [None]:
! wget https://zissou.infosci.cornell.edu/convokit/datasets/movie-corpus/movie-corpus.zip
! unzip movie-corpus.zip
! mkdir data
! mv movie-corpus data/

### Activity: Intent classification
This is an activity to help you think about the rule-based intent classification component of your coursework. You should write rules to uniquely and correctly identify each of the following utterances:

PlayMusic: play the weather girls

AddToPlaylist: add this to my italian film soundtrack playlist

RateBook: give the restaurant guidebook 5 stars

SearchScreeningEvent: find screenings of the book thief at around 7

BookRestaurant: book me a table outside for 2 for dinner at the national theatre restaurant

GetWeather: will it be warm enough to eat dinner outside at around 7 tonight

SearchCreativeWork: find me songs films or books about restaurants

Here is the function from your coursework notebook:

In [None]:
import random

def assign_intent(utt, verbose=False):
  PlayMusic_Pattern = re.compile("play|music")
  AddToPlaylist_Pattern = re.compile("add|playlist")
  RateBook_Pattern = re.compile("rate|book")
  SearchScreeningEvent_Pattern = re.compile("screening")
  BookRestaurant_Pattern = re.compile("book|restaurant|food")
  GetWeather_Pattern = re.compile("get|weather")
  SearchCreativeWork_Pattern = re.compile("creative")

  weights = {}
  weights['PlayMusic'] = len(re.findall(PlayMusic_Pattern,  utt))
  weights['AddToPlaylist'] = len(re.findall(AddToPlaylist_Pattern,  utt))
  weights['RateBook'] = len(re.findall(RateBook_Pattern,  utt))
  weights['SearchScreeningEvent'] = len(re.findall(SearchScreeningEvent_Pattern,  utt))
  weights['BookRestaurant'] = len(re.findall(BookRestaurant_Pattern,  utt))
  weights['GetWeather'] = len(re.findall(GetWeather_Pattern,  utt))
  weights['SearchCreativeWork'] = len(re.findall(SearchCreativeWork_Pattern,  utt))
  if verbose:
      print(weights)
  if max(weights.values()) == 0:
      return random.choice(list(weights.keys()))
  else:
      weights_as_list = list(weights.items())
      random.shuffle(weights_as_list)
      weights=dict(weights_as_list)
      return max(weights, key=lambda key: weights[key])

In [None]:
example_inputs = ['play the weather girls','add this to my italian film soundtrack playlist','give the restaurant guidebook 5 stars','find screenings of the book thief at around 7','book me a table outside for 2 for dinner at the national theatre restaurant','will it be warm enough to eat dinner outside at around 7 tonight','find me songs films or books about restaurants']
for utt in example_inputs:
    print(str(assign_intent(utt)) + " : " + utt)
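
One thing to watch for when refining the patterns in `assign_intent` is that an unanchored alternative such as `book` also matches inside longer words. The short sketch below, which uses one of the example utterances, shows the difference a word boundary (`\b`) makes; the variable names `loose` and `strict` are chosen for illustration only.

```python
import re

utt = 'give the restaurant guidebook 5 stars'

# Without word boundaries, "book" also matches inside "guidebook".
loose = re.compile(r'rate|book')
# With \b on both sides, only the whole words "rate" or "book" match.
strict = re.compile(r'\b(?:rate|book)\b')

print(loose.findall(utt))                     # ['book']
print(strict.findall(utt))                    # []
print(strict.findall('book me a table'))      # ['book']
```

Whether a substring match is wanted depends on the intent: it may help for "books" but hurt for "guidebook", so it is worth deciding pattern by pattern.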