Skip to content

Commit

Permalink
Revert changes that turned out be unneeded.
Browse files Browse the repository at this point in the history
  • Loading branch information
ncabatoff committed Nov 24, 2023
1 parent 477cf7e commit 2b715ac
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 40 deletions.
37 changes: 2 additions & 35 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ package raft
import (
"bytes"
"container/list"
"context"
"fmt"
"golang.org/x/sync/semaphore"
"io"
"sync/atomic"
"time"
Expand Down Expand Up @@ -94,7 +92,6 @@ type leaderState struct {
replState map[ServerID]*followerReplication
notify map[*verifyFuture]struct{}
stepDown chan struct{}
applyable *semaphore.Weighted
}

// setLeader is used to modify the current leader Address and ID of the cluster
Expand Down Expand Up @@ -404,7 +401,7 @@ func (r *Raft) setLeadershipTransferInProgress(v bool) {

func (r *Raft) getLeadershipTransferInProgress() bool {
v := atomic.LoadInt32(&r.leaderState.leadershipTransferInProgress)
return v > 0
return v == 1
}

func (r *Raft) setupLeaderState() {
Expand All @@ -416,7 +413,6 @@ func (r *Raft) setupLeaderState() {
r.leaderState.replState = make(map[ServerID]*followerReplication)
r.leaderState.notify = make(map[*verifyFuture]struct{})
r.leaderState.stepDown = make(chan struct{}, 1)
r.leaderState.applyable = semaphore.NewWeighted(1)
}

// runLeader runs the main loop while in leader state. Do the setup here and drop into
Expand Down Expand Up @@ -487,7 +483,6 @@ func (r *Raft) runLeader() {
r.leaderState.replState = nil
r.leaderState.notify = nil
r.leaderState.stepDown = nil
r.leaderState.applyable = nil

// If we are stepping down for some reason, no known leader.
// We may have stepped down due to an RPC call, which would
Expand Down Expand Up @@ -526,9 +521,7 @@ func (r *Raft) runLeader() {
// maintain that there exists at most one uncommitted configuration entry in
// any log, so we have to do proper no-ops here.
noop := &logFuture{log: Log{Type: LogNoop}}
r.leaderState.applyable.Acquire(context.Background(), 1)
r.dispatchLogs([]*logFuture{noop})
r.leaderState.applyable.Release(1)

// Sit in the leader loop until we step down
r.leaderLoop()
Expand Down Expand Up @@ -863,7 +856,7 @@ func (r *Raft) leaderLoop() {

case newLog := <-r.applyCh:
r.mainThreadSaturation.working()
if r.getLeadershipTransferInProgress() || !r.leaderState.applyable.TryAcquire(1) {
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
newLog.respond(ErrLeadershipTransferInProgress)
continue
Expand All @@ -890,7 +883,6 @@ func (r *Raft) leaderLoop() {
} else {
r.dispatchLogs(ready)
}
r.leaderState.applyable.Release(1)

case <-lease:
r.mainThreadSaturation.working()
Expand Down Expand Up @@ -950,30 +942,13 @@ func (r *Raft) verifyLeader(v *verifyFuture) {
// leadershipTransfer is doing the heavy lifting for the leadership transfer.
func (r *Raft) leadershipTransfer(id ServerID, address ServerAddress, repl *followerReplication, stopCh chan struct{}, doneCh chan error) {
// make sure we are not already stopped
r.logger.Debug("leadershipTransfer", "id", id, "address", address)

select {
case <-stopCh:
doneCh <- nil
return
default:
}

LOOP:
for {
select {
case <-stopCh:
doneCh <- nil
return
default:
if r.leaderState.applyable.TryAcquire(1) {
break LOOP
}
}
}
defer r.leaderState.applyable.Release(1)

r.logger.Trace("leadershipTransfer", "my_last_index", r.getLastIndex(), "follower_next_index", atomic.LoadUint64(&repl.nextIndex))
for atomic.LoadUint64(&repl.nextIndex) <= r.getLastIndex() {
err := &deferError{}
err.init()
Expand All @@ -989,7 +964,6 @@ LOOP:
return
}
}
r.logger.Trace("leadershipTransfer", "my_last_index", r.getLastIndex(), "follower_next_index", repl.nextIndex)

// Step ?: the thesis describes in chap 6.4.1: Using clocks to reduce
// messaging for read-only queries. If this is implemented, the lease
Expand Down Expand Up @@ -1180,12 +1154,6 @@ func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error {
// configuration entry to the log. This must only be called from the
// main thread.
func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) {
if !r.leaderState.applyable.TryAcquire(1) {
future.respond(fmt.Errorf("can't apply, semaphor held"))
return
}
defer r.leaderState.applyable.Release(1)

configuration, err := nextConfiguration(r.configurations.latest, r.configurations.latestIndex, future.req)
if err != nil {
future.respond(err)
Expand Down Expand Up @@ -1258,7 +1226,6 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
}
r.leaderState.commitment.match(r.localID, lastIndex)

r.logger.Trace("dispatchLogs", "lastIndex", lastIndex)
// Update the last log since it's on disk now
r.setLastLog(lastIndex, term)

Expand Down
6 changes: 1 addition & 5 deletions testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,16 +211,12 @@ func newTestLogger(tb testing.TB) hclog.Logger {
// is logged after the test is complete.
func newTestLoggerWithPrefix(tb testing.TB, prefix string) hclog.Logger {
if testing.Verbose() {
return hclog.New(&hclog.LoggerOptions{
Name: prefix,
Level: hclog.Trace,
})
return hclog.New(&hclog.LoggerOptions{Name: prefix})
}

return hclog.New(&hclog.LoggerOptions{
Name: prefix,
Output: &testLoggerAdapter{tb: tb, prefix: prefix},
Level: hclog.Trace,
})
}

Expand Down

0 comments on commit 2b715ac

Please sign in to comment.