forked from smartcontractkit/chainlink
/
delegate.go
112 lines (96 loc) · 2.79 KB
/
delegate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package keeper
import (
"github.com/pkg/errors"
"gorm.io/gorm"
"github.com/DCMMC/chainlink/core/chains/evm"
"github.com/DCMMC/chainlink/core/internal/gethwrappers/generated/keeper_registry_wrapper"
"github.com/DCMMC/chainlink/core/logger"
"github.com/DCMMC/chainlink/core/services/bulletprooftxmanager"
"github.com/DCMMC/chainlink/core/services/job"
"github.com/DCMMC/chainlink/core/services/pipeline"
)
// To make sure Delegate struct implements job.Delegate interface
var _ job.Delegate = (*Delegate)(nil)
type transmitter interface {
CreateEthTransaction(db *gorm.DB, newTx bulletprooftxmanager.NewTx) (etx bulletprooftxmanager.EthTx, err error)
}
type Delegate struct {
logger logger.Logger
db *gorm.DB
jrm job.ORM
pr pipeline.Runner
chainSet evm.ChainSet
}
// NewDelegate is the constructor of Delegate
func NewDelegate(
db *gorm.DB,
jrm job.ORM,
pr pipeline.Runner,
logger logger.Logger,
chainSet evm.ChainSet,
) *Delegate {
return &Delegate{
logger: logger,
db: db,
jrm: jrm,
pr: pr,
chainSet: chainSet,
}
}
// JobType returns job type
func (d *Delegate) JobType() job.Type {
return job.Keeper
}
func (Delegate) AfterJobCreated(spec job.Job) {}
func (Delegate) BeforeJobDeleted(spec job.Job) {}
func (d *Delegate) ServicesForSpec(spec job.Job) (services []job.Service, err error) {
// TODO: we need to fill these out manually, find a better fix
spec.PipelineSpec.JobName = spec.Name.ValueOrZero()
spec.PipelineSpec.JobID = spec.ID
if spec.KeeperSpec == nil {
return nil, errors.Errorf("Delegate expects a *job.KeeperSpec to be present, got %v", spec)
}
chain, err := d.chainSet.Get(spec.KeeperSpec.EVMChainID.ToInt())
if err != nil {
return nil, err
}
contractAddress := spec.KeeperSpec.ContractAddress
contract, err := keeper_registry_wrapper.NewKeeperRegistry(
contractAddress.Address(),
chain.Client(),
)
if err != nil {
return nil, errors.Wrap(err, "unable to create keeper registry contract wrapper")
}
strategy := bulletprooftxmanager.NewQueueingTxStrategy(spec.ExternalJobID, chain.Config().KeeperDefaultTransactionQueueDepth(), false)
orm := NewORM(d.db, chain.TxManager(), chain.Config(), strategy)
svcLogger := d.logger.With(
"jobID", spec.ID,
"registryAddress", contractAddress.Hex(),
)
registrySynchronizer := NewRegistrySynchronizer(
spec,
contract,
orm,
d.jrm,
chain.LogBroadcaster(),
chain.Config().KeeperRegistrySyncInterval(),
chain.Config().KeeperMinimumRequiredConfirmations(),
svcLogger,
chain.Config().KeeperRegistrySyncUpkeepQueueSize(),
)
upkeepExecuter := NewUpkeepExecuter(
spec,
orm,
d.pr,
chain.Client(),
chain.HeadBroadcaster(),
chain.TxManager().GetGasEstimator(),
svcLogger,
chain.Config(),
)
return []job.Service{
registrySynchronizer,
upkeepExecuter,
}, nil
}