-
Notifications
You must be signed in to change notification settings - Fork 3
/
service.go
108 lines (93 loc) · 2.47 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// Package msgconsumer reads and sends SMS/Email messages from a repository.
package msgconsumer
import (
"context"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
auth "github.com/fmitra/authenticator"
)
// Consumer reads a message stream from a repository.
type Consumer interface {
// Run consumes the message stream using ctx. The implementation in this
// file blocks until the repository reports an error or ctx is done, and
// returns that error (or ctx.Err()).
Run(ctx context.Context) error
}
// service consumes messages from a repository and delivers them in
// parallel through goroutines. Its Run method has the signature required
// by Consumer.
type service struct {
// logger is the base logger; processMessage derives a per-message
// logger from it.
logger log.Logger
// smsLib sends messages whose Delivery is auth.Phone.
smsLib auth.SMSer
// emailLib sends messages whose Delivery is auth.Email.
emailLib auth.Emailer
// totalWorkers is the number of goroutines started by startWorkers.
totalWorkers int
// messageRepo supplies the message stream (Recent) and accepts messages
// that need to be retried (Publish).
messageRepo auth.MessageRepository
}
// Run asks the repository for its stream of recent messages, hands that
// stream to the worker goroutines, and then blocks until either the
// repository's error channel yields a value or ctx is done.
//
// It returns the value received from the error channel, or ctx.Err() when
// the context ends first.
func (s *service) Run(ctx context.Context) error {
	messages, failures := s.messageRepo.Recent(ctx)
	s.startWorkers(ctx, messages)

	// Both outcomes are terminal, so a single select is all that is needed.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-failures:
		return err
	}
}
// startWorkers launches s.totalWorkers goroutines that all receive from
// msgc. Every received message is passed to processMessage; a goroutine
// exits once msgc has been closed and drained. The call itself does not
// block.
//
// ctx is only forwarded to processMessage: the goroutines do not watch it
// themselves.
func (s *service) startWorkers(ctx context.Context, msgc <-chan *auth.Message) {
	// drain is the body shared by every worker goroutine.
	drain := func() {
		for {
			msg, open := <-msgc
			if !open {
				return
			}
			s.processMessage(ctx, msg)
		}
	}

	for started := 0; started < s.totalWorkers; started++ {
		go drain()
	}
}
// processMessage attempts to deliver a single message through SMS or email.
//
// The outcome is only logged; nothing is returned to the caller:
//   - a nil message is dropped;
//   - a message whose ExpiresAt is in the past is dropped;
//   - a message whose Delivery is neither auth.Phone nor auth.Email is
//     dropped, because no sender exists for it and re-queueing it could
//     never succeed;
//   - when the sender returns an error, the message is published back to
//     the repository so delivery is attempted again until it expires.
func (s *service) processMessage(ctx context.Context, msg *auth.Message) {
	// A nil message would panic on the field accesses below and take the
	// whole process down with the worker goroutine.
	if msg == nil {
		level.Error(s.logger).Log(
			"source", "msgconsumer.processMessage",
			"message", "dropping nil message",
		)
		return
	}

	logger := log.With(
		s.logger,
		"source", "msgconsumer.processMessage",
		"address", msg.Address,
		"delivery", msg.Delivery,
		"type", msg.Type,
		"delivery_attempts", msg.DeliveryAttempts,
		"expires_at", msg.ExpiresAt,
	)

	if time.Now().After(msg.ExpiresAt) {
		level.Info(logger).Log("message", "dropping expired message")
		return
	}

	var err error
	switch msg.Delivery {
	case auth.Phone:
		err = s.smsLib.SMS(ctx, msg.Address, msg.Content)
	case auth.Email:
		err = s.emailLib.Email(ctx, msg.Address, msg.Subject, msg.Content)
	default:
		// Without this branch err stays nil and the message would be
		// reported as sent even though no sender was invoked.
		level.Error(logger).Log(
			"message", "dropping message with unsupported delivery method",
		)
		return
	}

	if err == nil {
		level.Info(logger).Log("message", "message sent")
		// Enable in config.json: api.debug
		level.Debug(logger).Log(
			"content", msg.Content,
			"message", "message contents",
		)
		return
	}

	// Continue to retry the message until expiry.
	level.Info(logger).Log("message", "retrying message", "error", err)
	if pubErr := s.messageRepo.Publish(ctx, msg); pubErr != nil {
		level.Info(logger).Log(
			"message", "failed to retry message",
			"error", pubErr,
		)
		return
	}
	level.Info(logger).Log(
		"message", "message sent back to queue",
	)
}