Skip to content

Commit

Permalink
Merge 440a6d0 into 740131e
Browse files Browse the repository at this point in the history
  • Loading branch information
piontec committed Apr 16, 2019
2 parents 740131e + 440a6d0 commit 54214d1
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 136 deletions.
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Expand Up @@ -25,7 +25,7 @@ required = [

[[constraint]]
name = "github.com/DevFactory/go-tools"
version = "0.1.1"
version = "0.2.0"

[[constraint]]
name = "github.com/stretchr/testify"
Expand Down
1 change: 1 addition & 0 deletions doc/ops_guide.md
Expand Up @@ -130,6 +130,7 @@ Advanced options:
* "SNCTRLR_IPTABLES_TIMEOUT_SEC" - timeout in seconds for the `iptables` system command that is frequently called by smartnat. Default should be OK, might need to be increased on heavily loaded smartnat instances.
* "SNCTRLR_AUTOREFRESH_PERIOD_SEC" - smartnat automatically reloads information about the available network interfaces and their assigned IP addresses. This setting controls how frequently (in seconds) this information is reloaded from the operating system. The default is 60 s.
* "SNCTRLR_MAX_FINALIZE_WAIT_MINUTES" - sets the absolute finalizer timeout in minutes. The finalizer timeout is a safety mechanism that is triggered when a smartnat instance has configured a Mapping and then the instance failed and never came back. In that case, when the Mapping is removed, kubernetes doesn't really delete the Mapping until all smartnat nodes that had been configured for it confirm that the configuration was removed. A failed instance will never do that, so we use this timeout to forcibly remove the Mapping after SNCTRLR_MAX_FINALIZE_WAIT_MINUTES, even if it is indicated as being still configured on some smartnat nodes.
* "SNCTRLR_MAX_PORTS_PER_MAPPING" - limits the number of Ports a single Mapping can configure. The number of exposed ports influences the traffic forwarding rate, so the administrator might need to tune this limit. The default is 50.

### 3. Cluster configuration

Expand Down
10 changes: 9 additions & 1 deletion pkg/config/config.go
Expand Up @@ -33,6 +33,7 @@ type Config struct {
MaxFinalizeWaitMinutes int
GwAddressOffset int32
MonPort int
MaxPortsPerMapping int
}

const (
Expand Down Expand Up @@ -79,6 +80,8 @@ const (
snctrlrGwAddressOffset = "SNCTRLR_GW_ADDRESS_OFFSET"
// snctrlrMonitorPort configures TCP port for exposing HTTP healthcheck and performance metrics
snctrlrMonitorPort = "SNCTRLR_MONITOR_PORT"
// snctrlrMaxPortsPerMapping configures a limit of ports used by a single mapping
snctrlrMaxPortsPerMapping = "SNCTRLR_MAX_PORTS_PER_MAPPING"
)

// NewConfig returns new configuration settings for smart-nat-controller
Expand All @@ -91,7 +94,7 @@ func NewConfig() (*Config, error) {
// NewConfigFromArgs creates new config struct from arguments, after running validation
func NewConfigFromArgs(interfaceNamePattern string, setupMasquerade, setupSNAT bool, defaultGWInterface string,
ipTablesTimeoutSec int, enableDebug bool, interfaceAutorefreshPeriodSec, maxFinalizeWaitMinutes int,
gwAddressOffset int32, monitorPort int) (*Config, error) {
gwAddressOffset int32, monitorPort int, maxPortsPerMapping int) (*Config, error) {
cfg := &Config{
DefaultGWInterface: defaultGWInterface,
EnableDebug: enableDebug,
Expand All @@ -103,6 +106,7 @@ func NewConfigFromArgs(interfaceNamePattern string, setupMasquerade, setupSNAT b
MaxFinalizeWaitMinutes: maxFinalizeWaitMinutes,
GwAddressOffset: gwAddressOffset,
MonPort: monitorPort,
MaxPortsPerMapping: maxPortsPerMapping,
}
err := cfg.validate()
return cfg, err
Expand All @@ -121,6 +125,7 @@ func (cfg *Config) load() error {
offset, _ := eos.GetIntFromEnvVarWithDefault(snctrlrGwAddressOffset, 1)
cfg.GwAddressOffset = int32(offset)
cfg.MonPort, _ = eos.GetIntFromEnvVarWithDefault(snctrlrMonitorPort, 8080)
cfg.MaxPortsPerMapping, _ = eos.GetIntFromEnvVarWithDefault(snctrlrMaxPortsPerMapping, 50)
return cfg.validate()
}

Expand All @@ -143,6 +148,9 @@ func (cfg *Config) validate() error {
if cfg.MaxFinalizeWaitMinutes <= 0 {
return fmt.Errorf("Configuration env var %s can't be <= 0", snctrlrMaxFinalizeWaitMinutes)
}
if cfg.MaxPortsPerMapping <= 0 {
return fmt.Errorf("Configuration env var %s can't be <= 0", snctrlrMaxPortsPerMapping)
}
if cfg.MonPort <= 0 || cfg.MonPort > 65535 {
return fmt.Errorf("Configuration env var %s must be a valid TCP port num (1-65535)", snctrlrMonitorPort)
}
Expand Down
151 changes: 83 additions & 68 deletions pkg/controller/mapping/dnat_providers.go
Expand Up @@ -16,7 +16,6 @@ package mapping
import (
"fmt"
"net"
"sort"
"strings"

"github.com/DevFactory/go-tools/pkg/extensions/collections"
Expand All @@ -27,6 +26,7 @@ import (

const (
chainNamePrefix = "MAP"
ipSetNamePrefix = "DNAT"
markMasqComment = "mark for masquerade in " + masqPostroutingChain
)

Expand All @@ -50,13 +50,16 @@ type DNATProvider interface {
// supported configuration.
type ThroughServiceDNAT struct {
iptables nettools.IPTablesHelper
ipset nettools.IPSetHelper
namer Namer
}

// NewThroughServiceDNATProvider returns new instance of the ThroughServiceDNAT
func NewThroughServiceDNATProvider(iptables nettools.IPTablesHelper, namer Namer) DNATProvider {
func NewThroughServiceDNATProvider(iptables nettools.IPTablesHelper, ipset nettools.IPSetHelper,
namer Namer) DNATProvider {
return &ThroughServiceDNAT{
iptables: iptables,
ipset: ipset,
namer: namer,
}
}
Expand All @@ -79,15 +82,10 @@ func (p *ThroughServiceDNAT) SetupDNAT(externalIP net.IP, mapping *v1alpha1.Mapp
return err
}

// sort all external port numbers by protocol
tcpPorts, udpPorts := p.getPerProtocolPorts(mapping)
// jumps to chain
if err := p.synchronizeJumpPerProtocol(mapping, tcpPorts, externalIP, "tcp", preroutingChain, chainName); err != nil {
return err
}
if err := p.synchronizeJumpPerProtocol(mapping, udpPorts, externalIP, "udp", preroutingChain, chainName); err != nil {
return err
}
// create ipset that contains all "allowed_source_ip:proto,port" pairs
// and add iptables rules that checks against that set and jumps to
// per-service chain with per-port translation rules
p.synchronizeJumpAndIPSet(mapping, externalIP, preroutingChain, chainName)

// setup Masquerade marks if enabled
if setupMasquerade {
Expand Down Expand Up @@ -124,17 +122,25 @@ func (p *ThroughServiceDNAT) DeleteDNAT(externalIP net.IP, mapping *v1alpha1.Map
logrusWithMapping(mapping).Errorf("Error when flushing chain %s: %v", chainName, err)
return err
}
logrusWithMapping(mapping).Debugf("Removing jump(s) from chain %s to %s", preroutingChain, chainName)
if err := p.deleteJumpPerProtocol("tcp", preroutingChain, mapping); err != nil {
logrusWithMapping(mapping).Errorf("Error when deleting jump to chain %s from %s for "+
"protocol TCP: %v", chainName, preroutingChain, err)

// remove ipset
setName := p.getIPSetName(mapping)
if err := p.ipset.DeleteSet(setName); err != nil {
logrusWithMapping(mapping).Errorf("Error removing ipset %s: %v", setName, err)
return err
}
if err := p.deleteJumpPerProtocol("udp", preroutingChain, mapping); err != nil {
logrusWithMapping(mapping).Errorf("Error when deleting jump to chain %s from %s for "+
"protocol UDP: %v", chainName, preroutingChain, err)

// remove jump rule
comment := p.getJumpComment(mapping)
logrusWithMapping(mapping).Debugf("Deleting jump in chain %s with comment: %s",
chainName, comment)
if err := p.iptables.DeleteByComment(natTableName, preroutingChain, comment); err != nil {
logrusWithMapping(mapping).Errorf("Error deleting iptables jump from chain %s to %s: %v",
preroutingChain, chainName, err)
return err
}

// delete custom chain
logrusWithMapping(mapping).Debugf("Deleting chain %s", chainName)
if err := p.iptables.DeleteChain(natTableName, chainName); err != nil {
logrusWithMapping(mapping).Errorf("Error when deleting chain %s: %v", chainName, err)
Expand All @@ -143,24 +149,59 @@ func (p *ThroughServiceDNAT) DeleteDNAT(externalIP net.IP, mapping *v1alpha1.Map
return nil
}

func (p *ThroughServiceDNAT) synchronizeJumpPerProtocol(mapping *v1alpha1.Mapping, ports []v1alpha1.MappingPort,
externalIP net.IP, protocol, fromChain, toChain string) error {
if len(ports) > 0 {
logrusWithMapping(mapping).Debugf("Setting up jump to chain %s from %s for protocol %s", toChain,
fromChain, protocol)
if err := p.setupJumpPerProtocol(externalIP, protocol, ports, toChain, mapping); err != nil {
logrusWithMapping(mapping).Errorf("Error when setting up jump to chain %s from %s "+
"and protocol protocol: %v; %v", toChain, fromChain, protocol, err)
func (p *ThroughServiceDNAT) synchronizeJumpAndIPSet(mapping *v1alpha1.Mapping, externalIP net.IP, fromChain,
toChain string) error {
// ensure ipset for this services exists
setName := p.getIPSetName(mapping)
logrusWithMapping(mapping).Debugf("Ensuring ipset %s exists", setName)
if err := p.ipset.EnsureSetExists(setName, "hash:net,port"); err != nil {
logrusWithMapping(mapping).Errorf("Error creating ipset: %v", err)
return err
}

// synchronize ipset content
logrusWithMapping(mapping).Debugf("Synchronizing entries in ipset %s", setName)
tcpPorts, udpPorts := p.getPerProtocolPorts(mapping)
netPorts := []nettools.NetPort{}
for _, allowedSrc := range mapping.Spec.AllowedSources {
_, srcNet, err := net.ParseCIDR(allowedSrc)
if err != nil {
logrusWithMapping(mapping).Errorf("Error parsing allowed source %s: %v", allowedSrc, err)
return err
}
} else {
if err := p.deleteJumpPerProtocol(protocol, fromChain, mapping); err != nil {
logrusWithMapping(mapping).Errorf("Error when deleting jump to chain %s from %s for "+
"protocol %s: %v", toChain, fromChain, protocol, err)
return err
endpoints := []struct {
ports []v1alpha1.MappingPort
proto nettools.Protocol
}{
{tcpPorts, nettools.TCP},
{udpPorts, nettools.UDP},
}
for _, ep := range endpoints {
for _, port := range ep.ports {
netPort := nettools.NetPort{
Net: *srcNet,
Port: uint16(port.Port),
Protocol: ep.proto,
}
netPorts = append(netPorts, netPort)
}
}
}
return nil
if err := p.ipset.EnsureSetHasOnlyNetPort(setName, netPorts); err != nil {
logrusWithMapping(mapping).Errorf("Error synchronizing entries in ipset %s: %v", setName, err)
return err
}

// ensure jump to toChain exists with match against the ipset
logrusWithMapping(mapping).Debugf("Adding jump from chain %s to chain %s using ipset %s",
fromChain, toChain, setName)
return p.iptables.EnsureExistsOnlyAppend(nettools.IPTablesRuleArgs{
Table: natTableName,
ChainName: fromChain,
Selector: []string{"-d", fmt.Sprintf("%s/32", externalIP.String()), "-m", "set", "--match-set", setName, "src,dst"},
Action: []string{toChain},
Comment: p.getJumpComment(mapping),
})
}

// synchronizePerPortRules needs to get current rules in the specified chain and the new set
Expand All @@ -169,14 +210,15 @@ func (p *ThroughServiceDNAT) synchronizeJumpPerProtocol(mapping *v1alpha1.Mappin
func (p *ThroughServiceDNAT) synchronizePerPortRules(mapping *v1alpha1.Mapping, svc *v1.Service,
chainName string) error {
logrusWithMapping(mapping).Debugf("Starting to synchronize per port rules in chain %s", chainName)
// load current rules from the chain - tentatively we will remove all rules from this list
// load current rules from the chain
current, err := p.iptables.LoadRules(natTableName, chainName)
if err != nil {
logrusWithMapping(mapping).Errorf("Error getting iptables rules from the operating system for"+
" chain %s. Error: %v", chainName, err)
return err
}
current = p.skipMarkMasq(current)
// exclude the mark for masquerade rule, so it is always kept
current = p.excludeMarkMasqRule(current)
new := p.rulesFromMappedPorts(mapping, svc.Spec.ClusterIP, chainName)

// now find set difference "current\new" and "new\current"
Expand Down Expand Up @@ -220,7 +262,7 @@ func (p *ThroughServiceDNAT) synchronizePerPortRules(mapping *v1alpha1.Mapping,
return nil
}

func (*ThroughServiceDNAT) skipMarkMasq(rules []*nettools.IPTablesRuleArgs) []*nettools.IPTablesRuleArgs {
func (*ThroughServiceDNAT) excludeMarkMasqRule(rules []*nettools.IPTablesRuleArgs) []*nettools.IPTablesRuleArgs {
res := make([]*nettools.IPTablesRuleArgs, 0, len(rules))
for _, rule := range rules {
if rule.Comment == markMasqComment {
Expand Down Expand Up @@ -248,39 +290,8 @@ func (p *ThroughServiceDNAT) rulesFromMappedPorts(mapping *v1alpha1.Mapping, svc
return new
}

func (p *ThroughServiceDNAT) setupJumpPerProtocol(externalIP net.IP, protocol string,
ports []v1alpha1.MappingPort, chainName string, mapping *v1alpha1.Mapping) error {
list := make([]string, 0, len(ports))
for _, port := range ports {
list = append(list, fmt.Sprintf("%d", port.Port))
}
stringList := strings.Join(list, ",")
logrusWithMapping(mapping).Debugf("Setting jump from %s to %s for protocol %s",
preroutingChain, chainName, protocol)
src := make([]string, len(mapping.Spec.AllowedSources))
copy(src, mapping.Spec.AllowedSources)
sort.Strings(src)
allowedSources := strings.Join(src, ",")
return p.iptables.EnsureExistsOnlyAppend(nettools.IPTablesRuleArgs{
Table: natTableName,
ChainName: preroutingChain,
Selector: []string{"-d", fmt.Sprintf("%s/32", externalIP.String()), "-p", protocol,
"-m", "multiport", "--dports", stringList, "-s", allowedSources},
Action: []string{chainName},
Comment: p.getJumpComment(mapping, protocol),
})
}

func (p *ThroughServiceDNAT) deleteJumpPerProtocol(protocol string, chainName string,
mapping *v1alpha1.Mapping) error {
comment := p.getJumpComment(mapping, protocol)
logrusWithMapping(mapping).Debugf("Deleting jump in chain %s with comment: %s",
chainName, comment)
return p.iptables.DeleteByComment(natTableName, chainName, comment)
}

func (p *ThroughServiceDNAT) getJumpComment(mapping *v1alpha1.Mapping, protocol string) string {
return fmt.Sprintf("for mapping %s/%s [%s]", mapping.Namespace, mapping.Name, protocol)
func (p *ThroughServiceDNAT) getJumpComment(mapping *v1alpha1.Mapping) string {
return fmt.Sprintf("for mapping %s/%s", mapping.Namespace, mapping.Name)
}

func (p *ThroughServiceDNAT) getPerProtocolPorts(mapping *v1alpha1.Mapping) (
Expand All @@ -301,3 +312,7 @@ func (p *ThroughServiceDNAT) getPerProtocolPorts(mapping *v1alpha1.Mapping) (
// getChainName builds the chain name used for the given mapping: the
// chainNamePrefix constant and the namer-provided name of the mapping's
// ObjectMeta, joined with a dash.
func (p *ThroughServiceDNAT) getChainName(mapping *v1alpha1.Mapping) string {
	mappingName := p.namer.Name(mapping.ObjectMeta)
	return fmt.Sprintf("%s-%s", chainNamePrefix, mappingName)
}

// getIPSetName builds the ipset name used for the given mapping: the
// ipSetNamePrefix constant and the namer-provided name of the mapping's
// ObjectMeta, joined with a dash.
func (p *ThroughServiceDNAT) getIPSetName(mapping *v1alpha1.Mapping) string {
	mappingName := p.namer.Name(mapping.ObjectMeta)
	return fmt.Sprintf("%s-%s", ipSetNamePrefix, mappingName)
}

0 comments on commit 54214d1

Please sign in to comment.