Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 87 additions & 36 deletions src/ansys/mapdl/core/mapdl_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,27 @@ class MapdlGrpc(_MapdlCore):
>>> from ansys.mapdl import core as pymapdl
>>> mapdl = pymapdl.Mapdl()

Connect to an instance of MAPDL running on the LAN on a default port
Connect to an instance of MAPDL running on the LAN on a default port.

>>> mapdl = pymapdl.Mapdl('192.168.1.101')

Connect to an instance of MAPDL running on the LAN on a non-default port
Connect to an instance of MAPDL running on the LAN on a non-default port.

>>> mapdl = pymapdl.Mapdl('192.168.1.101', port=60001)

If you wish to customize the channel, you can also directly connect
directly to gRPC channels. For example, if you wanted to create an insecure
channel with a maximum message length of 8 MB.

>>> import grpc
>>> channel = grpc.insecure_channel(
... '127.0.0.1:50052',
... options=[
... ("grpc.max_receive_message_length", 8*1024**2),
... ],
... )
>>> mapdl = pymapdl.Mapdl(channel=channel)

"""

# Required by `_name` method to be defined before __init__ be
Expand All @@ -232,7 +246,7 @@ class MapdlGrpc(_MapdlCore):

def __init__(
self,
ip="127.0.0.1",
ip=None,
port=None,
timeout=15,
loglevel="WARNING",
Expand All @@ -242,11 +256,20 @@ def __init__(
set_no_abort=True,
remove_temp_files=False,
print_com=False,
channel=None,
**kwargs,
):
"""Initialize connection to the mapdl server"""
self.__distributed = None

if channel is not None:
if ip is not None or port is not None:
raise ValueError(
"If `channel` is specified, neither `port` nor `ip` can be specified."
)
elif ip is None:
ip = "127.0.0.1"

# port and ip are needed to setup the log
self._port = port
self._ip = ip
Expand All @@ -258,8 +281,6 @@ def __init__(
**kwargs,
)

check_valid_ip(ip)

# gRPC request specific locks as these gRPC request are not thread safe
self._vget_lock = False
self._get_lock = False
Expand All @@ -272,7 +293,6 @@ def __init__(
self._jobname = kwargs.pop("jobname", "file")
self._path = kwargs.pop("run_location", None)
self._busy = False # used to check if running a command on the server
self._channel_str = None
self._local = ip in ["127.0.0.1", "127.0.1.1", "localhost"]
if "local" in kwargs: # pragma: no cover # allow this to be overridden
self._local = kwargs["local"]
Expand All @@ -286,50 +306,93 @@ def __init__(
from ansys.mapdl.core.launcher import MAPDL_DEFAULT_PORT

port = MAPDL_DEFAULT_PORT
self._server = None
self._channel = None
self._state = None
self._stub = None
self._timeout = timeout
self._pids = []

# try to connect over a series of attempts rather than one
# single one. This prevents a single failed connection from
# blocking other attempts
n_attempts = 5 # consider adding this as a kwarg
if channel is None:
self._channel = self._create_channel(ip, port)
else:
self._channel = channel

# connect and validate to the channel
self._multi_connect()

# double check we have access to the local path if not
# explicitly specified
if "local" not in kwargs:
self._verify_local()

# only cache process IDs if launched locally
if self._local and "exec_file" in kwargs:
self._cache_pids()

def _create_channel(self, ip, port):
"""Create an insecured grpc channel."""
check_valid_ip(ip)

# open the channel
channel_str = f"{ip}:{port}"
self._log.debug("Opening insecure channel at %s", channel_str)
return grpc.insecure_channel(
channel_str,
options=[
("grpc.max_receive_message_length", MAX_MESSAGE_LENGTH),
],
)

def _multi_connect(self, n_attempts=5, timeout=15, set_no_abort=True):
"""Try to connect over a series of attempts to the channel.

Parameters
----------
n_attempts : int, optional
Number of connection attempts.
timeout : float, optional
Total timeout.
set_no_abort : bool, optional
Sets MAPDL to not abort at the first error within /BATCH mode.
Default ``True``.

"""
# This prevents a single failed connection from blocking other attempts
connected = False
attempt_timeout = timeout / n_attempts

max_time = time.time() + timeout
i = 0

while time.time() < max_time and i <= n_attempts:
self._log.debug("Connection attempt %d", i + 1)
connected = self._connect(
port, timeout=attempt_timeout, set_no_abort=set_no_abort
timeout=attempt_timeout, set_no_abort=set_no_abort
)
i += 1
if connected:
self._log.debug("Connected")
break
else:
self._log.debug(
f"Reached either maximum amount of connection attempts ({n_attempts}) or timeout ({timeout} s)."
"Reached either maximum amount of connection attempts (%d) or timeout (%f s).",
n_attempts,
timeout,
)

if not connected:
raise IOError(
"Unable to connect to MAPDL gRPC instance at %s" % self._channel_str
f"Unable to connect to MAPDL gRPC instance at {self._target_str}"
)

# double check we have access to the local path if not
# explicitly specified
if "local" not in kwargs:
self._verify_local()
@property
def _channel_str(self):
"""Return the target string.

# only cache process IDs if launched locally
if self._local and "exec_file" in kwargs:
self._cache_pids()
Generally of the form of "ip:port", like "127.0.0.1:50052".

"""
if self._channel is not None:
return self._channel._channel.target().decode()
return ""

def _verify_local(self):
"""Check if Python is local to the MAPDL instance."""
Expand Down Expand Up @@ -387,26 +450,14 @@ def __repr__(self):
info = super().__repr__()
return info

def _connect(self, port, timeout=5, set_no_abort=True, enable_health_check=False):
def _connect(self, timeout=5, set_no_abort=True, enable_health_check=False):
"""Establish a gRPC channel to a remote or local MAPDL instance.

Parameters
----------
timeout : float
Time in seconds to wait until the connection has been established
"""
self._server = {"ip": self._ip, "port": port}

# open the channel
self._channel_str = "%s:%d" % (self._ip, port)
self._log.debug("Opening insecure channel at %s", self._channel_str)
self._channel = grpc.insecure_channel(
self._channel_str,
options=[
("grpc.max_receive_message_length", MAX_MESSAGE_LENGTH),
],
)

self._state = grpc.channel_ready_future(self._channel)
self._stub = mapdl_grpc.MapdlServiceStub(self._channel)

Expand Down
17 changes: 17 additions & 0 deletions tests/test_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,23 @@ def setup_for_cmatrix(mapdl, cleared):
mapdl.run("/solu")


def test_connect_via_channel(mapdl):
"""Validate MapdlGrpc can be created directly from a channel."""

import grpc

from ansys.mapdl.core.mapdl_grpc import MAX_MESSAGE_LENGTH, MapdlGrpc

channel = grpc.insecure_channel(
mapdl._channel_str,
options=[
("grpc.max_receive_message_length", MAX_MESSAGE_LENGTH),
],
)
mapdl = MapdlGrpc(channel=channel)
assert mapdl.is_alive


def test_clear_nostart(mapdl):
resp = mapdl._send_command("FINISH")
resp = mapdl._send_command("/CLEAR, NOSTART")
Expand Down