Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 17 additions & 67 deletions npm/metrics/ipsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,23 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

var (
ipsetInventoryMap map[string]int
maxMembers = 0
)
var ipsetInventoryMap map[string]int

// AddPod increments the number of Pod IPs.
func AddPod() {
podsWatched.Inc()
}

// RemovePod decrements the number of Pod IPs.
func RemovePod() {
podsWatched.Dec()
}

// PodCount gets the current number of Pod IPs.
// This function is slow.
func PodCount() (int, error) {
return getValue(podsWatched)
}

// IncNumIPSets increments the number of IPSets.
func IncNumIPSets() {
Expand Down Expand Up @@ -47,13 +60,6 @@ func RecordIPSetExecTime(timer *Timer) {
// AddEntryToIPSet increments the number of entries for IPSet setName.
// It doesn't ever update the number of IPSets.
func AddEntryToIPSet(setName string) {
if util.IsWindowsDP() && ipsetInventoryMap[setName] == maxMembers {
// ipsetInventoryMap[setName] returns 0 if the set doesn't exist in the map, which is good in case maxMembers is 0
// increase maxMembers if this set previously had the max
maxMembers++
maxIPSetMembers.Set(float64(maxMembers))
}

numIPSetEntries.Inc()
ipsetInventoryMap[setName]++ // adds the setName with value 1 if it doesn't exist
updateIPSetInventory(setName)
Expand All @@ -63,27 +69,6 @@ func AddEntryToIPSet(setName string) {
func RemoveEntryFromIPSet(setName string) {
_, exists := ipsetInventoryMap[setName]
if exists {
if util.IsWindowsDP() && ipsetInventoryMap[setName] == maxMembers && maxMembers > 0 {
// decrease maxMembers if this set previously had the max AND no other set has the max
wasUniqueMax := true
for s, val := range ipsetInventoryMap {
if setName == s {
continue
}

if val == maxMembers {
wasUniqueMax = false
break
}
}

if wasUniqueMax {
// decrease the maxMembers since no other set has the max
maxMembers--
maxIPSetMembers.Set(float64(maxMembers))
}
}

numIPSetEntries.Dec()
ipsetInventoryMap[setName]--
if ipsetInventoryMap[setName] == 0 {
Expand All @@ -97,32 +82,6 @@ func RemoveEntryFromIPSet(setName string) {
// RemoveAllEntriesFromIPSet sets the number of entries for ipset setName to 0.
// It doesn't ever update the number of IPSets.
func RemoveAllEntriesFromIPSet(setName string) {
if util.IsWindowsDP() && ipsetInventoryMap[setName] == maxMembers && maxMembers > 0 {
// ipsetInventoryMap[setName] returns 0 if the set doesn't exist in the map
// determine the new maxMembers if this set previously had the max
oldMaxMembers := maxMembers
maxMembers = 0
for s, val := range ipsetInventoryMap {
if setName == s {
continue
}

if val > maxMembers {
maxMembers = val

if val == oldMaxMembers {
// the max didn't change
break
}
}
}

if oldMaxMembers != maxMembers {
// update max if it changed
maxIPSetMembers.Set(float64(maxMembers))
}
}

numIPSetEntries.Add(-getEntryCountForIPSet(setName))
delete(ipsetInventoryMap, setName)
removeFromIPSetInventory(setName)
Expand All @@ -137,11 +96,6 @@ func DeleteIPSet(setName string) {
// ResetIPSetEntries sets the number of entries to 0 for all IPSets.
// It doesn't ever update the number of IPSets.
func ResetIPSetEntries() {
if util.IsWindowsDP() {
maxMembers = 0
maxIPSetMembers.Set(float64(maxMembers))
}

numIPSetEntries.Set(0)
for setName := range ipsetInventoryMap {
removeFromIPSetInventory(setName)
Expand Down Expand Up @@ -170,10 +124,6 @@ func GetNumEntriesForIPSet(setName string) (int, error) {
return getVecValue(ipsetInventory, labels)
}

func MaxIPSetMembers() (int, error) {
return getValue(maxIPSetMembers)
}

// GetIPSetExecCount returns the number of observations for execution time of adding IPSets.
// This function is slow.
func GetIPSetExecCount() (int, error) {
Expand Down
11 changes: 11 additions & 0 deletions npm/metrics/ipsets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@ type testSet struct {
entryCount int
}

func TestAddPod(t *testing.T) {
podsWatched.Set(0)
AddPod()
AddPod()
AddPod()
RemovePod()
val, err := PodCount()
require.NoError(t, err, "unexpected error when getting pod count")
require.Equal(t, 2, val, "incorrect pod count")
}

func TestRecordIPSetExecTime(t *testing.T) {
testStopAndRecord(t, setExecMetric)
}
Expand Down
95 changes: 0 additions & 95 deletions npm/metrics/ipsets_windows_test.go

This file was deleted.

25 changes: 13 additions & 12 deletions npm/metrics/prometheus-metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ var (
getNetworkFailures prometheus.Counter
aclFailures *prometheus.CounterVec
setPolicyFailures *prometheus.CounterVec
maxIPSetMembers prometheus.Gauge
podsWatched prometheus.Gauge
)

type RegistryType string
Expand Down Expand Up @@ -152,6 +152,16 @@ func InitializeAll() {
initializeDaemonMetrics()
initializeControllerMetrics()

podsWatched = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "pods_watched",
Subsystem: "",
Help: "Number of Pods NPM tracks across the cluster including Linux and Windows nodes",
},
)
register(podsWatched, "pods_watched", ClusterMetrics)

if util.IsWindowsDP() {
InitializeWindowsMetrics()

Expand All @@ -166,8 +176,6 @@ func InitializeAll() {
register(getNetworkFailures, "get_network_failure_total", NodeMetrics)
register(aclFailures, "acl_failure_total", NodeMetrics)
register(setPolicyFailures, "setpolicy_failure_total", NodeMetrics)
// all new metrics should go on the node metrics URL
register(maxIPSetMembers, "ipset_members_max", NodeMetrics)
}

log.Logf("Finished initializing all Prometheus metrics")
Expand Down Expand Up @@ -290,15 +298,6 @@ func InitializeWindowsMetrics() {
},
[]string{operationLabel, isNestedLabel},
)

maxIPSetMembers = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "ipset_members_max",
Subsystem: windowsPrefix,
Help: "Maximum number of members in a single IPSet",
},
)
}

// GetHandler returns the HTTP handler for the metrics endpoint
Expand Down Expand Up @@ -344,6 +343,8 @@ func register(collector prometheus.Collector, name string, registryType Registry
err := getRegistry(registryType).Register(collector)
if err != nil {
log.Errorf("Error creating metric %s", name)
} else {
klog.Infof("registered metric %s to registry %s", name, registryType)
}
}

Expand Down
8 changes: 8 additions & 0 deletions npm/pkg/controlplane/controllers/v2/namespaceController.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,14 @@ func (nsc *NamespaceController) syncNamespace(nsKey string) error {
metrics.RecordControllerNamespaceExecTime(timer, operationKind, err != nil && dperr != nil)

if dperr != nil {
klog.Errorf("failed to apply dataplane changes while syncing namespace. err: %s", dperr.Error())
metrics.SendErrorLogAndMetric(util.NSID, "[syncNamespace] failed to apply dataplane changes while syncing namespace. err: %s", dperr.Error())

// Seems like setting err below does nothing.
// The return value of syncNamespace is fixed before this deferred func is called
// so modifications to err here do nothing.
// As a result, the controller will not requeue if there is an error applying the dataplane.
// However, a subsequent controller event should Apply Dataplane soon after.
if err == nil {
err = fmt.Errorf("failed to apply dataplane changes while syncing namespace. err: %w", dperr)
} else {
Expand Down
10 changes: 10 additions & 0 deletions npm/pkg/controlplane/controllers/v2/podController.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,14 @@ func (c *PodController) syncPod(key string) error {
metrics.RecordControllerPodExecTime(timer, operationKind, err != nil && dperr != nil)

if dperr != nil {
klog.Errorf("failed to apply dataplane changes while syncing pod. err: %s", dperr.Error())
metrics.SendErrorLogAndMetric(util.PodID, "[syncPod] failed to apply dataplane changes while syncing pod. err: %s", dperr.Error())

// Seems like setting err below does nothing.
// The return value of syncPod is fixed before this deferred func is called,
// so modifications to err here do nothing.
// As a result, the controller will not requeue if there is an error applying the dataplane.
// However, a subsequent controller event should Apply Dataplane soon after.
if err == nil {
err = fmt.Errorf("failed to apply dataplane changes while syncing pod. err: %w", dperr)
} else {
Expand Down Expand Up @@ -369,6 +377,7 @@ func (c *PodController) syncAddedPod(podObj *corev1.Pod) error {
// Create npmPod and add it to the podMap
npmPodObj := common.NewNpmPod(podObj)
c.podMap[podKey] = npmPodObj
metrics.AddPod()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Goal: call metrics.AddPod() once per Pod/IP pair (similar for metrics.RemovePod())

  • If a Pod has an update to its IP, follow Pod DELETE then Pod CREATE.
  • For Pod DELETE: c.cleanupDeletePod() is only called when podKey exists in c.podMap
  • For Pod CREATE: c.syncAddedPod() is only called when podKey does not exist in c.podMap. Otherwise, another codepath is taken.
  • Keys are only added/removed from c.podMap in c.syncAddedPod() and c.cleanupDeletePod()


// Get lists of podLabelKey and podLabelKey + podLavelValue ,and then start adding them to ipsets.
for labelKey, labelVal := range podObj.Labels {
Expand Down Expand Up @@ -571,6 +580,7 @@ func (c *PodController) cleanUpDeletedPod(cachedNpmPodKey string) error {
return fmt.Errorf("[cleanUpDeletedPod] Error: failed to delete pod from named port ipset with err: %w", err)
}

metrics.RemovePod()
delete(c.podMap, cachedNpmPodKey)
return nil
}
Expand Down
Loading