Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move mailer to use a queue #9789

Merged
merged 7 commits into from Jan 16, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 17 additions & 7 deletions modules/setting/queue.go
Expand Up @@ -103,11 +103,11 @@ func NewQueueService() {

// Now handle the old issue_indexer configuration
section := Cfg.Section("queue.issue_indexer")
issueIndexerSectionMap := map[string]string{}
sectionMap := map[string]bool{}
for _, key := range section.Keys() {
issueIndexerSectionMap[key.Name()] = key.Value()
sectionMap[key.Name()] = true
}
if _, ok := issueIndexerSectionMap["TYPE"]; !ok {
if _, ok := sectionMap["TYPE"]; !ok {
switch Indexer.IssueQueueType {
case LevelQueueType:
section.Key("TYPE").SetValue("level")
Expand All @@ -120,18 +120,28 @@ func NewQueueService() {
Indexer.IssueQueueType)
}
}
if _, ok := issueIndexerSectionMap["LENGTH"]; !ok {
if _, ok := sectionMap["LENGTH"]; !ok {
section.Key("LENGTH").SetValue(fmt.Sprintf("%d", Indexer.UpdateQueueLength))
}
if _, ok := issueIndexerSectionMap["BATCH_LENGTH"]; !ok {
if _, ok := sectionMap["BATCH_LENGTH"]; !ok {
section.Key("BATCH_LENGTH").SetValue(fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber))
}
if _, ok := issueIndexerSectionMap["DATADIR"]; !ok {
if _, ok := sectionMap["DATADIR"]; !ok {
section.Key("DATADIR").SetValue(Indexer.IssueQueueDir)
}
if _, ok := issueIndexerSectionMap["CONN_STR"]; !ok {
if _, ok := sectionMap["CONN_STR"]; !ok {
section.Key("CONN_STR").SetValue(Indexer.IssueQueueConnStr)
}

// Handle the old mailer configuration
section = Cfg.Section("queue.mailer")
sectionMap = map[string]bool{}
for _, key := range section.Keys() {
sectionMap[key.Name()] = true
}
if _, ok := sectionMap["LENGTH"]; !ok {
section.Key("LENGTH").SetValue(fmt.Sprintf("%d", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100)))
}
}

// ParseQueueConnStr parses a queue connection string
Expand Down
2 changes: 1 addition & 1 deletion services/mailer/mail.go
Expand Up @@ -51,7 +51,7 @@ func InitMailRender(subjectTpl *texttmpl.Template, bodyTpl *template.Template) {

// SendTestMail sends a test mail
func SendTestMail(email string) error {
return gomail.Send(Sender, NewMessage([]string{email}, "Gitea Test Email!", "Gitea Test Email!").Message)
return gomail.Send(Sender, NewMessage([]string{email}, "Gitea Test Email!", "Gitea Test Email!").ToMessage())
}

// SendUserMail sends a mail to the user
Expand Down
29 changes: 15 additions & 14 deletions services/mailer/mail_test.go
Expand Up @@ -61,11 +61,11 @@ func TestComposeIssueCommentMessage(t *testing.T) {
msgs := composeIssueCommentMessages(&mailCommentContext{Issue: issue, Doer: doer, ActionType: models.ActionCommentIssue,
Content: "test body", Comment: comment}, tos, false, "issue comment")
assert.Len(t, msgs, 2)

mailto := msgs[0].GetHeader("To")
subject := msgs[0].GetHeader("Subject")
inreplyTo := msgs[0].GetHeader("In-Reply-To")
references := msgs[0].GetHeader("References")
gomailMsg := msgs[0].ToMessage()
mailto := gomailMsg.GetHeader("To")
subject := gomailMsg.GetHeader("Subject")
inreplyTo := gomailMsg.GetHeader("In-Reply-To")
references := gomailMsg.GetHeader("References")

assert.Len(t, mailto, 1, "exactly one recipient is expected in the To field")
assert.Equal(t, "Re: ", subject[0][:4], "Comment reply subject should contain Re:")
Expand Down Expand Up @@ -96,14 +96,15 @@ func TestComposeIssueMessage(t *testing.T) {
Content: "test body"}, tos, false, "issue create")
assert.Len(t, msgs, 2)

mailto := msgs[0].GetHeader("To")
subject := msgs[0].GetHeader("Subject")
messageID := msgs[0].GetHeader("Message-ID")
gomailMsg := msgs[0].ToMessage()
mailto := gomailMsg.GetHeader("To")
subject := gomailMsg.GetHeader("Subject")
messageID := gomailMsg.GetHeader("Message-ID")

assert.Len(t, mailto, 1, "exactly one recipient is expected in the To field")
assert.Equal(t, "[user2/repo1] @user2 #1 - issue1", subject[0])
assert.Nil(t, msgs[0].GetHeader("In-Reply-To"))
assert.Nil(t, msgs[0].GetHeader("References"))
assert.Nil(t, gomailMsg.GetHeader("In-Reply-To"))
assert.Nil(t, gomailMsg.GetHeader("References"))
assert.Equal(t, messageID[0], "<user2/repo1/issues/1@localhost>", "Message-ID header doesn't match")
}

Expand Down Expand Up @@ -134,9 +135,9 @@ func TestTemplateSelection(t *testing.T) {
InitMailRender(stpl, btpl)

expect := func(t *testing.T, msg *Message, expSubject, expBody string) {
subject := msg.GetHeader("Subject")
subject := msg.ToMessage().GetHeader("Subject")
msgbuf := new(bytes.Buffer)
_, _ = msg.WriteTo(msgbuf)
_, _ = msg.ToMessage().WriteTo(msgbuf)
wholemsg := msgbuf.String()
assert.Equal(t, []string{expSubject}, subject)
assert.Contains(t, wholemsg, expBody)
Expand Down Expand Up @@ -188,9 +189,9 @@ func TestTemplateServices(t *testing.T) {
msg := testComposeIssueCommentMessage(t, &mailCommentContext{Issue: issue, Doer: doer, ActionType: actionType,
Content: "test body", Comment: comment}, tos, fromMention, "TestTemplateServices")

subject := msg.GetHeader("Subject")
subject := msg.ToMessage().GetHeader("Subject")
msgbuf := new(bytes.Buffer)
_, _ = msg.WriteTo(msgbuf)
_, _ = msg.ToMessage().WriteTo(msgbuf)
wholemsg := msgbuf.String()

assert.Equal(t, []string{expSubject}, subject)
Expand Down
90 changes: 59 additions & 31 deletions services/mailer/mailer.go
Expand Up @@ -18,7 +18,9 @@ import (
"time"

"code.gitea.io/gitea/modules/base"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"

"github.com/jaytaylor/html2text"
Expand All @@ -27,38 +29,63 @@ import (

// Message mail body and log info
type Message struct {
Info string // Message information for log purpose.
*gomail.Message
Info string // Message information for log purpose.
FromAddress string
FromDisplayName string
To []string
Subject string
Date time.Time
Body string
Headers map[string][]string
}

// NewMessageFrom creates new mail message object with custom From header.
func NewMessageFrom(to []string, fromDisplayName, fromAddress, subject, body string) *Message {
log.Trace("NewMessageFrom (body):\n%s", body)

// ToMessage converts a Message to gomail.Message
func (m *Message) ToMessage() *gomail.Message {
msg := gomail.NewMessage()
msg.SetAddressHeader("From", fromAddress, fromDisplayName)
msg.SetHeader("To", to...)
msg.SetAddressHeader("From", m.FromAddress, m.FromDisplayName)
msg.SetHeader("To", m.To...)
for header := range m.Headers {
msg.SetHeader(header, m.Headers[header]...)
}

if len(setting.MailService.SubjectPrefix) > 0 {
msg.SetHeader("Subject", setting.MailService.SubjectPrefix+" "+subject)
msg.SetHeader("Subject", setting.MailService.SubjectPrefix+" "+m.Subject)
} else {
msg.SetHeader("Subject", subject)
msg.SetHeader("Subject", m.Subject)
}
msg.SetDateHeader("Date", time.Now())
msg.SetDateHeader("Date", m.Date)
msg.SetHeader("X-Auto-Response-Suppress", "All")

plainBody, err := html2text.FromString(body)
plainBody, err := html2text.FromString(m.Body)
if err != nil || setting.MailService.SendAsPlainText {
if strings.Contains(base.TruncateString(body, 100), "<html>") {
if strings.Contains(base.TruncateString(m.Body, 100), "<html>") {
log.Warn("Mail contains HTML but configured to send as plain text.")
}
msg.SetBody("text/plain", plainBody)
} else {
msg.SetBody("text/plain", plainBody)
msg.AddAlternative("text/html", body)
msg.AddAlternative("text/html", m.Body)
}
return msg
}

// SetHeader adds additional headers to a message
func (m *Message) SetHeader(field string, value ...string) {
m.Headers[field] = value
}

// NewMessageFrom creates new mail message object with custom From header.
func NewMessageFrom(to []string, fromDisplayName, fromAddress, subject, body string) *Message {
log.Trace("NewMessageFrom (body):\n%s", body)

return &Message{
Message: msg,
FromAddress: fromAddress,
FromDisplayName: fromDisplayName,
To: to,
Subject: subject,
Date: time.Now(),
Body: body,
Headers: map[string][]string{},
}
}

Expand Down Expand Up @@ -257,18 +284,7 @@ func (s *dummySender) Send(from string, to []string, msg io.WriterTo) error {
return nil
}

func processMailQueue() {
for msg := range mailQueue {
log.Trace("New e-mail sending request %s: %s", msg.GetHeader("To"), msg.Info)
if err := gomail.Send(Sender, msg.Message); err != nil {
log.Error("Failed to send emails %s: %s - %v", msg.GetHeader("To"), msg.Info, err)
} else {
log.Trace("E-mails sent %s: %s", msg.GetHeader("To"), msg.Info)
}
}
}

var mailQueue chan *Message
var mailQueue queue.Queue

// Sender sender for sending mail synchronously
var Sender gomail.Sender
Expand All @@ -291,22 +307,34 @@ func NewContext() {
Sender = &dummySender{}
}

mailQueue = make(chan *Message, setting.MailService.QueueLength)
go processMailQueue()
mailQueue = queue.CreateQueue("mail", func(data ...queue.Data) {
for _, datum := range data {
msg := datum.(*Message)
gomailMsg := msg.ToMessage()
log.Trace("New e-mail sending request %s: %s", gomailMsg.GetHeader("To"), msg.Info)
if err := gomail.Send(Sender, gomailMsg); err != nil {
log.Error("Failed to send emails %s: %s - %v", gomailMsg.GetHeader("To"), msg.Info, err)
} else {
log.Trace("E-mails sent %s: %s", gomailMsg.GetHeader("To"), msg.Info)
}
}
}, &Message{})

go graceful.GetManager().RunWithShutdownFns(mailQueue.Run)
}

// SendAsync send mail asynchronously
func SendAsync(msg *Message) {
go func() {
mailQueue <- msg
_ = mailQueue.Push(msg)
}()
}

// SendAsyncs send mails asynchronously
func SendAsyncs(msgs []*Message) {
go func() {
for _, msg := range msgs {
mailQueue <- msg
_ = mailQueue.Push(msg)
}
}()
}