In [5]:
from pathlib import Path
from pydantic import BaseModel, validator
import yaml

In [2]:
# Mission definition inspected by this notebook, relative to the working directory.
config_path = Path("configs") / "default_missions" / "search.yaml"

In [3]:
# Parse the mission YAML into plain Python containers.
# The encoding is pinned to UTF-8 so the result does not depend on the platform's
# locale default; safe_load only builds basic Python objects.
with open(config_path, "r", encoding="utf-8") as f:
    config = yaml.safe_load(f)

In [20]:
# Show the top-level (key, value) pairs of the parsed mission config.
# NOTE(review): in the captured output, action args come back as
# {'speed': '(0', 0: None, '15)': None, 'angle': 90}. Presumably the YAML writes
# `speed: (0, 0, 15)` inside a flow mapping, where the commas split it into
# separate keys; a YAML list such as `[0, 0, 15]` would keep it as one value.
# Confirm against the YAML file.
config.items()

dict_items([('name', 'Search_{target}'), ('default_transitions', True), ('states', {'ROTATE_CLOCK': {'initial': True, 'event': 'object_detection_event_gate_clock', 'expire_time': 5, 'action': {'type': 'rotation', 'args': {'speed': '(0', 0: None, '15)': None, 'angle': 90}}}, 'ROTATE_ANTICLOCK': {'event': 'object_detection_event_gate_anticlock', 'expire_time': 5, 'action': {'type': 'rotation', 'args': {'speed': '(0', 0: None, '-15)': None, 'angle': 90}}}, 'DUMMY': {'event': 'no_event'}}), ('transitions', [{'trigger': 'found', 'source': ['ROTATE_CLOCK', 'ROTATE_ANTICLOCK'], 'dest': '{success}'}, {'trigger': '{timeout}', 'source': 'ROTATE_CLOCK', 'dest': 'ROTATE_ANTICLOCK'}, {'trigger': '{timeout}', 'source': 'ROTATE_ANTICLOCK', 'dest': '{failure}'}]), ('events', {'object_detection_event_gate_clock': {'type': 'TopicEvent', 'topic': 'gate_clock', 'trigger': 'found', 'count': 2, 'data': 'gate'}, 'object_detection_event_gate_anticlock': {'type': 'TopicEvent', 'topic': 'gate_anticlock', 'trigg

In [25]:
class StateAction(BaseModel):
    """Settings for one action referenced from a ``State``.

    Both fields are optional and fall back to empty values.
    """
    type: str = ""  # action kind; the sample config printed in this notebook uses 'rotation'
    # Free-form mapping of action parameters.
    # NOTE(review): the sample config stores parameters under 'args', not 'goal' — confirm
    # which key is intended.
    goal: dict = {}


class State(BaseModel):
    """Settings of a single mission state; every field has a default."""
    initial: bool = False  # the sample config sets this to True on one state (ROTATE_CLOCK)
    event: str = ""  # presumably the key of an entry in Mission.events — TODO confirm
    expire_time: float = 0.0  # units are not stated in this file; the sample config uses 5
    # Actions keyed by an integer.
    # NOTE(review): the sample config printed in this notebook gives each state a single
    # 'action' mapping, not an 'actions' dict; with pydantic's default handling of
    # unknown keys that entry would be ignored here — confirm the intended schema.
    actions: dict[int, StateAction] = {}


class Transition(BaseModel):
    """A transition record: a trigger name plus source and destination state name(s).

    ``source`` and ``dest`` each accept either one name or a list of names.
    Validators normalise case: the trigger is lower-cased, state names are
    upper-cased.

    NOTE(review): brace placeholders seen in the sample config (e.g. '{success}')
    are upper-cased as well ('{SUCCESS}'); confirm that whatever fills them in
    expects that.
    """
    trigger: str = ""
    source: str | list[str] = ""
    dest: str | list[str] = ""

    @validator('trigger')
    def trigger_must_be_lower(cls, v):
        """Return the trigger name converted to lower case."""
        lowered = v.lower()
        return lowered

    @validator('source', 'dest')
    def source_dest_must_be_upper(cls, v):
        """Return the state name, or each name in a list, converted to upper case.

        Any value that is neither a ``str`` nor a ``list`` yields ``None``.
        """
        if isinstance(v, list):
            return [state_name.upper() for state_name in v]
        if isinstance(v, str):
            return v.upper()
        return None


class Event(BaseModel):
    """Base event model holding the fields common to every event kind."""
    type: str = ""  # event kind name; Mission.init_events picks the concrete class from it
    trigger: str = ""  # trigger name; the sample config printed in this notebook uses 'found'


class TopicEvent(Event):
    """Event variant selected when ``type == "TopicEvent"``; adds topic, data and count."""
    topic: str = ""  # sample config values: 'gate_clock', 'gate_anticlock'
    data: str = ""  # sample config value: 'gate'
    count: int = 0  # sample config value: 2; exact meaning is not shown in this file


class ObjectDetectionEvent(Event):
    """Event variant selected when ``type == "ObjectDetectionEvent"``.

    Declares the same three extra fields as ``TopicEvent``.
    """
    topic: str = ""
    data: str = ""
    count: int = 0


class Mission(BaseModel):
    """A mission definition: states, transitions and the events they refer to.

    State names are normalised to upper case and event names to lower case.
    Each event is built as the concrete ``Event`` subclass named by its ``type``.

    NOTE(review): the sample config printed in this notebook also has a
    top-level 'name' key that this model does not declare — confirm whether it
    should be a field.
    """
    states: dict[str, State] = {}
    transitions: list[Transition] = []
    events: dict[str, Event] = {}
    default_transitions: bool = True

    @validator('states')
    def states_must_be_upper(cls, v: dict[str, State]):
        """Re-key the states by upper-cased name, rebuilding each ``State``.

        Names that differ only by case collapse onto one key; the later entry wins.
        """
        return {name.upper(): State(**state.dict()) for name, state in v.items()}

    @validator('events', pre=True)
    def init_events(cls, v):
        """Build each event as the subclass named by its ``type`` and lower-case its key.

        Runs before field validation (``pre=True``). If it ran afterwards, every
        value would already have been coerced to the base ``Event``, which drops
        the subclass-only fields (``topic``, ``data``, ``count``) before the
        subclass is constructed.

        Raises:
            ValueError: if an entry is neither a mapping nor an ``Event``, or if
                its ``type`` is not a supported event class.
        """
        if not isinstance(v, dict):
            # Not a mapping at all: hand it on so the regular field validation
            # reports the type error.
            return v

        event_classes = {
            "TopicEvent": TopicEvent,
            "ObjectDetectionEvent": ObjectDetectionEvent,
        }
        inited_events = {}
        for name, event in v.items():
            if isinstance(event, Event):
                raw = event.dict()
            elif isinstance(event, dict):
                raw = event
            else:
                raise ValueError(f"Event {name} must be a mapping or an Event instance")

            event_type = raw.get("type", "")
            event_cls = event_classes.get(event_type)
            if event_cls is None:
                raise ValueError(f"Event type {event_type} not supported")

            # Non-string keys are left for the field's own key validation.
            key = name.lower() if isinstance(name, str) else name
            inited_events[key] = event_cls(**raw)
        return inited_events


class Scenario(BaseModel):
    """A collection of ``Mission`` objects stored under string keys."""
    missions: dict[str, Mission] = {}  # empty by default


class Config(BaseModel):
    """Top-level model: a collection of ``Scenario`` objects stored under string keys."""
    scenarios: dict[str, Scenario] = {}  # empty by default

In [27]:
# Serialise a Mission built only from field defaults to check the default JSON shape.
Mission().json()

'{"states": {}, "transitions": [], "events": {}, "default_transitions": true}'