-
Notifications
You must be signed in to change notification settings - Fork 1.8k
/
component.go
104 lines (90 loc) · 2.79 KB
/
component.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pubsub
import (
"context"
"io"
"testing"
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub"
compv1pb "github.com/dapr/dapr/pkg/proto/components/v1"
)
// component is an implementation of the pubsub pluggable component
// interface.
//
// It wraps a pubsub.PubSub and forwards the Init, Features, Publish and
// Close calls to it, while PullMessages streams whatever arrives on pmrCh.
type component struct {
// impl is the wrapped pubsub implementation that the methods delegate to.
impl pubsub.PubSub
// pmrCh is the receive-only source of responses that PullMessages sends
// on the stream.
pmrCh <-chan *compv1pb.PullMessagesResponse
}
// newComponent builds a component from the given options, taking the wrapped
// pubsub implementation and the pull-messages channel from opts.
//
// The t parameter is currently unused.
func newComponent(t *testing.T, opts options) *component {
	c := new(component)
	c.impl = opts.pubsub
	c.pmrCh = opts.pmrCh
	return c
}
// Init initializes the wrapped pubsub under the fixed component name
// "pubsub.in-memory", passing through the properties carried in the request
// metadata.
//
// An empty response is always returned, together with whatever error the
// wrapped implementation's Init reports.
func (c *component) Init(ctx context.Context, req *compv1pb.PubSubInitRequest) (*compv1pb.PubSubInitResponse, error) {
	base := metadata.Base{
		Name:       "pubsub.in-memory",
		Properties: req.GetMetadata().GetProperties(),
	}
	err := c.impl.Init(ctx, pubsub.Metadata{Base: base})
	return &compv1pb.PubSubInitResponse{}, err
}
// Features reports the features of the wrapped pubsub, converting each one to
// its string form. The context and request arguments are ignored and the
// returned error is always nil.
func (c *component) Features(context.Context, *compv1pb.FeaturesRequest) (*compv1pb.FeaturesResponse, error) {
	supported := c.impl.Features()
	names := make([]string, 0, len(supported))
	for _, feature := range supported {
		names = append(names, string(feature))
	}
	resp := &compv1pb.FeaturesResponse{Features: names}
	return resp, nil
}
// Publish translates the gRPC publish request into a pubsub.PublishRequest
// and hands it to the wrapped pubsub.
//
// The content type is only set when the request carries a non-empty one;
// otherwise it is left nil. On failure the implementation's error is returned
// with a nil response.
func (c *component) Publish(ctx context.Context, req *compv1pb.PublishRequest) (*compv1pb.PublishResponse, error) {
	msg := &pubsub.PublishRequest{
		Data:       req.GetData(),
		PubsubName: req.GetPubsubName(),
		Topic:      req.GetTopic(),
		Metadata:   req.GetMetadata(),
	}
	if req.GetContentType() != "" {
		// Point at the request's own field, as opposed to a copy.
		msg.ContentType = &req.ContentType
	}

	if err := c.impl.Publish(ctx, msg); err != nil {
		return nil, err
	}
	return &compv1pb.PublishResponse{}, nil
}
// BulkPublish is not implemented yet: the request is ignored and an empty
// response is returned with a nil error.
func (c *component) BulkPublish(ctx context.Context, req *compv1pb.BulkPublishRequest) (*compv1pb.BulkPublishResponse, error) {
	// TODO: forward the bulk request to the wrapped pubsub.
	return &compv1pb.BulkPublishResponse{}, nil
}
// PullMessages forwards every response received on the component's channel
// to the stream until the stream's context is done.
//
// It returns the error from Send if sending fails, and nil once the stream
// context is done. If the channel is closed, no further messages are sent and
// the call keeps waiting for the stream context to finish, which is the same
// behavior as when no channel was configured (nil channel).
func (c *component) PullMessages(req compv1pb.PubSub_PullMessagesServer) error {
	msgCh := c.pmrCh
	for {
		select {
		case pmr, ok := <-msgCh:
			if !ok {
				// A closed channel yields nil values immediately and forever,
				// which would spin this loop sending nil messages. Receiving
				// from a nil channel blocks, so this disables the case and
				// leaves only the context branch active.
				msgCh = nil
				continue
			}
			if err := req.Send(pmr); err != nil {
				return err
			}
		case <-req.Context().Done():
			return nil
		}
	}
}
// Ping always answers with an empty response and a nil error; the context
// and request are not inspected.
func (c *component) Ping(ctx context.Context, req *compv1pb.PingRequest) (*compv1pb.PingResponse, error) {
	return &compv1pb.PingResponse{}, nil
}
// Close closes the wrapped pubsub when it implements io.Closer and returns
// the error from its Close method.
//
// When there is no wrapped implementation, or it does not implement
// io.Closer, there is nothing to release and nil is returned instead of
// panicking on a failed type assertion.
func (c *component) Close() error {
	closer, ok := c.impl.(io.Closer)
	if !ok {
		return nil
	}
	return closer.Close()
}