In [65]:
from typing import get_args

from bcs import BCSz

from api_dev.types import AI, DIO, Motor

# Expand each alias's underlying ``__value__`` into the tuple of its type
# arguments, in the fixed order AI, Motor, DIO.
ai, motor, dio = (get_args(alias.__value__) for alias in (AI, Motor, DIO))

In [None]:
from pydantic import BaseModel, Field
from pydantic_settings import BaseSettings, SettingsConfigDict


class Connection(BaseSettings):
    """Address and port used to reach the BCS server.

    Both fields are required and are populated through their aliases
    (``BCS_SERVER_ADDRESS`` and ``BCS_SERVER_PORT``). A ``.env`` file is
    configured as a settings source and unrelated entries are ignored
    (``extra="ignore"``).
    """

    # Server host; supplied under the alias BCS_SERVER_ADDRESS.
    addr: str = Field(alias="BCS_SERVER_ADDRESS")
    # Server port; supplied under the alias BCS_SERVER_PORT.
    port: int = Field(alias="BCS_SERVER_PORT")
    model_config = SettingsConfigDict(env_file=".env", extra="ignore")


# Expected data structures for AI, motor, and DIO responses


class AiData(BaseModel):
    """One channel entry of an ``AiResponse`` (see ``AiResponse.chans``)."""

    # Channel name.
    chan: str
    # NOTE(review): units of ``period`` and ``time`` are not visible here — confirm.
    period: float
    time: float
    # Acquired values for this channel.
    data: list[float]


class AiResponse(BaseModel):
    """Validated form of the mapping returned by ``get_acquired_array``.

    ``error_description`` and ``log`` are read from the keys
    ``"error description"`` and ``"log?"``; because ``populate_by_name`` is
    enabled, the Python field names are accepted as well.
    """

    success: bool
    error_description: str = Field(alias="error description")
    log: bool = Field(alias="log?")
    # Optional in the payload; defaults to an empty list.
    not_found: list[str] = []
    # One entry per returned channel.
    chans: list[AiData]
    API_delta_t: float

    model_config = {"populate_by_name": True}


class MotorData(BaseModel):
    """One motor entry of a ``MotorResponse`` (see ``MotorResponse.data``)."""

    # Motor name.
    motor: str
    # NOTE(review): the relation between the plain and ``*_raw`` values
    # (e.g. user units vs. controller steps) is not visible here — confirm.
    position: float
    position_raw: float
    goal: float
    goal_raw: float
    # Integer status word; bit meanings are not defined in this notebook.
    status: int
    time: float


class MotorResponse(BaseModel):
    """Validated form of the mapping returned by ``get_motor``.

    ``error_description`` and ``log`` are read from the keys
    ``"error description"`` and ``"log?"``; because ``populate_by_name`` is
    enabled, the Python field names are accepted as well.
    """

    success: bool
    error_description: str = Field(alias="error description")
    log: bool = Field(alias="log?")
    # Optional in the payload; defaults to an empty list.
    not_found: list[str] = []
    # One entry per returned motor.
    data: list[MotorData]
    API_delta_t: float

    model_config = {"populate_by_name": True}

    def __repr__(self) -> str:
        """Return a compact representation showing only ``success`` and ``data``."""
        return f"<MotorResponse success={self.success} data={self.data}>"


class DioData(BaseModel):
    """Enabled flag and value for a single digital I/O channel.

    NOTE(review): nothing in the visible notebook references this model;
    ``DioResponse`` carries parallel ``enabled``/``data`` lists instead.
    """

    enabled: bool
    data: bool


class DioResponse(BaseModel):
    """Validated form of the mapping returned by ``get_di``.

    ``error_description`` and ``log`` are read from the keys
    ``"error description"`` and ``"log?"``; because ``populate_by_name`` is
    enabled, the Python field names are accepted as well.
    """

    success: bool
    error_description: str = Field(alias="error description")
    log: bool = Field(alias="log?")
    # Channel names; presumably index-aligned with ``enabled`` and ``data``
    # below — TODO confirm against the server API.
    chans: list[str]
    # Optional in the payload; defaults to an empty list.
    not_found: list[str] = []
    enabled: list[bool]
    data: list[bool]
    API_delta_t: float

    model_config = {"populate_by_name": True}


# Load the connection settings and show them as the cell output.
config = Connection()
config

Connection(addr='localhost', port=5577)

In [None]:
from typing import Generic, TypeVar

# Type parameter of RsoxsAccessor, bounded to the AI, Motor and DIO aliases.
T = TypeVar("T", bound=AI | Motor | DIO)


class RsoxsAccessor(Generic[T]):
    """Keyed, asynchronous access to one family of server channels.

    An accessor is bound to a server and to one of the ``AI``, ``Motor`` or
    ``DIO`` aliases. Reading is done with ``await accessor[key]`` and writing
    with ``await accessor.set(key, value)``. Keys are checked against the
    names listed in the alias before the server is contacted.
    """

    def __init__(
        self, server: BCSz.BCSServer, kind: type[T], *, readonly: bool = False
    ) -> None:
        """Bind the accessor.

        Args:
            server: Server object used for every read and write.
            kind: The alias (``AI``, ``Motor`` or ``DIO``) this accessor serves.
            readonly: When true, ``set`` always raises ``PermissionError``.
        """
        self.server = server
        self.kind = kind
        self.readonly = readonly

    def _validate_key(self, key: str) -> None:
        """Raise ``KeyError`` unless ``key`` is one of the names in ``self.kind``."""
        if key not in get_args(self.kind.__value__):
            raise KeyError(f"{key} is not a valid {self.kind}")

    async def __getitem__(self, key: str) -> AiResponse | MotorResponse | DioResponse:
        """Read one channel and return the validated server response.

        ``__getitem__`` is a coroutine function, so subscripting yields an
        awaitable: use ``await accessor[key]``.

        Raises:
            KeyError: ``key`` is not a valid name for this accessor's kind.
            NotImplementedError: the accessor's kind has no read path.
        """
        self._validate_key(key)

        if self.kind is AI:
            result = await self.server.get_acquired_array(chans=[key])
            return AiResponse(**result)
        elif self.kind is Motor:
            result = await self.server.get_motor(motors=[key])
            return MotorResponse(**result)
        elif self.kind is DIO:
            result = await self.server.get_di(chans=[key])
            return DioResponse(**result)
        else:
            raise NotImplementedError(
                f"Accessor for type {self.kind} is not implemented."
            )

    async def set(self, key: str, value):
        """Write ``value`` to the channel named ``key``.

        Motors receive a ``"Backlash Move"`` command with ``value`` as the
        goal; DIO channels are set to ``bool(value)``.

        Raises:
            PermissionError: the accessor was created with ``readonly=True``.
            KeyError: ``key`` is not a valid name for this accessor's kind.
            ValueError: a DIO value is neither an ``int`` nor a ``bool``.
            NotImplementedError: the accessor's kind has no write path.
        """
        # The read-only check comes first so a read-only accessor never
        # reports key errors for writes.
        if self.readonly:
            raise PermissionError("This accessor is read-only.")

        self._validate_key(key)

        if self.kind is Motor:
            await self.server.command_motor(
                commands=["Backlash Move"], motors=[key], goals=[value]
            )
        elif self.kind is DIO:
            # ``bool`` is a subclass of ``int``, so this accepts both.
            if not isinstance(value, int):
                raise ValueError("DIO value must be an integer or boolean.")
            await self.server.set_do(chan=key, value=bool(value))
        else:
            raise NotImplementedError(
                f"Setting value for type {self.kind} is not implemented."
            )

    async def df(self, keys: list[str]) -> "pd.DataFrame":
        """Placeholder for reading several keys into a pandas DataFrame.

        The original cell contained only an incomplete signature for this
        method (no ``def`` keyword and no body), which made the whole cell a
        syntax error. The signature is kept, but the table layout was never
        specified, so calling it raises instead of guessing. The return
        annotation is a string because pandas is not imported in this notebook.

        Raises:
            NotImplementedError: always.
        """
        # TODO: define the intended DataFrame layout, then implement.
        raise NotImplementedError("RsoxsAccessor.df is not implemented yet.")

class RsoxsServer(BCSz.BCSServer):
    """``BCSServer`` subclass that carries ``ai``, ``motor`` and ``dio`` accessors."""

    # Connection settings are resolved once, when this class body executes.
    CONFIG = Connection()

    def __init__(self):
        """Initialise the base server and attach one accessor per channel kind."""
        super().__init__()
        # Analog inputs are exposed read-only; motors and DIO accept ``set``.
        self.ai = RsoxsAccessor[AI](self, AI, readonly=True)
        self.motor = RsoxsAccessor[Motor](self, Motor)
        self.dio = RsoxsAccessor[DIO](self, DIO)
        # NOTE(review): a "default metadata values" placeholder was left here;
        # nothing is assigned yet.

    async def connect_with_env(self):
        """Connect using the ``addr``/``port`` values held in ``CONFIG``.

        Anything written to stdout while connecting is captured instead of
        shown, and the last space-separated token of that text is kept in the
        private ``__public_key`` attribute (presumably the key that
        ``connect`` prints — confirm).
        """
        from contextlib import redirect_stdout
        from io import StringIO

        captured = StringIO()
        with redirect_stdout(captured):
            settings = self.CONFIG.model_dump()
            await self.connect(**settings)
            printed = captured.getvalue()
            self.__public_key = printed.split(" ")[-1]

    @classmethod
    async def create(cls) -> "RsoxsServer":
        """Build an instance, connect it via ``connect_with_env`` and return it."""
        connected = cls()
        await connected.connect_with_env()
        return connected



# Create and connect the server (top-level ``await`` is supported in notebook cells).
server = await RsoxsServer.create()

In [96]:
# Read the current state of the "Sample Z" motor as a MotorResponse.
await server.motor["Sample Z"]

MotorResponse(success=True, error_description='no error', log=False, not_found=[], data=[MotorData(motor='Sample Z', position=-6.0, position_raw=12000.0, goal=-6.0, goal_raw=12000.0, status=65576, time=3844100015.95168)], API_delta_t=0.006968975067138672)

In [97]:
# Read the "Shutter Output" digital channel as a DioResponse.
await server.dio["Shutter Output"]

DioResponse(success=True, error_description='no error', log=True, chans=['Shutter Output'], not_found=[], enabled=[True], data=[False], API_delta_t=0.00661468505859375)

In [98]:
# Issue a "Backlash Move" of "Sample Z" to -6, then read the motor state back.
# NOTE(review): whether the move has finished by the time of the read is not
# visible from this notebook — confirm before relying on the reported position.
await server.motor.set("Sample Z", -6)
await server.motor["Sample Z"]

MotorResponse(success=True, error_description='no error', log=False, not_found=[], data=[MotorData(motor='Sample Z', position=-6.0, position_raw=12000.0, goal=-6.0, goal_raw=12000.0, status=65544, time=3844100024.20668)], API_delta_t=0.011004209518432617)