From 3773b1ce19071a14a54e6ce1df53d8a93e889514 Mon Sep 17 00:00:00 2001 From: Dmitry Meyer Date: Tue, 25 Feb 2025 16:43:41 +0000 Subject: [PATCH 1/2] Reexport DCGM metrics from instances * shim: start dcgm-exporter if available, proxy requests * periodically collect and store last metrics * enrich metrics with dstack labels, export Closes: https://github.com/dstackai/dstack/issues/2359 --- docs/docs/guides/server-deployment.md | 16 ++ docs/docs/reference/environment-variables.md | 1 + runner/cmd/shim/main.go | 39 ++- runner/internal/shim/api/handlers.go | 20 ++ runner/internal/shim/api/handlers_test.go | 4 +- runner/internal/shim/api/server.go | 8 +- runner/internal/shim/dcgm/exporter.go | 212 +++++++++++++++ runner/internal/shim/dcgm/metrics.go | 57 ++++ runner/internal/shim/dcgm/metrics_test.go | 43 +++ runner/internal/shim/models.go | 5 + setup.py | 1 + src/dstack/_internal/server/app.py | 12 +- .../_internal/server/background/__init__.py | 10 + .../tasks/process_prometheus_metrics.py | 135 ++++++++++ .../60e444118b6d_add_jobprometheusmetrics.py | 40 +++ src/dstack/_internal/server/models.py | 11 + .../_internal/server/routers/prometheus.py | 36 +++ .../_internal/server/services/prometheus.py | 87 +++++++ .../server/services/runner/client.py | 17 +- src/dstack/_internal/server/settings.py | 1 + src/dstack/_internal/server/testing/common.py | 17 ++ .../tasks/test_process_prometheus_metrics.py | 189 ++++++++++++++ .../server/routers/test_prometheus.py | 244 ++++++++++++++++++ 23 files changed, 1197 insertions(+), 8 deletions(-) create mode 100644 runner/internal/shim/dcgm/exporter.go create mode 100644 runner/internal/shim/dcgm/metrics.go create mode 100644 runner/internal/shim/dcgm/metrics_test.go create mode 100644 src/dstack/_internal/server/background/tasks/process_prometheus_metrics.py create mode 100644 src/dstack/_internal/server/migrations/versions/60e444118b6d_add_jobprometheusmetrics.py create mode 100644 src/dstack/_internal/server/routers/prometheus.py 
create mode 100644 src/dstack/_internal/server/services/prometheus.py create mode 100644 src/tests/_internal/server/background/tasks/test_process_prometheus_metrics.py create mode 100644 src/tests/_internal/server/routers/test_prometheus.py diff --git a/docs/docs/guides/server-deployment.md b/docs/docs/guides/server-deployment.md index 92942829f..07f69cc4e 100644 --- a/docs/docs/guides/server-deployment.md +++ b/docs/docs/guides/server-deployment.md @@ -202,6 +202,22 @@ To store logs using GCP Logging, set the `DSTACK_SERVER_GCP_LOGGING_PROJECT` env +## Metrics + +If enabled, `dstack` collects and exports Prometheus metrics from running jobs. Metrics for jobs from all projects are available +at the `/metrics` path, and metrics for jobs from a specific project are available at the `/metrics/project/` path. + +By default, metrics are disabled. To enable, set the `DSTACK_ENABLE_PROMETHEUS_METRICS` variable. + +Each sample includes a set of `dstack_*` labels, e.g., `dstack_project_name="main"`, `dstack_run_name="vllm-llama32"`. + +Currently, `dstack` collects the following metrics: + +* A fixed subset of NVIDIA GPU metrics from [DCGM Exporter :material-arrow-top-right-thin:{ .external }](https://docs.nvidia.com/datacenter/dcgm/latest/gpu-telemetry/dcgm-exporter.html){:target="_blank"}. +`dcgm-exporter` and `libdcgm` must be installed on the instance to enable these metrics. +On AWS, Azure, GCP, and OCI backends the required packages are already installed. +If you use SSH fleets, install `datacenter-gpu-manager-4-core` and `datacenter-gpu-manager-exporter`. + ## Encryption By default, `dstack` stores data in plaintext. 
To enforce encryption, you diff --git a/docs/docs/reference/environment-variables.md b/docs/docs/reference/environment-variables.md index 569dfed04..ab075bf64 100644 --- a/docs/docs/reference/environment-variables.md +++ b/docs/docs/reference/environment-variables.md @@ -105,6 +105,7 @@ For more details on the options below, refer to the [server deployment](../guide - `DSTACK_SERVER_CLOUDWATCH_LOG_REGION`{ #DSTACK_SERVER_CLOUDWATCH_LOG_REGION } – The CloudWatch Logs region. Defaults to `None`. - `DSTACK_DEFAULT_SERVICE_CLIENT_MAX_BODY_SIZE`{ #DSTACK_DEFAULT_SERVICE_CLIENT_MAX_BODY_SIZE } – Request body size limit for services running with a gateway, in bytes. Defaults to 64 MiB. - `DSTACK_FORBID_SERVICES_WITHOUT_GATEWAY`{ #DSTACK_FORBID_SERVICES_WITHOUT_GATEWAY } – Forbids registering new services without a gateway if set to any value. +- `DSTACK_ENABLE_PROMETHEUS_METRICS`{ #DSTACK_ENABLE_PROMETHEUS_METRICS } — Enables Prometheus metrics collection and export. ??? info "Internal environment variables" The following environment variables are intended for development purposes: diff --git a/runner/cmd/shim/main.go b/runner/cmd/shim/main.go index cf37776c3..23cab259c 100644 --- a/runner/cmd/shim/main.go +++ b/runner/cmd/shim/main.go @@ -15,6 +15,8 @@ import ( "github.com/dstackai/dstack/runner/internal/log" "github.com/dstackai/dstack/runner/internal/shim" "github.com/dstackai/dstack/runner/internal/shim/api" + "github.com/dstackai/dstack/runner/internal/shim/dcgm" + "github.com/dstackai/dstack/runner/internal/shim/host" "github.com/sirupsen/logrus" "github.com/urfave/cli/v2" ) @@ -95,6 +97,21 @@ func main() { Destination: &args.Runner.LogLevel, EnvVars: []string{"DSTACK_RUNNER_LOG_LEVEL"}, }, + /* DCGM Exporter Parameters */ + &cli.IntFlag{ + Name: "dcgm-exporter-http-port", + Usage: "DCGM Exporter http port", + Value: 10997, + Destination: &args.DCGMExporter.HTTPPort, + EnvVars: []string{"DSTACK_DCGM_EXPORTER_HTTP_PORT"}, + }, + &cli.IntFlag{ + Name: 
"dcgm-exporter-interval", + Usage: "DCGM Exporter collect interval, milliseconds", + Value: 5000, + Destination: &args.DCGMExporter.Interval, + EnvVars: []string{"DSTACK_DCGM_EXPORTER_INTERVAL"}, + }, /* Docker Parameters */ &cli.BoolFlag{ Name: "privileged", @@ -178,8 +195,28 @@ func start(ctx context.Context, args shim.CLIArgs, serviceMode bool) (err error) return err } + var dcgmExporter *dcgm.DCGMExporter + + if host.GetGpuVendor() == host.GpuVendorNvidia { + dcgmExporterPath, err := dcgm.GetDCGMExporterExecPath(ctx) + if err == nil { + interval := time.Duration(args.DCGMExporter.Interval * int(time.Millisecond)) + dcgmExporter = dcgm.NewDCGMExporter(dcgmExporterPath, args.DCGMExporter.HTTPPort, interval) + err = dcgmExporter.Start(ctx) + } + if err == nil { + log.Info(ctx, "using DCGM Exporter") + defer func() { + _ = dcgmExporter.Stop(ctx) + }() + } else { + log.Warning(ctx, "not using DCGM Exporter", "err", err) + dcgmExporter = nil + } + } + address := fmt.Sprintf(":%d", args.Shim.HTTPPort) - shimServer := api.NewShimServer(ctx, address, dockerRunner, Version) + shimServer := api.NewShimServer(ctx, address, dockerRunner, dcgmExporter, Version) defer func() { shutdownCtx, cancelShutdown := context.WithTimeout(ctx, 5*time.Second) diff --git a/runner/internal/shim/api/handlers.go b/runner/internal/shim/api/handlers.go index 6648ba483..0a0af9b39 100644 --- a/runner/internal/shim/api/handlers.go +++ b/runner/internal/shim/api/handlers.go @@ -8,6 +8,7 @@ import ( "github.com/dstackai/dstack/runner/internal/api" "github.com/dstackai/dstack/runner/internal/log" "github.com/dstackai/dstack/runner/internal/shim" + "github.com/dstackai/dstack/runner/internal/shim/dcgm" ) func (s *ShimServer) HealthcheckHandler(w http.ResponseWriter, r *http.Request) (interface{}, error) { @@ -121,3 +122,22 @@ func (s *ShimServer) TaskRemoveHandler(w http.ResponseWriter, r *http.Request) ( log.Info(ctx, "removed", "task", taskID) return nil, nil } + +func (s *ShimServer) 
TaskMetricsHandler(w http.ResponseWriter, r *http.Request) { + if s.dcgmExporter == nil { + http.Error(w, "DCGM Exporter is not available", http.StatusNotFound) + return + } + taskInfo := s.runner.TaskInfo(r.PathValue("id")) + if taskInfo.ID == "" { + http.Error(w, "Task not found", http.StatusNotFound) + return + } + expfmtBody, err := s.dcgmExporter.Fetch(r.Context()) + if err != nil { + http.Error(w, err.Error(), http.StatusBadGateway) + return + } + response := dcgm.FilterMetrics(expfmtBody, taskInfo.GpuIDs) + _, _ = w.Write(response) +} diff --git a/runner/internal/shim/api/handlers_test.go b/runner/internal/shim/api/handlers_test.go index ddf752c0e..98e129fc1 100644 --- a/runner/internal/shim/api/handlers_test.go +++ b/runner/internal/shim/api/handlers_test.go @@ -13,7 +13,7 @@ func TestHealthcheck(t *testing.T) { request := httptest.NewRequest("GET", "/api/healthcheck", nil) responseRecorder := httptest.NewRecorder() - server := NewShimServer(context.Background(), ":12345", NewDummyRunner(), "0.0.1.dev2") + server := NewShimServer(context.Background(), ":12345", NewDummyRunner(), nil, "0.0.1.dev2") f := common.JSONResponseHandler(server.HealthcheckHandler) f(responseRecorder, request) @@ -30,7 +30,7 @@ func TestHealthcheck(t *testing.T) { } func TestTaskSubmit(t *testing.T) { - server := NewShimServer(context.Background(), ":12340", NewDummyRunner(), "0.0.1.dev2") + server := NewShimServer(context.Background(), ":12340", NewDummyRunner(), nil, "0.0.1.dev2") requestBody := `{ "id": "dummy-id", "name": "dummy-name", diff --git a/runner/internal/shim/api/server.go b/runner/internal/shim/api/server.go index ed0c1a1ab..94a693b4f 100644 --- a/runner/internal/shim/api/server.go +++ b/runner/internal/shim/api/server.go @@ -8,6 +8,7 @@ import ( "github.com/dstackai/dstack/runner/internal/api" "github.com/dstackai/dstack/runner/internal/shim" + "github.com/dstackai/dstack/runner/internal/shim/dcgm" ) type TaskRunner interface { @@ -27,10 +28,12 @@ type ShimServer 
struct { runner TaskRunner + dcgmExporter *dcgm.DCGMExporter + version string } -func NewShimServer(ctx context.Context, address string, runner TaskRunner, version string) *ShimServer { +func NewShimServer(ctx context.Context, address string, runner TaskRunner, dcgmExporter *dcgm.DCGMExporter, version string) *ShimServer { r := api.NewRouter() s := &ShimServer{ HttpServer: &http.Server{ @@ -41,6 +44,8 @@ func NewShimServer(ctx context.Context, address string, runner TaskRunner, versi runner: runner, + dcgmExporter: dcgmExporter, + version: version, } @@ -51,6 +56,7 @@ func NewShimServer(ctx context.Context, address string, runner TaskRunner, versi r.AddHandler("POST", "/api/tasks", s.TaskSubmitHandler) r.AddHandler("POST", "/api/tasks/{id}/terminate", s.TaskTerminateHandler) r.AddHandler("POST", "/api/tasks/{id}/remove", s.TaskRemoveHandler) + r.HandleFunc("GET /metrics/tasks/{id}", s.TaskMetricsHandler) return s } diff --git a/runner/internal/shim/dcgm/exporter.go b/runner/internal/shim/dcgm/exporter.go new file mode 100644 index 000000000..6d6e8484e --- /dev/null +++ b/runner/internal/shim/dcgm/exporter.go @@ -0,0 +1,212 @@ +package dcgm + +import ( + "context" + "encoding/csv" + "errors" + "fmt" + "io" + "net/http" + "os" + "os/exec" + "strconv" + "strings" + "sync" + "syscall" + "time" + + "github.com/alexellis/go-execute/v2" + "github.com/dstackai/dstack/runner/internal/log" +) + +// Counter represents a single line in counters.csv, see +// https://github.com/NVIDIA/dcgm-exporter/tree/5f9250c211?tab=readme-ov-file#changing-metrics +// For list of supported types see +// https://github.com/NVIDIA/dcgm-exporter/blob/5f9250c211/internal/pkg/counters/variables.go#L23 +// NB: Although it is called "counter" in dcgm-exporter, in fact it can be any Prometheus +// metric type or even a label +type Counter struct { + Name string + Type string + Help string +} + +// Full list: https://docs.nvidia.com/datacenter/dcgm/latest/dcgm-api/dcgm-api-field-ids.html +var counters 
= [...]Counter{ + {"DCGM_FI_DEV_GPU_UTIL", "gauge", "GPU utilization (in %)."}, + {"DCGM_FI_DEV_MEM_COPY_UTIL", "gauge", "Memory utilization (in %)."}, + {"DCGM_FI_DEV_ENC_UTIL", "gauge", "Encoder utilization (in %)."}, + {"DCGM_FI_DEV_DEC_UTIL", "gauge", "Decoder utilization (in %)."}, + {"DCGM_FI_DEV_FB_FREE", "gauge", "Framebuffer memory free (in MiB)."}, + {"DCGM_FI_DEV_FB_USED", "gauge", "Framebuffer memory used (in MiB)."}, + {"DCGM_FI_PROF_GR_ENGINE_ACTIVE", "gauge", "The ratio of cycles during which a graphics engine or compute engine remains active."}, + {"DCGM_FI_PROF_SM_ACTIVE", "gauge", "The ratio of cycles an SM has at least 1 warp assigned."}, + {"DCGM_FI_PROF_SM_OCCUPANCY", "gauge", "The ratio of number of warps resident on an SM."}, + {"DCGM_FI_PROF_PIPE_TENSOR_ACTIVE", "gauge", "Ratio of cycles the tensor (HMMA) pipe is active."}, + {"DCGM_FI_PROF_PIPE_FP64_ACTIVE", "gauge", "Ratio of cycles the fp64 pipes are active."}, + {"DCGM_FI_PROF_PIPE_FP32_ACTIVE", "gauge", "Ratio of cycles the fp32 pipes are active."}, + {"DCGM_FI_PROF_PIPE_FP16_ACTIVE", "gauge", "Ratio of cycles the fp16 pipes are active."}, + {"DCGM_FI_PROF_PIPE_INT_ACTIVE", "gauge", "Ratio of cycles the integer pipe is active."}, + {"DCGM_FI_PROF_DRAM_ACTIVE", "gauge", "Ratio of cycles the device memory interface is active sending or receiving data."}, + {"DCGM_FI_PROF_PCIE_TX_BYTES", "counter", "The number of bytes of active PCIe tx (transmit) data including both header and payload."}, + {"DCGM_FI_PROF_PCIE_RX_BYTES", "counter", "The number of bytes of active PCIe rx (read) data including both header and payload."}, + {"DCGM_FI_DEV_SM_CLOCK", "gauge", "SM clock frequency (in MHz)."}, + {"DCGM_FI_DEV_MEM_CLOCK", "gauge", "Memory clock frequency (in MHz)."}, + {"DCGM_FI_DEV_MEMORY_TEMP", "gauge", "Memory temperature (in C)."}, + {"DCGM_FI_DEV_GPU_TEMP", "gauge", "GPU temperature (in C)."}, + {"DCGM_FI_DEV_POWER_USAGE", "gauge", "Power draw (in W)."}, + 
{"DCGM_FI_DEV_TOTAL_ENERGY_CONSUMPTION", "counter", "Total energy consumption since boot (in mJ)."}, + {"DCGM_FI_DEV_PCIE_REPLAY_COUNTER", "counter", "Total number of PCIe retries."}, + {"DCGM_FI_DEV_XID_ERRORS", "gauge", "Value of the last XID error encountered."}, + {"DCGM_FI_DEV_POWER_VIOLATION", "counter", "Throttling duration due to power constraints (in us)."}, + {"DCGM_FI_DEV_THERMAL_VIOLATION", "counter", "Throttling duration due to thermal constraints (in us)."}, + {"DCGM_FI_DEV_SYNC_BOOST_VIOLATION", "counter", "Throttling duration due to sync-boost constraints (in us)."}, + {"DCGM_FI_DEV_BOARD_LIMIT_VIOLATION", "counter", "Throttling duration due to board limit constraints (in us)."}, + {"DCGM_FI_DEV_LOW_UTIL_VIOLATION", "counter", "Throttling duration due to low utilization (in us)."}, + {"DCGM_FI_DEV_RELIABILITY_VIOLATION", "counter", "Throttling duration due to reliability constraints (in us)."}, + {"DCGM_FI_DEV_ECC_SBE_VOL_TOTAL", "counter", "Total number of single-bit volatile ECC errors."}, + {"DCGM_FI_DEV_ECC_DBE_VOL_TOTAL", "counter", "Total number of double-bit volatile ECC errors."}, + {"DCGM_FI_DEV_ECC_SBE_AGG_TOTAL", "counter", "Total number of single-bit persistent ECC errors."}, + {"DCGM_FI_DEV_ECC_DBE_AGG_TOTAL", "counter", "Total number of double-bit persistent ECC errors."}, + {"DCGM_FI_DEV_RETIRED_SBE", "counter", "Total number of retired pages due to single-bit errors."}, + {"DCGM_FI_DEV_RETIRED_DBE", "counter", "Total number of retired pages due to double-bit errors."}, + {"DCGM_FI_DEV_RETIRED_PENDING", "counter", "Total number of pages pending retirement."}, + {"DCGM_FI_DEV_UNCORRECTABLE_REMAPPED_ROWS", "counter", "Number of remapped rows for uncorrectable errors"}, + {"DCGM_FI_DEV_CORRECTABLE_REMAPPED_ROWS", "counter", "Number of remapped rows for correctable errors"}, + {"DCGM_FI_DEV_ROW_REMAP_FAILURE", "gauge", "Whether remapping of rows has failed"}, + {"DCGM_FI_DEV_NVLINK_CRC_FLIT_ERROR_COUNT_TOTAL", "counter", "Total number of 
NVLink flow-control CRC errors."}, + {"DCGM_FI_DEV_NVLINK_CRC_DATA_ERROR_COUNT_TOTAL", "counter", "Total number of NVLink data CRC errors."}, + {"DCGM_FI_DEV_NVLINK_REPLAY_ERROR_COUNT_TOTAL", "counter", "Total number of NVLink retries."}, + {"DCGM_FI_DEV_NVLINK_RECOVERY_ERROR_COUNT_TOTAL", "counter", "Total number of NVLink recovery errors."}, + {"DCGM_FI_DEV_NVLINK_BANDWIDTH_TOTAL", "counter", "Total number of NVLink bandwidth counters for all lanes."}, + {"DCGM_FI_DEV_NVLINK_BANDWIDTH_L0", "counter", "The number of bytes of active NVLink rx or tx data including both header and payload."}, + {"DCGM_FI_PROF_NVLINK_RX_BYTES", "counter", "The number of bytes of active PCIe rx (read) data including both header and payload. "}, + {"DCGM_FI_PROF_NVLINK_TX_BYTES", "counter", "The number of bytes of active NvLink tx (transmit) data including both header and payload. "}, +} + +const dcgmExporterExecName = "dcgm-exporter" + +type DCGMExporter struct { + cmd *exec.Cmd + cancel context.CancelFunc + execPath string + listenAddr string + client *http.Client + url string + interval time.Duration + configPath string + mu sync.Mutex + lastFetchedAt time.Time + lastResponse []byte +} + +func (c *DCGMExporter) Start(ctx context.Context) error { + if c.cmd != nil { + return errors.New("already started") + } + + configFile, err := os.CreateTemp("", "counters-*.csv") + if err != nil { + return err + } + defer configFile.Close() + c.configPath = configFile.Name() + configWriter := csv.NewWriter(configFile) + for _, counter := range counters { + err := configWriter.Write([]string{counter.Name, counter.Type, counter.Help}) + if err != nil { + return err + } + } + configWriter.Flush() + + cmdCtx, cmdCancel := context.WithCancel(ctx) + c.cancel = cmdCancel + cmd := exec.CommandContext( + cmdCtx, c.execPath, + "-f", c.configPath, + "-a", c.listenAddr, + "-c", strconv.Itoa(int(c.interval.Milliseconds())), + ) + c.cmd = cmd + cmd.Cancel = func() error { + return 
cmd.Process.Signal(syscall.SIGTERM) + } + cmd.WaitDelay = 5 * time.Second + return cmd.Start() +} + +func (c *DCGMExporter) Stop(context.Context) error { + if c.cmd == nil { + return errors.New("not started") + } + c.cancel() + os.Remove(c.configPath) + return c.cmd.Wait() +} + +func (c *DCGMExporter) Fetch(ctx context.Context) ([]byte, error) { + c.mu.Lock() + defer c.mu.Unlock() + + now := time.Now() + + if now.Sub(c.lastFetchedAt) < c.interval { + return c.lastResponse, nil + } + + req, err := http.NewRequestWithContext(ctx, "GET", c.url, nil) + if err != nil { + return nil, err + } + resp, err := c.client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("status is not OK: %d", resp.StatusCode) + } + response, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + c.lastFetchedAt = now + c.lastResponse = response + return response, nil +} + +func NewDCGMExporter(execPath string, port int, interval time.Duration) *DCGMExporter { + listenAddr := fmt.Sprintf("localhost:%d", port) + client := &http.Client{ + Timeout: 10 * time.Second, + } + return &DCGMExporter{ + execPath: execPath, + listenAddr: listenAddr, + client: client, + url: fmt.Sprintf("http://%s/metrics", listenAddr), + interval: interval, + } +} + +func GetDCGMExporterExecPath(ctx context.Context) (string, error) { + path, err := exec.LookPath(dcgmExporterExecName) + if err != nil { + return "", err + } + cmd := execute.ExecTask{ + Command: path, + Args: []string{"-v"}, + StreamStdio: false, + } + res, err := cmd.Execute(ctx) + if err != nil { + return "", err + } + if res.ExitCode != 0 { + return "", fmt.Errorf("%s returned %d, stderr: %s, stdout: %s", path, res.ExitCode, res.Stderr, res.Stdout) + } + log.Debug(ctx, "detected", "path", path, "version", strings.TrimSpace(res.Stdout)) + return path, nil +} diff --git a/runner/internal/shim/dcgm/metrics.go b/runner/internal/shim/dcgm/metrics.go new 
// FilterMetrics returns the subset of a Prometheus exposition-format payload
// that references one of the given GPU UUIDs.
//
// DCGM Exporter output has the shape:
//
//	# HELP DCGM_FIELD_1 Docstring for field 1
//	# TYPE DCGM_FIELD_1 gauge|counter|...
//	DCGM_FIELD_1{gpu="0", UUID="..." ...} 0.0
//	DCGM_FIELD_1{gpu="1", UUID="..." ...} 0.5
//	...
//
// A sample line is kept only when it contains one of the UUIDs; the pending
// # HELP/# TYPE comments of a metric family are emitted once, immediately
// before the first kept sample of that family. Any other comment and all
// blank lines are dropped. A nil or empty body yields an empty result.
func FilterMetrics(expfmtBody []byte, uuids []string) []byte {
	var buffer bytes.Buffer
	scanner := bufio.NewScanner(bytes.NewReader(expfmtBody))
	// Most recently seen, not-yet-emitted family comments.
	helpComment := ""
	typeComment := ""
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if len(line) == 0 {
			continue
		}
		if strings.HasPrefix(line, "# HELP") {
			helpComment = line
			continue
		}
		if strings.HasPrefix(line, "# TYPE") {
			typeComment = line
			continue
		}
		if strings.HasPrefix(line, "#") {
			// Free-form comments carry no metric data.
			continue
		}
		for _, uuid := range uuids {
			if !strings.Contains(line, uuid) {
				continue
			}
			if helpComment != "" {
				buffer.WriteString(helpComment)
				buffer.WriteRune('\n')
				helpComment = ""
			}
			if typeComment != "" {
				buffer.WriteString(typeComment)
				buffer.WriteRune('\n')
				typeComment = ""
			}
			buffer.WriteString(line)
			buffer.WriteRune('\n')
			// A sample line belongs to exactly one GPU; stop after the first
			// match so duplicate UUIDs in the filter, or one UUID being a
			// substring of another, cannot emit the same line twice.
			break
		}
	}
	return buffer.Bytes()
}
+# TYPE DCGM_FI_DEV_SM_CLOCK gauge +DCGM_FI_DEV_SM_CLOCK{gpu="0",UUID="GPU-0781f3bb-da15-f334-d5db-37b3f19542d0",pci_bus_id="00000000:00:1B.0",device="nvidia0",modelName="Tesla T4",Hostname="ip-172-31-16-106",DCGM_FI_DRIVER_VERSION="535.183.06"} 1365 +DCGM_FI_DEV_SM_CLOCK{gpu="1",UUID="GPU-41cc2907-3249-5a6b-f0e4-d04063b183a9",pci_bus_id="00000000:00:1C.0",device="nvidia1",modelName="Tesla T4",Hostname="ip-172-31-16-106",DCGM_FI_DRIVER_VERSION="535.183.06"} 300 +DCGM_FI_DEV_SM_CLOCK{gpu="2",UUID="GPU-cc8e8c03-ebaa-f217-8e4c-d9cd98e20aed",pci_bus_id="00000000:00:1D.0",device="nvidia2",modelName="Tesla T4",Hostname="ip-172-31-16-106",DCGM_FI_DRIVER_VERSION="535.183.06"} 300 +DCGM_FI_DEV_SM_CLOCK{gpu="3",UUID="GPU-fb615fb7-3f5a-5600-0ab1-debad8dc80ee",pci_bus_id="00000000:00:1E.0",device="nvidia3",modelName="Tesla T4",Hostname="ip-172-31-16-106",DCGM_FI_DRIVER_VERSION="535.183.06"} 300 + +# HELP DCGM_FI_DEV_MEM_CLOCK Memory clock frequency (in MHz). +DCGM_FI_DEV_MEM_CLOCK{gpu="0",UUID="GPU-0781f3bb-da15-f334-d5db-37b3f19542d0",pci_bus_id="00000000:00:1B.0",device="nvidia0",modelName="Tesla T4",Hostname="ip-172-31-16-106",DCGM_FI_DRIVER_VERSION="535.183.06"} 5000 +DCGM_FI_DEV_MEM_CLOCK{gpu="1",UUID="GPU-41cc2907-3249-5a6b-f0e4-d04063b183a9",pci_bus_id="00000000:00:1C.0",device="nvidia1",modelName="Tesla T4",Hostname="ip-172-31-16-106",DCGM_FI_DRIVER_VERSION="535.183.06"} 405 +# comment +DCGM_FI_DEV_MEM_CLOCK{gpu="2",UUID="GPU-cc8e8c03-ebaa-f217-8e4c-d9cd98e20aed",pci_bus_id="00000000:00:1D.0",device="nvidia2",modelName="Tesla T4",Hostname="ip-172-31-16-106",DCGM_FI_DRIVER_VERSION="535.183.06"} 405 +DCGM_FI_DEV_MEM_CLOCK{gpu="3",UUID="GPU-fb615fb7-3f5a-5600-0ab1-debad8dc80ee",pci_bus_id="00000000:00:1E.0",device="nvidia3",modelName="Tesla T4",Hostname="ip-172-31-16-106",DCGM_FI_DRIVER_VERSION="535.183.06"} 405 +DCGM_FI_DEV_MEMORY_TEMP{gpu="0",UUID="GPU-0781f3bb-da15-f334-d5db-37b3f19542d0",pci_bus_id="00000000:00:1B.0",device="nvidia0",modelName="Tesla 
T4",Hostname="ip-172-31-16-106",DCGM_FI_DRIVER_VERSION="535.183.06"} 0 +DCGM_FI_DEV_MEMORY_TEMP{gpu="1",UUID="GPU-41cc2907-3249-5a6b-f0e4-d04063b183a9",pci_bus_id="00000000:00:1C.0",device="nvidia1",modelName="Tesla T4",Hostname="ip-172-31-16-106",DCGM_FI_DRIVER_VERSION="535.183.06"} 0 + +DCGM_FI_DEV_MEMORY_TEMP{gpu="2",UUID="GPU-cc8e8c03-ebaa-f217-8e4c-d9cd98e20aed",pci_bus_id="00000000:00:1D.0",device="nvidia2",modelName="Tesla T4",Hostname="ip-172-31-16-106",DCGM_FI_DRIVER_VERSION="535.183.06"} 0 +DCGM_FI_DEV_MEMORY_TEMP{gpu="3",UUID="GPU-fb615fb7-3f5a-5600-0ab1-debad8dc80ee",pci_bus_id="00000000:00:1E.0",device="nvidia3",modelName="Tesla T4",Hostname="ip-172-31-16-106",DCGM_FI_DRIVER_VERSION="535.183.06"} 0 + `) + filtered := FilterMetrics(body, []string{"GPU-41cc2907-3249-5a6b-f0e4-d04063b183a9", "GPU-fb615fb7-3f5a-5600-0ab1-debad8dc80ee"}) + expected := []byte(`# HELP DCGM_FI_DEV_SM_CLOCK SM clock frequency (in MHz). +# TYPE DCGM_FI_DEV_SM_CLOCK gauge +DCGM_FI_DEV_SM_CLOCK{gpu="1",UUID="GPU-41cc2907-3249-5a6b-f0e4-d04063b183a9",pci_bus_id="00000000:00:1C.0",device="nvidia1",modelName="Tesla T4",Hostname="ip-172-31-16-106",DCGM_FI_DRIVER_VERSION="535.183.06"} 300 +DCGM_FI_DEV_SM_CLOCK{gpu="3",UUID="GPU-fb615fb7-3f5a-5600-0ab1-debad8dc80ee",pci_bus_id="00000000:00:1E.0",device="nvidia3",modelName="Tesla T4",Hostname="ip-172-31-16-106",DCGM_FI_DRIVER_VERSION="535.183.06"} 300 +# HELP DCGM_FI_DEV_MEM_CLOCK Memory clock frequency (in MHz). 
+DCGM_FI_DEV_MEM_CLOCK{gpu="1",UUID="GPU-41cc2907-3249-5a6b-f0e4-d04063b183a9",pci_bus_id="00000000:00:1C.0",device="nvidia1",modelName="Tesla T4",Hostname="ip-172-31-16-106",DCGM_FI_DRIVER_VERSION="535.183.06"} 405 +DCGM_FI_DEV_MEM_CLOCK{gpu="3",UUID="GPU-fb615fb7-3f5a-5600-0ab1-debad8dc80ee",pci_bus_id="00000000:00:1E.0",device="nvidia3",modelName="Tesla T4",Hostname="ip-172-31-16-106",DCGM_FI_DRIVER_VERSION="535.183.06"} 405 +DCGM_FI_DEV_MEMORY_TEMP{gpu="1",UUID="GPU-41cc2907-3249-5a6b-f0e4-d04063b183a9",pci_bus_id="00000000:00:1C.0",device="nvidia1",modelName="Tesla T4",Hostname="ip-172-31-16-106",DCGM_FI_DRIVER_VERSION="535.183.06"} 0 +DCGM_FI_DEV_MEMORY_TEMP{gpu="3",UUID="GPU-fb615fb7-3f5a-5600-0ab1-debad8dc80ee",pci_bus_id="00000000:00:1E.0",device="nvidia3",modelName="Tesla T4",Hostname="ip-172-31-16-106",DCGM_FI_DRIVER_VERSION="535.183.06"} 0 +`) + assert.Equal(t, expected, filtered) +} diff --git a/runner/internal/shim/models.go b/runner/internal/shim/models.go index c71bf019e..9ad1f67b8 100644 --- a/runner/internal/shim/models.go +++ b/runner/internal/shim/models.go @@ -28,6 +28,11 @@ type CLIArgs struct { LogLevel int } + DCGMExporter struct { + HTTPPort int + Interval int // milliseconds + } + Docker struct { ConcatinatedPublicSSHKeys string Privileged bool diff --git a/setup.py b/setup.py index cebc1bec1..640518001 100644 --- a/setup.py +++ b/setup.py @@ -76,6 +76,7 @@ def get_long_description(): "asyncpg", "cachetools", "python-json-logger>=3.1.0", + "prometheus-client", "grpcio>=1.50", # indirect ] diff --git a/src/dstack/_internal/server/app.py b/src/dstack/_internal/server/app.py index 5b451823a..89a5f8bb7 100644 --- a/src/dstack/_internal/server/app.py +++ b/src/dstack/_internal/server/app.py @@ -29,6 +29,7 @@ metrics, pools, projects, + prometheus, repos, runs, secrets, @@ -185,6 +186,7 @@ def register_routes(app: FastAPI, ui: bool = True): app.include_router(model_proxy.router, prefix="/proxy/models", tags=["model-proxy"]) 
app.include_router(pools.root_router) app.include_router(pools.router) + app.include_router(prometheus.router) @app.exception_handler(ForbiddenError) async def forbidden_error_handler(request: Request, exc: ForbiddenError): @@ -252,7 +254,11 @@ async def healthcheck(): @app.exception_handler(404) async def custom_http_exception_handler(request, exc): - if request.url.path.startswith("/api") or _is_proxy_request(request): + if ( + request.url.path.startswith("/api") + or _is_proxy_request(request) + or _is_prometheus_request(request) + ): return JSONResponse( {"detail": exc.detail}, status_code=status.HTTP_404_NOT_FOUND, @@ -283,6 +289,10 @@ def _is_proxy_request(request: Request) -> bool: ) and referrer.path.startswith("/proxy") +def _is_prometheus_request(request: Request) -> bool: + return request.url.path.startswith("/metrics") + + def _print_dstack_logo(): console.print( """[purple]╱╱╭╮╱╱╭╮╱╱╱╱╱╱╭╮ diff --git a/src/dstack/_internal/server/background/__init__.py b/src/dstack/_internal/server/background/__init__.py index bdb865d8a..4ac4fb25b 100644 --- a/src/dstack/_internal/server/background/__init__.py +++ b/src/dstack/_internal/server/background/__init__.py @@ -1,6 +1,7 @@ from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.interval import IntervalTrigger +from dstack._internal.server import settings from dstack._internal.server.background.tasks.process_fleets import process_fleets from dstack._internal.server.background.tasks.process_gateways import ( process_gateways_connections, @@ -16,6 +17,10 @@ from dstack._internal.server.background.tasks.process_placement_groups import ( process_placement_groups, ) +from dstack._internal.server.background.tasks.process_prometheus_metrics import ( + collect_prometheus_metrics, + delete_prometheus_metrics, +) from dstack._internal.server.background.tasks.process_running_jobs import process_running_jobs from dstack._internal.server.background.tasks.process_runs import process_runs from 
dstack._internal.server.background.tasks.process_submitted_jobs import process_submitted_jobs @@ -43,6 +48,11 @@ def start_background_tasks() -> AsyncIOScheduler: # * 150 active instances with up to 2 minutes processing latency _scheduler.add_job(collect_metrics, IntervalTrigger(seconds=10), max_instances=1) _scheduler.add_job(delete_metrics, IntervalTrigger(minutes=5), max_instances=1) + if settings.ENABLE_PROMETHEUS_METRICS: + _scheduler.add_job( + collect_prometheus_metrics, IntervalTrigger(seconds=10), max_instances=1 + ) + _scheduler.add_job(delete_prometheus_metrics, IntervalTrigger(minutes=5), max_instances=1) # process_submitted_jobs and process_instances max processing rate is 75 jobs(instances) per minute. _scheduler.add_job( process_submitted_jobs, diff --git a/src/dstack/_internal/server/background/tasks/process_prometheus_metrics.py b/src/dstack/_internal/server/background/tasks/process_prometheus_metrics.py new file mode 100644 index 000000000..9675a6a44 --- /dev/null +++ b/src/dstack/_internal/server/background/tasks/process_prometheus_metrics.py @@ -0,0 +1,135 @@ +import uuid +from datetime import datetime, timedelta +from typing import Optional + +import sqlalchemy.exc +from sqlalchemy import delete, or_, select, update +from sqlalchemy.orm import joinedload + +from dstack._internal.core.consts import DSTACK_SHIM_HTTP_PORT +from dstack._internal.core.models.runs import JobStatus +from dstack._internal.server.db import get_session_ctx +from dstack._internal.server.models import InstanceModel, JobModel, JobPrometheusMetrics +from dstack._internal.server.services.jobs import get_job_provisioning_data, get_job_runtime_data +from dstack._internal.server.services.pools import get_instance_ssh_private_keys +from dstack._internal.server.services.runner import client +from dstack._internal.server.services.runner.ssh import runner_ssh_tunnel +from dstack._internal.server.utils.common import gather_map_async +from dstack._internal.utils.common import batched, 
get_current_datetime, get_or_error, run_async +from dstack._internal.utils.logging import get_logger + +logger = get_logger(__name__) + + +MAX_JOBS_FETCHED = 100 +BATCH_SIZE = 10 +MIN_COLLECT_INTERVAL_SECONDS = 9 +# 10 minutes should be more than enough to scrape metrics, and, in any case, +# 10 minutes old metrics has little to no value +METRICS_TTL_SECONDS = 600 + + +async def collect_prometheus_metrics(): + now = get_current_datetime() + cutoff = now - timedelta(seconds=MIN_COLLECT_INTERVAL_SECONDS) + async with get_session_ctx() as session: + res = await session.execute( + select(JobModel) + .join(JobPrometheusMetrics, isouter=True) + .where( + JobModel.status.in_([JobStatus.RUNNING]), + or_( + JobPrometheusMetrics.job_id.is_(None), + JobPrometheusMetrics.collected_at < cutoff, + ), + ) + .options(joinedload(JobModel.instance).joinedload(InstanceModel.project)) + .order_by(JobModel.last_processed_at.asc()) + .limit(MAX_JOBS_FETCHED) + ) + job_models = res.unique().scalars().all() + for batch in batched(job_models, BATCH_SIZE): + await _collect_jobs_metrics(batch, now) + + +async def delete_prometheus_metrics(): + now = get_current_datetime() + cutoff = now - timedelta(seconds=METRICS_TTL_SECONDS) + async with get_session_ctx() as session: + await session.execute( + delete(JobPrometheusMetrics).where(JobPrometheusMetrics.collected_at < cutoff) + ) + await session.commit() + + +async def _collect_jobs_metrics(job_models: list[JobModel], collected_at: datetime): + results = await gather_map_async(job_models, _collect_job_metrics, return_exceptions=True) + async with get_session_ctx() as session: + for job_model, result in results: + if result is None: + continue + if isinstance(result, BaseException): + logger.error( + "Failed to collect job %s Prometheus metrics: %r", job_model.job_name, result + ) + continue + res = await session.execute( + update(JobPrometheusMetrics) + .where(JobPrometheusMetrics.job_id == job_model.id) + .values( + collected_at=collected_at, 
+ text=result, + ) + .returning(JobPrometheusMetrics) + ) + metrics = res.scalar() + if metrics is None: + metrics = JobPrometheusMetrics( + job_id=job_model.id, + collected_at=collected_at, + text=result, + ) + try: + async with session.begin_nested(): + session.add(metrics) + except sqlalchemy.exc.IntegrityError: + # Concurrent server replica already committed, ignoring + pass + await session.commit() + + +async def _collect_job_metrics(job_model: JobModel) -> Optional[str]: + ssh_private_keys = get_instance_ssh_private_keys(get_or_error(job_model.instance)) + jpd = get_job_provisioning_data(job_model) + jrd = get_job_runtime_data(job_model) + if jpd is None: + return None + try: + res = await run_async( + _pull_job_metrics, + ssh_private_keys, + jpd, + jrd, + job_model.id, + ) + except Exception: + logger.exception("Failed to collect job %s Prometheus metrics", job_model.job_name) + return None + + if isinstance(res, bool): + logger.warning( + "Failed to connect to job %s to collect Prometheus metrics", job_model.job_name + ) + return None + + if res is None: + # Either not supported by shim or exporter is not available + return None + + return res + + +@runner_ssh_tunnel(ports=[DSTACK_SHIM_HTTP_PORT], retries=1) +def _pull_job_metrics(ports: dict[int, int], task_id: uuid.UUID) -> Optional[str]: + shim_client = client.ShimClient(port=ports[DSTACK_SHIM_HTTP_PORT]) + return shim_client.get_task_metrics(task_id) diff --git a/src/dstack/_internal/server/migrations/versions/60e444118b6d_add_jobprometheusmetrics.py b/src/dstack/_internal/server/migrations/versions/60e444118b6d_add_jobprometheusmetrics.py new file mode 100644 index 000000000..232098099 --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/60e444118b6d_add_jobprometheusmetrics.py @@ -0,0 +1,40 @@ +"""Add JobPrometheusMetrics + +Revision ID: 60e444118b6d +Revises: a751ef183f27 +Create Date: 2025-02-21 10:59:26.339353 + +""" + +import sqlalchemy as sa +import sqlalchemy_utils +from alembic 
import op + +import dstack._internal.server.models + +# revision identifiers, used by Alembic. +revision = "60e444118b6d" +down_revision = "a751ef183f27" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_table( + "job_prometheus_metrics", + sa.Column("job_id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=False), + sa.Column("collected_at", dstack._internal.server.models.NaiveDateTime(), nullable=False), + sa.Column("text", sa.Text(), nullable=False), + sa.ForeignKeyConstraint( + ["job_id"], ["jobs.id"], name=op.f("fk_job_prometheus_metrics_job_id_jobs") + ), + sa.PrimaryKeyConstraint("job_id", name=op.f("pk_job_prometheus_metrics")), + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_table("job_prometheus_metrics") + # ### end Alembic commands ### diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py index 1e1750e31..ace2118e7 100644 --- a/src/dstack/_internal/server/models.py +++ b/src/dstack/_internal/server/models.py @@ -648,3 +648,14 @@ class JobMetricsPoint(BaseModel): # json-encoded lists of metric values of len(gpus) length gpus_memory_usage_bytes: Mapped[str] = mapped_column(Text) gpus_util_percent: Mapped[str] = mapped_column(Text) + + +class JobPrometheusMetrics(BaseModel): + __tablename__ = "job_prometheus_metrics" + + job_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("jobs.id"), primary_key=True) + job: Mapped["JobModel"] = relationship() + + collected_at: Mapped[datetime] = mapped_column(NaiveDateTime) + # Raw Prometheus text response + text: Mapped[str] = mapped_column(Text) diff --git a/src/dstack/_internal/server/routers/prometheus.py b/src/dstack/_internal/server/routers/prometheus.py new file mode 100644 index 000000000..b84356ff3 --- /dev/null +++ b/src/dstack/_internal/server/routers/prometheus.py @@ -0,0 
+1,36 @@ +from typing import Annotated + +from fastapi import APIRouter, Depends +from fastapi.responses import PlainTextResponse +from sqlalchemy.ext.asyncio import AsyncSession + +from dstack._internal.server import settings +from dstack._internal.server.db import get_session +from dstack._internal.server.deps import Project +from dstack._internal.server.models import ProjectModel +from dstack._internal.server.services import prometheus +from dstack._internal.server.utils.routers import error_not_found + +router = APIRouter( + tags=["prometheus"], + default_response_class=PlainTextResponse, +) + + +@router.get("/metrics") +async def get_prometheus_metrics( + session: Annotated[AsyncSession, Depends(get_session)], +) -> str: + if not settings.ENABLE_PROMETHEUS_METRICS: + raise error_not_found() + return await prometheus.get_metrics(session=session) + + +@router.get("/metrics/project/{project_name}") +async def get_project_prometheus_metrics( + session: Annotated[AsyncSession, Depends(get_session)], + project: Annotated[ProjectModel, Depends(Project())], +) -> str: + if not settings.ENABLE_PROMETHEUS_METRICS: + raise error_not_found() + return await prometheus.get_project_metrics(session=session, project=project) diff --git a/src/dstack/_internal/server/services/prometheus.py b/src/dstack/_internal/server/services/prometheus.py new file mode 100644 index 000000000..8bc0f999c --- /dev/null +++ b/src/dstack/_internal/server/services/prometheus.py @@ -0,0 +1,87 @@ +from collections.abc import Generator, Iterable + +from prometheus_client import Metric +from prometheus_client.parser import text_string_to_metric_families +from prometheus_client.samples import Sample +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import joinedload + +from dstack._internal.core.models.runs import JobStatus +from dstack._internal.server.models import JobModel, JobPrometheusMetrics, ProjectModel + + +async def get_metrics(session: 
AsyncSession) -> str: + res = await session.execute( + select(JobPrometheusMetrics) + .join(JobModel) + .join(ProjectModel) + .where(JobModel.status.in_([JobStatus.RUNNING])) + .order_by(ProjectModel.name, JobModel.job_name) + .options(joinedload(JobPrometheusMetrics.job).joinedload(JobModel.project)) + ) + metrics_models = res.scalars().all() + return _process_metrics(metrics_models) + + +async def get_project_metrics(session: AsyncSession, project: ProjectModel) -> str: + res = await session.execute( + select(JobPrometheusMetrics) + .join(JobModel) + .where( + JobModel.project_id == project.id, + JobModel.status.in_([JobStatus.RUNNING]), + ) + .order_by(JobModel.job_name) + .options(joinedload(JobPrometheusMetrics.job).joinedload(JobModel.project)) + ) + metrics_models = res.scalars().all() + return _process_metrics(metrics_models) + + +def _process_metrics(metrics_models: Iterable[JobPrometheusMetrics]) -> str: + metrics = _parse_and_enrich_metrics(metrics_models) + if not metrics: + return "" + return "\n".join(_render_metrics(metrics)) + "\n" + + +def _parse_and_enrich_metrics(metrics_models: Iterable[JobPrometheusMetrics]) -> list[Metric]: + metrics: dict[str, Metric] = {} + for metrics_model in metrics_models: + for metric in text_string_to_metric_families(metrics_model.text): + samples = metric.samples + metric.samples = [] + name = metric.name + metric = metrics.setdefault(name, metric) + for sample in samples: + labels = sample.labels + labels.update(_get_dstack_labels(metrics_model.job)) + # text_string_to_metric_families "fixes" counter names appending _total, + # we rebuild Sample to revert this + metric.samples.append(Sample(name, labels, *sample[2:])) + return list(metrics.values()) + + +def _get_dstack_labels(job: JobModel) -> dict[str, str]: + return { + "dstack_project_name": job.project.name, + "dstack_run_name": job.run_name, + "dstack_job_name": job.job_name, + "dstack_job_num": str(job.job_num), + "dstack_replica_num": str(job.replica_num), + 
} + + +def _render_metrics(metrics: Iterable[Metric]) -> Generator[str, None, None]: + for metric in metrics: + yield f"# HELP {metric.name} {metric.documentation}" + yield f"# TYPE {metric.name} {metric.type}" + for sample in metric.samples: + parts: list[str] = [f"{sample.name}{{"] + parts.extend(",".join(f'{name}="{value}"' for name, value in sample.labels.items())) + parts.append(f"}} {sample.value}") + # text_string_to_metric_families converts milliseconds to float seconds + if isinstance(sample.timestamp, float): + parts.append(f" {int(sample.timestamp * 1000)}") + yield "".join(parts) diff --git a/src/dstack/_internal/server/services/runner/client.py b/src/dstack/_internal/server/services/runner/client.py index 320b7222c..bc53dd55d 100644 --- a/src/dstack/_internal/server/services/runner/client.py +++ b/src/dstack/_internal/server/services/runner/client.py @@ -178,9 +178,6 @@ class ShimClient: # API v1 (a.k.a. Legacy API) — `/api/{submit,pull,stop}` _API_V2_MIN_SHIM_VERSION = (0, 18, 34) - # A surrogate task ID for API-v1-over-v2 emulation (`_v2_compat_*` methods) - _LEGACY_TASK_ID = "00000000-0000-0000-0000-000000000000" - _shim_version: Optional["_Version"] _api_version: int _negotiated: bool = False @@ -339,6 +336,20 @@ def pull(self) -> LegacyPullResponse: resp = self._request("GET", "/api/pull", raise_for_status=True) return self._response(LegacyPullResponse, resp) + # Metrics + + def get_task_metrics(self, task_id: "_TaskID") -> Optional[str]: + resp = self._request("GET", f"/metrics/tasks/{task_id}") + if resp.status_code == HTTPStatus.NOT_FOUND: + # Metrics exporter is not installed or old shim version + return None + if resp.status_code == HTTPStatus.BAD_GATEWAY: + # Metrics exporter is not available or returned an error + logger.info("failed to collect metrics for task %s: %s", task_id, resp.text) + return None + self._raise_for_status(resp) + return resp.text + # Private methods used for public methods implementations def _request( diff --git 
a/src/dstack/_internal/server/settings.py b/src/dstack/_internal/server/settings.py index b8998aed2..786769209 100644 --- a/src/dstack/_internal/server/settings.py +++ b/src/dstack/_internal/server/settings.py @@ -74,3 +74,4 @@ UPDATE_DEFAULT_PROJECT = os.getenv("DSTACK_UPDATE_DEFAULT_PROJECT") is not None DO_NOT_UPDATE_DEFAULT_PROJECT = os.getenv("DSTACK_DO_NOT_UPDATE_DEFAULT_PROJECT") is not None SKIP_GATEWAY_UPDATE = os.getenv("DSTACK_SKIP_GATEWAY_UPDATE", None) is not None +ENABLE_PROMETHEUS_METRICS = os.getenv("DSTACK_ENABLE_PROMETHEUS_METRICS", None) is not None diff --git a/src/dstack/_internal/server/testing/common.py b/src/dstack/_internal/server/testing/common.py index a2bc5db9a..696f3bf22 100644 --- a/src/dstack/_internal/server/testing/common.py +++ b/src/dstack/_internal/server/testing/common.py @@ -69,6 +69,7 @@ InstanceModel, JobMetricsPoint, JobModel, + JobPrometheusMetrics, PlacementGroupModel, PoolModel, ProjectModel, @@ -858,6 +859,22 @@ async def create_job_metrics_point( return jmp +async def create_job_prometheus_metrics( + session: AsyncSession, + job: JobModel, + collected_at: datetime = datetime(2023, 1, 2, 3, 4, tzinfo=timezone.utc), + text: str = "# Prometheus metrics\n", +): + metrics = JobPrometheusMetrics( + job_id=job.id, + collected_at=collected_at, + text=text, + ) + session.add(metrics) + await session.commit() + return metrics + + def get_private_key_string() -> str: return """ -----BEGIN RSA PRIVATE KEY----- diff --git a/src/tests/_internal/server/background/tasks/test_process_prometheus_metrics.py b/src/tests/_internal/server/background/tasks/test_process_prometheus_metrics.py new file mode 100644 index 000000000..cfeb323b0 --- /dev/null +++ b/src/tests/_internal/server/background/tasks/test_process_prometheus_metrics.py @@ -0,0 +1,189 @@ +from collections.abc import Generator +from datetime import datetime, timezone +from unittest.mock import Mock, patch + +import pytest +import pytest_asyncio +from freezegun import freeze_time 
+from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from dstack._internal.core.models.instances import InstanceStatus +from dstack._internal.core.models.runs import JobStatus +from dstack._internal.core.models.users import GlobalRole, ProjectRole +from dstack._internal.server.background.tasks.process_prometheus_metrics import ( + collect_prometheus_metrics, + delete_prometheus_metrics, +) +from dstack._internal.server.models import JobModel, JobPrometheusMetrics +from dstack._internal.server.services.projects import add_project_member +from dstack._internal.server.testing.common import ( + create_instance, + create_job, + create_job_prometheus_metrics, + create_pool, + create_project, + create_repo, + create_run, + create_user, + get_job_provisioning_data, +) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) +@pytest.mark.usefixtures("test_db", "image_config_mock") +class TestCollectPrometheusMetrics: + @pytest_asyncio.fixture + async def job(self, session: AsyncSession) -> JobModel: + user = await create_user(session=session, global_role=GlobalRole.USER) + project = await create_project(session=session, owner=user) + await add_project_member( + session=session, project=project, user=user, project_role=ProjectRole.USER + ) + repo = await create_repo( + session=session, + project_id=project.id, + ) + pool = await create_pool(session=session, project=project) + instance = await create_instance( + session=session, + project=project, + pool=pool, + status=InstanceStatus.BUSY, + ) + run = await create_run( + session=session, + project=project, + repo=repo, + user=user, + ) + job = await create_job( + session=session, + run=run, + status=JobStatus.RUNNING, + job_provisioning_data=get_job_provisioning_data(), + instance_assigned=True, + instance=instance, + ) + return job + + @pytest.fixture + def ssh_tunnel_mock(self) -> Generator[Mock, None, None]: + with 
patch("dstack._internal.server.services.runner.ssh.SSHTunnel") as SSHTunnelMock: + yield SSHTunnelMock + + @pytest.fixture + def shim_client_mock(self) -> Generator[Mock, None, None]: + with patch("dstack._internal.server.services.runner.client.ShimClient") as ShimClientMock: + yield ShimClientMock.return_value + + @freeze_time(datetime(2023, 1, 2, 3, 5, 20, tzinfo=timezone.utc)) + async def test_inserts_new_record( + self, session: AsyncSession, job: JobModel, ssh_tunnel_mock: Mock, shim_client_mock: Mock + ): + shim_client_mock.get_task_metrics.return_value = "# prom response" + + await collect_prometheus_metrics() + + ssh_tunnel_mock.assert_called_once() + shim_client_mock.get_task_metrics.assert_called_once() + res = await session.execute( + select(JobPrometheusMetrics).where(JobPrometheusMetrics.job_id == job.id) + ) + metrics = res.scalar_one() + assert metrics.text == "# prom response" + assert metrics.collected_at == datetime(2023, 1, 2, 3, 5, 20) + + @freeze_time(datetime(2023, 1, 2, 3, 5, 20, tzinfo=timezone.utc)) + async def test_updates_record( + self, session: AsyncSession, job: JobModel, ssh_tunnel_mock: Mock, shim_client_mock: Mock + ): + metrics = await create_job_prometheus_metrics( + session=session, + job=job, + collected_at=datetime(2023, 1, 2, 3, 5, 0), + text="# prom old response", + ) + shim_client_mock.get_task_metrics.return_value = "# prom new response" + + await collect_prometheus_metrics() + + ssh_tunnel_mock.assert_called_once() + shim_client_mock.get_task_metrics.assert_called_once() + res = await session.execute( + select(JobPrometheusMetrics) + .where(JobPrometheusMetrics.job_id == job.id) + .execution_options(populate_existing=True) + ) + metrics = res.scalar_one() + assert metrics.text == "# prom new response" + assert metrics.collected_at == datetime(2023, 1, 2, 3, 5, 20) + + @freeze_time(datetime(2023, 1, 2, 3, 5, 20, tzinfo=timezone.utc)) + async def test_skips_recently_updated( + self, session: AsyncSession, job: JobModel, 
ssh_tunnel_mock: Mock, shim_client_mock: Mock + ): + metrics = await create_job_prometheus_metrics( + session=session, + job=job, + collected_at=datetime(2023, 1, 2, 3, 5, 15), + text="# prom old response", + ) + shim_client_mock.get_task_metrics.return_value = "# prom new response" + + await collect_prometheus_metrics() + + ssh_tunnel_mock.assert_not_called() + shim_client_mock.get_task_metrics.assert_not_called() + res = await session.execute( + select(JobPrometheusMetrics) + .where(JobPrometheusMetrics.job_id == job.id) + .execution_options(populate_existing=True) + ) + metrics = res.scalar_one() + assert metrics.text == "# prom old response" + assert metrics.collected_at == datetime(2023, 1, 2, 3, 5, 15) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) +@pytest.mark.usefixtures("test_db", "image_config_mock") +class TestDeletePrometheusMetrics: + @freeze_time(datetime(2023, 1, 2, 3, 5, 20, tzinfo=timezone.utc)) + async def test_deletes_old_metrics(self, session: AsyncSession): + user = await create_user(session=session, global_role=GlobalRole.USER) + project = await create_project(session=session, owner=user) + await add_project_member( + session=session, project=project, user=user, project_role=ProjectRole.USER + ) + repo = await create_repo(session=session, project_id=project.id) + run_1 = await create_run( + session=session, project=project, repo=repo, user=user, run_name="run-1" + ) + job_1 = await create_job(session=session, run=run_1) + # old metrics + await create_job_prometheus_metrics( + session=session, + job=job_1, + collected_at=datetime(2023, 1, 2, 2, 3, 30), + ) + run_2 = await create_run( + session=session, project=project, repo=repo, user=user, run_name="run-2" + ) + job_2 = await create_job(session=session, run=run_2) + # recent metrics + metrics_2 = await create_job_prometheus_metrics( + session=session, + job=job_2, + collected_at=datetime(2023, 1, 2, 3, 5, 0), + ) + + await 
delete_prometheus_metrics() + + res = await session.execute( + select(JobPrometheusMetrics).join(JobModel).where(JobModel.project_id == project.id) + ) + all_metrics = res.scalars().all() + assert len(all_metrics) == 1 + assert all_metrics[0] == metrics_2 diff --git a/src/tests/_internal/server/routers/test_prometheus.py b/src/tests/_internal/server/routers/test_prometheus.py new file mode 100644 index 000000000..dd329ecf0 --- /dev/null +++ b/src/tests/_internal/server/routers/test_prometheus.py @@ -0,0 +1,244 @@ +from textwrap import dedent + +import pytest +from httpx import AsyncClient +from sqlalchemy.ext.asyncio import AsyncSession + +from dstack._internal.core.models.runs import JobStatus +from dstack._internal.core.models.users import GlobalRole, ProjectRole +from dstack._internal.server.models import JobModel, ProjectModel, UserModel +from dstack._internal.server.services.projects import add_project_member +from dstack._internal.server.testing.common import ( + create_job, + create_job_prometheus_metrics, + create_project, + create_repo, + create_run, + create_user, +) + + +@pytest.fixture +def enable_metrics(monkeypatch: pytest.MonkeyPatch): + monkeypatch.setattr("dstack._internal.server.settings.ENABLE_PROMETHEUS_METRICS", True) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) +@pytest.mark.usefixtures("image_config_mock", "test_db", "enable_metrics") +class TestGetPrometheusMetrics: + async def test_returns_metrics(self, session: AsyncSession, client: AsyncClient): + user = await create_user(session=session, global_role=GlobalRole.USER) + project_2 = await _create_project(session, "project-2", user) + job_2_1 = await _create_job(session, "run-1", project_2, user, JobStatus.RUNNING) + await create_job_prometheus_metrics( + session=session, + job=job_2_1, + text=dedent(""" + # HELP FIELD_1 Test field 1 + # TYPE FIELD_1 gauge + FIELD_1{gpu="0"} 100 + FIELD_1{gpu="1"} 200 + """), + ) + project_1 = await 
_create_project(session, "project-1", user) + job_1_1 = await _create_job(session, "run-1", project_1, user, JobStatus.RUNNING) + await create_job_prometheus_metrics( + session=session, + job=job_1_1, + text=dedent(""" + # Comments should be skipped + + # HELP FIELD_1 Test field 1 + # TYPE FIELD_1 gauge + FIELD_1{gpu="0"} 350 + FIELD_1{gpu="1"} 400 + + # HELP FIELD_2 Test field 2 + # TYPE FIELD_2 counter + FIELD_2{gpu="0"} 337325 1395066363000 + FIELD_2{gpu="1"} 987169 1395066363010 + """), + ) + job_1_2 = await _create_job(session, "run-2", project_1, user, JobStatus.RUNNING) + await create_job_prometheus_metrics( + session=session, + job=job_1_2, + text=dedent(""" + # HELP FIELD_1 Test field 1 + # TYPE FIELD_1 gauge + FIELD_1{gpu="0"} 1200.0 + FIELD_1{gpu="1"} 1600.0 + FIELD_1{gpu="2"} 2400.0 + """), + ) + # Terminated job, should not appear in the response + job_1_3 = await _create_job(session, "run-3", project_1, user, JobStatus.TERMINATED) + await create_job_prometheus_metrics( + session=session, + job=job_1_3, + text=dedent(""" + # HELP FIELD_1 Test field 1 + # TYPE FIELD_1 gauge + FIELD_1{gpu="0"} 10 + FIELD_1{gpu="1"} 20 + """), + ) + + response = await client.get("/metrics") + + assert response.status_code == 200 + assert response.text == dedent("""\ + # HELP FIELD_1 Test field 1 + # TYPE FIELD_1 gauge + FIELD_1{gpu="0",dstack_project_name="project-1",dstack_run_name="run-1",dstack_job_name="run-1-0-0",dstack_job_num="0",dstack_replica_num="0"} 350.0 + FIELD_1{gpu="1",dstack_project_name="project-1",dstack_run_name="run-1",dstack_job_name="run-1-0-0",dstack_job_num="0",dstack_replica_num="0"} 400.0 + FIELD_1{gpu="0",dstack_project_name="project-1",dstack_run_name="run-2",dstack_job_name="run-2-0-0",dstack_job_num="0",dstack_replica_num="0"} 1200.0 + FIELD_1{gpu="1",dstack_project_name="project-1",dstack_run_name="run-2",dstack_job_name="run-2-0-0",dstack_job_num="0",dstack_replica_num="0"} 1600.0 + 
FIELD_1{gpu="2",dstack_project_name="project-1",dstack_run_name="run-2",dstack_job_name="run-2-0-0",dstack_job_num="0",dstack_replica_num="0"} 2400.0 + FIELD_1{gpu="0",dstack_project_name="project-2",dstack_run_name="run-1",dstack_job_name="run-1-0-0",dstack_job_num="0",dstack_replica_num="0"} 100.0 + FIELD_1{gpu="1",dstack_project_name="project-2",dstack_run_name="run-1",dstack_job_name="run-1-0-0",dstack_job_num="0",dstack_replica_num="0"} 200.0 + # HELP FIELD_2 Test field 2 + # TYPE FIELD_2 counter + FIELD_2{gpu="0",dstack_project_name="project-1",dstack_run_name="run-1",dstack_job_name="run-1-0-0",dstack_job_num="0",dstack_replica_num="0"} 337325.0 1395066363000 + FIELD_2{gpu="1",dstack_project_name="project-1",dstack_run_name="run-1",dstack_job_name="run-1-0-0",dstack_job_num="0",dstack_replica_num="0"} 987169.0 1395066363010 + """) + + async def test_returns_empty_response_if_no_runs(self, client: AsyncClient): + response = await client.get("/metrics") + assert response.status_code == 200 + assert response.text == "" + + async def test_returns_404_if_not_enabled( + self, monkeypatch: pytest.MonkeyPatch, client: AsyncClient + ): + monkeypatch.setattr("dstack._internal.server.settings.ENABLE_PROMETHEUS_METRICS", False) + response = await client.get("/metrics") + assert response.status_code == 404 + + +@pytest.mark.asyncio +@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) +@pytest.mark.usefixtures("image_config_mock", "test_db", "enable_metrics") +class TestGetPrometheusProjectMetrics: + async def test_returns_metrics(self, session: AsyncSession, client: AsyncClient): + user = await create_user(session=session, global_role=GlobalRole.USER) + project = await _create_project(session, "project-1", user) + job_1 = await _create_job(session, "run-1", project, user, JobStatus.RUNNING) + await create_job_prometheus_metrics( + session=session, + job=job_1, + text=dedent(""" + # Comments should be skipped + + # HELP FIELD_1 Test field 1 + # TYPE 
FIELD_1 gauge + FIELD_1{gpu="0"} 350 + FIELD_1{gpu="1"} 400 + + # HELP FIELD_2 Test field 2 + # TYPE FIELD_2 counter + FIELD_2{gpu="0"} 337325 1395066363000 + FIELD_2{gpu="1"} 987169 1395066363010 + """), + ) + job_2 = await _create_job(session, "run-2", project, user, JobStatus.RUNNING) + await create_job_prometheus_metrics( + session=session, + job=job_2, + text=dedent(""" + # HELP FIELD_1 Test field 1 + # TYPE FIELD_1 gauge + FIELD_1{gpu="0"} 1200.0 + FIELD_1{gpu="1"} 1600.0 + FIELD_1{gpu="2"} 2400.0 + """), + ) + # Terminated job, should not appear in the response + job_3 = await _create_job(session, "run-3", project, user, JobStatus.TERMINATED) + await create_job_prometheus_metrics( + session=session, + job=job_3, + text=dedent(""" + # HELP FIELD_1 Test field 1 + # TYPE FIELD_1 gauge + FIELD_1{gpu="0"} 10 + FIELD_1{gpu="1"} 20 + """), + ) + another_project = await _create_project(session, "project-2", user) + another_project_job = await _create_job( + session, "run-4", another_project, user, JobStatus.RUNNING + ) + await create_job_prometheus_metrics( + session=session, + job=another_project_job, + text=dedent(""" + # HELP FIELD_1 Test field 1 + # TYPE FIELD_1 gauge + FIELD_1{gpu="0"} 100 + FIELD_1{gpu="1"} 200 + """), + ) + + response = await client.get("/metrics/project/project-1") + + assert response.status_code == 200 + assert response.text == dedent("""\ + # HELP FIELD_1 Test field 1 + # TYPE FIELD_1 gauge + FIELD_1{gpu="0",dstack_project_name="project-1",dstack_run_name="run-1",dstack_job_name="run-1-0-0",dstack_job_num="0",dstack_replica_num="0"} 350.0 + FIELD_1{gpu="1",dstack_project_name="project-1",dstack_run_name="run-1",dstack_job_name="run-1-0-0",dstack_job_num="0",dstack_replica_num="0"} 400.0 + FIELD_1{gpu="0",dstack_project_name="project-1",dstack_run_name="run-2",dstack_job_name="run-2-0-0",dstack_job_num="0",dstack_replica_num="0"} 1200.0 + 
FIELD_1{gpu="1",dstack_project_name="project-1",dstack_run_name="run-2",dstack_job_name="run-2-0-0",dstack_job_num="0",dstack_replica_num="0"} 1600.0 + FIELD_1{gpu="2",dstack_project_name="project-1",dstack_run_name="run-2",dstack_job_name="run-2-0-0",dstack_job_num="0",dstack_replica_num="0"} 2400.0 + # HELP FIELD_2 Test field 2 + # TYPE FIELD_2 counter + FIELD_2{gpu="0",dstack_project_name="project-1",dstack_run_name="run-1",dstack_job_name="run-1-0-0",dstack_job_num="0",dstack_replica_num="0"} 337325.0 1395066363000 + FIELD_2{gpu="1",dstack_project_name="project-1",dstack_run_name="run-1",dstack_job_name="run-1-0-0",dstack_job_num="0",dstack_replica_num="0"} 987169.0 1395066363010 + """) + + async def test_returns_empty_response_if_no_runs( + self, session: AsyncSession, client: AsyncClient + ): + user = await create_user(session=session, global_role=GlobalRole.USER) + await create_project(session=session, owner=user, name="test-project") + response = await client.get("/metrics/project/test-project") + assert response.status_code == 200 + assert response.text == "" + + async def test_returns_404_if_project_doesnt_exist(self, client: AsyncClient): + response = await client.get("/metrics/project/nonexistent") + assert response.status_code == 404 + + async def test_returns_404_if_not_enabled( + self, monkeypatch: pytest.MonkeyPatch, session: AsyncSession, client: AsyncClient + ): + monkeypatch.setattr("dstack._internal.server.settings.ENABLE_PROMETHEUS_METRICS", False) + user = await create_user(session=session, global_role=GlobalRole.USER) + await create_project(session=session, owner=user, name="test-project") + response = await client.get("/metrics/project/test-project") + assert response.status_code == 404 + + +async def _create_project(session: AsyncSession, name: str, user: UserModel) -> ProjectModel: + project = await create_project(session=session, owner=user, name=name) + await add_project_member( + session=session, project=project, user=user, 
project_role=ProjectRole.USER + ) + return project + + +async def _create_job( + session: AsyncSession, run_name: str, project: ProjectModel, user: UserModel, status: JobStatus +) -> JobModel: + repo = await create_repo(session=session, project_id=project.id, repo_name=f"{run_name}-repo") + run = await create_run( + session=session, + project=project, + repo=repo, + user=user, + run_name=run_name, + ) + job = await create_job(session=session, run=run, status=status) + return job From d6b06aaa0378da044f5a29eb8c0a66901dec889d Mon Sep 17 00:00:00 2001 From: Dmitry Meyer Date: Thu, 27 Feb 2025 09:41:14 +0000 Subject: [PATCH 2/2] Clarify docs --- docs/docs/guides/server-deployment.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/docs/docs/guides/server-deployment.md b/docs/docs/guides/server-deployment.md index 07f69cc4e..eadb503e8 100644 --- a/docs/docs/guides/server-deployment.md +++ b/docs/docs/guides/server-deployment.md @@ -213,9 +213,8 @@ Each sample includes a set of `dstack_*` labels, e.g., `dstack_project_name="mai Currently, `dstack` collects the following metrics: -* A fixed subset of NVIDIA GPU metrics from [DCGM Exporter :material-arrow-top-right-thin:{ .external }](https://docs.nvidia.com/datacenter/dcgm/latest/gpu-telemetry/dcgm-exporter.html){:target="_blank"}. -`dcgm-exporter` and `libdcgm` must be installed on the instance to enable these metrics. -On AWS, Azure, GCP, and OCI backends the required packages are already installed. +* A fixed subset of NVIDIA GPU metrics from [DCGM Exporter :material-arrow-top-right-thin:{ .external }](https://docs.nvidia.com/datacenter/dcgm/latest/gpu-telemetry/dcgm-exporter.html){:target="_blank"} on supported cloud backends — AWS, Azure, GCP, OCI — and SSH fleets. +On supported cloud backends the required packages are already installed. If you use SSH fleets, install `datacenter-gpu-manager-4-core` and `datacenter-gpu-manager-exporter`. ## Encryption