Conversation

@Bihan Bihan commented Nov 6, 2025

Intro
We want to make it possible to create a gateway that extends the standard gateway functionality with additional features (all sgl-router features, such as cache-aware routing) while keeping all the standard gateway features (such as authentication and rate limits).

For the user, using such a gateway should be very simple, e.g. setting router to sglang in the gateway configuration. Everything else should look the same: the same service endpoint, working authentication and rate limits, etc.
While the first change should only bring the minimum features (routing replica traffic through the router: dstack's gateway/nginx -> sglang-router -> replica workers), in the future this may be extended with router-specific scaling metrics such as TTFT, ITL, and disaggregated PD. See the TTFT/ITL Based Auto Scaling issue.

As the first experimental version, the most critical task is to come up with the minimum set of thoroughly tested changes that allow embedding the router without breaking any existing functionality.

Key Changes

  1. Add src/dstack/_internal/core/models/routers.py
    Define router types and configuration models. The RouterType enum identifies available routers. Each router has its own config model (SGLangRouterConfig, VLLMRouterConfig) with router-specific options. AnyRouterConfig allows selecting the correct config class based on the type field (see the sketch after this list).

  2. Add router: AnyRouterConfig in GatewayConfiguration and in GatewayComputeConfiguration
    Ensure router config flows from user input → server → backend compute layer.

  3. Update gateway/pyproject.toml to include router packages as optional dependencies

  4. Update get_dstack_gateway_commands() in src/dstack/_internal/core/backends/base/compute.py to accept router config

  5. Update _update_gateway() in src/dstack/_internal/server/services/gateways/__init__.py to extract router_config

  6. Add abstract Router base class in src/dstack/_internal/proxy/gateway/model_routers/base.py
    Handles a router's lifecycle methods.

  7. Implement SGLangRouter (extending the abstract Router base class) in src/dstack/_internal/proxy/gateway/model_routers/sglang.py

  8. Add a router registry in src/dstack/_internal/proxy/gateway/model_routers/__init__.py
    Implement the registry pattern (similar to dstack's backend configurators) for auto-discovery and lookup of available routers.

  9. Update src/dstack/_internal/proxy/gateway/services/nginx.py

  10. Update the upstream block of src/dstack/_internal/proxy/gateway/resources/nginx/service.jinja2 to forward requests to the router when a router is defined.

  11. Add a new nginx config, src/dstack/_internal/proxy/gateway/resources/nginx/router_workers.jinja2, to make service replicas available on TCP ports. Later we could avoid this extra proxying layer by switching from Unix sockets to TCP ports when opening SSH tunnels on the gateway.
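
A minimal sketch of what the models from item 1 could look like, assuming pydantic discriminated unions. Everything beyond the names mentioned above (RouterType, SGLangRouterConfig, VLLMRouterConfig, AnyRouterConfig, the type field, and the cache_aware policy from the test config below) is an assumption:

from enum import Enum
from typing import Annotated, Literal, Union

from pydantic import BaseModel, Field


class RouterType(str, Enum):
    # Identifies the available routers
    SGLANG = "sglang"
    VLLM = "vllm"


class SGLangRouterConfig(BaseModel):
    # Discriminator field that selects this config class
    type: Literal["sglang"] = "sglang"
    # Routing policy, e.g. cache_aware as in the test gateway config below
    policy: str = "cache_aware"


class VLLMRouterConfig(BaseModel):
    type: Literal["vllm"] = "vllm"


# AnyRouterConfig resolves to the correct config class based on the type field
AnyRouterConfig = Annotated[
    Union[SGLangRouterConfig, VLLMRouterConfig], Field(discriminator="type")
]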

How Router Upgrade Works

Steps

  1. Let's say the gateway currently has dstack-gateway 0.19.34 and sglang-router 0.2.1, and we are releasing dstack-gateway 0.19.35 with a new SGLang feature that requires the latest sglang-router version (0.2.2).
  2. We bump the sglang-router version to 0.2.2 in gateway/pyproject.toml.
  3. dstack server restart -> init_gateways() called -> _update_gateway() -> update.sh executes.
  4. update.sh flips to the inactive venv; let's say the active version is now "green" (sketched below).
  5. The new version is installed in the green venv.
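
Purely for illustration, the blue/green flip in steps 4 and 5 boils down to something like the sketch below; the real logic lives in update.sh, and the paths and helper name here are assumptions:

import subprocess
from pathlib import Path

DSTACK_DIR = Path("/home/ubuntu/dstack")


def flip_and_install(gateway_package: str) -> None:
    # Read the currently active venv ("blue" or "green") from dstack/version
    active = (DSTACK_DIR / "version").read_text().strip()
    inactive = "green" if active == "blue" else "blue"
    # Install the new gateway build (with router extras) into the inactive venv
    pip = DSTACK_DIR / inactive / "bin" / "pip"
    subprocess.run([str(pip), "install", "--upgrade", gateway_package], check=True)
    # Record the newly active venv so the gateway service restart picks it up
    (DSTACK_DIR / "version").write_text(inactive)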

Gateway Service Restart and Gateway Instance Reboot
The router has been tested to successfully reconnect to replicas after both a gateway service restart and a full gateway instance reboot.

How to test

Step 1
Apply the gateway config below:

#gateway.dstack.yml
type: gateway
name: bihan-gateway


# Gateways are bound to a specific backend and region
backend: aws
region: eu-west-1

# This domain will be used to access the endpoint
domain: example.com
router:
  type: sglang
  policy: cache_aware

Step 2
Update DNS so the gateway domain (e.g. example.com and its service subdomains such as sglang-service1.example.com) points to the gateway's IP address.

Step 3
We want to test with multiple services; therefore, apply the service configs below.

Config1

#sglang-service1.yml
type: service
name: sglang-service1

python: 3.12
nvcc: true

env:
  - HF_TOKEN
  - MODEL_ID=meta-llama/Llama-3.2-3B-Instruct
  - HF_HUB_DISABLE_XET=1
  - HF_HUB_ENABLE_HF_TRANSFER=0

commands:
  - pip install --upgrade pip
  - pip install uv
  - uv pip install sglang --prerelease=allow
  - python -m sglang.launch_server --model-path $MODEL_ID --host 0.0.0.0 --port 8000 --enable-metrics

port: 8000
model: sglang-service1.meta-llama/Llama-3.2-3B-Instruct

resources:
  gpu: 24GB

replicas: 0..2
scaling:
   metric: rps
   target: 1

Config2

#sglang-service2.yml
type: service
name: sglang-service2

python: 3.12
nvcc: true

env:
  - HF_TOKEN
  - MODEL_ID=meta-llama/Llama-3.2-3B-Instruct
  - HF_HUB_DISABLE_XET=1
  - HF_HUB_ENABLE_HF_TRANSFER=0

commands:
  - pip install --upgrade pip
  - pip install uv
  - uv pip install sglang --prerelease=allow
  - python -m sglang.launch_server --model-path $MODEL_ID --host 0.0.0.0 --port 8000 --enable-metrics

port: 8000
model: sglang-service2.meta-llama/Llama-3.2-3B-Instruct

resources:
  gpu: 24GB

replicas: 0..2
scaling:
   metric: rps
   target: 1

Step 4
To automate requests and test autoscaling, you can use the script below: autoscale_test_sglang.py

import asyncio
import aiohttp
import time
import json

# ==== Configuration ====
URL = "https://sglang-service1.example.com/v1/chat/completions" # <-- replace with your endpoint
TOKEN = "esdfds3263-c36d-41db-ba9b-0d31df4efb15e"   # <-- replace with your token
RPS = 2            # requests per second
DURATION = 1800        # duration in seconds
# =======================

HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {TOKEN}"
}

PAYLOAD = {
    "model": "sglang-service1.meta-llama/Llama-3.2-3B-Instruct", #<--replace with your model
    "messages": [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "What is Deep Learning?"}
    ]
}


async def send_request(session, idx):
    """Send a single request and print full response"""
    try:
        async with session.post(URL, headers=HEADERS, json=PAYLOAD) as resp:
            text = await resp.text()
            print(f"\n[{idx}] Status: {resp.status}")
            print(f"Response:\n{text}\n")
    except Exception as e:
        print(f"[{idx}] Error: {e}")


async def run_load_test():
    total_requests = RPS * DURATION
    interval = 1.0 / RPS

    async with aiohttp.ClientSession() as session:
        start_time = time.perf_counter()
        tasks = []

        for i in range(total_requests):
            tasks.append(asyncio.create_task(send_request(session, i + 1)))
            await asyncio.sleep(interval)

        await asyncio.gather(*tasks)

        elapsed = time.perf_counter() - start_time
        print(f"\n✅ Sent {total_requests} requests in {elapsed:.2f}s "
              f"(~{total_requests/elapsed:.2f} RPS)")


if __name__ == "__main__":
    asyncio.run(run_load_test())

Step 5
After updating the token and service endpoint, run the above script with python autoscale_test_sglang.py from your local machine.

Once the automated requests start hitting the service endpoint, dstack submits the job. When the service gets deployed and the /health check from sglang-router responds with 200, you will start to see responses from the model.

As the automated requests continue, dstack first scales up to 2 jobs. If we stop the requests, dstack scales down to 0 jobs.

Note:

  1. This PR uses "https://bihan-test-bucket.s3.eu-west-1.amazonaws.com/dstack_gateway-0.0.1-py3-none-any.whl" for testing. Once the PR is ready for merge, I will update it in src/dstack/_internal/core/backends/base/compute.py

  2. For testing, gateway/pyproject.toml has my fork as a dependency. I will update it once the PR is ready for merge.

@jvstme (Collaborator) left a comment

@Bihan, thanks for the PR. I tested it and the main use cases seem to work as expected. I've added some comments for things that may need attention and tagged them to make priorities easier to navigate.

Bihan Rana added 2 commits November 13, 2025 17:46

Test sglang router per service implementation

Test sglang router per service implementation
@Bihan force-pushed the add_sglang_router_support branch from eabbff3 to 178d5a5 on November 17, 2025 11:29
@peterschmidt85 mentioned this pull request Nov 17, 2025
logger.debug("Found the latest gateway build: %s", build)
return f"{base_url}/dstack_gateway-{build}-py3-none-any.whl"
# return f"{base_url}/dstack_gateway-{build}-py3-none-any.whl"
return "https://bihan-test-bucket.s3.eu-west-1.amazonaws.com/dstack_gateway-0.0.1-py3-none-any.whl"


I'm certain it's not supposed to be hard-coded, we need to get the dynamic URL back.

Collaborator Author

Yes, you are right. In production it will not be hardcoded; the hardcoded URL will be replaced by return f"{base_url}/dstack_gateway-{build}-py3-none-any.whl".

class SglangRouter(Router):
    """SGLang router implementation with 1:1 service-to-router."""

    TYPE = "sglang"


Wouldn't it be better if we took the sglang literal value from the RouterType enum? That would be DRY.

Collaborator Author

Resolved


venv_python = DSTACK_DIR_ON_GATEWAY / version / "bin" / "python3"

prometheus_port = self.context.port + 10000


Where is the 10000 magic number coming from?

Collaborator Author

The Prometheus port is offset by 10000 from the router port to keep it in a separate range. This is now explained in a code comment.
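
For illustration only, a named constant would make the offset self-documenting (the constant and function names are assumptions):

# Keep the router's Prometheus endpoint in a separate, non-overlapping port range
PROMETHEUS_PORT_OFFSET = 10000


def get_prometheus_port(router_port: int) -> int:
    return router_port + PROMETHEUS_PORT_OFFSET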


subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

time.sleep(2)


Do you think clients would benefit from being able to customize this constant e.g. via an env?
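
If it were made configurable, a minimal sketch could read the wait from an environment variable, defaulting to the current 2 seconds (the variable name is an assumption):

import os
import time

# Seconds to wait for the router process to start; 2 matches the current constant
ROUTER_STARTUP_WAIT = float(os.getenv("DSTACK_ROUTER_STARTUP_WAIT", "2"))

time.sleep(ROUTER_STARTUP_WAIT)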

["lsof", "-ti", f":{self.context.port}"], capture_output=True, timeout=5
)
if result.returncode == 0:
pids = result.stdout.decode().strip().split("\n")


Wouldn't a platform-independent os.linesep be a better choice here?

Collaborator Author

Here lsof always emits "\n" line endings, so splitting on "\n" matches the actual command output.

url = worker.get("url")
if url and isinstance(url, str):
    current_worker_urls.add(url)
target_worker_urls = set(replica_urls)
@nikita-toffee-ai commented Nov 17, 2025

We need to make sure there will be no path artifacts in set elements, like trailing slashes, otherwise, the set operations below could yield unexpected results.

Collaborator Author

Resolved

def _get_router_workers(self) -> List[dict]:
    try:
        result = subprocess.run(
            ["curl", "-s", f"http://{self.context.host}:{self.context.port}/workers"],
@nikita-toffee-ai commented Nov 17, 2025

Same as before, we should use httpx for HTTP queries.

Collaborator Author

Resolved


# Create router context with allocated port
ctx = RouterContext(
    host="127.0.0.1",
@nikita-toffee-ai commented Nov 17, 2025

We've hard-coded 127.0.0.1 in quite a few places already, I think we should take it from the source model where it was once defined instead.

Collaborator Author

Resolved

return False

def _allocate_router_port(self) -> int:
    """Allocate next available router port in fixed range (20000-24999).


I would remove the exact range from the comment, because the range ends are variables, as we can see right below.

Collaborator Author

Resolved

)

def _allocate_worker_ports(self, num_ports: int) -> list[int]:
    """Allocate worker ports globally in fixed range (10001-11999).


I would remove the exact range from the comment, because the range ends are variables, as we can see right below.

Collaborator Author

Resolved

Comment on lines 468 to 471
f"sh dstack/_update.sh '{gateway_package}' {build}",
"rm dstack/_update.sh",
# Install gateway package with router extras to the active venv (blue or green)
# update.sh writes the active version to dstack/version
f"version=$(cat /home/ubuntu/dstack/version) && /home/ubuntu/dstack/$version/bin/pip install --upgrade '{gateway_package}'",
]
stdout = await connection.tunnel.aexec("/bin/sh -c '" + " && ".join(commands) + "'")
Collaborator

(major)

Gateway update doesn't work now:

[19:44:03] WARNING  dstack._internal.server.services.gateways:423 Failed to update gateway 34.244.47.57:

This error message is not informative, but a quick debug shows that the problem is that we pass a malformed command to tunnel.aexec, where single quotes added on line 468 unintentionally close single quotes from line 471.

/bin/sh -c 'cp dstack/update.sh dstack/_update.sh && sh dstack/_update.sh 'dstack-gateway[sglang] @ https://bihan-test-bucket.s3.eu-west-1.amazonaws.com/dstack_gateway-0.0.1-py3-none-any.whl' latest && rm dstack/_update.sh'

And I assume the same issue is present in core/backends/compute.py, where single quotes from get_dstack_gateway_commands can interfere with single quotes from _get_gateway_commands of the kubernetes backend, which most likely breaks k8s gateway provisioning.
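
One way to avoid this class of quoting bug is to let shlex.quote escape the inner strings before wrapping the whole thing in /bin/sh -c. A minimal sketch, not the PR's actual fix:

import shlex

gateway_package = (
    "dstack-gateway[sglang] @ "
    "https://bihan-test-bucket.s3.eu-west-1.amazonaws.com/dstack_gateway-0.0.1-py3-none-any.whl"
)
commands = [
    "cp dstack/update.sh dstack/_update.sh",
    f"sh dstack/_update.sh {shlex.quote(gateway_package)} latest",
    "rm dstack/_update.sh",
]
# Quote the whole joined command once more before handing it to /bin/sh -c,
# so quotes inside the package spec can no longer break out of the outer command.
full_command = "/bin/sh -c " + shlex.quote(" && ".join(commands))
print(full_command)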

Collaborator Author

Resolved

Collaborator

The k8s part doesn't seem to be resolved (or pushed) yet


@abstractmethod
def remove_replicas(self, replicas: List[Replica]) -> None:
def remove_replicas(self, replica_urls: List[str]) -> None:
Collaborator

(nit) This method looks redundant, because it is possible to remove replicas using update_replicas.

For example, this can be replaced with run_async(router.update_replicas, [])


TYPE = "sglang"
TYPE = RouterType.SGLANG
Collaborator

(nit) Unused, can remove

pid,
self.context.port,
)
subprocess.run(["kill", pid], timeout=5)
Collaborator

(nit) Use os.kill
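
For reference, a minimal sketch of this suggestion (the function name and pid handling are illustrative):

import os
import signal


def stop_process(pid: str) -> None:
    # Send SIGTERM directly instead of shelling out to kill
    os.kill(int(pid), signal.SIGTERM)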

Collaborator Author

Resolved

return []
with httpx.Client(timeout=5.0) as client:
    response = client.get(f"http://{self.context.host}:{self.context.port}/workers")
    if response.status_code == 200:
Collaborator

(nit) Log an error if status is not 200

Comment on lines -37 to -40
return jinja2.Template(template).render(
    **self.dict(),
    proxy_port=PROXY_PORT_ON_GATEWAY,
)
Collaborator

(nit) Does this change do anything?

Comment on lines 82 to 97
result = subprocess.run(
["pgrep", "-f", "sglang::router"], capture_output=True, timeout=5
["lsof", "-ti", f":{self.context.port}"], capture_output=True, timeout=5
)
if result.returncode == 0:
logger.info("Stopping sglang-router process...")
subprocess.run(["pkill", "-f", "sglang::router"], timeout=5)
pids = result.stdout.decode().strip().split("\n")
for pid in pids:
if pid:
logger.info(
"Stopping sglang-router-new process (PID: %s) on port %s",
pid,
self.context.port,
)
subprocess.run(["kill", pid], timeout=5)
else:
logger.debug("No sglang-router process found to stop")
result = subprocess.run(
["pgrep", "-f", f"sglang.*--port.*{self.context.port}"],
Collaborator

(nit) Neither lsof, nor pgrep is guaranteed to be installed. Use psutil instead
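
A minimal sketch of the psutil-based alternative, assuming psutil is available in the gateway venv (the function name is illustrative):

import psutil


def stop_listener(port: int) -> None:
    # Find and terminate whatever process is listening on the router port,
    # without relying on lsof or pgrep being installed.
    for conn in psutil.net_connections(kind="inet"):
        if (
            conn.status == psutil.CONN_LISTEN
            and conn.laddr
            and conn.laddr.port == port
            and conn.pid
        ):
            psutil.Process(conn.pid).terminate()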

Collaborator Author

Resolved

Collaborator

(nit) I'd suggest moving the model_routers directory to proxy/gateway/services/model_routers.

The subdirectories in proxy/gateway are supposed to represent architectural tiers:

  • proxy/gateway/repo - the data tier
  • proxy/gateway/services - the logic tier
  • proxy/gateway/routers and proxy/gateway/schemas - the presentation tier

The model routers implementation is part of the logic tier.

Collaborator Author

Resolved

Comment on lines 17 to 18
port: int = 3000
log_dir: Path = Path("./router_logs")
Collaborator

(nit) These defaults here are no longer relevant after switching to per-service routers and dynamic port allocation.

I suggest removing the defaults to avoid misleading the devs. Then an explicit RouterContext will also become required in Router.__init__.
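
For illustration, removing the defaults would make construction explicit; whether RouterContext is a dataclass or a pydantic model, the idea is the same:

from dataclasses import dataclass
from pathlib import Path


@dataclass
class RouterContext:
    host: str
    port: int      # allocated dynamically per service, so no meaningful default
    log_dir: Path  # likewise derived per service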

Collaborator Author

Resolved

Comment on lines 7 to 8
{% if router is not none %}
{% if router.type == "sglang" and router_port is not none %}
Collaborator

(nit) Can replace these 3 conditions with 1 — router_port is not none?

Collaborator Author

Yes, router_port is only set when a router exists, and currently only the sglang router exists.

Collaborator Author

Resolved

Comment on lines 204 to 213
if response_data.get("status") == "accepted":
    logger.info(
        "Added worker %s to sglang router on port %s",
        worker_url,
        self.context.port,
    )
    return True
else:
    logger.error("Failed to add worker %s: %s", worker_url, response_data)
    return False
@jvstme (Collaborator) commented Nov 20, 2025

For some reason I'm always getting this error now:

dstack._internal.proxy.gateway.model_routers.sglang - ERROR - Failed to add worker http://127.0.0.1:10001: status 202, {"status":"accepted","worker_id":"http://127.0.0.1:10001","message":"Worker addition queued for background processing"}

Could it be because the gateway wheel from S3 depends on sglang-router 0.2.3?

$ dstack/blue/bin/pip freeze | grep sglang
dstack @ https://github.com/Bihan/dstack/archive/refs/heads/add_sglang_router_support.tar.gz
sglang-router==0.2.3

UPD: I tried downgrading to 0.2.2 and it didn't help, so maybe this is unrelated

Collaborator Author

I've identified an issue affecting SGLang Router versions 0.2.2 and 0.2.3. The worker registration retry mechanism stops at 10 attempts regardless of the --worker-startup-timeout-secs setting. Version 0.2.2 was working until recently. I'm investigating the root cause and will provide an update once I have more details.

Test gateway package update

Resolve rate limits and location issue

Resolve all major comments

Resolve kubernetes gateway issue with sglang intregration
@Bihan force-pushed the add_sglang_router_support branch from 71cacf2 to a475858 on November 20, 2025 05:07
return False
else:
logger.error(
"Failed to remove worker %s: status %d, %s",
Collaborator

(nit) I'm seeing this error in systemctl -u dstack.gateway:

dstack._internal.proxy.gateway.model_routers.sglang - ERROR - Failed to remove worker http://127.0.0.1:10003: status 202, {"status":"accepted","worker_id":"http://127.0.0.1:10003","message":"Worker removal queued for background processing"}

To reproduce:

  • Start a service with one replica.
  • Scale it to two replicas.

Collaborator Author

Resolved. This was happening with all add/remove worker calls because I was expecting status 200, while the add/remove worker APIs respond with 202.
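
A minimal sketch of the fix described above; the endpoint path and parameter are assumptions about the sglang-router HTTP API, and the point is only the status-code check:

import httpx


def add_worker(router_url: str, worker_url: str) -> bool:
    # sglang-router queues add/remove operations and replies 202 Accepted,
    # so both 200 and 202 count as success.
    response = httpx.post(f"{router_url}/add_worker", params={"url": worker_url})
    return response.status_code in (200, 202)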

@Bihan Bihan merged commit 3b58cae into dstackai:master Nov 21, 2025
28 checks passed