<a href="https://colab.research.google.com/github/denisabrantesredis/denisd-GenAI-Workshop/blob/main/Labs/03-RAG_Images/03_Redis_RAG_Images.ipynb" target="_newt">
<img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

<div style="display:flex;width:100%;">
<img src="https://redis.io/wp-content/uploads/2024/04/Logotype.svg?auto=webp&quality=85,75&width=120" alt="Redis" width="90"/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<img src="https://www.gstatic.com/devrel-devsite/prod/v0e0f589edd85502a40d78d7d0825db8ea5ef3b99ab4070381ee86977c9168730/cloud/images/cloud-logo.svg" alt="Google Cloud" width="140"/>
</div>

# Vector Similarity Search with Redis & Google Cloud - RAG for Images

<img src="https://github.com/denisabrantesredis/denisd-GenAI-Workshop/blob/main/_assets/images/redis_gcp.png?raw=true" alt="Redis and Google Cloud" align="center"/>

[Try a similar app with an always-on demo](https://ecommerce.redisventures.com/)

In this notebook, we will build a RAG use case on a dataset of product images. Redis will be used as the vector database and cache, while Google Gemini is the LLM that describes the images and helps generate the answers to the user's questions.

The dataset for this lab contains images of products like shoes, watches, clothes, etc. We will use Google Gemini to provide a description of each product that we can use as metadata.

## Installing the Pre-Reqs

In [None]:
!pip install -q sentence-transformers==3.0.1 >> /.tmp
!pip install -q redis==5.0.8 >> /.tmp
!pip install -q redisvl==0.3.5 >> /.tmp
!pip install -q langchain==0.2.16 >> /.tmp
!pip install -q langchain-core==0.3.6 >> /.tmp
!pip install -q langchain-huggingface==0.0.3 >> /.tmp
!pip install -q langchain-redis==0.0.4 >> /.tmp
!pip install -q langchain-google-genai==2.0.0 >> /.tmp
!pip install -q langchain_experimental==0.3.2 >> /.tmp
!pip install -q open-clip-torch==2.26.1 >> /.tmp
!pip install -q git+https://github.com/openai/CLIP.git >> /.tmp

In [None]:
# patch an issue with RedisVL
!wget https://github.com/denisabrantesredis/denisd-GenAI-Workshop/raw/refs/heads/main/_assets/files/semantic.py
!rm /usr/local/lib/python3.10/dist-packages/redisvl/extensions/llmcache/semantic.py
!cp semantic.py /usr/local/lib/python3.10/dist-packages/redisvl/extensions/llmcache/

### Installing Redis Stack Locally
If you are not using Redis Cloud as a database, uncomment and run the code below to install Redis Stack locally, then set your connection host to 127.0.0.1. Redis Stack listens on port 6379 by default, so use that port as well.

In [None]:
# %%sh
# curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg 
# echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/redis.list 
# sudo apt-get update  > /dev/null 2>&1
# sudo apt-get install redis-stack-server  > /dev/null 2>&1
# redis-stack-server --daemonize yes 

### Loading Required Packages

In [None]:
import os
import glob
import json
import redis
import torch
import base64
import random
from typing import Any, Dict
from IPython.display import Image
from google.colab import userdata
from langchain_core.messages import HumanMessage
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field, model_serializer
from langchain.output_parsers import PydanticOutputParser
from langchain_google_genai import ChatGoogleGenerativeAI

## Part 1 - Prepare the Environment

### Step 1: Download Dataset from Github

For performance reasons, we will work with only 20 images. The GitHub repo also contains a zip file with 100 images that you can use instead if you want to test this code in an environment with more resources, such as Google Vertex AI.

We will download the images that will be stored in Redis as vectors, as well as a smaller dataset of images that will be used for semantic search.

In [None]:
if not os.path.exists("./img_20"):
  !wget https://github.com/denisabrantesredis/denisd-GenAI-Workshop/raw/refs/heads/main/_assets/files/img20.zip
  !unzip img20.zip

In [None]:
if not os.path.exists("./img_search_20"):
  !wget https://github.com/denisabrantesredis/denisd-GenAI-Workshop/raw/refs/heads/main/_assets/files/img_search20.zip
  !unzip img_search20.zip

### Step 2: Setting up the Redis connection and GCP API Key

<img src="https://github.com/denisabrantesredis/denisd-GenAI-Workshop/blob/main/_assets/images/callout_secrets.png?raw=true" alt="Callout - Use Google Colab secrets instead"/>

In [None]:
if "GOOGLE_API_KEY" not in os.environ:
    if userdata.get('GOOGLE_API_KEY'):
      os.environ["GOOGLE_API_KEY"] = userdata.get('GOOGLE_API_KEY')
    else:
      os.environ["GOOGLE_API_KEY"] = "<insert API key here>"

if userdata.get('REDIS_HOST'):
  REDIS_HOST = userdata.get('REDIS_HOST')
else:
  REDIS_HOST="127.0.0.1"

if userdata.get('REDIS_PORT'):
  REDIS_PORT = userdata.get('REDIS_PORT')
else:
  REDIS_PORT=12000

if userdata.get('REDIS_PASSWORD'):
  REDIS_PASSWORD = userdata.get('REDIS_PASSWORD')
else:
  REDIS_PASSWORD="password"

REDIS_URL = f"redis://default:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}"
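
The f-string above works as long as the password contains no characters that are special in a URL (for example `@`, `/` or `:`). As a minimal sketch, assuming you want to guard against such passwords, the credentials can be percent-encoded before they are placed in the URL. `build_redis_url` is a hypothetical helper name, not part of redis-py.

```python
from urllib.parse import quote


def build_redis_url(host: str, port: int, password: str, username: str = "default") -> str:
    """Build a redis:// URL with percent-encoded credentials (illustrative helper)."""
    # safe="" also encodes "/" and "@", which would otherwise confuse URL parsing
    user = quote(username, safe="")
    secret = quote(password, safe="")
    return f"redis://{user}:{secret}@{host}:{port}"


# Example with a password that would break a naive f-string URL
print(build_redis_url("127.0.0.1", 12000, "p@ss/word"))
```

The resulting string can be passed to `redis.from_url` in the same way as `REDIS_URL`.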

#### Testing the Connection to Redis

<img src="https://github.com/denisabrantesredis/denisd-GenAI-Workshop/blob/main/_assets/images/callout_connection.png?raw=true" alt="Callout - Make sure connection works"/>

In [None]:
r = redis.from_url(REDIS_URL)

if r.ping():
    print("Connection successful!")
else:
    print("Connection issue!")

### Step 3: Load the list of images

This lab can greatly benefit from running on a T4 GPU. However, seeing as GPUs are not guaranteed in the free tier, the lab was designed to also run on CPUs, albeit slower.

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

In [None]:
filenames = glob.glob("./img_20/*")
len(filenames)

### Step 4: Load Gemini and get it to describe an image

In [None]:
llm = ChatGoogleGenerativeAI(
    model="gemini-1.5-flash",
    temperature=0.5,
    top_p=0.95,
    top_k=64,
    max_output_tokens=8192
    )

Display the image that will be sent to the model

In [None]:
with open(filenames[0], "rb") as image_file:
    image_data = base64.b64encode(image_file.read()).decode()

Image(filenames[0])

<img src="https://github.com/denisabrantesredis/denisd-GenAI-Workshop/blob/main/_assets/images/callout_geminiimage.png?raw=true" alt="Callout - Upload Image to Gemini"/>

In [None]:
message = HumanMessage(
    content=[
        {"type": "text", "text": "describe the object in this image"},
        {
            "type": "image_url",
            "image_url": {"url": f"data:image/jpeg;base64,{image_data}"},
        },
    ],
)

Call the model and print the response

In [None]:
# Invoke the model with the message
response = llm.invoke([message])

# Print the model's response
print(response.content)

&nbsp;

## Part 2: Categorizing the Images

A dataset of images usually comes with a curated set of attributes that can be used as metadata for hybrid searches. In this lab, however, we will generate the metadata for each image ourselves, using Google Gemini to provide a description of the image and its key characteristics.

### Prepare a list of images in base64 format

In [None]:
image_list = []
for image_path in filenames:
    # Read each image as raw bytes and keep it as a base64-encoded string
    with open(image_path, "rb") as image_file:
        image_data = base64.b64encode(image_file.read()).decode("utf-8")
        image_list.append(image_data)
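
The prompts in this lab embed each base64 string in a `data:image/jpeg;base64,...` URL, which assumes every file is a JPEG. As a hedged sketch, assuming the dataset might also contain other formats, the MIME type could be guessed from the file extension. `to_data_url` is a hypothetical helper introduced only for illustration.

```python
import base64
import mimetypes


def to_data_url(raw: bytes, filename: str) -> str:
    """Encode raw image bytes as a data URL, guessing the MIME type from the filename."""
    mime, _ = mimetypes.guess_type(filename)
    if mime is None or not mime.startswith("image/"):
        # Fall back to the type the notebook hard-codes
        mime = "image/jpeg"
    encoded = base64.b64encode(raw).decode("utf-8")
    return f"data:{mime};base64,{encoded}"


# Tiny stand-in payload; a real call would pass the bytes read from an image file
print(to_data_url(b"abc", "shoe.png"))
```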

### Define a Pydantic class to parse the model's output

In [None]:
class Product(BaseModel):
    name: str = Field(description="The name of the product shown in the image")
    color: str = Field(description="The color of the product shown in the image")
    type: str = Field(description="The type of product shown in the image")
    marketing_description: str = Field(description="A marketing description of the product shown in the image")

    @model_serializer(when_used='json')
    def sort_model(self) -> Dict[str, Any]:
        return dict(sorted(self.model_dump().items()))

parser = PydanticOutputParser(pydantic_object=Product)
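
To make the role of the parser more concrete, here is a simplified stand-in written with the standard library only. It is not how `PydanticOutputParser` is implemented; it only illustrates the idea of extracting a JSON object from the model's text and checking that the expected fields are present. `ProductRecord` and `parse_product` are hypothetical names.

```python
import json
from dataclasses import dataclass

REQUIRED_FIELDS = ("name", "color", "type", "marketing_description")


@dataclass
class ProductRecord:
    name: str
    color: str
    type: str
    marketing_description: str


def parse_product(text: str) -> ProductRecord:
    """Extract the outermost JSON object from model output and validate its fields."""
    # The model may surround the JSON with extra text, so keep only the {...} part
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end <= start:
        raise ValueError("no JSON object found in model output")
    data = json.loads(text[start:end + 1])
    missing = [field for field in REQUIRED_FIELDS if field not in data]
    if missing:
        raise ValueError(f"missing fields: {missing}")
    return ProductRecord(**{field: str(data[field]) for field in REQUIRED_FIELDS})


raw = 'Here you go: {"name": "Runner", "color": "red", "type": "shoe", "marketing_description": "Swift and light."}'
print(parse_product(raw))
```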

To prevent the model from generating near-identical descriptions for the products, we will instruct it to start each description with a randomly chosen letter. This nudges the model toward more varied descriptions. Keep in mind that this is a very basic approach, intended for lab purposes only.

In [None]:
def generate_random_letter():
    letters = ['A', 'B', 'C', 'D', 'M', 'P', 'R', 'S', 'T']
    return str(random.choice(letters))

Prepare the prompt

In [None]:
prompt = ChatPromptTemplate.from_messages([
    ("system", "Return the requested response object in {language}. Make sure the marketing description starts with the letter '{starting_letter}'\n'{format_instructions}'\n"),
    ("human", [
        {
            "type": "image_url",
            "image_url": {"url": "data:image/jpeg;base64,{image_data}"},
        },
    ]),
])

Configure the list of image descriptions

In [None]:
all_images = [{"language": "English",
               "format_instructions": parser.get_format_instructions(),
               "image_data": image,
               "starting_letter": generate_random_letter()}
              for image in image_list]

### Create a chain and run it in parallel to speed up the creation of metadata using Google Gemini

In [None]:
chain = prompt | llm | parser
results = chain.batch(all_images, config={"max_concurrency": 5})
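
With `max_concurrency` set to 5, `chain.batch` keeps at most five requests in flight and returns the results in the same order as the inputs, which is what lets later cells pair `results[i]` with `filenames[i]`. The sketch below illustrates that idea with the standard library only; `batch_map` is a hypothetical helper, not LangChain's implementation.

```python
from concurrent.futures import ThreadPoolExecutor


def batch_map(fn, items, max_concurrency=5):
    """Apply fn to every item with bounded parallelism, preserving input order."""
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        # Executor.map yields results in the order of the inputs,
        # regardless of which call finishes first
        return list(pool.map(fn, items))


# Stand-in for a slow model call
print(batch_map(lambda x: x * 2, [1, 2, 3]))
```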

Print the results

In [None]:
for result in results:
    print(result.model_dump_json())

&nbsp;

## Part 3: Generate Image Embeddings

To generate the image embeddings, we will use a CLIP model, loaded through OpenCLIP, an open-source implementation of OpenAI's CLIP.

### Step 1: Load the Embedding Model

In [None]:
import numpy as np
from PIL import Image  # note: replaces the IPython.display.Image imported earlier
from langchain_experimental.open_clip import OpenCLIPEmbeddings

This can take several minutes, as the model needs to be downloaded. Also, keep an eye on the memory usage for your notebook.

In [None]:
clip_embd = OpenCLIPEmbeddings(model_name="ViT-g-14", checkpoint="laion2b_s34b_b88k", weights_only=True)

### Step 2: Generate embeddings for all images

First, we create a test embedding to make sure the model is working properly. The embedding should be an array with 1024 elements.

In [None]:
embedding = clip_embd.embed_image([filenames[0]])
len(embedding[0])

Assuming the previous test was successful, generate embeddings for all 20 images

In [None]:
embeddings = []
for counter, filename in enumerate(filenames):
  print(f"{counter} --> Generating embedding for file {filename}")
  embedding = clip_embd.embed_image([filename])
  embeddings.append(embedding[0])
len(embeddings)

### Step 3: Visualize Images and Metadata

Before storing the images in Redis, we should see if the model was able to generate proper descriptions and metadata.

In [None]:
import IPython.display
import matplotlib.pyplot as plt

In [None]:
%matplotlib inline
%config InlineBackend.figure_format = 'retina'

In [None]:
plt.figure(figsize=(20, 5))
for i in range(20):
    image_path = filenames[i]

    image = Image.open(image_path).convert("RGB")

    plt.subplot(4, 5, i+1)
    plt.imshow(image)
    label = f"{image_path}\n{results[i].name}\n{results[i].type}\n{results[i].color}"
    plt.xlabel(label, fontsize=8)
    plt.xticks([])
    plt.yticks([])

plt.tight_layout()

### Step 4: Prepare JSON documents for Redis

We need to format the data we want to store in Redis, by creating JSON documents that contain the embeddings and the metadata for each image.

In [None]:
counter = 0
vector_documents = []
for embedding in embeddings:
    vector_doc = json.loads(results[counter].model_dump_json())
    vector_doc["id"] = f"vecdoc:{counter+1:05}"
    vector_doc["filename"] = filenames[counter]
    vector_doc["vector"] = embedding
    vector_documents.append(vector_doc)
    counter = counter + 1
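
The loop above relies on `filenames`, `results` and `embeddings` being in the same order and of the same length. As a minimal sketch under that assumption, the same documents could be built with `zip` and `enumerate`, with an explicit length check. `build_documents` is a hypothetical helper, and the metadata is passed as plain dictionaries rather than Pydantic objects to keep the example self-contained.

```python
def build_documents(filenames, metadata, embeddings):
    """Combine filenames, metadata dicts and vectors into Redis-ready JSON documents."""
    if not (len(filenames) == len(metadata) == len(embeddings)):
        raise ValueError("filenames, metadata and embeddings must have the same length")
    documents = []
    for n, (filename, meta, vector) in enumerate(zip(filenames, metadata, embeddings), start=1):
        doc = dict(meta)                  # copy so the input is not mutated
        doc["id"] = f"vecdoc:{n:05}"      # zero-padded key, e.g. vecdoc:00001
        doc["filename"] = filename
        doc["vector"] = list(vector)
        documents.append(doc)
    return documents


docs = build_documents(["./img_20/a.jpg"], [{"name": "Runner", "color": "red"}], [[0.1, 0.2]])
print(docs[0]["id"], docs[0]["filename"])
```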

Print an example document (truncating the vector to show only 5 elements)

In [None]:
print(f"ID: {vector_documents[0]['id']}")
print(f"Name: {vector_documents[0]['name']}")
print(f"Filename: {vector_documents[0]['filename']}")
print(f"Type: {vector_documents[0]['type']}")
print(f"Color: {vector_documents[0]['color']}")
print(f"Description: {vector_documents[0]['marketing_description']}")
print(f"{vector_documents[0]['vector'][:5]}...")
print(len(vector_documents[0]['vector']))

&nbsp;

## Part 4: Storing data in Redis

### Support Functions

In [None]:
import redis
from redis.commands.json.path import Path
from redis.commands.search.query import Query
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.field import NumericField, TagField, TextField, VectorField

We will create the search index manually, in order to understand the code that would be required without the LangChain automation.

In [None]:
def create_index(VECTOR_DIMENSION):
    result = "FAILED"
    schema = (
        TextField("$.id", as_name="id"),
        TextField("$.name", as_name="name"),
        TextField("$.filename", as_name="filename"),
        TextField("$.type", as_name="type"),
        TextField("$.color", as_name="color"),
        TextField("$.marketing_description", as_name="marketing_description"),
        VectorField(
            "$.vector",
            "FLAT",
            {
                "TYPE": "FLOAT32",
                "DIM": VECTOR_DIMENSION,
                "DISTANCE_METRIC": "COSINE",
            },
            as_name="vector",
        )
    )
    try:
        definition = IndexDefinition(prefix=["vecdoc:"], index_type=IndexType.JSON)
        result = r.ft("idx:vecdoc").create_index(fields=schema, definition=definition)
    except Exception as ex:
        result = f"FAILED to create index: {ex}"
    return result
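
The index uses the `FLAT` algorithm with the `COSINE` distance metric. `FLAT` is a brute-force search that compares the query with every stored vector, and cosine distance is one minus the cosine similarity, so smaller scores mean closer matches. The sketch below reproduces that computation in plain Python on toy two-dimensional vectors; `cosine_distance` and `knn` are illustrative names and this is not Redis's implementation.

```python
import math


def cosine_distance(a, b):
    """Return 1 - cosine similarity; assumes both vectors are non-zero."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


def knn(query, documents, k=3):
    """Brute-force nearest neighbours: score every document, keep the k closest."""
    scored = [(cosine_distance(query, doc["vector"]), doc["id"]) for doc in documents]
    return sorted(scored)[:k]


toy_docs = [
    {"id": "a", "vector": [1.0, 0.0]},
    {"id": "b", "vector": [0.0, 1.0]},
    {"id": "c", "vector": [1.0, 1.0]},
]
for distance, doc_id in knn([1.0, 0.0], toy_docs, k=2):
    print(doc_id, round(distance, 4))
```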

This function returns the current status for the search index

In [None]:
def get_index_status():
  info = r.ft("idx:vecdoc").info()
  return info

This function inserts one document in Redis

In [None]:
def write_vector(document):
    result = "FAILED"
    try:
        pipeline = r.pipeline()
        redis_key = document['id']
        pipeline.json().set(redis_key, "$", document)
        res = pipeline.execute()
        result = f"{redis_key} record inserted successfully"
    except Exception as e:
        result = f"FAILED with error: {e}"
    return result

This function performs a vector search

In [None]:
def vector_query(query_vector):
    response = "FAILED TO RUN QUERY"

    query = (
        Query('(*)=>[KNN 3 @vector $query_vector AS vector_score]')
        .sort_by('vector_score')
        .return_fields('vector_score', 'name', 'filename', 'type', 'color', 'marketing_description')
        .dialect(2)
    )
    query_input = query_vector
    query_response = r.ft("idx:vecdoc").search(query, { 'query_vector': np.array(query_input, dtype=np.float32).tobytes() }).docs
    response = []
    for doc in query_response:
        response.append(doc)
    return response
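
The query passes the vector to Redis as raw bytes, produced with `np.array(..., dtype=np.float32).tobytes()`. Because the index declares `FLOAT32`, each dimension takes 4 bytes, so a 1024-dimensional vector becomes a 4096-byte blob. As a rough illustration using only the standard library, and assuming the platform's C `float` is 4 bytes (true on common platforms), the same packing looks like this. `to_float32_bytes` and `from_float32_bytes` are hypothetical helper names.

```python
from array import array


def to_float32_bytes(vector):
    """Pack a list of floats as native-endian 32-bit floats."""
    return array("f", vector).tobytes()


def from_float32_bytes(blob):
    """Unpack a float32 byte blob back into a list of Python floats."""
    values = array("f")
    values.frombytes(blob)
    return list(values)


blob = to_float32_bytes([0.0] * 1024)
print(len(blob))  # number of bytes sent for one 1024-dimensional query vector
```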

This function performs a hybrid search, by vector and metadata (in this case, color)

In [None]:
def vector_query_by_color(query_vector, color):
    response = "FAILED TO RUN QUERY"

    query = (
        Query('(@color:$color)=>[KNN 3 @vector $query_vector AS vector_score]')
        .sort_by('vector_score')
        .return_fields('vector_score', 'name', 'filename', 'type', 'color', 'marketing_description')
        .dialect(2)
    )
    query_input = query_vector
    query_response = r.ft("idx:vecdoc").search(query, {'color': color,'query_vector': np.array(query_input, dtype=np.float32).tobytes() }).docs
    response = []
    for doc in query_response:
        response.append(doc)
    return response

### Save Vectors to Redis

In [None]:
insert_results = []
# create_index() handles its own exceptions and returns a status message
index_result = create_index(1024)
print(f"Index creation result: {index_result}")
insert_results.append(index_result)

for i in range(len(vector_documents)):
    document = vector_documents[i]
    insert_result = write_vector(document)
    insert_results.append(insert_result)
    if i % 5 == 0:
        print(f"--> Inserting document {i} - result: {insert_result}")

Check Index status (it should display the number of documents inserted)

In [None]:
index_status = get_index_status()
print(f"Name: {index_status['index_name']} | Docs: {index_status['num_docs']} | Errors: {index_status['Index Errors'][0].decode()}:{index_status['Index Errors'][1]}")

&nbsp;

<img src="https://github.com/denisabrantesredis/denisd-GenAI-Workshop/blob/main/_assets/images/callout_insight.png?raw=true" alt="Callout - Check Redis Insight"/>

The image vectors, along with their metadata, should now be visible in Redis Insight.

You can also go to the **Workbench** and get a list of indexes using the command:

```
FT._list
```

Finally, you can get more details about the index that we created manually with this command:
```
FT.info "idx:vecdoc"
```
&nbsp;

## Part 5 - Running a Semantic Search

Redis supports semantic caching and search not only for text, but also for images and any other data that can be represented as vectors.

First, we will take a look at the images we can use for search; these images were not in the dataset that was stored as vectors in Redis.

In [None]:
test_images = glob.glob("./img_search_20/*")
len(test_images)

In [None]:
plt.figure(figsize=(15, 5))
for i in range(len(test_images)):
    image_path = test_images[i]

    image = Image.open(image_path).convert("RGB")

    plt.subplot(4, 5, i+1)
    plt.imshow(image)
    label = f"{i}\n{image_path}"
    plt.xlabel(label, fontsize=8)
    plt.xticks([])
    plt.yticks([])

plt.tight_layout()

<img src="https://github.com/denisabrantesredis/denisd-GenAI-Workshop/blob/main/_assets/images/callout_threshold.png?raw=true" alt="Callout - Semantic Threshold"/>

In [None]:
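from langchain.globals import set_llm_cache
from langchain_redis import RedisSemanticCache

# RedisSemanticCache expects a LangChain embeddings model, not the list of image
# vectors named `embeddings` in Part 3. Assumption: we reuse the CLIP model loaded earlier.
redis_cache = RedisSemanticCache(redis_url=REDIS_URL, embeddings=clip_embd, distance_threshold=0.2)
set_llm_cache(redis_cache)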

Since the Semantic Cache is new, it will be empty. We will ask the original question first, to generate the cache entry:

In [None]:
query = "How does Redis Insight make RDI simpler?"

Prepare the prompt:

In [None]:
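# `get_system_template` and `text_list` are not defined in this notebook, so as a
# minimal stand-in (an assumption) we send the question directly to the model
messages = [HumanMessage(content=query)]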

Invoke the model (it will cause a cache miss):

In [None]:
import time

timer_start = time.perf_counter()
llm_response = llm.invoke(messages)
timer_end = time.perf_counter()
total_time = round(timer_end - timer_start, 4)
print(f"Total Time: {total_time}s")

Print the response:

In [None]:
llm_response.content

&nbsp;

<img src="https://github.com/denisabrantesredis/denisd-GenAI-Workshop/blob/main/_assets/images/callout_insight.png?raw=true" alt="Callout - Check Redis Insight"/>

A new Hash document will appear in Redis, with a key prefix of `llmcache`. This is the cached prompt, which includes the question and the answer. The `invoke` function will run a semantic search for these documents, to look for similar questions.
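
The lookup described above can be sketched in a few lines of plain Python. This is a toy model of the idea, not the `RedisSemanticCache` implementation: each entry stores the embedding of a prompt together with its answer, and a lookup returns the closest cached answer only if its cosine distance is within the threshold. `ToySemanticCache` is a hypothetical class, and the two-dimensional vectors stand in for real embeddings.

```python
import math


def cosine_distance(a, b):
    """Return 1 - cosine similarity; assumes both vectors are non-zero."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


class ToySemanticCache:
    def __init__(self, distance_threshold=0.2):
        self.distance_threshold = distance_threshold
        self._entries = []  # list of (prompt_vector, answer) pairs

    def store(self, vector, answer):
        self._entries.append((list(vector), answer))

    def lookup(self, vector):
        """Return the closest cached answer within the threshold, or None on a miss."""
        best = None
        for cached_vector, answer in self._entries:
            distance = cosine_distance(vector, cached_vector)
            if distance <= self.distance_threshold and (best is None or distance < best[0]):
                best = (distance, answer)
        return None if best is None else best[1]


cache = ToySemanticCache(distance_threshold=0.2)
cache.store([1.0, 0.0], "cached answer")
print(cache.lookup([0.9, 0.1]))  # similar prompt vector
print(cache.lookup([0.0, 1.0]))  # unrelated prompt vector
```

A lower threshold makes the cache stricter (fewer hits, less risk of returning an answer to a different question), while a higher threshold makes it more permissive.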

&nbsp;

#### Ask a similar question to trigger a cache hit

In [None]:
query = "What does Redis Insight do to make RDI simpler?"

Prepare the prompt:

In [None]:
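# As above, `get_system_template` and `text_list` are not defined in this notebook,
# so as a minimal stand-in (an assumption) we send the question directly to the model
messages = [HumanMessage(content=query)]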

Invoke the model:

In [None]:
timer_start = time.perf_counter()
llm_response = llm.invoke(messages)
timer_end = time.perf_counter()
total_time = round(timer_end - timer_start, 4)
print(f"Total Time: {total_time}s")

Print the response:

In [None]:
llm_response.content

&nbsp;



# Congrats, this is the end of the lab!!