# Retrieval Augmented Generation - Procurement Contract Analyst -  Palm2 & LangChain

## Installation & Authentication

**Install google-cloud-aiplatform & langchain**

In [None]:
# Install langchain and related libraries
!pip install langchain unstructured

# Install Vertex AI LLM SDK
! pip install google-cloud-aiplatform==1.25.0



**Authenticate**

Within colab, a simple user authentication is adequate.

In [1]:
import sys

if "google.colab" in sys.modules:
    from google.colab import auth as google_auth
    google_auth.authenticate_user()


##Get Libraries & Classes

###LangChain Libraries

In [9]:
from langchain.embeddings import VertexAIEmbeddings
from langchain.vectorstores import MatchingEngine

"""Vertex Matching Engine implementation of the vector store."""
from __future__ import annotations

import logging
import uuid
from typing import Any, Iterable, List, Optional, Type

from langchain.docstore.document import Document
from langchain.embeddings import TensorflowHubEmbeddings
from langchain.embeddings.base import Embeddings
from langchain.embeddings import VertexAIEmbeddings
from langchain.vectorstores.base import VectorStore


import numpy as np
import json


### Vertex Libraries, Classes & Helper Functions
**Reference Libraries**

In this section, we will identify all the library classes that will be referenced in the code.

**Classes Defined**
*   _VertexCommon
*   VertexLLM
*   VertexEmbeddings


**Functions Defined**

*   rate_limit





In [4]:
# Using Vertex AI
import vertexai
from google.cloud import aiplatform
print(f"Vertex AI SDK version: {aiplatform.__version__}")

# Using Google Cloud Storage Directory loader from langchain
from langchain.document_loaders import GCSDirectoryLoader

import time

from pydantic import BaseModel, Extra, root_validator
from typing import Any, Mapping, Optional, List, Dict
from langchain.llms.base import LLM
from langchain.embeddings.base import Embeddings


from google.cloud import storage
from google.cloud.aiplatform import MatchingEngineIndex, MatchingEngineIndexEndpoint
from google.cloud import aiplatform_v1
from google.oauth2.service_account import Credentials
import google.auth
import google.auth.transport.requests



class _VertexCommon(BaseModel):
    """Wrapper around Vertex AI large language models.

    To use, you should have the
    ``google.cloud.aiplatform.private_preview.language_models`` python package
    installed.
    """
    client: Any = None #: :meta private:
    model_name: str = "text-bison@001"
    """Model name to use."""

    temperature: float = 0.2
    """What sampling temperature to use."""

    top_p: int = 0.8
    """Total probability mass of tokens to consider at each step."""

    top_k: int = 40
    """The number of highest probability tokens to keep for top-k filtering."""

    max_output_tokens: int = 200
    """The maximum number of tokens to generate in the completion."""

    @property
    def _default_params(self) -> Mapping[str, Any]:
        """Get the default parameters for calling Vertex AI API."""
        return {
            "temperature": self.temperature,
            "top_p": self.top_p,
            "top_k": self.top_k,
            "max_output_tokens": self.max_output_tokens
        }

    def _predict(self, prompt: str, stop: Optional[List[str]]) -> str:
        res = self.client.predict(prompt, **self._default_params)
        return self._enforce_stop_words(res.text, stop)

    def _enforce_stop_words(self, text: str, stop: Optional[List[str]]) -> str:
        if stop:
            return enforce_stop_tokens(text, stop)
        return text

    @property
    def _llm_type(self) -> str:
        """Return type of llm."""
        return "vertex_ai"

class VertexLLM(_VertexCommon, LLM):
    model_name: str = "text-bison@001"

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that the python package exists in environment."""
        try:
            from vertexai.preview.language_models import TextGenerationModel
        except ImportError:
            raise ValueError(
                "Could not import Vertex AI LLM python package. "
            )

        try:
            values["client"] = TextGenerationModel.from_pretrained(values["model_name"])
        except AttributeError:
            raise ValueError(
                "Could not set Vertex Text Model client."
            )

        return values

    def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str:
        """Call out to Vertex AI's create endpoint.

        Args:
            prompt: The prompt to pass into the model.

        Returns:
            The string generated by the model.
        """
        return self._predict(prompt, stop)



def rate_limit(max_per_minute):
  period = 60 / max_per_minute
  print('Waiting')
  while True:
    before = time.time()
    yield
    after = time.time()
    elapsed = after - before
    sleep_time = max(0, period - elapsed)
    if sleep_time > 0:
      print('.', end='')
      time.sleep(sleep_time)


Vertex AI SDK version: 1.25.0


## Initiatlize Vertex AI

**We will need a project id and location where the Vertex compute and embedding will be hosted**

In [5]:
PROJECT_ID = "argolis-project-340214"  # @param {type:"string"}

LOCATION = "us-central1"  # @param {type:"string"}

# Initialize Vertex AI SDK
vertexai.init(project=PROJECT_ID, location=LOCATION)

## Build the Matching Engine



###*Define Parameters & Create Bucket for Embeddings*
Set the locations of the documents, embeddings, index and dimensions for the embedding vector*

In [6]:
ME_BUCKET = "matching_engine_bucket"
ME_REGION = "us-central1"
ME_INDEX_NAME = f"{PROJECT_ID}-me-index"  # @param {type:"string"}
ME_EMBEDDING_DIR = f"{PROJECT_ID}-me-bucket"  # @param {type:"string"}
ME_DIMENSIONS = 768  # @param {type:"integer"} when using Vertex PaLM Embedding

! set -x && gsutil mb -p $PROJECT_ID -l us-central1 gs://$ME_EMBEDDING_DIR


+ gsutil mb -p argolis-project-340214 -l us-central1 gs://argolis-project-340214-me-bucket
Creating gs://argolis-project-340214-me-bucket/...
ServiceException: 409 A Cloud Storage bucket named 'argolis-project-340214-me-bucket' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.


###*Define Matching Engine Class & Utilities Class*

In [10]:

logger = logging.getLogger()


class MatchingEngine(VectorStore):
    """Vertex Matching Engine implementation of the vector store.

    While the embeddings are stored in the Matching Engine, the embedded
    documents will be stored in GCS.

    An existing Index and corresponding Endpoint are preconditions for
    using this module.

    See usage in docs/modules/indexes/vectorstores/examples/matchingengine.ipynb

    Note that this implementation is mostly meant for reading if you are
    planning to do a real time implementation. While reading is a real time
    operation, updating the index takes close to one hour."""

    def __init__(
        self,
        project_id: str,
        region: str,
        index: MatchingEngineIndex,
        endpoint: MatchingEngineIndexEndpoint,
        embedding: Embeddings,
        gcs_client: storage.Client,
        index_client: aiplatform_v1.IndexServiceClient,
        index_endpoint_client: aiplatform_v1.IndexEndpointServiceClient,
        gcs_bucket_name: str,
        credentials: Credentials = None,
    ):
        """Vertex Matching Engine implementation of the vector store.

        While the embeddings are stored in the Matching Engine, the embedded
        documents will be stored in GCS.

        An existing Index and corresponding Endpoint are preconditions for
        using this module.

        See usage in
        docs/modules/indexes/vectorstores/examples/matchingengine.ipynb.

        Note that this implementation is mostly meant for reading if you are
        planning to do a real time implementation. While reading is a real time
        operation, updating the index takes close to one hour.

        Attributes:
            project_id: The GCS project id.
            index: The created index class. See
            ~:func:`MatchingEngine.from_components`.
            endpoint: The created endpoint class. See
            ~:func:`MatchingEngine.from_components`.
            embedding: A :class:`Embeddings` that will be used for
            embedding the text sent. If none is sent, then the
            multilingual Tensorflow Universal Sentence Encoder will be used.
            gcs_client: The Google Cloud Storage client.
            credentials (Optional): Created GCP credentials.
        """
        super().__init__()
        self._validate_google_libraries_installation()

        self.project_id = project_id
        self.region = region
        self.index = index
        self.endpoint = endpoint
        self.embedding = embedding
        self.gcs_client = gcs_client
        self.index_client = index_client
        self.index_endpoint_client = index_endpoint_client
        self.gcs_client = gcs_client
        self.credentials = credentials
        self.gcs_bucket_name = gcs_bucket_name

    def _validate_google_libraries_installation(self) -> None:
        """Validates that Google libraries that are needed are installed."""
        try:
            from google.cloud import aiplatform, storage  # noqa: F401
            from google.oauth2 import service_account  # noqa: F401
        except ImportError:
            raise ImportError(
                "You must run `pip install --upgrade "
                "google-cloud-aiplatform google-cloud-storage`"
                "to use the MatchingEngine Vectorstore."
            )

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            kwargs: vectorstore specific parameters.

        Returns:
            List of ids from adding the texts into the vectorstore.
        """
        logger.debug("Embedding documents.")
        embeddings = self.embedding.embed_documents(list(texts))
        insert_datapoints_payload = []
        ids = []

        # Streaming index update
        for idx, (embedding, text, metadata) in enumerate(
            zip(embeddings, texts, metadatas)
        ):
            id = uuid.uuid4()
            ids.append(id)
            self._upload_to_gcs(text, f"documents/{id}")
            metadatas[idx]
            insert_datapoints_payload.append(
                aiplatform_v1.IndexDatapoint(
                    datapoint_id=str(id),
                    feature_vector=embedding,
                    restricts=metadata if metadata else [],
                )
            )
            if idx % 100 == 0:
                upsert_request = aiplatform_v1.UpsertDatapointsRequest(
                    index=self.index.name, datapoints=insert_datapoints_payload
                )
                response = self.index_client.upsert_datapoints(request=upsert_request)
                insert_datapoints_payload = []
        if len(insert_datapoints_payload) > 0:
            upsert_request = aiplatform_v1.UpsertDatapointsRequest(
                index=self.index.name, datapoints=insert_datapoints_payload
            )
            _ = self.index_client.upsert_datapoints(request=upsert_request)

        logger.debug("Updated index with new configuration.")
        logger.info(f"Indexed {len(ids)} documents to Matching Engine.")

        return ids

    def _upload_to_gcs(self, data: str, gcs_location: str) -> None:
        """Uploads data to gcs_location.

        Args:
            data: The data that will be stored.
            gcs_location: The location where the data will be stored.
        """
        bucket = self.gcs_client.get_bucket(self.gcs_bucket_name)
        blob = bucket.blob(gcs_location)
        blob.upload_from_string(data)

    def get_matches(
        self,
        embeddings: List[str],
        n_matches: int,
        index_endpoint: MatchingEngineIndexEndpoint,
    ) -> str:
        """
        get matches from matching engine given a vector query
        Uses public endpoint

        """
        import requests
        import json

        request_data = {
            "deployed_index_id": index_endpoint.deployed_indexes[0].id,
            "return_full_datapoint": True,
            "queries": [
                {
                    "datapoint": {"datapoint_id": f"{i}", "feature_vector": emb},
                    "neighbor_count": n_matches,
                }
                for i, emb in enumerate(embeddings)
            ],
        }

        endpoint_address = self.endpoint.public_endpoint_domain_name
        rpc_address = f"https://{endpoint_address}/v1beta1/{index_endpoint.resource_name}:findNeighbors"
        endpoint_json_data = json.dumps(request_data)

        logger.debug(f"Querying Matching Engine Index Endpoint {rpc_address}")

        request = google.auth.transport.requests.Request()
        self.credentials.refresh(request)
        header = {"Authorization": "Bearer " + self.credentials.token}

        return requests.post(rpc_address, data=endpoint_json_data, headers=header)

    def similarity_search(
        self, query: str, k: int = 4, search_distance: float = 0.65, **kwargs: Any
    ) -> List[Document]:
        """Return docs most similar to query.

        Args:
            query: The string that will be used to search for similar documents.
            k: The amount of neighbors that will be retrieved.
            search_distance: filter search results by  search distance by adding a threshold value

        Returns:
            A list of k matching documents.
        """

        logger.debug(f"Embedding query {query}.")
        embedding_query = self.embedding.embed_documents([query])
        deployed_index_id = self._get_index_id()
        logger.debug(f"Deployed Index ID = {deployed_index_id}")

        # TO-DO: Pending query sdk integration
        # response = self.endpoint.match(
        #     deployed_index_id=self._get_index_id(),
        #     queries=embedding_query,
        #     num_neighbors=k,
        # )

        response = self.get_matches(embedding_query, k, self.endpoint)

        if response.status_code == 200:
            response = response.json()["nearestNeighbors"]
        else:
            raise Exception(f"Failed to query index {str(response)}")

        if len(response) == 0:
            return []

        logger.debug(f"Found {len(response)} matches for the query {query}.")

        results = []

        # I'm only getting the first one because queries receives an array
        # and the similarity_search method only recevies one query. This
        # means that the match method will always return an array with only
        # one element.
        for doc in response[0]["neighbors"]:
            page_content = self._download_from_gcs(
                f"documents/{doc['datapoint']['datapointId']}"
            )
            metadata = {}
            if "restricts" in doc["datapoint"]:
                metadata = {
                    item["namespace"]: item["allowList"][0]
                    for item in doc["datapoint"]["restricts"]
                }
            if "distance" in doc:
                metadata["score"] = doc["distance"]
                if doc["distance"] >= search_distance:
                    results.append(
                        Document(page_content=page_content, metadata=metadata)
                    )
            else:
                results.append(Document(page_content=page_content, metadata=metadata))

        logger.debug("Downloaded documents for query.")

        return results

    def _get_index_id(self) -> str:
        """Gets the correct index id for the endpoint.

        Returns:
            The index id if found (which should be found) or throws
            ValueError otherwise.
        """
        for index in self.endpoint.deployed_indexes:
            if index.index == self.index.name:
                return index.id

        raise ValueError(
            f"No index with id {self.index.name} "
            f"deployed on enpoint "
            f"{self.endpoint.display_name}."
        )

    def _download_from_gcs(self, gcs_location: str) -> str:
        """Downloads from GCS in text format.

        Args:
            gcs_location: The location where the file is located.

        Returns:
            The string contents of the file.
        """
        bucket = self.gcs_client.get_bucket(self.gcs_bucket_name)
        try:
            blob = bucket.blob(gcs_location)
            return blob.download_as_string()
        except Exception:
            return ""

    @classmethod
    def from_texts(
        cls: Type["MatchingEngine"],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> "MatchingEngine":
        """Use from components instead."""
        raise NotImplementedError(
            "This method is not implemented. Instead, you should initialize the class"
            " with `MatchingEngine.from_components(...)` and then call "
            "`from_texts`"
        )

    @classmethod
    def from_documents(
        cls: Type["MatchingEngine"],
        documents: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> "MatchingEngine":
        """Use from components instead."""
        raise NotImplementedError(
            "This method is not implemented. Instead, you should initialize the class"
            " with `MatchingEngine.from_components(...)` and then call "
            "`from_documents`"
        )

    @classmethod
    def from_components(
        cls: Type["MatchingEngine"],
        project_id: str,
        region: str,
        gcs_bucket_name: str,
        index_id: str,
        endpoint_id: str,
        credentials_path: Optional[str] = None,
        embedding: Optional[Embeddings] = None,
    ) -> "MatchingEngine":
        """Takes the object creation out of the constructor.

        Args:
            project_id: The GCP project id.
            region: The default location making the API calls. It must have
            the same location as the GCS bucket and must be regional.
            gcs_bucket_name: The location where the vectors will be stored in
            order for the index to be created.
            index_id: The id of the created index.
            endpoint_id: The id of the created endpoint.
            credentials_path: (Optional) The path of the Google credentials on
            the local file system.
            embedding: The :class:`Embeddings` that will be used for
            embedding the texts.

        Returns:
            A configured MatchingEngine with the texts added to the index.
        """
        gcs_bucket_name = cls._validate_gcs_bucket(gcs_bucket_name)

        # Set credentials
        if credentials_path:
            credentials = cls._create_credentials_from_file(credentials_path)
        else:
            credentials, _ = google.auth.default()
            request = google.auth.transport.requests.Request()
            credentials.refresh(request)

        index = cls._create_index_by_id(index_id, project_id, region, credentials)
        endpoint = cls._create_endpoint_by_id(
            endpoint_id, project_id, region, credentials
        )

        gcs_client = cls._get_gcs_client(credentials, project_id)
        index_client = cls._get_index_client(project_id, region, credentials)
        index_endpoint_client = cls._get_index_endpoint_client(
            project_id, region, credentials
        )
        cls._init_aiplatform(project_id, region, gcs_bucket_name, credentials)

        return cls(
            project_id=project_id,
            region=region,
            index=index,
            endpoint=endpoint,
            embedding=embedding or cls._get_default_embeddings(),
            gcs_client=gcs_client,
            index_client=index_client,
            index_endpoint_client=index_endpoint_client,
            credentials=credentials,
            gcs_bucket_name=gcs_bucket_name,
        )

    @classmethod
    def _validate_gcs_bucket(cls, gcs_bucket_name: str) -> str:
        """Validates the gcs_bucket_name as a bucket name.

        Args:
              gcs_bucket_name: The received bucket uri.

        Returns:
              A valid gcs_bucket_name or throws ValueError if full path is
              provided.
        """
        gcs_bucket_name = gcs_bucket_name.replace("gs://", "")
        if "/" in gcs_bucket_name:
            raise ValueError(
                f"The argument gcs_bucket_name should only be "
                f"the bucket name. Received {gcs_bucket_name}"
            )
        return gcs_bucket_name

    @classmethod
    def _create_credentials_from_file(
        cls, json_credentials_path: Optional[str]
    ) -> Optional[Credentials]:
        """Creates credentials for GCP.

        Args:
             json_credentials_path: The path on the file system where the
             credentials are stored.

         Returns:
             An optional of Credentials or None, in which case the default
             will be used.
        """

        from google.oauth2 import service_account

        credentials = None
        if json_credentials_path is not None:
            credentials = service_account.Credentials.from_service_account_file(
                json_credentials_path
            )

        return credentials

    @classmethod
    def _create_index_by_id(
        cls, index_id: str, project_id: str, region: str, credentials: "Credentials"
    ) -> MatchingEngineIndex:
        """Creates a MatchingEngineIndex object by id.

        Args:
            index_id: The created index id.

        Returns:
            A configured MatchingEngineIndex.
        """

        from google.cloud import aiplatform_v1

        logger.debug(f"Creating matching engine index with id {index_id}.")
        index_client = cls._get_index_client(project_id, region, credentials)
        request = aiplatform_v1.GetIndexRequest(name=index_id)
        return index_client.get_index(request=request)

    @classmethod
    def _create_endpoint_by_id(
        cls, endpoint_id: str, project_id: str, region: str, credentials: "Credentials"
    ) -> MatchingEngineIndexEndpoint:
        """Creates a MatchingEngineIndexEndpoint object by id.

        Args:
            endpoint_id: The created endpoint id.

        Returns:
            A configured MatchingEngineIndexEndpoint.
            :param project_id:
            :param region:
            :param credentials:
        """

        from google.cloud import aiplatform

        logger.debug(f"Creating endpoint with id {endpoint_id}.")
        return aiplatform.MatchingEngineIndexEndpoint(
            index_endpoint_name=endpoint_id,
            project=project_id,
            location=region,
            credentials=credentials,
        )

    @classmethod
    def _get_gcs_client(
        cls, credentials: "Credentials", project_id: str
    ) -> "storage.Client":
        """Lazily creates a GCS client.

        Returns:
            A configured GCS client.
        """

        from google.cloud import storage

        return storage.Client(credentials=credentials, project=project_id)

    @classmethod
    def _get_index_client(
        cls, project_id: str, region: str, credentials: "Credentials"
    ) -> "storage.Client":
        """Lazily creates a Matching Engine Index client.

        Returns:
            A configured Matching Engine Index client.
        """

        from google.cloud import aiplatform_v1

        # PARENT = f"projects/{project_id}/locations/{region}"
        ENDPOINT = f"{region}-aiplatform.googleapis.com"
        return aiplatform_v1.IndexServiceClient(
            client_options=dict(api_endpoint=ENDPOINT), credentials=credentials
        )

    @classmethod
    def _get_index_endpoint_client(
        cls, project_id: str, region: str, credentials: "Credentials"
    ) -> "storage.Client":
        """Lazily creates a Matching Engine Index Endpoint client.

        Returns:
            A configured Matching Engine Index Endpoint client.
        """

        from google.cloud import aiplatform_v1

        # PARENT = f"projects/{project_id}/locations/{region}"
        ENDPOINT = f"{region}-aiplatform.googleapis.com"
        return aiplatform_v1.IndexEndpointServiceClient(
            client_options=dict(api_endpoint=ENDPOINT), credentials=credentials
        )

    @classmethod
    def _init_aiplatform(
        cls,
        project_id: str,
        region: str,
        gcs_bucket_name: str,
        credentials: "Credentials",
    ) -> None:
        """Configures the aiplatform library.

        Args:
            project_id: The GCP project id.
            region: The default location making the API calls. It must have
            the same location as the GCS bucket and must be regional.
            gcs_bucket_name: GCS staging location.
            credentials: The GCS Credentials object.
        """

        from google.cloud import aiplatform

        logger.debug(
            f"Initializing AI Platform for project {project_id} on "
            f"{region} and for {gcs_bucket_name}."
        )
        aiplatform.init(
            project=project_id,
            location=region,
            staging_bucket=gcs_bucket_name,
            credentials=credentials,
        )

    @classmethod
    def _get_default_embeddings(cls) -> TensorflowHubEmbeddings:
        """This function returns the default embedding."""
        return TensorflowHubEmbeddings()

# Utility functions to create Index and deploy the index to an Endpoint
from datetime import datetime
import time
import logging

from google.cloud import aiplatform_v1 as aipv1
from google.protobuf import struct_pb2

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()


class MatchingEngineUtils:
    def __init__(self, project_id: str, region: str, index_name: str):
        self.project_id = project_id
        self.region = region
        self.index_name = index_name
        self.index_endpoint_name = f"{self.index_name}-endpoint"
        self.PARENT = f"projects/{self.project_id}/locations/{self.region}"

        ENDPOINT = f"{self.region}-aiplatform.googleapis.com"
        # set index client
        self.index_client = aipv1.IndexServiceClient(
            client_options=dict(api_endpoint=ENDPOINT)
        )
        # set index endpoint client
        self.index_endpoint_client = aipv1.IndexEndpointServiceClient(
            client_options=dict(api_endpoint=ENDPOINT)
        )

    def get_index(self):
        # Check if index exists
        request = aipv1.ListIndexesRequest(parent=self.PARENT)
        page_result = self.index_client.list_indexes(request=request)
        indexes = [
            response.name
            for response in page_result
            if response.display_name == self.index_name
        ]

        if len(indexes) == 0:
            return None
        else:
            index_id = indexes[0]
            request = aipv1.GetIndexRequest(name=index_id)
            index = self.index_client.get_index(request=request)
            return index

    def get_index_endpoint(self):
        # Check if index endpoint exists
        request = aipv1.ListIndexEndpointsRequest(parent=self.PARENT)
        page_result = self.index_endpoint_client.list_index_endpoints(request=request)
        index_endpoints = [
            response.name
            for response in page_result
            if response.display_name == self.index_endpoint_name
        ]

        if len(index_endpoints) == 0:
            return None
        else:
            index_endpoint_id = index_endpoints[0]
            request = aipv1.GetIndexEndpointRequest(name=index_endpoint_id)
            index_endpoint = self.index_endpoint_client.get_index_endpoint(
                request=request
            )
            return index_endpoint

    def create_index(
        self,
        embedding_gcs_uri: str,
        dimensions: int,
        index_update_method: str = "streaming",
        index_algorithm: str = "tree-ah",
    ):
        # Get index
        index = self.get_index()
        # Create index if does not exists
        if index:
            logger.info(f"Index {self.index_name} already exists with id {index.name}")
        else:
            logger.info(f"Index {self.index_name} does not exists. Creating index ...")

            if index_update_method == "streaming":
                index_update_method = aipv1.Index.IndexUpdateMethod.STREAM_UPDATE
            else:
                index_update_method = aipv1.Index.IndexUpdateMethod.BATCH_UPDATE

            treeAhConfig = struct_pb2.Struct(
                fields={
                    "leafNodeEmbeddingCount": struct_pb2.Value(number_value=500),
                    "leafNodesToSearchPercent": struct_pb2.Value(number_value=7),
                }
            )
            if index_algorithm == "treeah":
                algorithmConfig = struct_pb2.Struct(
                    fields={"treeAhConfig": struct_pb2.Value(struct_value=treeAhConfig)}
                )
            else:
                algorithmConfig = struct_pb2.Struct(
                    fields={
                        "bruteForceConfig": struct_pb2.Value(
                            struct_value=struct_pb2.Struct()
                        )
                    }
                )
            config = struct_pb2.Struct(
                fields={
                    "dimensions": struct_pb2.Value(number_value=dimensions),
                    "approximateNeighborsCount": struct_pb2.Value(number_value=150),
                    "distanceMeasureType": struct_pb2.Value(
                        string_value="DOT_PRODUCT_DISTANCE"
                    ),
                    "algorithmConfig": struct_pb2.Value(struct_value=algorithmConfig),
                    "shardSize": struct_pb2.Value(string_value="SHARD_SIZE_SMALL"),
                }
            )
            metadata = struct_pb2.Struct(
                fields={
                    "config": struct_pb2.Value(struct_value=config),
                    "contentsDeltaUri": struct_pb2.Value(
                        string_value=embedding_gcs_uri
                    ),
                }
            )

            index_request = {
                "display_name": self.index_name,
                "description": "Index for LangChain demo",
                "metadata": struct_pb2.Value(struct_value=metadata),
                "index_update_method": index_update_method,
            }

            r = self.index_client.create_index(parent=self.PARENT, index=index_request)
            logger.info(
                f"Creating index with long running operation {r._operation.name}"
            )

            # Poll the operation until it's done successfullly.
            logging.info("Poll the operation to create index ...")
            while True:
                if r.done():
                    break
                time.sleep(60)
                print(".", end="")

            index = r.result()
            logger.info(
                f"Index {self.index_name} created with resource name as {index.name}"
            )

        return index

    def deploy_index(
        self,
        machine_type: str = "e2-standard-2",
        min_replica_count: int = 2,
        max_replica_count: int = 10,
        network: str = None,
    ):
        try:
            # Get index if exists
            index = self.get_index()
            if not index:
                raise Exception(
                    f"Index {self.index_name} does not exists. Please create index before deploying."
                )

            # Get index endpoint if exists
            index_endpoint = self.get_index_endpoint()
            # Create Index Endpoint if does not exists
            if index_endpoint:
                logger.info(
                    f"Index endpoint {self.index_endpoint_name} already exists with resource "
                    + f"name as {index_endpoint.name} and endpoint domain name as "
                    + f"{index_endpoint.public_endpoint_domain_name}"
                )
            else:
                logger.info(
                    f"Index endpoint {self.index_endpoint_name} does not exists. Creating index endpoint..."
                )
                index_endpoint_request = {"display_name": self.index_endpoint_name}

                if network:
                    index_endpoint_request["network"] = network
                else:
                    index_endpoint_request["public_endpoint_enabled"] = True

                r = self.index_endpoint_client.create_index_endpoint(
                    parent=self.PARENT, index_endpoint=index_endpoint_request
                )
                logger.info(
                    f"Deploying index to endpoint with long running operation {r._operation.name}"
                )

                logger.info("Poll the operation to create index endpoint ...")
                while True:
                    if r.done():
                        break
                    time.sleep(60)
                    print(".", end="")

                index_endpoint = r.result()
                logger.info(
                    f"Index endpoint {self.index_endpoint_name} created with resource "
                    + f"name as {index_endpoint.name} and endpoint domain name as "
                    + f"{index_endpoint.public_endpoint_domain_name}"
                )
        except Exception as e:
            logger.error(f"Failed to create index endpoint {self.index_endpoint_name}")
            raise e

        # Deploy Index to endpoint
        try:
            # Check if index is already deployed to the endpoint
            for d_index in index_endpoint.deployed_indexes:
                if d_index.index == index.name:
                    logger.info(
                        f"Skipping deploying Index. Index {self.index_name}"
                        + f"already deployed with id {index.name} to the index endpoint {self.index_endpoint_name}"
                    )
                    return index_endpoint

            timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
            deployed_index_id = f"{self.index_name.replace('-', '_')}_{timestamp}"
            deploy_index = {
                "id": deployed_index_id,
                "display_name": deployed_index_id,
                "index": index.name,
                "dedicated_resources": {
                    "machine_spec": {
                        "machine_type": machine_type,
                    },
                    "min_replica_count": min_replica_count,
                    "max_replica_count": max_replica_count,
                },
            }
            logger.info(f"Deploying index with request = {deploy_index}")
            r = self.index_endpoint_client.deploy_index(
                index_endpoint=index_endpoint.name, deployed_index=deploy_index
            )

            # Poll the operation until it's done successfullly.
            logger.info("Poll the operation to deploy index ...")
            while True:
                if r.done():
                    break
                time.sleep(60)
                print(".", end="")

            logger.info(
                f"Deployed index {self.index_name} to endpoint {self.index_endpoint_name}"
            )

        except Exception as e:
            logger.error(
                f"Failed to deploy index {self.index_name} to the index endpoint {self.index_endpoint_name}"
            )
            raise e

        return index_endpoint

    def get_index_and_endpoint(self):
        # Get index id if exists
        index = self.get_index()
        index_id = index.name if index else ""

        # Get index endpoint id if exists
        index_endpoint = self.get_index_endpoint()
        index_endpoint_id = index_endpoint.name if index_endpoint else ""

        return index_id, index_endpoint_id

    def delete_index(self):
        # Check if index exists
        index = self.get_index()

        # create index if does not exists
        if index:
            # Delete index
            index_id = index.name
            logger.info(f"Deleting Index {self.index_name} with id {index_id}")
            self.index_client.delete_index(name=index_id)
        else:
            raise Exception("Index {index_name} does not exists.")

    def delete_index_endpoint(self):
        # Check if index endpoint exists
        index_endpoint = self.get_index_endpoint()

        # Create Index Endpoint if does not exists
        if index_endpoint:
            logger.info(
                f"Index endpoint {self.index_endpoint_name}  exists with resource "
                + f"name as {index_endpoint.name} and endpoint domain name as "
                + f"{index_endpoint.public_endpoint_domain_name}"
            )

            index_endpoint_id = index_endpoint.name
            index_endpoint = self.index_endpoint_client.get_index_endpoint(
                name=index_endpoint_id
            )
            # Undeploy existing indexes
            for d_index in index_endpoint.deployed_indexes:
                logger.info(
                    f"Undeploying index with id {d_index.id} from Index endpoint {self.index_endpoint_name}"
                )
                request = aipv1.UndeployIndexRequest(
                    index_endpoint=index_endpoint_id, deployed_index_id=d_index.id
                )
                r = self.index_endpoint_client.undeploy_index(request=request)
                response = r.result()
                logger.info(response)

            # Delete index endpoint
            logger.info(
                f"Deleting Index endpoint {self.index_endpoint_name} with id {index_endpoint_id}"
            )
            self.index_endpoint_client.delete_index_endpoint(name=index_endpoint_id)
        else:
            raise Exception(
                f"Index endpoint {self.index_endpoint_name} does not exists."
            )

###*Initialize embedding directory with a null vector*

In [12]:

# dummy embedding
init_embedding = {"id": str(uuid.uuid4()), "embedding": list(np.zeros(ME_DIMENSIONS))}

# dump embedding to a local file
with open("embeddings_0.json", "w") as f:
    json.dump(init_embedding, f)

# write embedding to Cloud Storage
! set -x && gsutil cp embeddings_0.json gs://{ME_EMBEDDING_DIR}/init_index/embeddings_0.json

+ gsutil cp embeddings_0.json gs://argolis-project-340214-me-bucket/init_index/embeddings_0.json
Copying file://embeddings_0.json [Content-Type=application/json]...
/ [1 files][  3.8 KiB/  3.8 KiB]                                                
Operation completed over 1 objects/3.8 KiB.                                      


###*Create a Matching Engine Index and deploy to a endpoint*

In [13]:
mengine = MatchingEngineUtils(PROJECT_ID, ME_REGION, ME_INDEX_NAME)

index = mengine.create_index(
    embedding_gcs_uri=f"gs://{ME_EMBEDDING_DIR}/init_index",
    dimensions=ME_DIMENSIONS,
    index_update_method="streaming",
    index_algorithm="tree-ah",
)
if index:
    print(index.name)

index_endpoint = mengine.deploy_index()
if index_endpoint:
    print(f"Index endpoint resource name: {index_endpoint.name}")
    print(
        f"Index endpoint public domain name: {index_endpoint.public_endpoint_domain_name}"
    )
    print("Deployed indexes on the index endpoint:")
    for d in index_endpoint.deployed_indexes:
        print(f"    {d.id}")



projects/742458474659/locations/us-central1/indexes/6528535007873466368
Index endpoint resource name: projects/742458474659/locations/us-central1/indexEndpoints/5965585054452154368
Index endpoint public domain name: 1784211276.us-central1-742458474659.vdb.vertexai.goog
Deployed indexes on the index endpoint:
    argolis_project_340214_me_index_20231014181057


In [14]:
ME_INDEX_ID, ME_INDEX_ENDPOINT_ID = mengine.get_index_and_endpoint()
print(f"ME_INDEX_ID={ME_INDEX_ID}")
print(f"ME_INDEX_ENDPOINT_ID={ME_INDEX_ENDPOINT_ID}")
print(f"ME_INDEX_NAME={ME_INDEX_NAME}")
print(f"ME_BUCKET={ME_BUCKET}")
print(f"PROJECT_ID={PROJECT_ID}")
print(f"ME_REGION={ME_REGION}")

ME_INDEX_ID=projects/742458474659/locations/us-central1/indexes/6528535007873466368
ME_INDEX_ENDPOINT_ID=projects/742458474659/locations/us-central1/indexEndpoints/5965585054452154368
ME_INDEX_NAME=argolis-project-340214-me-index
ME_BUCKET=matching_engine_bucket
PROJECT_ID=argolis-project-340214
ME_REGION=us-central1


###*Create Embedding Engine & Build a Data Store*

In [15]:
# Define Text Embeddings model
embedding = VertexAIEmbeddings()


In [16]:
# Define Matching Engine as Vector Store
me = MatchingEngine.from_components(
    project_id=PROJECT_ID,
    region=ME_REGION,
    gcs_bucket_name=f'gs://{ME_BUCKET}',
    embedding=embedding,
    index_id=ME_INDEX_ID,
    endpoint_id=ME_INDEX_ENDPOINT_ID)


## Absorb documents, split them into chunks and capture metadata

*Loading...*
This takes some time: 1min++

In [17]:
print(ME_BUCKET)

matching_engine_bucket


In [18]:
loader = GCSDirectoryLoader(project_name=PROJECT_ID, bucket="contractunderstandingatticusdataset")
contractdocs = loader.load()

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.


In [19]:
print(f"# of documents = {len(contractdocs)}")
print(contractdocs[0].metadata)
print( contractdocs[0].json)


# of documents = 30
{'source': 'gs://contractunderstandingatticusdataset/2ThemartComInc_19990826_10-12G_EX-10.10_6700288_EX-10.10_Co-Branding Agreement_ Agency Agreement.txt'}
<bound method BaseModel.json of Document(page_content='CO\n\n\n\nBRANDING AND ADVERTISING AGREEMENT\n\nTHIS CO-BRANDING AND ADVERTISING AGREEMENT (the "Agreement") is made as of June 21, 1999 (the "Effective Date") by and between I-ESCROW, INC., with its principal place of business at 1730 S. Amphlett Blvd., Suite 233, San Mateo, California 94402 ("i-Escrow"), and 2THEMART.COM, INC. having its principal place of business at 18301 Von Karman Avenue, 7th Floor, Irvine, California 92612 ("2TheMart").\n\n1. DEFINITIONS.\n\n(a) "CONTENT" means all content or information, in any medium, provided by a party to the other party for use in conjunction with the performance of its obligations hereunder, including without limitation any text, music, sound, photographs, video, graphics, data or software. Content provided by 2T

*Split documents into chunks as needed by the token limit of the LLM and let there be an overlap between the chunks*

In [20]:
# split the documents into chunks
from langchain.text_splitter import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
doc_splits = text_splitter.split_documents(contractdocs)
print(f"# of document splits = {len(doc_splits)}")

# of document splits = 2150


*Capture Metadata*

In [21]:
# Add chunk number to metadata
for idx, split in enumerate(doc_splits):
    #print(idx)
    split.metadata["chunk"] = idx

# Separate doc_splits to semantic data and meta data
texts = [doc.page_content for doc in doc_splits]
metadatas = [
    [
        {"namespace": "source", "allow_list": [doc.metadata["source"]]},
        {"namespace": "document_name", "allow_list": [doc.metadata["source"].split("/")[-1]]},
        {"namespace": "chunk", "allow_list": [str(doc.metadata["chunk"])]},
    ]
    for doc in doc_splits
]
print(f"# of document splits = {len(doc_splits)}")

# of document splits = 2150


In [None]:
metadatas[3]

##Load semantic data as texts and metadata into Matching Engine
This takes time: 7mins ++

In [23]:
doc_ids = me.add_texts(texts=texts, metadatas=metadatas)

###*Do some testing of Matching Engine*

In [24]:
# Test whether search from vector store is working
me.similarity_search("image", k=2)

[Document(page_content='th', metadata={'source': 'gs://contractunderstandingatticusdataset/ASPIRITYHOLDINGSLLC_05_07_2012-EX-10.6-OUTSOURCING AGREEMENT.txt', 'document_name': 'ASPIRITYHOLDINGSLLC_05_07_2012-EX-10.6-OUTSOURCING AGREEMENT.txt', 'chunk': '1735', 'score': 0.6547313332557678}),
 Document(page_content='th', metadata={'source': 'gs://contractunderstandingatticusdataset/ASPIRITYHOLDINGSLLC_05_07_2012-EX-10.6-OUTSOURCING AGREEMENT.txt', 'document_name': 'ASPIRITYHOLDINGSLLC_05_07_2012-EX-10.6-OUTSOURCING AGREEMENT.txt', 'chunk': '1735', 'score': 0.6547313332557678})]

In [None]:
me.similarity_search("Twin Cities Power Holdings", k=2, search_distance=0.4)


## Obtain handle to the retriever

We will use the native retriever provided by Chroma DB to perform similarity search within the contracts document vector store among the different document chunks so as to return that document chunk which has the lowest vectoral "distance" with the incoming user query.

In [28]:
# Retriever configuration
NUMBER_OF_RESULTS = 10
SEARCH_DISTANCE_THRESHOLD = 0.6

# Expose index to the retriever
retriever = me.as_retriever(
    search_type="similarity",
    search_kwargs={
        "k": NUMBER_OF_RESULTS,
        "search_distance": SEARCH_DISTANCE_THRESHOLD,
    },
)


## Define a Retrieval QA Chain to use retriever

In [30]:
# Create chain to answer questions
from langchain.chains import RetrievalQA

llm = VertexLLM(
    model_name='text-bison-32k',
    max_output_tokens=256,
    temperature=0.1,
    top_p=0.8,
    top_k=40,
    verbose=True,
)

# Uses LLM to synthesize results from the search index.
# We use Vertex PaLM Text API for LLM
qa = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=retriever,
    return_source_documents=True)

## Leverage LLM to search from retriever

*Example:*

In [31]:
query = "Who all entered into agreement with Sagebrush?"
result = qa({"query": query})
print(result)

{'query': 'Who all entered into agreement with Sagebrush?', 'result': ' The Partnership, all of the Partners except Alpha Mariah (Prime), Inc. and Beta Mariah (Prime) Inc., and Manager entered into that certain Sagebrush Management and Maintenance Agreement, dated as of September 1, 1989 (the "Agreement").', 'source_documents': [Document(page_content='Each party cooperated and participated in the drafting and preparation of this Agreement and the documents referred to herein, and any and all drafts relating thereto exchanged among the parties shall be deemed the work product of all of the parties and may not be construed against any party by reason of its drafting or preparation. Accordingly, any rule of law or any legal decision that would require interpretation of any ambiguities in this Agreement against any party that drafted or prepared it is of no application and is hereby expressly waived by each of the parties hereto, and any controversy over interpretations of this Agreement s

## Build a Front End

In [None]:
!pip install -q gradio
import gradio as gr
import markdown

def chatbot(inputtext):
    result = qa({"query": inputtext})

    return result['result'], get_public_url(result['source_documents'][0].metadata['source']), result['source_documents'][0].metadata['source']

from google.cloud import storage

def get_public_url(uri):
    """Returns the public URL for a file in Google Cloud Storage."""
    # Split the URI into its components
    components = uri.split("/")

    # Get the bucket name
    bucket_name = components[2]

    # Get the file name
    file_name = components[3]

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)
    return blob.public_url


print("Launching Gradio")

iface = gr.Interface(fn=chatbot,
                     inputs=[gr.Textbox(label="Query")],
                     examples=["What is the agreement made by Twin Cities Power Holdings", "What is the agreement between MICOA & Stratton Cheeseman", "What is the commission % that Stratton Cheeseman will get from MICOA and how much will they get if MICOA's revenues are $100"],
                     title="Contract Analyst",
                     outputs=[gr.Textbox(label="Response"),
                              gr.Textbox(label="URL"),
                              gr.Textbox(label="Cloud Storage URI")],
                     theme=gr.themes.Soft)

iface.launch(share=False)

