# Building a Live RAG Pipeline over Google Drive Files

<a href="https://colab.research.google.com/github/run-llama/llama_index/blob/main/docs/docs/examples/ingestion/ingestion_gdrive.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In this guide we show you how to build a "live" RAG pipeline over Google Drive files.

This pipeline indexes Google Drive files and stores them in a Redis vector store. Every time you rerun the ingestion pipeline afterwards, it propagates **incremental updates**, so only changed documents are updated in the vector store. Unchanged documents are not re-indexed!

We use the following [data source](https://drive.google.com/drive/folders/1RFhr3-KmOZCR5rtp4dlOMNl3LKe1kOA5?usp=sharing) - you will need to copy these files and upload them to your own Google Drive directory!

**NOTE**: You will also need to set up a service account and a `credentials.json` file. See our LlamaHub page for the Google Drive loader for more details: https://llamahub.ai/l/readers/llama-index-readers-google?from=readers



## Setup

We install required packages and launch the Redis Docker image.

In [None]:
%pip install llama_index
%pip install langchain

In [None]:
%pip install llama-index-storage-docstore-redis
%pip install llama-index-vector-stores-redis
%pip install llama-index-embeddings-huggingface
%pip install llama-index-readers-google

In [4]:
# if creating a new container
!docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:latest
# if starting an existing container
# !docker start -a redis-stack

docker: Error response from daemon: Conflict. The container name "/redis-stack" is already in use by container "0f8431747c7706d69300b99a86c59aa98e9a7d7aeae2be59787afa9e42c90814". You have to remove (or rename) that container to be able to reuse that name.
See 'docker run --help'.


In [5]:
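import os
from getpass import getpass

# Do not hard-code a real API key in the notebook; prompt for it if it is not already set.
if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("OpenAI API key: ")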

## Define Ingestion Pipeline

Here we define the ingestion pipeline. Given a set of documents, we will run sentence splitting/embedding transformations, and then load them into a Redis docstore/vector store.

The vector store indexes the data and stores the embeddings; the docstore tracks duplicates.
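To give a feel for the sentence-splitting step, here is a minimal, standard-library sketch that greedily packs whole sentences into size-limited chunks. It is an illustration only: `split_into_chunks` and its `max_chars` parameter are hypothetical names, and this is not how `SentenceSplitter` is implemented.

```python
import re


def split_into_chunks(text: str, max_chars: int = 200) -> list[str]:
    """Greedily pack whole sentences into chunks of up to max_chars characters.

    A single sentence longer than max_chars becomes its own (oversized) chunk.
    """
    # Naive sentence boundary: whitespace that follows '.', '!' or '?'.
    sentences = [s for s in re.split(r"(?<=[.!?])\s+", text.strip()) if s]
    chunks: list[str] = []
    current = ""
    for sentence in sentences:
        candidate = f"{current} {sentence}".strip()
        if current and len(candidate) > max_chars:
            # Adding this sentence would overflow the chunk, so start a new one.
            chunks.append(current)
            current = sentence
        else:
            current = candidate
    if current:
        chunks.append(current)
    return chunks


sample = (
    "Redis stores the vectors. The docstore tracks hashes. "
    "Only changed files are re-embedded."
)
for chunk in split_into_chunks(sample, max_chars=55):
    print(repr(chunk))
```

Each chunk would then be embedded separately, which is why the pipeline lists the splitter before the embedding model.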

In [6]:
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.ingestion import (
    DocstoreStrategy,
    IngestionPipeline,
    IngestionCache,
)
from llama_index.storage.kvstore.redis import RedisKVStore as RedisCache
from llama_index.storage.docstore.redis import RedisDocumentStore
from llama_index.core.node_parser import SentenceSplitter
from llama_index.vector_stores.redis import RedisVectorStore

from redisvl.schema import IndexSchema

In [7]:
embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")



In [8]:
# Define the Redis index schema
custom_schema = IndexSchema.from_dict(
    {
        "index": {"name": "gdrive", "prefix": "doc"},
        # customize fields that are indexed
        "fields": [
            # required fields for llamaindex
            {"type": "tag", "name": "id"},
            {"type": "tag", "name": "doc_id"},
            {"type": "text", "name": "text"},
            # custom vector field for bge-small-en-v1.5 embeddings
            {
                "type": "vector",
                "name": "vector",
                "attrs": {
                    "dims": 384,
                    "algorithm": "hnsw",
                    "distance_metric": "cosine",
                },
            },
        ],
    }
)

# Define the vector store from the schema
vector_store = RedisVectorStore(
    schema=custom_schema,
    redis_url="redis://localhost:6379",
)

10:13:48 redisvl.index.index INFO   Index already exists, not overwriting.
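The schema above sets `distance_metric` to `cosine` and `dims` to 384, the embedding size noted for `bge-small-en-v1.5`. As a rough illustration of what that metric measures, here is a pure-Python sketch of cosine distance, taken here as one minus cosine similarity. The function name is our own, and this is not the code Redis runs internally.

```python
import math


def cosine_distance(a: list[float], b: list[float]) -> float:
    """Return 1 - cosine similarity for two equal-length, non-zero vectors."""
    if len(a) != len(b):
        # A query vector must have the same dimensionality as the indexed vectors.
        raise ValueError("vectors must have the same number of dimensions")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine distance is undefined for a zero vector")
    return 1.0 - dot / (norm_a * norm_b)


print(cosine_distance([1.0, 0.0], [2.0, 0.0]))  # same direction
print(cosine_distance([1.0, 0.0], [0.0, 3.0]))  # orthogonal vectors
```

Vectors pointing in the same direction have a distance near 0 regardless of their length, while orthogonal vectors have a distance of 1. The length check mirrors why the `dims` value in the schema has to match the embedding model's output size.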


In [9]:
# Optional: clear vector store if exists
if vector_store.index_exists():
    vector_store.delete_index()
vector_store.create_index()

In [10]:
# Set up the ingestion cache layer
cache = IngestionCache(
    cache=RedisCache.from_host_and_port("localhost", 6379),
    collection="redis_cache",
)

In [11]:
docstore = RedisDocumentStore.from_host_and_port(
    "localhost", 6379, namespace="document_store"
)

In [12]:
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(),
        embed_model,
    ],
    docstore=docstore,
    vector_store=vector_store,
    cache=cache,
    docstore_strategy=DocstoreStrategy.UPSERTS,
)

### Define our Vector Store Index

We define our index to wrap the underlying vector store.

In [13]:
from llama_index.core import VectorStoreIndex

index = VectorStoreIndex.from_vector_store(
    pipeline.vector_store, embed_model=embed_model
)

## Load Initial Data

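Here we load the source files. The Google Drive workflow uses our [Google Drive Loader](https://llamahub.ai/l/readers/llama-index-readers-google?from=readers) on LlamaHub; the cell below instead reads a local `data` directory with `SimpleDirectoryReader`, so put your copies of the files there.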

The loaded docs are the header sections of our [Use Cases from our documentation](https://docs.llamaindex.ai/en/latest/use_cases/q_and_a/root.html).

In [14]:
from llama_index.core import SimpleDirectoryReader

loader = SimpleDirectoryReader("data")


def load_data():
    docs = loader.load_data()
    print(len(docs), "docs")
    # Use the file name as a stable document ID so reruns can match documents
    for doc in docs:
        doc.id_ = doc.metadata["file_name"]
    return docs


docs = load_data()

38 docs


In [17]:
nodes = pipeline.run(documents=docs)
print(f"Ingested {len(nodes)} Nodes")

Ingested 0 Nodes


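On a first run against an empty vector store and docstore, the pipeline transforms and ingests all the documents (by chunking, and then by embedding). A count of 0, as in the output above, most likely means the docstore already held these documents unchanged from an earlier run, so nothing was reprocessed.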

### Ask Questions over Initial Data

In [199]:
## Attempt to use Pydantic Output response formats
# from typing import List
# from pydantic import BaseModel

# class Program(BaseModel):
#     """Data model for python program."""

#     name: str
#     python_code : str
#     explanation : str

# query_engine = index.as_query_engine(llm=llm, response_mode="refine", output_cls=Program)

In [19]:
from llama_index.core.output_parsers import LangchainOutputParser
from llama_index.llms.openai import OpenAI
from langchain.output_parsers import StructuredOutputParser, ResponseSchema

# define output schema
response_schemas = [
    ResponseSchema(
        name="Python code",
        description="Write python code that correspond to the query",
    ),
    ResponseSchema(
        name="Explanation",
        description="Describe what you've done",
    ),
]

# define output parser
lc_output_parser = StructuredOutputParser.from_response_schemas(
    response_schemas
)
output_parser = LangchainOutputParser(lc_output_parser)
print("Output parser:", output_parser)

Output parser: <llama_index.core.output_parsers.langchain.LangchainOutputParser object at 0x180587c50>


In [21]:
llm = OpenAI(temperature=0.1, model="gpt-4o", output_parser=output_parser)
# obtain a structured response
query_engine = index.as_query_engine(llm=llm)
# response = query_engine.query(
#     "What are a few things the author did growing up?",
# )
# print(str(response))

In [24]:
response = query_engine.query("What is this documentation about?")
print(response)

{'Python code': '', 'Explanation': 'The documentation is about the Ursina Engine, which includes tutorials, installation guides, and references for various features such as entity basics, coordinate systems, collisions, and text handling.'}


In [25]:
response = query_engine.query("Write a tic tac toe game")
print(response)

{'Python code': "from ursina import *\n\nif __name__ == '__main__':\n    app = Ursina()\n\ncamera.orthographic = True\ncamera.fov = 4\ncamera.position = (1, 1)\nText.default_resolution *= 2\n\nplayer = Entity(name='o', color=color.azure)\ncursor = Tooltip(player.name, color=player.color, origin=(0,0), scale=4, enabled=True)\ncursor.background.color = color.clear\nbg = Entity(parent=scene, model='quad', texture='shore', scale=(16,8), z=10, color=color.light_gray)\nmouse.visible = False\n\n# create a matrix to store the buttons in. makes it easier to check for victory\nboard = [[None for x in range(3)] for y in range(3)]\n\nfor y in range(3):\n    for x in range(3):\n        b = Button(parent=scene, position=(x,y))\n        board[x][y] = b\n\n        def on_click(b=b):\n            b.text = player.name\n            b.color = player.color\n            b.collision = False\n            check_for_victory()\n\n            if player.name == 'o':\n                player.name = 'x'\n            

In [26]:
import ast

# `response` is the Response object returned by the query above
response_str = response.response

# Parse the string representation of the dictionary
response_dict = ast.literal_eval(response_str)

# Extract 'Python code' and 'Explanation'
python_code = response_dict.get('Python code')
explanation = response_dict.get('Explanation')

print("Python Code:\n", python_code)
print("\nExplanation:\n", explanation)

Python Code:
 from ursina import *

if __name__ == '__main__':
    app = Ursina()

camera.orthographic = True
camera.fov = 4
camera.position = (1, 1)
Text.default_resolution *= 2

player = Entity(name='o', color=color.azure)
cursor = Tooltip(player.name, color=player.color, origin=(0,0), scale=4, enabled=True)
cursor.background.color = color.clear
bg = Entity(parent=scene, model='quad', texture='shore', scale=(16,8), z=10, color=color.light_gray)
mouse.visible = False

# create a matrix to store the buttons in. makes it easier to check for victory
board = [[None for x in range(3)] for y in range(3)]

for y in range(3):
    for x in range(3):
        b = Button(parent=scene, position=(x,y))
        board[x][y] = b

        def on_click(b=b):
            b.text = player.name
            b.color = player.color
            b.collision = False
            check_for_victory()

            if player.name == 'o':
                player.name = 'x'
                player.color = color.orange
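The `ast.literal_eval` call above assumes the response text is a complete Python dict literal. If the model output is cut off or comes back as JSON instead, that call raises an exception. A more defensive variant might look like the sketch below; `parse_structured_response` is a hypothetical helper of our own, not part of LlamaIndex or LangChain.

```python
import ast
import json
from typing import Optional


def parse_structured_response(text: str) -> Optional[dict]:
    """Parse a dict from JSON or a Python literal; return None if neither works."""
    try:
        parsed = json.loads(text)
    except (json.JSONDecodeError, TypeError):
        try:
            # Fall back to a Python literal such as {'key': 'value'}.
            parsed = ast.literal_eval(text)
        except (ValueError, SyntaxError):
            return None
    return parsed if isinstance(parsed, dict) else None


complete = "{'Python code': 'print(1)', 'Explanation': 'Prints one.'}"
truncated = "{'Python code': 'print(1"

print(parse_structured_response(complete))
print(parse_structured_response(truncated))
```

With a helper like this, the notebook could call `parse_structured_response(response.response)` and check for `None` before reading the `'Python code'` and `'Explanation'` keys.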
   

In [None]:
response = query_engine.query("What are the examples provided here?")
print(response)

## Modify and Reload the Data

Let's try modifying our ingested data!

We modify the "Q&A" doc to include an extra "structured analytics" block of text. See our [updated document](https://docs.google.com/document/d/1QQMKNAgyplv2IUOKNClEBymOFaASwmsZFoLmO_IeSTw/edit?usp=sharing) as a reference.

Now let's rerun the ingestion pipeline.

In [None]:
# Reload the documents and rerun the pipeline; unchanged documents are skipped
docs = load_data()
nodes = pipeline.run(documents=docs)
print(f"Ingested {len(nodes)} Nodes")

Notice how only one node is ingested. This is because only one document changed, while the other documents stayed the same. This means that we only need to re-transform and re-embed one document!
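The idea behind this incremental behavior can be sketched with a content hash per document ID. The snippet below is a simplified, standard-library illustration: a plain dict stands in for the Redis docstore, and `content_hash` and `select_changed` are hypothetical names. It is not LlamaIndex's actual implementation, but it shows why a stable document ID matters (this guide sets `doc.id_` to the file name).

```python
import hashlib


def content_hash(text: str) -> str:
    """Fingerprint a document's text."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def select_changed(docs: dict[str, str], seen_hashes: dict[str, str]) -> list[str]:
    """Return IDs of new or modified docs and record their hashes in seen_hashes."""
    changed = []
    for doc_id, text in docs.items():
        digest = content_hash(text)
        if seen_hashes.get(doc_id) != digest:
            # New ID or different content: this document must be reprocessed.
            seen_hashes[doc_id] = digest
            changed.append(doc_id)
    return changed


seen: dict[str, str] = {}  # stand-in for the docstore
sample_docs = {
    "qa.md": "Semantic search. Summarization.",
    "agents.md": "Agents overview.",
}

first_run = select_changed(sample_docs, seen)
print(first_run)  # first run: every document is new

sample_docs["qa.md"] += " Structured analytics."
second_run = select_changed(sample_docs, seen)
print(second_run)  # rerun: only the edited document
```

Only the documents returned by `select_changed` would need to be chunked and embedded again; everything else can be skipped.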

### Ask Questions over New Data

In [None]:
query_engine = index.as_query_engine()

In [None]:
response = query_engine.query("What are the sub-types of question answering?")

In [None]:
print(str(response))

The sub-types of question answering mentioned in the context are semantic search, summarization, and structured analytics.
