Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
6f0e357
print statements
huntergregory Aug 1, 2022
c8b13a6
cleanup Running pod with empty IP
huntergregory Aug 16, 2022
f023473
add log line
huntergregory Aug 16, 2022
d66c7f2
revert previous 3 commits
huntergregory Aug 17, 2022
56b7994
enqueue updates with empty IPs and add prometheus metric
huntergregory Aug 17, 2022
66d3cc8
fix lints
huntergregory Aug 17, 2022
b0c5bc0
handle pod assigned to wrong endpoint edge case
huntergregory Aug 18, 2022
5e708ce
log and update comment
huntergregory Aug 18, 2022
f384257
UTs and fixed named port + build
huntergregory Aug 18, 2022
094069f
reset entire endpoint regardless of cache
huntergregory Aug 18, 2022
d154623
remove comment in dp.go
huntergregory Aug 18, 2022
31ac386
fix windows build issues
huntergregory Aug 18, 2022
b6287cf
skip refreshing endpoints and address comments
huntergregory Aug 19, 2022
8f4fd49
only sync empty ip if pod running. add tmp log
huntergregory Aug 22, 2022
a1dac4f
Merge branch 'master' into npm-controller-empty-ip
huntergregory Dec 15, 2022
398c99e
undo special pod delete logic
huntergregory Dec 15, 2022
56f64bb
reference GH issue
huntergregory Dec 16, 2022
05ab431
fix Windows UTs
huntergregory Dec 16, 2022
d76ae6a
remove prometheus metrics and a log
huntergregory Dec 16, 2022
aafa1f9
Merge branch 'master' into npm-controller-empty-ip
huntergregory Jan 20, 2023
d7ab36d
Merge branch 'master' into npm-controller-empty-ip
huntergregory Feb 9, 2023
edf7c96
Merge branch 'master' into npm-controller-empty-ip
vakalapa Feb 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions npm/metrics/prometheus-metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ const (
namespaceExecTimeName = "namespace_exec_time"
controllerNamespaceExecTimeHelp = "Execution time in milliseconds for adding/updating/deleting a namespace"

// TODO add health metrics

quantileMedian float64 = 0.5
deltaMedian float64 = 0.05
quantile90th float64 = 0.9
Expand All @@ -67,7 +65,7 @@ const (
// Gauge metrics have the methods Inc(), Dec(), and Set(float64)
// Summary metrics have the method Observe(float64)
// For any Vector metric, you can call With(prometheus.Labels) before the above methods
// e.g. SomeGaugeVec.With(prometheus.Labels{label1: val1, label2: val2, ...}).Dec()
// e.g. SomeGaugeVec.With(prometheus.Labels{label1: val1, label2: val2, ...}).Dec()
var (
nodeRegistry = prometheus.NewRegistry()
clusterRegistry = prometheus.NewRegistry()
Expand All @@ -94,8 +92,6 @@ var (
controllerPodExecTime *prometheus.SummaryVec
controllerNamespaceExecTime *prometheus.SummaryVec
controllerExecTimeLabels = []string{operationLabel, hadErrorLabel}

// TODO add health metrics
)

type RegistryType string
Expand Down Expand Up @@ -128,14 +124,11 @@ func (op OperationKind) isValid() bool {
// TODO consider refactoring the functionality of the metrics package into a "Metrics" struct with methods (this would require code changes throughout npm).
// Would need to consider how it seems like you can't register a metric twice, even in a separate registry, so you couldn't throw away the Metrics struct and create a new one.
func InitializeAll() {
// TODO introduce isFanOut parameter to determine when to create fan-out controller/daemon metrics
if haveInitialized {
klog.Infof("metrics have already been initialized")
} else {
initializeDaemonMetrics()
initializeControllerMetrics()
// TODO include dataplane health metrics:
// num failures for apply ipsets, updating policies, deleting policies, and running periodic policy tasks, etc.
log.Logf("Finished initializing all Prometheus metrics")
haveInitialized = true
}
Expand Down Expand Up @@ -177,7 +170,6 @@ func initializeDaemonMetrics() {
func initializeControllerMetrics() {
// CLUSTER METRICS
numPolicies = createClusterGauge(numPoliciesName, numPoliciesHelp)
// TODO include health metrics: num failures for validating policies & ipsets

// NODE METRICS
addPolicyExecTime = createNodeSummaryVec(addPolicyExecTimeName, "", addPolicyExecTimeHelp, addPolicyExecTimeLabels)
Expand Down
10 changes: 7 additions & 3 deletions npm/pkg/controlplane/controllers/v2/podController.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ type NamedPortOperation string
const (
// NamedPortOperation values.
// NOTE(review): presumably these select whether a named port is removed from
// or added to its IPSet; confirm at the call sites, which are not visible here.
deleteNamedPort NamedPortOperation = "del"
addNamedPort NamedPortOperation = "add"

// Event-type values passed to needSync: addPod passes addEvent and updatePod
// passes updateEvent. needSync compares against addEvent when deciding
// whether a Pod with an empty IP should be enqueued.
addEvent string = "ADD"
updateEvent string = "UPDATE"
)

// kubeAllNamespaces is the metadata of the IPSet named util.KubeAllNamespacesFlag,
// declared with type ipsets.KeyLabelOfNamespace.
var kubeAllNamespaces = &ipsets.IPSetMetadata{Name: util.KubeAllNamespacesFlag, Type: ipsets.KeyLabelOfNamespace}
Expand Down Expand Up @@ -91,7 +94,8 @@ func (c *PodController) needSync(eventType string, obj interface{}) (string, boo
return key, needSync
}

if !hasValidPodIP(podObj) {
// should enqueue updates for Pods with an empty IP if they are also Running
if !hasValidPodIP(podObj) && (eventType == addEvent || podObj.Status.Phase != corev1.PodRunning) {
return key, needSync
}

Expand All @@ -112,7 +116,7 @@ func (c *PodController) needSync(eventType string, obj interface{}) (string, boo
}

func (c *PodController) addPod(obj interface{}) {
key, needSync := c.needSync("ADD", obj)
key, needSync := c.needSync(addEvent, obj)
if !needSync {
return
}
Expand All @@ -128,7 +132,7 @@ func (c *PodController) addPod(obj interface{}) {
}

func (c *PodController) updatePod(old, newp interface{}) {
key, needSync := c.needSync("UPDATE", newp)
key, needSync := c.needSync(updateEvent, newp)
if !needSync {
return
}
Expand Down
101 changes: 101 additions & 0 deletions npm/pkg/controlplane/controllers/v2/podController_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,99 @@ func TestLabelUpdatePod(t *testing.T) {
checkNpmPodWithInput("TestLabelUpdatePod", f, newPodObj)
}

// TestEmptyIPUpdate exercises an update of a Running pod whose IP changes from
// "1.2.3.4" to the empty string.
//
// The mock dataplane is told to expect the pod's original IP being added to,
// and then removed from, its namespace, label and (non-Windows only)
// named-port IPSets, with ApplyDataPlane called twice. No expectation is
// registered for the empty IP, so nothing may be added for the updated pod.
func TestEmptyIPUpdate(t *testing.T) {
	labels := map[string]string{
		"app": "test-pod",
	}
	oldPodObj := createPod("test-pod", "test-namespace", "0", "1.2.3.4", labels, NonHostNetwork, corev1.PodRunning)

	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	dp := dpmocks.NewMockGenericDataplane(ctrl)
	f := newFixture(t, dp)
	f.podLister = append(f.podLister, oldPodObj)
	f.kubeobjects = append(f.kubeobjects, oldPodObj)
	stopCh := make(chan struct{})
	defer close(stopCh)
	f.newPodController(stopCh)

	newPodObj := oldPodObj.DeepCopy()
	// oldPodObj.ResourceVersion value is "0"; fail loudly if the fixture ever
	// stops producing a numeric ResourceVersion instead of silently using 0.
	newRV, err := strconv.Atoi(oldPodObj.ResourceVersion)
	if err != nil {
		t.Fatalf("TestEmptyIPUpdate failed @ parsing ResourceVersion %q: %v", oldPodObj.ResourceVersion, err)
	}
	newPodObj.ResourceVersion = fmt.Sprintf("%d", newRV+1)
	// oldPodObj PodIP is "1.2.3.4"
	newPodObj.Status.PodIP = ""

	// Add pod section
	mockIPSets := []*ipsets.IPSetMetadata{
		ipsets.NewIPSetMetadata("test-namespace", ipsets.Namespace),
		ipsets.NewIPSetMetadata("app", ipsets.KeyLabelOfPod),
		ipsets.NewIPSetMetadata("app:test-pod", ipsets.KeyValueLabelOfPod),
	}
	podMetadata1 := dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4", "")

	dp.EXPECT().AddToLists([]*ipsets.IPSetMetadata{kubeAllNamespaces}, mockIPSets[:1]).Return(nil).Times(1)
	dp.EXPECT().AddToSets(mockIPSets[:1], podMetadata1).Return(nil).Times(1)
	dp.EXPECT().AddToSets(mockIPSets[1:], podMetadata1).Return(nil).Times(1)
	// The named-port set is only expected when not running against the Windows dataplane.
	if !util.IsWindowsDP() {
		dp.EXPECT().
			AddToSets(
				[]*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)},
				dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""),
			).
			Return(nil).Times(1)
	}
	dp.EXPECT().ApplyDataPlane().Return(nil).Times(2)

	// Delete pod section
	dp.EXPECT().RemoveFromSets(mockIPSets[:1], podMetadata1).Return(nil).Times(1)
	dp.EXPECT().RemoveFromSets(mockIPSets[1:], podMetadata1).Return(nil).Times(1)
	if !util.IsWindowsDP() {
		dp.EXPECT().
			RemoveFromSets(
				[]*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)},
				dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""),
			).
			Return(nil).Times(1)
	}
	// since the new IP is invalid, adding the new Pod object is ignored

	updatePod(t, f, oldPodObj, newPodObj)

	// NOTE(review): the field meanings of expectedValues and podPromVals are
	// defined elsewhere in this test file; confirm them there.
	testCases := []expectedValues{
		{0, 1, 0, podPromVals{1, 1, 0}},
	}
	checkPodTestResult("TestEmptyIPUpdate", f, testCases)
}

// TestEmptyIPAdd adds a Running pod that has an empty IP and verifies that it
// is not cached by the pod controller. No expectations are registered on the
// mock dataplane for this test.
func TestEmptyIPAdd(t *testing.T) {
	podObj := createPod(
		"test-pod", "test-namespace", "0", "",
		map[string]string{"app": "test-pod"},
		NonHostNetwork, corev1.PodRunning,
	)
	podKey := getKey(podObj, t)

	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()

	dp := dpmocks.NewMockGenericDataplane(mockCtrl)
	f := newFixture(t, dp)
	f.podLister = append(f.podLister, podObj)
	f.kubeobjects = append(f.kubeobjects, podObj)

	stopCh := make(chan struct{})
	defer close(stopCh)
	f.newPodController(stopCh)

	addPod(t, f, podObj)
	checkPodTestResult("TestEmptyIPAdd", f, []expectedValues{
		{0, 0, 0, podPromVals{0, 0, 0}},
	})

	// The pod must not have been stored in the controller's cache.
	_, cached := f.podController.podMap[podKey]
	if cached {
		t.Error("TestEmptyIPAdd failed @ cached pod obj exists check")
	}
}

func TestIPAddressUpdatePod(t *testing.T) {
labels := map[string]string{
"app": "test-pod",
Expand Down Expand Up @@ -797,6 +890,14 @@ func TestHasValidPodIP(t *testing.T) {
if ok := hasValidPodIP(podObj); !ok {
t.Errorf("TestisValidPod failed @ isValidPod")
}

podObj = &corev1.Pod{
Status: corev1.PodStatus{
Phase: "Running",
PodIP: "",
},
}
require.False(t, hasValidPodIP(podObj))
}

func TestIsCompletePod(t *testing.T) {
Expand Down
14 changes: 9 additions & 5 deletions npm/pkg/dataplane/dataplane_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const (

var (
errPolicyModeUnsupported = errors.New("only IPSet policy mode is supported")
errMismanagedPodKey = errors.New("the pod key was not managed correctly when refreshing pod endpoints")
errMismanagedPodKey = errors.New("the endpoint corresponds to a different pod")
)

// initializeDataPlane will help gather network and endpoint details
Expand Down Expand Up @@ -112,12 +112,16 @@ func (dp *DataPlane) shouldUpdatePod() bool {

// updatePod has two responsibilities on Windows:
// 1. Calls into the dataplane and updates the endpoint references of this pod.
// 2. Checks for existing applicable network policies and applies them on the endpoint
// 2. Checks for existing applicable network policies and applies them on the endpoint.
/*
FIXME: see https://github.com/Azure/azure-container-networking/issues/1729
TODO: it would be good to replace stalePodKey behavior since it is complex.
*/
func (dp *DataPlane) updatePod(pod *updateNPMPod) error {
klog.Infof("[DataPlane] updatePod called for Pod Key %s", pod.PodKey)
// Check if pod is part of this node
if pod.NodeName != dp.nodeName {
klog.Infof("[DataPlane] ignoring update pod as expected Node: [%s] got: [%s]", dp.nodeName, pod.NodeName)
// Ignore updates if the pod is not part of this node.
klog.Infof("[DataPlane] ignoring update pod as expected Node: [%s] got: [%s]. pod: [%s]", dp.nodeName, pod.NodeName, pod.PodKey)
return nil
}

Expand All @@ -130,7 +134,7 @@ func (dp *DataPlane) updatePod(pod *updateNPMPod) error {
if !ok {
// ignore this err and pod endpoint will be deleted in ApplyDP
// if the endpoint is not found, it means the pod is not part of this node or pod got deleted.
klog.Warningf("[DataPlane] did not find endpoint with IPaddress %s", pod.PodIP)
klog.Warningf("[DataPlane] did not find endpoint with IPaddress %s for pod %s", pod.PodIP, pod.PodKey)
return nil
}

Expand Down