From df567aa9a23052d30c8ba5bda6f1764ed1bac16f Mon Sep 17 00:00:00 2001 From: syntron Date: Sun, 23 Nov 2025 11:05:46 +0100 Subject: [PATCH 1/3] merge OMCSessionZMQ into OMCProcess; compatibility class for OMCSessionZMQ --- OMPython/OMCSession.py | 295 +++++++++++++++++++++++++++-------------- 1 file changed, 193 insertions(+), 102 deletions(-) diff --git a/OMPython/OMCSession.py b/OMPython/OMCSession.py index 73d4b0c4..544fff5b 100644 --- a/OMPython/OMCSession.py +++ b/OMPython/OMCSession.py @@ -293,7 +293,7 @@ class OMCPathReal(pathlib.PurePosixPath): errors as well as usage on a Windows system due to slightly different definitions (PureWindowsPath). """ - def __init__(self, *path, session: OMCSessionZMQ) -> None: + def __init__(self, *path, session: OMCProcess) -> None: super().__init__(*path) self._session = session @@ -539,7 +539,117 @@ def get_cmd(self) -> list[str]: class OMCSessionZMQ: """ - This class is handling an OMC session. + This class is handling an OMC session. It is a compatibility class for the new schema using OMCProcess* classes. + """ + + def __init__( + self, + timeout: float = 10.00, + omhome: Optional[str] = None, + omc_process: Optional[OMCProcess] = None, + ) -> None: + """ + Initialisation for OMCSessionZMQ + """ + warnings.warn(message="The class OMCSessionZMQ is depreciated and will be removed in future versions; " + "please use OMCProcess* classes instead!", + category=DeprecationWarning, + stacklevel=2) + + if omc_process is None: + omc_process = OMCProcessLocal(omhome=omhome, timeout=timeout) + elif not isinstance(omc_process, OMCProcess): + raise OMCSessionException("Invalid definition of the OMC process!") + self.omc_process = omc_process + + def __del__(self): + del self.omc_process + + @staticmethod + def escape_str(value: str) -> str: + """ + Escape a string such that it can be used as string within OMC expressions, i.e. escape all double quotes. + """ + return OMCProcess.escape_str(value=value) + + def omcpath(self, *path) -> OMCPath: + """ + Create an OMCPath object based on the given path segments and the current OMC session. + """ + return self.omc_process.omcpath(path) + + def omcpath_tempdir(self, tempdir_base: Optional[OMCPath] = None) -> OMCPath: + """ + Get a temporary directory using OMC. It is our own implementation as non-local usage relies on OMC to run all + filesystem related access. + """ + return self.omc_process.omcpath_tempdir(tempdir_base=tempdir_base) + + def omc_run_data_update(self, omc_run_data: OMCSessionRunData) -> OMCSessionRunData: + """ + Modify data based on the selected OMCProcess implementation. + + Needs to be implemented in the subclasses. + """ + return self.omc_process.omc_run_data_update(omc_run_data=omc_run_data) + + @staticmethod + def run_model_executable(cmd_run_data: OMCSessionRunData) -> int: + """ + Run the command defined in cmd_run_data. This class is defined as static method such that there is no need to + keep instances of over classes around. + """ + return OMCProcess.run_model_executable(cmd_run_data=cmd_run_data) + + def sendExpression(self, command: str, parsed: bool = True) -> Any: + """ + Send an expression to the OMC server and return the result. + + The complete error handling of the OMC result is done within this method using '"getMessagesStringInternal()'. + Caller should only check for OMCSessionException. + """ + return self.omc_process.sendExpression(command=command, parsed=parsed) + + +class PostInitCaller(type): + """ + Metaclass definition to define a new function __post_init__() which is called after all __init__() functions where + executed. The workflow would read as follows: + + On creating a class with the following inheritance Class2 => Class1 => Class0, where each class calls the __init__() + functions of its parent, i.e. super().__init__(), as well as __post_init__() the call schema would be: + + myclass = Class2() + Class2.__init__() + Class1.__init__() + Class0.__init__() + Class2.__post_init__() <= this is done due to the metaclass + Class1.__post_init__() + Class0.__post_init__() + + References: + * https://stackoverflow.com/questions/100003/what-are-metaclasses-in-python + * https://stackoverflow.com/questions/795190/how-to-perform-common-post-initialization-tasks-in-inherited-classes + """ + + def __call__(cls, *args, **kwargs): + obj = type.__call__(cls, *args, **kwargs) + obj.__post_init__() + return obj + + +class OMCProcessMeta(abc.ABCMeta, PostInitCaller): + """ + Helper class to get a combined metaclass of ABCMeta and PostInitCaller. + + References: + * https://stackoverflow.com/questions/11276037/resolving-metaclass-conflicts + """ + + +class OMCProcess(metaclass=OMCProcessMeta): + """ + Base class for an OMC session. This class contains common functionality for all OMC sessions. The main method is sendExpression() which is used to send commands to the OMC process. @@ -561,22 +671,48 @@ class OMCSessionZMQ: def __init__( self, timeout: float = 10.00, - omhome: Optional[str] = None, - omc_process: Optional[OMCProcess] = None, + **kwargs, ) -> None: """ - Initialisation for OMCSessionZMQ + Initialisation for OMCProcess """ + # store variables self._timeout = timeout + # generate a random string for this session + self._random_string = uuid.uuid4().hex + # get a temporary directory + self._temp_dir = pathlib.Path(tempfile.gettempdir()) - if omc_process is None: - omc_process = OMCProcessLocal(omhome=omhome, timeout=timeout) - elif not isinstance(omc_process, OMCProcess): - raise OMCSessionException("Invalid definition of the OMC process!") - self.omc_process = omc_process + # omc process + self._omc_process: Optional[subprocess.Popen] = None + # omc ZMQ port to use + self._omc_port: Optional[str] = None + # omc port and log file + self._omc_filebase = f"openmodelica.{self._random_string}" + # ZMQ socket to communicate with OMC + self._omc_zmq: Optional[zmq.Socket[bytes]] = None + + # setup log file - this file must be closed in the destructor + logfile = self._temp_dir / (self._omc_filebase + ".log") + self._omc_loghandle: Optional[io.TextIOWrapper] = None + try: + self._omc_loghandle = open(file=logfile, mode="w+", encoding="utf-8") + except OSError as ex: + raise OMCSessionException(f"Cannot open log file {logfile}.") from ex - port = self.omc_process.get_port() + # variables to store compiled re expressions use in self.sendExpression() + self._re_log_entries: Optional[re.Pattern[str]] = None + self._re_log_raw: Optional[re.Pattern[str]] = None + + self._re_portfile_path = re.compile(pattern=r'\nDumped server port in file: (.*?)($|\n)', + flags=re.MULTILINE | re.DOTALL) + + def __post_init__(self) -> None: + """ + Create the connection to the OMC server using ZeroMQ. + """ + port = self.get_port() if not isinstance(port, str): raise OMCSessionException(f"Invalid content for port: {port}") @@ -587,22 +723,36 @@ def __init__( omc.setsockopt(zmq.IMMEDIATE, True) # Queue messages only to completed connections omc.connect(port) - self.omc_zmq: Optional[zmq.Socket[bytes]] = omc - - # variables to store compiled re expressions use in self.sendExpression() - self._re_log_entries: Optional[re.Pattern[str]] = None - self._re_log_raw: Optional[re.Pattern[str]] = None + self._omc_zmq = omc def __del__(self): - if isinstance(self.omc_zmq, zmq.Socket): + if isinstance(self._omc_zmq, zmq.Socket): try: self.sendExpression("quit()") except OMCSessionException: pass + finally: + self._omc_zmq = None - del self.omc_zmq + if self._omc_loghandle is not None: + try: + self._omc_loghandle.close() + except (OSError, IOError): + pass + finally: + self._omc_loghandle = None - self.omc_zmq = None + if isinstance(self._omc_process, subprocess.Popen): + try: + self._omc_process.wait(timeout=2.0) + except subprocess.TimeoutExpired: + if self._omc_process: + logger.warning("OMC did not exit after being sent the quit() command; " + "killing the process with pid=%s", self._omc_process.pid) + self._omc_process.kill() + self._omc_process.wait() + finally: + self._omc_process = None @staticmethod def escape_str(value: str) -> str: @@ -618,7 +768,7 @@ def omcpath(self, *path) -> OMCPath: # fallback solution for Python < 3.12; a modified pathlib.Path object is used as OMCPath replacement if sys.version_info < (3, 12): - if isinstance(self.omc_process, OMCProcessLocal): + if isinstance(self, OMCProcessLocal): # noinspection PyArgumentList return OMCPath(*path) raise OMCSessionException("OMCPath is supported for Python < 3.12 only if OMCProcessLocal is used!") @@ -655,14 +805,6 @@ def omcpath_tempdir(self, tempdir_base: Optional[OMCPath] = None) -> OMCPath: return tempdir - def omc_run_data_update(self, omc_run_data: OMCSessionRunData) -> OMCSessionRunData: - """ - Modify data based on the selected OMCProcess implementation. - - Needs to be implemented in the subclasses. - """ - return self.omc_process.omc_run_data_update(omc_run_data=omc_run_data) - @staticmethod def run_model_executable(cmd_run_data: OMCSessionRunData) -> int: """ @@ -715,29 +857,41 @@ def sendExpression(self, command: str, parsed: bool = True) -> Any: The complete error handling of the OMC result is done within this method using '"getMessagesStringInternal()'. Caller should only check for OMCSessionException. """ - if self.omc_zmq is None: - raise OMCSessionException("No OMC running. Create a new instance of OMCProcess!") + + # this is needed if the class is not fully initialized or in the process of deletion + if hasattr(self, '_timeout'): + timeout = self._timeout + else: + timeout = 1.0 + + if self._omc_zmq is None: + raise OMCSessionException("No OMC running. Please create a new instance of OMCProcess!") logger.debug("sendExpression(%r, parsed=%r)", command, parsed) attempts = 0 while True: try: - self.omc_zmq.send_string(str(command), flags=zmq.NOBLOCK) + self._omc_zmq.send_string(str(command), flags=zmq.NOBLOCK) break except zmq.error.Again: pass attempts += 1 if attempts >= 50: - raise OMCSessionException(f"No connection with OMC (timeout={self._timeout}). " - f"Log-file says: \n{self.omc_process.get_log()}") - time.sleep(self._timeout / 50.0) + # in the deletion process, the content is cleared. Thus, any access to a class attribute must be checked + try: + log_content = self.get_log() + except OMCSessionException: + log_content = 'log not available' + raise OMCSessionException(f"No connection with OMC (timeout={timeout}). " + f"Log-file says: \n{log_content}") + time.sleep(timeout / 50.0) if command == "quit()": - self.omc_zmq.close() - self.omc_zmq = None + self._omc_zmq.close() + self._omc_zmq = None return None - result = self.omc_zmq.recv_string() + result = self._omc_zmq.recv_string() if result.startswith('Error occurred building AST'): raise OMCSessionException(f"OMC error: {result}") @@ -755,8 +909,8 @@ def sendExpression(self, command: str, parsed: bool = True) -> Any: return result # always check for error - self.omc_zmq.send_string('getMessagesStringInternal()', flags=zmq.NOBLOCK) - error_raw = self.omc_zmq.recv_string() + self._omc_zmq.send_string('getMessagesStringInternal()', flags=zmq.NOBLOCK) + error_raw = self._omc_zmq.recv_string() # run error handling only if there is something to check msg_long_list = [] has_error = False @@ -839,69 +993,6 @@ def sendExpression(self, command: str, parsed: bool = True) -> Any: except (TypeError, UnboundLocalError) as ex2: raise OMCSessionException("Cannot parse OMC result") from ex2 - -class OMCProcess(metaclass=abc.ABCMeta): - """ - Metaclass to be used by all OMCProcess* implementations. The main task is the evaluation of the port to be used to - connect to the selected OMC process (method get_port()). Besides that, any implementation should define the method - omc_run_data_update() to finalize the definition of an OMC simulation. - """ - - def __init__( - self, - timeout: float = 10.00, - **kwargs, - ) -> None: - super().__init__(**kwargs) - - # store variables - self._timeout = timeout - - # omc process - self._omc_process: Optional[subprocess.Popen] = None - # omc ZMQ port to use - self._omc_port: Optional[str] = None - - # generate a random string for this session - self._random_string = uuid.uuid4().hex - - # omc port and log file - self._omc_filebase = f"openmodelica.{self._random_string}" - - # get a temporary directory - self._temp_dir = pathlib.Path(tempfile.gettempdir()) - - # setup log file - this file must be closed in the destructor - logfile = self._temp_dir / (self._omc_filebase + ".log") - self._omc_loghandle: Optional[io.TextIOWrapper] = None - try: - self._omc_loghandle = open(file=logfile, mode="w+", encoding="utf-8") - except OSError as ex: - raise OMCSessionException(f"Cannot open log file {logfile}.") from ex - - self._re_portfile_path = re.compile(pattern=r'\nDumped server port in file: (.*?)($|\n)', - flags=re.MULTILINE | re.DOTALL) - - def __del__(self): - if self._omc_loghandle is not None: - try: - self._omc_loghandle.close() - except (OSError, IOError): - pass - self._omc_loghandle = None - - if isinstance(self._omc_process, subprocess.Popen): - try: - self._omc_process.wait(timeout=2.0) - except subprocess.TimeoutExpired: - if self._omc_process: - logger.warning("OMC did not exit after being sent the quit() command; " - "killing the process with pid=%s", self._omc_process.pid) - self._omc_process.kill() - self._omc_process.wait() - finally: - self._omc_process = None - def get_port(self) -> Optional[str]: """ Get the port to connect to the OMC process. From e90aa316e911051f0af036dda73c3ae479b1e028 Mon Sep 17 00:00:00 2001 From: syntron Date: Sun, 23 Nov 2025 16:23:29 +0100 Subject: [PATCH 2/3] [OMCSessionZMQ] fix omcpath() --- OMPython/OMCSession.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/OMPython/OMCSession.py b/OMPython/OMCSession.py index 544fff5b..6b7412fc 100644 --- a/OMPython/OMCSession.py +++ b/OMPython/OMCSession.py @@ -576,7 +576,7 @@ def omcpath(self, *path) -> OMCPath: """ Create an OMCPath object based on the given path segments and the current OMC session. """ - return self.omc_process.omcpath(path) + return self.omc_process.omcpath(*path) def omcpath_tempdir(self, tempdir_base: Optional[OMCPath] = None) -> OMCPath: """ From beddea1d90dad6b503b632a3eb521bfd4af62090 Mon Sep 17 00:00:00 2001 From: syntron Date: Sun, 23 Nov 2025 16:59:20 +0100 Subject: [PATCH 3/3] [OMCSessionZMQ] add missing execute() --- OMPython/OMCSession.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/OMPython/OMCSession.py b/OMPython/OMCSession.py index 6b7412fc..ee92ce9f 100644 --- a/OMPython/OMCSession.py +++ b/OMPython/OMCSession.py @@ -601,6 +601,9 @@ def run_model_executable(cmd_run_data: OMCSessionRunData) -> int: """ return OMCProcess.run_model_executable(cmd_run_data=cmd_run_data) + def execute(self, command: str): + return self.omc_process.execute(command=command) + def sendExpression(self, command: str, parsed: bool = True) -> Any: """ Send an expression to the OMC server and return the result.