Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: implement NodeStore migration #4029

Merged
merged 2 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 79 additions & 1 deletion pkg/routing/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -17,7 +18,8 @@ import (
)

const (
DefaultBucketName = "nodes"
V130BucketName = "nodes"
DefaultBucketName = "node_states"
frrist marked this conversation as resolved.
Show resolved Hide resolved
)

type NodeStoreParams struct {
Expand All @@ -40,6 +42,11 @@ func NewNodeStore(ctx context.Context, params NodeStoreParams) (*NodeStore, erro
if bucketName == "" {
return nil, pkgerrors.New("bucket name is required")
}

if err := migrateNodeInfoToNodeState(ctx, js, V130BucketName, bucketName); err != nil {
return nil, err
}

kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: bucketName,
})
Expand All @@ -53,6 +60,77 @@ func NewNodeStore(ctx context.Context, params NodeStoreParams) (*NodeStore, erro
}, nil
}

func migrateNodeInfoToNodeState(ctx context.Context, js jetstream.JetStream, from string, to string) (retErr error) {
defer func() {
if retErr == nil {
frrist marked this conversation as resolved.
Show resolved Hide resolved
if err := js.DeleteKeyValue(ctx, from); err != nil {
if errors.Is(err, jetstream.ErrBucketNotFound) {
// migration is successful since there isn't previous state to migrate from
retErr = nil
} else {
retErr = fmt.Errorf("NodeStore migration succeeded, but failed to remove old bucket: %w", err)
}
}
}
}()

fromKV, err := js.KeyValue(ctx, from)
if err != nil {
if errors.Is(err, jetstream.ErrBucketNotFound) {
// migration is successful since there isn't previous state to migrate from
return nil
}
return fmt.Errorf("NodeStore migration failed: failed to open 'from' bucket: %w", err)
}

keys, err := fromKV.Keys(ctx)
if err != nil {
if pkgerrors.Is(err, jetstream.ErrNoKeysFound) {
// if the store is empty the migration is successful as there isn't anything to migrate
return nil
}
return fmt.Errorf("NodeStore migration failed: failed to list store: %w", err)
}

nodeInfos := make([]models.NodeInfo, 0, len(keys))
for _, key := range keys {
entry, err := fromKV.Get(ctx, key)
if err != nil {
return fmt.Errorf("NodeStore migration failed: failed to read node info with name: %s: %w", key, err)
}
frrist marked this conversation as resolved.
Show resolved Hide resolved

var nodeinfo models.NodeInfo
if err := json.Unmarshal(entry.Value(), &nodeinfo); err != nil {
return fmt.Errorf("NodeStore migration failed: failed to unmarshal node info: %w", err)
}
nodeInfos = append(nodeInfos, nodeinfo)
}

toKV, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: to,
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to create the bucket first, or will this create it for us? What if the if the bucket already exist?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// CreateKeyValue will create a KeyValue store with the given
// configuration.
//
// If a KeyValue store with the same name already exists and the
// configuration is different, ErrBucketExists will be returned.

My interpretation of the comment is:

  1. Create a store if it DNE
  2. Open a store if it exists with the same config
  3. Fail if the store exists, but with a different config.

And, fwiw, this is the same method we use in NewNodeStore to open/create the bucket. I think we're good here.

if err != nil {
return fmt.Errorf("NodeStore migration failed: failed to open to bucket: %w", err)
}

for _, ni := range nodeInfos {
nodestate := models.NodeState{
Info: ni,
Membership: models.NodeMembership.PENDING,
Connection: models.NodeStates.DISCONNECTED,
}
data, err := json.Marshal(nodestate)
if err != nil {
return fmt.Errorf("NodeStore migration failed: failed to marshal node state: %w", err)
}
if _, err := toKV.Put(ctx, nodestate.Info.ID(), data); err != nil {
return fmt.Errorf("NodeStore migration failed: failed to write node state to store: %w", err)
}
}

return nil
}

func (n *NodeStore) FindPeer(ctx context.Context, peerID peer.ID) (peer.AddrInfo, error) {
// TODO: Remove this once we no longer need to implement the routing.PeerStore interface
// We are temporarily matching the code of the inmemory.NodeStore which never returns an
Expand Down
144 changes: 144 additions & 0 deletions pkg/routing/kvstore/migration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
//go:build unit || !integration

package kvstore_test

import (
"context"
"encoding/json"
"testing"

"github.com/nats-io/nats-server/v2/server"
natsserver "github.com/nats-io/nats-server/v2/test"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/stretchr/testify/suite"

"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/routing/kvstore"
)

// KVMigrationSuite exercises the NodeStore bucket migration against a NATS
// server with JetStream enabled that is started for every test.
type KVMigrationSuite struct {
suite.Suite
// nats is the server started by SetupTest and shut down by TearDownTest.
nats *server.Server
// client is the connection to that server; it is handed to kvstore.NewNodeStore.
client *nats.Conn
// js is the JetStream handle built on client, used to prepare and inspect buckets directly.
js jetstream.JetStream
}

// SetupTest starts a JetStream-enabled NATS server on TEST_PORT with a fresh
// temporary store directory, connects a client to it and creates the JetStream
// handle used by the tests.
func (s *KVMigrationSuite) SetupTest() {
	// Work on a copy of the defaults: mutating natsserver.DefaultTestOptions
	// through a pointer would change the package-level value seen by every
	// other test in the same process.
	opts := natsserver.DefaultTestOptions
	opts.Port = TEST_PORT
	opts.JetStream = true
	opts.StoreDir = s.T().TempDir()

	s.nats = natsserver.RunServer(&opts)

	var err error
	s.client, err = nats.Connect(s.nats.Addr().String())
	s.Require().NoError(err)

	s.js, err = jetstream.New(s.client)
	s.Require().NoError(err)
}

// TearDownTest releases the resources created by SetupTest in reverse order:
// the client connection is closed first so it does not observe the server
// going away, then the server itself is shut down.
func (s *KVMigrationSuite) TearDownTest() {
	s.client.Close()
	s.nats.Shutdown()
}

func TestKVMigrationSuite(t *testing.T) {
suite.Run(t, new(KVMigrationSuite))
}

// TestMigrationFromNodeInfoToNodeState verifies that opening a NodeStore converts
// every models.NodeInfo found in the v130 bucket into a models.NodeState in the
// new bucket and removes the old bucket afterwards.
func (s *KVMigrationSuite) TestMigrationFromNodeInfoToNodeState() {
	ctx := context.Background()

	fromBucket := kvstore.V130BucketName
	toBucket := kvstore.DefaultBucketName

	// Create 'from' bucket and populate it, simulating a requester on v130 with state to migrate.
	fromKV, err := s.js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: fromBucket})
	s.Require().NoError(err)

	nodeInfos := []models.NodeInfo{
		generateNodeInfo("node1", models.EngineDocker),
		generateNodeInfo("node2", models.EngineWasm),
		generateNodeInfo("node3", models.EngineDocker, models.EngineWasm),
	}

	// populate bucket with models.NodeInfo, these will be migrated to models.NodeState
	for _, n := range nodeInfos {
		data, err := json.Marshal(n)
		s.Require().NoError(err)
		_, err = fromKV.Put(ctx, n.ID(), data)
		s.Require().NoError(err)
	}

	// Open a NodeStore to trigger migration
	ns, err := kvstore.NewNodeStore(ctx, kvstore.NodeStoreParams{
		BucketName: toBucket,
		Client:     s.client,
	})
	s.Require().NoError(err)

	// Assert the migrated data is correct. The loop variable is deliberately not
	// called ns so the store is not shadowed by a value of a different type.
	for _, ni := range nodeInfos {
		state, err := ns.Get(ctx, ni.ID())
		s.Require().NoError(err)
		s.Equal(models.NodeStates.DISCONNECTED, state.Connection)
		s.Equal(models.NodeMembership.PENDING, state.Membership)
		s.Equal(ni, state.Info)
	}

	// Assert the from bucket has been cleaned up. ErrorIs keeps the check valid
	// even if the client library wraps the sentinel error.
	_, err = s.js.KeyValue(ctx, fromBucket)
	s.Require().ErrorIs(err, jetstream.ErrBucketNotFound)
}

// TestMigrationStoreEmpty verifies that an existing but empty v130 bucket is
// removed by the migration and that nothing is written to the new store.
func (s *KVMigrationSuite) TestMigrationStoreEmpty() {
	ctx := context.Background()

	fromBucket := kvstore.V130BucketName
	toBucket := kvstore.DefaultBucketName

	// Create an empty 'from' bucket
	_, err := s.js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: fromBucket})
	s.Require().NoError(err)

	// Open a NodeStore to trigger migration, in this case there is a from bucket, but it's empty.
	ns, err := kvstore.NewNodeStore(ctx, kvstore.NodeStoreParams{
		BucketName: toBucket,
		Client:     s.client,
	})
	s.Require().NoError(err)

	// Assert the from bucket has been cleaned up. ErrorIs fails cleanly when err
	// is nil, whereas calling err.Error() on a nil error would panic.
	_, err = s.js.KeyValue(ctx, fromBucket)
	s.Require().ErrorIs(err, jetstream.ErrBucketNotFound)

	// Assert that no data was migrated since the from bucket was empty
	resp, err := ns.List(ctx)
	s.Require().NoError(err)
	s.Require().Len(resp, 0)
}

// TestMigrationStoreDNE verifies that opening a NodeStore succeeds when there is
// no v130 bucket to migrate from, and that the new store starts out empty.
func (s *KVMigrationSuite) TestMigrationStoreDNE() {
	ctx := context.Background()

	fromBucket := kvstore.V130BucketName
	toBucket := kvstore.DefaultBucketName

	// Open a NodeStore to trigger migration, in this case there isn't a from bucket to migrate from.
	ns, err := kvstore.NewNodeStore(ctx, kvstore.NodeStoreParams{
		BucketName: toBucket,
		Client:     s.client,
	})
	s.Require().NoError(err)

	// Assert the from bucket still does not exist after the migration ran.
	// ErrorIs fails cleanly when err is nil, whereas calling err.Error() on a
	// nil error would panic.
	_, err = s.js.KeyValue(ctx, fromBucket)
	s.Require().ErrorIs(err, jetstream.ErrBucketNotFound)

	// Assert that no data was migrated since the from bucket DNE (does not exist)
	resp, err := ns.List(ctx)
	s.Require().NoError(err)
	s.Require().Len(resp, 0)
}
Loading