In [None]:
!pip install --user --upgrade datasets transformers langchain langchain-huggingface

In [None]:
# Start from an empty Chroma persistence directory.
# Pure-Python replacement for `rm -r` + `mkdir`: it is idempotent (no error when the
# directory does not exist yet), creates the parent folder, and works on any platform.
import shutil
from pathlib import Path

_chroma_dir = Path("chroma_db/chroma_db_all-mpnet-base-v2")
shutil.rmtree(_chroma_dir, ignore_errors=True)   # tolerate a missing directory on the first run
_chroma_dir.mkdir(parents=True, exist_ok=True)   # also creates the parent "chroma_db" folder

In [None]:
import os

# Read the Gemini API key from the environment so that a real key never has to be
# typed into (and saved with) the notebook. When the variable is not set, the
# original placeholder is kept so the rest of the notebook still sees a string.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", 'INSERT THE GOOGLE API KEY')

# Import Packages & Set All Parameters

## Import Packages

In [None]:
import os
import torch
from tqdm.notebook import tqdm
from datasets import load_dataset
from transformers import (
    AutoModel,
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    HfArgumentParser,
    TrainingArguments,
    pipeline,
    logging
)

from operator import itemgetter
from langchain.load import dumps, loads
from langchain.vectorstores import Chroma
from langchain.schema import StrOutputParser
from langchain.llms import HuggingFacePipeline
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.schema.runnable import RunnablePassthrough
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.prompts import PromptTemplate, ChatPromptTemplate
from langchain.docstore.document import Document as LangchainDocument

## Set All Parameters

In [None]:
# --- Model selection ----------------------------------------------------------
# Repository on the HuggingFace Hub that holds the model to load.
model_name = "bagoood/create_python_code_from_instruction_llama"

# --- 4-bit quantization settings ----------------------------------------------
use_4bit = True                       # load the base model in 4-bit precision
bnb_4bit_compute_dtype = "float16"    # name of the compute dtype for 4-bit base models
bnb_4bit_quant_type = "nf4"           # quantization type: "fp4" or "nf4"
use_nested_quant = False              # nested (double) quantization on/off

# --- Device placement ---------------------------------------------------------
# Put the entire model on GPU 0.
device_map = {"": 0}

# Load Model

In [None]:
# Resolve the configured dtype name (e.g. "float16") to the matching torch dtype object
compute_dtype = getattr(torch, bnb_4bit_compute_dtype)

# Quantization settings, taken from the parameter cell
bnb_config = BitsAndBytesConfig(
    load_in_4bit=use_4bit,
    bnb_4bit_quant_type=bnb_4bit_quant_type,
    bnb_4bit_compute_dtype=compute_dtype,
    bnb_4bit_use_double_quant=use_nested_quant,
)

# Load the quantized causal-LM from the Hub onto the device(s) given by `device_map`
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=bnb_config,
    device_map=device_map
)
# NOTE(review): `use_cache = False` and `pretraining_tp = 1` look like settings carried
# over from a fine-tuning script; confirm they are wanted for inference-only use.
model.config.use_cache = False
model.config.pretraining_tp = 1

# Load the tokenizer that belongs to the same Hub repository
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
# Reuse the end-of-sequence token as the padding token
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # original note: fixes an overflow issue seen with fp16 training

# Shared text-generation pipeline used by the RAG cells below; max_length is fixed at 700
pipe = pipeline(task="text-generation", 
                model=model, 
                tokenizer=tokenizer,
                max_length=700)

## Model Testing

In [None]:
def make_inference(model, tokenizer, max_length_val, task):
    """Generate a completion for ``task`` without any retrieved context.

    A new text-generation pipeline is built on every call so that ``max_length``
    can be chosen per request.

    Args:
        model: Model object handed to ``pipeline(model=...)``.
        tokenizer: Tokenizer handed to ``pipeline(tokenizer=...)``.
        max_length_val: Value forwarded as the pipeline's ``max_length``.
        task: Natural-language description of what the generated code must do.

    Returns:
        Whatever the pipeline call returns; the notebook reads the text through
        ``response[0]['generated_text']``.
    """
    context = """You are a helpful assistant to generate Python code that performs a certain task.
                \nBelow is the task that the Python code must do."""

    # Instruction-style prompt. The whitespace inside the literal is part of the
    # prompt text and is kept exactly as it was.
    reformatted_prompt = f"""<s>[INST]
                                    \n{context}
                                    \n{task}
                                    \n[/INST]
                                    \nResponse:
                                    \n"""

    # Only let CRITICAL messages from transformers through
    logging.set_verbosity(logging.CRITICAL)

    generator = pipeline(task="text-generation",
                         model=model,
                         tokenizer=tokenizer,
                         max_length=max_length_val)

    return generator(reformatted_prompt)

In [None]:
# Smoke test of the bare model (no retrieval): ask for a multiplication table
# and cap the pipeline's max_length at 140.
task = 'Create a for loop in Python that prints the output of a multiplication table for numbers from 1 to 10'
response = make_inference(model=model,
                          tokenizer=tokenizer,
                          max_length_val=140,
                          task=task)

In [None]:
# Show the text of the first entry returned by the smoke-test call above
print(response[0]['generated_text'])

# Load, Process, and Store Dataset

## Load and Process Dataset from The HuggingFace Hub

In [None]:
def transform_prompt_into_a_proper_format(template):
    """Wrap ``template`` in the ``<s>[INST] ... [/INST]`` instruction markers.

    Args:
        template: Prompt text to place between the markers.

    Returns:
        The wrapped prompt, ending with the line ``Response:``.
    """
    prompt_lines = ("<s>[INST]", f"{template}", "[/INST]", "Response:")
    return "\n".join(prompt_lines)

def transform_dataset_into_proper_format(row):
    """Render one dataset row as a single "Task / Response" text sample.

    Args:
        row: Mapping with the keys ``'instruction'``, ``'input'`` and ``'output'``.

    Returns:
        A dict with one key, ``'sample'``, holding the formatted text. The layout
        (including the indentation inside the literal) is kept exactly as before.
    """
    instruction, input_val, output_val = (
        row[column] for column in ('instruction', 'input', 'output')
    )

    return {'sample': f"""
                \nTask:
                \n{instruction}\n{input_val} 
                \nResponse:
                \n{output_val}
                """}

def convert_huggingface_data_to_documents(dataset, sample_size=2500, seed=42):
    """Turn a random subset of ``dataset`` into LangChain documents.

    The rows of the dataset that is passed in are used. Earlier, this function
    ignored its argument and reloaded the raw data through the global
    ``dataset_name``, which silently threw away any cleaning done by the caller.

    Args:
        dataset: HuggingFace dataset, either already carrying a ``'sample'`` column
            or providing the columns read by ``transform_dataset_into_proper_format``.
        sample_size: Maximum number of rows to keep after shuffling (default 2500,
            the value that used to be hard-coded).
        seed: Shuffle seed (default 42, the value that used to be hard-coded).

    Returns:
        A list of ``LangchainDocument`` objects whose ``page_content`` is the row's
        ``'sample'`` text and whose metadata is ``{"index": i}``, with ``i`` the
        position in the shuffled subset.
    """
    # Never ask for more rows than exist; `select` would raise on out-of-range indices.
    n_rows = min(sample_size, len(dataset))
    subset = dataset.shuffle(seed=seed).select(range(n_rows))

    # Only build the 'sample' column when the caller has not done so already.
    if 'sample' not in subset.column_names:
        subset = subset.map(transform_dataset_into_proper_format)

    # Convert the rows into document objects
    docs = [
        LangchainDocument(page_content=row["sample"], metadata={"index": i})
        for i, row in enumerate(tqdm(subset))
    ]

    return docs

In [None]:
# Fetch the training split of the instruction dataset from the Hub
dataset_name = "iamtarun/python_code_instructions_18k_alpaca"
dataset = load_dataset(dataset_name, split="train")


def _blank_out_placeholder_input(row):
    """Return an empty 'input' when the field is empty or the literal 'Not applicable'."""
    raw_input = row['input']
    if len(raw_input) == 0 or raw_input == 'Not applicable':
        return {'input': ''}
    return {'input': raw_input}


# Normalize the input column, then add the formatted 'sample' column
dataset = dataset.map(_blank_out_placeholder_input)
transformed_dataset = dataset.map(transform_dataset_into_proper_format)

# Build the LangChain documents that will be embedded
docs = convert_huggingface_data_to_documents(transformed_dataset)

## Embed the Dataset and Store It in a Vector Database

In [None]:
# Checkpoint used for embedding; the embedding wrapper is told to run on the CPU
embedding_model_name = 'sentence-transformers/all-mpnet-base-v2'
embedding_model_kwargs = dict(device='cpu')

# NOTE(review): this separately loaded AutoModel is not referenced again in the visible
# notebook cells (the vector store only receives `embeddings`); confirm whether it is needed.
embedding_model = AutoModel.from_pretrained(embedding_model_name, trust_remote_code=True)

# Embedding function handed to the Chroma vector store
embeddings = HuggingFaceEmbeddings(model_name=embedding_model_name,
                                   model_kwargs=embedding_model_kwargs)

In [None]:
 vectorstore = Chroma.from_documents(
                     documents=docs,    # Data
                     embedding=embeddings,    # Embedding model
                     persist_directory="chroma_db/chroma_db_all-mpnet-base-v2"    # Directory to save data
                     )

In [None]:
# Re-open the collection from its on-disk directory with the same embedding function
vectorstore_disk = Chroma(persist_directory="chroma_db/chroma_db_all-mpnet-base-v2",
                          embedding_function=embeddings)

# Retriever over the stored documents; search_kwargs asks for k=3 results per query
retriever = vectorstore_disk.as_retriever(search_kwargs=dict(k=3))

## Retrieval Testing

Retrieval test with a simple query:

In [None]:
simple_query = 'Create a Python program that performs a multiplication table for numbers from 1 to 10'

# A plain loop replaces the list comprehension that was used only for its side effects
# (it also made the cell echo `[None, None, None]`). `invoke` is the Runnable entry
# point of the retriever and replaces the deprecated `get_relevant_documents`.
for retrieved_doc in retriever.invoke(simple_query):
    print(retrieved_doc.page_content)

Retrieval test with a multi-step query:

In [None]:
multi_step_query = 'Create a Python program that performs a multiplication table for numbers from 1 to 10, then store the resulting multiplication table into a pandas data frame'

# A plain loop replaces the list comprehension that was used only for its side effects
# (it also made the cell echo `[None, None, None]`). `invoke` is the Runnable entry
# point of the retriever and replaces the deprecated `get_relevant_documents`.
for retrieved_doc in retriever.invoke(multi_step_query):
    print(retrieved_doc.page_content)

Retrieval test with a complex query:

In [None]:
complex_query = 'Create a Python program that performs a multiplication table for numbers divisible by 2 or 3'

# A plain loop replaces the list comprehension that was used only for its side effects
# (it also made the cell echo `[None, None, None]`). `invoke` is the Runnable entry
# point of the retriever and replaces the deprecated `get_relevant_documents`.
for retrieved_doc in retriever.invoke(complex_query):
    print(retrieved_doc.page_content)

# Retrieval Augmented Generation

In [None]:
def format_docs(docs):
    """Join the answer part of every retrieved document into one context string.

    For each document, everything after the FIRST ``'Response:'`` marker is kept,
    so an answer that itself contains the text ``Response:`` is no longer cut
    short. A document without the marker contributes its full ``page_content``
    instead of raising ``IndexError``.

    Args:
        docs: Iterable of objects exposing a ``page_content`` string.

    Returns:
        The extracted parts joined with newlines (``""`` for an empty iterable).
    """
    def _answer_part(text):
        # partition() splits on the first marker only and reports whether it was found
        _, marker, answer = text.partition('Response:')
        return answer if marker else text

    return "\n".join(_answer_part(d.page_content) for d in docs)

def get_response_from_query(pipeline, retriever, query):
    """Answer ``query`` with the local model, using retrieved documents as context.

    Retrieval happens once: the same documents are put into the prompt and
    returned to the caller. (Previously the retriever ran a second time inside
    the chain.)

    Args:
        pipeline: Text-generation pipeline wrapped by ``HuggingFacePipeline``.
            Note that this parameter shadows the imported ``pipeline`` function
            inside this function.
        retriever: Retriever used to look up documents for ``query``.
        query: Task description in natural language.

    Returns:
        Tuple ``(resp, docs)``: the parsed model output and the retrieved documents.
    """
    # `invoke` is the same entry point the chain used via `retriever | format_docs`;
    # it replaces the deprecated `get_relevant_documents`.
    docs = retriever.invoke(query)

    # Wrap the pipeline so it can be used as a step of a LangChain chain
    llm = HuggingFacePipeline(pipeline=pipeline)

    # Prompt text for the model
    prompt = PromptTemplate(
        input_variables=["query", "docs"],
        template = """<s>[INST]
                            \nYou are a helpful assistant to generate Python code that performs a certain task.
                            \nBelow is the task that the Python code must do.
                            \n{query}
                            \nUtilize these informations below to generate the code.
                            \n{docs}
                            \nIf you feel like you don't have enough information to develop the code, say "I don't know"
                            \n[/INST]
                            \nResponse:"""
        )

    # Prompt -> model -> plain string
    rag_chain = prompt | llm | StrOutputParser()

    # Fill the prompt with the documents retrieved above
    resp = rag_chain.invoke({"query": query, "docs": format_docs(docs)})

    return resp, docs

In [None]:
# Plain RAG run with the shared pipeline and retriever
query = 'Create a for loop in Python that prints the output of a multiplication table for numbers from 1 to 10.'
resp, docs = get_response_from_query(pipeline=pipe, retriever=retriever, query=query)

# Print only the text in front of the first end-of-sequence marker
print(resp.partition('</s>')[0], '\n')

# Retrieval Augmented Generation with Query Translation

## RAG with Multi-Query

In [None]:
def create_chain_for_generating_more_queries():
    """Build the chain that rewrites one query into several alternative queries.

    The chain formats the prompt, sends it to Gemini (authenticated with the
    module-level ``GOOGLE_API_KEY``) and splits the answer into lines. Blank
    lines are dropped and surrounding whitespace is trimmed, so the retriever is
    never called with an empty query.

    Note: the notebook defines a second function with this name in the RAG-Fusion
    section; once that cell has run, it shadows this definition.

    Returns:
        A runnable that maps ``{"query": ...}`` to a list of non-empty query strings.
    """
    # prompt text for generating more queries from a query
    template = """
    You are an AI language model assistant. 
    Your task is to generate three different versions of the query, each versions have their own perspectives.
    By generating multiple perspectives on the user question, your goal is to overcome the limitations of the distance-based similarity search.
    By generating multiple shorter versions of the given user question, your goal is to help the user overcome some of the limitations of the distance-based similarity search. 
    Provide these different versions of the query separated by newline.
    The query is: {query}
    """

    prompt_perspectives = ChatPromptTemplate.from_template(transform_prompt_into_a_proper_format(template))

    # create an instance of the model
    llm_gemini = ChatGoogleGenerativeAI(model="gemini-pro", google_api_key=GOOGLE_API_KEY)

    def _split_into_queries(text):
        # One query per non-blank line
        return [line.strip() for line in text.split("\n") if line.strip()]

    # chain of steps to generate more queries
    generate_queries = (
        prompt_perspectives
        | llm_gemini
        | StrOutputParser()
        | _split_into_queries
    )

    return generate_queries
    
def _get_unique_union(documents, max_docs=3):
    """Concatenate the text of the first unique documents across several result lists.

    Documents are de-duplicated on their serialized form (``dumps``) while
    keeping first-seen order. The earlier ``set()``-based version lost the order,
    so the truncation kept an arbitrary, run-dependent selection of documents.

    Args:
        documents: Iterable of document lists, one list per generated query.
        max_docs: Number of unique documents whose text is kept (default 3, the
            value that used to be hard-coded).

    Returns:
        The ``page_content`` of the kept documents, joined without a separator.
    """
    # dict preserves insertion order, so the first occurrence of each document wins;
    # keeping the original object avoids a dumps/loads round-trip.
    unique_docs = {}
    for sublist in documents:
        for doc in sublist:
            unique_docs.setdefault(dumps(doc), doc)

    kept_docs = list(unique_docs.values())[:max_docs]

    return ''.join(doc.page_content for doc in kept_docs)

def create_retrieval_chain(generate_queries, retriever):
    """Compose query generation, per-query retrieval and the unique-union merge.

    Args:
        generate_queries: Runnable producing a list of queries.
        retriever: Retriever applied to every generated query via ``retriever.map()``.

    Returns:
        The composed runnable; its last step is ``_get_unique_union``.
    """
    return generate_queries | retriever.map() | _get_unique_union

def get_response_from_multi_query(pipeline, retrieval_chain, query):
    """Answer ``query`` with the local model, using multi-query retrieval as context.

    ``retrieval_chain`` is invoked exactly once and its result is both placed in
    the prompt and returned. Previously it was invoked a second time inside the
    RAG chain; because that chain calls an LLM to generate queries, the returned
    context could differ from the one the model actually saw, and every call
    cost two rounds of query generation and retrieval.

    Args:
        pipeline: Text-generation pipeline wrapped by ``HuggingFacePipeline``.
            Note that this parameter shadows the imported ``pipeline`` function
            inside this function.
        retrieval_chain: Runnable that maps ``{"query": ...}`` to the context text.
        query: Task description in natural language.

    Returns:
        Tuple ``(resp, docs)``: the parsed model output and the context text
        that was inserted into the prompt.
    """
    # Retrieve the context once
    docs = retrieval_chain.invoke({"query": query})

    # Wrap the pipeline so it can be used as a step of a LangChain chain
    llm = HuggingFacePipeline(pipeline=pipeline)

    # prompt text for the model
    template = """<s>[INST]
                        \nYou are a helpful assistant to generate Python code that performs a certain task.
                        \nBelow is the task that the Python code must do.
                        \n{query}
                        \nUtilize the information below to help generate the Python code.
                        \n{docs}
                        \n[/INST]
                        \nResponse:"""

    prompt = ChatPromptTemplate.from_template(template)

    # Prompt -> model -> plain string
    rag_chain = prompt | llm | StrOutputParser()

    # Fill the prompt with the context retrieved above
    resp = rag_chain.invoke({"query": query, "docs": docs})

    return resp, docs

In [None]:
# Assemble the multi-query retrieval chain from the functions defined in the cell above
generate_queries = create_chain_for_generating_more_queries()
retrieval_chain = create_retrieval_chain(generate_queries=generate_queries,
                                         retriever=retriever)

### response for simple query

In [None]:
# Multi-query RAG on the simple query
resp, docs = get_response_from_multi_query(pipeline=pipe,
                                           retrieval_chain=retrieval_chain,
                                           query=simple_query)

# Print only the text in front of the first end-of-sequence marker
print(resp.partition('</s>')[0], '\n')

### response for multi-step query

In [None]:
# Multi-query RAG on the multi-step query
resp, docs = get_response_from_multi_query(pipeline=pipe,
                                           retrieval_chain=retrieval_chain,
                                           query=multi_step_query)

# Print only the text in front of the first end-of-sequence marker
print(resp.partition('</s>')[0], '\n')

### response for complex query

In [None]:
# Multi-query RAG on the complex query
resp, docs = get_response_from_multi_query(pipeline=pipe,
                                           retrieval_chain=retrieval_chain,
                                           query=complex_query)

# Print only the text in front of the first end-of-sequence marker
print(resp.partition('</s>')[0], '\n')

## RAG-Fusion

In [None]:
def create_chain_for_generating_more_queries():
    """Build the RAG-Fusion chain that expands one query into several search queries.

    The chain formats the prompt, sends it to Gemini (authenticated with the
    module-level ``GOOGLE_API_KEY``) and splits the answer into lines. Blank
    lines are dropped and surrounding whitespace is trimmed, so the retriever is
    never called with an empty query.

    Note: this re-uses the name of the function defined in the multi-query
    section and shadows it once this cell has run.

    Returns:
        A runnable that maps ``{"query": ...}`` to a list of non-empty query strings.
    """
    # prompt text for generating more queries from a query
    template = """
    You are a helpful assistant that generates multiple instructions based on the given instruction
    Generate multiple search instruction related to: {query}
    Output (4 queries):
    """
    prompt = ChatPromptTemplate.from_template(template)

    # create an instance of the gemini model
    llm_gemini = ChatGoogleGenerativeAI(model="gemini-pro", google_api_key=GOOGLE_API_KEY)

    def _split_into_queries(text):
        # One query per non-blank line
        return [line.strip() for line in text.split("\n") if line.strip()]

    generate_queries = (
        prompt
        | llm_gemini
        | StrOutputParser()
        | _split_into_queries
    )

    return generate_queries

def _reciprocal_rank_fusion(results, k=3, top_n=None):
    """Fuse several ranked result lists with reciprocal rank fusion (RRF).

    Every document receives ``1 / (rank + k)`` for each list it appears in, with
    ``rank`` being its zero-based position in that list. Documents are then
    ordered by their summed score, highest first; ties keep first-seen order.

    Args:
        results: Iterable of ranked document lists, one list per generated query.
        k: Constant in the RRF denominator. Must be positive, otherwise the
            first-ranked document causes a division by zero.
        top_n: Number of fused documents to keep. Defaults to ``k``, which is how
            the single ``k`` argument was used before; pass it explicitly to
            choose the cut-off independently of the RRF constant.

    Returns:
        The ``page_content`` of the kept documents as ONE string, joined without
        a separator (not a list of ``(document, score)`` tuples).
    """
    if top_n is None:
        top_n = k

    fused_scores = {}   # serialized document -> accumulated RRF score
    first_seen = {}     # serialized document -> original document object

    for ranked_docs in results:
        for rank, doc in enumerate(ranked_docs):
            # The serialized form identifies the same document across lists
            doc_key = dumps(doc)
            # Keep the object itself so no dumps/loads round-trip is needed later
            first_seen.setdefault(doc_key, doc)
            fused_scores[doc_key] = fused_scores.get(doc_key, 0) + 1 / (rank + k)

    # Stable sort: equal scores stay in first-seen order
    ranked_keys = sorted(fused_scores, key=fused_scores.get, reverse=True)

    return ''.join(first_seen[doc_key].page_content for doc_key in ranked_keys[:top_n])

def create_retrieval_chain(generate_queries, retriever):
    """Compose query generation, per-query retrieval and reciprocal rank fusion.

    Note: this re-uses the name of the function defined in the multi-query
    section and shadows it once this cell has run.

    Args:
        generate_queries: Runnable producing a list of queries.
        retriever: Retriever applied to every generated query via ``retriever.map()``.

    Returns:
        The composed runnable; its last step is ``_reciprocal_rank_fusion``.
    """
    return generate_queries | retriever.map() | _reciprocal_rank_fusion

def get_response_from_rag_fusion(pipeline, retrieval_chain, query):
    """Answer ``query`` with the local model, using RAG-Fusion retrieval as context.

    ``retrieval_chain`` is invoked exactly once and its result is both placed in
    the prompt and returned. Previously it was invoked a second time inside the
    RAG chain; because that chain calls an LLM to generate queries, the returned
    context could differ from the one the model actually saw, and every call
    cost two rounds of query generation and retrieval.

    Args:
        pipeline: Text-generation pipeline wrapped by ``HuggingFacePipeline``.
            Note that this parameter shadows the imported ``pipeline`` function
            inside this function.
        retrieval_chain: Runnable that maps ``{"query": ...}`` to the context text.
        query: Task description in natural language.

    Returns:
        Tuple ``(resp, docs)``: the parsed model output and the context text
        that was inserted into the prompt.
    """
    # Retrieve the fused context once
    docs = retrieval_chain.invoke({"query": query})

    # Wrap the pipeline so it can be used as a step of a LangChain chain
    llm = HuggingFacePipeline(pipeline=pipeline)

    # prompt text for the model
    template = """<s>[INST]
                        \nYou are a helpful assistant to generate Python code that performs a certain task.
                        \nBelow is the task that the Python code must do.
                        \n{query}
                        \nUtilize these informations below to generate the code.
                        \n{docs}
                        \nIf you feel like you don't have enough information to develop the code, say "I don't know"
                        \n[/INST]
                        \nResponse:"""

    prompt = ChatPromptTemplate.from_template(template)

    # Prompt -> model -> plain string
    rag_chain = prompt | llm | StrOutputParser()

    # Fill the prompt with the context retrieved above
    resp = rag_chain.invoke({"query": query, "docs": docs})

    return resp, docs

In [None]:
# Assemble the RAG-Fusion retrieval chain from the functions defined in the cell above
# (this rebinds `generate_queries` and `retrieval_chain`)
generate_queries = create_chain_for_generating_more_queries()
retrieval_chain = create_retrieval_chain(generate_queries=generate_queries,
                                         retriever=retriever)

In [None]:
# RAG-Fusion run on a two-step task
query = 'Create a Python programs that performs multiplication table for numbers from 1 to 10, then store the result in a dataframe'

resp, docs = get_response_from_rag_fusion(pipeline=pipe,
                                          retrieval_chain=retrieval_chain,
                                          query=query)

# Print only the text in front of the first end-of-sequence marker
print(resp.partition('</s>')[0], '\n')