THE NEXT FEW CELLS INSTALL THE NECESSARY PACKAGES; THE KERNEL MUST BE RESTARTED AFTERWARD (THE RESTART CELL BELOW DOES THIS AUTOMATICALLY)

In [None]:
# NOTE(review): PyPDF2 IS NOT IMPORTED BY ANY CELL OF THIS NOTEBOOK AS SHOWN -- CONFIRM IT IS STILL NEEDED,
# AND IF SO PIN A VERSION LIKE THE OTHER INSTALLS BELOW SO RE-RUNS ARE REPRODUCIBLE.
!pip install PyPDF2

In [None]:
# Install Vertex AI LLM SDK
# ALL VERSIONS ARE PINNED; THE RESTART CELL BELOW MUST RUN BEFORE THE NEW VERSIONS CAN BE IMPORTED.
! pip install --user --upgrade google-cloud-aiplatform==1.35.0 langchain==0.0.323
# NOTE(review): THIS IS THE ONLY pip INSTALL IN THIS CELL WITHOUT --user -- CONFIRM THAT IS INTENTIONAL.
! pip install typing-inspect==0.8.0
! pip install --user typing_extensions==4.5.0

# Dependencies required by Unstructured PDF loader
# SYSTEM PACKAGES (OCR + PDF RENDERING) INSTALLED VIA apt, SO THEY NEED sudo
! sudo apt -y -qq install tesseract-ocr libtesseract-dev
! sudo apt-get -y -qq install poppler-utils
! pip install --user unstructured==0.7.5 pdf2image==1.16.3 pytesseract==0.3.10 pdfminer.six==20221105

# For Matching Engine integration dependencies (default embeddings)
! pip install --user tensorflow_hub==0.13.0 tensorflow_text==2.12.1

In [None]:
# # NOTE THAT TYPING-EXTENSIONS SPECIFICALLY HAS BEEN TRICKY ON VERSIONING
# # THE ABOVE CELL SHOULD BE SUFFICIENT, BUT RUN THIS IF NECESSARY
# !pip install typing-extensions --upgrade
# # THIS GAVE VERSION Version: 4.5.0 IF YOU HAVE ISSUES LATER

In [None]:
# Restart the kernel so the packages installed above are importable at their newly pinned versions.
# Passing True to do_shutdown asks for a restart rather than a plain shutdown.
import IPython

IPython.Application.instance().kernel.do_shutdown(True)

In [None]:
# GET HELPER FUNCTIONS NEEDED FOR MATCHING ENGINE LATER IN NB
# DOWNLOADS utils/__init__.py, utils/matching_engine.py AND utils/matching_engine_utils.py, WHICH THE
# IMPORT CELL BELOW LOADS AS utils.matching_engine / utils.matching_engine_utils.
# NOTE(review): THE FILES COME FROM THE `main` BRANCH, SO THEIR CONTENTS (OR EVEN THIS PATH) CAN CHANGE
# BETWEEN RUNS -- CONSIDER PINNING url_prefix TO A SPECIFIC COMMIT SHA.
import os
import urllib.request

url_prefix = "https://raw.githubusercontent.com/GoogleCloudPlatform/generative-ai/main/language/use-cases/document-qa/utils"
files = ["__init__.py", "matching_engine.py", "matching_engine_utils.py"]

# Create the local package directory on first run only.
if not os.path.exists("utils"):
    os.makedirs("utils")

for module_file in files:
    remote_url = f"{url_prefix}/{module_file}"
    urllib.request.urlretrieve(remote_url, filename=f"utils/{module_file}")

In [None]:
# IMPORTS
import json
import textwrap

# Utils
import time
import uuid
from typing import List

import numpy as np
import vertexai

# Vertex AI
from google.cloud import aiplatform

print(f"Vertex AI SDK version: {aiplatform.__version__}")

# LangChain
import langchain

print(f"LangChain version: {langchain.__version__}")

from typing_extensions import TypeAlias
from langchain.chains import RetrievalQA
from langchain.document_loaders import GCSDirectoryLoader
from langchain.embeddings import VertexAIEmbeddings
from langchain.llms import VertexAI
from langchain.prompts import PromptTemplate
from langchain.text_splitter import RecursiveCharacterTextSplitter


# Import custom Matching Engine packages
from utils.matching_engine import MatchingEngine
from utils.matching_engine_utils import MatchingEngineUtils

In [None]:
# ACTIVE GCLOUD PROJECT: `!(...)` CAPTURES THE COMMAND'S STDOUT AS A LIST OF LINES; KEEP THE FIRST LINE.
# NOTE(review): IF NO PROJECT IS CONFIGURED, THAT FIRST LINE MAY NOT BE A PROJECT ID -- SET PROJECT_ID
# EXPLICITLY IN THAT CASE.
PROJECT_ID = !(gcloud config get-value core/project)
PROJECT_ID = PROJECT_ID[0]
REGION = "us-central1" #NOTE THAT YOU'LL WANT SAME REGION FOR INSTANCE, BUCKETS, ENDPOINTS, ETC.

# INIT VERTEX AI SDK WITH THE PROJECT/REGION ABOVE AS DEFAULTS FOR THE VERTEX AI CALLS THAT FOLLOW
vertexai.init(project=PROJECT_ID, location=REGION)

In [None]:
# Utility functions for Embeddings API with rate limiting
# Utility functions for Embeddings API with rate limiting
def rate_limit(max_per_minute):
    """Generator that spaces out successive ``next()`` calls to at most ``max_per_minute`` per minute.

    The first ``next()`` prints "Waiting" and returns immediately. Each later ``next()`` measures
    how long it has been since the previous one. If that is shorter than ``60 / max_per_minute``
    seconds, it prints a "." and sleeps for the remainder.
    """
    min_interval = 60 / max_per_minute
    print("Waiting")
    while True:
        started = time.time()
        yield
        remaining = min_interval - (time.time() - started)
        if remaining > 0:
            print(".", end="")
            time.sleep(remaining)


class CustomVertexAIEmbeddings(VertexAIEmbeddings):
    """VertexAIEmbeddings whose ``embed_documents`` batches requests and throttles them client-side."""

    # maximum number of get_embeddings requests per minute (enforced with rate_limit)
    requests_per_minute: int
    # number of texts sent in a single get_embeddings request
    num_instances_per_batch: int

    # Overriding embed_documents method
    def embed_documents(self, texts: List[str]):
        """Embed ``texts`` in batches of ``num_instances_per_batch``, at most ``requests_per_minute`` requests/min.

        Args:
            texts: the strings to embed.

        Returns:
            One embedding (the ``.values`` of each API result) per input text, in input order.

        Raises:
            ValueError: if ``num_instances_per_batch`` is not positive. Such a batch size would never
                shrink the remaining work, so the loop would never finish.
        """
        batch_size = self.num_instances_per_batch
        if batch_size <= 0:
            raise ValueError(f"num_instances_per_batch must be positive, got {batch_size}")

        limiter = rate_limit(self.requests_per_minute)
        results = []
        docs = list(texts)

        # Walk the list by index instead of re-slicing the remainder every iteration. The old
        # `head, docs = docs[:n], docs[n:]` pattern copied the whole tail each time, which is
        # quadratic in len(texts). The batches themselves are unchanged.
        for start in range(0, len(docs), batch_size):
            chunk = self.client.get_embeddings(docs[start:start + batch_size])
            results.extend(chunk)
            # Throttle between requests (see rate_limit above).
            next(limiter)

        return [r.values for r in results]

In [None]:
# TEXT MODEL (text-bison@001) WRAPPED AS A LANGCHAIN LLM
llm = VertexAI(
    model_name="text-bison@001",
    temperature=0.2,
    top_k=40,
    top_p=0.8,
    max_output_tokens=1024,
    verbose=True,
)

# EMBEDDINGS MODEL WITH CLIENT-SIDE BATCHING AND RATE LIMITING (CustomVertexAIEmbeddings)
EMBEDDING_QPM = 100  # MAX EMBEDDING REQUESTS PER MINUTE
EMBEDDING_NUM_BATCH = 5  # TEXTS PER EMBEDDING REQUEST
embeddings = CustomVertexAIEmbeddings(
    num_instances_per_batch=EMBEDDING_NUM_BATCH,
    requests_per_minute=EMBEDDING_QPM,
)

In [None]:
# ME = MATCHING ENGINE
# ME_REGION MUST MATCH REGION, SO DERIVE IT INSTEAD OF REPEATING THE LITERAL (THE TWO CAN'T DRIFT APART)
ME_REGION = REGION
ME_INDEX_NAME = "tax-rag-me-index-test"  # REPLACE WITH YOUR OWN NAMING CONVENTION
ME_EMBEDDING_DIR = "tax-rag-me-bucket-test"  # BUCKET NAME FOR EMBEDDINGS; REPLACE WITH YOUR OWN NAMING CONVENTION
ME_DIMENSIONS = 768  # WHEN USING VERTEX PaLM EMBEDDING

In [None]:
# CREATE BUCKET IF YOU HAVEN'T ALREADY
# CREATES gs://{ME_EMBEDDING_DIR} IN {REGION}; THE INDEX'S INITIAL DATA IS WRITTEN UNDER init_index/ IN IT BELOW.
# NOTE(review): PRESUMABLY THIS ERRORS IF THE BUCKET ALREADY EXISTS -- SKIP THIS CELL ON RE-RUNS.
!gsutil mb -l {REGION} gs://{ME_EMBEDDING_DIR} 

In [None]:
# CREATE A DUMMY EMBEDDINGS FILE TO INITIALIZE WHEN CREATING THE INDEX

# DUMMY EMBEDDING
init_embedding = {"id": str(uuid.uuid4()), "embedding": list(np.zeros(ME_DIMENSIONS))}

# DUMP EMBEDDING TO LOCAL FILE 
with open("embeddings_0.json", "w") as f:
    json.dump(init_embedding, f)

# write embedding to Cloud Storage
! set -x && gsutil cp embeddings_0.json gs://{ME_EMBEDDING_DIR}/init_index/embeddings_0.json

In [None]:
# CREATE MATCHING ENGINE VAR
# HELPER FROM utils/matching_engine_utils.py (DOWNLOADED ABOVE); THE CELLS BELOW USE IT FOR
# create_index, deploy_index, get_index_and_endpoint AND THE delete_* CLEANUP CALLS.
mengine = MatchingEngineUtils(PROJECT_ID, ME_REGION, ME_INDEX_NAME)

In [None]:
# CREATE THE INDEX FROM THE DUMMY EMBEDDING UPLOADED TO init_index/ (STREAMING UPDATES, TREE-AH ALGORITHM)
# NOTE(review): WHETHER create_index RETURNS AN EXISTING INDEX ON RE-RUN DEPENDS ON THE HELPER -- CHECK utils/matching_engine_utils.py
index = mengine.create_index(
    index_algorithm="tree-ah",
    index_update_method="streaming",
    dimensions=ME_DIMENSIONS,
    embedding_gcs_uri=f"gs://{ME_EMBEDDING_DIR}/init_index",
)
if index:
    print(index.name)

In [None]:
# DEPLOY THE INDEX TO AN ENDPOINT VIA THE HELPER, THEN PRINT THE ENDPOINT'S NAMES AND DEPLOYED INDEX IDS
index_endpoint = mengine.deploy_index()
if index_endpoint:
    endpoint_domain = index_endpoint.public_endpoint_domain_name if False else None
    print(f"Index endpoint resource name: {index_endpoint.name}")
    print(f"Index endpoint public domain name: {index_endpoint.public_endpoint_domain_name}")
    print("Deployed indexes on the index endpoint:")
    for deployed in index_endpoint.deployed_indexes:
        print(f"    {deployed.id}")

In [None]:
# POINT TO BUCKET WITH THE PDFS YOU WANT FOR SEMANTIC SEARCH LATER
PDF_BUCKET = "irs_written_determinations_test"  # REPLACE WITH YOUR OWN BUCKET (BARE NAME, NO gs:// PREFIX)
# SAME BUCKET AS A gs:// URI (WITH TRAILING SLASH); DERIVED FROM PDF_BUCKET SO THE TWO FORMS CAN'T DRIFT APART
BUCKET = f"gs://{PDF_BUCKET}/"

EXTREMELY IMPORTANT NOTE — READ BEFORE MOVING FORWARD!!

TL;DR: USE NANO (OR ANY EDITOR) TO EDIT ~/.local/lib/python3.10/site-packages/unstructured/partition/strategies.py \
IN THE CODE BLOCK STARTING WITH def _fp_is_extractable(fp):, ADD THE CHECK if sum(1 for _ in PDFPage.get_pages(fp, check_extractable=True)) > 0: BEFORE THE PART THAT EXECUTES THE next() STATEMENT, SO next() IS ONLY CALLED WHEN THE GENERATOR YIELDS AT LEAST ONE PAGE.

THE LINE documents = loader.load() USED TO MAKE THE INGESTION CELL BELOW CRASH. CHECKING THE PDFs ONE BY ONE SHOWED THAT 0303021.pdf WAS THE FIRST CULPRIT. THERE WAS NO DESCRIPTIVE ERROR, JUST "StopIteration: ", WHICH (AFTER SOME GOOGLING) IS RAISED WHEN next() IS CALLED ON AN ITERATOR OR GENERATOR THAT IS ALREADY EXHAUSTED.

THE TRACEBACK SHOWS THAT THIS CAME FROM THE LINE next(PDFPage.get_pages(fp, check_extractable=True)) IN ~/.local/lib/python3.10/site-packages/unstructured/partition/strategies.py. PDFPage.get_pages(fp, check_extractable=True) RETURNS A GENERATOR, SO A StopIteration HERE MAKES SENSE.

THIS IMPLIES THAT, FOR THIS PDF, PDFPage.get_pages YIELDS NOTHING FOR THAT FILE POINTER (I ASSUME THE GENERATOR YIELDS PAGES, BUT I HAVEN'T CONFIRMED THIS). THAT IS BIZARRE, BECAUSE THE PDF CLEARLY EXISTS AND HAS PAGES. I WANT TO LOOK INTO THIS LATER, BUT FOR NOW IT IS ENOUGH TO EXCLUDE FILES THAT CAUSE THE PROBLEM. MY WORKAROUND IS THEREFORE TO EDIT THE FILE AND ADD THE CHECK if sum(1 for _ in PDFPage.get_pages(fp, check_extractable=True)) > 0: BEFORE THE PART THAT EXECUTES THE next() STATEMENT. THAT WAY WE ONLY PROCESS FILES THIS FUNCTION CAN HANDLE. AGAIN, LATER I'LL TRY TO FIND A FIX THAT DOESN'T RISK SKIPPING GOOD FILES; FOR NOW, HOPEFULLY THIS ERROR IS UNCOMMON AND COSTS US LITTLE.

IF YOU HAVE ISSUES IN THE FUTURE KNOW THAT THIS WAS PART OF THE CODE BLOCK STARTING WITH def _fp_is_extractable(fp):

ADDENDUM: THE FIX NOT ONLY STOPPED THE CRASH, THE FILE WASN'T EVEN SKIPPED. I RAN THE CELLS WITH JUST THAT ONE PDF AS INPUT, AND THE DOWNSTREAM VECTOR SEARCH RETURNED CHUNKS FROM THAT DOCUMENT THAT WERE RELEVANT TO THE QUESTION, SO WE'RE GOOD.

NOTE THAT FOR THIS TO WORK YOU HAVE TO GRANT THE STORAGE ADMIN ROLE ON THE BUCKETS TO THE SERVICE ACCOUNT ENDING IN compute@developer.gserviceaccount.com; YOU CAN DO THIS IN THE IAM TAB OF THE GCP CONSOLE.

In [None]:
# INGEST PDF FILES
# PDF_BUCKET IS A BARE BUCKET NAME (NO gs:// PREFIX), WHICH IS THE FORM PASSED AS bucket= BELOW.

print(f"Processing documents from {PDF_BUCKET}")
# OPTIONALLY ALSO PASS prefix=folder_prefix TO LOAD ONLY ONE FOLDER OF THE BUCKET
loader = GCSDirectoryLoader(project_name=PROJECT_ID, bucket=PDF_BUCKET)
documents = loader.load()

# REPLACE EACH DOCUMENT'S METADATA WITH {"source": <gs:// FOLDER URI>, "document_name": <FILE NAME>}.
# BUG FIX: THE PREFIX WAS "/".join(PDF_BUCKET.split("/")[:3]), WHICH FOR A BARE BUCKET NAME IS JUST THE
# NAME ITSELF, SO "source" WAS NOT A gs:// URI. BUILD THE URI EXPLICITLY INSTEAD.
doc_source_prefix = f"gs://{PDF_BUCKET}"
for document in documents:
    # ASSUMES THE LOADER'S source IS "gs://<bucket>/<folders...>/<file>", I.E. IT SPLITS AS
    # ["gs:", "", "<bucket>", *folders, "<file>"] -- NOTE(review): CONFIRM FOR YOUR langchain VERSION.
    source_parts = document.metadata["source"].split("/")
    document_name = source_parts[-1]
    # BUG FIX: FOLDERS START AT INDEX 3. THE OLD [4:-1] DROPPED THE FIRST FOLDER LEVEL UNDER THE BUCKET.
    doc_source_suffix = "/".join(source_parts[3:-1])
    source = f"{doc_source_prefix}/{doc_source_suffix}"
    document.metadata = {"source": source, "document_name": document_name}

print(f"# of documents loaded (pre-chunking) = {len(documents)}")

In [None]:
# CHECK METADATA
# SHOWS THE FIRST DOCUMENT'S METADATA AS REWRITTEN BY THE INGESTION CELL (KEYS: source, document_name)
documents[0].metadata

IF YOU NEED TO RESTART THE KERNEL AT ANY POINT (E.G. YOU WANT TO SHUT DOWN THE INSTANCE WHILE NOT WORKING IN NOTEBOOKS), IT IS SUPER USEFUL TO HAVE THE DOCUMENTS LIST AVAILABLE WITHOUT RE-INGESTING IT. THE NEXT CELL SAVES THE VARIABLE WITH %store, AND THE (COMMENTED-OUT) CELL AFTER IT RESTORES IT WITH %store -r, SO YOU DON'T HAVE TO RE-RUN THE INGESTION CELL ABOVE.

In [None]:
# PERSIST `documents` WITH IPython'S %store SO IT CAN BE RESTORED AFTER A KERNEL RESTART WITH %store -r documents
%store documents

In [None]:
# %store -r documents

In [None]:
# SPLIT DOCS INTO CHUNKS (chunk_size=1000, chunk_overlap=50), PREFERRING PARAGRAPH, LINE, THEN SENTENCE BREAKS
text_splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""],
    chunk_size=1000,
    chunk_overlap=50,
)
doc_splits = text_splitter.split_documents(documents)

# ADD A RUNNING CHUNK NUMBER (0..N-1 ACROSS ALL DOCUMENTS, NOT PER DOCUMENT) TO EACH CHUNK'S METADATA
for chunk_number, chunk_doc in enumerate(doc_splits):
    chunk_doc.metadata["chunk"] = chunk_number

print(f"# of documents = {len(doc_splits)}")


In [None]:
# SPOT-CHECK ONE CHUNK'S METADATA: THE PARENT DOCUMENT'S source/document_name PLUS ITS chunk NUMBER
doc_splits[0].metadata

In [None]:
# PERSIST THE CHUNKS SO A RESTARTED KERNEL CAN RESTORE THEM WITH %store -r doc_splits
%store doc_splits

In [None]:
# %store -r doc_splits

CONFIGURE MATCHING ENGINE AS VECTOR STORE

In [None]:
# LOOK UP THE IDS OF THE INDEX AND ITS ENDPOINT FROM THE HELPER AND ECHO THEM
ME_INDEX_ID, ME_INDEX_ENDPOINT_ID = mengine.get_index_and_endpoint()
for id_label, id_value in (
    ("ME_INDEX_ID", ME_INDEX_ID),
    ("ME_INDEX_ENDPOINT_ID", ME_INDEX_ENDPOINT_ID),
):
    print(f"{id_label}={id_value}")

In [None]:
# PERSIST THE INDEX / ENDPOINT IDS SO THEY CAN BE RESTORED AFTER A KERNEL RESTART
%store ME_INDEX_ID ME_INDEX_ENDPOINT_ID

In [None]:
# INIT ME VECTOR STORE W/TEXT EMBEDDING MODEL
# gcs_bucket_name IS THE FIRST "/"-SEPARATED SEGMENT OF ME_EMBEDDING_DIR, I.E. THE BUCKET NAME.
# THIS IS THE SAME VALUE AS f"gs://{ME_EMBEDDING_DIR}".split("/")[2].

me = MatchingEngine.from_components(
    project_id=PROJECT_ID,
    region=ME_REGION,
    gcs_bucket_name=ME_EMBEDDING_DIR.split("/")[0],
    embedding=embeddings,
    index_id=ME_INDEX_ID,
    endpoint_id=ME_INDEX_ENDPOINT_ID,
)

In [None]:
# PREPARE THE INPUTS FOR ADDING CHUNKS TO THE MATCHING ENGINE INDEX (THE SLOW ADD HAPPENS LATER):
#   texts     -- THE TEXT OF EACH CHUNK
#   metadatas -- FOR EACH CHUNK, ONE {"namespace", "allow_list"} ENTRY PER METADATA FIELD
#                (source, document_name, chunk); EVERY ALLOW_LIST VALUE IS A STRING
texts = [doc.page_content for doc in doc_splits]
metadatas = [
    [
        {"namespace": namespace, "allow_list": [value]}
        for namespace, value in (
            ("source", doc.metadata["source"]),
            ("document_name", doc.metadata["document_name"]),
            ("chunk", str(doc.metadata["chunk"])),
        )
    ]
    for doc in doc_splits
]

In [None]:
# PERSIST THE PREPARED TEXTS / RESTRICT METADATA SO THE SLOW ADD STEP CAN RESUME AFTER A KERNEL RESTART
%store texts
%store metadatas

In [None]:
# %store -r texts
# %store -r metadatas

A FEW NOTES ON ADDING EMBEDDINGS TO THE VECTOR STORE:

1 - THIS TAKES A VERY LONG TIME (IT'S THE LONGEST PART OF THIS PROCESS)\
2 - I'VE SEEN MANY INSTANCES OF THE KERNEL DYING OR CONNECTION BEING INTERRUPTED BEFORE THE PROCESS FINISHES\
3 - ADDING THE EMBEDDINGS 100 AT A TIME IS MEANT TO HELP MITIGATE THIS - EVEN IF SOMETHING HAPPENS TO INTERRUPT, THE ONES ALREADY ADDED WILL BE THERE MOVING FORWARD\
4 - THE GSUTIL CELL COUNTS THE OBJECTS IN THE EMBEDDING BUCKET (THIS INCLUDES THE init_index DUMMY FILE, SO IT IS AN APPROXIMATION OF THE NUMBER OF EMBEDDINGS ADDED). IF SOMETHING INTERRUPTS THE ADDITIONS, RUN THAT CELL TO SEE HOW FAR IT GOT THROUGH THE DOCS, THEN RE-RUN THE FOR LOOP STARTING AT THE CORRESPONDING POSITION IN THE TEXTS/METADATAS LISTS

In [None]:
# ADD EMBEDDINGS TO THE VECTOR STORE IN BATCHES, SO AN INTERRUPTION ONLY LOSES THE BATCH IN FLIGHT.
# BUG FIX: THE OLD texts[i - 100:i] SLICING SENT AN EMPTY FIRST BATCH (texts[-100:0]) AND NEVER SENT
# THE LAST BATCH (UP TO 100 CHUNKS WERE SILENTLY MISSING FROM THE INDEX).
ADD_BATCH_SIZE = 100
RESUME_FROM = 0  # TO RESUME AFTER AN INTERRUPTION, SET THIS TO THE INDEX OF THE FIRST TEXT NOT YET ADDED
for i in range(RESUME_FROM, len(metadatas), ADD_BATCH_SIZE):
    batch_end = i + ADD_BATCH_SIZE
    # doc_ids IS OVERWRITTEN EACH PASS; AFTER THE LOOP IT HOLDS ONLY THE LAST BATCH'S IDS
    doc_ids = me.add_texts(texts=texts[i:batch_end], metadatas=metadatas[i:batch_end])
    print(f"Added texts {i} to {min(batch_end, len(metadatas)) - 1} of {len(metadatas)}")


In [None]:
# COUNT THE LINES OF `gsutil du` OUTPUT FOR THE EMBEDDINGS BUCKET (ROUGHLY ONE PER OBJECT).
# NOTE(review): THIS INCLUDES init_index/embeddings_0.json AND ANYTHING ELSE add_texts WRITES THERE --
# CONFIRM HOW THE COUNT MAPS TO THE NUMBER OF TEXTS ADDED BEFORE USING IT TO PICK A RESUME POINT.
!gsutil du gs://{ME_EMBEDDING_DIR} | wc -l

In [None]:
# SPOT CHECK LAST ONE LOOKS CORRECT
# doc_ids IS REASSIGNED ON EVERY PASS OF THE ADD LOOP, SO THIS IS THE LAST ID OF THE FINAL BATCH ONLY
doc_ids[-1]

CLEANUP/DELETING RESOURCES - RUNNING THESE WILL DELETE EVERYTHING!

In [None]:
# CLEANUP_RESOURCES = True

In [None]:
# ME_INDEX_ID, ME_INDEX_ENDPOINT_ID = mengine.get_index_and_endpoint()
# print(f"ME_INDEX_ID={ME_INDEX_ID}")
# print(f"ME_INDEX_ENDPOINT_ID={ME_INDEX_ENDPOINT_ID}")

In [None]:
# if CLEANUP_RESOURCES and "mengine" in globals():
#     print(
#         f"Undeploying all indexes and deleting the index endpoint {ME_INDEX_ENDPOINT_ID}"
#     )
#     mengine.delete_index_endpoint()

In [None]:
# if CLEANUP_RESOURCES and "mengine" in globals():
#     print(f"Deleting the index {ME_INDEX_ID}")
#     mengine.delete_index()

In [None]:
# if CLEANUP_RESOURCES and "ME_EMBEDDING_DIR" in globals():
#     print(f"Deleting contents from the Cloud Storage bucket {ME_EMBEDDING_DIR}")
#     ME_EMBEDDING_BUCKET = "/".join(ME_EMBEDDING_DIR.split("/")[:3])

#     shell_output = ! gsutil du -ash gs://$ME_EMBEDDING_BUCKET
#     print(shell_output)
#     print(
#         f"Size of the bucket {ME_EMBEDDING_BUCKET} before deleting = {' '.join(shell_output[0].split()[:2])}"
#     )

#     # uncomment below line to delete contents of the bucket
#     ! gsutil -m rm -r gs://$ME_EMBEDDING_BUCKET