<a href="https://colab.research.google.com/github/clam004/notebook_tutorials/blob/main/babyforce.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Minimal Causal Large Language Model (LLM) Chatbot

Run the next two cells to install and import PyTorch and Hugging Face Transformers.

In [9]:
%%capture
! pip install transformers accelerate

In [10]:
#sys libs
import os
import sys
import random
import time
import json
import datetime
from datetime import date
import calendar
import pytz

#data manipulation libs
import numpy as np

#string manipulation libs
import re
import string

#torch libs
import torch
print('torch.__version__', torch.__version__)
print('torch.cuda.device_count()', torch.cuda.device_count())
print('torch.cuda.empty_cache()', torch.cuda.empty_cache())

#huggingface transformers
import transformers
print(transformers.__version__)
from transformers import set_seed
from transformers import AutoTokenizer, AutoModel
from transformers import GPT2Tokenizer, GPT2LMHeadModel
from transformers import GPTJForCausalLM

# seeds
set_seed(42)
np.random.seed(0)
random.seed(0)
torch.manual_seed(0)

%load_ext autoreload
%autoreload 2
%matplotlib inline

torch.__version__ 1.12.1+cu113
torch.cuda.device_count() 1
torch.cuda.empty_cache() None
4.22.1
The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


### GPU acceleration

To give yourself a GPU in colab, go to `Runtime`-->`Change runtime type`

You can confirm this worked by running the cell above again: `torch.cuda.device_count()` will change from 0 to the number of GPUs PyTorch now recognizes. In Colab this means going from 0 to 1.


### The Agent and Environment

For transparency, the cell below contains all the functions and classes that run this minimal demo. It is very long, so run it and scroll all the way to the bottom to get to the demo. You can make code changes and quickly see the results. For example, edit the `get_background_prompt()` function to change the initial part of the LLM input prompt, which comes before the dialog history portion of the final prompt.

In [12]:
# Sample conversation used as the starting dialog history of a new Agent.
# Each turn is a dict with a 'speaker' ('bot' or 'human') and an 'utterance'.
example_dialog_history = [
    {'speaker':'bot','utterance':'Hello! who are you?'},
    {'speaker':'human','utterance':'my name is baby force'},
    {'speaker':'bot','utterance':'hi baby, or is it Mr. Force?'},
    {'speaker':'human','utterance':'actually im a baby girl and my first name is Force'},
    {'speaker':'bot','utterance':'thats a weird name for a baby girl isnt it?'},
    {'speaker':'human','utterance':'its a weird name for any human'},
]


def get_background_prompt(
    human_symbol = '[H]',
    bot_symbol = '[B]',
):

    """ Build the background prompt that precedes the dialog history.

    The prompt describes what the conversation is
    (e.g. This is a conversation between [{client_name}], a person, and [{robot_name}] ...)
    and states the current weekday, date and time in the
    America/Los_Angeles timezone.

    Args:
        human_symbol (str): tag that identifies the human speaker
        bot_symbol (str): tag that identifies the bot speaker

    Returns:
        background_prompt (str): the prompt text, which ends with a
        newline followed by a single space
    """

    # Take one timezone-aware snapshot of "now" and derive the weekday, the
    # date and the time from it. Reading the weekday from the host's local
    # clock instead could disagree with the Pacific date printed next to it.
    utc_now = datetime.datetime.now(pytz.utc)
    pst_now = utc_now.astimezone(pytz.timezone("America/Los_Angeles"))

    day_of_week = calendar.day_name[pst_now.weekday()]
    local_date = pst_now.strftime("%m/%d/%Y")
    local_time = pst_now.strftime("%I:%M %p")

    sentences = [
        f"This is a text messaging only conversation between {human_symbol} and {bot_symbol}. ",
        f"{bot_symbol} is an artificial intelligence or AI. ",
        f"{bot_symbol} knows it is just a computer program. ",
        f"{bot_symbol} speaks in a manner that is kind, empathetic and ",
        f"is programmed to keep {human_symbol} safe. ",
        f"{bot_symbol} likes to tell helpful stories. ",
        f"Today's date is {day_of_week} {local_date}, the time is {local_time}. ",
        "\n ",
    ]

    return "".join(sentences)


def get_model_tokenizer(
    model_load_path = 'EleutherAI/gpt-j-6B',
    tokenizer_name = 'EleutherAI/gpt-j-6B',
    cache_dir = None,
    model_device = None,
    verbose = False,
):

    ''' Load a causal language model with its matching tokenizer and
    place the model on a device.

    model architecture is based on the tokenizer name

    set model_device = 'cpu' to force model onto CPU despite having GPUs 
    available, or to torch.device('cuda:3') to get it to the fourth GPU, etc.
    if no GPUs available, cpu is the default. 

    Args:
        model_load_path (str): model id or path handed to from_pretrained
        tokenizer_name (str): tokenizer id; one of the GPT-2 names or
            'EleutherAI/gpt-j-6B'
        cache_dir (str): directory for the downloaded weights, or None
        model_device: None, or anything torch.device() accepts
        verbose (bool): print placement decisions and the parameter count

    Returns:
        (model, tokenizer), or (None, None) when tokenizer_name is not
        one of the supported names
    '''

    GPT2_NAMES = ('distilgpt2', 'gpt2', 'gpt2-medium', 'gpt2-large', 'gpt2-xl')
    GPTJ_NAMES = ('EleutherAI/gpt-j-6B',)

    NUM_GPUS = torch.cuda.device_count()

    if tokenizer_name in GPT2_NAMES:

        tokenizer = GPT2Tokenizer.from_pretrained(
            tokenizer_name,
            pad_token='<|endoftext|>',
            padding_side = 'left',
        )

        model = GPT2LMHeadModel.from_pretrained(
            model_load_path,
            cache_dir = cache_dir, 
            pad_token_id=tokenizer.eos_token_id,
        )

    elif tokenizer_name in GPTJ_NAMES:

        tokenizer = AutoTokenizer.from_pretrained(
            tokenizer_name,
            pad_token='<|endoftext|>',
            padding_side = 'left',
        )

        # Half precision is only useful when the weights will live on a GPU.
        # If the caller pins the model to the CPU, load full precision even
        # though GPUs are visible, so the model is not left in float16 on CPU.
        if model_device is None:
            load_half_precision = NUM_GPUS > 0
        else:
            load_half_precision = (
                NUM_GPUS > 0 and torch.device(model_device).type == 'cuda'
            )

        if load_half_precision:

            model = GPTJForCausalLM.from_pretrained(
                model_load_path,
                revision='float16', 
                torch_dtype=torch.float16, 
                low_cpu_mem_usage=True,
                cache_dir = cache_dir, 
            )

        else:

            model = GPTJForCausalLM.from_pretrained(
                model_load_path,
                cache_dir = cache_dir, 
            )

    else:

        if verbose:
            print('no match for tokenizer found')

        return None, None

    if model_device is not None:
        # an explicit device always wins over automatic placement
        model = model.to(model_device)
    elif NUM_GPUS == 1:
        if verbose:
            print('model = model.cuda()')
        model = model.cuda()
    elif NUM_GPUS > 1:
        # break up model and place model components on different GPUs
        # (every name that reaches this point is a GPT-2 or GPT-J model)
        if verbose:
            print('model.parallelize()')
        model.parallelize()
    else:
        if verbose:
            print('did not place model on any GPUs, model_device = \'cpu\'')

    if verbose:
        print('model.device', model.device)
        print("num_params", 
            sum(p.numel() for p in model.parameters() if p.requires_grad)/1e9,
            "B"
        ) 

    return model, tokenizer


def end_punctuation(utter):

    """ Make sure a non-empty utterance ends with punctuation.

    A period is appended unless the utterance already ends with
    "?", "!" or "."; an empty utterance is returned unchanged.
    """

    if not utter:
        return utter

    if utter.endswith(("?", "!", ".")):
        return utter

    return utter + "."


def extract_str(
    reply, 
    prefix = None,
    stop_strings = (
        '<',
        '[human]',
        '\n',
        '[',
    ),
    verbose = True,
):

    """ Clip generated text down to the part between a prefix and
    the first stop string.

    the prefix could be the entire input text
    the suffix is often the delimiter such as 
    the next line \\n token or a period . .

    Args:
        reply (str): the generated text
        prefix (str): if given, the first len(prefix) characters of
            reply are dropped; reply is assumed to start with prefix
        stop_strings (iterable of str): reply is cut at the earliest
            occurrence of any of these; empty strings and None are ignored
        verbose (bool): print the text that follows the prefix

    Returns:
        str: the clipped text with surrounding whitespace stripped
    """

    if prefix is not None:
        reply = reply[len(prefix):]

    if verbose:
        print('predicted future:')
        print(repr(reply))

    # Keep everything before the earliest stop string. An empty stop string
    # matches at position 0 and would erase the whole reply, so skip it.
    cut = len(reply)
    for stop in stop_strings or ():
        if not stop:
            continue
        position = reply.find(stop)
        if position != -1:
            cut = min(cut, position)

    return reply[:cut].strip()


def convo_list_dic2list_str(
  conversation_list_dic,
  human_symbol = '[H]: ',
  bot_symbol = '[B]: ',
  utterance_delimiter = '\n',
):

    """ Render a conversation as prompt lines.

    Every turn becomes utterance_delimiter + speaker symbol + utterance,
    where the utterance is passed through end_punctuation first. One more
    entry holding only the delimiter and the bot symbol is appended at the
    end, so that the model continues with the bot's next utterance.

    Args: 
        conversation_list_dic (List[Dict]): 
        ie: [{'speaker': 'bot', 'utterance': 'im waking up!'},
             {'speaker': 'human', 'utterance': 'wakey wakey sleepyhead'}, ...]
        human_symbol (str): text placed before each human utterance
        bot_symbol (str): text placed before each bot utterance
        utterance_delimiter (str): text placed before each speaker symbol

    Returns:
        List[str]: one string per turn, plus the trailing bot cue
        ie: ['\\n[B]: im waking up!', '\\n[H]: wakey wakey sleepyhead.', '\\n[B]: ']
    """

    symbol_for = {'bot': bot_symbol, 'human': human_symbol}

    rendered_turns = [
        utterance_delimiter + symbol_for[turn['speaker']] + end_punctuation(turn['utterance'])
        for turn in conversation_list_dic
    ]

    # Elicit next agent utterance
    rendered_turns.append(utterance_delimiter + bot_symbol)

    return rendered_turns

def generate_extract_replies(
    model,
    tokenizer,
    prompt,
    max_gen_len = 16, 
    no_repeat_ngram_size = None,
    pad_token_id = 50256,
    do_sample = True,
    top_k = 100, 
    top_p = 0.99, 
    num_return_sequences = 1,
    temperature = 0.9,
    stop_strings = (
        '<',
        '[human]',
        '\n',
        '[',
    ),
    verbose = False,
):

    ''' This function predicts the next utterance
    in a conversation

    The prompt is passed to generate_text, then every generated text is
    clipped by extract_str: the prompt is removed from the front and the
    rest is cut at the first stop string.

    Args:
        model, tokenizer: passed through to generate_text
        prompt (str): the text the model should continue
        max_gen_len ... temperature: generation settings, passed through
            to generate_text
        stop_strings (iterable of str): delimiters that end a reply
        verbose (bool): passed to generate_text and extract_str

    Returns:
        List[str]: one clipped reply per generated sequence

    NOTE(review): the prompt is removed by length, which assumes each
    decoded text starts with the prompt exactly - confirm for the
    tokenizer in use.
    '''

    gen_texts = generate_text(
        model,
        tokenizer,
        prompt,
        max_gen_len = max_gen_len, 
        no_repeat_ngram_size = no_repeat_ngram_size,
        pad_token_id = pad_token_id,
        do_sample = do_sample,
        top_k = top_k, 
        top_p = top_p, 
        num_return_sequences = num_return_sequences,
        temperature = temperature,
        verbose = verbose,
    )

    replies = [
        extract_str(
            gen_text,
            prefix = prompt,
            stop_strings = stop_strings,
            verbose = verbose,
        )
        for gen_text in gen_texts
    ]

    return replies


def generate_text(
    model,
    tokenizer,
    prompt,
    max_gen_len = 16, 
    no_repeat_ngram_size = None,
    pad_token_id = 50256,
    do_sample = True,
    top_k = 100, 
    top_p = 0.99, 
    num_return_sequences = 1,
    temperature = 0.9,
    verbose = False,
):

    ''' Tokenize a prompt, let the model continue it, and decode the result.

    prompt (str): text to be tokenized and pushed through model

    max_gen_len is the number of tokens allowed on top of the prompt:
    the generation limit is the prompt length plus max_gen_len.

    The decoded texts come from the full output ids, so clipping the
    part you need is left to the caller.

    Returns a list of decoded strings, one per output sequence.
    '''
    gpu_count = torch.cuda.device_count()

    encoded = tokenizer(prompt,return_tensors="pt")
    input_ids = encoded.input_ids
    attention_mask = encoded.attention_mask
    total_length_limit = input_ids.shape[1] + max_gen_len

    if verbose:
        print('prompt_ids.shape', input_ids.shape)
        print('prompt_mask.shape', attention_mask.shape)

    # the inputs only need to move when a GPU is present
    if gpu_count > 0:
        input_ids = input_ids.to(model.device)
        attention_mask = attention_mask.to(model.device)

    generation_settings = dict(
        attention_mask = attention_mask,
        max_length = total_length_limit,
        no_repeat_ngram_size = no_repeat_ngram_size,
        pad_token_id = pad_token_id,
        do_sample = do_sample,
        top_k = top_k, 
        top_p = top_p, 
        num_return_sequences = num_return_sequences,
        temperature = temperature,
    )

    output_ids = model.generate(input_ids, **generation_settings)

    return tokenizer.batch_decode(output_ids)


class Agent:

    """ A chatbot that keeps a dialog history and answers through a
    causal language model.

    Attributes are documented in __init__.
    """

    def __init__(self, model, tokenizer):

        """ Store the model and tokenizer and start from the example dialog.

        Args:
            model: the language model used by generate_extract_replies
            tokenizer: the tokenizer that matches the model
        """

        super().__init__()

        self.model = model
        self.tokenizer = tokenizer
        # Copy the example turns: receive_respond appends to this list, and
        # sharing the module-level list would let one agent change the
        # example (and the starting history of every other agent).
        self.dialog_history = [dict(turn) for turn in example_dialog_history]

    def initiate_conversation(self,):

        """ Reset the dialog history to a single bot greeting and return it. """

        initial_utterance = "Hello! who are you?"

        self.dialog_history = [{'speaker':'bot','utterance':initial_utterance}]

        return initial_utterance

    def receive_respond(self, 
        input_utterance, 
        symbol_utter_separator=': ',
        utterance_delimiter = '\n',
        human_symbol = '[Human]',
        bot_symbol = '[AI]',
        verbose = False,
    ):

        """ Record a human utterance, generate the bot reply and record it too.

        Args:
            input_utterance (str): what the human said
            symbol_utter_separator (str): text between a speaker symbol
                and the utterance
            utterance_delimiter (str): text placed before every turn
            human_symbol (str): tag for the human speaker
            bot_symbol (str): tag for the bot speaker
            verbose (bool): print the full prompt and the extracted replies

        Returns:
            str: the bot reply, which is also appended to dialog_history
        """

        input_utterance = input_utterance.strip()

        background_prompt =  get_background_prompt(
            human_symbol = human_symbol,
            bot_symbol = bot_symbol,
        )

        self.dialog_history.append({'speaker':'human','utterance':input_utterance})

        convo_list_str = convo_list_dic2list_str(
            self.dialog_history,
            human_symbol = human_symbol+symbol_utter_separator,
            bot_symbol = bot_symbol+symbol_utter_separator,
            utterance_delimiter = utterance_delimiter,
        )

        background_dialog_prompt = (background_prompt + ''.join(convo_list_str)).strip()

        if verbose:
            print(repr(background_dialog_prompt))

        # The generated text is decoded with its special tokens, so also stop
        # at the end-of-sequence text; otherwise it can leak into the reply
        # and the history. Empty candidates are dropped because an empty
        # stop string would match at position 0 of every reply.
        eos_token = getattr(self.tokenizer, 'eos_token', None)
        stop_strings = [
            stop for stop in (human_symbol, '\n', eos_token) if stop
        ]

        replies = generate_extract_replies(
            model = self.model,
            tokenizer = self.tokenizer,
            prompt = background_dialog_prompt,
            max_gen_len = 32, 
            no_repeat_ngram_size = 3,
            pad_token_id = self.tokenizer.eos_token_id,
            do_sample = True,
            top_k = 80, 
            top_p = 0.8, 
            num_return_sequences = 1,
            temperature = 0.8,
            stop_strings = stop_strings,
            verbose = False, 
        )

        if verbose:
            print(replies)

        self.dialog_history.append({'speaker':'bot','utterance':replies[0]})

        return replies[0]


def chat(agent, verbose):

    """ Run an interactive conversation with the agent on stdin/stdout.

    The agent's opening line is printed first. After that, each line the
    user types is answered by the agent until the user types "quit".

    Args:
        agent: object with initiate_conversation() and
            receive_respond(statement, verbose=...) methods
        verbose (bool): passed to agent.receive_respond
    """

    print(agent.initiate_conversation())

    while True:

        statement = input("you> ")

        # Leave before involving the agent, so that "quit" is neither
        # answered by the model nor stored in the dialog history.
        if statement.strip() == "quit":
            break

        print("agent> ",agent.receive_respond(statement, verbose = verbose))

### Load the Model and the tokenizer

The function below prints the number of parameters in billions and shows whether the model was placed on the GPU or left on the CPU.

In [None]:
# Model id to load; the same string is used below as the tokenizer name
# and as the sub-directory of the weight cache.
model_load_path = 'EleutherAI/gpt-j-6B' #'gpt2-large' #'gpt2-xl'

# model_device = None leaves device placement to get_model_tokenizer, which
# decides from the number of visible GPUs. verbose=True prints the device
# and the parameter count.
model, tokenizer = get_model_tokenizer(
      model_load_path = model_load_path,
      tokenizer_name = model_load_path,
      cache_dir = '../modelstates/'+model_load_path,
      model_device = None,
      verbose=True,
)

Downloading:   0%|          | 0.00/836 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/12.1G [00:00<?, ?B/s]

### Instantiate an agent and talk to it

Set `verbose = True` to see the whole input prompt to the model.

In [None]:
# Wrap the loaded model and tokenizer in an Agent.
agent = Agent(model, tokenizer)

# Start the interactive loop; type "quit" at the prompt to end it.
chat(agent, verbose = False)