Skip to content

Commit

Permalink
Support external registry functions via ExpertConfig.
Browse files Browse the repository at this point in the history
  • Loading branch information
tylerwilliams committed Sep 9, 2023
1 parent 0e4811f commit 6490d3d
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 21 deletions.
9 changes: 8 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,10 @@ type LogDBFactory interface {
Name() string
}

type NodeRegistryFactory interface {
Create(nhid string, streamConnections uint64, v TargetValidator) (raftio.INodeRegistry, error)
}

// TransportFactory is the interface used for creating custom transport modules.
type TransportFactory interface {
// Create creates a transport module.
Expand Down Expand Up @@ -699,7 +703,7 @@ func (c *NodeHostConfig) GetDeploymentID() uint64 {
// GetTargetValidator returns a TargetValidator based on the specified
// NodeHostConfig instance.
func (c *NodeHostConfig) GetTargetValidator() TargetValidator {
if c.AddressByNodeHostID {
if c.Expert.NodeRegistryFactory != nil {
return id.IsNodeHostID
} else if c.Expert.TransportFactory != nil {
return c.Expert.TransportFactory.Validate
Expand Down Expand Up @@ -916,6 +920,9 @@ type ExpertConfig struct {
// TestGossipProbeInterval define the probe interval used by the gossip
// service in tests.
TestGossipProbeInterval time.Duration
// NodeRegistryFactory defines a custom node registry function that can be used
// instead of a static registry or the built in memberlist gossip mechanism.
NodeRegistryFactory NodeRegistryFactory
}

// GossipConfig contains configurations for the gossip service. Gossip service
Expand Down
3 changes: 2 additions & 1 deletion internal/transport/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/lni/goutils/syncutil"

"github.com/lni/dragonboat/v3/config"
"github.com/lni/dragonboat/v3/raftio"
)

// NodeHostIDRegistry is a node registry backed by gossip. It is capable of
Expand All @@ -35,7 +36,7 @@ type NodeHostIDRegistry struct {
// NewNodeHostIDRegistry creates a new NodeHostIDRegistry instance.
func NewNodeHostIDRegistry(nhid string,
nhConfig config.NodeHostConfig, streamConnections uint64,
v config.TargetValidator) (INodeRegistry, error) {
v config.TargetValidator) (raftio.INodeRegistry, error) {
gossip, err := newGossipManager(nhid, nhConfig)
if err != nil {
return nil, err
Expand Down
12 changes: 1 addition & 11 deletions internal/transport/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,7 @@ var (
ErrUnknownTarget = errors.New("target address unknown")
)

// INodeRegistry is the local registry interface used to keep all known
// nodes in the system..
type INodeRegistry interface {
Stop()
Add(clusterID uint64, nodeID uint64, url string)
Remove(clusterID uint64, nodeID uint64)
RemoveCluster(clusterID uint64)
Resolve(clusterID uint64, nodeID uint64) (string, string, error)
}

var _ INodeRegistry = (*Registry)(nil)
var _ raftio.INodeRegistry = (*Registry)(nil)
var _ IResolver = (*Registry)(nil)

// Registry is used to manage all known node addresses in the multi raft system.
Expand Down
2 changes: 1 addition & 1 deletion internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func NewTransport(nhConfig config.NodeHostConfig,
dir server.SnapshotDirFunc, sysEvents ITransportEvent,
fs vfs.IFS) (*Transport, error) {
sourceID := nhConfig.RaftAddress
if nhConfig.AddressByNodeHostID {
if nhConfig.Expert.NodeRegistryFactory != nil {
sourceID = env.NodeHostID()
}
t := &Transport{
Expand Down
4 changes: 2 additions & 2 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ type node struct {
logReader *logdb.LogReader
logdb raftio.ILogDB
snapshotter *snapshotter
nodeRegistry transport.INodeRegistry
nodeRegistry raftio.INodeRegistry
stopC chan struct{}
clusterInfo atomic.Value
currentTick uint64
Expand Down Expand Up @@ -140,7 +140,7 @@ func newNode(peers map[uint64]string,
getStreamSink func(uint64, uint64) *transport.Sink,
handleSnapshotStatus func(uint64, uint64, bool),
sendMessage func(pb.Message),
nodeRegistry transport.INodeRegistry,
nodeRegistry raftio.INodeRegistry,
pool *sync.Pool,
ldb raftio.ILogDB,
metrics *logDBMetrics,
Expand Down
24 changes: 19 additions & 5 deletions nodehost.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ type NodeHost struct {
env *server.Env
nhConfig config.NodeHostConfig
stopper *syncutil.Stopper
nodes transport.INodeRegistry
nodes raftio.INodeRegistry
requestPools []*sync.Pool
engine *engine
transport transport.ITransport
Expand All @@ -302,6 +302,14 @@ var _ nodeLoader = (*NodeHost)(nil)

var dn = logutil.DescribeNode

type gossipRegistryFactory struct {
nodeHostConfig config.NodeHostConfig
}

func (erf *gossipRegistryFactory) Create(nhid string, streamConnections uint64, v config.TargetValidator) (raftio.INodeRegistry, error) {
return transport.NewNodeHostIDRegistry(nhid, erf.nodeHostConfig, streamConnections, v)
}

// NewNodeHost creates a new NodeHost instance. The returned NodeHost instance
// is configured using the specified NodeHostConfig instance. In a typical
// application, it is expected to have one NodeHost on each server.
Expand Down Expand Up @@ -358,6 +366,12 @@ func NewNodeHost(nhConfig config.NodeHostConfig) (*NodeHost, error) {
return nil, err
}
plog.Infof("NodeHost ID: %s", nh.id.String())
if nh.nhConfig.AddressByNodeHostID {
if nh.nhConfig.Expert.NodeRegistryFactory != nil {
panic("Expert.NodeRegistryFactory should not be set with AddressByNodeHostID and Gossip")
}
nh.nhConfig.Expert.NodeRegistryFactory = &gossipRegistryFactory{nh.nhConfig}
}
if err := nh.createNodeRegistry(); err != nil {
nh.Stop()
return nil, err
Expand Down Expand Up @@ -1795,10 +1809,10 @@ func (nh *NodeHost) createNodeRegistry() error {
validator := nh.nhConfig.GetTargetValidator()
// TODO:
// more tests here required
if nh.nhConfig.AddressByNodeHostID {
plog.Infof("AddressByNodeHostID: true, use gossip based node registry")
r, err := transport.NewNodeHostIDRegistry(nh.ID(),
nh.nhConfig, streamConnections, validator)
if nh.nhConfig.Expert.NodeRegistryFactory != nil {
plog.Infof("NodeRegistryFactory was set: using custom registry")
r, err := nh.nhConfig.Expert.NodeRegistryFactory.Create(nh.ID(),
streamConnections, validator)
if err != nil {
return err
}
Expand Down
102 changes: 102 additions & 0 deletions nodehost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,108 @@ func TestCustomTransportCanGoWithoutNodeHostID(t *testing.T) {
testAddressByNodeHostID(t, false, factory)
}

func TestExternalNodeRegistry(t *testing.T) {
fs := vfs.GetTestFS()
datadir1 := fs.PathJoin(singleNodeHostTestDir, "nh1")
datadir2 := fs.PathJoin(singleNodeHostTestDir, "nh2")
os.RemoveAll(singleNodeHostTestDir)
defer os.RemoveAll(singleNodeHostTestDir)
addr1 := nodeHostTestAddr1
addr2 := nodeHostTestAddr2
nhc1 := config.NodeHostConfig{
NodeHostDir: datadir1,
RTTMillisecond: getRTTMillisecond(fs, datadir1),
RaftAddress: addr1,
Expert: config.ExpertConfig{
FS: fs,
TestGossipProbeInterval: 50 * time.Millisecond,
},
Gossip: config.GossipConfig{
BindAddress: "127.0.0.1:25001",
AdvertiseAddress: "127.0.0.1:25001",
Seed: []string{"127.0.0.1:25002"},
},
}
factory1 := &gossipRegistryFactory{nhc1}
nhc1.Expert.NodeRegistryFactory = factory1
nhc1.Gossip = config.GossipConfig{}
nhc1.AddressByNodeHostID = false

nhc2 := config.NodeHostConfig{
NodeHostDir: datadir2,
RTTMillisecond: getRTTMillisecond(fs, datadir2),
RaftAddress: addr2,
Expert: config.ExpertConfig{
FS: fs,
TestGossipProbeInterval: 50 * time.Millisecond,
},
Gossip: config.GossipConfig{
BindAddress: "127.0.0.1:25002",
AdvertiseAddress: "127.0.0.1:25002",
Seed: []string{"127.0.0.1:25001"},
},
}
factory2 := &gossipRegistryFactory{nhc2}
nhc2.Expert.NodeRegistryFactory = factory2
nhc2.Gossip = config.GossipConfig{}
nhc2.AddressByNodeHostID = false

nhid1, err := id.ParseNodeHostID(testNodeHostID1)
if err != nil {
t.Fatalf("failed to parse nhid")
}
nhc1.Expert.TestNodeHostID = nhid1.Value()
nhid2, err := id.ParseNodeHostID(testNodeHostID2)
if err != nil {
t.Fatalf("failed to parse nhid")
}
nhc2.Expert.TestNodeHostID = nhid2.Value()
nh1, err := NewNodeHost(nhc1)
if err != nil {
t.Fatalf("failed to create nh, %v", err)
}
defer nh1.Stop()
nh2, err := NewNodeHost(nhc2)
if err != nil {
t.Fatalf("failed to create nh2, %v", err)
}
defer nh2.Stop()
peers := make(map[uint64]string)
peers[1] = testNodeHostID1
peers[2] = testNodeHostID2
createSM := func(uint64, uint64) sm.IStateMachine {
return &PST{}
}
rc := config.Config{
ClusterID: 1,
NodeID: 1,
ElectionRTT: 10,
HeartbeatRTT: 1,
SnapshotEntries: 0,
}
if err := nh1.StartCluster(peers, false, createSM, rc); err != nil {
t.Fatalf("failed to start node %v", err)
}
rc.NodeID = 2
if err := nh2.StartCluster(peers, false, createSM, rc); err != nil {
t.Fatalf("failed to start node %v", err)
}
waitForLeaderToBeElected(t, nh1, 1)
waitForLeaderToBeElected(t, nh2, 1)
pto := lpto(nh1)
session := nh1.GetNoOPSession(1)
for i := 0; i < 1000; i++ {
ctx, cancel := context.WithTimeout(context.Background(), pto)
if _, err := nh1.SyncPropose(ctx, session, make([]byte, 0)); err == nil {
cancel()
return
}
cancel()
time.Sleep(100 * time.Millisecond)
}
t.Fatalf("failed to make proposal")
}

func testAddressByNodeHostID(t *testing.T,
addressByNodeHostID bool, factory config.TransportFactory) {
fs := vfs.GetTestFS()
Expand Down
10 changes: 10 additions & 0 deletions raftio/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,13 @@ type ITransport interface {
GetSnapshotConnection(ctx context.Context,
target string) (ISnapshotConnection, error)
}

// INodeRegistry is the local registry interface used to keep all known
// nodes in the system..
type INodeRegistry interface {
Stop()
Add(clusterID uint64, nodeID uint64, url string)
Remove(clusterID uint64, nodeID uint64)
RemoveCluster(clusterID uint64)
Resolve(clusterID uint64, nodeID uint64) (string, string, error)
}

0 comments on commit 6490d3d

Please sign in to comment.