This repository has been archived by the owner on May 13, 2022. It is now read-only.
/
execution_events_server.go
214 lines (192 loc) · 6.66 KB
/
execution_events_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
package rpcevents
import (
"context"
"fmt"
"io"
"github.com/hyperledger/burrow/bcm"
"github.com/hyperledger/burrow/event"
"github.com/hyperledger/burrow/event/query"
"github.com/hyperledger/burrow/execution/exec"
"github.com/hyperledger/burrow/logging"
)
// SubscribeBufferSize is the buffer size passed to the emitter for the subscriptions opened in this file.
const SubscribeBufferSize = 100
// Provider is the source of stored execution data that the events server reads from.
type Provider interface {
// IterateStreamEvents passes stored stream events between the start and end keys to consumer.
// NOTE(review): callers in this file treat the range as half-open [start, end) - confirm against the implementation.
IterateStreamEvents(start, end *exec.StreamKey, consumer func(*exec.StreamEvent) error) (err error)
// TxByHash gets a particular TxExecution by hash. The caller in this file treats a nil result
// returned together with a nil error as "not found".
TxByHash(txHash []byte) (*exec.TxExecution, error)
}
// executionEventsServer answers execution event requests using stored state (eventsProvider) and
// live subscriptions (emitter).
type executionEventsServer struct {
// Source of stored stream events and transaction executions
eventsProvider Provider
// Subscribed to for executions that are published after a request arrives
emitter *event.Emitter
// Queried for the last block height when resolving a requested block range
tip bcm.BlockchainInfo
// Scoped logger set up by the constructor
logger *logging.Logger
}
// NewExecutionEventsServer builds an ExecutionEventsServer that reads stored executions from
// eventsProvider, subscribes to live ones on emitter and takes the last block height from tip.
// The logger is scoped before it is stored on the server.
func NewExecutionEventsServer(eventsProvider Provider, emitter *event.Emitter,
	tip bcm.BlockchainInfo, logger *logging.Logger) ExecutionEventsServer {
	server := new(executionEventsServer)
	server.eventsProvider = eventsProvider
	server.emitter = emitter
	server.tip = tip
	server.logger = logger.WithScope("NewExecutionEventsServer")
	return server
}
// Tx returns the TxExecution for request.TxHash.
//
// The provider is consulted first. If it has no execution for the hash and request.Wait is false an
// error is returned; if request.Wait is true the call subscribes to the emitter and blocks until the
// matching TxExecution is published, the subscription channel is closed, or ctx is done - whichever
// happens first. When ctx is done ctx.Err() is returned.
func (ees *executionEventsServer) Tx(ctx context.Context, request *TxRequest) (*exec.TxExecution, error) {
	txe, err := ees.eventsProvider.TxByHash(request.TxHash)
	if err != nil {
		return nil, err
	}
	if txe != nil {
		return txe, nil
	}
	if !request.Wait {
		return nil, fmt.Errorf("transaction with hash %v not found in state", request.TxHash)
	}
	subID := event.GenSubID()
	out, err := ees.emitter.Subscribe(ctx, subID, exec.QueryForTxExecution(request.TxHash), SubscribeBufferSize)
	if err != nil {
		return nil, err
	}
	// Best-effort cleanup. A fresh context is used (as in subscribeBlockExecution) because ctx may
	// already be cancelled by the time we get here and we still want the subscription removed.
	defer ees.emitter.UnsubscribeAll(context.Background(), subID)

	// Wait on the subscription and the context together so that a cancelled or timed-out request
	// returns promptly instead of blocking until some message happens to arrive.
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case msg, open := <-out:
		if !open {
			return nil, fmt.Errorf("subscription waiting for tx %v ended prematurely", request.TxHash)
		}
		// Cancellation takes priority over a message that arrived at the same time
		if ctxErr := ctx.Err(); ctxErr != nil {
			return nil, ctxErr
		}
		received, isTxe := msg.(*exec.TxExecution)
		if !isTxe {
			return nil, fmt.Errorf("subscription waiting for tx %v received unexpected message of type %T",
				request.TxHash, msg)
		}
		return received, nil
	}
}
// Stream sends every stream event in request.BlockRange that matches request.Query to the client.
// An empty query is accepted; a query that fails to parse is reported as an error before anything
// is streamed.
func (ees *executionEventsServer) Stream(request *BlocksRequest, stream ExecutionEvents_StreamServer) error {
	qry, err := query.NewOrEmpty(request.Query)
	if err != nil {
		return fmt.Errorf("could not parse TxExecution query: %v", err)
	}
	// Forward matching events, silently skip the rest
	sendIfMatched := func(ev *exec.StreamEvent) error {
		if !qry.Matches(ev.Tagged()) {
			return nil
		}
		return stream.Send(ev)
	}
	return ees.streamEvents(stream.Context(), request.BlockRange, sendIfMatched)
}
// Events sends one EventsResponse per block in request.BlockRange, containing the events of that
// block's transactions that match request.Query. Blocks with no matching events produce no
// response, and events belonging to a transaction that carries an exception are left out.
func (ees *executionEventsServer) Events(request *BlocksRequest, stream ExecutionEvents_EventsServer) error {
	qry, err := query.NewOrEmpty(request.Query)
	if err != nil {
		return fmt.Errorf("could not parse Event query: %v", err)
	}
	// State carried across callback invocations: the response being assembled for the current
	// block and the stack that reassembles transactions from individual stream events.
	var response *EventsResponse
	var stack exec.TxStack
	collect := func(sev *exec.StreamEvent) error {
		if sev.BeginBlock != nil {
			// A new block starts a fresh response
			response = &EventsResponse{
				Height: sev.BeginBlock.Height,
			}
			return nil
		}
		if sev.EndBlock != nil && len(response.Events) > 0 {
			return stream.Send(response)
		}
		// The whole transaction has to be consumed before we know whether it was exceptional,
		// and only then can its events be included or dropped
		txe := stack.Consume(sev)
		if txe == nil || txe.Exception != nil {
			return nil
		}
		for _, ev := range txe.Events {
			if !qry.Matches(ev.Tagged()) {
				continue
			}
			response.Events = append(response.Events, ev)
		}
		return nil
	}
	return ees.streamEvents(stream.Context(), request.BlockRange, collect)
}
// streamEvents feeds consumer with the stream events for blockRange.
//
// It first replays whatever is available from state. If that covers the whole (bounded) range it
// returns; otherwise it subscribes to block executions and keeps feeding consumer as blocks are
// produced, pulling any blocks that were missed in between from state. Any error from consumer or
// from reading state stops the stream and is returned. For a bounded range, completion is signalled
// to the subscription loop by returning io.EOF from the per-block callback.
func (ees *executionEventsServer) streamEvents(ctx context.Context, blockRange *BlockRange,
	consumer func(execution *exec.StreamEvent) error) error {
	// Converts the bounds to half-open interval needed
	start, end, streaming := blockRange.Bounds(ees.tip.LastBlockHeight())
	ees.logger.TraceMsg("Streaming blocks", "start", start, "end", end, "streaming", streaming)

	// Pull blocks from state and receive the upper bound (exclusive) on what we were able to send.
	// This becomes start since it will be the start of the next streaming batch (if needed)
	start, err := ees.iterateStreamEvents(start, end, consumer)
	if err != nil {
		// Do not go on to subscribe after a failure (e.g. the consumer could not send to the client)
		return err
	}
	// If we are not streaming and all blocks requested were retrieved from state then we are done
	if !streaming && start >= end {
		return nil
	}

	return ees.subscribeBlockExecution(ctx, func(block *exec.BlockExecution) error {
		streamEnd := block.Height
		if streamEnd < start {
			// We've managed to receive a block event we already processed directly from state above - wait for next block
			return nil
		}

		finished := !streaming && streamEnd >= end
		if finished {
			// Truncate streamEnd to final end to get exactly the blocks we want from state
			streamEnd = end
		}
		if start < streamEnd {
			// This implies there are some blocks between the previous batch end (now start) and the current
			// BlockExecution that we have not emitted so we will pull them from state. This can occur if a block is
			// emitted during/after the initial streaming but before we have subscribed to block events or if we
			// spill BlockExecutions when streaming them and need to catch up
			if _, err := ees.iterateStreamEvents(start, streamEnd, consumer); err != nil {
				return err
			}
		}
		if finished {
			return io.EOF
		}

		for _, ev := range block.StreamEvents() {
			if err := consumer(ev); err != nil {
				return err
			}
		}
		// We've just streamed block so our next start marker is the next block
		start = block.Height + 1
		return nil
	})
}
// subscribeBlockExecution subscribes to BlockExecution events and calls consumer for each one until
// consumer returns an error, the subscription channel is closed, or ctx is done.
//
// consumer may return io.EOF to stop the subscription cleanly; that is reported as a nil error. Any
// other consumer error, and ctx.Err() on cancellation, is returned to the caller. The subscription
// is always removed before returning; an error from removing it is only returned when there is no
// earlier error to report.
func (ees *executionEventsServer) subscribeBlockExecution(ctx context.Context,
	consumer func(*exec.BlockExecution) error) (err error) {
	subID := event.GenSubID()
	// Subscribe to BlockExecution events
	out, err := ees.emitter.Subscribe(ctx, subID, exec.QueryForBlockExecution(), SubscribeBufferSize)
	if err != nil {
		return err
	}
	defer func() {
		unsubErr := ees.emitter.UnsubscribeAll(context.Background(), subID)
		for range out {
			// flush
		}
		// Keep the error that ended the loop; only surface the unsubscribe error when there is none
		if err == nil {
			err = unsubErr
		}
	}()

	for {
		// Wait on the context as well as the channel so cancellation is noticed without having to
		// wait for the next block to be produced
		select {
		case <-ctx.Done():
			return ctx.Err()
		case msg, open := <-out:
			if !open {
				return nil
			}
			// Cancellation takes priority over a block that arrived at the same time
			if ctxErr := ctx.Err(); ctxErr != nil {
				return ctxErr
			}
			block, isBlock := msg.(*exec.BlockExecution)
			if !isBlock {
				return fmt.Errorf("block execution subscription received unexpected message of type %T", msg)
			}
			if consumeErr := consumer(block); consumeErr != nil {
				if consumeErr == io.EOF {
					// The consumer has everything it wanted
					return nil
				}
				return consumeErr
			}
		}
	}
}
// iterateStreamEvents passes the stored stream events between startHeight and endHeight to consumer
// and returns the height from which the next batch should start: one past the height of the last
// EndBlock event seen, or startHeight itself when no EndBlock was seen. The error from the
// provider iteration is returned alongside.
func (ees *executionEventsServer) iterateStreamEvents(startHeight, endHeight uint64,
	consumer func(*exec.StreamEvent) error) (uint64, error) {
	// Until a block has been seen to end, the next batch starts where this one was asked to start
	nextStart := startHeight
	from := &exec.StreamKey{Height: startHeight}
	to := &exec.StreamKey{Height: endHeight}
	trackAndForward := func(blockEvent *exec.StreamEvent) error {
		if blockEvent.EndBlock != nil {
			// This block is complete so the next batch starts at the block after it
			nextStart = blockEvent.EndBlock.GetHeight() + 1
		}
		return consumer(blockEvent)
	}
	err := ees.eventsProvider.IterateStreamEvents(from, to, trackAndForward)
	return nextStart, err
}