/
stream.go
109 lines (90 loc) · 3.22 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package stream
import (
"context"
"github.com/deixis/spine/contextutil"
)
type MsgHandler func(context.Context, []byte) error
type Stream interface {
Pub
Sub
// Start does the initialisation work to bootstrap a Stream adapter.
// For example, this function may open a connection, start an event loop, etc.
Start(ctx context.Context) error
// Drain signals to the Stream client/server that inbound messages should
// no longer be accepted, but outbound messages can still be delivered.
Drain()
// Close closes the client/server for both inbound/outbound messages
Close() error
}
// Pub is the publish interface
type Pub interface {
// Publish publishes data to the channel ch
// Publish(ctx context.Context, ch string, data []byte) error
Publish(ctx context.Context, ch string, data []byte) error
}
// Sub is the subscribe interface
type Sub interface {
// Subscribe subscribes the message handler h to the channel ch.
// All subscriptions with the same q will form a queue group.
// Each message will be delivered to only one subscriber per queue group.
// Subscribe(q, ch string, h MsgHandler) error
Subscribe(q, ch string, f MsgHandler, opts ...SubscriptionOption) (Subscription, error)
}
// Subscription represents a subscription to the streaming platform
type Subscription interface {
// Unsubscribe removes interest in the subscription.
// For durables, it means that the durable interest is also removed from
// the server. Restarting a durable with the same name will not resume
// the subscription, it will be considered a new one.
Unsubscribe() error
}
type SubscriptionOption interface{}
type contextKey struct{}
var activePubContextKey = contextKey{}
// PubFromContext returns a `Pub` instance associated with `ctx`, or
// the local `Pub` if no instance could be found.
func PubFromContext(ctx contextutil.ValueContext) Pub {
val := ctx.Value(activePubContextKey)
if o, ok := val.(Pub); ok {
return o
}
return NopStream()
}
// PubWithContext returns a copy of parent in which the `Pub` is stored
func PubWithContext(ctx context.Context, pub Pub) context.Context {
return context.WithValue(ctx, activePubContextKey, pub)
}
var activeSubContextKey = contextKey{}
// SubFromContext returns a `Sub` instance associated with `ctx`, or
// the local `Sub` if no instance could be found.
func SubFromContext(ctx contextutil.ValueContext) Sub {
val := ctx.Value(activeSubContextKey)
if o, ok := val.(Sub); ok {
return o
}
return NopStream()
}
// SubWithContext returns a copy of parent in which the `Sub` is stored
func SubWithContext(ctx context.Context, sub Sub) context.Context {
return context.WithValue(ctx, activeSubContextKey, sub)
}
var activeStreamContextKey = contextKey{}
// FromContext returns a `Stream` instance associated with `ctx`, or
// the local `Sub` if no instance could be found.
func FromContext(ctx contextutil.ValueContext) Stream {
val := ctx.Value(activeStreamContextKey)
if o, ok := val.(Stream); ok {
return o
}
return NopStream()
}
// WithContext returns a copy of parent in which `Stream`, `Pub`, and `Sub` are stored
func WithContext(ctx context.Context, ps Stream) context.Context {
return PubWithContext(
SubWithContext(
context.WithValue(ctx, activeStreamContextKey, ps),
ps,
),
ps,
)
}