Skip to content

Florian-BARRE/TaskBrain

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

19 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

TaskBrain -- English Documentation

Using TaskBrain is highly recommended when your application requires executing multiple tasks in parallel across different processes. This module heavily relies on Python's Asyncio library, allowing the management of asynchronous code execution as well as multiprocessing.

Advantages:

  • Error management
  • Execution isolation
  • Detailed logs of task executions
  • All methods share class attributes via self, ensuring seamless intercommunication

Task

To transform a Brain method into a task and be able to control its execution as needed, simply use the Brain.task() decorator.

@Brain.task(...)
async def my_method(self):
   pass

This decorator includes two mandatory parameters: process and run_on_start. These have been made mandatory to improve code clarity—this makes it clear which methods execute in a secondary process and which ones start when the Brain launches.

Now, let's explore all the possible configurations of this decorator and the options it provides.


One-Shot

For all tasks that only need to be executed once, we decorate the method to create a "one_shot_task".

@Brain.task(process=[True / False], run_on_start=[True / False])
async def one_shot_method(self):
   pass

Routine

For tasks that need to run continuously, we can specify a refresh_rate in the decorator. This parameter defines * the execution frequency of the method*. It will then be called indefinitely (even if it crashes) with a pause corresponding to the refresh_rate duration.

@Brain.task(process=[True / False], run_on_start=[True / False], refresh_rate=0.5)
async def routine_method(self):
   pass

Here, this method will be executed indefinitely with a 0.5s pause between each execution.


Timeout and Task

A timeout can be added to both routines and one_shot tasks, meaning the task will be interrupted after a set duration. This is particularly useful when defining precise execution phases.

This feature applies to both routines and one-shot tasks!

# Timed Routine
@Brain.task(process=[True / False], run_on_start=[True / False], refresh_rate=[duration in seconds], timeout=10)
async def timed_routine_method(self):
   pass


# Timed One-Shot
@Brain.task(process=[True / False], run_on_start=[True / False], timeout=10)
async def timed_one_shot_method(self):
   pass

Here, both methods will stop execution after 10 seconds, no matter what.


Execution Output Management

Once executed, methods return an execution code indicating how the task finished. The possible states are:

class ExecutionStates(IntEnum):
   CORRECTLY = 0  # Normal execution: no timeout or crash
   TIMEOUT = 1  # Task stopped because it exceeded the timeout
   ERROR_OCCURRED = 2  # Task stopped due to an error (function crash)

Multiprocessing

A task (routine or one-shot) can be executed in a separate process to better distribute CPU load. This feature is particularly useful for resource-intensive tasks that might otherwise block the main process.

The biggest challenge in multiprocessing is communication between Python objects across processes. In * TaskBrain*, this communication is fully automated. When a process is launched, a copy of the Brain's attributes is created and shared among all processes.

Whenever one of these attributes is modified (whether from main_process → second_process or second_process → main_process), the Brain automatically synchronizes this change back to the **initial instance **.

This shared copy is a proxy dictionary, a special type from Python's multiprocessing.Manager class.


Inter-Process Communication Limitations

The main limitation when using this feature is the type of class attributes that can be shared through the * proxy dictionary*.

Native supported serializable types:

serialized_types = (
   Logger,  # Logger from loggerplusplus library (serialized since V0.1.2)
   int,
   float,
   str,
   list,
   set,
   dict,
   tuple,
   type(None),
)

Complex objects cannot be shared directly between processes. To work around this, you can instantiate the object inside the process.

Adding New Serializable Types

Since version 0.1.2, it is possible to dynamically add new types to the set of serializable types using the method:

@classmethod
def add_serializable_type(cls, new_type: type, test_instance: Any = None) -> bool
Usage Examples

You can add a new type before the instantiation of the brain via the DictProxyAccessor. For example:

from taskbrain import DictProxyAccessor

new_type_instance = MyCustomType()
DictProxyAccessor.add_serializable_type(new_type=MyCustomType, test_instance=new_type_instance)

This will add MyCustomType to the list of serializable types only if the instance MyCustomType can be serialized.

Alternatively, you can add a type directly without verification:

DictProxyAccessor.add_serializable_type(MyCustomType)

Warning: Adding a type without checking its serializability can lead to errors if the type is not compatible with serialization.


Example: Using a Camera in Another Process

Using a camera is resource-intensive, making it ideal for multiprocessing. However, camera objects are not serializable!

So, instead of sharing the object, we retrieve its configuration via attributes and instantiate it inside the process.

@Brain.task(process=True, run_on_start=[True / False])
def camera_in_other_process(self):
   camera = Camera(
      res_w=self.config.CAMERA_RESOLUTION[0],
      res_h=self.config.CAMERA_RESOLUTION[1],
      captures_path=self.config.CAMERA_SAVE_PATH,
      undistorted_coefficients_path=self.config.CAMERA_COEFFICIENTS_PATH,
   )

⚠️ Important: A process is synchronous! Use def, not async def.


Looping Inside a Process Task

Once we instantiate our camera, we use it to capture images and apply processing. However, this introduces * another issue*:

  • The image processing must run continuously.
  • But we cannot create a routine inside another task—especially inside a separate process.

To solve this, we use the define_loop_later option. This allows defining a task as a routine while also executing a one-time setup part.

@Brain.task(process=True, run_on_start=[True / False], refresh_rate=0.1, define_loop_later=True)
def camera_in_other_process(self):
   camera = Camera(
      res_w=self.config.CAMERA_RESOLUTION[0],
      res_h=self.config.CAMERA_RESOLUTION[1],
      captures_path=self.config.CAMERA_SAVE_PATH,
      undistorted_coefficients_path=self.config.CAMERA_COEFFICIENTS_PATH,
   )

   # ---Loop--- #
   camera.capture()
   # ... image processing ... #

Manually Defining the Loop Marker

A custom marker can be used to define where the routine starts:

@Brain.task(process=True, run_on_start=[True / False], refresh_rate=0.1, define_loop_later=True,
            start_loop_marker="#- My Custom Loop Marker -#")
def camera_in_other_process(self):
   camera = Camera(
      res_w=self.config.CAMERA_RESOLUTION[0],
      res_h=self.config.CAMERA_RESOLUTION[1],
      captures_path=self.config.CAMERA_SAVE_PATH,
      undistorted_coefficients_path=self.config.CAMERA_COEFFICIENTS_PATH,
   )

   # - My Custom Loop Marker -#
   camera.capture()
   # ... image processing ... #

Synchronization of Shared Attributes

When using shared attributes between different processes, it is essential to understand how their synchronization mechanism works, especially when dealing with custom types (for example, a user-defined class).

Note
This section only applies when your task is executed in another process (Brain.task(process=True)).


Direct Assignment vs. Internal Modification

  • Direct Assignment
    When you assign a new value to a shared attribute, for example:

    self.my_attribute = new_value

    the Brain automatically detects the change and propagates it to the other processes.

  • Internal Modification of the Object
    However, if you modify the internal state of an attribute by calling a method, like so:

    self.my_attribute.update(new_value)

    the Brain does not capture this implicit modification, because it does not detect that the object itself has changed.
    As a result, synchronization will not happen automatically across different processes.


Explicitly Marking the Attribute to Be Synchronized

To solve this issue, you must manually indicate to the Brain that the attribute should be re-synchronized after an internal modification. To do this, use the method:

self.add_attributes_to_synchronize("my_attribute")

Thus, at the next iteration, the Brain will handle syncing the attribute’s new value with the other processes.


Code Example

Suppose you have a MyObject class that offers an update method to modify its internal state:

class MyObject:
    def __init__(self, initial_value):
        self.value = initial_value
    
    def update(self, new_value):
        # Internal update logic
        self.value = new_value

You then use it within a class managed by a “brain” (or a synchronization system):

class MainBrain(Brain):
    def __init__(self, logger: Logger, my_attribute: MyObject) -> None:
        # my_attribute is an object shared between multiple processes
        # the dynamic init automatically instantiates it as a class attribute
        super().__init__(logger, self)
    
    @Brain.task(process=True, run_on_start=True)
    def do_something(self, new_value):
        # Modify the attribute by calling the object's method
        self.my_attribute.update(new_value)

        # Explicitly indicate to the 'brain' that this attribute must be re-synchronized
        self.add_attributes_to_synchronize("my_attribute")

In the example above:

  1. Assignment

    • If we had done:
      self.my_attribute = MyObject(initial_value=42)
      the change would have been automatically detected and synchronized.
  2. Internal Modification

    • By calling:
      self.my_attribute.update(42)
      the brain does not know it has to update the shared value, unless we add:
      self.add_attributes_to_synchronize("my_attribute")

Changing the synchronization frequency

It is possible to change the synchronization frequency of attributes shared between processes. By default, the frequency is 0.01 seconds. To change this frequency, simply modify the sync_self_and_shared_self_refresh_rate attribute of the Brain base class:

from taskbrain import Brain
Brain.sync_self_and_shared_self_refresh_rate = 0.1

Vigilance Points, Limitations & Key Considerations

Although using TaskBrain is powerful, there are several key points to watch for to get the best performance.

Dynamic Initialization

Automatic Attribute Creation

To simplify the __init__ method and avoid redundant code:

Instead of writing:

def __init__(self, logger: Logger, obj1: type_obj1, obj2: type_obj2, obj3: type_obj3) -> None:
   self.logger = logger
   self.obj1 = obj1
   self.obj2 = obj2
   self.obj3 = obj3

Simply use:

def __init__(self, logger: Logger, obj1: type_obj1, obj2: type_obj2, obj3: type_obj3) -> None:
   super().__init__(logger, self)

Here is the continuation of the full English translation while maintaining the original markdown formatting.


Class Attribute Creation

If we want to create class attributes inside __init__ and make them accessible across multiple processes, we must define them before calling super().__init__(logger, self).

Otherwise, they will only be available in the main process.

def __init__(
        self,
        logger: Logger,
        obj1: type_obj1,
        obj2: type_obj2,
        obj3: type_obj3,
) -> None:
   # Attributes available in all processes
   self.attr_multi_process = 0

   super().__init__(logger, self)

   # Attributes available only in the main process
   self.attr_main_process = 0

Attribute Serialization

When __init__ is called, TaskBrain automatically serializes all class attributes.

However, most objects are not serializable. If an attribute cannot be serialized, a warning message will be logged.

⚠️ This is a warning, not an error.
Any non-serialized attribute will not be available in other processes.

Example warning message:

14:49:30 -> [   brain    ]  WARNING   | [dynamic_init] cannot serialize attribute [ws_cmd].
14:49:30 -> [   brain    ]  WARNING   | [dynamic_init] cannot serialize attribute [ws_pami].
14:49:30 -> [   brain    ]  WARNING   | [dynamic_init] cannot serialize attribute [ws_lidar].
14:49:30 -> [   brain    ]  WARNING   | [dynamic_init] cannot serialize attribute [ws_odometer].
14:49:30 -> [   brain    ]  WARNING   | [dynamic_init] cannot serialize attribute [ws_camera].

Refresh Rate Limitations

The execution of tasks relies on asynchronous execution, meaning pseudo-parallelism.

Key Consideration:
A routine with a very low refresh_rate will monopolize CPU time, which can slow down the entire Brain execution.

Setting refresh_rate to 0 is prohibited!
This parameter must be carefully adjusted to optimize performance.


Inter-Process Communication Limitations

As previously explained, synchronization between the shared Brain and its instance is done via a high-frequency routine.

By default, its refresh_rate is set to 0.01 seconds.

Although this method is optimized for minimal execution time, it is not instantaneous!
This must be considered when deciding to move a process to another task.


Complete Usage Example

Here is a full example of how to use the TaskBrain module inside a main script.

import asyncio
from taskbrain import Brain
from loggerplusplus import Logger


class MainBrain(Brain):
   def __init__(self, logger: Logger, share_attr1: int, share_attr2: int) -> None:
      super().__init__(logger, self)
      self.local_attr1: int = 0
      self.local_attr2: int = 0

   """ 
       MainProcess (mp) Tasks 
   """

   """ One-Shot Tasks """

   @Brain.task(process=False, run_on_start=True)
   async def mp_start(self):
      self.logger.info("[MP] MainBrain started")
      public_attributes = {k: v for k, v in self.__dict__.items() if not k.startswith('_')}
      self.logger.info(f"[MP] Public attributes: {public_attributes}")

   """ Routine Tasks """

   @Brain.task(process=False, run_on_start=True, refresh_rate=1)
   async def mp_states_display(self):
      attributes_public = {k: v for k, v in self.__dict__.items() if
                           not k.startswith('_') and k.__str__() != "logger"}
      self.logger.info(f"[MP] Attributes states: {attributes_public}")

   @Brain.task(process=False, run_on_start=True, refresh_rate=1)
   async def mp_incrementer(self):
      self.share_attr1 += 1
      self.local_attr1 += 1

   @Brain.task(process=False, run_on_start=True, refresh_rate=1, timeout=5)
   async def mp_incrementer_with_timeout(self):
      self.share_attr1 += 10
      self.local_attr1 += 10

   """ 
       SubProcess (sb) Tasks 
   """

   """ One-Shot Tasks """

   @Brain.task(process=True, run_on_start=True)
   def sb_start(self):
      self.logger.info("[SP] MainBrain started in another process")
      shared_attributes = {
         "share_attr1": self.share_attr1,
         "share_attr2": self.share_attr2
      }
      self.logger.info(f"[SP] Public attributes available in this subprocess: {shared_attributes}")

   """ Routine Tasks """

   @Brain.task(process=True, run_on_start=True, refresh_rate=1)
   def sp_states_display(self):
      shared_attributes = {
         "share_attr1": self.share_attr1,
         "share_attr2": self.share_attr2
      }
      self.logger.info(f"[SP] Attributes states: {shared_attributes}")

   @Brain.task(process=True, run_on_start=True, refresh_rate=1)
   def sb_incrementer(self):
      self.share_attr2 += 1

   @Brain.task(process=True, run_on_start=True, refresh_rate=1, timeout=5)
   def sb_incrementer_with_timeout(self):
      self.share_attr2 += 10

   @Brain.task(process=True, run_on_start=True, refresh_rate=1, define_loop_later=True,
               start_loop_marker="# ---Loop--- #")
   def sb_routine_with_setup(self):
      sb_non_serializable_attribute = "I'm not serializable attribute"
      # ---Loop--- #
      self.logger.info(f"[SP] Non-serializable attribute: {sb_non_serializable_attribute}")

   """ Call other tasks """

   @Brain.task(process=False, run_on_start=False)
   async def callable_function_1(self):
      self.logger.info("[MP] Callable function 1")
      return 1

   @Brain.task(process=True, run_on_start=False)
   def callable_function_2(self):
      self.logger.info("[SP] Callable function 2")
      return 2

   @Brain.task(process=False, run_on_start=True)
   async def call_tasks(self):
      await asyncio.sleep(10)  # Wait for timed task to finish
      self.logger.info("[MP] Call tasks")
      f1_result = await self.callable_function_1()
      f2_result = await self.callable_function_2()

      self.logger.info(f"[MP] Callable function 1 result: {f1_result.result}")
      self.logger.info(f"[MP] Callable function 2 result: {f2_result.result}")

Usage in a Main Script

Here’s how to integrate and launch your Brain inside a main script:

import asyncio
from exemple.mainbrain import MainBrain
from loggerplusplus import Logger

if __name__ == "__main__":
   brain_logger = Logger(
      identifier="Brain",
      print_log=True,
      write_to_file=False,
      display_monitoring=False,
      files_monitoring=False,
   )

   brain = MainBrain(
      logger=brain_logger,
      share_attr1=0,
      share_attr2=0
   )


   # Start tasks
   async def run_tasks():
      tasks = [task() for task in brain.get_tasks()]
      return await asyncio.gather(*tasks)


   asyncio.run(run_tasks())

Explanation of the Main Script

  1. Initialize the Logger: A logger is created with a debug level that prints logs to the console (print_log=True).
  2. Initialize the Brain: MainBrain is initialized with two shared attributes (share_attr1 and share_attr2) and the logger.
  3. Execute Tasks:
    • Retrieve all tasks using brain.get_tasks().
    • Execute them asynchronously using asyncio.gather().

This script demonstrates how to initialize and execute various Brain tasks, including async tasks, routines, and multiprocessing.


Author

Project created and maintained by Florian BARRE.
For questions or contributions, feel free to contact me.


TaskBrain -- Documentation Française

L'utilisation de taskbrain est fortement conseillée lorsque votre application nécessite l'exécution de multiples tâches, en parallèle, dans des processus différents. Ce module s'appuie largement sur la bibliothèque Asyncio de Python, ce qui permet de gérer l'exécution de code asynchrone ainsi que le multiprocessing.

Avantages :

  • Gestion des erreurs
  • Isolation des exécutions
  • Journaux détaillés des exécutions des tâches
  • Toutes les méthodes partagent les attributs de la classe via le self, assurant ainsi une intercommunication transparente

Task

Pour transformer une méthode du Brain en tâche et ainsi pouvoir contrôler son exécution comme on veut, il suffit d’utiliser le décorateur Brain.task().

@Brain.task(...)
async def ma_methode(self):
   pass

Ce décorateur comprend 2 paramètres obligatoires à compléter : process et run_on_start. Ils ont été rendus obligatoires afin d’améliorer la clarté du code ; cela permet de bien voir quelle méthode s’exécute dans un processus secondaire et quelles méthodes démarrent au lancement du Brain.

Nous allons à présent voir toutes les configurations possibles de ce décorateur et les possibilités qu’il offre.

One-Shot

Pour toutes les tâches qui n’ont besoin d’être exécutées qu’une seule fois, on va décorer la méthode de sorte à créer ce qu’on appelle une “one_shot_task”.

@Brain.task(process=[True / False], run_on_start=[True / False])
async def methode_one_shot(self):
   pass

Routine

Pour toutes les tâches qui s’exécutent à l’infini, il est possible de préciser à notre tâche, via le décorateur, un refresh_rate. Ce paramètre correspond à la fréquence d’exécution de la méthode. Elle sera alors appelée à l’infini (même si elle plante) avec une pause de la durée du refresh_rate renseigné.

@Brain.task(process=[True / False], run_on_start=[True / False], refresh_rate=0.5)
async def methode_routine(self):
   pass

Ici, cette méthode sera donc exécutée à l’infini avec une pause de 0.5s entre chaque exécution.

Timeout et Task

Il est possible d’ajouter à notre task (routine ou one_shot) un timeout au bout duquel la tâche sera interrompue. Cela est utile notamment lorsqu’on définit des phases précises d'exécution.

Cette fonctionnalité est applicable aux routines et aux one_shot !

# Routine timée
@Brain.task(process=[True / False], run_on_start=[True / False], refresh_rate=[durée en seconde], timeout=10)
async def methode_timed_routine(self):
   pass


# One_shot timée
@Brain.task(process=[True / False], run_on_start=[True / False], timeout=10)
async def methode_timed_one_shot(self):
   pass

Ici, ces deux méthodes s’interrompront quoi qu’il arrive au bout de 10 secondes.

Gestion des outputs

Les méthodes, une fois exécutées, retournent un code d’exécution afin d’indiquer comment la tâche s’est terminée. Voici les états possibles :

class ExecutionStates(IntEnum):
   CORRECTLY = 0  # Exécution normale: pas de timeout ni de crash
   TIMEOUT = 1  # La task s'est interrompue car elle a dépassé le timeout
   ERROR_OCCURRED = 2  # La task s'est interrompue car une erreur est survenue (crash de la fonction)

Multiprocessing

Il est possible d'exécuter une task (routine ou one_shot) dans un autre processus afin de mieux répartir la charge CPU. Cette fonctionnalité est particulièrement utile pour les tâches gourmandes en ressources, qui pourraient autrement bloquer excessivement le temps CPU du processus principal. La principale difficulté du multiprocessing réside dans la communication d'objets Python entre processus. Dans le cadre du Brain, cette communication est entièrement transparente. Lorsqu'un processus est lancé, une copie des attributs du Brain est créée et partagée entre tous les processus. Lorsque l'un de ses attributs est modifié (que ce soit main_process → second_process ou second_process → main_process), le Brain se charge automatiquement de synchroniser cette modification de la copie vers l'instance initiale. Cette copie partagée est un dictionnaire proxy, un type issu de la classe Manager de la librairie multiprocessing.

Limitation de la communication inter-process

La principale restriction quant à l’utilisation de cette fonctionnalité est le type des attributs de la classe pouvant être partagés au travers du dictionnaire proxy. En effet, il faut que l’attribut soit sérialisable ! Les types sérialisables supportés pour le moment sont :

serialized_types = (
   Logger,  # Logger de la bibliothèque loggerplusplus (sérialisé depuis la version 0.1.2)
   int,
   float,
   str,
   list,
   set,
   dict,
   tuple,
   type(None),
)

Ajout de nouveaux types sérialisables

Depuis la version 0.1.2, il est possible d'ajouter dynamiquement de nouveaux types à la liste des types sérialisables en utilisant la méthode :

@classmethod
def add_serializable_type(cls, new_type: type, test_instance: Any = None) -> bool
Exemples d'utilisation

Vous pouvez ajouter un nouveau type avant l'instanciation du brain via le DictProxyAccessor. Par exemple :

from taskbrain import DictProxyAccessor

new_type_instance = MyCustomType()
DictProxyAccessor.add_serializable_type(new_type=MyCustomType, test_instance=new_type_instance)

Cela ajoutera MyCustomType à la liste des types sérialisables uniquement si l'instance MyCustomType peut être sérialisée.

Alternativement, vous pouvez ajouter un type directement sans vérification :

DictProxyAccessor.add_serializable_type(MyCustomType)

Avertissement : Ajouter un type sans vérifier sa sérialisation peut entraîner des erreurs si le type n'est pas compatible avec la sérialisation. Il est donc compliqué de passer en attribut partagé un objet complexe à utiliser dans un autre processus. Pour contourner ce problème, il est possible d’instancier directement dans le processus l’objet en question. Prenons l’exemple de l’utilisation d’une caméra. Son utilisation est gourmande en ressources, donc idéale pour du multiprocessing. Le problème est que l’objet caméra n’est pas sérialisable ! On va donc récupérer les éléments de configuration de celle-ci via un attribut, puis l’instancier directement dans le processus.

@Brain.task(process=True, run_on_start=[True / False])
def camera_in_other_process(self):
   camera = Camera(
      res_w=self.config.CAMERA_RESOLUTION[0],
      res_h=self.config.CAMERA_RESOLUTION[1],
      captures_path=self.config.CAMERA_SAVE_PATH,
      undistorted_coefficients_path=self.config.CAMERA_COEFFICIENTS_PATH,
   )

Attention un process sera synchrone ! Pensez à mettre def et non async def !

On remarque que la configuration est directement accessible via self (qui accède en réalité à la copie partagée du Brain). Une fois instanciée, nous utiliserons notre caméra pour capturer des images et y appliquer un traitement. Cependant, cela pose un nouveau problème : le traitement doit s'exécuter en continu, nécessitant donc la création d'une routine. Or, il n'est pas possible de créer une routine à l'intérieur d'une tâche, surtout si celle-ci est exécutée dans un processus séparé. Pour répondre à ce besoin, une option appelée define_loop_later est disponible. Elle permet de définir une tâche en tant que routine, tout en ayant une partie qui s'exécute une seule fois (comme la création de l'objet caméra).

@Brain.task(process=True, run_on_start=[True / False], refresh_rate=0.1, define_loop_later=True)
def camera_in_other_process(self):
   camera = Camera(
      res_w=self.config.CAMERA_RESOLUTION[0],
      res_h=self.config.CAMERA_RESOLUTION[1],
      captures_path=self.config.CAMERA_SAVE_PATH,
      undistorted_coefficients_path=self.config.CAMERA_COEFFICIENTS_PATH,
   )

   # ---Loop--- #
   camera.capture()
   # ... traitement d'image ... #

Il faut penser à préciser notre refresh_rate car notre task est ici une routine ! (bien qu’elle ait une partie qui ne s’exécute qu’une seule fois) → On peut évidemment profiter de l’exécution hors du process principal pour diminuer fortement le refresh_rate afin d’avoir une routine qui s’exécute à haute fréquence.

Ici, on instancie notre caméra, puis on l’utilise pour prendre des photos et leur appliquer un traitement. Ce qui sépare la partie one_shot de la routine est le commentaire # ---Loop--- #. En réalité, ce code très simple et léger d’utilisation revient à faire ceci :

@Brain.task(process=False, run_on_start=False)
async def one_shot_part(self):
   return Camera(
      res_w=self.config.CAMERA_RESOLUTION[0],
      res_h=self.config.CAMERA_RESOLUTION[1],
      captures_path=self.config.CAMERA_SAVE_PATH,
      undistorted_coefficients_path=self.config.CAMERA_COEFFICIENTS_PATH,
   )


@Brain.task(process=False, run_on_start=False, refresh_rate=0.1)
async def routine_part(self, camera):
   camera.capture()
   # ... traitement d'image ... #


@Brain.task(process=True, run_on_start=[True / False])
def camera_in_other_process(self):
   camera = asyncio.run(self.one_shot_part())
   asyncio.run(self.routine_part())

Il est également possible de définir le marker de la routine soit même :

@Brain.task(process=True, run_on_start=[True / False], refresh_rate=0.1, define_loop_later=True,
            start_loop_marker="#- My Custom Loop Marker -#")
def camera_in_other_process(self):
   camera = Camera(
      res_w=self.config.CAMERA_RESOLUTION[0],
      res_h=self.config.CAMERA_RESOLUTION[1],
      captures_path=self.config.CAMERA_SAVE_PATH,
      undistorted_coefficients_path=self.config.CAMERA_COEFFICIENTS_PATH,
   )

   #- My Custom Loop Marker -#
   camera.capture()

   # ... traitement d'image ... #

Synchronisation des attributs partagés

Lorsqu’on utilise des attributs partagés entre différents processus, il est essentiel de bien comprendre leur mécanisme de synchronisation, en particulier quand on manipule des types personnalisés (par exemple, une classe définie par l’utilisateur).

Note
Cette partie s’applique uniquement lorsque votre task est exécutée dans un autre processus (Brain.task(process=True)).


Attribution directe vs. modification interne

  • Attribution directe
    Lorsque vous affectez une nouvelle valeur à un attribut partagé, par exemple:

    self.mon_attribut = nouvelle_valeur

    Le Brain détecte automatiquement le changement et le répercute dans les autres processus.

  • Modification interne de l’objet
    En revanche, si vous modifiez l’état interne d’un attribut via l’appel à une méthode, comme ceci:

    self.mon_attribut.methode(nouvelle_valeur)

    Le Brain ne capture pas cette modification implicite, car il ne détecte pas que l’objet lui-même a changé.
    Par conséquent, la synchronisation ne se fait pas automatiquement entre les différents processus.


Marquer explicitement l’attribut à synchroniser

Pour résoudre ce problème, il faut manuellement indiquer au Brain que l’attribut doit être resynchronisé après la modification interne. Pour ce faire, utilisez la méthode :

self.add_attributes_to_synchronize("mon_attribut")

Ainsi, lors de l’itération suivante, le Brain se chargera de synchroniser la nouvelle valeur de l’attribut avec les autres processus.


Exemple de code

Supposons que vous ayez une classe MonObjet qui offre une méthode mise_a_jour pour modifier son état interne :

class MonObjet:
    def __init__(self, valeur_initiale):
        self.valeur = valeur_initiale
    
    def mise_a_jour(self, nouvelle_valeur):
        # Logique interne de mise à jour
        self.valeur = nouvelle_valeur

Vous l’utilisez ensuite au sein d’une classe gérée par un «brain» (ou un système de synchronisation) :

class MainBrain(Brain):
    def __init__(self, logger: Logger, mon_attribut: MonObjet) -> None:
        # mon_attribut est un objet partagé entre plusieurs processus
        # le dynamic init l'instancie automatiquement en attribut de la classe
        super().__init__(logger, self)
    
    @Brain.task(process=True, run_on_start=True)
    def faire_quelque_chose(self, nouvelle_valeur):
        # Modification de l'attribut par une méthode de l'objet
        self.mon_attribut.mise_a_jour(nouvelle_valeur)

        # Indiquer explicitement au 'brain' que cet attribut doit être resynchronisé
        self.add_attributes_to_synchronize("mon_attribut")

Dans l’exemple ci-dessus :

  1. Attribution

    • Si on avait fait :
      self.mon_attribut = MonObjet(valeur_initiale=42)
      Le changement aurait été automatiquement détecté et synchronisé.
  2. Modification interne

    • En appelant :
      self.mon_attribut.mise_a_jour(42)
      Le brain ne sait pas qu’il faut mettre à jour la valeur partagée, sauf si on ajoute :
      self.add_attributes_to_synchronize("mon_attribut")

Changer la fréquence de synchronisation

Il est possible de changer la fréquence de synchronisation des attributs partagés entre les processus. Par défaut, la fréquence est de 0.01 seconde. Pour changer cette fréquence, il suffit de changer l'attribut sync_self_and_shared_self_refresh_rate de la classe mère Brain:

from taskbrain import Brain
Brain.sync_self_and_shared_self_refresh_rate = 0.1

Points de vigilances, limitations et précisions

Bien que l’utilisation du Brain soit pratique, certains points sont à surveiller pour en tirer son plein potentiel.

Dynamic init

Création automatique des attributs

Afin d’alléger le code de l’__init__qui consiste essentiellement à faire ça:

def __init__(
        self,
        logger: Logger,
        obj1: type_obj1,
        obj2: type_obj2,
        obj3: type_obj3,
) -> None:
   self.logger = logger
   self.obj1 = obj1
   self.obj2 = obj2
   self.obj3 = obj3
   ...

L’__init__ est rendu dynamique: il le fait automatiquement, il suffit donc d’écrire:

def __init__(
        self,
        logger: Logger,
        obj1: type_obj1,
        obj2: type_obj2,
        obj3: type_obj3,
) -> None:
   super().__init__(logger, self)

Création d’attributs de classe

Si l’on veut créer des attributs de classe dans l’__init__ et que l’on souhaite qu’ils soient partagés entre les process, il faut les définir AVANT super().__init__(logger, self) . Dans le cas contraire ils seront disponibles uniquement dans le main-process.

def __init__(
        self,
        logger: Logger,
        obj1: type_obj1,
        obj2: type_obj2,
        obj3: type_obj3,
) -> None:
   # Attributs disponibles dans tous les process
   self.attr_multi_process = 0

   super().__init__(logger, self)

   # Attributs disponibles uniquement dans le main-process
   self.attr_main_process = 0

Sérialisation des attributs

Lors de l’appel de l’__init__, le Brain se charge également de sérialiser automatiquement tous les attributs de classe. Cependant, la majorité des objets que nous manipulons ne sont pas sérialisables. Un warning sera alors affiché par le logger pour tout attribut non sérialisable. Ce n’est pas une erreur, juste un avertissement. Tout attribut non sérialisé sera évidemment indisponible dans d’autres processus. Exemple de warning :

14:49:30 -> [   brain    ]  WARNING   | [dynamic_init] cannot serialize attribute [ws_cmd].
14:49:30 -> [   brain    ]  WARNING   | [dynamic_init] cannot serialize attribute [ws_pami].
14:49:30 -> [   brain    ]  WARNING   | [dynamic_init] cannot serialize attribute [ws_lidar].
14:49:30 -> [   brain    ]  WARNING   | [dynamic_init] cannot serialize attribute [ws_odometer].
14:49:30 -> [   brain    ]  WARNING   | [dynamic_init] cannot serialize attribute [ws_camera].

Refresh_Rate limitations

L'exécution des tâches repose sur de l'exécution asynchrone, ce qui signifie qu'il s'agit de pseudo-parallélisme. Il est crucial de garder à l'esprit qu'une routine avec un refresh_rate très faible va monopoliser le temps CPU disponible et, dans certains cas, ralentir l'exécution globale du Brain. Il est donc interdit de mettre un refresh_rate à 0 ! Ce paramètre doit être réglé avec attention.

Communication inter-process limitations

Comme expliqué précédemment, la synchronisation entre le Brain partagé et son instance s'effectue via une routine qui s' exécute à très haute fréquence afin de minimiser la latence de communication. Par défaut, son refresh_rate est fixé à 0,01 seconde. Bien que la méthode soit optimisée pour réduire au maximum sa durée d'exécution, ce n'est pas instantané ! Il est donc important de prendre en compte ce facteur lorsqu'on décide de passer un traitement dans un autre processus.

Exemple complet d’utilisation

Voici un exemple complet d'utilisation de votre module Brain avec une explication de son utilisation dans un script principal.

import asyncio
from taskbrain import Brain
from loggerplusplus import Logger


class MainBrain(Brain):
   def __init__(self, logger: Logger, share_attr1: int, share_attr2: int) -> None:
      super().__init__(logger, self)
      self.local_attr1: int = 0
      self.local_attr2: int = 0

   """ 
       MainProcess (mp) Tasks 
   """

   """ One-Shot Tasks """

   @Brain.task(process=False, run_on_start=True)
   async def mp_start(self):
      self.logger.info("[MP] MainBrain started")
      public_attributes = {k: v for k, v in self.__dict__.items() if not k.startswith('_')}
      self.logger.info(f"[MP] Public attributes: {public_attributes}")

   """ Routine Tasks """

   @Brain.task(process=False, run_on_start=True, refresh_rate=1)
   async def mp_states_display(self):
      attributes_public = {k: v for k, v in self.__dict__.items() if
                           not k.startswith('_') and k.__str__() != "logger"}
      self.logger.info(f"[MP] Attributes states: {attributes_public}")

   @Brain.task(process=False, run_on_start=True, refresh_rate=1)
   async def mp_incrementer(self):
      self.share_attr1 += 1
      self.local_attr1 += 1

   @Brain.task(process=False, run_on_start=True, refresh_rate=1, timeout=5)
   async def mp_incrementer_with_timeout(self):
      self.share_attr1 += 10
      self.local_attr1 += 10

   """ 
       SubProcess (sb) Tasks 
   """

   """ One-Shot Tasks """

   @Brain.task(process=True, run_on_start=True)
   def sb_start(self):
      self.logger.info("[SP] MainBrain started in another process")
      shared_attributes = {
         "share_attr1": self.share_attr1,
         "share_attr2": self.share_attr2
      }
      self.logger.info(f"[SP] Public attributes available in this subprocess: {shared_attributes}")

   """ Routine Tasks """

   @Brain.task(process=True, run_on_start=True, refresh_rate=1)
   def sp_states_display(self):
      shared_attributes = {
         "share_attr1": self.share_attr1,
         "share_attr2": self.share_attr2
      }
      self.logger.info(f"[SP] Attributes states: {shared_attributes}")

   @Brain.task(process=True, run_on_start=True, refresh_rate=1)
   def sb_incrementer(self):
      self.share_attr2 += 1

   @Brain.task(process=True, run_on_start=True, refresh_rate=1, timeout=5)
   def sb_incrementer_with_timeout(self):
      self.share_attr2 += 10

   @Brain.task(process=True, run_on_start=True, refresh_rate=1, define_loop_later=True,
               start_loop_marker="# ---Loop--- #")
   def sb_routine_with_setup(self):
      sb_non_serializable_attribute = "I'm not serializable attribute"
      # ---Loop--- #
      self.logger.info(f"[SP] Non-serializable attribute: {sb_non_serializable_attribute}")

   """ Call others tasks """

   @Brain.task(process=False, run_on_start=False)
   async def callable_function_1(self):
      self.logger.info("[MP] Callable function 1")
      return 1

   @Brain.task(process=True, run_on_start=False)
   def callable_function_2(self):
      self.logger.info("[SP] Callable function 2")
      return 2

   @Brain.task(process=False, run_on_start=True)
   async def call_tasks(self):
      await asyncio.sleep(10)  # Wait timed task to finish
      self.logger.info("[MP] Call tasks")
      f1_result = await self.callable_function_1()
      f2_result = await self.callable_function_2()

      self.logger.info(f"[MP] Callable function 1 result: {f1_result.result}")
      self.logger.info(f"[MP] Callable function 2 result: {f2_result.result}")

Utilisation dans un Main

Voici comment vous pouvez intégrer et démarrer votre Brain dans un script principal :

import asyncio
from exemple.mainbrain import MainBrain
from loggerplusplus import Logger

if __name__ == "__main__":
   brain_logger = Logger(
      identifier="Brain",
      print_log=True,
      write_to_file=False,
      display_monitoring=False,
      files_monitoring=False,
   )

   brain = MainBrain(
      logger=brain_logger,
      share_attr1=0,
      share_attr2=0
   )


   # Start tasks
   async def run_tasks():
      tasks = [task() for task in brain.get_tasks()]
      return await asyncio.gather(*tasks)


   asyncio.run(run_tasks())

Explication du Main

  1. Initialisation du Logger : On crée un logger avec un niveau de débogage qui affiche les logs dans la console (print_log=True).
  2. Initialisation du Brain : On initialise le MainBrain avec deux attributs partagés (share_attr1 et share_attr2) et le logger.
  3. Exécution des Tâches : On récupère toutes les tâches du Brain via brain.get_tasks() et on les exécute en les regroupant avec asyncio.gather(). Ce script illustre la manière dont vous pouvez initialiser et exécuter les différentes tâches de votre Brain, y compris les tâches asynchrones, les routines, et le multiprocessing.

Voici une suggestion pour une signature élégante à la fin de votre README:


Auteur

Projet créé et maintenu par Florian BARRE.
Pour toute question ou contribution, n'hésitez pas à me contacter. Mon Site | Mon LinkedIn | Mon GitHub

About

A simple and powerful task scheduler.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages