<a href="https://colab.research.google.com/github/pathwaycom/pathway/blob/main/examples/notebooks/showcases/live_vector_indexing_pipeline.ipynb" target="_parent"><img src="https://pathway.com/assets/colab-badge.svg" alt="Run In Colab" class="inline"/></a>

# Installing Pathway with Python 3.10+

In the cell below, we install Pathway into a Python 3.10+ Linux runtime.

> **If you are running in Google Colab, please run all cells of the notebook (Ctrl+F9)**, disregarding the 'not authored by Google' warning.
>
> **The installation and loading time is less than 1 minute**.


In [1]:
%%capture --no-display
!pip install --prefer-binary pathway

In [2]:
import logging

logging.basicConfig(level=logging.CRITICAL)

# Always Up-to-date Data Indexing pipeline

This showcase demonstrates how to use Pathway to deploy a live data indexing pipeline that can be queried like a typical vector store. Under the hood, however, Pathway updates the index on every data change, so answers are always up to date.
::article-img
---
src: '/assets/content/showcases/vectorstore/vectorstore_doc.png'
alt: 'Pathway data indexing pipeline'
class: 'mx-auto'
zoomable: true
---
::

Pathway Vectorstore enables building a document index on top of your documents without the
complexity of ETL pipelines or of managing separate containers for storing, embedding, and serving.
It allows for easy-to-manage, always up-to-date LLM pipelines, accessible through a RESTful API
and with integrations to popular LLM toolkits such as LangChain and LlamaIndex.


In this article, we will use a simple document processing pipeline that:
1. Monitors several data sources (files, S3 folders, cloud storage) for data changes.
2. Parses, splits and embeds the documents.
3. Builds a vector index for the data.
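To make these three steps concrete, here is a toy, pure-Python sketch of an index that re-processes only the file that changed. It is not how Pathway is implemented: the word-based splitter, the bag-of-words "embedding" and the `LiveIndex` class are hypothetical stand-ins chosen so that the example runs without any model or API key.

```python
import math
import re
from collections import Counter


def parse(raw: bytes) -> str:
    """Decode raw file bytes as UTF-8 (the role of the default parser)."""
    return raw.decode("utf-8")


def split(text: str, max_words: int = 50) -> list[str]:
    """Cut text into chunks of at most `max_words` whitespace-separated words."""
    words = text.split()
    return [" ".join(words[i:i + max_words]) for i in range(0, len(words), max_words)]


def embed(chunk: str) -> dict[str, float]:
    """Toy embedder: an L2-normalized bag-of-words vector stored as a dict."""
    counts = Counter(re.findall(r"\w+", chunk.lower()))
    norm = math.sqrt(sum(c * c for c in counts.values())) or 1.0
    return {word: c / norm for word, c in counts.items()}


def similarity(a: dict[str, float], b: dict[str, float]) -> float:
    """Cosine similarity of two normalized sparse vectors."""
    return sum(weight * b.get(word, 0.0) for word, weight in a.items())


class LiveIndex:
    """Stores chunks per source path, so a file change re-processes only that file."""

    def __init__(self) -> None:
        self.entries: dict[str, list[tuple[str, dict[str, float]]]] = {}

    def upsert(self, path: str, raw: bytes) -> None:
        # Parse, split and embed the new version; this replaces any older chunks.
        self.entries[path] = [(c, embed(c)) for c in split(parse(raw))]

    def delete(self, path: str) -> None:
        # A deleted file simply disappears from the index.
        self.entries.pop(path, None)

    def query(self, text: str, k: int = 3) -> list[tuple[str, str, float]]:
        q = embed(text)
        scored = [
            (path, chunk, similarity(q, vec))
            for path, chunks in self.entries.items()
            for chunk, vec in chunks
        ]
        return sorted(scored, key=lambda item: item[2], reverse=True)[:k]


index = LiveIndex()
index.upsert("pathway.md", b"Pathway is a Python framework for stream processing.")
index.upsert("fruit.md", b"Bananas are yellow.")
print(index.query("What is Pathway?", k=1))
```

Keying chunks by source path is what makes per-file updates cheap: an edit replaces one entry instead of rebuilding the whole index.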

If you prefer not to build the pipeline from scratch and just want to try out the functionality,
take a look at our [managed pipelines](https://cloud.pathway.com/docindex) in action.

We will connect to the index using a `VectorStoreClient`, which allows retrieval of semantically similar documents.

## Prerequisites

Install the `pathway` package. You can also install the `unstructured` package to use the `unstructured.io`-based parser, which can handle PDFs, Word documents and other complex formats.

Then download sample data.

In [3]:
!pip install pathway litellm
# !pip install unstructured[all-docs]
!mkdir -p sample_documents
![ -f sample_documents/repo_readme.md ] || wget 'https://gist.githubusercontent.com/janchorowski/dd22a293f3d99d1b726eedc7d46d2fc0/raw/pathway_readme.md' -O 'sample_documents/repo_readme.md'


In [4]:
import logging
import sys
import time

logging.basicConfig(stream=sys.stderr, level=logging.WARN, force=True)

## Building the data pipeline

First, make sure you have an API key for an embedding provider. This example uses Google's Gemini embeddings, so you need a Google API key.

In [5]:
import getpass
import os

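# Never hard-code API keys in notebooks: read the key from the environment or prompt for it.
if "GOOGLE_API_KEY" not in os.environ:
    os.environ["GOOGLE_API_KEY"] = getpass.getpass("Google API key: ")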

We will now assemble the data vectorization pipeline, using a simple `UTF-8` file parser, a token-count splitter and an embedder from the [Pathway LLM xpack](/developers/user-guide/llm-xpack/overview).

First, we define the data sources. We use the filesystem connector for simplicity, but any supported `pathway` [connector](/developers/api-docs/pathway-io/), such as [S3](/developers/api-docs/pathway-io/s3/) or [Google Drive](/developers/api-docs/pathway-io/gdrive#pathway.io.gdrive.read), will also work.

Then, we define the embedder and splitter.
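For intuition about what the splitter does, below is a minimal greedy token-count splitter. It is a sketch under stated assumptions: whitespace-separated words stand in for tokenizer tokens, and the `min_tokens`/`max_tokens` budget and the merge-short-tail rule are our own choices; check the `TokenCountSplitter` API reference for its actual parameters and defaults.

```python
import re


def count_tokens(text: str) -> int:
    # Stand-in tokenizer: whitespace-separated words. A real splitter counts
    # model tokens (e.g. with tiktoken), so its counts will differ.
    return len(text.split())


def token_count_split(text: str, min_tokens: int = 5, max_tokens: int = 20) -> list[str]:
    """Greedily pack sentences into chunks of at most `max_tokens` tokens.

    A trailing chunk shorter than `min_tokens` is merged into the previous one,
    which can push that last chunk slightly over `max_tokens`.
    """
    sentences = [s for s in re.split(r"(?<=[.!?])\s+", text.strip()) if s]
    chunks: list[str] = []
    current: list[str] = []
    for sentence in sentences:
        words = sentence.split()
        # A single sentence longer than the budget is cut into fixed-size pieces.
        pieces = [" ".join(words[i:i + max_tokens]) for i in range(0, len(words), max_tokens)]
        for piece in pieces:
            if current and count_tokens(" ".join(current + [piece])) > max_tokens:
                chunks.append(" ".join(current))
                current = []
            current.append(piece)
    if current:
        chunks.append(" ".join(current))
    if len(chunks) > 1 and count_tokens(chunks[-1]) < min_tokens:
        tail = chunks.pop()
        chunks[-1] = f"{chunks[-1]} {tail}"
    return chunks


print(token_count_split("One two three. Four five six seven. Eight nine.", min_tokens=3, max_tokens=5))
# ['One two three.', 'Four five six seven. Eight nine.']
```

Splitting on sentence boundaries first keeps chunks semantically coherent, while the token budget keeps each chunk within the embedding model's input limits.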

Last, we assemble the data pipeline. We will run it in a background thread so that we can query it immediately from this notebook. Note that in a production deployment the server runs in a separate process, possibly on another machine; for this quick start, the server and client are different threads of the same Python process.
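The same-process setup can be illustrated with nothing but the standard library: an HTTP server runs in a daemon thread while the main thread sends it a request. This is a generic sketch of the pattern, not Pathway's server; the `/query` path, the `ToyQueryHandler` class and its echo response are hypothetical.

```python
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


class ToyQueryHandler(BaseHTTPRequestHandler):
    """Hypothetical endpoint: echoes the POSTed query back as a fake result."""

    def do_POST(self) -> None:
        length = int(self.headers.get("Content-Length", 0))
        payload = json.loads(self.rfile.read(length))
        body = json.dumps([{"text": f"result for {payload['query']}"}]).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args) -> None:
        pass  # silence per-request logging in the notebook


# Server side: port 0 lets the OS pick a free port; serve in a daemon thread.
server = ThreadingHTTPServer(("127.0.0.1", 0), ToyQueryHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()

# Client side: the main thread queries the server like any remote client would.
port = server.server_address[1]
request = urllib.request.Request(
    f"http://127.0.0.1:{port}/query",
    data=json.dumps({"query": "What is Pathway?"}).encode("utf-8"),
    headers={"Content-Type": "application/json"},
)
opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))  # bypass proxy env vars
with opener.open(request, timeout=5) as response:
    results = json.load(response)
print(results)  # [{'text': 'result for What is Pathway?'}]

server.shutdown()
server.server_close()
```

Because the client only talks HTTP, moving the server to another process or machine later changes nothing but the host and port.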

In [6]:
import pathway as pw

# To use advanced features with Pathway Scale, get your free license key from
# https://pathway.com/features and paste it below.
# To use Pathway Community, comment out the line below.
pw.set_license_key("demo-license-key-with-telemetry")


# This creates a connector that tracks files in a given directory.
data_sources = []
data_sources.append(
    pw.io.fs.read(
        "./sample_documents",
        format="binary",
        mode="streaming",
        with_metadata=True,
    )
)

# This creates a connector that tracks files in Google Drive.
# Please follow the instructions at /developers/user-guide/connect/connectors/gdrive-connector/ to get credentials.
# data_sources.append(
#     pw.io.gdrive.read(object_id="17H4YpBOAKQzEJ93xmC2z170l0bP2npMy", service_user_credentials_file="credentials.json", with_metadata=True))

In [8]:
# We now build the VectorStore pipeline

from pathway.xpacks.llm.embedders import GeminiEmbedder
from pathway.xpacks.llm.splitters import TokenCountSplitter
from pathway.xpacks.llm.vector_store import VectorStoreClient, VectorStoreServer

PATHWAY_PORT = 8765

# Choose document transformers
text_splitter = TokenCountSplitter()
embedder = GeminiEmbedder(api_key=os.environ["GOOGLE_API_KEY"])

# `VectorStoreServer` (from `pathway.xpacks.llm.vector_store`) parses, splits and embeds the documents and serves the index.
# Feel free to fork it to develop bespoke document processing pipelines.
vector_server = VectorStoreServer(
    *data_sources,
    embedder=embedder,
    splitter=text_splitter,
)
vector_server.run_server(host="127.0.0.1", port=PATHWAY_PORT, threaded=True, with_cache=False)
time.sleep(30)  # Workaround for Colab - messages from threads are not visible unless a cell is running



We now instantiate and configure the client.

In [9]:
client = VectorStoreClient(
    host="127.0.0.1",
    port=PATHWAY_PORT,
)
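Conceptually, the client is a thin HTTP wrapper: each query becomes a JSON request to the server. The sketch below only builds such a request with `urllib`. The `/v1/retrieve` path and the `query`/`k`/`metadata_filter` field names are assumptions based on the Pathway client; verify them against the version you run before sending requests by hand.

```python
import json
import urllib.request
from typing import Optional


def build_retrieve_request(
    host: str, port: int, query: str, k: int = 3, metadata_filter: Optional[str] = None
) -> urllib.request.Request:
    """Build (but do not send) a retrieval request for a vector store server.

    ASSUMPTION: the server accepts POST /v1/retrieve with a JSON body holding
    `query`, `k` and an optional `metadata_filter`.
    """
    payload: dict = {"query": query, "k": k}
    if metadata_filter is not None:
        payload["metadata_filter"] = metadata_filter
    return urllib.request.Request(
        f"http://{host}:{port}/v1/retrieve",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_retrieve_request("127.0.0.1", 8765, "What is Pathway?")  # 8765 = PATHWAY_PORT
print(req.get_method(), req.full_url)  # POST http://127.0.0.1:8765/v1/retrieve
# With the server running, send it with:
# with urllib.request.urlopen(req, timeout=60) as resp:
#     docs = json.load(resp)
```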

And we can start asking queries.

In [10]:
query = "What is Pathway?"
docs = client(query)
docs


**Your turn!** Now make a change to the source documents or make a fresh one and retry the query!
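For example, you can add a new Markdown file to the watched folder from Python. The file name and contents below are arbitrary; because the connector was created with `mode="streaming"`, the new file should be picked up and indexed without restarting the server.

```python
from pathlib import Path

docs_dir = Path("sample_documents")  # the folder watched by pw.io.fs.read above
docs_dir.mkdir(exist_ok=True)

# Create a new document (hypothetical name and content).
new_doc = docs_dir / "my_notes.md"
new_doc.write_text("My notes: Pathway keeps the vector index in sync with files.\n", encoding="utf-8")

# Appending to a file is a modification, so its chunks should be re-indexed too.
with new_doc.open("a", encoding="utf-8") as f:
    f.write("This line was appended later.\n")

print(new_doc.read_text(encoding="utf-8"))
```

Once the new content has been embedded, re-run `client(query)` to see it reflected in the results.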

## Integrations

### LangChain

You can use a Pathway Vector Store in LangChain pipelines with `PathwayVectorClient`
and configure a `VectorStoreServer` using LangChain components. For more information, see [our article](/developers/templates/langchain-integration) or the [LangChain documentation](https://python.langchain.com/v0.1/docs/integrations/vectorstores/pathway/).


In [None]:
!pip install langchain
!pip install langchain-openai
!pip install langchain-community

```python
from langchain_community.vectorstores import PathwayVectorClient

# PathwayVectorClient implements regular VectorStore API of LangChain
client = PathwayVectorClient(host="127.0.0.1", port=PATHWAY_PORT)
docs = client.similarity_search("What is Pathway?")
```

In [None]:
# Here we show how to configure a server that uses LangChain document processing components

from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter

# Choose proper LangChain document transformers
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
# Requires an OpenAI API key, e.g. set with os.environ["OPENAI_API_KEY"] = getpass.getpass()
embeddings_model = OpenAIEmbeddings(openai_api_key=os.environ["OPENAI_API_KEY"])

# Use VectorStoreServer.from_langchain_components to create a vector server using LangChain
# document processors
vector_server = VectorStoreServer.from_langchain_components(
    *data_sources,
    embedder=embeddings_model,
    splitter=text_splitter,
)
vector_server.run_server(host="127.0.0.1", port=PATHWAY_PORT+1, threaded=True, with_cache=False)
time.sleep(30)  # colab workaround

In [None]:
# You can connect to the Pathway+LangChain server using any client: Pathway's, LangChain's or LlamaIndex's!
client = VectorStoreClient(
    host="127.0.0.1",
    port=PATHWAY_PORT+1,
)

client.query("pathway")

### LlamaIndex

Pathway is fully integrated with LlamaIndex! Below we show how to instantiate a LlamaIndex
retriever that queries the Pathway `VectorStoreServer`
and how to configure a server using LlamaIndex components.

For more information, see the `PathwayRetriever`
[cookbook](https://docs.llamaindex.ai/en/stable/examples/retrievers/pathway_retriever.html).

In [None]:
!pip install llama-index llama-index-retrievers-pathway llama-index-embeddings-openai

In [None]:
# You can connect to the PathwayVectorStore using a llama-index compatible retriever
from llama_index.retrievers.pathway import PathwayRetriever

# PathwayRetriever implements the Retriever interface
pr = PathwayRetriever(host="127.0.0.1", port=PATHWAY_PORT)
pr.retrieve(str_or_query_bundle="What is Pathway?")

In [None]:
# Here we show how to configure a server that uses LlamaIndex document processing components

from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import TokenTextSplitter

# Choose proper LlamaIndex document transformers
# OpenAIEmbedding reads the API key from the OPENAI_API_KEY environment variable
embed_model = OpenAIEmbedding(embed_batch_size=10)

transformations_example = [
    TokenTextSplitter(
        chunk_size=150,
        chunk_overlap=10,
        separator=" ",
    ),
    embed_model,
]

# Use VectorStoreServer.from_llamaindex_components to create a vector server using LlamaIndex
# document processors
vector_server = VectorStoreServer.from_llamaindex_components(
    *data_sources,
    transformations=transformations_example,
)
vector_server.run_server(host="127.0.0.1", port=PATHWAY_PORT+2, threaded=True, with_cache=False)
time.sleep(30)  # colab workaround

In [None]:
# You can connect to the Pathway+LlamaIndex server using any client: Pathway's, LangChain's or LlamaIndex's!
client = VectorStoreClient(
    host="127.0.0.1",
    port=PATHWAY_PORT+2,
)

client.query("pathway")

## Advanced topics

### Getting information on indexed files

[`VectorStoreClient.get_vectorstore_statistics()`](/developers/api-docs/pathway-xpacks-llm/vectorstore#pathway.xpacks.llm.vector_store.VectorStoreClient.get_vectorstore_statistics) gives essential statistics on the state of the vector store, such as the number of indexed files and the timestamp of the most recently updated one. You can use it in your chains to tell users how fresh your knowledge base is.

In [None]:
client.get_vectorstore_statistics()
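A small helper can turn these statistics into a user-facing freshness notice. It assumes the returned dict may contain `file_count` and a `last_modified` Unix timestamp in seconds; those key names are an assumption, so inspect the output above for your Pathway version.

```python
from datetime import datetime, timezone


def freshness_message(stats: dict) -> str:
    """Summarize vector store statistics for end users.

    ASSUMPTION: `stats` may contain `file_count` and `last_modified`
    (Unix seconds); check the actual keys returned by your Pathway version.
    """
    count = stats.get("file_count", 0)
    noun = "file" if count == 1 else "files"
    last = stats.get("last_modified")
    if last is None:
        return f"{count} {noun} indexed."
    when = datetime.fromtimestamp(last, tz=timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    return f"{count} {noun} indexed; newest change at {when}."


print(freshness_message({"file_count": 1, "last_modified": 1702672093}))
# 1 file indexed; newest change at 2023-12-15 20:28 UTC.
# In the notebook: print(freshness_message(client.get_vectorstore_statistics()))
```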

You can also use [`VectorStoreClient.get_input_files()`](/developers/api-docs/pathway-xpacks-llm/vectorstore#pathway.xpacks.llm.vector_store.VectorStoreClient.get_input_files) to get the list of indexed files along with their metadata.

In [None]:
client.get_input_files()
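As a usage example, the list of indexed files can be summarized per file type. This sketch assumes each entry is a metadata dict with a `path` key, like the metadata the filesystem connector attaches when `with_metadata=True`; entries without `path` are skipped.

```python
from collections import Counter
from pathlib import PurePosixPath


def count_by_extension(files: list) -> dict:
    """Count indexed files per extension.

    ASSUMPTION: each entry is a metadata dict with a `path` key.
    """
    return dict(
        Counter(PurePosixPath(f["path"]).suffix or "<none>" for f in files if "path" in f)
    )


print(count_by_extension([{"path": "docs/a.md"}, {"path": "b.pdf"}, {"path": "c.md"}]))
# {'.md': 2, '.pdf': 1}
# In the notebook: count_by_extension(client.get_input_files())
```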

### Filtering based on file metadata

The vector store supports filtering documents by their metadata with [JMESPath](https://jmespath.org/) expressions, for instance:

In [None]:
# take into account only sources modified later than the given Unix timestamp
docs = client(query, metadata_filter="modified_at >= `1702672093`")

# take into account only sources owned by user 'james'
docs = client(query, metadata_filter="owner == 'james'")

# take into account only sources whose path contains 'repo_readme'
docs = client(query, metadata_filter="contains(path, 'repo_readme')")

# AND of two conditions
docs = client(query, metadata_filter="owner == 'james' && modified_at >= `1702672093`")

# OR of two conditions
docs = client(query, metadata_filter="owner == 'james' || modified_at >= `1702672093`")
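When filters are assembled from user input, building them with small helpers avoids quoting mistakes. The helpers below are hypothetical; they emit standard JMESPath, encoding every value as a backtick-delimited JSON literal (so a string becomes `` `"james"` ``, equivalent to the raw string `'james'` used above) and parenthesizing combined conditions.

```python
import json


def literal(value) -> str:
    """Encode a Python value as a JMESPath JSON literal: 5 -> `5`, "a" -> `"a"`."""
    return "`" + json.dumps(value).replace("`", "\\`") + "`"


def eq(field: str, value) -> str:
    return f"{field} == {literal(value)}"


def at_least(field: str, value) -> str:
    return f"{field} >= {literal(value)}"


def contains(field: str, text: str) -> str:
    return f"contains({field}, {literal(text)})"


def all_of(*conditions: str) -> str:
    # Parenthesize each part so mixing && and || stays unambiguous.
    return " && ".join(f"({c})" for c in conditions)


def any_of(*conditions: str) -> str:
    return " || ".join(f"({c})" for c in conditions)


flt = all_of(eq("owner", "james"), at_least("modified_at", 1702672093))
print(flt)  # (owner == `"james"`) && (modified_at >= `1702672093`)
# In the notebook: docs = client(query, metadata_filter=flt)
```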

### Configuring the parser

The vectorization pipeline supports pluggable parsers. If no parser is provided, the `UTF-8` parser is used by default. You can find the available parsers in the [`parsers` module](https://github.com/pathwaycom/pathway/blob/main/python/pathway/xpacks/llm/parsers.py).
An example parser that can read PDFs, Word documents and other formats is `parsers.ParseUnstructured`:

In [None]:
# !pip install unstructured[all-docs]  # if you will need to parse complex documents

```python
from pathway.xpacks.llm import parsers
from pathway.xpacks.llm.splitters import TokenCountSplitter

vector_server = VectorStoreServer(
    *data_sources,
    parser=parsers.ParseUnstructured(),
    embedder=embedder,  # the GeminiEmbedder defined earlier
    splitter=TokenCountSplitter(),
)
```
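If none of the built-in parsers fits, you can write your own. The sketch below is plain Python and assumes a parser maps raw file bytes to a list of `(text, metadata)` pairs, which is our reading of the built-in parsers; the Markdown-section logic and the `heading` metadata key are hypothetical. Plugging it into `VectorStoreServer` would additionally require wrapping it as a Pathway UDF, as the built-in parsers in the `parsers` module do.

```python
import re


def parse_markdown_sections(contents: bytes) -> list[tuple[str, dict]]:
    """Hypothetical parser: one (text, metadata) pair per Markdown section."""
    text = contents.decode("utf-8")
    sections: list[tuple[str, dict]] = []
    heading, lines = None, []
    for line in text.splitlines():
        match = re.match(r"^#{1,6}\s+(.*)", line)
        if match:
            # A new heading closes the previous section (if it had any text).
            if any(ln.strip() for ln in lines):
                sections.append(("\n".join(lines).strip(), {"heading": heading}))
            heading, lines = match.group(1).strip(), []
        else:
            lines.append(line)
    if any(ln.strip() for ln in lines):
        sections.append(("\n".join(lines).strip(), {"heading": heading}))
    return sections


print(parse_markdown_sections(b"intro\n# A\nalpha\n\n## B\nbeta\n"))
# [('intro', {'heading': None}), ('alpha', {'heading': 'A'}), ('beta', {'heading': 'B'})]
```

Storing the section heading as metadata would let you target it with a metadata filter later.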

### Configuring the cache

The Pathway vectorizing pipeline comes with an embeddings cache, enabled with the `with_cache` flag:
```python
vector_server.run_server(..., with_cache=True)
```

By default, the cache is stored on local disk in the `./Cache` directory. It can be customized by explicitly specifying a caching backend, chosen among several persistent backend [options](/developers/api-docs/persistence-api#pathway.persistence.Backend).
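The idea behind an embeddings cache is memoization: text that was already embedded is not sent to the embedding API again. The in-memory `EmbeddingCache` class below is a hypothetical illustration of that idea only; it is not Pathway's cache, which persists to the configured backend.

```python
import hashlib
from typing import Callable


class EmbeddingCache:
    """Memoize an embedding function, keyed by a hash of the input text."""

    def __init__(self, embed_fn: Callable[[str], list]) -> None:
        self.embed_fn = embed_fn
        self.store: dict = {}
        self.misses = 0

    def __call__(self, text: str) -> list:
        key = hashlib.sha256(text.encode("utf-8")).hexdigest()
        if key not in self.store:
            self.misses += 1  # only uncached texts reach the (possibly paid) API
            self.store[key] = self.embed_fn(text)
        return self.store[key]


cached = EmbeddingCache(lambda text: [float(len(text))])  # fake embedder for the demo
cached("hello")
cached("hello")
print(cached.misses)  # 1
```

Hashing the text, rather than using it directly as the key, keeps keys a fixed size, which matters once the cache is written to disk.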

### Running in production

A production deployment will typically run the server in a separate process. We recommend running the Pathway data indexing pipeline in a container-based deployment environment like Docker or Kubernetes. For more info, see [Pathway's deployment guide](/developers/user-guide/deployment/docker-deployment/).

::shoutout-banner
---
href: "https://discord.gg/pathway"
icon: "ic:baseline-discord"
---
#title
Discuss tricks & tips for RAG
#description
Join our Discord community and dive into discussions on tricks and tips for mastering Retrieval Augmented Generation
::