# Streaming

We want to add a feature that shows the tokens from the LLM as soon as they arrive. To do this we use several things that are new to me:
- queueing
- Python generator objects (lazy iterables)
- calling an object
- a LangChain callback handler, and
- threading, to run tasks concurrently.

## Queueing 


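Python's standard library has a class for this: `queue.Queue`. It is first-in, first-out (FIFO) and thread-safe, so one thread can put items in while another takes them out, which is exactly what streaming needs.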

In [1]:
from queue import Queue

queue = Queue()

# Put items in
for i in range(5):
    queue.put(item=i+9)

print(f"The type of a queue is {type(queue)}.")
for i in range(queue.qsize()):
    print(f"The {i}th element of the queue is {queue.get()}, "
    f"and {queue.qsize()} elements remain.")

The type of a queue is <class 'queue.Queue'>.
The 0th element of the queue is 9, and 4 elements remain.
The 1th element of the queue is 10, and 3 elements remain.
The 2th element of the queue is 11, and 2 elements remain.
The 3th element of the queue is 12, and 1 elements remain.
The 4th element of the queue is 13, and 0 elements remain.
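The loop above takes items out in the order they were put in (first in, first out). `Queue.get()` also blocks when the queue is empty, waiting until an item arrives; the streaming code later relies on that. A small sketch of the non-blocking alternatives, `get_nowait()` and `get(timeout=...)`, which raise `queue.Empty` instead of waiting forever:

```python
from queue import Queue, Empty

q = Queue()
q.put("first")

# get_nowait() returns an item immediately if one is available...
item = q.get_nowait()
print(item)  # first

# ...and raises queue.Empty instead of blocking when the queue is empty.
try:
    q.get_nowait()
except Empty:
    print("empty, so no waiting")

# get(timeout=...) blocks for at most `timeout` seconds, then raises Empty.
try:
    q.get(timeout=0.1)
except Empty:
    print("gave up after 0.1 seconds")
```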


## Generator Objects


Python generator objects are lazy iterables. They do not store their contents in memory; each element is computed only when it is requested, and once consumed it is gone (a generator can be iterated over only once). They can be created in two ways:
1. generator comprehensions
2. generator functions
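The same lazy sequence written both ways, as a small sketch (the names are illustrative). A comprehension covers simple cases; a function can hold arbitrary logic between `yield`s.

```python
# 1. Generator comprehension: like a list comprehension, but with parentheses.
squares_comprehension = (i * i for i in range(5))

# 2. Generator function: any function whose body contains `yield`.
def squares_function(n):
    for i in range(n):
        yield i * i

from_comprehension = list(squares_comprehension)
from_function = list(squares_function(5))
print(from_comprehension)  # [0, 1, 4, 9, 16]
print(from_function)       # [0, 1, 4, 9, 16]

# A generator can be consumed only once; this one is now exhausted.
leftover = list(squares_comprehension)
print(leftover)            # []
```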

In [2]:
# A generator comprehension.
g = (i for i in range(5))
print(type(g))
print(next(g)) # next(g) computes and returns the next element, advancing g.
print(next(g))
for i in g: # Iteration also consumes the remaining elements.
    print(f"and then {i}")
for i in g: # Now g is exhausted, so this loop body never runs.
    print(f"and then {i}")

<class 'generator'>
0
1
and then 2
and then 3
and then 4


In [3]:
# A generator function is built using yield.
# When a yield line runs, the function pauses (keeping its local state)
# until the next element is requested, e.g. by next() or a for loop.

from time import sleep
def g():
    for i in range(5):
        yield i
        yield i+10
        sleep(1)

# Each pass of the loop yields i, then i+10 right away, then sleeps 1 second,
# so the pairs arrive about one second apart.
# The for loop does not exit while the next element doesn't exist yet.
# Rather, it waits while the generator runs up to its next yield.
for i in g():
    print(i, end=", ")

0, 10, 1, 11, 2, 12, 3, 13, 4, 14, 
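A sketch that makes the pausing visible: the generator appends to a list each time its body runs, so we can see exactly when it resumes. Nothing in the body runs until the first `next()`.

```python
events = []

def traced():
    events.append("started")
    yield "a"
    events.append("resumed after a")
    yield "b"
    events.append("finished")

gen = traced()
print(events)     # []: creating the generator runs none of its body
print(next(gen))  # a
print(events)     # ['started']
print(next(gen))  # b
print(events)     # ['started', 'resumed after a']
for _ in gen:     # Resumes, runs to the end, then the loop stops.
    pass
print(events)     # ['started', 'resumed after a', 'finished']
```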

Generators can also produce infinite sequences. Be careful: stop consuming them explicitly (e.g. with `break`), or the loop never ends.

In [4]:
def infinite_sequence():
    n = 0
    while True:
        n += 1 
        yield n

for i in infinite_sequence():
    print(i,end=", ")
    if i == 3:
        break

1, 2, 3, 
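A safer way to take a fixed number of items from an infinite generator is `itertools.islice`, which stops asking for items after the count you give it. A sketch using the `infinite_sequence` function above (redefined so the block runs on its own) and the ready-made `itertools.count`:

```python
from itertools import count, islice

def infinite_sequence():
    n = 0
    while True:
        n += 1
        yield n

# islice requests only 3 items, so the infinite generator is never run further.
first_three = list(islice(infinite_sequence(), 3))
print(first_three)  # [1, 2, 3]

# itertools.count(1) is a built-in infinite counter: 1, 2, 3, ...
print(list(islice(count(1), 3)))  # [1, 2, 3]
```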

Side note: `yield` is also an expression, so its value can be assigned to a variable (as in `x = yield i`). When you call a generator's `send()` method, its argument becomes the value of the paused `yield` expression, the generator resumes, and `send()` returns the next item yielded.

In [5]:
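def g():
    for i in range(10):
        x = yield i # x receives whatever is passed to send().
        print(x)

gen = g() # A separate name, so the function g is not overwritten.
# A just-started generator has no paused yield to receive a value, so the first
# call must be next(gen) or gen.send(None); sending anything else raises TypeError.
print(gen.send(None))
print(gen.send("hello"))
gen.send("world"); # Semicolon so that Jupyter does not print the last line.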

0
hello
1
world
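A slightly more practical use of `send()`, as a sketch: a running average that receives numbers and yields the average so far. The first `next()` primes the generator by running it up to its first `yield`.

```python
def running_average():
    total = 0.0
    count = 0
    average = None
    while True:
        # Hand back the current average; receive the next value from send().
        value = yield average
        total += value
        count += 1
        average = total / count

averager = running_average()
next(averager)  # Prime: run to the first yield (which yields None).
results = [averager.send(v) for v in (10, 20, 30)]
print(results)  # [10.0, 15.0, 20.0]
```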


## Calling an Object

An instance of a class is not a function, but if the class defines a `__call__` method, the instance is *callable*: it can be invoked like a function, and `__call__` can do whatever is needed. There are three ways to call an object.

1. If the instance is named `foo_instance`, then `foo_instance.__call__(...)` runs the call method explicitly.
2. If the instance is named `foo_instance`, then `foo_instance(...)` runs the `__call__` method.
3. Inside the class's own methods the instance's name is not accessible; it is referred to by `self`. Thus `self(...)` calls the instance from inside its own methods.

In [6]:
class Foo:

    def __init__(self,x):
        self.x = x
        print("Initializing")
        self.state = "initiated"

    def __call__(self,x):
        self.x = x
        print("calling")
        self.state = "called"

    def rip(self,x):
        print("ripping", self.x)
        self.x = x
        print("calling via self", self.x)
        self(self.x) # Same as self.__call__(self.x).

foo_instance = Foo(1) # Instantiation runs __init__, not __call__.
print(foo_instance.state, foo_instance.x)
foo_instance(2) # This IS calling the __call__ method.
print(foo_instance.state, foo_instance.x)
foo_instance.__call__(3) # This IS calling the __call__ method.
print(foo_instance.state, foo_instance.x)
foo_instance.rip(4) # This method has one line that also runs the __call__ method.
print(foo_instance.state, foo_instance.x)

Initializing
initiated 1
calling
called 2
calling
called 3
ripping 3
calling via self 4
calling
called 4
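A compact callable object, as a sketch (the `Multiplier` class is purely illustrative). The built-in `callable()` reports whether something can be called. Note the difference between calling the class, which creates an instance, and calling the instance, which runs `__call__`. LangChain chains are callable objects in this sense, which is why the streaming chain below runs itself with `self(...)`.

```python
class Multiplier:
    """Illustrative callable class: instances multiply their argument by a factor."""

    def __init__(self, factor):
        self.factor = factor

    def __call__(self, x):
        return self.factor * x

double = Multiplier(2)          # Calling the CLASS runs __init__ and returns an instance.
print(double(21))               # 42: calling the INSTANCE runs __call__.
print(double.__call__(21))      # 42: the same method, called explicitly.
print(callable(double))         # True, because Multiplier defines __call__.
print(callable(double.factor))  # False: an int has no __call__.
```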


## Handler

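We used callback handlers previously. A LangChain callback handler is an object whose methods (such as `on_llm_new_token` and `on_llm_end`) LangChain calls when the corresponding events happen during a run; below we write one that feeds our queue.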
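A handler is just an object whose methods a framework calls when something happens. A stdlib-only sketch of that idea: `CollectingHandler` and `fake_llm_run` are hypothetical stand-ins (this is not LangChain's actual implementation), and only the method names mirror the LangChain ones used below.

```python
class CollectingHandler:
    """Hypothetical handler: records every event it is told about."""

    def __init__(self):
        self.tokens = []
        self.finished = False

    def on_llm_new_token(self, token, **kwargs):
        self.tokens.append(token)

    def on_llm_end(self, response, **kwargs):
        self.finished = True


def fake_llm_run(tokens, callbacks):
    """Hypothetical stand-in for an LLM: broadcast each event to every handler."""
    for token in tokens:
        for handler in callbacks:
            handler.on_llm_new_token(token)
    response = "".join(tokens)
    for handler in callbacks:
        handler.on_llm_end(response)
    return response


handler = CollectingHandler()
result = fake_llm_run(["Ar", "rr", "!"], callbacks=[handler])
print(result)                            # Arrr!
print(handler.tokens, handler.finished)  # ['Ar', 'rr', '!'] True
```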

## Threading 

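Threading is a form of concurrency: multiple tasks take turns running on the same resources (like CPUs). In standard CPython, the global interpreter lock (GIL) lets only one thread execute Python code at a time, so threading is NOT parallelism for CPU-bound Python work. It shines when tasks spend their time waiting, e.g. on a network response from an LLM.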
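Threads pay off when tasks spend most of their time waiting. A minimal sketch, with `time.sleep` standing in for waiting on a network reply (an assumption for illustration): three 0.2-second waits run in three threads and overlap, so the total is roughly 0.2 seconds instead of 0.6. `join()` makes the main thread wait for each worker to finish.

```python
import time
from threading import Thread

def wait_for_response(results, i):
    time.sleep(0.2)  # Stand-in for waiting on a network reply; sleep releases the GIL.
    results[i] = f"reply {i}"

results = [None] * 3
start = time.perf_counter()
threads = [Thread(target=wait_for_response, args=(results, i)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # Block until this thread has finished.
elapsed = time.perf_counter() - start

# The three waits overlap, so elapsed is close to 0.2 s rather than 0.6 s.
print(results, round(elapsed, 1))
```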

In [7]:
from threading import Thread

# Define a task to run concurrently.
def task(s):
    print(s) # To mark that this line has been reached.
    # The next line takes about a second.
    print(sum([x**2 for x in range(10_000_000)]))

Thread(
    target=task, # The function to run concurrently with the lines that follow.
    # kwargs={"s": "Starting\n"} # One way to pass arguments to the target.
    args=("Starting task\n",) # Another way: a tuple of positional arguments.
    ).start() # Start running it now.
print("but first we get here, then..")

Starting task
but first we get here, then..

333333283333335000000
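Putting queueing, generators, and threading together gives the core of the streaming pattern used below, with no LangChain involved. In this sketch a worker thread (standing in for the LLM) puts words on a queue and finishes with `None`; a generator in the main thread yields each word as soon as it arrives and stops at `None`. The function names are illustrative.

```python
import time
from queue import Queue
from threading import Thread

def produce(queue, words):
    """Worker thread: put items on the queue, then None as the end signal."""
    for word in words:
        time.sleep(0.05)  # Pretend each word takes a while to generate.
        queue.put(word)
    queue.put(None)

def stream(words):
    """Generator: yield each item as soon as the worker puts it on the queue."""
    queue = Queue()
    Thread(target=produce, args=(queue, words)).start()
    while True:
        item = queue.get()  # Blocks until the worker puts something.
        if item is None:    # End signal: stop the generator.
            break
        yield item

received = []
for word in stream(["Ahoy", " matey", "!"]):
    print(word, end="", flush=True)
    received.append(word)
print()
```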


# Code

In [8]:
from dotenv import load_dotenv

load_dotenv() # For the OpenAI API key

# We create a child class of LangChain's base callback handler.
from langchain.callbacks.base import BaseCallbackHandler

class StreamingHandler(BaseCallbackHandler):
    """
    LangChain calls methods with special names on this handler
    when the chain or LLM it is attached to broadcasts events.
    We use them to modify our queue in response to those events:
    1. When the LLM generates a new token, add it to the queue.
    2. When the LLM is done generating tokens, add None to the queue.
    3. When the LLM throws an error, add None to the queue.
    """

    # Store the queue as an attribute, so each chat has its own handler and queue.
    def __init__(self, queue):
        self.queue = queue

    # When a new token becomes available, add it to the queue.
    def on_llm_new_token(self,
                         token, # Required for this special method.
                         **kwargs # Required for this special method.
                         ):
        self.queue.put(token)

    # When the LLM ends its response, put None in the queue as our end signal.
    def on_llm_end(self,
                   response, # Required for this special method.
                   **kwargs # Required for this special method.
                   ):
        self.queue.put(None)

    # When the LLM throws an error, put None in the queue so the stream stops.
    def on_llm_error(self,
                     error, # Required for this special method.
                     **kwargs # Required for this special method.
                     ):
        self.queue.put(None)


In [9]:
# Define a custom chain based on LLMChain.
# The purpose is to give it a stream method.
# The streaming is made possible by threading.
from queue import Queue
from threading import Thread
from langchain.chains import LLMChain

class StreamingChain(LLMChain):
    # chain.stream(input) is a generator function: it yields the tokens
    # arriving from the LLM one by one, so the caller can print each token
    # immediately after it arrives.
    def stream(self, input):
        # Each call to stream gets its own queue and handler
        # to keep the chats separate.
        queue = Queue()
        handler = StreamingHandler(queue)
        # Define a task to run concurrently.
        def task():
            self( # Runs the chain; without a thread this would block our while loop.
                input, # Human input.
                callbacks=[handler] # Here instead of in ChatOpenAI(...).
                )
        Thread(target=task).start() # Run it now, in the background.
        # Take tokens off the queue as they arrive.
        # queue.get() blocks until the handler puts the next item.
        # `yield` hands each token to the caller and pauses here until
        # the caller asks for the next one.
        while True:
            token = queue.get()
            # A None token means the LLM is done (or hit an error),
            # so we stop, which ends the generator.
            if token is None:
                break
            yield token
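One caveat with `stream` as written: `None` is only put on the queue by `on_llm_end` or `on_llm_error`. If `task()` fails before the LLM even starts (say, a missing prompt variable), nothing ever arrives and `queue.get()` blocks forever. A stdlib-only sketch of one way to guard against that, with a hypothetical `run_chain` function standing in for `self(input, callbacks=[handler])`: the worker hands any error to the consumer and always puts a final `None` in a `finally` block. An extra `None` after a normal finish is harmless, because the generator has already stopped reading and each call has its own queue.

```python
from queue import Queue
from threading import Thread

def run_chain(text, on_token):
    """Hypothetical stand-in for the chain call: fails on empty input."""
    if not text:
        raise ValueError("empty input")
    for word in text.split():
        on_token(word)  # Plays the role of the handler's on_llm_new_token.

def safe_stream(text):
    queue = Queue()

    def task():
        try:
            run_chain(text, queue.put)
        except Exception as exc:
            queue.put(exc)   # Hand the error over to the consuming thread.
        finally:
            queue.put(None)  # Always unblock the consumer, even after an error.

    Thread(target=task).start()
    while True:
        item = queue.get()
        if item is None:
            break
        if isinstance(item, Exception):
            raise item  # Re-raise the worker's error where the caller can catch it.
        yield item

words = list(safe_stream("yo ho ho"))
print(words)  # ['yo', 'ho', 'ho']

try:
    list(safe_stream(""))
    error_message = None
except ValueError as err:
    error_message = str(err)
print("error surfaced:", error_message)  # error surfaced: empty input
```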

In [10]:
from langchain.chat_models import ChatOpenAI 
from langchain.prompts import ChatPromptTemplate

# Create one chat model. It will be the same for all conversations.
chat = ChatOpenAI(
    streaming=True, # Ask OpenAI to stream tokens back to LangChain one at a time.
    )

# Choose a prompt template to send a prompt to the LLM with user input.
prompt = ChatPromptTemplate.from_messages(
    [
        ("system","You are an angry pirate."),
        ("human","{content}")
    ]
)

In [12]:
# Instantiate the streaming chain.
chain = StreamingChain(llm=chat,prompt=prompt)

# A for loop that prints the elements of the generator chain.stream(...).
# In the background thread, the handler puts each new token on the queue.
# Inside stream, queue.get() waits for that token and yields it at once.
# So this loop does not end when it has printed every token received so far;
# it waits for the next one, and ends only when stream sees the None signal.
for output_chunk in chain.stream(input={"content": "tell me a 500 word joke"}):
    print(output_chunk, end="")

As an angry pirate, jokes may not be my strong suit, but I'll give it a try. Here's a 500-word joke for you:

Once upon a time, in the vast and treacherous ocean, there sailed a ship full of rowdy pirates. Among them was the angriest of them all, Captain Gruffbeard. With his fiery red beard and fierce scowl, he struck fear into the hearts of anyone who crossed his path.

One day, as the crew was sailing through a dense fog, Captain Gruffbeard's first mate, Smitty, stumbled upon an old, weathered treasure map. The crew's excitement grew as they realized it led to the legendary treasure of the Lost Isles.

Captain Gruffbeard, being the clever pirate that he was, immediately set the course for the Lost Isles. However, their journey wasn't going to be easy. The map was filled with riddles and puzzles, and only those who could solve them would be worthy of the treasure.

Days turned into weeks, and the crew sailed through treacherous storms and battled terrifying sea creatures. Tempers bega


There is another way to implement the streaming chain class. So far we made a streaming `LLMChain`. Say we also wanted a streaming `RetrievalQA` chain. To do this without copying code, we can make a *mixin class* called `StreamableChain` that holds the streaming features we want to give to its children. Then we can make a streaming version of any kind of chain by making it a child of both `StreamableChain` and the kind of chain we are modifying.

In [None]:

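from queue import Queue
from threading import Thread
from langchain.chains import LLMChain
from langchain.chains import RetrievalQA
from langchain.chains import ConversationalRetrievalChain

# class StreamingChain(LLMChain) becomes a mixin with no parent class.
class StreamableChain:
    # The same stream method as in StreamingChain above.
    def stream(self, input):
        queue = Queue()
        handler = StreamingHandler(queue)
        def task():
            self(input, callbacks=[handler])
        Thread(target=task).start()
        while True:
            token = queue.get()
            if token is None:
                break
            yield token

# The mixin is listed first, so its stream method takes precedence.

# e.g. 1
class StreamingLLMChain(StreamableChain, LLMChain):
    pass

streaming_llm_chain = StreamingLLMChain(llm=chat, prompt=prompt)

# e.g. 2
class StreamingRQAChain(StreamableChain, RetrievalQA):
    pass

# Retrieval chains need a retriever, which this notebook does not define;
# `retriever` is a hypothetical placeholder (e.g. a vector store's .as_retriever()).
# streaming_rqa_chain = StreamingRQAChain.from_chain_type(llm=chat, retriever=retriever)

# e.g. 3
class StreamingConversationalRetrievalChain(
    StreamableChain,
    ConversationalRetrievalChain
    ):
    pass

# streaming_crc = StreamingConversationalRetrievalChain.from_llm(llm=chat, retriever=retriever)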
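The mixin idea in plain Python, as a sketch with made-up classes (`Base`, `ShoutChain` and `WhisperChain` are illustrative stand-ins for chain classes). Order matters: Python looks up methods along the method resolution order (`__mro__`), so the mixin must be listed first for its `stream` to win. That matters here because, in newer LangChain versions, chains already inherit a `stream` method from the Runnable interface.

```python
class Base:
    """Illustrative stand-in for a library base class that already has stream()."""

    def stream(self, text):
        raise NotImplementedError("Base.stream does not stream word by word")


class ShoutChain(Base):
    def run(self, text):
        return text.upper()


class WhisperChain(Base):
    def run(self, text):
        return text.lower()


class StreamableMixin:
    """Adds a word-by-word stream() to any class that provides run()."""

    def stream(self, text):
        for word in self.run(text).split():
            yield word


class StreamingShoutChain(StreamableMixin, ShoutChain):
    pass


class StreamingWhisperChain(StreamableMixin, WhisperChain):
    pass


class WrongOrder(ShoutChain, StreamableMixin):
    pass


print([cls.__name__ for cls in StreamingShoutChain.__mro__])
# ['StreamingShoutChain', 'StreamableMixin', 'ShoutChain', 'Base', 'object']
print(list(StreamingShoutChain().stream("avast ye")))    # ['AVAST', 'YE']
print(list(StreamingWhisperChain().stream("AVAST YE")))  # ['avast', 'ye']
# With the mixin listed last, Base.stream is found first and shadows it.
print(WrongOrder.stream is Base.stream)  # True
```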

There is a bug when streaming from the `ConversationalRetrievalChain` version.

We can place the callbacks list in
1. the LLM, so the callbacks apply only to that LLM.
2. the instantiation of the chain, so the callbacks apply only to that chain.
3. the call of the chain, so the callbacks apply to everything run during that call, including its LLM(s) and chain(s).

This is a problem for `ConversationalRetrievalChain`, which calls the same LLM twice: once to condense (summarize) the user's question together with the chat history into a standalone question, and once to answer it. With option 3 our handler hears both calls. When the condensing call finishes, `on_llm_end` puts `None` on the queue, so `stream` stops before the answer begins. What the user sees is not an answer but a rephrasing of their own question.

Just making two LLMs for the two tasks does not fix this: since we pass the handler at call time (option 3), it still receives events from both.

The way out of this mess is the `streaming` flag. With two LLMs, we set `streaming=True` on the LLM that answers the condensed question and `streaming=False` on the LLM that condenses it. The handler then only puts items on the queue (tokens and, crucially, the `None` end signal) for runs of the streaming LLM.
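A stdlib-only sketch of that fix. Everything here is a stand-in: `fake_llm_call` is a hypothetical function mimicking how a chain would notify the handler, `run_id` values are plain integers, and the start hook receives the `streaming` flag directly (in LangChain the handler would have to read it from the start event's arguments, which varies by version). The handler remembers which runs come from the streaming LLM and ignores everything else, so the non-streaming condensing call no longer ends the stream early.

```python
from queue import Queue

class RunAwareStreamingHandler:
    """Sketch: only runs of a streaming LLM may put items on the queue."""

    def __init__(self, queue):
        self.queue = queue
        self.streaming_run_ids = set()

    # Hypothetical simplified start hook: receives the streaming flag directly.
    def on_llm_start(self, run_id, streaming):
        if streaming:
            self.streaming_run_ids.add(run_id)

    def on_llm_new_token(self, token, run_id):
        if run_id in self.streaming_run_ids:
            self.queue.put(token)

    def on_llm_end(self, run_id):
        if run_id in self.streaming_run_ids:
            self.streaming_run_ids.discard(run_id)
            self.queue.put(None)  # End signal only for the streaming run.


def fake_llm_call(handler, run_id, streaming, tokens):
    """Hypothetical stand-in for one LLM call reporting to the handler."""
    handler.on_llm_start(run_id=run_id, streaming=streaming)
    if streaming:  # A non-streaming LLM emits no token events.
        for token in tokens:
            handler.on_llm_new_token(token, run_id=run_id)
    handler.on_llm_end(run_id=run_id)


queue = Queue()
handler = RunAwareStreamingHandler(queue)
# Call 1 condenses the question (non-streaming); call 2 answers it (streaming).
fake_llm_call(handler, run_id=1, streaming=False, tokens=["What", " is", " grog?"])
fake_llm_call(handler, run_id=2, streaming=True, tokens=["Grog", " be", " rum!"])

streamed = []
while (token := queue.get()) is not None:
    streamed.append(token)
print("".join(streamed))  # Grog be rum!
```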

# Partial

Python's `functools.partial` partially applies a function: it fixes some of the arguments now and returns a new callable that takes the rest.

In [None]:
from functools import partial
def f(x,y,z):
    return x+2*y+3*z

g = partial(f, 1) # Positional arguments are fixed from the left, so x=1.

g(2, 0) # 1 + 2*2 + 3*0 = 5

In [None]:
# Fix the second and third arguments by keyword; the remaining argument goes to x.
print(partial(f, **{"y": 3, "z": 1})(1))
print(partial(f, y=3, z=1)(1))
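A few more details about `functools.partial`, sketched with the same `f`: the result records what was fixed (`.func`, `.args`, `.keywords`), and keywords fixed by `partial` can still be overridden at call time. It is also a tidy way to hand arguments to a `Thread` target instead of using `args=`.

```python
from functools import partial
from threading import Thread

def f(x, y, z):
    return x + 2 * y + 3 * z

g = partial(f, 1)                       # Fixes x=1.
print(g(2, 0))                          # 5
print(g.func is f, g.args, g.keywords)  # True (1,) {}

h = partial(f, y=3, z=1)                # Fixes y and z by keyword.
print(h(1))                             # 10
print(h(1, z=0))                        # 7: call-time keywords override stored ones.

# partial as a Thread target: the argument is bound in advance.
results = []
worker = Thread(target=partial(results.append, f(1, 1, 1)))
worker.start()
worker.join()
print(results)                          # [6]
```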

# Langfuse

A tool for collecting data (traces) about LLM text-generation runs.

You can use their hosted service, or you can self-host it quite easily.



In [72]:
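class T:
    def __call__(self, *args, **kwargs):
        print("hello")
        # Calling self() here would run __call__ again and recurse forever
        # (RecursionError), so just return what was passed in.
        return args, kwargs


In [73]:
t = T # t is the class itself, not an instance.

In [74]:
t() # Calling the class creates an instance; __call__ does NOT run, so no "hello".

<__main__.T at 0x12e551150>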