diff --git a/doc/source/api/solver/events.rst b/doc/source/api/solver/events.rst index 4db78de2b10d..e3a93edade24 100644 --- a/doc/source/api/solver/events.rst +++ b/doc/source/api/solver/events.rst @@ -3,7 +3,7 @@ EventsManager ============= -An instance of ``EventsManager`` exists as an active ``events_manager`` property in each +An instance of ``EventsManager`` exists as an active ``events_streaming`` property in each solution mode session object. You can register client callbacks with the EventsManager. The EventsManager calls each callback whenever a server-side event occurs, passing the session ID and event information arguments to the callback. The EventsManager is useful for solution @@ -28,13 +28,13 @@ The following code triggers a callback at the end of every iteration. cb_itr_id = session.events_manager.register_callback('IterationEndedEvent', callback_executed_at_end_of_iteration) -.. currentmodule:: ansys.fluent.core.solver.events_manager +.. currentmodule:: ansys.fluent.core.streaming_services.events_streaming .. autosummary:: :toctree: _autosummary - -.. automethod:: ansys.fluent.core.solver.events_manager.EventsManager.register_callback -.. automethod:: ansys.fluent.core.solver.events_manager.EventsManager.unregister_callback + +.. automethod:: ansys.fluent.core.streaming_services.events_streaming.EventsManager.register_callback +.. automethod:: ansys.fluent.core.streaming_services.events_streaming.EventsManager.unregister_callback \ No newline at end of file diff --git a/doc/source/api/solver/monitors.rst b/doc/source/api/solver/monitors.rst index a57235cfd85c..4bba9aa27b9c 100644 --- a/doc/source/api/solver/monitors.rst +++ b/doc/source/api/solver/monitors.rst @@ -3,17 +3,17 @@ MonitorsManager =============== -An instance of ``MonitorsManager`` exists as an active ``monitors_manager`` +An instance of ``MonitorsManager`` exists as an active ``monitors_streaming`` property in each solution-mode session object. It provides access to server monitors. -.. currentmodule:: ansys.fluent.core.solver.monitors_manager +.. currentmodule:: ansys.fluent.core.streaming_services.monitor_streaming .. autosummary:: :toctree: _autosummary -.. automethod:: ansys.fluent.core.solver.monitors_manager.MonitorsManager.get_monitor_set_names -.. automethod:: ansys.fluent.core.solver.monitors_manager.MonitorsManager.get_monitor_set_data -.. automethod:: ansys.fluent.core.solver.monitors_manager.MonitorsManager.get_monitor_set_plot +.. automethod:: ansys.fluent.core.streaming_services.monitor_streaming.MonitorsManager.get_monitor_set_names +.. automethod:: ansys.fluent.core.streaming_services.monitor_streaming.MonitorsManager.get_monitor_set_data +.. automethod:: ansys.fluent.core.streaming_services.monitor_streaming.MonitorsManager.get_monitor_set_plot \ No newline at end of file diff --git a/src/ansys/fluent/core/fluent_connection.py b/src/ansys/fluent/core/fluent_connection.py index 0885619c2e9e..7c25d739ec3b 100644 --- a/src/ansys/fluent/core/fluent_connection.py +++ b/src/ansys/fluent/core/fluent_connection.py @@ -21,9 +21,9 @@ from ansys.fluent.core.services.monitor import MonitorsService from ansys.fluent.core.services.scheme_eval import SchemeEval, SchemeEvalService from ansys.fluent.core.services.settings import SettingsService -from ansys.fluent.core.solver.events_manager import EventsManager -from ansys.fluent.core.solver.monitors_manager import MonitorsManager -from ansys.fluent.core.transcript import Transcript +from ansys.fluent.core.streaming_services.events_streaming import EventsManager +from ansys.fluent.core.streaming_services.monitor_streaming import MonitorsManager +from ansys.fluent.core.streaming_services.transcript_streaming import Transcript def _get_max_c_int_limit() -> int: @@ -263,21 +263,23 @@ def start_transcript( """ if not _FluentConnection._writing_transcript_to_interpreter: if write_to_interpreter: - self.callback_id1 = self._transcript.add_transcript_callback(print) + self.callback_id1 = self._transcript.register_callback(print) _FluentConnection._writing_transcript_to_interpreter = True if file_path: if Path(file_path).exists(): os.remove(file_path) append_to_file = AppendToFile(file_path) - self.callback_id2 = self._transcript.add_transcript_callback( + self.callback_id2 = self._transcript.register_callback( append_to_file, keep_new_lines=True ) + self._transcript.start() def stop_transcript(self) -> None: """Stop streaming of Fluent transcript.""" for callback_id in (self.callback_id1, self.callback_id2): if callback_id is not None: - self._transcript.remove_transcript_callback(callback_id) + self._transcript.unregister_callback(callback_id) + self._transcript.stop() def add_transcript_callback(self, callback_fn: Callable): """Initiates a fluent transcript streaming depending on the @@ -286,11 +288,11 @@ def add_transcript_callback(self, callback_fn: Callable): For eg.: add_transcript_callback(print) prints the transcript on the interpreter screen. """ - self._transcript.add_transcript_callback(callback_fn) + self._transcript.register_callback(callback_fn) - def remove_transcript_callback(self, callback_id: int): + def remove_transcript_callback(self, callback_id: str): """Stops each transcript streaming based on the callback_id.""" - self._transcript.remove_transcript_callback(callback_id) + self._transcript.unregister_callback(callback_id) def check_health(self) -> str: """Check health of Fluent connection.""" diff --git a/src/ansys/fluent/core/streaming_services/__init__.py b/src/ansys/fluent/core/streaming_services/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/src/ansys/fluent/core/solver/events_manager.py b/src/ansys/fluent/core/streaming_services/events_streaming.py similarity index 54% rename from src/ansys/fluent/core/solver/events_manager.py rename to src/ansys/fluent/core/streaming_services/events_streaming.py index 8f8f85e52005..ea22eac75a44 100644 --- a/src/ansys/fluent/core/solver/events_manager.py +++ b/src/ansys/fluent/core/streaming_services/events_streaming.py @@ -1,13 +1,12 @@ """Module for events management.""" from functools import partial -import itertools -import threading from typing import Callable, List from ansys.api.fluent.v0 import events_pb2 as EventsProtoModule +from ansys.fluent.core.streaming_services.streaming import StreamingService -class EventsManager: +class EventsManager(StreamingService): """Manages server-side events. This class allows the client to register and unregister callbacks with server events. @@ -26,31 +25,24 @@ class EventsManager: """ def __init__(self, session_id: str, service): + super().__init__( + target=EventsManager._process_streaming, + streaming_service=service, + ) self._session_id: str = session_id - self._events_service = service - self._events_to_callbacks_map: dict = {} - self._id_iter = itertools.count() - self._lock: threading.Lock = threading.Lock() - self._events_thread = None self._events_list: List[str] = [ attr for attr in dir(EventsProtoModule) if attr.endswith("Event") ] - self._streaming = False - @property - def is_streaming(self): - with self._lock: - return self._streaming - - def _listen_events(self, started_evt): - responses = self._events_service.begin_streaming(started_evt) + def _process_streaming(self, started_evt): + responses = self._streaming_service.begin_streaming(started_evt) while True: try: response = next(responses) event_name = response.WhichOneof("as") with self._lock: self._streaming = True - callbacks_map = self._events_to_callbacks_map.get(event_name, {}) + callbacks_map = self._service_callbacks.get(event_name, {}) for call_back in callbacks_map.values(): call_back( session_id=self._session_id, @@ -59,24 +51,9 @@ def _listen_events(self, started_evt): except StopIteration: break - @property - def events_list(self) -> List[str]: - """Get a list of supported events. - - Parameters - ---------- - None - - Returns - ------- - List[str] - List of supported events. - """ - return self._events_list - def register_callback( - self, event_name: str, call_back: Callable, *args, **kwargs - ) -> str: + self, event_name: str = None, call_back: Callable = None, *args, **kwargs + ): """Register the callback. Parameters @@ -97,19 +74,23 @@ def register_callback( RuntimeError If event name is not valid. """ - if not event_name in self.events_list: + if event_name is None or call_back is None: + raise RuntimeError( + "Please provide compulsory arguments : 'event_name' and 'call_back'" + ) + + if event_name not in self.events_list: raise RuntimeError(f"{event_name} is not a valid event.") with self._lock: event_name = event_name.lower() - id = f"{event_name}-{next(self._id_iter)}" - callbacks_map = self._events_to_callbacks_map.get(event_name) + callback_id = f"{event_name}-{next(self._service_callback_id)}" + callbacks_map = self._service_callbacks.get(event_name) if callbacks_map: - callbacks_map.update({id: partial(call_back, *args, **kwargs)}) + callbacks_map.update({callback_id: partial(call_back, *args, **kwargs)}) else: - self._events_to_callbacks_map[event_name] = { - id: partial(call_back, *args, **kwargs) + self._service_callbacks[event_name] = { + callback_id: partial(call_back, *args, **kwargs) } - return id def unregister_callback(self, callback_id: str): """Unregister the callback. @@ -120,32 +101,13 @@ def unregister_callback(self, callback_id: str): ID of the registered callback. """ with self._lock: - for callbacks_map in self._events_to_callbacks_map.values(): + for callbacks_map in self._service_callbacks.values(): if callback_id in callbacks_map: del callbacks_map[callback_id] - def start(self): - """Start EventsManager. - - Parameters - ---------- - None - - Returns - ------- - None - """ - with self._lock: - if self._events_thread is None: - started_evt = threading.Event() - self._events_thread: threading.Thread = threading.Thread( - target=EventsManager._listen_events, args=(self, started_evt) - ) - self._events_thread.start() - started_evt.wait() - - def stop(self): - """Stop EventsManager. + @property + def events_list(self) -> List[str]: + """Get a list of supported events. Parameters ---------- @@ -153,10 +115,7 @@ def stop(self): Returns ------- - None + List[str] + List of supported events. """ - if self._events_thread: - self._events_service.end_streaming() - self._events_thread.join() - self._events_thread = None - self._streaming = False + return self._events_list diff --git a/src/ansys/fluent/core/solver/monitors_manager.py b/src/ansys/fluent/core/streaming_services/monitor_streaming.py similarity index 70% rename from src/ansys/fluent/core/solver/monitors_manager.py rename to src/ansys/fluent/core/streaming_services/monitor_streaming.py index 7c2edc4e450d..75df5671bb8d 100644 --- a/src/ansys/fluent/core/solver/monitors_manager.py +++ b/src/ansys/fluent/core/streaming_services/monitor_streaming.py @@ -1,13 +1,15 @@ """Module for monitors management.""" import threading -from typing import Any, Callable, Dict, List, Tuple, Union +from typing import Dict, List, Tuple, Union import numpy as np import pandas as pd +from ansys.fluent.core.streaming_services.streaming import StreamingService -class MonitorsManager: + +class MonitorsManager(StreamingService): """Manages monitors (Fluent residuals and report definitions monitors). Parameters @@ -19,20 +21,15 @@ class MonitorsManager: """ def __init__(self, session_id: str, service): + super().__init__( + target=MonitorsManager._process_streaming, + streaming_service=service, + ) self._session_id: str = session_id - self._monitors_service = service - self._lock: threading.Lock = threading.Lock() self._lock_refresh: threading.Lock = threading.Lock() self._monitors_info = None - self._monitors_thread = None self._data_frames = {} self._on_monitor_refresh_callback = None - self._streaming: bool = False - - @property - def is_streaming(self): - with self._lock: - return self._streaming def get_monitor_set_names(self) -> List[str]: """Get monitor set names. @@ -122,24 +119,6 @@ def get_monitor_set_data( ) ) - def register_on_monitor_refresh_callback( - self, on_monitor_refresh_callback: Callable[[], Any] - ): - """Register monitor refresh callback. - - The callback is triggered whenever monitor data is updated. - - Parameters - ---------- - on_monitor_refresh_callback : Callable[[], Any] - Callback. - - Returns - ------- - None - """ - self._on_monitor_refresh_callback = on_monitor_refresh_callback - def refresh(self, session_id, event_info) -> None: """Refresh plots on-initialized and data-read events. @@ -158,12 +137,15 @@ def refresh(self, session_id, event_info) -> None: None """ with self._lock_refresh: - self._stop() - self._start() + self.stop() + self.start() + + def _prepare(self): + self._update_dataframe() - def _begin_streaming(self, started_evt): + def _process_streaming(self, started_evt): """Begin monitors streaming.""" - responses = self._monitors_service.begin_streaming(started_evt) + responses = self._streaming_service.begin_streaming(started_evt) while True: try: @@ -197,30 +179,14 @@ def _begin_streaming(self, started_evt): except StopIteration: break - def _start(self) -> str: - """Start MonitorsManager.""" + def _update_dataframe(self): with self._lock: - if not self._monitors_thread: - self._monitors_info = self._monitors_service.get_monitors_info() - self._data_frames = {} - for monitor_set_name, monitor_set_info in self._monitors_info.items(): - self._data_frames[monitor_set_name] = {} - monitors_name = list(monitor_set_info["monitors"]) + ["xvalues"] - df = pd.DataFrame([], columns=monitors_name) - df.set_index("xvalues", inplace=True) - self._data_frames[monitor_set_name]["df"] = df - self._data_frames[monitor_set_name]["monitors"] = monitors_name - started_evt = threading.Event() - self._monitors_thread: threading.Thread = threading.Thread( - target=MonitorsManager._begin_streaming, args=(self, started_evt) - ) - self._monitors_thread.start() - started_evt.wait() - - def _stop(self): - """Stops MonitorsManager.""" - if self._monitors_thread: - self._monitors_service.end_streaming() - self._monitors_thread.join() - self._streaming = False - self._monitors_thread = None + self._monitors_info = self._streaming_service.get_monitors_info() + self._data_frames = {} + for monitor_set_name, monitor_set_info in self._monitors_info.items(): + self._data_frames[monitor_set_name] = {} + monitors_name = list(monitor_set_info["monitors"]) + ["xvalues"] + df = pd.DataFrame([], columns=monitors_name) + df.set_index("xvalues", inplace=True) + self._data_frames[monitor_set_name]["df"] = df + self._data_frames[monitor_set_name]["monitors"] = monitors_name diff --git a/src/ansys/fluent/core/streaming_services/streaming.py b/src/ansys/fluent/core/streaming_services/streaming.py new file mode 100644 index 000000000000..da5b19bf5322 --- /dev/null +++ b/src/ansys/fluent/core/streaming_services/streaming.py @@ -0,0 +1,75 @@ +import itertools +import threading +from typing import Callable, Optional + + +class StreamingService: + """Encapsulates a Fluent streaming service.""" + + def __init__(self, target, streaming_service): + self._lock: threading.RLock = threading.RLock() + self._streaming: bool = False + self._target = target + self._streaming_service = streaming_service + self._stream_thread: Optional[threading.Thread] = None + + self._service_callback_id = itertools.count() + self._service_callbacks: dict = {} + + @property + def is_streaming(self): + with self._lock: + return self._streaming + + def register_callback(self, call_back: Callable, *args, **kwargs) -> str: + """Register the callback. + + Parameters + ---------- + call_back : Callable + Callback to register. + + Returns + ------- + str + Registered callback ID. + """ + with self._lock: + callback_id = f"{next(self._service_callback_id)}" + self._service_callbacks[callback_id] = [call_back, args, kwargs] + return callback_id + + def unregister_callback(self, callback_id: str): + """Unregister the callback. + + Parameters + ---------- + callback_id : str + ID of the registered callback. + """ + with self._lock: + if callback_id in self._service_callbacks: + del self._service_callbacks[callback_id] + + def start(self) -> None: + """Start streaming of Fluent transcript.""" + with self._lock: + if not self.is_streaming: + self._prepare() + started_evt = threading.Event() + self._stream_thread = threading.Thread( + target=self._target, args=(self, started_evt) + ) + self._stream_thread.start() + started_evt.wait() + + def stop(self) -> None: + """Stop streaming of Fluent transcript.""" + if self.is_streaming: + self._streaming_service.end_streaming() + self._stream_thread.join() + self._streaming = False + self._stream_thread = None + + def _prepare(self): + pass # Currently only used by monitor services. diff --git a/src/ansys/fluent/core/streaming_services/transcript_streaming.py b/src/ansys/fluent/core/streaming_services/transcript_streaming.py new file mode 100644 index 000000000000..021697ec57d8 --- /dev/null +++ b/src/ansys/fluent/core/streaming_services/transcript_streaming.py @@ -0,0 +1,36 @@ +from ansys.fluent.core.services.transcript import TranscriptService +from ansys.fluent.core.streaming_services.streaming import StreamingService + + +class Transcript(StreamingService): + """Encapsulates a Fluent Transcript streaming service.""" + + def __init__(self, channel, metadata): + super().__init__( + target=Transcript._process_streaming, + streaming_service=TranscriptService(channel, metadata), + ) + + def _process_streaming(self, started_evt): + """Performs processes on transcript depending on the callback + functions.""" + responses = self._streaming_service.begin_streaming(started_evt) + transcript = "" + while True: + try: + response = next(responses) + with self._lock: + self._streaming = True + transcript += response.transcript + if transcript[-1] == "\n": + for callback_map in self._service_callbacks.values(): + if "keep_new_lines" in callback_map[-1].keys(): + if callback_map[-1]["keep_new_lines"]: + callback_map[0](transcript) + else: + callback_map[0](transcript[0:-1]) + else: + callback_map[0](transcript[0:-1]) + transcript = "" + except StopIteration: + break diff --git a/src/ansys/fluent/core/transcript.py b/src/ansys/fluent/core/transcript.py deleted file mode 100644 index 3aba4ceab3f5..000000000000 --- a/src/ansys/fluent/core/transcript.py +++ /dev/null @@ -1,93 +0,0 @@ -import itertools -import threading -from typing import Callable, Optional - -from ansys.fluent.core.services.transcript import TranscriptService - - -class Transcript: - """Encapsulates a Fluent Transcript streaming service.""" - - def __init__(self, channel, metadata): - self._channel = channel - self._metadata = metadata - self._transcript_service = TranscriptService(self._channel, self._metadata) - self._transcript_thread: Optional[threading.Thread] = None - self._streaming: bool = False - self._transcript_callbacks = {} - self._transcript_callback_id = itertools.count() - self._lock = threading.Lock() - - def add_transcript_callback(self, callback_fn: Callable, keep_new_lines=False): - """Initiates a fluent transcript streaming depending on the - callback_fn. - - For eg.: add_transcript_callback(print) prints the transcript on - the interpreter screen. - """ - with self._lock: - callback_id = next(self._transcript_callback_id) - self._transcript_callbacks[callback_id] = ( - callback_fn, - keep_new_lines, - ) - start_thread = len(self._transcript_callbacks) == 1 - if start_thread: - self.start() - return callback_id - - def remove_transcript_callback(self, callback_id): - """Stops each transcript streaming based on the callback_id.""" - with self._lock: - del self._transcript_callbacks[callback_id] - stop_thread = len(self._transcript_callbacks) == 0 - if stop_thread: - self.stop() - - def _process_transcript(self, started_evt): - """Performs processes on transcript depending on the callback - functions.""" - responses = self._transcript_service.begin_streaming(started_evt) - transcript = "" - while True: - try: - response = next(responses) - with self._lock: - self._streaming = True - transcript += response.transcript - if transcript[-1] == "\n": - for ( - callback_function, - keep_new_lines, - ) in self._transcript_callbacks.values(): - if keep_new_lines: - callback_function(transcript) - else: - callback_function(transcript[0:-1]) - transcript = "" - except StopIteration: - break - - @property - def is_streaming(self): - with self._lock: - return self._streaming - - def start(self) -> None: - """Start streaming of Fluent transcript.""" - with self._lock: - if self._transcript_thread is None: - started_evt = threading.Event() - self._transcript_thread = threading.Thread( - target=Transcript._process_transcript, args=(self, started_evt) - ) - self._transcript_thread.start() - started_evt.wait() - - def stop(self) -> None: - """Stop streaming of Fluent transcript.""" - if self.is_streaming: - self._transcript_service.end_streaming() - self._transcript_thread.join() - self._streaming = False - self._transcript_thread = None