Skip to content

Commit

Permalink
daemon: Add ingress endpoint
Browse files Browse the repository at this point in the history
Add Cilium Endpoint representing Ingress. It is defined without a veth
interface and no bpf programs or maps are created for it. Ingress
endpoint is needed so that the network policy is computed and configured
to Envoy, so that ingress/egress network policy defined for Ingress can
be enforced.

Cilium Ingress is implemented as L7 LB, which is an Envoy redirect on the
egress packet path. Egress CNP policies are already enforced when
defined. Prior to this commit CNPs defined for reserved:ingress identity
were not computed, however, and all traffic was passed through by Cilium
Ingress was allowed to egress towards the backends. When the backends
receive such packets, they are identified as coming from Cilium Ingress,
so any ingress policies at the backends can not discern the original
source of the traffic.

This commit adds a Cilium endpoint for the reserved:ingress identity,
which makes the Cilium node compute and pass policies whose endpoint
selector selects this identity (e.g., by selecting all entities) to
Envoy, so that they can be enforced. Envoy listener will then enforce not
just the egress policy but also the ingress policy for the original
incoming source security identity.

Signed-off-by: Jarno Rajahalme <jarno@isovalent.com>
  • Loading branch information
jrajahalme authored and ti-mo committed Oct 6, 2023
1 parent d4543a8 commit 04f19e9
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 3 deletions.
19 changes: 19 additions & 0 deletions daemon/cmd/daemon_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1765,6 +1765,25 @@ func startDaemon(d *Daemon, restoredEndpoints *endpointRestoreState, cleaner *da
}
}

if option.Config.EnableEnvoyConfig {
if !d.endpointManager.IngressEndpointExists() {
// Creating Ingress Endpoint depends on the Ingress IPs having been
// allocated first. This happens earlier in the agent bootstrap.
if (option.Config.EnableIPv4 && len(node.GetIngressIPv4()) == 0) ||
(option.Config.EnableIPv6 && len(node.GetIngressIPv6()) == 0) {
log.Warn("Ingress IPs are not available, skipping creation of the Ingress Endpoint: Policy enforcement on Cilium Ingress will not work as expected.")
} else {
log.Info("Creating ingress endpoint")
if err := d.endpointManager.AddIngressEndpoint(
d.ctx, d, d, d.ipcache, d.l7Proxy, d.identityAllocator,
"Create ingress endpoint",
); err != nil {
log.Fatalf("unable to create ingress endpoint: %s", err)
}
}
}
}

if option.Config.EnableIPMasqAgent {
ipmasqAgent, err := ipmasq.NewIPMasqAgent(option.Config.IPMasqAgentConfigPath)
if err != nil {
Expand Down
48 changes: 47 additions & 1 deletion pkg/endpoint/bpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,14 +530,31 @@ func (e *Endpoint) regenerateBPF(regenContext *regenerationContext) (revnum uint
}()

if err != nil {
return 0, compilationExecuted, err
return 0, false, err
}

// No need to compile BPF in dry mode.
if option.Config.DryMode {
return e.nextPolicyRevision, false, nil
}

// Skip BPF if the endpoint has no policy map
if !e.HasBPFPolicyMap() {
// Allow another builder to start while we wait for the proxy
if regenContext.DoneFunc != nil {
regenContext.DoneFunc()
}

stats.proxyWaitForAck.Start()
err = e.waitForProxyCompletions(datapathRegenCtxt.proxyWaitGroup)
stats.proxyWaitForAck.End(err == nil)
if err != nil {
return 0, false, fmt.Errorf("Error while updating network policy: %s", err)
}

return e.nextPolicyRevision, false, nil
}

// Wait for connection tracking cleaning to complete
stats.waitingForCTClean.Start()
<-datapathRegenCtxt.ctCleaned
Expand Down Expand Up @@ -740,6 +757,30 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul
return false, nil
}

// Endpoints without policy maps only need Network Policy Updates
if !e.HasBPFPolicyMap() {
if logging.CanLogAt(log.Logger, logrus.DebugLevel) {
log.WithField(logfields.EndpointID, e.ID).Debug("Ingress Endpoint skipping bpf regeneration")
}

if e.SecurityIdentity != nil {
_ = e.updateAndOverrideEndpointOptions(nil)

if logging.CanLogAt(log.Logger, logrus.DebugLevel) {
log.WithField(logfields.EndpointID, e.ID).Debug("Ingress Endpoint updating Network policy")
}

stats.proxyPolicyCalculation.Start()
err, networkPolicyRevertFunc := e.updateNetworkPolicy(datapathRegenCtxt.proxyWaitGroup)
stats.proxyPolicyCalculation.End(err == nil)
if err != nil {
return false, err
}
datapathRegenCtxt.revertStack.Push(networkPolicyRevertFunc)
}
return false, nil
}

if e.policyMap == nil {
e.policyMap, err = policymap.OpenOrCreate(e.policyMapPath())
if err != nil {
Expand Down Expand Up @@ -1348,6 +1389,11 @@ func (e *Endpoint) syncPolicyMapWithDump() error {
}

func (e *Endpoint) startSyncPolicyMapController() {
// Skip the controller if the endpoint has no policy map
if !e.HasBPFPolicyMap() {
return
}

ctrlName := fmt.Sprintf("sync-policymap-%d", e.ID)
e.controllers.CreateController(ctrlName,
controller.ControllerParams{
Expand Down
45 changes: 43 additions & 2 deletions pkg/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/cilium/cilium/pkg/identity"
"github.com/cilium/cilium/pkg/identity/cache"
"github.com/cilium/cilium/pkg/identity/identitymanager"
ippkg "github.com/cilium/cilium/pkg/ip"
ipamOption "github.com/cilium/cilium/pkg/ipam/option"
slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
"github.com/cilium/cilium/pkg/labels"
Expand Down Expand Up @@ -376,7 +377,8 @@ type Endpoint struct {

allocator cache.IdentityAllocator

isHost bool
isIngress bool
isHost bool

noTrackPort uint16

Expand Down Expand Up @@ -536,6 +538,22 @@ func (e *Endpoint) initDNSHistoryTrigger() {
}
}

// CreateIngressEndpoint creates the endpoint corresponding to Cilium Ingress.
func CreateIngressEndpoint(owner regeneration.Owner, policyGetter policyRepoGetter, namedPortsGetter namedPortsGetter, proxy EndpointProxy, allocator cache.IdentityAllocator) (*Endpoint, error) {
ep := createEndpoint(owner, policyGetter, namedPortsGetter, proxy, allocator, 0, "")
ep.DatapathConfiguration = NewDatapathConfiguration()

ep.isIngress = true
// node.GetIngressIPv4 has been parsed with net.ParseIP() and may be in IPv4 mapped IPv6
// address format. Use ippkg.AddrFromIP() to make sure we get a plain IPv4 address.
ep.IPv4, _ = ippkg.AddrFromIP(node.GetIngressIPv4())
ep.IPv6, _ = netip.AddrFromSlice(node.GetIngressIPv6())

ep.setState(StateWaitingForIdentity, "Ingress Endpoint creation")

return ep, nil
}

// CreateHostEndpoint creates the endpoint corresponding to the host.
func CreateHostEndpoint(owner regeneration.Owner, policyGetter policyRepoGetter, namedPortsGetter namedPortsGetter, proxy EndpointProxy, allocator cache.IdentityAllocator) (*Endpoint, error) {
mac, err := link.GetHardwareAddr(defaults.HostDevice)
Expand Down Expand Up @@ -609,6 +627,8 @@ func (e *Endpoint) GetIPv4Address() string {
if !e.IPv4.IsValid() {
return ""
}
// e.IPv4 is assumed to not be an IPv4 mapped IPv6 address, which would be
// formatted like "::ffff:1.2.3.4"
return e.IPv4.String()
}

Expand Down Expand Up @@ -879,8 +899,10 @@ func parseEndpoint(ctx context.Context, owner regeneration.Owner, policyGetter p

// If host label is present, it's the host endpoint.
ep.isHost = ep.HasLabels(labels.LabelHost)
// If Ingress label is present, it's the Ingress endpoint.
ep.isIngress = ep.HasLabels(labels.LabelIngress)

if ep.isHost {
if ep.isHost || ep.isIngress {
// Overwrite datapath configuration with the current agent configuration.
ep.DatapathConfiguration = NewDatapathConfiguration()
}
Expand Down Expand Up @@ -1697,6 +1719,25 @@ func (e *Endpoint) IsInit() bool {
return found && init.Source == labels.LabelSourceReserved
}

// InitWithIngressLabels initializes the endpoint with reserved:ingress.
// It should only be used for the host endpoint.
func (e *Endpoint) InitWithIngressLabels(ctx context.Context, launchTime time.Duration) {
if !e.isIngress {
return
}

epLabels := labels.Labels{}
epLabels.MergeLabels(labels.LabelIngress)

// Give the endpoint a security identity
newCtx, cancel := context.WithTimeout(ctx, launchTime)
defer cancel()
e.UpdateLabels(newCtx, epLabels, epLabels, true)
if errors.Is(newCtx.Err(), context.DeadlineExceeded) {
log.WithError(newCtx.Err()).Warning("Timed out while updating security identify for host endpoint")
}
}

// InitWithNodeLabels initializes the endpoint with the known node labels as
// well as reserved:host. It should only be used for the host endpoint.
func (e *Endpoint) InitWithNodeLabels(ctx context.Context, launchTime time.Duration) {
Expand Down
6 changes: 6 additions & 0 deletions pkg/endpoint/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ var (
syncAddressIdentityMappingControllerGroup = controller.NewGroup("sync-address-identity-mapping")
)

// HasBPFPolicyMap returns true if policy map changes should be collected
func (e *Endpoint) HasBPFPolicyMap() bool {
// Ingress Endpoint has no policy maps
return !e.isIngress
}

// GetNamedPort returns the port for the given name.
// Must be called with e.mutex NOT held
func (e *Endpoint) GetNamedPort(ingress bool, name string, proto uint8) uint16 {
Expand Down
19 changes: 19 additions & 0 deletions pkg/endpointmanager/cell.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,31 @@ type EndpointsLookup interface {

// HostEndpointExists returns true if the host endpoint exists.
HostEndpointExists() bool

// GetIngressEndpoint returns the ingress endpoint.
GetIngressEndpoint() *endpoint.Endpoint

// IngressEndpointExists returns true if the ingress endpoint exists.
IngressEndpointExists() bool
}

type EndpointsModify interface {
// AddEndpoint takes the prepared endpoint object and starts managing it.
AddEndpoint(owner regeneration.Owner, ep *endpoint.Endpoint, reason string) (err error)

// AddIngressEndpoint creates an Endpoint representing Cilium Ingress on this node without a
// corresponding container necessarily existing. This is needed to be able to ingest and
// sync network policies applicable to Cilium Ingress to Envoy.
AddIngressEndpoint(
ctx context.Context,
owner regeneration.Owner,
policyGetter policyRepoGetter,
ipcache *ipcache.IPCache,
proxy endpoint.EndpointProxy,
allocator cache.IdentityAllocator,
reason string,
) error

AddHostEndpoint(
ctx context.Context,
owner regeneration.Owner,
Expand Down
26 changes: 26 additions & 0 deletions pkg/endpointmanager/ingress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package endpointmanager

import (
"github.com/cilium/cilium/pkg/endpoint"
"github.com/cilium/cilium/pkg/labels"
)

// GetIngressEndpoint returns the ingress endpoint.
func (mgr *endpointManager) GetIngressEndpoint() *endpoint.Endpoint {
mgr.mutex.RLock()
defer mgr.mutex.RUnlock()
for _, ep := range mgr.endpoints {
if ep.HasLabels(labels.LabelIngress) {
return ep
}
}
return nil
}

// IngressEndpointExists returns true if the ingress endpoint exists.
func (mgr *endpointManager) IngressEndpointExists() bool {
return mgr.GetIngressEndpoint() != nil
}
23 changes: 23 additions & 0 deletions pkg/endpointmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,29 @@ func (mgr *endpointManager) AddEndpoint(owner regeneration.Owner, ep *endpoint.E
return nil
}

func (mgr *endpointManager) AddIngressEndpoint(
ctx context.Context,
owner regeneration.Owner,
policyGetter policyRepoGetter,
ipcache *ipcache.IPCache,
proxy endpoint.EndpointProxy,
allocator cache.IdentityAllocator,
reason string,
) error {
ep, err := endpoint.CreateIngressEndpoint(owner, policyGetter, ipcache, proxy, allocator)
if err != nil {
return err
}

if err := mgr.AddEndpoint(owner, ep, reason); err != nil {
return err
}

ep.InitWithIngressLabels(ctx, launchTime)

return nil
}

func (mgr *endpointManager) AddHostEndpoint(
ctx context.Context,
owner regeneration.Owner,
Expand Down
4 changes: 4 additions & 0 deletions pkg/policy/l4.go
Original file line number Diff line number Diff line change
Expand Up @@ -1268,6 +1268,10 @@ func (l4 *L4Policy) AccumulateMapChanges(cs CachedSelector, adds, deletes []iden
l4.mutex.RUnlock()

for epPolicy := range users {
// Skip if endpoint has no policy maps
if !epPolicy.PolicyOwner.HasBPFPolicyMap() {
continue
}
// resolve named port
if port == 0 && l4Filter.PortName != "" {
port = epPolicy.PolicyOwner.GetNamedPort(direction == trafficdirection.Ingress, l4Filter.PortName, proto)
Expand Down
1 change: 1 addition & 0 deletions pkg/policy/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type EndpointPolicy struct {
type PolicyOwner interface {
GetID() uint64
LookupRedirectPortLocked(ingress bool, protocol string, port uint16) uint16
HasBPFPolicyMap() bool
GetNamedPort(ingress bool, name string, proto uint8) uint16
PolicyDebug(fields logrus.Fields, msg string)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/policy/resolve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ func (d DummyOwner) LookupRedirectPortLocked(bool, string, uint16) uint16 {
return 4242
}

func (d DummyOwner) HasBPFPolicyMap() bool {
return true
}

func (d DummyOwner) GetNamedPort(ingress bool, name string, proto uint8) uint16 {
return 80
}
Expand Down

0 comments on commit 04f19e9

Please sign in to comment.