Skip to content

Commit

Permalink
Merge pull request #198 from andykuszyk/random-latency-for-sqs
Browse files Browse the repository at this point in the history
Added random latency to SQS receives.
  • Loading branch information
p4tin committed Mar 1, 2020
2 parents 8b6ddf5 + 8939f8d commit 6ecd9bf
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 0 deletions.
6 changes: 6 additions & 0 deletions app/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Environment struct {
Topics []EnvTopic
Queues []EnvQueue
QueueAttributeDefaults EnvQueueAttributes
RandomLatency RandomLatency
}

var CurrentEnvironment Environment
Expand All @@ -57,3 +58,8 @@ type ErrorResponse struct {
Result ErrorResult `xml:"Error"`
RequestId string `xml:"RequestId"`
}

type RandomLatency struct {
Min int
Max int
}
3 changes: 3 additions & 0 deletions app/conf/goaws.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ Local: # Environment name that can be passed on the
TopicArn: arn:aws:sns:us-east-1:000000000000:my_topic
FilterPolicy: '{"event": ["my_event"]}'
Raw: true
RandomLatency: # Parameters for introducing random latency into message queuing
Min: 0 # Desired latency in milliseconds, if min and max are zero, no latency will be applied.
Max: 0 # Desired latency in milliseconds

Dev: # Another environment
Host: localhost
Expand Down
5 changes: 5 additions & 0 deletions app/gosqs/gosqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func SendMessage(w http.ResponseWriter, req *http.Request) {
msg.MD5OfMessageBody = common.GetMD5Hash(messageBody)
msg.Uuid, _ = common.NewUUID()
msg.GroupID = messageGroupID
msg.SentTime = time.Now()

app.SyncQueues.Lock()
fifoSeqNumber := ""
Expand Down Expand Up @@ -300,6 +301,7 @@ func SendMessageBatch(w http.ResponseWriter, req *http.Request) {
msg.MD5OfMessageBody = common.GetMD5Hash(sendEntry.MessageBody)
msg.GroupID = sendEntry.MessageGroupId
msg.Uuid, _ = common.NewUUID()
msg.SentTime = time.Now()
app.SyncQueues.Lock()
fifoSeqNumber := ""
if app.SyncQueues.Queues[queueName].IsFIFO {
Expand Down Expand Up @@ -414,6 +416,9 @@ func ReceiveMessage(w http.ResponseWriter, req *http.Request) {
uuid, _ := common.NewUUID()

msg := &app.SyncQueues.Queues[queueName].Messages[i]
if !msg.IsReadyForReceipt() {
continue
}
msg.ReceiptHandle = msg.Uuid + "#" + uuid
msg.ReceiptTime = time.Now().UTC()
msg.VisibilityTimeout = time.Now().Add(time.Duration(app.SyncQueues.Queues[queueName].TimeoutSecs) * time.Second)
Expand Down
33 changes: 33 additions & 0 deletions app/sqs.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package app

import (
"errors"
"fmt"
log "github.com/sirupsen/logrus"
"math/rand"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -32,6 +36,35 @@ type Message struct {
Retry int
MessageAttributes map[string]MessageAttributeValue
GroupID string
SentTime time.Time
}

func (m *Message) IsReadyForReceipt() bool {
randomLatency, err := getRandomLatency()
if err != nil {
log.Error(err)
return true
}
return m.SentTime.Add(randomLatency).Before(time.Now())
}

func getRandomLatency() (time.Duration, error){
min := CurrentEnvironment.RandomLatency.Min
max := CurrentEnvironment.RandomLatency.Max
if min == 0 && max == 0 {
return time.Duration(0), nil
}
var randomLatencyValue int
if max == min {
randomLatencyValue = max
} else {
randomLatencyValue = rand.Intn(max-min) + min
}
randomDuration, err := time.ParseDuration(fmt.Sprintf("%dms", randomLatencyValue))
if err != nil {
return time.Duration(0), errors.New(fmt.Sprintf("Error parsing random latency value: %dms", randomLatencyValue))
}
return randomDuration, nil
}

type MessageAttributeValue struct {
Expand Down
19 changes: 19 additions & 0 deletions app/sqs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package app

import (
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func TestMessage_IsReadyForReceipt(t *testing.T) {
CurrentEnvironment.RandomLatency.Min = 100
CurrentEnvironment.RandomLatency.Max = 100
msg := Message{
SentTime: time.Now(),
}
assert.False(t, msg.IsReadyForReceipt())
duration, _ := time.ParseDuration("105ms")
time.Sleep(duration)
assert.True(t, msg.IsReadyForReceipt())
}

0 comments on commit 6ecd9bf

Please sign in to comment.