Apply nest_asyncio to allow nested event loops in Jupyter

In [1]:
import nest_asyncio


nest_asyncio.apply()
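
As a rough illustration of why the patch is needed: Jupyter already runs an asyncio event loop, and standard asyncio refuses to start a second loop from inside a running one. The sketch below reproduces that refusal with the standard library only. The names `inner` and `outer` are made up for this example, and it is meant to be run as a plain Python script, without nest_asyncio applied.

```python
import asyncio


async def inner():
    return "inner result"


async def outer():
    # We are now inside a running event loop, much like code in a Jupyter cell.
    coro = inner()
    try:
        # Standard asyncio rejects starting a nested loop here.
        asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # close the unused coroutine to avoid a "never awaited" warning
        return f"RuntimeError: {exc}"
    return "no error"


message = asyncio.run(outer())
print(message)
```

With nest_asyncio applied, the nested call is expected to succeed instead, which is what lets the rest of this notebook run inside Jupyter.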

Initial Setup and Configuration

In [2]:
import sys
import os

# Check current working directory
current_dir = os.getcwd()
print(f"Current Directory: {current_dir}")

# Change to your project root directory
project_root = '/home/razvansavin/Proiecte/flexiai'
os.chdir(project_root)
print(f"Changed Directory to: {os.getcwd()}")

# Add project root directory to sys.path
sys.path.append(project_root)
print("Project root added to sys.path")

Current Directory: /home/razvansavin/Proiecte/flexiai/examples/Code examples
Changed Directory to: /home/razvansavin/Proiecte/flexiai
Project root added to sys.path
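
Re-running the setup cell appends the same path to sys.path again each time. A minimal sketch of a guard against that, assuming a hypothetical helper name `add_to_sys_path` and an illustrative path that is not part of this project:

```python
import sys


def add_to_sys_path(path):
    """Append path to sys.path unless it is already listed; return True if added."""
    if path in sys.path:
        return False
    sys.path.append(path)
    return True


# Hypothetical path, used only for illustration.
example_root = "/tmp/example_project_root"
first_call = add_to_sys_path(example_root)
second_call = add_to_sys_path(example_root)
print(first_call, second_call, sys.path.count(example_root))
```

The second call leaves sys.path unchanged, so the cell can be re-run safely.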


Import required modules and set up logging.

In [3]:
import json
import logging
from flexiai.core.flexiai_client import FlexiAI
from flexiai.config.logging_config import setup_logging
from flexiai.core.utils.helpers import HelperFunctions

# Set up logging with the predefined configuration (DEBUG for root and file, ERROR for console)
setup_logging(root_level=logging.DEBUG, file_level=logging.DEBUG, console_level=logging.ERROR)

# Initialize FlexiAI
flexiai = FlexiAI()

Define multiple tasks to be executed in parallel.

In [4]:
tasks = [
    {
        'function_name': 'recipe_master_assistant',
        'parameters': {
            'assistant_id': 'asst_6cXv3YvSnY1VS6HH7i1SZT1P',
            'user_content': 'I want a recipe for pizza diavola.'
        }
    },
    {
        'function_name': 'jokes_creator_assistant',
        'parameters': {
            'assistant_id': 'asst_3j7TCL05TchAcvNVMCoYBfIU',
            'user_content': 'Tell me a joke about computers.'
        }
    }
]

Execute Parallel Tool Calls:
- Execute the tasks in parallel with call_parallel_functions.
- Print each result.

In [5]:
try:
    results = flexiai.call_parallel_functions(tasks)
    
    # Print the results
    for result in results:
        print(result)
except Exception as e:
    print(f"An error occurred during parallel function execution: {e}")

None
None
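
To give a feel for what dispatching a task list like this in parallel can involve, here is a minimal sketch built on the standard library's ThreadPoolExecutor. It is not FlexiAI's implementation: the registry, the stub functions, and `run_tasks_in_parallel` are all hypothetical names introduced for illustration. Only the task shape (`function_name` plus `parameters`) is taken from the cell above.

```python
from concurrent.futures import ThreadPoolExecutor


# Hypothetical stand-ins for the functions a task can name.
def recipe_stub(assistant_id, user_content):
    return f"[{assistant_id}] recipe request: {user_content}"


def joke_stub(assistant_id, user_content):
    return f"[{assistant_id}] joke request: {user_content}"


# Hypothetical registry mapping function names to callables.
REGISTRY = {"recipe_stub": recipe_stub, "joke_stub": joke_stub}


def run_tasks_in_parallel(tasks, registry):
    """Submit every task to a thread pool and return results in task order."""
    with ThreadPoolExecutor(max_workers=len(tasks) or 1) as pool:
        futures = [
            pool.submit(registry[task["function_name"]], **task["parameters"])
            for task in tasks
        ]
        # Collecting in submission order keeps results aligned with tasks.
        return [future.result() for future in futures]


demo_tasks = [
    {
        "function_name": "recipe_stub",
        "parameters": {"assistant_id": "asst_demo_1", "user_content": "pizza"},
    },
    {
        "function_name": "joke_stub",
        "parameters": {"assistant_id": "asst_demo_2", "user_content": "computers"},
    },
]

demo_results = run_tasks_in_parallel(demo_tasks, REGISTRY)
for result in demo_results:
    print(result)
```

In this sketch a function that returns nothing would produce None in the results list, so printing the results only shows something useful when the called functions return values.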


Define a helper that retrieves and prints the messages for a given thread ID:
- It requests up to 20 messages with order='desc' and labels each one by role before printing it.

In [6]:
def print_thread_messages(thread_id):
    messages = flexiai.retrieve_messages(thread_id, order='desc', limit=20)
    for msg in messages:
        role = "🤖 Assistant" if msg['role'] == "assistant" else "🧑 You"
        print(f"{role}: {msg['content']}")
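
The role labelling can be tried without any API call. The sketch below pulls the formatting into a hypothetical `format_message` helper and feeds it hand-written sample messages. It assumes each message is a dict with `role` and `content` keys, as the function above does.

```python
def format_message(msg):
    """Return one display line for a message dict with 'role' and 'content' keys."""
    role = "🤖 Assistant" if msg["role"] == "assistant" else "🧑 You"
    return f"{role}: {msg['content']}"


# Hand-written sample data for illustration, not real API output.
sample_messages = [
    {"role": "user", "content": "Tell me a joke about computers."},
    {"role": "assistant", "content": "(assistant reply)"},
]

formatted = [format_message(msg) for msg in sample_messages]
for line in formatted:
    print(line)
```

Any role other than "assistant" is labelled as the user, matching the conditional in print_thread_messages.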

Retrieve and print messages for each task:
- For each task, create a new thread, add the user message, start a run with the task's assistant, and print the thread's messages.

In [7]:
for task in tasks:
    assistant_id = task['parameters']['assistant_id']
    thread_id = flexiai.create_thread()
    flexiai.add_user_message(thread_id, task['parameters']['user_content'])
    flexiai.create_run(assistant_id, thread_id)
    print(f"\nMessages for thread with assistant {assistant_id}:")
    print_thread_messages(thread_id)


Messages for thread with assistant asst_6cXv3YvSnY1VS6HH7i1SZT1P:
🧑 You: I want a recipe for pizza diavola.
🤖 Assistant: I have crafted a recipe for Pizza Diavola. It features spicy salami, chili peppers, and mozzarella cheese on a thin crust, delivering a fiery and flavorful experience with a kick of heat.

Messages for thread with assistant asst_3j7TCL05TchAcvNVMCoYBfIU:
🧑 You: Tell me a joke about computers.
🤖 Assistant: Why did the computer keep its drinks on the motherboard?

Because it had too many bytes! 😄

I hope that joke brought a smile to your face!
