diff --git a/CHANGELOG.md b/CHANGELOG.md index d863506f0947..e1fc40ddd9cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,20 +8,7 @@ IMPROVEMENTS: * csi: Added support for jobs to request a unique volume ID per allocation. [[GH-10136](https://github.com/hashicorp/nomad/issues/10136)] * driver/docker: Added support for optional extra container labels. [[GH-9885](https://github.com/hashicorp/nomad/issues/9885)] * driver/docker: Added support for configuring default logger behavior in the client configuration. [[GH-10156](https://github.com/hashicorp/nomad/issues/10156)] - -BUG FIXES: - * agent: Only allow querying Prometheus formatted metrics if Prometheus is enabled within the config [[GH-10140](https://github.com/hashicorp/nomad/pull/10140)] - * api: Added missing devices block to AllocatedTaskResources [[GH-10064](https://github.com/hashicorp/nomad/pull/10064)] - * cli: Fixed a bug where non-int proxy port would panic CLI [[GH-10072](https://github.com/hashicorp/nomad/issues/10072)] - * cli: Fixed a bug where `nomad operator debug` incorrectly parsed https Consul API URLs. [[GH-10082](https://github.com/hashicorp/nomad/pull/10082)] - * cli: Remove extra linefeeds in monitor.log files written by `nomad operator debug`. [[GH-10252](https://github.com/hashicorp/nomad/issues/10252)] - * client: Fixed log formatting when killing tasks. [[GH-10135](https://github.com/hashicorp/nomad/issues/10135)] - * csi: Fixed a bug where volume with IDs that are a substring prefix of another volume could use the wrong volume for feasibility checking. [[GH-10158](https://github.com/hashicorp/nomad/issues/10158)] - * scheduler: Fixed a bug where Nomad reports negative or incorrect running children counts for periodic jobs. [[GH-10145](https://github.com/hashicorp/nomad/issues/10145)] - * scheduler: Fixed a bug where jobs requesting multiple CSI volumes could be incorrectly scheduled if only one of the volumes passed feasibility checking. [[GH-10143](https://github.com/hashicorp/nomad/issues/10143)] - * ui: Fixed the rendering of interstitial components shown after processing a dynamic application sizing recommendation. [[GH-10094](https://github.com/hashicorp/nomad/pull/10094)] - -## 1.0.5 (Unreleased) + * nomad/structs: Removed deprecated Node.Drain field, added API extensions to restore it [[GH-10202](https://github.com/hashicorp/nomad/issues/10202)] BUG FIXES: * agent: Only allow querying Prometheus formatted metrics if Prometheus is enabled within the config [[GH-10140](https://github.com/hashicorp/nomad/pull/10140)] diff --git a/api/event_stream_test.go b/api/event_stream_test.go index 4bfdc8a33d23..e5f3492da4f2 100644 --- a/api/event_stream_test.go +++ b/api/event_stream_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/hashicorp/nomad/api/internal/testutil" + "github.com/mitchellh/mapstructure" "github.com/stretchr/testify/require" ) @@ -148,9 +149,25 @@ func TestEventStream_PayloadValue(t *testing.T) { require.NoError(t, err) } for _, e := range event.Events { + // verify that we get a node n, err := e.Node() require.NoError(t, err) - require.NotEqual(t, "", n.ID) + require.NotEmpty(t, n.ID) + + // perform a raw decoding and look for: + // - "ID" to make sure that raw decoding is working correctly + // - "SecretID" to make sure it's not present + raw := make(map[string]map[string]interface{}, 0) + cfg := &mapstructure.DecoderConfig{ + Result: &raw, + } + dec, err := mapstructure.NewDecoder(cfg) + require.NoError(t, err) + require.NoError(t, dec.Decode(e.Payload)) + require.Contains(t, raw, "Node") + rawNode := raw["Node"] + require.Equal(t, n.ID, rawNode["ID"]) + require.NotContains(t, rawNode, "SecretID") } case <-time.After(5 * time.Second): require.Fail(t, "failed waiting for event stream event") diff --git a/api/nodes_test.go b/api/nodes_test.go index a1d41cda2d11..7ed2b956604c 100644 --- a/api/nodes_test.go +++ b/api/nodes_test.go @@ -178,6 +178,40 @@ func TestNodes_Info(t *testing.T) { } } +func TestNodes_NoSecretID(t *testing.T) { + t.Parallel() + c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) { + c.DevMode = true + }) + defer s.Stop() + nodes := c.Nodes() + + // Get the node ID + var nodeID string + testutil.WaitForResult(func() (bool, error) { + out, _, err := nodes.List(nil) + if err != nil { + return false, err + } + if n := len(out); n != 1 { + return false, fmt.Errorf("expected 1 node, got: %d", n) + } + nodeID = out[0].ID + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) + + // perform a raw http call and make sure that: + // - "ID" to make sure that raw decoding is working correctly + // - "SecretID" to make sure it's not present + resp := make(map[string]interface{}) + _, err := c.query("/v1/node/"+nodeID, &resp, nil) + require.NoError(t, err) + require.Equal(t, nodeID, resp["ID"]) + require.Empty(t, resp["SecretID"]) +} + func TestNodes_ToggleDrain(t *testing.T) { t.Parallel() require := require.New(t) @@ -206,9 +240,7 @@ func TestNodes_ToggleDrain(t *testing.T) { // Check for drain mode out, _, err := nodes.Info(nodeID, nil) require.Nil(err) - if out.Drain { - t.Fatalf("drain mode should be off") - } + require.False(out.Drain) // Toggle it on spec := &DrainSpec{ @@ -218,11 +250,36 @@ func TestNodes_ToggleDrain(t *testing.T) { require.Nil(err) assertWriteMeta(t, &drainOut.WriteMeta) - // Check again - out, _, err = nodes.Info(nodeID, nil) - require.Nil(err) - if out.SchedulingEligibility != NodeSchedulingIneligible { - t.Fatalf("bad eligibility: %v vs %v", out.SchedulingEligibility, NodeSchedulingIneligible) + // Drain may have completed before we can check, use event stream + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + streamCh, err := c.EventStream().Stream(ctx, map[Topic][]string{ + TopicNode: {nodeID}, + }, 0, nil) + require.NoError(err) + + // we expect to see the node change to Drain:true and then back to Drain:false+ineligible + var sawDraining, sawDrainComplete uint64 + for sawDrainComplete == 0 { + select { + case events := <-streamCh: + require.NoError(events.Err) + for _, e := range events.Events { + node, err := e.Node() + require.NoError(err) + require.Equal(node.DrainStrategy != nil, node.Drain) + require.True(!node.Drain || node.SchedulingEligibility == NodeSchedulingIneligible) // node.Drain => "ineligible" + if node.Drain && node.SchedulingEligibility == NodeSchedulingIneligible { + sawDraining = node.ModifyIndex + } else if sawDraining != 0 && node.ModifyIndex > sawDraining && + !node.Drain && node.SchedulingEligibility == NodeSchedulingIneligible { + sawDrainComplete = node.ModifyIndex + } + } + case <-time.After(5 * time.Second): + require.Fail("failed waiting for event stream event") + } } // Toggle off again @@ -233,15 +290,9 @@ func TestNodes_ToggleDrain(t *testing.T) { // Check again out, _, err = nodes.Info(nodeID, nil) require.Nil(err) - if out.Drain { - t.Fatalf("drain mode should be off") - } - if out.DrainStrategy != nil { - t.Fatalf("drain strategy should be unset") - } - if out.SchedulingEligibility != NodeSchedulingEligible { - t.Fatalf("should be eligible") - } + require.False(out.Drain) + require.Nil(out.DrainStrategy) + require.Equal(NodeSchedulingEligible, out.SchedulingEligibility) } func TestNodes_ToggleEligibility(t *testing.T) { diff --git a/client/agent_endpoint.go b/client/agent_endpoint.go index 856b2787d044..01e12d818cb9 100644 --- a/client/agent_endpoint.go +++ b/client/agent_endpoint.go @@ -8,14 +8,17 @@ import ( "time" "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/nomad/command/agent/host" "github.com/hashicorp/nomad/command/agent/monitor" "github.com/hashicorp/nomad/command/agent/pprof" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/jsonhandles" "github.com/hashicorp/nomad/nomad/structs" metrics "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" + sframer "github.com/hashicorp/nomad/client/lib/streamframer" cstructs "github.com/hashicorp/nomad/client/structs" ) @@ -121,7 +124,7 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) { frames := make(chan *sframer.StreamFrame, streamFramesBuffer) errCh := make(chan error) var buf bytes.Buffer - frameCodec := codec.NewEncoder(&buf, structs.JsonHandle) + frameCodec := codec.NewEncoder(&buf, jsonhandles.JsonHandle) framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 1024) framer.Run() diff --git a/client/alloc_endpoint.go b/client/alloc_endpoint.go index 9d7d8c7a5bf6..4427c7b52f3b 100644 --- a/client/alloc_endpoint.go +++ b/client/alloc_endpoint.go @@ -10,10 +10,12 @@ import ( metrics "github.com/armon/go-metrics" "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/nomad/acl" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/jsonhandles" "github.com/hashicorp/nomad/nomad/structs" nstructs "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" @@ -279,7 +281,7 @@ func newExecStream(decoder *codec.Decoder, encoder *codec.Encoder) drivers.ExecT buf: buf, encoder: encoder, - frameCodec: codec.NewEncoder(buf, structs.JsonHandle), + frameCodec: codec.NewEncoder(buf, jsonhandles.JsonHandle), } } diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index d16b05bd0904..16980c170f11 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -17,13 +17,15 @@ import ( metrics "github.com/armon/go-metrics" "github.com/hashicorp/go-msgpack/codec" + "github.com/hpcloud/tail/watch" + "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/client/allocdir" sframer "github.com/hashicorp/nomad/client/lib/streamframer" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/jsonhandles" "github.com/hashicorp/nomad/nomad/structs" - "github.com/hpcloud/tail/watch" ) var ( @@ -237,7 +239,7 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { frames := make(chan *sframer.StreamFrame, streamFramesBuffer) errCh := make(chan error) var buf bytes.Buffer - frameCodec := codec.NewEncoder(&buf, structs.JsonHandle) + frameCodec := codec.NewEncoder(&buf, jsonhandles.JsonHandle) // Create the framer framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize) @@ -468,7 +470,7 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { var streamErr error buf := new(bytes.Buffer) - frameCodec := codec.NewEncoder(buf, structs.JsonHandle) + frameCodec := codec.NewEncoder(buf, jsonhandles.JsonHandle) OUTER: for { select { diff --git a/command/agent/http.go b/command/agent/http.go index 87e553beaae7..b9e5b26d57b1 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -20,10 +20,12 @@ import ( "github.com/hashicorp/go-connlimit" log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-msgpack/codec" + "github.com/rs/cors" + "github.com/hashicorp/nomad/helper/noxssrw" "github.com/hashicorp/nomad/helper/tlsutil" + "github.com/hashicorp/nomad/nomad/jsonhandles" "github.com/hashicorp/nomad/nomad/structs" - "github.com/rs/cors" ) const ( @@ -499,13 +501,13 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque if obj != nil { var buf bytes.Buffer if prettyPrint { - enc := codec.NewEncoder(&buf, structs.JsonHandlePretty) + enc := codec.NewEncoder(&buf, jsonhandles.JsonHandlePretty) err = enc.Encode(obj) if err == nil { buf.Write([]byte("\n")) } } else { - enc := codec.NewEncoder(&buf, structs.JsonHandle) + enc := codec.NewEncoder(&buf, jsonhandles.JsonHandleWithExtensions) err = enc.Encode(obj) } if err != nil { diff --git a/command/agent/http_test.go b/command/agent/http_test.go index bc71fcbc1931..f77a8ef14a48 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -29,6 +29,8 @@ import ( "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/hashicorp/nomad/nomad/jsonhandles" ) // makeHTTPServer returns a test server whose logs will be written to @@ -321,11 +323,11 @@ func testPrettyPrint(pretty string, prettyFmt bool, t *testing.T) { var expected bytes.Buffer var err error if prettyFmt { - enc := codec.NewEncoder(&expected, structs.JsonHandlePretty) + enc := codec.NewEncoder(&expected, jsonhandles.JsonHandlePretty) err = enc.Encode(r) expected.WriteByte('\n') } else { - enc := codec.NewEncoder(&expected, structs.JsonHandle) + enc := codec.NewEncoder(&expected, jsonhandles.JsonHandleWithExtensions) err = enc.Encode(r) } if err != nil { @@ -1295,6 +1297,30 @@ func Test_decodeBody(t *testing.T) { } } +// BenchmarkHTTPServer_JSONEncodingWithExtensions benchmarks the performance of +// encoding JSON objects using extensions +func BenchmarkHTTPServer_JSONEncodingWithExtensions(b *testing.B) { + benchmarkJsonEncoding(b, jsonhandles.JsonHandleWithExtensions) +} + +// BenchmarkHTTPServer_JSONEncodingWithoutExtensions benchmarks the performance of +// encoding JSON objects using extensions +func BenchmarkHTTPServer_JSONEncodingWithoutExtensions(b *testing.B) { + benchmarkJsonEncoding(b, jsonhandles.JsonHandle) +} + +func benchmarkJsonEncoding(b *testing.B, handle *codec.JsonHandle) { + n := mock.Node() + var buf bytes.Buffer + + enc := codec.NewEncoder(&buf, handle) + for i := 0; i < b.N; i++ { + buf.Reset() + err := enc.Encode(n) + require.NoError(b, err) + } +} + func httpTest(t testing.TB, cb func(c *Config), f func(srv *TestAgent)) { s := makeHTTPServer(t, cb) defer s.Shutdown() diff --git a/command/agent/node_endpoint.go b/command/agent/node_endpoint.go index 6498151a9422..c5ff81d8156c 100644 --- a/command/agent/node_endpoint.go +++ b/command/agent/node_endpoint.go @@ -2,9 +2,7 @@ package agent import ( "net/http" - "strconv" "strings" - "time" "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/nomad/structs" @@ -119,31 +117,9 @@ func (s *HTTPServer) nodeToggleDrain(resp http.ResponseWriter, req *http.Request var drainRequest api.NodeUpdateDrainRequest - // COMPAT: Remove in 0.10. Allow the old style enable query param. - // Get the enable parameter - enableRaw := req.URL.Query().Get("enable") - var enable bool - if enableRaw != "" { - var err error - enable, err = strconv.ParseBool(enableRaw) - if err != nil { - return nil, CodedError(400, "invalid enable value") - } - - // Use the force drain to have it keep the same behavior as old clients. - if enable { - drainRequest.DrainSpec = &api.DrainSpec{ - Deadline: -1 * time.Second, - } - } else { - // If drain is disabled on an old client, mark the node as eligible for backwards compatibility - drainRequest.MarkEligible = true - } - } else { - err := decodeBody(req, &drainRequest) - if err != nil { - return nil, CodedError(400, err.Error()) - } + err := decodeBody(req, &drainRequest) + if err != nil { + return nil, CodedError(400, err.Error()) } args := structs.NodeUpdateDrainRequest{ diff --git a/command/agent/node_endpoint_test.go b/command/agent/node_endpoint_test.go index ecdd74048acd..bfb52e81c065 100644 --- a/command/agent/node_endpoint_test.go +++ b/command/agent/node_endpoint_test.go @@ -284,11 +284,9 @@ func TestHTTP_NodeDrain(t *testing.T) { out, err := state.NodeByID(nil, node.ID) require.Nil(err) - // the node must either be in drain mode or in elligible + // the node must either be in drain mode or ineligible // once the node is recognize as not having any running allocs - if out.Drain { - require.True(out.Drain) - require.NotNil(out.DrainStrategy) + if out.DrainStrategy != nil { require.Equal(10*time.Second, out.DrainStrategy.Deadline) } else { require.Equal(structs.NodeSchedulingIneligible, out.SchedulingEligibility) @@ -307,7 +305,6 @@ func TestHTTP_NodeDrain(t *testing.T) { out, err = state.NodeByID(nil, node.ID) require.Nil(err) - require.False(out.Drain) require.Nil(out.DrainStrategy) }) } diff --git a/contributing/checklist-jobspec.md b/contributing/checklist-jobspec.md index 146afdb396c0..bfe9b5d15917 100644 --- a/contributing/checklist-jobspec.md +++ b/contributing/checklist-jobspec.md @@ -13,12 +13,14 @@ * Note that analogous struct field names should match with `api/` package * Test the structs/fields via methods mentioned above * Implement and test other logical methods -* [ ] Add conversion between `api/` and `nomad/structs` in `command/agent/job_endpoint.go` +* [ ] Add conversion between `api/` and `nomad/structs/` in `command/agent/job_endpoint.go` * Add test for conversion +* [ ] Determine JSON encoding strategy for responses from RPC (see "JSON Encoding" below) + * [ ] Write `nomad/structs/` to `api/` conversions if necessary and write tests * [ ] Implement diff logic for new structs/fields in `nomad/structs/diff.go` * Note that fields must be listed in alphabetical order in `FieldDiff` slices in `nomad/structs/diff_test.go` * Add test for diff of new structs/fields -* [ ] Add change detection for new structs/feilds in `scheduler/util.go/tasksUpdated` +* [ ] Add change detection for new structs/fields in `scheduler/util.go/tasksUpdated` * Might be covered by `.Equals` but might not be, check. * Should return true if the task must be replaced as a result of the change. @@ -40,3 +42,24 @@ required in the original `jobspec` package. * [ ] Job JSON API entry https://www.nomadproject.io/api/json-jobs.html * [ ] Sample Response output in API https://www.nomadproject.io/api/jobs.html * [ ] Consider if it needs a guide https://www.nomadproject.io/guides/index.html + +## JSON Encoding + +As a general rule, HTTP endpoints (under `command/agent/`) will make RPC calls that return structs belonging to +`nomad/structs/`. These handlers ultimately return an object that is encoded by the Nomad HTTP server. The encoded form +needs to match the Nomad API; specifically, it should have the form of the corresponding struct from `api/`. There are +a few ways that this can be accomplished: +* directly return the struct from the RPC call, if it has the same shape as the corresponding struct in `api/`. + This is convenient when possible, resulting in the least work for the developer. + Examples of this approach include [GET `/v1/evaluation/:id`](https://github.com/hashicorp/nomad/blob/v1.0. + 0/command/agent/eval_endpoint.go#L88). +* convert the struct from the RPC call to the appropriate `api/` struct. + This approach is the most developer effort, but it does have a strong guarantee that the HTTP response matches the + API, due to the explicit conversion (assuming proper implementation, which requires tests). + Examples of this approach include [GET `/v1/volume/csi/:id`](https://github.com/hashicorp/nomad/blob/v1.0.0/command/agent/csi_endpoint.go#L108) +* convert to an intermediate struct with the same shape as the `api/` struct. + This approach strikes a balance between the former two approaches. + This conversion can be performed in-situ in the agent HTTP handler, as long as the conversion doesn't need to + appear in other handlers. + Otherwise, it is possible to register an extension on the JSON encoding used by the HTTP agent; these extensions + can be put in `nomad/jsonhandles/extensions.go`. diff --git a/contributing/checklist-rpc-endpoint.md b/contributing/checklist-rpc-endpoint.md index 37e9ff50d2a2..b21fcfb60b75 100644 --- a/contributing/checklist-rpc-endpoint.md +++ b/contributing/checklist-rpc-endpoint.md @@ -30,7 +30,7 @@ Prefer adding a new message to changing any existing RPC messages. * [ ] `nomad/core_sched.go` sends many RPCs * `ServersMeetMinimumVersion` asserts that the server cluster is - upgraded, so use this to gaurd sending the new RPC, else send the old RPC + upgraded, so use this to guard sending the new RPC, else send the old RPC * Version must match the actual release version! ## Docs diff --git a/helper/pluginutils/hclutils/testing.go b/helper/pluginutils/hclutils/testing.go index 469cec7d5b87..3d587e360a4c 100644 --- a/helper/pluginutils/hclutils/testing.go +++ b/helper/pluginutils/hclutils/testing.go @@ -6,13 +6,14 @@ import ( "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/hcl" "github.com/hashicorp/hcl/hcl/ast" - "github.com/hashicorp/nomad/helper/pluginutils/hclspecutils" - "github.com/hashicorp/nomad/nomad/structs" - "github.com/hashicorp/nomad/plugins/drivers" - "github.com/hashicorp/nomad/plugins/shared/hclspec" "github.com/mitchellh/mapstructure" "github.com/stretchr/testify/require" "github.com/zclconf/go-cty/cty" + + "github.com/hashicorp/nomad/helper/pluginutils/hclspecutils" + "github.com/hashicorp/nomad/nomad/jsonhandles" + "github.com/hashicorp/nomad/plugins/drivers" + "github.com/hashicorp/nomad/plugins/shared/hclspec" ) type HCLParser struct { @@ -121,7 +122,7 @@ func JsonConfigToInterface(t *testing.T, config string) interface{} { t.Helper() // Decode from json - dec := codec.NewDecoderBytes([]byte(config), structs.JsonHandle) + dec := codec.NewDecoderBytes([]byte(config), jsonhandles.JsonHandle) var m map[string]interface{} err := dec.Decode(&m) diff --git a/helper/pluginutils/hclutils/util.go b/helper/pluginutils/hclutils/util.go index 6042e7b0fece..c6375c4387d1 100644 --- a/helper/pluginutils/hclutils/util.go +++ b/helper/pluginutils/hclutils/util.go @@ -9,7 +9,9 @@ import ( hcl "github.com/hashicorp/hcl/v2" "github.com/hashicorp/hcl/v2/hcldec" hjson "github.com/hashicorp/hcl/v2/json" - "github.com/hashicorp/nomad/nomad/structs" + + "github.com/hashicorp/nomad/nomad/jsonhandles" + "github.com/zclconf/go-cty/cty" "github.com/zclconf/go-cty/cty/function" "github.com/zclconf/go-cty/cty/function/stdlib" @@ -26,7 +28,7 @@ func ParseHclInterface(val interface{}, spec hcldec.Spec, vars map[string]cty.Va // Encode to json var buf bytes.Buffer - enc := codec.NewEncoder(&buf, structs.JsonHandle) + enc := codec.NewEncoder(&buf, jsonhandles.JsonHandle) err := enc.Encode(val) if err != nil { // Convert to a hcl diagnostics message diff --git a/nomad/client_agent_endpoint.go b/nomad/client_agent_endpoint.go index 488478034d0b..59d8242d1442 100644 --- a/nomad/client_agent_endpoint.go +++ b/nomad/client_agent_endpoint.go @@ -10,12 +10,14 @@ import ( "time" log "github.com/hashicorp/go-hclog" + sframer "github.com/hashicorp/nomad/client/lib/streamframer" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/command/agent/host" "github.com/hashicorp/nomad/command/agent/monitor" "github.com/hashicorp/nomad/command/agent/pprof" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/jsonhandles" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/go-msgpack/codec" @@ -185,7 +187,7 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) { frames := make(chan *sframer.StreamFrame, 32) errCh := make(chan error) var buf bytes.Buffer - frameCodec := codec.NewEncoder(&buf, structs.JsonHandle) + frameCodec := codec.NewEncoder(&buf, jsonhandles.JsonHandle) framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 1024) framer.Run() diff --git a/nomad/fsm.go b/nomad/fsm.go index 17c427a01a04..e3f6474cf2db 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -429,20 +429,6 @@ func (n *nomadFSM) applyDrainUpdate(reqType structs.MessageType, buf []byte, ind panic(fmt.Errorf("failed to decode request: %v", err)) } - // COMPAT Remove in version 0.10 - // As part of Nomad 0.8 we have deprecated the drain boolean in favor of a - // drain strategy but we need to handle the upgrade path where the Raft log - // contains drain updates with just the drain boolean being manipulated. - if req.Drain && req.DrainStrategy == nil { - // Mark the drain strategy as a force to imitate the old style drain - // functionality. - req.DrainStrategy = &structs.DrainStrategy{ - DrainSpec: structs.DrainSpec{ - Deadline: -1 * time.Second, - }, - } - } - if err := n.state.UpdateNodeDrain(reqType, index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.UpdatedAt, req.NodeEvent); err != nil { n.logger.Error("UpdateNodeDrain failed", "error", err) return err diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 4e75e53369fa..ac1c07c22fa6 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -11,6 +11,11 @@ import ( "github.com/google/go-cmp/cmp" memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/raft" + "github.com/kr/pretty" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" @@ -19,10 +24,6 @@ import ( "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" - "github.com/hashicorp/raft" - "github.com/kr/pretty" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) type MockSink struct { @@ -187,7 +188,7 @@ func TestFSM_UpsertNode_Canonicalize(t *testing.T) { fsm := testFSM(t) fsm.blockedEvals.SetEnabled(true) - // Setup a node without eligibility + // Setup a node without eligibility, ensure that upsert/canonicalize put it back node := mock.Node() node.SchedulingEligibility = "" @@ -197,18 +198,43 @@ func TestFSM_UpsertNode_Canonicalize(t *testing.T) { buf, err := structs.Encode(structs.NodeRegisterRequestType, req) require.Nil(err) - resp := fsm.Apply(makeLog(buf)) - require.Nil(resp) + require.Nil(fsm.Apply(makeLog(buf))) // Verify we are registered - ws := memdb.NewWatchSet() - n, err := fsm.State().NodeByID(ws, req.Node.ID) + n, err := fsm.State().NodeByID(nil, req.Node.ID) require.Nil(err) require.NotNil(n) require.EqualValues(1, n.CreateIndex) require.Equal(structs.NodeSchedulingEligible, n.SchedulingEligibility) } +func TestFSM_UpsertNode_Canonicalize_Ineligible(t *testing.T) { + t.Parallel() + require := require.New(t) + + fsm := testFSM(t) + fsm.blockedEvals.SetEnabled(true) + + // Setup a node without eligibility, ensure that upsert/canonicalize put it back + node := mock.DrainNode() + node.SchedulingEligibility = "" + + req := structs.NodeRegisterRequest{ + Node: node, + } + buf, err := structs.Encode(structs.NodeRegisterRequestType, req) + require.Nil(err) + + require.Nil(fsm.Apply(makeLog(buf))) + + // Verify we are registered + n, err := fsm.State().NodeByID(nil, req.Node.ID) + require.Nil(err) + require.NotNil(n) + require.EqualValues(1, n.CreateIndex) + require.Equal(structs.NodeSchedulingIneligible, n.SchedulingEligibility) +} + func TestFSM_DeregisterNode(t *testing.T) { t.Parallel() fsm := testFSM(t) @@ -353,7 +379,6 @@ func TestFSM_BatchUpdateNodeDrain(t *testing.T) { ws := memdb.NewWatchSet() node, err = fsm.State().NodeByID(ws, req.Node.ID) require.Nil(err) - require.True(node.Drain) require.Equal(node.DrainStrategy, strategy) require.Len(node.Events, 2) } @@ -397,46 +422,10 @@ func TestFSM_UpdateNodeDrain(t *testing.T) { ws := memdb.NewWatchSet() node, err = fsm.State().NodeByID(ws, req.Node.ID) require.Nil(err) - require.True(node.Drain) require.Equal(node.DrainStrategy, strategy) require.Len(node.Events, 2) } -func TestFSM_UpdateNodeDrain_Pre08_Compatibility(t *testing.T) { - t.Parallel() - require := require.New(t) - fsm := testFSM(t) - - // Force a node into the state store without eligiblity - node := mock.Node() - node.SchedulingEligibility = "" - require.Nil(fsm.State().UpsertNode(structs.MsgTypeTestSetup, 1, node)) - - // Do an old style drain - req := structs.NodeUpdateDrainRequest{ - NodeID: node.ID, - Drain: true, - } - buf, err := structs.Encode(structs.NodeUpdateDrainRequestType, req) - require.Nil(err) - - resp := fsm.Apply(makeLog(buf)) - require.Nil(resp) - - // Verify we have upgraded to a force drain - ws := memdb.NewWatchSet() - node, err = fsm.State().NodeByID(ws, req.NodeID) - require.Nil(err) - require.True(node.Drain) - - expected := &structs.DrainStrategy{ - DrainSpec: structs.DrainSpec{ - Deadline: -1 * time.Second, - }, - } - require.Equal(expected, node.DrainStrategy) -} - func TestFSM_UpdateNodeEligibility(t *testing.T) { t.Parallel() require := require.New(t) @@ -2496,25 +2485,15 @@ func TestFSM_SnapshotRestore_Nodes(t *testing.T) { // Add some state fsm := testFSM(t) state := fsm.State() - node1 := mock.Node() - state.UpsertNode(structs.MsgTypeTestSetup, 1000, node1) - - // Upgrade this node - node2 := mock.Node() - node2.SchedulingEligibility = "" - state.UpsertNode(structs.MsgTypeTestSetup, 1001, node2) + node := mock.Node() + state.UpsertNode(structs.MsgTypeTestSetup, 1000, node) // Verify the contents fsm2 := testSnapshotRestore(t, fsm) state2 := fsm2.State() - out1, _ := state2.NodeByID(nil, node1.ID) - out2, _ := state2.NodeByID(nil, node2.ID) - node2.SchedulingEligibility = structs.NodeSchedulingEligible - if !reflect.DeepEqual(node1, out1) { - t.Fatalf("bad: \n%#v\n%#v", out1, node1) - } - if !reflect.DeepEqual(node2, out2) { - t.Fatalf("bad: \n%#v\n%#v", out2, node2) + out, _ := state2.NodeByID(nil, node.ID) + if !reflect.DeepEqual(node, out) { + t.Fatalf("bad: \n%#v\n%#v", out, node) } } diff --git a/nomad/jsonhandles/encoding.go b/nomad/jsonhandles/encoding.go new file mode 100644 index 000000000000..004923b980b5 --- /dev/null +++ b/nomad/jsonhandles/encoding.go @@ -0,0 +1,38 @@ +package jsonhandles + +import ( + "reflect" + + "github.com/hashicorp/go-msgpack/codec" +) + +// extendFunc is a mapping from one struct to another, to change the shape of the encoded JSON +type extendFunc func(interface{}) interface{} + +// nomadJsonEncodingExtensions is a catch-all go-msgpack extension +// it looks up the types in the list of registered extension functions and applies it +type nomadJsonEncodingExtensions struct{} + +// ConvertExt calls the registered conversions functions +func (n nomadJsonEncodingExtensions) ConvertExt(v interface{}) interface{} { + if fn, ok := extendedTypes[reflect.TypeOf(v)]; ok { + return fn(v) + } else { + // shouldn't get here, but returning v will probably result in an infinite loop + // return nil and erase this field + return nil + } +} + +// UpdateExt is required by go-msgpack, but not used by us +func (n nomadJsonEncodingExtensions) UpdateExt(_ interface{}, _ interface{}) {} + +// NomadJsonEncodingExtensions registers all extension functions against the +// provided JsonHandle. +// It should be called on any JsonHandle which is used by the API HTTP server. +func NomadJsonEncodingExtensions(h *codec.JsonHandle) *codec.JsonHandle { + for tpe := range extendedTypes { + h.SetInterfaceExt(tpe, 1, nomadJsonEncodingExtensions{}) + } + return h +} diff --git a/nomad/jsonhandles/extensions.go b/nomad/jsonhandles/extensions.go new file mode 100644 index 000000000000..ab8ab93d70de --- /dev/null +++ b/nomad/jsonhandles/extensions.go @@ -0,0 +1,33 @@ +package jsonhandles + +import ( + "reflect" + + "github.com/hashicorp/nomad/nomad/structs" +) + +var ( + // extendedTypes is a mapping of extended types to their extension function + // TODO: the duplicates could be simplified by looking up the base type in the case of a pointer type in ConvertExt + extendedTypes = map[reflect.Type]extendFunc{ + reflect.TypeOf(structs.Node{}): nodeExt, + reflect.TypeOf(&structs.Node{}): nodeExt, + } +) + +// nodeExt ensures the node is sanitized and adds the legacy field .Drain back to encoded Node objects +func nodeExt(v interface{}) interface{} { + node := v.(*structs.Node).Sanitize() + // transform to a struct with inlined Node fields plus the Drain field + // - using defined type (not an alias!) EmbeddedNode gives us free conversion to a distinct type + // - distinct type prevents this encoding extension from being called recursively/infinitely on the embedding + // - pointers mean the conversion function doesn't have to make a copy during conversion + type EmbeddedNode structs.Node + return &struct { + *EmbeddedNode + Drain bool + }{ + EmbeddedNode: (*EmbeddedNode)(node), + Drain: node != nil && node.DrainStrategy != nil, + } +} diff --git a/nomad/jsonhandles/handlers.go b/nomad/jsonhandles/handlers.go new file mode 100644 index 000000000000..92a402f8e902 --- /dev/null +++ b/nomad/jsonhandles/handlers.go @@ -0,0 +1,22 @@ +package jsonhandles + +import ( + "github.com/hashicorp/go-msgpack/codec" +) + +var ( + // JsonHandle and JsonHandlePretty are the codec handles to JSON encode + // structs. The pretty handle will add indents for easier human consumption. + // JsonHandleWithExtensions and JsonHandlePretty include extensions for + // encoding structs objects with API-specific fields + JsonHandle = &codec.JsonHandle{ + HTMLCharsAsIs: true, + } + JsonHandleWithExtensions = NomadJsonEncodingExtensions(&codec.JsonHandle{ + HTMLCharsAsIs: true, + }) + JsonHandlePretty = NomadJsonEncodingExtensions(&codec.JsonHandle{ + HTMLCharsAsIs: true, + Indent: 4, + }) +) diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index a3fe090542cc..bf80e154e8fb 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -118,6 +118,15 @@ func Node() *structs.Node { return node } +func DrainNode() *structs.Node { + node := Node() + node.DrainStrategy = &structs.DrainStrategy{ + DrainSpec: structs.DrainSpec{}, + } + node.Canonicalize() + return node +} + // NvidiaNode returns a node with two instances of an Nvidia GPU func NvidiaNode() *structs.Node { n := Node() diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 36a18a26f640..184ef1f56c8a 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -548,16 +548,6 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest, // Update the timestamp of when the node status was updated args.UpdatedAt = now.Unix() - // COMPAT: Remove in 0.9. Attempt to upgrade the request if it is of the old - // format. - if args.Drain && args.DrainStrategy == nil { - args.DrainStrategy = &structs.DrainStrategy{ - DrainSpec: structs.DrainSpec{ - Deadline: -1 * time.Second, // Force drain - }, - } - } - // Setup drain strategy if args.DrainStrategy != nil { // Mark start time for the drain @@ -811,9 +801,8 @@ func (n *Node) GetNode(args *structs.NodeSpecificRequest, // Setup the output if out != nil { - // Clear the secret ID - reply.Node = out.Copy() - reply.Node.SecretID = "" + out = out.Sanitize() + reply.Node = out reply.Index = out.ModifyIndex } else { // Use the last index that affected the nodes table diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index fbc82d8b7a0b..c26945832c7b 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -914,7 +914,7 @@ func TestClientEndpoint_UpdateDrain(t *testing.T) { ws := memdb.NewWatchSet() out, err := state.NodeByID(ws, node.ID) require.Nil(err) - require.True(out.Drain) + require.NotNil(out.DrainStrategy) require.Equal(strategy.Deadline, out.DrainStrategy.Deadline) require.Len(out.Events, 2) require.Equal(NodeDrainEventDrainSet, out.Events[1].Message) diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index f7a3f19fe741..b566736a4955 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -647,10 +647,7 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri } else if node.Status != structs.NodeStatusReady { return false, "node is not ready for placements", nil } else if node.SchedulingEligibility == structs.NodeSchedulingIneligible { - return false, "node is not eligible for draining", nil - } else if node.Drain { - // Deprecate in favor of scheduling eligibility and remove post-0.8 - return false, "node is draining", nil + return false, "node is not eligible", nil } // Get the existing allocations that are non-terminal diff --git a/nomad/plan_apply_test.go b/nomad/plan_apply_test.go index 30036237be8b..7550baf413d4 100644 --- a/nomad/plan_apply_test.go +++ b/nomad/plan_apply_test.go @@ -714,8 +714,7 @@ func TestPlanApply_EvalNodePlan_NodeNotReady(t *testing.T) { func TestPlanApply_EvalNodePlan_NodeDrain(t *testing.T) { t.Parallel() state := testStateStore(t) - node := mock.Node() - node.Drain = true + node := mock.DrainNode() state.UpsertNode(structs.MsgTypeTestSetup, 1000, node) snap, _ := state.Snapshot() diff --git a/nomad/state/events.go b/nomad/state/events.go index ab4a086f598a..82bcdbc9c95d 100644 --- a/nomad/state/events.go +++ b/nomad/state/events.go @@ -80,15 +80,12 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { return structs.Event{}, false } - // Node secret ID should not be included - node := before.Copy() - node.SecretID = "" - + before = before.Sanitize() return structs.Event{ Topic: structs.TopicNode, - Key: node.ID, + Key: before.ID, Payload: &structs.NodeStreamEvent{ - Node: node, + Node: before, }, }, true } @@ -179,15 +176,12 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { return structs.Event{}, false } - // Node secret ID should not be included - node := after.Copy() - node.SecretID = "" - + after = after.Sanitize() return structs.Event{ Topic: structs.TopicNode, - Key: node.ID, + Key: after.ID, Payload: &structs.NodeStreamEvent{ - Node: node, + Node: after, }, }, true case "deployment": diff --git a/nomad/state/events_test.go b/nomad/state/events_test.go index 3eba439d9017..1ae6edf62319 100644 --- a/nomad/state/events_test.go +++ b/nomad/state/events_test.go @@ -94,57 +94,6 @@ func TestEventFromChange_ACLTokenSecretID(t *testing.T) { require.Empty(t, tokenEvent2.ACLToken.SecretID) } -// TestEventFromChange_NodeSecretID ensures that a node's secret ID is not -// included in a node event -func TestEventFromChange_NodeSecretID(t *testing.T) { - t.Parallel() - s := TestStateStoreCfg(t, TestStateStorePublisher(t)) - defer s.StopEventBroker() - - node := mock.Node() - require.NotEmpty(t, node.SecretID) - - // Create - changes := Changes{ - Index: 100, - MsgType: structs.NodeRegisterRequestType, - Changes: memdb.Changes{ - { - Table: "nodes", - Before: nil, - After: node, - }, - }, - } - - out := eventsFromChanges(s.db.ReadTxn(), changes) - require.Len(t, out.Events, 1) - - nodeEvent, ok := out.Events[0].Payload.(*structs.NodeStreamEvent) - require.True(t, ok) - require.Empty(t, nodeEvent.Node.SecretID) - - // Delete - changes = Changes{ - Index: 100, - MsgType: structs.NodeDeregisterRequestType, - Changes: memdb.Changes{ - { - Table: "nodes", - Before: node, - After: nil, - }, - }, - } - - out2 := eventsFromChanges(s.db.ReadTxn(), changes) - require.Len(t, out2.Events, 1) - - nodeEvent2, ok := out2.Events[0].Payload.(*structs.NodeStreamEvent) - require.True(t, ok) - require.Empty(t, nodeEvent2.Node.SecretID) -} - func TestEventsFromChanges_DeploymentUpdate(t *testing.T) { t.Parallel() s := TestStateStoreCfg(t, TestStateStorePublisher(t)) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index d85467edd61f..3132673ce974 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -832,7 +832,6 @@ func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error { SetTimestamp(time.Unix(node.StatusUpdatedAt, 0))}) } - node.Drain = exist.Drain // Retain the drain mode node.SchedulingEligibility = exist.SchedulingEligibility // Retain the eligibility node.DrainStrategy = exist.DrainStrategy // Retain the drain strategy } else { @@ -951,7 +950,8 @@ func (s *StateStore) updateNodeStatusTxn(txn *txn, nodeID, status string, update return nil } -// BatchUpdateNodeDrain is used to update the drain of a node set of nodes +// BatchUpdateNodeDrain is used to update the drain of a node set of nodes. +// This is only called when node drain is completed by the drainer. func (s *StateStore) BatchUpdateNodeDrain(msgType structs.MessageType, index uint64, updatedAt int64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error { txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() @@ -966,9 +966,10 @@ func (s *StateStore) BatchUpdateNodeDrain(msgType structs.MessageType, index uin // UpdateNodeDrain is used to update the drain of a node func (s *StateStore) UpdateNodeDrain(msgType structs.MessageType, index uint64, nodeID string, drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error { - txn := s.db.WriteTxn(index) + txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible, updatedAt, event); err != nil { + return err } return txn.Commit() @@ -997,7 +998,6 @@ func (s *StateStore) updateNodeDrainImpl(txn *txn, index uint64, nodeID string, } // Update the drain in the copy - copyNode.Drain = drain != nil // COMPAT: Remove in Nomad 0.10 copyNode.DrainStrategy = drain if drain != nil { copyNode.SchedulingEligibility = structs.NodeSchedulingIneligible diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index fecd90ff2b09..f645a2788245 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -963,7 +963,6 @@ func TestStateStore_BatchUpdateNodeDrain(t *testing.T) { for _, id := range []string{n1.ID, n2.ID} { out, err := state.NodeByID(ws, id) require.Nil(err) - require.True(out.Drain) require.NotNil(out.DrainStrategy) require.Equal(out.DrainStrategy, expectedDrain) require.Len(out.Events, 2) @@ -1008,7 +1007,6 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) { ws = memdb.NewWatchSet() out, err := state.NodeByID(ws, node.ID) require.Nil(err) - require.True(out.Drain) require.NotNil(out.DrainStrategy) require.Equal(out.DrainStrategy, expectedDrain) require.Len(out.Events, 2) @@ -1152,7 +1150,6 @@ func TestStateStore_UpdateNodeDrain_ResetEligiblity(t *testing.T) { ws = memdb.NewWatchSet() out, err := state.NodeByID(ws, node.ID) require.Nil(err) - require.False(out.Drain) require.Nil(out.DrainStrategy) require.Equal(out.SchedulingEligibility, structs.NodeSchedulingEligible) require.Len(out.Events, 3) diff --git a/nomad/stream/ndjson.go b/nomad/stream/ndjson.go index 7e7ad0928104..3e806c26ea7a 100644 --- a/nomad/stream/ndjson.go +++ b/nomad/stream/ndjson.go @@ -1,11 +1,14 @@ package stream import ( + "bytes" "context" - "encoding/json" "fmt" "time" + "github.com/hashicorp/go-msgpack/codec" + + "github.com/hashicorp/nomad/nomad/jsonhandles" "github.com/hashicorp/nomad/nomad/structs" ) @@ -71,7 +74,9 @@ func (n *JsonStream) Send(v interface{}) error { return n.ctx.Err() } - buf, err := json.Marshal(v) + var buf bytes.Buffer + enc := codec.NewEncoder(&buf, jsonhandles.JsonHandleWithExtensions) + err := enc.Encode(v) if err != nil { return fmt.Errorf("error marshaling json for stream: %w", err) } @@ -79,7 +84,7 @@ func (n *JsonStream) Send(v interface{}) error { select { case <-n.ctx.Done(): return fmt.Errorf("error stream is no longer running: %w", err) - case n.outCh <- &structs.EventJson{Data: buf}: + case n.outCh <- &structs.EventJson{Data: buf.Bytes()}: } return nil diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index ed885e7ebfd4..a367c869e036 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -518,12 +518,6 @@ type NodeUpdateDrainRequest struct { NodeID string DrainStrategy *DrainStrategy - // COMPAT Remove in version 0.10 - // As part of Nomad 0.8 we have deprecated the drain boolean in favor of a - // drain strategy but we need to handle the upgrade path where the Raft log - // contains drain updates with just the drain boolean being manipulated. - Drain bool - // MarkEligible marks the node as eligible if removing the drain strategy. MarkEligible bool @@ -1827,7 +1821,7 @@ type Node struct { // SecretID is an ID that is only known by the Node and the set of Servers. // It is not accessible via the API and is used to authenticate nodes // conducting privileged activities. - SecretID string + SecretID string `json:"-"` // Datacenter for this node Datacenter string @@ -1885,15 +1879,8 @@ type Node struct { // attributes and capabilities. ComputedClass string - // COMPAT: Remove in Nomad 0.9 - // Drain is controlled by the servers, and not the client. - // If true, no jobs will be scheduled to this node, and existing - // allocations will be drained. Superseded by DrainStrategy in Nomad - // 0.8 but kept for backward compat. - Drain bool - - // DrainStrategy determines the node's draining behavior. Will be nil - // when Drain=false. + // DrainStrategy determines the node's draining behavior. + // Will be non-nil only while draining. DrainStrategy *DrainStrategy // SchedulingEligibility determines whether this node will receive new @@ -1930,10 +1917,23 @@ type Node struct { ModifyIndex uint64 } +// Sanitize returns a copy of the Node omitting confidential fields +// It only returns a copy if the Node contains the confidential fields +func (n *Node) Sanitize() *Node { + if n == nil { + return nil + } + if n.SecretID == "" { + return n + } + clean := n.Copy() + clean.SecretID = "" + return clean +} + // Ready returns true if the node is ready for running allocations func (n *Node) Ready() bool { - // Drain is checked directly to support pre-0.8 Node data - return n.Status == NodeStatusReady && !n.Drain && n.SchedulingEligibility == NodeSchedulingEligible + return n.Status == NodeStatusReady && n.DrainStrategy == nil && n.SchedulingEligibility == NodeSchedulingEligible } func (n *Node) Canonicalize() { @@ -1941,15 +1941,12 @@ func (n *Node) Canonicalize() { return } - // COMPAT Remove in 0.10 - // In v0.8.0 we introduced scheduling eligibility, so we need to set it for - // upgrading nodes - if n.SchedulingEligibility == "" { - if n.Drain { - n.SchedulingEligibility = NodeSchedulingIneligible - } else { - n.SchedulingEligibility = NodeSchedulingEligible - } + // Ensure SchedulingEligibility is correctly set whenever draining so the plan applier and other scheduling logic + // only need to check SchedulingEligibility when determining whether a placement is feasible on a node. + if n.DrainStrategy != nil { + n.SchedulingEligibility = NodeSchedulingIneligible + } else if n.SchedulingEligibility == "" { + n.SchedulingEligibility = NodeSchedulingEligible } // COMPAT remove in 1.0 @@ -2138,7 +2135,7 @@ func (n *Node) Stub(fields *NodeStubFields) *NodeListStub { Name: n.Name, NodeClass: n.NodeClass, Version: n.Attributes["nomad.version"], - Drain: n.Drain, + Drain: n.DrainStrategy != nil, SchedulingEligibility: n.SchedulingEligibility, Status: n.Status, StatusDescription: n.StatusDescription, @@ -10683,18 +10680,6 @@ var MsgpackHandle = func() *codec.MsgpackHandle { return h }() -var ( - // JsonHandle and JsonHandlePretty are the codec handles to JSON encode - // structs. The pretty handle will add indents for easier human consumption. - JsonHandle = &codec.JsonHandle{ - HTMLCharsAsIs: true, - } - JsonHandlePretty = &codec.JsonHandle{ - HTMLCharsAsIs: true, - Indent: 4, - } -) - // Decode is used to decode a MsgPack encoded object func Decode(buf []byte, out interface{}) error { return codec.NewDecoder(bytes.NewReader(buf), MsgpackHandle).Decode(out) diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index be61264d9ca5..e67def0da250 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -5519,7 +5519,11 @@ func TestNode_Canonicalize(t *testing.T) { require.Equal(NodeSchedulingEligible, node.SchedulingEligibility) node = &Node{ - Drain: true, + DrainStrategy: &DrainStrategy{ + DrainSpec: DrainSpec{ + Deadline: 30000, + }, + }, } node.Canonicalize() require.Equal(NodeSchedulingIneligible, node.SchedulingEligibility) @@ -5636,6 +5640,31 @@ func TestNode_Copy(t *testing.T) { require.Equal(node.Drivers, node2.Drivers) } +func TestNode_Sanitize(t *testing.T) { + require := require.New(t) + + testCases := []*Node{ + nil, + { + ID: uuid.Generate(), + SecretID: "", + }, + { + ID: uuid.Generate(), + SecretID: uuid.Generate(), + }, + } + for _, tc := range testCases { + sanitized := tc.Sanitize() + if tc == nil { + require.Nil(sanitized) + } else { + require.NotNil(sanitized) + require.Empty(sanitized.SecretID) + } + } +} + func TestSpread_Validate(t *testing.T) { type tc struct { spread *Spread diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 92c3f7787854..42db7d16a9fd 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -3087,8 +3087,7 @@ func TestServiceSched_NodeDrain(t *testing.T) { h := NewHarness(t) // Register a draining node - node := mock.Node() - node.Drain = true + node := mock.DrainNode() require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) // Create some nodes @@ -3169,8 +3168,7 @@ func TestServiceSched_NodeDrain_Down(t *testing.T) { h := NewHarness(t) // Register a draining node - node := mock.Node() - node.Drain = true + node := mock.DrainNode() node.Status = structs.NodeStatusDown require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) @@ -3302,7 +3300,7 @@ func TestServiceSched_NodeDrain_Queued_Allocations(t *testing.T) { } require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs)) - node.Drain = true + node.DrainStrategy = mock.DrainNode().DrainStrategy require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) // Create a mock evaluation to deal with drain @@ -4155,8 +4153,7 @@ func TestBatchSched_Run_LostAlloc(t *testing.T) { func TestBatchSched_Run_FailedAllocQueuedAllocations(t *testing.T) { h := NewHarness(t) - node := mock.Node() - node.Drain = true + node := mock.DrainNode() require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) // Create a job @@ -4210,8 +4207,7 @@ func TestBatchSched_ReRun_SuccessfullyFinishedAlloc(t *testing.T) { // Create two nodes, one that is drained and has a successfully finished // alloc and a fresh undrained one - node := mock.Node() - node.Drain = true + node := mock.DrainNode() node2 := mock.Node() require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2)) @@ -4420,8 +4416,7 @@ func TestBatchSched_NodeDrain_Running_OldJob(t *testing.T) { // Create two nodes, one that is drained and has a successfully finished // alloc and a fresh undrained one - node := mock.Node() - node.Drain = true + node := mock.DrainNode() node2 := mock.Node() require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2)) @@ -4492,8 +4487,7 @@ func TestBatchSched_NodeDrain_Complete(t *testing.T) { // Create two nodes, one that is drained and has a successfully finished // alloc and a fresh undrained one - node := mock.Node() - node.Drain = true + node := mock.DrainNode() node2 := mock.Node() require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2)) @@ -4890,8 +4884,7 @@ func TestServiceSched_NodeDrain_Sticky(t *testing.T) { h := NewHarness(t) // Register a draining node - node := mock.Node() - node.Drain = true + node := mock.DrainNode() require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) // Create an alloc on the draining node diff --git a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go index 0366ebb539cf..b0698b0bf9b4 100644 --- a/scheduler/reconcile_test.go +++ b/scheduler/reconcile_test.go @@ -885,10 +885,9 @@ func TestReconciler_DrainNode(t *testing.T) { // Build a map of tainted nodes tainted := make(map[string]*structs.Node, 2) for i := 0; i < 2; i++ { - n := mock.Node() + n := mock.DrainNode() n.ID = allocs[i].NodeID allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true) - n.Drain = true tainted[n.ID] = n } @@ -938,10 +937,9 @@ func TestReconciler_DrainNode_ScaleUp(t *testing.T) { // Build a map of tainted nodes tainted := make(map[string]*structs.Node, 2) for i := 0; i < 2; i++ { - n := mock.Node() + n := mock.DrainNode() n.ID = allocs[i].NodeID allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true) - n.Drain = true tainted[n.ID] = n } @@ -992,10 +990,9 @@ func TestReconciler_DrainNode_ScaleDown(t *testing.T) { // Build a map of tainted nodes tainted := make(map[string]*structs.Node, 3) for i := 0; i < 3; i++ { - n := mock.Node() + n := mock.DrainNode() n.ID = allocs[i].NodeID allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true) - n.Drain = true tainted[n.ID] = n } @@ -2994,10 +2991,9 @@ func TestReconciler_DrainNode_Canary(t *testing.T) { // Build a map of tainted nodes that contains the last canary tainted := make(map[string]*structs.Node, 1) - n := mock.Node() + n := mock.DrainNode() n.ID = allocs[11].NodeID allocs[11].DesiredTransition.Migrate = helper.BoolToPtr(true) - n.Drain = true tainted[n.ID] = n mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive) @@ -3785,7 +3781,7 @@ func TestReconciler_TaintedNode_RollingUpgrade(t *testing.T) { if i == 0 { n.Status = structs.NodeStatusDown } else { - n.Drain = true + n.DrainStrategy = mock.DrainNode().DrainStrategy allocs[2+i].DesiredTransition.Migrate = helper.BoolToPtr(true) } tainted[n.ID] = n @@ -3870,7 +3866,7 @@ func TestReconciler_FailedDeployment_TaintedNodes(t *testing.T) { if i == 0 { n.Status = structs.NodeStatusDown } else { - n.Drain = true + n.DrainStrategy = mock.DrainNode().DrainStrategy allocs[6+i].DesiredTransition.Migrate = helper.BoolToPtr(true) } tainted[n.ID] = n diff --git a/scheduler/reconcile_util_test.go b/scheduler/reconcile_util_test.go index 6fb1c055542d..59772a349a20 100644 --- a/scheduler/reconcile_util_test.go +++ b/scheduler/reconcile_util_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/stretchr/testify/require" ) @@ -37,8 +38,8 @@ func TestAllocSet_filterByTainted(t *testing.T) { nodes := map[string]*structs.Node{ "draining": { - ID: "draining", - Drain: true, + ID: "draining", + DrainStrategy: mock.DrainNode().DrainStrategy, }, "lost": { ID: "lost", diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go index 35ed1ce5189d..c4cce45ffcc9 100644 --- a/scheduler/system_sched_test.go +++ b/scheduler/system_sched_test.go @@ -1051,8 +1051,7 @@ func TestSystemSched_NodeDrain_Down(t *testing.T) { h := NewHarness(t) // Register a draining node - node := mock.Node() - node.Drain = true + node := mock.DrainNode() node.Status = structs.NodeStatusDown require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) @@ -1113,8 +1112,7 @@ func TestSystemSched_NodeDrain(t *testing.T) { h := NewHarness(t) // Register a draining node - node := mock.Node() - node.Drain = true + node := mock.DrainNode() require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) // Generate a fake job allocated on that node. @@ -1708,9 +1706,8 @@ func TestSystemSched_PlanWithDrainedNode(t *testing.T) { h := NewHarness(t) // Register two nodes with two different classes - node := mock.Node() + node := mock.DrainNode() node.NodeClass = "green" - node.Drain = true node.ComputeClass() require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) diff --git a/scheduler/util.go b/scheduler/util.go index e144fd42e886..caef16924207 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -252,13 +252,7 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int // Filter on datacenter and status node := raw.(*structs.Node) - if node.Status != structs.NodeStatusReady { - continue - } - if node.Drain { - continue - } - if node.SchedulingEligibility != structs.NodeSchedulingEligible { + if !node.Ready() { continue } if _, ok := dcMap[node.Datacenter]; !ok { @@ -327,7 +321,7 @@ func taintedNodes(state State, allocs []*structs.Allocation) (map[string]*struct out[alloc.NodeID] = nil continue } - if structs.ShouldDrainNode(node.Status) || node.Drain { + if structs.ShouldDrainNode(node.Status) || node.DrainStrategy != nil { out[alloc.NodeID] = node } } diff --git a/scheduler/util_test.go b/scheduler/util_test.go index 4fa17ce9d447..d0e3c6520063 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -39,8 +39,7 @@ func TestDiffSystemAllocsForNode(t *testing.T) { eligibleNode := mock.Node() eligibleNode.ID = "zip" - drainNode := mock.Node() - drainNode.Drain = true + drainNode := mock.DrainNode() deadNode := mock.Node() deadNode.Status = structs.NodeStatusDown @@ -220,8 +219,7 @@ func TestDiffSystemAllocsForNode_ExistingAllocIneligibleNode(t *testing.T) { func TestDiffSystemAllocs(t *testing.T) { job := mock.SystemJob() - drainNode := mock.Node() - drainNode.Drain = true + drainNode := mock.DrainNode() deadNode := mock.Node() deadNode.Status = structs.NodeStatusDown @@ -332,8 +330,7 @@ func TestReadyNodesInDCs(t *testing.T) { node3 := mock.Node() node3.Datacenter = "dc2" node3.Status = structs.NodeStatusDown - node4 := mock.Node() - node4.Drain = true + node4 := mock.DrainNode() require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1000, node1)) require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1001, node2)) @@ -392,8 +389,7 @@ func TestTaintedNodes(t *testing.T) { node3 := mock.Node() node3.Datacenter = "dc2" node3.Status = structs.NodeStatusDown - node4 := mock.Node() - node4.Drain = true + node4 := mock.DrainNode() require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1000, node1)) require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1001, node2)) require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1002, node3))