Skip to content

Commit

Permalink
*: migrate to tracker v2 (#1853)
Browse files Browse the repository at this point in the history
Migrate to tracker v2 and remove tracker v1 and its references. Also removes tracker_v2 featureflag.

category: refactor
ticket: #1478
  • Loading branch information
dB2510 committed Mar 3, 2023
1 parent 7fb4808 commit bcfa517
Show file tree
Hide file tree
Showing 7 changed files with 702 additions and 2,591 deletions.
67 changes: 10 additions & 57 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import (
"github.com/obolnetwork/charon/core/scheduler"
"github.com/obolnetwork/charon/core/sigagg"
"github.com/obolnetwork/charon/core/tracker"
"github.com/obolnetwork/charon/core/tracker/tracker2"
"github.com/obolnetwork/charon/core/validatorapi"
"github.com/obolnetwork/charon/eth2util"
"github.com/obolnetwork/charon/p2p"
Expand Down Expand Up @@ -443,12 +442,6 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
return err
}

err = wireTracker(ctx, life, deadlineFunc, peers, eth2Cl, sched,
fetch, cons, vapi, parSigDB, parSigEx, sigAgg)
if err != nil {
return err
}

err = wirePrioritise(ctx, conf, life, tcpNode, peerIDs, lock.Threshold,
sender.SendReceive, cons, sched, p2pKey, deadlineFunc, mutableConf)
if err != nil {
Expand All @@ -457,22 +450,15 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,

wireRecaster(sched, sigAgg, broadcaster)

track, err := newTracker(ctx, life, deadlineFunc, peers, eth2Cl)
if err != nil {
return err
}
opts := []core.WireOption{
core.WithTracing(),
core.WithTracking(track),
core.WithAsyncRetry(retryer),
}
if featureset.Enabled(featureset.TrackerV2) {
track, err := newTrackerV2(ctx, life, deadlineFunc, peers, eth2Cl)
if err != nil {
return err
}

opts = []core.WireOption{
core.WithTracing(),
core.WithTracking(track),
core.WithAsyncRetry(retryer),
}
}
core.Wire(sched, fetch, cons, dutyDB, vapi, parSigDB, parSigEx, sigAgg, aggSigDB, broadcaster, opts...)

err = wireValidatorMock(conf, pubshares, sched)
Expand Down Expand Up @@ -556,53 +542,20 @@ func wireRecaster(sched core.Scheduler, sigAgg core.SigAgg, broadcaster core.Bro
recaster.Subscribe(broadcaster.Broadcast)
}

// wireTracker creates a new tracker instance and wires it to the components with "output events".
func wireTracker(ctx context.Context, life *lifecycle.Manager, deadlineFunc func(duty core.Duty) (time.Time, bool),
peers []p2p.Peer, ethCl eth2wrap.Client,
sched core.Scheduler, fetcher core.Fetcher, cons core.Consensus, vapi core.ValidatorAPI,
parSigDB core.ParSigDB, parSigEx core.ParSigEx, sigAgg core.SigAgg,
) error {
analyser := core.NewDeadliner(ctx, "tracker_analyser", deadlineFunc)
deleter := core.NewDeadliner(ctx, "tracker_deleter", func(duty core.Duty) (time.Time, bool) {
d, ok := deadlineFunc(duty)
return d.Add(time.Minute), ok // Delete duties after deadline+1min.
})

trackFrom, err := calculateTrackerDelay(ctx, ethCl, time.Now())
if err != nil {
return err
}

trackr := tracker.New(analyser, deleter, peers, trackFrom)

sched.SubscribeDuties(trackr.SchedulerEvent)
fetcher.Subscribe(trackr.FetcherEvent)
cons.Subscribe(trackr.ConsensusEvent)
vapi.Subscribe(trackr.ValidatorAPIEvent)
parSigDB.SubscribeInternal(trackr.ParSigDBInternalEvent)
parSigDB.SubscribeThreshold(trackr.ParSigDBThresholdEvent)
parSigEx.Subscribe(trackr.ParSigExEvent)
sigAgg.Subscribe(trackr.SigAggEvent)

life.RegisterStart(lifecycle.AsyncBackground, lifecycle.StartTracker, lifecycle.HookFunc(trackr.Run))

return nil
}

// newTrackerV2 creates a new tracker2 instance.
func newTrackerV2(ctx context.Context, life *lifecycle.Manager, deadlineFunc func(duty core.Duty) (time.Time, bool),
// newTracker creates and starts a new tracker instance.
func newTracker(ctx context.Context, life *lifecycle.Manager, deadlineFunc func(duty core.Duty) (time.Time, bool),
peers []p2p.Peer, eth2Cl eth2wrap.Client,
) (core.Tracker, error) {
slotDuration, err := eth2Cl.SlotDuration(ctx)
if err != nil {
return nil, err
}

analyser := core.NewDeadliner(ctx, "tracker2_analyser", func(duty core.Duty) (time.Time, bool) {
analyser := core.NewDeadliner(ctx, "tracker_analyser", func(duty core.Duty) (time.Time, bool) {
d, ok := deadlineFunc(duty)
return d.Add(slotDuration), ok // Add one slot delay to analyser to capture duty expired errors.
})
deleter := core.NewDeadliner(ctx, "tracker2_deleter", func(duty core.Duty) (time.Time, bool) {
deleter := core.NewDeadliner(ctx, "tracker_deleter", func(duty core.Duty) (time.Time, bool) {
d, ok := deadlineFunc(duty)
return d.Add(time.Minute), ok // Delete duties after deadline+1min.
})
Expand All @@ -612,7 +565,7 @@ func newTrackerV2(ctx context.Context, life *lifecycle.Manager, deadlineFunc fun
return nil, err
}

track := tracker2.New(analyser, deleter, peers, trackFrom)
track := tracker.New(analyser, deleter, peers, trackFrom)
life.RegisterStart(lifecycle.AsyncBackground, lifecycle.StartTracker, lifecycle.HookFunc(track.Run))

return track, nil
Expand Down
5 changes: 0 additions & 5 deletions app/featureset/featureset.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ const (

// HerumiBLS enables usage of the Herumi BLS12-381 implementation, rather than Kryptology.
HerumiBLS Feature = "herumi_bls"

// TrackerV2 enables support of tracker2.
// TODO(dhruv): remove this featureflag once we are more confident about tracker2.
TrackerV2 Feature = "tracker_v2"
)

var (
Expand All @@ -55,7 +51,6 @@ var (
MockAlpha: statusAlpha,
RelayDiscovery: statusStable,
HerumiBLS: statusBeta,
TrackerV2: statusAlpha,
// Add all features and there status here.
}

Expand Down
Loading

0 comments on commit bcfa517

Please sign in to comment.