-
Notifications
You must be signed in to change notification settings - Fork 1
/
events.go
106 lines (89 loc) · 2.97 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Original source: github.com/micro/go-micro/v3/events/events.go
// Package events is for event streaming and storage
package events
import (
"encoding/json"
"errors"
"time"
)
var (
// DefaultStream is the default events stream implementation
DefaultStream Stream
// DefaultStore is the default events store implementation
DefaultStore Store
)
var (
// ErrMissingTopic is returned if a blank topic was provided to publish
ErrMissingTopic = errors.New("Missing topic")
// ErrEncodingMessage is returned from publish if there was an error encoding the message option
ErrEncodingMessage = errors.New("Error encoding message")
)
// Stream is an event streaming interface
type Stream interface {
Publish(topic string, msg interface{}, opts ...PublishOption) error
Consume(topic string, opts ...ConsumeOption) (<-chan Event, error)
}
// Store is an event store interface
type Store interface {
Read(topic string, opts ...ReadOption) ([]*Event, error)
Write(event *Event, opts ...WriteOption) error
}
type AckFunc func() error
type NackFunc func() error
// Event is the object returned by the broker when you subscribe to a topic
type Event struct {
// ID to uniquely identify the event
ID string
// Topic of event, e.g. "registry.service.created"
Topic string
// Timestamp of the event
Timestamp time.Time
// Metadata contains the values the event was indexed by
Metadata map[string]string
// Payload contains the encoded message
Payload []byte
ackFunc AckFunc
nackFunc NackFunc
}
// Unmarshal the events message into an object
func (e *Event) Unmarshal(v interface{}) error {
return json.Unmarshal(e.Payload, v)
}
// Ack acknowledges successful processing of the event in ManualAck mode
func (e *Event) Ack() error {
return e.ackFunc()
}
func (e *Event) SetAckFunc(f AckFunc) {
e.ackFunc = f
}
// Nack negatively acknowledges processing of the event (i.e. failure) in ManualAck mode
func (e *Event) Nack() error {
return e.nackFunc()
}
func (e *Event) SetNackFunc(f NackFunc) {
e.nackFunc = f
}
// Publish an event to a topic
func Publish(topic string, msg interface{}, opts ...PublishOption) error {
return DefaultStream.Publish(topic, msg, opts...)
}
// Consume to events
func Consume(topic string, opts ...ConsumeOption) (<-chan Event, error) {
return DefaultStream.Consume(topic, opts...)
}
// Read events for a topic
func Read(topic string, opts ...ReadOption) ([]*Event, error) {
return DefaultStore.Read(topic, opts...)
}