Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/_pre_ce_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ jobs:
echo "FD_ENGINE_QUEUE_PORT=${FD_ENGINE_QUEUE_PORT}"
echo "FD_METRICS_PORT=${FD_METRICS_PORT}"
echo "FD_CACHE_QUEUE_PORT=${FD_CACHE_QUEUE_PORT}"
echo "FD_ZMQ_RECV_REQUEST_SERVER_PORT=${FD_ZMQ_RECV_REQUEST_SERVER_PORT}"
echo "FD_ZMQ_SEND_RESPONSE_SERVER_PORT=${FD_ZMQ_SEND_RESPONSE_SERVER_PORT}"
echo "FD_ZMQ_CONTROL_CMD_SERVER_PORTS=${FD_ZMQ_CONTROL_CMD_SERVER_PORTS}"
echo "DEVICES=${DEVICES}"
echo "========================================================="

Expand Down Expand Up @@ -141,6 +144,9 @@ jobs:
-e "FD_METRICS_PORT=${FD_METRICS_PORT}" \
-e "FD_CACHE_QUEUE_PORT=${FD_CACHE_QUEUE_PORT}" \
-e "FLASK_PORT=${FLASK_PORT}" \
-e "FD_ZMQ_RECV_REQUEST_SERVER_PORT=${FD_ZMQ_RECV_REQUEST_SERVER_PORT}" \
-e "FD_ZMQ_SEND_RESPONSE_SERVER_PORT=${FD_ZMQ_SEND_RESPONSE_SERVER_PORT}" \
-e "FD_ZMQ_CONTROL_CMD_SERVER_PORTS=${FD_ZMQ_CONTROL_CMD_SERVER_PORTS}" \
-e "fd_wheel_url=${fd_wheel_url}" \
--gpus "\"device=${DEVICES}\"" ${docker_image} /bin/bash -c '
git config --global --add safe.directory /workspace/FastDeploy
Expand Down
6 changes: 6 additions & 0 deletions .github/workflows/_unit_test_coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,17 @@ jobs:
FD_ENGINE_QUEUE_PORT=$((42058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((42078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((42098 + DEVICE_PORT * 100))
FD_ROUTER_PORT=$((42048 + DEVICE_PORT * 100))
FD_CONNECTOR_PORT=$((42038 + DEVICE_PORT * 100))
echo "Test ENV Parameter:"
echo "========================================================="
echo "FLASK_PORT=${FLASK_PORT}"
echo "FD_API_PORT=${FD_API_PORT}"
echo "FD_ENGINE_QUEUE_PORT=${FD_ENGINE_QUEUE_PORT}"
echo "FD_METRICS_PORT=${FD_METRICS_PORT}"
echo "FD_CACHE_QUEUE_PORT=${FD_CACHE_QUEUE_PORT}"
echo "FD_ROUTER_PORT=${FD_ROUTER_PORT}"
echo "FD_CONNECTOR_PORT=${FD_CONNECTOR_PORT}"
echo "DEVICES=${DEVICES}"
echo "========================================================="

Expand Down Expand Up @@ -159,6 +163,8 @@ jobs:
-e "FD_METRICS_PORT=${FD_METRICS_PORT}" \
-e "FLASK_PORT=${FLASK_PORT}" \
-e "FD_CACHE_QUEUE_PORT=${FD_CACHE_QUEUE_PORT}" \
-e "FD_ROUTER_PORT=${FD_ROUTER_PORT}" \
-e "FD_CONNECTOR_PORT=${FD_CONNECTOR_PORT}" \
-e TZ="Asia/Shanghai" \
-e "fd_wheel_url=${fd_wheel_url}" \
-e "BASE_REF=${BASE_REF}" \
Expand Down
55 changes: 11 additions & 44 deletions tests/ci_use/EB_Lite/test_EB_Lite_serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import re
import shutil
import signal
import socket
import subprocess
import sys
import time
Expand All @@ -26,49 +25,17 @@
import pytest
import requests

# Read ports from environment variables; use default values if not set
FD_API_PORT = int(os.getenv("FD_API_PORT", 8188))
FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133))
FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233))
FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8234))

# List of ports to clean before and after tests
PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, FD_CACHE_QUEUE_PORT]


def is_port_open(host: str, port: int, timeout=1.0):
"""
Check if a TCP port is open on the given host.
Returns True if connection succeeds, False otherwise.
"""
try:
with socket.create_connection((host, port), timeout):
return True
except Exception:
return False


def kill_process_on_port(port: int):
"""
Kill processes that are listening on the given port.
Uses `lsof` to find process ids and sends SIGKILL.
"""
try:
output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip()
for pid in output.splitlines():
os.kill(int(pid), signal.SIGKILL)
print(f"Killed process on port {port}, pid={pid}")
except subprocess.CalledProcessError:
pass


def clean_ports():
"""
Kill all processes occupying the ports listed in PORTS_TO_CLEAN.
"""
for port in PORTS_TO_CLEAN:
kill_process_on_port(port)
time.sleep(2)
tests_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, tests_dir)

from e2e.utils.serving_utils import (
FD_API_PORT,
FD_CACHE_QUEUE_PORT,
FD_ENGINE_QUEUE_PORT,
FD_METRICS_PORT,
clean_ports,
is_port_open,
)


@pytest.fixture(scope="session", autouse=True)
Expand Down
63 changes: 48 additions & 15 deletions tests/ci_use/EB_Lite_with_adapter/test_eblite_serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,38 +90,71 @@ def is_port_open(host: str, port: int, timeout=1.0):
def kill_process_on_port(port: int):
"""
Kill processes that are listening on the given port.
Uses `lsof` to find process ids and sends SIGKILL.
Uses multiple methods to ensure thorough cleanup.
"""
current_pid = os.getpid()
parent_pid = os.getppid()

# Method 1: Use lsof to find processes
try:
output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip()
for pid in output.splitlines():
os.kill(int(pid), signal.SIGKILL)
print(f"Killed process on port {port}, pid={pid}")
pid = int(pid)
if pid in (current_pid, parent_pid):
print(f"Skip killing current process (pid={pid}) on port {port}")
continue
try:
# First try SIGTERM for graceful shutdown
os.kill(pid, signal.SIGTERM)
time.sleep(1)
# Then SIGKILL if still running
os.kill(pid, signal.SIGKILL)
print(f"Killed process on port {port}, pid={pid}")
except ProcessLookupError:
pass # Process already terminated
except subprocess.CalledProcessError:
pass

# Method 2: Use netstat and fuser as backup
try:
result = subprocess.run(
f"ps -ef -ww| grep {FD_CACHE_QUEUE_PORT} | grep -v grep", shell=True, capture_output=True, text=True
)
for line in result.stdout.strip().split("\n"):
if not line:
continue
parts = line.split()
pid = int(parts[1]) # ps -ef 的第二列是 PID
print(f"Killing PID: {pid}")
os.kill(pid, signal.SIGKILL)
except Exception as e:
print(f"Failed to kill cache manager process: {e}")
# Find processes using netstat and awk
cmd = f"netstat -tulpn 2>/dev/null | grep :{port} | awk '{{print $7}}' | cut -d'/' -f1"
output = subprocess.check_output(cmd, shell=True).decode().strip()
for pid in output.splitlines():
if pid and pid.isdigit():
pid = int(pid)
if pid in (current_pid, parent_pid):
continue
try:
os.kill(pid, signal.SIGKILL)
print(f"Killed process (netstat) on port {port}, pid={pid}")
except ProcessLookupError:
pass
except (subprocess.CalledProcessError, FileNotFoundError):
pass

# Method 3: Use fuser if available
try:
subprocess.run(f"fuser -k {port}/tcp", shell=True, timeout=5)
except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError):
pass


def clean_ports():
"""
Kill all processes occupying the ports listed in PORTS_TO_CLEAN.
"""
print(f"Cleaning ports: {PORTS_TO_CLEAN}")
for port in PORTS_TO_CLEAN:
kill_process_on_port(port)

# Double check and retry if ports are still in use
time.sleep(2)
for port in PORTS_TO_CLEAN:
if is_port_open("127.0.0.1", port, timeout=0.1):
print(f"Port {port} still in use, retrying cleanup...")
kill_process_on_port(port)
time.sleep(1)


@pytest.fixture(scope="session", autouse=True)
Expand Down
53 changes: 10 additions & 43 deletions tests/ci_use/EB_VL_Lite/test_EB_VL_Lite_serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import os
import re
import signal
import socket
import subprocess
import sys
import time
Expand All @@ -25,53 +24,21 @@
import pytest
import requests

# Read ports from environment variables; use default values if not set
FD_API_PORT = int(os.getenv("FD_API_PORT", 8188))
FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133))
FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233))
FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8234))
tests_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, tests_dir)

# List of ports to clean before and after tests
PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, FD_CACHE_QUEUE_PORT]
from e2e.utils.serving_utils import (
FD_API_PORT,
FD_CACHE_QUEUE_PORT,
FD_ENGINE_QUEUE_PORT,
FD_METRICS_PORT,
clean_ports,
is_port_open,
)

os.environ["FD_USE_MACHETE"] = "0"


def is_port_open(host: str, port: int, timeout=1.0):
"""
Check if a TCP port is open on the given host.
Returns True if connection succeeds, False otherwise.
"""
try:
with socket.create_connection((host, port), timeout):
return True
except Exception:
return False


def kill_process_on_port(port: int):
"""
Kill processes that are listening on the given port.
Uses `lsof` to find process ids and sends SIGKILL.
"""
try:
output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip()
for pid in output.splitlines():
os.kill(int(pid), signal.SIGKILL)
print(f"Killed process on port {port}, pid={pid}")
except subprocess.CalledProcessError:
pass


def clean_ports():
"""
Kill all processes occupying the ports listed in PORTS_TO_CLEAN.
"""
for port in PORTS_TO_CLEAN:
kill_process_on_port(port)
time.sleep(2)


@pytest.fixture(scope="session", autouse=True)
def setup_and_run_server():
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import os
import re
import signal
import socket
import subprocess
import sys
import time
Expand All @@ -27,49 +26,17 @@
import requests
from jsonschema import validate

# Read ports from environment variables; use default values if not set
FD_API_PORT = int(os.getenv("FD_API_PORT", 8188))
FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133))
FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233))
FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8333))

# List of ports to clean before and after tests
PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, FD_CACHE_QUEUE_PORT]


def is_port_open(host: str, port: int, timeout=1.0):
"""
Check if a TCP port is open on the given host.
Returns True if connection succeeds, False otherwise.
"""
try:
with socket.create_connection((host, port), timeout):
return True
except Exception:
return False


def kill_process_on_port(port: int):
"""
Kill processes that are listening on the given port.
Uses `lsof` to find process ids and sends SIGKILL.
"""
try:
output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip()
for pid in output.splitlines():
os.kill(int(pid), signal.SIGKILL)
print(f"Killed process on port {port}, pid={pid}")
except subprocess.CalledProcessError:
pass


def clean_ports():
"""
Kill all processes occupying the ports listed in PORTS_TO_CLEAN.
"""
for port in PORTS_TO_CLEAN:
kill_process_on_port(port)
time.sleep(2)
tests_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, tests_dir)

from e2e.utils.serving_utils import (
FD_API_PORT,
FD_CACHE_QUEUE_PORT,
FD_ENGINE_QUEUE_PORT,
FD_METRICS_PORT,
clean_ports,
is_port_open,
)


@pytest.fixture(scope="session", autouse=True)
Expand Down
Loading
Loading