Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: migrate to tracker v2 #1853

Merged
merged 1 commit into from
Mar 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 10 additions & 57 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import (
"github.com/obolnetwork/charon/core/scheduler"
"github.com/obolnetwork/charon/core/sigagg"
"github.com/obolnetwork/charon/core/tracker"
"github.com/obolnetwork/charon/core/tracker/tracker2"
"github.com/obolnetwork/charon/core/validatorapi"
"github.com/obolnetwork/charon/eth2util"
"github.com/obolnetwork/charon/p2p"
Expand Down Expand Up @@ -443,12 +442,6 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
return err
}

err = wireTracker(ctx, life, deadlineFunc, peers, eth2Cl, sched,
fetch, cons, vapi, parSigDB, parSigEx, sigAgg)
if err != nil {
return err
}

err = wirePrioritise(ctx, conf, life, tcpNode, peerIDs, lock.Threshold,
sender.SendReceive, cons, sched, p2pKey, deadlineFunc, mutableConf)
if err != nil {
Expand All @@ -457,22 +450,15 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,

wireRecaster(sched, sigAgg, broadcaster)

track, err := newTracker(ctx, life, deadlineFunc, peers, eth2Cl)
if err != nil {
return err
}
opts := []core.WireOption{
core.WithTracing(),
core.WithTracking(track),
core.WithAsyncRetry(retryer),
}
if featureset.Enabled(featureset.TrackerV2) {
track, err := newTrackerV2(ctx, life, deadlineFunc, peers, eth2Cl)
if err != nil {
return err
}

opts = []core.WireOption{
core.WithTracing(),
core.WithTracking(track),
core.WithAsyncRetry(retryer),
}
}
core.Wire(sched, fetch, cons, dutyDB, vapi, parSigDB, parSigEx, sigAgg, aggSigDB, broadcaster, opts...)

err = wireValidatorMock(conf, pubshares, sched)
Expand Down Expand Up @@ -556,53 +542,20 @@ func wireRecaster(sched core.Scheduler, sigAgg core.SigAgg, broadcaster core.Bro
recaster.Subscribe(broadcaster.Broadcast)
}

// wireTracker creates a new tracker instance and wires it to the components with "output events".
func wireTracker(ctx context.Context, life *lifecycle.Manager, deadlineFunc func(duty core.Duty) (time.Time, bool),
peers []p2p.Peer, ethCl eth2wrap.Client,
sched core.Scheduler, fetcher core.Fetcher, cons core.Consensus, vapi core.ValidatorAPI,
parSigDB core.ParSigDB, parSigEx core.ParSigEx, sigAgg core.SigAgg,
) error {
analyser := core.NewDeadliner(ctx, "tracker_analyser", deadlineFunc)
deleter := core.NewDeadliner(ctx, "tracker_deleter", func(duty core.Duty) (time.Time, bool) {
d, ok := deadlineFunc(duty)
return d.Add(time.Minute), ok // Delete duties after deadline+1min.
})

trackFrom, err := calculateTrackerDelay(ctx, ethCl, time.Now())
if err != nil {
return err
}

trackr := tracker.New(analyser, deleter, peers, trackFrom)

sched.SubscribeDuties(trackr.SchedulerEvent)
fetcher.Subscribe(trackr.FetcherEvent)
cons.Subscribe(trackr.ConsensusEvent)
vapi.Subscribe(trackr.ValidatorAPIEvent)
parSigDB.SubscribeInternal(trackr.ParSigDBInternalEvent)
parSigDB.SubscribeThreshold(trackr.ParSigDBThresholdEvent)
parSigEx.Subscribe(trackr.ParSigExEvent)
sigAgg.Subscribe(trackr.SigAggEvent)

life.RegisterStart(lifecycle.AsyncBackground, lifecycle.StartTracker, lifecycle.HookFunc(trackr.Run))

return nil
}

// newTrackerV2 creates a new tracker2 instance.
func newTrackerV2(ctx context.Context, life *lifecycle.Manager, deadlineFunc func(duty core.Duty) (time.Time, bool),
// newTracker creates and starts a new tracker instance.
func newTracker(ctx context.Context, life *lifecycle.Manager, deadlineFunc func(duty core.Duty) (time.Time, bool),
peers []p2p.Peer, eth2Cl eth2wrap.Client,
) (core.Tracker, error) {
slotDuration, err := eth2Cl.SlotDuration(ctx)
if err != nil {
return nil, err
}

analyser := core.NewDeadliner(ctx, "tracker2_analyser", func(duty core.Duty) (time.Time, bool) {
analyser := core.NewDeadliner(ctx, "tracker_analyser", func(duty core.Duty) (time.Time, bool) {
d, ok := deadlineFunc(duty)
return d.Add(slotDuration), ok // Add one slot delay to analyser to capture duty expired errors.
})
deleter := core.NewDeadliner(ctx, "tracker2_deleter", func(duty core.Duty) (time.Time, bool) {
deleter := core.NewDeadliner(ctx, "tracker_deleter", func(duty core.Duty) (time.Time, bool) {
d, ok := deadlineFunc(duty)
return d.Add(time.Minute), ok // Delete duties after deadline+1min.
})
Expand All @@ -612,7 +565,7 @@ func newTrackerV2(ctx context.Context, life *lifecycle.Manager, deadlineFunc fun
return nil, err
}

track := tracker2.New(analyser, deleter, peers, trackFrom)
track := tracker.New(analyser, deleter, peers, trackFrom)
life.RegisterStart(lifecycle.AsyncBackground, lifecycle.StartTracker, lifecycle.HookFunc(track.Run))

return track, nil
Expand Down
5 changes: 0 additions & 5 deletions app/featureset/featureset.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ const (

// HerumiBLS enables usage of the Herumi BLS12-381 implementation, rather than Kryptology.
HerumiBLS Feature = "herumi_bls"

// TrackerV2 enables support of tracker2.
// TODO(dhruv): remove this featureflag once we are more confident about tracker2.
TrackerV2 Feature = "tracker_v2"
)

var (
Expand All @@ -55,7 +51,6 @@ var (
MockAlpha: statusAlpha,
RelayDiscovery: statusStable,
HerumiBLS: statusBeta,
TrackerV2: statusAlpha,
// Add all features and there status here.
}

Expand Down
Loading