- Live News Analyst: A chatbot that ingests articles from a real-time news API. It can answer
nuanced questions about breaking news stories, with its understanding evolving as new reports,
updates, and corrections are published.


## RAG architecture

#### Import

In [2]:
import pathway as pw
from pathway.stdlib.indexing.nearest_neighbors import BruteForceKnnFactory
import time
from pathway.io.python import ConnectorSubject
#from pathway.xpacks.llm import llms
#from pathway.xpacks.llm.document_store import DocumentStore
# from pathway.xpacks.llm.embedders import OpenAIEmbedder
# from pathway.xpacks.llm.parsers import UnstructuredParser
# from pathway.xpacks.llm.splitters import TokenCountSplitter


In [3]:
from dotenv import load_dotenv
import os

load_dotenv()


True

#### Document Indexing

In [None]:
import requests

def fetch_news():
    url = "https://newsapi.org/v2/top-headlines"
    params = {
        "apiKey": NEWS_API_KEY,
        "language": "en",
        "pageSize": 20
    }

    response = requests.get(url, params=params)

    print("Status Code:", response.status_code)

    data = response.json()
    print("Full Response:", data)

    return data.get("articles", [])


In [5]:
res = fetch_news()

Status Code: 200
Full Response: {'status': 'ok', 'totalResults': 32, 'articles': [{'source': {'id': None, 'name': 'Yahoo Entertainment'}, 'author': 'Yahoo Finance', 'title': "Trump tariffs live updates: EU discusses $108 billion in retaliatory tariffs; Danish PM says Europe 'will not be blackmailed' - Yahoo Finance", 'description': 'President Trump has said the acquisition of Greenland is crucial for the US but European allies, including Denmark and Greenland, remain staunchly opposed to...', 'url': 'https://finance.yahoo.com/news/live/trump-tariffs-live-updates-eu-discusses-108-billion-in-retaliatory-tariffs-danish-pm-says-europe-will-not-be-blackmailed-152657886.html', 'urlToImage': 'https://s.yimg.com/ny/api/res/1.2/j.0soNmn675omzOQsiD3nA--/YXBwaWQ9aGlnaGxhbmRlcjt3PTEyMDA7aD04MDA-/https://s.yimg.com/os/creatr-uploaded-images/2026-01/1f982e20-ec93-11f0-affd-12f1e90fe8c1', 'publishedAt': '2026-01-19T10:18:39Z', 'content': 'EU capitals have entered discussions to implement tariffs of u

In [6]:
print("Number of articles fetched:", len(res))


Number of articles fetched: 18


In [7]:
if res:
    print("Title:", res[0]["title"])
    print("Source:", res[0]["source"]["name"])
else:
    print("No articles found.")

Title: Trump tariffs live updates: EU discusses $108 billion in retaliatory tariffs; Danish PM says Europe 'will not be blackmailed' - Yahoo Finance
Source: Yahoo Entertainment


In [None]:
import requests

def fetch_news():
    url = "https://newsapi.org/v2/top-headlines"
    params = {
        "apiKey": NEWS_API_KEY,
        "language": "en",
        "country": "us", 
        "pageSize": 20
    }
    return requests.get(url, params=params).json()
print(fetch_news())


{'status': 'ok', 'totalResults': 32, 'articles': [{'source': {'id': None, 'name': 'Yahoo Entertainment'}, 'author': 'Yahoo Finance', 'title': "Trump tariffs live updates: EU discusses $108 billion in retaliatory tariffs; Danish PM says Europe 'will not be blackmailed' - Yahoo Finance", 'description': 'President Trump has said the acquisition of Greenland is crucial for the US but European allies, including Denmark and Greenland, remain staunchly opposed to...', 'url': 'https://finance.yahoo.com/news/live/trump-tariffs-live-updates-eu-discusses-108-billion-in-retaliatory-tariffs-danish-pm-says-europe-will-not-be-blackmailed-152657886.html', 'urlToImage': 'https://s.yimg.com/ny/api/res/1.2/j.0soNmn675omzOQsiD3nA--/YXBwaWQ9aGlnaGxhbmRlcjt3PTEyMDA7aD04MDA-/https://s.yimg.com/os/creatr-uploaded-images/2026-01/1f982e20-ec93-11f0-affd-12f1e90fe8c1', 'publishedAt': '2026-01-19T10:18:39Z', 'content': 'EU capitals have entered discussions to implement tariffs of up to $107.71 billion on American

In [9]:
res[0]['content']

'EU capitals have entered discussions to implement tariffs of up to $107.71 billion on American products after President Trump posted his plan to levy new tariffs on Europe on Saturday. Trump said the‚Ä¶ [+15732 chars]'

### Data ingestion

##### Create a Pathway input schema

In [10]:
import pathway as pw

class NewsSchema(pw.Schema):
    article_id: str = pw.column_definition(primary_key=True)
    title: str
    content: str
    source: str
    published_at: str
    url: str


##### Costom connector to fetch data from news api

In [11]:
class NewsApiConnector(ConnectorSubject):
    def __init__(self, api_key):
        super().__init__()
        self.api_key = api_key
        self.url = "https://newsapi.org/v2/top-headlines"

    def run(self):
        params = {'q': 'technology', 'apiKey': self.api_key, "language": "en", "country": "us", "pageSize": 20}
        while True:
            res = requests.get(self.url, params=params).json()
            if res.get("status") != "ok":
                print(f"News api error: {res}")
                time.sleep(60)
                continue

            articles = res.get('articles', [])
            print(f"Fetched {len(articles)} articles")
            for article in articles:
                url = article.get("url")
                if not url:
                    continue

                #Mapping api field to my schima field
                self.next(
                    article_id=article.get('url'), # Using URL as a unique ID
                    title=article.get('title') or "",
                    content=article.get('content') or article.get('description') or  "",
                    source=article.get('source', {}).get('name'),
                    published_at=article.get('publishedAt', ""),
                    url=article.get('url')
                )
            time.sleep(900)

##### Initialise connector

In [None]:
news_connector = NewsApiConnector(api_key=NEWS_API_KEY)

##### Create streaming table using schema

In [13]:
news_table = pw.io.python.read(news_connector, schema=NewsSchema)

### Define transformation

##### 1Ô∏è‚É£ Convert News Rows ‚Üí Documents

- Each row becomes one document.

In [14]:
documents = news_table.select(
    doc_id=pw.this.article_id,
    text=pw.this.title + "\n\n" + pw.this.content,
    published_at=pw.this.published_at,
    source=pw.this.source,
    url=pw.this.url,
)


##### 2Ô∏è‚É£ Chunk the Documents (CRITICAL)

In [15]:
import pathway as pw
import tiktoken

enc = tiktoken.get_encoding("cl100k_base")

@pw.udf
def token_count_split(
    text: str,
    chunk_size: int,
    chunk_overlap: int,
):
    if not text:
        return []

    tokens = enc.encode(text)
    chunks = []

    start = 0
    n = len(tokens)

    while start < n:
        end = start + chunk_size
        chunks.append(enc.decode(tokens[start:end]))
        start = max(end - chunk_overlap, 0)

    return chunks


In [16]:
chunks = documents.select(
    doc_id=pw.this.doc_id,
    chunks=token_count_split(
        pw.this.text,
        400,
        50,
    ),
    published_at=pw.this.published_at,
    source=pw.this.source,
    url=pw.this.url,
)

chunks = chunks.flatten(pw.this.chunks).select(
    doc_id=pw.this.doc_id,
    chunk=pw.this.chunks,
    published_at=pw.this.published_at,
    source=pw.this.source,
    url=pw.this.url,
)


##### 3Ô∏è‚É£ Embed the Chunks

In [19]:
import hashlib

def chunk_id(article_id, chunk_text):
    return hashlib.sha1(
        f"{article_id}:{chunk_text}".encode("utf-8")
    ).hexdigest()



In [20]:
import pathway as pw
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")

@pw.udf
def embed_text(text: str):
    if not text:
        return []
    return model.encode(text).tolist()


In [21]:
import hashlib

@pw.udf
def chunk_id(doc_id: str, chunk: str) -> str:
    h = hashlib.sha1(f"{doc_id}:{chunk}".encode()).hexdigest()
    return h


In [22]:
embedded_chunks = chunks.select(
    chunk_id=pw.apply(chunk_id, pw.this.doc_id, pw.this.chunk),
    vector=embed_text(pw.this.chunk),
    text=pw.this.chunk,
    published_at=pw.this.published_at,
    source=pw.this.source,
    url=pw.this.url,
)


In [25]:
print(embedded_chunks.schema)

id          | chunk_id | vector | text | published_at | source | url
ANY_POINTER | STR      | ANY    | ANY  | STR          | STR    | STR


##### 4Ô∏è‚É£ Build the Vector Index

In [26]:
from pathway.stdlib.ml.index import KNNIndex  # Note: ml.index, NOT indexing

In [27]:
index = KNNIndex(
    embedded_chunks.vector,
    embedded_chunks,
    n_dimensions=384 
)

##### 5Ô∏è‚É£ Create Query Input (User Questions)

In [28]:
import pathway as pw

class QuerySchema(pw.Schema):
    query: str

webserver = pw.io.http.PathwayWebserver(host="0.0.0.0", port=8080)
queries, response_writer = pw.io.http.rest_connector(
    webserver=webserver,
    route="/query",  # or your desired route
    schema=QuerySchema
)


    https://beartype.readthedocs.io/en/latest/api_roar/#pep-585-deprecations
  warn(
  warn(


In [33]:
print(queries.typehints())
print(embedded_chunks.typehints())

{'query': <class 'str'>}
{'chunk_id': <class 'str'>, 'vector': typing.Any, 'text': typing.Any, 'published_at': <class 'str'>, 'source': <class 'str'>, 'url': <class 'str'>}


In [35]:
print(embedded_chunks.schema)
print(chunks.schema)
print(documents.schema)
print(queries.schema)


id          | chunk_id | vector | text | published_at | source | url
ANY_POINTER | STR      | ANY    | ANY  | STR          | STR    | STR
id          | doc_id | chunk | published_at | source | url
ANY_POINTER | STR    | ANY   | STR          | STR    | STR
id           | doc_id | text | published_at | source | url
Pointer(STR) | STR    | STR  | STR          | STR    | STR
id          | query
ANY_POINTER | STR  


In [64]:
#Embed query
query_vectors = queries.select(
    query_id=pw.this.id,
    query=pw.this.query,
    vector=embed_text(pw.this.query),
)

##### 6Ô∏è‚É£ Retrieve Relevant News Chunks

In [49]:
nearest = index.get_nearest_items(
    query_vectors.vector,
    k=5
)


In [52]:
print(nearest.schema)
print(embedded_chunks.schema)

id          | chunk_id | vector | text | published_at | source | url
ANY_POINTER | ANY      | ANY    | ANY  | ANY          | ANY    | ANY
id          | chunk_id | vector | text | published_at | source | url
ANY_POINTER | STR      | ANY    | ANY  | STR          | STR    | STR


In [53]:
# retrieved = nearest.join(
#     embedded_chunks,
#     pw.left.chunk_id == pw.right.chunk_id
# ).select(
#     text=pw.right.text,
#     source=pw.right.source,
#     url=pw.right.url,
#     published_at=pw.right.published_at,
# )


retrieved = nearest.join(
    embedded_chunks,
    pw.left.id == pw.right.id
).select(
    text=pw.right.text,
    source=pw.right.source,
    url=pw.right.url,
    published_at=pw.right.published_at,
)


##### 7Ô∏è‚É£ Build context for LLM

In [68]:
# context_table = retrieved.reduce(
#     texts=pw.reducers.collect(pw.this.text)
# )

context_table = retrieved.reduce(
    texts=pw.reducers.tuple(pw.this.text),
    emit_on="append"
)

# context = context_table.select(
#     context=pw.apply(
#         lambda texts: "\n\n".join(
#             t if isinstance(t, str) else t[0] for t in texts
#         ),
#         pw.this.texts,
#     )
# )


context = context_table.select(
    query_id=pw.this.id,
    context=pw.apply(
        lambda texts: "\n\n".join(texts[:3]),
        pw.this.texts
    )
)

In [None]:

prompt = context.join(
    queries,
    pw.left.query_id  == pw.right.id,  # Use id, not query_id
).select(
    prompt=pw.apply(
        lambda c, q: f"""
You are a helpful AI assistant.
Use ONLY the context below to answer the question.

Context:
{c}

Question:
{q}

Answer:
""".strip(),
        pw.left.context,
        pw.right.query
    )
)



##### 8Ô∏è‚É£ Generate Final Answer (LLM)

In [None]:
# from pathway.xpacks.llm.llms import OpenAIChat

# llm = OpenAIChat(model="gpt-4o-mini", api_key=os.environ["OPENAI_API_KEY"], temperature=0)

# answers = prompt.select(
#     answer=llm(pw.this.prompt)
# )


from pathway.xpacks.llm.llms import OpenAIChat
import os

llm = OpenAIChat(
    model="llama3-70b-8192",
    api_key=os.environ["GROQ_API_KEY"],
    base_url="https://api.groq.com/openai/v1",
    temperature=0,
)
answers = prompt.select(
    answer=llm(pw.this.prompt)
)

##### 9Ô∏è‚É£ Return Answer via HTTP

In [None]:
response_writer(
    answers.select(
        answer=pw.this.answer
    )
)

In [None]:
pw.io.http.write(
    answers,
    host="0.0.0.0",
    port=8080,
)

##### üîü Run the Application

In [None]:
# pw.run()
pw.run(
    answers,
    host="0.0.0.0",
    port=8080
)
