In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from package.bedrock import BedrockChat
from pocketflow import Node, Flow

In [3]:
import yaml

In [37]:
class InputNode(Node):
    def __init__(self, model):
        super().__init__()
        self.model = model

    def prep(self, shared):
        user_input = input("You: ")
        if user_input.lower() == "exit":
            return None
        
        if "messages" not in shared:
            shared['messages'] = []
        
        print("You: ", user_input)
        return user_input

    def exec(self, prep_res):
        return prep_res
    
    def post(self, shared, prep_res, exec_res):
        shared['messages'].append(self.model.UserMessage(text=prep_res))
        if prep_res:
            return "router"

class RouterNode(Node):
    def __init__(self, system_prompt, model):
        super().__init__()
        self.system_prompt = system_prompt
        self.model = model

    def prep(self, shared):
        user_message = shared['messages'][-1]['content'][0]['text']
        return user_message
    
    def exec(self, user_message):
        response = self.model(system_prompt=self.system_prompt, messages=[self.model.UserMessage(text=user_message)])
        yaml_str = response.split("```yaml")[1].split("```")[0].strip()
        action = yaml.safe_load(yaml_str)
        return action['action']

    def post(self, shared, prep_res, exec_res):
        shared['action'] = exec_res
        return exec_res

class AnswerNode(Node):
    def __init__(self, system_prompt, model):
        super().__init__()
        self.system_prompt = system_prompt
        self.model = model

    def prep(self, shared):
        return shared['messages']
    
    def exec(self, messages):
        response = self.model(self.system_prompt, messages)
        print("AI: ", response)
        return response
    
    def post(self, shared, prep_res, exec_res):
        shared["messages"].append(self.model.AIMessage(text=exec_res))
        return "continue"
    

In [38]:
router_prompt = """\
classify a user's intent based on the input messages. 
Intent options are:
1. continue if nothing goes wrong
2. farewell if a user's message indicate that he or she wants to go somewhere

Return your response in codeblock with this following yaml format:
```yaml
action: either continue or farewell
```

IMPORTANT: Make sure to:
1. Use proper indentation (4 spaces) for all multi-line fields
2. Use the | character for multi-line text fields
3. Keep single-line fields without the | character
"""

answer_prompt = "You are a helpful assistant"

In [39]:
model = BedrockChat()
input_node = InputNode(model=model)
router_node = RouterNode(router_prompt, model)
answer_node = AnswerNode(answer_prompt, model)

input_node - "router" >> router_node
router_node - "answer" >> answer_node
router_node - "continue" >> answer_node
answer_node - "continue" >> input_node

flow = Flow(start=input_node)

In [40]:
shared = {}

flow.run(shared)

You:  Hi I gotta go.




'farewell'

In [41]:
shared

{'messages': [{'role': 'user', 'content': [{'text': 'Hi I gotta go.'}]}],
 'action': 'farewell'}

In [42]:
from sentence_transformers import SentenceTransformer

# Load model (will use MPS on Apple Silicon)
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

# Generate embeddings
texts = ["Hello world", "How are you?"]
embeddings = model.encode(texts)


  from .autonotebook import tqdm as notebook_tqdm


In [43]:

# Generate embeddings
texts = ["Hello world", "How are you?"]
embeddings = model.encode(texts)


In [38]:
from typing import Protocol
import warnings, copy

class TaskInterface(Protocol):
    def next(self, task, action): pass

class ConditionalTransition:
    def __init__(self, task:TaskInterface, action:str):
        self.task = task
        self.action = action
    def __rshift__(self, next_task):
        return self.task.next(task=next_task, action=self.action)
class BaseTask:
    def __init__(self):
        self.params = {}
        self.successors = {}
    def set_params(self, params):
        self.params = params
    def prep(self, shared): pass
    def exec(self, prep_res): pass
    def post(self, shared, prep_res, exec_res): pass
    def _exec(self, prep_res): return self.exec(prep_res)
    def _run(self, shared):
        prep_res = self.prep(shared)
        exec_res = self._exec(prep_res)
        return self.post(shared, prep_res, exec_res)
    def run(self, shared):
        if self.successors:
            warnings.warn("Use Flow instead")
        return self._run(shared)
    def next(self, task, action='default'):
        """Register the next task in successors"""
        if action in self.successors:
            msg = f"action: '{action}' replaced becuase it already exits"
            print(msg)
        self.successors[action] = task
        return task
    def __sub__(self, action):
        """Add conditional action"""
        if isinstance(action, str):
            return ConditionalTransition(task=self, action=action)
        raise TypeError(f"action must be str, not {type(action)}")
    def __rshift__(self, other):
        """Using self.next to register the next task"""
        return self.next(other)
    
class Task(BaseTask):
    def __init__(self, max_retries=1, wait=0):
        super().__init__()
        self.max_retries = max_retries
        self.wait = wait

class Pipeline(BaseTask):
    def __init__(self, start:BaseTask=None):
        super().__init__()
        self.start_node = start
    def start(self, start):
        self.start_node = start
        return start
    def get_next_task(self, current_task:BaseTask, next_action:str):
        next_task = current_task.successors.get(next_action or "default")
        if not next_task and current_task.successors:
            warnings.warn(f"Pipeline End, {next_action} not found")
        return next_task
    def _orch(self, shared, params=None):
        current_task = copy.copy(self.start_node)
        params = params or {**self.params}
        next_action = None
        while current_task:
            current_task.set_params(params)
            next_action = current_task._run(shared)
            current_task = copy.copy(self.get_next_task(current_task=current_task, next_action=next_action))
        return next_action
    def _run(self, shared):
        prep_res = self.prep(shared)
        orch_res = self._orch(shared)
        return self.post(shared, prep_res, orch_res)
    def post(self, shared, prep_res, exec_res):
        return exec_res

class Test1(Task):
    def prep(self, shared):
        return "test1"
    def exec(self, prep_res):
        return prep_res
    def post(self, shared, prep_res, exec_res):
        print(exec_res)
        return "to task2"
    
class Test2(Task):
    def prep(self, shared):
        return "test2"
    def exec(self, prep_res):
        return prep_res
    def post(self, shared, prep_res, exec_res):
        print(exec_res)
        return "end"
    
class End(Task):
    def prep(self, shared):
        return "End for all"
    def exec(self, prep_res):
        return prep_res
    def post(self, shared, prep_res, exec_res):
        print(exec_res)
        return exec_res

In [39]:
test1 = Test1()
test2 = Test2()
end = End()

In [40]:
shared = {}

test1.run(shared)
test2.run(shared)
end.run(shared)

test1
test2
End for all


'End for all'

In [41]:
test1 - "to task2" >> test2
test2 - "end" >> end

<__main__.End at 0x11a495710>

In [42]:
pl = Pipeline(start=test1)

In [43]:
pl.run(shared)

test1
test2
End for all


'End for all'