Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/ansys/fluent/core/services/datamodel_se.py
Original file line number Diff line number Diff line change
Expand Up @@ -667,8 +667,6 @@ def _get_static_info(self):
# Populate the static info with respect to a rules only if the
# same info has not been obtained in another context already.
# If the information is available, we can use it without additional remote calls.
# TODO: We need to coordinate the code so that the global infos are commonly
# available in all contexts (without additional remote calls)
request = DataModelProtoModule.GetStaticInfoRequest()
request.rules = self.rules
response = self.service.get_static_info(request)
Expand Down
33 changes: 7 additions & 26 deletions src/ansys/fluent/core/services/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,15 @@

from ansys.api.fluent.v0 import events_pb2 as EventsProtoModule
from ansys.api.fluent.v0 import events_pb2_grpc as EventsGrpcModule
from ansys.fluent.core.services.streaming import StreamingService


class EventsService:
class EventsService(StreamingService):
"""Class wrapping the events gRPC service of Fluent."""

def __init__(self, channel: grpc.Channel, metadata):
self.__stub = EventsGrpcModule.EventsStub(channel)
self.__metadata = metadata
self.__streams = None

def begin_streaming(self, started_evt):
"""Begin events streaming from Fluent.

Yields
------
Event
"""
request = EventsProtoModule.BeginStreamingRequest()
self.__streams = self.__stub.BeginStreaming(request, metadata=self.__metadata)
started_evt.set()
while True:
try:
yield next(self.__streams)
except Exception:
break

def end_streaming(self):
"""End events streaming from Fluent."""

if self.__streams and not self.__streams.cancelled():
self.__streams.cancel()
super().__init__(
stub=EventsGrpcModule.EventsStub(channel),
request=EventsProtoModule.BeginStreamingRequest(),
metadata=metadata,
)
31 changes: 28 additions & 3 deletions src/ansys/fluent/core/services/health_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ class Status(Enum):
SERVICE_UNKNOWN = 3

def __init__(self, channel: grpc.Channel, metadata: List[Tuple[str, str]]):
self.__stub = HealthCheckGrpcModule.HealthStub(channel)
self.__metadata = metadata
self._stub = HealthCheckGrpcModule.HealthStub(channel)
self._metadata = metadata
self._channel = channel

@catch_grpc_error
Expand All @@ -40,9 +40,34 @@ def check_health(self) -> str:
"SERVING" or "NOT_SERVING"
"""
request = HealthCheckModule.HealthCheckRequest()
response = self.__stub.Check(request, metadata=self.__metadata)
response = self._stub.Check(request, metadata=self._metadata)
return HealthCheckService.Status(response.status).name

@catch_grpc_error
def wait_for_server(self, timeout) -> None:
"""Keeps a watch on the health of the Fluent connection.

Response changes only when the service's serving status changes.
"""
request = HealthCheckModule.HealthCheckRequest()
responses = self._stub.Watch(request, metadata=self._metadata, timeout=timeout)

while True:
try:
response = next(responses)
if response.status == 1:
responses.cancel()
except StopIteration:
break
except Exception as e:
if e.code() == grpc.StatusCode.CANCELLED:
break
if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
raise TimeoutError(
f"The connection to the Fluent server could not be established within the configurable {timeout} second time limit."
)
raise

def status(self) -> str:
"""Check health of Fluent connection."""
if self._channel:
Expand Down
51 changes: 10 additions & 41 deletions src/ansys/fluent/core/services/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@

from ansys.api.fluent.v0 import monitor_pb2 as MonitorModule
from ansys.api.fluent.v0 import monitor_pb2_grpc as MonitorGrpcModule
from ansys.fluent.core.services.streaming import StreamingService


class MonitorsService:
class MonitorsService(StreamingService):
"""Class wrapping the monitor gRPC service of Fluent."""

def __init__(self, channel: grpc.Channel, metadata):
self.__stub = MonitorGrpcModule.MonitorStub(channel)
self.__metadata = metadata
self._streams = None
self._stub = MonitorGrpcModule.MonitorStub(channel)
self._metadata = metadata
super().__init__(
stub=self._stub,
request=MonitorModule.StreamingRequest(),
metadata=self._metadata,
)

def get_monitors_info(self) -> dict:
"""Get monitors information.
Expand All @@ -29,44 +34,8 @@ def get_monitors_info(self) -> dict:
"""
monitors_info = {}
request = MonitorModule.GetMonitorsRequest()
response = self.__stub.GetMonitors(request, metadata=self.__metadata)
response = self._stub.GetMonitors(request, metadata=self._metadata)
for monitor_set in response.monitorset:
monitor_info = MessageToDict(monitor_set)
monitors_info[monitor_set.name] = monitor_info
return monitors_info

def begin_streaming(self, started_evt):
"""Begin monitor streaming from Fluent.

Parameters
----------
None

Yields
-------
Monitor data
Monitor data i.e monitor x and y values.
"""

request = MonitorModule.StreamingRequest()
self._streams = self.__stub.BeginStreaming(request, metadata=self.__metadata)
started_evt.set()
while True:
try:
yield next(self._streams)
except Exception:
break

def end_streaming(self):
"""End monitor streaming from Fluent.

Parameters
----------
None

Returns
-------
None
"""
if self._streams and not self._streams.cancelled():
self._streams.cancel()
38 changes: 38 additions & 0 deletions src/ansys/fluent/core/services/streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Wrapper over the streaming grpc services of Fluent."""

from typing import Generator, List, Tuple


class StreamingService:
"""Class wrapping the streaming gRPC services of Fluent.

Methods
-------
begin_streaming
Begin streaming from Fluent.

end_streaming
End streaming
"""

def __init__(self, stub, request, metadata: List[Tuple[str, str]]):
self._stub = stub
self._metadata = metadata
self.request = request
self._streams = None

def begin_streaming(self, started_evt) -> Generator:
"""Begin streaming from Fluent."""
request = self.request
self._streams = self._stub.BeginStreaming(request, metadata=self._metadata)
started_evt.set()
while True:
try:
yield next(self._streams)
except Exception:
break

def end_streaming(self) -> None:
"""End streaming from Fluent."""
if self._streams and not self._streams.cancelled():
self._streams.cancel()
44 changes: 9 additions & 35 deletions src/ansys/fluent/core/services/transcript.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,20 @@
"""Wrapper over the transcript grpc service of Fluent."""

from typing import Generator, List, Tuple
from typing import List, Tuple

import grpc

from ansys.api.fluent.v0 import transcript_pb2 as TranscriptModule
from ansys.api.fluent.v0 import transcript_pb2_grpc as TranscriptGrpcModule
from ansys.fluent.core.services.streaming import StreamingService


class TranscriptService:
"""Class wrapping the transcript gRPC service of Fluent.

Methods
-------
begin_streaming
Begin transcript streaming from Fluent.
"""
class TranscriptService(StreamingService):
"""Class wrapping the transcript gRPC service of Fluent."""

def __init__(self, channel: grpc.Channel, metadata: List[Tuple[str, str]]):
self.__stub = TranscriptGrpcModule.TranscriptStub(channel)
self.__metadata = metadata
self.__streams = None

def begin_streaming(
self, started_evt
) -> Generator[TranscriptModule.TranscriptResponse, None, None]:
"""Begin transcript streaming from Fluent.

Yields
------
str
A transcript line
"""
request = TranscriptModule.TranscriptRequest()
self.__streams = self.__stub.BeginStreaming(request, metadata=self.__metadata)
started_evt.set()
while True:
try:
yield next(self.__streams)
except Exception:
break

def end_streaming(self) -> None:
if self.__streams and not self.__streams.cancelled():
self.__streams.cancel()
super().__init__(
stub=TranscriptGrpcModule.TranscriptStub(channel),
request=TranscriptModule.TranscriptRequest(),
metadata=metadata,
)
36 changes: 33 additions & 3 deletions tests/test_session.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from concurrent import futures
import os
from pathlib import Path
import time

import grpc
import pytest
Expand Down Expand Up @@ -31,12 +32,43 @@ def Check(self, request, context: grpc.ServicerContext): # noqa N802
status=health_pb2.HealthCheckResponse.ServingStatus.SERVING
)

def Watch(self, request, context: grpc.ServicerContext): # noqa N802
metadata = dict(context.invocation_metadata())
password = metadata.get("password", None)
if password != "12345":
context.set_code(grpc.StatusCode.UNAUTHENTICATED)
yield health_pb2.HealthCheckResponse()

c = 0
while c < 2:
time.sleep(1)
c += 1
yield health_pb2.HealthCheckResponse(
status=health_pb2.HealthCheckResponse.ServingStatus.NOT_SERVING
)

time.sleep(1)
yield health_pb2.HealthCheckResponse(
status=health_pb2.HealthCheckResponse.ServingStatus.SERVING
)


class MockSchemeEvalServicer(scheme_eval_pb2_grpc.SchemeEvalServicer):
def StringEval(self, request, context):
if request.input == "(cx-version)":
return scheme_eval_pb2.StringEvalResponse(output="(23 1 0)")

def SchemeEval(
self,
request,
context: grpc.ServicerContext,
) -> scheme_eval_pb2.SchemeEvalResponse:
metadata = dict(context.invocation_metadata())
password = metadata.get("password", None)
if password != "12345":
context.set_code(grpc.StatusCode.UNAUTHENTICATED)
return scheme_eval_pb2.SchemeEvalResponse()


def test_create_session_by_passing_ip_and_port_and_password() -> None:
server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
Expand Down Expand Up @@ -126,7 +158,6 @@ def test_create_session_from_server_info_file_with_wrong_password(
ip = "127.0.0.1"
port = 50051
server.add_insecure_port(f"{ip}:{port}")
health_pb2_grpc.add_HealthServicer_to_server(MockHealthServicer(), server)
scheme_eval_pb2_grpc.add_SchemeEvalServicer_to_server(
MockSchemeEvalServicer(), server
)
Expand All @@ -137,10 +168,9 @@ def test_create_session_from_server_info_file_with_wrong_password(
session = _BaseSession.create_from_server_info_file(
server_info_filepath=str(server_info_file), cleanup_on_exit=False
)
assert session.health_check_service.is_serving
session.scheme_eval.scheme_eval("")
server.stop(None)
session.exit()
assert not session.health_check_service.is_serving


def test_create_session_from_launch_fluent_by_passing_ip_and_port_and_password() -> None:
Expand Down