Reference:

https://www.linkedin.com/posts/reuvencohen_dspy-has-dramatically-changed-how-i-develop-activity-7175636276170174464-0Dkl?utm_source=share&utm_medium=member_desktop

In [None]:
from pydantic import Field
from lionagi.core.prompt.prompt_template import PromptTemplate
from lionagi.core.branch import Branch
from lionagi.libs import func_call

In [None]:
# import dspy
import asyncio
import logging
import random
import re
import time
from typing import List, Any

# #initial DSPy
# turbo = dspy.OpenAI(model='gpt-3.5-turbo')
# colbertv2_wiki17_abstracts = dspy.ColBERTv2(url='http://20.102.90.50:2017/wiki17_abstracts')

# Configure the root logger once for the notebook: records at INFO and above
# are emitted, each prefixed with its timestamp and level name.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)


class rUv(PromptTemplate):
    """
    Recursive Unified Validators (rUv): Generate expert model outputs.

    Prompt template for a single expert call. ``context``, ``prompt`` and the
    sampling parameters are the inputs named on the left of ``signature``;
    ``output`` is the single name on the right and starts out as ``None``.
    """

    context: str | dict = Field(desc="The current context")
    prompt: str | dict = Field(desc="A prompt to guide the language model")
    # The numeric defaults below are real numbers rather than strings such as
    # "1500": pydantic does not validate default values unless asked to, so a
    # string default would be stored as-is and contradict the annotation.
    max_tokens: int = Field(
        desc="Maximum number of tokens to generate", default=1500
    )
    temperature: float = Field(
        desc="Temperature for sampling (higher values make output more random)",
        default=0.1,
    )
    # NOTE(review): top-k is normally an integer count; the annotation is kept
    # as ``float`` so existing callers keep validating - confirm before
    # tightening it to ``int``.
    top_k: float = Field(
        desc="Top K words to sample from (higher values consider more words)",
        default=100,
    )
    top_p: float = Field(
        desc="Top P probability threshold (higher values make output more diverse)",
        default=0.9,
    )
    frequency_penalty: float = Field(
        desc="Frequency penalty (higher values penalize frequent words)",
        default=0.0,
    )
    presence_penalty: float = Field(
        desc="Presence penalty (higher values penalize repeated words)",
        default=0.0,
    )
    output: Any | None = Field(None, desc="The generated expert model output")
    # "inputs -> outputs" description of the template's fields.
    signature: str = (
        "context, prompt, max_tokens, temperature, top_k, top_p, frequency_penalty, presence_penalty -> output"
    )


class GatingModel(PromptTemplate):
    """Assess expert model relevance.

    Prompt template that takes the context and the serialized expert outputs
    and exposes ``selected_expert`` (initially ``None``) as the output field.
    """

    context: str | dict = Field(desc="The current context")
    expert_outputs: str = Field(
        desc="Serialized string of generated expert model outputs"
    )
    selected_expert: int | None = Field(
        None, desc="The index of the selected expert model"
    )
    # The input names must match the declared fields exactly; the signature
    # previously said "experts_outputs", which is not a field of this class.
    signature: str = "context, expert_outputs -> selected_expert"


class IntrinsicRewardModel(PromptTemplate):
    """Prompt template for the agent's self-assessment of a chosen expert.

    Inputs are the context, the serialized expert outputs and the index of
    the selected expert; ``intrinsic_reward`` is the output field and is
    ``None`` until filled in.
    """

    # --- inputs ---
    context: str | dict = Field(desc="The current context")
    expert_outputs: str = Field(desc="Serialized string of generated expert model outputs")
    selected_expert_index: int = Field(desc="The index of the selected expert model")

    # --- output ---
    intrinsic_reward: str | None = Field(
        default=None,
        desc="The intrinsic reward for the agent's performance",
    )

    # "inputs -> outputs" description of the fields above.
    signature: str = "context, expert_outputs, selected_expert_index -> intrinsic_reward"

In [None]:
class MixtureOfExperts:
    """Prompt-driven mixture of experts with one value estimate per expert.

    Each call to :meth:`generate_expert_outputs` asks ``num_experts`` experts
    (independent ``rUv`` prompts with randomised sampling settings) for an
    answer, picks one of the answers either at random (exploration) or through
    ``GatingModel``, and asks ``IntrinsicRewardModel`` to rate the pick.
    ``expert_values`` holds a running value estimate per expert which the
    caller updates through :meth:`update_expert_values`.
    """

    # Upper bound on the number of chunks requested per expert, so an answer
    # that never ends in sentence punctuation cannot loop forever.
    MAX_CONTINUATION_CHUNKS: int = 10

    def __init__(
        self,
        num_experts: int = 4,
        min_iterations: int = 4,
        learning_rate: float = 0.1,
        discount_factor: float = 0.99,
        exploration_rate: float = 0.2,
    ):
        """
        Set up value estimates, random architectures and the selection history.

        Args:
            num_experts (int, optional): Number of expert models to use. Defaults to 4.
            min_iterations (int, optional): Minimum number of iterations to run. Defaults to 4.
            learning_rate (float, optional): Learning rate for updating expert values. Defaults to 0.1.
            discount_factor (float, optional): Discount factor for future rewards. Defaults to 0.99.
            exploration_rate (float, optional): Exploration rate for selecting experts. Defaults to 0.2.
        """
        self.num_experts: int = num_experts
        self.expert_outputs: list[str] = []
        self.min_iterations: int = min_iterations
        self.learning_rate: float = learning_rate
        self.discount_factor: float = discount_factor
        self.exploration_rate: float = exploration_rate
        self.expert_values: list[float] = [0.0] * num_experts
        self.expert_architectures: list[dict] = [
            self.initialize_expert_architecture() for _ in range(num_experts)
        ]
        self.gating_architecture: dict = self.initialize_gating_architecture()
        self.selected_experts_history: list[int] = []

    @staticmethod
    def _random_architecture() -> dict:
        """Return a dict with a random ``num_layers`` (1-5) and ``hidden_size`` (32-256)."""
        return {
            "num_layers": random.randint(1, 5),
            "hidden_size": random.randint(32, 256),
        }

    @staticmethod
    def _coerce_index(raw_index) -> int:
        """Convert a model-supplied index to ``int``; return -1 when it cannot be parsed."""
        try:
            return int(raw_index)
        except (TypeError, ValueError):
            return -1

    @staticmethod
    def _parse_intrinsic_reward(reward_text) -> float:
        """Map the reward model's free-text verdict onto a number in [0, 1].

        Anything that is not a string, or that contains none of the known
        keywords, scores 0.0.
        """
        if not isinstance(reward_text, str):
            return 0.0
        text = reward_text.lower()
        if "highly effective" in text:
            return 1.0
        # Word boundaries keep "ineffective" from being scored as "effective".
        if re.search(r"\beffective\b", text):
            return 0.7
        if "moderate" in text:
            return 0.5
        if "poor" in text:
            return 0.2
        return 0.0

    def initialize_expert_architecture(self) -> dict:
        """Initialize the architecture of an expert model."""
        return self._random_architecture()

    def initialize_gating_architecture(self) -> dict:
        """Initialize the architecture of the gating model."""
        return self._random_architecture()

    async def _generate_single_expert_output(
        self, expert_index: int, context: str, prompt: str, max_tokens: int
    ) -> str:
        """Ask one expert for its answer, requesting continuations until it looks complete.

        The sampling parameters and tone are drawn once per expert, reused for
        every chunk, and printed afterwards, so the printed values are the
        ones that were actually sent.
        """
        expert_number = expert_index + 1

        # Determine the desired output length based on the previous values
        value = self.expert_values[expert_index]
        if value < 0.2:
            output_length = "short"
        elif value < 0.5:
            output_length = "medium"
        else:
            output_length = "long"

        sampling = {
            "temperature": random.uniform(0.7, 1.2),
            "top_k": random.randint(30, 70),
            "top_p": random.uniform(0.8, 1.0),
            "frequency_penalty": random.uniform(0.0, 0.5),
            "presence_penalty": random.uniform(0.0, 0.5),
        }
        tone = random.choice(["formal", "casual", "technical"])
        task = f"Focus on your area of expertise. Provide a {output_length} response using a {tone} tone."

        expert_prompt = f"Expert {expert_number}: {prompt}"
        expert_output = ""
        for _ in range(self.MAX_CONTINUATION_CHUNKS):
            _template = rUv(
                context=context,
                prompt=expert_prompt,
                max_tokens=str(max_tokens),
                task=task,
                **{name: str(setting) for name, setting in sampling.items()},
            )

            # A fresh branch per chunk: earlier text is carried in the prompt,
            # not in the conversation history.
            _out = await Branch().chat(prompt_template=_template)

            # NOTE(review): rUv declares its result field as ``output``; this
            # reads ``.out`` - confirm that attribute exists on the returned
            # object and holds a string.
            expert_output += _out.out

            if self.check_output_completeness(expert_output):
                break

            expert_prompt = (
                f"Expert {expert_number} (continued): {prompt}\n{expert_output}"
            )
        else:
            logging.warning(
                "Output for Expert %d still looks incomplete after %d chunks; using it as is.",
                expert_number,
                self.MAX_CONTINUATION_CHUNKS,
            )

        print(f"LLM Parameters for Expert {expert_number}:")
        print(f"Max Tokens per chunk: {max_tokens}")
        print(f"Temperature: {sampling['temperature']}")
        print(f"Top K: {sampling['top_k']}")
        print(f"Top P: {sampling['top_p']}")
        print(f"Frequency Penalty: {sampling['frequency_penalty']}")
        print(f"Presence Penalty: {sampling['presence_penalty']}")
        print("------------------------")

        return expert_output

    async def generate_expert_outputs(
        self, context: str, prompt: str, max_tokens: int, guidance: str
    ) -> tuple[str, int, float]:
        """Generate expert outputs based on the given context and prompt.

        Args:
            context: Background text passed to every template.
            prompt: The question each expert is asked.
            max_tokens: Forwarded to each ``rUv`` template as ``max_tokens``.
            guidance: Accepted for interface compatibility; not used here.

        Returns:
            ``(selected_output, selected_expert_index, intrinsic_reward)``.
            ``selected_expert_index`` is a position in ``self.expert_outputs``.
            When no expert produced an output, or expert selection fails, the
            first element is a failure message and the other two are ``0`` and
            ``0.0``, so callers can always unpack three values.
        """
        logging.info("Starting to generate expert outputs...")
        print("Generating expert outputs...")

        # Start every round from a clean slate; otherwise the selected index
        # would point into outputs left over from earlier rounds.
        self.expert_outputs = []

        for i in range(self.num_experts):
            print(f"Generating output for Expert {i+1}/{self.num_experts}...")
            logging.info("Generating output for Expert %d...", i + 1)

            try:
                expert_output = await self._generate_single_expert_output(
                    i, context, prompt, max_tokens
                )
            except Exception as e:
                # Best effort: one failing expert must not abort the round.
                # NOTE(review): a skipped expert shifts later outputs down by
                # one position relative to ``expert_values``.
                logging.error(
                    "Error generating output for Expert %d: %s", i + 1, e
                )
                continue

            self.expert_outputs.append(expert_output)
            logging.info("Output for Expert %d generated.", i + 1)
            # Pause between experts without blocking the event loop.
            await asyncio.sleep(1)

        if not self.expert_outputs:
            logging.error("No expert produced an output; nothing to select from.")
            return ("Failed to generate any expert output.", 0, 0.0)

        try:
            serialized_expert_outputs = ",".join(self.expert_outputs)

            if random.random() < self.exploration_rate:
                # Explore: draw only among the outputs that actually exist.
                selected_expert_index = random.randrange(
                    len(self.expert_outputs)
                )
            else:
                _template = GatingModel(
                    context=context,
                    expert_outputs=serialized_expert_outputs,
                    task="Select the index of the most relevant expert for the given context.",
                )
                _out = await Branch().chat(prompt_template=_template)
                selected_expert_index = self._coerce_index(
                    _out.selected_expert
                )

                # Penalize selection of recently chosen experts
                # NOTE(review): this checks the whole history, not a recent
                # window - confirm that is intended.
                if selected_expert_index in self.selected_experts_history:
                    selected_expert_index = random.randrange(
                        len(self.expert_outputs)
                    )
        except Exception as e:
            logging.error("Error selecting the most relevant expert: %s", e)
            return ("Failed to select the most relevant expert.", 0, 0.0)

        # Unparseable or out-of-range picks fall back to the first output.
        if not 0 <= selected_expert_index < len(self.expert_outputs):
            selected_expert_index = 0

        self.selected_experts_history.append(selected_expert_index)

        try:
            _template = IntrinsicRewardModel(
                context=context,
                expert_outputs=serialized_expert_outputs,
                selected_expert_index=str(selected_expert_index),
            )
            _out = await Branch().chat(prompt_template=_template)
            intrinsic_reward = self._parse_intrinsic_reward(
                _out.intrinsic_reward
            )
        except Exception as e:
            logging.error("Error evaluating intrinsic reward: %s", e)
            intrinsic_reward = 0.0

        print("Expert output generation complete!")
        logging.info(
            "All expert outputs have been generated and the most relevant expert has been selected."
        )

        return (
            self.expert_outputs[selected_expert_index],
            selected_expert_index,
            intrinsic_reward,
        )

    def check_output_completeness(self, output: str) -> bool:
        """Check if the output ends with a proper conclusion.

        Trailing whitespace is ignored, so an answer ending in ".\\n" counts as
        complete.
        """
        return output.rstrip().endswith((".", "!", "?"))

    def update_expert_values(self, selected_expert_index: int, reward: float):
        """Update the value estimate of the selected expert based on the received reward.

        The estimate moves by ``learning_rate`` towards
        ``reward + discount_factor * max(expert_values)``.
        """
        current = self.expert_values[selected_expert_index]
        target = reward + self.discount_factor * max(self.expert_values)
        self.expert_values[selected_expert_index] = (
            current + self.learning_rate * (target - current)
        )

    def update_expert_architecture(self, expert_index: int):
        """Update the architecture of the specified expert model."""
        self.expert_architectures[expert_index] = (
            self.initialize_expert_architecture()
        )

    def update_gating_architecture(self):
        """Update the architecture of the gating model."""
        self.gating_architecture = self.initialize_gating_architecture()

    def check_termination_condition(
        self,
        iteration: int,
        total_reward: float,
        reward_threshold: float = 10.0,
    ) -> bool:
        """Check if the termination condition is met based on the iteration and total reward.

        True once ``iteration`` has reached ``min_iterations`` and
        ``total_reward`` is at least ``reward_threshold``.
        """
        return (
            iteration >= self.min_iterations
            and total_reward >= reward_threshold
        )

    def update_exploration_rate(self, iteration: int):
        """Update the exploration rate based on the current iteration.

        The rate falls linearly from 1.0 to a floor of 0.1 as ``iteration``
        approaches ``min_iterations``.
        """
        if self.min_iterations <= 0:
            # No schedule to follow; avoid dividing by zero and use the floor.
            self.exploration_rate = 0.1
            return
        self.exploration_rate = max(
            0.1, 1.0 - (iteration / self.min_iterations)
        )


# Example usage with adjustments for self-improvement and intrinsic motivation.
#
# The settings below are fixed example values. In an interactive notebook they
# can instead be read from widgets (for example
# ``num_experts = num_experts_widget.value``), using the same variable names.

# Hyper-parameters handed to MixtureOfExperts.
num_experts = 5
min_iterations = 2
learning_rate = 0.2
discount_factor = 0.75
exploration_rate = 0.5

# Background given to every expert.
context = """
Acme Corporation, a leading multinational conglomerate, is actively exploring strategic investment opportunities in emerging technologies to maintain its competitive edge and drive future growth. The board of directors has convened a special committee to conduct a comprehensive analysis of the technological landscape and identify the most promising areas for investment. The committee seeks in-depth insights and recommendations on which cutting-edge technologies have the potential to revolutionize Acme's core industries and create new market opportunities over the next decade.
"""

# The question each expert is asked to answer.
prompt = """
Conduct a thorough evaluation of the potential impact and investment viability of four key emerging technologies: artificial intelligence (AI), blockchain, quantum computing, and biotechnology. For each technology, provide a detailed assessment of its current state of development, major players in the field, and projected market growth. Analyze the specific applications and use cases within Acme's core industries, highlighting the potential benefits, challenges, and disruptions each technology could bring. Consider factors such as scalability, regulatory landscape, talent availability, and competitive dynamics when assessing the investment viability of each technology. Provide clear recommendations on which technologies Acme should prioritize for investment, along with a proposed allocation of resources and a high-level roadmap for integration into the company's existing operations.
"""

# Token budget passed along with each expert request.
max_tokens = 100

# Style guidance passed to generate_expert_outputs.
guidance = """
Provide a comprehensive and well-structured analysis, focusing on delivering clear, concise, and actionable insights. Use industry-specific terminology and cite relevant data and examples to support your recommendations. Maintain an objective and analytical tone throughout the report.
"""


# Instantiate MixtureOfExperts with widget values
moe = MixtureOfExperts(
    num_experts=num_experts,
    min_iterations=min_iterations,
    learning_rate=learning_rate,
    discount_factor=discount_factor,
    exploration_rate=exploration_rate,
)
for iteration in range(moe.min_iterations):
    final_output, selected_expert_index, intrinsic_reward = (
        await moe.generate_expert_outputs(
            context, prompt, max_tokens, guidance
        )
    )

    print(
        f"Iteration {iteration+1} - Selected Expert: {selected_expert_index}, Intrinsic Reward: {intrinsic_reward}"
    )
    print("Expert Values:", moe.expert_values)
    print("Final Expert Output:")
    print(final_output)
    print("------------------------")

    # Get reward from an external expert
    expert_reward = float(
        input(f"Enter expert reward for iteration {iteration+1}: ")
    )

    # Combine intrinsic and expert rewards
    total_reward = intrinsic_reward + expert_reward

    print(f"Expert Reward: {expert_reward}, Total Reward: {total_reward}")

    # Update the value estimate of the selected expert
    moe.update_expert_values(selected_expert_index, total_reward)

    # Update expert and gating architectures for self-improvement
    if random.random() < 0.2:
        moe.update_expert_architecture(selected_expert_index)
    if random.random() < 0.1:
        moe.update_gating_architecture()

    # Update exploration rate based on the current iteration
    moe.update_exploration_rate(iteration)

    # Check termination condition
    if moe.check_termination_condition(iteration, total_reward):
        print("Termination condition met. Stopping the process.")
        break

In [None]:
# Run one more round on the already-trained `moe` and keep the raw return
# value in `a` for inspection in the notebook.
a = await moe.generate_expert_outputs(context, prompt, max_tokens, guidance)