diff --git a/app/controllers/network_policy_controller.go b/app/controllers/network_policy_controller.go index 251dbc9a41..8475a0a604 100644 --- a/app/controllers/network_policy_controller.go +++ b/app/controllers/network_policy_controller.go @@ -17,20 +17,21 @@ import ( "github.com/cloudnativelabs/kube-router/utils" "github.com/coreos/go-iptables/iptables" "github.com/golang/glog" - "k8s.io/client-go/kubernetes" api "k8s.io/api/core/v1" apiextensions "k8s.io/api/extensions/v1beta1" networking "k8s.io/api/networking/v1" + "k8s.io/client-go/kubernetes" ) -// Network policy controller provides an ingress firewall for the pods as per the defined network policies. -// Two different types of iptables chains are used. Each pod running on the node which has default ingress -// policy is to deny the traffic gets a pod specific chian. Each network policy has a iptable chain, which +// Network policy controller provides both ingress and egress filtering for the pods as per the defined network +// policies. Two different types of iptables chains are used. Each pod running on the node which either +// requires ingress or egress filtering gets a pod specific chian. Each network policy has a iptable chain, which // has rules expreessed through ipsets matching source and destination pod ip's. In the FORWARD chain of the -// filter table a rule is added jump the traffic destined to the pod to the pod specific iptable chain. Each +// filter table a rule is added to jump the traffic originating (in case of egress network policy) from the pod +// or destined (in case of ingress network policy) to the pod to the pod specific iptable chain. Each // pod specifc iptable chain has rules to jump to the network polices chains, that pod matches. So packet -// destined for pod goes throuh fitler table's, FORWARD chain, followed by pod specific chain, followed -// by one or more network policy chains, till there is a match which will accept the packet, or gets +// originating/destined from/to pod goes throuh fitler table's, FORWARD chain, followed by pod specific chain, +// followed by one or more network policy chains, till there is a match which will accept the packet, or gets // dropped by the rule in the pod chain, if there is no match. // NetworkPolicyController strcut to hold information required by NetworkPolicyController @@ -53,10 +54,16 @@ type networkPolicyInfo struct { labels map[string]string // set of pods matching network policy spec podselector label selector - destPods map[string]podInfo + targetPods map[string]podInfo // whitelist ingress rules from the network policy spec ingressRules []ingressRule + + // whitelist egress rules from the network policy spec + egressRules []egressRule + + // policy type "ingress" or "egress" or "both" as defined by PolicyType in the spec + policyType string } // internal structure to represent Pod @@ -67,6 +74,7 @@ type podInfo struct { labels map[string]string } +// internal stucture to represent NetworkPolicyIngressRule in the spec type ingressRule struct { matchAllPorts bool ports []protocolAndPort @@ -74,12 +82,20 @@ type ingressRule struct { srcPods []podInfo } +// internal structure to represent NetworkPolicyEgressRule in the spec +type egressRule struct { + matchAllPorts bool + ports []protocolAndPort + matchAllDestinations bool + dstPods []podInfo +} + type protocolAndPort struct { protocol string port string } -// Run runs forver till we recive notification on stopCh +// Run runs forver till we receive notification on stopCh func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { t := time.NewTicker(npc.syncPeriod) defer t.Stop() @@ -176,13 +192,12 @@ func (npc *NetworkPolicyController) Sync() error { if err != nil { return errors.New("Aborting sync. Failed to build network policies: " + err.Error()) } - } else { + // TODO remove the Beta support npc.networkPoliciesInfo, err = buildBetaNetworkPoliciesInfo() if err != nil { return errors.New("Aborting sync. Failed to build network policies: " + err.Error()) } - } activePolicyChains, activePolicyIpSets, err := npc.syncNetworkPolicyChains() @@ -231,22 +246,34 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains() (map[string]bool, activePolicyChains[policyChainName] = true // create a ipset for all destination pod ip's matched by the policy spec PodSelector - destPodIpSetName := policyDestinationPodIpSetName(policy.namespace, policy.name) - destPodIpSet, err := npc.ipSetHandler.Create(destPodIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0") + targetDestPodIpSetName := policyDestinationPodIpSetName(policy.namespace, policy.name) + targetDestPodIpSet, err := npc.ipSetHandler.Create(targetDestPodIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0") + if err != nil { + return nil, nil, fmt.Errorf("failed to create ipset: %s", err.Error()) + } + + // create a ipset for all source pod ip's matched by the policy spec PodSelector + targetSourcePodIpSetName := policySourcePodIpSetName(policy.namespace, policy.name) + targetSourcePodIpSet, err := npc.ipSetHandler.Create(targetSourcePodIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0") if err != nil { return nil, nil, fmt.Errorf("failed to create ipset: %s", err.Error()) } // flush all entries in the set - if destPodIpSet.Flush() != nil { + if targetSourcePodIpSet.Flush() != nil { + return nil, nil, fmt.Errorf("failed to flush ipset while syncing iptables: %s", err.Error()) + } + if targetDestPodIpSet.Flush() != nil { return nil, nil, fmt.Errorf("failed to flush ipset while syncing iptables: %s", err.Error()) } - activePolicyIpSets[destPodIpSet.Name] = true + activePolicyIpSets[targetDestPodIpSet.Name] = true + activePolicyIpSets[targetSourcePodIpSet.Name] = true - for k := range policy.destPods { + for k := range policy.targetPods { // TODO restrict ipset to ip's of pods running on the node - destPodIpSet.Add(k, utils.OptionTimeout, "0") + targetDestPodIpSet.Add(k, utils.OptionTimeout, "0") + targetSourcePodIpSet.Add(k, utils.OptionTimeout, "0") } // TODO use iptables-restore to better implement the logic, than flush and add rules @@ -255,103 +282,232 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains() (map[string]bool, return nil, nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) } - // From network policy spec: "If field 'Ingress' is empty then this NetworkPolicy does not allow any traffic " - // so no whitelist rules to be added to the network policy - if policy.ingressRules == nil { - continue + err = npc.processIngressRules(policy, targetDestPodIpSetName, activePolicyIpSets) + if err != nil { + return nil, nil, err } - // run through all the ingress rules in the spec and create iptable rules - // in the chain for the network policy - for i, ingressRule := range policy.ingressRules { + err = npc.processEgressRules(policy, targetSourcePodIpSetName, activePolicyIpSets) + if err != nil { + return nil, nil, err + } + } - if len(ingressRule.srcPods) != 0 { - srcPodIpSetName := policySourcePodIpSetName(policy.namespace, policy.name, i) - srcPodIpSet, err := npc.ipSetHandler.Create(srcPodIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0") - if err != nil { - return nil, nil, fmt.Errorf("failed to create ipset: %s", err.Error()) - } - // flush all entries in the set - if srcPodIpSet.Flush() != nil { - return nil, nil, fmt.Errorf("failed to flush ipset while syncing iptables: %s", err.Error()) - } + glog.Infof("Iptables chains in the filter table are synchronized with the network policies.") - activePolicyIpSets[srcPodIpSet.Name] = true + return activePolicyChains, activePolicyIpSets, nil +} - for _, pod := range ingressRule.srcPods { - srcPodIpSet.Add(pod.ip, utils.OptionTimeout, "0") - } +func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo, + targetDestPodIpSetName string, activePolicyIpSets map[string]bool) error { - if len(ingressRule.ports) != 0 { - // case where 'ports' details and 'from' details specified in the ingress rule - // so match on specified source and destination ip's and specified port and protocol - for _, portProtocol := range ingressRule.ports { - comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + - policy.name + " namespace " + policy.namespace - args := []string{"-m", "comment", "--comment", comment, - "-m", "set", "--set", srcPodIpSetName, "src", - "-m", "set", "--set", destPodIpSetName, "dst", - "-p", portProtocol.protocol, - "--dport", portProtocol.port, - "-j", "ACCEPT"} - err := iptablesCmdHandler.AppendUnique("filter", policyChainName, args...) - if err != nil { - return nil, nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } - } - } else { - // case where no 'ports' details specified in the ingress rule but 'from' details specified - // so match on specified source and destination ip with all port and protocol + // From network policy spec: "If field 'Ingress' is empty then this NetworkPolicy does not allow any traffic " + // so no whitelist rules to be added to the network policy + if policy.ingressRules == nil { + return nil + } + + iptablesCmdHandler, err := iptables.New() + if err != nil { + return fmt.Errorf("Failed to initialize iptables executor due to: %s", err.Error()) + } + + policyChainName := networkPolicyChainName(policy.namespace, policy.name) + + // run through all the ingress rules in the spec and create iptable rules + // in the chain for the network policy + for i, ingressRule := range policy.ingressRules { + + if len(ingressRule.srcPods) != 0 { + srcPodIpSetName := policyIndexedSourcePodIpSetName(policy.namespace, policy.name, i) + srcPodIpSet, err := npc.ipSetHandler.Create(srcPodIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0") + if err != nil { + return fmt.Errorf("failed to create ipset: %s", err.Error()) + } + // flush all entries in the set + if srcPodIpSet.Flush() != nil { + return fmt.Errorf("failed to flush ipset while syncing iptables: %s", err.Error()) + } + + activePolicyIpSets[srcPodIpSet.Name] = true + + for _, pod := range ingressRule.srcPods { + srcPodIpSet.Add(pod.ip, utils.OptionTimeout, "0") + } + + if len(ingressRule.ports) != 0 { + // case where 'ports' details and 'from' details specified in the ingress rule + // so match on specified source and destination ip's and specified port and protocol + for _, portProtocol := range ingressRule.ports { comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + policy.name + " namespace " + policy.namespace args := []string{"-m", "comment", "--comment", comment, "-m", "set", "--set", srcPodIpSetName, "src", - "-m", "set", "--set", destPodIpSetName, "dst", + "-m", "set", "--set", targetDestPodIpSetName, "dst", + "-p", portProtocol.protocol, + "--dport", portProtocol.port, "-j", "ACCEPT"} err := iptablesCmdHandler.AppendUnique("filter", policyChainName, args...) if err != nil { - return nil, nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + return fmt.Errorf("Failed to run iptables command: %s", err.Error()) } } + } else { + // case where no 'ports' details specified in the ingress rule but 'from' details specified + // so match on specified source and destination ip with all port and protocol + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + + policy.name + " namespace " + policy.namespace + args := []string{"-m", "comment", "--comment", comment, + "-m", "set", "--set", srcPodIpSetName, "src", + "-m", "set", "--set", targetDestPodIpSetName, "dst", + "-j", "ACCEPT"} + err := iptablesCmdHandler.AppendUnique("filter", policyChainName, args...) + if err != nil { + return fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } } + } - // case where only 'ports' details specified but no 'from' details in the ingress rule - // so match on all sources, with specified port and protocol - if ingressRule.matchAllSource && !ingressRule.matchAllPorts { - for _, portProtocol := range ingressRule.ports { - comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name: " + + // case where only 'ports' details specified but no 'from' details in the ingress rule + // so match on all sources, with specified port and protocol + if ingressRule.matchAllSource && !ingressRule.matchAllPorts { + for _, portProtocol := range ingressRule.ports { + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name: " + + policy.name + " namespace " + policy.namespace + args := []string{"-m", "comment", "--comment", comment, + "-m", "set", "--set", targetDestPodIpSetName, "dst", + "-p", portProtocol.protocol, + "--dport", portProtocol.port, + "-j", "ACCEPT"} + err := iptablesCmdHandler.AppendUnique("filter", policyChainName, args...) + if err != nil { + return fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + } + } + + // case where nether ports nor from details are speified in the ingress rule + // so match on all ports, protocol, source IP's + if ingressRule.matchAllSource && ingressRule.matchAllPorts { + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name: " + + policy.name + " namespace " + policy.namespace + args := []string{"-m", "comment", "--comment", comment, + "-m", "set", "--set", targetDestPodIpSetName, "dst", + "-j", "ACCEPT"} + err := iptablesCmdHandler.AppendUnique("filter", policyChainName, args...) + if err != nil { + return fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + } + } + + return nil +} + +func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, + targetSourcePodIpSetName string, activePolicyIpSets map[string]bool) error { + + // From network policy spec: "If field 'Ingress' is empty then this NetworkPolicy does not allow any traffic " + // so no whitelist rules to be added to the network policy + if policy.egressRules == nil { + return nil + } + + iptablesCmdHandler, err := iptables.New() + if err != nil { + return fmt.Errorf("Failed to initialize iptables executor due to: %s", err.Error()) + } + + policyChainName := networkPolicyChainName(policy.namespace, policy.name) + + // run through all the egress rules in the spec and create iptable rules + // in the chain for the network policy + for i, egressRule := range policy.egressRules { + + if len(egressRule.dstPods) != 0 { + dstPodIpSetName := policyIndexedDestinationPodIpSetName(policy.namespace, policy.name, i) + dstPodIpSet, err := npc.ipSetHandler.Create(dstPodIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0") + if err != nil { + return fmt.Errorf("failed to create ipset: %s", err.Error()) + } + // flush all entries in the set + if dstPodIpSet.Flush() != nil { + return fmt.Errorf("failed to flush ipset while syncing iptables: %s", err.Error()) + } + + activePolicyIpSets[dstPodIpSet.Name] = true + + for _, pod := range egressRule.dstPods { + dstPodIpSet.Add(pod.ip, utils.OptionTimeout, "0") + } + + if len(egressRule.ports) != 0 { + // case where 'ports' details and 'from' details specified in the egress rule + // so match on specified source and destination ip's and specified port and protocol + for _, portProtocol := range egressRule.ports { + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + policy.name + " namespace " + policy.namespace args := []string{"-m", "comment", "--comment", comment, - "-m", "set", "--set", destPodIpSetName, "dst", + "-m", "set", "--set", targetSourcePodIpSetName, "src", + "-m", "set", "--set", dstPodIpSetName, "dst", "-p", portProtocol.protocol, "--dport", portProtocol.port, "-j", "ACCEPT"} err := iptablesCmdHandler.AppendUnique("filter", policyChainName, args...) if err != nil { - return nil, nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + return fmt.Errorf("Failed to run iptables command: %s", err.Error()) } } + } else { + // case where no 'ports' details specified in the ingress rule but 'from' details specified + // so match on specified source and destination ip with all port and protocol + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + + policy.name + " namespace " + policy.namespace + args := []string{"-m", "comment", "--comment", comment, + "-m", "set", "--set", targetSourcePodIpSetName, "src", + "-m", "set", "--set", dstPodIpSetName, "dst", + "-j", "ACCEPT"} + err := iptablesCmdHandler.AppendUnique("filter", policyChainName, args...) + if err != nil { + return fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } } + } - // case where nether ports nor from details are speified in the ingress rule - // so match on all ports, protocol, source IP's - if ingressRule.matchAllSource && ingressRule.matchAllPorts { + // case where only 'ports' details specified but no 'to' details in the egress rule + // so match on all sources, with specified port and protocol + if egressRule.matchAllDestinations && !egressRule.matchAllPorts { + for _, portProtocol := range egressRule.ports { comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace args := []string{"-m", "comment", "--comment", comment, - "-m", "set", "--set", destPodIpSetName, "dst", + "-m", "set", "--set", targetSourcePodIpSetName, "src", + "-p", portProtocol.protocol, + "--dport", portProtocol.port, "-j", "ACCEPT"} err := iptablesCmdHandler.AppendUnique("filter", policyChainName, args...) if err != nil { - return nil, nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + return fmt.Errorf("Failed to run iptables command: %s", err.Error()) } } } - } - glog.Infof("Iptables chains in the filter table are synchronized with the network policies.") + // case where nether ports nor from details are speified in the egress rule + // so match on all ports, protocol, source IP's + if egressRule.matchAllDestinations && egressRule.matchAllPorts { + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name: " + + policy.name + " namespace " + policy.namespace + args := []string{"-m", "comment", "--comment", comment, + "-m", "set", "--set", targetSourcePodIpSetName, "src", + "-j", "ACCEPT"} + err := iptablesCmdHandler.AppendUnique("filter", policyChainName, args...) + if err != nil { + return fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + } + } - return activePolicyChains, activePolicyIpSets, nil + return nil } func (npc *NetworkPolicyController) syncPodFirewallChains() (map[string]bool, error) { @@ -363,12 +519,12 @@ func (npc *NetworkPolicyController) syncPodFirewallChains() (map[string]bool, er glog.Fatalf("Failed to initialize iptables executor: %s", err.Error()) } - // loop through the pods running on the node which has default ingress to be denied - firewallEnabledPods, err := npc.getFirewallEnabledPods(npc.nodeIP.String()) + // loop through the pods running on the node which to which ingress network policies to be applied + ingressNetworkPolicyEnabledPods, err := npc.getIngressNetworkPolicyEnabledPods(npc.nodeIP.String()) if err != nil { return nil, err } - for _, pod := range *firewallEnabledPods { + for _, pod := range *ingressNetworkPolicyEnabledPods { // below condition occurs when we get trasient update while removing or adding pod // subseqent update will do the correct action @@ -442,7 +598,105 @@ func (npc *NetworkPolicyController) syncPodFirewallChains() (map[string]bool, er // add entries in pod firewall to run through required network policies for _, policy := range *npc.networkPoliciesInfo { - if _, ok := policy.destPods[pod.ip]; ok { + if _, ok := policy.targetPods[pod.ip]; ok { + comment := "run through nw policy " + policy.name + policyChainName := networkPolicyChainName(policy.namespace, policy.name) + args := []string{"-m", "comment", "--comment", comment, "-j", policyChainName} + exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...) + if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + if !exists { + err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...) + if err != nil && err.(*iptables.Error).ExitStatus() != 1 { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + } + } + } + + // ensure statefull firewall, that permits return traffic for the traffic originated by the pod + comment = "rule for stateful firewall for pod" + args = []string{"-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT"} + exists, err = iptablesCmdHandler.Exists("filter", podFwChainName, args...) + if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + if !exists { + err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...) + if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + } + } + + // loop through the pods running on the node which egress network policies to be applied + egressNetworkPolicyEnabledPods, err := npc.getEgressNetworkPolicyEnabledPods(npc.nodeIP.String()) + if err != nil { + return nil, err + } + for _, pod := range *egressNetworkPolicyEnabledPods { + + // below condition occurs when we get trasient update while removing or adding pod + // subseqent update will do the correct action + if len(pod.ip) == 0 || pod.ip == "" { + continue + } + + // ensure pod specific firewall chain exist for all the pods that need egress firewall + podFwChainName := podFirewallChainName(pod.namespace, pod.name) + err = iptablesCmdHandler.NewChain("filter", podFwChainName) + if err != nil && err.(*iptables.Error).ExitStatus() != 1 { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + activePodFwChains[podFwChainName] = true + + // ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain + // this rule applies to the traffic getting routed (coming for other node pods) + comment := "rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace + + " to chain " + podFwChainName + args := []string{"-m", "comment", "--comment", comment, "-s", pod.ip, "-j", podFwChainName} + exists, err := iptablesCmdHandler.Exists("filter", "FORWARD", args...) + if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + if !exists { + err := iptablesCmdHandler.Insert("filter", "FORWARD", 1, args...) + if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + } + + // ensure there is rule in filter table and forward chain to jump to pod specific firewall chain + // this rule applies to the traffic getting switched (coming for same node pods) + comment = "rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace + + " to chain " + podFwChainName + args = []string{"-m", "physdev", "--physdev-is-bridged", + "-m", "comment", "--comment", comment, + "-s", pod.ip, + "-j", podFwChainName} + exists, err = iptablesCmdHandler.Exists("filter", "FORWARD", args...) + if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + if !exists { + err = iptablesCmdHandler.Insert("filter", "FORWARD", 1, args...) + if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + } + + // add default DROP rule at the end of chain + comment = "default rule to REJECT traffic destined for POD name:" + pod.name + " namespace: " + pod.namespace + args = []string{"-m", "comment", "--comment", comment, "-j", "REJECT"} + err = iptablesCmdHandler.AppendUnique("filter", podFwChainName, args...) + if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + + // add entries in pod firewall to run through required network policies + for _, policy := range *npc.networkPoliciesInfo { + if _, ok := policy.targetPods[pod.ip]; ok { comment := "run through nw policy " + policy.name policyChainName := networkPolicyChainName(policy.namespace, policy.name) args := []string{"-m", "comment", "--comment", comment, "-j", policyChainName} @@ -610,62 +864,54 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets return nil } -func (npc *NetworkPolicyController) getFirewallEnabledPods(nodeIp string) (*map[string]podInfo, error) { - +func (npc *NetworkPolicyController) getIngressNetworkPolicyEnabledPods(nodeIp string) (*map[string]podInfo, error) { nodePods := make(map[string]podInfo) for _, pod := range watchers.PodWatcher.List() { if strings.Compare(pod.Status.HostIP, nodeIp) != 0 { continue } - if npc.v1NetworkPolicy { - podNeedsFirewall := false - for _, policyObj := range watchers.NetworkPolicyWatcher.List() { - policy, _ := policyObj.(*networking.NetworkPolicy) + for _, policy := range *npc.networkPoliciesInfo { + if policy.namespace != pod.ObjectMeta.Namespace { + continue + } + _, ok := policy.targetPods[pod.Status.PodIP] + if ok && (policy.policyType == "both" || policy.policyType == "ingress") { + glog.Infof("Found pod name: " + pod.ObjectMeta.Name + " namespace: " + pod.ObjectMeta.Namespace + " for which network policies need to be applied.") + nodePods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP, + name: pod.ObjectMeta.Name, + namespace: pod.ObjectMeta.Namespace, + labels: pod.ObjectMeta.Labels} + break + } + } + } + return &nodePods, nil - // we are only interested in the network policies in same namespace that of pod - if policy.Namespace != pod.ObjectMeta.Namespace { - continue - } +} - // An empty podSelector matches all pods in this namespace. - if len(policy.Spec.PodSelector.MatchLabels) == 0 && len(policy.Spec.PodSelector.MatchExpressions) == 0 { - podNeedsFirewall = true - break - } +func (npc *NetworkPolicyController) getEgressNetworkPolicyEnabledPods(nodeIp string) (*map[string]podInfo, error) { - // if pod matches atleast on network policy labels then pod needs firewall - matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, - policy.Spec.PodSelector.MatchLabels) - if err != nil { - return nil, fmt.Errorf("Failed to get the pods %s", err.Error()) - } - for _, matchingPod := range matchingPods { - if matchingPod.ObjectMeta.Name == pod.ObjectMeta.Name { - podNeedsFirewall = true - break - } - } - if podNeedsFirewall { - break - } - } - if !podNeedsFirewall { + nodePods := make(map[string]podInfo) + + for _, pod := range watchers.PodWatcher.List() { + if strings.Compare(pod.Status.HostIP, nodeIp) != 0 { + continue + } + for _, policy := range *npc.networkPoliciesInfo { + if policy.namespace != pod.ObjectMeta.Namespace { continue } - } else { - defaultPolicy, err := getNameSpaceDefaultPolicy(pod.ObjectMeta.Namespace) - if err != nil { - return nil, fmt.Errorf("Failed to get the namespace default ingress policy %s", err.Error()) - } - if strings.Compare(defaultPolicy, "DefaultDeny") != 0 { - continue + _, ok := policy.targetPods[pod.Status.PodIP] + if ok && (policy.policyType == "both" || policy.policyType == "egress") { + glog.Infof("Found pod name: " + pod.ObjectMeta.Name + " namespace: " + pod.ObjectMeta.Namespace + " for which network policies need to be applied.") + nodePods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP, + name: pod.ObjectMeta.Name, + namespace: pod.ObjectMeta.Namespace, + labels: pod.ObjectMeta.Labels} + break } } - nodePods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP, - name: pod.ObjectMeta.Name, - namespace: pod.ObjectMeta.Namespace, - labels: pod.ObjectMeta.Labels} } return &nodePods, nil } @@ -681,15 +927,45 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) { return nil, fmt.Errorf("Failed to convert") } newPolicy := networkPolicyInfo{ - name: policy.Name, - namespace: policy.Namespace, - labels: policy.Spec.PodSelector.MatchLabels, + name: policy.Name, + namespace: policy.Namespace, + labels: policy.Spec.PodSelector.MatchLabels, + policyType: "ingress", } + + // check if there is explicitly specified PolicyTypes in the spec + if len(policy.Spec.PolicyTypes) > 0 { + ingressType, egressType := false, false + for _, policyType := range policy.Spec.PolicyTypes { + if policyType == networking.PolicyTypeIngress { + ingressType = true + } + if policyType == networking.PolicyTypeEgress { + egressType = true + } + } + if ingressType && egressType { + newPolicy.policyType = "both" + } else if egressType { + newPolicy.policyType = "egress" + } else if ingressType { + newPolicy.policyType = "ingress" + } + } else { + if policy.Spec.Egress != nil && policy.Spec.Ingress != nil { + newPolicy.policyType = "both" + } else if policy.Spec.Egress != nil { + newPolicy.policyType = "egress" + } else if policy.Spec.Ingress != nil { + newPolicy.policyType = "ingress" + } + } + matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels) - newPolicy.destPods = make(map[string]podInfo) + newPolicy.targetPods = make(map[string]podInfo) if err == nil { for _, matchingPod := range matchingPods { - newPolicy.destPods[matchingPod.Status.PodIP] = podInfo{ip: matchingPod.Status.PodIP, + newPolicy.targetPods[matchingPod.Status.PodIP] = podInfo{ip: matchingPod.Status.PodIP, name: matchingPod.ObjectMeta.Name, namespace: matchingPod.ObjectMeta.Namespace, labels: matchingPod.ObjectMeta.Labels} @@ -702,6 +978,12 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) { newPolicy.ingressRules = make([]ingressRule, 0) } + if policy.Spec.Egress == nil { + newPolicy.egressRules = nil + } else { + newPolicy.egressRules = make([]egressRule, 0) + } + for _, specIngressRule := range policy.Spec.Ingress { ingressRule := ingressRule{} @@ -759,6 +1041,64 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) { newPolicy.ingressRules = append(newPolicy.ingressRules, ingressRule) } + + for _, specEgressRule := range policy.Spec.Egress { + egressRule := egressRule{} + + egressRule.ports = make([]protocolAndPort, 0) + + // If this field is empty or missing in the spec, this rule matches all ports + if len(specEgressRule.Ports) == 0 { + egressRule.matchAllPorts = true + } else { + egressRule.matchAllPorts = false + for _, port := range specEgressRule.Ports { + protocolAndPort := protocolAndPort{protocol: string(*port.Protocol), port: port.Port.String()} + egressRule.ports = append(egressRule.ports, protocolAndPort) + } + } + + egressRule.dstPods = make([]podInfo, 0) + + // If this field is empty or missing in the spec, this rule matches all sources + if len(specEgressRule.To) == 0 { + egressRule.matchAllDestinations = true + } else { + egressRule.matchAllDestinations = false + var matchingPods []*api.Pod + var err error + for _, peer := range specEgressRule.To { + // spec must have either of PodSelector or NamespaceSelector + if peer.PodSelector != nil { + matchingPods, err = watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, + peer.PodSelector.MatchLabels) + } else if peer.NamespaceSelector != nil { + namespaces, err := watchers.NamespaceWatcher.ListByLabels(peer.NamespaceSelector.MatchLabels) + if err != nil { + return nil, errors.New("Failed to build network policies info due to " + err.Error()) + } + for _, namespace := range namespaces { + namespacePods, err := watchers.PodWatcher.ListByNamespaceAndLabels(namespace.Name, nil) + if err != nil { + return nil, errors.New("Failed to build network policies info due to " + err.Error()) + } + matchingPods = append(matchingPods, namespacePods...) + } + } + if err == nil { + for _, matchingPod := range matchingPods { + egressRule.dstPods = append(egressRule.dstPods, + podInfo{ip: matchingPod.Status.PodIP, + name: matchingPod.ObjectMeta.Name, + namespace: matchingPod.ObjectMeta.Namespace, + labels: matchingPod.ObjectMeta.Labels}) + } + } + } + } + + newPolicy.egressRules = append(newPolicy.egressRules, egressRule) + } NetworkPolicies = append(NetworkPolicies, newPolicy) } @@ -778,11 +1118,11 @@ func buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) { labels: policy.Spec.PodSelector.MatchLabels, } matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels) - newPolicy.destPods = make(map[string]podInfo) + newPolicy.targetPods = make(map[string]podInfo) newPolicy.ingressRules = make([]ingressRule, 0) if err == nil { for _, matchingPod := range matchingPods { - newPolicy.destPods[matchingPod.Status.PodIP] = podInfo{ip: matchingPod.Status.PodIP, + newPolicy.targetPods[matchingPod.Status.PodIP] = podInfo{ip: matchingPod.Status.PodIP, name: matchingPod.ObjectMeta.Name, namespace: matchingPod.ObjectMeta.Namespace, labels: matchingPod.ObjectMeta.Labels} @@ -850,18 +1190,30 @@ func networkPolicyChainName(namespace, policyName string) string { return "KUBE-NWPLCY-" + encoded[:16] } +func policySourcePodIpSetName(namespace, policyName string) string { + hash := sha256.Sum256([]byte(namespace + policyName)) + encoded := base32.StdEncoding.EncodeToString(hash[:]) + return "KUBE-SRC-" + encoded[:16] +} + func policyDestinationPodIpSetName(namespace, policyName string) string { hash := sha256.Sum256([]byte(namespace + policyName)) encoded := base32.StdEncoding.EncodeToString(hash[:]) return "KUBE-DST-" + encoded[:16] } -func policySourcePodIpSetName(namespace, policyName string, ingressRuleNo int) string { +func policyIndexedSourcePodIpSetName(namespace, policyName string, ingressRuleNo int) string { hash := sha256.Sum256([]byte(namespace + policyName + "ingressrule" + strconv.Itoa(ingressRuleNo))) encoded := base32.StdEncoding.EncodeToString(hash[:]) return "KUBE-SRC-" + encoded[:16] } +func policyIndexedDestinationPodIpSetName(namespace, policyName string, egressRuleNo int) string { + hash := sha256.Sum256([]byte(namespace + policyName + "egressrule" + strconv.Itoa(egressRuleNo))) + encoded := base32.StdEncoding.EncodeToString(hash[:]) + return "KUBE-DST-" + encoded[:16] +} + // Cleanup cleanup configurations done func (npc *NetworkPolicyController) Cleanup() {