Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/ansys/fluent/core/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ def __init__(
self.events_manager.register_callback(
"DataReadEvent", self.monitors_manager.refresh
)
self.events_manager.start()
self._datamodel_service_tui = DatamodelService_TUI(
self._channel, self._metadata
)
Expand Down
54 changes: 31 additions & 23 deletions src/ansys/fluent/core/solver/events_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,38 +26,35 @@ class EventsManager:
"""

def __init__(self, session_id: str, service):
self.__session_id: str = session_id
self.__events_service = service
self.__events_to_callbacks_map: dict = {}
self.__id_iter = itertools.count()
self.__lock: threading.Lock = threading.Lock()
self.__events_list: List[str] = [
self._session_id: str = session_id
self._events_service = service
self._events_to_callbacks_map: dict = {}
self._id_iter = itertools.count()
self._lock: threading.Lock = threading.Lock()
self._events_thread = None
self._events_list: List[str] = [
attr for attr in dir(EventsProtoModule) if attr.endswith("Event")
]
self.__events_thread: threading.Thread = threading.Thread(
target=EventsManager.__listen_events, args=(self,)
)
self.__events_thread.start()

def __listen_events(self):
responses = self.__events_service.begin_streaming()
def _listen_events(self):
responses = self._events_service.begin_streaming()
while True:
try:
response = next(responses)
event_name = response.WhichOneof("as")
with self.__lock:
callbacks_map = self.__events_to_callbacks_map.get(event_name, {})
with self._lock:
callbacks_map = self._events_to_callbacks_map.get(event_name, {})
for call_back in callbacks_map.values():
call_back(
session_id=self.__session_id,
session_id=self._session_id,
event_info=getattr(response, event_name),
)
except StopIteration:
break

@property
def events_list(self) -> List[str]:
return self.__events_list
return self._events_list

def register_callback(
self, event_name: str, call_back: Callable, *args, **kwargs
Expand All @@ -84,14 +81,14 @@ def register_callback(
"""
if not event_name in self.events_list:
raise RuntimeError(f"{event_name} is not a valid event.")
with self.__lock:
with self._lock:
event_name = event_name.lower()
id = f"{event_name}-{next(self.__id_iter)}"
callbacks_map = self.__events_to_callbacks_map.get(event_name)
id = f"{event_name}-{next(self._id_iter)}"
callbacks_map = self._events_to_callbacks_map.get(event_name)
if callbacks_map:
callbacks_map.update({id: partial(call_back, *args, **kwargs)})
else:
self.__events_to_callbacks_map[event_name] = {
self._events_to_callbacks_map[event_name] = {
id: partial(call_back, *args, **kwargs)
}
return id
Expand All @@ -104,11 +101,22 @@ def unregister_callback(self, callback_id: str):
callback_id : str
Registered callback Id.
"""
with self.__lock:
for callbacks_map in self.__events_to_callbacks_map.values():
with self._lock:
for callbacks_map in self._events_to_callbacks_map.values():
if callback_id in callbacks_map:
del callbacks_map[callback_id]

def start(self):
"""Start Events manager."""
if self._events_thread is None:
self._events_thread: threading.Thread = threading.Thread(
target=EventsManager._listen_events, args=(self,)
)
self._events_thread.start()

def stop(self):
"""Stop Events manager."""
self.__events_service.end_streaming()
if self._events_thread:
self._events_service.end_streaming()
self._events_thread.join()
self._events_thread = None