Mount Google Drive to persist the indexes (optional)

In [None]:
from google.colab import drive

# Mount Drive so index folders can be persisted across Colab sessions.
MOUNT_POINT = '/content/drive'
drive.mount(MOUNT_POINT)

Install the required packages:
1. python-terrier (PyTerrier) for indexing and retrieval
2. transformers, torch and sentencepiece to run Flan-T5-base on the GPU
3. dask to parallelize index creation with `map_partitions` (the cell below does not install it, so it must already be available in the runtime)


In [None]:
!pip install python-terrier
!pip install transformers[torch,sentencepiece]
!pip install torch torchvision
# !pip install evaluate
# !pip install rouge_score

In [None]:
import dask.dataframe as dd, os, torch
import numpy as np, pandas as pd, pyterrier as pt

from tqdm import tqdm
from transformers import T5Tokenizer, T5ForConditionalGeneration

# import nltk, evaluate
# nltk.download("punkt", quiet = True)

# Start PyTerrier's Java backend only once per kernel, so re-running this cell is safe.
# The terrier-prf boot package is loaded because the RM3 query-expansion stage
# (pt.rewrite.RM3) used further down relies on it.
# NOTE(review): "-SNAPSHOT" is an unpinned version; pin it for reproducible runs.
if not pt.started():
    pt.init(boot_packages = ["com.github.terrierteam:terrier-prf:-SNAPSHOT"])

In [None]:
# Report which GPU this runtime exposes (device 0), or that none is available.
print(torch.cuda.get_device_name(0) if torch.cuda.is_available() else "GPU not available.")

Load the LaMP-3 questions (training split by default) from the LaMP download server

In [None]:
folder = 'train'
# One JSON file per split; each row carries the question input and the user's profile.
questions_url = f'https://ciir.cs.umass.edu/downloads/LaMP/LaMP_3/{folder}/{folder}_questions.json'
df = pd.read_json(questions_url)
df.shape

Shuffle the questions with a fixed seed (42) and keep the first 1500 for experiments and analysis

In [None]:
# Reproducible shuffle of every row (seed 42), then keep the first 1500 positions.
df = df.sample(frac = 1, random_state = 42).iloc[:1500]

In [None]:
# Split the sample into 10 Dask partitions; index creation is mapped over them below.
ddf = dd.from_pandas(df, npartitions = 10)
print(f"{df.shape} \n")
print(ddf)

Create one PyTerrier index per user profile

In [None]:
def create_index(x, expected_files = 10):
    """Build (or reuse) a PyTerrier index over one user's profile documents.

    Parameters
    ----------
    x : row-like (pandas Series or dict)
        Must provide ``x['id']`` (used in the index folder name) and ``x['profile']``,
        a list of dicts whose ``'id'`` and ``'text'`` keys are indexed (``id`` -> ``docno``).
    expected_files : int, default 10
        Number of files a finished index directory is expected to contain. An existing
        directory with exactly this many files is reused instead of being rebuilt.

    Returns
    -------
    str
        The index directory ``./lamp3/index_<id>``. It is returned even when indexing
        was skipped or failed; problems are printed rather than raised so that one bad
        profile does not abort the whole Dask computation.
    """
    path = f'./lamp3/index_{x["id"]}'

    # pyterrier creates 10 files in the index creation process; treat a directory with
    # that many files as an already-complete index.
    if os.path.isdir(path) and len(os.listdir(path)) == expected_files:
        return path

    profile = x['profile']
    if profile is None or len(profile) == 0:
        # An empty profile yields a DataFrame without 'id'/'text' columns (KeyError),
        # and there is nothing to index anyway.
        print(f"[WARN] Empty profile for {x['id']}; skipping index creation")
        return path

    try:
        # Column selection is inside the try so a malformed profile is reported, not fatal.
        docs = pd.DataFrame(profile)[['id', 'text']].rename(columns = {'id': 'docno'})
        pt.IterDictIndexer(path, overwrite = True).index(docs.to_dict(orient = "records"))
    except Exception as e:
        print(f"[ERROR] Creating index for {x['id']} with profile length of {len(profile)}")
        print(e)

    return path

In [None]:
def partition_func(dataframe):
    """Run `create_index` on each row of one Dask partition (a pandas DataFrame).

    Returns a pandas Series holding the value `create_index` returned for every row.
    """
    return dataframe.apply(func = create_index, axis = 1)

Run index creation in parallel over the Dask partitions

In [None]:
%%time
# Build every per-user index. map_partitions hands each Dask partition to
# partition_func, which calls create_index row by row.
print("Started indexing : ", df.shape[0])
# meta declares the result as an unnamed Series of strings (the index paths).
p = ddf.map_partitions(partition_func, meta = (None, 'str'))
# The work only executes on .compute(). NOTE(review): the returned Series may not be in
# df's row order, which is why the paths are rebuilt from df['id'] further down.
indexrefs = p.compute()
print("Finished indexing : ", len(indexrefs), '\n')

PPEF and AIP turn the top-k retrieved profile documents into a user-personalized LLM input. Both functions are defined in the appendix of the LaMP paper.

In [None]:
def ppef(profile):
    """Format one retrieved profile document as a LaMP-3 prompt fragment.

    `profile` is a merged record: 'text' is the profile document and 'score_y' is the
    score column that came from the user profile side of the merge (the retrieval score
    is 'score_x').
    """
    return f'{profile["score_y"]} is the score for "{profile["text"]}"'

def aip(topk, inputQ):
    """Aggregate retrieved profile documents and the task input into one prompt.

    Parameters
    ----------
    topk : list of dict
        Retrieved profile records, each formatted with `ppef`, in ranked order.
    inputQ : str
        The original task input (question plus review).

    Returns
    -------
    str
        "<ppef_1>, and <ppef_2>, and ... . <inputQ>", or just `inputQ` when `topk` is
        empty.
    """
    if len(topk) == 0:
        # No personalization context: return the bare input instead of a prompt that
        # starts with a dangling ". ".
        return inputQ
    user_context = ", and ".join(ppef(doc) for doc in topk)
    return f'{user_context}. {inputQ}'

Retrieval with BM25, optionally followed by RM3 query expansion, returning the user-personalized prompt

In [None]:
def query_expansion(x, indexref, rtype, k = 1, args = (3, 10)):
    """Retrieve a user's top-k profile documents for one question and build its prompt.

    Parameters
    ----------
    x : row-like (pandas Series or dict)
        Needs 'profile' (list of dicts with at least 'id'), 'input' (the LaMP-3 question
        text), and 'id' (only used in error messages).
    indexref : str or PyTerrier index reference
        The user's index, passed to `pt.BatchRetrieve` and `pt.rewrite.RM3`.
    rtype : str
        'bm25' for plain BM25. Any other value uses BM25 -> RM3 -> BM25.
    k : int, default 1
        Number of top-ranked documents kept.
    args : sequence of two ints, default (3, 10)
        RM3 feedback settings: ``args[0]`` -> ``fb_docs``, ``args[1]`` -> ``fb_terms``.

    Returns
    -------
    str
        The prompt built by `aip`. If retrieval raises, the error is printed and the
        prompt falls back to ``aip([], input)``.
    """
    profile, input_text = x['profile'], x['input']

    # Only the review that follows this instruction marker is used as the query.
    # partition() avoids the ValueError that tuple-unpacking split() raised when the
    # marker was missing or repeated. If the marker is missing, the whole input is used.
    _, marker, review = input_text.partition('without further explanation. review: ')
    query = review if marker else input_text

    user_profile_df = pd.DataFrame(profile).rename(columns = {'id': 'docno'})

    try:
        bm25 = pt.BatchRetrieve(indexref, wmodel = 'BM25')

        # tokenise() first normalizes the raw review text into a query the parser accepts.
        if rtype == 'bm25':
            pipeline = pt.rewrite.tokenise() >> bm25
        else:
            rm3 = pt.rewrite.RM3(indexref, fb_docs = args[0], fb_terms = args[1])
            pipeline = pt.rewrite.tokenise() >> bm25 >> rm3 >> bm25

        top_k = pipeline.search(query).head(k)

        # Attach profile fields (text, profile score) to the retrieved docnos.
        merged = pd.merge(top_k, user_profile_df, on = 'docno', how = 'inner')
        prompt = aip(merged.to_dict('records'), input_text)

    except Exception as e:
        prompt = aip([], input_text)
        print('Error in retrieval for id: ', x['id'])
        print(e)

    return prompt

In [None]:
# The Series from map_partitions may be ordered differently from df, so rebuild each
# row's index path directly from its 'id' (note the trailing '/', unlike create_index).
indexrefs = df['id'].map(lambda pid: './lamp3/index_' + str(pid) + '/')

In [None]:
# Preview the first five rebuilt index paths.
indexrefs.head(5)

Run retrieval with RM3 query expansion and build user-personalized prompts for the downstream LLM task: predicting the product rating (LaMP-3)

In [None]:
%%time
prompts, count = [], 0
for row_1, (index_df2, row_df2) in  tqdm(zip(indexrefs, df.iterrows())):
    prompt = query_expansion(row_df2, row_1, 'rm3', 4)
    prompts.append(prompt)
print('\nNo of 0 retrieved results = ', count)

In [None]:
# Spot-check one generated prompt (the 6th) before running the LLM.
prompts[5]

Load Flan-T5-Base Model to CUDA

In [None]:
MODEL_NAME = "google/flan-t5-base"
# Use the GPU when present, otherwise fall back to CPU instead of crashing on .to('cuda').
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

tokenizer = T5Tokenizer.from_pretrained(MODEL_NAME)
# Load, then move explicitly. Combining device_map="auto" with a manual .to('cuda')
# placed the model twice and required CUDA to be present.
model = T5ForConditionalGeneration.from_pretrained(MODEL_NAME)
model.to(DEVICE)

In [None]:
def generate_llm_output(input_data, skip_special_tokens = False):
    """Generate and decode the model's answer for a single prompt.

    Uses the notebook-level `tokenizer` and `model`.

    Parameters
    ----------
    input_data : str
        The prompt text.
    skip_special_tokens : bool, default False
        Forwarded to `tokenizer.decode`. The default keeps special tokens such as
        ``<pad>``/``</s>`` in the text, as before. The scoring code strips them.

    Returns
    -------
    str
        Decoded text of the first generated sequence.
    """
    # Follow the model's device instead of hard-coding "cuda", so CPU runtimes work too.
    input_ids = tokenizer(input_data, return_tensors = "pt").input_ids.to(model.device)
    generated = model.generate(input_ids)
    return tokenizer.decode(generated[0], skip_special_tokens = skip_special_tokens)

Generate LLM output

In [None]:
%%time
print("[START] Compute LLM Output \n")
outputs = []
for prompt in prompts:
  output = generate_llm_output(prompt)
  outputs.append(output)
  torch.cuda.empty_cache()
print("[END]  Compute LLM Output \n")

In [None]:
# Inspect the first five raw generations.
outputs[:5]

Save the LLM inputs and outputs for further analysis

In [None]:
df_output = pd.DataFrame(
    data = {
        'id': list(indexrefs),   # index path per row, e.g. './lamp3/index_<id>/'
        'prompts': prompts,
        'llm_output': outputs,
    }
)
df_output.to_csv(path_or_buf = 'output.csv')

In [None]:
import gc

# Drop the large per-run objects and reclaim host and GPU memory before scoring.
del df, df_output, prompts, outputs
gc.collect()
torch.cuda.empty_cache()

Load the gold rating labels for the training split

In [None]:
# Gold ratings for the same split the questions were loaded from (`folder`), so the ids
# line up with output.csv. Previously 'train' was hard-coded here.
df = pd.read_json(f'https://ciir.cs.umass.edu/downloads/LaMP/LaMP_3/{folder}/{folder}_outputs.json')
df['id'] = df['golds'].map(lambda gold: int(gold['id']))
df['target'] = df['golds'].map(lambda gold: int(gold['output']))
df.head(3)

Compute MAE and RMSE for LaMP-3 from the `output.csv` file written above

In [1]:
from sklearn.metrics import mean_squared_error, mean_absolute_error

In [None]:
def compute(sett, df):
    """Score saved LLM generations against the gold LaMP-3 ratings.

    Parameters
    ----------
    sett : iterable of str
        Paths of output CSV files (several if generation was run in batches). Each needs
        an 'id' column holding the index path (e.g. './lamp3/index_<id>/') and an
        'llm_output' column with the decoded model text.
    df : pandas.DataFrame
        Gold labels with integer 'id' and 'target' columns.

    Returns
    -------
    (mae, rmse) : tuple of float
        Computed over rows whose output parses as an integer. Returns ``(nan, nan)``
        when no row does. Unparseable outputs are printed.
    """
    # Load output.csv files [multiple if ran in batches]
    task = pd.concat([pd.read_csv(filename, usecols = ['id', 'llm_output']) for filename in sett])

    # Recover the numeric id from the index path. normpath + basename works with or
    # without a trailing slash ('./lamp3/index_5/' and './lamp3/index_5').
    task['id'] = task['id'].map(
        lambda path: int(os.path.basename(os.path.normpath(path)).rsplit('_', 1)[1])
    )

    # Inner join keeps only ids present in both the labels and the outputs.
    ans = df.merge(task, on = 'id', how = 'inner')

    def process(text):
        """Parse a decoded output into an int rating, or -1 if it is not a bare integer."""
        if not isinstance(text, str):   # empty CSV cells are read back as NaN
            return -1
        # Remove the T5 special tokens as whole substrings. str.strip('<pad>') would
        # strip a *set of characters*, not the token.
        cleaned = text.replace('<pad>', '').replace('</s>', '').strip()
        # NOTE(review): leading/trailing '-' is dropped (kept from the original parsing),
        # so "-3" parses as 3. Confirm this is intended.
        cleaned = cleaned.strip('-').strip()
        try:
            return int(cleaned)
        except ValueError:
            return -1

    ans['prediction'] = ans['llm_output'].map(process)
    print(ans[ans.prediction == -1][['llm_output']], '\n')

    ans = ans[ans.prediction != -1]
    if ans.empty:
        print('No parseable predictions; MAE/RMSE are undefined.')
        return float('nan'), float('nan')

    # Computed with numpy: mean_squared_error(..., squared=False) no longer exists in
    # scikit-learn >= 1.6.
    errors = ans['target'].to_numpy(dtype = float) - ans['prediction'].to_numpy(dtype = float)
    mae = float(np.mean(np.abs(errors)))
    rmse = float(np.sqrt(np.mean(errors ** 2)))

    print('MAE  = ', mae)
    print('RMSE = ', rmse, '\n')

    return mae, rmse

In [None]:
# Score the saved generations against the gold labels loaded above.
mae, rmse = compute(sett = ['output.csv'], df = df)