-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bounce.go
146 lines (125 loc) · 3.18 KB
/
bounce.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package bounce
import (
"errors"
"log"
"time"
"github.com/ghostdevv/listmonk-tweaked/internal/bounce/mailbox"
"github.com/ghostdevv/listmonk-tweaked/internal/bounce/webhooks"
"github.com/ghostdevv/listmonk-tweaked/models"
"github.com/jmoiron/sqlx"
)
const (
// subID is the name of the e-mail header that identifies the subscriber,
// to be looked for in bounced e-mails.
subID = "X-Listmonk-Subscriber"
// campID is the name of the campaign header that accompanies subID.
// NOTE(review): presumably it is also looked up in bounced e-mails to
// identify the campaign; it is not referenced in this part of the file,
// so confirm against the code that parses bounces.
campID = "X-Listmonk-Campaign"
)
// Mailbox is the contract for a POP/IMAP mailbox client that can scan
// messages and pass the resulting bounces to a given channel. The Manager
// calls it repeatedly from its mailbox scanning loop.
type Mailbox interface {
// Scan is handed a message limit and the channel on which bounces are to
// be delivered, and reports failure through the returned error.
// NOTE(review): the exact meaning of limit is up to the implementation,
// which is not visible here; confirm against the mailbox package.
Scan(limit int, ch chan models.Bounce) error
}
// Opt represents bounce processing options. It selects which bounce sources
// (mailbox scanning and/or provider webhooks) the Manager sets up in New and
// carries the callback that receives every queued bounce.
type Opt struct {
// MailboxEnabled turns on mailbox scanning; when set, MailboxType must
// name a supported mailbox type ("pop" is the only one New accepts).
MailboxEnabled bool `json:"mailbox_enabled"`
MailboxType string `json:"mailbox_type"`
// Mailbox holds the mailbox client options, including the ScanInterval
// used as the pause between scans.
Mailbox mailbox.Opt `json:"mailbox"`
// WebhooksEnabled is the master switch for webhook handlers; the
// per-provider flags below are only consulted when it is set.
WebhooksEnabled bool `json:"webhooks_enabled"`
SESEnabled bool `json:"ses_enabled"`
SendgridEnabled bool `json:"sendgrid_enabled"`
// SendgridKey is passed to the Sendgrid webhook handler constructor.
SendgridKey string `json:"sendgrid_key"`
// Postmark groups the Postmark webhook settings. Unlike the fields
// above, this struct and its fields carry no json tags.
Postmark struct {
Enabled bool
Username string
Password string
}
// RecordBounceCB is invoked by Manager.Run for every bounce taken off the
// queue. Run calls it unconditionally, so it must be non-nil before Run
// is started.
RecordBounceCB func(models.Bounce) error
}
// Manager handles e-mail bounces. Bounces arrive on an internal queue, either
// from the mailbox scanner or via Record, and are consumed by Run.
type Manager struct {
// queue buffers pending bounces; New creates it with a capacity of 1000.
queue chan models.Bounce
// mailbox is the mailbox client; New only sets it when mailbox scanning
// is enabled.
mailbox Mailbox
// SES, Sendgrid and Postmark are the webhook handlers. Each stays nil
// unless webhooks and the respective provider are enabled in Opt;
// Sendgrid additionally stays nil if its initialization failed.
SES *webhooks.SES
Sendgrid *webhooks.Sendgrid
Postmark *webhooks.Postmark
// queries is stored by New; it is not used in this part of the file.
queries *Queries
opt Opt
// log receives mailbox scanning errors.
log *log.Logger
}
// Queries contains the database handle and prepared statement handed to the
// bounce manager on construction.
type Queries struct {
DB *sqlx.DB
// NOTE(review): presumably the prepared statement that records a bounce;
// it is not executed anywhere in this part of the file, so confirm.
RecordQuery *sqlx.Stmt
}
// New constructs a bounce Manager from the given options, queries and logger.
//
// When opt.MailboxEnabled is set, opt.MailboxType must be "pop"; any other
// value yields a nil Manager and an error. Webhook handlers (SES, Sendgrid,
// Postmark) are only created when opt.WebhooksEnabled is set together with
// the handler's own flag. A Sendgrid initialization failure is logged to lo
// and leaves Manager.Sendgrid nil; it does not fail construction.
func New(opt Opt, q *Queries, lo *log.Logger) (*Manager, error) {
	mgr := &Manager{
		log:     lo,
		queue:   make(chan models.Bounce, 1000),
		queries: q,
		opt:     opt,
	}

	// Mailbox scanning: POP is the only mailbox type accepted here.
	if opt.MailboxEnabled {
		if opt.MailboxType != "pop" {
			return nil, errors.New("unknown bounce mailbox type")
		}
		mgr.mailbox = mailbox.NewPOP(opt.Mailbox)
	}

	// Without webhooks there is nothing further to set up.
	if !opt.WebhooksEnabled {
		return mgr, nil
	}

	if opt.SESEnabled {
		mgr.SES = webhooks.NewSES()
	}

	if opt.SendgridEnabled {
		// A failure here is reported but does not abort construction.
		if sg, err := webhooks.NewSendgrid(opt.SendgridKey); err == nil {
			mgr.Sendgrid = sg
		} else {
			lo.Printf("error initializing sendgrid webhooks: %v", err)
		}
	}

	if opt.Postmark.Enabled {
		mgr.Postmark = webhooks.NewPostmark(opt.Postmark.Username, opt.Postmark.Password)
	}

	return mgr, nil
}
// Run blocks while it drains the bounce queue, handing every bounce to
// opt.RecordBounceCB. If mailbox scanning is enabled, the mailbox scanner is
// started in its own goroutine first. Run returns only once the queue channel
// has been closed.
func (m *Manager) Run() {
	if m.opt.MailboxEnabled {
		go m.runMailboxScanner()
	}

	// Ranging over the channel ends the loop when the queue is closed.
	for bounce := range m.queue {
		// Stamp bounces that arrive without a creation time.
		if bounce.CreatedAt.IsZero() {
			bounce.CreatedAt = time.Now()
		}

		// The callback's error is not acted upon here; processing simply
		// moves on to the next bounce.
		// NOTE(review): presumably the callback reports its own failures;
		// confirm against the caller that supplies it.
		_ = m.opt.RecordBounceCB(bounce)
	}
}
// runMailboxScanner loops forever: it scans the mailbox (limit 1000) into the
// bounce queue, logs any scan error, then sleeps for the configured
// opt.Mailbox.ScanInterval before the next scan. It never returns.
func (m *Manager) runMailboxScanner() {
	// The post statement supplies the pause between consecutive scans.
	for ; ; time.Sleep(m.opt.Mailbox.ScanInterval) {
		err := m.mailbox.Scan(1000, m.queue)
		if err == nil {
			continue
		}
		m.log.Printf("error scanning bounce mailbox: %v", err)
	}
}
// Record places a bounce event on the manager's queue, from which Run picks
// it up. The send blocks while the queue is full. The returned error is
// always nil.
func (m *Manager) Record(b models.Bounce) error {
	m.queue <- b
	return nil
}