diff --git a/.github/workflows/_pre_ce_test.yml b/.github/workflows/_pre_ce_test.yml index b2be6d456a7..ef0c47b1d87 100644 --- a/.github/workflows/_pre_ce_test.yml +++ b/.github/workflows/_pre_ce_test.yml @@ -92,6 +92,9 @@ jobs: echo "FD_ENGINE_QUEUE_PORT=${FD_ENGINE_QUEUE_PORT}" echo "FD_METRICS_PORT=${FD_METRICS_PORT}" echo "FD_CACHE_QUEUE_PORT=${FD_CACHE_QUEUE_PORT}" + echo "FD_ZMQ_RECV_REQUEST_SERVER_PORT=${FD_ZMQ_RECV_REQUEST_SERVER_PORT}" + echo "FD_ZMQ_SEND_RESPONSE_SERVER_PORT=${FD_ZMQ_SEND_RESPONSE_SERVER_PORT}" + echo "FD_ZMQ_CONTROL_CMD_SERVER_PORTS=${FD_ZMQ_CONTROL_CMD_SERVER_PORTS}" echo "DEVICES=${DEVICES}" echo "=========================================================" @@ -141,6 +144,9 @@ jobs: -e "FD_METRICS_PORT=${FD_METRICS_PORT}" \ -e "FD_CACHE_QUEUE_PORT=${FD_CACHE_QUEUE_PORT}" \ -e "FLASK_PORT=${FLASK_PORT}" \ + -e "FD_ZMQ_RECV_REQUEST_SERVER_PORT=${FD_ZMQ_RECV_REQUEST_SERVER_PORT}" \ + -e "FD_ZMQ_SEND_RESPONSE_SERVER_PORT=${FD_ZMQ_SEND_RESPONSE_SERVER_PORT}" \ + -e "FD_ZMQ_CONTROL_CMD_SERVER_PORTS=${FD_ZMQ_CONTROL_CMD_SERVER_PORTS}" \ -e "fd_wheel_url=${fd_wheel_url}" \ --gpus "\"device=${DEVICES}\"" ${docker_image} /bin/bash -c ' git config --global --add safe.directory /workspace/FastDeploy diff --git a/.github/workflows/_unit_test_coverage.yml b/.github/workflows/_unit_test_coverage.yml index 1499c684710..70a07c0e90b 100644 --- a/.github/workflows/_unit_test_coverage.yml +++ b/.github/workflows/_unit_test_coverage.yml @@ -103,6 +103,8 @@ jobs: FD_ENGINE_QUEUE_PORT=$((42058 + DEVICE_PORT * 100)) FD_METRICS_PORT=$((42078 + DEVICE_PORT * 100)) FD_CACHE_QUEUE_PORT=$((42098 + DEVICE_PORT * 100)) + FD_ROUTER_PORT=$((42048 + DEVICE_PORT * 100)) + FD_CONNECTOR_PORT=$((42038 + DEVICE_PORT * 100)) echo "Test ENV Parameter:" echo "=========================================================" echo "FLASK_PORT=${FLASK_PORT}" @@ -110,6 +112,8 @@ jobs: echo "FD_ENGINE_QUEUE_PORT=${FD_ENGINE_QUEUE_PORT}" echo "FD_METRICS_PORT=${FD_METRICS_PORT}" echo "FD_CACHE_QUEUE_PORT=${FD_CACHE_QUEUE_PORT}" + echo "FD_ROUTER_PORT=${FD_ROUTER_PORT}" + echo "FD_CONNECTOR_PORT=${FD_CONNECTOR_PORT}" echo "DEVICES=${DEVICES}" echo "=========================================================" @@ -159,6 +163,8 @@ jobs: -e "FD_METRICS_PORT=${FD_METRICS_PORT}" \ -e "FLASK_PORT=${FLASK_PORT}" \ -e "FD_CACHE_QUEUE_PORT=${FD_CACHE_QUEUE_PORT}" \ + -e "FD_ROUTER_PORT=${FD_ROUTER_PORT}" \ + -e "FD_CONNECTOR_PORT=${FD_CONNECTOR_PORT}" \ -e TZ="Asia/Shanghai" \ -e "fd_wheel_url=${fd_wheel_url}" \ -e "BASE_REF=${BASE_REF}" \ diff --git a/tests/ci_use/EB_Lite/test_EB_Lite_serving.py b/tests/ci_use/EB_Lite/test_EB_Lite_serving.py index baeeb259864..0c73f0c04a5 100644 --- a/tests/ci_use/EB_Lite/test_EB_Lite_serving.py +++ b/tests/ci_use/EB_Lite/test_EB_Lite_serving.py @@ -17,7 +17,6 @@ import re import shutil import signal -import socket import subprocess import sys import time @@ -26,49 +25,17 @@ import pytest import requests -# Read ports from environment variables; use default values if not set -FD_API_PORT = int(os.getenv("FD_API_PORT", 8188)) -FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133)) -FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233)) -FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8234)) - -# List of ports to clean before and after tests -PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, FD_CACHE_QUEUE_PORT] - - -def is_port_open(host: str, port: int, timeout=1.0): - """ - Check if a TCP port is open on the given host. - Returns True if connection succeeds, False otherwise. - """ - try: - with socket.create_connection((host, port), timeout): - return True - except Exception: - return False - - -def kill_process_on_port(port: int): - """ - Kill processes that are listening on the given port. - Uses `lsof` to find process ids and sends SIGKILL. - """ - try: - output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip() - for pid in output.splitlines(): - os.kill(int(pid), signal.SIGKILL) - print(f"Killed process on port {port}, pid={pid}") - except subprocess.CalledProcessError: - pass - - -def clean_ports(): - """ - Kill all processes occupying the ports listed in PORTS_TO_CLEAN. - """ - for port in PORTS_TO_CLEAN: - kill_process_on_port(port) - time.sleep(2) +tests_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +sys.path.insert(0, tests_dir) + +from e2e.utils.serving_utils import ( + FD_API_PORT, + FD_CACHE_QUEUE_PORT, + FD_ENGINE_QUEUE_PORT, + FD_METRICS_PORT, + clean_ports, + is_port_open, +) @pytest.fixture(scope="session", autouse=True) diff --git a/tests/ci_use/EB_Lite_with_adapter/test_eblite_serving.py b/tests/ci_use/EB_Lite_with_adapter/test_eblite_serving.py index 6202f4bae0f..72f19aeb852 100644 --- a/tests/ci_use/EB_Lite_with_adapter/test_eblite_serving.py +++ b/tests/ci_use/EB_Lite_with_adapter/test_eblite_serving.py @@ -90,38 +90,71 @@ def is_port_open(host: str, port: int, timeout=1.0): def kill_process_on_port(port: int): """ Kill processes that are listening on the given port. - Uses `lsof` to find process ids and sends SIGKILL. + Uses multiple methods to ensure thorough cleanup. """ + current_pid = os.getpid() + parent_pid = os.getppid() + + # Method 1: Use lsof to find processes try: output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip() for pid in output.splitlines(): - os.kill(int(pid), signal.SIGKILL) - print(f"Killed process on port {port}, pid={pid}") + pid = int(pid) + if pid in (current_pid, parent_pid): + print(f"Skip killing current process (pid={pid}) on port {port}") + continue + try: + # First try SIGTERM for graceful shutdown + os.kill(pid, signal.SIGTERM) + time.sleep(1) + # Then SIGKILL if still running + os.kill(pid, signal.SIGKILL) + print(f"Killed process on port {port}, pid={pid}") + except ProcessLookupError: + pass # Process already terminated except subprocess.CalledProcessError: pass + # Method 2: Use netstat and fuser as backup try: - result = subprocess.run( - f"ps -ef -ww| grep {FD_CACHE_QUEUE_PORT} | grep -v grep", shell=True, capture_output=True, text=True - ) - for line in result.stdout.strip().split("\n"): - if not line: - continue - parts = line.split() - pid = int(parts[1]) # ps -ef 的第二列是 PID - print(f"Killing PID: {pid}") - os.kill(pid, signal.SIGKILL) - except Exception as e: - print(f"Failed to kill cache manager process: {e}") + # Find processes using netstat and awk + cmd = f"netstat -tulpn 2>/dev/null | grep :{port} | awk '{{print $7}}' | cut -d'/' -f1" + output = subprocess.check_output(cmd, shell=True).decode().strip() + for pid in output.splitlines(): + if pid and pid.isdigit(): + pid = int(pid) + if pid in (current_pid, parent_pid): + continue + try: + os.kill(pid, signal.SIGKILL) + print(f"Killed process (netstat) on port {port}, pid={pid}") + except ProcessLookupError: + pass + except (subprocess.CalledProcessError, FileNotFoundError): + pass + + # Method 3: Use fuser if available + try: + subprocess.run(f"fuser -k {port}/tcp", shell=True, timeout=5) + except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError): + pass def clean_ports(): """ Kill all processes occupying the ports listed in PORTS_TO_CLEAN. """ + print(f"Cleaning ports: {PORTS_TO_CLEAN}") for port in PORTS_TO_CLEAN: kill_process_on_port(port) + + # Double check and retry if ports are still in use time.sleep(2) + for port in PORTS_TO_CLEAN: + if is_port_open("127.0.0.1", port, timeout=0.1): + print(f"Port {port} still in use, retrying cleanup...") + kill_process_on_port(port) + time.sleep(1) @pytest.fixture(scope="session", autouse=True) diff --git a/tests/ci_use/EB_VL_Lite/test_EB_VL_Lite_serving.py b/tests/ci_use/EB_VL_Lite/test_EB_VL_Lite_serving.py index 5da6de8e01a..53788b57743 100644 --- a/tests/ci_use/EB_VL_Lite/test_EB_VL_Lite_serving.py +++ b/tests/ci_use/EB_VL_Lite/test_EB_VL_Lite_serving.py @@ -16,7 +16,6 @@ import os import re import signal -import socket import subprocess import sys import time @@ -25,53 +24,21 @@ import pytest import requests -# Read ports from environment variables; use default values if not set -FD_API_PORT = int(os.getenv("FD_API_PORT", 8188)) -FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133)) -FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233)) -FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8234)) +tests_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +sys.path.insert(0, tests_dir) -# List of ports to clean before and after tests -PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, FD_CACHE_QUEUE_PORT] +from e2e.utils.serving_utils import ( + FD_API_PORT, + FD_CACHE_QUEUE_PORT, + FD_ENGINE_QUEUE_PORT, + FD_METRICS_PORT, + clean_ports, + is_port_open, +) os.environ["FD_USE_MACHETE"] = "0" -def is_port_open(host: str, port: int, timeout=1.0): - """ - Check if a TCP port is open on the given host. - Returns True if connection succeeds, False otherwise. - """ - try: - with socket.create_connection((host, port), timeout): - return True - except Exception: - return False - - -def kill_process_on_port(port: int): - """ - Kill processes that are listening on the given port. - Uses `lsof` to find process ids and sends SIGKILL. - """ - try: - output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip() - for pid in output.splitlines(): - os.kill(int(pid), signal.SIGKILL) - print(f"Killed process on port {port}, pid={pid}") - except subprocess.CalledProcessError: - pass - - -def clean_ports(): - """ - Kill all processes occupying the ports listed in PORTS_TO_CLEAN. - """ - for port in PORTS_TO_CLEAN: - kill_process_on_port(port) - time.sleep(2) - - @pytest.fixture(scope="session", autouse=True) def setup_and_run_server(): """ diff --git a/tests/ci_use/Qwen2-7B-Instruct_serving/test_Qwen2-7B-Instruct_serving.py b/tests/ci_use/Qwen2-7B-Instruct_serving/test_Qwen2-7B-Instruct_serving.py index 43dc1e72626..dc6f61c6a9b 100644 --- a/tests/ci_use/Qwen2-7B-Instruct_serving/test_Qwen2-7B-Instruct_serving.py +++ b/tests/ci_use/Qwen2-7B-Instruct_serving/test_Qwen2-7B-Instruct_serving.py @@ -17,7 +17,6 @@ import os import re import signal -import socket import subprocess import sys import time @@ -27,49 +26,17 @@ import requests from jsonschema import validate -# Read ports from environment variables; use default values if not set -FD_API_PORT = int(os.getenv("FD_API_PORT", 8188)) -FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133)) -FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233)) -FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8333)) - -# List of ports to clean before and after tests -PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, FD_CACHE_QUEUE_PORT] - - -def is_port_open(host: str, port: int, timeout=1.0): - """ - Check if a TCP port is open on the given host. - Returns True if connection succeeds, False otherwise. - """ - try: - with socket.create_connection((host, port), timeout): - return True - except Exception: - return False - - -def kill_process_on_port(port: int): - """ - Kill processes that are listening on the given port. - Uses `lsof` to find process ids and sends SIGKILL. - """ - try: - output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip() - for pid in output.splitlines(): - os.kill(int(pid), signal.SIGKILL) - print(f"Killed process on port {port}, pid={pid}") - except subprocess.CalledProcessError: - pass - - -def clean_ports(): - """ - Kill all processes occupying the ports listed in PORTS_TO_CLEAN. - """ - for port in PORTS_TO_CLEAN: - kill_process_on_port(port) - time.sleep(2) +tests_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +sys.path.insert(0, tests_dir) + +from e2e.utils.serving_utils import ( + FD_API_PORT, + FD_CACHE_QUEUE_PORT, + FD_ENGINE_QUEUE_PORT, + FD_METRICS_PORT, + clean_ports, + is_port_open, +) @pytest.fixture(scope="session", autouse=True) diff --git a/tests/ci_use/Qwen2_5_VL/test_Qwen2_5_VL_serving.py b/tests/ci_use/Qwen2_5_VL/test_Qwen2_5_VL_serving.py index 65602b1eb96..98ed5567833 100644 --- a/tests/ci_use/Qwen2_5_VL/test_Qwen2_5_VL_serving.py +++ b/tests/ci_use/Qwen2_5_VL/test_Qwen2_5_VL_serving.py @@ -16,7 +16,6 @@ import os import re import signal -import socket import subprocess import sys import time @@ -25,47 +24,17 @@ import pytest import requests -# Read ports from environment variables; use default values if not set -FD_API_PORT = int(os.getenv("FD_API_PORT", 8188)) -FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133)) -FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233)) +tests_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +sys.path.insert(0, tests_dir) -# List of ports to clean before and after tests -PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT] - - -def is_port_open(host: str, port: int, timeout=1.0): - """ - Check if a TCP port is open on the given host. - Returns True if connection succeeds, False otherwise. - """ - try: - with socket.create_connection((host, port), timeout): - return True - except Exception: - return False - - -def kill_process_on_port(port: int): - """ - Kill processes that are listening on the given port. - Uses `lsof` to find process ids and sends SIGKILL. - """ - try: - output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip() - for pid in output.splitlines(): - os.kill(int(pid), signal.SIGKILL) - print(f"Killed process on port {port}, pid={pid}") - except subprocess.CalledProcessError: - pass - - -def clean_ports(): - """ - Kill all processes occupying the ports listed in PORTS_TO_CLEAN. - """ - for port in PORTS_TO_CLEAN: - kill_process_on_port(port) +from e2e.utils.serving_utils import ( + FD_API_PORT, + FD_CACHE_QUEUE_PORT, + FD_ENGINE_QUEUE_PORT, + FD_METRICS_PORT, + clean_ports, + is_port_open, +) @pytest.fixture(scope="session", autouse=True) @@ -99,6 +68,8 @@ def setup_and_run_server(): str(FD_ENGINE_QUEUE_PORT), "--metrics-port", str(FD_METRICS_PORT), + "--cache-queue-port", + str(FD_CACHE_QUEUE_PORT), "--enable-mm", "--max-model-len", "32768", diff --git a/tests/ci_use/Qwen3-MoE/test_Qwen3-MoE_serving.py b/tests/ci_use/Qwen3-MoE/test_Qwen3-MoE_serving.py index ecff16bdc2a..d68bf93caa9 100644 --- a/tests/ci_use/Qwen3-MoE/test_Qwen3-MoE_serving.py +++ b/tests/ci_use/Qwen3-MoE/test_Qwen3-MoE_serving.py @@ -15,7 +15,6 @@ import os import re import signal -import socket import subprocess import sys import time @@ -23,49 +22,17 @@ import pytest import requests -# Read ports from environment variables; use default values if not set -FD_API_PORT = int(os.getenv("FD_API_PORT", 8188)) -FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133)) -FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233)) -FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8333)) - -# List of ports to clean before and after tests -PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, FD_CACHE_QUEUE_PORT] - - -def is_port_open(host: str, port: int, timeout=1.0): - """ - Check if a TCP port is open on the given host. - Returns True if connection succeeds, False otherwise. - """ - try: - with socket.create_connection((host, port), timeout): - return True - except Exception: - return False - - -def kill_process_on_port(port: int): - """ - Kill processes that are listening on the given port. - Uses `lsof` to find process ids and sends SIGKILL. - """ - try: - output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip() - for pid in output.splitlines(): - os.kill(int(pid), signal.SIGKILL) - print(f"Killed process on port {port}, pid={pid}") - except subprocess.CalledProcessError: - pass - - -def clean_ports(): - """ - Kill all processes occupying the ports listed in PORTS_TO_CLEAN. - """ - for port in PORTS_TO_CLEAN: - kill_process_on_port(port) - time.sleep(2) +tests_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +sys.path.insert(0, tests_dir) + +from e2e.utils.serving_utils import ( + FD_API_PORT, + FD_CACHE_QUEUE_PORT, + FD_ENGINE_QUEUE_PORT, + FD_METRICS_PORT, + clean_ports, + is_port_open, +) @pytest.fixture(scope="session", autouse=True) diff --git a/tests/e2e/test_ernie_03b_pd.py b/tests/e2e/test_ernie_03b_pd.py index 558a2159d8f..7d31a574a9e 100644 --- a/tests/e2e/test_ernie_03b_pd.py +++ b/tests/e2e/test_ernie_03b_pd.py @@ -16,20 +16,113 @@ import os import shutil import signal +import socket import subprocess import sys import time import pytest import requests -from utils.serving_utils import ( + +# Read ports from environment variables; use default values if not set +FD_API_PORT = int(os.getenv("FD_API_PORT", 8188)) +FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133)) +FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233)) +FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8333)) + +# List of ports to clean before and after tests +PORTS_TO_CLEAN = [ FD_API_PORT, - FD_CACHE_QUEUE_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, - clean_ports, - is_port_open, -) + FD_CACHE_QUEUE_PORT, + FD_API_PORT + 1, + FD_ENGINE_QUEUE_PORT + 1, + FD_METRICS_PORT + 1, + FD_CACHE_QUEUE_PORT + 1, +] + + +def is_port_open(host: str, port: int, timeout=1.0): + """ + Check if a TCP port is open on the given host. + Returns True if connection succeeds, False otherwise. + """ + try: + with socket.create_connection((host, port), timeout): + return True + except Exception: + return False + + +def kill_process_on_port(port: int): + """ + Kill processes that are listening on the given port. + Uses multiple methods to ensure thorough cleanup. + """ + current_pid = os.getpid() + parent_pid = os.getppid() + + # Method 1: Use lsof to find processes + try: + output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip() + for pid in output.splitlines(): + pid = int(pid) + if pid in (current_pid, parent_pid): + print(f"Skip killing current process (pid={pid}) on port {port}") + continue + try: + # First try SIGTERM for graceful shutdown + os.kill(pid, signal.SIGTERM) + time.sleep(1) + # Then SIGKILL if still running + os.kill(pid, signal.SIGKILL) + print(f"Killed process on port {port}, pid={pid}") + except ProcessLookupError: + pass # Process already terminated + except subprocess.CalledProcessError: + pass + + # Method 2: Use netstat and fuser as backup + try: + # Find processes using netstat and awk + cmd = f"netstat -tulpn 2>/dev/null | grep :{port} | awk '{{print $7}}' | cut -d'/' -f1" + output = subprocess.check_output(cmd, shell=True).decode().strip() + for pid in output.splitlines(): + if pid and pid.isdigit(): + pid = int(pid) + if pid in (current_pid, parent_pid): + continue + try: + os.kill(pid, signal.SIGKILL) + print(f"Killed process (netstat) on port {port}, pid={pid}") + except ProcessLookupError: + pass + except (subprocess.CalledProcessError, FileNotFoundError): + pass + + # Method 3: Use fuser if available + try: + subprocess.run(f"fuser -k {port}/tcp", shell=True, timeout=5) + except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError): + pass + + +def clean_ports(): + """ + Kill all processes occupying the ports listed in PORTS_TO_CLEAN. + """ + print(f"Cleaning ports: {PORTS_TO_CLEAN}") + for port in PORTS_TO_CLEAN: + kill_process_on_port(port) + + # Double check and retry if ports are still in use + time.sleep(2) + for port in PORTS_TO_CLEAN: + if is_port_open("127.0.0.1", port, timeout=0.1): + print(f"Port {port} still in use, retrying cleanup...") + kill_process_on_port(port) + time.sleep(1) @pytest.fixture(scope="session", autouse=True) diff --git a/tests/e2e/test_ernie_03b_pd_multi_node.py b/tests/e2e/test_ernie_03b_pd_multi_node.py index 0417fdacd63..b1cc1fd2ac4 100644 --- a/tests/e2e/test_ernie_03b_pd_multi_node.py +++ b/tests/e2e/test_ernie_03b_pd_multi_node.py @@ -108,30 +108,71 @@ def get_registered_number(router_url) -> list: def kill_process_on_port(port: int): """ Kill processes that are listening on the given port. - Uses `lsof` to find process ids and sends SIGKILL. + Uses multiple methods to ensure thorough cleanup. """ + current_pid = os.getpid() + parent_pid = os.getppid() + + # Method 1: Use lsof to find processes try: output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip() - current_pid = os.getpid() - parent_pid = os.getppid() for pid in output.splitlines(): pid = int(pid) if pid in (current_pid, parent_pid): print(f"Skip killing current process (pid={pid}) on port {port}") continue - os.kill(pid, signal.SIGKILL) - print(f"Killed process on port {port}, pid={pid}") + try: + # First try SIGTERM for graceful shutdown + os.kill(pid, signal.SIGTERM) + time.sleep(1) + # Then SIGKILL if still running + os.kill(pid, signal.SIGKILL) + print(f"Killed process on port {port}, pid={pid}") + except ProcessLookupError: + pass # Process already terminated except subprocess.CalledProcessError: pass + # Method 2: Use netstat and fuser as backup + try: + # Find processes using netstat and awk + cmd = f"netstat -tulpn 2>/dev/null | grep :{port} | awk '{{print $7}}' | cut -d'/' -f1" + output = subprocess.check_output(cmd, shell=True).decode().strip() + for pid in output.splitlines(): + if pid and pid.isdigit(): + pid = int(pid) + if pid in (current_pid, parent_pid): + continue + try: + os.kill(pid, signal.SIGKILL) + print(f"Killed process (netstat) on port {port}, pid={pid}") + except ProcessLookupError: + pass + except (subprocess.CalledProcessError, FileNotFoundError): + pass + + # Method 3: Use fuser if available + try: + subprocess.run(f"fuser -k {port}/tcp", shell=True, timeout=5) + except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError): + pass + def clean_ports(): """ Kill all processes occupying the ports listed in PORTS_TO_CLEAN. """ + print(f"Cleaning ports: {PORTS_TO_CLEAN}") for port in PORTS_TO_CLEAN: kill_process_on_port(port) + + # Double check and retry if ports are still in use time.sleep(2) + for port in PORTS_TO_CLEAN: + if is_port_open("127.0.0.1", port, timeout=0.1): + print(f"Port {port} still in use, retrying cleanup...") + kill_process_on_port(port) + time.sleep(1) @pytest.fixture(scope="session", autouse=True) diff --git a/tests/e2e/test_ernie_03b_router.py b/tests/e2e/test_ernie_03b_router.py index 4037f6b775a..4b2cf71dfbb 100644 --- a/tests/e2e/test_ernie_03b_router.py +++ b/tests/e2e/test_ernie_03b_router.py @@ -107,30 +107,71 @@ def get_registered_number(router_url) -> list: def kill_process_on_port(port: int): """ Kill processes that are listening on the given port. - Uses `lsof` to find process ids and sends SIGKILL. + Uses multiple methods to ensure thorough cleanup. """ + current_pid = os.getpid() + parent_pid = os.getppid() + + # Method 1: Use lsof to find processes try: output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip() - current_pid = os.getpid() - parent_pid = os.getppid() for pid in output.splitlines(): pid = int(pid) if pid in (current_pid, parent_pid): print(f"Skip killing current process (pid={pid}) on port {port}") continue - os.kill(pid, signal.SIGKILL) - print(f"Killed process on port {port}, pid={pid}") + try: + # First try SIGTERM for graceful shutdown + os.kill(pid, signal.SIGTERM) + time.sleep(1) + # Then SIGKILL if still running + os.kill(pid, signal.SIGKILL) + print(f"Killed process on port {port}, pid={pid}") + except ProcessLookupError: + pass # Process already terminated except subprocess.CalledProcessError: pass + # Method 2: Use netstat and fuser as backup + try: + # Find processes using netstat and awk + cmd = f"netstat -tulpn 2>/dev/null | grep :{port} | awk '{{print $7}}' | cut -d'/' -f1" + output = subprocess.check_output(cmd, shell=True).decode().strip() + for pid in output.splitlines(): + if pid and pid.isdigit(): + pid = int(pid) + if pid in (current_pid, parent_pid): + continue + try: + os.kill(pid, signal.SIGKILL) + print(f"Killed process (netstat) on port {port}, pid={pid}") + except ProcessLookupError: + pass + except (subprocess.CalledProcessError, FileNotFoundError): + pass + + # Method 3: Use fuser if available + try: + subprocess.run(f"fuser -k {port}/tcp", shell=True, timeout=5) + except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError): + pass + def clean_ports(): """ Kill all processes occupying the ports listed in PORTS_TO_CLEAN. """ + print(f"Cleaning ports: {PORTS_TO_CLEAN}") for port in PORTS_TO_CLEAN: kill_process_on_port(port) + + # Double check and retry if ports are still in use time.sleep(2) + for port in PORTS_TO_CLEAN: + if is_port_open("127.0.0.1", port, timeout=0.1): + print(f"Port {port} still in use, retrying cleanup...") + kill_process_on_port(port) + time.sleep(1) @pytest.fixture(scope="session", autouse=True) diff --git a/tests/model_loader/test_load_ernie_vl.py b/tests/model_loader/test_load_ernie_vl.py index 68ef55c314e..abbdeb542f5 100644 --- a/tests/model_loader/test_load_ernie_vl.py +++ b/tests/model_loader/test_load_ernie_vl.py @@ -53,24 +53,71 @@ def is_port_open(host: str, port: int, timeout=1.0): def kill_process_on_port(port: int): """ Kill processes that are listening on the given port. - Uses `lsof` to find process ids and sends SIGKILL. + Uses multiple methods to ensure thorough cleanup. """ + current_pid = os.getpid() + parent_pid = os.getppid() + + # Method 1: Use lsof to find processes try: output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip() for pid in output.splitlines(): - os.kill(int(pid), signal.SIGKILL) - print(f"Killed process on port {port}, pid={pid}") + pid = int(pid) + if pid in (current_pid, parent_pid): + print(f"Skip killing current process (pid={pid}) on port {port}") + continue + try: + # First try SIGTERM for graceful shutdown + os.kill(pid, signal.SIGTERM) + time.sleep(1) + # Then SIGKILL if still running + os.kill(pid, signal.SIGKILL) + print(f"Killed process on port {port}, pid={pid}") + except ProcessLookupError: + pass # Process already terminated except subprocess.CalledProcessError: pass + # Method 2: Use netstat and fuser as backup + try: + # Find processes using netstat and awk + cmd = f"netstat -tulpn 2>/dev/null | grep :{port} | awk '{{print $7}}' | cut -d'/' -f1" + output = subprocess.check_output(cmd, shell=True).decode().strip() + for pid in output.splitlines(): + if pid and pid.isdigit(): + pid = int(pid) + if pid in (current_pid, parent_pid): + continue + try: + os.kill(pid, signal.SIGKILL) + print(f"Killed process (netstat) on port {port}, pid={pid}") + except ProcessLookupError: + pass + except (subprocess.CalledProcessError, FileNotFoundError): + pass + + # Method 3: Use fuser if available + try: + subprocess.run(f"fuser -k {port}/tcp", shell=True, timeout=5) + except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError): + pass + def clean_ports(): """ Kill all processes occupying the ports listed in PORTS_TO_CLEAN. """ + print(f"Cleaning ports: {PORTS_TO_CLEAN}") for port in PORTS_TO_CLEAN: kill_process_on_port(port) + + # Double check and retry if ports are still in use time.sleep(2) + for port in PORTS_TO_CLEAN: + if is_port_open("127.0.0.1", port, timeout=0.1): + print(f"Port {port} still in use, retrying cleanup...") + kill_process_on_port(port) + time.sleep(1) @pytest.fixture(scope="session", autouse=True)