This repository has been archived by the owner on Aug 3, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
topic.go
131 lines (114 loc) · 3.97 KB
/
topic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package arrebato
import (
"context"
"errors"
"sort"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
topicsvc "github.com/davidsbond/arrebato/internal/proto/arrebato/topic/service/v1"
"github.com/davidsbond/arrebato/internal/proto/arrebato/topic/v1"
)
type (
// The Topic type describes a topic stored within the cluster.
Topic struct {
// The Name of the Topic.
Name string `json:"name"`
// The amount of time messages on the Topic will be stored.
MessageRetentionPeriod time.Duration `json:"messageRetentionPeriod"`
// The maximum age of a consumer index on a Topic before it is reset to zero.
ConsumerRetentionPeriod time.Duration `json:"consumerRetentionPeriod"`
// If true, any attempts to publish an unverified message onto this topic will fail.
RequireVerifiedMessages bool `json:"requireVerifiedMessages"`
}
)
var (
// ErrNoTopic is the error given when attempting to perform an operation against a topic that does not exist.
ErrNoTopic = errors.New("no topic")
// ErrTopicExists is the error given when attempting to create a topic that already exists.
ErrTopicExists = errors.New("topic exists")
)
// CreateTopic creates a new topic described by the provided Topic. Returns ErrTopicExists if the topic already
// exists.
func (c *Client) CreateTopic(ctx context.Context, t Topic) error {
svc := topicsvc.NewTopicServiceClient(c.cluster.leader())
_, err := svc.Create(ctx, &topicsvc.CreateRequest{
Topic: &topic.Topic{
Name: t.Name,
MessageRetentionPeriod: durationpb.New(t.MessageRetentionPeriod),
ConsumerRetentionPeriod: durationpb.New(t.ConsumerRetentionPeriod),
RequireVerifiedMessages: t.RequireVerifiedMessages,
},
})
switch {
case status.Code(err) == codes.AlreadyExists:
return ErrTopicExists
case status.Code(err) == codes.FailedPrecondition:
c.cluster.findLeader(ctx)
return c.CreateTopic(ctx, t)
case err != nil:
return err
default:
return nil
}
}
// Topic returns a named Topic. Returns ErrNoTopic if the topic does not exist.
func (c *Client) Topic(ctx context.Context, name string) (Topic, error) {
svc := topicsvc.NewTopicServiceClient(c.cluster.any())
resp, err := svc.Get(ctx, &topicsvc.GetRequest{
Name: name,
})
switch {
case status.Code(err) == codes.NotFound:
return Topic{}, ErrNoTopic
case err != nil:
return Topic{}, err
default:
return Topic{
Name: resp.GetTopic().GetName(),
MessageRetentionPeriod: resp.GetTopic().GetMessageRetentionPeriod().AsDuration(),
ConsumerRetentionPeriod: resp.GetTopic().GetConsumerRetentionPeriod().AsDuration(),
RequireVerifiedMessages: resp.GetTopic().GetRequireVerifiedMessages(),
}, nil
}
}
// Topics lists all topics stored in the cluster.
func (c *Client) Topics(ctx context.Context) ([]Topic, error) {
svc := topicsvc.NewTopicServiceClient(c.cluster.any())
resp, err := svc.List(ctx, &topicsvc.ListRequest{})
if err != nil {
return nil, err
}
topics := make([]Topic, len(resp.GetTopics()))
for i, tp := range resp.GetTopics() {
topics[i] = Topic{
Name: tp.GetName(),
MessageRetentionPeriod: tp.GetMessageRetentionPeriod().AsDuration(),
ConsumerRetentionPeriod: tp.GetConsumerRetentionPeriod().AsDuration(),
RequireVerifiedMessages: tp.GetRequireVerifiedMessages(),
}
}
sort.Slice(topics, func(i, j int) bool {
return topics[i].Name < topics[j].Name
})
return topics, nil
}
// DeleteTopic removes a named Topic from the cluster. Returns ErrNoTopic if the topic does not exist.
func (c *Client) DeleteTopic(ctx context.Context, name string) error {
svc := topicsvc.NewTopicServiceClient(c.cluster.leader())
_, err := svc.Delete(ctx, &topicsvc.DeleteRequest{
Name: name,
})
switch {
case status.Code(err) == codes.NotFound:
return ErrNoTopic
case status.Code(err) == codes.FailedPrecondition:
c.cluster.findLeader(ctx)
return c.DeleteTopic(ctx, name)
case err != nil:
return err
default:
return nil
}
}