Multi threading or parallel running agents with no doubt would be more effecient is some cases rather than sequence. But while using ADK Parallel Agents you may face the lack of ability change which agents are needed to run in parallel.
To do so I would like you provide modified version of parallel agent which invokes specific agents based on current state.

In [None]:
import logging
from typing import Optional
from typing import AsyncGenerator
from typing_extensions import override
from google.adk.agents import BaseAgent
from google.adk.events import Event
from google.adk.agents.invocation_context import InvocationContext
from ..utils import _create_branch_ctx_for_sub_agent, _merge_agent_run

In [None]:
def _create_branch_ctx_for_sub_agent( #function name speak for itself. You can take it from here or from ADK library.
    agent: BaseAgent,
    sub_agent: BaseAgent,
    invocation_context: InvocationContext,
) -> InvocationContext:
  """Create isolated branch for every sub-agent."""
  invocation_context = invocation_context.model_copy()
  branch_suffix = f'{agent.name}.{sub_agent.name}'
  invocation_context.branch = (
      f'{invocation_context.branch}.{branch_suffix}'
      if invocation_context.branch
      else branch_suffix
  )
  return invocation_context


In [None]:
async def _merge_agent_run( # necessary for parallel agents
    agent_runs: list[AsyncGenerator[Event, None]],
) -> AsyncGenerator[Event, None]:
  """Merges the agent run event generator.

  This implementation guarantees for each agent, it won't move on until the
  generated event is processed by upstream runner.

  Args:
      agent_runs: A list of async generators that yield events from each agent.

  Yields:
      Event: The next event from the merged generator.
  """
  tasks = [
      asyncio.create_task(events_for_one_agent.__anext__())
      for events_for_one_agent in agent_runs
  ]
  pending_tasks = set(tasks)

  while pending_tasks:
    done, pending_tasks = await asyncio.wait(
        pending_tasks, return_when=asyncio.FIRST_COMPLETED
    )
    for task in done:
      try:
        yield task.result()

        # Find the generator that produced this event and move it on.
        for i, original_task in enumerate(tasks):
          if task == original_task:
            new_task = asyncio.create_task(agent_runs[i].__anext__())
            tasks[i] = new_task
            pending_tasks.add(new_task)
            break  # stop iterating once found

      except StopAsyncIteration:
        continue

In [None]:
logger = logging.getLogger(__name__)

class CustomParallelAgent(BaseAgent):
    def get_agent_by_name(self, name: str) -> Optional[BaseAgent]: #simple function to search specific agent in the list based on its name.
        """Find sub-agent by name."""
        return next((agent for agent in self.sub_agents if agent.name == name), None)

    @override
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        state = ctx.session.state
        config = state.get("your_state_key", {})
        agents_to_run = []
        agent_mapping = { #map agents and keys in the state. You can define you custom logic based on your needs
            "past": "AgentNameOfPast", #here is the example of keys which contain True/False.
            "future": "AgentNameOfFuture", 
            "present": "AgentNameOfPresent" #type your agent's names as values of the keys they should be invokated from.
        }

        agents_to_run = [
            agent for config_key, agent_name in agent_mapping.items()
            if config.get(config_key) and (agent := self.get_agent_by_name(agent_name))
        ]
        
        if not agents_to_run:
            logging.warning(f"--- [AGENT: {self.name}] No agents to run ---")
            yield Event(author=self.name)
        
        logger.info(f"Running agents: {[agent.name for agent in agents_to_run]}")
        agent_runs = [
            sub_agent.run_async(
                _create_branch_ctx_for_sub_agent(self, sub_agent, ctx)
            )
            for sub_agent in agents_to_run
        ]
        async for event in _merge_agent_run(agent_runs):
            yield event