Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions doc/source/api/solver/events.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
EventsManager
=============

An instance of ``EventsManager`` exists as an active ``events_manager`` property in each
An instance of ``EventsManager`` exists as an active ``events_streaming`` property in each
solution mode session object. You can register client callbacks with the EventsManager.
The EventsManager calls each callback whenever a server-side event occurs, passing the session
ID and event information arguments to the callback. The EventsManager is useful for solution
Expand All @@ -28,13 +28,13 @@ The following code triggers a callback at the end of every iteration.

cb_itr_id = session.events_manager.register_callback('IterationEndedEvent', callback_executed_at_end_of_iteration)

.. currentmodule:: ansys.fluent.core.solver.events_manager
.. currentmodule:: ansys.fluent.core.streaming_services.events_streaming

.. autosummary::
:toctree: _autosummary
.. automethod:: ansys.fluent.core.solver.events_manager.EventsManager.register_callback
.. automethod:: ansys.fluent.core.solver.events_manager.EventsManager.unregister_callback

.. automethod:: ansys.fluent.core.streaming_services.events_streaming.EventsManager.register_callback
.. automethod:: ansys.fluent.core.streaming_services.events_streaming.EventsManager.unregister_callback



10 changes: 5 additions & 5 deletions doc/source/api/solver/monitors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@
MonitorsManager
===============

An instance of ``MonitorsManager`` exists as an active ``monitors_manager``
An instance of ``MonitorsManager`` exists as an active ``monitors_streaming``
property in each solution-mode session object. It provides access to server monitors.


.. currentmodule:: ansys.fluent.core.solver.monitors_manager
.. currentmodule:: ansys.fluent.core.streaming_services.monitor_streaming

.. autosummary::
:toctree: _autosummary

.. automethod:: ansys.fluent.core.solver.monitors_manager.MonitorsManager.get_monitor_set_names
.. automethod:: ansys.fluent.core.solver.monitors_manager.MonitorsManager.get_monitor_set_data
.. automethod:: ansys.fluent.core.solver.monitors_manager.MonitorsManager.get_monitor_set_plot
.. automethod:: ansys.fluent.core.streaming_services.monitor_streaming.MonitorsManager.get_monitor_set_names
.. automethod:: ansys.fluent.core.streaming_services.monitor_streaming.MonitorsManager.get_monitor_set_data
.. automethod:: ansys.fluent.core.streaming_services.monitor_streaming.MonitorsManager.get_monitor_set_plot


20 changes: 11 additions & 9 deletions src/ansys/fluent/core/fluent_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
from ansys.fluent.core.services.monitor import MonitorsService
from ansys.fluent.core.services.scheme_eval import SchemeEval, SchemeEvalService
from ansys.fluent.core.services.settings import SettingsService
from ansys.fluent.core.solver.events_manager import EventsManager
from ansys.fluent.core.solver.monitors_manager import MonitorsManager
from ansys.fluent.core.transcript import Transcript
from ansys.fluent.core.streaming_services.events_streaming import EventsManager
from ansys.fluent.core.streaming_services.monitor_streaming import MonitorsManager
from ansys.fluent.core.streaming_services.transcript_streaming import Transcript


def _get_max_c_int_limit() -> int:
Expand Down Expand Up @@ -263,21 +263,23 @@ def start_transcript(
"""
if not _FluentConnection._writing_transcript_to_interpreter:
if write_to_interpreter:
self.callback_id1 = self._transcript.add_transcript_callback(print)
self.callback_id1 = self._transcript.register_callback(print)
_FluentConnection._writing_transcript_to_interpreter = True
if file_path:
if Path(file_path).exists():
os.remove(file_path)
append_to_file = AppendToFile(file_path)
self.callback_id2 = self._transcript.add_transcript_callback(
self.callback_id2 = self._transcript.register_callback(
append_to_file, keep_new_lines=True
)
self._transcript.start()

def stop_transcript(self) -> None:
"""Stop streaming of Fluent transcript."""
for callback_id in (self.callback_id1, self.callback_id2):
if callback_id is not None:
self._transcript.remove_transcript_callback(callback_id)
self._transcript.unregister_callback(callback_id)
self._transcript.stop()

def add_transcript_callback(self, callback_fn: Callable):
"""Initiates a fluent transcript streaming depending on the
Expand All @@ -286,11 +288,11 @@ def add_transcript_callback(self, callback_fn: Callable):
For eg.: add_transcript_callback(print) prints the transcript on
the interpreter screen.
"""
self._transcript.add_transcript_callback(callback_fn)
self._transcript.register_callback(callback_fn)

def remove_transcript_callback(self, callback_id: int):
def remove_transcript_callback(self, callback_id: str):
"""Stops each transcript streaming based on the callback_id."""
self._transcript.remove_transcript_callback(callback_id)
self._transcript.unregister_callback(callback_id)

def check_health(self) -> str:
"""Check health of Fluent connection."""
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
"""Module for events management."""
from functools import partial
import itertools
import threading
from typing import Callable, List

from ansys.api.fluent.v0 import events_pb2 as EventsProtoModule
from ansys.fluent.core.streaming_services.streaming import StreamingService


class EventsManager:
class EventsManager(StreamingService):
"""Manages server-side events.

This class allows the client to register and unregister callbacks with server events.
Expand All @@ -26,31 +25,24 @@ class EventsManager:
"""

def __init__(self, session_id: str, service):
super().__init__(
target=EventsManager._process_streaming,
streaming_service=service,
)
self._session_id: str = session_id
self._events_service = service
self._events_to_callbacks_map: dict = {}
self._id_iter = itertools.count()
self._lock: threading.Lock = threading.Lock()
self._events_thread = None
self._events_list: List[str] = [
attr for attr in dir(EventsProtoModule) if attr.endswith("Event")
]
self._streaming = False

@property
def is_streaming(self):
with self._lock:
return self._streaming

def _listen_events(self, started_evt):
responses = self._events_service.begin_streaming(started_evt)
def _process_streaming(self, started_evt):
responses = self._streaming_service.begin_streaming(started_evt)
while True:
try:
response = next(responses)
event_name = response.WhichOneof("as")
with self._lock:
self._streaming = True
callbacks_map = self._events_to_callbacks_map.get(event_name, {})
callbacks_map = self._service_callbacks.get(event_name, {})
for call_back in callbacks_map.values():
call_back(
session_id=self._session_id,
Expand All @@ -59,24 +51,9 @@ def _listen_events(self, started_evt):
except StopIteration:
break

@property
def events_list(self) -> List[str]:
"""Get a list of supported events.

Parameters
----------
None

Returns
-------
List[str]
List of supported events.
"""
return self._events_list

def register_callback(
self, event_name: str, call_back: Callable, *args, **kwargs
) -> str:
self, event_name: str = None, call_back: Callable = None, *args, **kwargs
):
"""Register the callback.

Parameters
Expand All @@ -97,19 +74,23 @@ def register_callback(
RuntimeError
If event name is not valid.
"""
if not event_name in self.events_list:
if event_name is None or call_back is None:
raise RuntimeError(
"Please provide compulsory arguments : 'event_name' and 'call_back'"
)

if event_name not in self.events_list:
raise RuntimeError(f"{event_name} is not a valid event.")
with self._lock:
event_name = event_name.lower()
id = f"{event_name}-{next(self._id_iter)}"
callbacks_map = self._events_to_callbacks_map.get(event_name)
callback_id = f"{event_name}-{next(self._service_callback_id)}"
callbacks_map = self._service_callbacks.get(event_name)
if callbacks_map:
callbacks_map.update({id: partial(call_back, *args, **kwargs)})
callbacks_map.update({callback_id: partial(call_back, *args, **kwargs)})
else:
self._events_to_callbacks_map[event_name] = {
id: partial(call_back, *args, **kwargs)
self._service_callbacks[event_name] = {
callback_id: partial(call_back, *args, **kwargs)
}
return id

def unregister_callback(self, callback_id: str):
"""Unregister the callback.
Expand All @@ -120,43 +101,21 @@ def unregister_callback(self, callback_id: str):
ID of the registered callback.
"""
with self._lock:
for callbacks_map in self._events_to_callbacks_map.values():
for callbacks_map in self._service_callbacks.values():
if callback_id in callbacks_map:
del callbacks_map[callback_id]

def start(self):
"""Start EventsManager.

Parameters
----------
None

Returns
-------
None
"""
with self._lock:
if self._events_thread is None:
started_evt = threading.Event()
self._events_thread: threading.Thread = threading.Thread(
target=EventsManager._listen_events, args=(self, started_evt)
)
self._events_thread.start()
started_evt.wait()

def stop(self):
"""Stop EventsManager.
@property
def events_list(self) -> List[str]:
"""Get a list of supported events.

Parameters
----------
None

Returns
-------
None
List[str]
List of supported events.
"""
if self._events_thread:
self._events_service.end_streaming()
self._events_thread.join()
self._events_thread = None
self._streaming = False
return self._events_list
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
"""Module for monitors management."""

import threading
from typing import Any, Callable, Dict, List, Tuple, Union
from typing import Dict, List, Tuple, Union

import numpy as np
import pandas as pd

from ansys.fluent.core.streaming_services.streaming import StreamingService

class MonitorsManager:

class MonitorsManager(StreamingService):
"""Manages monitors (Fluent residuals and report definitions monitors).

Parameters
Expand All @@ -19,20 +21,15 @@ class MonitorsManager:
"""

def __init__(self, session_id: str, service):
super().__init__(
target=MonitorsManager._process_streaming,
streaming_service=service,
)
self._session_id: str = session_id
self._monitors_service = service
self._lock: threading.Lock = threading.Lock()
self._lock_refresh: threading.Lock = threading.Lock()
self._monitors_info = None
self._monitors_thread = None
self._data_frames = {}
self._on_monitor_refresh_callback = None
self._streaming: bool = False

@property
def is_streaming(self):
with self._lock:
return self._streaming

def get_monitor_set_names(self) -> List[str]:
"""Get monitor set names.
Expand Down Expand Up @@ -122,24 +119,6 @@ def get_monitor_set_data(
)
)

def register_on_monitor_refresh_callback(
self, on_monitor_refresh_callback: Callable[[], Any]
):
"""Register monitor refresh callback.

The callback is triggered whenever monitor data is updated.

Parameters
----------
on_monitor_refresh_callback : Callable[[], Any]
Callback.

Returns
-------
None
"""
self._on_monitor_refresh_callback = on_monitor_refresh_callback

def refresh(self, session_id, event_info) -> None:
"""Refresh plots on-initialized and data-read events.

Expand All @@ -158,12 +137,15 @@ def refresh(self, session_id, event_info) -> None:
None
"""
with self._lock_refresh:
self._stop()
self._start()
self.stop()
self.start()

def _prepare(self):
self._update_dataframe()

def _begin_streaming(self, started_evt):
def _process_streaming(self, started_evt):
"""Begin monitors streaming."""
responses = self._monitors_service.begin_streaming(started_evt)
responses = self._streaming_service.begin_streaming(started_evt)

while True:
try:
Expand Down Expand Up @@ -197,30 +179,14 @@ def _begin_streaming(self, started_evt):
except StopIteration:
break

def _start(self) -> str:
"""Start MonitorsManager."""
def _update_dataframe(self):
with self._lock:
if not self._monitors_thread:
self._monitors_info = self._monitors_service.get_monitors_info()
self._data_frames = {}
for monitor_set_name, monitor_set_info in self._monitors_info.items():
self._data_frames[monitor_set_name] = {}
monitors_name = list(monitor_set_info["monitors"]) + ["xvalues"]
df = pd.DataFrame([], columns=monitors_name)
df.set_index("xvalues", inplace=True)
self._data_frames[monitor_set_name]["df"] = df
self._data_frames[monitor_set_name]["monitors"] = monitors_name
started_evt = threading.Event()
self._monitors_thread: threading.Thread = threading.Thread(
target=MonitorsManager._begin_streaming, args=(self, started_evt)
)
self._monitors_thread.start()
started_evt.wait()

def _stop(self):
"""Stops MonitorsManager."""
if self._monitors_thread:
self._monitors_service.end_streaming()
self._monitors_thread.join()
self._streaming = False
self._monitors_thread = None
self._monitors_info = self._streaming_service.get_monitors_info()
self._data_frames = {}
for monitor_set_name, monitor_set_info in self._monitors_info.items():
self._data_frames[monitor_set_name] = {}
monitors_name = list(monitor_set_info["monitors"]) + ["xvalues"]
df = pd.DataFrame([], columns=monitors_name)
df.set_index("xvalues", inplace=True)
self._data_frames[monitor_set_name]["df"] = df
self._data_frames[monitor_set_name]["monitors"] = monitors_name
Loading