-
Notifications
You must be signed in to change notification settings - Fork 16
/
conclude.go
146 lines (132 loc) · 4.66 KB
/
conclude.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
// Copyright 2020 - See NOTICE file for copyright holders.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package channel
import (
"context"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
"github.com/pkg/errors"
"perun.network/go-perun/backend/ethereum/bindings/adjudicator"
"perun.network/go-perun/channel"
)
// secondaryWaitBlocks is the number of blocks that ensureConcluded waits for
// the other party to conclude a final state before sending the transaction
// itself, when the request is marked as Secondary.
const secondaryWaitBlocks = 2
// ensureConcluded ensures that conclude or concludeFinal (for non-final and
// final states, resp.) is called on the adjudicator.
//   - a subscription on ChannelUpdate events of the channel is established
//   - it searches for a past Concluded event
//   - if found, the channel is already concluded and success is returned
//   - for final states of a Secondary request, it first waits
//     secondaryWaitBlocks blocks for the other party to conclude
//   - if still not concluded, conclude/concludeFinal is called on the
//     adjudicator
//   - it waits for a Concluded event from the blockchain.
//
// It returns nil once the channel is known to be concluded. An error is
// returned if setting up the subscription or filtering fails, if the
// transaction call fails for a reason other than a failed transaction, if ctx
// is done, or if the event subscription reports an error.
func (a *Adjudicator) ensureConcluded(ctx context.Context, req channel.AdjudicatorReq, subStates channel.StateMap) error {
	// Subscribe before filtering so that no event emitted in between is missed.
	watchOpts, err := a.NewWatchOpts(ctx)
	if err != nil {
		return errors.WithMessage(err, "creating watchOpts")
	}
	events := make(chan *adjudicator.AdjudicatorChannelUpdate)
	sub, err := a.contract.WatchChannelUpdate(watchOpts, events, [][32]byte{req.Params.ID()})
	if err != nil {
		return errors.Wrap(err, "creating subscription failed")
	}
	defer sub.Unsubscribe()

	if found, err := a.filterConcluded(ctx, req.Params.ID()); err != nil {
		return errors.WithMessage(err, "filtering old Concluded events")
	} else if found {
		return nil
	}

	// In final Register calls, as the non-initiator, we optimistically wait for
	// the other party to send the transaction first for secondaryWaitBlocks many
	// blocks.
	if req.Tx.IsFinal && req.Secondary {
		isConcluded, err := waitConcludedForNBlocks(ctx, a, sub, events, secondaryWaitBlocks)
		if err != nil {
			return err
		} else if isConcluded {
			return nil
		}
	}

	// No conclude event found in the past, send transaction.
	if req.Tx.IsFinal {
		err = errors.WithMessage(a.callConcludeFinal(ctx, req), "calling concludeFinal")
	} else {
		err = errors.WithMessage(a.callConclude(ctx, req, subStates), "calling conclude")
	}
	if IsErrTxFailed(err) {
		// A failed transaction is tolerated: the Concluded event may still be
		// emitted (e.g. by another party's transaction), so keep waiting.
		a.log.Warn("Calling conclude(Final) failed, waiting for event anyways...")
	} else if err != nil {
		return err
	}

	// The subscription delivers every ChannelUpdate of this channel, so keep
	// reading until an update in the concluded phase arrives. Returning on the
	// first event of any phase would report success for a channel that is not
	// concluded yet.
	for {
		select {
		case e := <-events:
			if e.Phase == phaseConcluded {
				return nil
			}
		case <-ctx.Done():
			return errors.Wrap(ctx.Err(), "context cancelled")
		case err = <-sub.Err():
			return errors.Wrap(err, "subscription error")
		}
	}
}
// waitConcludedForNBlocks waits for up to numBlocks blocks for a Concluded
// event on the concluded channel. If an event in the concluded phase is
// received, true is returned. Otherwise, once numBlocks new block headers have
// been received, false is returned.
//
// cr is the ChainReader used for setting up a block header subscription. sub is
// the Concluded event subscription instance; its error channel is monitored
// while waiting. An error is returned if the header subscription cannot be
// created, if ctx is done, or if either subscription reports an error.
func waitConcludedForNBlocks(ctx context.Context,
	cr ethereum.ChainReader,
	sub ethereum.Subscription,
	concluded chan *adjudicator.AdjudicatorChannelUpdate,
	numBlocks int,
) (bool, error) {
	h := make(chan *types.Header)
	hsub, err := cr.SubscribeNewHead(ctx, h)
	if err != nil {
		return false, errors.Wrap(err, "subscribing to new blocks")
	}
	defer hsub.Unsubscribe()

	// Only received headers count towards numBlocks. Channel events of other
	// phases must not shorten the waiting period.
	for blocksSeen := 0; blocksSeen < numBlocks; {
		select {
		case <-h: // a new block arrived, wait another block
			blocksSeen++
		case e := <-concluded: // other participant performed transaction
			if e.Phase == phaseConcluded {
				return true, nil
			}
		case <-ctx.Done():
			return false, errors.Wrap(ctx.Err(), "context cancelled")
		case err = <-hsub.Err():
			return false, errors.Wrap(err, "header subscription error")
		case err = <-sub.Err():
			return false, errors.Wrap(err, "concluded subscription error")
		}
	}
	return false, nil
}
// filterConcluded returns whether there has been a Concluded event in the past
// for the channel with the given channelID.
//
// It iterates over the past ChannelUpdate events of the channel and reports
// true as soon as one in the concluded phase is found. An error is returned if
// the filter options or the iterator cannot be created, or if the iteration
// itself fails.
func (a *Adjudicator) filterConcluded(ctx context.Context, channelID channel.ID) (bool, error) {
	filterOpts, err := a.NewFilterOpts(ctx)
	if err != nil {
		return false, err
	}
	iter, err := a.contract.FilterChannelUpdate(filterOpts, [][32]byte{channelID})
	if err != nil {
		return false, errors.Wrap(err, "creating iterator")
	}
	// Release the iterator's resources on every return path.
	// NOTE(review): Close's return value is dropped; generated bindings are
	// expected to always return nil here — confirm against the binding.
	defer iter.Close() // nolint:errcheck

	for iter.Next() {
		if iter.Event.Phase == phaseConcluded {
			return true, nil
		}
	}
	// Next also returns false on a retrieval failure, so surface that error
	// instead of reporting "not concluded". Wrap yields nil for a nil error.
	return false, errors.Wrap(iter.Error(), "iterating ChannelUpdate events")
}