daemon: Perform early (partial) local node info initialization #24866

Merged
merged 4 commits into from Jun 6, 2023

joamaki
Contributor

@joamaki joamaki commented Apr 13, 2023

As we're moving more code into modules, the global variables in pkg/node
are becoming increasingly dangerous. These were populated as part of
newDaemon() and were therefore not accessible before the daemon started.

The first three commits do a bit of cleanup around the node resource (switching to slim),
remove the singleton hack of LocalNodeStore, and switch to the proper cell.

The last commit moves the config and k8s initialization into a LocalNodeStore
initializer function, thus making the node IP, name and labels available
to modules outside the daemon. To validate this initialization, the test
test/controlplane/node/localnode.go was added.

Fields still initialized in daemon are:

  • Cilium internal IPs (restored from cilium_host or allocated by IPAM)
  • Health IPs (allocated by IPAM)
  • Ingress IPs (restored from ipcache or allocated)
  • WireGuard key (set by the WireGuard agent)
  • IPsec key (set by IPsec)
  • alloc CIDRs (depends on IPAM mode; restored from Node or CiliumNode)
  • ClusterID (set by NodeDiscovery)

@maintainer-s-little-helper maintainer-s-little-helper bot added the dont-merge/needs-release-note-label The author needs to describe the release impact of these changes. label Apr 13, 2023
@joamaki joamaki added the release-note/misc This PR makes changes that have no direct user impact. label Apr 17, 2023
@maintainer-s-little-helper maintainer-s-little-helper bot removed the dont-merge/needs-release-note-label The author needs to describe the release impact of these changes. label Apr 17, 2023
@joamaki joamaki force-pushed the pr/joamaki/k8s-node-info-init branch from 2ec9ca0 to ac4adc3 Compare April 17, 2023 12:39
@joamaki joamaki changed the title DRAFT: Early local node initialization daemon: Perform early (partial) local node info initialization Apr 17, 2023
@joamaki
Contributor Author

joamaki commented Apr 17, 2023

/test

@joamaki
Contributor Author

joamaki commented Apr 18, 2023

#24677 already depends on these changes as it requires access to the K8s NodeIP before daemon module starts.

@joamaki
Contributor Author

joamaki commented Apr 18, 2023

/test

Job 'Cilium-PR-K8s-1.25-kernel-5.4' failed:

Test Name

K8sDatapathConfig Host firewall With VXLAN and endpoint routes

Failure Output

FAIL: Error deleting resource /home/jenkins/workspace/Cilium-PR-K8s-1.25-kernel-5.4/src/github.com/cilium/cilium/test/k8s/manifests/host-policies.yaml: Cannot retrieve "cilium-vzz7c"'s policy revision: cannot get policy revision: ""

Jenkins URL: https://jenkins.cilium.io/job/Cilium-PR-K8s-1.25-kernel-5.4/117/

If it is a flake and a GitHub issue doesn't already exist to track it, comment /mlh new-flake Cilium-PR-K8s-1.25-kernel-5.4 so I can create one.

Then please upload the Jenkins artifacts to that issue.

@joamaki joamaki marked this pull request as ready for review April 18, 2023 10:40
@joamaki joamaki requested review from a team as code owners April 18, 2023 10:40
Contributor

@lmb lmb left a comment

What's the idea around the FIXME comments? These are things that need to be addressed before merging, or after?

pkg/node/local_node_store.go (outdated, resolved)
pkg/bgp/fence/fence.go (outdated, resolved)
pkg/bgp/fence/fence.go (outdated, resolved)
daemon/cmd/daemon.go (outdated, resolved)
pkg/k8s/init.go (outdated, resolved)
pkg/k8s/init_test.go (outdated, resolved)
test/controlplane/node/localnode.go (outdated, resolved)
test/controlplane/node/localnode.go (resolved)
@joamaki
Contributor Author

joamaki commented Apr 19, 2023

> What's the idea around the FIXME comments? These are things that need to be addressed before merging, or after?

Yeah, that was just sloppy of me. I had forgotten that I hadn't addressed them. I'm adding a git hook to stop me from doing this in the future. I'll address them and ping you for another review round when they're all fixed.

Member

@joestringer joestringer left a comment

First three commits look good, seems like they could be submitted and merged separately as refactoring / cleanup PRs.

Requesting changes since the FIXMEs etc. for the last commit clearly need to be addressed.

Unfortunately I have more questions than answers in this review; I'm not really sure how to reason about the safety of shuffling some of this logic around. I agree with the premise of the main commit here: local node initialization ordering, and how that info is accessed across the daemon, is a critical aspect of startup, and there's a lot of danger associated with it. The commit message also says that some of these were populated as part of newDaemon(), but some of the initialization logic previously ran much earlier. I agree that the k8s-related parts seem to happen earlier, but I didn't quite understand:

  • (a) how we ensure node discovery works nicely with that,
  • (b) how that correctly pulls in the k8s node resource dependency, or
  • (c) how we know that it's safe to move some of that k8s-dependent logic earlier, or whether that could trigger other dependency issues where we assume, for instance, that k8s nodes are not synced until after some of the current newDaemon() logic.

(Disclaimer: I don't know either. Maybe there's nothing, or maybe this introduces some race condition on startup. How do we know?) I've started a few threads below for some of these aspects to assist drilling deeper.

// Provide a LocalNodeInitializer that is invoked when LocalNodeStore is started.
// This fills in the initial state before the node information can be accessed by
// other sub-systems.
cell.Provide(newLocalNodeInitializer),
Member

Why doesn't the Local Node initialize itself?

Contributor Author

@joamaki joamaki Apr 25, 2023

I want that logic to be separate from the LocalNodeStore itself to keep things non-cyclic. Having the initializer in daemon/cmd makes the most sense, as that's where we're in the best position to depend on the different sub-systems needed to initialize it.

Member

Forgive my ignorance here: is that a general long-term principle, or is this more to do with the need for intermediate solutions given the current structure, with most logic being started in newDaemon()? Is this for testing purposes, or is there some other principle here?

Do you think it makes sense to add wording something like the following to the hive guidelines?

Common resources for the agent should be exposed through a dedicated "observable" Cell. This Cell serves as the focal point for all subsequent Cells to depend upon when they need to be able to respond to updates for the type exposed by this observable. Separately, each input that may update this observable (including initializers) should exist in its own Cell, and be passed as an input parameter to the observable Cell.

This design enforces that initializer logic can be independently tested, as well as ensuring that any modules that rely on the resource are written in a way that can reliably handle dynamic updates of any input resources.

Contributor Author

@joamaki joamaki May 5, 2023

I don't think there are any general principles in play here; sorry if I gave that impression. The initialization of this node-local information is complicated, as the different sources overlap, and I don't think it makes sense to go overly modular here (though I did first think about making initializers group values). It's much easier to follow if we keep the initialization linear and located in one file, and resolve "merge conflicts" there. It does mean we have this central initializer, but I don't really see other good options.

To make this more concrete: we're essentially merging Node, CiliumNode, cilium_host IP, IPsec key, WireGuard key, IngressIP (and maybe few others) into a single types.Node struct. Many of these overlap and so we can't allow these "initializations" to proceed in a random or even dependency-driven order.

@@ -1383,29 +1382,6 @@ func initEnv() {
log.Fatal("BPF masquerade is not supported for IPv6.")
}

// If there is one device specified, use it to derive better default
// allocation prefixes
node.InitDefaultPrefix(option.Config.DirectRoutingDevice)
Member

I see these bits are being removed from initEnv() (run before agentHive.Run()) and moved later in the initialization order. How late are they moving? How do we know this is safe? I didn't see any discussion about this risk in the commit message.

Contributor Author

They're not really moving later. LocalNodeStore is a dependency of the daemon cell, so it gets initialized before it, and nothing else depends on these.

Contributor Author

@joamaki joamaki Apr 25, 2023

Though this change is not necessary in this PR, I can move them back for the time being.

EDIT: Ah no, this cannot be reverted since this code uses the node package setters and those are not usable before LocalNodeStore starts, so they have to move.

Member

I guess from the hive output, the InitDefaultPrefix() call (now InitLocalNode() -> SetDefaultPrefix()) is really just moving to after the start of pprof, gops, k8s-client and cni-config. Is that how to read this?

$ go run ./daemon hive
level=info msg="Memory available for map entries (0.003% of 33378533376B): 83446333B" subsys=config
level=info msg="option bpf-ct-global-tcp-max set by dynamic sizing to 292794" subsys=config
level=info msg="option bpf-ct-global-any-max set by dynamic sizing to 146397" subsys=config
level=info msg="option bpf-nat-global-max set by dynamic sizing to 292794" subsys=config
level=info msg="option bpf-neigh-global-max set by dynamic sizing to 292794" subsys=config
level=info msg="option bpf-sock-rev-map-max set by dynamic sizing to 146397" subsys=config
level=warning msg="Running Cilium with \"kvstore\"=\"\" requires identity allocation via CRDs. Changing identity-allocation-mode to \"crd\"" subsys=config
Cells:

  Ⓜ️ agent (Cilium Agent):
      Ⓜ️ infra (Infrastructure):
          Ⓜ️ pprof (pprof HTTP server to expose runtime profiling data):
              🚧 pprof.newServer (cell.go:67):
                  ⇨ hive.Lifecycle, logrus.FieldLogger, pprof.Config
                  ⇦ pprof.Server

              🛠️ pprof.glob..func1 (cell.go:51): func(pprof.Server)

          ⚙️ (pprof.Config) {
              Pprof: (bool) false,
              PprofAddress: (string) (len=9) "localhost",
              PprofPort: (uint16) 6060
          }


          Ⓜ️ gops (Gops Agent):
              ⚙️ (gops.GopsConfig) {
                  GopsPort: (uint16) 9890
              }


              🛠️ gops.registerGopsHooks (cell.go:39): func(hive.Lifecycle, logrus.FieldLogger,
                gops.GopsConfig)

          Ⓜ️ k8s-client (Kubernetes Client):
              ⚙️ (client.Config) {
                  EnableK8s: (bool) true,
                  K8sAPIServer: (string) "",
                  K8sKubeConfigPath: (string) "",
                  K8sClientQPS: (float32) 0,
                  K8sClientBurst: (int) 0,
                  K8sHeartbeatTimeout: (time.Duration) 30s,
                  EnableK8sAPIDiscovery: (bool) false
              }


              🚧 client.newClientset (cell.go:112):
                  ⇨ client.Config, hive.Lifecycle, logrus.FieldLogger
                  ⇦ client.Clientset

          Ⓜ️ cni-config (CNI configuration manager):
              ⚙️ (cni.Config) {
                  WriteCNIConfWhenReady: (string) "",
                  ReadCNIConf: (string) "",
                  CNIChainingMode: (string) (len=4) "none",
                  CNILogFile: (string) (len=30) "/var/run/cilium/cilium-cni.log",
                  CNIExclusive: (bool) false
              }


              🚧 cni.enableConfigManager (cell.go:58):
                  ⇨ *option.DaemonConfig, cni.Config, hive.Lifecycle, logrus.FieldLogger
                  ⇦ cni.CNIConfigManager

          🚧 cmd.glob..func1 (cells.go:58):
              ⇦ *option.DaemonConfig

      Ⓜ️ controlplane (Control Plane):
          Ⓜ️ local-node-store (Provides LocalNodeStore for observing and updating local node info):
              🚧 node.NewLocalNodeStore (local_node_store.go:69):
                  ⇨ hive.Lifecycle, node.LocalNodeInitializer[optional]
                  ⇦ *node.LocalNodeStore

          🚧 cmd.newLocalNodeInitializer (local_node_init.go:53):
              ⇨ *option.DaemonConfig, client.Clientset, k8s.LocalNodeResource
              ⇦ node.LocalNodeInitializer

...

If I read that right, it moves this to after the k8s apiserver connection is established.

Then I guess based on the above ordering, the Start() function for the local node store is synchronously run (? I didn't see documentation about synchronous assumption in pkg/hive), which would then mean that we know that the initialization shouldn't functionally change because it's still ensured to be run before we start handling async events for k8s resources in other Cells. Is that correct?

Contributor Author

Here's what the start hooks look like:

$ go run ./daemon hive --k8s-api-server localhost:1234
..
Start hooks:

  • gops.registerGopsHooks.func1 (cell.go:44)
  • client.(*compositeClientset).onStart
  • authmap.newAuthMap.func1 (cell.go:28)
  • configmap.newMap.func1 (cell.go:24)
  • datapath.newDatapath.func1 (cells.go:85)
  • *resource.resource[*github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1.Node].Start
  • node.NewLocalNodeStore.func1 (local_node_store.go:77)
  • *resource.resource[*github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1.Service].Start
  • *gobgp.diffStore[*github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1.Service].Start
  • *resource.resource[*github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2.CiliumNode].Start
  • *resource.resource[*github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1.Namespace].Start
  • *resource.resource[*github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1.CiliumLoadBalancerIPPool].Start
  • *resource.resource[*github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2.CiliumIdentity].Start
  • *resource.resource[*github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2.CiliumNetworkPolicy].Start
  • *resource.resource[*github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2.CiliumClusterwideNetworkPolicy].Start
  • *resource.resource[*github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1.CiliumCIDRGroup].Start
  • endpointmanager.newDefaultEndpointManager.func1 (cell.go:185)
  • cmd.newPolicyTrifecta.func1 (policy.go:128)
  • *manager.manager.Start
  • monitor.(*dropMonitor).OnStart
  • *cni.cniConfigManager.Start
  • cmd.newDaemonPromise.func1 (daemon_main.go:1599)
  • utime.initUtimeSync.func1 (cell.go:34)

We just need to worry about the start hooks before node.NewLocalNodeStore accessing anything in pkg/node. This PR drops the default localNode instance in pkg/node/address.go, so any use of the getters will panic and we'll catch those cases. And when localNode is set (so that the pkg/node getters work again), it is already partially initialized. Once we've gotten rid of the global getters/setters completely, this will be fully driven by the dependency on LocalNodeStore.

daemon/cmd/local_node_init.go (resolved)
return err
}

if err := ini.initFromK8s(ctx, n); err != nil {
Member

It seems like this aspect is pulling a k8s dependency in much earlier, or node initialization much later in the initialization process. From the commit message that seems somewhat intentional. I guess this raises the same question I had earlier, how do we reason about the initialization ordering?

Related: I see that newDaemon has the steps for node discovery and `k8s.waitForNodeInformation()`. How do those aspects line up with this initializer?

Contributor Author

The WaitForNodeInformation logic was kept as-is, except for the node IP, name and labels, as those do not depend on configuration or IPAM mode. The point is to resolve those much earlier so that we can lift more things out of the daemon into modules. This makes the initialization safer.

Member

Is it fair to say that "By moving more of this logic behind an Observable Cell, this commit makes progress towards a code structure where the full node object may be initialized before exposing any updates for the node to the other subsystems. This provides stronger guardrails against race conditions where users of the Node objects may inadvertently access the structure before it has been initialized"?

I think the core of my question is "why is it safe to move this initialization earlier; will it cause other logic to be reordered?". If I understand correctly, the counter-argument is "well, it's not safe today; other modules could attempt to read local node state before it's initialized". So by moving towards a better structure, maybe at some point we can move all of the users of the local node over to relying on the Observable, and then we will be able to eliminate this entire class of bugs. 👍

Then my concern about reordering other logic should be a non-issue, the new initFromK8s() logic is just doing a k8s fetch + updating some fields in what's effectively an internal structure. That's not inherently changing any ordering of other logic or triggering any events, it's just partially initializing a structure. So otherwise the current ordering of operations should not change, some data is just available earlier. I guess the one thing that does change here is that now we rely on the watcher for the local node to synchronize with k8s apiserver before continuing startup. That logic is already filtering the watch for only the current node, so it should be relatively quick, so no concerns there.

Contributor Author

I've now made this a bit safer by leaving the localNode global variable used by the getters and setters unset by default and setting it from the LocalNodeStore start hook. This allows catching reordering bugs early: e.g. if we modularized something out of the daemon that uses node.GetIPv4 instead of LocalNodeStore, it would crash early and thus hint to the author to switch to using LocalNodeStore.

@joamaki joamaki force-pushed the pr/joamaki/k8s-node-info-init branch from d12328c to 2c95fff Compare April 25, 2023 08:59
@joamaki joamaki requested a review from a team as a code owner April 25, 2023 08:59
// runs the given test. Afterwards the 'localNode' is restored to nil.
// This is a temporary workaround for tests until the LocalNodeStoreCell can be
// used.
func WithTestLocalNodeStore(runTest func()) {
Contributor Author

I'm trying this approach out, as having a default usable instance is scary: any calls to node.Set* before LocalNodeStore is started will be thrown away. It's better to have silly wrappers in tests than bugs in the production code.
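A minimal Go sketch of the test-wrapper pattern, based only on the doc comment quoted above (install a store, run the test, restore localNode to nil). The LocalNodeStore type here is an empty placeholder, not the real one; using defer keeps the global reset even if the test body panics.

```go
package main

// LocalNodeStore is a hypothetical placeholder for the real store.
type LocalNodeStore struct{}

// localNode is nil in production until LocalNodeStore starts.
var localNode *LocalNodeStore

// WithTestLocalNodeStore installs a fresh store for the duration of runTest
// and restores localNode to nil afterwards, even if runTest panics.
func WithTestLocalNodeStore(runTest func()) {
	localNode = &LocalNodeStore{}
	defer func() { localNode = nil }()
	runTest()
}
```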

@joestringer joestringer dismissed their stale review April 25, 2023 21:22

I think I've convinced myself that the startup reordering isn't likely to introduce additional problems. Overall the direction is good.

@joamaki joamaki requested a review from a team as a code owner May 29, 2023 13:33
@joamaki joamaki force-pushed the pr/joamaki/k8s-node-info-init branch from f1a29ec to ccd0542 Compare May 29, 2023 13:35
@joamaki
Contributor Author

joamaki commented May 29, 2023

/test

@joamaki joamaki force-pushed the pr/joamaki/k8s-node-info-init branch from ccd0542 to b798e9e Compare May 29, 2023 14:17
@joamaki
Contributor Author

joamaki commented May 30, 2023

/test

Job 'Cilium-PR-K8s-1.16-kernel-4.19' failed:


Test Name

K8sAgentPolicyTest Multi-node policy test with L7 policy using connectivity-check to check datapath

Failure Output

FAIL: cannot install connectivity-check

Jenkins URL: https://jenkins.cilium.io/job/Cilium-PR-K8s-1.16-kernel-4.19/236/

If it is a flake and a GitHub issue doesn't already exist to track it, comment /mlh new-flake Cilium-PR-K8s-1.16-kernel-4.19 so I can create one.

Then please upload the Jenkins artifacts to that issue.

Member

@joestringer joestringer left a comment

Mostly minor nits remaining; it seems like my prior concern about ordering is no longer an issue. I dug into pkg/stream a little more this time, noted some areas where the documentation could be improved, and asked a few questions about the behaviour of pkg/stream based on this, but they don't seem to be consequential.

I mostly skipped over test code files, assuming other owners take a closer look there.

}

return s.value
n, _ := stream.First[LocalNode](context.Background(), s)
Member

nit: I see a couple of assumptions here which are not checked (probably for good reasons and I'm just unfamiliar with the constructs):

  • stream.First gets just the first element, implying there is only ever one. I guess that stream.Multicast(stream.EmitLatest) enforces that there is only ever one element for the type(?). I don't quite follow how updates interact with this, but maybe that's just my lack of understanding. Part of me wonders whether a (new) stream.Only[...](...) could help provide stronger guarantees by asserting that there's only one element and raising an error/log if not. The implication I'd be concerned about is stream.First() emitting stale data.
  • There can never be errors (err is assigned to _). By the looks of it this is true, given that the lifecycle OnStart() will emit() the local node and the only way any code could call Get() is by holding an initialized handle on the LocalNodeStore object. Maybe this is fine as-is, though a one-line comment could help explain why. Code like this will likely be sampled/copied as an example by others in the codebase, so my higher-level concern is future code ignoring error values like this without also explaining why it is OK.

Contributor Author

Yep, First gets the first element of the stream, or fails if the context is cancelled or the stream completes.
stream.Multicast with EmitLatest will keep the last emitted element to give to any new subscriber, so calling First is effectively just picking whatever the last emitted item is.

To me Only doesn't make any sense here. What would its semantics be if the stream has multiple elements? Would it assert that the stream must complete after the initial element? If so, we can't use it here as the stream never completes. We really just want the first element of the stream (i.e. the current latest).

Agreed regarding errors. I'll add a comment about the assumptions we rely on here.

Member

> stream.Multicast with EmitLatest will keep the last emitted element to give to any new subscriber, so calling First is effectively just picking whatever the last emitted item is.

I presume that Last would give the same result? By your explanation above I do find it a bit confusing why we'd use First if the intent is to get the Last element.

> To me Only doesn't make any sense here. What are its semantics if the stream has multiple elements? Would it assert that the stream must complete after the initial element? If so then we can't use it here as the stream never completes. We really just want the first element of the stream (e.g. the current latest).

The idea I was thinking about is that in cases like this, if there are somehow multiple elements in the stream, that violates the expectations of code like this in the local node store. In such a scenario there's likely some other bug that follows, because the local node store only ever picks one element and the other one is ignored. Semantically, at emit() or, if necessary, any time someone calls Only(), if there are multiple elements it would return an error that could be raised to the user (ideally to the developer) to announce clearly that the expectation of "there's only one element" has been violated. That said, maybe this sort of suggestion is a little too paranoid. It sounds like the EmitLatest flag will guarantee that there's only ever one element, in which case unit tests should ensure that it's impossible to violate the expectation.

Contributor Author

@joamaki joamaki Jun 5, 2023

> I presume that Last would give the same result? By your explanation above I do find it a bit confusing why we'd use First if the intent is to get the Last element.

No, Last would block forever as the stream never completes. Last in the context of reactive streams means walk through the stream and give the last seen element before completion.

See: https://rxmarbles.com/#last
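The First/Last distinction in the reactive sense can be sketched over a finite, channel-backed stream where closing the channel stands for completion. first, last, fromSlice and errEmpty are hypothetical helpers, not pkg/stream functions; the point is only that last has to drain the stream until completion, so on a stream that never completes (like LocalNodeStore's) it would never return.

```go
package main

import (
	"errors"
	"fmt"
)

// errEmpty is a hypothetical error for a stream that completes empty.
var errEmpty = errors.New("stream completed without elements")

// last drains src until it is closed (the stream "completes") and returns
// the final element. If src is never closed, last never returns.
func last[T any](src <-chan T) (T, error) {
	var (
		v    T
		seen bool
	)
	for x := range src {
		v, seen = x, true
	}
	if !seen {
		return v, errEmpty
	}
	return v, nil
}

// first returns the first element, or errEmpty if src closes first.
func first[T any](src <-chan T) (T, error) {
	v, ok := <-src
	if !ok {
		return v, errEmpty
	}
	return v, nil
}

// fromSlice emits xs and then completes by closing the channel.
func fromSlice[T any](xs ...T) <-chan T {
	ch := make(chan T, len(xs))
	for _, x := range xs {
		ch <- x
	}
	close(ch)
	return ch
}

func main() {
	f, _ := first(fromSlice(1, 2, 3, 4))
	l, _ := last(fromSlice(1, 2, 3, 4))
	fmt.Println(f, l) // 1 4
}
```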

> The idea I was thinking about is that in cases like this, if there are somehow multiple elements in the stream, that is violating the expectations of code like this in the local node store. In such a scenario, there's likely some other bug that follows because local node store only ever picks one element, and the other one is ignored. Semantically at emit() or if necessary any time someone calls Only(), if there are multiple elements, it would return some error that could be raised up to the user (ideally actually to the developer) to announce clearly that somehow the expectation of "There's only one element" has been violated. That said, maybe this sort of suggestion is a little too paranoid. It sounds like the EmitLatest flag will guarantee that there's only ever one element, in which case unit tests should ensure that it's impossible to violate the expectation.

EmitLatest means that when a new observer subscribes, it is given the last seen element. Without this option, Multicast would not give anything to the subscriber until the next emit call. See: https://github.com/cilium/cilium/blob/main/pkg/stream/sources.go#L197. I can see now that EmitLatest is a bit confusing. Any suggestions for a better name?

I'm not sure where the impression that "there's only one element" comes from. LocalNodeStore emits an essentially infinite stream of new states, and when subscribing we immediately give whatever was last emitted.

Member

> No, Last would block forever as the stream never completes. Last in the context of reactive streams means walk through the stream and give the last seen element before completion.

Shall we document this in the comment above Last()? The completion aspect of the semantics was not obvious to me from reading the code or code comments. For that matter, if completion is a key aspect of the semantics, would it be more appropriate to name this Final() or FinalItem()? That said, I recognize that Last reflects the semantics described on rxmarbles. Personally I find "first" and "last" ambiguous when discussing streams, given that their meaning depends on the implementation details of the stream and on exactly when the "first"/"last" call is made, but we can decide whether to clarify this through naming, code comments or developer education.

For that matter, the return nil semantics should probably also be documented. I guess that depends on whether the stream is Multicast or not, but if not then presumably only one caller can successfully call Last() and the subsequent ones will immediately return nil? Then if it's multicast, any number of callers can call Last() and each one will receive the most recent update prior to completion?

> EmitLatest means that when a new observer subscribes give it the last seen element.

By itself this makes sense to me, and the code comments align well; I think this one was just on me to learn more about how it works. I don't have any better suggestions.

> I'm not sure where your impression that "there's only one element" comes from? LocalNodeStore emits essentially an infinite stream of new states and when subscribing we immediately give whatever was last emitted.

For the Local node, there is exactly one unique underlying object, the current state of the node. As that object is modified, it emits a stream of updates. As a programmer reasoning about the code, if I pick an arbitrary point in time and assume that there have been four updates, then it is tempting to visualize this as a list [a, b, c, d]. When I call First() on such a list, logically it would return the element at the start of the list, a. Theoretically this may represent stale data. I suspect that the issue is that actually we don't track all updates [a, b, c, d] but rather we elide the updates. When the caller calls First(), First is not first as in "the beginning of all [previous and future] updates", but rather "the beginning of all future updates"? Or perhaps "The current state of the underlying object"? Maybe MostRecent or MostRecentItem would be more obvious?

Related, looking at rxmarbles for first, I see:

[image: rxmarbles diagram of the First operator]

If I interpret time going from left to right, there are three objects {yellow, orange, green} and there are four updates, {1 (yellow), 2 (orange), 3 (green), 4 (yellow)}, then according to this diagram, First gives update 1. So that's not the most recent update for the yellow object. 🤔

Maybe these semantic changes to the meaning of First() are introduced specifically by the EmitLatest flag(?) -- If so, that's surprising and should be called out in the code comment for First().

Separately, in contrast to the local node case: if I consider updates for k8s Node or CiliumNode resources, there are N unique underlying objects, each of which corresponds to some data stored in k8s, ultimately backed by another node in the cluster. Each of those objects may be updated independently. Given one stream for these resource updates, presumably if I call First(), it will provide an update for an arbitrary node. There's no guarantee which one it will be for. That implies to me that First() would be difficult to use for code dealing with Nodes / CiliumNodes, because the stream is inherently backed by multiple unique objects. It's not particularly meaningful to get the "first" or even "most recent" element of such a stream, and such code probably shouldn't be using First(), since it misses updates for all the other nodes. By extension, if we were to try to make this code more programmer-safe, I could argue "It's a bug if someone tries to call First() on a stream where the updates correspond to multiple different underlying objects". A step further would be "we should actively prevent the programmer from writing such bugs". I don't have a strong opinion on where we are on this spectrum: preventing these bugs through developer education, code comments/package API semantics, review, active code mechanisms, or testing.
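For a stream carrying updates for many distinct objects, taking the first element yields one arbitrary object; a consumer typically folds the whole stream into per-object state instead. A minimal sketch, with a hypothetical nodeEvent type and latestPerNode helper, mirroring the four updates across three objects described in the rxmarbles discussion:

```go
package main

import "fmt"

// nodeEvent is a hypothetical update for one of many cluster nodes.
type nodeEvent struct {
	Name string
	IP   string
}

// latestPerNode folds a completed stream of updates into the current state
// of every node. Taking only the first event would see one arbitrary node.
func latestPerNode(events <-chan nodeEvent) map[string]string {
	state := make(map[string]string)
	for ev := range events {
		state[ev.Name] = ev.IP // a later update for the same node wins
	}
	return state
}

func main() {
	events := make(chan nodeEvent, 4)
	events <- nodeEvent{"node-a", "10.0.0.1"}
	events <- nodeEvent{"node-b", "10.0.0.2"}
	events <- nodeEvent{"node-c", "10.0.0.3"}
	events <- nodeEvent{"node-a", "10.0.0.11"}
	close(events)

	state := latestPerNode(events)
	fmt.Println(len(state), state["node-a"]) // 3 10.0.0.11
}
```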

Member

Related to my suggestions around MostRecent(), I think the key thing I was missing is that First() immediately creates a brand new subscription to the multicast stream, starting from the latest event at that point in time. So even though it's called First(), it is already fast-forwarded to the current state when the call is made. Latest seems slightly better aligned with such semantics, but MostRecent could work, or even First() if the code comments clearly describe that it returns the first update from this point forward and ideally mention that, when combined with EmitLatest, it is the most recent update that occurred in the past.

Member

The more I think about it, the more I'm inclined to agree that First() is the right naming for it, it would just help to have clearer comments around the semantics with respect to time, with respect to the way the function changes behaviour with different flags, and with respect to blocking behaviour (per the other thread).

Contributor Author

Thanks for digging through this! My approach with pkg/stream has been to stick as closely as I can to the "reactive extensions" API, to reuse the established vocabulary (https://reactivex.io/documentation/observable.html). It gets confusing when the semantics of the stream are not those of, e.g., a simple container, but instead depend on subscription time or consume the underlying resource.

pkg/node/local_node_store.go (outdated, resolved)
func (s *localNodeStore) Get() LocalNode {
// Get retrieves the current local node. Use Get() only for inspecting the state,
// e.g. in API handlers. Do not assume the value does not change over time.
// Blocks until the store has been initialized.
Member

nit: Is blocking a general property of functions like stream.First(), stream.Last()? I don't see such comments in those functions, but this would be helpful to know there (submitted as another separate PR).

Member

^ Side note while trying to answer my own questions here: I got confused for a while searching for the implementation of Observe() that satisfies the Observable interface. It would help to expand the comment above the Observable[] interface in pkg/stream/observable.go to explain the common usage by embedding this field, and that the implementation of the Observe() function is expected to be provided by other types in pkg/stream, e.g. FuncObservable. This could help others explore this code in future and understand the implementation if they intend to dig through and confirm properties like the blocking property above.

Member

Coming back to this, it seems the intent behind the "Blocking" property may be that s.mu will stall while trying to grab the mutex, but it doesn't look like initialization grabs the mutex, so initial calls to Get() might not block (?)

Contributor Author

Taking the lock in Get is wrong. If someone calls Get in the background before LocalNodeStore has started, it'll block the start hook. While LocalNodeStore is not expected to be used before it has been started and initialized, I'll still try to make doing so safe.

Member

Let's say that someone manages to get a LocalNodeStore pointer and calls Get(). Where will the block occur exactly? Presumably that Get() call descends into stream.First() -> s.Observe() -> (???).

Alternatively someone can only get a LocalNodeStore pointer via Hive after the start hook has run, in which case this comment doesn't really apply to this specific function?

Contributor Author

If you get the LocalNodeStore in your constructor and then fork a goroutine that calls Get it'll block until LocalNodeStore start hook runs and it gets initialized and the first emit call is made.
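A minimal sketch of that behaviour, assuming the store signals readiness by closing a channel from its start hook instead of having Get hold the store's lock across startup. store, newStore, start and Get here are hypothetical names, not the pkg/node code. A goroutine that calls Get early just waits on the channel, so it can never hold up the start hook.

```go
package main

import (
	"fmt"
	"sync"
)

// LocalNode is a hypothetical stand-in for the local node state.
type LocalNode struct{ Name string }

// store closes ready from its start hook; the mutex is only held briefly,
// never while waiting for initialization.
type store struct {
	ready chan struct{}
	mu    sync.RWMutex
	node  LocalNode
}

func newStore() *store { return &store{ready: make(chan struct{})} }

// start plays the role of the lifecycle start hook. It must run once,
// since closing an already-closed channel panics.
func (s *store) start(n LocalNode) {
	s.mu.Lock()
	s.node = n
	s.mu.Unlock()
	close(s.ready)
}

// Get blocks until start has run, then returns a snapshot.
func (s *store) Get() LocalNode {
	<-s.ready
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.node
}

func main() {
	s := newStore()
	got := make(chan LocalNode, 1)
	// Simulates a goroutine forked from a constructor that calls Get early.
	go func() { got <- s.Get() }()

	s.start(LocalNode{Name: "node-1"})
	fmt.Println((<-got).Name) // node-1
}
```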

Member

OK, so tracing through:

  • Some code calls Get()
  • Get() calls First()
  • First does some non-blocking operations and then calls src.Observe()
  • Observe() is implemented by the anonymous function in Multicast(), which may grab function-level locks (should not be held) and spins out goroutines, then returns
  • First() provides a next() function to Observe() that grabs the first item and cancels the context immediately. Multicast's Observe() implementation can call this asynchronously.
  • When First() returns from Observe(), it blocks on the completion error channel, which is closed as soon as the first element is emitted (per the function above).
  • The anonymous goroutines from the Observe() call will only close the completion / err channel once the first element is emitted
  • The first element is only emitted by the code calling Multicast()'s returned emit() function, which is only triggered by an OnStart() cell lifecycle hook.

From that, I conclude that pkg/stream:First() function should have in its API description "This function blocks until the first element is emitted into the stream".

That makes me wonder about the io.EOF case in First() though, how can that happen if it blocks until the first element is emitted? 🤔

Member

Ah, of course, when creating the stream, a complete() func is also returned, so it's also entirely valid for the caller to just complete without emitting an event.

So if the suggestion above was:

"First() blocks until the first element is emitted into the stream".

Instead it should be something like:

"First() blocks until the first element is emitted into the stream or the stream is completed by the stream owner".

Contributor Author

#25922 -- doc improvements here

daemon/cmd/local_node_init.go (resolved)
pkg/node/local_node_store.go (resolved)
@joamaki joamaki force-pushed the pr/joamaki/k8s-node-info-init branch 4 times, most recently from 4485eb3 to f3623a0 Compare June 2, 2023 09:12
@joamaki
Contributor Author

joamaki commented Jun 2, 2023

/test

@joamaki joamaki force-pushed the pr/joamaki/k8s-node-info-init branch from f3623a0 to 6254dc4 Compare June 2, 2023 11:01
@joamaki joamaki requested a review from a team as a code owner June 2, 2023 11:01
@joamaki joamaki requested a review from brb June 2, 2023 11:01
@joamaki joamaki force-pushed the pr/joamaki/k8s-node-info-init branch from 6254dc4 to 033034c Compare June 2, 2023 11:34
@joamaki
Contributor Author

joamaki commented Jun 2, 2023

/test

To support proper initialization of the LocalNodeStore with
a LocalNodeInitializer, remove the default instance and construct
it properly.

Remove the interface type for LocalNodeStore as this implementation
should not need test fakes.

Signed-off-by: Jussi Maki <jussi@isovalent.com>
As we're moving more code into modules, the global variables in pkg/node
are becoming increasingly dangerous. These were populated as part of
newDaemon() and as such were not accessible before the daemon had started.

This commit moves the config and k8s initialization into a LocalNodeStore
initializer function, thus making the node IP, name and labels available
to modules outside daemon. To validate this initialization the test
test/controlplane/node/localnode.go was added.

Fields still initialized in daemon are:

- Cilium internal IPs (restored from cilium_host or allocated by IPAM)
- Health IPs (allocated by IPAM)
- Ingress IPs (restored from ipcache or allocated)
- Wireguard key (set by wireguard agent)
- IPsec key (set by IPsec)
- alloc CIDRs (depends on IPAM mode; restored from Node or CiliumNode)
- ClusterID (set by NodeDiscovery)

Signed-off-by: Jussi Maki <jussi@isovalent.com>
Using a default instance is dangerous since any prior calls to node.Set* will
be ignored when LocalNodeStore starts. To avoid this pitfall, add a helper
in node package to temporarily set the 'localNode' global and unset after
the test. This way we'll catch any unintended calls to node.Set* outside tests
at the cost of some extra wrapping in select test cases.

Signed-off-by: Jussi Maki <jussi@isovalent.com>
The 'localNode' variable in pkg/node used by the old getters and setters is
uninitialized until the LocalNodeStore has started. This means any module
wanting to use the pkg/node accessors needs to depend on LocalNodeStore,
and should preferably use LocalNodeStore itself to access or update it.

This fixes the issue for wireguard, which now uses Update to set WireguardPubKey
and reads OptOutNodeEncryption without going through the global getters.

Signed-off-by: Jussi Maki <jussi@isovalent.com>
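The commit above replaces the global setters in wireguard with LocalNodeStore's Update. A minimal sketch of that closure-based update pattern, assuming a mutex-guarded store; store, Get and the trimmed LocalNode fields are hypothetical stand-ins rather than the actual pkg/node types.

```go
package main

import (
	"fmt"
	"sync"
)

// LocalNode is a hypothetical subset of the local node state.
type LocalNode struct {
	WireguardPubKey      string
	OptOutNodeEncryption bool
}

// store is a hypothetical, mutex-guarded stand-in for a node store
// with closure-based updates.
type store struct {
	mu   sync.Mutex
	node LocalNode
}

// Update runs the mutation under the lock, so concurrent updates and
// reads never race.
func (s *store) Update(update func(*LocalNode)) {
	s.mu.Lock()
	defer s.mu.Unlock()
	update(&s.node)
}

// Get returns a copy of the current state.
func (s *store) Get() LocalNode {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.node
}

func main() {
	s := &store{}
	// Write through Update instead of a global setter.
	s.Update(func(n *LocalNode) { n.WireguardPubKey = "example-pubkey" })
	// Read from the store instead of a global getter.
	n := s.Get()
	fmt.Println(n.WireguardPubKey, n.OptOutNodeEncryption) // example-pubkey false
}
```

Passing a closure keeps read-modify-write sequences atomic, which separate Get/Set calls on globals cannot guarantee.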
@joamaki joamaki force-pushed the pr/joamaki/k8s-node-info-init branch from 033034c to 831cb2e Compare June 5, 2023 08:34
@joamaki
Contributor Author

joamaki commented Jun 5, 2023

/test

@joamaki joamaki removed the request for review from ldelossa June 5, 2023 14:20
@joamaki
Contributor Author

joamaki commented Jun 5, 2023

ci-l4lb is hitting #25892.

Member

@brb brb left a comment

pkg/wireguard LGTM, thanks!

@brb
Member

brb commented Jun 6, 2023

All reviews are in. Merging to unblock other PRs.

@brb brb merged commit 8735ad6 into cilium:main Jun 6, 2023
60 of 61 checks passed
@joamaki joamaki added the ready-to-merge This PR has passed all tests and received consensus from code owners to merge. label Jun 6, 2023
Labels
area/daemon Impacts operation of the Cilium daemon. area/modularization kind/enhancement This would improve or streamline existing functionality. ready-to-merge This PR has passed all tests and received consensus from code owners to merge. release-note/misc This PR makes changes that have no direct user impact.
Projects
None yet

7 participants