// File envelope.go, taken from a fork of flashmob/go-guerrilla.

package mail
import (
"bufio"
"bytes"
"crypto/md5"
"errors"
"fmt"
"io"
"mime"
"net/mail"
"net/textproto"
"strings"
"sync"
"time"
)
// Dec is the WordDecoder used to decode MIME headers containing RFC 2047
// encoded-words. It is used by the MimeHeaderDecode function.
// It is exported so that an alternative decoder can be set, e.g. GNU iconv
// by importing the mail/iconv package.
// Another alternative would be to use https://godoc.org/golang.org/x/text/encoding
var Dec mime.WordDecoder
// init installs the default decoder in Dec.
func init() {
// Use the default decoder, without GNU iconv. Import the mail/iconv package to use iconv.
Dec = mime.WordDecoder{}
}
// maxHeaderChunk is the number of leading bytes of the message data that
// ParseHeaders searches for the end of the header: 4 KiB plus one byte.
const maxHeaderChunk = 1 + (4 << 10) // 4 KiB + 1 byte
// Address encodes an email address of the form `<user@host>`.
type Address struct {
	// User is the local part.
	User string
	// Host is the domain.
	Host string
	// ADL is the at-domain list, if one was matched.
	ADL []string
	// PathParams contains any ESMTP parameters that were matched.
	PathParams [][]string
	// NullPath is true if <> was received.
	NullPath bool
}

// String formats the address as "User@Host". ADL, PathParams and NullPath
// are not part of the output.
func (ep *Address) String() string {
	return fmt.Sprintf("%s@%s", ep.User, ep.Host)
}

// IsEmpty reports whether both the User and the Host parts are empty.
func (ep *Address) IsEmpty() bool {
	return ep.User == "" && ep.Host == ""
}

// ap is the parser shared by all NewAddress calls.
var ap = mail.AddressParser{}

// NewAddress takes a string of an RFC 5322 address of the
// form "Gogh Fir <gf@example.com>" or "foo@example.com" and returns its
// local part and domain as an Address.
//
// It returns the parser's error if str cannot be parsed, and an
// "invalid address" error if the parsed address has no non-empty local part.
// On error the returned Address is the zero value.
func NewAddress(str string) (Address, error) {
	a, err := ap.Parse(str)
	if err != nil {
		return Address{}, err
	}
	// Split on the LAST '@'. net/mail returns the local part unquoted, so a
	// quoted local part such as "a@b" arrives here as a@b@example.com; the
	// domain itself can never contain '@'.
	pos := strings.LastIndex(a.Address, "@")
	if pos <= 0 {
		return Address{}, errors.New("invalid address")
	}
	return Address{
		User: a.Address[:pos],
		Host: a.Address[pos+1:],
	}, nil
}
// Envelope represents a single SMTP message together with the connection
// and transaction details collected while receiving it.
type Envelope struct {
// RemoteIP is the remote IP address.
RemoteIP string
// Helo is the message sent in the EHLO command.
Helo string
// MailFrom is the sender.
MailFrom Address
// RcptTo holds the recipients.
RcptTo []Address
// Data stores the header and message body.
Data bytes.Buffer
// Subject stores the subject of the email, extracted and decoded after calling ParseHeaders().
Subject string
// TLS is true if the email was received using a TLS connection.
TLS bool
// Header stores the results from ParseHeaders(); it is nil until headers have been parsed.
Header textproto.MIMEHeader
// Values holds the values generated when processing the envelope by the backend.
Values map[string]interface{}
// Hashes of each email on the rcpt.
Hashes []string
// DeliveryHeader is an additional delivery header that may be added;
// it is placed in front of Data by NewReader() and String().
DeliveryHeader string
// QueuedId is the id that the email(s) will be queued with.
QueuedId string
// When locked, it means that the envelope is being processed by the backend.
sync.Mutex
}
// NewEnvelope allocates an Envelope for a connection from remoteAddr.
// The Values map is initialised empty and QueuedId is derived from clientID.
func NewEnvelope(remoteAddr string, clientID uint64) *Envelope {
	env := new(Envelope)
	env.RemoteIP = remoteAddr
	env.Values = make(map[string]interface{})
	env.QueuedId = queuedID(clientID)
	return env
}
// queuedID returns a 32-character hexadecimal MD5 digest computed from the
// current Unix time (in seconds) and clientID.
func queuedID(clientID uint64) string {
	// Format the numbers as decimal text. Converting an integer with string(n)
	// yields the rune with that code point (U+FFFD for anything above
	// 0x10FFFF), which made every timestamp and every large clientID hash to
	// the same value.
	seed := fmt.Sprintf("%d-%d", time.Now().Unix(), clientID)
	return fmt.Sprintf("%x", md5.Sum([]byte(seed)))
}
// ParseHeaders parses the message headers into the Header field of the
// Envelope. The Data buffer must be full before calling.
//
// Only the first maxHeaderChunk bytes of Data are searched for the end of the
// header, which is taken to be the first pair of consecutive newline
// characters. The Subject header, when present, is decoded with
// MimeHeaderDecode and stored in the Subject field; no other header is decoded.
//
// An error is returned if the headers were already parsed (Header is non-nil),
// if no end of header is found, or if the MIME header reader reports one.
// An io.EOF from the reader does not prevent the subject from being decoded,
// but it is still returned to the caller.
func (e *Envelope) ParseHeaders() error {
	if e.Header != nil {
		return errors.New("headers already parsed")
	}

	data := e.Data.Bytes()
	if len(data) > maxHeaderChunk {
		data = data[:maxHeaderChunk]
	}

	// The first two consecutive new-line chars mark the end of the header.
	end := bytes.Index(data, []byte("\n\n"))
	if end < 0 {
		return errors.New("header not found")
	}

	// Keep both new-lines so the reader sees the terminating blank line.
	reader := textproto.NewReader(bufio.NewReader(bytes.NewReader(data[:end+2])))

	var err error
	e.Header, err = reader.ReadMIMEHeader()
	if err != nil && err != io.EOF {
		return err
	}

	if values, found := e.Header["Subject"]; found {
		e.Subject = MimeHeaderDecode(values[0])
	}
	return err
}
// Len reports how many bytes the reader returned by NewReader() would yield:
// the buffered message data plus the delivery header.
func (e *Envelope) Len() int {
	total := e.Data.Len()
	total += len(e.DeliveryHeader)
	return total
}
// NewReader returns a new reader over the email contents: the delivery header
// first, followed by the bytes currently held in Data.
func (e *Envelope) NewReader() io.Reader {
	head := strings.NewReader(e.DeliveryHeader)
	body := bytes.NewReader(e.Data.Bytes())
	return io.MultiReader(head, body)
}
// String returns the whole email as a string: the delivery header followed by
// the contents of Data.
// Typically, you would want to use the compressor guerrilla.Processor for more
// efficiency, or use NewReader.
func (e *Envelope) String() string {
	body := e.Data.String()
	return e.DeliveryHeader + body
}
// ResetTransaction clears the per-transaction state of the envelope. It is
// called when the transaction is reset while the connection stays open, so
// RemoteIP, Helo, TLS and QueuedId are left untouched.
func (e *Envelope) ResetTransaction() {
	// Acquire and immediately release the mutex: this blocks until any
	// current holder (the backend, while processing) has finished.
	e.Lock()
	e.Unlock()

	// Empty the data buffer but keep its allocated storage.
	e.Data.Reset()
	e.DeliveryHeader = ""
	e.Subject = ""
	e.Header = nil

	e.MailFrom = Address{}
	e.RcptTo = []Address{}

	// todo: these are probably good candidates for buffers / use sync.Pool (after profiling)
	e.Hashes = []string{}
	e.Values = make(map[string]interface{})
}
// Reseed prepares the envelope for a newly accepted connection: it records the
// new remote IP, derives a fresh QueuedId from clientID, and clears the Helo
// and TLS state.
func (e *Envelope) Reseed(remoteIP string, clientID uint64) {
	e.Helo, e.TLS = "", false
	e.RemoteIP = remoteIP
	e.QueuedId = queuedID(clientID)
}
// PushRcpt appends addr to the end of the envelope's recipient list.
func (e *Envelope) PushRcpt(addr Address) {
	rcpts := append(e.RcptTo, addr)
	e.RcptTo = rcpts
}
// PopRcpt removes the last email address that was pushed to the envelope and
// returns it. If there are no recipients, the zero Address is returned (its
// IsEmpty method reports true) and the envelope is left unchanged, instead of
// panicking on an out-of-range index.
func (e *Envelope) PopRcpt() Address {
	n := len(e.RcptTo)
	if n == 0 {
		return Address{}
	}
	last := e.RcptTo[n-1]
	e.RcptTo = e.RcptTo[:n-1]
	return last
}
// MimeHeaderDecode converts 7 bit encoded mime header strings to UTF-8.
//
// The input is scanned byte by byte. A run starting with "=?" and ending at
// the next space (or at the end of the string) is passed to Dec.Decode; if
// decoding succeeds the decoded text replaces the run, otherwise the run is
// copied through unchanged. All other bytes are copied as they are, including
// a literal '=' that is not followed by '?'.
func MimeHeaderDecode(str string) string {
	const (
		statePlain   = iota // copying ordinary bytes
		stateEquals         // saw '=', waiting to see whether '?' follows
		stateEncoded        // inside a candidate encoded-word
	)

	state := statePlain
	var buf bytes.Buffer
	out := make([]byte, 0, len(str))

	// flush decodes the buffered candidate word, falling back to the raw
	// bytes when the decoder rejects it.
	flush := func() {
		word := buf.String()
		if decoded, err := Dec.Decode(word); err == nil {
			out = append(out, decoded...)
		} else {
			out = append(out, word...)
		}
		buf.Reset()
	}

	for i := 0; i < len(str); i++ {
		c := str[i]
		switch state {
		case statePlain:
			if c == '=' {
				buf.WriteByte(c)
				state = stateEquals
			} else {
				out = append(out, c)
			}
		case stateEquals:
			switch c {
			case '?':
				buf.WriteByte(c)
				state = stateEncoded
			case '=':
				// The buffered '=' was a literal; emit it. The buffer still
				// holds a single '=', which now stands for the current byte,
				// since that one may begin an encoded-word.
				out = append(out, '=')
			default:
				// Not an encoded-word after all: emit the pending '=' (it
				// used to be dropped here) together with the current byte.
				out = append(out, '=', c)
				buf.Reset()
				state = statePlain
			}
		case stateEncoded:
			if c == ' ' {
				flush()
				out = append(out, ' ')
				state = statePlain
			} else {
				buf.WriteByte(c)
			}
		}
	}

	// Whatever is still buffered (an unterminated word or a trailing '=').
	if buf.Len() > 0 {
		flush()
	}
	return string(out)
}
// Pool is a pool of reusable envelopes that also limits how many envelopes
// can be borrowed at the same time.
type Pool struct {
// envelopes that are ready to be borrowed
pool chan *Envelope
// semaphore to control number of maximum borrowed envelopes;
// Borrow sends a value to it and Return receives one from it
sem chan bool
}
// NewPool creates a Pool whose envelope channel and borrow semaphore are both
// buffered with capacity poolSize.
func NewPool(poolSize int) *Pool {
	p := new(Pool)
	p.pool = make(chan *Envelope, poolSize)
	p.sem = make(chan bool, poolSize)
	return p
}
// Borrow hands out an envelope for the connection identified by remoteAddr
// and clientID. It first takes a slot on the semaphore, blocking while the
// semaphore is full. An idle envelope from the pool is reseeded and reused
// when one is available; otherwise a new envelope is created.
func (p *Pool) Borrow(remoteAddr string, clientID uint64) *Envelope {
	// blocks here until there is room for another borrowed envelope
	p.sem <- true

	select {
	case recycled := <-p.pool:
		recycled.Reseed(remoteAddr, clientID)
		return recycled
	default:
		return NewEnvelope(remoteAddr, clientID)
	}
}
// Return gives an envelope back to the pool and releases one slot on the
// borrow semaphore. If the pool has no free space the envelope is dropped.
// Make sure that the envelope finished processing before calling this.
func (p *Pool) Return(e *Envelope) {
	select {
	case p.pool <- e:
		// the envelope is available for the next Borrow
	default:
		// no room left in the pool; let the envelope go
	}

	// free a semaphore slot so that another envelope can be borrowed
	<-p.sem
}