In [None]:
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Build your own Grounded RAG application using Vertex AI Search Standalone APIs for RAG and LangChain

<table align="left">
  <td style="text-align: center">
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/generative-ai/blob/main/search/retrieval-augmented-generation/vertex_ai_search_standalone_apis.ipynb">
      <img width="32px" src="https://www.gstatic.com/pantheon/images/bigquery/welcome_page/colab-logo.svg" alt="Google Colaboratory logo"><br> Open in Colab
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fgenerative-ai%2Fmain%2Fsearch%2Fretrieval-augmented-generation%2Fvertex_ai_search_standalone_apis.ipynb">
      <img width="32px" src="https://lh3.googleusercontent.com/JmcxdQi-qOpctIvWKgPtrzZdJJK-J3sWE1RsfjZNwshCFgE_9fULcNpuXYTilIR2hjwN" alt="Google Cloud Colab Enterprise logo"><br> Open in Colab Enterprise
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/generative-ai/main/search/retrieval-augmented-generation/vertex_ai_search_standalone_apis.ipynb">
      <img src="https://www.gstatic.com/images/branding/gcpiconscolors/vertexai/v1/32px.svg" alt="Vertex AI logo"><br> Open in Vertex AI Workbench
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://github.com/GoogleCloudPlatform/generative-ai/blob/main/search/retrieval-augmented-generation/vertex_ai_search_standalone_apis.ipynb">
      <img width="32px" src="https://upload.wikimedia.org/wikipedia/commons/9/91/Octicons-mark-github.svg" alt="GitHub logo"><br> View on GitHub
    </a>
  </td>
</table>

| | |
|-|-|
| Author(s) | [Abhishek Bhagwat](https://github.com/Abhishekbhagwat), [Rajesh Thallam](https://github.com/RajeshThallam), [Holt Skinner](https://github.com/holtskinner) |
| Reviewer(s) | [Alan Blount](https://github.com/zeroasterisk), [Skander Hannachi](https://github.com/SkanderHn)|
| Last updated | 2024-08-07 |

# 📌 Overview

In this notebook, we show you how to use the [Vertex AI Search Component APIs for RAG](https://cloud.google.com/generative-ai-app-builder/docs/builder-apis) to build a custom search solution on your own documents.

---

Building a robust custom (DIY) Retrieval Augmented Generation (RAG) system for grounding can be challenging. Vertex AI Search simplifies the process with a suite of flexible standalone APIs to help you create your own search solutions.

* **[Document AI Layout Parser](https://cloud.google.com/document-ai/docs/layout-parse-chunk)**: Transforms documents into structured representations, making content easily accessible. Creates context-aware chunks for improved information retrieval in generative AI and discovery applications.
* **[Ranking API](https://cloud.google.com/generative-ai-app-builder/docs/ranking)**: Re-ranks search results based on relevance to the original query. Enhances RAG accuracy by optimizing retrieval beyond initial nearest neighbor search.
* **[Check Grounding API](https://cloud.google.com/generative-ai-app-builder/docs/check-grounding)**: Acts as a "validator" to determine whether statements or claims are supported by provided facts (essentially how grounded a given piece of text is in a given set of reference text). Enables online flagging of ungrounded responses and offline evaluation of generative responses.
* **[Grounded Generation API](https://cloud.google.com/generative-ai-app-builder/docs/grounded-gen)**: Generates grounded answers to prompts based on the input data.

**Key Features**:

* **Leverage Vertex AI Search technology**:  Build custom RAG and Grounded Generation solutions using the same technology that powers Vertex AI Search.
* **Granular control**: Tailor your RAG system to specific use cases and offer greater control to your users.
* **Seamless integration**: Combine these APIs with core services like Embeddings API and Vector Search for advanced grounded AI applications.

---

# 📐 Architecture

The following is the high-level architecture of what you will build in this notebook.

You will perform the following steps:

- **Step 1. Data Ingestion:** Parse the documents in a Cloud Storage bucket using [Document AI Layout Parser](https://cloud.google.com/document-ai/docs/layout-parse-chunk) and convert the raw text chunks into embeddings using the [Vertex AI Embeddings API](https://cloud.google.com/vertex-ai/generative-ai/docs/embeddings/get-text-embeddings). The generated embeddings power semantic search using [Vertex AI Vector Search](https://cloud.google.com/vertex-ai/docs/vector-search/overview) (vector database).

- **Step 2. Retrieval:** Retrieve relevant chunks from Vertex AI Vector Search for a given user query and re-rank the chunks using the [Ranking API](https://cloud.google.com/generative-ai-app-builder/docs/ranking).

- **Step 3. Answer generation:** Use the [Grounded Generation API](https://cloud.google.com/vertex-ai/generative-ai/docs/multimodal/send-multimodal-prompts) to generate an answer for the given user query based on the re-ranked chunks retrieved from the vector search.

- **Step 4. Answer validation:** Validate the generated answer with the [Check Grounding API](https://cloud.google.com/generative-ai-app-builder/docs/check-grounding) to determine how well the answer is grounded in the retrieved chunks.

The notebook uses the [Google Cloud + LangChain integrations](https://python.langchain.com/v0.1/docs/integrations/platforms/google/) to orchestrate the pipeline.

![vais-standalone-architecture.png](https://storage.googleapis.com/github-repo/search/vais-standalone-apis/vais-standalone-architecture.png)

---

# 🎬 Getting Started

The following steps are necessary to run this notebook, no matter what notebook environment you're using.

If you're entirely new to Google Cloud, [get started here](https://cloud.google.com/docs/get-started).


### Google Cloud Project Setup

1. [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.
1. [Make sure that billing is enabled for your project](https://cloud.google.com/billing/docs/how-to/modify-project).
1. [Enable the Service Usage, Vertex AI, Cloud Storage, Document AI, Discovery Engine APIs](https://console.cloud.google.com/flows/enableapi?apiid=serviceusage.googleapis.com,aiplatform.googleapis.com,storage.googleapis.com,documentai.googleapis.com,discoveryengine.googleapis.com).

### Google Cloud Permissions

**To run the complete Notebook, including the optional section, you will need to have the [Owner role](https://cloud.google.com/iam/docs/understanding-roles) for your project.**

If you want to skip the optional section, you need at least the following [roles](https://cloud.google.com/iam/docs/granting-changing-revoking-access):
* **`roles/serviceusage.serviceUsageAdmin`** to enable APIs
* **`roles/iam.serviceAccountAdmin`** to modify service agent permissions
* **`roles/aiplatform.user`** to use AI Platform components
* **`roles/storage.objectAdmin`** to modify and delete GCS buckets
* **`roles/documentai.admin`** to create and use Document AI Processors
* **`roles/discoveryengine.admin`** to modify Vertex AI Search assets

### Install Vertex AI SDK and Other Required Packages

In [None]:
%pip install google-cloud-aiplatform google-cloud-discoveryengine google-cloud-documentai google-cloud-documentai-toolbox google-cloud-storage langchain-google-vertexai langchain-google-community[vertexaisearch,docai] rich --upgrade --quiet

### Restart Runtime

To use the newly installed packages in this Jupyter runtime, you must restart the runtime. You can do this by running the cell below, which will restart the current kernel.

In [None]:
# Restart kernel after installs so that your environment can access the new packages
import IPython

app = IPython.Application.instance()
app.kernel.do_shutdown(True)

<div class="alert alert-block alert-warning">
<b>⚠️ The kernel is going to restart. Please wait until it is finished before continuing to the next step. ⚠️</b>
</div>

### Authenticate

If you're using Colab, run the code in the next cell. Follow the popups and authenticate with an account that has access to your Google Cloud [project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#identifying_projects).

If you're running this notebook somewhere besides Colab, make sure your environment has the right Google Cloud access. If that's a new concept to you, consider looking into [Application Default Credentials for your local environment](https://cloud.google.com/docs/authentication/provide-credentials-adc#local-dev) and [initializing the Google Cloud CLI](https://cloud.google.com/docs/authentication/gcloud). In many cases, running `gcloud auth application-default login` in a shell on the machine running the notebook kernel is sufficient.

More authentication options are discussed [here](https://cloud.google.com/docs/authentication).
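
Outside Colab, a quick way to see whether file-based Application Default Credentials are in place is to look for the credential file itself. The sketch below is a minimal check, not part of the notebook's pipeline. It assumes the Linux/macOS location that `gcloud auth application-default login` writes to, and the helper name `find_adc_file` is hypothetical. It does not detect credentials supplied by an attached service account, so a "not found" result is not conclusive on managed environments.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def find_adc_file(
    env: Mapping[str, str] = os.environ, home: Optional[Path] = None
) -> Optional[Path]:
    """Return the file-based ADC credential path if one exists, else None."""
    # 1. An explicit key file set through the environment variable.
    explicit = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if explicit and Path(explicit).is_file():
        return Path(explicit)

    # 2. The file written by `gcloud auth application-default login`
    #    (Linux/macOS location; Windows uses a folder under %APPDATA% instead).
    home = home or Path.home()
    well_known = home / ".config" / "gcloud" / "application_default_credentials.json"
    return well_known if well_known.is_file() else None


adc_path = find_adc_file()
print(f"File-based ADC credentials: {adc_path or 'not found'}")
```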

In [None]:
# Colab authentication.
import sys

if "google.colab" in sys.modules:
    from google.colab import auth

    auth.authenticate_user()

### Set Google Cloud project information and Initialize Vertex AI SDK

To get started using Vertex AI, you must have an existing Google Cloud project and [enable the Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com).

Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment).

Make sure to change `PROJECT_ID` in the next cell. You can leave the value of `REGION` as is unless you have a specific reason to change it.

In [None]:
import vertexai

PROJECT_ID = "[your-project-id]"  # @param {type:"string"}
REGION = "us-central1"  # @param {type:"string"}

vertexai.init(project=PROJECT_ID, location=REGION)

### Import Libraries

In [None]:
import hashlib
from typing import List, Optional
import uuid

from IPython.display import HTML, display
from google.api_core.client_options import ClientOptions
from google.cloud import aiplatform, discoveryengine, documentai, storage
from google.cloud.aiplatform import MatchingEngineIndex, MatchingEngineIndexEndpoint
from langchain.docstore.document import Document
from langchain.retrievers.contextual_compression import ContextualCompressionRetriever
from langchain_community.document_loaders.gcs_directory import GCSDirectoryLoader
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough, chain
from langchain_google_community import VertexAICheckGroundingWrapper, VertexAIRank
from langchain_google_community.docai import DocAIParser
from langchain_google_vertexai import VertexAI
from langchain_google_vertexai.embeddings import VertexAIEmbeddings
from langchain_google_vertexai.vectorstores.vectorstores import VectorSearchVectorStore
import markdown as md
from rich import print

### Initialize variables

Set the names of the Google Cloud resources used in this notebook.

<div class="alert alert-block alert-info">
ⓘ If you have already created all of these resources, use their existing names and set <code>CREATE_RESOURCES=False</code>. Otherwise, set <code>CREATE_RESOURCES=True</code> and choose new names for your Cloud Storage bucket, index, index endpoint, and Document AI processor.
</div>

**TIP:** stick to `hyphenated-lower-case` naming conventions, and use the same project name as a component of each of these names.
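
Before running the next cell, it can help to confirm that every name has been filled in and follows the convention from the tip. The sketch below is one way to do that. The pattern encodes this notebook's `hyphenated-lower-case` convention only, which is stricter than what each service accepts, and the helper name and example values are hypothetical.

```python
import re
from typing import Dict, List

# Lowercase letters and digits in hyphen-separated groups, starting with a letter.
NAME_PATTERN = re.compile(r"^[a-z][a-z0-9]*(-[a-z0-9]+)*$")


def find_invalid_names(names: Dict[str, str]) -> List[str]:
    """Return the keys whose values are unfilled placeholders or break the convention."""
    return [key for key, value in names.items() if not NAME_PATTERN.match(value)]


# Hypothetical values for illustration; substitute your own.
example_names = {
    "bucket": "my-project-rag-docs",
    "index": "my-project-rag-index",
    "index_endpoint": "[your-index-endpoint-name]",  # placeholder left unchanged
    "processor": "My_Processor",  # upper case and underscore
}
print(find_invalid_names(example_names))
```

Any key the function returns points at a value to fix before creating resources.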

In [None]:
# Cloud storage buckets
GCS_BUCKET_URI = "gs://[your-bucket-name]"  # @param {type:"string"}
GCS_OUTPUT_PATH = f"{GCS_BUCKET_URI}"  # DocAI Layout Parser Output Path
GCS_BUCKET_NAME = GCS_BUCKET_URI.replace("gs://", "")

# Vertex AI Vector Search
# parameter description here
# https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.MatchingEngineIndex#google_cloud_aiplatform_MatchingEngineIndex_create_tree_ah_index
VS_INDEX_NAME = "[your-index-name]"  # @param {type:"string"}
VS_INDEX_ENDPOINT_NAME = "[your-index-endpoint-name]"  # @param {type:"string"}
VS_CONTENTS_DELTA_URI = f"{GCS_BUCKET_URI}/index/embeddings"
VS_DIMENSIONS = 768
VS_APPROX_NEIGHBORS = 150
VS_INDEX_UPDATE_METHOD = "STREAM_UPDATE"
VS_INDEX_SHARD_SIZE = "SHARD_SIZE_SMALL"
VS_LEAF_NODE_EMB_COUNT = 500
VS_LEAF_SEARCH_PERCENT = 80
VS_DISTANCE_MEASURE_TYPE = "DOT_PRODUCT_DISTANCE"
VS_MACHINE_TYPE = "e2-standard-16"
VS_MIN_REPLICAS = 1
VS_MAX_REPLICAS = 1
VS_DESCRIPTION = "Index for DIY RAG with Vertex AI APIs"  # @param {type:"string"}

# Models
EMBEDDINGS_MODEL_NAME = "text-embedding-004"
LLM_MODEL_NAME = "gemini-1.5-pro"

# DocumentAI Processor
DOCAI_LOCATION = "us"  # @param ["us", "eu"]
DOCAI_PROCESSOR_NAME = "[your-docai-processor-name]"  # @param {type:"string"}

# Enable/disable flags
# flag to create Google Cloud resources configured above
# refer to the notes before this cell
CREATE_RESOURCES = False  # @param {type:"boolean"}
# flag to run data ingestion
RUN_INGESTION = True  # @param {type:"boolean"}

In [None]:
# @title Utility function to create resources


def create_uuid(name: str) -> str:
    hex_string = hashlib.md5(name.encode("UTF-8")).hexdigest()
    return str(uuid.UUID(hex=hex_string))


def create_bucket(bucket_name: str) -> storage.Bucket:
    # Create the Cloud Storage bucket if it does not exist
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    if bucket.exists():
        print(f"Bucket {bucket.name} exists")
        return bucket

    if not CREATE_RESOURCES:
        return bucket

    bucket = storage_client.create_bucket(bucket_name)
    print(f"Bucket {bucket.name} created")
    return bucket


def create_index() -> Optional[MatchingEngineIndex]:
    index_names = [
        index.resource_name
        for index in MatchingEngineIndex.list(filter=f"display_name={VS_INDEX_NAME}")
    ]

    if len(index_names) > 0:
        vs_index = MatchingEngineIndex(index_name=index_names[0])
        print(
            f"Vector Search index {vs_index.display_name} exists with resource name {vs_index.resource_name}"
        )
        return vs_index

    if not CREATE_RESOURCES:
        print(
            f"CREATE_RESOURCES flag set to {CREATE_RESOURCES}. Skip creating resources"
        )
        return None

    print(f"Creating Vector Search index {VS_INDEX_NAME} ...")
    vs_index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
        display_name=VS_INDEX_NAME,
        dimensions=VS_DIMENSIONS,
        approximate_neighbors_count=VS_APPROX_NEIGHBORS,
        distance_measure_type=VS_DISTANCE_MEASURE_TYPE,
        leaf_node_embedding_count=VS_LEAF_NODE_EMB_COUNT,
        leaf_nodes_to_search_percent=VS_LEAF_SEARCH_PERCENT,
        description=VS_DESCRIPTION,
        shard_size=VS_INDEX_SHARD_SIZE,
        index_update_method=VS_INDEX_UPDATE_METHOD,
        project=PROJECT_ID,
        location=REGION,
    )
    print(
        f"Vector Search index {vs_index.display_name} created with resource name {vs_index.resource_name}"
    )
    return vs_index


def create_index_endpoint() -> Optional[MatchingEngineIndexEndpoint]:
    endpoint_names = [
        endpoint.resource_name
        for endpoint in MatchingEngineIndexEndpoint.list(
            filter=f"display_name={VS_INDEX_ENDPOINT_NAME}"
        )
    ]

    if len(endpoint_names) > 0:
        vs_endpoint = MatchingEngineIndexEndpoint(index_endpoint_name=endpoint_names[0])
        print(
            f"Vector Search index endpoint {vs_endpoint.display_name} exists with resource name {vs_endpoint.resource_name}"
        )
        return vs_endpoint

    if not CREATE_RESOURCES:
        print(
            f"CREATE_RESOURCES flag set to {CREATE_RESOURCES}. Skip creating resources"
        )
        return None

    print(f"Creating Vector Search index endpoint {VS_INDEX_ENDPOINT_NAME} ...")
    vs_endpoint = aiplatform.MatchingEngineIndexEndpoint.create(
        display_name=VS_INDEX_ENDPOINT_NAME,
        public_endpoint_enabled=True,
        description=VS_DESCRIPTION,
        project=PROJECT_ID,
        location=REGION,
    )
    print(
        f"Vector Search index endpoint {vs_endpoint.display_name} created with resource name {vs_endpoint.resource_name}"
    )
    return vs_endpoint


def deploy_index(
    index: MatchingEngineIndex, endpoint: MatchingEngineIndexEndpoint
) -> Optional[MatchingEngineIndexEndpoint]:
    index_endpoints = [
        (deployed_index.index_endpoint, deployed_index.deployed_index_id)
        for deployed_index in index.deployed_indexes
    ]

    if len(index_endpoints) > 0:
        vs_deployed_index = MatchingEngineIndexEndpoint(
            index_endpoint_name=index_endpoints[0][0]
        )
        print(
            f"Vector Search index {index.display_name} is already deployed at endpoint {vs_deployed_index.display_name}"
        )
        return vs_deployed_index

    if not CREATE_RESOURCES:
        print(
            f"CREATE_RESOURCES flag set to {CREATE_RESOURCES}. Skip creating resources"
        )
        return None

    print(
        f"Deploying Vector Search index {index.display_name} at endpoint {endpoint.display_name} ..."
    )
    deployed_index_id = (
        f'{VS_INDEX_NAME}_{create_uuid(VS_INDEX_NAME).split("-")[-1]}'.replace("-", "_")
    )
    vs_deployed_index = endpoint.deploy_index(
        index=index,
        deployed_index_id=deployed_index_id,
        display_name=VS_INDEX_NAME,
        machine_type=VS_MACHINE_TYPE,
        min_replica_count=VS_MIN_REPLICAS,
        max_replica_count=VS_MAX_REPLICAS,
    )
    print(
        f"Vector Search index {index.display_name} is deployed at endpoint {vs_deployed_index.display_name}"
    )
    return vs_deployed_index


def create_docai_processor(
    processor_display_name: str = DOCAI_PROCESSOR_NAME,
    processor_type: str = "LAYOUT_PARSER_PROCESSOR",
) -> Optional[documentai.Processor]:
    # Set the api_endpoint if you use a location other than 'us'
    opts = ClientOptions(api_endpoint=f"{DOCAI_LOCATION}-documentai.googleapis.com")
    docai_client = documentai.DocumentProcessorServiceClient(client_options=opts)
    parent = docai_client.common_location_path(PROJECT_ID, DOCAI_LOCATION)
    # Check if processor exists
    processor_list = docai_client.list_processors(parent=parent)
    processors = [
        processor.name
        for processor in processor_list
        if (
            processor.display_name == processor_display_name
            and processor.type_ == processor_type
        )
    ]

    if len(processors) > 0:
        docai_processor = docai_client.get_processor(name=processors[0])
        print(
            f"Document AI processor {docai_processor.display_name} is already created"
        )
        return docai_processor

    if not CREATE_RESOURCES:
        print(
            f"CREATE_RESOURCES flag set to {CREATE_RESOURCES}. Skip creating resources"
        )
        return None

    # Create a processor
    print(
        f"Creating Document AI processor {processor_display_name} of type {processor_type} ..."
    )
    docai_processor = docai_client.create_processor(
        parent=parent,
        processor=documentai.Processor(
            display_name=processor_display_name, type_=processor_type
        ),
    )
    print(
        f"Document AI processor {processor_display_name} of type {processor_type} is created."
    )
    return docai_processor

In [None]:
# @title Utility methods for adding index to Vertex AI Vector Search


def get_batches(items: List, n: int = 1000) -> List[List]:
    n = max(1, n)
    return [items[i : i + n] for i in range(0, len(items), n)]


def add_data(vector_store, chunks) -> None:
    if RUN_INGESTION:
        batch_size = 1000
        texts = get_batches([chunk.page_content for chunk in chunks], n=batch_size)
        metadatas = get_batches([chunk.metadata for chunk in chunks], n=batch_size)

        for i, (b_texts, b_metadatas) in enumerate(zip(texts, metadatas)):
            print(f"Adding {len(b_texts)} data points to index")
            is_complete_overwrite = bool(i == 0)
            vector_store.add_texts(
                texts=b_texts,
                metadatas=b_metadatas,
                is_complete_overwrite=is_complete_overwrite,
            )
    else:
        print("Skipping ingestion. Enable `RUN_INGESTION` flag")

In [None]:
# @title Utility methods for displaying rich content results


def get_chunk_content(results: List) -> List:
    return [
        doc.page_content.replace("\n", "<br>")
        + f'<br><br> <b><a href="">Source: {doc.metadata.get("source")}</a></b>'
        for doc in results
    ][:5]


CONTRASTING_COLORS = [
    "rgba(255, 0, 0, 0.2)",  # Semi-transparent red
    "rgba(0, 255, 0, 0.2)",  # Semi-transparent green
    "rgba(0, 0, 255, 0.2)",  # Semi-transparent blue
    "rgba(255, 255, 0, 0.2)",  # Semi-transparent yellow
    "rgba(0, 255, 255, 0.2)",  # Semi-transparent cyan
    "rgba(255, 0, 255, 0.2)",  # Semi-transparent magenta
    "rgba(255, 165, 0, 0.2)",  # Semi-transparent orange
    "rgba(255, 105, 180, 0.2)",  # Semi-transparent pink
    "rgba(75, 0, 130, 0.2)",  # Semi-transparent indigo
    "rgba(255, 192, 203, 0.2)",  # Semi-transparent light pink
    "rgba(64, 224, 208, 0.2)",  # Semi-transparent turquoise
    "rgba(128, 0, 128, 0.2)",  # Semi-transparent purple
    "rgba(210, 105, 30, 0.2)",  # Semi-transparent chocolate
    "rgba(220, 20, 60, 0.2)",  # Semi-transparent crimson
    "rgba(95, 158, 160, 0.2)",  # Semi-transparent cadet blue
    "rgba(255, 99, 71, 0.2)",  # Semi-transparent tomato
    "rgba(144, 238, 144, 0.2)",  # Semi-transparent light green
    "rgba(70, 130, 180, 0.2)",  # Semi-transparent steel blue
]


def convert_markdown_to_html(text: str) -> str:
    # Convert Markdown to HTML, ensuring embedded HTML is preserved and interpreted correctly.
    md_extensions = [
        "extra",
        "abbr",
        "attr_list",
        "def_list",
        "fenced_code",
        "footnotes",
        "md_in_html",
        "tables",
        "admonition",
        "codehilite",
        "legacy_attrs",
        "legacy_em",
        "meta",
        "nl2br",
        "sane_lists",
        "smarty",
        "toc",
        "wikilinks",
    ]
    return str(md.markdown(text, extensions=md_extensions))


# Utility function to create HTML table with colored results
def display_html_table(simple_results: List[str], reranked_results: List[str]) -> None:
    # Find all unique values in both lists
    unique_values = set(simple_results + reranked_results)

    # Ensure we have enough colors for all unique values
    # If not, colors will repeat, which might not be ideal but is necessary if the number of unique values exceeds the number of colors
    colors = CONTRASTING_COLORS * (len(unique_values) // len(CONTRASTING_COLORS) + 1)

    # Create a dictionary to map each unique value to a color
    color_map = dict(zip(unique_values, colors))

    # Initialize the HTML table with style for equal column widths
    html = """
    <style>
    td, th {
        padding: 8px;
        text-align: left;
        border-bottom: 1px solid #ddd;
        color: #000;
    }
    tr {background-color: #ffffff;}
    /* Set table layout to fixed to respect column widths */
    table {
        table-layout: fixed;
        width: 100%; /* You can adjust the overall table width as needed */
        max-height: 100vh !important; /* Set the maximum height of the table */
        overflow-y: auto; /* Add a vertical scrollbar if the content exceeds the maximum height */
    }
    /* Set equal width for both columns */
    td, th {
        width: 50%;
    }
    .text-black {
        color: #000; /* Set the text color to black */
    }
    </style>
    <table>
    <tr><th>Retriever Results</th><th>Reranked Results</th></tr>
    """
    # Iterate over the results and assign the corresponding color to each cell
    for simple, reranked in zip(simple_results, reranked_results):
        html += f"""
        <tr>
            <td style='color: black; background-color: {color_map[simple]}; font-size: 8px;'>
                <p class='text-black'>{convert_markdown_to_html(simple)}</p>
            </td>
            <td style='color: black; background-color: {color_map[reranked]}; font-size: 8px;'>
                <p class='text-black'>{convert_markdown_to_html(reranked)}</p>
            </td>
        </tr>
        """
    html += "</table>"
    display(HTML(html))


def get_sxs_comparison(
    simple_retriever, reranking_api_retriever, query, search_kwargs
) -> List:
    simple_results = get_chunk_content(
        simple_retriever.invoke(query, search_kwargs=search_kwargs)
    )
    reranked_results = get_chunk_content(
        reranking_api_retriever.invoke(query, search_kwargs=search_kwargs)
    )
    display_html_table(simple_results, reranked_results)

    return reranked_results


def display_grounded_generation(response) -> None:
    # Extract the answer with citations and cited chunks
    answer_with_citations = response.answer_with_citations
    cited_chunks = response.cited_chunks

    # Build HTML for the chunks
    chunks_html = "".join(
        [
            f"<div id='chunk-{index}' class='chunk'>"
            + f"<div class='source'>Source {index}: <a href='{chunk['source'].metadata['source']}' target='_blank'>{chunk['source'].metadata['source']}</a></div>"
            + f"<p>{chunk['chunk_text']}</p>"
            + "</div>"
            for index, chunk in enumerate(cited_chunks)
        ]
    )

    # Replace citation indices with hoverable spans
    for index in range(len(cited_chunks)):
        answer_with_citations = answer_with_citations.replace(
            f"[{index}]",
            f"<span class='citation' onmouseover='highlight({index})' onmouseout='unhighlight({index})'>[{index}]</span>",
        )

    # The complete HTML
    html_content = f"""
    <style>
    body {{
        font-family: Arial, sans-serif;
        background-color: #e7f0fd;
        padding: 20px;
    }}
    .answer-box {{
        background-color: #f8f9fa;
        border-left: 4px solid #0056b3;
        padding: 20px;
        margin-bottom: 20px;
        color: #000;
    }}
    .citation {{
        background-color: transparent;
        cursor: pointer;
    }}
    .chunk {{
        background-color: #ffffff;
        border-left: 4px solid #007bff;
        padding: 10px;
        margin-bottom: 10px;
        transition: background-color 0.3s;
        color: #000;
    }}
    .source {{
        font-weight: bold;
        margin-bottom: 5px;
    }}
    a {{
        text-decoration: none;
        color: #0056b3;
    }}
    a:hover {{
        text-decoration: underline;
    }}
    </style>
    <div class='answer-box'>{answer_with_citations}</div>
    <div class='chunks-box'>{chunks_html}</div>
    <script>
    function highlight(index) {{
        // Highlight the citation in the answer
        document.querySelectorAll('.citation').forEach(function(citation) {{
            if (citation.textContent === '[' + index + ']') {{
                citation.style.backgroundColor = '#ffff99';
            }}
        }});
        // Highlight the corresponding chunk
        document.getElementById('chunk-' + index).style.backgroundColor = '#ffff99';
    }}
    function unhighlight(index) {{
        // Unhighlight the citation in the answer
        document.querySelectorAll('.citation').forEach(function(citation) {{
            if (citation.textContent === '[' + index + ']') {{
                citation.style.backgroundColor = 'transparent';
            }}
        }});
        // Unhighlight the corresponding chunk
        document.getElementById('chunk-' + index).style.backgroundColor = '#ffffff';
    }}
    </script>
    """
    display(HTML(html_content))

# ⚙️ Initialize resources

The DIY RAG application requires the following resources, which will be provisioned by this step if not already present:

- Document AI Layout Parser processor to parse the input documents
- Vertex AI Vector Search index and endpoint to host the index for vector search
- Cloud Storage bucket to store documents

<div class="alert alert-block alert-warning">
<b>⚠️ Resource creation will be skipped if <code>CREATE_RESOURCES</code> flag is set to <code>False</code> in the Initialize Variables section.  ⚠️</b>
</div>
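
The `deploy_index` helper defined earlier derives the deployed index ID from the index name: it hashes the name with MD5, formats the digest as a UUID, appends the last UUID group to the name, and replaces hyphens with underscores. Because the ID depends only on the name, re-running the notebook produces the same ID. The sketch below isolates that derivation; the function name and the index name are hypothetical.

```python
import hashlib
import uuid


def make_deployed_index_id(index_name: str) -> str:
    """Derive a stable deployed index ID from the index display name."""
    # MD5 is used only to get a repeatable 128-bit value, not for security.
    hex_string = hashlib.md5(index_name.encode("UTF-8")).hexdigest()
    # Formatted as a UUID, the last group is the final 12 hex characters.
    suffix = str(uuid.UUID(hex=hex_string)).split("-")[-1]
    # Hyphens are replaced with underscores, matching the helper's behavior.
    return f"{index_name}_{suffix}".replace("-", "_")


deployed_id = make_deployed_index_id("my-rag-index")  # hypothetical index name
print(deployed_id)
```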


In [None]:
if CREATE_RESOURCES:
    print("Creating new resources.")
else:
    print("Resource creation is skipped.")

# Create bucket if not exists
bucket = create_bucket(GCS_BUCKET_NAME)

# Create vector search index if not exists else return index resource name
vs_index = create_index()

# Create vector search index endpoint if not exists else return index endpoint resource name
vs_endpoint = create_index_endpoint()

# Deploy index to the index endpoint
deploy_index(vs_index, vs_endpoint)

# Create Document Layout Processor
docai_processor = create_docai_processor(processor_display_name=DOCAI_PROCESSOR_NAME)
PROCESSOR_NAME = docai_processor.name  # DocAI Layout Parser Processor Name

# 📥 Data Ingestion

## 📄 Document Processing and Indexing

This step reads documents from a Cloud Storage bucket, parses them using the Document AI Layout Parser, extracts chunks from the parsed documents, generates embeddings using the Vertex AI Embeddings API, and adds them to the Vertex AI Vector Search index.

[These](https://cloud.google.com/generative-ai-app-builder/docs/prepare-data#storage-unstructured) are sample public datasets available in Cloud Storage that you can use.

### Step 1. Process Documents

**1.1 Read document paths from Cloud Storage bucket**

Here we read documents from a public Cloud Storage bucket containing Alphabet investor reports for the years 2021, 2022, and 2023. You can replace them with your own documents hosted in a Cloud Storage bucket.

In [None]:
loader = GCSDirectoryLoader(
    project_name=PROJECT_ID,
    bucket="github-repo",
    prefix="search/vais-standalone-apis/alphabet-investor-pdfs",
)
doc_blobs = loader.load()

**1.2 Parse raw documents and chunk them**

We will use the Document AI Layout Parser to read files from the Cloud Storage bucket as blobs and convert them into **layout-aware** chunks. Layout Parser extracts document content elements like text, tables, and lists, and creates context-aware chunks that are well suited to building RAG applications.
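
To build intuition for what a context-aware chunk is, here is a toy sketch. It is not the Layout Parser's algorithm: it splits each section's text into word-limited pieces and prefixes every piece with the headings above it, which is the idea behind the `include_ancestor_headings=True` option used below. The `Section` type, the word-based limit, and the output format are all assumptions made for illustration; the text the real parser produces will differ.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Section:
    headings: List[str]  # ancestor headings, outermost first
    text: str


def chunk_sections(sections: List[Section], max_words: int = 50) -> List[str]:
    """Split each section into chunks and prefix every chunk with its headings."""
    chunks = []
    for section in sections:
        words = section.text.split()
        prefix = " > ".join(section.headings)
        for start in range(0, len(words), max_words):
            body = " ".join(words[start : start + max_words])
            chunks.append(f"{prefix}\n{body}" if prefix else body)
    return chunks


# Hypothetical document structure for illustration.
sections = [
    Section(["Annual Report", "Revenue"], "Revenue grew year over year across segments."),
    Section(["Annual Report", "Risks"], "Competition and regulation remain key risks."),
]
for chunk in chunk_sections(sections, max_words=4):
    print(chunk, end="\n\n")
```

Because each chunk carries its heading path, a retrieved chunk still says which part of the document it came from even when it is read in isolation.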

- Define Document AI Layout Parser

In [None]:
parser = DocAIParser(
    project_id=PROJECT_ID,
    location=DOCAI_LOCATION,
    processor_name=PROCESSOR_NAME,
    gcs_output_path=GCS_OUTPUT_PATH,
)

- Process the documents

In [None]:
docs = list(
    parser.batch_parse(
        doc_blobs,  # all document blobs loaded in step 1.1
        chunk_size=500,
        include_ancestor_headings=True,
    )
)

- Examine a chunk

Let's examine one of the chunks. Notice that the document is parsed into different sections like title, subtitle and even a markdown table (especially a complex table with merged cells!).

This makes retrieval easier and helps the downstream generation task: with the document structure preserved, the LLM can reason over the content more effectively and more accurately.

In [None]:
print(docs[1].page_content)

### Step 2: Index the chunk embeddings

The chunks from the previous step are still plain text. This step creates [embeddings](https://cloud.google.com/vertex-ai/generative-ai/docs/embeddings/get-text-embeddings) of the text chunks returned by the layout parser and upserts them into a [Vertex AI Vector Search](https://cloud.google.com/vertex-ai/docs/vector-search/overview) index.

Next, we use Vertex AI Vector Search as the retriever for the RAG pipeline. Vector Search offers low-latency retrieval that scales to billions of vectors with high recall.
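
As a rough mental model of what an embedding-based retriever does, the sketch below ranks a handful of toy vectors by cosine similarity to a query vector. The vectors, the chunk IDs, and the helper names `cosine_similarity` and `top_k` are all made up for illustration. Vector Search itself uses approximate nearest neighbor search rather than this brute-force loop, and the distance measure it applies depends on how the index was configured.

```python
import math
from typing import Dict, List, Tuple


def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(
    query: List[float], index: Dict[str, List[float]], k: int = 2
) -> List[Tuple[str, float]]:
    """Brute-force nearest neighbors: score every vector, keep the best k."""
    scored = [(doc_id, cosine_similarity(query, vec)) for doc_id, vec in index.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Toy 3-dimensional "embeddings"; real embeddings have hundreds of dimensions.
toy_index = {
    "chunk-revenue": [0.9, 0.1, 0.0],
    "chunk-costs": [0.1, 0.9, 0.0],
    "chunk-weather": [0.0, 0.0, 1.0],
}
query_vector = [1.0, 0.2, 0.0]

results = top_k(query_vector, toy_index, k=2)
print([doc_id for doc_id, _ in results])
```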

<div class="alert alert-block alert-warning">
<b>⚠️ Remember to run the Initialize Resources section to create and configure Vector Search index. ⚠️</b>
</div>

**2.1 Define the model for creating embeddings.**

In [None]:
embedding_model = VertexAIEmbeddings(model_name=EMBEDDINGS_MODEL_NAME)

**2.2 Initialize the Vertex AI Vector Search retriever.**

In [None]:
vector_store = VectorSearchVectorStore.from_components(
    project_id=PROJECT_ID,
    region=REGION,
    gcs_bucket_name=GCS_BUCKET_NAME,
    index_id=vs_index.resource_name,
    endpoint_id=vs_endpoint.resource_name,
    embedding=embedding_model,
    stream_update=True,
)

**2.3 Store chunks as embeddings in the Vector Search index and raw texts in the Cloud Storage bucket.**

<div class="alert alert-block alert-warning">
<b>⚠️ To skip ingestion and query pre-indexed documents, set <code>RUN_INGESTION</code> to <code>False</code>.
⚠️</b>
</div>


In [None]:
add_data(vector_store, docs)

# 🤖 Serving

All of the setup is done. You retrieved source documents, processed and chunked them, embedded them into vectors, and upserted them into Vector Search.

Now it's time to do some searches and generate grounded text.

## 🔎 Retrieval and Ranking

### Step 3. Retrieve and Re-rank Chunks

In this step, Vertex AI Vector Search retrieves the top-k relevant results, which are then re-ranked by Vertex AI Ranking API based on chunk content and semantic similarity to the query.

More on the Vertex AI Ranking API:

> The Vertex AI Search Ranking API is one of the standalone APIs in Vertex AI Agent Builder. It takes a list of documents and reranks those documents based on how relevant the documents are to a query. Compared to embeddings, which look only at the semantic similarity of a document and a query, the ranking API can give you precise scores for how well a document answers a given query. The ranking API can be used to improve the quality of search results after retrieving an initial set of candidate documents.
>
> The ranking API is stateless so there's no need to index documents before calling the API. All you need to do is pass in the query and documents. This makes the API well suited for reranking documents from any document retrievers.
>
> For more information, see [Rank and rerank documents](https://cloud.google.com/generative-ai-app-builder/docs/ranking).
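
The retrieve-then-rerank pattern described above can be sketched without any cloud dependency. In the sketch below, `Candidate`, `rerank`, and `keyword_overlap` are hypothetical names, and the keyword-overlap scorer is only a stand-in for a real relevance model; it says nothing about how the Ranking API scores documents. The point is the shape of the second stage: it is stateless, takes a query plus already-retrieved candidates, and returns them reordered and truncated.

```python
from typing import Callable, List, NamedTuple


class Candidate(NamedTuple):
    source: str  # metadata carried through reranking
    text: str


def rerank(
    query: str,
    candidates: List[Candidate],
    score: Callable[[str, str], float],
    top_n: int = 2,
) -> List[Candidate]:
    """Score each candidate against the query and keep the top_n best."""
    scored = [(score(query, c.text), c) for c in candidates]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [c for _, c in scored[:top_n]]


def keyword_overlap(query: str, text: str) -> float:
    """Toy relevance score: fraction of query words that appear in the text."""
    q = set(query.lower().split())
    t = set(text.lower().split())
    return len(q & t) / len(q) if q else 0.0


# Pretend these came back from the first-stage retriever, in that order.
retrieved = [
    Candidate("2021.pdf", "google cloud headcount grew in 2021"),
    Candidate("2023.pdf", "google cloud revenue in 2023 was reported in the annual filing"),
    Candidate("2022.pdf", "other bets revenue in 2022"),
]

reranked = rerank("google cloud revenue in 2023", retrieved, keyword_overlap, top_n=2)
print([c.source for c in reranked])
```

Because the second stage only needs the query and the candidate texts, it can sit behind any first-stage retriever.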

**3.1 Define and combine retriever using Vector Search and reranker using the Vertex AI Ranking API.**

In [None]:
# Instantiate the VertexAIRank reranker
reranker = VertexAIRank(
    project_id=PROJECT_ID,
    location_id="global",
    ranking_config="default_ranking_config",
    title_field="source",  # metadata field to preserve with reranked results
    top_n=5,
)

basic_retriever = vector_store.as_retriever(
    search_kwargs={"k": 5}
)  # fetch top 5 documents

# Create the ContextualCompressionRetriever with VertexAIRank as the reranker
retriever_with_reranker = ContextualCompressionRetriever(
    base_compressor=reranker, base_retriever=basic_retriever
)

**3.2 Examine results before and after re-ranking**

See the difference reranking makes! By prioritizing semantically relevant documents, the Ranking API improves the LLM's context, leading to more accurate and well-reasoned answers. Compare the `Retriever Results` and the `Reranked Results` side-by-side to see the improvement.

In [None]:
reranked_results = get_sxs_comparison(
    simple_retriever=basic_retriever,
    reranking_api_retriever=retriever_with_reranker,
    query="what was google cloud revenue in 2023 ?",
    search_kwargs={"k": 5},
)

## 💬 Answer Generation

You have retrieved the most relevant facts from all of your indexed source data. Now we pass those facts to the LLM for answer generation, so that the answer is grounded in them.

### Step 4. Query in Real Time and Check Grounding

Let's now configure a standard retrieval and answer generation chain that follows: `query` -> `vector search` -> `retrieve documents` -> `LLM for answer generation` with a couple of changes:

1. We will pass the retrieved documents to the Ranking API via `VertexAIRank` and use the reranked documents to generate the answer.

2. After the LLM generates the answer, we pass the answer and the documents retrieved from Vector Search as facts to `VertexAICheckGroundingWrapper` to check how well grounded the response is.
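
The flow with both changes can be outlined in plain Python. This is only a sketch of the control flow: `answer_with_grounding` and the four callables passed to it are hypothetical stand-ins, not the LangChain or Vertex AI classes used in this notebook, and the lambdas in the example are deliberately trivial.

```python
from typing import Callable, Dict, List


def answer_with_grounding(
    query: str,
    retrieve: Callable[[str], List[str]],
    rerank: Callable[[str, List[str]], List[str]],
    generate: Callable[[str, List[str]], str],
    check_grounding: Callable[[str, List[str]], float],
) -> Dict[str, object]:
    candidates = retrieve(query)          # vector search
    context = rerank(query, candidates)   # change 1: rerank before generation
    answer = generate(query, context)     # LLM answer from the reranked context
    # change 2: the retrieved documents serve as facts for the grounding check
    support = check_grounding(answer, candidates)
    return {"answer": answer, "context": context, "support_score": support}


# Trivial stand-ins so the sketch runs end to end.
corpus = ["cloud revenue grew", "office leases expired", "cloud costs rose"]

result = answer_with_grounding(
    "cloud revenue",
    retrieve=lambda q: corpus,
    rerank=lambda q, docs: sorted(
        docs, key=lambda d: -sum(word in d for word in q.split())
    )[:2],
    generate=lambda q, docs: docs[0],
    check_grounding=lambda answer, facts: 1.0 if answer in facts else 0.0,
)
print(result)
```

Note that the chain defined in 4.1 builds its `retriever` directly from the vector store. Passing `retriever_with_reranker` from Step 3 instead would be one way to apply the first change there.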

More on the Vertex AI Check Grounding API:

> The [Vertex AI Check Grounding API](https://cloud.google.com/generative-ai-app-builder/docs/check-grounding) is one of the standalone APIs in [Vertex AI Agent Builder](https://cloud.google.com/generative-ai-app-builder/docs/builder-apis). It is used to determine how grounded a piece of text (called an answer candidate) is in a given set of reference texts (called facts).

> The Check Grounding API returns an overall support score of 0 to 1, which indicates how much the answer candidate agrees with the given facts. The response also includes citations to the facts supporting each claim in the answer candidate.

> You can use the Check Grounding API for checking any piece of text. It could be a human-generated blurb or a machine-generated response. A typical use case would be to check an LLM-generated response with respect to a given set of facts. Among other things, the citations generated by the API would help distinguish hallucinated claims in the response from grounded claims.

> For more information, see [Check Grounding](https://cloud.google.com/generative-ai-app-builder/docs/check-grounding).
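
To illustrate the shape of such a check (a support score between 0 and 1 plus per-claim citations), here is a toy version in plain Python. It is an assumption-laden sketch, not the Check Grounding API: `toy_check_grounding` is a hypothetical helper, claims are approximated by sentences, and "support" is approximated by word overlap with a fact, which is far cruder than what a real grounding model does.

```python
import re
from typing import Dict, List, Set


def _tokens(text: str) -> Set[str]:
    """Lowercased alphanumeric words of a text."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))


def toy_check_grounding(
    answer: str, facts: List[str], min_overlap: float = 0.6
) -> Dict[str, object]:
    """Split the answer into sentence-level claims and cite facts that cover them."""
    claims = [s.strip() for s in re.split(r"(?<=[.!?])\s+", answer) if s.strip()]
    results = []
    for claim in claims:
        claim_tokens = _tokens(claim)
        # A fact is cited when it contains enough of the claim's words.
        citations = [
            i
            for i, fact in enumerate(facts)
            if claim_tokens
            and len(claim_tokens & _tokens(fact)) / len(claim_tokens) >= min_overlap
        ]
        results.append({"claim": claim, "citations": citations})
    supported = sum(1 for r in results if r["citations"])
    score = supported / len(results) if results else 0.0
    return {"support_score": score, "claims": results}


facts = [
    "Cloud revenue increased compared with the prior year.",
    "The company opened new data centers.",
]
answer = "Cloud revenue increased. The company bought a football team."

report = toy_check_grounding(answer, facts)
print(report["support_score"])
for item in report["claims"]:
    print(item["claim"], "->", item["citations"])
```

The first claim is cited against fact 0, while the second claim has no citation, which is the kind of signal that helps separate grounded statements from hallucinated ones.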

**4.1 Define and configure retrieval and answer generation chain**

- Configure retriever from the vector store previously defined

In [None]:
retriever = vector_store.as_retriever(search_kwargs={"k": 5})

- Configure LLM with prompt template to generate answer

In [None]:
llm = VertexAI(model_name="gemini-1.5-pro-001", max_output_tokens=1024)
template = """
Answer the question based only on the following context:
{context}

Question:
{query}
"""
prompt = PromptTemplate.from_template(template)

create_answer = prompt | llm

- Define wrapper to call Vertex AI Check Grounding API on the generated answer

In [None]:
output_parser = VertexAICheckGroundingWrapper(
    project_id=PROJECT_ID,
    location_id="global",
    grounding_config="default_grounding_config",
    top_n=3,
)

- Define QA chain with Check Grounding

In [None]:
@chain
def check_grounding_output_parser(answer_candidate: str, documents: List[Document]):
    return output_parser.with_config(configurable={"documents": documents}).invoke(
        answer_candidate
    )


setup_and_retrieval = RunnableParallel(
    {"context": retriever, "query": RunnablePassthrough()}
)


@chain
def qa_with_check_grounding(query):
    docs = setup_and_retrieval.invoke(query)
    answer_candidate = create_answer.invoke(docs)
    check_grounding_output = check_grounding_output_parser.invoke(
        answer_candidate, documents=docs["context"]
    )
    return check_grounding_output

**4.2 Invoke the answer generation chain.**

In [None]:
result = qa_with_check_grounding.invoke("what was google cloud revenue in 2023 ?")
print(result)

**4.3 Check grounding**


In [None]:
display_grounded_generation(result)

In [None]:
result = qa_with_check_grounding.invoke(
    "what were the downstream effects of covid on alphabet?"
)
display_grounded_generation(result)

Congratulations! You created a search engine from source documents and wired in a real-time RAG pipeline that retrieves the most relevant facts and includes them in your LLM-generated responses. You also added a grounding verification step to help ensure high-quality results.

If you would like to evaluate your generated answers on more dimensions, take a look at the [Vertex AI Eval Service metrics for RAG](https://cloud.google.com/vertex-ai/generative-ai/docs/models/evaluation-examples#rag-qa), which provide scores and explanations for metrics such as `question_answering_quality`, `question_answering_relevance`, `question_answering_helpfulness`, `groundedness`, `fulfillment`, `coherence`, `toxicity`, and more.

---

# 🧹 Cleaning up

Clean up resources created in this notebook.

In [None]:
DELETE_DOCAI_PROCESSOR = False
DELETE_INDEX = False
DELETE_BUCKET = False

- **Delete datapoints from Vector Search index**

In [None]:
# Delete datapoints from Vertex AI Vector Store


def delete_from_vector_search(
    vs_index: MatchingEngineIndex,
    vs_endpoint: MatchingEngineIndexEndpoint,
    delete: bool = False,
):
    neighbors = vs_endpoint.find_neighbors(
        deployed_index_id=vs_index.deployed_indexes[0].deployed_index_id,
        queries=[[0.0] * VS_DIMENSIONS],
        num_neighbors=5000,
        return_full_datapoint=False,
    )

    datapoint_ids = [neighbor.id for neighbor in neighbors[0]]

    # Delete datapoints
    if delete:
        print(f"Deleting {len(datapoint_ids)} datapoints")
        response = vs_index.remove_datapoints(datapoint_ids=datapoint_ids)
        print(response)


delete_from_vector_search(vs_index, vs_endpoint, delete=DELETE_INDEX)

- 🗑️ **Remove Vertex AI Vector Search Index and Endpoint**

In [None]:
if DELETE_INDEX:
    print(f"Undeploying all indexes and deleting the index endpoint {vs_endpoint}")
    vs_endpoint.undeploy_all()
    vs_endpoint.delete()
    print(f"Deleting the index {vs_index}")
    vs_index.delete()

- 🗑️ **Remove Document AI Processor**

In [None]:
if DELETE_DOCAI_PROCESSOR:
    docai_client = documentai.DocumentProcessorServiceClient()
    request = documentai.DeleteProcessorRequest(name=docai_processor.name)
    operation = docai_client.delete_processor(request=request)
    print("Waiting for delete processor operation to complete...")
    response = operation.result()
    print(response)

- 🗑️ **Remove Google Cloud Storage bucket**

In [None]:
if DELETE_BUCKET:
    ! gsutil -m rm -r $STAGING_BUCKET_URI

---