-
Notifications
You must be signed in to change notification settings - Fork 663
/
service.go
167 lines (140 loc) · 4.33 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package notifiers
import (
"context"
"fmt"
"github.com/absmach/magistrala"
"github.com/absmach/magistrala/consumers"
"github.com/absmach/magistrala/pkg/errors"
"github.com/absmach/magistrala/pkg/messaging"
)
var (
	// ErrMessage is reported when a consumed value cannot be converted to a
	// Magistrala message.
	ErrMessage = errors.New("failed to convert to Magistrala message")

	// Compile-time check that notifierService implements consumers.AsyncConsumer.
	_ consumers.AsyncConsumer = (*notifierService)(nil)
)
// Service represents a notification service.
type Service interface {
	// CreateSubscription persists a subscription.
	// Successful operation is indicated by a nil error response.
	CreateSubscription(ctx context.Context, token string, sub Subscription) (string, error)
	// ViewSubscription retrieves the subscription with the given id on behalf
	// of the user identified by token.
	ViewSubscription(ctx context.Context, token, id string) (Subscription, error)
	// ListSubscriptions lists subscriptions having the provided user token and search params.
	ListSubscriptions(ctx context.Context, token string, pm PageMetadata) (Page, error)
	// RemoveSubscription removes the subscription having the provided identifier.
	RemoveSubscription(ctx context.Context, token, id string) error
	// BlockingConsumer adds message consumption to the service
	// (presumably the ConsumeBlocking method; see package consumers).
	consumers.BlockingConsumer
}
// Compile-time check that notifierService implements Service.
var _ Service = (*notifierService)(nil)
// notifierService is the Service implementation; it also provides
// ConsumeAsync and Errors for asynchronous consumption.
type notifierService struct {
	// auth identifies the caller from a token before subscription operations.
	auth magistrala.AuthServiceClient
	// subs stores and retrieves subscriptions.
	subs SubscriptionsRepository
	// idp generates the ID assigned to a new subscription.
	idp magistrala.IDProvider
	// notifier delivers a message to a list of subscriber contacts.
	notifier Notifier
	// errCh carries errors raised by ConsumeAsync; exposed through Errors.
	errCh chan error
	// from is passed as the first argument to notifier.Notify
	// (presumably the sender address — confirm against Notifier).
	from string
}
// New instantiates the subscriptions service implementation.
func New(auth magistrala.AuthServiceClient, subs SubscriptionsRepository, idp magistrala.IDProvider, notifier Notifier, from string) Service {
return ¬ifierService{
auth: auth,
subs: subs,
idp: idp,
notifier: notifier,
errCh: make(chan error, 1),
from: from,
}
}
// CreateSubscription identifies the caller from token, stamps the
// subscription with a freshly generated ID and the caller's ID as owner,
// and saves it. The result is whatever the repository's Save returns.
func (ns *notifierService) CreateSubscription(ctx context.Context, token string, sub Subscription) (string, error) {
	identity, err := ns.auth.Identify(ctx, &magistrala.IdentityReq{Token: token})
	if err != nil {
		return "", err
	}

	id, err := ns.idp.ID()
	if err != nil {
		return "", err
	}
	sub.ID = id
	sub.OwnerID = identity.GetId()

	return ns.subs.Save(ctx, sub)
}
// ViewSubscription checks that token identifies a caller and then returns
// the subscription stored under id. An identification failure is returned
// unchanged together with a zero-value Subscription.
func (ns *notifierService) ViewSubscription(ctx context.Context, token, id string) (Subscription, error) {
	req := &magistrala.IdentityReq{Token: token}
	_, err := ns.auth.Identify(ctx, req)
	if err != nil {
		return Subscription{}, err
	}

	return ns.subs.Retrieve(ctx, id)
}
// ListSubscriptions checks that token identifies a caller and then returns
// the page of subscriptions matching pm. An identification failure is
// returned unchanged together with a zero-value Page.
func (ns *notifierService) ListSubscriptions(ctx context.Context, token string, pm PageMetadata) (Page, error) {
	req := &magistrala.IdentityReq{Token: token}
	_, err := ns.auth.Identify(ctx, req)
	if err != nil {
		return Page{}, err
	}

	return ns.subs.RetrieveAll(ctx, pm)
}
// RemoveSubscription checks that token identifies a caller and then removes
// the subscription stored under id. An identification failure is returned
// unchanged.
func (ns *notifierService) RemoveSubscription(ctx context.Context, token, id string) error {
	req := &magistrala.IdentityReq{Token: token}
	_, err := ns.auth.Identify(ctx, req)
	if err != nil {
		return err
	}

	return ns.subs.Remove(ctx, id)
}
// ConsumeBlocking notifies every subscriber of the message's topic and
// returns the outcome to the caller.
//
// It returns ErrMessage when message is not a *messaging.Message, the
// repository error when subscriptions cannot be retrieved, and the notifier
// error wrapped in ErrNotify when delivery fails. Nil is returned on success
// or when the topic has no subscribers.
func (ns *notifierService) ConsumeBlocking(ctx context.Context, message interface{}) error {
	return ns.notifySubscribers(ctx, message)
}

// ConsumeAsync performs the same work as ConsumeBlocking but reports any
// error on the channel exposed by Errors instead of returning it.
//
// The send is a plain channel send, so it blocks while the channel's buffer
// (sized in New) is full and nothing is receiving from Errors.
func (ns *notifierService) ConsumeAsync(ctx context.Context, message interface{}) {
	if err := ns.notifySubscribers(ctx, message); err != nil {
		ns.errCh <- err
	}
}

// notifySubscribers resolves the message's topic, collects the contacts of
// all subscriptions for that topic and hands them to the notifier.
func (ns *notifierService) notifySubscribers(ctx context.Context, message interface{}) error {
	msg, ok := message.(*messaging.Message)
	if !ok {
		return ErrMessage
	}

	// The topic is the channel, extended with ".<subtopic>" when one is set.
	topic := msg.GetChannel()
	if subtopic := msg.GetSubtopic(); subtopic != "" {
		topic = fmt.Sprintf("%s.%s", topic, subtopic)
	}

	pm := PageMetadata{
		Topic:  topic,
		Offset: 0,
		Limit:  -1,
	}
	page, err := ns.subs.RetrieveAll(ctx, pm)
	if err != nil {
		return err
	}

	// Nothing to deliver: skip the notifier entirely.
	if len(page.Subscriptions) == 0 {
		return nil
	}

	to := make([]string, 0, len(page.Subscriptions))
	for _, sub := range page.Subscriptions {
		to = append(to, sub.Contact)
	}

	if err := ns.notifier.Notify(ns.from, to, msg); err != nil {
		return errors.Wrap(ErrNotify, err)
	}

	return nil
}
// Errors exposes, as a receive-only channel, the errors reported by
// ConsumeAsync.
func (ns *notifierService) Errors() <-chan error {
	var errs <-chan error = ns.errCh
	return errs
}