-
Notifications
You must be signed in to change notification settings - Fork 10
/
pubsub.go
85 lines (75 loc) · 2.88 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package pubsub
import (
"context"
"github.com/aperturerobotics/bifrost/link"
"github.com/aperturerobotics/bifrost/peer"
"github.com/aperturerobotics/controllerbus/controller"
"github.com/libp2p/go-libp2p/core/crypto"
)
// PubSub is an implementation of a pub-sub message router.
// The PubSub controller provides a common implementation for pub-sub routers.
// The PubSub interface declares the requirements for a router.
// The router is constructed with a private key which is used for communications.
// Each subscription also has a private key to identify the publisher/subscriber.
// Publishing is performed by first subscribing and then publishing to the subscription.
type PubSub interface {
// Execute executes the PubSub routines.
Execute(ctx context.Context) error
// AddPeerStream adds a negotiated peer stream.
// Two streams will be negotiated, one outgoing, one incoming.
// The pubsub should communicate over the stream.
AddPeerStream(tpl PeerLinkTuple, initiator bool, mstrm link.MountedStream)
// AddSubscription adds a channel subscription, returning a subscription handle.
AddSubscription(ctx context.Context, privKey crypto.PrivKey, channelID string) (Subscription, error)
// Close closes the pubsub.
Close()
}
// PubSubHandler manages a PubSub and receives event callbacks.
// This is typically fulfilled by the PubSub controller.
type PubSubHandler interface {
}
// PeerLinkTuple is the peer-id link-id tuple.
type PeerLinkTuple struct {
PeerID peer.ID
LinkID uint64
}
// NewPeerLinkTuple constructs a new peer link tuple.
func NewPeerLinkTuple(lnk link.Link) PeerLinkTuple {
return PeerLinkTuple{
PeerID: lnk.GetRemotePeer(),
LinkID: lnk.GetUUID(),
}
}
// Message is a pubsub message.
type Message interface {
// GetFrom returns the peer ID of the sender.
GetFrom() peer.ID
// GetAuthenticated indicates if the signature is valid.
GetAuthenticated() bool
// GetData returns the message data.
GetData() []byte
}
// Controller is a PubSub controller.
type Controller interface {
// Controller is the controllerbus controller interface.
controller.Controller
// GetPubSub returns the controlled PubSub router.
// This may wait for the PubSub to be ready.
GetPubSub(ctx context.Context) (PubSub, error)
}
// Subscription is a pubsub channel subscription handle.
type Subscription interface {
// GetPeerId returns the peer ID for this subscription derived from private key.
GetPeerId() peer.ID
// GetChannelId returns the channel id.
GetChannelId() string
// Publish writes to the channel using the subscription's private key.
Publish(data []byte) error
// AddHandler adds a callback that is called with each received message.
// The callback should not block.
// Returns a remove function.
// The handler(s) are also removed when the subscription is released.
AddHandler(cb func(m Message)) func()
// Release releases the subscription handle, clearing the handlers.
Release()
}