/
transport.go
108 lines (91 loc) · 2.54 KB
/
transport.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package session
import (
"context"
"encoding/hex"
"fmt"
"github.com/binance-chain/tss-lib/tss"
"github.com/fury-labs/fury-bridge/relayer/broadcast"
"github.com/fury-labs/fury-bridge/relayer/mp_tss"
mp_tss_types "github.com/fury-labs/fury-bridge/relayer/mp_tss/types"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/sync/errgroup"
)
// SessionTransport is a transport for a specific session.
type SessionTransport struct {
broadcaster broadcast.Broadcaster
partyIDStore *mp_tss.PartyIDStore
sessionID mp_tss_types.AggregateSigningSessionID
participants []peer.ID
recvChan chan mp_tss.ReceivedPartyState
}
var _ mp_tss.Transporter = (*SessionTransport)(nil)
// Send sends a tss message with the provided routing data.
func (mt *SessionTransport) Send(
ctx context.Context,
data []byte,
routing *tss.MessageRouting,
isResharing bool,
) error {
log.Debugw(
"sending message",
"routing", routing,
)
// TODO: Implement broadcast resharing
if isResharing {
return nil
}
msg := mp_tss_types.NewSigningPartMessage(mt.sessionID, data, routing.IsBroadcast)
if routing.IsBroadcast {
return mt.broadcaster.BroadcastMessage(
ctx,
&msg,
mt.participants,
30,
)
}
// Point to point concurrently
// TODO: Might not be necessary, routing.To may only consist of one peer.
g, ctx := errgroup.WithContext(ctx)
for _, to := range routing.To {
peerID, found := mt.partyIDStore.GetPeerID(to)
if !found {
return fmt.Errorf(
"peer %s not found (key %v), this may happen if the party IDs do not match the key savedata",
to, hex.EncodeToString(to.Key),
)
}
g.Go(func() error {
err := mt.broadcaster.BroadcastMessage(
ctx,
&msg,
[]peer.ID{peerID},
30,
)
if err != nil {
return fmt.Errorf("failed to send message to peer: %w", err)
}
return nil
})
}
return g.Wait()
}
// Receive returns the channel for receiving messages from other parties. This
// can also be used to add messages to be handled from other parties.
func (mt *SessionTransport) Receive() chan mp_tss.ReceivedPartyState {
return mt.recvChan
}
// NewSessionTransport returns a new Transporter using reliable broadcast.
func NewSessionTransport(
broadcaster broadcast.Broadcaster,
sessionID mp_tss_types.AggregateSigningSessionID,
partyIDStore *mp_tss.PartyIDStore,
participants []peer.ID,
) mp_tss.Transporter {
return &SessionTransport{
broadcaster: broadcaster,
sessionID: sessionID,
partyIDStore: partyIDStore,
participants: participants,
recvChan: make(chan mp_tss.ReceivedPartyState, 1),
}
}