Skip to content

Commit

Permalink
Separate logs for kv/remotedbserver (ledgerwatch#993)
Browse files Browse the repository at this point in the history
Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro-2.local>
  • Loading branch information
2 people authored and blxdyx committed Sep 13, 2023
1 parent 3509c99 commit 8e1f300
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 16 deletions.
18 changes: 10 additions & 8 deletions kv/mdbx/kv_abstract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,13 @@ func TestRemoteKvVersion(t *testing.T) {
t.Skip("fix me on win please")
}
ctx := context.Background()
writeDB := mdbx.NewMDBX(log.New()).InMem("").MustOpen()
logger := log.New()
writeDB := mdbx.NewMDBX(logger).InMem("").MustOpen()
defer writeDB.Close()
conn := bufconn.Listen(1024 * 1024)
grpcServer := grpc.NewServer()
go func() {
remote.RegisterKVServer(grpcServer, remotedbserver.NewKvServer(ctx, writeDB, nil, nil))
remote.RegisterKVServer(grpcServer, remotedbserver.NewKvServer(ctx, writeDB, nil, nil, logger))
if err := grpcServer.Serve(conn); err != nil {
log.Error("private RPC server fail", "err", err)
}
Expand All @@ -180,23 +181,23 @@ func TestRemoteKvVersion(t *testing.T) {

cc, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithContextDialer(func(ctx context.Context, url string) (net.Conn, error) { return conn.Dial() }))
assert.NoError(t, err)
a, err := remotedb.NewRemote(v1, log.New(), remote.NewKVClient(cc)).Open()
a, err := remotedb.NewRemote(v1, logger, remote.NewKVClient(cc)).Open()
if err != nil {
t.Fatalf("%v", err)
}
require.False(t, a.EnsureVersionCompatibility())
// Different Minor versions
v2 := v
v2.Minor++
a, err = remotedb.NewRemote(v2, log.New(), remote.NewKVClient(cc)).Open()
a, err = remotedb.NewRemote(v2, logger, remote.NewKVClient(cc)).Open()
if err != nil {
t.Fatalf("%v", err)
}
require.False(t, a.EnsureVersionCompatibility())
// Different Patch versions
v3 := v
v3.Patch++
a, err = remotedb.NewRemote(v3, log.New(), remote.NewKVClient(cc)).Open()
a, err = remotedb.NewRemote(v3, logger, remote.NewKVClient(cc)).Open()
require.NoError(t, err)
require.True(t, a.EnsureVersionCompatibility())
}
Expand All @@ -205,10 +206,11 @@ func TestRemoteKvRange(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("fix me on win please")
}
logger := log.New()
ctx, writeDB := context.Background(), memdb.NewTestDB(t)
grpcServer, conn := grpc.NewServer(), bufconn.Listen(1024*1024)
go func() {
kvServer := remotedbserver.NewKvServer(ctx, writeDB, nil, nil)
kvServer := remotedbserver.NewKvServer(ctx, writeDB, nil, nil, logger)
remote.RegisterKVServer(grpcServer, kvServer)
if err := grpcServer.Serve(conn); err != nil {
log.Error("private RPC server fail", "err", err)
Expand All @@ -217,7 +219,7 @@ func TestRemoteKvRange(t *testing.T) {

cc, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithContextDialer(func(ctx context.Context, url string) (net.Conn, error) { return conn.Dial() }))
require.NoError(t, err)
db, err := remotedb.NewRemote(gointerfaces.VersionFromProto(remotedbserver.KvServiceAPIVersion), log.New(), remote.NewKVClient(cc)).Open()
db, err := remotedb.NewRemote(gointerfaces.VersionFromProto(remotedbserver.KvServiceAPIVersion), logger, remote.NewKVClient(cc)).Open()
require.NoError(t, err)
require.True(t, db.EnsureVersionCompatibility())

Expand Down Expand Up @@ -342,7 +344,7 @@ func setupDatabases(t *testing.T, logger log.Logger, f mdbx.TableCfgFunc) (write

grpcServer := grpc.NewServer()
f2 := func() {
remote.RegisterKVServer(grpcServer, remotedbserver.NewKvServer(ctx, writeDBs[1], nil, nil))
remote.RegisterKVServer(grpcServer, remotedbserver.NewKvServer(ctx, writeDBs[1], nil, nil, logger))
if err := grpcServer.Serve(conn); err != nil {
logger.Error("private RPC server fail", "err", err)
}
Expand Down
16 changes: 9 additions & 7 deletions kv/remotedbserver/remotedbserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type KvServer struct {

trace bool
rangeStep int // make sure `s.with` has limited time
logger log.Logger
}

type threadSafeTx struct {
Expand All @@ -93,13 +94,14 @@ type Snapsthots interface {
Files() []string
}

func NewKvServer(ctx context.Context, db kv.RoDB, snapshots Snapsthots, historySnapshots Snapsthots) *KvServer {
func NewKvServer(ctx context.Context, db kv.RoDB, snapshots Snapsthots, historySnapshots Snapsthots, logger log.Logger) *KvServer {
return &KvServer{
trace: false,
rangeStep: 1024,
kv: db, stateChangeStreams: newStateChangeStreams(), ctx: ctx,
blockSnapshots: snapshots, historySnapshots: historySnapshots,
txs: map[uint64]*threadSafeTx{}, txsMapLock: &sync.RWMutex{},
logger: logger,
}
}

Expand All @@ -123,7 +125,7 @@ func (s *KvServer) Version(context.Context, *emptypb.Empty) (*types.VersionReply

func (s *KvServer) begin(ctx context.Context) (id uint64, err error) {
if s.trace {
log.Info(fmt.Sprintf("[kv_server] begin %d %s\n", id, dbg.Stack()))
s.logger.Info(fmt.Sprintf("[kv_server] begin %d %s\n", id, dbg.Stack()))
}
s.txsMapLock.Lock()
defer s.txsMapLock.Unlock()
Expand All @@ -139,7 +141,7 @@ func (s *KvServer) begin(ctx context.Context) (id uint64, err error) {
// renew - rollback and begin tx without changing it's `id`
func (s *KvServer) renew(ctx context.Context, id uint64) (err error) {
if s.trace {
log.Info(fmt.Sprintf("[kv_server] renew %d %s\n", id, dbg.Stack()[:2]))
s.logger.Info(fmt.Sprintf("[kv_server] renew %d %s\n", id, dbg.Stack()[:2]))
}
s.txsMapLock.Lock()
defer s.txsMapLock.Unlock()
Expand All @@ -159,7 +161,7 @@ func (s *KvServer) renew(ctx context.Context, id uint64) (err error) {

func (s *KvServer) rollback(id uint64) {
if s.trace {
log.Info(fmt.Sprintf("[kv_server] rollback %d %s\n", id, dbg.Stack()[:2]))
s.logger.Info(fmt.Sprintf("[kv_server] rollback %d %s\n", id, dbg.Stack()[:2]))
}
s.txsMapLock.Lock()
defer s.txsMapLock.Unlock()
Expand Down Expand Up @@ -189,16 +191,16 @@ func (s *KvServer) with(id uint64, f func(kv.Tx) error) error {
}

if s.trace {
log.Info(fmt.Sprintf("[kv_server] with %d try lock %s\n", id, dbg.Stack()[:2]))
s.logger.Info(fmt.Sprintf("[kv_server] with %d try lock %s\n", id, dbg.Stack()[:2]))
}
tx.Lock()
if s.trace {
log.Info(fmt.Sprintf("[kv_server] with %d can lock %s\n", id, dbg.Stack()[:2]))
s.logger.Info(fmt.Sprintf("[kv_server] with %d can lock %s\n", id, dbg.Stack()[:2]))
}
defer func() {
tx.Unlock()
if s.trace {
log.Info(fmt.Sprintf("[kv_server] with %d unlock %s\n", id, dbg.Stack()[:2]))
s.logger.Info(fmt.Sprintf("[kv_server] with %d unlock %s\n", id, dbg.Stack()[:2]))
}
}()
return f(tx.Tx)
Expand Down
3 changes: 2 additions & 1 deletion kv/remotedbserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
"github.com/ledgerwatch/log/v3"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
Expand All @@ -43,7 +44,7 @@ func TestKvServer_renew(t *testing.T) {
return nil
}))

s := NewKvServer(ctx, db, nil, nil)
s := NewKvServer(ctx, db, nil, nil, log.New())
g, ctx := errgroup.WithContext(ctx)
testCase := func() error {
id, err := s.begin(ctx)
Expand Down

0 comments on commit 8e1f300

Please sign in to comment.