In [8]:
import os
# Turn off HuggingFace tokenizers' internal parallelism so the processes forked
# during retrieval do not trigger the "process just got forked" warning.
# Run this cell before any cell that uses the tokenizers (e.g. the RAG chats).
os.environ.update({"TOKENIZERS_PARALLELISM": "false"})

In [9]:
import autogen
from autogen.agentchat.contrib.retrieve_user_proxy_agent import RetrieveUserProxyAgent
import chromadb
from prompts.plan_proposal import proposal_prompt
from test_dalle import DALLEAgent
from functions.draft_proposal import store_draft_proposal_schema, store_draft_proposal

# Endpoint configs named by OAI_CONFIG_LIST, restricted to entries whose model is gpt-4.
config_list = autogen.config_list_from_json(
    "OAI_CONFIG_LIST",
    filter_dict=dict(model=["gpt-4"]),
)

In [2]:
def termination_msg(x):
    """Return True when a chat message signals the end of the conversation.

    A message terminates the chat when it is a dict whose ``content`` ends with
    the word ``TERMINATE`` (case-insensitive). Trailing whitespace is ignored so
    that a reply such as ``"...TERMINATE\\n"`` still ends the chat.

    Args:
        x: The message to inspect; anything that is not a dict never terminates.

    Returns:
        bool: True if the message ends with TERMINATE, else False.
    """
    if not isinstance(x, dict):
        return False
    content = str(x.get("content", "")).rstrip()
    return content[-9:].upper() == "TERMINATE"

# Shared LLM settings for the assistant agents and the group chat manager.
llm_config = dict(
    config_list=config_list,
    temperature=0,  # minimize sampling randomness
    timeout=120,
    cache_seed=42,  # change the cache_seed for different trials
)

# The Boss is a user proxy that never asks a human for input and never runs code;
# when it has nothing else to say it replies with `default_auto_reply`.
boss = autogen.UserProxyAgent(
    name="Boss",
    system_message="The boss who ask questions and give tasks.",
    human_input_mode="NEVER",
    code_execution_config=False,
    default_auto_reply="Reply `TERMINATE` if the task is done.",
    is_termination_msg=termination_msg,
)

# RAG user proxy: retrieves chunks of the FLAML Spark integration doc from a
# persistent Chroma collection and includes them as context in its messages.
if not config_list:
    raise ValueError("OAI_CONFIG_LIST has no gpt-4 entries; cannot choose a retrieval model.")

boss_aid = RetrieveUserProxyAgent(
    name="Boss_Assistant",
    is_termination_msg=termination_msg,
    system_message="Assistant who has extra content retrieval power for solving difficult problems.",
    human_input_mode="NEVER",
    max_consecutive_auto_reply=3,
    retrieve_config={
        "task": "code",
        "docs_path": "https://raw.githubusercontent.com/microsoft/FLAML/main/website/docs/Examples/Integrate%20-%20Spark.md",
        "chunk_token_size": 1000,
        # Was `config_list_gpt4`, which is never defined in this notebook (NameError on a
        # fresh kernel); the gpt-4-filtered configs live in `config_list`.
        "model": config_list[0]["model"],
        "client": chromadb.PersistentClient(path="./chromadb"),
        "collection_name": "groupchat",
        # NOTE(review): presumably reuses an existing "groupchat" collection across runs; confirm.
        "get_or_create": True,
    },
    code_execution_config=False,  # we don't want to execute code in this case.
)

def _make_assistant(name, role):
    """Build an LLM-backed assistant playing `role`, told to reply TERMINATE when done."""
    return autogen.AssistantAgent(
        name=name,
        is_termination_msg=termination_msg,
        system_message=f"You are a {role}. Reply `TERMINATE` in the end when everything is done.",
        llm_config=llm_config,
    )


coder = _make_assistant("Senior_Python_Engineer", "senior python engineer")
pm = _make_assistant("Product_Manager", "product manager")
reviewer = _make_assistant("Code_Reviewer", "code reviewer")

def _reset_agents():
    """Call ``reset()`` on every agent of this notebook before a new chat starts."""
    for agent in (boss, boss_aid, coder, pm, reviewer):
        agent.reset()


def rag_chat(problem, n_results=3):
    """Answer `problem` with a round-robin group chat opened by the RAG proxy.

    ``boss_aid`` starts the chat, so retrieved document chunks are included as
    context in the opening message; the other agents then speak in turn.

    Args:
        problem: The user question handed to the RAG proxy.
        n_results: Number of chunks to request from the vector store. The
            retriever lowers it when the collection holds fewer chunks.
    """
    _reset_agents()
    groupchat = autogen.GroupChat(
        agents=[boss_aid, coder, pm, reviewer],
        messages=[],
        max_round=12,
        speaker_selection_method="round_robin",
    )
    manager = autogen.GroupChatManager(groupchat=groupchat, llm_config=llm_config)

    # Start chatting with boss_aid as this is the user proxy agent.
    boss_aid.initiate_chat(
        manager,
        problem=problem,
        n_results=n_results,
    )

In [4]:
PROBLEM = "How to use spark for parallel training in FLAML? Give me sample code."

# Run the round-robin RAG group chat on the question above.
rag_chat(problem=PROBLEM)

Trying to create collection.


  from .autonotebook import tqdm as notebook_tqdm
Number of requested results 3 is greater than number of elements in index 2, updating n_results = 2


doc_ids:  [['doc_0', 'doc_1']]
[32mAdding doc_id doc_0 to context.[0m
[32mAdding doc_id doc_1 to context.[0m
[33mBoss_Assistant[0m (to chat_manager):

You're a retrieve augmented coding assistant. You answer user's questions based on your own knowledge and the
context provided by the user.
If you can't answer the question with or without the current context, you should reply exactly `UPDATE CONTEXT`.
For code generation, you must obey the following rules:
Rule 1. You MUST NOT install any packages because all the packages needed are already installed.
Rule 2. You must follow the formats below to write your code:
```language
# your code
```

User's question is: How to use spark for parallel training in FLAML? Give me sample code.

Context is: # Integrate - Spark

FLAML has integrated Spark for distributed training. There are two main aspects of integration with Spark:
- Use Spark ML estimators for AutoML.
- Use Spark to run training in parallel spark jobs.

## Spark ML Estimators



huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


[33mSenior_Python_Engineer[0m (to chat_manager):

Based on the context, here is a sample code on how to use Spark for parallel training in FLAML:

```python
# Import necessary libraries
import pandas as pd
from flaml.automl.spark.utils import to_pandas_on_spark
from pyspark.ml.feature import VectorAssembler
import flaml

# Creating a dictionary
data = {"Square_Feet": [800, 1200, 1800, 1500, 850],
      "Age_Years": [20, 15, 10, 7, 25],
      "Price": [100000, 200000, 300000, 240000, 120000]}

# Creating a pandas DataFrame
dataframe = pd.DataFrame(data)
label = "Price"

# Convert to pandas-on-spark dataframe
psdf = to_pandas_on_spark(dataframe)

# Use VectorAssembler to merge all feature columns into a single vector column
columns = psdf.columns
feature_cols = [col for col in columns if col != label]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
psdf = featurizer.transform(psdf.to_spark(index_col="index"))["index", "features"]

# Initialize AutoML
automl =

In [17]:
def call_rag_chat(problem):
    """Run an auto-speaker group chat whose assistants can call retrieval as a function.

    Unlike ``rag_chat``, the RAG proxy ``boss_aid`` does not join the chat. Its
    retrieval is wrapped in a local ``retrieve_content`` function that is advertised
    to the assistants through ``llm_config["functions"]`` and registered in the
    function map of every participant.

    The assistants' ``llm_config`` dicts are extended only for the duration of this
    chat and restored afterwards (even if the chat raises). This keeps the function
    schema, timeout and cache_seed out of later ``rag_chat`` runs.

    Args:
        problem: The question the Boss opens the chat with.
    """
    _reset_agents()

    # In this case, we will have multiple user proxy agents and we don't initiate the chat
    # with RAG user proxy agent.
    # In order to use RAG user proxy agent, we need to wrap RAG agents in a function and call
    # it from other agents.
    def retrieve_content(message, n_results=3):
        """Return ``message`` augmented with context retrieved through ``boss_aid``.

        Falls back to returning ``message`` unchanged when retrieval yields nothing.
        """
        boss_aid.n_results = n_results  # Set the number of results to be retrieved.
        # Check if we need to update the context.
        update_context_case1, update_context_case2 = boss_aid._check_update_context(message)
        if (update_context_case1 or update_context_case2) and boss_aid.update_context:
            # Keep an already-set problem; only fall back to this message if none exists.
            boss_aid.problem = message if not hasattr(boss_aid, "problem") else boss_aid.problem
            _, ret_msg = boss_aid._generate_retrieve_user_reply(message)
        else:
            ret_msg = boss_aid.generate_init_message(message, n_results=n_results)
        return ret_msg if ret_msg else message

    boss_aid.human_input_mode = "NEVER"  # Disable human input for boss_aid since it only retrieves content.

    # Named so it does not shadow the notebook-level `llm_config` used by the other agents.
    function_llm_config = {
        "functions": [
            {
                "name": "retrieve_content",
                "description": "retrieve content for code generation and question answering.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "message": {
                            "type": "string",
                            "description": "Refined message which keeps the original meaning and can be used to retrieve content for code generation and question answering.",
                        }
                    },
                    "required": ["message"],
                },
            },
        ],
        "config_list": config_list,
        "timeout": 60,
        "cache_seed": 43,
    }

    assistants = [coder, pm, reviewer]
    # `update` below mutates each assistant's config in place; snapshot them so the
    # originals can be restored once this chat is over.
    saved_llm_configs = [dict(agent.llm_config) for agent in assistants]
    try:
        for agent in assistants:
            # update llm_config for assistant agents.
            # NOTE(review): this edits the config dict after the agent was constructed;
            # confirm the installed autogen version reads it per request (in the recorded
            # run the coder answered without ever calling retrieve_content).
            agent.llm_config.update(function_llm_config)

        for agent in [boss, coder, pm, reviewer]:
            # register functions for all agents.
            agent.register_function(
                function_map={
                    "retrieve_content": retrieve_content,
                }
            )

        groupchat = autogen.GroupChat(
            agents=[boss, coder, pm, reviewer],
            messages=[],
            max_round=12,
            speaker_selection_method="auto",
            allow_repeat_speaker=False,
        )

        # The manager selects speakers; it is not offered the retrieval function.
        manager_llm_config = function_llm_config.copy()
        manager_llm_config.pop("functions")
        manager = autogen.GroupChatManager(groupchat=groupchat, llm_config=manager_llm_config)

        # Start chatting with the boss as this is the user proxy agent.
        boss.initiate_chat(
            manager,
            message=problem,
        )
    finally:
        for agent, saved in zip(assistants, saved_llm_configs):
            agent.llm_config.clear()
            agent.llm_config.update(saved)

PROBLEM = "How to use spark for parallel training in FLAML? Give me sample code with retrieve content."

# Run the function-calling variant, where the assistants fetch context on demand.
call_rag_chat(problem=PROBLEM)

[33mBoss[0m (to chat_manager):

How to use spark for parallel training in FLAML? Give me sample code with retrieve content.



--------------------------------------------------------------------------------
[33mSenior_Python_Engineer[0m (to chat_manager):

To use Apache Spark for parallel training in FLAML, you need to use the `flaml.tune.run` function. Here is a sample code snippet:

```python
from flaml import tune

# Define your training function
def train_func(config):
    # Your training code here
    pass

# Define your search space
search_space = {
    "lr": tune.loguniform(1e-4, 1e-1),
    "momentum": tune.uniform(0.1, 0.9),
}

# Use SparkTrials for parallel training
from ray.tune import SparkTrials

# Set the number of CPUs for each trial
spark_trials = SparkTrials(max_concurrent=4)

# Run the tuning
analysis = tune.run(
    train_func,
    config=search_space,
    num_samples=10,
    scheduler=tune.schedulers.FIFOScheduler(),
    trial_executor=spark_trials,
)

# Retrieve the best config and best trial
best_config = analysis.get_best_config(metric="accuracy", mode="max")
best_trial = analysis.get

In [19]:
# Debug inspection: the reply functions registered on the coder agent (private attribute).
coder._reply_func_list

[{'trigger': [autogen.agentchat.agent.Agent, None],
  'reply_func': <function autogen.agentchat.conversable_agent.ConversableAgent.a_check_termination_and_human_reply(self, messages: Optional[List[Dict]] = None, sender: Optional[autogen.agentchat.agent.Agent] = None, config: Optional[Any] = None) -> Tuple[bool, Optional[str]]>,
  'config': None,
  'init_config': None,
  'reset_config': None},
 {'trigger': [autogen.agentchat.agent.Agent, None],
  'reply_func': <function autogen.agentchat.conversable_agent.ConversableAgent.check_termination_and_human_reply(self, messages: Optional[List[Dict]] = None, sender: Optional[autogen.agentchat.agent.Agent] = None, config: Optional[Any] = None) -> Tuple[bool, Optional[str]]>,
  'config': None,
  'init_config': None,
  'reset_config': None},
 {'trigger': [autogen.agentchat.agent.Agent, None],
  'reply_func': <function autogen.agentchat.conversable_agent.ConversableAgent.a_generate_function_call_reply(self, messages: Optional[List[Dict]] = None, sen

In [20]:
# Debug inspection: the coder's message history, keyed by chat partner (here the group chat manager).
coder.chat_messages

defaultdict(list,
            {<autogen.agentchat.groupchat.GroupChatManager at 0x2ba3eefe0>: [{'content': 'How to use spark for parallel training in FLAML? Give me sample code with retrieve content.',
               'name': 'Boss',
               'role': 'user'},
              {'content': 'To use Apache Spark for parallel training in FLAML, you need to use the `flaml.tune.run` function. Here is a sample code snippet:\n\n```python\nfrom flaml import tune\n\n# Define your training function\ndef train_func(config):\n    # Your training code here\n    pass\n\n# Define your search space\nsearch_space = {\n    "lr": tune.loguniform(1e-4, 1e-1),\n    "momentum": tune.uniform(0.1, 0.9),\n}\n\n# Use SparkTrials for parallel training\nfrom ray.tune import SparkTrials\n\n# Set the number of CPUs for each trial\nspark_trials = SparkTrials(max_concurrent=4)\n\n# Run the tuning\nanalysis = tune.run(\n    train_func,\n    config=search_space,\n    num_samples=10,\n    scheduler=tune.schedulers.FIFOS