-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
strategies.go
159 lines (137 loc) · 4.51 KB
/
strategies.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package relayer
import (
"context"
"fmt"
tmservice "github.com/tendermint/tendermint/libs/service"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
tmtypes "github.com/tendermint/tendermint/types"
)
var (
txEvents = "tm.event='Tx'"
blEvents = "tm.event='NewBlock'"
)
// Strategy defines
type Strategy interface {
GetType() string
HandleEvents(src, dst *Chain, srch, dsth int64, events map[string][]string)
UnrelayedSequences(src, dst *Chain) (*RelaySequences, error)
UnrelayedAcknowledgements(src, dst *Chain) (*RelaySequences, error)
RelayPackets(src, dst *Chain, sp *RelaySequences) error
RelayAcknowledgements(src, dst *Chain, sp *RelaySequences) error
}
// MustGetStrategy returns the strategy and panics on error
func (p *Path) MustGetStrategy() Strategy {
strategy, err := p.GetStrategy()
if err != nil {
panic(err)
}
return strategy
}
// GetStrategy the strategy defined in the relay messages
func (p *Path) GetStrategy() (Strategy, error) {
switch p.Strategy.Type {
case (&NaiveStrategy{}).GetType():
return &NaiveStrategy{}, nil
default:
return nil, fmt.Errorf("invalid strategy: %s", p.Strategy.Type)
}
}
// StrategyCfg defines which relaying strategy to take for a given path
type StrategyCfg struct {
Type string `json:"type" yaml:"type"`
}
// RunStrategy runs a given strategy
func RunStrategy(src, dst *Chain, strategy Strategy) (func(), error) {
doneChan := make(chan struct{})
// Fetch latest headers for each chain and store them in sync headers
// _, _, err := UpdateLightClients(src, dst)
// if err != nil {
// return nil, err
// }
// Next start the goroutine that listens to each chain for block and tx events
go relayerListenLoop(src, dst, doneChan, strategy)
// Fetch any unrelayed sequences depending on the channel order
sp, err := strategy.UnrelayedSequences(src, dst)
if err != nil {
return nil, err
}
if err = strategy.RelayPackets(src, dst, sp); err != nil {
return nil, err
}
// Return a function to stop the relayer goroutine
return func() { doneChan <- struct{}{} }, nil
}
func relayerListenLoop(src, dst *Chain, doneChan chan struct{}, strategy Strategy) {
var (
srcTxEvents, srcBlockEvents, dstTxEvents, dstBlockEvents <-chan ctypes.ResultEvent
srcTxCancel, srcBlockCancel, dstTxCancel, dstBlockCancel context.CancelFunc
err error
)
// Start client for source chain
if err = src.Start(); err != nil {
if err != tmservice.ErrAlreadyStarted {
src.Error(err)
return
}
}
// Subscibe to txEvents from the source chain
if srcTxEvents, srcTxCancel, err = src.Subscribe(txEvents); err != nil {
src.Error(err)
return
}
defer srcTxCancel()
src.Log(fmt.Sprintf("- listening to tx events from %s...", src.ChainID))
// Subscibe to blockEvents from the source chain
if srcBlockEvents, srcBlockCancel, err = src.Subscribe(blEvents); err != nil {
src.Error(err)
return
}
defer srcBlockCancel()
src.Log(fmt.Sprintf("- listening to block events from %s...", src.ChainID))
// Subscribe to destination chain
if err = dst.Start(); err != nil {
if err != tmservice.ErrAlreadyStarted {
dst.Error(err)
return
}
}
// Subscibe to txEvents from the destination chain
if dstTxEvents, dstTxCancel, err = dst.Subscribe(txEvents); err != nil {
dst.Error(err)
return
}
defer dstTxCancel()
dst.Log(fmt.Sprintf("- listening to tx events from %s...", dst.ChainID))
// Subscibe to blockEvents from the destination chain
if dstBlockEvents, dstBlockCancel, err = dst.Subscribe(blEvents); err != nil {
src.Error(err)
return
}
defer dstBlockCancel()
dst.Log(fmt.Sprintf("- listening to block events from %s...", dst.ChainID))
// Listen to channels and take appropriate action
var srch, dsth int64
for {
select {
case srcMsg := <-srcTxEvents:
src.logTx(srcMsg.Events)
go strategy.HandleEvents(dst, src, dsth, srch, srcMsg.Events)
case dstMsg := <-dstTxEvents:
dst.logTx(dstMsg.Events)
go strategy.HandleEvents(src, dst, srch, dsth, dstMsg.Events)
case srcMsg := <-srcBlockEvents:
bl, _ := srcMsg.Data.(tmtypes.EventDataNewBlock)
srch = bl.Block.Height
go strategy.HandleEvents(dst, src, dsth, srch, srcMsg.Events)
case dstMsg := <-dstBlockEvents:
bl, _ := dstMsg.Data.(tmtypes.EventDataNewBlock)
dsth = bl.Block.Height
go strategy.HandleEvents(src, dst, srch, dsth, dstMsg.Events)
case <-doneChan:
src.Log(fmt.Sprintf("- [%s]:{%s} <-> [%s]:{%s} relayer shutting down",
src.ChainID, src.PathEnd.PortID, dst.ChainID, dst.PathEnd.PortID))
close(doneChan)
return
}
}
}