Skip to content

Commit

Permalink
feat: fund nodes using node-operator as they are deployed (#371)
Browse files Browse the repository at this point in the history
* wip(k8s): add pod events watcher and operator cmd

* wip: create working demo

* wip: update event log for demo

* refactor(operator): extract operator to separate pkg and add label to config

* feat(operator): set infinite timout as default

* fix(operator): check funder.Fund error

* chore: update readme.md file and bump version to v0.14.3
  • Loading branch information
gacevicljubisa committed Dec 18, 2023
1 parent 7c87f18 commit 2ad80e3
Show file tree
Hide file tree
Showing 12 changed files with 439 additions and 97 deletions.
243 changes: 158 additions & 85 deletions README.md

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions cmd/beekeeper/cmd/cmd.go
Expand Up @@ -104,6 +104,10 @@ func newCommand(opts ...option) (c *command, err error) {
return nil, err
}

if err := c.initOperatorCmd(); err != nil {
return nil, err
}

if err := c.initPrintCmd(); err != nil {
return nil, err
}
Expand Down
80 changes: 80 additions & 0 deletions cmd/beekeeper/cmd/operator.go
@@ -0,0 +1,80 @@
package cmd

import (
"context"
"errors"
"time"

"github.com/ethersphere/beekeeper/pkg/config"
"github.com/ethersphere/beekeeper/pkg/operator"
"github.com/spf13/cobra"
)

func (c *command) initOperatorCmd() (err error) {
const (
optionNameNamespace = "namespace"
optionNameChainNodeEndpoint = "geth-url"
optionNameWalletKey = "wallet-key"
optionNameMinNative = "min-native"
optionNameMinSwarm = "min-swarm"
optionNameTimeout = "timeout"
)

cmd := &cobra.Command{
Use: "node-operator",
Short: "scans for scheduled pods and funds them",
Long: `Node operator scans for scheduled pods and funds them using node-funder. beekeeper node-operator`,
RunE: func(cmd *cobra.Command, args []string) (err error) {
cfg := config.NodeFunder{}
namespace := c.globalConfig.GetString(optionNameNamespace)

// chain node endpoint check
if cfg.ChainNodeEndpoint = c.globalConfig.GetString(optionNameChainNodeEndpoint); cfg.ChainNodeEndpoint == "" {
return errors.New("chain node endpoint (geth-url) not provided")
}

// wallet key check
if cfg.WalletKey = c.globalConfig.GetString(optionNameWalletKey); cfg.WalletKey == "" {
return errors.New("wallet key not provided")
}

cfg.MinAmounts.NativeCoin = c.globalConfig.GetFloat64(optionNameMinNative)
cfg.MinAmounts.SwarmToken = c.globalConfig.GetFloat64(optionNameMinSwarm)

// add timeout to operator
// if timeout is not set, operator will run infinitely
var ctxNew context.Context
var cancel context.CancelFunc
timeout := c.globalConfig.GetDuration(optionNameTimeout)
if timeout > 0 {
ctxNew, cancel = context.WithTimeout(cmd.Context(), timeout)
} else {
ctxNew = context.Background()
}
if cancel != nil {
defer cancel()
}

return operator.NewClient(&operator.ClientConfig{
Log: c.log,
Namespace: namespace,
WalletKey: cfg.WalletKey,
ChainNodeEndpoint: cfg.ChainNodeEndpoint,
MinAmounts: cfg.MinAmounts,
K8sClient: c.k8sClient,
}).Run(ctxNew)
},
PreRunE: c.preRunE,
}

cmd.Flags().StringP(optionNameNamespace, "n", "", "Kubernetes namespace to scan for scheduled pods.")
cmd.Flags().String(optionNameChainNodeEndpoint, "", "Endpoint to chain node. Required.")
cmd.Flags().String(optionNameWalletKey, "", "Hex-encoded private key for the Bee node wallet. Required.")
cmd.Flags().Float64(optionNameMinNative, 0, "Minimum amount of chain native coins (xDAI) nodes should have.")
cmd.Flags().Float64(optionNameMinSwarm, 0, "Minimum amount of swarm tokens (xBZZ) nodes should have.")
cmd.Flags().Duration(optionNameTimeout, 0*time.Minute, "Timeout. Default is infinite.")

c.root.AddCommand(cmd)

return nil
}
5 changes: 3 additions & 2 deletions config/config.yaml
Expand Up @@ -81,6 +81,7 @@ node-groups:
ingress-debug-class: "nginx-internal"
labels:
app.kubernetes.io/component: "node"
app.kubernetes.io/name: "bee"
app.kubernetes.io/part-of: "bee"
app.kubernetes.io/version: "latest"
node-selector:
Expand Down Expand Up @@ -310,7 +311,7 @@ checks:
nodes-sync-wait: 1m
duration: 12h
downloader-count: 3
upload-group:
upload-group:
- gateway
download-group:
- bee
Expand Down Expand Up @@ -354,7 +355,7 @@ checks:
options:
ref:
concurrency:
max-attempts:
max-attempts:
type: datadurability

# simulations defines simulations Beekeeper can execute against the cluster
Expand Down
1 change: 1 addition & 0 deletions config/local.yaml
Expand Up @@ -122,6 +122,7 @@ node-groups:
ingress-debug-class: "traefik"
labels:
app.kubernetes.io/component: "node"
app.kubernetes.io/name: "bee"
app.kubernetes.io/part-of: "bee"
app.kubernetes.io/version: "latest"
node-selector:
Expand Down
7 changes: 4 additions & 3 deletions config/testnet-giant.yaml
Expand Up @@ -21,9 +21,9 @@ clusters:
bee-config: bootnode-giant
config: testnet-giant
nodes:
- name: bootnode-0
bootnodes: /dns4/bootnode-0-headless.%s.svc.cluster.local/tcp/1634/p2p/16Uiu2HAm6i4dFaJt584m2jubyvnieEECgqM2YMpQ9nusXfy8XFzL
libp2p-key: '{"address":"aa6675fb77f3f84304a00d5ea09902d8a500364091a457cf21e05a41875d48f7","crypto":{"cipher":"aes-128-ctr","ciphertext":"93effebd3f015f496367e14218cb26d22de8f899e1d7b7686deb6ab43c876ea5","cipherparams":{"iv":"627434462c2f960d37338022d27fc92e"},"kdf":"scrypt","kdfparams":{"n":32768,"r":8,"p":1,"dklen":32,"salt":"a59e72e725fe3de25dd9c55aa55a93ed0e9090b408065a7204e2f505653acb70"},"mac":"dfb1e7ad93252928a7ff21ea5b65e8a4b9bda2c2e09cb6a8ac337da7a3568b8c"},"version":3}'
- name: bootnode-0
bootnodes: /dns4/bootnode-0-headless.%s.svc.cluster.local/tcp/1634/p2p/16Uiu2HAm6i4dFaJt584m2jubyvnieEECgqM2YMpQ9nusXfy8XFzL
libp2p-key: '{"address":"aa6675fb77f3f84304a00d5ea09902d8a500364091a457cf21e05a41875d48f7","crypto":{"cipher":"aes-128-ctr","ciphertext":"93effebd3f015f496367e14218cb26d22de8f899e1d7b7686deb6ab43c876ea5","cipherparams":{"iv":"627434462c2f960d37338022d27fc92e"},"kdf":"scrypt","kdfparams":{"n":32768,"r":8,"p":1,"dklen":32,"salt":"a59e72e725fe3de25dd9c55aa55a93ed0e9090b408065a7204e2f505653acb70"},"mac":"dfb1e7ad93252928a7ff21ea5b65e8a4b9bda2c2e09cb6a8ac337da7a3568b8c"},"version":3}'
bee:
mode: node
bee-config: testnet-giant
Expand Down Expand Up @@ -54,6 +54,7 @@ node-groups:
ingress-debug-class: "nginx-internal"
labels:
app.kubernetes.io/component: "node"
app.kubernetes.io/name: "bee"
app.kubernetes.io/part-of: "bee"
app.kubernetes.io/version: "latest"
node-selector:
Expand Down
5 changes: 3 additions & 2 deletions config/testnet.yaml
Expand Up @@ -51,9 +51,10 @@ node-groups:
ingress-debug-class: "nginx-internal"
labels:
app.kubernetes.io/component: "node"
app.kubernetes.io/name: "bee"
app.kubernetes.io/part-of: "bee"
app.kubernetes.io/version: "latest"
node-selector:
node-selector:
node-group: "private"
persistence-enabled: true
persistence-storage-class: "local-storage"
Expand Down Expand Up @@ -118,4 +119,4 @@ bee-configs:
allow-private-cidrs: true
testnet-light-node:
_inherit: testnet
full-node: false
full-node: false
2 changes: 1 addition & 1 deletion pkg/k8s/k8s.go
Expand Up @@ -130,7 +130,7 @@ func (c *Client) setK8sClient(clientset kubernetes.Interface, apiClientset ingre
c.ConfigMap = configmap.NewClient(clientset)
c.Ingress = ingress.NewClient(clientset)
c.Namespace = namespace.NewClient(clientset)
c.Pods = pod.NewClient(clientset)
c.Pods = pod.NewClient(clientset, c.logger)
c.PVC = persistentvolumeclaim.NewClient(clientset)
c.Secret = secret.NewClient(clientset)
c.ServiceAccount = serviceaccount.NewClient(clientset)
Expand Down
45 changes: 44 additions & 1 deletion pkg/k8s/pod/client.go
Expand Up @@ -4,21 +4,25 @@ import (
"context"
"fmt"

"github.com/ethersphere/beekeeper/pkg/logging"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
)

// Client manages communication with the Kubernetes Pods.
type Client struct {
clientset kubernetes.Interface
log logging.Logger
}

// NewClient constructs a new Client.
func NewClient(clientset kubernetes.Interface) *Client {
func NewClient(clientset kubernetes.Interface, log logging.Logger) *Client {
return &Client{
clientset: clientset,
log: log,
}
}

Expand Down Expand Up @@ -68,3 +72,42 @@ func (c *Client) Delete(ctx context.Context, name, namespace string) (err error)

return
}

// EventsWatch watches for events.
func (c *Client) EventsWatch(ctx context.Context, namespace string, operatorChan chan string) (err error) {
c.log.Infof("starting events watch")
defer c.log.Infof("events watch done")
defer close(operatorChan)

watcher, err := c.clientset.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{
LabelSelector: "app.kubernetes.io/name=bee",
// TODO: add this label to beekeeper and filter on it => app.kubernetes.io/name=bee
})
if err != nil {
return fmt.Errorf("getting pod events in namespace %s: %w", namespace, err)
}
defer watcher.Stop()

// listen for either events from the watcher or a context cancellation
for {
select {
case <-ctx.Done():
return ctx.Err()
case event, ok := <-watcher.ResultChan():
if !ok {
return fmt.Errorf("watch channel closed")
}
switch event.Type {
// case watch.Added: //TODO: check if we already need those who are already running
case watch.Modified:
pod, ok := event.Object.(*v1.Pod)
if ok {
if pod.Status.PodIP != "" && pod.ObjectMeta.DeletionTimestamp == nil {
c.log.Tracef("new pod event:{%s}, {%s}, {%s}, {%s}, {%v}", event.Type, pod.Name, pod.Status.Phase, pod.Status.PodIP, pod.ObjectMeta.DeletionTimestamp)
operatorChan <- pod.Status.PodIP
}
}
}
}
}
}
6 changes: 4 additions & 2 deletions pkg/k8s/pod/client_test.go
Expand Up @@ -3,11 +3,13 @@ package pod_test
import (
"context"
"fmt"
"io"
"reflect"
"testing"

mock "github.com/ethersphere/beekeeper/mocks/k8s"
"github.com/ethersphere/beekeeper/pkg/k8s/pod"
"github.com/ethersphere/beekeeper/pkg/logging"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -62,7 +64,7 @@ func TestSet(t *testing.T) {

for _, test := range testTable {
t.Run(test.name, func(t *testing.T) {
client := pod.NewClient(test.clientset)
client := pod.NewClient(test.clientset, logging.New(io.Discard, 0, ""))
response, err := client.Set(context.Background(), test.podName, "test", test.options)
if test.errorMsg == nil {
if err != nil {
Expand Down Expand Up @@ -138,7 +140,7 @@ func TestDelete(t *testing.T) {

for _, test := range testTable {
t.Run(test.name, func(t *testing.T) {
client := pod.NewClient(test.clientset)
client := pod.NewClient(test.clientset, logging.New(io.Discard, 0, ""))
err := client.Delete(context.Background(), test.podName, "test")
if test.errorMsg == nil {
if err != nil {
Expand Down

0 comments on commit 2ad80e3

Please sign in to comment.