Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/docs/guides/server-deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,21 @@ To store logs using GCP Logging, set the `DSTACK_SERVER_GCP_LOGGING_PROJECT` env

</div>

## Metrics

If enabled, `dstack` collects and exports Prometheus metrics from running jobs. Metrics for jobs from all projects are available
at the `/metrics` path, and metrics for jobs from a specific project are available at the `/metrics/project/<project-name>` path.

By default, metrics are disabled. To enable them, set the `DSTACK_ENABLE_PROMETHEUS_METRICS` environment variable.

Each sample includes a set of `dstack_*` labels, e.g., `dstack_project_name="main"`, `dstack_run_name="vllm-llama32"`.

Currently, `dstack` collects the following metrics:

* A fixed subset of NVIDIA GPU metrics from [DCGM Exporter :material-arrow-top-right-thin:{ .external }](https://docs.nvidia.com/datacenter/dcgm/latest/gpu-telemetry/dcgm-exporter.html){:target="_blank"} on supported cloud backends — AWS, Azure, GCP, OCI — and SSH fleets.
On supported cloud backends the required packages are already installed.
If you use SSH fleets, install `datacenter-gpu-manager-4-core` and `datacenter-gpu-manager-exporter`.

## Encryption

By default, `dstack` stores data in plaintext. To enforce encryption, you
Expand Down
1 change: 1 addition & 0 deletions docs/docs/reference/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ For more details on the options below, refer to the [server deployment](../guide
- `DSTACK_SERVER_CLOUDWATCH_LOG_REGION`{ #DSTACK_SERVER_CLOUDWATCH_LOG_REGION } – The CloudWatch Logs region. Defaults to `None`.
- `DSTACK_DEFAULT_SERVICE_CLIENT_MAX_BODY_SIZE`{ #DSTACK_DEFAULT_SERVICE_CLIENT_MAX_BODY_SIZE } – Request body size limit for services running with a gateway, in bytes. Defaults to 64 MiB.
- `DSTACK_FORBID_SERVICES_WITHOUT_GATEWAY`{ #DSTACK_FORBID_SERVICES_WITHOUT_GATEWAY } – Forbids registering new services without a gateway if set to any value.
- `DSTACK_ENABLE_PROMETHEUS_METRICS`{ #DSTACK_ENABLE_PROMETHEUS_METRICS } – Enables Prometheus metrics collection and export.

??? info "Internal environment variables"
The following environment variables are intended for development purposes:
Expand Down
39 changes: 38 additions & 1 deletion runner/cmd/shim/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/dstackai/dstack/runner/internal/log"
"github.com/dstackai/dstack/runner/internal/shim"
"github.com/dstackai/dstack/runner/internal/shim/api"
"github.com/dstackai/dstack/runner/internal/shim/dcgm"
"github.com/dstackai/dstack/runner/internal/shim/host"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
)
Expand Down Expand Up @@ -95,6 +97,21 @@ func main() {
Destination: &args.Runner.LogLevel,
EnvVars: []string{"DSTACK_RUNNER_LOG_LEVEL"},
},
/* DCGM Exporter Parameters */
&cli.IntFlag{
Name: "dcgm-exporter-http-port",
Usage: "DCGM Exporter http port",
Value: 10997,
Destination: &args.DCGMExporter.HTTPPort,
EnvVars: []string{"DSTACK_DCGM_EXPORTER_HTTP_PORT"},
},
&cli.IntFlag{
Name: "dcgm-exporter-interval",
Usage: "DCGM Exporter collect interval, milliseconds",
Value: 5000,
Destination: &args.DCGMExporter.Interval,
EnvVars: []string{"DSTACK_DCGM_EXPORTER_INTERVAL"},
},
/* Docker Parameters */
&cli.BoolFlag{
Name: "privileged",
Expand Down Expand Up @@ -178,8 +195,28 @@ func start(ctx context.Context, args shim.CLIArgs, serviceMode bool) (err error)
return err
}

var dcgmExporter *dcgm.DCGMExporter

if host.GetGpuVendor() == host.GpuVendorNvidia {
dcgmExporterPath, err := dcgm.GetDCGMExporterExecPath(ctx)
if err == nil {
interval := time.Duration(args.DCGMExporter.Interval * int(time.Millisecond))
dcgmExporter = dcgm.NewDCGMExporter(dcgmExporterPath, args.DCGMExporter.HTTPPort, interval)
err = dcgmExporter.Start(ctx)
}
if err == nil {
log.Info(ctx, "using DCGM Exporter")
defer func() {
_ = dcgmExporter.Stop(ctx)
}()
} else {
log.Warning(ctx, "not using DCGM Exporter", "err", err)
dcgmExporter = nil
}
}

address := fmt.Sprintf(":%d", args.Shim.HTTPPort)
shimServer := api.NewShimServer(ctx, address, dockerRunner, Version)
shimServer := api.NewShimServer(ctx, address, dockerRunner, dcgmExporter, Version)

defer func() {
shutdownCtx, cancelShutdown := context.WithTimeout(ctx, 5*time.Second)
Expand Down
20 changes: 20 additions & 0 deletions runner/internal/shim/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/dstackai/dstack/runner/internal/api"
"github.com/dstackai/dstack/runner/internal/log"
"github.com/dstackai/dstack/runner/internal/shim"
"github.com/dstackai/dstack/runner/internal/shim/dcgm"
)

func (s *ShimServer) HealthcheckHandler(w http.ResponseWriter, r *http.Request) (interface{}, error) {
Expand Down Expand Up @@ -121,3 +122,22 @@ func (s *ShimServer) TaskRemoveHandler(w http.ResponseWriter, r *http.Request) (
log.Info(ctx, "removed", "task", taskID)
return nil, nil
}

// TaskMetricsHandler serves the DCGM Exporter metrics that belong to the task
// identified by the {id} path value.
//
// It responds with 404 when no DCGM Exporter is configured or when the task is
// unknown, and with 502 when the exporter cannot be queried. On success the
// exporter output, filtered down to the GPUs assigned to the task, is written
// in the Prometheus text exposition format.
func (s *ShimServer) TaskMetricsHandler(w http.ResponseWriter, r *http.Request) {
	if s.dcgmExporter == nil {
		http.Error(w, "DCGM Exporter is not available", http.StatusNotFound)
		return
	}
	taskInfo := s.runner.TaskInfo(r.PathValue("id"))
	// An empty ID in the returned info is treated as "no such task".
	if taskInfo.ID == "" {
		http.Error(w, "Task not found", http.StatusNotFound)
		return
	}
	expfmtBody, err := s.dcgmExporter.Fetch(r.Context())
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadGateway)
		return
	}
	response := dcgm.FilterMetrics(expfmtBody, taskInfo.GpuIDs)
	// Declare the exposition format explicitly instead of relying on net/http
	// content sniffing, so scrapers can select the right parser.
	w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
	// A write error here means the client went away; there is nothing to recover.
	_, _ = w.Write(response)
}
4 changes: 2 additions & 2 deletions runner/internal/shim/api/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestHealthcheck(t *testing.T) {
request := httptest.NewRequest("GET", "/api/healthcheck", nil)
responseRecorder := httptest.NewRecorder()

server := NewShimServer(context.Background(), ":12345", NewDummyRunner(), "0.0.1.dev2")
server := NewShimServer(context.Background(), ":12345", NewDummyRunner(), nil, "0.0.1.dev2")

f := common.JSONResponseHandler(server.HealthcheckHandler)
f(responseRecorder, request)
Expand All @@ -30,7 +30,7 @@ func TestHealthcheck(t *testing.T) {
}

func TestTaskSubmit(t *testing.T) {
server := NewShimServer(context.Background(), ":12340", NewDummyRunner(), "0.0.1.dev2")
server := NewShimServer(context.Background(), ":12340", NewDummyRunner(), nil, "0.0.1.dev2")
requestBody := `{
"id": "dummy-id",
"name": "dummy-name",
Expand Down
8 changes: 7 additions & 1 deletion runner/internal/shim/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/dstackai/dstack/runner/internal/api"
"github.com/dstackai/dstack/runner/internal/shim"
"github.com/dstackai/dstack/runner/internal/shim/dcgm"
)

type TaskRunner interface {
Expand All @@ -27,10 +28,12 @@ type ShimServer struct {

runner TaskRunner

dcgmExporter *dcgm.DCGMExporter

version string
}

func NewShimServer(ctx context.Context, address string, runner TaskRunner, version string) *ShimServer {
func NewShimServer(ctx context.Context, address string, runner TaskRunner, dcgmExporter *dcgm.DCGMExporter, version string) *ShimServer {
r := api.NewRouter()
s := &ShimServer{
HttpServer: &http.Server{
Expand All @@ -41,6 +44,8 @@ func NewShimServer(ctx context.Context, address string, runner TaskRunner, versi

runner: runner,

dcgmExporter: dcgmExporter,

version: version,
}

Expand All @@ -51,6 +56,7 @@ func NewShimServer(ctx context.Context, address string, runner TaskRunner, versi
r.AddHandler("POST", "/api/tasks", s.TaskSubmitHandler)
r.AddHandler("POST", "/api/tasks/{id}/terminate", s.TaskTerminateHandler)
r.AddHandler("POST", "/api/tasks/{id}/remove", s.TaskRemoveHandler)
r.HandleFunc("GET /metrics/tasks/{id}", s.TaskMetricsHandler)

return s
}
212 changes: 212 additions & 0 deletions runner/internal/shim/dcgm/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package dcgm

import (
"context"
"encoding/csv"
"errors"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"strconv"
"strings"
"sync"
"syscall"
"time"

"github.com/alexellis/go-execute/v2"
"github.com/dstackai/dstack/runner/internal/log"
)

// Counter is one row of the counters.csv file passed to dcgm-exporter; the
// fields are written in declaration order: name, type, help text. See
// https://github.com/NVIDIA/dcgm-exporter/tree/5f9250c211?tab=readme-ov-file#changing-metrics
// for the file format and
// https://github.com/NVIDIA/dcgm-exporter/blob/5f9250c211/internal/pkg/counters/variables.go#L23
// for the list of supported types.
// NB: despite the "counter" name used by dcgm-exporter, a row may describe any
// Prometheus metric type or even a label.
type Counter struct {
	Name, Type, Help string
}

// counters is the fixed set of DCGM fields that dcgm-exporter is configured to
// export; Start writes it out as the counters CSV file, one row per entry.
// Full list: https://docs.nvidia.com/datacenter/dcgm/latest/dcgm-api/dcgm-api-field-ids.html
var counters = [...]Counter{
	{"DCGM_FI_DEV_GPU_UTIL", "gauge", "GPU utilization (in %)."},
	{"DCGM_FI_DEV_MEM_COPY_UTIL", "gauge", "Memory utilization (in %)."},
	{"DCGM_FI_DEV_ENC_UTIL", "gauge", "Encoder utilization (in %)."},
	{"DCGM_FI_DEV_DEC_UTIL", "gauge", "Decoder utilization (in %)."},
	{"DCGM_FI_DEV_FB_FREE", "gauge", "Framebuffer memory free (in MiB)."},
	{"DCGM_FI_DEV_FB_USED", "gauge", "Framebuffer memory used (in MiB)."},
	{"DCGM_FI_PROF_GR_ENGINE_ACTIVE", "gauge", "The ratio of cycles during which a graphics engine or compute engine remains active."},
	{"DCGM_FI_PROF_SM_ACTIVE", "gauge", "The ratio of cycles an SM has at least 1 warp assigned."},
	{"DCGM_FI_PROF_SM_OCCUPANCY", "gauge", "The ratio of number of warps resident on an SM."},
	{"DCGM_FI_PROF_PIPE_TENSOR_ACTIVE", "gauge", "Ratio of cycles the tensor (HMMA) pipe is active."},
	{"DCGM_FI_PROF_PIPE_FP64_ACTIVE", "gauge", "Ratio of cycles the fp64 pipes are active."},
	{"DCGM_FI_PROF_PIPE_FP32_ACTIVE", "gauge", "Ratio of cycles the fp32 pipes are active."},
	{"DCGM_FI_PROF_PIPE_FP16_ACTIVE", "gauge", "Ratio of cycles the fp16 pipes are active."},
	{"DCGM_FI_PROF_PIPE_INT_ACTIVE", "gauge", "Ratio of cycles the integer pipe is active."},
	{"DCGM_FI_PROF_DRAM_ACTIVE", "gauge", "Ratio of cycles the device memory interface is active sending or receiving data."},
	{"DCGM_FI_PROF_PCIE_TX_BYTES", "counter", "The number of bytes of active PCIe tx (transmit) data including both header and payload."},
	{"DCGM_FI_PROF_PCIE_RX_BYTES", "counter", "The number of bytes of active PCIe rx (read) data including both header and payload."},
	{"DCGM_FI_DEV_SM_CLOCK", "gauge", "SM clock frequency (in MHz)."},
	{"DCGM_FI_DEV_MEM_CLOCK", "gauge", "Memory clock frequency (in MHz)."},
	{"DCGM_FI_DEV_MEMORY_TEMP", "gauge", "Memory temperature (in C)."},
	{"DCGM_FI_DEV_GPU_TEMP", "gauge", "GPU temperature (in C)."},
	{"DCGM_FI_DEV_POWER_USAGE", "gauge", "Power draw (in W)."},
	{"DCGM_FI_DEV_TOTAL_ENERGY_CONSUMPTION", "counter", "Total energy consumption since boot (in mJ)."},
	{"DCGM_FI_DEV_PCIE_REPLAY_COUNTER", "counter", "Total number of PCIe retries."},
	{"DCGM_FI_DEV_XID_ERRORS", "gauge", "Value of the last XID error encountered."},
	{"DCGM_FI_DEV_POWER_VIOLATION", "counter", "Throttling duration due to power constraints (in us)."},
	{"DCGM_FI_DEV_THERMAL_VIOLATION", "counter", "Throttling duration due to thermal constraints (in us)."},
	{"DCGM_FI_DEV_SYNC_BOOST_VIOLATION", "counter", "Throttling duration due to sync-boost constraints (in us)."},
	{"DCGM_FI_DEV_BOARD_LIMIT_VIOLATION", "counter", "Throttling duration due to board limit constraints (in us)."},
	{"DCGM_FI_DEV_LOW_UTIL_VIOLATION", "counter", "Throttling duration due to low utilization (in us)."},
	{"DCGM_FI_DEV_RELIABILITY_VIOLATION", "counter", "Throttling duration due to reliability constraints (in us)."},
	{"DCGM_FI_DEV_ECC_SBE_VOL_TOTAL", "counter", "Total number of single-bit volatile ECC errors."},
	{"DCGM_FI_DEV_ECC_DBE_VOL_TOTAL", "counter", "Total number of double-bit volatile ECC errors."},
	{"DCGM_FI_DEV_ECC_SBE_AGG_TOTAL", "counter", "Total number of single-bit persistent ECC errors."},
	{"DCGM_FI_DEV_ECC_DBE_AGG_TOTAL", "counter", "Total number of double-bit persistent ECC errors."},
	{"DCGM_FI_DEV_RETIRED_SBE", "counter", "Total number of retired pages due to single-bit errors."},
	{"DCGM_FI_DEV_RETIRED_DBE", "counter", "Total number of retired pages due to double-bit errors."},
	{"DCGM_FI_DEV_RETIRED_PENDING", "counter", "Total number of pages pending retirement."},
	{"DCGM_FI_DEV_UNCORRECTABLE_REMAPPED_ROWS", "counter", "Number of remapped rows for uncorrectable errors"},
	{"DCGM_FI_DEV_CORRECTABLE_REMAPPED_ROWS", "counter", "Number of remapped rows for correctable errors"},
	{"DCGM_FI_DEV_ROW_REMAP_FAILURE", "gauge", "Whether remapping of rows has failed"},
	{"DCGM_FI_DEV_NVLINK_CRC_FLIT_ERROR_COUNT_TOTAL", "counter", "Total number of NVLink flow-control CRC errors."},
	{"DCGM_FI_DEV_NVLINK_CRC_DATA_ERROR_COUNT_TOTAL", "counter", "Total number of NVLink data CRC errors."},
	{"DCGM_FI_DEV_NVLINK_REPLAY_ERROR_COUNT_TOTAL", "counter", "Total number of NVLink retries."},
	{"DCGM_FI_DEV_NVLINK_RECOVERY_ERROR_COUNT_TOTAL", "counter", "Total number of NVLink recovery errors."},
	{"DCGM_FI_DEV_NVLINK_BANDWIDTH_TOTAL", "counter", "Total number of NVLink bandwidth counters for all lanes."},
	{"DCGM_FI_DEV_NVLINK_BANDWIDTH_L0", "counter", "The number of bytes of active NVLink rx or tx data including both header and payload."},
	// The RX help text previously described PCIe traffic (copy-paste from
	// DCGM_FI_PROF_PCIE_RX_BYTES); this field measures NvLink traffic.
	{"DCGM_FI_PROF_NVLINK_RX_BYTES", "counter", "The number of bytes of active NvLink rx (read) data including both header and payload. "},
	{"DCGM_FI_PROF_NVLINK_TX_BYTES", "counter", "The number of bytes of active NvLink tx (transmit) data including both header and payload. "},
}

// dcgmExporterExecName is the executable name that GetDCGMExporterExecPath
// resolves through exec.LookPath.
const dcgmExporterExecName = "dcgm-exporter"

// DCGMExporter runs a dcgm-exporter child process and fetches its metrics over
// HTTP. Create it with NewDCGMExporter, then call Start, Fetch and Stop.
type DCGMExporter struct {
	// Child process state, populated by Start.
	cmd        *exec.Cmd
	cancel     context.CancelFunc
	execPath   string
	listenAddr string

	// HTTP access to the exporter's metrics endpoint.
	client *http.Client
	url    string

	// interval is passed to dcgm-exporter as its collect interval and is also
	// the minimum time between two real HTTP fetches in Fetch.
	interval time.Duration
	// configPath is the temporary counters CSV file created by Start.
	configPath string

	// mu serializes Fetch calls, which read and update the cached response below.
	mu            sync.Mutex
	lastFetchedAt time.Time
	lastResponse  []byte
}

// Start writes the counters configuration to a temporary CSV file and launches
// dcgm-exporter with that file (-f), the listen address (-a) and the collect
// interval in milliseconds (-c).
//
// The process is bound to ctx: when ctx is cancelled, or Stop is called, it is
// sent SIGTERM and given up to 5 seconds (WaitDelay) before being killed.
//
// It returns an error if the exporter was already started, if the
// configuration file cannot be written, or if the process cannot be launched.
// On failure nothing is left behind: the temporary file is removed, the derived
// context is released, and the exporter stays in the "not started" state.
func (c *DCGMExporter) Start(ctx context.Context) error {
	if c.cmd != nil {
		return errors.New("already started")
	}

	configPath, err := writeCountersConfig()
	if err != nil {
		return err
	}

	cmdCtx, cmdCancel := context.WithCancel(ctx)
	cmd := exec.CommandContext(
		cmdCtx, c.execPath,
		"-f", configPath,
		"-a", c.listenAddr,
		"-c", strconv.FormatInt(c.interval.Milliseconds(), 10),
	)
	// Ask the exporter to shut down gracefully instead of the default SIGKILL.
	cmd.Cancel = func() error {
		return cmd.Process.Signal(syscall.SIGTERM)
	}
	cmd.WaitDelay = 5 * time.Second
	if err := cmd.Start(); err != nil {
		cmdCancel()
		_ = os.Remove(configPath) // best effort; the start error is what matters
		return err
	}

	// Record state only once the process is actually running.
	c.configPath = configPath
	c.cancel = cmdCancel
	c.cmd = cmd
	return nil
}

// writeCountersConfig dumps the counters table into a fresh temporary CSV file
// and returns its path; the file is removed again if writing fails.
func writeCountersConfig() (string, error) {
	configFile, err := os.CreateTemp("", "counters-*.csv")
	if err != nil {
		return "", err
	}
	configWriter := csv.NewWriter(configFile)
	for _, counter := range counters {
		if err = configWriter.Write([]string{counter.Name, counter.Type, counter.Help}); err != nil {
			break
		}
	}
	if err == nil {
		// csv.Writer is buffered: I/O errors only surface through Error after Flush.
		configWriter.Flush()
		err = configWriter.Error()
	}
	if closeErr := configFile.Close(); err == nil {
		err = closeErr
	}
	if err != nil {
		_ = os.Remove(configFile.Name())
		return "", err
	}
	return configFile.Name(), nil
}

// Stop terminates the dcgm-exporter process started by Start, waits for it to
// exit, and removes the temporary counters file.
//
// It returns "not started" when there is no process to stop. Otherwise it
// returns the process wait error joined with any failure to remove the
// configuration file. Afterwards the exporter is back in the "not started" state.
func (c *DCGMExporter) Stop(context.Context) error {
	if c.cmd == nil {
		return errors.New("not started")
	}
	// Cancelling the command context triggers cmd.Cancel (SIGTERM).
	c.cancel()
	waitErr := c.cmd.Wait()

	// Remove the config only after the process is gone, and do not hide a
	// failure to clean up; a file that is already missing is not an error.
	removeErr := os.Remove(c.configPath)
	if errors.Is(removeErr, os.ErrNotExist) {
		removeErr = nil
	}

	c.cmd = nil
	c.cancel = nil
	c.configPath = ""
	return errors.Join(waitErr, removeErr)
}

// Fetch returns the raw body served at the exporter's metrics URL.
//
// Calls are serialized by the exporter's mutex. If the previous successful
// fetch started less than one collect interval ago, the cached body is returned
// without contacting the exporter. A request error, a non-200 status or a body
// read error is returned as-is and leaves the cache untouched.
func (c *DCGMExporter) Fetch(ctx context.Context) ([]byte, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	startedAt := time.Now()
	if age := startedAt.Sub(c.lastFetchedAt); age < c.interval {
		return c.lastResponse, nil
	}

	request, err := http.NewRequestWithContext(ctx, http.MethodGet, c.url, nil)
	if err != nil {
		return nil, err
	}

	reply, err := c.client.Do(request)
	if err != nil {
		return nil, err
	}
	defer reply.Body.Close()

	if code := reply.StatusCode; code != http.StatusOK {
		return nil, fmt.Errorf("status is not OK: %d", code)
	}

	body, err := io.ReadAll(reply.Body)
	if err != nil {
		return nil, err
	}

	// The cache timestamp is the moment the fetch began, not when it finished.
	c.lastResponse, c.lastFetchedAt = body, startedAt
	return body, nil
}

// NewDCGMExporter builds an exporter wrapper for the dcgm-exporter binary at
// execPath. The process will listen on localhost at the given port and collect
// with the given interval; metrics are read from http://localhost:<port>/metrics
// using an HTTP client with a 10 second timeout. Nothing is started until Start
// is called.
func NewDCGMExporter(execPath string, port int, interval time.Duration) *DCGMExporter {
	exporter := &DCGMExporter{
		execPath:   execPath,
		listenAddr: fmt.Sprintf("localhost:%d", port),
		client:     &http.Client{Timeout: 10 * time.Second},
		interval:   interval,
	}
	exporter.url = "http://" + exporter.listenAddr + "/metrics"
	return exporter
}

// GetDCGMExporterExecPath locates the dcgm-exporter executable via
// exec.LookPath and checks that it runs by invoking it with "-v".
//
// It returns the resolved path on success. An error is returned when the
// executable cannot be found, cannot be executed, or exits with a non-zero
// code (the error then carries the captured stderr and stdout).
func GetDCGMExporterExecPath(ctx context.Context) (string, error) {
	execPath, lookupErr := exec.LookPath(dcgmExporterExecName)
	if lookupErr != nil {
		return "", lookupErr
	}

	probe := execute.ExecTask{
		Command:     execPath,
		Args:        []string{"-v"},
		StreamStdio: false,
	}
	result, runErr := probe.Execute(ctx)
	switch {
	case runErr != nil:
		return "", runErr
	case result.ExitCode != 0:
		return "", fmt.Errorf("%s returned %d, stderr: %s, stdout: %s", execPath, result.ExitCode, result.Stderr, result.Stdout)
	}

	version := strings.TrimSpace(result.Stdout)
	log.Debug(ctx, "detected", "path", execPath, "version", version)
	return execPath, nil
}
Loading