From f49cc0f9cdc8ccd16e3ede43cb97c36c0c3adb71 Mon Sep 17 00:00:00 2001 From: Evan Baker Date: Mon, 20 Sep 2021 18:20:33 -0500 Subject: [PATCH 1/5] add histo metric for IP allocation latency per unique pod Signed-off-by: Evan Baker --- cns/restserver/ipam.go | 105 ++++++++++++++++++++--------------- cns/restserver/metrics.go | 12 +++- cns/restserver/restserver.go | 24 ++++++++ 3 files changed, 95 insertions(+), 46 deletions(-) diff --git a/cns/restserver/ipam.go b/cns/restserver/ipam.go index d9d06445ed..9c2f2b678f 100644 --- a/cns/restserver/ipam.go +++ b/cns/restserver/ipam.go @@ -16,15 +16,8 @@ import ( // used to request an IPConfig from the CNS state func (service *HTTPRestService) requestIPConfigHandler(w http.ResponseWriter, r *http.Request) { - var ( - err error - ipconfigRequest cns.IPConfigRequest - podIpInfo cns.PodIpInfo - returnCode types.ResponseCode - returnMessage string - ) - - err = service.Listener.Decode(w, r, &ipconfigRequest) + var ipconfigRequest cns.IPConfigRequest + err := service.Listener.Decode(w, r, &ipconfigRequest) operationName := "requestIPConfigHandler" logger.Request(service.Name+operationName, ipconfigRequest, err) if err != nil { @@ -32,56 +25,83 @@ func (service *HTTPRestService) requestIPConfigHandler(w http.ResponseWriter, r } // retrieve ipconfig from nc - _, returnCode, returnMessage = service.validateIPConfigRequest(ipconfigRequest) - if returnCode == types.Success { - if podIpInfo, err = requestIPConfigHelper(service, ipconfigRequest); err != nil { - returnCode = types.FailedToAllocateIPConfig - returnMessage = fmt.Sprintf("AllocateIPConfig failed: %v, IP config request is %s", err, ipconfigRequest) + podInfo, returnCode, returnMessage := service.validateIPConfigRequest(ipconfigRequest) + if returnCode != types.Success { + reserveResp := &cns.IPConfigResponse{ + Response: cns.Response{ + ReturnCode: returnCode, + Message: returnMessage, + }, } + err = service.Listener.Encode(w, &reserveResp) + logger.ResponseEx(service.Name+operationName, ipconfigRequest, reserveResp, reserveResp.Response.ReturnCode, err) + return } - resp := cns.Response{ - ReturnCode: returnCode, - Message: returnMessage, + // record a pod requesting an IP + service.podsPendingIPAllocation.Push(podInfo.Key()) + + podIPInfo, err := requestIPConfigHelper(service, ipconfigRequest) + if err != nil { + reserveResp := &cns.IPConfigResponse{ + Response: cns.Response{ + ReturnCode: types.FailedToAllocateIPConfig, + Message: fmt.Sprintf("AllocateIPConfig failed: %v, IP config request is %s", err, ipconfigRequest), + }, + PodIpInfo: podIPInfo, + } + err = service.Listener.Encode(w, &reserveResp) + logger.ResponseEx(service.Name+operationName, ipconfigRequest, reserveResp, reserveResp.Response.ReturnCode, err) + return } + // record a pod allocated an IP + defer func() { + // observe allocation wait time + ipAllocationLatency.Observe(float64(service.podsPendingIPAllocation.Pop(podInfo.Key()))) + }() reserveResp := &cns.IPConfigResponse{ - Response: resp, + Response: cns.Response{ + ReturnCode: types.Success, + }, + PodIpInfo: podIPInfo, } - reserveResp.PodIpInfo = podIpInfo err = service.Listener.Encode(w, &reserveResp) - logger.ResponseEx(service.Name+operationName, ipconfigRequest, reserveResp, resp.ReturnCode, err) + logger.ResponseEx(service.Name+operationName, ipconfigRequest, reserveResp, reserveResp.Response.ReturnCode, err) } func (service *HTTPRestService) releaseIPConfigHandler(w http.ResponseWriter, r *http.Request) { - req := cns.IPConfigRequest{} - resp := cns.Response{} - var err error - - defer func() { - err = service.Listener.Encode(w, &resp) - logger.ResponseEx(service.Name, req, resp, resp.ReturnCode, err) - }() - - err = service.Listener.Decode(w, r, &req) + var req cns.IPConfigRequest + err := service.Listener.Decode(w, r, &req) logger.Request(service.Name+"releaseIPConfigHandler", req, err) if err != nil { - resp.ReturnCode = types.UnexpectedError - resp.Message = err.Error() + resp := cns.Response{ + ReturnCode: types.UnexpectedError, + Message: err.Error(), + } logger.Errorf("releaseIPConfigHandler decode failed becase %v, release IP config info %s", resp.Message, req) + err = service.Listener.Encode(w, &resp) + logger.ResponseEx(service.Name, req, resp, resp.ReturnCode, err) return } - var podInfo cns.PodInfo - podInfo, resp.ReturnCode, resp.Message = service.validateIPConfigRequest(req) + podInfo, returnCode, message := service.validateIPConfigRequest(req) + + // remove this pod from the waiting for IP allocation set, if it is present + _ = service.podsPendingIPAllocation.Pop(podInfo.Key()) if err = service.releaseIPConfig(podInfo); err != nil { - resp.ReturnCode = types.UnexpectedError - resp.Message = err.Error() - logger.Errorf("releaseIPConfigHandler releaseIPConfig failed because %v, release IP config info %s", resp.Message, req) - return + returnCode = types.UnexpectedError + message = err.Error() + logger.Errorf("releaseIPConfigHandler releaseIPConfig failed because %v, release IP config info %s", message, req) } + resp := cns.Response{ + ReturnCode: returnCode, + Message: message, + } + err = service.Listener.Encode(w, &resp) + logger.ResponseEx(service.Name, req, resp, resp.ReturnCode, err) } // MarkIPAsPendingRelease will set the IPs which are in PendingProgramming or Available to PendingRelease state @@ -418,19 +438,14 @@ func (service *HTTPRestService) AllocateAnyAvailableIPConfig(podInfo cns.PodInfo // If IPConfig is already allocated for pod, it returns that else it returns one of the available ipconfigs. func requestIPConfigHelper(service *HTTPRestService, req cns.IPConfigRequest) (cns.PodIpInfo, error) { - var ( - podIpInfo cns.PodIpInfo - isExist bool - ) - // check if ipconfig already allocated for this pod and return if exists or error // if error, ipstate is nil, if exists, ipstate is not nil and error is nil podInfo, err := cns.NewPodInfoFromIPConfigRequest(req) if err != nil { - return podIpInfo, err + return cns.PodIpInfo{}, err } - if podIpInfo, isExist, err = service.GetExistingIPConfig(podInfo); err != nil || isExist { + if podIpInfo, isExist, err := service.GetExistingIPConfig(podInfo); err != nil || isExist { return podIpInfo, err } diff --git a/cns/restserver/metrics.go b/cns/restserver/metrics.go index e11552e635..67d4b1250b 100644 --- a/cns/restserver/metrics.go +++ b/cns/restserver/metrics.go @@ -12,7 +12,7 @@ var httpRequestLatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Name: "http_request_latency_seconds", Help: "Request latency in seconds by endpoint, verb, and response code.", - //nolint:gomnd + //nolint:gomnd // default bucket consts Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), // 1 ms to ~16 seconds }, // TODO(rbtr): @@ -22,9 +22,19 @@ var httpRequestLatency = prometheus.NewHistogramVec( []string{"url", "verb"}, ) +var ipAllocationLatency = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Name: "ip_allocation_latency_seconds", + Help: "IP allocation latency in seconds", + //nolint:gomnd // default bucket consts + Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), // 1 ms to ~16 seconds + }, +) + func init() { metrics.Registry.MustRegister( httpRequestLatency, + ipAllocationLatency, ) } diff --git a/cns/restserver/restserver.go b/cns/restserver/restserver.go index 45d8844e55..b396fbeb44 100644 --- a/cns/restserver/restserver.go +++ b/cns/restserver/restserver.go @@ -34,6 +34,28 @@ var ( ncVersionURLs sync.Map ) +type lockedDurationSet struct { + sync.Mutex + waits map[string]time.Time +} + +func (m *lockedDurationSet) Push(key string) { + m.Lock() + defer m.Unlock() + _, ok := m.waits[key] + if !ok { + m.waits[key] = time.Now() + } +} + +func (m *lockedDurationSet) Pop(key string) time.Duration { + m.Lock() + start := m.waits[key] + delete(m.waits, key) + m.Unlock() + return time.Since(start) +} + // HTTPRestService represents http listener for CNS - Container Networking Service. type HTTPRestService struct { *cns.Service @@ -48,6 +70,7 @@ type HTTPRestService struct { routingTable *routes.RoutingTable store store.KeyValueStore state *httpRestServiceState + podsPendingIPAllocation *lockedDurationSet sync.RWMutex dncPartitionKey string } @@ -137,6 +160,7 @@ func NewHTTPRestService(config *common.ServiceConfig, imdsClientInterface imdscl PodIPConfigState: podIPConfigState, routingTable: routingTable, state: serviceState, + podsPendingIPAllocation: &lockedDurationSet{waits: map[string]time.Time{}}, }, nil } From f3c7fe6301b91cc3294af4a81ca7f76f319011f3 Mon Sep 17 00:00:00 2001 From: Evan Baker Date: Mon, 20 Sep 2021 19:08:23 -0500 Subject: [PATCH 2/5] fix typo in ipam metrics Signed-off-by: Evan Baker --- cns/ipampoolmonitor/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cns/ipampoolmonitor/metrics.go b/cns/ipampoolmonitor/metrics.go index 64fba518ac..7c9b8ce901 100644 --- a/cns/ipampoolmonitor/metrics.go +++ b/cns/ipampoolmonitor/metrics.go @@ -56,7 +56,7 @@ var ( ) ipamRequestedIPConfigCount = prometheus.NewGauge( prometheus.GaugeOpts{ - Name: "ipam_reuested_ips", + Name: "ipam_requested_ips", Help: "Requested IP count.", }, ) From 7227e386c9f35dff3972983225a1bc5bbbe45405 Mon Sep 17 00:00:00 2001 From: Evan Baker Date: Tue, 21 Sep 2021 12:53:06 -0500 Subject: [PATCH 3/5] delint Signed-off-by: Evan Baker --- cns/restserver/internalapi.go | 2 +- cns/restserver/ipam.go | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/cns/restserver/internalapi.go b/cns/restserver/internalapi.go index 41f84e8fce..164d7fbfc9 100644 --- a/cns/restserver/internalapi.go +++ b/cns/restserver/internalapi.go @@ -241,8 +241,8 @@ func (service *HTTPRestService) ReconcileNCState( ipconfigRequest := cns.IPConfigRequest{ DesiredIPAddress: secIpConfig.IPAddress, OrchestratorContext: jsonContext, - PodInterfaceID: podInfo.InterfaceID(), InfraContainerID: podInfo.InfraContainerID(), + PodInterfaceID: podInfo.InterfaceID(), } if _, err := requestIPConfigHelper(service, ipconfigRequest); err != nil { diff --git a/cns/restserver/ipam.go b/cns/restserver/ipam.go index 9c2f2b678f..86a94fe26e 100644 --- a/cns/restserver/ipam.go +++ b/cns/restserver/ipam.go @@ -12,6 +12,7 @@ import ( "github.com/Azure/azure-container-networking/cns/filter" "github.com/Azure/azure-container-networking/cns/logger" "github.com/Azure/azure-container-networking/cns/types" + "github.com/pkg/errors" ) // used to request an IPConfig from the CNS state @@ -442,11 +443,11 @@ func requestIPConfigHelper(service *HTTPRestService, req cns.IPConfigRequest) (c // if error, ipstate is nil, if exists, ipstate is not nil and error is nil podInfo, err := cns.NewPodInfoFromIPConfigRequest(req) if err != nil { - return cns.PodIpInfo{}, err + return cns.PodIpInfo{}, errors.Wrapf(err, "failed to parse IPConfigRequest %v", req) } - if podIpInfo, isExist, err := service.GetExistingIPConfig(podInfo); err != nil || isExist { - return podIpInfo, err + if podIPInfo, isExist, err := service.GetExistingIPConfig(podInfo); err != nil || isExist { + return podIPInfo, err } // return desired IPConfig From 1405320d85cfdbe5cf80575b6ca12e330da76624 Mon Sep 17 00:00:00 2001 From: Evan Baker Date: Tue, 21 Sep 2021 20:03:32 -0500 Subject: [PATCH 4/5] noop on non-existent key Signed-off-by: Evan Baker --- cns/restserver/ipam.go | 4 +++- cns/restserver/restserver.go | 9 ++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/cns/restserver/ipam.go b/cns/restserver/ipam.go index 86a94fe26e..ea11caf55d 100644 --- a/cns/restserver/ipam.go +++ b/cns/restserver/ipam.go @@ -59,7 +59,9 @@ func (service *HTTPRestService) requestIPConfigHandler(w http.ResponseWriter, r // record a pod allocated an IP defer func() { // observe allocation wait time - ipAllocationLatency.Observe(float64(service.podsPendingIPAllocation.Pop(podInfo.Key()))) + if since := service.podsPendingIPAllocation.Pop(podInfo.Key()); since > 0 { + ipAllocationLatency.Observe(float64(since)) + } }() reserveResp := &cns.IPConfigResponse{ Response: cns.Response{ diff --git a/cns/restserver/restserver.go b/cns/restserver/restserver.go index b396fbeb44..51eaf85386 100644 --- a/cns/restserver/restserver.go +++ b/cns/restserver/restserver.go @@ -39,6 +39,8 @@ type lockedDurationSet struct { waits map[string]time.Time } +// Push registers the passed key and saves the timestamp it is first registered. +// If the key is already registered, does not overwrite the saved timestamp. func (m *lockedDurationSet) Push(key string) { m.Lock() defer m.Unlock() @@ -48,11 +50,16 @@ func (m *lockedDurationSet) Push(key string) { } } +// Pop returns the elapsed duration since the passed key was first registered, +// or -1 if it is not found. func (m *lockedDurationSet) Pop(key string) time.Duration { m.Lock() - start := m.waits[key] + start, ok := m.waits[key] delete(m.waits, key) m.Unlock() + if !ok { + return -1 + } return time.Since(start) } From 1c564de316787cbf223eb5089bf5c02ee2b7fa44 Mon Sep 17 00:00:00 2001 From: Evan Baker Date: Tue, 28 Sep 2021 14:01:44 -0500 Subject: [PATCH 5/5] create bounded, mapped heap to record pod-first-seen times during ip allocation Signed-off-by: Evan Baker --- cns/restserver/ipam.go | 3 -- cns/restserver/restserver.go | 34 ++---------- cns/types/bounded/mappedheap.go | 74 +++++++++++++++++++++++++ cns/types/bounded/timedset.go | 75 ++++++++++++++++++++++++++ cns/types/bounded/timedset_test.go | 86 ++++++++++++++++++++++++++++++ 5 files changed, 238 insertions(+), 34 deletions(-) create mode 100644 cns/types/bounded/mappedheap.go create mode 100644 cns/types/bounded/timedset.go create mode 100644 cns/types/bounded/timedset_test.go diff --git a/cns/restserver/ipam.go b/cns/restserver/ipam.go index ea11caf55d..1e2aea8982 100644 --- a/cns/restserver/ipam.go +++ b/cns/restserver/ipam.go @@ -91,9 +91,6 @@ func (service *HTTPRestService) releaseIPConfigHandler(w http.ResponseWriter, r podInfo, returnCode, message := service.validateIPConfigRequest(req) - // remove this pod from the waiting for IP allocation set, if it is present - _ = service.podsPendingIPAllocation.Pop(podInfo.Key()) - if err = service.releaseIPConfig(podInfo); err != nil { returnCode = types.UnexpectedError message = err.Error() diff --git a/cns/restserver/restserver.go b/cns/restserver/restserver.go index 51eaf85386..605a047291 100644 --- a/cns/restserver/restserver.go +++ b/cns/restserver/restserver.go @@ -17,6 +17,7 @@ import ( "github.com/Azure/azure-container-networking/cns/nmagentclient" "github.com/Azure/azure-container-networking/cns/routes" "github.com/Azure/azure-container-networking/cns/types" + "github.com/Azure/azure-container-networking/cns/types/bounded" acn "github.com/Azure/azure-container-networking/common" "github.com/Azure/azure-container-networking/store" ) @@ -34,35 +35,6 @@ var ( ncVersionURLs sync.Map ) -type lockedDurationSet struct { - sync.Mutex - waits map[string]time.Time -} - -// Push registers the passed key and saves the timestamp it is first registered. -// If the key is already registered, does not overwrite the saved timestamp. -func (m *lockedDurationSet) Push(key string) { - m.Lock() - defer m.Unlock() - _, ok := m.waits[key] - if !ok { - m.waits[key] = time.Now() - } -} - -// Pop returns the elapsed duration since the passed key was first registered, -// or -1 if it is not found. -func (m *lockedDurationSet) Pop(key string) time.Duration { - m.Lock() - start, ok := m.waits[key] - delete(m.waits, key) - m.Unlock() - if !ok { - return -1 - } - return time.Since(start) -} - // HTTPRestService represents http listener for CNS - Container Networking Service. type HTTPRestService struct { *cns.Service @@ -77,7 +49,7 @@ type HTTPRestService struct { routingTable *routes.RoutingTable store store.KeyValueStore state *httpRestServiceState - podsPendingIPAllocation *lockedDurationSet + podsPendingIPAllocation *bounded.TimedSet sync.RWMutex dncPartitionKey string } @@ -167,7 +139,7 @@ func NewHTTPRestService(config *common.ServiceConfig, imdsClientInterface imdscl PodIPConfigState: podIPConfigState, routingTable: routingTable, state: serviceState, - podsPendingIPAllocation: &lockedDurationSet{waits: map[string]time.Time{}}, + podsPendingIPAllocation: bounded.NewTimedSet(250), // nolint:gomnd // maxpods }, nil } diff --git a/cns/types/bounded/mappedheap.go b/cns/types/bounded/mappedheap.go new file mode 100644 index 0000000000..8d8309bc54 --- /dev/null +++ b/cns/types/bounded/mappedheap.go @@ -0,0 +1,74 @@ +package bounded + +import ( + "container/heap" +) + +// Item describes a type accepted by the mapped heap implementation. +type Item interface { + // Key is used for map index operations. + Key() string + // Less is used for heap sorting operations. + Less(Item) bool + // SetIndex is called by heap implementations to set the Item heap index. + SetIndex(int) + // Index returns the index of this Item. + Index() int +} + +var _ heap.Interface = (*MappedHeap)(nil) + +// MappedHeap is a combination of map and heap structures which allows for +// efficient sorting, uniqueness guarantees, and constant time lookups. +// Implements heap.Interface. +type MappedHeap struct { + m map[string]Item + items []Item +} + +func NewMappedHeap() *MappedHeap { + return &MappedHeap{ + m: make(map[string]Item), + } +} + +func (mh MappedHeap) Contains(key string) (int, bool) { + item, ok := mh.m[key] + if ok { + return item.Index(), true + } + return -1, false +} + +func (mh MappedHeap) Len() int { + return len(mh.items) +} + +func (mh MappedHeap) Less(i, j int) bool { + return mh.items[i].Less(mh.items[j]) +} + +func (mh *MappedHeap) Swap(i, j int) { + mh.items[i], mh.items[j] = mh.items[j], mh.items[i] + mh.items[i].SetIndex(i) + mh.items[j].SetIndex(j) +} + +func (mh *MappedHeap) Push(x interface{}) { + n := len(mh.items) + item := x.(Item) + item.SetIndex(n) + mh.items = append(mh.items, item) + mh.m[item.Key()] = item +} + +func (mh *MappedHeap) Pop() interface{} { + old := mh.items + n := len(old) + item := old[n-1] + old[n-1] = nil + item.SetIndex(-1) + mh.items = old[0 : n-1] + delete(mh.m, item.Key()) + return item +} diff --git a/cns/types/bounded/timedset.go b/cns/types/bounded/timedset.go new file mode 100644 index 0000000000..dd4f6e031e --- /dev/null +++ b/cns/types/bounded/timedset.go @@ -0,0 +1,75 @@ +package bounded + +import ( + "container/heap" + "sync" + "time" +) + +var _ Item = (*TimedItem)(nil) + +// TimedItem implements Item for a string: time.Time tuple. +type TimedItem struct { + Name string + Time time.Time + index int +} + +func (t *TimedItem) Key() string { + return t.Name +} + +func (t *TimedItem) Less(o Item) bool { + other := o.(*TimedItem) + return t.Time.Before(other.Time) +} + +func (t *TimedItem) Index() int { + return t.index +} + +func (t *TimedItem) SetIndex(i int) { + t.index = i +} + +type TimedSet struct { + sync.Mutex + capacity int + items *MappedHeap +} + +func NewTimedSet(c int) *TimedSet { + return &TimedSet{ + capacity: c, + items: NewMappedHeap(), + } +} + +// Push registers the passed key and saves the timestamp it is first registered. +// If the key is already registered, does not overwrite the saved timestamp. +func (ts *TimedSet) Push(key string) { + ts.Lock() + defer ts.Unlock() + if _, ok := ts.items.Contains(key); ok { + return + } + if ts.items.Len() >= ts.capacity { + _ = heap.Pop(ts.items) + } + item := &TimedItem{Name: key} + item.Time = time.Now() + heap.Push(ts.items, item) +} + +// Pop returns the elapsed duration since the passed key was first registered, +// or -1 if it is not found. +func (ts *TimedSet) Pop(key string) time.Duration { + ts.Lock() + defer ts.Unlock() + idx, ok := ts.items.Contains(key) + if !ok { + return -1 + } + item := heap.Remove(ts.items, idx) + return time.Since(item.(*TimedItem).Time) +} diff --git a/cns/types/bounded/timedset_test.go b/cns/types/bounded/timedset_test.go new file mode 100644 index 0000000000..6a010e1515 --- /dev/null +++ b/cns/types/bounded/timedset_test.go @@ -0,0 +1,86 @@ +package bounded + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewTimedSet(t *testing.T) { + tests := []struct { + name string + cap int + in []string + out []string + dropped []string + }{ + { + name: "size 1", + cap: 1, + in: []string{"a", "b", "c"}, + out: []string{"c"}, + dropped: []string{"a", "b"}, + }, + { + name: "overflow", + cap: 2, + in: []string{"a", "b", "c"}, + out: []string{"b", "c"}, + dropped: []string{"a"}, + }, + { + name: "not present", + cap: 2, + in: []string{"a", "b"}, + out: []string{"a", "b"}, + dropped: []string{"c", "d"}, + }, + { + name: "dupe push", + cap: 2, + in: []string{"a", "a", "a", "a"}, + out: []string{"a"}, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() // this test has timer delay, so parallelize + ts := NewTimedSet(tt.cap) + + firstTime := map[string]time.Time{} + for _, item := range tt.in { + ts.Push(item) + time.Sleep(5 * time.Millisecond) + if _, ok := firstTime[item]; !ok { + firstTime[item] = ts.items.m[item].(*TimedItem).Time + } + } + + require.LessOrEqual(t, ts.items.Len(), tt.cap) + + times := []time.Duration{} + for _, item := range tt.out { + _, ok := ts.items.Contains(item) + assert.True(t, ok) + assert.Equal(t, firstTime[item], ts.items.m[item].(*TimedItem).Time) + + times = append(times, ts.Pop(item)) + } + + for _, item := range tt.dropped { + _, ok := ts.items.Contains(item) + assert.False(t, ok) + assert.Negative(t, ts.Pop(item)) + } + + require.NotContains(t, times, time.Duration(-1)) + + for i := 0; i < len(times)-1; i++ { + assert.Less(t, times[i+1], times[i]) + } + }) + } +}