Skip to content

Commit

Permalink
Merge pull request #12896 from wilsonwang371/profiling-txn2
Browse files Browse the repository at this point in the history
server: make applier use ReadTx() in Txn() instead of ConcurrentReadTx()
  • Loading branch information
gyuho committed May 6, 2021
2 parents 8af8f6a + 98083ea commit 344c9f3
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 66 deletions.
4 changes: 4 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ type ServerConfig struct {
// Currently all etcd memory gets mlocked, but in future the flag can
// be refined to mlock in-use area of bbolt only.
ExperimentalMemoryMlock bool `json:"experimental-memory-mlock"`

// ExperimentalTxnModeWriteWithSharedBuffer enables write transactions to use
// a shared buffer in their readonly check operations.
ExperimentalTxnModeWriteWithSharedBuffer bool `json:"experimental-txn-mode-write-with-shared-buffer"`
}

// VerifyBootstrap sanity-checks the initial config for bootstrap case
Expand Down
8 changes: 6 additions & 2 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,9 @@ type Config struct {
// Currently all etcd memory gets mlocked, but in future the flag can
// be refined to mlock in-use area of bbolt only.
ExperimentalMemoryMlock bool `json:"experimental-memory-mlock"`

// ExperimentalTxnModeWriteWithSharedBuffer enables write transaction to use a shared buffer in its readonly check operations.
ExperimentalTxnModeWriteWithSharedBuffer bool `json:"experimental-txn-mode-write-with-shared-buffer"`
}

// configYAML holds the config suitable for yaml parsing
Expand Down Expand Up @@ -444,8 +447,9 @@ func NewConfig() *Config {
LogLevel: logutil.DefaultLogLevel,
EnableGRPCGateway: true,

ExperimentalDowngradeCheckTime: DefaultDowngradeCheckTime,
ExperimentalMemoryMlock: false,
ExperimentalDowngradeCheckTime: DefaultDowngradeCheckTime,
ExperimentalMemoryMlock: false,
ExperimentalTxnModeWriteWithSharedBuffer: true,
}
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
return cfg
Expand Down
101 changes: 51 additions & 50 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,56 +164,57 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType)

srvcfg := config.ServerConfig{
Name: cfg.Name,
ClientURLs: cfg.ACUrls,
PeerURLs: cfg.APUrls,
DataDir: cfg.Dir,
DedicatedWALDir: cfg.WalDir,
SnapshotCount: cfg.SnapshotCount,
SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries,
MaxSnapFiles: cfg.MaxSnapFiles,
MaxWALFiles: cfg.MaxWalFiles,
InitialPeerURLsMap: urlsmap,
InitialClusterToken: token,
DiscoveryURL: cfg.Durl,
DiscoveryProxy: cfg.Dproxy,
NewCluster: cfg.IsNewCluster(),
PeerTLSInfo: cfg.PeerTLSInfo,
TickMs: cfg.TickMs,
ElectionTicks: cfg.ElectionTicks(),
InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
AutoCompactionRetention: autoCompactionRetention,
AutoCompactionMode: cfg.AutoCompactionMode,
QuotaBackendBytes: cfg.QuotaBackendBytes,
BackendBatchLimit: cfg.BackendBatchLimit,
BackendFreelistType: backendFreelistType,
BackendBatchInterval: cfg.BackendBatchInterval,
MaxTxnOps: cfg.MaxTxnOps,
MaxRequestBytes: cfg.MaxRequestBytes,
SocketOpts: cfg.SocketOpts,
StrictReconfigCheck: cfg.StrictReconfigCheck,
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
AuthToken: cfg.AuthToken,
BcryptCost: cfg.BcryptCost,
TokenTTL: cfg.AuthTokenTTL,
CORS: cfg.CORS,
HostWhitelist: cfg.HostWhitelist,
InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
PreVote: cfg.PreVote,
Logger: cfg.logger,
LoggerConfig: cfg.loggerConfig,
LoggerCore: cfg.loggerCore,
LoggerWriteSyncer: cfg.loggerWriteSyncer,
ForceNewCluster: cfg.ForceNewCluster,
EnableGRPCGateway: cfg.EnableGRPCGateway,
UnsafeNoFsync: cfg.UnsafeNoFsync,
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
ExperimentalMemoryMlock: cfg.ExperimentalMemoryMlock,
Name: cfg.Name,
ClientURLs: cfg.ACUrls,
PeerURLs: cfg.APUrls,
DataDir: cfg.Dir,
DedicatedWALDir: cfg.WalDir,
SnapshotCount: cfg.SnapshotCount,
SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries,
MaxSnapFiles: cfg.MaxSnapFiles,
MaxWALFiles: cfg.MaxWalFiles,
InitialPeerURLsMap: urlsmap,
InitialClusterToken: token,
DiscoveryURL: cfg.Durl,
DiscoveryProxy: cfg.Dproxy,
NewCluster: cfg.IsNewCluster(),
PeerTLSInfo: cfg.PeerTLSInfo,
TickMs: cfg.TickMs,
ElectionTicks: cfg.ElectionTicks(),
InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
AutoCompactionRetention: autoCompactionRetention,
AutoCompactionMode: cfg.AutoCompactionMode,
QuotaBackendBytes: cfg.QuotaBackendBytes,
BackendBatchLimit: cfg.BackendBatchLimit,
BackendFreelistType: backendFreelistType,
BackendBatchInterval: cfg.BackendBatchInterval,
MaxTxnOps: cfg.MaxTxnOps,
MaxRequestBytes: cfg.MaxRequestBytes,
SocketOpts: cfg.SocketOpts,
StrictReconfigCheck: cfg.StrictReconfigCheck,
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
AuthToken: cfg.AuthToken,
BcryptCost: cfg.BcryptCost,
TokenTTL: cfg.AuthTokenTTL,
CORS: cfg.CORS,
HostWhitelist: cfg.HostWhitelist,
InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
PreVote: cfg.PreVote,
Logger: cfg.logger,
LoggerConfig: cfg.loggerConfig,
LoggerCore: cfg.loggerCore,
LoggerWriteSyncer: cfg.loggerWriteSyncer,
ForceNewCluster: cfg.ForceNewCluster,
EnableGRPCGateway: cfg.EnableGRPCGateway,
UnsafeNoFsync: cfg.UnsafeNoFsync,
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
ExperimentalMemoryMlock: cfg.ExperimentalMemoryMlock,
ExperimentalTxnModeWriteWithSharedBuffer: cfg.ExperimentalTxnModeWriteWithSharedBuffer,
}
print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
Expand Down
1 change: 1 addition & 0 deletions server/etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ func newConfig() *config {
fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status check.")
fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.")
fs.BoolVar(&cfg.ec.ExperimentalMemoryMlock, "experimental-memory-mlock", cfg.ec.ExperimentalMemoryMlock, "Enable to enforce etcd pages (in particular bbolt) to stay in RAM.")
fs.BoolVar(&cfg.ec.ExperimentalTxnModeWriteWithSharedBuffer, "experimental-txn-mode-write-with-shared-buffer", true, "Enable the write transaction to use a shared buffer in its readonly check operations.")

// unsafe
fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")
Expand Down
2 changes: 2 additions & 0 deletions server/etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ Experimental feature:
Duration of periodical watch progress notification.
--experimental-warning-apply-duration '100ms'
Warning is generated if requests take more than this duration.
--experimental-txn-mode-write-with-shared-buffer 'true'
Enable the write transaction to use a shared buffer in its readonly check operations.
Unsafe feature:
--force-new-cluster 'false'
Expand Down
12 changes: 10 additions & 2 deletions server/etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra
resp.Header = &pb.ResponseHeader{}

if txn == nil {
txn = a.s.kv.Read(trace)
txn = a.s.kv.Read(mvcc.ConcurrentReadTxMode, trace)
defer txn.End()
}

Expand Down Expand Up @@ -434,7 +434,15 @@ func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnR
ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
}
isWrite := !isTxnReadonly(rt)
txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(trace))

// When the transaction contains write operations, we use ReadTx instead of
// ConcurrentReadTx to avoid extra overhead of copying buffer.
var txn mvcc.TxnWrite
if isWrite && a.s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer {
txn = mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(mvcc.SharedBufReadTxMode, trace))
} else {
txn = mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(mvcc.ConcurrentReadTxMode, trace))
}

var txnPath []bool
trace.StepWithFunction(
Expand Down
11 changes: 10 additions & 1 deletion server/mvcc/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,21 @@ func (trw *txnReadWrite) Changes() []mvccpb.KeyValue { return nil }

// NewReadOnlyTxnWrite wraps the given TxnRead in a txnReadWrite and returns
// it as a TxnWrite.
func NewReadOnlyTxnWrite(txn TxnRead) TxnWrite {
	wrapped := txnReadWrite{txn}
	return &wrapped
}

// ReadTxMode selects which kind of backend read transaction a KV read
// transaction is built on.
type ReadTxMode uint32

const (
	// ConcurrentReadTxMode uses ConcurrentReadTx; the txReadBuffer is copied.
	ConcurrentReadTxMode ReadTxMode = 1
	// SharedBufReadTxMode uses the backend ReadTx; the txReadBuffer is not copied.
	SharedBufReadTxMode ReadTxMode = 2
)

type KV interface {
ReadView
WriteView

// Read creates a read transaction.
Read(trace *traceutil.Trace) TxnRead
Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead

// Write creates a write transaction.
Write(trace *traceutil.Trace) TxnWrite
Expand Down
2 changes: 1 addition & 1 deletion server/mvcc/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ var (
return kv.Range(context.TODO(), key, end, ro)
}
txnRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) {
txn := kv.Read(traceutil.TODO())
txn := kv.Read(ConcurrentReadTxMode, traceutil.TODO())
defer txn.End()
return txn.Range(context.TODO(), key, end, ro)
}
Expand Down
6 changes: 3 additions & 3 deletions server/mvcc/kv_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@ import (
// readView wraps a KV; each of its methods opens a read transaction on kv,
// queries it, and ends the transaction before returning.
type readView struct{ kv KV }

// FirstRev opens a read transaction on the wrapped KV and returns that
// transaction's FirstRev; the transaction is ended via defer on return.
func (rv *readView) FirstRev() int64 {
tr := rv.kv.Read(traceutil.TODO())
tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO())
defer tr.End()
return tr.FirstRev()
}

// Rev opens a read transaction on the wrapped KV and returns that
// transaction's Rev; the transaction is ended via defer on return.
func (rv *readView) Rev() int64 {
tr := rv.kv.Read(traceutil.TODO())
tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO())
defer tr.End()
return tr.Rev()
}

// Range opens a read transaction on the wrapped KV and forwards ctx, key, end
// and ro to that transaction's Range, returning its result and error
// unchanged; the transaction is ended via defer on return.
func (rv *readView) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
tr := rv.kv.Read(traceutil.TODO())
tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO())
defer tr.End()
return tr.Range(ctx, key, end, ro)
}
Expand Down
6 changes: 3 additions & 3 deletions server/mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) {
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)

// readTx simulates a long read request
readTx1 := s.Read(traceutil.TODO())
readTx1 := s.Read(ConcurrentReadTxMode, traceutil.TODO())

// write should not be blocked by reads
done := make(chan struct{}, 1)
Expand All @@ -673,7 +673,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) {
}

// readTx2 simulates a short read request
readTx2 := s.Read(traceutil.TODO())
readTx2 := s.Read(ConcurrentReadTxMode, traceutil.TODO())
ro := RangeOptions{Limit: 1, Rev: 0, Count: false}
ret, err := readTx2.Range(context.TODO(), []byte("foo"), nil, ro)
if err != nil {
Expand Down Expand Up @@ -756,7 +756,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) {
mu.Lock()
wKVs := make(kvs, len(committedKVs))
copy(wKVs, committedKVs)
tx := s.Read(traceutil.TODO())
tx := s.Read(ConcurrentReadTxMode, traceutil.TODO())
mu.Unlock()
// get all keys in backend store, and compare with wKVs
ret, err := tx.Range(context.TODO(), []byte("\x00000000"), []byte("\xffffffff"), RangeOptions{})
Expand Down
16 changes: 12 additions & 4 deletions server/mvcc/kvstore_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,20 @@ type storeTxnRead struct {
trace *traceutil.Trace
}

func (s *store) Read(trace *traceutil.Trace) TxnRead {
func (s *store) Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead {
s.mu.RLock()
s.revMu.RLock()
// backend holds b.readTx.RLock() only when creating the concurrentReadTx. After
// ConcurrentReadTx is created, it will not block write transaction.
tx := s.b.ConcurrentReadTx()
// For read-only workloads, we use ConcurrentReadTx, which copies the transaction
// read buffer, for higher concurrency with ongoing blocking writes.
// For write/write-read transactions, we use the backend ReadTx, which shares the
// buffer instead of duplicating it, to avoid the copy overhead.
var tx backend.ReadTx
if mode == ConcurrentReadTxMode {
tx = s.b.ConcurrentReadTx()
} else {
tx = s.b.ReadTx()
}

tx.RLock() // RLock is no-op. concurrentReadTx does not need to be locked after it is created.
firstRev, rev := s.compactMainRev, s.currentRev
s.revMu.RUnlock()
Expand Down

0 comments on commit 344c9f3

Please sign in to comment.