# How to Make an up to date AI chat bot

ChatGPT and other AI chatbots are amazing, they can write blog posts, emails, fix bugs, document code and have been shown to complete tasks like planning trips all based off of natural language prompts. Despite all of the capabilities they fall short with respect to the most recent information of the world, out-of-domain knowledge or might provide incorrect information without the right prompting.

The solution to helping the language model with those areas that it falls short is sophisticated prompt templating and injecting information into the context. For example, if I were to ask chatGPT, 

>"What happened with Meta in the news today?" 

I would get the response:

> "I'm sorry, but my training data only goes up until September 2021, and I'm unable to provide real-time news updates. To find out the latest news about Meta (formerly known as Facebook), please visit a reliable news website or search for "Meta news" in your preferred search engine."

But I could solve this problem by providing more information as context and the right prompt. For example I could copy and paste an article and change the prompt to something like:

> "Given this article from today: `Article about META in the news today`. Answer the following: What happened with Meta in the news today?"

I now get the response:

> "In the news today, Elon Musk, the CEO of Tesla and owner of Twitter, has accused Meta's messaging app, WhatsApp, of being untrustworthy. The accusation was based on a tweet by a Twitter engineer, Foad Dabiri, who claimed that WhatsApp had been using the microphone in the background without their knowledge. Musk pointed out that WhatsApp cannot be trusted and reminded people that WhatsApp is owned by Meta (previously Facebook) and that its founders left Meta in disagreement over its policies and decisions, particularly regarding personal data processing. This news may have an impact on Meta's public image and its platforms, as trust and privacy are crucial factors for users of social media and messaging apps."

This is basic and over simplified, but the idea is that we can help the language model provide what we ultimately want with a little bit of work asking the right things and giving it the right information as context. 

Since we understand that this works for our one-off use case, how can we make something that will scale to many questions about different subjects? This is where vector databases and embeddings come into play.

## What is an embedding and a vector database?

An embedding is simply a representation of something as a vector, or in other words a list of numbers. It is a way to intelligently turn categorical things into numerical representations so that we can reason about them in different ways, like determine how similar they are group them in multi-dimensional space. A vector database is a database that allows us to store our embeddings as vectors so that we can efficiently retrieve them. Since the vectors can be compared for similarity, our query against the database will be able to return the most similar items.

So if we can consume news as it is created, create embeddings to represent the news and then store it in a vector database. When we ask a question we will be able pull the articles most similar to our query, give them to a large language model as context and then ask the language model to provide us with information. 

To support the ingestion, we will need to make a pipeline that is capable of ingesting data in real-time, processing it, running it against an ML model to create an embedding and then write it to a veector database. We can use Bytewax as the real-time processing framework.

## Building the pipeline

We are going to leverage Bytewax to create a real-time embedding pipeline from a live news source, in this case it is a financial news source via Alpaca. This pipeline could be scaled to consume many different news sources and aggregate them all into a vector database like Qdrant shown below.

In [1]:
# Install requirements
!pip install -r requirements.txt
!pip install unstructured transformers torch sentencepiece pydantic
!pip install qdrant-client==1.1.1







In [2]:
!pip install fake-useragent



To start we will define an input for the pipeline. The input is based of of the `StatelessSource` class and is taking input from Alpaca API which provides a websocket based input. Note that this will not be able to resume and data will be lost if the service goes down. 

In [3]:
import hashlib

In [12]:
import time
from datetime import datetime

from bs4 import BeautifulSoup
from fake_useragent import UserAgent
import requests
from requests.exceptions import RequestException

class WebPage:
    
    def __init__(self, url, headers=None, max_retries=3, wait_time=1):
        
        self.content=''
        self.url=url
        self.max_retries=max_retries
        self.wait_time=wait_time
        if headers:
            self.headers=headers
        else:
            # make a user agent
            ua = UserAgent()

            self.headers = {
                "User-Agent": ua.random,
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*"
                ";q=0.8",
                "Accept-Language": "en-US,en;q=0.5",
                "Referer": "https://www.google.com/",
                "DNT": "1",
                "Connection": "keep-alive",
                "Upgrade-Insecure-Requests": "1",
            }
            

    def get_page(self):
        current_wait_time = self.wait_time

        # Send the initial request
        for i in range(self.max_retries + 1):
            try:
                self.response = requests.get(self.url, headers=self.headers)
                self.response.raise_for_status()
                self.content = self.response.content
                break
            except RequestException as e:
                print(f"Request failed (attempt {i + 1}/{self.max_retries}): {e}")
                if i == self.max_retries:
                    print(f"skipping url {self.url}")
                    self.content = ''
                print(f"Retrying in {current_wait_time} seconds...")
                time.sleep(current_wait_time)
                i += 1


In [5]:
import os

from bs4 import BeautifulSoup
from bytewax.dataflow import Dataflow
from bytewax.inputs import DynamicInput, StatelessSource
from bytewax.connectors.stdio import StdOutput


class HTTPSource(StatelessSource):
    def __init__(self, urls, poll_frequency, max_retries=3, wait_time=1):
        self.urls=urls
        self.poll_frequency=poll_frequency
        self.max_retries=max_retries
        self.wait_time=wait_time
        self.poll_time=datetime.now()
        self.counter = 0
        

    def next(self):
        elapsed_time = datetime.now() - self.poll_time
        if self.counter > 0 and elapsed_time.total_seconds() < self.poll_frequency:
            return None
        else:
            start_req = datetime.now()
            webpages = []
            for url in self.urls:
                wp = WebPage(url, max_retries=self.max_retries, wait_time=self.wait_time)
                wp.get_page()
                webpages.append(wp)

            total_req = start_req - datetime.now()
            self.poll_frequency = self.poll_frequency - total_req.total_seconds()
            self.poll_time = datetime.now()
            self.counter += 1
            return webpages
        


class HTTPInput(DynamicInput):
    '''Given a set of urls retrieve the html content from each url'''
    
    def __init__(self, urls, poll_frequency=600, max_retries=3, wait_time=1):
        self.urls=urls
        self.poll_frequency=poll_frequency
        self.max_retries=max_retries 
        self.wait_time=wait_time

    def build(self, worker_index, worker_count):
        urls_per_worker = int(len(self.urls) / worker_count)
        worker_urls = self.urls[
            int(worker_index * urls_per_worker) : int(
                worker_index * urls_per_worker + urls_per_worker
            )
        ]
        return HTTPSource(worker_urls, self.poll_frequency, max_retries=self.max_retries, wait_time=self.wait_time)


flow = Dataflow()
flow.input("input", HTTPInput(["https://news.ycombinator.com/"], poll_frequency=600, max_retries=3, wait_time=1))


## Preprocess data

With our input configured, we will move on to processing our data. We will use pydantic to define a model for our document and then use unstructured to process the text. Finally, to finish preparing the data we will chunk the text into the appropriate lengths.

In [6]:
from unstructured.partition.html import partition_html
from unstructured.cleaners.core import clean, replace_unicode_quotes, clean_non_ascii_chars
# from unstructured.staging.huggingface import chunk_by_attention_window

# from unstructured.staging.huggingface import stage_for_transformers

# from transformers import AutoTokenizer, AutoModel

# tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
# model = AutoModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")

In [7]:
# news = [json.dumps([{"T":"n","id":32052192,"headline":"What\u0026#39;s Going On With ContraFecta Stock Today","summary":"\n\tContraFect Corporation (NASDAQ: CFRX) shares are trading higher Thursday morning. However, there is no specific news to justify the move.\n","author":"Vandana Singh","created_at":"2023-04-27T18:24:49Z","updated_at":"2023-04-27T18:24:49Z","url":"https://www.benzinga.com/general/biotech/23/04/32052192/whats-going-on-with-contrafecta-stock-today","content":"\u003cul\u003e\r\n\t\u003cli\u003e\u003cstrong\u003eContraFect Corporation\u0026nbsp;\u003c/strong\u003e(NASDAQ:\u003ca class=\"ticker\" href=\"https://www.benzinga.com/stock/CFRX#NASDAQ\"\u003eCFRX\u003c/a\u003e) shares are trading higher Thursday morning. However, there is no specific news to justify the move.\u003c/li\u003e\r\n\t\u003cli\u003eWednesday morning, ContraFect\u0026nbsp;\u003ca class=\"editor-rtfLink\" href=\"https://www.benzinga.com/pressreleases/23/04/g32009856/contrafect-announces-first-patient-dosed-in-the-phase-1b2-study-of-exebacase-in-patients-with-chro\" style=\"color:#4a6ee0; background:transparent; margin-top:0pt; margin-bottom:0pt\" target=\"_blank\"\u003eannounced the dosing\u003c/a\u003e\u0026nbsp;of the first patient in Phase 1b/2 of exebacase in the setting of an arthroscopic debridement, antibiotics, irrigation, and retention procedure in patients with chronic prosthetic joint infections of the knee due to\u0026nbsp;\u003cem\u003eStaphylococcus aureus\u003c/em\u003e\u0026nbsp;or Coagulase-Negative Staphylococci.\u003c/li\u003e\r\n\t\u003cli\u003eThe study was initiated\u0026nbsp;\u003ca class=\"editor-rtfLink\" href=\"https://www.benzinga.com/pressreleases/23/04/g31631700/contrafect-announces-initiation-of-a-phase-1b2-study-of-exebacase-in-patients-with-chronic-prosthe\" style=\"color:#4a6ee0; background:transparent; margin-top:0pt; margin-bottom:0pt\" target=\"_blank\"\u003eearlier this month\u003c/a\u003e.\u003c/li\u003e\r\n\t\u003cli\u003eContraFect stock is gaining on heavy volume, with a session volume of 55 million shares traded, compared to the trailing 100-day volume of 3.08 million shares.\u003c/li\u003e\r\n\t\u003cli\u003eAccording to data from\u0026nbsp;\u003ca class=\"editor-rtfLink\" href=\"https://benzinga.grsm.io/register174\" style=\"color:#4a6ee0; background:transparent; margin-top:0pt; margin-bottom:0pt\" target=\"_blank\"\u003eBenzinga Pro\u003c/a\u003e, CFRX has a 52-week high of $362.4 and a 52-week low of $0.90.\u003c/li\u003e\r\n\t\u003cli\u003e\u003cstrong\u003ePrice Action:\u003c/strong\u003e\u0026nbsp;CFRX shares are up 68.20% at $2.22 on the last check Thursday.\u003c/li\u003e\r\n\u003c/ul\u003e\r\n ","symbols":["CFRX"],"source":"benzinga"}])]

In [8]:
import hashlib
from pydantic import BaseModel
from typing import Any, Optional

class Document(BaseModel):
    id: str
    group_key: Optional[str] = None
    metadata: Optional[dict] = {}
    text: Optional[list]
    embeddings: Optional[list] = []
        
flow.flat_map(lambda x: x)

# recursively get the html from links on the webpage
def recurse_hn(html):
    '''
    Get all the links from the html object and request the webpage 
    and return them in a list of html bs4 objects.
    This should be used in a flat map'''
    webpages = []
    soup = BeautifulSoup(html, 'html.parser')
    items = soup.select("tr[class='athing']")
    for lineItem in items:
        ranking = lineItem.select_one("span[class='rank']").text
        link = lineItem.find("span", {"class": "titleline"}).find("a").get("href")
        title = lineItem.find("span", {"class": "titleline"}).text.strip()
#         metadata = {
#             "source": self.web_path,
#             "title": title,
#             "link": link,
#             "ranking": ranking,
#         }
        wp = WebPage(link)
        wp.get_page()
        webpages.append(wp)
    return webpages

flow.flat_map(lambda webpage: recurse_hn(webpage.content))

flow.inspect(print)

# Clean the code and setup the dataclass
def parse_html(_data):
    document_id = hashlib.md5(_data.encode()).hexdigest()
    document = Document(id = document_id)
    print(_data)
    article_elements = partition_html(text=_data)
    print(article_elements)
    _data['content'] = clean_non_ascii_chars(replace_unicode_quotes(clean(" ".join([str(x) for x in article_elements]))))
    _data['headline'] = clean_non_ascii_chars(replace_unicode_quotes(clean(_data['headline'])))
    _data['summary'] = clean_non_ascii_chars(replace_unicode_quotes(clean(_data['summary'])))

    document.text = [_data['headline'], _data['summary'], _data['content']]
    document.metadata['headline'] = _data['headline']
    document.metadata['summary'] = _data['summary']
    document.metadata['url'] = _data['url']
    document.metadata['symbols'] = _data['symbols']
    document.metadata['author'] = _data['author']
    document.metadata['created_at'] = _data['created_at']
    return (document.id, document)

flow.map(parse_html)

# # chunk the news article and summary
# def chunk(document_id__document):
#     document_id, document = document_id__document
#     chunks = []
#     for text in document.text:
#         chunks += chunk_by_attention_window(text, tokenizer)
    
#     document.text = chunks
#     return(document_id, document)

# flow.map(chunk)
    

In [13]:
from bytewax.testing import run_main

flow.output("out", StdOutput())
run_main(flow)

Request failed (attempt 1/3): 400 Client Error: Bad Request for url: https://twitter.com/elonmusk/status/1657050349608501249
Retrying in 1 seconds...
Request failed (attempt 2/3): 400 Client Error: Bad Request for url: https://twitter.com/elonmusk/status/1657050349608501249
Retrying in 1 seconds...
Request failed (attempt 3/3): 400 Client Error: Bad Request for url: https://twitter.com/elonmusk/status/1657050349608501249
Retrying in 1 seconds...
Request failed (attempt 4/3): 400 Client Error: Bad Request for url: https://twitter.com/elonmusk/status/1657050349608501249
skipping url https://twitter.com/elonmusk/status/1657050349608501249
Retrying in 1 seconds...
Request failed (attempt 1/3): Invalid URL 'item?id=35914929': No scheme supplied. Perhaps you meant https://item?id=35914929?
Retrying in 1 seconds...
Request failed (attempt 2/3): Invalid URL 'item?id=35914929': No scheme supplied. Perhaps you meant https://item?id=35914929?
Retrying in 1 seconds...
Request failed (attempt 3/3):

thread '<unnamed>' panicked at 'Box<dyn Any>', src/operators/mod.rs:29:9



AttributeError: (src/operators/mod.rs:31:14) error calling `map` mapper
Caused by => AttributeError: 'WebPage' object has no attribute 'encode'
Traceback (most recent call last):
  File "/var/folders/lx/0qxsj_495jl6_8vg_0wpj24h0000gn/T/ipykernel_31256/175867626.py", line 44, in parse_html


## Create Embeddings

Embeddings are a vector representation of a sequence of things. It could be words, sound, letters, events. We are using a huggingface  model to transform our chunks of text into embeddings. These will be used later when we want to query for new information.

In [6]:
# create embedding and store in vector db
def embedding(document_id__document):
    document_id, document = document_id__document
    for chunk in document.text:
        inputs = tokenizer(chunk, padding=True, truncation=True, return_tensors="pt", max_length=512)
        result = model(**inputs)
        embeddings = result.last_hidden_state[:, 0, :].cpu().detach().numpy()
        lst = embeddings.flatten().tolist()
        document.embeddings.append(lst)
    return (document_id, document)
    
flow.map(embedding)
    

In [7]:
import datetime

from bytewax.outputs import DynamicOutput, StatelessSink

from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams
from qdrant_client.models import PointStruct
from qdrant_client.http.api_client import UnexpectedResponse


class _QdrantVectorSink(StatelessSink):
    
    def __init__(self, client, collection_name):
        self._client=client
        self._collection_name=collection_name

    def write(self, id_hash__doc):
        id_hash, doc = id_hash__doc
        _payload = doc.metadata
        _payload.update({"text":doc.text})
        self._client.upsert(
            collection_name=self._collection_name,
            points=[
                PointStruct(
                    id=idx,
                    vector=vector,
                    payload=_payload
                )
                for idx, vector in enumerate(doc.embeddings)
            ]
        )


class QdrantVectorOutput(DynamicOutput):
    """Qdrant.

    Workers are the unit of parallelism.

    Can support at-least-once processing. Messages from the resume
    epoch will be duplicated right after resume.

    """
    def __init__(self, collection_name, vector_size, schema='', host='localhost', port=6333):
        self.collection_name=collection_name
        self.vector_size=vector_size
        self.schema=schema
        self.client=QdrantClient(host, port=6333)
    
        try: 
            self.client.get_collection(collection_name="test_collection")
        except UnexpectedResponse:
            self.client.recreate_collection(
                collection_name="test_collection",
                vectors_config=VectorParams(size=self.vector_size, distance=Distance.COSINE),
                schema=self.schema
            )

    def build(self, worker_index, worker_count):
        
        return _QdrantVectorSink(self.client, self.collection_name)
    

flow.output("output", QdrantVectorOutput("test_collection", 384))



In [11]:
for articles in news:
    for article in json.loads(articles):
        out1 = parse_article(article)
        out2 = chunk(out1)
        out3 = embedding(out2)
        output = QdrantVectorOutput("test_collection", 384)
        output_sink = output.build(1,1)
        output_sink.write(out3)

{'T': 'n', 'id': 32052192, 'headline': 'What&#39;s Going On With ContraFecta Stock Today', 'summary': '\n\tContraFect Corporation (NASDAQ: CFRX) shares are trading higher Thursday morning. However, there is no specific news to justify the move.\n', 'author': 'Vandana Singh', 'created_at': '2023-04-27T18:24:49Z', 'updated_at': '2023-04-27T18:24:49Z', 'url': 'https://www.benzinga.com/general/biotech/23/04/32052192/whats-going-on-with-contrafecta-stock-today', 'content': '<ul>\r\n\t<li><strong>ContraFect Corporation&nbsp;</strong>(NASDAQ:<a class="ticker" href="https://www.benzinga.com/stock/CFRX#NASDAQ">CFRX</a>) shares are trading higher Thursday morning. However, there is no specific news to justify the move.</li>\r\n\t<li>Wednesday morning, ContraFect&nbsp;<a class="editor-rtfLink" href="https://www.benzinga.com/pressreleases/23/04/g32009856/contrafect-announces-first-patient-dosed-in-the-phase-1b2-study-of-exebacase-in-patients-with-chro" style="color:#4a6ee0; background:transparent;

In [9]:
from bytewax.testing import run_main

run_main(flow)

thread '<unnamed>' panicked at 'Box<dyn Any>', src/inputs.rs:465:31
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace



NameError: (src/inputs.rs:466:68) error getting input
Caused by => NameError: name 'url' is not defined
Traceback (most recent call last):
  File "/var/folders/lx/0qxsj_495jl6_8vg_0wpj24h0000gn/T/ipykernel_15143/3766309558.py", line 46, in next


In [None]:
query_string = "what is new on META"
inputs = tokenizer(query_string, padding=True, truncation=True, return_tensors="pt", max_length=512)
result = model(**inputs)
embeddings = result.last_hidden_state[:, 0, :].cpu().detach().numpy()
lst = embeddings.flatten().tolist()

client
query_vector = lst
hits = client.search(
    collection_name="test_collection",
    query_vector=query_vector,
    limit=5  # Return 5 closest points
)