## Install Dependencies

In [None]:
# Google Cloud SDKs used below: Vertex AI (text/chat models) and Document AI (OCR batch processing).
# NOTE: with --user installs you may need to restart the kernel before the upgraded packages import.
%pip install --user google-cloud-aiplatform google-cloud-documentai --upgrade

In [None]:
%pip install --user farm-haystack[faiss,preprocessing,inference]


## Initialize Vertex AI

In [1]:
import vertexai

PROJECT_ID = "your_project_id" #@param {type:"string"}
REGION = "us-central1" #@param {type:"string"}

# Point the Vertex AI SDK at the project/region used by the generation and chat models below.
vertexai.init(project=PROJECT_ID, location=REGION)

## Initialize DocAI

In [2]:
# Document AI processor settings. LOCATION is the Document AI location ('us' or 'eu')
# and is configured separately from the Vertex AI REGION.
LOCATION = "us"
PROCESSOR_ID = "your_ocr_processor_id" #@param {type:"string"}
PROCESSOR_VERSION = "pretrained-ocr-v1.2-2022-11-10"

In [3]:
# Batch-processing inputs and outputs:
#  - INPUT_URI is used as a GCS prefix: every document under it is processed.
#  - OUTPUT_BUCKET has the form gs://bucket (no trailing slash); OUTPUT_DIRECTORY must end with "/",
#    because the output URI is built as f"{OUTPUT_BUCKET}/{OUTPUT_DIRECTORY}".
#  - LOCAL_PATH receives one .txt file per Document AI output JSON.
INPUT_URI = "gs://path_to_source_pdf" #@param {type:"string"}
OUTPUT_BUCKET = "gs://your_output_bucket" #@param {type:"string"}
OUTPUT_DIRECTORY = "docai-output/" #@param {type:"string"}
LOCAL_PATH = "./data/michigan/output/" #@param {type:"string"}

In [4]:
import os

# Create the folder that receives the extracted text. Pure Python, so it does not
# depend on a shell `mkdir -p` being available; a no-op if the folder already exists.
os.makedirs(LOCAL_PATH, exist_ok=True)

In [4]:
import re

from google.api_core.client_options import ClientOptions
from google.api_core.exceptions import InternalServerError
from google.api_core.exceptions import RetryError
from google.cloud import documentai
from google.cloud import storage

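# Reference only (from the Document AI sample): these values are passed as arguments from the configuration cells above, so they do not need to be uncommented. Expected formats: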
# project_id = 'YOUR_PROJECT_ID'
# location = 'YOUR_PROCESSOR_LOCATION' # Format is 'us' or 'eu'
# processor_id = 'YOUR_PROCESSOR_ID' # Example: aeb8cea219b7c272
# processor_version_id = "YOUR_PROCESSOR_VERSION_ID" # Example: pretrained-ocr-v1.0-2020-09-23
# gcs_input_uri = "YOUR_INPUT_URI" # Format: gs://bucket/directory/file.pdf
# input_mime_type = "application/pdf"
# gcs_output_bucket = "YOUR_OUTPUT_BUCKET_NAME" # Format: gs://bucket
# gcs_output_uri_prefix = "YOUR_OUTPUT_URI_PREFIX" # Format: directory/subdirectory/
# field_mask = "text,entities,pages.pageNumber"  # Optional. The fields to return in the Document object.

In [5]:
import concurrent.futures
import os
from typing import Optional


def _local_text_path(local_path: str, blob_name: str) -> str:
    """Return the local ``.txt`` path for a Document AI output JSON blob.

    ``docai-output/123/0/203-0.json`` -> ``<local_path>/203-0.txt``. Only the last
    extension is removed, so dotted input file names do not collide. ``os.path.join``
    works whether or not ``local_path`` ends with a separator.
    """
    stem, _ = os.path.splitext(os.path.basename(blob_name))
    return os.path.join(local_path, stem + ".txt")


def _save_output_texts(storage_client, output_gcs_destination: str, local_path: str) -> None:
    """Download every JSON shard under one output GCS folder and write its text to ``local_path``."""
    # output_gcs_destination format: gs://BUCKET/PREFIX/OPERATION_NUMBER/INPUT_FILE_NUMBER/
    # The Cloud Storage API requires the bucket name and URI prefix separately.
    matches = re.match(r"gs://(.*?)/(.*)", output_gcs_destination)
    if not matches:
        print(
            "Could not parse output GCS destination:",
            output_gcs_destination,
        )
        return

    output_bucket, output_prefix = matches.groups()

    # Document AI may output multiple JSON files per source file.
    for blob in storage_client.list_blobs(output_bucket, prefix=output_prefix):
        # Document AI should only output JSON files to GCS.
        if not blob.name.endswith(".json"):
            print(
                f"Skipping non-supported file: {blob.name} - Mimetype: {blob.content_type}"
            )
            continue

        # Download the JSON file as bytes and convert it to a Document object.
        print(f"Fetching {blob.name}")
        document = documentai.Document.from_json(
            blob.download_as_bytes(), ignore_unknown_fields=True
        )

        # Save locally because the Haystack converters do not read from GCS.
        # Explicit UTF-8 so OCR text with non-ASCII characters is always writable.
        with open(_local_text_path(local_path, blob.name), mode="w", encoding="utf-8") as f:
            f.write(document.text)


def batch_process_documents_processor_version(
    project_id: str,
    location: str,
    processor_id: str,
    processor_version_id: str,
    gcs_input_uri: str,
    input_mime_type: str,
    gcs_output_bucket: str,
    gcs_output_uri_prefix: str,
    local_path: str,
    field_mask: Optional[str] = None,
    timeout: int = 400,
):
    """Run a Document AI batch job over a GCS prefix and save each output's text locally.

    Every document under ``gcs_input_uri`` is processed. Document AI writes JSON output to
    ``{gcs_output_bucket}/{gcs_output_uri_prefix}``. Each JSON shard is then downloaded,
    and its ``Document.text`` is written to ``local_path`` as ``<shard name>.txt``.

    Args:
        project_id: Google Cloud project ID.
        location: Processor location ('us' or 'eu'); also selects the API endpoint.
        processor_id: Document AI processor ID.
        processor_version_id: Processor version, e.g. "pretrained-ocr-v1.2-2022-11-10".
        gcs_input_uri: GCS URI prefix of the inputs, e.g. "gs://bucket/directory/".
        input_mime_type: Unused with prefix input. It was only needed by the single-file
            GcsDocument variant of this sample; kept for signature compatibility.
        gcs_output_bucket: Output bucket URI, "gs://bucket" (no trailing slash).
        gcs_output_uri_prefix: Output directory inside the bucket; must end with "/".
        local_path: Local directory for the .txt files (created if missing).
        field_mask: Optional; the fields to return in the output Document objects.
        timeout: Seconds to wait for the long-running operation.

    Raises:
        ValueError: If the operation is not in the SUCCEEDED state after waiting
            (for example because ``timeout`` elapsed first).
    """
    # You must set the api_endpoint if you use a location other than 'us'.
    opts = ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com")
    client = documentai.DocumentProcessorServiceClient(client_options=opts)

    # Process every document under the GCS URI prefix.
    gcs_prefix = documentai.GcsPrefix(gcs_uri_prefix=gcs_input_uri)
    input_config = documentai.BatchDocumentsInputConfig(gcs_prefix=gcs_prefix)

    # Cloud Storage URI for the output directory; it must end with a trailing `/`.
    destination_uri = f"{gcs_output_bucket}/{gcs_output_uri_prefix}"
    gcs_output_config = documentai.DocumentOutputConfig.GcsOutputConfig(
        gcs_uri=destination_uri, field_mask=field_mask
    )
    output_config = documentai.DocumentOutputConfig(gcs_output_config=gcs_output_config)

    # Full resource name of the processor version, e.g.
    # projects/{project_id}/locations/{location}/processors/{processor_id}/processorVersions/{processor_version_id}
    name = client.processor_version_path(
        project_id, location, processor_id, processor_version_id
    )

    request = documentai.BatchProcessRequest(
        name=name,
        input_documents=input_config,
        document_output_config=output_config,
    )

    # BatchProcess returns a long-running operation (LRO); poll until it completes.
    # Operation name format: projects/PROJECT_NUMBER/locations/LOCATION/operations/OPERATION_ID
    operation = client.batch_process_documents(request)
    try:
        print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result(timeout=timeout)
    # Recent google-api-core versions raise concurrent.futures.TimeoutError when the
    # timeout elapses (older ones raised RetryError). Report it and fall through to
    # the state check below, which raises ValueError for an unfinished job.
    except (RetryError, InternalServerError, concurrent.futures.TimeoutError) as e:
        print(e)

    # Get the output locations from the operation metadata.
    metadata = documentai.BatchProcessMetadata(operation.metadata)
    if metadata.state != documentai.BatchProcessMetadata.State.SUCCEEDED:
        raise ValueError(f"Batch Process Failed: {metadata.state_message}")

    if local_path:
        os.makedirs(local_path, exist_ok=True)

    storage_client = storage.Client()
    print("Output files:")
    # One status entry per input document.
    for process in metadata.individual_process_statuses:
        _save_output_texts(storage_client, process.output_gcs_destination, local_path)

## Batch ingestion with DocAI

In [7]:
# OCR every PDF under INPUT_URI and write one .txt per output shard into LOCAL_PATH.
batch_process_documents_processor_version(
    project_id=PROJECT_ID,
    location=LOCATION,
    processor_id=PROCESSOR_ID,
    processor_version_id=PROCESSOR_VERSION,
    gcs_input_uri=INPUT_URI,
    input_mime_type="application/pdf",
    gcs_output_bucket=OUTPUT_BUCKET,
    gcs_output_uri_prefix=OUTPUT_DIRECTORY,
    local_path=LOCAL_PATH,
    field_mask=None,
    timeout=4000,  # seconds to wait for the batch operation; large folders take a while
)

Waiting for operation projects/222360216126/locations/us/operations/196448753284277972 to complete...
Output files:
Fetching docai-output/196448753284277972/0/203-0.json
The document contains the following text:
BEM 203
POLICY
DUPLICATE
RECEIPT OF
ASSISTANCE
1 of 4
CRIMINAL JUSTICE DISQUALIFICATIONS
BPB 2021-010
4-1-2021
Family Independence Program (FIP), Refugee Cash Assistance
(RCA) State Disability Assistance (SDA) and Food Assistance
Program (FAP)
People who have been convicted of certain crimes and probation or
parole violators are not eligible for assistance.
Policy to establish intentional program violations (IPV)
disqualifications and overissuances is found in Bridges
Administrative Manual (BAM) 700, Benefit Overissuances and BAM
720, Intentional Program Violation.
FIP
A person is disqualified for a period of 10 years beginning with the
date of conviction if convicted in court of having made a fraudulent
statement or representation regarding their residence in order to
receive 

## Haystack

In [6]:
# Text converters and preprocessors
from haystack.nodes import TextConverter, PDFToTextConverter, DocxToTextConverter, PreProcessor
# Utilities
from haystack.utils import print_answers, convert_files_to_docs, print_documents, clean_wiki_text
# Document store
from haystack.document_stores import FAISSDocumentStore
# Embedders
from haystack.nodes import MultihopEmbeddingRetriever
# Pipelines
from haystack.pipelines import DocumentSearchPipeline, GenerativeQAPipeline

In [7]:
## Misc ##
from tabulate import tabulate
import numpy as np
import pandas as pd

## Initialize document store
For expediency we use a local FAISS store backed by SQLite, since the number of documents is small. At scale, consider an Elasticsearch document store or a backing PostgreSQL database (with the pgvector extension, PostgreSQL provides vector distance functions).

In [8]:
# Path of the saved FAISS index: written by document_store.save() and read back by FAISSDocumentStore.load().
FAISS_INDEX = "michigan-faiss_index.idx"

In [11]:
# Local FAISS store with document text/metadata in SQLite.
# Alternative backing DB (a Cloud SQL proxy must be running on the local machine):
#   sql_url="postgresql+pg8000://docuser:docpw@127.0.0.1:5432/your-docstore"
document_store = FAISSDocumentStore(
    sql_url="sqlite:///faiss_document_store_michigan.db",
    faiss_index_factory_str="HNSW",  # "Flat" gets slow above ~1M docs; HNSW scales better
    embedding_dim=768,               # must match the output size of the retriever's embedding model
    similarity="cosine",             # default is dot_product; pick what suits the embedder
    duplicate_documents="skip",      # documents already present are skipped on re-write
    return_embedding=True,
)

In [9]:
# Reload a previously saved store if one exists. On a fresh Restart & Run All the index
# has not been written yet (the save cells come further down), so keep the store
# created in the previous cell instead of failing here.
import os

if os.path.exists(FAISS_INDEX):
    document_store = FAISSDocumentStore.load(index_path=FAISS_INDEX)
else:
    print(f"No saved index at {FAISS_INDEX}; using the newly created document store.")

## Clean and load docs

In [13]:
# Convert the .txt files written by the Document AI step into Haystack Documents.
# Read from LOCAL_PATH so this always matches where those files were written.
doc_dir = LOCAL_PATH

all_docs = convert_files_to_docs(
    dir_path=doc_dir,
    split_paragraphs=True,
    # NOTE(review): clean_wiki_text targets wiki markup; confirm it is helpful on OCR text.
    clean_func=clean_wiki_text,
)

In [14]:
# Re-chunk the converted files into 4-sentence passages, cleaning empty lines,
# extra whitespace and repeated headers/footers. Add split_overlap=1 for overlapping chunks.
preprocessor = PreProcessor(
    split_by="sentence",
    split_length=4,
    split_respect_sentence_boundary=False,
    clean_empty_lines=True,
    clean_whitespace=True,
    clean_header_footer=True,
)
docs = preprocessor.process(all_docs)

print(f"n_files_input: {len(all_docs)}\nn_docs_output: {len(docs)}")

[nltk_data] Downloading package punkt to /home/jupyter/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
Preprocessing: 100%|██████████| 27/27 [00:00<00:00, 211.38docs/s]

n_files_input: 27
n_docs_output: 431





In [15]:
# Store the preprocessed passages (duplicates are skipped per duplicate_documents="skip").
document_store.write_documents(documents=docs)

Writing Documents: 10000it [00:00, 23051.78it/s]          


In [16]:
# Persist the store so later sessions can load it instead of re-ingesting.
document_store.save(FAISS_INDEX)

## Initialize Retriever and Reader

### Retriever

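We use a `MultihopEmbeddingRetriever` (a multi-hop variant of the embedding retriever) and invoke `update_embeddings` to index the embeddings of the documents in the `FAISSDocumentStore`.

You can swap in either a dense passage retriever or a plain embedding retriever; in every case the model's embedding size must match the store's `embedding_dim`.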

In [10]:
# Multi-hop embedding retriever. The model's embedding size must equal the store's
# embedding_dim, and its similarity should match the store's `similarity` setting.
retriever = MultihopEmbeddingRetriever(
    document_store=document_store,
    embedding_model="sentence-transformers/multi-qa-mpnet-base-cos-v1",
    model_format="sentence_transformers",
    max_seq_len=512,              # maximum input length of the embedding model
    batch_size=16,
    top_k=10,
    num_iterations=3,             # number of retrieval hops
    scale_score=True,
    embed_meta_fields=["name"],   # also embed the source file name kept in meta["name"]
    use_gpu=True,
)

In [18]:
# Embed every stored passage with the retriever's model and index the vectors in FAISS.
document_store.update_embeddings(retriever=retriever)

Updating Embedding:   0%|          | 0/431 [00:00<?, ? docs/s]

Batches:   0%|          | 0/27 [00:00<?, ?it/s]

Documents Processed: 10000 docs [00:15, 662.09 docs/s]         


In [19]:
# Save again so the persisted index includes the freshly computed embeddings.
document_store.save(FAISS_INDEX)

## Document Search
Let's test the retriever with a sample doc search.

In [11]:
# Retrieval-only pipeline: query in, ranked passages out (no reader/generator).
p_retrieval = DocumentSearchPipeline(retriever=retriever)

In [12]:
%%time
res = p_retrieval.run(query="Dad is an undocumented person, will his income still be counted in the FAP budget?", params={"Retriever": {"top_k": 10}})
print_documents(res, max_text_len=256)

Querying:   0%|          | 0/1 [00:00<?, ?it/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Querying: 100%|██████████| 1/1 [00:03<00:00,  3.33s/it]


Query: Dad is an undocumented person, will his income still be counted in the FAP budget?

{   'content': "Mary's\n"
               'children are U.S. citizens. Mary is an ineligible non-citizen '
               'for FAP.\n'
               "Mary's income is divided by 3 (number in Mary's group). Of "
               "Mary's\n"
               "income, Bridges budgets the children's portion (2/3) and "
               'excludes\n'
               'A member add that increases benefits is effect...',
    'name': '550-0.txt'}

{   'content': 'DEPARTMENT OF HEALTH & HUMAN SERVICES\n'
               'The number of eligible FAP group members is added to the\n'
               'number of disqualified persons that live with the group.\n'
               "Next the disqualified/ineligible person's income is divided "
               'by\n'
               'the number of persons in step 1.\n'
               'Then the...',
    'name': '550-0.txt'}

{   'content': 'If the FAP budgetable income included FIP




## Retrieval pipeline

In [15]:
from vertexai.preview.language_models import TextGenerationModel,ChatModel,InputOutputTextPair,TextEmbeddingModel

In [49]:
# NOTE(review): the PaLM "*-bison" models are legacy; confirm they are still available in REGION.
generation_model = TextGenerationModel.from_pretrained("text-bison-32k")
chat_model = ChatModel.from_pretrained("chat-bison-32k")

# Decoding settings for single-turn Q&A (generation_model.predict).
# candidate_count=1 is only supported for chat, so it is not set here.
qa_parameters = dict(temperature=0.2, max_output_tokens=1024, top_p=0.8, top_k=40)

# Decoding settings for multi-turn chat (chat.send_message); currently identical to qa_parameters.
chat_parameters = dict(temperature=0.2, max_output_tokens=1024, top_p=0.8, top_k=40)


In [17]:
## get content and similarity scores
def get_scores(response):
    """Split a retrieval result into a score table and a context table.

    Args:
        response: Result of ``p_retrieval.run(...)`` (a ``DocumentSearchPipeline``):
            a dict whose ``'documents'`` entry lists retrieved documents exposing
            ``content``, ``meta['name']``, ``id`` and ``score``.

    Returns:
        ``(scores, context)`` DataFrames with one row per retrieved document, in
        retrieval order. ``scores`` has columns ``Title``, ``Paragraph_id`` and
        ``Score``; ``context`` has a single ``Paragraph_text`` column.
    """
    retrieved = response['documents']
    score_rows = [[doc.meta['name'], doc.id, doc.score] for doc in retrieved]
    text_rows = [[doc.content] for doc in retrieved]
    scores = pd.DataFrame(score_rows, columns=["Title", "Paragraph_id", "Score"])
    context = pd.DataFrame(text_rows, columns=["Paragraph_text"])
    return scores, context

In [18]:
# A utility function to see document retrieval scores
def print_scores(response):
    """Print the titles, ids and scores of the documents behind the first answer.

    Expects ``response['answers'][0]`` to expose ``meta["titles"]``,
    ``meta["doc_scores"]`` and ``document_ids`` as parallel lists.
    NOTE(review): presumably an answer from a generative QA pipeline; it is not
    called anywhere in the visible notebook — confirm before relying on it.
    """
    top_answer = response['answers'][0]
    answer_meta = top_answer.meta
    columns = [answer_meta["titles"], top_answer.document_ids, answer_meta["doc_scores"]]
    # .T turns the three parallel columns into one row per document.
    rows = np.array(columns).T
    print(tabulate(rows, headers=["titles", "doc_paragraph_ids", "doc_scores"]))

In [19]:
## number of documents to consider in context
# Colab form fields accept type "integer" (not "int").
topk = 5 #@param {type:"integer"}

## A simple Q&A chatbot
Single turn: no conversation memory between questions. Type `quit()` to exit.

In [44]:
def qa_chatbot():
    """Single-turn retrieval-augmented Q&A loop; type ``quit()`` to exit.

    For each question:
      1. retrieve ``topk`` passages with ``p_retrieval``;
      2. build a prompt from them and print it;
      3. ask ``generation_model`` for an answer using ``qa_parameters``;
      4. print the answer and a table of the retrieved passages' titles, ids and scores.
    Nothing is remembered between questions.

    Relies on notebook globals: ``p_retrieval``, ``topk``, ``get_scores``,
    ``generation_model``, ``qa_parameters`` and ``tabulate``.
    """
    while True:
        # strip() so stray surrounding spaces neither break quit() nor reach the retriever.
        question = input('Question: ').strip()
        if question == 'quit()':
            break
        if not question:
            # Nothing to retrieve for a blank line; ask again.
            continue

        docs = p_retrieval.run(query=question, params={"Retriever": {"top_k": topk}})
        scores, context = get_scores(docs)
        # One retrieved passage per line, each followed by a newline.
        background = "".join(f"{paragraph}\n" for paragraph in context.Paragraph_text)
        prompt = (
            f"Based on the following context:\n{background}\n"
            f"Answer the following question in detail in less than 50 words:\n{question}"
        )
        print(prompt)

        response = generation_model.predict(prompt=prompt, **qa_parameters)
        # An empty text would otherwise print nothing. NOTE(review): this presumably
        # means the response was filtered/blocked; confirm with the Vertex AI docs.
        answer = response.text or "(empty response from the model; it may have been blocked)"
        print("\n" + answer)
        print(tabulate(scores, headers=["titles", "doc_paragraph_ids", "doc_scores"]))
        print('\n')

In [45]:
# Interactive single-turn Q&A: type a question at the prompt, quit() to stop.
qa_chatbot()

Question:  Dad is an undocumented person, will his income still be counted in the FAP budget?


Querying:   0%|          | 0/1 [00:00<?, ?it/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Querying: 100%|██████████| 1/1 [00:00<00:00, 10.05it/s]


Based on the following context:
 Mary's
children are U.S. citizens. Mary is an ineligible non-citizen for FAP.
Mary's income is divided by 3 (number in Mary's group). Of Mary's
income, Bridges budgets the children's portion (2/3) and excludes
A member add that increases benefits is effective the month after
it is reported or, if the new member left another group, the month
after the member delete. 
DEPARTMENT OF HEALTH & HUMAN SERVICES
The number of eligible FAP group members is added to the
number of disqualified persons that live with the group.
Next the disqualified/ineligible person's income is divided by
the number of persons in step 1.
Then the result in step 2 is multiplied by the number of eligible
Do not apply these rules to the income of eligible group members,
or non-group members. (See BEM 212)
Example: Group consists of Mary and her 2 children. 
If the FAP budgetable income included FIP/SDA benefits, use the
grant amount actually received in the overissuance month. Use the

Question:  quit()


## Simple multi-turn chatbot
with context memory. Type `new()` to start a new conversation (with a fresh retrieval) or `quit()` to exit.

In [55]:
def conv_chatbot():
    """Multi-turn chat grounded on passages retrieved for the conversation's first question.

    Commands at any ``Question:`` prompt:
      * ``quit()`` ends the session;
      * ``new()`` ends the current conversation, so the next question triggers a
        fresh retrieval and a new chat.

    The first question of a conversation retrieves ``topk`` passages via
    ``p_retrieval``. They become the chat ``context`` and are NOT refreshed for
    follow-up questions. Relies on notebook globals ``p_retrieval``, ``topk``,
    ``get_scores``, ``chat_model`` and ``chat_parameters``.
    """

    def ask() -> str:
        # Re-prompt on blank input so an empty string never reaches retrieval or the model.
        while True:
            text = input('Question: ').strip()
            if text:
                return text

    while True:
        question = ask()
        if question == 'quit()':
            return
        if question == 'new()':
            # Already at the start of a conversation; there is nothing to reset.
            continue

        docs = p_retrieval.run(query=question, params={"Retriever": {"top_k": topk}})
        _, context = get_scores(docs)
        background = "".join(f"{paragraph}\n" for paragraph in context.Paragraph_text)
        chat = chat_model.start_chat(
            context=f"Based on the following context:\n{background}\nAnswer the following questions in detail in less than 50 words"
        )

        # Follow-up questions reuse the same chat (history and context) until new() or quit().
        while True:
            response = chat.send_message(question, **chat_parameters)
            print("\n" + response.text + "\n")
            question = ask()
            if question == 'quit()':
                return
            if question == 'new()':
                break  # start a new conversation with a fresh retrieval
                   


In [56]:
# Interactive multi-turn chat: new() starts a new conversation, quit() stops.
conv_chatbot()

Question:  Sarah is a full-time college student, can she be eligible for FAP?


Querying:   0%|          | 0/1 [00:00<?, ?it/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Querying: 100%|██████████| 1/1 [00:00<00:00,  9.83it/s]



 Yes, as long as she meets the other eligibility requirements, she can be eligible for FAP. 



Question:  are there any work requirements?



 Yes, she must participate in a state or federally-financed work study program during the regular school year. 



Question:  quit()


## Clean up

In [None]:
# Clean-up during experimentation: removes all documents from the document store.
# NOTE(review): this does not delete the index file saved at FAISS_INDEX or the SQLite
# database file on disk; remove those separately for a full reset.
document_store.delete_documents()