
whitelist traffic to cluster IP and node ports in INPUT chain to bypass network policy enforcement #914

Merged 3 commits on Jun 9, 2020

2 changes: 2 additions & 0 deletions docs/user-guide.md
@@ -82,6 +82,8 @@ Usage of kube-router:
--run-firewall Enables Network Policy -- sets up iptables to provide ingress firewall for pods. (default true)
--run-router Enables Pod Networking -- Advertises and learns the routes to Pods via iBGP. (default true)
--run-service-proxy Enables Service Proxy -- sets up IPVS for Kubernetes Services. (default true)
--service-cluster-ip-range string CIDR value from which service cluster IPs are assigned. Default: 10.96.0.0/12 (default "10.96.0.0/12")
--service-node-port-range string NodePort range. Default: 30000-32767 (default "30000:32767")
-v, --v string log level for V logs (default "0")
-V, --version Print version information.
```
37 changes: 22 additions & 15 deletions pkg/cmd/kube-router.go
@@ -127,21 +127,6 @@ func (kr *KubeRouter) Run() error {
kr.Config.MetricsEnabled = false
}

if kr.Config.RunFirewall {
npc, err := netpol.NewNetworkPolicyController(kr.Client,
kr.Config, podInformer, npInformer, nsInformer)
if err != nil {
return errors.New("Failed to create network policy controller: " + err.Error())
}

podInformer.AddEventHandler(npc.PodEventHandler)
nsInformer.AddEventHandler(npc.NamespaceEventHandler)
npInformer.AddEventHandler(npc.NetworkPolicyEventHandler)

wg.Add(1)
go npc.Run(healthChan, stopCh, &wg)
}

if kr.Config.BGPGracefulRestart {
if kr.Config.BGPGracefulRestartDeferralTime > time.Hour*18 {
return errors.New("BGPGracefuleRestartDeferralTime should be less than 18 hours")
@@ -177,6 +162,28 @@ func (kr *KubeRouter) Run() error {

wg.Add(1)
go nsc.Run(healthChan, stopCh, &wg)

// wait for the proxy firewall rules to be setup before network policies
if kr.Config.RunFirewall {
nsc.ProxyFirewallSetup.L.Lock()
nsc.ProxyFirewallSetup.Wait()
nsc.ProxyFirewallSetup.L.Unlock()
}
}

if kr.Config.RunFirewall {
npc, err := netpol.NewNetworkPolicyController(kr.Client,
kr.Config, podInformer, npInformer, nsInformer)
if err != nil {
return errors.New("Failed to create network policy controller: " + err.Error())
}

podInformer.AddEventHandler(npc.PodEventHandler)
nsInformer.AddEventHandler(npc.NamespaceEventHandler)
npInformer.AddEventHandler(npc.NetworkPolicyEventHandler)

wg.Add(1)
go npc.Run(healthChan, stopCh, &wg)
}

// Handle SIGINT and SIGTERM
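The reordering above starts the network policy controller only after the service proxy has installed its firewall rules, coordinated through the new ProxyFirewallSetup sync.Cond (the Broadcast side appears in the network_services_controller.go changes further down). Below is a minimal, self-contained sketch of that wait/broadcast pairing; the name firewallReady and the sleep are illustrative, and the sketch adds a readiness flag checked in a loop, a common companion to sync.Cond, so a broadcast that fires before the waiter arrives is not lost.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	firewallReady := sync.NewCond(&sync.Mutex{}) // plays the role of nsc.ProxyFirewallSetup
	ready := false                               // guard flag so an early Broadcast is not missed

	// Stand-in for the services controller: set up firewall rules, then signal.
	go func() {
		time.Sleep(100 * time.Millisecond) // pretend ipvs/iptables setup work
		firewallReady.L.Lock()
		ready = true
		firewallReady.L.Unlock()
		firewallReady.Broadcast()
		fmt.Println("proxy: firewall rules in place")
	}()

	// Stand-in for the Run() logic above: block until the proxy has signalled,
	// then start the network policy controller.
	firewallReady.L.Lock()
	for !ready {
		firewallReady.Wait()
	}
	firewallReady.L.Unlock()
	fmt.Println("netpol: starting after proxy firewall setup")
}
```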
102 changes: 62 additions & 40 deletions pkg/controllers/netpol/network_policy_controller.go
@@ -54,14 +54,16 @@ const (

// NetworkPolicyController strcut to hold information required by NetworkPolicyController
type NetworkPolicyController struct {
nodeIP net.IP
nodeHostName string
mu sync.Mutex
syncPeriod time.Duration
MetricsEnabled bool
v1NetworkPolicy bool
healthChan chan<- *healthcheck.ControllerHeartbeat
fullSyncRequestChan chan struct{}
nodeIP net.IP
nodeHostName string
serviceClusterIPRange string
serviceNodePortRange string
mu sync.Mutex
syncPeriod time.Duration
MetricsEnabled bool
v1NetworkPolicy bool
healthChan chan<- *healthcheck.ControllerHeartbeat
fullSyncRequestChan chan struct{}

ipSetHandler *utils.IPSet

@@ -197,48 +199,65 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() {
glog.Fatalf("Failed to initialize iptables executor due to %s", err.Error())
}

chains := map[string]string{"INPUT": kubeInputChainName, "FORWARD": kubeForwardChainName, "OUTPUT": kubeOutputChainName}

for builtinChain, customChain := range chains {
err = iptablesCmdHandler.NewChain("filter", customChain)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
glog.Fatalf("Failed to run iptables command to create %s chain due to %s", customChain, err.Error())
ensureRuleAtposition := func(chain string, ruleSpec []string, position int) {
rules, err := iptablesCmdHandler.List("filter", chain)
if err != nil {
glog.Fatalf("failed to list rules in filter table %s chain due to %s", chain, err.Error())
}
args := []string{"-m", "comment", "--comment", "kube-router netpol", "-j", customChain}
exists, err := iptablesCmdHandler.Exists("filter", builtinChain, args...)

exists, err := iptablesCmdHandler.Exists("filter", chain, ruleSpec...)
Collaborator: Really minor nitpick, but we should probably do this existence checking before the List call above. If the iptables rule doesn't exist, then there is no reason to do the List and it would save us an iptables call.

Member Author: Agree. Will fix it.

if err != nil {
glog.Fatalf("Failed to verify rule exists to jump to chain %s in %s chain due to %s", customChain, builtinChain, err.Error())
glog.Fatalf("Failed to verify rule exists in %s chain due to %s", chain, err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", builtinChain, 1, args...)
err := iptablesCmdHandler.Insert("filter", chain, position, ruleSpec...)
if err != nil {
glog.Fatalf("Failed to run iptables command to insert in %s chain %s", builtinChain, err.Error())
glog.Fatalf("Failed to run iptables command to insert in %s chain %s", chain, err.Error())
}
} else {
rules, err := iptablesCmdHandler.List("filter", builtinChain)
if err != nil {
glog.Fatalf("failed to list rules in filter table %s chain due to %s", builtinChain, err.Error())
return
}
var ruleNo int
for i, rule := range rules {
rule = strings.Replace(rule, "\"", "", 2) //removes quote from comment string
if strings.Contains(rule, strings.Join(ruleSpec, " ")) {
ruleNo = i
break
}

var ruleNo int
for i, rule := range rules {
if strings.Contains(rule, customChain) {
ruleNo = i
break
}
}
if ruleNo != position {
err = iptablesCmdHandler.Insert("filter", chain, position, ruleSpec...)
if err != nil {
glog.Fatalf("Failed to run iptables command to insert in %s chain %s", chain, err.Error())
}
if ruleNo != 1 {
err = iptablesCmdHandler.Insert("filter", builtinChain, 1, args...)
if err != nil {
glog.Fatalf("Failed to run iptables command to insert in %s chain %s", builtinChain, err.Error())
}
err = iptablesCmdHandler.Delete("filter", builtinChain, strconv.Itoa(ruleNo+1))
if err != nil {
glog.Fatalf("Failed to delete wrong rule to jump to chain %s in %s chain due to %s", customChain, builtinChain, err.Error())
}
err = iptablesCmdHandler.Delete("filter", chain, strconv.Itoa(ruleNo+1))
Collaborator: I know that you've talked about this in other places in the code, but at some point in the future we should really stop relying on deleting iptables rules by number (I know we do it all over the place). There is a possibility that, since we listed the rules, some other process has come in and manipulated iptables, causing us to delete the wrong rule.

Member Author: Yes, there is a possibility. Perhaps holding the iptables lock followed by iptables-save and iptables-restore would be a safe solution. But if there are non-cooperating processes that do not use locks, we just have no way to control it with iptables. Such races have been a problem before:

weaveworks/weave#3230
kubernetes/kubernetes#20391

if err != nil {
glog.Fatalf("Failed to delete incorrect rule in %s chain due to %s", chain, err.Error())
}
}
}

chains := map[string]string{"INPUT": kubeInputChainName, "FORWARD": kubeForwardChainName, "OUTPUT": kubeOutputChainName}

for builtinChain, customChain := range chains {
err = iptablesCmdHandler.NewChain("filter", customChain)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
glog.Fatalf("Failed to run iptables command to create %s chain due to %s", customChain, err.Error())
}
args := []string{"-m", "comment", "--comment", "kube-router netpol", "-j", customChain}
ensureRuleAtposition(builtinChain, args, 1)
}

whitelistServiceVips := []string{"-m", "comment", "--comment", "allow traffic to cluster IP", "-d", npc.serviceClusterIPRange, "-j", "RETURN"}
ensureRuleAtposition(kubeInputChainName, whitelistServiceVips, 1)

whitelistTCPNodeports := []string{"-p", "tcp", "-m", "comment", "--comment", "allow LOCAL traffic to node ports", "-m", "addrtype", "--dst-type", "LOCAL",
"-m", "multiport", "--dports", npc.serviceNodePortRange, "-j", "RETURN"}
ensureRuleAtposition(kubeInputChainName, whitelistTCPNodeports, 2)

whitelistUDPNodeports := []string{"-p", "udp", "-m", "comment", "--comment", "allow LOCAL traffic to node ports", "-m", "addrtype", "--dst-type", "LOCAL",
"-m", "multiport", "--dports", npc.serviceNodePortRange, "-j", "RETURN"}
ensureRuleAtposition(kubeInputChainName, whitelistUDPNodeports, 3)

}
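For orientation, the sketch below (not part of the PR) prints what the three whitelist rules added above amount to with the default flag values, 10.96.0.0/12 and 30000:32767. The chain name KUBE-ROUTER-INPUT is assumed here to be the value of kubeInputChainName.

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	clusterIPRange := "10.96.0.0/12" // --service-cluster-ip-range default
	nodePortRange := "30000:32767"   // --service-node-port-range default (iptables multiport syntax)

	rules := [][]string{
		{"-m", "comment", "--comment", "allow traffic to cluster IP",
			"-d", clusterIPRange, "-j", "RETURN"},
		{"-p", "tcp", "-m", "comment", "--comment", "allow LOCAL traffic to node ports",
			"-m", "addrtype", "--dst-type", "LOCAL", "-m", "multiport", "--dports", nodePortRange, "-j", "RETURN"},
		{"-p", "udp", "-m", "comment", "--comment", "allow LOCAL traffic to node ports",
			"-m", "addrtype", "--dst-type", "LOCAL", "-m", "multiport", "--dports", nodePortRange, "-j", "RETURN"},
	}

	// Print the equivalent iptables invocations, pinned at positions 1-3 of the
	// custom INPUT chain so they are evaluated before any network policy rules.
	for i, rule := range rules {
		quoted := make([]string, 0, len(rule))
		for _, arg := range rule {
			if strings.Contains(arg, " ") {
				arg = `"` + arg + `"`
			}
			quoted = append(quoted, arg)
		}
		fmt.Printf("iptables -t filter -I KUBE-ROUTER-INPUT %d %s\n", i+1, strings.Join(quoted, " "))
	}
}
```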

// OnPodUpdate handles updates to pods from the Kubernetes api server
@@ -953,7 +972,7 @@ func (npc *NetworkPolicyController) syncPodFirewallChains(networkPoliciesInfo []
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", chain, 1, args...)
err := iptablesCmdHandler.AppendUnique("filter", chain, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
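The switch from Insert at position 1 to AppendUnique presumably keeps these jump rules from being placed ahead of the whitelist rules that now sit at the top of the chain. A minimal sketch of the two calls, assuming the github.com/coreos/go-iptables/iptables package that kube-router already uses and a throwaway chain name (requires iptables and root to actually run):

```go
package main

import (
	"log"

	"github.com/coreos/go-iptables/iptables"
)

func main() {
	ipt, err := iptables.New()
	if err != nil {
		log.Fatal(err)
	}

	// Throwaway chain for illustration; ClearChain creates it if it is missing.
	if err := ipt.ClearChain("filter", "EXAMPLE-CHAIN"); err != nil {
		log.Fatal(err)
	}

	pinned := []string{"-m", "comment", "--comment", "pinned whitelist rule", "-j", "RETURN"}
	jump := []string{"-m", "comment", "--comment", "pod firewall jump", "-j", "RETURN"}

	// Insert places a rule at an explicit position, shifting everything below it down.
	if err := ipt.Insert("filter", "EXAMPLE-CHAIN", 1, pinned...); err != nil {
		log.Fatal(err)
	}

	// AppendUnique adds the rule at the end of the chain, and only if it is not
	// already present, so the pinned rule at position 1 stays where it is.
	if err := ipt.AppendUnique("filter", "EXAMPLE-CHAIN", jump...); err != nil {
		log.Fatal(err)
	}
}
```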
@@ -1780,6 +1799,9 @@ func NewNetworkPolicyController(clientset kubernetes.Interface,
// be up to date with all of the policy changes from any enqueued request after that
npc.fullSyncRequestChan = make(chan struct{}, 1)

npc.serviceClusterIPRange = config.ClusterIPCIDR
npc.serviceNodePortRange = config.NodePortRange

if config.MetricsEnabled {
//Register the metrics for this controller
prometheus.MustRegister(metrics.ControllerIptablesSyncTime)
4 changes: 4 additions & 0 deletions pkg/controllers/proxy/network_services_controller.go
@@ -217,6 +217,7 @@ type NetworkServicesController struct {
MetricsEnabled bool
ln LinuxNetworking
readyForUpdates bool
ProxyFirewallSetup *sync.Cond

// Map of ipsets that we use.
ipsetMap map[string]*utils.Set
@@ -344,6 +345,7 @@ func (nsc *NetworkServicesController) Run(healthChan chan<- *healthcheck.Control
if err != nil {
glog.Error("Error setting up ipvs firewall: " + err.Error())
}
nsc.ProxyFirewallSetup.Broadcast()

gracefulTicker := time.NewTicker(5 * time.Second)
defer gracefulTicker.Stop()
@@ -2222,6 +2224,8 @@ func NewNetworkServicesController(clientset kubernetes.Interface,
nsc.endpointsMap = make(endpointsInfoMap)
nsc.client = clientset

nsc.ProxyFirewallSetup = sync.NewCond(&sync.Mutex{})

nsc.masqueradeAll = false
if config.MasqueradeAll {
nsc.masqueradeAll = true
8 changes: 8 additions & 0 deletions pkg/options/options.go
@@ -23,6 +23,8 @@ type KubeRouterConfig struct {
CleanupConfig bool
ClusterAsn uint
ClusterCIDR string
ClusterIPCIDR string
NodePortRange string
DisableSrcDstCheck bool
EnableCNI bool
EnableiBGP bool
@@ -74,6 +76,8 @@ func NewKubeRouterConfig() *KubeRouterConfig {
BGPGracefulRestartDeferralTime: 360 * time.Second,
EnableOverlay: true,
OverlayType: "subnet",
ClusterIPCIDR: "10.96.0.0/12",
Collaborator: Is this a safe guess for all of the users that use the daemonsets we publish?

Member Author: It's the default value Kubernetes uses.

NodePortRange: "30000:32767",
}
}

@@ -102,6 +106,10 @@ func (s *KubeRouterConfig) AddFlags(fs *pflag.FlagSet) {
"CIDR range of pods in the cluster. It is used to identify traffic originating from and destinated to pods.")
fs.StringSliceVar(&s.ExcludedCidrs, "excluded-cidrs", s.ExcludedCidrs,
"Excluded CIDRs are used to exclude IPVS rules from deletion.")
fs.StringVar(&s.ClusterIPCIDR, "service-cluster-ip-range", s.ClusterIPCIDR,
"CIDR value from which service cluster IPs are assigned. Default: 10.96.0.0/12")
fs.StringVar(&s.NodePortRange, "service-node-port-range", s.NodePortRange,
"NodePort range. Default: 30000-32767")
fs.BoolVar(&s.EnablePodEgress, "enable-pod-egress", true,
"SNAT traffic from Pods to destinations outside the cluster.")
fs.DurationVar(&s.IPTablesSyncPeriod, "iptables-sync-period", s.IPTablesSyncPeriod,
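Both new values are plain strings that flow straight into iptables rule specs: the cluster IP range as a -d match and the node-port range in iptables multiport low:high form (note the colon in the default "30000:32767", even though the help text prints a dash). The PR itself adds no validation; the sketch below, with a hypothetical validateServiceRanges helper, shows how the two values could be sanity-checked.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
	"strings"
)

// validateServiceRanges is a hypothetical helper, not part of kube-router: it
// checks that the cluster IP range parses as a CIDR and that the node port
// range is two ports in iptables multiport "low:high" form.
func validateServiceRanges(clusterIPCIDR, nodePortRange string) error {
	if _, _, err := net.ParseCIDR(clusterIPCIDR); err != nil {
		return fmt.Errorf("invalid --service-cluster-ip-range %q: %v", clusterIPCIDR, err)
	}
	parts := strings.Split(nodePortRange, ":")
	if len(parts) != 2 {
		return fmt.Errorf("invalid --service-node-port-range %q: want low:high", nodePortRange)
	}
	low, err1 := strconv.Atoi(parts[0])
	high, err2 := strconv.Atoi(parts[1])
	if err1 != nil || err2 != nil || low < 1 || high > 65535 || low > high {
		return fmt.Errorf("invalid --service-node-port-range %q", nodePortRange)
	}
	return nil
}

func main() {
	// Defaults from the diff above.
	fmt.Println(validateServiceRanges("10.96.0.0/12", "30000:32767")) // <nil>
	fmt.Println(validateServiceRanges("10.96.0.0/12", "30000-32767")) // error: want low:high
}
```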