
daemon: Perform early (partial) local node info initialization #24866

Merged 4 commits on Jun 6, 2023.
Changes from all commits
4 changes: 4 additions & 0 deletions daemon/cmd/cells.go
@@ -90,6 +90,10 @@ var (
	// observing changes to it.
	node.LocalNodeStoreCell,

	// Provide a LocalNodeInitializer that is invoked when LocalNodeStore is started.
	// This fills in the initial state before it is accessed by other sub-systems.
	cell.Provide(newLocalNodeInitializer),
Member:
Why doesn't the Local Node initialize itself?

@joamaki (Contributor, Author), Apr 25, 2023:

I want that logic to be separate from the LocalNodeStore itself to keep things non-cyclic. Having the initializer in daemon/cmd makes the most sense, as that's where we are in the best position to depend on the different sub-systems needed to initialize this.

Member:

Forgive my ignorance here: is that a general long-term principle, or is this more to do with the need for intermediate solutions given the current structure, with most logic being started in newDaemon()? Is this for testing purposes, or is there some other principle here?

Do you think it makes sense to add wording like the following to the hive guidelines?

Common resources for the agent should be exposed through a dedicated "observable" Cell. This Cell serves as the focal point for all subsequent Cells to depend upon when they need to be able to respond to updates for the type exposed by this observable. Separately, each input that may update this observable (including initializers) should exist in its own Cell, and be passed as an input parameter to the observable Cell.

This design ensures that initializer logic can be tested independently, and that any modules relying on the resource are written in a way that reliably handles dynamic updates of any input resources.
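
A minimal sketch of that guideline, using the cell API visible elsewhere in this PR (cell.Module, cell.Provide, cell.In, optional dependencies); all names here (Resource, Store, Initializer, newStore) are illustrative, not actual Cilium code:

package example

import (
	"github.com/cilium/cilium/pkg/hive/cell"
)

// Resource is the shared type exposed by the observable cell.
type Resource struct {
	Name string
}

// Initializer fills in the initial state of the Resource before the store
// exposes it. Each input source would provide its own implementation from
// its own cell.
type Initializer interface {
	Init(r *Resource) error
}

type storeParams struct {
	cell.In

	// The initializer is an input parameter of the observable cell. It is
	// optional so the store can also be constructed without one (e.g. in tests).
	Initializer Initializer `optional:"true"`
}

// Store is the focal point that all subsequent cells depend upon to observe
// updates to the Resource.
type Store struct {
	current Resource
}

func newStore(p storeParams) (*Store, error) {
	s := &Store{}
	if p.Initializer != nil {
		if err := p.Initializer.Init(&s.current); err != nil {
			return nil, err
		}
	}
	return s, nil
}

// The observable cell; initializer cells are provided separately and can be
// tested independently of the store.
var Cell = cell.Module(
	"example-store",
	"Observable store for a shared resource",
	cell.Provide(newStore),
)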

@joamaki (Contributor, Author), May 5, 2023:

I don't think there are any general principles in play here. Sorry if I gave that impression. The initialization of this node-local information is complicated because the different sources overlap, and I don't think it makes sense to go overly modular here (though I did at first think about making the initializers group values). It's much easier to follow if we keep the initialization linear and confined to one file, and resolve "merge conflicts" there. It does mean we have this central initializer, but I don't really see other good options.

To make this more concrete: we're essentially merging Node, CiliumNode, the cilium_host IP, the IPsec key, the WireGuard key, the IngressIP (and maybe a few others) into a single types.Node struct. Many of these overlap, so we can't allow these "initializations" to proceed in a random, or even dependency-driven, order.
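
A self-contained sketch of that linear, one-file merge; all types and source functions here are illustrative placeholders, not the actual Cilium sources:

package main

import "fmt"

// Node stands in for types.Node; the fields are illustrative only.
type Node struct {
	Name  string
	IPv4  string
	WGKey string
}

// Each source fills in the fields it owns. These are placeholders for the
// real sources (config, Kubernetes, restoration, encryption keys, ...).
func fromConfig(n *Node) error     { n.IPv4 = "192.0.2.1"; return nil }
func fromKubernetes(n *Node) error { n.Name = "node-1"; return nil }
func fromEncryption(n *Node) error { n.WGKey = "example-key"; return nil }

// initLocalNode merges the overlapping sources in a fixed, explicit order,
// so "merge conflicts" between them are resolved in one place instead of in
// a random or dependency-driven order.
func initLocalNode(n *Node) error {
	for _, step := range []func(*Node) error{
		fromConfig,
		fromKubernetes,
		fromEncryption,
	} {
		if err := step(n); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var n Node
	if err := initLocalNode(&n); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", n)
}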


	// Shared resources provide access to k8s resources as event streams or as
	// read-only stores.
	agentK8s.ResourcesCell,
111 changes: 57 additions & 54 deletions daemon/cmd/ciliumendpoints_test.go
@@ -22,6 +22,7 @@ import (
"github.com/cilium/cilium/pkg/k8s/types"
"github.com/cilium/cilium/pkg/k8s/watchers"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/node"
)

var (
Expand Down Expand Up @@ -126,66 +127,68 @@ func Test_cleanStaleCEP(t *testing.T) {
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
d := Daemon{
k8sWatcher: &watchers.K8sWatcher{},
}
node.WithTestLocalNodeStore(func() {
assert := assert.New(t)
d := Daemon{
k8sWatcher: &watchers.K8sWatcher{},
}

fakeClient := fake.NewSimpleClientset()
fakeClient.PrependReactor("create", "ciliumendpoints", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
cep := action.(k8stesting.CreateAction).GetObject().(*ciliumv2.CiliumEndpoint)
return true, cep, nil
}))
fakeClient.PrependReactor("get", "ciliumendpoints", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
if !test.enableCES {
assert.Fail("unexpected get on ciliumendpoints in CEP mode, expected only in CES mode")
}
name := action.(k8stesting.GetAction).GetName()
ns := action.(k8stesting.GetActionImpl).Namespace
cep, ok := test.apiserverCEPs[ns+"/"+name]
if !ok {
return true, nil, fmt.Errorf("not found")
}
return true, cep, nil
}))
cepStore := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{
"localNode": watchers.CreateCiliumEndpointLocalPodIndexFunc(), // empty nodeIP means this will index all nodes.
})
ciliumEndpointSlicesStore := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{
"localNode": watchers.CreateCiliumEndpointSliceLocalPodIndexFunc(), // empty nodeIP means this will index all nodes.
})

fakeClient := fake.NewSimpleClientset()
fakeClient.PrependReactor("create", "ciliumendpoints", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
cep := action.(k8stesting.CreateAction).GetObject().(*ciliumv2.CiliumEndpoint)
return true, cep, nil
}))
fakeClient.PrependReactor("get", "ciliumendpoints", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
if !test.enableCES {
assert.Fail("unexpected get on ciliumendpoints in CEP mode, expected only in CES mode")
for _, ces := range test.ciliumEndpointSlices {
ciliumEndpointSlicesStore.Add(ces.DeepCopy())
}
name := action.(k8stesting.GetAction).GetName()
ns := action.(k8stesting.GetActionImpl).Namespace
cep, ok := test.apiserverCEPs[ns+"/"+name]
if !ok {
return true, nil, fmt.Errorf("not found")
for _, cep := range test.ciliumEndpoints {
_, err := fakeClient.CiliumV2().CiliumEndpoints(cep.Namespace).Create(context.Background(), &ciliumv2.CiliumEndpoint{
ObjectMeta: metav1.ObjectMeta{
Name: cep.Name,
Namespace: cep.Namespace,
},
}, metav1.CreateOptions{})
assert.NoError(err)
cepStore.Add(cep.DeepCopy())
}
return true, cep, nil
}))
cepStore := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{
"localNode": watchers.CreateCiliumEndpointLocalPodIndexFunc(), // empty nodeIP means this will index all nodes.
})
ciliumEndpointSlicesStore := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{
"localNode": watchers.CreateCiliumEndpointSliceLocalPodIndexFunc(), // empty nodeIP means this will index all nodes.
})
d.k8sWatcher.SetIndexer("ciliumendpoint", cepStore)
d.k8sWatcher.SetIndexer("ciliumendpointslice", ciliumEndpointSlicesStore)
l := &lock.Mutex{}
var deletedSet []string
fakeClient.PrependReactor("delete", "ciliumendpoints", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
l.Lock()
defer l.Unlock()
a := action.(k8stesting.DeleteAction)
deletedSet = append(deletedSet, fmt.Sprintf("%s/%s", a.GetNamespace(), a.GetName()))
return true, nil, nil
}))

for _, ces := range test.ciliumEndpointSlices {
ciliumEndpointSlicesStore.Add(ces.DeepCopy())
}
for _, cep := range test.ciliumEndpoints {
_, err := fakeClient.CiliumV2().CiliumEndpoints(cep.Namespace).Create(context.Background(), &ciliumv2.CiliumEndpoint{
ObjectMeta: metav1.ObjectMeta{
Name: cep.Name,
Namespace: cep.Namespace,
},
}, metav1.CreateOptions{})
assert.NoError(err)
cepStore.Add(cep.DeepCopy())
}
d.k8sWatcher.SetIndexer("ciliumendpoint", cepStore)
d.k8sWatcher.SetIndexer("ciliumendpointslice", ciliumEndpointSlicesStore)
l := &lock.Mutex{}
var deletedSet []string
fakeClient.PrependReactor("delete", "ciliumendpoints", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
l.Lock()
defer l.Unlock()
a := action.(k8stesting.DeleteAction)
deletedSet = append(deletedSet, fmt.Sprintf("%s/%s", a.GetNamespace(), a.GetName()))
return true, nil, nil
}))

epm := &fakeEPManager{test.managedEndpoints}
epm := &fakeEPManager{test.managedEndpoints}

err := d.cleanStaleCEPs(context.Background(), epm, fakeClient.CiliumV2(), test.enableCES)
err := d.cleanStaleCEPs(context.Background(), epm, fakeClient.CiliumV2(), test.enableCES)

assert.NoError(err)
assert.ElementsMatch(test.expectedDeletedSet, deletedSet)
assert.NoError(err)
assert.ElementsMatch(test.expectedDeletedSet, deletedSet)
})
})
}
}
15 changes: 9 additions & 6 deletions daemon/cmd/daemon.go
@@ -154,7 +154,7 @@ type Daemon struct {

	// nodeDiscovery defines the node discovery logic of the agent
	nodeDiscovery *nodediscovery.NodeDiscovery
-	nodeLocalStore node.LocalNodeStore
+	nodeLocalStore *node.LocalNodeStore

	// ipam is the IP address manager of the agent
	ipam *ipam.IPAM
@@ -1154,11 +1154,14 @@ func newDaemon(ctx context.Context, cleaner *daemonCleanup, params *daemonParams
		logfields.V6CiliumHostIP: node.GetIPv6Router(),
	}).Info("Annotating k8s node")

-	_, err := k8s.AnnotateNode(
-		params.Clientset,
-		nodeTypes.GetName(),
-		d.nodeLocalStore.Get().Node,
-		encryptKeyID)
+	latestLocalNode, err := d.nodeLocalStore.Get(ctx)
+	if err == nil {
+		_, err = k8s.AnnotateNode(
+			params.Clientset,
+			nodeTypes.GetName(),
+			latestLocalNode.Node,
+			encryptKeyID)
+	}
	if err != nil {
		log.WithError(err).Warning("Cannot annotate k8s node with CIDR range")
	}
26 changes: 1 addition & 25 deletions daemon/cmd/daemon_main.go
@@ -6,7 +6,6 @@ package cmd
import (
	"context"
	"fmt"
-	"net"
	"os"
	"path"
	"path/filepath"
@@ -1413,29 +1412,6 @@ func initEnv() {
}
}

-	// If there is one device specified, use it to derive better default
-	// allocation prefixes
-	node.InitDefaultPrefix(option.Config.DirectRoutingDevice)
Member:
I see these bits are being removed from initEnv() (run before agentHive.Run()) and moved later in the initialization order. How late are they moving? How do we know this is safe? I didn't see any discussion about this risk in the commit message.

@joamaki (Contributor, Author):

They're not really moving later. LocalNodeStore is a dependency of the daemon cell, so it gets initialized before it, and nothing else depends on these.

@joamaki (Contributor, Author), Apr 25, 2023:

Though this change is not necessary in this PR, I can move them back for the time being.

EDIT: Ah no, this cannot be reverted since this code uses the node package setters and those are not usable before LocalNodeStore starts, so they have to move.

Member:

I guess from the hive output, the InitDefaultPrefix() call (now InitLocalNode() -> SetDefaultPrefix()) is really just moving after the start of pprof, gops, k8s-client and cni-config. Is that how to read this?

$ go run ./daemon hive                                                                                                                                                                                             
level=info msg="Memory available for map entries (0.003% of 33378533376B): 83446333B" subsys=config                                                                                                                                                                    
level=info msg="option bpf-ct-global-tcp-max set by dynamic sizing to 292794" subsys=config                                                                                                                                                                            
level=info msg="option bpf-ct-global-any-max set by dynamic sizing to 146397" subsys=config                                                                                                                                                                            
level=info msg="option bpf-nat-global-max set by dynamic sizing to 292794" subsys=config                                                                                                                                                                               
level=info msg="option bpf-neigh-global-max set by dynamic sizing to 292794" subsys=config                                                                                                                                                                             
level=info msg="option bpf-sock-rev-map-max set by dynamic sizing to 146397" subsys=config                                                                                                                                                                             
level=warning msg="Running Cilium with \"kvstore\"=\"\" requires identity allocation via CRDs. Changing identity-allocation-mode to \"crd\"" subsys=config                                                                                                             
Cells:                                                                                                                                                                                                                                                                 
                                                                                                                                                                                                                                                                       
  Ⓜ️ agent (Cilium Agent):                                                                                                                                                                                                                                              
      Ⓜ️ infra (Infrastructure):                                                                                                                                                                                                                                        
          Ⓜ️ pprof (pprof HTTP server to expose runtime profiling data):                                                                                                                                                                                                
              🚧 pprof.newServer (cell.go:67):                                                                                                                                                                                                                         
                  ⇨ hive.Lifecycle, logrus.FieldLogger, pprof.Config                                                                                                                                                                                                   
                  ⇦ pprof.Server                                                                                                                                                                                                                                       
                                                                                                                                                                                                                                                                       
              🛠️ pprof.glob..func1 (cell.go:51): func(pprof.Server)                                                                                                                                                                                                     
                                                                                                                                                                                                                                                                       
          ⚙️ (pprof.Config) {                                                                                                                                                                                                                                           
              Pprof: (bool) false,                                                                                                                                                                                                                                     
              PprofAddress: (string) (len=9) "localhost",                                                                                                                                                                                                              
              PprofPort: (uint16) 6060                                                                                                                                                                                                                                 
          }                                                                                                                                                                                                 
                                                                                                                                                                                                            
                                                                                                                                                                                                            
          Ⓜ️ gops (Gops Agent):                                                                                                                                                                              
              ⚙️ (gops.GopsConfig) {                                                                                                                                                                         
                  GopsPort: (uint16) 9890                                                                                                                                                                   
              }                                                                                                                                                                                             
                                                                                                                                                                                                            
                                                                                                                                                                                                            
              🛠️ gops.registerGopsHooks (cell.go:39): func(hive.Lifecycle, logrus.FieldLogger,                                                                                                               
                gops.GopsConfig)                                                                                                                                                                            
                                                                                                                                                                                                            
          Ⓜ️ k8s-client (Kubernetes Client):                                                                                                                                                                 
              ⚙️ (client.Config) {                                                                                                                                                                           
                  EnableK8s: (bool) true,                                                                                                                                                                   
                  K8sAPIServer: (string) "",                                                                                                                                                                
                  K8sKubeConfigPath: (string) "",                                                     
                  K8sClientQPS: (float32) 0,                                                          
                  K8sClientBurst: (int) 0,                                                            
                  K8sHeartbeatTimeout: (time.Duration) 30s,                                           
                  EnableK8sAPIDiscovery: (bool) false                                                 
              }                                                                                       
                                                                                                      
                                                                                                      
              🚧 client.newClientset (cell.go:112):                                                   
                  ⇨ client.Config, hive.Lifecycle, logrus.FieldLogger                                 
                  ⇦ client.Clientset                                                                  
                                                                                                      
          Ⓜ️ cni-config (CNI configuration manager):                                                   
              ⚙️ (cni.Config) {                                                                        
                  WriteCNIConfWhenReady: (string) "",                                                 
                  ReadCNIConf: (string) "",                                                           
                  CNIChainingMode: (string) (len=4) "none",                                           
                  CNILogFile: (string) (len=30) "/var/run/cilium/cilium-cni.log",                     
                  CNIExclusive: (bool) false                                                          
              }                                                                                       
                                                                                                      
                                                                                                      
              🚧 cni.enableConfigManager (cell.go:58):                                                
                  ⇨ *option.DaemonConfig, cni.Config, hive.Lifecycle, logrus.FieldLogger              
                  ⇦ cni.CNIConfigManager                                                              
                                                                                                      
          🚧 cmd.glob..func1 (cells.go:58):                                                           
              ⇦ *option.DaemonConfig                                                                  
                                                                                                      
      Ⓜ️ controlplane (Control Plane):                                                                 
          Ⓜ️ local-node-store (Provides LocalNodeStore for observing and updating local node info):    
              🚧 node.NewLocalNodeStore (local_node_store.go:69):                                     
                  ⇨ hive.Lifecycle, node.LocalNodeInitializer[optional]                               
                  ⇦ *node.LocalNodeStore                                                              
                                                                                                      
          🚧 cmd.newLocalNodeInitializer (local_node_init.go:53):                                     
              ⇨ *option.DaemonConfig, client.Clientset, k8s.LocalNodeResource                         
              ⇦ node.LocalNodeInitializer

...

If I read that right, it moves this to after the k8s apiserver connection is established.

Then I guess, based on the above ordering, the Start() function for the local node store is run synchronously (I didn't see any documentation about a synchrony assumption in pkg/hive), which would mean the initialization shouldn't functionally change: it's still guaranteed to run before we start handling async events for k8s resources in other Cells. Is that correct?

@joamaki (Contributor, Author):

Here's what the start hooks look like:

$ go run ./daemon hive --k8s-api-server localhost:1234
..
Start hooks:

  • gops.registerGopsHooks.func1 (cell.go:44)
  • client.(*compositeClientset).onStart
  • authmap.newAuthMap.func1 (cell.go:28)
  • configmap.newMap.func1 (cell.go:24)
  • datapath.newDatapath.func1 (cells.go:85)
  • *resource.resource[*github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1.Node].Start
  • node.NewLocalNodeStore.func1 (local_node_store.go:77)
  • *resource.resource[*github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1.Service].Start
  • *gobgp.diffStore[*github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1.Service].Start
  • *resource.resource[*github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2.CiliumNode].Start
  • *resource.resource[*github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1.Namespace].Start
  • *resource.resource[*github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1.CiliumLoadBalancerIPPool].Start
  • *resource.resource[*github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2.CiliumIdentity].Start
  • *resource.resource[*github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2.CiliumNetworkPolicy].Start
  • *resource.resource[*github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2.CiliumClusterwideNetworkPolicy].Start
  • *resource.resource[*github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1.CiliumCIDRGroup].Start
  • endpointmanager.newDefaultEndpointManager.func1 (cell.go:185)
  • cmd.newPolicyTrifecta.func1 (policy.go:128)
  • *manager.manager.Start
  • monitor.(*dropMonitor).OnStart
  • *cni.cniConfigManager.Start
  • cmd.newDaemonPromise.func1 (daemon_main.go:1599)
  • utime.initUtimeSync.func1 (cell.go:34)

We just need to worry about the start hooks before node.NewLocalNodeStore accessing anything in pkg/node. This PR drops the default localNode instance in pkg/node/address.go, so any use of the getters will panic, and we'll catch those cases. And when localNode is set (so that the pkg/node getters work again), it's already partially initialized. Once we've gotten rid of the global getters/setters completely, this will be fully driven by the dependency on LocalNodeStore.
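
A minimal sketch of that guard, assuming a package-level pointer that stays nil until the store's start hook installs it (illustrative names, not the actual pkg/node code):

package node

import "sync"

// Node stands in for the real local node type.
type Node struct {
	IPv4 string
}

var (
	mu        sync.Mutex
	localNode *Node // nil until the LocalNodeStore start hook sets it
)

// setLocalNode would be called from the LocalNodeStore start hook, after the
// initializer has partially populated the node.
func setLocalNode(n *Node) {
	mu.Lock()
	defer mu.Unlock()
	localNode = n
}

// GetIPv4 panics when called before the store has started, so any module
// that still uses the global getters instead of LocalNodeStore crashes
// early rather than silently reading zero values.
func GetIPv4() string {
	mu.Lock()
	defer mu.Unlock()
	if localNode == nil {
		panic("node: getter called before LocalNodeStore was started")
	}
	return localNode.IPv4
}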


-	// Initialize node IP addresses from configuration.
-	if option.Config.IPv6NodeAddr != "auto" {
-		if ip := net.ParseIP(option.Config.IPv6NodeAddr); ip == nil {
-			log.WithField(logfields.IPAddr, option.Config.IPv6NodeAddr).Fatal("Invalid IPv6 node address")
-		} else {
-			if !ip.IsGlobalUnicast() {
-				log.WithField(logfields.IPAddr, ip).Fatal("Invalid IPv6 node address: not a global unicast address")
-			}
-			node.SetIPv6(ip)
-		}
-	}
-	if option.Config.IPv4NodeAddr != "auto" {
-		if ip := net.ParseIP(option.Config.IPv4NodeAddr); ip == nil {
-			log.WithField(logfields.IPAddr, option.Config.IPv4NodeAddr).Fatal("Invalid IPv4 node address")
-		} else {
-			node.SetIPv4(ip)
-		}
-	}

	k8s.SidecarIstioProxyImageRegexp, err = regexp.Compile(option.Config.SidecarIstioProxyImage)
	if err != nil {
		log.WithError(err).Fatal("Invalid sidecar-istio-proxy-image regular expression")
@@ -1601,7 +1577,7 @@ type daemonParams struct {
	Clientset      k8sClient.Clientset
	Datapath       datapath.Datapath
	WGAgent        *wireguard.Agent `optional:"true"`
-	LocalNodeStore node.LocalNodeStore
+	LocalNodeStore *node.LocalNodeStore
	BGPController  *bgpv1.Controller
	Shutdowner     hive.Shutdowner
	Resources      agentK8s.Resources
139 changes: 139 additions & 0 deletions daemon/cmd/local_node_init.go
@@ -0,0 +1,139 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package cmd

import (
	"context"
	"fmt"
	"net"

	k8sLabels "k8s.io/apimachinery/pkg/labels"

	agentK8s "github.com/cilium/cilium/daemon/k8s"
	"github.com/cilium/cilium/pkg/hive/cell"
	"github.com/cilium/cilium/pkg/k8s"
	"github.com/cilium/cilium/pkg/k8s/client"
	"github.com/cilium/cilium/pkg/k8s/resource"
	slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
	"github.com/cilium/cilium/pkg/logging/logfields"
	"github.com/cilium/cilium/pkg/node"
	"github.com/cilium/cilium/pkg/node/addressing"
	"github.com/cilium/cilium/pkg/option"
	"github.com/cilium/cilium/pkg/source"
)

type localNodeInitializerParams struct {
	cell.In

	Config    *option.DaemonConfig
	Clientset client.Clientset
	LocalNode agentK8s.LocalNodeResource
}

// localNodeInitializer performs the bootstrapping of the LocalNodeStore,
// which contains information about the local Cilium node populated from
// configuration and Kubernetes.
type localNodeInitializer struct {
	localNodeInitializerParams
}

func (ini *localNodeInitializer) InitLocalNode(ctx context.Context, n *node.LocalNode) error {
	n.Source = source.Local

	if err := ini.initFromConfig(ctx, n); err != nil {
		return err
	}

	if err := ini.initFromK8s(ctx, n); err != nil {
Member:

It seems like this aspect is pulling the k8s dependency in much earlier, or node initialization much later, in the initialization process. From the commit message that seems somewhat intentional. I guess this raises the same question I had earlier: how do we reason about the initialization ordering?

Related: I see that newDaemon has the steps for node discovery and k8s.waitForNodeInformation(); how do those aspects line up with this initializer?

@joamaki (Contributor, Author):

WaitForNodeInformation was kept as-is, except for the node IP, name and labels, as those do not depend on configuration or IPAM mode. The point is to resolve those much earlier so that we can lift more things out of the daemon into modules. This makes the initialization safer.

Member:

Is it fair to say that "By moving more of this logic behind an Observable Cell, this commit makes progress towards a code structure where the full node object may be initialized before exposing any updates for the node to the other subsystems. This provides stronger guardrails against race conditions where users of the Node objects may inadvertently access the structure before it has been initialized"?

I think the core of my question is "why is it safe to move this initialization earlier, and will it cause other logic to be reordered?". If I understand, the counter-argument is "well, it's not safe today; other modules could attempt to read local node state before it's initialized". So by moving towards a better structure, maybe at some point we can move all of the users of the local node over to relying on the Observable, and then we will be able to eliminate this entire class of bugs. 👍

Then my concern about reordering other logic should be a non-issue: the new initFromK8s() logic is just doing a k8s fetch plus updating some fields in what's effectively an internal structure. That doesn't inherently change any ordering of other logic or trigger any events; it just partially initializes a structure. So the current ordering of operations should not change, some data is just available earlier. I guess the one thing that does change here is that we now rely on the watcher for the local node to synchronize with the k8s apiserver before continuing startup. That logic already filters the watch to only the current node, so it should be relatively quick; no concerns there.

@joamaki (Contributor, Author):

I've now made this a bit safer by leaving the localNode global variable used by the getters and setters unset by default and setting it from the LocalNodeStore start hook. This allows catching reordering bugs early: if we were to modularize something out of the daemon that uses e.g. node.GetIPv4 instead of LocalNodeStore, it would crash early and give the author a hint to switch to LocalNodeStore.

		return err
	}
	return nil
}

func newLocalNodeInitializer(p localNodeInitializerParams) node.LocalNodeInitializer {
	return &localNodeInitializer{p}
}

func (ini *localNodeInitializer) initFromConfig(ctx context.Context, n *node.LocalNode) error {
	// If there is one device specified, use it to derive better default
	// allocation prefixes
	node.SetDefaultPrefix(ini.Config, ini.Config.DirectRoutingDevice, n)

	// Initialize node IP addresses from configuration.
	if ini.Config.IPv6NodeAddr != "auto" {
		if ip := net.ParseIP(ini.Config.IPv6NodeAddr); ip == nil {
			return fmt.Errorf("invalid IPv6 node address: %q", ini.Config.IPv6NodeAddr)
		} else {
			if !ip.IsGlobalUnicast() {
				return fmt.Errorf("invalid IPv6 node address: %q not a global unicast address", ip)
			}
			n.SetNodeInternalIP(ip)
		}
	}
	if ini.Config.IPv4NodeAddr != "auto" {
		if ip := net.ParseIP(ini.Config.IPv4NodeAddr); ip == nil {
			return fmt.Errorf("invalid IPv4 node address: %q", ini.Config.IPv4NodeAddr)
		} else {
			n.SetNodeInternalIP(ip)
		}
	}
	return nil
}

// getK8sLocalNode waits for the first Upsert event for the local Node
// object, that is, until the watch has synchronized with the k8s apiserver,
// and returns the object.
func (ini *localNodeInitializer) getK8sLocalNode(ctx context.Context) (*slim_corev1.Node, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	for ev := range ini.LocalNode.Events(ctx) {
		ev.Done(nil)
		if ev.Kind == resource.Upsert {
			return ev.Object, nil
		}
	}
	return nil, ctx.Err()
}

func (ini *localNodeInitializer) initFromK8s(ctx context.Context, node *node.LocalNode) error {
	if ini.LocalNode == nil {
		return nil
	}

	k8sNode, err := ini.getK8sLocalNode(ctx)
	if err != nil {
		return err
	}
	parsedNode := k8s.ParseNode(k8sNode, source.Kubernetes)

	// Initialize the fields in local node where the source of truth is in Kubernetes.
	// Later stages will deal with updating the rest of the fields depending on configuration.
	//
	// The fields left uninitialized/unrestored here:
	// - Cilium internal IPs (restored from cilium_host or allocated by IPAM)
	// - Health IPs (allocated by IPAM)
	// - Ingress IPs (restored from ipcachemap or allocated)
	// - WireGuard key (set by the WireGuard agent)
	// - IPsec key (set by IPsec)
	// - alloc CIDRs (depends on IPAM mode; restored from Node or CiliumNode)
	// - ClusterID (set by NodeDiscovery)
	// - NodeIdentity (always unset)
	node.Name = parsedNode.Name
	node.Labels = parsedNode.Labels
	node.Annotations = parsedNode.Annotations
	node.Cluster = parsedNode.Cluster
	for _, addr := range parsedNode.IPAddresses {
		if addr.Type == addressing.NodeInternalIP {
			node.SetNodeInternalIP(addr.IP)
		} else if addr.Type == addressing.NodeExternalIP {
			node.SetNodeExternalIP(addr.IP)
		}
	}

	if ini.Config.NodeEncryptionOptOutLabels.Matches(k8sLabels.Set(node.Labels)) {
		log.WithField(logfields.Selector, ini.Config.NodeEncryptionOptOutLabels).
			Infof("Opting out from node-to-node encryption on this node as per '%s' label selector",
				option.NodeEncryptionOptOutLabels)
		node.OptOutNodeEncryption = true
	}

	return nil
}
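
One payoff of keeping the initializer separate, as the guideline discussion above suggests, is that the config-driven part can be exercised without Kubernetes. A hypothetical test sketch (field and type names taken from this diff; it assumes SetDefaultPrefix tolerates a default DaemonConfig with an empty DirectRoutingDevice):

func TestInitFromConfig_SetsInternalIP(t *testing.T) {
	ini := &localNodeInitializer{localNodeInitializerParams{
		Config: &option.DaemonConfig{
			IPv4NodeAddr: "192.0.2.10",
			IPv6NodeAddr: "auto",
		},
	}}

	var n node.LocalNode
	if err := ini.initFromConfig(context.Background(), &n); err != nil {
		t.Fatal(err)
	}

	// The configured address should now be recorded as a NodeInternalIP.
	found := false
	for _, addr := range n.IPAddresses {
		if addr.Type == addressing.NodeInternalIP && addr.IP.String() == "192.0.2.10" {
			found = true
		}
	}
	if !found {
		t.Fatalf("expected 192.0.2.10 as a NodeInternalIP, got %+v", n.IPAddresses)
	}
}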