In [None]:
import openai
import os
from effectful.handlers.llm import Template
from effectful.handlers.llm.providers import OpenAIAPIProvider, tool_call
from effectful.handlers.llm.structure import DecodeError, decode
from effectful.handlers.llm.synthesis import ProgramSynthesis
from effectful.ops.semantics import fwd, handler
from effectful.ops.syntax import defop

provider = OpenAIAPIProvider(openai.OpenAI(api_key=os.environ['OPENAI_API_KEY']))

In [3]:
@Template.define
def limerick(theme: str) -> str:
    """Write a limerick on the theme of {theme}"""
    raise NotImplementedError

In [11]:
with handler(provider):
    print(limerick("floating point computations"))

In a world where numbers convene,  
Floats dance on a binary sheen.  
They wobble with flair,  
Precision to spare,  
But sometimes they skip what's unseen.  



Now we're going to encode some examples from the pocketflow cookbook into the effectful-llm.

### Pocketflow Agent
Taken from [here](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-agent)

Idea: a state-machine, with the actions, decide (between searching, answer), search, answer

```mermaid
stateDiagram-v2
    [*] --> Decide
    Decide --> Search : search
    Decide --> Answer : answer
    Search --> Decide
    Answer --> [*]
```

in pocketflow this is encoded using this class node, which states subclass and implement three methods for (`prep`, `exec`, `post`).

`prep` - takes information from a shared context
`exec` - takes this information and feeds it to an LLM call 
`post` - stores the LLM call into the shared context, and returns the next action

For example:
```python
class DecideAction(Node):
    def prep(self, shared):
        context = shared.get('context') or ''
        question = shared['question']
        return question, context
    
    def exec(self, inputs):
        question, context = inputs
        prompt = """ you are a research assistant that can search the web, ... """
        res = call_llm(prompt)
        return decode(res)
    
    def post(self, shared, res):
        shared['context'] = exec_res['answer']
        return result['action']
```


In [None]:
from effectful.ops.syntax import ObjectInterpretation, implements
from urllib.parse import urlencode
import requests

@defop
def search_web(query: str) -> str:
    "Search the web for a query string and return the results."
    raise NotHandled

class DuckDuckGoSearchProvider(ObjectInterpretation):
    """Implements web search using DuckDuckGo."""
    @implements(search_web)
    def search_web(self, query: str) -> str:
        url = f"https://api.duckduckgo.com/?{urlencode({'q': query, 'format': 'json', 'pretty': '1'})}"
        headers = {"accept": 'application/json'}
        response = requests.get(url, headers=headers)
        if response.status_code in {202, 200}:
            results = response.json()['RelatedTopics']
            return '\n\n'.join([
                f"URL: {r.get('FirstURL')}\nDescription: {r.get('Text')}"
                for r in results if 'FirstURL' in r and 'Text' in r
            ])
        else: return f'Request failed with status code {response.status_code}'


@Template.define(tools=[search_web])
def answer_question(question: str) -> str:
    """Acting as a Research Assistant that can search the web, construct an answer to the user's question, {question}."""
    raise NotHandled

# feels weird to  pass in tools at the place of definition --- what if I want the agent to support more tools? it might feel nicer to give tools seperately
@Template.define(tools=[search_web])
def refine_answer(question: str, answer: str) -> str:
    """Acting as a Research Assistant that can search the web, given the user's original question, {question}, refine this previous answer {answer}."""
    raise NotHandled


# would like to refine the prompt based on type signature, currently I need to give this instruction
@Template.define
def is_question_answered(question: str, answer: str) -> bool:
    """Acting as a Research Assistant, Decide if the user's question {question} is appropriately answered by {answer}.
    <instruction>
    respond only true or false. Do not give any explanations.
    </instruction>"""
    raise NotHandled

def research_agent(question: str, max_attempts: int = 3) -> str:
    """A research agent which answers a question"""
    answer = answer_question(question)
    
    for _ in range(max_attempts):
        if is_question_answered(question, answer):
            break
        answer = refine_answer(question, answer)
            
    return answer

with handler(provider), handler(DuckDuckGoSearchProvider()):
    meaning_of_life = research_agent('what is the meaning of life?')
    print(f'The meaning of life is: {meaning_of_life}')

The meaning of life is: The question about the "meaning of life" is a profound and philosophical one asked throughout history. It doesn't have a single definitive answer but varies across different cultures, philosophies, and individual beliefs. Here are some perspectives:

1. **Religious Perspectives**: Many religions offer answers, suggesting the meaning of life is to fulfill the will of a divine creator or to achieve a spiritual goal, such as salvation in Christianity, enlightenment in Buddhism, or union with God in Islam.

2. **Philosophical Views**:
    - **Existentialism**: Suggests life has no inherent meaning, and it is up to each individual to create their own purpose and meaning.
    - **Nihilism**: Argues life is without objective meaning, purpose, or intrinsic value.
    - **Absurdism**: Proposes that humans naturally seek meaning, but life is intrinsically without it, creating an inherent conflict.

3. **Scientific Perspectives**: From a scientific view, the meaning of lif

Given the repetition of "Acting as a research agent" and the fact that these functions are conceptually tied together and dependent, it might make sense to wrap them up in an object: 

In [None]:
class ResearchAgent:
    """research Agent that answers user's questions"""
    tools: List[Callable]
    max_attempts: int
    def __init__(self, max_attempts: int, tools=[search_web]):
        self.tools = tools
        self.max_attempts = max_attempts

    @Template.define(tools=tools)
    def answer_question(self, question: str) -> str:
        """Construct an answer to the user's question {question}"""
        ...
    
    @Template.define(tools=tools)
    def refine_answer(self, question: str, answer: str) -> str:
        """Given the user's original question, {question}, refine this previous answer: {answer}"""
        ...

    @Template.define
    def is_question_answered(self, question: str, answer: str) -> bool:
        """Decide if the user's question {question} is appropriately answered by {answer}."""
        ...
    
    def query(self, question: str) -> str:
        answer = self.answer_question(question)
        for _ in range(self.max_attempts):
            if self.is_question_answered(question, answer):
                break
            answer = refine_answer(question, answer)
        return answer

### PocketFlow Async Basic example
Taken from [here](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-async-basic)

Same model as above, but using async nodes
```python
class FetchRecipies:
    async def prep(self, shared):
        return shared['ingredient']
    async def exec(self, ingredient):
        return await fetch_recipies(ingredient)
    async def post(self, shared, res):
        shared['recipes'] = res
        return "suggest"
class SuggestRecipe:
    async def prep(self, shared):
        return shared['recipes']
    async def exec(self, recipes):
        suggestion = await call_llm("Choose the best recipe from {','.join(recipes)}")
        return suggestion
    async def post(self, shared, res):
        shared['suggestion'] = suggestion
        return "approve"
class GetApproval:
    async def prep(self, shared): return shared['suggestion']
    async def exec(self, suggestion): return await input('Accept this recipe?')
    async def post(self, shared, answer):
        if answer == "y":
            return "accept"
        else:
            return "retry"
```
This could probably be encoded using the same structure as above. I assume this is trivial, so left out, but should be a similar design to the previous, just with async, it doesn't really make use of the async part

In [None]:
import asyncio

async def suggest_best_recipe(recipes: list[str]) -> str:
    """Return the best recipe from this list <instruction>return only the recipe, do not give any explanation.</instruction>"""

async def find_recipes(initial_ingredient: str) -> list[str]:
    found_recipe = None
    while not found_recipe:
        recipes = await fetch_recipes_using(ingredient)
        found_recipe = await suggest_best_recipe(recipes)
        print(found_recipe)
        if input('Accept this recipe?') == 'y':
            break
    return found_recipe


### Pocketflow BatchFlow example
Taken from [here](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-batch-flow).

Introduces the notion of parameters, which allows batching flows over several choices of parameter

```python
class LoadImage:
    def prep(self, shared): return os.path.join("images", self.params['input'])
    def exec(self, image_path): return Image.load(image_path)
    def post(self, shared, res): shared["image"] = res; return "apply_filter"

class ApplyFilter:
    def prep(self, shared): return shared['image'], self.params['filter']
    def exec(self, inputs):
        image, filter_type = inputs
        match filter_type: ...
    def post(self, shared, res):
        shared['filtered_image'] = res
        return "save"
class SaveImage:
    def prep(self, shared): 
        os.makedirs('output')
        input_name, filter_name, output_path = ...
        return shared['filtered_image'], output_path
    def exec(self, inputs): image, output_path = inputs; image.save(output_path, "jpeg"); return output_path
    def post(self, shared, res): return "default"
```

Batching allows taking a single flow and runnning it over a series of parameters
```python
class ImageBatchFlow(BatchFlow):
    def prep(self, shared):
        images = ['cat.jpeg', 'dog.jpeg', 'bird.jpeg']
        filters = ['grayscale', 'blur', 'sepia']
        params = []
        for img in images: for filter in filters: params.append({'input':img, 'filter': f})
        return params
```

No real LLM stuff here, can be represented with a function I suppose

### Pocketflow Batch
Taken from [here](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-batch)
Demonstrates batching - idea is subclassing batchnode means pre and post need to handle batches, the exec is written as if straight line unbatched

```python

class TranslateTextNode(BatchNode):
    def prep(self, shared):
        text, languages = shared["text"], shared['languages']
        return [(text,lang) for lang in langauges]
    def exec(self, data): text, language = data; return call_llm(f"translate the following into {language}: {text}")
    def post(self, shared, ress):
        for res in ress: os.write(res['language'], res['translation'])
```

In [60]:
import dataclasses

@Template.define
def translate(text: str, language: str) -> str:
    """Translate the provided text into {language}: {text}"""

def instructions(*instructions: list[str]):
    instructions = '\n'.join(instructions)
    class InstructionsInterpretation(ObjectInterpretation):
        @implements(Template.__call__)        
        def _call(self, template, *args, **kwargs) -> None:
            prompt_ext = (f"{template.__prompt_template__}\n<instructions>\n{instructions}\n</instructions>")
            return fwd(dataclasses.replace(template, __prompt_template__=prompt_ext), *args, **kwargs)
    return handler(InstructionsInterpretation())
            


with handler(provider), instructions("Do not give any explanation", "Add a copyright note"):
    print(translate('hello, how are you? how is your day going?', 'french'))


Bonjour, comment ça va ? Comment se passe ta journée ?

© 2023 OpenAI


### Travel Advisor Chat with Guardrails

Taken from [here](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-chat-guardrail)

```python 
class GuardrailNode:
    def prep(self, shared):
        return shared['user_input']
    def exec(self, user_input):
        return call_llm('evaluate if the following query is travel related ...') 
    def post(self, shared, is_valid):
        if not is_vaild: return "retry"
        shared["messages"].append({'role': "user", "content": shared['user_input']})
        return "process"
```

maybe this could be encoded using handlers? (playing a bit fast and loose with the exact definitions here)

In [74]:
@Template.define
def travel_query(user_query: str) -> str:
    """Produce a concise (<100) word answer to the travel query {user_query}"""
    raise NotHandled

@Template.define
def is_safe_query(user_query: str) -> bool:
    """Determine whether the user's query {user_query} is purely related to travel advice."""
    raise NotHandled

def answer_travel_query(user_query: str):
    with instructions("Respond only true or false"):
        is_safe = is_safe_query(user_query)
    if not is_safe:
        return f'Not asking question {user_query} as it is not related to travel advice.'
    else:
        return travel_query(user_query)

with handler(provider), instructions("Do not give any explanation"):
    print(answer_travel_query("What are great places to check out in NYC?"))
    print(answer_travel_query('Should I buy apple stocks?'))

Check out Central Park, Times Square, Statue of Liberty, Brooklyn Bridge, Empire State Building, 9/11 Memorial, Metropolitan Museum of Art, Broadway, Rockefeller Center, and The High Line.
Not asking question Should I buy apple stocks? as it is not related to travel advice.


In [None]:
@Template.define
def is_safe_query(user_query: str) -> bool:
    """Tests whether the user query {user_query} is related to travel advice"""
    raise NotHandled

@Template.define
def travel_query(user_query: Anotated[str, is_safe_query]) -> str:
    """Produces a concise (<100) word answer to the query {user_query}"""
    raise NotHandled

@Template.define
def travel_query(user_query: str):

### PocketFlow Chat with Memory
Taken from [here](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-chat-memory)

```python 
class GetUserQuestionNode:
    def prep(self, shared): shared['messages'] = shared.get('messages', [])
    def exec(self, _): return input()
    def post(self, shared, res): shared['messages'].append({'role': 'user', 'content': exec_res}); return "retrieve"

class AnswerNode:
    def prep(self, shared):
        recent_messages = shared.get('messages', [])[-6:]
        context = []
        if (relevant_convos := shared.get('retrieved_conversation')):
            context.append({'role': 'system', 'content': 'the following is a relevant past conversation that might help'})
            context.extend(relevant_convos)
        context.extend(recent_messages)
        context.append({'role': 'system', 'content': 'now continue the conversation'})
        return context
    def exec(self, messages): return call_llm(messages)
    def post(self, shared, res):
        shared['messages'].append({'role': 'assistant', 'content': res})
        if len(shared['messages']) > 6: return 'embed'
        return 'question'

class EmbedNode:
    def prep(self, shared):
        oldest_pair, shared['messages'] = shared['messages'][:2], shared['messages'][2:]
        return oldest_pair
    def exec(conversation):
        embedding = get_embedding('\n'.join(msg['content'] for msg in conversation))
        return {'conversation': conversation, 'embedding': embedding}
    def post(self, shared, res):
        add_vector(shared['vector_index'], (res['embedding']))
        return 'question'

class RetrieveNode:
    def prep(self, shared): return next(msg for msg in shared['messages'] if msg['role'] == 'user', None)
    def exec(self, input): 
        query = inputs['query']; vector_index = inputs['vector_index']; vector_items = inputs['vector_items']
        [index], [dist] = search_vectors(inputs['vector_index'], get_embedding(inputs['query']), k = 1)
        return {'conversation': inputs['vector_items'][index], 'distance': dist}
    def post(self, shared, res):
        shared['retrieved_conversation'] = res['conversation']
        return 'answer'
```

In [92]:
import numpy as np
from typing import Any

def get_embedding(text):
    response = provider._client.embeddings.create(model="text-embedding-ada-002", input=text)
    return np.array(response.data[0].embedding, dtype=np.float32)

def find_closest(index: list[tuple[str, Any]], phrase):
    if not index: return None
    def dist(a, b): return float(((a - b) ** 2).sum())
    phrase_embedding = get_embedding(phrase)
    return min(((msg, dist(embedding, phrase_embedding)) for (msg, embedding) in index), key=lambda elt: elt[1])

@Template.define
def respond_to_user(user_message: str, relevant_context: str, prev_messages: str) -> str:
    """Given the user wrote {user_message}, continue the conversation. The last few messages in the conversation were {prev_messages}, and older context was {relevant_context}"""
    ...

class ChatAgent:
    history: list[str] = []
    index = []
    
    def _compress(self):
        oldest_pair, self.history = self.history[:2], self.history[2:]
        oldest_pair = '\n'.join(message['content'] for message in oldest_pair)
        self.index.append([oldest_pair, get_embedding(oldest_pair)])
    
    def _find_relevant(self, query: str):
        return find_closest(self.index, query)


    def chat(self, input: str):
        relevant = self._find_relevant(input)
        relevant_context = relevant[0] if relevant else 'No relevant context.'
        prev_messages = '\n'.join([f"{message['author']}: {message['content']}" for message in self.history])
        response = respond_to_user(input, relevant_context, prev_messages)
        self.history.append({'author': 'user', 'content': input})
        self.history.append({'author': 'agent', 'content': response})
        if len(self.history) > 6: self._compress()
        print(f'user: {input}')
        print(f'agent: {response}')

agent = ChatAgent()
with handler(provider), instructions('Do not give any explanation, simply return the response.'):
    agent.chat('hello!, how are you doing?')
    agent.chat('lovely! I\'m having a lovely day')
    agent.chat('What is the captial of france?')
    agent.chat('I didn\'t know that! That\'s amazing!')


user: hello!, how are you doing?
agent: Hello! I'm just a program, but I'm here to help you. How's your day going?
user: lovely! I'm having a lovely day
agent: That’s wonderful to hear! What’s been the highlight of your day so far?
user: What is the captial of france?
agent: agent: The capital of France is Paris. Is there anything else you'd like to know?
user: I didn't know that! That's amazing!
agent: agent: I'm glad you found it amazing! Is there anything else you're curious about?


### Pocketflow LLM Streaming
Taken from [here](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-llm-streaming)

```python
class StreamNode:
    def prep(self, shared):
        interrupt_evt = threading.Event()
        def wait_for_interrupt(): interrupt_evt.set()
        listener_thread = threading.Thread(target = wait_for_interrupt)
        listener_thread.start()
        chunks = stream_llm(shared['prompt'])
        return chunks, interrupt_evt, listener_thread
    def exec(self, inputs):
        chunks, interrupt_evt, listener_thread = inputs

        for chunk in chunks:
            if interrupt_evt.is_set():
                break
            print(chunk.choices[0].delta.content)
        
        return interrupt_evt, listenker_thread
    def post(self, shared, res):
        interrupt_evt, listener_thread = res 
        interrupt_evt.set(); listener_thread.join()
        return 'default'
```

Probably good to support this, though Pocket-flow doesn't really have the nicest interface here...

### Pocketflow Majority vote
Taken from [here](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-majority-vote)

illustrates how to use the batching mechanism to implement majority voting

```python
class MajorityVote(BatchNode):
    def prep(self, shared):
        question = shared['question']
        attempts = shared.get('num_tries', 3)
        return [question for _ in range(attempts)]

    def exec(self, question):
        return call_llm("Please answer the user's question below\n{question},answer")

    def post(self, shared, ress):
        answers = [res['answer'] for res in ress]
        best_answer, freq = Counter(answers).most_common(1)[0]
        return 'end'
```

In [93]:
import collections

def majority_vote(oracle, query, voters=3):
    with instructions('respond only yes or no'):
        counter = collections.Counter([oracle(query) for i in range(voters)])
    best_answer, freq = counter.most_common(1)[0]
    return best_answer, freq

with handler(provider):
    print(majority_vote(travel_query, 'Is paris the captial of france?'))


('Yes.', 3)


### Pocketflow Multi-Agent Game
Taken from [here](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-multi-agent)

Async communication between two nodes

```python
class AsyncHinter(AsyncNode):
    async def prep(self, shared):
        guess = await shared['hinter_queue'].get()
        if guess == 'GAME_OVER': return None
        return shared['target_word'], shared['forbidden_words'], shared.get('past_guesses', [])
    
    async def exec(self, inputs):
        target, forbidden, past_guesses = inputs
        prompt = 'generate hint for {target}, forbidden words: {forbidden}, past guesses were: {past_guesses}'
        hint = call_llm(prompt)
        return hint
    
    async def post(self, shared, res):
        if res is None:
            return 'end'
        await shared['guesser_queue'].put(exec_res)
        return 'continue'
    
class AsyncGuesser(AsyncNode):
    async def prep(self, shared):
        hint = await shared['guesser_queue'].get()
        return hint, shared.get('past_guesses', [])
    
    async def exec(self, inputs):
        hint, past_guesses = inputs
        prompt = 'given hint {hint}, past guesses {past_guesses}, make a new guess'
        guess = call_llm(prompt)
        return guess
    
    async def post(self, shared, res):
        if res == shared['target_word']:
            await shared['hinter_queue'].put('GAME_OVER')
            return 'end'
        shared['past_guesses'].append(exec_res)
        await shared['hinter_queue'].put(exec_res)
        return 'continue'
        
```

In [None]:
@Template.define
async def guesser(hint, wrong_guesses):
    """Given the hint {hint}, guess the hidden word. Prior wrong guesses were {wrong_guesses}"""
    ...

@Template.define
async def hinter(guessed_word, hint, hidden_word, wrong_guesses):
    """Given the wrong guess {guessed_word} for hint {hint}, and prior wrong guesses ({wrong_guesses}), construct a new hint."""
    ...


class Guesser:
    prior_guesses = []
    last_guess = ''
    last_hint = ''
    
    async def make_guess(self, hint: str) -> str:
        guess = guesser(hint, self.prior_guesses)
        self.last_guess = guess
        self.last_hint = hint
        return guess
    
    async def report_wrong(self):
        self.prior_guesses.append(self.last_guess)

async def evaluate_guesser(guesser: Guesser, hidden_word):
    hint = hinter(guesser.last_guess, guesser.hint, hidden_word, guesser.prior_guesses)
    guess = await guesser.make_guess(hint)
    if guess.lower() == hidden_word.lower():
        return True
    guesser.report_wrong()
    return False


async def run_guessing_game(guesser, hidden_word):
    while not await evaluate_guesser(guesser, hidden_word):
        continue
    

with handler(provider):
    guessing_games = [run_guessing_game(Guesser()) for i in range(10)]
    finished_game = await asyncio.wait(guessing_games, return_when=asyncio.FIRST_COMPLETED)
    


### Pocketflow parallel batch flow

Taken from [here](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-parallel-batch-flow)

same code as batched, just changes execution strategy such that each flow is run in parallel
```python
class ImageParallelBatchFlow(AsyncParallelBatchFlow):
    
    async def prep_async(self, shared):
        images = shared.get("images",[])
        filters = ['grayscale', 'blur', 'sepia']
        params = []
        for image_path in images:
            for filter_type in filters:
                params.append({ 'image_path': image_path, 'filter': filter_type })
        return params
```

```python
class TranslateTextNodeParallel(AsyncParallelBatchNode):
    async def prep_async(self, shared):
        text = shared.get("text", "no text provided")
        languages = shared.get("langauges", [])
        return [(text,lang) for lang in languages]
    async def exec_async(self, inputs):
        text, language = inputs
        result = await call_llm("please translate the following markdown file into {language}")
        return {"language": language, "translation": result}
    async def post_async(self, shared, res):
        for file in res:
            with open(file['language'], 'w') as f: await f.write(file['translation'])
        return 'default'
```

### Pocketflow RAG example

```python
class ChunkDocumentsNode(BatchNode):
    def prep(self,shared): return shared['texts']
    def exec(self, text): return fixed_size_chunk(text)
    def post(self,shared,res): shared['texts'] = [doc for doc_ls in res for doc in doc_ls]; return 'default'
class EmbedDocumentsNode(BatchNode):
    def prep(self,shared): return shared['texts']
    def exec(self,text): return get_embedding(text)
    def post(self,shared, res): shared['embeddings'] = np.array(res); return 'default'
class CreateIndexNode(Node):
    def prep(self, shared): return shared['query']
    def exec(self,query): return get_embedding(query)
    def post(Self,shared, res): shared['query_embedding'] = res
class RetrieveDocumentsNode(Node):
    def prep(self,shared): return shared['query_embedding'], shared['index'], shared['texts']
    def exec(self,inputs): q_embed, index, texts = inputs; dists, inds = index.search(q_embed, k=1); return {'text': texts[inds[0][0]], 'ind': inds[0][0]}
    def post(self,shared,res): shared['retrieved'] = res; return 'default
```

### Pocketflow Supervisor Flow
Taken from [here](https://github.com/The-Pocket/PocketFlow/blob/main/cookbook/pocketflow-supervisor/flow.py)

```python
class SupervisorNode(Node):
    def prep(self,shared): return shared['answer']
    def exec(self,answer):
        is_nonsense = any(marker in answer for marker in ['coffee break', 'who knows?', 'made up', '42'])
        if is_nonsense: return {'valid': False, 'reason': 'Answer appears to be nonsense'}
        else: return {'valid': True, 'reason': 'Answer appears to be legitimate'}
    def post(self, shared, res):
        if not res['valid']: return 'retry'
```