In [None]:
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Retrieval Augmented Generation (RAG) with Vertex AI Search

## Before you begin

### Install dependencies

We will first need to install and upgrade the Google Cloud Discovery Engine library. Once the library is installed, restart the kernel so that you can use the updated packages.

In [None]:
!pip install -q --upgrade google-cloud-discoveryengine

### Import libraries and set environment variables

In [None]:
from typing import Optional

from google.api_core.client_options import ClientOptions
from google.cloud import discoveryengine

from google.cloud import discoveryengine_v1alpha as discoveryengine
from google.api_core.client_options import ClientOptions

import socket
import re

In [None]:
UNIQUE_PREFIX = socket.gethostname()
UNIQUE_PREFIX = re.sub('[^A-Za-z0-9]+', '', UNIQUE_PREFIX)

# Cloud project id.
PROJECT_IDS = !(gcloud config get-value core/project)
PROJECT_ID = PROJECT_IDS[0]  # @param {type:"string"}

# The Cloud Storage bucket for storing experiments output.
# Remove prefix gs://, e.g. foo_bucket.
GCS_BUCKET_LOCATION = "asia-southeast1"
REGION = 'asia-southeast1'
GCS_BUCKET_URI = f"gs://hp-books-{PROJECT_ID}-{UNIQUE_PREFIX}-{REGION}"

# print variables for verification
print(f"Project ID: {PROJECT_ID}")
print(f"GCS Bucket URI: {GCS_BUCKET_URI}")

### Helper functions

In [None]:
import os
import vertexai
from vertexai.preview.language_models import TextGenerationModel
from vertexai.preview.generative_models import GenerativeModel, Part

# Region used for Vertex AI model calls. It has its own name so that (re-)running
# this cell cannot overwrite REGION (the bucket region) or LOCATION (the Discovery
# Engine location), which other cells set to different values.
VERTEX_AI_LOCATION = "us-central1"

vertexai.init(project=PROJECT_ID, location=VERTEX_AI_LOCATION)

# Generation settings passed to `predict` by the PaLM helper functions.
parameters = {
    "candidate_count": 1,
    "max_output_tokens": 1024,
    "temperature": 1,
    "top_k": 40,
}

def get_model():
    """Load and return the pretrained ``text-bison@002`` text generation model."""
    return TextGenerationModel.from_pretrained("text-bison@002")


def get_text_generation(prompt="", **parameters):
    """Generate text for ``prompt`` with the model returned by ``get_model``.

    Args:
        prompt: Text sent to the model.
        **parameters: Keyword arguments forwarded unchanged to ``predict``.

    Returns:
        The ``text`` attribute of the prediction response.
    """
    prediction = get_model().predict(prompt=prompt, **parameters)
    return prediction.text

def _predict_with_palm(model_name, input_prompt):
    """Send ``input_prompt`` to the named PaLM text model and return its answer.

    The module-level ``parameters`` dict is forwarded to ``predict`` as keyword
    arguments. The answer is also printed so progress is visible in the notebook.

    Args:
        model_name: Identifier passed to ``TextGenerationModel.from_pretrained``.
        input_prompt: Prompt text sent to the model.

    Returns:
        The ``text`` attribute of the prediction response.
    """
    model = TextGenerationModel.from_pretrained(model_name)
    response = model.predict(input_prompt, **parameters)
    print(f"Response from Model: {response.text}")
    return response.text


def generate_palm_unicorn_v1(input_prompt):
    """Answer ``input_prompt`` with the ``text-unicorn@001`` model and return the text."""
    return _predict_with_palm("text-unicorn@001", input_prompt)


def generate_palm_bison32k(input_prompt):
    """Answer ``input_prompt`` with the ``text-bison-32k`` model and return the text."""
    return _predict_with_palm("text-bison-32k", input_prompt)

def generate(input_prompt):
    """Stream an answer from the ``gemini-ultra`` model and return it as one string.

    Args:
        input_prompt: Prompt passed to ``generate_content``.

    Returns:
        The text of every streamed chunk, concatenated in arrival order.
    """
    model = GenerativeModel("gemini-ultra")
    responses = model.generate_content(
        input_prompt,
        generation_config={
            "max_output_tokens": 2048,
            "temperature": 0.2,
            "top_p": 1,
            "top_k": 32,
        },
        safety_settings=[],
        stream=True,
    )

    # Streamed chunks are consecutive pieces of a single answer, so they are joined
    # without a separator; adding spaces would split words at chunk boundaries.
    return "".join(chunk.text for chunk in responses)
    

def generate_pro(input_prompt):
    """Stream an answer from the ``gemini-pro`` model and return it as one string.

    Args:
        input_prompt: Prompt passed to ``generate_content``.

    Returns:
        The text of every streamed chunk, concatenated in arrival order.
    """
    model = GenerativeModel("gemini-pro")
    responses = model.generate_content(
        input_prompt,
        generation_config={
            "max_output_tokens": 2048,
            "temperature": 0.2,
            "top_p": 1,
        },
        stream=True,
    )

    # Streamed chunks are consecutive pieces of a single answer, so they are joined
    # without a separator; adding spaces would split words at chunk boundaries.
    return "".join(chunk.text for chunk in responses)

In [None]:
# Helper Function to create data store
def create_data_store(
    project_id: str, location: str, data_store_name: str, data_store_id: str
) -> None:
    """Create a Discovery Engine data store in the project's default collection.

    The data store is created with the GENERIC industry vertical and the
    CONTENT_REQUIRED content config. The call waits up to 90 seconds for the
    creation operation; if it has not finished by then a notice is printed and
    the function returns, leaving the operation running on the server.

    Args:
        project_id: Google Cloud project that owns the data store.
        location: Discovery Engine location; any value other than "global"
            selects the matching regional API endpoint.
        data_store_name: Display name of the new data store.
        data_store_id: ID of the new data store.

    Raises:
        Exception: Any error other than the wait timing out is propagated,
            both from submitting the request and from the operation itself.
    """
    # Local import keeps the helper self-contained; this is the exception type
    # raised when waiting on the operation times out.
    from concurrent.futures import TimeoutError as OperationTimeoutError

    # Create a client
    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )
    client = discoveryengine.DataStoreServiceClient(client_options=client_options)

    # Initialize request argument(s)
    data_store = discoveryengine.DataStore(
        display_name=data_store_name,
        industry_vertical="GENERIC",
        content_config="CONTENT_REQUIRED",
    )

    request = discoveryengine.CreateDataStoreRequest(
        parent=discoveryengine.DataStoreServiceClient.collection_path(
            project_id, location, "default_collection"
        ),
        data_store=data_store,
        data_store_id=data_store_id,
    )

    # Make the request
    operation = client.create_data_store(request=request)

    # The datastore can take a while to instantiate. Only the timeout is tolerated
    # so that execution is not halted by a slow operation; real failures surface.
    try:
        operation.result(timeout=90)
    except OperationTimeoutError:
        print("long-running operation")

In [None]:
# Helper Function to import documents from GCS bucket into datastore
def import_documents(
    project_id: str,
    location: str,
    data_store_id: str,
    gcs_uri: str,
) -> str:
    """Import every object under ``gcs_uri`` into a data store's default branch.

    The import uses the "content" data schema and INCREMENTAL reconciliation,
    and the call waits (without a timeout) for the import operation to finish.

    Args:
        project_id: Google Cloud project that owns the data store.
        location: Discovery Engine location; any value other than "global"
            selects the matching regional API endpoint.
        data_store_id: ID of the data store that receives the documents.
        gcs_uri: Cloud Storage URI of the bucket or folder to import, with or
            without a trailing slash.

    Returns:
        The name of the import operation.
    """
    # Create a client
    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )
    client = discoveryengine.DocumentServiceClient(client_options=client_options)

    # The full resource name of the search engine branch.
    # e.g. projects/{project}/locations/{location}/dataStores/{data_store_id}/branches/{branch}
    parent = client.branch_path(
        project=project_id,
        location=location,
        data_store=data_store_id,
        branch="default_branch",
    )

    # Strip trailing slashes so "gs://bucket/" does not become "gs://bucket//*".
    source_documents = [f"{gcs_uri.rstrip('/')}/*"]

    request = discoveryengine.ImportDocumentsRequest(
        parent=parent,
        gcs_source=discoveryengine.GcsSource(
            input_uris=source_documents, data_schema="content"
        ),
        # Options: `FULL`, `INCREMENTAL`
        reconciliation_mode=discoveryengine.ImportDocumentsRequest.ReconciliationMode.INCREMENTAL,
    )

    # Make the request and wait for the import to complete
    operation = client.import_documents(request=request)
    operation.result()

    return operation.operation.name

## 1. Configure a Datastore for our documents

### Upload the book PDFs to a Google Cloud Storage (GCS) bucket
In this section, we will be doing the following:
1. Create a Cloud Storage bucket
2. Upload the PDFs of the Harry Potter series into the Cloud Storage bucket
3. Verify that the PDFs have been uploaded successfully
4. Create an empty datastore and import the PDF documents into the datastore

In [None]:
# Create a Cloud Storage bucket at GCS_BUCKET_URI in the GCS_BUCKET_LOCATION location
!gcloud storage buckets create $GCS_BUCKET_URI --location=$GCS_BUCKET_LOCATION

# Recursively copy everything under the local books/ directory into the bucket created above
!gsutil cp -r ./books/* $GCS_BUCKET_URI

In [None]:
# List the bucket contents to verify that all Books 1 to 7 are uploaded
# (8 files in total, 2 for Part 1)
!gsutil ls $GCS_BUCKET_URI

In [None]:
# The datastore name can only contain lowercase letters, numbers, and hyphens,
# so it is lowercased in case UNIQUE_PREFIX contains uppercase characters.
DATASTORE_NAME = f"{UNIQUE_PREFIX}-harry-potter-datastore".lower()
DATASTORE_ID = f"{DATASTORE_NAME}-id"
# Discovery Engine location passed to the datastore, engine and search helpers.
LOCATION = "global"

# print variables for verification
print(f"Datastore name: {DATASTORE_NAME}")
print(f"Datastore ID: {DATASTORE_ID}")

In [None]:
# Create the datastore
from google.api_core.exceptions import AlreadyExists

try:
    create_data_store(PROJECT_ID, LOCATION, DATASTORE_NAME, DATASTORE_ID)
    print(f"Datastore {DATASTORE_ID} successfully created")
except AlreadyExists:
    # Expected when the notebook is re-run: continue with the existing datastore.
    # Any other failure (permissions, disabled API, ...) is left to surface
    # instead of being reported as "already exists".
    print("if not running first time, DATASTORE_ID may already exist")

In [None]:
# Import the documents stored in the bucket into the datastore
import_documents(
    project_id=PROJECT_ID,
    location=LOCATION,
    data_store_id=DATASTORE_ID,
    gcs_uri=GCS_BUCKET_URI,
)

## 2. Create a Search Engine for your datastore
In this section, you will create a Search app and connect it to the Harry Potter datastore that we created earlier.

For the search app, we will set the search tier to Enterprise and enable advanced LLM features. The Enterprise tier is required to get extractive answers from a search query, and advanced LLM features are required to summarize search results.

In [None]:
# Helper function to create a Vertex Search Engine
def create_engine(
    project_id: str, location: str, data_store_name: str, data_store_id: str
) -> None:
    """Create an Enterprise-tier search engine attached to one data store.

    The engine uses ``data_store_name`` both as its display name and as its
    engine ID, and enables the LLM search add-on. The call waits up to 90
    seconds for the creation operation; if it has not finished by then a notice
    is printed and the function returns, matching ``create_data_store``.

    Args:
        project_id: Google Cloud project that owns the engine.
        location: Discovery Engine location; any value other than "global"
            selects the matching regional API endpoint.
        data_store_name: Display name and ID of the new engine.
        data_store_id: ID of the data store the engine searches.

    Raises:
        Exception: Any error other than the wait timing out is propagated,
            both from submitting the request and from the operation itself.
    """
    # Local import keeps the helper self-contained; this is the exception type
    # raised when waiting on the operation times out.
    from concurrent.futures import TimeoutError as OperationTimeoutError

    # Create a client
    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )
    client = discoveryengine.EngineServiceClient(client_options=client_options)

    # Initialize request argument(s)
    config = discoveryengine.Engine.SearchEngineConfig(
        search_tier="SEARCH_TIER_ENTERPRISE", search_add_ons=["SEARCH_ADD_ON_LLM"]
    )

    engine = discoveryengine.Engine(
        display_name=data_store_name,
        solution_type="SOLUTION_TYPE_SEARCH",
        industry_vertical="GENERIC",
        data_store_ids=[data_store_id],
        search_engine_config=config,
    )

    request = discoveryengine.CreateEngineRequest(
        parent=discoveryengine.DataStoreServiceClient.collection_path(
            project_id, location, "default_collection"
        ),
        engine=engine,
        # The engine ID is the same string as the engine's display name.
        engine_id=data_store_name,
    )

    # Make the request
    operation = client.create_engine(request=request)

    # Only the timeout is tolerated; real failures surface to the caller.
    try:
        operation.result(timeout=90)
    except OperationTimeoutError:
        print("long-running operation")

In [None]:
# Create the Vertex Search Engine
from concurrent.futures import TimeoutError as OperationTimeoutError

from google.api_core.exceptions import AlreadyExists

try:
    create_engine(PROJECT_ID, LOCATION, DATASTORE_NAME, DATASTORE_ID)
except AlreadyExists:
    # Expected when the notebook is re-run: continue with the existing engine.
    print("if not running first time, create_engine may already exist")
except OperationTimeoutError:
    # Engine creation did not finish within the helper's wait; it keeps running server-side.
    print("long-running operation")

## 3. Query your datastore through the Search app

In [None]:
from typing import List

def search_sample(
    project_id: str,
    location: str,
    data_store_id: str,
    search_query: str,
) -> "discoveryengine.services.search_service.pagers.SearchPager":
    """Run a search with snippets and a summary against a data store.

    The request asks for up to 10 results per page, snippets, a summary built
    from 5 results with citations, automatic query expansion and automatic
    spell correction.

    Args:
        project_id: Google Cloud project that owns the data store.
        location: Discovery Engine location; any value other than "global"
            selects the matching regional API endpoint.
        data_store_id: ID of the data store to search.
        search_query: The user's query text.

    Returns:
        The object returned by ``SearchServiceClient.search``; callers in this
        notebook read ``.summary.summary_text`` from it.
    """
    #  For more information, refer to:
    # https://cloud.google.com/generative-ai-app-builder/docs/locations#specify_a_multi-region_for_your_data_store
    # The endpoint is chosen from the `location` argument, not from any global,
    # so the client always matches the location used in the serving config path.
    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )

    # Create a client
    client = discoveryengine.SearchServiceClient(client_options=client_options)

    # The full resource name of the search engine serving config
    # e.g. projects/{project_id}/locations/{location}/dataStores/{data_store_id}/servingConfigs/{serving_config_id}
    serving_config = client.serving_config_path(
        project=project_id,
        location=location,
        data_store=data_store_id,
        serving_config="default_config",
    )

    # Optional: Configuration options for search
    # Refer to the `ContentSearchSpec` reference for all supported fields:
    # https://cloud.google.com/python/docs/reference/discoveryengine/latest/google.cloud.discoveryengine_v1.types.SearchRequest.ContentSearchSpec
    content_search_spec = discoveryengine.SearchRequest.ContentSearchSpec(
        # For information about snippets, refer to:
        # https://cloud.google.com/generative-ai-app-builder/docs/snippets
        snippet_spec=discoveryengine.SearchRequest.ContentSearchSpec.SnippetSpec(
            return_snippet=True
        ),
        # For information about search summaries, refer to:
        # https://cloud.google.com/generative-ai-app-builder/docs/get-search-summaries
        summary_spec=discoveryengine.SearchRequest.ContentSearchSpec.SummarySpec(
            summary_result_count=5,
            include_citations=True,
            ignore_adversarial_query=True,
            ignore_non_summary_seeking_query=False,
        ),
    )

    # Refer to the `SearchRequest` reference for all supported fields:
    # https://cloud.google.com/python/docs/reference/discoveryengine/latest/google.cloud.discoveryengine_v1.types.SearchRequest
    request = discoveryengine.SearchRequest(
        serving_config=serving_config,
        query=search_query,
        page_size=10,
        content_search_spec=content_search_spec,
        query_expansion_spec=discoveryengine.SearchRequest.QueryExpansionSpec(
            condition=discoveryengine.SearchRequest.QueryExpansionSpec.Condition.AUTO,
        ),
        spell_correction_spec=discoveryengine.SearchRequest.SpellCorrectionSpec(
            mode=discoveryengine.SearchRequest.SpellCorrectionSpec.Mode.AUTO
        ),
    )

    response = client.search(request)
    return response

In [None]:
# Send one sample question to the search engine and print the generated summary
query = "Who is the best friend of harry potter?"

sample_response = search_sample(PROJECT_ID, LOCATION, DATASTORE_ID, query)
print(sample_response.summary.summary_text)

## 3. Let's try asking a list of questions located in a CSV file

### Run through all the sample questions in the CSV file

In [None]:
import os
import time

import pandas as pd

filename = "./harry_potter_qa.csv"
df_qa_VertexSearch = pd.read_csv(filename, sep ="|")

System_Prompts = """ You are an expert in reading harry potter books, but only provide evidences from the information provide and do not use an other information
so here are some search results : 
"""

Question_Prompts = """ -- Based on information above help to answer following user question
"""

# How many questions to run, and how long to pause before each one.
MAX_QUESTIONS = 10
SECONDS_BETWEEN_QUESTIONS = 2

# Output column -> function that produces the answer stored in that column.
answer_generators = {
    "Gemini_pro_model_output_v1": generate_pro,
    "palm_bison32k_output_v1": generate_palm_bison32k,
}

# Iterate through the first MAX_QUESTIONS questions (fewer if the CSV is shorter)
for i in df_qa_VertexSearch.index[:MAX_QUESTIONS]:

    print("iteration #", i)
    time.sleep(SECONDS_BETWEEN_QUESTIONS)

    RAG_query = df_qa_VertexSearch.loc[i, 'Question']

    print(RAG_query)
    # `except Exception` keeps the loop best-effort while still allowing the
    # cell to be interrupted (KeyboardInterrupt is not swallowed).
    try:
        RAG_results = search_sample(PROJECT_ID, LOCATION, DATASTORE_ID, RAG_query).summary.summary_text
    except Exception as err:
        print(f"Search failed: {err}")
        RAG_results = "No results"

    Gemini_query = System_Prompts + " " + RAG_results + " " + Question_Prompts + " " + RAG_query

    # Each model is called in its own try block so that a failure of one model
    # does not overwrite an answer the other model already produced.
    for column, generate_answer in answer_generators.items():
        try:
            df_qa_VertexSearch.loc[i, column] = generate_answer(Gemini_query)
        except Exception as err:
            print(f"{column} failed: {err}")
            df_qa_VertexSearch.loc[i, column] = "No answer found"


output1 = "./results/harry_potter_qa_OOTB-RAG_output.csv"

# Create the results directory if needed so that to_csv does not fail on a fresh checkout.
os.makedirs(os.path.dirname(output1), exist_ok=True)
df_qa_VertexSearch.to_csv(output1)

# 4. Deploy the Search app as a Streamlit app to Cloud Run
For this section, please head to the [Google Cloud console](https://console.cloud.google.com/gen-app-builder/engines) to continue!