Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Fixes issue #101 - Gateway Timeout deadlock. (#102)
* Fixes issue #101 - Gateway Timeout deadlock.

* update tests
  • Loading branch information
flashmob committed Mar 8, 2018
1 parent 6f895c6 commit 5980d3e
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 55 deletions.
4 changes: 4 additions & 0 deletions Makefile
Expand Up @@ -24,6 +24,10 @@ dependencies:
guerrillad: *.go */*.go */*/*.go
$(GO_VARS) $(GO) build -o="guerrillad" -ldflags="$(LD_FLAGS)" $(ROOT)/cmd/guerrillad

# Race-detector build: identical to the `guerrillad` target but compiled with -race.
# NOTE(review): the output file is still named "guerrillad", so building this
# target overwrites the regular (non-race) binary — confirm this is intentional.
guerrilladrace: *.go */*.go */*/*.go
$(GO_VARS) $(GO) build -o="guerrillad" -race -ldflags="$(LD_FLAGS)" $(ROOT)/cmd/guerrillad


test: *.go */*.go */*/*.go
$(GO_VARS) $(GO) test -v .
$(GO_VARS) $(GO) test -v ./tests
Expand Down
15 changes: 4 additions & 11 deletions api_test.go
Expand Up @@ -16,16 +16,14 @@ import (

// Test Starting smtp without setting up logger / backend
func TestSMTP(t *testing.T) {
done := make(chan bool)
go func() {
select {
case <-time.After(time.Second * 40):
//buf := make([]byte, 1<<16)
//stackSize := runtime.Stack(buf, true)
//fmt.Printf("%s\n", string(buf[0:stackSize]))
//panic("timeout")
t.Error("timeout")
return

case <-done:
return
}
}()

Expand All @@ -52,6 +50,7 @@ func TestSMTP(t *testing.T) {
}
time.Sleep(time.Second * 2)
d.Shutdown()
done <- true

}

Expand Down Expand Up @@ -414,25 +413,19 @@ func talkToServer(address string) {
}
in := bufio.NewReader(conn)
str, err := in.ReadString('\n')
// fmt.Println(str)
fmt.Fprint(conn, "HELO maildiranasaurustester\r\n")
str, err = in.ReadString('\n')
// fmt.Println(str)
fmt.Fprint(conn, "MAIL FROM:<test@example.com>r\r\n")
str, err = in.ReadString('\n')
// fmt.Println(str)
fmt.Fprint(conn, "RCPT TO:test@grr.la\r\n")
str, err = in.ReadString('\n')
// fmt.Println(str)
fmt.Fprint(conn, "DATA\r\n")
str, err = in.ReadString('\n')
// fmt.Println(str)
fmt.Fprint(conn, "Subject: Test subject\r\n")
fmt.Fprint(conn, "\r\n")
fmt.Fprint(conn, "A an email body\r\n")
fmt.Fprint(conn, ".\r\n")
str, err = in.ReadString('\n')
// fmt.Println(str)
_ = str
}

Expand Down
32 changes: 21 additions & 11 deletions backends/gateway.go
Expand Up @@ -139,14 +139,20 @@ func (gw *BackendGateway) Process(e *mail.Envelope) Result {
// or timeout
select {
case status := <-workerMsg.notifyMe:
defer workerMsgPool.Put(workerMsg) // can be recycled since we used the notifyMe channel
workerMsgPool.Put(workerMsg) // can be recycled since we used the notifyMe channel
if status.err != nil {
return NewResult(response.Canned.FailBackendTransaction + status.err.Error())
}
return NewResult(response.Canned.SuccessMessageQueued + status.queuedID)

case <-time.After(gw.saveTimeout()):
Log().Error("Backend has timed out while saving eamil")
Log().Error("Backend has timed out while saving email")
e.Lock() // lock the envelope - it's still processing here, we don't want the server to recycle it
go func() {
// keep waiting for the backend to finish processing
<-workerMsg.notifyMe
e.Unlock()
workerMsgPool.Put(workerMsg)
}()
return NewResult(response.Canned.FailBackendTimeout)
}
}
Expand All @@ -169,13 +175,20 @@ func (gw *BackendGateway) ValidateRcpt(e *mail.Envelope) RcptError {
// or timeout
select {
case status := <-workerMsg.notifyMe:
workerMsgPool.Put(workerMsg)
if status.err != nil {
return status.err
}
return nil

case <-time.After(gw.validateRcptTimeout()):
Log().Error("Backend has timed out while validating rcpt")
e.Lock()
go func() {
<-workerMsg.notifyMe
e.Unlock()
workerMsgPool.Put(workerMsg)
Log().Error("Backend has timed out while validating rcpt")
}()
return StorageTimeout
}
}
Expand Down Expand Up @@ -260,7 +273,7 @@ func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
gw.Lock()
defer gw.Unlock()
if gw.State != BackendStateNew && gw.State != BackendStateShuttered {
return errors.New("Can only Initialize in BackendStateNew or BackendStateShuttered state")
return errors.New("can only Initialize in BackendStateNew or BackendStateShuttered state")
}
err := gw.loadConfig(cfg)
if err != nil {
Expand All @@ -270,7 +283,7 @@ func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
workersSize := gw.workersSize()
if workersSize < 1 {
gw.State = BackendStateError
return errors.New("Must have at least 1 worker")
return errors.New("must have at least 1 worker")
}
gw.processors = make([]Processor, 0)
gw.validators = make([]Processor, 0)
Expand Down Expand Up @@ -403,7 +416,6 @@ func (gw *BackendGateway) workDispatcher(

if state == dispatcherStateWorking {
msg.notifyMe <- &notifyMsg{err: errors.New("storage failed")}
msg.e.Unlock()
}
state = dispatcherStatePanic
return
Expand All @@ -421,14 +433,13 @@ func (gw *BackendGateway) workDispatcher(
Log().Infof("stop signal for worker (#%d)", workerId)
return
case msg = <-workIn:
msg.e.Lock()
state = dispatcherStateWorking
state = dispatcherStateWorking // recovers from panic if in this state
if msg.task == TaskSaveMail {
// process the email here
result, _ := save.Process(msg.e, TaskSaveMail)
state = dispatcherStateNotify
if result.Code() < 300 {
// if all good, let the gateway know that it was queued
// if all good, let the gateway know that it was saved
msg.notifyMe <- &notifyMsg{nil, msg.e.QueuedId}
} else {
// notify the gateway about the error
Expand All @@ -445,7 +456,6 @@ func (gw *BackendGateway) workDispatcher(
msg.notifyMe <- &notifyMsg{err: nil}
}
}
msg.e.Unlock()
}
state = dispatcherStateIdle
}
Expand Down
14 changes: 14 additions & 0 deletions backends/p_debugger.go
Expand Up @@ -3,6 +3,7 @@ package backends
import (
"github.com/flashmob/go-guerrilla/mail"
"strings"
"time"
)

// ----------------------------------------------------------------------------------
Expand All @@ -24,6 +25,7 @@ func init() {

// debuggerConfig holds the JSON-configurable options for the Debugger processor.
type debuggerConfig struct {
// LogReceivedMails, when true, logs the envelope's sender, recipients and
// parsed headers as each email is processed.
LogReceivedMails bool `json:"log_received_mails"`
// SleepSec makes the processor sleep this many seconds before passing the
// envelope on — used to simulate a slow backend (e.g. to exercise the
// gateway timeout paths). A value of exactly 1 triggers a deliberate panic
// instead of continuing. Omitted from JSON output when zero.
SleepSec int `json:"sleep_seconds,omitempty"`
}

func Debugger() Decorator {
Expand All @@ -45,6 +47,18 @@ func Debugger() Decorator {
Log().Infof("Mail from: %s / to: %v", e.MailFrom.String(), e.RcptTo)
Log().Info("Headers are:", e.Header)
}

if config.SleepSec > 0 {
Log().Infof("sleeping for %d", config.SleepSec)
time.Sleep(time.Second * time.Duration(config.SleepSec))
Log().Infof("woke up")

if config.SleepSec == 1 {
panic("panic on purpose")
}

}

// continue to the next Processor in the decorator stack
return p.Process(e, task)
} else {
Expand Down
6 changes: 3 additions & 3 deletions backends/p_guerrilla_db_redis.go
Expand Up @@ -30,7 +30,7 @@ import (
// ----------------------------------------------------------------------------------
// init registers the "guerrillaredisdb" processor in the package-level
// processors registry so it can be referenced by name from the backend
// configuration (e.g. in a save_process chain).
func init() {
processors["guerrillaredisdb"] = func() Decorator {
return GuerrillaDbRedis()
}
}

Expand Down Expand Up @@ -352,9 +352,9 @@ func (c *redisClient) redisConnection(redisInterface string) (err error) {

type feedChan chan []interface{}

// GuerrillaDbReddis is a specialized processor for Guerrilla mail. It is here as an example.
// GuerrillaDbRedis is a specialized processor for Guerrilla mail. It is here as an example.
// It's an example of a 'monolithic' processor.
func GuerrillaDbReddis() Decorator {
func GuerrillaDbRedis() Decorator {

g := GuerrillaDBAndRedisBackend{}
redisClient := &redisClient{}
Expand Down
4 changes: 3 additions & 1 deletion goguerrilla.conf.sample
Expand Up @@ -13,7 +13,9 @@
"log_received_mails": true,
"save_workers_size": 1,
"save_process" : "HeadersParser|Header|Debugger",
"primary_mail_host" : "mail.example.com"
"primary_mail_host" : "mail.example.com",
"gw_save_timeout" : "30s",
"gw_val_rcpt_timeout" : "3s"
},
"servers" : [
{
Expand Down
41 changes: 12 additions & 29 deletions mail/envelope.go
Expand Up @@ -104,7 +104,7 @@ func queuedID(clientID uint64) string {
func (e *Envelope) ParseHeaders() error {
var err error
if e.Header != nil {
return errors.New("Headers already parsed")
return errors.New("headers already parsed")
}
buf := bytes.NewBuffer(e.Data.Bytes())
// find where the header ends, assuming that over 30 kb would be max
Expand Down Expand Up @@ -153,6 +153,12 @@ func (e *Envelope) String() string {

// ResetTransaction is called when the transaction is reset (keeping the connection open)
func (e *Envelope) ResetTransaction() {

// ensure not processing by the backend, will only get lock if finished, otherwise block
e.Lock()
// got the lock, it means processing finished
e.Unlock()

e.MailFrom = Address{}
e.RcptTo = []Address{}
// reset the data buffer, keep it allocated
Expand Down Expand Up @@ -318,36 +324,13 @@ func (p *Pool) Borrow(remoteAddr string, clientID uint64) *Envelope {
}

// Return returns an envelope back to the envelope pool
// Note that an envelope will not be recycled while it still is
// processing
// Make sure that envelope finished processing before calling this
func (p *Pool) Return(e *Envelope) {
// we down't want to recycle an envelope that may still be processing
isUnlocked := func() <-chan bool {
signal := make(chan bool)
// make sure envelope finished processing
go func() {
// lock will block if still processing
e.Lock()
// got the lock, it means processing finished
e.Unlock()
// generate a signal
signal <- true
}()
return signal
}()

select {
case <-time.After(time.Second * 30):
// envelope still processing, we can't recycle it.
case <-isUnlocked:
// The envelope was _unlocked_, it finished processing
// put back in the pool or destroy
select {
case p.pool <- e:
//placed envelope back in pool
default:
// pool is full, don't return
}
case p.pool <- e:
//placed envelope back in pool
default:
// pool is full, discard it
}
// take a value off the semaphore to make room for more envelopes
<-p.sem
Expand Down

0 comments on commit 5980d3e

Please sign in to comment.