Skip to content
This repository was archived by the owner on Feb 8, 2021. It is now read-only.

Commit 1ad4549

Browse files
committed
Proxy infrastructure for NodePorts
A service with a NodePort set will listen on that port, on every node. This is both handy for some load balancers (AWS ELB) and for people that want to expose a service without using a load balancer.
1 parent 295d056 commit 1ad4549

File tree

4 files changed

+231
-15
lines changed

4 files changed

+231
-15
lines changed

pkg/proxy/proxier.go

Lines changed: 214 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type serviceInfo struct {
4141
socket proxySocket
4242
timeout time.Duration
4343
publicIPs []string // TODO: make this net.IP
44+
nodePort int
4445
sessionAffinityType api.ServiceAffinity
4546
stickyMaxAgeMinutes int
4647
}
@@ -61,12 +62,24 @@ type Proxier struct {
6162
loadBalancer LoadBalancer
6263
mu sync.Mutex // protects serviceMap
6364
serviceMap map[ServicePortName]*serviceInfo
65+
portMapMutex sync.Mutex
66+
portMap map[portMapKey]ServicePortName
6467
numProxyLoops int32 // use atomic ops to access this; mostly for testing
6568
listenIP net.IP
6669
iptables iptables.Interface
6770
hostIP net.IP
6871
}
6972

73+
// A key for the portMap
74+
type portMapKey struct {
75+
port int
76+
protocol api.Protocol
77+
}
78+
79+
func (k *portMapKey) String() string {
80+
return fmt.Sprintf("%s/%d", k.protocol, k.port)
81+
}
82+
7083
var (
7184
// ErrProxyOnLocalhost is returned by NewProxier if the user requests a proxier on
7285
// the loopback address. May be checked for by callers of NewProxier to know whether
@@ -113,6 +126,7 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables
113126
return &Proxier{
114127
loadBalancer: loadBalancer,
115128
serviceMap: make(map[ServicePortName]*serviceInfo),
129+
portMap: make(map[portMapKey]ServicePortName),
116130
listenIP: listenIP,
117131
iptables: iptables,
118132
hostIP: hostIP,
@@ -274,6 +288,8 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
274288
info.portalIP = serviceIP
275289
info.portalPort = servicePort.Port
276290
info.publicIPs = service.Spec.PublicIPs
291+
// TODO(justinsb): switch to servicePort.NodePort when that lands
292+
info.nodePort = 0
277293
info.sessionAffinityType = service.Spec.SessionAffinity
278294
glog.V(4).Infof("info: %+v", info)
279295

@@ -302,7 +318,8 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
302318
}
303319

304320
func sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool {
305-
if info.protocol != port.Protocol || info.portalPort != port.Port {
321+
// TODO(justinsb): switch to port.NodePort when that lands
322+
if info.protocol != port.Protocol || info.portalPort != port.Port || info.nodePort != 0 /*port.NodePort*/ {
306323
return false
307324
}
308325
if !info.portalIP.Equal(net.ParseIP(service.Spec.PortalIP)) {
@@ -340,13 +357,19 @@ func (proxier *Proxier) openPortal(service ServicePortName, info *serviceInfo) e
340357
return err
341358
}
342359
}
360+
if info.nodePort != 0 {
361+
err = proxier.openNodePort(info.nodePort, info.protocol, proxier.listenIP, info.proxyPort, service)
362+
if err != nil {
363+
return err
364+
}
365+
}
343366
return nil
344367
}
345368

346369
func (proxier *Proxier) openOnePortal(portalIP net.IP, portalPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name ServicePortName) error {
347370
// Handle traffic from containers.
348371
args := proxier.iptablesContainerPortalArgs(portalIP, portalPort, protocol, proxyIP, proxyPort, name)
349-
existed, err := proxier.iptables.EnsureRule(iptables.TableNAT, iptablesContainerPortalChain, args...)
372+
existed, err := proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesContainerPortalChain, args...)
350373
if err != nil {
351374
glog.Errorf("Failed to install iptables %s rule for service %q", iptablesContainerPortalChain, name)
352375
return err
@@ -357,7 +380,7 @@ func (proxier *Proxier) openOnePortal(portalIP net.IP, portalPort int, protocol
357380

358381
// Handle traffic from the host.
359382
args = proxier.iptablesHostPortalArgs(portalIP, portalPort, protocol, proxyIP, proxyPort, name)
360-
existed, err = proxier.iptables.EnsureRule(iptables.TableNAT, iptablesHostPortalChain, args...)
383+
existed, err = proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesHostPortalChain, args...)
361384
if err != nil {
362385
glog.Errorf("Failed to install iptables %s rule for service %q", iptablesHostPortalChain, name)
363386
return err
@@ -368,12 +391,89 @@ func (proxier *Proxier) openOnePortal(portalIP net.IP, portalPort int, protocol
368391
return nil
369392
}
370393

394+
// Marks a port as being owned by a particular service, or returns error if already claimed.
395+
// Idempotent: reclaiming with the same owner is not an error
396+
func (proxier *Proxier) claimPort(port int, protocol api.Protocol, owner ServicePortName) error {
397+
proxier.portMapMutex.Lock()
398+
defer proxier.portMapMutex.Unlock()
399+
400+
// TODO: We could pre-populate some reserved ports into portMap and/or blacklist some well-known ports
401+
402+
key := portMapKey{port: port, protocol: protocol}
403+
existing, found := proxier.portMap[key]
404+
if !found {
405+
proxier.portMap[key] = owner
406+
return nil
407+
}
408+
if existing == owner {
409+
// We are idempotent
410+
return nil
411+
}
412+
return fmt.Errorf("Port conflict detected on port %v. %v vs %v", key, owner, existing)
413+
}
414+
415+
// Release a claim on a port. Returns an error if the owner does not match the claim.
416+
// Tolerates release on an unclaimed port, to simplify .
417+
func (proxier *Proxier) releasePort(port int, protocol api.Protocol, owner ServicePortName) error {
418+
proxier.portMapMutex.Lock()
419+
defer proxier.portMapMutex.Unlock()
420+
421+
key := portMapKey{port: port, protocol: protocol}
422+
existing, found := proxier.portMap[key]
423+
if !found {
424+
// We tolerate this, it happens if we are cleaning up a failed allocation
425+
glog.Infof("Ignoring release on unowned port: %v", key)
426+
return nil
427+
}
428+
if existing != owner {
429+
return fmt.Errorf("Port conflict detected on port %v (unowned unlock). %v vs %v", key, owner, existing)
430+
}
431+
delete(proxier.portMap, key)
432+
return nil
433+
}
434+
435+
func (proxier *Proxier) openNodePort(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name ServicePortName) error {
436+
// TODO: Do we want to allow containers to access public services? Probably yes.
437+
// TODO: We could refactor this to be the same code as portal, but with IP == nil
438+
439+
err := proxier.claimPort(nodePort, protocol, name)
440+
if err != nil {
441+
return err
442+
}
443+
444+
// Handle traffic from containers.
445+
args := proxier.iptablesContainerNodePortArgs(nodePort, protocol, proxyIP, proxyPort, name)
446+
existed, err := proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesContainerNodePortChain, args...)
447+
if err != nil {
448+
glog.Errorf("Failed to install iptables %s rule for service %q", iptablesContainerNodePortChain, name)
449+
return err
450+
}
451+
if !existed {
452+
glog.Infof("Opened iptables from-containers public port for service %q on %s port %d", name, protocol, nodePort)
453+
}
454+
455+
// Handle traffic from the host.
456+
args = proxier.iptablesHostNodePortArgs(nodePort, protocol, proxyIP, proxyPort, name)
457+
existed, err = proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesHostNodePortChain, args...)
458+
if err != nil {
459+
glog.Errorf("Failed to install iptables %s rule for service %q", iptablesHostNodePortChain, name)
460+
return err
461+
}
462+
if !existed {
463+
glog.Infof("Opened iptables from-host public port for service %q on %s port %d", name, protocol, nodePort)
464+
}
465+
return nil
466+
}
467+
371468
func (proxier *Proxier) closePortal(service ServicePortName, info *serviceInfo) error {
372469
// Collect errors and report them all at the end.
373470
el := proxier.closeOnePortal(info.portalIP, info.portalPort, info.protocol, proxier.listenIP, info.proxyPort, service)
374471
for _, publicIP := range info.publicIPs {
375472
el = append(el, proxier.closeOnePortal(net.ParseIP(publicIP), info.portalPort, info.protocol, proxier.listenIP, info.proxyPort, service)...)
376473
}
474+
if info.nodePort != 0 {
475+
el = append(el, proxier.closeNodePort(info.nodePort, info.protocol, proxier.listenIP, info.proxyPort, service)...)
476+
}
377477
if len(el) == 0 {
378478
glog.V(3).Infof("Closed iptables portals for service %q", service)
379479
} else {
@@ -402,28 +502,94 @@ func (proxier *Proxier) closeOnePortal(portalIP net.IP, portalPort int, protocol
402502
return el
403503
}
404504

505+
func (proxier *Proxier) closeNodePort(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name ServicePortName) []error {
506+
el := []error{}
507+
508+
// Handle traffic from containers.
509+
args := proxier.iptablesContainerNodePortArgs(nodePort, protocol, proxyIP, proxyPort, name)
510+
if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesContainerNodePortChain, args...); err != nil {
511+
glog.Errorf("Failed to delete iptables %s rule for service %q", iptablesContainerNodePortChain, name)
512+
el = append(el, err)
513+
}
514+
515+
// Handle traffic from the host.
516+
args = proxier.iptablesHostNodePortArgs(nodePort, protocol, proxyIP, proxyPort, name)
517+
if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesHostNodePortChain, args...); err != nil {
518+
glog.Errorf("Failed to delete iptables %s rule for service %q", iptablesHostNodePortChain, name)
519+
el = append(el, err)
520+
}
521+
522+
if err := proxier.releasePort(nodePort, protocol, name); err != nil {
523+
el = append(el, err)
524+
}
525+
526+
return el
527+
}
528+
405529
// See comments in the *PortalArgs() functions for some details about why we
406-
// use two chains.
530+
// use two chains for portals.
407531
var iptablesContainerPortalChain iptables.Chain = "KUBE-PORTALS-CONTAINER"
408532
var iptablesHostPortalChain iptables.Chain = "KUBE-PORTALS-HOST"
409533

534+
// Chains for NodePort services
535+
var iptablesContainerNodePortChain iptables.Chain = "KUBE-NODEPORT-CONTAINER"
536+
var iptablesHostNodePortChain iptables.Chain = "KUBE-NODEPORT-HOST"
537+
410538
// Ensure that the iptables infrastructure we use is set up. This can safely be called periodically.
411539
func iptablesInit(ipt iptables.Interface) error {
412540
// TODO: There is almost certainly room for optimization here. E.g. If
413541
// we knew the portal_net CIDR we could fast-track outbound packets not
414542
// destined for a service. There's probably more, help wanted.
543+
544+
// Danger - order of these rules matters here:
545+
//
546+
// We match portal rules first, then NodePort rules. For NodePort rules, we filter primarily on --dst-type LOCAL,
547+
// because we want to listen on all local addresses, but don't match internet traffic with the same dst port number.
548+
//
549+
// There is one complication (per thockin):
550+
// -m addrtype --dst-type LOCAL is what we want except that it is broken (by intent without foresight to our usecase)
551+
// on at least GCE. Specifically, GCE machines have a daemon which learns what external IPs are forwarded to that
552+
// machine, and configures a local route for that IP, which makes --dst-type LOCAL match when we don't want it to.
553+
// Removing the route gives correct behavior until the daemon recreates it.
554+
// Killing the daemon is an option, but means that any non-kubernetes use of the machine with external IP will be broken.
555+
//
556+
// This applies to IPs on GCE that are actually from a load-balancer; they will be categorized as LOCAL.
557+
// _If_ the chains were in the wrong order, and the LB traffic had dst-port == a NodePort on some other service,
558+
// the NodePort would take priority (incorrectly).
559+
// This is unlikely (and would only affect outgoing traffic from the cluster to the load balancer, which seems
560+
// doubly-unlikely), but we need to be careful to keep the rules in the right order.
561+
args := []string{ /* portal_net matching could go here */ }
562+
args = append(args, "-m", "comment", "--comment", "handle Portals; NOTE: this must be before the NodePort rules")
415563
if _, err := ipt.EnsureChain(iptables.TableNAT, iptablesContainerPortalChain); err != nil {
416564
return err
417565
}
418-
if _, err := ipt.EnsureRule(iptables.TableNAT, iptables.ChainPrerouting, "-j", string(iptablesContainerPortalChain)); err != nil {
566+
if _, err := ipt.EnsureRule(iptables.Prepend, iptables.TableNAT, iptables.ChainPrerouting, append(args, "-j", string(iptablesContainerPortalChain))...); err != nil {
419567
return err
420568
}
421569
if _, err := ipt.EnsureChain(iptables.TableNAT, iptablesHostPortalChain); err != nil {
422570
return err
423571
}
424-
if _, err := ipt.EnsureRule(iptables.TableNAT, iptables.ChainOutput, "-j", string(iptablesHostPortalChain)); err != nil {
572+
if _, err := ipt.EnsureRule(iptables.Prepend, iptables.TableNAT, iptables.ChainOutput, append(args, "-j", string(iptablesHostPortalChain))...); err != nil {
425573
return err
426574
}
575+
576+
// This set of rules matches broadly (addrtype & destination port), and therefore must come after the portal rules
577+
args = []string{"-m", "addrtype", "--dst-type", "LOCAL"}
578+
args = append(args, "-m", "comment", "--comment", "handle service NodePorts; NOTE: this must be the last rule in the chain")
579+
if _, err := ipt.EnsureChain(iptables.TableNAT, iptablesContainerNodePortChain); err != nil {
580+
return err
581+
}
582+
if _, err := ipt.EnsureRule(iptables.Append, iptables.TableNAT, iptables.ChainPrerouting, append(args, "-j", string(iptablesContainerNodePortChain))...); err != nil {
583+
return err
584+
}
585+
if _, err := ipt.EnsureChain(iptables.TableNAT, iptablesHostNodePortChain); err != nil {
586+
return err
587+
}
588+
if _, err := ipt.EnsureRule(iptables.Append, iptables.TableNAT, iptables.ChainOutput, append(args, "-j", string(iptablesHostNodePortChain))...); err != nil {
589+
return err
590+
}
591+
592+
// TODO: Verify order of rules.
427593
return nil
428594
}
429595

@@ -436,6 +602,12 @@ func iptablesFlush(ipt iptables.Interface) error {
436602
if err := ipt.FlushChain(iptables.TableNAT, iptablesHostPortalChain); err != nil {
437603
el = append(el, err)
438604
}
605+
if err := ipt.FlushChain(iptables.TableNAT, iptablesContainerNodePortChain); err != nil {
606+
el = append(el, err)
607+
}
608+
if err := ipt.FlushChain(iptables.TableNAT, iptablesHostNodePortChain); err != nil {
609+
el = append(el, err)
610+
}
439611
if len(el) != 0 {
440612
glog.Errorf("Some errors flushing old iptables portals: %v", el)
441613
}
@@ -464,9 +636,13 @@ func iptablesCommonPortalArgs(destIP net.IP, destPort int, protocol api.Protocol
464636
"--comment", service.String(),
465637
"-p", strings.ToLower(string(protocol)),
466638
"-m", strings.ToLower(string(protocol)),
467-
"-d", fmt.Sprintf("%s/32", destIP.String()),
468639
"--dport", fmt.Sprintf("%d", destPort),
469640
}
641+
642+
if destIP != nil {
643+
args = append(args, "-d", fmt.Sprintf("%s/32", destIP.String()))
644+
}
645+
470646
return args
471647
}
472648

@@ -550,6 +726,37 @@ func (proxier *Proxier) iptablesHostPortalArgs(destIP net.IP, destPort int, prot
550726
return args
551727
}
552728

729+
// Build a slice of iptables args for a from-container public-port rule.
730+
// See iptablesContainerPortalArgs
731+
// TODO: Should we just reuse iptablesContainerPortalArgs?
732+
func (proxier *Proxier) iptablesContainerNodePortArgs(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service ServicePortName) []string {
733+
args := iptablesCommonPortalArgs(nil, nodePort, protocol, service)
734+
735+
if proxyIP.Equal(zeroIPv4) || proxyIP.Equal(zeroIPv6) {
736+
// TODO: Can we REDIRECT with IPv6?
737+
args = append(args, "-j", "REDIRECT", "--to-ports", fmt.Sprintf("%d", proxyPort))
738+
} else {
739+
// TODO: Can we DNAT with IPv6?
740+
args = append(args, "-j", "DNAT", "--to-destination", net.JoinHostPort(proxyIP.String(), strconv.Itoa(proxyPort)))
741+
}
742+
743+
return args
744+
}
745+
746+
// Build a slice of iptables args for a from-host public-port rule.
747+
// See iptablesHostPortalArgs
748+
// TODO: Should we just reuse iptablesHostPortalArgs?
749+
func (proxier *Proxier) iptablesHostNodePortArgs(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service ServicePortName) []string {
750+
args := iptablesCommonPortalArgs(nil, nodePort, protocol, service)
751+
752+
if proxyIP.Equal(zeroIPv4) || proxyIP.Equal(zeroIPv6) {
753+
proxyIP = proxier.hostIP
754+
}
755+
// TODO: Can we DNAT with IPv6?
756+
args = append(args, "-j", "DNAT", "--to-destination", net.JoinHostPort(proxyIP.String(), strconv.Itoa(proxyPort)))
757+
return args
758+
}
759+
553760
// isTooManyFDsError reports whether err's message contains the text
// "too many open files". A nil err yields false instead of panicking on
// err.Error().
func isTooManyFDsError(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "too many open files")
}

pkg/proxy/proxier_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func (fake *fakeIptables) FlushChain(table iptables.Table, chain iptables.Chain)
9292
return nil
9393
}
9494

95-
func (fake *fakeIptables) EnsureRule(table iptables.Table, chain iptables.Chain, args ...string) (bool, error) {
95+
func (fake *fakeIptables) EnsureRule(position iptables.RulePosition, table iptables.Table, chain iptables.Chain, args ...string) (bool, error) {
9696
return false, nil
9797
}
9898

@@ -810,3 +810,5 @@ func TestProxyUpdatePortal(t *testing.T) {
810810
}
811811

812812
// TODO: Test UDP timeouts.
813+
814+
// TODO(justinsb): Add test for nodePort conflict detection, once we have nodePort wired in

pkg/util/iptables/iptables.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,13 @@ import (
2828
"github.com/golang/glog"
2929
)
3030

31+
// RulePosition is the iptables command flag EnsureRule uses when it has to
// add a rule that is not yet present.
type RulePosition string

const (
	// Append adds the rule with "-A".
	Append RulePosition = "-A"
	// Prepend adds the rule with "-I".
	Prepend RulePosition = "-I"
)
37+
3138
// An injectable interface for running iptables commands. Implementations must be goroutine-safe.
3239
type Interface interface {
3340
// EnsureChain checks if the specified chain exists and, if not, creates it. If the chain existed, return true.
@@ -37,7 +44,7 @@ type Interface interface {
3744
// DeleteChain deletes the specified chain. If the chain did not exist, return error.
3845
DeleteChain(table Table, chain Chain) error
3946
// EnsureRule checks if the specified rule is present and, if not, creates it. If the rule existed, return true.
40-
EnsureRule(table Table, chain Chain, args ...string) (bool, error)
47+
EnsureRule(position RulePosition, table Table, chain Chain, args ...string) (bool, error)
4148
// DeleteRule checks if the specified rule is present and, if so, deletes it.
4249
DeleteRule(table Table, chain Chain, args ...string) error
4350
// IsIpv6 returns true if this is managing ipv6 tables
@@ -126,7 +133,7 @@ func (runner *runner) DeleteChain(table Table, chain Chain) error {
126133
}
127134

128135
// EnsureRule is part of Interface.
129-
func (runner *runner) EnsureRule(table Table, chain Chain, args ...string) (bool, error) {
136+
func (runner *runner) EnsureRule(position RulePosition, table Table, chain Chain, args ...string) (bool, error) {
130137
fullArgs := makeFullArgs(table, chain, args...)
131138

132139
runner.mu.Lock()
@@ -139,7 +146,7 @@ func (runner *runner) EnsureRule(table Table, chain Chain, args ...string) (bool
139146
if exists {
140147
return true, nil
141148
}
142-
out, err := runner.run(opAppendRule, fullArgs)
149+
out, err := runner.run(operation(position), fullArgs)
143150
if err != nil {
144151
return false, fmt.Errorf("error appending rule: %v: %s", err, out)
145152
}

0 commit comments

Comments
 (0)