In [233]:
from abc import ABC, abstractmethod

class Runnable(ABC):

    @abstractmethod
    def invoke(input):
        pass

In [234]:
import random

class DummyLLLM(Runnable):
    def __init__(self):
        print("Initializing DummyLLLM")

    def invoke(self, prompt):
        response_list = [
            'This is response 1',
            'This is response 2',
            'This is response 3',
        ]
        return {'response' : random.choice(response_list)}

    def predict(self, prompt):
        response_list = [
            'This is response 1',
            'This is response 2',
            'This is response 3',
        ]
        print('\nWARNING: This method predict will be deprecated soon the functionality shifted to invoke method.')
        return {'response' : f'Prompt: {prompt} Response: {random.choice(response_list)}'}

In [235]:
class DummyPromptTemplate(Runnable):
    def __init__(self, template, input_variables):
        self.template = template
        self.input_variables = input_variables

    def invoke(self,input_dict):
        return self.template.format(**input_dict)

    def format(self, input_dict):
        print('\nWARNING: This method predict will be deprecated soon the functionality shifted to invoke method.')
        return self.template.format(**input_dict)

In [236]:
class DummyStrOutputParser(Runnable):
    def __init__(self):
        pass

    def invoke(self,input_data):
        return input_data['response']


In [237]:
class RunnableConnector(Runnable):
    def __init__(self, runnable_list):
        self.runnable_list = runnable_list

    def invoke(self, input_data):
        for runnable in self.runnable_list:
            print(f'RunnableConnector Invoke: {runnable.invoke(input_data)}')
            input_data = runnable.invoke(input_data) # Output from first runnable become input for next one
        return input_data

In [238]:
template = DummyPromptTemplate(
    template='Write a {length} liner about {topic}',
    input_variables=['length', 'topic']
)

In [239]:
llm = DummyLLLM()
parser = DummyStrOutputParser()

Initializing DummyLLLM


In [240]:
chain = RunnableConnector([template, llm, parser])

In [241]:
chain.invoke({'length': 'one', 'topic': 'anything'})

RunnableConnector Invoke: Write a one liner about anything
RunnableConnector Invoke: {'response': 'This is response 3'}
RunnableConnector Invoke: This is response 1


'This is response 1'

In [242]:
# Example Problem: Suppose we want 2 chains one in which our LLM will create a joke and give us that joke as output. Then there will be the other chain which take jokes as input and provide explanation of joke as output. Then after creating these chains we will connect these two chains so process go like: create a joke from chain 1 give that joke to chain 2 and get the explanation of the joke.

joke_template = DummyPromptTemplate(
    template="Write a joke about {topic}",
    input_variables=['topic']
)

In [243]:
explanation_template = DummyPromptTemplate(
    template="Write a explanation about {response}",
    input_variables=['response']
) # input variable will must name response because this is the key from joke template response
llm = DummyLLLM()
parser = DummyStrOutputParser()

joke_chain = RunnableConnector([joke_template, llm])
# joke_chain_output = joke_chain.invoke({'topic': 'anything'})
explanation_chain = RunnableConnector([explanation_template, llm, parser])
# explanation_chain.invoke({'joke': 'anything'})

Initializing DummyLLLM


In [244]:
final_chain = RunnableConnector([joke_chain, explanation_chain])
final_chain.invoke({'topic': 'anything'})

RunnableConnector Invoke: Write a joke about anything
RunnableConnector Invoke: {'response': 'This is response 2'}
RunnableConnector Invoke: {'response': 'This is response 3'}
RunnableConnector Invoke: Write a joke about anything
RunnableConnector Invoke: {'response': 'This is response 3'}
RunnableConnector Invoke: Write a explanation about This is response 2
RunnableConnector Invoke: {'response': 'This is response 1'}
RunnableConnector Invoke: This is response 1
RunnableConnector Invoke: This is response 1
RunnableConnector Invoke: Write a explanation about This is response 2
RunnableConnector Invoke: {'response': 'This is response 2'}
RunnableConnector Invoke: This is response 3


'This is response 3'