From e3e39930229830b2991ec917ec5d2ba625febd3f Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 13 Sep 2016 11:16:53 +0800 Subject: [PATCH] etcdserver: support read index Use read index to achieve l-read. --- etcdserver/raft.go | 11 ++++ etcdserver/server.go | 16 ++++- etcdserver/util.go | 16 +++++ etcdserver/v3_server.go | 127 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 169 insertions(+), 1 deletion(-) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 95e95fd22a4..27170b602b5 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -103,6 +103,9 @@ type raftNode struct { // a chan to send out apply applyc chan apply + // a chan to send out readState + readStateC chan raft.ReadState + // TODO: remove the etcdserver related logic from raftNode // TODO: add a state machine interface to apply the commit entries // and do snapshot/recover @@ -196,6 +199,14 @@ func (r *raftNode) start(s *EtcdServer) { } } + if len(rd.ReadStates) != 0 { + select { + case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]: + case <-r.stopped: + return + } + } + raftDone := make(chan struct{}, 1) ap := apply{ entries: rd.CommittedEntries, diff --git a/etcdserver/server.go b/etcdserver/server.go index 3bc8d81d68a..f5e58f51d6e 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -177,12 +177,22 @@ type EtcdServer struct { snapCount uint64 w wait.Wait + + readMu sync.RWMutex + // read routine notifies etcd server that it waits for reading by sending an empty struct to + // readwaitC + readwaitc chan struct{} + // readNotifier is used to notify the read routine that it can process the request + // when there is no error + readNotifier *notifier + // stop signals the run goroutine should shutdown. stop chan struct{} // stopping is closed by run goroutine on shutdown. stopping chan struct{} // done is closed when all goroutines from start() complete. - done chan struct{} + done chan struct{} + errorc chan error id types.ID attributes membership.Attributes @@ -391,6 +401,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { ticker: time.Tick(time.Duration(cfg.TickMs) * time.Millisecond), raftStorage: s, storage: NewStorage(w, ss), + readStateC: make(chan raft.ReadState, 1), }, id: id, attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, @@ -478,6 +489,7 @@ func (s *EtcdServer) Start() { s.goAttach(s.purgeFile) s.goAttach(func() { monitorFileDescriptor(s.stopping) }) s.goAttach(s.monitorVersions) + s.goAttach(s.linearizableReadLoop) } // start prepares and starts server in a new goroutine. It is no longer safe to @@ -493,6 +505,8 @@ func (s *EtcdServer) start() { s.done = make(chan struct{}) s.stop = make(chan struct{}) s.stopping = make(chan struct{}) + s.readwaitc = make(chan struct{}, 1) + s.readNotifier = newNotifier() if s.ClusterVersion() != nil { plog.Infof("starting server... [version: %v, cluster version: %v]", version.Version, version.Cluster(s.ClusterVersion().String())) } else { diff --git a/etcdserver/util.go b/etcdserver/util.go index f189cd9463f..66084ae1244 100644 --- a/etcdserver/util.go +++ b/etcdserver/util.go @@ -79,3 +79,19 @@ func longestConnected(tp rafthttp.Transporter, membs []types.ID) (types.ID, bool } return longest, true } + +type notifier struct { + c chan struct{} + err error +} + +func newNotifier() *notifier { + return ¬ifier{ + c: make(chan struct{}, 0), + } +} + +func (nc *notifier) notify(err error) { + nc.err = err + close(nc.c) +} diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index 4c534c88ff9..fb6e3aebbd2 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -15,6 +15,8 @@ package etcdserver import ( + "bytes" + "encoding/binary" "strconv" "strings" "time" @@ -26,6 +28,9 @@ import ( "github.com/coreos/etcd/lease/leasehttp" "github.com/coreos/etcd/lease/leasepb" "github.com/coreos/etcd/mvcc" + "github.com/coreos/etcd/raft" + + "github.com/coreos/go-semver/semver" "golang.org/x/net/context" "google.golang.org/grpc/metadata" ) @@ -44,6 +49,10 @@ const ( maxGapBetweenApplyAndCommitIndex = 1000 ) +var ( + newRangeClusterVersion = *semver.Must(semver.NewVersion("3.1.0")) +) + type RaftKV interface { Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) @@ -86,6 +95,31 @@ type Authenticator interface { } func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { + // TODO: remove this checking when we release etcd 3.2 + if s.ClusterVersion() == nil || s.ClusterVersion().LessThan(newRangeClusterVersion) { + return s.legacyRange(ctx, r) + } + + if !r.Serializable { + err := s.linearizableReadNotify(ctx) + if err != nil { + return nil, err + } + } + var resp *pb.RangeResponse + var err error + chk := func(ai *auth.AuthInfo) error { + return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd) + } + get := func() { resp, err = s.applyV3Base.Range(noTxn, r) } + if serr := s.doSerialize(ctx, chk, get); serr != nil { + return nil, serr + } + return resp, err +} + +// TODO: remove this func when we release etcd 3.2 +func (s *EtcdServer) legacyRange(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { if r.Serializable { var resp *pb.RangeResponse var err error @@ -143,6 +177,7 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse } return resp, err } + // TODO: readonly Txn do not need to go through raft result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Txn: r}) if err != nil { return nil, err @@ -641,3 +676,95 @@ func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.Intern // Watchable returns a watchable interface attached to the etcdserver. func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() } + +func (s *EtcdServer) linearizableReadLoop() { + var rs raft.ReadState + internalTimeout := time.Second + + for { + ctx := make([]byte, 8) + binary.BigEndian.PutUint64(ctx, s.reqIDGen.Next()) + + select { + case <-s.readwaitc: + case <-s.stopping: + return + } + + nextnr := newNotifier() + + s.readMu.Lock() + nr := s.readNotifier + s.readNotifier = nextnr + s.readMu.Unlock() + + cctx, cancel := context.WithTimeout(context.Background(), internalTimeout) + if err := s.r.ReadIndex(cctx, ctx); err != nil { + cancel() + if err == raft.ErrStopped { + return + } + plog.Errorf("failed to get read index from raft: %v", err) + nr.notify(err) + continue + } + cancel() + + var ( + timeout bool + done bool + ) + for !timeout && !done { + select { + case rs = <-s.r.readStateC: + done = bytes.Equal(rs.RequestCtx, ctx) + if !done { + // a previous request might time out. now we should ignore the response of it and + // continue waiting for the response of the current requests. + plog.Warningf("ignored out-of-date read index response (want %v, got %v)", rs.RequestCtx, ctx) + } + case <-time.After(internalTimeout): + plog.Warningf("timed out waiting for read index response") + nr.notify(ErrTimeout) + timeout = true + case <-s.stopping: + return + } + } + if !done { + continue + } + + if ai := s.getAppliedIndex(); ai < rs.Index { + select { + case <-s.applyWait.Wait(rs.Index): + case <-s.stopping: + return + } + } + // unblock all l-reads requested at indices before rs.Index + nr.notify(nil) + } +} + +func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error { + s.readMu.RLock() + nc := s.readNotifier + s.readMu.RUnlock() + + // signal linearizable loop for current notify if it hasn't been already + select { + case s.readwaitc <- struct{}{}: + default: + } + + // wait for read state notification + select { + case <-nc.c: + return nc.err + case <-ctx.Done(): + return ctx.Err() + case <-s.done: + return ErrStopped + } +}