Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support policy matching against kube-apiserver #17823

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
b798244
daemon: Remove code marked for removal
christarazi Oct 5, 2021
d9603bc
policy: Fix incorrect godoc comment
christarazi Oct 21, 2021
266dcab
testutils: Reimplement mock identity allocator
christarazi Oct 28, 2021
000e1ee
labels: Provide Has() and Remove()
christarazi Oct 7, 2021
ed91844
labels: Define constant labels for reserved identities
christarazi Oct 29, 2021
54b6287
identity: Introduce lock around reserved ID cache
christarazi Oct 20, 2021
26ac817
identity: Add AddReservedIdentityWithLabels
christarazi Oct 20, 2021
b97cea7
identity: Rework IterateReservedIdentities()
christarazi Oct 20, 2021
7acaca5
identity: Hide and protect ReservedIdentityCache
joestringer Nov 16, 2021
8fc786b
identity, labels: Add kube-apiserver identity and label
christarazi Oct 5, 2021
a62e9ff
ipcache: Introduce CIDR -> label mapping
christarazi Oct 5, 2021
02f87bd
ipcache: Refactor CIDR identity allocation to accept labels from caller
christarazi Oct 23, 2021
fc97395
daemon, endpoint: Remove GetPolicyRepository from regeneration.Owner
christarazi Oct 13, 2021
30cca76
daemon, policy: Refactor policy updater trigger
christarazi Oct 13, 2021
cee7f99
source: Add KubeAPIServer source
christarazi Oct 15, 2021
5a11499
identity/cache: Export check for local IDs being initialized
christarazi Oct 25, 2021
c9bf53e
ipcache, policy: Inject labels from identity metadata
christarazi Oct 11, 2021
b1cf1c9
k8s/watchers: Push kube-apiserver IPs into IDMD
christarazi Oct 5, 2021
f66760d
watchers, node/manager: Push Node IPs to IDMD
christarazi Oct 10, 2021
985bc60
bpf: Refactor node identity checking
joestringer Oct 21, 2021
cac52f3
bpf: Add kube-apiserver identity
joestringer Oct 22, 2021
ecb6d2e
k8s, policy/api: Add EntityKubeAPIServer for selecting kube-apiserver
christarazi Nov 5, 2021
abdc4b4
endpoint: Rename prg -> policyGetter
joestringer Nov 10, 2021
b83b9d4
ipcache: Hide IdentityMetadata
joestringer Nov 11, 2021
8b5ae5e
ipcache: Improve IdentityMetadata description
joestringer Nov 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 2 additions & 4 deletions bpf/bpf_lxc.c
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,7 @@ static __always_inline int ipv6_l3_from_lxc(struct __ctx_buff *ctx,
encrypt_key = get_min_encrypt_key(info->key);
#ifdef ENABLE_WIREGUARD
if (info->tunnel_endpoint != 0 &&
info->sec_label != HOST_ID &&
info->sec_label != REMOTE_NODE_ID)
!identity_is_node(info->sec_label))
dst_remote_ep = true;
#endif /* ENABLE_WIREGUARD */
} else {
Expand Down Expand Up @@ -631,8 +630,7 @@ static __always_inline int handle_ipv4_from_lxc(struct __ctx_buff *ctx,
* in the code in the same place where we handle IPSec.
*/
if (info->tunnel_endpoint != 0 &&
info->sec_label != HOST_ID &&
info->sec_label != REMOTE_NODE_ID)
!identity_is_node(info->sec_label))
dst_remote_ep = true;
#endif /* ENABLE_WIREGUARD */
} else {
Expand Down
5 changes: 3 additions & 2 deletions bpf/bpf_sock.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "lib/common.h"
#include "lib/lb.h"
#include "lib/eps.h"
#include "lib/identity.h"
#include "lib/metrics.h"

#define SYS_REJECT 0
Expand Down Expand Up @@ -261,7 +262,7 @@ sock4_wildcard_lookup(struct lb4_key *key __maybe_unused,

info = ipcache_lookup4(&IPCACHE_MAP, key->address, V4_CACHE_KEY_LEN);
if (info != NULL && (info->sec_label == HOST_ID ||
(include_remote_hosts && info->sec_label == REMOTE_NODE_ID)))
(include_remote_hosts && identity_is_remote_node(info->sec_label))))
goto wildcard_lookup;

return NULL;
Expand Down Expand Up @@ -746,7 +747,7 @@ sock6_wildcard_lookup(struct lb6_key *key __maybe_unused,

info = ipcache_lookup6(&IPCACHE_MAP, &key->address, V6_CACHE_KEY_LEN);
if (info != NULL && (info->sec_label == HOST_ID ||
(include_remote_hosts && info->sec_label == REMOTE_NODE_ID)))
(include_remote_hosts && identity_is_remote_node(info->sec_label))))
goto wildcard_lookup;

return NULL;
Expand Down
4 changes: 3 additions & 1 deletion bpf/lib/egress_policies.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#ifndef __LIB_EGRESS_POLICIES_H_
#define __LIB_EGRESS_POLICIES_H_

#include "lib/identity.h"

#ifdef ENABLE_EGRESS_GATEWAY
/* is_cluster_destination returns true if the given destination is part of the
* cluster. It uses the ipcache and endpoint maps information.
Expand All @@ -20,7 +22,7 @@ is_cluster_destination(struct iphdr *ip4, __u32 dst_id, __u32 tunnel_endpoint)
/* If the destination is a Cilium-managed node (remote or local), it's
* part of the cluster.
*/
if (dst_id == REMOTE_NODE_ID || dst_id == HOST_ID)
if (identity_is_node(dst_id))
return true;

/* Use the endpoint map to know if the destination is a local endpoint.
Expand Down
31 changes: 29 additions & 2 deletions bpf/lib/identity.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,33 @@
#ifndef __LIB_IDENTITY_H_
#define __LIB_IDENTITY_H_

#include "drop.h"
#include "dbg.h"

/* identity_is_remote_node returns true for identities representing
 * Cilium-managed nodes elsewhere in the cluster.
 *
 * KUBE_APISERVER_NODE_ID is the reserved identity corresponding to the
 * labels 'reserved:remote-node' and 'reserved:kube-apiserver'. Whenever
 * it is used to determine the identity of a node in the cluster, routing
 * decisions and the like must therefore be made exactly as for
 * REMOTE_NODE_ID. If unique per-node identities are ever assigned, this
 * check will probably need to become a map lookup to cover all possible
 * identities; for now this comparison suffices to capture the notion of
 * 'remote nodes in the cluster' for routing decisions.
 *
 * Note that kube-apiserver policy is handled entirely separately by the
 * standard policymap enforcement logic and has no relationship to the
 * identity as used here. If the apiserver is outside the cluster, the
 * KUBE_APISERVER_NODE_ID case should never be hit.
 */
static __always_inline bool identity_is_remote_node(__u32 identity)
{
	switch (identity) {
	case REMOTE_NODE_ID:
	case KUBE_APISERVER_NODE_ID:
		return true;
	default:
		return false;
	}
}

/* identity_is_node returns true if the identity corresponds to any
 * Cilium-managed node: the local host or a remote node in the cluster.
 */
static __always_inline bool identity_is_node(__u32 identity)
{
	if (identity == HOST_ID)
		return true;
	return identity_is_remote_node(identity);
}
pchaigno marked this conversation as resolved.
Show resolved Hide resolved

/**
* identity_is_reserved is used to determine whether an identity is one of the
* reserved identities that are not handed out to endpoints.
Expand All @@ -16,6 +40,7 @@
* - ReservedIdentityHost
* - ReservedIdentityWorld
* - ReservedIdentityRemoteNode
* - ReservedIdentityKubeAPIServer
*
* The following identities are given to endpoints so return false for these:
* - ReservedIdentityUnmanaged
Expand All @@ -26,9 +51,10 @@
*/
static __always_inline bool identity_is_reserved(__u32 identity)
{
	/* All identities numerically below UNMANAGED_ID are reserved; in
	 * addition, the remote-node family of identities (REMOTE_NODE_ID and
	 * KUBE_APISERVER_NODE_ID) is reserved despite sitting outside that
	 * range. The diff residue previously left the pre-change return
	 * statement in place, shadowing this one; only the post-change
	 * check belongs here.
	 */
	return identity < UNMANAGED_ID || identity_is_remote_node(identity);
}

#if __ctx_is == __ctx_skb
static __always_inline bool inherit_identity_from_host(struct __ctx_buff *ctx,
__u32 *identity)
{
Expand Down Expand Up @@ -65,5 +91,6 @@ static __always_inline bool inherit_identity_from_host(struct __ctx_buff *ctx,

return from_proxy;
}
#endif /* __ctx_is == __ctx_skb */

#endif
4 changes: 2 additions & 2 deletions bpf/lib/maps.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@ struct {
#endif /* ENABLE_EGRESS_GATEWAY */

#ifndef SKIP_CALLS_MAP
static __always_inline void ep_tail_call(struct __ctx_buff *ctx,
const __u32 index)
static __always_inline void ep_tail_call(struct __ctx_buff *ctx __maybe_unused,
const __u32 index __maybe_unused)
{
tail_call_static(ctx, &CALLS_MAP, index);
}
Expand Down
7 changes: 4 additions & 3 deletions bpf/lib/nodeport.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "conntrack.h"
#include "csum.h"
#include "encap.h"
#include "identity.h"
#include "trace.h"
#include "ghash.h"
#include "pcap.h"
Expand Down Expand Up @@ -1140,10 +1141,10 @@ static __always_inline bool snat_v4_needed(struct __ctx_buff *ctx, __be32 *addr,
* by the remote node if its native dev's
* rp_filter=1.
*/
if (info->sec_label == REMOTE_NODE_ID)
if (identity_is_remote_node(info->sec_label))
return false;
#endif
#if defined(ENABLE_EGRESS_GATEWAY)
#if defined(ENABLE_EGRESS_GATEWAY)
/* Check egress gateway policy only for traffic which matches
* one of the following conditions.
* - Not from a local endpoint (inc. local host): that tells us
Expand All @@ -1154,7 +1155,7 @@ static __always_inline bool snat_v4_needed(struct __ctx_buff *ctx, __be32 *addr,
* would either leave through the tunnel or match the above
* IPV4_SNAT_EXCLUSION_DST_CIDR check.
*/
if (!ep || info->sec_label != REMOTE_NODE_ID) {
if (!ep || !identity_is_remote_node(info->sec_label)) {
struct egress_info *einfo;

/* Check if SNAT needs to be applied to the packet.
Expand Down
1 change: 1 addition & 0 deletions bpf/node_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ DEFINE_IPV6(HOST_IP, 0xbe, 0xef, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0xa, 0x
#define INIT_ID 5
#define LOCAL_NODE_ID 6
#define REMOTE_NODE_ID 6
#define KUBE_APISERVER_NODE_ID 7
#define HOST_IFINDEX_MAC { .addr = { 0xce, 0x72, 0xa7, 0x03, 0x88, 0x56 } }
#define NAT46_PREFIX { .addr = { 0xbe, 0xef, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xa, 0x0, 0x0, 0x0, 0x0, 0x0 } }
#define NODEPORT_PORT_MIN 30000
Expand Down
8 changes: 7 additions & 1 deletion cilium-health/launch/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
nodeTypes "github.com/cilium/cilium/pkg/node/types"
"github.com/cilium/cilium/pkg/option"
"github.com/cilium/cilium/pkg/pidfile"
"github.com/cilium/cilium/pkg/policy"
"github.com/cilium/cilium/pkg/sysctl"

"github.com/containernetworking/plugins/pkg/ns"
Expand Down Expand Up @@ -226,6 +227,7 @@ type EndpointAdder interface {
// cleanup of prior cilium-health endpoint instances.
func LaunchAsEndpoint(baseCtx context.Context,
owner regeneration.Owner,
policyGetter policyRepoGetter,
n *nodeTypes.Node,
mtuConfig mtu.Configuration,
epMgr EndpointAdder,
Expand Down Expand Up @@ -311,7 +313,7 @@ func LaunchAsEndpoint(baseCtx context.Context,
}

// Create the endpoint
ep, err := endpoint.NewEndpointFromChangeModel(baseCtx, owner, proxy, allocator, info)
ep, err := endpoint.NewEndpointFromChangeModel(baseCtx, owner, policyGetter, proxy, allocator, info)
if err != nil {
return nil, fmt.Errorf("Error while creating endpoint model: %s", err)
}
Expand Down Expand Up @@ -366,6 +368,10 @@ func LaunchAsEndpoint(baseCtx context.Context,
return client, nil
}

// policyRepoGetter exposes access to the agent's policy repository. It is
// accepted by LaunchAsEndpoint so the cilium-health endpoint can be
// constructed without depending on the concrete daemon type.
type policyRepoGetter interface {
GetPolicyRepository() *policy.Repository
}

// routingConfigurer configures routing for the given endpoint IP and MTU.
// NOTE(review): the semantics of the compat flag are not visible here —
// presumably it selects a backwards-compatible routing mode; confirm with
// the implementation.
type routingConfigurer interface {
Configure(ip net.IP, mtu int, compat bool) error
}
55 changes: 14 additions & 41 deletions daemon/cmd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/cilium/cilium/pkg/clustermesh"
"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/counter"
"github.com/cilium/cilium/pkg/crypto/certificatemanager"
"github.com/cilium/cilium/pkg/datapath"
"github.com/cilium/cilium/pkg/datapath/linux/probes"
linuxrouting "github.com/cilium/cilium/pkg/datapath/linux/routing"
Expand All @@ -34,7 +33,6 @@ import (
"github.com/cilium/cilium/pkg/endpoint"
"github.com/cilium/cilium/pkg/endpoint/regeneration"
"github.com/cilium/cilium/pkg/endpointmanager"
"github.com/cilium/cilium/pkg/envoy"
"github.com/cilium/cilium/pkg/eventqueue"
"github.com/cilium/cilium/pkg/fqdn"
"github.com/cilium/cilium/pkg/hubble/observer"
Expand Down Expand Up @@ -103,6 +101,7 @@ type Daemon struct {
svc *service.Service
rec *recorder.Recorder
policy *policy.Repository
policyUpdater *policy.Updater
preFilter datapath.PreFilter

statusCollectMutex lock.RWMutex
Expand All @@ -128,8 +127,7 @@ type Daemon struct {

clustermesh *clustermesh.ClusterMesh

mtuConfig mtu.Configuration
policyTrigger *trigger.Trigger
mtuConfig mtu.Configuration

datapathRegenTrigger *trigger.Trigger

Expand Down Expand Up @@ -368,12 +366,12 @@ func NewDaemon(ctx context.Context, cancel context.CancelFunc, epMgr *endpointma
externalIP,
)

nodeMngr, err := nodemanager.NewManager("all", dp.Node(), ipcache.IPIdentityCache, option.Config)
nodeMngr, err := nodemanager.NewManager("all", dp.Node(), ipcache.IPIdentityCache, option.Config, nil, nil)
if err != nil {
return nil, nil, err
}

identity.IterateReservedIdentities(func(_ string, _ identity.NumericIdentity) {
identity.IterateReservedIdentities(func(_ identity.NumericIdentity, _ *identity.Identity) {
metrics.Identity.Inc()
})
if option.Config.EnableWellKnownIdentities {
Expand Down Expand Up @@ -414,10 +412,11 @@ func NewDaemon(ctx context.Context, cancel context.CancelFunc, epMgr *endpointma
}

d.identityAllocator = NewCachingIdentityAllocator(&d)
d.policy = policy.NewPolicyRepository(d.identityAllocator,
d.identityAllocator.GetIdentityCache(),
certificatemanager.NewManager(option.Config.CertDirectory, k8s.Client()))
d.policy.SetEnvoyRulesFunc(envoy.GetEnvoyHTTPRules)
if err := d.initPolicy(epMgr); err != nil {
return nil, nil, fmt.Errorf("error while initializing policy subsystem: %w", err)
}
nodeMngr = nodeMngr.WithSelectorCacheUpdater(d.policy.GetSelectorCache()) // must be after initPolicy
nodeMngr = nodeMngr.WithPolicyTriggerer(d.policyUpdater) // must be after initPolicy

// Propagate identity allocator down to packages which themselves do not
// have types to which we can add an allocator member.
Expand Down Expand Up @@ -457,6 +456,9 @@ func NewDaemon(ctx context.Context, cancel context.CancelFunc, epMgr *endpointma
option.Config,
)
nd.RegisterK8sNodeGetter(d.k8sWatcher)
// GH-17849: The daemon does not have a reference to the ipcache,
// instead we rely on the global.
ipcache.IPIdentityCache.RegisterK8sSyncedChecker(&d)

d.k8sWatcher.NodeChain.Register(d.endpointManager)
if option.Config.BGPAnnounceLBIP || option.Config.BGPAnnouncePodCIDR {
Expand Down Expand Up @@ -512,31 +514,6 @@ func NewDaemon(ctx context.Context, cancel context.CancelFunc, epMgr *endpointma
bootstrapStats.restore.End(true)
}

t, err := trigger.NewTrigger(trigger.Parameters{
Name: "policy_update",
MetricsObserver: &policyTriggerMetrics{},
MinInterval: option.Config.PolicyTriggerInterval,
TriggerFunc: d.policyUpdateTrigger,
})
if err != nil {
return nil, nil, err
}
d.policyTrigger = t

// Reuse policyTriggerMetrics and PolicyTriggerInterval here since
// this is only triggered by agent configuration changes for now
// and should be counted in policyTriggerMetrics.
regenerationTrigger, err := trigger.NewTrigger(trigger.Parameters{
Name: "datapath-regeneration",
MetricsObserver: &policyTriggerMetrics{},
MinInterval: option.Config.PolicyTriggerInterval,
TriggerFunc: d.datapathRegen,
})
if err != nil {
return nil, nil, err
}
d.datapathRegenTrigger = regenerationTrigger

debug.RegisterStatusObject("k8s-service-cache", &d.k8sWatcher.K8sSvcCache)
debug.RegisterStatusObject("ipam", d.ipam)
debug.RegisterStatusObject("ongoing-endpoint-creations", d.endpointCreations)
Expand Down Expand Up @@ -660,10 +637,6 @@ func NewDaemon(ctx context.Context, cancel context.CancelFunc, epMgr *endpointma
case !option.Config.EnableRemoteNodeIdentity:
msg = fmt.Sprintf("BPF masquerade requires remote node identities (--%s=\"true\").",
option.EnableRemoteNodeIdentity)
// Remove the check after https://github.com/cilium/cilium/issues/12544 is fixed
case option.Config.TunnelingEnabled() && !hasFullHostReachableServices():
msg = fmt.Sprintf("BPF masquerade requires --%s to be fully enabled (TCP and UDP).",
option.EnableHostReachableServices)
case option.Config.EgressMasqueradeInterfaces != "":
msg = fmt.Sprintf("BPF masquerade does not allow to specify devices via --%s (use --%s instead).",
option.EgressMasqueradeInterfaces, option.Devices)
Expand Down Expand Up @@ -957,8 +930,8 @@ func (d *Daemon) bootstrapClusterMesh(nodeMngr *nodemanager.Manager) {

// Close shuts down a daemon
func (d *Daemon) Close() {
if d.policyTrigger != nil {
d.policyTrigger.Shutdown()
if d.policyUpdater != nil {
d.policyUpdater.Shutdown()
}
if d.datapathRegenTrigger != nil {
d.datapathRegenTrigger.Shutdown()
Expand Down
6 changes: 4 additions & 2 deletions daemon/cmd/daemon_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1685,8 +1685,10 @@ func runDaemon() {
d.endpointManager.InitHostEndpointLabels(d.ctx)
} else {
log.Info("Creating host endpoint")
if err := d.endpointManager.AddHostEndpoint(d.ctx, d, d.l7Proxy, d.identityAllocator,
"Create host endpoint", nodeTypes.GetName()); err != nil {
if err := d.endpointManager.AddHostEndpoint(
d.ctx, d, d, d.l7Proxy, d.identityAllocator,
"Create host endpoint", nodeTypes.GetName(),
); err != nil {
log.WithError(err).Fatal("Unable to create host endpoint")
}
}
Expand Down
18 changes: 16 additions & 2 deletions daemon/cmd/datapath.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cilium/cilium/pkg/endpointmanager"
"github.com/cilium/cilium/pkg/identity"
"github.com/cilium/cilium/pkg/ipcache"
"github.com/cilium/cilium/pkg/labels"
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/maps/ctmap"
"github.com/cilium/cilium/pkg/maps/egressmap"
Expand Down Expand Up @@ -243,9 +244,13 @@ func (d *Daemon) syncEndpointsAndHostIPs() error {

// Upsert will not propagate (reserved:foo->ID) mappings across the cluster,
// and we specifically don't want to do so.
//
// This upsert will fail with ErrOverwrite continuously as long as the
// EP / CN watcher have found an apiserver IP and upserted it into the
// ipcache. Until then, it is expected to succeed.
ipcache.IPIdentityCache.Upsert(ipIDPair.PrefixString(), nil, hostKey, nil, ipcache.Identity{
ID: ipIDPair.ID,
Source: source.Local,
Source: sourceByIP(ipIDPair.IP.String(), source.Local),
})
}

Expand All @@ -259,13 +264,22 @@ func (d *Daemon) syncEndpointsAndHostIPs() error {
log.Debugf("Removed outdated host ip %s from endpoint map", hostIP)
}

ipcache.IPIdentityCache.Delete(hostIP, source.Local)
ipcache.IPIdentityCache.Delete(hostIP, sourceByIP(hostIP, source.Local))
}
}

return nil
}

// sourceByIP returns the ipcache source to attribute to the given IP
// prefix. If the identity metadata recorded for the prefix carries the
// kube-apiserver label, the KubeAPIServer source takes precedence;
// otherwise defaultSrc is returned unchanged.
func sourceByIP(prefix string, defaultSrc source.Source) source.Source {
	lbls := ipcache.GetIDMetadataByIP(prefix)
	apiServerLbl := labels.LabelKubeAPIServer[labels.IDNameKubeAPIServer]
	if lbls.Has(apiServerLbl) {
		return source.KubeAPIServer
	}
	return defaultSrc
}

// initMaps opens all BPF maps (and creates them if they do not exist). This
// must be done *before* any operations which read BPF maps, especially
// restoring endpoints and services.
Expand Down