Skip to content

Commit

Permalink
Add tests for kube-proxy healthcheck, fix bug
Browse files Browse the repository at this point in the history
Adding test cases for HC updates found a bug with an update that
simultaneously removes one port and adds another.  Map iteration is
randomized, so sometimes no HC would be created.
  • Loading branch information
thockin committed Apr 5, 2017
1 parent 2db4aff commit 7664b97
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 78 deletions.
58 changes: 35 additions & 23 deletions pkg/proxy/iptables/proxier.go
Expand Up @@ -614,16 +614,45 @@ func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap
}
}

// Update service health check
allSvcPorts := make(map[proxy.ServicePortName]bool)
if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) {
return
}

// Update service health check. We include entries from the current map,
// with zero-length value, to trigger the healthchecker to stop reporting
// health for that service.
//
// This whole mechanism may be over-designed. It builds a list of endpoints
// per service, filters for local endpoints, builds a string that is the
// same as the name, and then passes each (name, list) pair over a channel.
//
// I am pretty sure that there's no way there can be more than one entry in
// the final list, and passing an empty list as a delete signal is weird.
// It could probably be simplified to a synchronous function call of a set
// of NamespacedNames. I am not making that simplification at this time.
//
// ServicePortName includes the port name, which doesn't matter for
// healthchecks. It's possible that a single update both added and removed
// ports on the same IP, so we need to make sure that removals are counted,
// with additions overriding them. Track all endpoints so we can find local
// ones.
epsBySvcName := map[types.NamespacedName][]*endpointsInfo{}
for svcPort := range curMap {
allSvcPorts[svcPort] = true
epsBySvcName[svcPort.NamespacedName] = nil
}
for svcPort := range newMap {
allSvcPorts[svcPort] = true
epsBySvcName[svcPort.NamespacedName] = append(epsBySvcName[svcPort.NamespacedName], newMap[svcPort]...)
}
for svcPort := range allSvcPorts {
updateHealthCheckEntries(svcPort.NamespacedName, newMap[svcPort], healthChecker)
for nsn, eps := range epsBySvcName {
// Use a set instead of a slice to provide deduplication
epSet := sets.NewString()
for _, ep := range eps {
if ep.isLocal {
// kube-proxy health check only needs local endpoints
epSet.Insert(fmt.Sprintf("%s/%s", nsn.Namespace, nsn.Name))
}
}
healthChecker.UpdateEndpoints(nsn, epSet)
}

return newMap, staleSet
Expand Down Expand Up @@ -674,23 +703,6 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string,
}
}

// updateHealthCheckEntries - send the new set of local endpoints to the health checker
func updateHealthCheckEntries(name types.NamespacedName, endpoints []*endpointsInfo, healthChecker healthChecker) {
if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) {
return
}

// Use a set instead of a slice to provide deduplication
epSet := sets.NewString()
for _, portInfo := range endpoints {
if portInfo.isLocal {
// kube-proxy health check only needs local endpoints
epSet.Insert(fmt.Sprintf("%s/%s", name.Namespace, name.Name))
}
}
healthChecker.UpdateEndpoints(name, epSet)
}

// portProtoHash takes the ServicePortName and protocol for a service
// returns the associated 16 character hash. This is computed by hashing (sha256)
// then encoding to base32 and truncating to 16 chars. We do this because IPTables
Expand Down

0 comments on commit 7664b97

Please sign in to comment.