# LLM Reference Architecture using Redis & Google Cloud Platform

<a href="https://colab.research.google.com/github/RedisVentures/redis-google-llms/blob/main/BigQuery_Palm_Redis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This notebook serves as a getting started guide for working with LLMs on Google Cloud Platform with Redis Enterprise.

## Intro
Google's Vertex AI has expanded its capabilities by introducing [Generative AI](https://cloud.google.com/vertex-ai/docs/generative-ai/learn/overview). This advanced technology comes with a specialized [in-console studio experience](https://cloud.google.com/vertex-ai/docs/generative-ai/start/quickstarts/quickstart), a [dedicated API](https://cloud.google.com/vertex-ai/docs/generative-ai/start/quickstarts/api-quickstart) and [Python SDK](https://cloud.google.com/vertex-ai/docs/python-sdk/use-vertex-ai-python-sdk) designed for deploying and managing instances of Google's powerful Gemini language models.

Redis Enterprise offers robust vector database features, with an efficient API for vector index creation, management, distance metric selection, similarity search, and hybrid filtering. When coupled with its versatile data structures - including lists, hashes, JSON, and sets - Redis Enterprise shines as the optimal solution for crafting high-quality Large Language Model (LLM)-based applications. It embodies a streamlined architecture and exceptional performance, making it an instrumental tool for production environments.

Below we will work through several design patterns with Vertex AI LLMs and Redis Enterprise that will ensure optimal production performance.

___
## Contents
- Setup
    1. Prerequisites
    2. Obtain Dataset
    3. Generate Embeddings
    4. Create Index
    5. Query
- Building a RAG Pipeline from scratch
- Demo

___

# Setup

## 1. Prerequisites
Before we begin, we must install some required libraries, authenticate with Google, create a Redis database, and initialize other required components.

### Install required libraries

In [1]:
!pip install -U "redisvl>=0.3.7" google-cloud-aiplatform langchain-community unstructured[pdf] gradio

import IPython

app = IPython.Application.instance()
app.kernel.do_shutdown(True)

Collecting redisvl
  Downloading redisvl-0.2.0-py3-none-any.whl (63 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/63.9 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━[0m [32m61.4/63.9 kB[0m [31m2.4 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m63.9/63.9 kB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m
Collecting langchain-community
  Downloading langchain_community-0.2.1-py3-none-any.whl (2.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m26.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting unstructured[pdf]
  Downloading unstructured-0.14.3-py3-none-any.whl (2.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m48.7 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting gradio
  Downloading gradio-4.31.5-py3-none-any.whl (12.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

{'status': 'ok', 'restart': True}

### Using Free Redis Cloud account on GCP
You can also use Forever Free instance of Redis Cloud. To activate it:
- Head to https://redis.com/try-free/
- Register (using gmail-based registration is the easiest)
- Create New Subscription
- Use the following options:
    - Fixed plan, Google Cloud
    - New 30Mb Free database
- Create new RedisStack DB

If you are registering at Redis Cloud for the first time - the last few steps would be performed for you by default. Capture the host, port and default password of the new database. You can use these instead of default `localhost` based in the following code block.

^^^ If prompted press the Restart button to restart the kernel. ^^^

### Install Redis locally (optional)
If you have a Redis db running elsewhere with [Redis Stack](https://redis.io/docs/about/about-stack/) installed, you don't need to run it on this machine. You can skip to the "Connect to Redis server" step.

In [1]:
%%sh
curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/redis.list
sudo apt-get update  > /dev/null 2>&1
sudo apt-get install redis-stack-server  > /dev/null 2>&1
redis-stack-server --daemonize yes

deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb jammy main
Starting redis-stack-server, database path /var/lib/redis-stack


gpg: cannot open '/dev/tty': No such device or address
curl: (23) Failed writing body


### Connect to Redis server
Replace the connection params below with your own if you are connecting to an external Redis instance.

In [2]:
import os
import redis

# Redis connection params
REDIS_HOST = os.getenv("REDIS_HOST", "localhost") #"redis-12110.c82.us-east-1-2.ec2.cloud.redislabs.com"
REDIS_PORT = os.getenv("REDIS_PORT", "6379")      #12110
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", "")  #"pobhBJP7Psicp2gV0iqa2ZOc1WdXXXXX"

# Create Redis client
redis_client = redis.Redis(
  host=REDIS_HOST,
  port=REDIS_PORT,
  password=REDIS_PASSWORD
)

# Test connection
redis_client.ping()

True

In [3]:
# Clear Redis database (optional)
redis_client.flushdb()

True

### Authenticate to Google Cloud

In [4]:
from google.colab import auth
auth.authenticate_user()
print('Authenticated')

Authenticated


In [5]:
from getpass import getpass

# input your GCP project ID and region for Vertex AI
PROJECT_ID = getpass("PROJECT_ID:") #'central-beach-194106'
REGION = input("REGION:") #'us-central1'

PROJECT_ID:··········
REGION:us-central1


## 2. Obtain dataset

Below pull the dataset from ...

In [6]:
#Procure a dataset - downloading a publication from IRS
!mkdir resources
!wget https://www.irs.gov/pub/irs-pdf/p5718.pdf -P resources/

--2024-05-29 17:44:32--  https://www.irs.gov/pub/irs-pdf/p5718.pdf
Resolving www.irs.gov (www.irs.gov)... 23.201.171.228, 2600:1408:5400:4b2::f50, 2600:1408:5400:48d::f50
Connecting to www.irs.gov (www.irs.gov)|23.201.171.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 8749823 (8.3M) [application/pdf]
Saving to: ‘resources/p5718.pdf’


2024-05-29 17:44:32 (60.5 MB/s) - ‘resources/p5718.pdf’ saved [8749823/8749823]



### Create text embeddings with Vertex AI embedding model
Use the [Vertex AI API for text embeddings](https://cloud.google.com/vertex-ai/docs/generative-ai/embeddings/get-text-embeddings), developed by Google.

> Text embeddings are a dense vector representation of a piece of content such that, if two pieces of content are semantically similar, their respective embeddings are located near each other in the embedding vector space. This representation can be used to solve common NLP tasks, such as:
> - **Semantic search**: Search text ranked by semantic similarity.
> - **Recommendation**: Return items with text attributes similar to the given text.
> - **Classification**: Return the class of items whose text attributes are similar to the given text.
> - **Clustering**: Cluster items whose text attributes are similar to the given text.
> - **Outlier Detection**: Return items where text attributes are least related to the given text.

The Vertex AI text-embeddings API lets you create a text embedding using Generative AI on Vertex AI. The `textembedding-gecko` model accepts a maximum of 3,072 input tokens (i.e. words) and outputs 768-dimensional vector embeddings.

### Set up embeddings
We define a helper function to create embeddings from a list of texts convert them to a byte string for efficient storage in Redis.



In [7]:
from redisvl.utils.vectorize import VertexAITextVectorizer

vectorizer = VertexAITextVectorizer(
    model = "text-embedding-004",
    api_config = {"project_id": PROJECT_ID, "location": REGION}
)

## 3. Generate Embeddings
The next step is to create chunks of the pdf and then embed each chunk as a vector.

In [8]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import UnstructuredFileLoader

doc = "resources/p5718.pdf"

# set up the file loader/extractor and text splitter to create chunks
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=2500, chunk_overlap=0
)
loader = UnstructuredFileLoader(
    doc, mode="single", strategy="fast"
)

# extract, load, and make chunks
chunks = loader.load_and_split(text_splitter)

print("Done preprocessing. Created", len(chunks), "chunks of the original pdf", doc)

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.


Done preprocessing. Created 44 chunks of the original pdf resources/p5718.pdf


In [9]:
# Embed each chunk content
embeddings = vectorizer.embed_many([chunk.page_content for chunk in chunks], as_buffer=True, dtype="float32")

# Check to make sure we've created enough embeddings, 1 per document chunk
len(embeddings) == len(chunks)

True

## 4. Create Index

Now that we have created embeddings that represent the text in our dataset, we will create an index that enables efficient search over the embeddings.

**Why do we need to enable search???**
Using Redis for vector search allows us to retrieve chunks of text data that are **similar** or **relevant** to an input question or query. This will be extremely helpful for our sample generative ai / LLM application.

In [10]:
from redisvl.schema import IndexSchema
from redisvl.index import SearchIndex


index_name = "redisvl"

schema = IndexSchema.from_dict({
  "index": {
    "name": index_name,
    "prefix": "chunk"
  },
  "fields": [
    {
        "name": "chunk_id",
        "type": "tag",
        "attrs": {
            "sortable": True
        }
    },
    {
        "name": "content",
        "type": "text"
    },
    {
        "name": "text_embedding",
        "type": "vector",
        "attrs": {
            "dims": vectorizer.dims,
            "distance_metric": "cosine",
            "algorithm": "flat",
            "datatype": "float32"
        }
    }
  ]
})

In [11]:
# Create an index from schema and the client
index = SearchIndex(schema, redis_client)
index.create(overwrite=True, drop=True)

In [12]:
# Load expects an iterable of dictionaries
data = [
    {
        'chunk_id': f'{i}',
        'content': chunk.page_content,
        'text_embedding': embeddings[i]
    } for i, chunk in enumerate(chunks)
]

# RedisVL handles batching automatically
keys = index.load(data, id_field="chunk_id")

## 5. Query
Now we can use RedisVL to perform a variety of vector search operations.

In [13]:
from redisvl.query import VectorQuery

query = "What is TCC?"

query_embedding = vectorizer.embed(query)

vector_query = VectorQuery(
    vector=query_embedding,
    vector_field_name="text_embedding",
    num_results=3,
    return_fields=["chunk_id", "content"],
    return_score=True
)

# show the raw redis query
str(vector_query)

'*=>[KNN 3 @text_embedding $vector AS vector_distance] RETURN 3 chunk_id content vector_distance SORTBY vector_distance ASC DIALECT 2 LIMIT 0 3'

In [14]:
# execute the query with RedisVL
index.query(vector_query)

[{'id': 'chunk:7',
  'vector_distance': '0.500085473061',
  'chunk_id': '7',
  'content': 'IRIS uses QuickAlerts, an IRS e-mail service, to disseminate information quickly regarding IRIS issues to subscribers. This service keeps tax professionals up to date on IRIS issues throughout the year, with emphasis on issues during the filing season. After subscribing, customers will receive “round the clock” communication issues such as electronic specifica- tions and system information needed for Software Developers and Transmitters to transmit to IRS. New subscribers may sign up through the “subscription page” link located on the QuickAlerts “more” e-file Benefits for Tax Professionals page.\n\n9\n\nPublication 5718\n\n1.3 Registration and Application Process\n\nExternal users must register with the current IRS credential service provider and complete the IRIS Application for Transmitter Control Code (TCC) to submit transmissions using the IRIS intake platform. Information returns filed thro

In [15]:
# paginate through results
for result in index.paginate(vector_query, page_size=1):
    print(result[0]["chunk_id"], result[0]["vector_distance"], flush=True)

7 0.500085473061
1 0.502500534058
11 0.505242109299


In [16]:
from redisvl.query.filter import Text

query = "What is TCC?"

query_embedding = vectorizer.embed(query)

text_filter = Text("content") % "Social Security"

vector_query = VectorQuery(
    vector=query_embedding,
    vector_field_name="text_embedding",
    num_results=3,
    return_fields=["chunk_id", "content"],
    return_score=True,
    filter_expression=text_filter
)

# show the raw redis query
str(vector_query)

'@content:(Social Security)=>[KNN 3 @text_embedding $vector AS vector_distance] RETURN 3 chunk_id content vector_distance SORTBY vector_distance ASC DIALECT 2 LIMIT 0 3'

In [17]:
# execute the query with RedisVL
index.query(vector_query)

[{'id': 'chunk:9',
  'vector_distance': '0.572170257568',
  'chunk_id': '9',
  'content': 'Select the role of Transmitter on your application. Note: The TCC for a Transmitter can be used to transmit your own returns and others. You may not use an Issuer TCC to transmit information returns for others.\n\n11\n\nPublication 5718\n\n1.3.3 Third-Party Transmitters\n\nIf you do not have an in-house programmer familiar with XML or do not wish to purchase A2A software that is certified to support the information returns that you plan to file, you can file through a Third-Party Transmitter or use the online Taxpayer Portal. Visit www.irs.gov/ iris for additional information.\n\nOnly those persons listed as an Authorized User on the IRIS Application for TCC qualify to receive information about a Receipt ID associated with a TCC listed on that application.\n\nIf your Third-Party Transmitter needs technical assistance regarding a Receipt ID associated with records that were submitted on behalf of 

# Building a RAG Pipeline from Scratch
We're going to build a complete RAG pipeline from scratch incorporating the following components:

- Standard retrieval and chat completion
- Dense content representation to improve accuracy
- Query re-writing to improve accuracy
- Semantic caching to improve performance
- Conversational session history to improve personalization

In [18]:
#@title Setup RedisVL *AsyncSearchIndex*

from redis.asyncio import Redis
from redisvl.index import AsyncSearchIndex

# Create Redis client
redis_client = Redis(
    host=REDIS_HOST,
    port=REDIS_PORT,
    password=REDIS_PASSWORD
)

index = AsyncSearchIndex(index.schema, redis_client)

In [19]:
#@title Setup VertexAI Generative Model with Safety Settings
from vertexai.generative_models import GenerativeModel, Part, HarmCategory, HarmBlockThreshold


model = GenerativeModel("gemini-1.5-flash-001")

# Define safety settings
safety_settings = {
    HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_ONLY_HIGH,
    HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_ONLY_HIGH,
    HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_ONLY_HIGH,
    HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_ONLY_HIGH,
}

# Define generation config
generation_config = {
    "max_output_tokens": 2048,
    "temperature": 0.5,
    "top_p": 1
}

### Baseline Retrieval Augmented Generation

Below we build a simple RAG pipeline with three helper methods:


*   `answer_question` -- full RAG operation
    * `retrieve_context` -- search Redis for relevant sources
    * `promptify`  -- combine system instructions, user question, and sources



In [20]:
async def answer_question(index: AsyncSearchIndex, query: str):
    """Answer the user's question"""

    SYSTEM_PROMPT = """You are a helpful tax analyst assistant that has access
    to publications from the IRS
    """

    query_vector = vectorizer.embed(query)

    # Fetch context from Redis using vector search
    context = await retrieve_context(index, query_vector)

    prompt = f'''
    System: {SYSTEM_PROMPT}
    User: {promptify(query, context)}
    '''

    responses = model.generate_content(
        [prompt],
        generation_config=generation_config,
        safety_settings=safety_settings,
        stream=False
    )
    # Response provided by LLM
    if(responses.candidates[0].finish_reason.value == 1):
        return(responses.candidates[0].content.parts[0].text)
    else:
        return(f"Content has been blocked for {responses.candidates[0].finish_reason.name} reasons.")


async def retrieve_context(index: AsyncSearchIndex, query_vector) -> str:
    """Fetch the relevant context from Redis using vector search"""
    results = await index.query(
        VectorQuery(
            vector=query_vector,
            vector_field_name="text_embedding",
            return_fields=["content"],
            num_results=3
        )
    )
    content = "\n".join([result["content"] for result in results])
    return content


def promptify(query: str, context: str) -> str:
    return f'''Use the provided context below derived from public documenation to answer the user's question. If you can't answer the user's
    question, based on the context; do not guess. Do your best finding the answer in the context, but if there is no context at all,
    respond with "I don't know".

    User question:

    {query}

    Helpful context:

    {context}

    Answer:
    '''

In [21]:
# Generate a list of questions
questions = [
    "What is TCC?",
    "Who should apply for an IRIS TCC?",
    "What is a JWK?",
    "Should I buy a yacht??"
]

In [22]:
import asyncio

results = await asyncio.gather(*[
    answer_question(index, question) for question in questions
])

In [23]:
for question, result in zip(questions,results):
  print(question+": \n"+result+"\n\n")

What is TCC?: 
TCC stands for Transmitter Control Code. 



Who should apply for an IRIS TCC?: 
If you are transmitting information returns to the IRS or if you are developing software to file information returns electronically, you must apply for one or more TCCs using the IRIS Application for TCC available online. 



What is a JWK?: 
A JSON Web Key Set (JWKs) is used for e-Services API authentication. It contains a public key that validates the API consumer application. 



Should I buy a yacht??: 
I don't know. 





# Improve performance and cut costs with LLM Semantic Caching

In [24]:
from redis import Redis
from redisvl.extensions.llmcache import SemanticCache

# Create Redis client
redis_client = Redis(
  host=REDIS_HOST,
  port=REDIS_PORT,
  password=REDIS_PASSWORD
)

# Create the Semantic Cache
llmcache = SemanticCache(
    name="llmcache",
    vectorizer=vectorizer,
    redis_client=redis_client,
    ttl=120,
    distance_threshold=0.2
)

In [25]:
from functools import wraps


# Create an LLM caching decorator
def cache(func):
    @wraps(func)
    async def wrapper(index, query_text, *args, **kwargs):
        query_vector = llmcache._vectorizer.embed(query_text)

        # Check the cache with the vector
        if result := llmcache.check(vector=query_vector):
            return result[0]['response']

        response = await func(index, query_text, query_vector=query_vector)
        llmcache.store(query_text, response, query_vector)
        return response
    return wrapper


@cache
async def answer_question(index: AsyncSearchIndex, query: str, **kwargs):
    """Answer the user's question"""

    SYSTEM_PROMPT = """You are a helpful tax analyst assistant that has access
    to publications from the IRS
    """

    # Fetch context from Redis using vector search
    context = await retrieve_context(index, kwargs["query_vector"])

    prompt = f'''
    System: {SYSTEM_PROMPT}
    User: {promptify(query, context)}
    '''

    responses = model.generate_content(
        [prompt],
        generation_config=generation_config,
        safety_settings=safety_settings,
        stream=False
    )
    # Response provided by LLM
    if(responses.candidates[0].finish_reason.value == 1):
        return(responses.candidates[0].content.parts[0].text)
    else:
        return(f"Content has been blocked for {responses.candidates[0].finish_reason.name} reasons.")


In [26]:
from datetime import datetime

In [27]:
query = "What is a JWK?"

startTime = datetime.now()
await answer_question(index, query)
print(f"Total time: {datetime.now() - startTime}")

Total time: 0:00:00.700675


In [28]:
# Now try again with semantic caching enabled!
query = "What's a JWK?"

startTime = datetime.now()
await answer_question(index, query)
print(f"Total time: {datetime.now() - startTime}")

Total time: 0:00:00.102795


In [None]:
import gradio as gr

async def respond(message, history):
    print(message)
    result = await answer_question(index, message)
    print(result)
    return result

gr.ChatInterface(respond).launch(debug=True)