Skip to content

Commit

Permalink
Merge pull request #170 from aalda/fix-cluster-tests
Browse files Browse the repository at this point in the history
Fix cluster tests
  • Loading branch information
aalda committed Sep 30, 2019
2 parents 6a0ddc5 + 170b91e commit d167cb1
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 31 deletions.
6 changes: 3 additions & 3 deletions consensus/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,13 @@ func NewRaftNodeWithLogger(opts *ClusteringOptions, store storage.ManagedStore,
if opts.RaftHeartbeatTimeout != 0 {
conf.HeartbeatTimeout = opts.RaftHeartbeatTimeout
}
if opts.RaftHeartbeatTimeout != 0 {
if opts.RaftElectionTimeout != 0 {
conf.ElectionTimeout = opts.RaftElectionTimeout
}
if opts.RaftHeartbeatTimeout != 0 {
if opts.RaftLeaseTimeout != 0 {
conf.LeaderLeaseTimeout = opts.RaftLeaseTimeout
}
if opts.RaftHeartbeatTimeout != 0 {
if opts.RaftCommitTimeout != 0 {
conf.CommitTimeout = opts.RaftCommitTimeout
}
conf.TrailingLogs = opts.TrailingLogs
Expand Down
19 changes: 18 additions & 1 deletion consensus/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,20 @@ import (
"testing"
"time"

"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"
"github.com/bbva/qed/storage/rocks"
"github.com/bbva/qed/testutils/spec"
)

func TestMain(m *testing.M) {
log.SetDefault(log.New(&log.LoggerOptions{
IncludeLocation: true,
Level: log.Off,
}))
os.Exit(m.Run())
}

func TestOpenAndCloseRaftNode(t *testing.T) {

// start only one seed
Expand Down Expand Up @@ -271,6 +280,10 @@ func newSeed(name string, id int) (*RaftNode, closeF, error) {
opts.Bootstrap = true
opts.SnapshotThreshold = 0
opts.TrailingLogs = 0
opts.RaftLogging = true
opts.RaftHeartbeatTimeout = 2000 * time.Millisecond
opts.RaftElectionTimeout = 2000 * time.Millisecond
opts.RaftLeaseTimeout = 2000 * time.Millisecond
rocksOpts := rocks.DefaultOptions()
return newNode(opts, rocksOpts)
}
Expand All @@ -284,6 +297,10 @@ func newFollower(name string, id int, seeds ...string) (*RaftNode, closeF, error
opts.Bootstrap = false
opts.SnapshotThreshold = 0
opts.TrailingLogs = 0
opts.RaftLogging = true
opts.RaftHeartbeatTimeout = 2000 * time.Millisecond
opts.RaftElectionTimeout = 2000 * time.Millisecond
opts.RaftLeaseTimeout = 2000 * time.Millisecond
opts.Seeds = seeds
rocksOpts := rocks.DefaultOptions()
return newNode(opts, rocksOpts)
Expand Down Expand Up @@ -321,7 +338,7 @@ func newNode(opts *ClusteringOptions, rocksOpts *rocks.Options) (*RaftNode, clos
}
opts.RaftLogPath = raftPath

node, err := NewRaftNode(opts, db, snapshotsCh)
node, err := NewRaftNodeWithLogger(opts, db, snapshotsCh, log.L().Named("cluster"))
if err != nil {
return nil, cleanF, err
}
Expand Down
36 changes: 28 additions & 8 deletions log/hclog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package log

import (
"bytes"
"fmt"
"io"
"log"

Expand All @@ -15,33 +16,54 @@ type HclogAdapter struct {
log Logger
}

// NewHclogAdapter creates an HclogAdapter for a given logger.
func NewHclogAdapter(log Logger) *HclogAdapter {
return &HclogAdapter{log: log}
}

// Trace implementation
func (l HclogAdapter) Trace(msg string, args ...interface{}) {
l.log.Tracef("%s: %v", msg, argsToString(args...))
if len(args) > 0 {
l.log.Tracef("%s: %v", msg, argsToString(args...))
} else {
l.log.Trace(msg)
}
}

// Debug implementation
func (l HclogAdapter) Debug(msg string, args ...interface{}) {
l.log.Debugf("%s: %v", msg, argsToString(args...))
if len(args) > 0 {
l.log.Debugf("%s: %v", msg, argsToString(args...))
} else {
l.log.Debug(msg)
}
}

// Info implementation
func (l HclogAdapter) Info(msg string, args ...interface{}) {
l.log.Infof("%s: %v", msg, argsToString(args...))
if len(args) > 0 {
l.log.Infof("%s: %v", msg, argsToString(args...))
} else {
l.log.Info(msg)
}
}

// Warn implementation
func (l HclogAdapter) Warn(msg string, args ...interface{}) {
l.log.Warnf("%s: %v", msg, argsToString(args...))
if len(args) > 0 {
l.log.Warnf("%s: %v", msg, argsToString(args...))
} else {
l.log.Warn(msg)
}
}

// Error implementation
func (l HclogAdapter) Error(msg string, args ...interface{}) {
l.log.Errorf("%s: %v", msg, argsToString(args...))
if len(args) > 0 {
l.log.Errorf("%s: %v", msg, argsToString(args...))
} else {
l.log.Error(msg)
}
}

// IsTrace implementation.
Expand Down Expand Up @@ -114,9 +136,7 @@ func (l HclogAdapter) StandardWriter(opts *hclog.StandardLoggerOptions) io.Write
func argsToString(args ...interface{}) string {
buf := bytes.Buffer{}
for i := 0; i < len(args); i += 2 {
buf.WriteString(args[i].(string))
buf.WriteByte('=')
buf.WriteString(args[i+1].(string))
buf.WriteString(fmt.Sprintf("%v=%v", args[i], args[i+1]))
}
return buf.String()
}
45 changes: 27 additions & 18 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,31 +83,40 @@ type Config struct {

// DB WAL TTL
DbWalTtl time.Duration

RaftHeartbeatTimeout time.Duration

RaftElectionTimeout time.Duration

RaftLeaseTimeout time.Duration
}

func DefaultConfig() *Config {
hostname, _ := os.Hostname()
currentDir := getCurrentDir()

return &Config{
APIKey: "",
NodeID: hostname,
HTTPAddr: "127.0.0.1:8800",
RaftAddr: "127.0.0.1:8500",
MgmtAddr: "127.0.0.1:8700",
MetricsAddr: "127.0.0.1:8600",
RaftJoinAddr: []string{},
GossipAddr: "127.0.0.1:8400",
GossipJoinAddr: []string{},
DBPath: currentDir + "/db",
RaftPath: currentDir + "/raft",
EnableTLS: false,
EnableProfiling: false,
ProfilingAddr: "127.0.0.1:6060",
SSLCertificate: "",
SSLCertificateKey: "",
PrivateKeyPath: "",
DbWalTtl: 0,
APIKey: "",
NodeID: hostname,
HTTPAddr: "127.0.0.1:8800",
RaftAddr: "127.0.0.1:8500",
MgmtAddr: "127.0.0.1:8700",
MetricsAddr: "127.0.0.1:8600",
RaftJoinAddr: []string{},
GossipAddr: "127.0.0.1:8400",
GossipJoinAddr: []string{},
DBPath: currentDir + "/db",
RaftPath: currentDir + "/raft",
EnableTLS: false,
EnableProfiling: false,
ProfilingAddr: "127.0.0.1:6060",
SSLCertificate: "",
SSLCertificateKey: "",
PrivateKeyPath: "",
DbWalTtl: 0,
RaftHeartbeatTimeout: 1000 * time.Millisecond,
RaftElectionTimeout: 1000 * time.Millisecond,
RaftLeaseTimeout: 1000 * time.Millisecond,
}
}

Expand Down
3 changes: 3 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ func NewServerWithLogger(conf *Config, logger log.Logger) (*Server, error) {
clusterOpts.MgmtAddr = conf.MgmtAddr
clusterOpts.Bootstrap = bootstrap
clusterOpts.RaftLogging = true
clusterOpts.RaftHeartbeatTimeout = conf.RaftHeartbeatTimeout
clusterOpts.RaftElectionTimeout = conf.RaftElectionTimeout
clusterOpts.RaftLeaseTimeout = conf.RaftLeaseTimeout
if !bootstrap {
clusterOpts.Seeds = conf.RaftJoinAddr
}
Expand Down
15 changes: 14 additions & 1 deletion tests/e2e/setup_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/bbva/qed/client"
"github.com/bbva/qed/crypto"
"github.com/bbva/qed/crypto/hashing"
"github.com/bbva/qed/log"
"github.com/bbva/qed/server"
"github.com/bbva/qed/testutils/notifierstore"
"github.com/bbva/qed/testutils/scope"
Expand Down Expand Up @@ -121,6 +122,9 @@ func configQedServer(id int, pathDB, signPath, tlsPath string, tls bool) *server
conf.SSLCertificateKey = tlsPath + "/qed_key.pem"
}
conf.EnableTLS = tls
conf.RaftHeartbeatTimeout = 2000 * time.Millisecond
conf.RaftElectionTimeout = 2000 * time.Millisecond
conf.RaftLeaseTimeout = 2000 * time.Millisecond

return conf
}
Expand Down Expand Up @@ -155,7 +159,7 @@ func newServerSetup(id int, tls bool) (func() (string, error), func() error) {

}

srv, err = server.NewServer(conf)
srv, err = server.NewServerWithLogger(conf, log.L().Named("server"))
if err != nil {
return "", err
}
Expand Down Expand Up @@ -191,9 +195,18 @@ func newQedClient(id int) (*client.HTTPClient, error) {
client.SetMaxRetries(5),
client.SetAttemptToReviveEndpoints(true),
client.SetHasherFunction(hashing.NewSha256Hasher),
client.SetLogger(log.L().Named("client")),
)
if err != nil {
return nil, err
}
return client, nil
}

func TestMain(m *testing.M) {
log.SetDefault(log.New(&log.LoggerOptions{
IncludeLocation: true,
Level: log.Off,
}))
os.Exit(m.Run())
}

0 comments on commit d167cb1

Please sign in to comment.