forked from absmach/magistrala
-
Notifications
You must be signed in to change notification settings - Fork 0
/
service.go
129 lines (106 loc) · 3.39 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package notifiers
import (
"context"
"fmt"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/consumers"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mainflux/pkg/messaging"
)
var (
// ErrMessage indicates an error converting a message to Mainflux message.
ErrMessage = errors.New("failed to convert to Mainflux message")
)
// Service reprents a notification service.
type Service interface {
// CreateSubscription persists a subscription.
// Successful operation is indicated by non-nil error response.
CreateSubscription(ctx context.Context, token string, sub Subscription) (string, error)
// ViewSubscription retrieves the subscription for the given user and id.
ViewSubscription(ctx context.Context, token, id string) (Subscription, error)
// ListSubscriptions lists subscriptions having the provided user token and search params.
ListSubscriptions(ctx context.Context, token string, pm PageMetadata) (Page, error)
// RemoveSubscription removes the subscription having the provided identifier.
RemoveSubscription(ctx context.Context, token, id string) error
consumers.Consumer
}
var _ Service = (*notifierService)(nil)
type notifierService struct {
auth mainflux.AuthServiceClient
subs SubscriptionsRepository
idp mainflux.IDProvider
notifier Notifier
from string
}
// New instantiates the subscriptions service implementation.
func New(auth mainflux.AuthServiceClient, subs SubscriptionsRepository, idp mainflux.IDProvider, notifier Notifier, from string) Service {
return ¬ifierService{
auth: auth,
subs: subs,
idp: idp,
notifier: notifier,
from: from,
}
}
func (ns *notifierService) CreateSubscription(ctx context.Context, token string, sub Subscription) (string, error) {
res, err := ns.auth.Identify(ctx, &mainflux.Token{Value: token})
if err != nil {
return "", err
}
sub.ID, err = ns.idp.ID()
if err != nil {
return "", err
}
sub.OwnerID = res.GetId()
return ns.subs.Save(ctx, sub)
}
func (ns *notifierService) ViewSubscription(ctx context.Context, token, id string) (Subscription, error) {
if _, err := ns.auth.Identify(ctx, &mainflux.Token{Value: token}); err != nil {
return Subscription{}, err
}
return ns.subs.Retrieve(ctx, id)
}
func (ns *notifierService) ListSubscriptions(ctx context.Context, token string, pm PageMetadata) (Page, error) {
if _, err := ns.auth.Identify(ctx, &mainflux.Token{Value: token}); err != nil {
return Page{}, err
}
return ns.subs.RetrieveAll(ctx, pm)
}
func (ns *notifierService) RemoveSubscription(ctx context.Context, token, id string) error {
if _, err := ns.auth.Identify(ctx, &mainflux.Token{Value: token}); err != nil {
return err
}
return ns.subs.Remove(ctx, id)
}
func (ns *notifierService) Consume(message interface{}) error {
msg, ok := message.(messaging.Message)
if !ok {
return ErrMessage
}
topic := msg.Channel
if msg.Subtopic != "" {
topic = fmt.Sprintf("%s.%s", msg.Channel, msg.Subtopic)
}
pm := PageMetadata{
Topic: topic,
Offset: 0,
Limit: -1,
}
page, err := ns.subs.RetrieveAll(context.Background(), pm)
if err != nil {
return err
}
var to []string
for _, sub := range page.Subscriptions {
to = append(to, sub.Contact)
}
if len(to) > 0 {
err := ns.notifier.Notify(ns.from, to, msg)
if err != nil {
return errors.Wrap(ErrNotify, err)
}
}
return nil
}