forked from ava-labs/ortelius
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.go
72 lines (59 loc) · 2.06 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// (c) 2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package stream
import (
"errors"
"fmt"
"path"
"github.com/corpetty/ortelius/services"
kafkaMessage "github.com/segmentio/kafka-go"
)
// Sentinel errors exported by the stream package.
var (
	// ErrUnknownVM carries the message "unknown VM".
	ErrUnknownVM = errors.New("unknown VM")

	// ErrInvalidTopicName carries the message "invalid topic name".
	ErrInvalidTopicName = errors.New("invalid topic name")

	// ErrWrongTopicEventType carries the message "wrong topic event type".
	ErrWrongTopicEventType = errors.New("wrong topic event type")

	// ErrWrongTopicNetworkID carries the message "wrong topic networkID".
	ErrWrongTopicNetworkID = errors.New("wrong topic networkID")
)
// EventType is the event-type component that getSocketName and GetTopicName
// embed in the names they build.
type EventType string

// Event types defined by this package.
const (
	// EventTypeConsensus is the EventType with value "consensus".
	EventTypeConsensus EventType = "consensus"

	// EventTypeDecisions is the EventType with value "decisions".
	EventTypeDecisions EventType = "decisions"
)
// Message is a message on the event stream. Its fields are unexported and
// are read through the accessor methods of the same names.
type Message struct {
	// id is the value returned by ID.
	id string
	// chainID is the value returned by ChainID.
	chainID string
	// body is the payload returned by Body.
	body []byte
	// timestamp and nanosecond are returned by Timestamp and Nanosecond.
	// NOTE(review): units are not established in this file — presumably
	// Unix seconds plus a nanosecond component; confirm against callers.
	timestamp, nanosecond int64
	// kafkaMessage is the associated Kafka message. It is left nil by
	// NewMessage and set by NewMessageWithKafka.
	kafkaMessage *kafkaMessage.Message
}
// ID returns the message's id.
func (m *Message) ID() string {
	return m.id
}

// ChainID returns the chain ID the message was created with.
func (m *Message) ChainID() string {
	return m.chainID
}

// Body returns the message payload. The returned slice is the one held by
// the Message, not a copy.
func (m *Message) Body() []byte {
	return m.body
}

// Timestamp returns the message's timestamp value.
func (m *Message) Timestamp() int64 {
	return m.timestamp
}

// Nanosecond returns the message's nanosecond value.
func (m *Message) Nanosecond() int64 {
	return m.nanosecond
}

// KafkaMessage returns the Kafka message attached to this Message, or nil
// if none was attached.
func (m *Message) KafkaMessage() *kafkaMessage.Message {
	return m.kafkaMessage
}
// NewMessage builds a Message from the given id, chain ID, body, timestamp
// and nanosecond values and returns it as a services.Consumable. The
// resulting Message has no Kafka message attached.
func NewMessage(id string,
	chainID string,
	body []byte,
	timestamp int64,
	nanosecond int64,
) services.Consumable {
	msg := &Message{
		id:         id,
		chainID:    chainID,
		body:       body,
		timestamp:  timestamp,
		nanosecond: nanosecond,
	}
	return msg
}
// NewMessageWithKafka builds a Message from the given id, chain ID, body,
// timestamp and nanosecond values, attaches the supplied Kafka message, and
// returns the result as a services.Consumable.
func NewMessageWithKafka(id string,
	chainID string,
	body []byte,
	timestamp int64,
	nanosecond int64,
	kafkaMessage *kafkaMessage.Message,
) services.Consumable {
	msg := &Message{
		id:           id,
		chainID:      chainID,
		body:         body,
		timestamp:    timestamp,
		nanosecond:   nanosecond,
		kafkaMessage: kafkaMessage,
	}
	return msg
}
// getSocketName returns the socket path for the given network, chain and
// event type: the name "<networkID>-<chainID>-<eventType>" joined onto root
// with path.Join, which also cleans the resulting slash-separated path.
func getSocketName(root string, networkID uint32, chainID string, eventType EventType) string {
	socketFile := fmt.Sprintf("%d-%s-%s", networkID, chainID, eventType)
	return path.Join(root, socketFile)
}
// GetTopicName returns the topic name for the given network, chain and event
// type, formatted as "<networkID>-<chainID>-<eventType>" with the network ID
// in decimal.
func GetTopicName(networkID uint32, chainID string, eventType EventType) string {
	const topicLayout = "%d-%s-%s"
	topic := fmt.Sprintf(topicLayout, networkID, chainID, eventType)
	return topic
}