# Loading PDF file

In [1]:
import os
from dotenv import load_dotenv

In [20]:
# Input PDF, the on-disk directory for the Chroma store, and the collection name used inside it.
pdf_path = 'resources/bss-ch01.pdf'
db_path = 'contents'
# NOTE(review): the collection name says "ch12" while the PDF above is "ch01" - confirm this is intended.
db_name = 'secure_software-ch12'
# Pull variables from a .env file into the process environment; GEMINI_API_KEY is
# read with os.getenv by the embedding and generation code below.
load_dotenv()

True

In [4]:
# from pypdf import PdfReader

# def load_pdf(file_path):
#     """
#     Reads the text content from a PDF file and returns it as a single string.

#     Parameters:
#     - file_path (str): The file path to the PDF file.

#     Returns:
#     - str: The concatenated text content of all pages in the PDF.

#     Raises:
#     - FileNotFoundError: If the specified file_path does not exist.
#     - pypdf.errors.PdfReadError: If the PDF file is encrypted or malformed.

#     Example:
#     >>> pdf_text = load_pdf("example.pdf")
#     >>> print(pdf_text)
#     "This is the text content extracted from the PDF file."
#     """
#     # Logic to read pdf
#     reader = PdfReader(file_path)

#     # Loop over each page and store it in a variable
#     text = ""
#     for page in reader.pages:
#         text += page.extract_text()

#     return text

# text = load_pdf(file_path=pdf_path)

In [5]:
# print(len(text))

66897


In [39]:
# pdf_text = load_pdf(file_path=pdf_path)

# Splitting the text

In [30]:
# import re
# def split_text(text: str):
#     """
#     Splits a text string into a list of non-empty substrings based on the specified pattern.
#     The "\n \n" pattern will split the document para by para
#     Parameters:
#     - text (str): The input text to be split.

#     Returns:
#     - List[str]: A list containing non-empty substrings obtained by splitting the input text.

#     """
#     split_text = re.split('\n \n', text)
#     return [i for i in split_text if i != ""]

In [40]:
# text = split_text(pdf_text)

# Splitting per page of the pdf

In [5]:
import fitz
# One list entry per PDF page, in document order.
text = []
with fitz.open(pdf_path) as doc:
    text.extend(page.get_text() for page in doc)
    

In [18]:
# Sanity check: number of extracted pages, then the character count of the first page.
print(len(text))
print(len(text[0]))
# Dump the extracted pages for manual inspection. The encoding is pinned to UTF-8
# because PDF text routinely contains characters (curly quotes, ligatures) that a
# platform-default codec such as cp1252 cannot encode.
with open('temp.txt', 'w', encoding='utf-8') as file:
    file.writelines(text)

28
1548


# Embedding the text

In [6]:
import google.generativeai as genai
from chromadb import Documents, EmbeddingFunction, Embeddings
import os

class GeminiEmbeddingFunction(EmbeddingFunction):
    """
    Chroma embedding function backed by the Gemini embedding endpoint.

    Instances are callable: Chroma hands them a batch of documents and gets the
    matching embeddings back. Each call reads the API key from the environment,
    configures the `genai` client with it, and requests embeddings from
    "models/embedding-001" using the "retrieval_document" task type.

    Parameters:
    - input (Documents): The documents to embed.

    Returns:
    - Embeddings: The "embedding" entry of the `genai.embed_content` response.

    Raises:
    - ValueError: If the GEMINI_API_KEY environment variable is unset or empty.

    Example:
    >>> embed = GeminiEmbeddingFunction()
    >>> vectors = embed(["Document 1", "Document 2", "Document 3"])
    """
    def __call__(self, input: Documents) -> Embeddings:
        # `input` shadows the builtin, but the parameter name is kept as-is so the
        # signature stays exactly what callers of this class already use.
        api_key = os.getenv("GEMINI_API_KEY")
        if not api_key:
            raise ValueError("Gemini API Key not provided. Please provide GEMINI_API_KEY as an environment variable")
        genai.configure(api_key=api_key)

        response = genai.embed_content(
            model="models/embedding-001",
            content=input,
            task_type="retrieval_document",
            title="Custom query",
        )
        return response["embedding"]


  from .autonotebook import tqdm as notebook_tqdm


# Storing vectors into DB

In [7]:
import chromadb
def create_chroma_db(documents, path, name):
    """
    Builds a new Chroma collection on disk and fills it with the given documents.

    Each document is added individually, keyed by its position in `documents`
    rendered as a string ("0", "1", ...). The collection is created with
    GeminiEmbeddingFunction as its embedding function.

    NOTE(review): this always calls `create_collection`; presumably that fails when
    a collection called `name` already exists at `path` - confirm against Chroma docs.

    Parameters:
    - documents: An iterable of documents to store.
    - path (str): Directory handed to chromadb.PersistentClient.
    - name (str): Name of the collection to create.

    Returns:
    - Tuple[chromadb.Collection, str]: The new collection and the name it was created under.
    """
    client = chromadb.PersistentClient(path=path)
    collection = client.create_collection(
        name=name,
        embedding_function=GeminiEmbeddingFunction(),
    )

    # One add() per document; the id is the document's index as a string.
    for position, document in enumerate(documents):
        collection.add(documents=document, ids=str(position))

    return collection, name


In [21]:
# Create the collection and add every entry of `text` to it; `name` receives the collection name back.
db,name =create_chroma_db(documents=text, path=db_path, name=db_name)

In [10]:
def load_chroma_collection(path, name):
    """
    Opens a collection from a persistent Chroma store.

    The collection is fetched with GeminiEmbeddingFunction attached as its
    embedding function.

    Parameters:
    - path (str): Directory handed to chromadb.PersistentClient.
    - name (str): Name of the collection to fetch.

    Returns:
    - chromadb.Collection: The collection returned by `get_collection`.
    """
    client = chromadb.PersistentClient(path=path)
    return client.get_collection(
        name=name,
        embedding_function=GeminiEmbeddingFunction(),
    )


In [22]:
# Reopen the persisted collection by name from the on-disk store.
db = load_chroma_collection(path=db_path, name=db_name)

# Retrieval

In [12]:
def get_relevant_passage(query, db, n_results):
    """
    Queries the collection with a single query text and returns its matched documents.

    Parameters:
    - query (str): The query text; it is sent as a one-element `query_texts` list.
    - db: Object exposing `query(query_texts=..., n_results=...)`.
    - n_results (int): Number of results to request.

    Returns:
    - The first entry of the response's 'documents' field, i.e. the documents
      matched for the one query that was sent.
    """
    response = db.query(query_texts=[query], n_results=n_results)
    documents_per_query = response['documents']
    return documents_per_query[0]

In [23]:
# Ask the collection for 3 results for this query text.
relevant_text = get_relevant_passage("One of the keys to recovering from an attack",db,3)

In [26]:
# Show the retrieved passages, then save them for manual inspection. The encoding is
# pinned to UTF-8 because the passages contain characters (curly quotes, ligatures)
# that a platform-default codec such as cp1252 cannot encode.
print(relevant_text)
with open('temp2.txt', 'w', encoding='utf-8') as file:
    file.writelines(relevant_text)

['context. A key insight about security is to realize that any given system, no\nmatter how “secure,” can probably be broken. In the end, security must\nbe understood in terms of a simple question: Secure against what and\nfrom whom?\nUnderstanding security is best understood by thinking about goals.\nWhat is it we are trying to protect? From whom are we protecting it? How\ncan we get what we want?\nPrevention\nAs in today’s criminal justice system, much more attention is paid to security\nafter something bad happens than before. In both cases, an ounce of preven-\ntion is probably worth a pound of punishment.\nInternet time compresses not only the software development life cycle\n(making software risk management a real challenge), it also directly affects\nthe propagation of attacks. Once a successful attack on a vulnerability is\nfound, the attack spreads like wildﬁre on the Internet. Often, the attack is\nembedded in a simple script, so that an attacker requires no more skill than\n

# Generation

In [27]:
def make_rag_prompt(query, relevant_passage):
    """
    Builds the generation prompt from a question and its reference passage.

    The passage is sanitised first: single and double quote characters are removed
    and newlines become spaces, so it sits on one line inside the quoted PASSAGE
    field. The query is inserted unchanged.

    Parameters:
    - query (str): The user's question.
    - relevant_passage (str): The reference text to answer from.

    Returns:
    - str: The instruction text followed by the QUESTION, PASSAGE and ANSWER fields.
    """
    # Single-character mapping equivalent to removing ' and " and replacing "\n" with " ".
    cleanup = str.maketrans({"'": None, '"': None, "\n": " "})
    flattened = relevant_passage.translate(cleanup)

    # Spelled out piece by piece; the leading double spaces are part of the prompt text.
    template = (
        "You are a helpful and informative bot that answers questions using text from the reference passage included below. "
        "  Be sure to respond in a complete sentence, being comprehensive, including all relevant background information. "
        "  However, you are talking to a non-technical audience, so be sure to break down complicated concepts and "
        "  strike a friendly and converstional tone. "
        "  If the passage is irrelevant to the answer, you may ignore it.\n"
        "  QUESTION: '{query}'\n"
        "  PASSAGE: '{relevant_passage}'\n"
        "\n"
        "  ANSWER:\n"
        "  "
    )
    return template.format(query=query, relevant_passage=flattened)

In [28]:
import google.generativeai as genai
def generate_response(prompt):
    """
    Sends a prompt to the 'gemini-pro' model and returns the text of its reply.

    The API key is read from the environment and the `genai` client is configured
    with it on every call.

    Parameters:
    - prompt (str): The prompt passed to `generate_content`.

    Returns:
    - The `.text` attribute of the model's response.

    Raises:
    - ValueError: If the GEMINI_API_KEY environment variable is unset or empty.
    """
    api_key = os.getenv("GEMINI_API_KEY")
    if api_key:
        genai.configure(api_key=api_key)
        return genai.GenerativeModel('gemini-pro').generate_content(prompt).text
    raise ValueError("Gemini API Key not provided. Please provide GEMINI_API_KEY as an environment variable")

# Bringing it all together

In [29]:

def generate_answer(db, query):
    """
    Answers a question from the collection: retrieve, build the prompt, generate.

    Three results are requested from the collection and concatenated with no
    separator to form the reference passage of the prompt.

    Parameters:
    - db: The collection handed to get_relevant_passage.
    - query (str): The question to answer.

    Returns:
    - Whatever generate_response returns for the assembled prompt.
    """
    top_chunks = get_relevant_passage(query, db, n_results=3)
    passage = "".join(top_chunks)
    return generate_response(make_rag_prompt(query, relevant_passage=passage))
    
    
    

In [30]:
# Reopen the stored collection, then run the full retrieve -> prompt -> generate chain.
db = load_chroma_collection(
    path=db_path,  # replace with path of your persistent directory
    name=db_name,  # replace with the collection name
)

answer = generate_answer(db, query="Who understand the auditing well?")
print(answer)


Auditing is best understood by accountants, banks, other financial institutions, and most businesses who routinely audit their inventories.  Public companies must even have their records audited by accounting firms to meet Security Exchange Commission (SEC) regulations.
