Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
298 changes: 196 additions & 102 deletions OMPython/OMCSession.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ class OMCPathReal(pathlib.PurePosixPath):
errors as well as usage on a Windows system due to slightly different definitions (PureWindowsPath).
"""

def __init__(self, *path, session: OMCSessionZMQ) -> None:
def __init__(self, *path, session: OMCProcess) -> None:
super().__init__(*path)
self._session = session

Expand Down Expand Up @@ -539,7 +539,120 @@ def get_cmd(self) -> list[str]:

class OMCSessionZMQ:
"""
This class is handling an OMC session.
This class is handling an OMC session. It is a compatibility class for the new schema using OMCProcess* classes.
"""

def __init__(
self,
timeout: float = 10.00,
omhome: Optional[str] = None,
omc_process: Optional[OMCProcess] = None,
) -> None:
"""
Initialisation for OMCSessionZMQ
"""
warnings.warn(message="The class OMCSessionZMQ is depreciated and will be removed in future versions; "
"please use OMCProcess* classes instead!",
category=DeprecationWarning,
stacklevel=2)

if omc_process is None:
omc_process = OMCProcessLocal(omhome=omhome, timeout=timeout)
elif not isinstance(omc_process, OMCProcess):
raise OMCSessionException("Invalid definition of the OMC process!")
self.omc_process = omc_process

def __del__(self):
del self.omc_process

@staticmethod
def escape_str(value: str) -> str:
"""
Escape a string such that it can be used as string within OMC expressions, i.e. escape all double quotes.
"""
return OMCProcess.escape_str(value=value)

def omcpath(self, *path) -> OMCPath:
"""
Create an OMCPath object based on the given path segments and the current OMC session.
"""
return self.omc_process.omcpath(*path)

def omcpath_tempdir(self, tempdir_base: Optional[OMCPath] = None) -> OMCPath:
"""
Get a temporary directory using OMC. It is our own implementation as non-local usage relies on OMC to run all
filesystem related access.
"""
return self.omc_process.omcpath_tempdir(tempdir_base=tempdir_base)

def omc_run_data_update(self, omc_run_data: OMCSessionRunData) -> OMCSessionRunData:
"""
Modify data based on the selected OMCProcess implementation.

Needs to be implemented in the subclasses.
"""
return self.omc_process.omc_run_data_update(omc_run_data=omc_run_data)

@staticmethod
def run_model_executable(cmd_run_data: OMCSessionRunData) -> int:
"""
Run the command defined in cmd_run_data. This class is defined as static method such that there is no need to
keep instances of over classes around.
"""
return OMCProcess.run_model_executable(cmd_run_data=cmd_run_data)

def execute(self, command: str):
return self.omc_process.execute(command=command)

def sendExpression(self, command: str, parsed: bool = True) -> Any:
"""
Send an expression to the OMC server and return the result.

The complete error handling of the OMC result is done within this method using '"getMessagesStringInternal()'.
Caller should only check for OMCSessionException.
"""
return self.omc_process.sendExpression(command=command, parsed=parsed)


class PostInitCaller(type):
"""
Metaclass definition to define a new function __post_init__() which is called after all __init__() functions where
executed. The workflow would read as follows:

On creating a class with the following inheritance Class2 => Class1 => Class0, where each class calls the __init__()
functions of its parent, i.e. super().__init__(), as well as __post_init__() the call schema would be:

myclass = Class2()
Class2.__init__()
Class1.__init__()
Class0.__init__()
Class2.__post_init__() <= this is done due to the metaclass
Class1.__post_init__()
Class0.__post_init__()

References:
* https://stackoverflow.com/questions/100003/what-are-metaclasses-in-python
* https://stackoverflow.com/questions/795190/how-to-perform-common-post-initialization-tasks-in-inherited-classes
"""

def __call__(cls, *args, **kwargs):
obj = type.__call__(cls, *args, **kwargs)
obj.__post_init__()
return obj


class OMCProcessMeta(abc.ABCMeta, PostInitCaller):
"""
Helper class to get a combined metaclass of ABCMeta and PostInitCaller.

References:
* https://stackoverflow.com/questions/11276037/resolving-metaclass-conflicts
"""


class OMCProcess(metaclass=OMCProcessMeta):
"""
Base class for an OMC session. This class contains common functionality for all OMC sessions.

The main method is sendExpression() which is used to send commands to the OMC process.

Expand All @@ -561,22 +674,48 @@ class OMCSessionZMQ:
def __init__(
self,
timeout: float = 10.00,
omhome: Optional[str] = None,
omc_process: Optional[OMCProcess] = None,
**kwargs,
) -> None:
"""
Initialisation for OMCSessionZMQ
Initialisation for OMCProcess
"""

# store variables
self._timeout = timeout
# generate a random string for this session
self._random_string = uuid.uuid4().hex
# get a temporary directory
self._temp_dir = pathlib.Path(tempfile.gettempdir())

if omc_process is None:
omc_process = OMCProcessLocal(omhome=omhome, timeout=timeout)
elif not isinstance(omc_process, OMCProcess):
raise OMCSessionException("Invalid definition of the OMC process!")
self.omc_process = omc_process
# omc process
self._omc_process: Optional[subprocess.Popen] = None
# omc ZMQ port to use
self._omc_port: Optional[str] = None
# omc port and log file
self._omc_filebase = f"openmodelica.{self._random_string}"
# ZMQ socket to communicate with OMC
self._omc_zmq: Optional[zmq.Socket[bytes]] = None

# setup log file - this file must be closed in the destructor
logfile = self._temp_dir / (self._omc_filebase + ".log")
self._omc_loghandle: Optional[io.TextIOWrapper] = None
try:
self._omc_loghandle = open(file=logfile, mode="w+", encoding="utf-8")
except OSError as ex:
raise OMCSessionException(f"Cannot open log file {logfile}.") from ex

port = self.omc_process.get_port()
# variables to store compiled re expressions use in self.sendExpression()
self._re_log_entries: Optional[re.Pattern[str]] = None
self._re_log_raw: Optional[re.Pattern[str]] = None

self._re_portfile_path = re.compile(pattern=r'\nDumped server port in file: (.*?)($|\n)',
flags=re.MULTILINE | re.DOTALL)

def __post_init__(self) -> None:
"""
Create the connection to the OMC server using ZeroMQ.
"""
port = self.get_port()
if not isinstance(port, str):
raise OMCSessionException(f"Invalid content for port: {port}")

Expand All @@ -587,22 +726,36 @@ def __init__(
omc.setsockopt(zmq.IMMEDIATE, True) # Queue messages only to completed connections
omc.connect(port)

self.omc_zmq: Optional[zmq.Socket[bytes]] = omc

# variables to store compiled re expressions use in self.sendExpression()
self._re_log_entries: Optional[re.Pattern[str]] = None
self._re_log_raw: Optional[re.Pattern[str]] = None
self._omc_zmq = omc

def __del__(self):
if isinstance(self.omc_zmq, zmq.Socket):
if isinstance(self._omc_zmq, zmq.Socket):
try:
self.sendExpression("quit()")
except OMCSessionException:
pass
finally:
self._omc_zmq = None

del self.omc_zmq
if self._omc_loghandle is not None:
try:
self._omc_loghandle.close()
except (OSError, IOError):
pass
finally:
self._omc_loghandle = None

self.omc_zmq = None
if isinstance(self._omc_process, subprocess.Popen):
try:
self._omc_process.wait(timeout=2.0)
except subprocess.TimeoutExpired:
if self._omc_process:
logger.warning("OMC did not exit after being sent the quit() command; "
"killing the process with pid=%s", self._omc_process.pid)
self._omc_process.kill()
self._omc_process.wait()
finally:
self._omc_process = None

@staticmethod
def escape_str(value: str) -> str:
Expand All @@ -618,7 +771,7 @@ def omcpath(self, *path) -> OMCPath:

# fallback solution for Python < 3.12; a modified pathlib.Path object is used as OMCPath replacement
if sys.version_info < (3, 12):
if isinstance(self.omc_process, OMCProcessLocal):
if isinstance(self, OMCProcessLocal):
# noinspection PyArgumentList
return OMCPath(*path)
raise OMCSessionException("OMCPath is supported for Python < 3.12 only if OMCProcessLocal is used!")
Expand Down Expand Up @@ -655,14 +808,6 @@ def omcpath_tempdir(self, tempdir_base: Optional[OMCPath] = None) -> OMCPath:

return tempdir

def omc_run_data_update(self, omc_run_data: OMCSessionRunData) -> OMCSessionRunData:
"""
Modify data based on the selected OMCProcess implementation.

Needs to be implemented in the subclasses.
"""
return self.omc_process.omc_run_data_update(omc_run_data=omc_run_data)

@staticmethod
def run_model_executable(cmd_run_data: OMCSessionRunData) -> int:
"""
Expand Down Expand Up @@ -715,29 +860,41 @@ def sendExpression(self, command: str, parsed: bool = True) -> Any:
The complete error handling of the OMC result is done within this method using '"getMessagesStringInternal()'.
Caller should only check for OMCSessionException.
"""
if self.omc_zmq is None:
raise OMCSessionException("No OMC running. Create a new instance of OMCProcess!")

# this is needed if the class is not fully initialized or in the process of deletion
if hasattr(self, '_timeout'):
timeout = self._timeout
else:
timeout = 1.0

if self._omc_zmq is None:
raise OMCSessionException("No OMC running. Please create a new instance of OMCProcess!")

logger.debug("sendExpression(%r, parsed=%r)", command, parsed)

attempts = 0
while True:
try:
self.omc_zmq.send_string(str(command), flags=zmq.NOBLOCK)
self._omc_zmq.send_string(str(command), flags=zmq.NOBLOCK)
break
except zmq.error.Again:
pass
attempts += 1
if attempts >= 50:
raise OMCSessionException(f"No connection with OMC (timeout={self._timeout}). "
f"Log-file says: \n{self.omc_process.get_log()}")
time.sleep(self._timeout / 50.0)
# in the deletion process, the content is cleared. Thus, any access to a class attribute must be checked
try:
log_content = self.get_log()
except OMCSessionException:
log_content = 'log not available'
raise OMCSessionException(f"No connection with OMC (timeout={timeout}). "
f"Log-file says: \n{log_content}")
time.sleep(timeout / 50.0)
if command == "quit()":
self.omc_zmq.close()
self.omc_zmq = None
self._omc_zmq.close()
self._omc_zmq = None
return None

result = self.omc_zmq.recv_string()
result = self._omc_zmq.recv_string()

if result.startswith('Error occurred building AST'):
raise OMCSessionException(f"OMC error: {result}")
Expand All @@ -755,8 +912,8 @@ def sendExpression(self, command: str, parsed: bool = True) -> Any:
return result

# always check for error
self.omc_zmq.send_string('getMessagesStringInternal()', flags=zmq.NOBLOCK)
error_raw = self.omc_zmq.recv_string()
self._omc_zmq.send_string('getMessagesStringInternal()', flags=zmq.NOBLOCK)
error_raw = self._omc_zmq.recv_string()
# run error handling only if there is something to check
msg_long_list = []
has_error = False
Expand Down Expand Up @@ -839,69 +996,6 @@ def sendExpression(self, command: str, parsed: bool = True) -> Any:
except (TypeError, UnboundLocalError) as ex2:
raise OMCSessionException("Cannot parse OMC result") from ex2


class OMCProcess(metaclass=abc.ABCMeta):
"""
Metaclass to be used by all OMCProcess* implementations. The main task is the evaluation of the port to be used to
connect to the selected OMC process (method get_port()). Besides that, any implementation should define the method
omc_run_data_update() to finalize the definition of an OMC simulation.
"""

def __init__(
self,
timeout: float = 10.00,
**kwargs,
) -> None:
super().__init__(**kwargs)

# store variables
self._timeout = timeout

# omc process
self._omc_process: Optional[subprocess.Popen] = None
# omc ZMQ port to use
self._omc_port: Optional[str] = None

# generate a random string for this session
self._random_string = uuid.uuid4().hex

# omc port and log file
self._omc_filebase = f"openmodelica.{self._random_string}"

# get a temporary directory
self._temp_dir = pathlib.Path(tempfile.gettempdir())

# setup log file - this file must be closed in the destructor
logfile = self._temp_dir / (self._omc_filebase + ".log")
self._omc_loghandle: Optional[io.TextIOWrapper] = None
try:
self._omc_loghandle = open(file=logfile, mode="w+", encoding="utf-8")
except OSError as ex:
raise OMCSessionException(f"Cannot open log file {logfile}.") from ex

self._re_portfile_path = re.compile(pattern=r'\nDumped server port in file: (.*?)($|\n)',
flags=re.MULTILINE | re.DOTALL)

def __del__(self):
if self._omc_loghandle is not None:
try:
self._omc_loghandle.close()
except (OSError, IOError):
pass
self._omc_loghandle = None

if isinstance(self._omc_process, subprocess.Popen):
try:
self._omc_process.wait(timeout=2.0)
except subprocess.TimeoutExpired:
if self._omc_process:
logger.warning("OMC did not exit after being sent the quit() command; "
"killing the process with pid=%s", self._omc_process.pid)
self._omc_process.kill()
self._omc_process.wait()
finally:
self._omc_process = None

def get_port(self) -> Optional[str]:
"""
Get the port to connect to the OMC process.
Expand Down