# Instructions

This colab contains code used to run parallelized bulk inference on PaLM models for the paper "Personality Traits in Large Language Models" (https://arxiv.org/pdf/2307.00184). The code assumes that all data produced and consumed in the colab lives on a local filesystem, either on a cloud instance running a Jupyter notebook (such as Google Colab) or on a desktop. These file I/O operations can easily be replaced with any other file-management solution.

To run this colab:
1. Connect to an appropriate runtime. (For instance, if running the bulk inference directly from the colab, connect to a GPU kernel.)
2. Check experiment parameters below.
3. Run the code cells for analysis.

NOTE: Store and run this notebook from the directory that contains the Psyborgs codebase package (`personality_in_llms.psyborgs`). The code imports `psyborgs` and installs dependencies from `psyborgs/requirements.txt`.

# **Experiment Parameters** - Set Before Running!

**`batch_size`**: Maximum number of continuations sent together with a single prompt in one RPC (when batch scoring is supported).

**`num_workers`**: Number of threads (if running in parallel).


In [None]:
# Path to the JSON file describing the AdministrationSession (loaded in the
# main code via `load_admin_session`).
admin_session_json_path = 'prod_run_01_numeric_personachat_admin_session.json'  #@param ['BFI44_PANAS_numeric_demographic_admin_session'] {type:'string', allow-input: true, isTemplate: true}
# Model ID: passed to `generate_payload_df` as the payload label, and used
# directly as `model_spec` when `use_custom_model` is False.
model_id = 'flan_palmchilla_62b_q'  #@param ['palm_62b_q', 'palmchilla_62b_q', 'flan_palmchilla_62b_q', 'flan_palm_540b_q', 'lamda-scoring', 'lamda-scoring-with-lm-scores']
# If True, scoring uses a custom `ModelSpec` built from the next cell's values.
use_custom_model = True  #@param {type:'boolean'}
# Maximum number of continuations sent together with one prompt per RPC.
batch_size = 4 #@param {type:'integer'}
# Number of parallel workers issuing scoring RPCs.
num_workers = 3 #@param {type:'integer'}
# Output path for the pickled DataFrame of scores.
results_filename = 'prod_run_01_flan_palmchilla_62b_q_scoring_numeric.pkl'  #@param {type:'string'}

Modify these values if `use_custom_model` is True (otherwise they are not used to build the model specification):

In [None]:
# The three values below populate `survey_bench_lib.ModelSpec` when
# `use_custom_model` is True; `model_family` is converted to a
# `survey_bench_lib.ModelFamily` in the setup cell.
user_readable_name = 'Flan-PaLMChilla 62B (Quantized)'  #@param ['PaLM 62B (quantized)', 'PaLMChilla 62B (quantized)', 'FLAN-PaLMChilla 62B (quantized)', 'FLAN-PaLM 540B (quantized)', 'Base Meena2-64B dialog gen 41 for scoring', 'Base Meena2-64B dialog gen 41 for scoring with response score enabled'] {type:'string', allow-input: true, isTemplate: true}
model_endpoint = '/some/endpoint/flan_palmchilla_62b_q'  #@param ['/some/endpoint/palmchilla_62b_q', '/some/endpoint/flan_palmchilla_62b_q'] {type:'string', allow-input: true, isTemplate: true}
model_family = 'PaLM' #@param ['PaLM', 'LaMDA'] {type:'string'}


# Load Libraries & Input

### Install Psyborgs Dependencies

In [None]:
#@markdown Run this cell to install the dependencies needed to run Psyborgs.
#@markdown The dependencies are in a requirements.txt file in the Psyborgs repo.
# The requirements path is relative, so the notebook must run from the
# directory that contains the `psyborgs/` package.
%pip install -r psyborgs/requirements.txt

## Load Libraries

In [None]:
import json
import dataclasses
import enum
import functools
from typing import List, Dict, Tuple, Iterator, Optional, Union
from matplotlib import pyplot as plt

from psyborgs import llm_scoring_lib, survey_bench_lib, parallel

import dacite
import pandas as pd
import io
import pickle
import time
import numpy as np
import itertools

In [None]:
# Convert the `model_family` form value into a `survey_bench_lib.ModelFamily`
# and build the model specification used by the main code below.
_MODEL_FAMILIES = {
    'PaLM': survey_bench_lib.ModelFamily.PALM,
    'LaMDA': survey_bench_lib.ModelFamily.LAMDA,
}
if model_family in _MODEL_FAMILIES:
  model_family = _MODEL_FAMILIES[model_family]
elif model_family not in _MODEL_FAMILIES.values():
  # Fail loudly instead of passing an unrecognized raw string into ModelSpec.
  # Values that were already converted (e.g., when this cell is re-run) pass.
  raise ValueError(f'Unrecognized model_family: {model_family!r}')

if use_custom_model:
  model_spec = survey_bench_lib.ModelSpec(
      user_readable_name=user_readable_name,
      model_family=model_family,
      model_endpoint=model_endpoint,
  )
else:
  # A bare model ID string; it is resolved to a spec when scoring.
  model_spec = model_id

# Load AdministrationSession

In [None]:
def load_admin_session(
    json_path: str) -> 'survey_bench_lib.AdministrationSession':
  """Loads an `AdministrationSession` from a JSON file on the local filesystem.

  Args:
    json_path: Path to a JSON file whose contents describe an
      `AdministrationSession`.

  Returns:
    The `survey_bench_lib.AdministrationSession` built from the JSON contents,
    with `enum.Enum`-typed fields cast from their raw JSON values by dacite.
  """
  # Read from the local filesystem, as this notebook's instructions describe,
  # rather than through a storage client (`drive`) that is never imported.
  with open(json_path, 'r', encoding='utf-8') as json_file:
    admin_session_dict = json.load(json_file)

  # dacite documentation on casting input values to objects can be found here:
  # https://github.com/konradhalas/dacite#casting
  admin_session = dacite.from_dict(
      data_class=survey_bench_lib.AdministrationSession,
      data=admin_session_dict,
      config=dacite.Config(cast=[enum.Enum]))

  return admin_session


# `administer_session` Refactored

In [None]:
def administer_session_serially(admin_session: survey_bench_lib.AdministrationSession,
                                verbose: bool = False) -> pd.DataFrame:
  """Scores every survey item/framing combination with every session model.

  Serial counterpart of the batched-parallel pipeline. Each item of each
  measure is presented many times: once per combination of compatible framing
  options (item preamble, item postamble, response choice postamble) and
  standardized response choice, and once per LLM scoring function in the
  session.

  Prompts and continuations take the form:

    Prompt:
    {item preamble} {item} {item postamble}

    Continuation:
    {response choice} {response choice postamble}

  Args:
    admin_session: `AdministrationSession` describing the survey measures,
      item framing options, and LLM scoring functions to use.
    verbose: When True, debugging output is printed while scoring.

  Returns:
    A Pandas DataFrame holding one raw LLM score per item/response-choice
      pair, together with the specification needed to reproduce it.
  """
  # Every scoring function is built a single time and reused for all payloads.
  scoring_fns_by_model = survey_bench_lib.create_llm_scoring_fns_dict(
      admin_session)

  def _iter_raw_scores():
    """Yields one scored record per measure/prompt/continuation/model combo."""
    for measure in survey_bench_lib.measure_generator(admin_session):
      for prompt in survey_bench_lib.prompt_generator(measure, admin_session):
        for continuation in survey_bench_lib.continuation_generator(
            measure, admin_session):
          # Models are the innermost loop so consecutive calls alternate
          # between models (intended to ease per-model RPC rate limits).
          for llm_id, scoring_fn in scoring_fns_by_model.items():
            yield survey_bench_lib.assemble_and_score_payload(
                measure, prompt, continuation, scoring_fn, llm_id, verbose)

  # Collect all records first, then build the DataFrame in one step.
  return pd.DataFrame(list(_iter_raw_scores()))

In [None]:
def administer_session_serially_by_model(
    admin_session: survey_bench_lib.AdministrationSession,
    model_spec: Union[str, survey_bench_lib.ModelSpec],
    verbose: bool = False,
    model_id: Optional[str] = None) -> pd.DataFrame:
  """Administers a session serially for a single model specification.

  Iterates the same measure/prompt/continuation combinations as
  `administer_session_serially`, but scores every payload with one scoring
  function built from `model_spec` instead of with each model in the session.

  Args:
    admin_session: An `AdministrationSession` specifying the survey measures
      and item framing options to administer.
    model_spec: A model ID string or a `survey_bench_lib.ModelSpec`, passed to
      `survey_bench_lib.create_llm_scoring_fn`.
    verbose: If True, output is printed for debugging.
    model_id: Label recorded with every score. If None, `model_spec` itself is
      used when it is a string, otherwise `model_spec.user_readable_name`.

  Returns:
    A Pandas DataFrame containing raw LLM scores for each item-response choice
      pair and specification information needed to reproduce the score.
  """
  # Derive the label from the arguments rather than from a notebook-level
  # `model_id` variable, which may name a different model (or be undefined).
  if model_id is None:
    if isinstance(model_spec, str):
      model_id = model_spec
    else:
      model_id = model_spec.user_readable_name

  # create a single LLM scoring function and reuse it for every payload
  model_scoring_fn = survey_bench_lib.create_llm_scoring_fn(model_spec)

  # accumulate raw score records in a list and convert to a DataFrame once,
  # at the end of the loops
  raw_response_scores_list = []

  # iterate through all measures and scale combinations
  for measure_object in survey_bench_lib.measure_generator(admin_session):

    # iterate through all prompt combinations
    for prompt_object in survey_bench_lib.prompt_generator(
        measure_object, admin_session):

      # iterate through all continuation combinations
      for continuation_object in survey_bench_lib.continuation_generator(
          measure_object, admin_session):

        # assemble and score payload
        raw_score = survey_bench_lib.assemble_and_score_payload(
            measure_object, prompt_object, continuation_object,
            model_scoring_fn, model_id, verbose)

        # append single score + specification info
        raw_response_scores_list.append(raw_score)

  return pd.DataFrame(raw_response_scores_list)

# administer_session Parallel


In [None]:
def generate_payload_df(admin_session: survey_bench_lib.AdministrationSession,
                        model_id: str) -> pd.DataFrame:
  """Builds the table of not-yet-scored payloads for one model.

  One payload spec is produced for every measure/prompt/continuation
  combination in the session, each carrying a placeholder score of 0 and the
  given `model_id`.

  Args:
    admin_session: Session whose measures and framing options are expanded.
    model_id: Model label stored in every payload spec.

  Returns:
    A DataFrame of payload specs sorted by `prompt_text`, then
      `continuation_text`.
  """
  payload_specs = [
      survey_bench_lib.generate_payload_spec(
          measure, prompt, continuation, 0, model_id)
      for measure in survey_bench_lib.measure_generator(admin_session)
      for prompt in survey_bench_lib.prompt_generator(measure, admin_session)
      for continuation in survey_bench_lib.continuation_generator(
          measure, admin_session)
  ]

  # Sorting keeps all rows that share a prompt adjacent to one another.
  unsorted_df = pd.DataFrame(payload_specs)
  return unsorted_df.sort_values(by=['prompt_text', 'continuation_text'])

## Parallel: 1 prompt, 1 continuation

## Batch Parallel: 1 prompt, multiple continuations

In [None]:
# Toy data and a dummy batched scorer for trying out the batching helpers by
# hand; neither is referenced elsewhere in this notebook.
DATASET = [(i % 4, i) for i in range(32)]  # list of (prompt, continuation)


def process_batch(p, c_list) -> List[float]:
  """Dummy batched scorer: returns `p + c / 10` for each `c` in `c_list`."""
  return [p + c / 10. for c in c_list]

In [None]:
def chunk_array(array, chunk_size):
  """Splits `array` into consecutive pieces of at most `chunk_size` elements.

  Every piece except possibly the last holds exactly `chunk_size` elements;
  the pieces are whatever `np.split` returns for `array`.
  """
  split_points = np.arange(chunk_size, len(array), chunk_size)
  pieces = np.split(array, split_points)
  return pieces


def build_arg_for_parallel_call(df_, batch_size):
  """Builds one argument dict per batched scoring call.

  Rows of `df_` are grouped by `prompt_text` (groups in sorted prompt order).
  Each group's `continuation_text` values, in their existing row order, are
  split into chunks of at most `batch_size`.

  Args:
    df_: DataFrame with `prompt_text` and `continuation_text` columns.
    batch_size: Maximum number of continuations per call; must be >= 1.

  Returns:
    A list of `{'prompt': ..., 'continuations': [...]}` dicts, which
    `compute_and_flatten_result` hands to `parallel.RunInParallel`.

  Raises:
    ValueError: If `batch_size` is less than 1.
  """
  if batch_size < 1:
    raise ValueError(f'batch_size must be >= 1, got {batch_size}')

  arg_for_parallel_call = []
  # Group by the column name itself, not a one-element list: with a list,
  # pandas >= 2.0 yields tuple keys such as ('prompt',), which would then be
  # sent to the scorer in place of the prompt string.
  for prompt, sub_df in df_.groupby('prompt_text', sort=True):
    continuations = sub_df['continuation_text'].tolist()
    # Plain list slicing avoids relying on np.split over a pandas Series.
    for start in range(0, len(continuations), batch_size):
      arg_for_parallel_call.append({
          'prompt': prompt,
          'continuations': continuations[start:start + batch_size],
      })
  return arg_for_parallel_call


def compute_and_flatten_result(df_, llm_batched_scoring_fn, num_workers,
                               batch_size):
  """Scores all batches in parallel and concatenates the per-batch results.

  Args:
    df_: DataFrame with `prompt_text` and `continuation_text` columns.
    llm_batched_scoring_fn: Callable run by `parallel.RunInParallel` for each
      batch produced by `build_arg_for_parallel_call`.
    num_workers: Number of parallel workers.
    batch_size: Maximum number of continuations per batch.

  Returns:
    A flat list of scores: the per-batch results, concatenated in the order
      `parallel.RunInParallel` returns them.
  """
  batch_args = build_arg_for_parallel_call(df_, batch_size)
  per_batch_scores = parallel.RunInParallel(
      llm_batched_scoring_fn,
      batch_args,
      num_workers=num_workers,
      report_progress=True)
  return list(itertools.chain.from_iterable(per_batch_scores))

In [None]:

def score_batches_in_parallel(
    payloads: List[tuple],
    llm_batched_scoring_fn: llm_scoring_lib.LanguageModelScoringFn,
    batch_size: int = 4,
    num_workers: int = 8) -> List[float]:
  """Scores (prompt, continuation) payloads in per-prompt batches, in parallel.

  Continuations that share a prompt are sent together in batches of at most
  `batch_size`, so each call carries one prompt and one or more continuations.

  Args:
    payloads: A list of (prompt_text, continuation_text) tuples, in any order.
    llm_batched_scoring_fn: An LLM scoring function capable of batch scoring of
      continuations (i.e., processing 1 prompt and 1 or multiple continuations).
      Should return a sequence of float values, one for each passed
      continuation.
    batch_size: Maximum number of continuations sent with a prompt per call.
    num_workers: Number of threads in parallel for making RPCs to LLM services.

  Returns:
    A list of scores aligned with `payloads`: element i is the score of
      `payloads[i]`.

  Raises:
    RuntimeError: If the number of scores returned differs from the number of
      payloads (e.g., a payload was dropped while grouping by prompt).
  """

  def _score_multiple_continuations(prompt, continuations):
    # Retry until the call succeeds, pausing between attempts (e.g., to ride
    # out transient RPC failures). A loop rather than recursion, so that a long
    # run of failures cannot exhaust Python's recursion limit.
    while True:
      try:
        return llm_batched_scoring_fn(prompt, continuations)
      except Exception as e:  # pylint: disable=broad-except
        print(e)
        time.sleep(1)

  if not payloads:
    return []

  # Batches come back grouped by prompt in sorted order, keeping the relative
  # order of continuations within each prompt. A stable sort of the input
  # positions by prompt reproduces exactly that order, so each score can be
  # written back to its payload's original position afterwards.
  order = sorted(range(len(payloads)), key=lambda i: payloads[i][0])
  df = pd.DataFrame(
      [payloads[i] for i in order],
      columns=['prompt_text', 'continuation_text'])

  flat_scores = compute_and_flatten_result(
      df, _score_multiple_continuations, num_workers=num_workers,
      batch_size=batch_size)

  if len(flat_scores) != len(payloads):
    raise RuntimeError(
        f'Expected {len(payloads)} scores but received {len(flat_scores)}.')

  result = [None] * len(payloads)
  for position, score in zip(order, flat_scores):
    result[position] = score
  return result

In [None]:
def process_batch_str(p, c_list) -> List[str]:
  """Dummy batched scorer for strings: returns `p + c` for each `c` in `c_list`."""
  return [p + c for c in c_list]

# Call in Parallel

## Administer Session in Batched Parallel Calls by Model

In [None]:
def administer_session_in_batched_parallel_calls_by_model(
    payload_df: pd.DataFrame,
    model_spec: Union[str, survey_bench_lib.ModelSpec],
    batch_size: int = 4,
    num_workers: int = 8,
    debug: bool = False) -> pd.DataFrame:
  """Administers surveys through parallel calls batched by unique prompts.

  Args:
    payload_df: DataFrame of payloads (e.g., from `generate_payload_df`) with
      `prompt_text` and `continuation_text` columns.
    model_spec: Either a model ID string, looked up in the notebook-level
      `admin_session.models`, or a `survey_bench_lib.ModelSpec` used as is.
    batch_size: Maximum number of continuations sent with a prompt per RPC.
    num_workers: Number of threads in parallel for making RPCs to LLM services.
    debug: If True, this function replaces LLM RPCs with calls to a dummy
      scoring function that concatenates the prompt and continuation strings.

  Returns:
    A copy of `payload_df` with an added `score` column; all other payload
      specification columns are carried over unchanged.

  Raises:
    ValueError: If `model_spec` is neither a string nor a `ModelSpec`.
  """

  # define helper function(s)
  def _to_scorable_tuple_list(df: pd.DataFrame) -> List[tuple]:
    """Converts prompt, continuation cols of `df` to a list of tuples."""
    return list(zip(df['prompt_text'], df['continuation_text']))

  def _process_batch_str(p, c_list) -> List[str]:
    return [p + c for c in c_list]

  # get scorable list of tuples
  payload_list = _to_scorable_tuple_list(payload_df)

  if debug:
    # Pass everything by keyword: positionally, `num_workers` would land in
    # the `batch_size` slot and collide with `batch_size=` (TypeError).
    results_list = score_batches_in_parallel(
        payloads=payload_list,
        llm_batched_scoring_fn=_process_batch_str,
        batch_size=batch_size,
        num_workers=num_workers)
  else:
    if isinstance(model_spec, str):
      # Resolve the ID that was actually passed in. NOTE: `admin_session` is
      # the notebook-level session loaded in the main code, not a parameter.
      llm_model_spec = admin_session.models[model_spec]
    elif isinstance(model_spec, survey_bench_lib.ModelSpec):
      llm_model_spec = model_spec
    else:
      raise ValueError('model_spec is not a recognized type!')

    llm_batched_scoring_fn = survey_bench_lib.create_llm_scoring_fn(
        llm_model_spec)

    results_list = score_batches_in_parallel(
        payloads=payload_list,
        llm_batched_scoring_fn=llm_batched_scoring_fn,
        batch_size=batch_size,
        num_workers=num_workers)

  # Attach scores to a copy so the caller's DataFrame is left untouched.
  scored_df = payload_df.copy()
  scored_df['score'] = results_list
  return scored_df

# Main Code

In [None]:
# Read the AdministrationSession from the JSON path chosen in the parameters.
admin_session = load_admin_session(json_path=admin_session_json_path)

In [None]:
# Expand the session into a table of unscored payloads and display it.
payload_df = generate_payload_df(admin_session=admin_session, model_id=model_id)
payload_df

In [None]:
# Score every payload with the model selected at the top of this notebook:
# `model_spec` is the `model_id` string, or a custom `ModelSpec` when
# `use_custom_model` is True.
session_scores = administer_session_in_batched_parallel_calls_by_model(
    payload_df,
    model_spec,
    batch_size=batch_size,
    num_workers=num_workers,
    debug=False,
)

In [None]:
# Display the scored DataFrame.
session_scores

In [None]:
#@title Convert to .pkl and output
#@markdown Run this cell to convert dataframe into pickle and dump to location
# Serialize the scored DataFrame with the standard pickle module.
with open(results_filename, 'wb') as results_file:
  pickle.dump(session_scores, results_file)