Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .github/workflows/_accuracy_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ jobs:
DEVICES=$(echo "$CARD_ID" | fold -w1 | paste -sd,)
DEVICE_PORT=$(echo "$DEVICES" | cut -d',' -f1)

FLASK_PORT=$((42068 + DEVICE_PORT * 100))
FD_API_PORT=$((42088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((42058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((42078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((42098 + DEVICE_PORT * 100))
FLASK_PORT=$((8068 + DEVICE_PORT * 100))
FD_API_PORT=$((8088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((8058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((8078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((8098 + DEVICE_PORT * 100))
echo "Test ENV Parameter:"
echo "========================================================="
echo "FLASK_PORT=${FLASK_PORT}"
Expand Down
10 changes: 5 additions & 5 deletions .github/workflows/_base_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ jobs:
DEVICES=$(echo "$CARD_ID" | fold -w1 | paste -sd,)
DEVICE_PORT=$(echo "$DEVICES" | cut -d',' -f1)

FLASK_PORT=$((42068 + DEVICE_PORT * 100))
FD_API_PORT=$((42088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((42058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((42078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((42098 + DEVICE_PORT * 100))
FLASK_PORT=$((8068 + DEVICE_PORT * 100))
FD_API_PORT=$((8088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((8058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((8078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((8098 + DEVICE_PORT * 100))
echo "Test ENV Parameter:"
echo "========================================================="
echo "FLASK_PORT=${FLASK_PORT}"
Expand Down
10 changes: 5 additions & 5 deletions .github/workflows/_logprob_test_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ jobs:
DEVICES=$(echo "$CARD_ID" | fold -w1 | paste -sd,)
DEVICE_PORT=$(echo "$DEVICES" | cut -d',' -f1)

FLASK_PORT=$((42068 + DEVICE_PORT * 100))
FD_API_PORT=$((42088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((42058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((42078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((42098 + DEVICE_PORT * 100))
FLASK_PORT=$((8068 + DEVICE_PORT * 100))
FD_API_PORT=$((8088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((8058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((8078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((8098 + DEVICE_PORT * 100))
echo "Test ENV Parameter:"
echo "========================================================="
echo "FLASK_PORT=${FLASK_PORT}"
Expand Down
16 changes: 8 additions & 8 deletions .github/workflows/_pre_ce_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ jobs:
DEVICES=$(echo "$CARD_ID" | fold -w1 | paste -sd,)
DEVICE_PORT=$(echo "$DEVICES" | cut -d',' -f1)

FLASK_PORT=$((42068 + DEVICE_PORT * 100))
FD_API_PORT=$((42088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((42058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((42078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((42098 + DEVICE_PORT * 100))
FD_ZMQ_RECV_REQUEST_SERVER_PORT=$((42048 + DEVICE_PORT * 100))
FD_ZMQ_SEND_RESPONSE_SERVER_PORT=$((42038 + DEVICE_PORT * 100))
FD_ZMQ_CONTROL_CMD_SERVER_PORTS=$((42028 + DEVICE_PORT * 100))
FLASK_PORT=$((8068 + DEVICE_PORT * 100))
FD_API_PORT=$((8088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((8058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((8078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((8098 + DEVICE_PORT * 100))
FD_ZMQ_RECV_REQUEST_SERVER_PORT=$((8048 + DEVICE_PORT * 100))
FD_ZMQ_SEND_RESPONSE_SERVER_PORT=$((8038 + DEVICE_PORT * 100))
FD_ZMQ_CONTROL_CMD_SERVER_PORTS=$((8028 + DEVICE_PORT * 100))
echo "Test ENV Parameter:"
echo "========================================================="
echo "FLASK_PORT=${FLASK_PORT}"
Expand Down
12 changes: 6 additions & 6 deletions .github/workflows/_stable_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ jobs:
DEVICES=$(echo "$CARD_ID" | fold -w1 | paste -sd,)
DEVICE_PORT=$(echo "$DEVICES" | cut -d',' -f1)

FLASK_PORT=$((42068 + DEVICE_PORT * 100))
FD_API_PORT=$((42088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((42058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((42078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((42038 + DEVICE_PORT * 100))
FD_INFERENCE_MSG_QUEUE_ID=$(( 42048 + DEVICE_PORT * 100))
FLASK_PORT=$((8068 + DEVICE_PORT * 100))
FD_API_PORT=$((8088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((8058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((8078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((8038 + DEVICE_PORT * 100))
FD_INFERENCE_MSG_QUEUE_ID=$(( 8048 + DEVICE_PORT * 100))
echo "Test ENV Parameter:"
echo "========================================================="
echo "FLASK_PORT=${FLASK_PORT}"
Expand Down
14 changes: 7 additions & 7 deletions .github/workflows/_unit_test_coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,13 @@ jobs:
DEVICES=$(echo "$CARD_ID" | fold -w1 | paste -sd,)
DEVICE_PORT=$(echo "$DEVICES" | cut -d',' -f1)

FLASK_PORT=$((42068 + DEVICE_PORT * 100))
FD_API_PORT=$((42088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((42058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((42078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((42098 + DEVICE_PORT * 100))
FD_ROUTER_PORT=$((42048 + DEVICE_PORT * 100))
FD_CONNECTOR_PORT=$((42038 + DEVICE_PORT * 100))
FLASK_PORT=$((8068 + DEVICE_PORT * 100))
FD_API_PORT=$((8088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((8058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((8078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((8098 + DEVICE_PORT * 100))
FD_ROUTER_PORT=$((8048 + DEVICE_PORT * 100))
FD_CONNECTOR_PORT=$((8038 + DEVICE_PORT * 100))
echo "Test ENV Parameter:"
echo "========================================================="
echo "FLASK_PORT=${FLASK_PORT}"
Expand Down
6 changes: 3 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ def __init__(
from fastdeploy.entrypoints.llm import LLM

ports_to_clean = []
if "engine_worker_queue_port" in kwargs:
ports_to_clean.append(kwargs["engine_worker_queue_port"])
port_keys = ["engine_worker_queue_port", "cache_queue_port", "port", "metrics_port"]
ports_to_clean.extend(kwargs[k] for k in port_keys if k in kwargs)
clean_ports(ports_to_clean)
time.sleep(5)
time.sleep(10)
graph_optimization_config = {"use_cudagraph": False}
self.llm = LLM(
model=model_name_or_path,
Expand Down
68 changes: 52 additions & 16 deletions tests/model_loader/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import signal
import socket
import subprocess
import time
import traceback
from multiprocessing import Process, Queue

Expand Down Expand Up @@ -147,37 +148,72 @@ def form_model_get_output_topp1(
def kill_process_on_port(port: int):
    """
    Kill processes that are using the given port.
    Uses multiple methods to ensure thorough cleanup.

    Three best-effort discovery passes run in order -- ``lsof``, ``netstat``
    and ``fuser``. A tool that is missing, fails or times out is skipped.
    The calling process and its parent are never signalled, and each pid is
    handled at most once across the three passes.

    Args:
        port: Port number to free.
    """
    # Never kill ourselves or the process that launched us (e.g. the test runner).
    protected_pids = {os.getpid(), os.getppid()}
    handled_pids = set()

    def _stdout_of(argv, timeout=None):
        """Return the stdout of ``argv``; "" if the tool is missing, fails to start or times out."""
        try:
            completed = subprocess.run(
                argv,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                text=True,
                timeout=timeout,
            )
        except (OSError, subprocess.SubprocessError):
            return ""
        return completed.stdout

    def _kill(pid, label="", graceful=False):
        """Signal ``pid`` once, honouring the protected set; never raises for a vanished or foreign process."""
        if pid in handled_pids:
            return
        handled_pids.add(pid)
        if pid in protected_pids:
            print(f"Skip killing current process (pid={pid}) on port {port}")
            return
        try:
            if graceful:
                # First try SIGTERM for graceful shutdown
                os.kill(pid, signal.SIGTERM)
                time.sleep(1)
            # SIGKILL raises ProcessLookupError when the process is already gone.
            os.kill(pid, signal.SIGKILL)
            print(f"Killed process{label} on port {port}, pid={pid}")
        except ProcessLookupError:
            pass  # Process already terminated
        except PermissionError:
            # A process owned by another user must not abort the remaining cleanup.
            print(f"No permission to kill pid={pid} on port {port}")

    # Method 1: Use lsof to find processes
    # NOTE(review): `lsof -i:<port>` is not limited to listening sockets --
    # confirm that also killing connected clients is intended.
    for token in _stdout_of(["lsof", f"-i:{port}", "-t"]).split():
        if token.isdigit():
            _kill(int(token), graceful=True)

    # Method 2: Use netstat as backup
    # Rows are parsed here instead of with grep/awk so that only an exact
    # match on the local-address column counts (":80" must not match ":8080")
    # and so that UDP rows, which have no State column, are still handled.
    suffix = f":{port}"
    for row in _stdout_of(["netstat", "-tulpn"]).splitlines():
        columns = row.split()
        if len(columns) < 5 or not columns[3].endswith(suffix):
            continue
        for column in columns[4:]:
            pid_text, slash, _program = column.partition("/")
            if slash and pid_text.isdigit():
                _kill(int(pid_text), label=" (netstat)")
                break

    # Method 3: Use fuser if available
    # The pids are listed and signalled here rather than via `fuser -k`,
    # which would bypass the protected-pid check above.
    for token in _stdout_of(["fuser", f"{port}/tcp"], timeout=5).split():
        if token.isdigit():
            _kill(int(token), label=" (fuser)")


def clean_ports(ports_to_clean: list[int]):
    """
    Kill all processes occupying the ports listed in ``ports_to_clean``.

    Every port is cleaned once with ``kill_process_on_port``. After a short
    pause each port is probed on 127.0.0.1 with ``is_port_open``, and any
    port that still answers is cleaned a second time.

    Args:
        ports_to_clean: Port numbers to free. An empty list is a no-op.
    """
    if not ports_to_clean:
        # Nothing to free: skip the settle delay below as well.
        return

    print(f"Cleaning ports: {ports_to_clean}")
    for port in ports_to_clean:
        kill_process_on_port(port)

    # Double check and retry if ports are still in use
    time.sleep(2)
    for port in ports_to_clean:
        if is_port_open("127.0.0.1", port, timeout=0.1):
            print(f"Port {port} still in use, retrying cleanup...")
            kill_process_on_port(port)
            time.sleep(1)


def is_port_open(host: str, port: int, timeout=1.0):
"""
Expand Down
Loading