# Device State Management Implementation

This notebook implements and tests key classes for device state management and hardware interaction:
- DeviceState
- DeviceStateManager
- HardwareAbstraction
- AsioManager

These classes work together to provide a structured approach to managing device states and handling hardware interactions asynchronously.

## Define the DeviceState Class

First, we'll define the DeviceState class to represent the state of a device, including attributes like state name and status.

In [None]:
import enum
from dataclasses import dataclass
from typing import Dict, Any, Optional, List, Callable


class StateStatus(enum.Enum):
    """Enumeration representing the status of a device state."""
    INACTIVE = 0
    ACTIVE = 1
    TRANSITIONING = 2
    ERROR = 3


@dataclass
class DeviceState:
    """
    Represents the state of a device.

    Attributes:
        name (str): The name of the state.
        status (StateStatus): The status of the state.
        metadata (Dict[str, Any]): Additional metadata associated with the state.
    """
    name: str
    status: StateStatus = StateStatus.INACTIVE
    metadata: Optional[Dict[str, Any]] = None  # replaced with {} in __post_init__

    def __post_init__(self):
        if self.metadata is None:
            self.metadata = {}

    def activate(self) -> None:
        """Activate the state."""
        self.status = StateStatus.ACTIVE

    def deactivate(self) -> None:
        """Deactivate the state."""
        self.status = StateStatus.INACTIVE

    def set_transitioning(self) -> None:
        """Set the state to transitioning."""
        self.status = StateStatus.TRANSITIONING

    def set_error(self) -> None:
        """Set the state to error."""
        self.status = StateStatus.ERROR

    def is_active(self) -> bool:
        """Check if the state is active."""
        return self.status == StateStatus.ACTIVE

    def __str__(self) -> str:
        return f"DeviceState(name='{self.name}', status={self.status.name})"
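
The `None` default plus `__post_init__` above exists to avoid sharing one mutable dict between instances. As a side note, `dataclasses.field(default_factory=dict)` is a common way to get the same effect without the `None` check. The sketch below is a minimal illustration; `StateSketch` is a hypothetical stand-in, not the `DeviceState` class itself.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class StateSketch:
    """Hypothetical stand-in for DeviceState, reduced to the metadata field."""
    name: str
    # default_factory builds a new dict per instance, so no None check is needed
    metadata: Dict[str, Any] = field(default_factory=dict)


a = StateSketch("idle")
b = StateSketch("running")
a.metadata["retries"] = 3

print(a.metadata)  # {'retries': 3}
print(b.metadata)  # {}
```

Each instance gets its own dict, so writing to `a.metadata` leaves `b.metadata` untouched.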

## Implement the DeviceStateManager Class

Next, we'll implement the DeviceStateManager class to manage transitions between different DeviceState instances.

In [None]:
class StateTransitionError(Exception):
    """Exception raised for errors in state transitions."""
    pass


class DeviceStateManager:
    """
    Manages transitions between different DeviceState instances.

    Attributes:
        states (Dict[str, DeviceState]): Dictionary mapping state names to state objects.
        current_state (DeviceState): The current active state.
        transition_handlers (Dict[tuple, Callable]): Handlers for state transitions.
    """

    def __init__(self, initial_state: Optional[DeviceState] = None):
        self.states: Dict[str, DeviceState] = {}
        self.current_state: Optional[DeviceState] = None
        self.transition_handlers: Dict[tuple, Callable] = {}

        if initial_state:
            self.add_state(initial_state)
            self.set_state(initial_state.name)

    def add_state(self, state: DeviceState) -> None:
        """
        Add a state to the manager.

        Args:
            state (DeviceState): The state to add.
        """
        self.states[state.name] = state

    def register_transition_handler(self, from_state: str, to_state: str,
                                    handler: Callable[[DeviceState, DeviceState], None]) -> None:
        """
        Register a handler for a specific state transition.

        Args:
            from_state (str): The starting state name.
            to_state (str): The target state name.
            handler (Callable): Function that handles the transition.
        """
        self.transition_handlers[(from_state, to_state)] = handler

    def can_transition(self, to_state: str) -> bool:
        """
        Check if transition to the specified state is possible.

        Args:
            to_state (str): The target state name.

        Returns:
            bool: Whether the transition is possible.
        """
        if not self.current_state:
            return to_state in self.states

        transition_key = (self.current_state.name, to_state)

        # If there's a specific handler, the transition is allowed
        if transition_key in self.transition_handlers:
            return True

        # By default, allow transitions if the target state exists
        return to_state in self.states

    def set_state(self, state_name: str) -> None:
        """
        Transition to the specified state.

        Args:
            state_name (str): The name of the state to transition to.

        Raises:
            StateTransitionError: If the transition is not possible or the state doesn't exist.
        """
        if state_name not in self.states:
            raise StateTransitionError(f"State '{state_name}' does not exist")

        target_state = self.states[state_name]

        # If no current state (initialization), just set it
        if not self.current_state:
            self.current_state = target_state
            self.current_state.activate()
            return

        # Check if transition is allowed
        if not self.can_transition(state_name):
            raise StateTransitionError(
                f"Cannot transition from '{self.current_state.name}' to '{state_name}'")

        # Get the previous state before changing
        previous_state = self.current_state

        # Handle the transition if a handler exists
        transition_key = (previous_state.name, state_name)
        if transition_key in self.transition_handlers:
            try:
                previous_state.set_transitioning()
                self.transition_handlers[transition_key](previous_state, target_state)
            except Exception as e:
                previous_state.set_error()
                # Chain the original exception so its traceback is preserved
                raise StateTransitionError(f"Transition handler failed: {e}") from e

        # Update states
        previous_state.deactivate()
        self.current_state = target_state
        self.current_state.activate()

    def get_current_state(self) -> Optional[DeviceState]:
        """
        Get the current state.

        Returns:
            Optional[DeviceState]: The current state, or None if no state has been set.
        """
        return self.current_state

    def get_available_states(self) -> List[str]:
        """
        Get a list of all available state names.

        Returns:
            List[str]: List of state names.
        """
        return list(self.states.keys())
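
The core of the manager is a dictionary keyed by `(from_state, to_state)` tuples: a transition with a registered handler runs it first, and a transition without one passes straight through. The sketch below isolates that lookup in a few lines, under simplified assumptions: states are plain strings, and `TransitionFailed`, `handlers`, `failing_handler`, and `transition` are hypothetical names used only for illustration.

```python
from typing import Callable, Dict, Tuple


class TransitionFailed(Exception):
    """Hypothetical error type, standing in for StateTransitionError."""


Handler = Callable[[str, str], None]
handlers: Dict[Tuple[str, str], Handler] = {}


def failing_handler(src: str, dst: str) -> None:
    raise ValueError("sensor not ready")


handlers[("idle", "running")] = failing_handler


def transition(src: str, dst: str) -> str:
    """Run the handler registered for (src, dst), if any, and return the new state."""
    handler = handlers.get((src, dst))
    if handler is not None:
        try:
            handler(src, dst)
        except Exception as exc:
            # "from exc" keeps the original exception reachable as __cause__
            raise TransitionFailed(f"{src} -> {dst} failed: {exc}") from exc
    return dst


# No handler is registered for this pair, so the transition passes straight through
new_state = transition("running", "idle")

try:
    transition("idle", "running")
except TransitionFailed as err:
    caught = err
```

Chaining with `from exc` means a caller that catches the wrapper can still inspect the underlying `ValueError` through `caught.__cause__`.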

## Create the HardwareAbstraction Class

Now, we'll create the HardwareAbstraction class to provide an interface for interacting with hardware components.

In [None]:
import time
from abc import ABC, abstractmethod


class HardwareComponent(ABC):
    """Abstract base class for hardware components."""

    @abstractmethod
    def initialize(self) -> bool:
        """Initialize the hardware component."""
        pass

    @abstractmethod
    def shutdown(self) -> bool:
        """Shutdown the hardware component."""
        pass

    @abstractmethod
    def is_connected(self) -> bool:
        """Check if the hardware component is connected."""
        pass


class DummyHardware(HardwareComponent):
    """A dummy hardware component for testing."""

    def __init__(self, name: str):
        self.name = name
        self._connected = False

    def initialize(self) -> bool:
        """Initialize the dummy hardware."""
        print(f"Initializing {self.name}...")
        time.sleep(0.5)  # Simulate initialization delay
        self._connected = True
        return True

    def shutdown(self) -> bool:
        """Shutdown the dummy hardware."""
        print(f"Shutting down {self.name}...")
        time.sleep(0.5)  # Simulate shutdown delay
        self._connected = False
        return True

    def is_connected(self) -> bool:
        """Check if the dummy hardware is connected."""
        return self._connected

    def perform_action(self, action: str) -> bool:
        """Perform a specific action on the hardware."""
        if not self._connected:
            return False
        print(f"{self.name} performing action: {action}")
        return True


class HardwareAbstraction:
    """
    Provides a unified interface for interacting with hardware components.

    Attributes:
        components (Dict[str, HardwareComponent]): Dictionary of hardware components.
    """

    def __init__(self):
        self.components: Dict[str, HardwareComponent] = {}

    def register_component(self, name: str, component: HardwareComponent) -> None:
        """
        Register a hardware component.

        Args:
            name (str): The name of the component.
            component (HardwareComponent): The component object.
        """
        self.components[name] = component

    def initialize_all(self) -> Dict[str, bool]:
        """
        Initialize all registered hardware components.

        Returns:
            Dict[str, bool]: Map of component names to initialization success.
        """
        results = {}
        for name, component in self.components.items():
            results[name] = component.initialize()
        return results

    def shutdown_all(self) -> Dict[str, bool]:
        """
        Shutdown all registered hardware components.

        Returns:
            Dict[str, bool]: Map of component names to shutdown success.
        """
        results = {}
        for name, component in self.components.items():
            results[name] = component.shutdown()
        return results

    def get_component(self, name: str) -> Optional[HardwareComponent]:
        """
        Get a specific hardware component.

        Args:
            name (str): The name of the component.

        Returns:
            Optional[HardwareComponent]: The component if found, None otherwise.
        """
        return self.components.get(name)

    def check_status(self) -> Dict[str, bool]:
        """
        Check the connection status of all components.

        Returns:
            Dict[str, bool]: Map of component names to connection status.
        """
        return {name: component.is_connected() for name, component in self.components.items()}
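
The `HardwareComponent` base class relies on `abc` to enforce its interface: a subclass can only be instantiated once every abstract method has been implemented. The sketch below shows that behavior with a smaller, hypothetical interface (`Component`, `HalfDone`, and `Complete` are illustrative names, not part of the notebook's classes).

```python
from abc import ABC, abstractmethod


class Component(ABC):
    """Hypothetical two-method interface, smaller than HardwareComponent."""

    @abstractmethod
    def initialize(self) -> bool: ...

    @abstractmethod
    def is_connected(self) -> bool: ...


class HalfDone(Component):
    """Implements only one of the two abstract methods."""

    def initialize(self) -> bool:
        return True


class Complete(HalfDone):
    """Fills in the remaining method, which makes the class instantiable."""

    def is_connected(self) -> bool:
        return True


try:
    HalfDone()
    instantiated = True
except TypeError:
    # Raised because is_connected is still abstract
    instantiated = False

ok = Complete().initialize() and Complete().is_connected()
```

A driver that omits a required method therefore fails at construction time, before it can be registered with the abstraction layer.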

## Develop the AsioManager Class

Now, we'll develop the AsioManager class to handle asynchronous operations and manage I/O tasks.

In [None]:
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial


class AsioManager:
    """
    Manages asynchronous I/O operations.

    This class provides a wrapper around asyncio to handle async operations
    and manage background tasks.

    Attributes:
        loop (asyncio.AbstractEventLoop): The asyncio event loop.
        executor (ThreadPoolExecutor): Thread pool for running blocking operations.
        tasks (List[asyncio.Task]): List of running tasks.
    """

    def __init__(self, max_workers: int = 4):
        """
        Initialize the AsioManager.

        Args:
            max_workers (int): Maximum number of worker threads.
        """
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.tasks = []

    def run_in_thread(self, func, *args, **kwargs):
        """
        Run a function in a background thread.

        Args:
            func: The function to run.
            *args: Arguments to pass to the function.
            **kwargs: Keyword arguments to pass to the function.

        Returns:
            concurrent.futures.Future: Future object representing the execution.
        """
        return self.executor.submit(func, *args, **kwargs)

    async def run_async(self, func, *args, **kwargs):
        """
        Run a blocking function asynchronously.

        Args:
            func: The function to run.
            *args: Arguments to pass to the function.
            **kwargs: Keyword arguments to pass to the function.

        Returns:
            Any: The result of the function.
        """
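        # Use the loop that is running this coroutine, which may differ from self.loop;
        # awaiting a future bound to another loop raises RuntimeError
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self.executor, partial(func, *args, **kwargs))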

    def schedule_task(self, coro):
        """
        Schedule a coroutine to run as a task.

        Args:
            coro: The coroutine to schedule.

        Returns:
            asyncio.Task: The scheduled task.
        """
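        try:
            # Prefer the loop that is already running, so the task is actually driven
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running yet; fall back to the manager's own loop
            loop = self.loop
        task = loop.create_task(coro)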
        self.tasks.append(task)
        task.add_done_callback(lambda t: self.tasks.remove(t))
        return task

    def run_until_complete(self, coro):
        """
        Run the event loop until the coroutine completes.

        Args:
            coro: The coroutine to run.

        Returns:
            Any: The result of the coroutine.
        """
        return self.loop.run_until_complete(coro)

    def run_forever(self):
        """Run the event loop forever."""
        self.loop.run_forever()

    def stop(self):
        """Stop the event loop."""
        for task in self.tasks:
            task.cancel()
        self.loop.stop()

    def close(self):
        """Close the event loop and executor."""
        self.stop()
        self.executor.shutdown()
        self.loop.close()

    async def periodic_task(self, interval_seconds, callback, *args, **kwargs):
        """
        Run a callback periodically.

        Args:
            interval_seconds (float): Interval between calls in seconds.
            callback: The function to call.
            *args: Arguments to pass to the callback.
            **kwargs: Keyword arguments to pass to the callback.
        """
        while True:
            try:
                await self.run_async(callback, *args, **kwargs)
            except asyncio.CancelledError:
                # Let cancellation propagate so the task can actually stop
                raise
            except Exception as e:
                print(f"Error in periodic task: {e}")
            await asyncio.sleep(interval_seconds)
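
The key mechanism in `run_async` is `loop.run_in_executor`, which hands a blocking call to a worker thread and returns an awaitable. The sketch below is a standalone variation that looks up the loop with `asyncio.get_running_loop()` instead of a stored reference; `blocking_read` and `run_blocking` are hypothetical names. It is written as a script, so it uses `asyncio.run`; inside a notebook that already runs an event loop, `await main()` would likely be the equivalent.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def blocking_read(label: str, delay: float) -> str:
    """Hypothetical blocking call, standing in for a hardware read."""
    time.sleep(delay)
    return f"{label} done"


async def run_blocking(executor, func, *args, **kwargs):
    # get_running_loop() returns the loop that is executing this coroutine
    loop = asyncio.get_running_loop()
    # run_in_executor takes positional args only, so partial carries the kwargs
    return await loop.run_in_executor(executor, partial(func, *args, **kwargs))


async def main():
    with ThreadPoolExecutor(max_workers=2) as executor:
        # Both calls sleep in worker threads while the event loop stays free
        return await asyncio.gather(
            run_blocking(executor, blocking_read, "sensor", delay=0.1),
            run_blocking(executor, blocking_read, "motor", delay=0.1),
        )


results = asyncio.run(main())
print(results)  # ['sensor done', 'motor done']
```

`asyncio.gather` returns results in the order the awaitables were passed, regardless of which worker finishes first.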

## Integrate and Test the Classes

Now, let's integrate all the classes and test their functionality with example scenarios.

In [None]:
# Test the DeviceState and DeviceStateManager classes

# Create some device states
idle_state = DeviceState(name="idle")
running_state = DeviceState(name="running")
error_state = DeviceState(name="error")

# Create a state manager with an initial state
state_manager = DeviceStateManager(initial_state=idle_state)
state_manager.add_state(running_state)
state_manager.add_state(error_state)

# Define a transition handler
def idle_to_running_handler(from_state, to_state):
    print(f"Transitioning from {from_state.name} to {to_state.name}...")
    # Simulate some transition logic
    time.sleep(1)
    print("Transition complete!")

# Register the transition handler
state_manager.register_transition_handler("idle", "running", idle_to_running_handler)

# Test state transitions
print("Current state:", state_manager.get_current_state())
print("Available states:", state_manager.get_available_states())

print("\nTransitioning to running state...")
state_manager.set_state("running")
print("Current state:", state_manager.get_current_state())

print("\nTransitioning to error state...")
state_manager.set_state("error")
print("Current state:", state_manager.get_current_state())

# Try to transition back to idle
print("\nTransitioning back to idle state...")
state_manager.set_state("idle")
print("Current state:", state_manager.get_current_state())

In [None]:
# Test the HardwareAbstraction class

# Create a hardware abstraction layer
hal = HardwareAbstraction()

# Create some dummy hardware components
sensor = DummyHardware("Temperature Sensor")
motor = DummyHardware("Motor Controller")
display = DummyHardware("LCD Display")

# Register the components
hal.register_component("sensor", sensor)
hal.register_component("motor", motor)
hal.register_component("display", display)

# Initialize all components
print("Initializing hardware components...")
init_results = hal.initialize_all()
print("Initialization results:", init_results)

# Check the status of all components
status = hal.check_status()
print("Hardware status:", status)

# Perform actions on specific components
sensor_component = hal.get_component("sensor")
if sensor_component:
    sensor_component.perform_action("read_temperature")

motor_component = hal.get_component("motor")
if motor_component:
    motor_component.perform_action("rotate_clockwise")

# Shutdown all components
print("\nShutting down hardware components...")
shutdown_results = hal.shutdown_all()
print("Shutdown results:", shutdown_results)

# Check status again
status = hal.check_status()
print("Hardware status after shutdown:", status)

In [None]:
# Test the AsioManager class

# Create an AsioManager
asio_manager = AsioManager()

# Define some test functions
def blocking_function(name, duration):
    """A blocking function that simulates I/O or heavy computation."""
    print(f"[{name}] Starting...")
    time.sleep(duration)
    print(f"[{name}] Completed after {duration} seconds")
    return f"{name} result"

async def async_task(name, duration):
    """An async task that uses the AsioManager to run a blocking function."""
    print(f"[{name}] Task starting...")
    result = await asio_manager.run_async(blocking_function, name, duration)
    print(f"[{name}] Task completed with result: {result}")
    return result

# Define a periodic monitoring function
def monitor_system():
    """A function that periodically monitors the system."""
    print(f"[Monitor] Checking system at {time.strftime('%H:%M:%S')}")

# Run multiple async tasks concurrently
async def run_multiple_tasks():
    # Schedule several tasks
    task1 = asyncio.create_task(async_task("Task 1", 2))
    task2 = asyncio.create_task(async_task("Task 2", 1))
    task3 = asyncio.create_task(async_task("Task 3", 3))

    # Run a periodic task for a limited time
    monitor_task = asio_manager.schedule_task(asio_manager.periodic_task(1, monitor_system))

    # Wait for the main tasks to complete
    results = await asyncio.gather(task1, task2, task3)

    # Cancel the monitor task after the main tasks are done
    monitor_task.cancel()

    print("All tasks completed with results:", results)

# Run the test
try:
    print("Running AsioManager test...")
    asio_manager.run_until_complete(run_multiple_tasks())
finally:
    # Clean up
    asio_manager.close()
    print("AsioManager test completed and resources cleaned up")
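
The test above cancels the monitor task and moves on. When a clean teardown matters, one option is to await the cancelled task so the cancellation is delivered before the loop is closed. The sketch below shows that pattern in isolation; `periodic`, `calls`, and the intervals are hypothetical, and it is written as a standalone script rather than against `AsioManager`.

```python
import asyncio

calls = []


async def periodic(interval: float, callback) -> None:
    """Call callback every interval seconds until the task is cancelled."""
    while True:
        callback()
        await asyncio.sleep(interval)


async def main() -> bool:
    task = asyncio.create_task(periodic(0.01, lambda: calls.append("tick")))
    await asyncio.sleep(0.05)  # let the periodic task run a few times

    task.cancel()
    # Awaiting the task lets the cancellation actually be delivered;
    # return_exceptions=True turns the CancelledError into a return value
    await asyncio.gather(task, return_exceptions=True)
    return task.cancelled()


was_cancelled = asyncio.run(main())
```

After `main()` returns, the task is in the cancelled state and no pending task is left behind for the loop to complain about at close time.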

In [None]:
# Integrated test demonstrating all classes working together

# Create a comprehensive example of a device with states, hardware, and async operations

class SmartDevice:
    """
    A smart device that integrates all the classes we've built.
    """

    def __init__(self, name: str):
        self.name = name

        # Create states
        self.off_state = DeviceState(name="off")
        self.standby_state = DeviceState(name="standby")
        self.running_state = DeviceState(name="running")
        self.error_state = DeviceState(name="error")

        # Create state manager
        self.state_manager = DeviceStateManager(initial_state=self.off_state)
        self.state_manager.add_state(self.standby_state)
        self.state_manager.add_state(self.running_state)
        self.state_manager.add_state(self.error_state)

        # Register transition handlers
        self.state_manager.register_transition_handler("off", "standby", self._handle_off_to_standby)
        self.state_manager.register_transition_handler("standby", "running", self._handle_standby_to_running)
        self.state_manager.register_transition_handler("running", "standby", self._handle_running_to_standby)
        self.state_manager.register_transition_handler("standby", "off", self._handle_standby_to_off)

        # Create hardware abstraction
        self.hardware = HardwareAbstraction()

        # Add hardware components
        self.hardware.register_component("main_processor", DummyHardware("Main Processor"))
        self.hardware.register_component("power_unit", DummyHardware("Power Unit"))
        self.hardware.register_component("sensor_array", DummyHardware("Sensor Array"))

        # Create async manager
        self.asio_manager = AsioManager()

        # Internal flags
        self._running = False
        self._monitoring_task = None

    def _handle_off_to_standby(self, from_state, to_state):
        """Handle transition from off to standby."""
        print(f"{self.name}: Powering up...")
        # Initialize hardware
        results = self.hardware.initialize_all()
        if not all(results.values()):
            raise StateTransitionError("Failed to initialize all hardware components")
        print(f"{self.name}: Power up complete")

    def _handle_standby_to_running(self, from_state, to_state):
        """Handle transition from standby to running."""
        print(f"{self.name}: Starting operations...")
        # Start the main operation
        self._running = True

    def _handle_running_to_standby(self, from_state, to_state):
        """Handle transition from running to standby."""
        print(f"{self.name}: Stopping operations...")
        # Stop the main operation
        self._running = False

    def _handle_standby_to_off(self, from_state, to_state):
        """Handle transition from standby to off."""
        print(f"{self.name}: Shutting down...")
        # Shutdown hardware
        results = self.hardware.shutdown_all()
        if not all(results.values()):
            raise StateTransitionError("Failed to shut down all hardware components")
        print(f"{self.name}: Shutdown complete")

    def monitor_device(self):
        """Periodic monitoring function."""
        current_state = self.state_manager.get_current_state()
        print(f"{self.name} monitor: Current state is {current_state.name}")

        # Check hardware status
        status = self.hardware.check_status()
        all_connected = all(status.values())

        if not all_connected and current_state.name != "error":
            print(f"{self.name}: Hardware issue detected!")
            try:
                self.state_manager.set_state("error")
            except StateTransitionError as e:
                print(f"Transition error: {e}")

    def simulate_operation(self):
        """Simulate a device operation when in running state."""
        if not self._running:
            return

        current_state = self.state_manager.get_current_state()
        if current_state and current_state.name == "running":
            print(f"{self.name}: Performing operations at {time.strftime('%H:%M:%S')}")

            # Use some hardware components
            sensor_array = self.hardware.get_component("sensor_array")
            if sensor_array:
                sensor_array.perform_action("collect_data")

    async def start(self):
        """Start the device."""
        print(f"Starting {self.name}...")

        try:
            # Transition to standby
            if self.state_manager.get_current_state().name == "off":
                self.state_manager.set_state("standby")

            # Set up monitoring task
            self._monitoring_task = self.asio_manager.schedule_task(
                self.asio_manager.periodic_task(2, self.monitor_device)
            )

            # Transition to running, which starts operations via the handler
            self.state_manager.set_state("running")

            # Simulate operations for a while
            for _ in range(5):
                await self.asio_manager.run_async(self.simulate_operation)
                await asyncio.sleep(1)

            # Transition back to standby
            self.state_manager.set_state("standby")

        except Exception as e:
            print(f"{self.name} encountered an error: {e}")
            try:
                self.state_manager.set_state("error")
            except StateTransitionError:
                # The error state itself is unreachable; nothing more to do here
                pass

    async def shutdown(self):
        """Shutdown the device."""
        print(f"Shutting down {self.name}...")

        # Cancel monitoring task
        if self._monitoring_task:
            self._monitoring_task.cancel()
            self._monitoring_task = None

        # Transition to off state, re-reading the current state after each step
        # (a reference captured up front would still point at the old state object)
        if self.state_manager.get_current_state().name == "running":
            self.state_manager.set_state("standby")

        if self.state_manager.get_current_state().name in ("standby", "error"):
            self.state_manager.set_state("off")

        # Close async manager
        self.asio_manager.close()
        print(f"{self.name} shutdown complete")


# Run the integrated test
async def run_integrated_test():
    # Create a smart device
    device = SmartDevice("TestDevice1")

    try:
        # Start the device
        await device.start()

        # Wait a bit
        await asyncio.sleep(1)

        # Simulate a hardware failure
        print("\nSimulating hardware failure...")
        power_unit = device.hardware.get_component("power_unit")
        if power_unit:
            power_unit.shutdown()  # This will cause the monitor to detect a problem

        # Give time for monitor to detect the issue
        await asyncio.sleep(3)

    finally:
        # Shutdown the device
        await device.shutdown()

# Run the test
asio_manager = AsioManager()
try:
    asio_manager.run_until_complete(run_integrated_test())
finally:
    asio_manager.close()

## Conclusion

This notebook has implemented and tested the following key classes:

1. **DeviceState** - A class representing the state of a device with attributes like state name and status.

2. **DeviceStateManager** - A class managing transitions between different DeviceState instances with support for custom transition handlers.

3. **HardwareAbstraction** - A class providing a unified interface for interacting with hardware components, with a consistent API for initialization, shutdown, and status checking.

4. **AsioManager** - A class for handling asynchronous operations and managing I/O tasks, with support for running blocking operations asynchronously and scheduling periodic tasks.

These classes can be used together to build device control systems with explicit state management, hardware abstraction, and asynchronous I/O handling. The SmartDevice class in the final test demonstrates how these components work together in a simulated scenario built on dummy hardware.