Skip to content
This repository has been archived by the owner on Jan 30, 2020. It is now read-only.

Dynamic Metadata #1077

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
584 changes: 305 additions & 279 deletions Documentation/api-v1.md

Large diffs are not rendered by default.

320 changes: 160 additions & 160 deletions Documentation/unit-files-and-scheduling.md

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion agent/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,13 @@ func desiredAgentState(a *Agent, reg registry.Registry) (*AgentState, error) {
return nil, err
}

ms := a.Machine.State()
// fetch full machine state from registry instead of
// using the local version to allow for dynamic metadata
ms, err := reg.MachineState(a.Machine.State().ID)
if err != nil {
log.Errorf("Failed fetching machine state from Registry: %v", err)
return nil, err
}
as := AgentState{
MState: &ms,
Units: make(map[string]*job.Unit),
Expand Down
1 change: 1 addition & 0 deletions agent/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ MachineMetadata=dog=woof`),
reg := registry.NewFakeRegistry()
reg.SetJobs(tt.regJobs)
a := makeAgentWithMetadata(tt.metadata)
reg.SetMachines([]machine.MachineState{a.Machine.State()})
as, err := desiredAgentState(a, reg)
if err != nil {
t.Errorf("case %d: unexpected error: %v", i, err)
Expand Down
74 changes: 70 additions & 4 deletions api/machines.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,22 @@
package api

import (
"fmt"
"encoding/json"
"errors"
"net/http"
"path"
"regexp"

"github.com/coreos/fleet/client"
"github.com/coreos/fleet/log"
"github.com/coreos/fleet/machine"
"github.com/coreos/fleet/schema"
)

var (
metadataPathRegex = regexp.MustCompile("^/([^/]+)/metadata/([A-Za-z0-9_.-]+$)")
)

func wireUpMachinesResource(mux *http.ServeMux, prefix string, cAPI client.API) {
res := path.Join(prefix, "machines")
mr := machinesResource{cAPI}
Expand All @@ -35,12 +41,24 @@ type machinesResource struct {
cAPI client.API
}

type machineMetadataOp struct {
Operation string `json:"op"`
Path string
Value string
}

func (mr *machinesResource) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
if req.Method != "GET" {
sendError(rw, http.StatusBadRequest, fmt.Errorf("only HTTP GET supported against this resource"))
return
switch req.Method {
case "GET":
mr.list(rw, req)
case "PATCH":
mr.patch(rw, req)
default:
sendError(rw, http.StatusMethodNotAllowed, errors.New("only GET and PATCH supported against this resource"))
}
}

func (mr *machinesResource) list(rw http.ResponseWriter, req *http.Request) {
token, err := findNextPageToken(req.URL)
if err != nil {
sendError(rw, http.StatusBadRequest, err)
Expand All @@ -62,6 +80,54 @@ func (mr *machinesResource) ServeHTTP(rw http.ResponseWriter, req *http.Request)
sendResponse(rw, http.StatusOK, page)
}

func (mr *machinesResource) patch(rw http.ResponseWriter, req *http.Request) {
var ops []machineMetadataOp
dec := json.NewDecoder(req.Body)
if err := dec.Decode(&ops); err != nil {
sendError(rw, http.StatusBadRequest, err)
return
}

for _, op := range ops {
if op.Operation != "add" && op.Operation != "remove" && op.Operation != "replace" {
sendError(rw, http.StatusBadRequest, errors.New("invalid op: expect add, remove, or replace"))
return
}

if metadataPathRegex.FindStringSubmatch(op.Path) == nil {
sendError(rw, http.StatusBadRequest, errors.New("machine metadata path invalid"))
return
}

if op.Operation != "remove" && len(op.Value) == 0 {
sendError(rw, http.StatusBadRequest, errors.New("invalid value: add and replace require a value"))
return
}
}

for _, op := range ops {
// regex already validated above
s := metadataPathRegex.FindStringSubmatch(op.Path)
machID := s[1]
key := s[2]

if op.Operation == "remove" {
err := mr.cAPI.DeleteMachineMetadata(machID, key)
if err != nil {
sendError(rw, http.StatusInternalServerError, err)
return
}
} else {
err := mr.cAPI.SetMachineMetadata(machID, key, op.Value)
if err != nil {
sendError(rw, http.StatusInternalServerError, err)
return
}
}
}
sendResponse(rw, http.StatusNoContent, nil)
}

func getMachinePage(cAPI client.API, tok PageToken) (*schema.MachinePage, error) {
all, err := cAPI.Machines()
if err != nil {
Expand Down
156 changes: 149 additions & 7 deletions api/machines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,30 @@ import (
"net/http/httptest"
"reflect"
"strconv"
"strings"
"testing"

"github.com/coreos/fleet/client"
"github.com/coreos/fleet/machine"
"github.com/coreos/fleet/registry"
)

func TestMachinesList(t *testing.T) {
func fakeMachinesSetup() (*machinesResource, *httptest.ResponseRecorder) {
fr := registry.NewFakeRegistry()
fr.SetMachines([]machine.MachineState{
{ID: "XXX", PublicIP: "", Metadata: nil},
{ID: "XXX", PublicIP: "", Metadata: map[string]string{}},
{ID: "YYY", PublicIP: "1.2.3.4", Metadata: map[string]string{"ping": "pong"}},
})
fAPI := &client.RegistryClient{Registry: fr}
resource := &machinesResource{cAPI: fAPI}
rw := httptest.NewRecorder()
req, err := http.NewRequest("GET", "http://example.com", nil)

return resource, rw
}

func TestMachinesList(t *testing.T) {
resource, rw := fakeMachinesSetup()
req, err := http.NewRequest("GET", "http://example.com/machines", nil)
if err != nil {
t.Fatalf("Failed creating http.Request: %v", err)
}
Expand Down Expand Up @@ -63,11 +70,23 @@ func TestMachinesList(t *testing.T) {
}
}

func TestMachinesListBadMethod(t *testing.T) {
resource, rw := fakeMachinesSetup()
req, err := http.NewRequest("POST", "http://example.com/machines", nil)
if err != nil {
t.Fatalf("Failed creating http.Request: %v", err)
}

resource.ServeHTTP(rw, req)

err = assertErrorResponse(rw, http.StatusMethodNotAllowed)
if err != nil {
t.Error(err.Error())
}
}

func TestMachinesListBadNextPageToken(t *testing.T) {
fr := registry.NewFakeRegistry()
fAPI := &client.RegistryClient{Registry: fr}
resource := &machinesResource{fAPI}
rw := httptest.NewRecorder()
resource, rw := fakeMachinesSetup()
req, err := http.NewRequest("GET", "http://example.com/machines?nextPageToken=EwBMLg==", nil)
if err != nil {
t.Fatalf("Failed creating http.Request: %v", err)
Expand Down Expand Up @@ -136,3 +155,126 @@ func TestExtractMachinePage(t *testing.T) {
}
}
}

func TestMachinesPatchAddModify(t *testing.T) {
reqBody := `
[{"op": "add", "path": "/XXX/metadata/foo", "value": "bar"},
{"op": "replace", "path": "/YYY/metadata/ping", "value": "splat"}]
`

resource, rw := fakeMachinesSetup()
req, err := http.NewRequest("PATCH", "http://example.com/machines", strings.NewReader(reqBody))
if err != nil {
t.Fatalf("Failed creating http.Request: %v", err)
}

resource.ServeHTTP(rw, req)
if rw.Code != http.StatusNoContent {
t.Errorf("Expected 204, got %d", rw.Code)
}

// fetch machine to make sure data has been added
req, err = http.NewRequest("GET", "http://example.com/machines", nil)
if err != nil {
t.Fatalf("Failed creating http.Request: %v", err)
}
rw.Body.Reset()
resource.ServeHTTP(rw, req)

if rw.Body == nil {
t.Error("Received nil response body")
} else {
body := rw.Body.String()
expected := `{"machines":[{"id":"XXX","metadata":{"foo":"bar"}},{"id":"YYY","metadata":{"ping":"splat"},"primaryIP":"1.2.3.4"}]}`
if body != expected {
t.Errorf("Expected body:\n%s\n\nReceived body:\n%s\n", expected, body)
}
}
}

func TestMachinesPatchDelete(t *testing.T) {
reqBody := `
[{"op": "remove", "path": "/XXX/metadata/foo"},
{"op": "remove", "path": "/YYY/metadata/ping"}]
`

resource, rw := fakeMachinesSetup()
req, err := http.NewRequest("PATCH", "http://example.com/machines", strings.NewReader(reqBody))
if err != nil {
t.Fatalf("Failed creating http.Request: %v", err)
}

resource.ServeHTTP(rw, req)
if rw.Code != http.StatusNoContent {
t.Errorf("Expected 204, got %d", rw.Code)
}

// fetch machine to make sure data has been added
req, err = http.NewRequest("GET", "http://example.com/machines", nil)
if err != nil {
t.Fatalf("Failed creating http.Request: %v", err)
}
rw.Body.Reset()
resource.ServeHTTP(rw, req)

if rw.Body == nil {
t.Error("Received nil response body")
} else {
body := rw.Body.String()
expected := `{"machines":[{"id":"XXX"},{"id":"YYY","primaryIP":"1.2.3.4"}]}`
if body != expected {
t.Errorf("Expected body:\n%s\n\nReceived body:\n%s\n", expected, body)
}
}
}

func TestMachinesPatchBadOp(t *testing.T) {
reqBody := `
[{"op": "noop", "path": "/XXX/metadata/foo", "value": "bar"}]
`

resource, rw := fakeMachinesSetup()
req, err := http.NewRequest("PATCH", "http://example.com/machines", strings.NewReader(reqBody))
if err != nil {
t.Fatalf("Failed creating http.Request: %v", err)
}

resource.ServeHTTP(rw, req)
if rw.Code != http.StatusBadRequest {
t.Errorf("Expected 400, got %d", rw.Code)
}
}

func TestMachinesPatchBadPath(t *testing.T) {
reqBody := `
[{"op": "add", "path": "/XXX/foo", "value": "bar"}]
`

resource, rw := fakeMachinesSetup()
req, err := http.NewRequest("PATCH", "http://example.com/machines", strings.NewReader(reqBody))
if err != nil {
t.Fatalf("Failed creating http.Request: %v", err)
}

resource.ServeHTTP(rw, req)
if rw.Code != http.StatusBadRequest {
t.Errorf("Expected 400, got %d", rw.Code)
}
}

func TestMachinesPatchBadValue(t *testing.T) {
reqBody := `
[{"op": "add", "path": "/XXX/foo"}]
`

resource, rw := fakeMachinesSetup()
req, err := http.NewRequest("PATCH", "http://example.com/machines", strings.NewReader(reqBody))
if err != nil {
t.Fatalf("Failed creating http.Request: %v", err)
}

resource.ServeHTTP(rw, req)
if rw.Code != http.StatusBadRequest {
t.Errorf("Expected 400, got %d", rw.Code)
}
}
2 changes: 2 additions & 0 deletions client/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

type API interface {
Machines() ([]machine.MachineState, error)
SetMachineMetadata(machID, key, value string) error
DeleteMachineMetadata(machID, key string) error

Unit(string) (*schema.Unit, error)
Units() ([]*schema.Unit, error)
Expand Down
31 changes: 31 additions & 0 deletions registry/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,37 @@ func (f *FakeRegistry) UnitHeartbeat(name, machID string, ttl time.Duration) err

func (f *FakeRegistry) ClearUnitHeartbeat(string) {}

func (f *FakeRegistry) SetMachineMetadata(machID string, key string, value string) error {
for _, mach := range f.machines {
if mach.ID == machID {
mach.Metadata[key] = value
}
}
return nil
}

func (f *FakeRegistry) DeleteMachineMetadata(machID string, key string) error {
for _, mach := range f.machines {
if mach.ID == machID {
delete(mach.Metadata, key)
}
}
return nil
}

func (f *FakeRegistry) MachineState(machID string) (machine.MachineState, error) {
f.RLock()
defer f.RUnlock()

for _, mach := range f.machines {
if mach.ID == machID {
return mach, nil
}
}

return machine.MachineState{}, errors.New("Machine state not found")
}

func NewFakeClusterRegistry(dVersion *semver.Version, eVersion int) *FakeClusterRegistry {
return &FakeClusterRegistry{
dVersion: dVersion,
Expand Down
3 changes: 3 additions & 0 deletions registry/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ type Registry interface {
ScheduleUnit(name, machID string) error
SetUnitTargetState(name string, state job.JobState) error
SetMachineState(ms machine.MachineState, ttl time.Duration) (uint64, error)
MachineState(machID string) (machine.MachineState, error)
UnscheduleUnit(name, machID string) error
SetMachineMetadata(machID, key, value string) error
DeleteMachineMetadata(machID, key string) error

UnitRegistry
}
Expand Down
Loading