diff --git a/app/common.go b/app/common.go index cb11b7278..54f07227c 100644 --- a/app/common.go +++ b/app/common.go @@ -37,6 +37,7 @@ type Environment struct { Topics []EnvTopic Queues []EnvQueue QueueAttributeDefaults EnvQueueAttributes + RandomLatency RandomLatency } var CurrentEnvironment Environment @@ -57,3 +58,8 @@ type ErrorResponse struct { Result ErrorResult `xml:"Error"` RequestId string `xml:"RequestId"` } + +type RandomLatency struct { + Min int + Max int +} diff --git a/app/conf/goaws.yaml b/app/conf/goaws.yaml index 675cd1fa2..3c6e1b812 100644 --- a/app/conf/goaws.yaml +++ b/app/conf/goaws.yaml @@ -37,6 +37,9 @@ Local: # Environment name that can be passed on the TopicArn: arn:aws:sns:us-east-1:000000000000:my_topic FilterPolicy: '{"event": ["my_event"]}' Raw: true + RandomLatency: # Parameters for introducing random latency into message queuing + Min: 0 # Desired latency in milliseconds, if min and max are zero, no latency will be applied. + Max: 0 # Desired latency in milliseconds Dev: # Another environment Host: localhost diff --git a/app/gosqs/gosqs.go b/app/gosqs/gosqs.go index d51f5e1cb..f7f828175 100644 --- a/app/gosqs/gosqs.go +++ b/app/gosqs/gosqs.go @@ -174,6 +174,7 @@ func SendMessage(w http.ResponseWriter, req *http.Request) { msg.MD5OfMessageBody = common.GetMD5Hash(messageBody) msg.Uuid, _ = common.NewUUID() msg.GroupID = messageGroupID + msg.SentTime = time.Now() app.SyncQueues.Lock() fifoSeqNumber := "" @@ -300,6 +301,7 @@ func SendMessageBatch(w http.ResponseWriter, req *http.Request) { msg.MD5OfMessageBody = common.GetMD5Hash(sendEntry.MessageBody) msg.GroupID = sendEntry.MessageGroupId msg.Uuid, _ = common.NewUUID() + msg.SentTime = time.Now() app.SyncQueues.Lock() fifoSeqNumber := "" if app.SyncQueues.Queues[queueName].IsFIFO { @@ -414,6 +416,9 @@ func ReceiveMessage(w http.ResponseWriter, req *http.Request) { uuid, _ := common.NewUUID() msg := &app.SyncQueues.Queues[queueName].Messages[i] + if !msg.IsReadyForReceipt() { + continue + } msg.ReceiptHandle = msg.Uuid + "#" + uuid msg.ReceiptTime = time.Now().UTC() msg.VisibilityTimeout = time.Now().Add(time.Duration(app.SyncQueues.Queues[queueName].TimeoutSecs) * time.Second) diff --git a/app/sqs.go b/app/sqs.go index 491493dcc..c244db933 100644 --- a/app/sqs.go +++ b/app/sqs.go @@ -1,6 +1,10 @@ package app import ( + "errors" + "fmt" + log "github.com/sirupsen/logrus" + "math/rand" "strconv" "strings" "sync" @@ -32,6 +36,35 @@ type Message struct { Retry int MessageAttributes map[string]MessageAttributeValue GroupID string + SentTime time.Time +} + +func (m *Message) IsReadyForReceipt() bool { + randomLatency, err := getRandomLatency() + if err != nil { + log.Error(err) + return true + } + return m.SentTime.Add(randomLatency).Before(time.Now()) +} + +func getRandomLatency() (time.Duration, error){ + min := CurrentEnvironment.RandomLatency.Min + max := CurrentEnvironment.RandomLatency.Max + if min == 0 && max == 0 { + return time.Duration(0), nil + } + var randomLatencyValue int + if max == min { + randomLatencyValue = max + } else { + randomLatencyValue = rand.Intn(max-min) + min + } + randomDuration, err := time.ParseDuration(fmt.Sprintf("%dms", randomLatencyValue)) + if err != nil { + return time.Duration(0), errors.New(fmt.Sprintf("Error parsing random latency value: %dms", randomLatencyValue)) + } + return randomDuration, nil } type MessageAttributeValue struct { diff --git a/app/sqs_test.go b/app/sqs_test.go new file mode 100644 index 000000000..73b31949c --- /dev/null +++ b/app/sqs_test.go @@ -0,0 +1,19 @@ +package app + +import ( + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestMessage_IsReadyForReceipt(t *testing.T) { + CurrentEnvironment.RandomLatency.Min = 100 + CurrentEnvironment.RandomLatency.Max = 100 + msg := Message{ + SentTime: time.Now(), + } + assert.False(t, msg.IsReadyForReceipt()) + duration, _ := time.ParseDuration("105ms") + time.Sleep(duration) + assert.True(t, msg.IsReadyForReceipt()) +}