Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ env:
# You should go up in number, if you go down (or repeat a previous value)
# you might end up reusing a previous cache if it haven't been deleted already.
# It applies 7 days retention policy by default.
RESET_EXAMPLES_CACHE: 2
RESET_EXAMPLES_CACHE: 3

jobs:
stylecheck:
Expand Down
4 changes: 3 additions & 1 deletion requirements/requirements_tests.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pytest
pytest-mock
pytest-cov
pytest-cov
psutil>=5.9.1
docker>=5.0.3
4 changes: 3 additions & 1 deletion src/ansys/fluent/core/launcher/fluent_container.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
from pathlib import Path
import socket
import subprocess
import tempfile
Expand Down Expand Up @@ -36,6 +37,7 @@ def start_fluent_container(mounted_from: str, mounted_to: str, args: List[str])
timeout = 100
license_server = os.environ["ANSYSLMD_LICENSE_FILE"]
port = _get_free_port()
container_sifile = mounted_to + "/" + Path(sifile).name

try:
subprocess.run(
Expand All @@ -56,7 +58,7 @@ def start_fluent_container(mounted_from: str, mounted_to: str, args: List[str])
"FLUENT_LAUNCHED_FROM_PYFLUENT=1",
"ghcr.io/pyansys/pyfluent",
"-g",
f"-sifile={sifile}",
f"-sifile={container_sifile}",
]
+ args
)
Expand Down
70 changes: 41 additions & 29 deletions src/ansys/fluent/core/session.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""Module containing class encapsulating Fluent connection."""

import atexit
from ctypes import c_int, sizeof
import itertools
import os
import threading
from typing import Any, Callable, List, Optional, Tuple
import weakref

import grpc

Expand Down Expand Up @@ -243,13 +243,24 @@ def __init__(
self.scheme_eval = SchemeEval(self._scheme_eval_service)

self._cleanup_on_exit = cleanup_on_exit
Session._monitor_thread.cbs.append(self.exit)

if start_transcript:
self.start_transcript()

self._remote_instance = remote_instance

self._finalizer = weakref.finalize(
self,
Session._exit,
self._channel,
self._cleanup_on_exit,
self.scheme_eval,
self._transcript_service,
self.events_manager,
self._remote_instance,
)
Session._monitor_thread.cbs.append(self._finalizer)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still required to avoid the hang during exit.


@classmethod
def create_from_server_info_file(
cls,
Expand Down Expand Up @@ -293,26 +304,28 @@ def id(self) -> str:
"""Return the session id."""
return self._id

def _print_transcript(self, transcript: str):
@staticmethod
def _print_transcript(transcript: str):
print(transcript)

def _process_transcript(self):
responses = self._transcript_service.begin_streaming()
@staticmethod
def _process_transcript(transcript_service):
responses = transcript_service.begin_streaming()
transcript = ""
while True:
try:
response = next(responses)
transcript += response.transcript
if transcript[-1] == "\n":
self._print_transcript(transcript[0:-1])
Session._print_transcript(transcript[0:-1])
transcript = ""
except StopIteration:
break

def start_transcript(self) -> None:
"""Start streaming of Fluent transcript."""
self._transcript_thread = threading.Thread(
target=Session._process_transcript, args=(self,)
target=Session._process_transcript, args=(self._transcript_service,)
)

self._transcript_thread.start()
Expand All @@ -333,32 +346,34 @@ def check_health(self) -> str:

def exit(self) -> None:
"""Close the Fluent connection and exit Fluent."""
if self._channel:
if self._cleanup_on_exit:
self.scheme_eval.exec(("(exit-server)",))
self._transcript_service.end_streaming()
self.events_manager.stop()
self._channel.close()
self._channel = None
self._finalizer()

if self._remote_instance:
self._remote_instance.delete()
@staticmethod
def _exit(
channel,
cleanup_on_exit,
scheme_eval,
transcript_service,
events_manager,
remote_instance,
) -> None:
if channel:
if cleanup_on_exit:
scheme_eval.exec(("(exit-server)",))
transcript_service.end_streaming()
events_manager.stop()
channel.close()
channel = None

if remote_instance:
remote_instance.delete()

def __enter__(self):
"""Close the Fluent connection and exit Fluent."""
return self

def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any):
self.exit()

@classmethod
def register_on_exit(cls, callback: Callable) -> None:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not used anywhere, thus removed it

cls._on_exit_cbs.append(callback)

@staticmethod
def exit_all() -> None:
for cb in Session._on_exit_cbs:
cb()
self._finalizer()

class Meshing:
def __init__(
Expand Down Expand Up @@ -484,6 +499,3 @@ def root(self):
LOG.warning("The settings API is currently experimental.")
self._settings_root = settings_get_root(flproxy=self._settings_service)
return self._settings_root


atexit.register(Session.exit_all)
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@


@pytest.fixture
def with_running_pytest(monkeypatch: pytest.MonkeyPatch) -> None:
def with_launching_container(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("PYFLUENT_LAUNCH_CONTAINER", "1")
52 changes: 51 additions & 1 deletion tests/test_fluent_session.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import os
import time

import psutil
from util.solver_workflow import ( # noqa: F401
new_solver_session,
new_solver_session_no_transcript,
)

import ansys.fluent.core as pyfluent
from ansys.fluent.core.examples import download_file
from ansys.fluent.core.session import Session
import docker


def _read_case(session):
Expand All @@ -22,7 +29,7 @@ def print_transcript(transcript):
print_transcript.called = False
print_transcript.transcript = None

session._print_transcript = print_transcript
Session._print_transcript = print_transcript

_read_case(session=session)

Expand All @@ -45,3 +52,46 @@ def print_transcript(transcript):
_read_case(session=session)

assert not print_transcript.called


def test_server_exits_when_session_goes_out_of_scope(with_launching_container) -> None:
def f():
session = pyfluent.launch_fluent()
f.server_pid = session.scheme_eval.scheme_eval("(%cx-process-id)")

if os.getenv("PYFLUENT_START_INSTANCE") == "0":
client = docker.from_env()
containers_before = client.containers.list()
f()
time.sleep(10)
containers_after = client.containers.list()
new_containers = set(containers_after) - set(containers_before)
assert not new_containers
Copy link
Contributor Author

@mkundu1 mkundu1 Jun 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check if the container has been shut down during CI run.

else:
f()
time.sleep(10)
assert not psutil.pid_exists(f.server_pid)


def test_server_does_not_exit_when_session_goes_out_of_scope(
with_launching_container,
) -> None:
def f():
session = pyfluent.launch_fluent(cleanup_on_exit=False)
f.server_pid = session.scheme_eval.scheme_eval("(%cx-process-id)")

if os.getenv("PYFLUENT_START_INSTANCE") == "0":
client = docker.from_env()
containers_before = client.containers.list()
f()
time.sleep(10)
containers_after = client.containers.list()
new_containers = set(containers_after) - set(containers_before)
assert new_containers
for container in new_containers:
container.stop()
else:
f()
time.sleep(10)
assert psutil.pid_exists(f.server_pid)
psutil.Process(f.server_pid).kill()
2 changes: 2 additions & 0 deletions tests/test_tui_api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import pytest
from util.solver_workflow import new_solver_session # noqa: F401


@pytest.mark.skip("randomly failing due to missing transcript capture")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Disabling this for now, we have other unittests making TUI calls.

def test_report_system_proc_stats_tui(new_solver_session, capsys) -> None:
new_solver_session.start_transcript()
# Issue: Transcript missing for the first TUI command
Expand Down
6 changes: 3 additions & 3 deletions tests/util/meshing_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def reset_workflow(mesh_session):


@pytest.fixture
def new_mesh_session(with_running_pytest):
def new_mesh_session(with_launching_container):
mesher = create_mesh_session()
yield mesher
mesher.exit()
Expand All @@ -75,7 +75,7 @@ def new_watertight_workflow(new_watertight_workflow_session):


@pytest.fixture
def shared_mesh_session(with_running_pytest):
def shared_mesh_session(with_launching_container):
global _mesher
if not _mesher:
_mesher = create_mesh_session()
Expand Down Expand Up @@ -128,7 +128,7 @@ def new_fault_tolerant_workflow(new_fault_tolerant_workflow_session):


@pytest.fixture
def shared_mesh_session(with_running_pytest):
def shared_mesh_session(with_launching_container):
global _mesher
if not _mesher:
_mesher = create_mesh_session()
Expand Down
4 changes: 2 additions & 2 deletions tests/util/solver_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ def create_solver_session(*args, **kwargs):


@pytest.fixture
def new_solver_session(with_running_pytest):
def new_solver_session(with_launching_container):
solver = create_solver_session()
yield solver
solver.exit()


@pytest.fixture
def new_solver_session_no_transcript(with_running_pytest):
def new_solver_session_no_transcript(with_launching_container):
solver = create_solver_session(start_transcript=False)
yield solver
solver.exit()