diff --git a/cns/ipampoolmonitor/metrics.go b/cns/ipampoolmonitor/metrics.go index 64fba518ac..7c9b8ce901 100644 --- a/cns/ipampoolmonitor/metrics.go +++ b/cns/ipampoolmonitor/metrics.go @@ -56,7 +56,7 @@ var ( ) ipamRequestedIPConfigCount = prometheus.NewGauge( prometheus.GaugeOpts{ - Name: "ipam_reuested_ips", + Name: "ipam_requested_ips", Help: "Requested IP count.", }, ) diff --git a/cns/restserver/internalapi.go b/cns/restserver/internalapi.go index 41f84e8fce..164d7fbfc9 100644 --- a/cns/restserver/internalapi.go +++ b/cns/restserver/internalapi.go @@ -241,8 +241,8 @@ func (service *HTTPRestService) ReconcileNCState( ipconfigRequest := cns.IPConfigRequest{ DesiredIPAddress: secIpConfig.IPAddress, OrchestratorContext: jsonContext, - PodInterfaceID: podInfo.InterfaceID(), InfraContainerID: podInfo.InfraContainerID(), + PodInterfaceID: podInfo.InterfaceID(), } if _, err := requestIPConfigHelper(service, ipconfigRequest); err != nil { diff --git a/cns/restserver/ipam.go b/cns/restserver/ipam.go index d9d06445ed..1e2aea8982 100644 --- a/cns/restserver/ipam.go +++ b/cns/restserver/ipam.go @@ -12,19 +12,13 @@ import ( "github.com/Azure/azure-container-networking/cns/filter" "github.com/Azure/azure-container-networking/cns/logger" "github.com/Azure/azure-container-networking/cns/types" + "github.com/pkg/errors" ) // used to request an IPConfig from the CNS state func (service *HTTPRestService) requestIPConfigHandler(w http.ResponseWriter, r *http.Request) { - var ( - err error - ipconfigRequest cns.IPConfigRequest - podIpInfo cns.PodIpInfo - returnCode types.ResponseCode - returnMessage string - ) - - err = service.Listener.Decode(w, r, &ipconfigRequest) + var ipconfigRequest cns.IPConfigRequest + err := service.Listener.Decode(w, r, &ipconfigRequest) operationName := "requestIPConfigHandler" logger.Request(service.Name+operationName, ipconfigRequest, err) if err != nil { @@ -32,56 +26,82 @@ func (service *HTTPRestService) requestIPConfigHandler(w http.ResponseWriter, r } // retrieve ipconfig from nc - _, returnCode, returnMessage = service.validateIPConfigRequest(ipconfigRequest) - if returnCode == types.Success { - if podIpInfo, err = requestIPConfigHelper(service, ipconfigRequest); err != nil { - returnCode = types.FailedToAllocateIPConfig - returnMessage = fmt.Sprintf("AllocateIPConfig failed: %v, IP config request is %s", err, ipconfigRequest) + podInfo, returnCode, returnMessage := service.validateIPConfigRequest(ipconfigRequest) + if returnCode != types.Success { + reserveResp := &cns.IPConfigResponse{ + Response: cns.Response{ + ReturnCode: returnCode, + Message: returnMessage, + }, } + err = service.Listener.Encode(w, &reserveResp) + logger.ResponseEx(service.Name+operationName, ipconfigRequest, reserveResp, reserveResp.Response.ReturnCode, err) + return } - resp := cns.Response{ - ReturnCode: returnCode, - Message: returnMessage, + // record a pod requesting an IP + service.podsPendingIPAllocation.Push(podInfo.Key()) + + podIPInfo, err := requestIPConfigHelper(service, ipconfigRequest) + if err != nil { + reserveResp := &cns.IPConfigResponse{ + Response: cns.Response{ + ReturnCode: types.FailedToAllocateIPConfig, + Message: fmt.Sprintf("AllocateIPConfig failed: %v, IP config request is %s", err, ipconfigRequest), + }, + PodIpInfo: podIPInfo, + } + err = service.Listener.Encode(w, &reserveResp) + logger.ResponseEx(service.Name+operationName, ipconfigRequest, reserveResp, reserveResp.Response.ReturnCode, err) + return } + // record a pod allocated an IP + defer func() { + // observe allocation wait time + if since := service.podsPendingIPAllocation.Pop(podInfo.Key()); since > 0 { + ipAllocationLatency.Observe(float64(since)) + } + }() reserveResp := &cns.IPConfigResponse{ - Response: resp, + Response: cns.Response{ + ReturnCode: types.Success, + }, + PodIpInfo: podIPInfo, } - reserveResp.PodIpInfo = podIpInfo err = service.Listener.Encode(w, &reserveResp) - logger.ResponseEx(service.Name+operationName, ipconfigRequest, reserveResp, resp.ReturnCode, err) + logger.ResponseEx(service.Name+operationName, ipconfigRequest, reserveResp, reserveResp.Response.ReturnCode, err) } func (service *HTTPRestService) releaseIPConfigHandler(w http.ResponseWriter, r *http.Request) { - req := cns.IPConfigRequest{} - resp := cns.Response{} - var err error - - defer func() { - err = service.Listener.Encode(w, &resp) - logger.ResponseEx(service.Name, req, resp, resp.ReturnCode, err) - }() - - err = service.Listener.Decode(w, r, &req) + var req cns.IPConfigRequest + err := service.Listener.Decode(w, r, &req) logger.Request(service.Name+"releaseIPConfigHandler", req, err) if err != nil { - resp.ReturnCode = types.UnexpectedError - resp.Message = err.Error() + resp := cns.Response{ + ReturnCode: types.UnexpectedError, + Message: err.Error(), + } logger.Errorf("releaseIPConfigHandler decode failed becase %v, release IP config info %s", resp.Message, req) + err = service.Listener.Encode(w, &resp) + logger.ResponseEx(service.Name, req, resp, resp.ReturnCode, err) return } - var podInfo cns.PodInfo - podInfo, resp.ReturnCode, resp.Message = service.validateIPConfigRequest(req) + podInfo, returnCode, message := service.validateIPConfigRequest(req) if err = service.releaseIPConfig(podInfo); err != nil { - resp.ReturnCode = types.UnexpectedError - resp.Message = err.Error() - logger.Errorf("releaseIPConfigHandler releaseIPConfig failed because %v, release IP config info %s", resp.Message, req) - return + returnCode = types.UnexpectedError + message = err.Error() + logger.Errorf("releaseIPConfigHandler releaseIPConfig failed because %v, release IP config info %s", message, req) } + resp := cns.Response{ + ReturnCode: returnCode, + Message: message, + } + err = service.Listener.Encode(w, &resp) + logger.ResponseEx(service.Name, req, resp, resp.ReturnCode, err) } // MarkIPAsPendingRelease will set the IPs which are in PendingProgramming or Available to PendingRelease state @@ -418,20 +438,15 @@ func (service *HTTPRestService) AllocateAnyAvailableIPConfig(podInfo cns.PodInfo // If IPConfig is already allocated for pod, it returns that else it returns one of the available ipconfigs. func requestIPConfigHelper(service *HTTPRestService, req cns.IPConfigRequest) (cns.PodIpInfo, error) { - var ( - podIpInfo cns.PodIpInfo - isExist bool - ) - // check if ipconfig already allocated for this pod and return if exists or error // if error, ipstate is nil, if exists, ipstate is not nil and error is nil podInfo, err := cns.NewPodInfoFromIPConfigRequest(req) if err != nil { - return podIpInfo, err + return cns.PodIpInfo{}, errors.Wrapf(err, "failed to parse IPConfigRequest %v", req) } - if podIpInfo, isExist, err = service.GetExistingIPConfig(podInfo); err != nil || isExist { - return podIpInfo, err + if podIPInfo, isExist, err := service.GetExistingIPConfig(podInfo); err != nil || isExist { + return podIPInfo, err } // return desired IPConfig diff --git a/cns/restserver/metrics.go b/cns/restserver/metrics.go index e11552e635..67d4b1250b 100644 --- a/cns/restserver/metrics.go +++ b/cns/restserver/metrics.go @@ -12,7 +12,7 @@ var httpRequestLatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Name: "http_request_latency_seconds", Help: "Request latency in seconds by endpoint, verb, and response code.", - //nolint:gomnd + //nolint:gomnd // default bucket consts Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), // 1 ms to ~16 seconds }, // TODO(rbtr): @@ -22,9 +22,19 @@ var httpRequestLatency = prometheus.NewHistogramVec( []string{"url", "verb"}, ) +var ipAllocationLatency = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Name: "ip_allocation_latency_seconds", + Help: "IP allocation latency in seconds", + //nolint:gomnd // default bucket consts + Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), // 1 ms to ~16 seconds + }, +) + func init() { metrics.Registry.MustRegister( httpRequestLatency, + ipAllocationLatency, ) } diff --git a/cns/restserver/restserver.go b/cns/restserver/restserver.go index 45d8844e55..605a047291 100644 --- a/cns/restserver/restserver.go +++ b/cns/restserver/restserver.go @@ -17,6 +17,7 @@ import ( "github.com/Azure/azure-container-networking/cns/nmagentclient" "github.com/Azure/azure-container-networking/cns/routes" "github.com/Azure/azure-container-networking/cns/types" + "github.com/Azure/azure-container-networking/cns/types/bounded" acn "github.com/Azure/azure-container-networking/common" "github.com/Azure/azure-container-networking/store" ) @@ -48,6 +49,7 @@ type HTTPRestService struct { routingTable *routes.RoutingTable store store.KeyValueStore state *httpRestServiceState + podsPendingIPAllocation *bounded.TimedSet sync.RWMutex dncPartitionKey string } @@ -137,6 +139,7 @@ func NewHTTPRestService(config *common.ServiceConfig, imdsClientInterface imdscl PodIPConfigState: podIPConfigState, routingTable: routingTable, state: serviceState, + podsPendingIPAllocation: bounded.NewTimedSet(250), // nolint:gomnd // maxpods }, nil } diff --git a/cns/types/bounded/mappedheap.go b/cns/types/bounded/mappedheap.go new file mode 100644 index 0000000000..8d8309bc54 --- /dev/null +++ b/cns/types/bounded/mappedheap.go @@ -0,0 +1,74 @@ +package bounded + +import ( + "container/heap" +) + +// Item describes a type accepted by the mapped heap implementation. +type Item interface { + // Key is used for map index operations. + Key() string + // Less is used for heap sorting operations. + Less(Item) bool + // SetIndex is called by heap implementations to set the Item heap index. + SetIndex(int) + // Index returns the index of this Item. + Index() int +} + +var _ heap.Interface = (*MappedHeap)(nil) + +// MappedHeap is a combination of map and heap structures which allows for +// efficient sorting, uniqueness guarantees, and constant time lookups. +// Implements heap.Interface. +type MappedHeap struct { + m map[string]Item + items []Item +} + +func NewMappedHeap() *MappedHeap { + return &MappedHeap{ + m: make(map[string]Item), + } +} + +func (mh MappedHeap) Contains(key string) (int, bool) { + item, ok := mh.m[key] + if ok { + return item.Index(), true + } + return -1, false +} + +func (mh MappedHeap) Len() int { + return len(mh.items) +} + +func (mh MappedHeap) Less(i, j int) bool { + return mh.items[i].Less(mh.items[j]) +} + +func (mh *MappedHeap) Swap(i, j int) { + mh.items[i], mh.items[j] = mh.items[j], mh.items[i] + mh.items[i].SetIndex(i) + mh.items[j].SetIndex(j) +} + +func (mh *MappedHeap) Push(x interface{}) { + n := len(mh.items) + item := x.(Item) + item.SetIndex(n) + mh.items = append(mh.items, item) + mh.m[item.Key()] = item +} + +func (mh *MappedHeap) Pop() interface{} { + old := mh.items + n := len(old) + item := old[n-1] + old[n-1] = nil + item.SetIndex(-1) + mh.items = old[0 : n-1] + delete(mh.m, item.Key()) + return item +} diff --git a/cns/types/bounded/timedset.go b/cns/types/bounded/timedset.go new file mode 100644 index 0000000000..dd4f6e031e --- /dev/null +++ b/cns/types/bounded/timedset.go @@ -0,0 +1,75 @@ +package bounded + +import ( + "container/heap" + "sync" + "time" +) + +var _ Item = (*TimedItem)(nil) + +// TimedItem implements Item for a string: time.Time tuple. +type TimedItem struct { + Name string + Time time.Time + index int +} + +func (t *TimedItem) Key() string { + return t.Name +} + +func (t *TimedItem) Less(o Item) bool { + other := o.(*TimedItem) + return t.Time.Before(other.Time) +} + +func (t *TimedItem) Index() int { + return t.index +} + +func (t *TimedItem) SetIndex(i int) { + t.index = i +} + +type TimedSet struct { + sync.Mutex + capacity int + items *MappedHeap +} + +func NewTimedSet(c int) *TimedSet { + return &TimedSet{ + capacity: c, + items: NewMappedHeap(), + } +} + +// Push registers the passed key and saves the timestamp it is first registered. +// If the key is already registered, does not overwrite the saved timestamp. +func (ts *TimedSet) Push(key string) { + ts.Lock() + defer ts.Unlock() + if _, ok := ts.items.Contains(key); ok { + return + } + if ts.items.Len() >= ts.capacity { + _ = heap.Pop(ts.items) + } + item := &TimedItem{Name: key} + item.Time = time.Now() + heap.Push(ts.items, item) +} + +// Pop returns the elapsed duration since the passed key was first registered, +// or -1 if it is not found. +func (ts *TimedSet) Pop(key string) time.Duration { + ts.Lock() + defer ts.Unlock() + idx, ok := ts.items.Contains(key) + if !ok { + return -1 + } + item := heap.Remove(ts.items, idx) + return time.Since(item.(*TimedItem).Time) +} diff --git a/cns/types/bounded/timedset_test.go b/cns/types/bounded/timedset_test.go new file mode 100644 index 0000000000..6a010e1515 --- /dev/null +++ b/cns/types/bounded/timedset_test.go @@ -0,0 +1,86 @@ +package bounded + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewTimedSet(t *testing.T) { + tests := []struct { + name string + cap int + in []string + out []string + dropped []string + }{ + { + name: "size 1", + cap: 1, + in: []string{"a", "b", "c"}, + out: []string{"c"}, + dropped: []string{"a", "b"}, + }, + { + name: "overflow", + cap: 2, + in: []string{"a", "b", "c"}, + out: []string{"b", "c"}, + dropped: []string{"a"}, + }, + { + name: "not present", + cap: 2, + in: []string{"a", "b"}, + out: []string{"a", "b"}, + dropped: []string{"c", "d"}, + }, + { + name: "dupe push", + cap: 2, + in: []string{"a", "a", "a", "a"}, + out: []string{"a"}, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() // this test has timer delay, so parallelize + ts := NewTimedSet(tt.cap) + + firstTime := map[string]time.Time{} + for _, item := range tt.in { + ts.Push(item) + time.Sleep(5 * time.Millisecond) + if _, ok := firstTime[item]; !ok { + firstTime[item] = ts.items.m[item].(*TimedItem).Time + } + } + + require.LessOrEqual(t, ts.items.Len(), tt.cap) + + times := []time.Duration{} + for _, item := range tt.out { + _, ok := ts.items.Contains(item) + assert.True(t, ok) + assert.Equal(t, firstTime[item], ts.items.m[item].(*TimedItem).Time) + + times = append(times, ts.Pop(item)) + } + + for _, item := range tt.dropped { + _, ok := ts.items.Contains(item) + assert.False(t, ok) + assert.Negative(t, ts.Pop(item)) + } + + require.NotContains(t, times, time.Duration(-1)) + + for i := 0; i < len(times)-1; i++ { + assert.Less(t, times[i+1], times[i]) + } + }) + } +}