
# Pathway Reader

> [Pathway](https://pathway.com/) is an open data processing framework. It allows you to easily develop data transformation pipelines and Machine Learning applications that work with live data sources and changing data.

This notebook shows how to use Pathway to deploy a live data indexing pipeline that can be queried with a reader. You can add documents to Pathway from existing connectors, or write your own connector in Python, so that your LLM stays up to date with the latest information.

## Prerequisites

In [None]:
!pip install pathway
!pip install llama-index

In [None]:
import logging
import sys

logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))

If there is no Pathway instance running, we need to start one.
For this demo, let's create an instance that listens to local files.

In [None]:
from llama_index.retrievers import PathwayVectorServer

In [None]:
import getpass
import os
import pathway as pw

# omit if embedder of choice is not OpenAI
if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API Key:")

## Define the inputs Pathway will listen to

In [None]:
data_sources = []
# This creates a `pathway` connector that tracks
# all the files in the `../data/paul_graham` directory.
data_sources.append(
    pw.io.fs.read(
        "../data/paul_graham",
        format="binary",
        mode="streaming",
        with_metadata=True,
    )
)

# We can add more connectors from various sources/formats with pw.io.
# For example, this creates a connector that tracks files in Google Drive.
# Please follow the instructions at https://pathway.com/developers/tutorials/connectors/gdrive-connector/ to get credentials.

# data_sources.append(
#     pw.io.gdrive.read(
#         object_id="17H4YpBOAKQzEJ93xmC2z170l0bP2npMy",
#         service_user_credentials_file="credentials.json",
#         with_metadata=True,
#     )
# )
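Conceptually, a connector in `mode="streaming"` keeps watching its source so that new or changed files reach the index without restarting the pipeline, and `with_metadata=True` attaches file metadata such as the path and modification time (visible in the document metadata returned later). The plain-Python sketch below illustrates that idea by comparing periodic snapshots of file modification times. It is not how Pathway implements its filesystem connector; `snapshot` and `diff` are hypothetical helpers introduced only for illustration.

```python
import os


def snapshot(directory: str) -> dict[str, float]:
    """Map every file under `directory` to its last-modification time."""
    state: dict[str, float] = {}
    for root, _dirs, files in os.walk(directory):
        for name in files:
            path = os.path.join(root, name)
            state[path] = os.path.getmtime(path)
    return state


def diff(old: dict[str, float], new: dict[str, float]) -> dict[str, list[str]]:
    """Classify paths as added, modified or deleted between two snapshots."""
    return {
        "added": sorted(p for p in new if p not in old),
        "modified": sorted(p for p in new if p in old and new[p] != old[p]),
        "deleted": sorted(p for p in old if p not in new),
    }
```

Taking a snapshot every few seconds and re-chunking and re-embedding only the paths reported by `diff(previous, current)` is a polling approximation of the incremental behaviour the streaming connector is meant to provide.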

## Create document transformation pipeline

In [None]:
from llama_index.embeddings import OpenAIEmbedding
from llama_index.node_parser import TokenTextSplitter

embed_model = OpenAIEmbedding(embed_batch_size=10)

transformations_example = [
    TokenTextSplitter(
        chunk_size=80,
        chunk_overlap=40,
        separator=" ",
    ),
    embed_model,
]
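`TokenTextSplitter(chunk_size=80, chunk_overlap=40)` cuts each document into windows of at most 80 tokens, with consecutive windows sharing up to 40 tokens, so most of the text ends up in two neighbouring chunks. The sketch below shows what those two numbers mean using whitespace-separated words as a stand-in for tokens. This is a simplification: the real splitter counts tokens produced by a tokenizer and its merging logic differs in detail, and `chunk_words` is a hypothetical helper, not a LlamaIndex API.

```python
def chunk_words(text: str, chunk_size: int = 80, chunk_overlap: int = 40) -> list[str]:
    """Split text on whitespace into windows of `chunk_size` words.

    Consecutive windows start `chunk_size - chunk_overlap` words apart,
    so each window repeats the last `chunk_overlap` words of the previous one.
    """
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")
    words = text.split()
    step = chunk_size - chunk_overlap
    chunks: list[str] = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break  # the last window already reaches the end of the text
    return chunks
```

With an overlap of half the chunk size, the number of chunks (and therefore of embedding inputs sent by `embed_model`) is roughly twice what it would be without overlap.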

## Run the Pathway server

In [None]:
pr = PathwayVectorServer(
    *data_sources,
    transformations=transformations_example,
)

# Define the host and port the Pathway server will listen on
PATHWAY_HOST = "127.0.0.1"
PATHWAY_PORT = 8754

pr.run_server(
    host=PATHWAY_HOST, port=PATHWAY_PORT, with_cache=False, threaded=True
)

<Thread(Thread-5 (run), started 140155873506880)>

[2023-12-21T17:37:11]:INFO:Preparing Pathway computation

[2023-12-21T17:37:13]:INFO:HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
[2023-12-21T17:37:13]:INFO:HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
[2023-12-21T17:37:13]:INFO:HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
[2023-12-21T17:37:13]:INFO:HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
[2023-12-21T17:37:17]:INFO:FilesystemReader-0: 1 entries (5 minibatch(es)) have been sent to the engine
[2023-12-21T17:37:23]:INFO:FilesystemReader-0: 0 entries (4 minibatch(es)) have been sent to the engine
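`run_server` starts an HTTP server on `PATHWAY_HOST:PATHWAY_PORT`, which the reader below queries with POST requests. If another process, for example a server left over from an earlier run of this notebook, already holds that port, starting the server fails with an "address already in use" error. The sketch below is a quick pre-flight heuristic using only the standard library; `port_is_free` is a hypothetical helper. The answer can change between the check and the actual start, and a port lingering in `TIME_WAIT` may be reported as busy even though a server that sets `SO_REUSEADDR` could still bind to it.

```python
import socket


def port_is_free(host: str, port: int) -> bool:
    """Return True if a fresh TCP socket can bind to (host, port) right now."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError:  # e.g. errno 98 (EADDRINUSE) on Linux
            return False
    return True
```

For instance, `port_is_free(PATHWAY_HOST, PATHWAY_PORT)` returning `False` before calling `pr.run_server(...)` suggests either stopping the old server or choosing a different `PATHWAY_PORT`.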


## Define a `reader` client for Pathway

In [None]:
from llama_index.readers.pathway import PathwayReader

In [None]:
reader = PathwayReader(host=PATHWAY_HOST, port=PATHWAY_PORT)

In [None]:
# Send a text query to the Pathway server; it returns the most similar indexed documents
reader.load_data(query_text="some search input")

[2023-12-20T13:08:16]:INFO:PythonReader-1: 1 entries (1119 minibatch(es)) have been sent to the engine


[2023-12-20T13:08:17]:INFO:127.0.0.1 [20/Dec/2023:13:08:16 +0100] "POST / HTTP/1.1" 200 5230 "-" "python-requests/2.31.0"


[Document(id_='8182945624577964255', embedding=None, metadata={'created_at': None, 'modified_at': 1702989823, 'owner': 'berke', 'path': '/home/berke/IoT-Pathway/experimental/berke/llama_reader/sample_documents/repo_readme.md'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='05e6d1a99ca31c406061f2257fcdc9f45c0d5a31696fa11a3a75876e73fdb336', text='Pathway is an open framework for high-throughput and low-latency real-time data processing. It is used to create Python code which seamlessly combines batch processing, streaming, and real-time API\'s for LLM apps. Pathway\'s distributed runtime (🦀-🐍) provides fresh results of your data pipelines whenever new inputs and requests are received.\n\nIn the first place, Pathway was designed to be a life-saver (or at least a time-saver) for Python developers and ML/AI engineers faced with live data sources, where you need to react quickly to fresh data. Still, Pathway is a powerful tool that can be used for a 

[2023-12-20T13:08:21]:INFO:PythonReader-1: 1 entries (101 minibatch(es)) have been sent to the engine
[2023-12-20T13:08:26]:INFO:PythonReader-1: 0 entries (101 minibatch(es)) have been sent to the engine


## Create a summary index with llama-index

In [None]:
docs = reader.load_data(query_text="some search input", k=2)

[2023-12-20T11:42:27]:INFO:127.0.0.1 [20/Dec/2023:11:42:26 +0100] "POST / HTTP/1.1" 200 9509 "-" "python-requests/2.31.0"


[2023-12-20T11:42:27]:INFO:PythonReader-1: 2 entries (101 minibatch(es)) have been sent to the engine
[2023-12-20T11:42:32]:INFO:PythonReader-1: 0 entries (100 minibatch(es)) have been sent to the engine
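Conceptually, the server embeds `query_text` with the same embedding model used for the documents and returns the `k` chunks whose vectors are closest to it; `k=2` above asks for the two best matches. The sketch below shows a brute-force top-k search under the assumption of cosine similarity, with made-up 3-dimensional vectors in place of real embeddings. Pathway's actual index and similarity metric may differ, and `cosine` and `top_k` are hypothetical helpers.

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity of two vectors (0.0 if either has zero length)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(query: list[float], chunks: dict[str, list[float]], k: int = 2) -> list[str]:
    """Return the ids of the k chunks most similar to the query vector."""
    ranked = sorted(chunks, key=lambda cid: cosine(query, chunks[cid]), reverse=True)
    return ranked[:k]


# Toy "embeddings" for four chunks (illustrative values only).
toy_chunks = {
    "a": [1.0, 0.0, 0.0],
    "b": [0.9, 0.1, 0.0],
    "c": [0.0, 1.0, 0.0],
    "d": [0.0, 0.0, 1.0],
}
```

Here `top_k([1.0, 0.0, 0.0], toy_chunks, k=2)` ranks `"a"` and `"b"` first because they point in nearly the same direction as the query. A production index avoids scoring every vector, but the ranking it approximates is the same idea.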


In [None]:
from llama_index.indices.list import SummaryIndex

In [None]:
index = SummaryIndex.from_documents(docs)

In [None]:
query_engine = index.as_query_engine()
response = query_engine.query("What is Pathway?")

[2023-12-20T11:43:43]:INFO:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


In [None]:
print(response)

Pathway is an open framework for high-throughput and low-latency real-time data processing. It is a powerful tool that provides a high-level programming interface in Python for defining data transformations, aggregations, and other operations on data streams. Pathway is interoperable with various data sources and sinks such as Kafka, CSV files, SQL/noSQL databases, and REST API's, allowing users to connect and process data from different storage systems. It is commonly used for real-time data processing, ETL pipelines, data analytics, monitoring, anomaly detection, and recommendation. Pathway represents data in the form of Tables and provides a rich set of operations like filtering, joining, grouping, and windowing. It can be easily installed using pip and supports multithreading. Pathway also comes with a monitoring dashboard and supports Prometheus for monitoring purposes.
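Unlike the vector search that happens on the Pathway server, a `SummaryIndex` does not rank anything by default: its retriever hands every stored node to the response synthesizer, which must fit that text into the LLM's context window and spread it over several calls when it does not fit. The sketch below illustrates only that fitting step with a greedy packer under a character budget. It is a simplification under stated assumptions: LlamaIndex measures prompts in tokens, its response modes differ in how partial answers are combined, and `pack_into_prompts` is a hypothetical helper.

```python
def pack_into_prompts(texts: list[str], budget: int, sep: str = "\n\n") -> list[str]:
    """Greedily join texts into batches of at most `budget` characters.

    A single text longer than the budget becomes its own oversized batch
    instead of being dropped.
    """
    batches: list[str] = []
    current = ""
    for text in texts:
        candidate = text if not current else current + sep + text
        if len(candidate) <= budget or not current:
            current = candidate  # still fits, or nothing to flush yet
        else:
            batches.append(current)  # flush the full batch
            current = text
    if current:
        batches.append(current)
    return batches
```

With only the two documents retrieved via `k=2`, everything likely fits in a single prompt; the packing matters once a summary index holds more text than one LLM call can take.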
