From 81dab77c9b0198ff5c540e1be4cd43dc94e423c6 Mon Sep 17 00:00:00 2001 From: Andy Kuszyk Date: Wed, 28 Aug 2019 15:00:31 +0100 Subject: [PATCH 1/2] Added random latency to SQS receives. Latency is based on the inclusion of two environment variables (GOAWS_RANDOM_LATENCY_MIN and GOAWS_RANDOM_LATENCY_MAX), which, if set will prevent messages from being released from the queue until after a period of random, simulated latency has elapsed. The purpose of this change is to aid in local testing against AWS when SQS latency is expected and needs to be developed against. --- app/gosqs/gosqs.go | 5 +++++ app/sqs.go | 42 ++++++++++++++++++++++++++++++++++++++++++ app/sqs_test.go | 20 ++++++++++++++++++++ 3 files changed, 67 insertions(+) create mode 100644 app/sqs_test.go diff --git a/app/gosqs/gosqs.go b/app/gosqs/gosqs.go index f02c547d6..a4eda5da7 100644 --- a/app/gosqs/gosqs.go +++ b/app/gosqs/gosqs.go @@ -174,6 +174,7 @@ func SendMessage(w http.ResponseWriter, req *http.Request) { msg.MD5OfMessageBody = common.GetMD5Hash(messageBody) msg.Uuid, _ = common.NewUUID() msg.GroupID = messageGroupID + msg.SentTime = time.Now() app.SyncQueues.Lock() fifoSeqNumber := "" @@ -300,6 +301,7 @@ func SendMessageBatch(w http.ResponseWriter, req *http.Request) { msg.MD5OfMessageBody = common.GetMD5Hash(sendEntry.MessageBody) msg.GroupID = sendEntry.MessageGroupId msg.Uuid, _ = common.NewUUID() + msg.SentTime = time.Now() app.SyncQueues.Lock() fifoSeqNumber := "" if app.SyncQueues.Queues[queueName].IsFIFO { @@ -401,6 +403,9 @@ func ReceiveMessage(w http.ResponseWriter, req *http.Request) { uuid, _ := common.NewUUID() msg := &app.SyncQueues.Queues[queueName].Messages[i] + if !msg.IsReadyForReceipt() { + continue + } msg.ReceiptHandle = msg.Uuid + "#" + uuid msg.ReceiptTime = time.Now().UTC() msg.VisibilityTimeout = time.Now().Add(time.Duration(app.SyncQueues.Queues[queueName].TimeoutSecs) * time.Second) diff --git a/app/sqs.go b/app/sqs.go index 491493dcc..644808ae7 100644 --- a/app/sqs.go +++ b/app/sqs.go @@ -1,10 +1,15 @@ package app import ( + "errors" + "fmt" + "math/rand" + "os" "strconv" "strings" "sync" "time" + log "github.com/sirupsen/logrus" ) type SqsErrorType struct { @@ -32,6 +37,43 @@ type Message struct { Retry int MessageAttributes map[string]MessageAttributeValue GroupID string + SentTime time.Time +} + +func (m *Message) IsReadyForReceipt() bool { + randomLatency, err := getRandomLatency() + if err != nil { + log.Error(err) + return true + } + return m.SentTime.Add(randomLatency).Before(time.Now()) +} + +func getRandomLatency() (time.Duration, error){ + minVar := os.Getenv("GOAWS_RANDOM_LATENCY_MIN") + maxVar := os.Getenv("GOAWS_RANDOM_LATENCY_MAX") + if minVar == "" || maxVar == "" { + return time.Duration(0), nil + } + min, err := strconv.Atoi(minVar) + if err != nil { + return time.Duration(0), errors.New(fmt.Sprintf("Invalid value for GOAWS_RANDOM_LATENCY_MIN: %s", minVar)) + } + max, err := strconv.Atoi(maxVar) + if err != nil { + return time.Duration(0), errors.New(fmt.Sprintf("Invalid value for GOAWS_RANDOM_LATENCY_MAX: %s", maxVar)) + } + var randomLatencyValue int + if max == min { + randomLatencyValue = max + } else { + randomLatencyValue = rand.Intn(max-min) + min + } + randomDuration, err := time.ParseDuration(fmt.Sprintf("%dms", randomLatencyValue)) + if err != nil { + return time.Duration(0), errors.New(fmt.Sprintf("Error parsing random latency value: %dms", randomLatencyValue)) + } + return randomDuration, nil } type MessageAttributeValue struct { diff --git a/app/sqs_test.go b/app/sqs_test.go new file mode 100644 index 000000000..dd8298c68 --- /dev/null +++ b/app/sqs_test.go @@ -0,0 +1,20 @@ +package app + +import ( + "github.com/stretchr/testify/assert" + "os" + "testing" + "time" +) + +func TestMessage_IsReadyForReceipt(t *testing.T) { + os.Setenv("GOAWS_RANDOM_LATENCY_MIN", "100") + os.Setenv("GOAWS_RANDOM_LATENCY_MAX", "100") + msg := Message{ + SentTime: time.Now(), + } + assert.False(t, msg.IsReadyForReceipt()) + duration, _ := time.ParseDuration("105ms") + time.Sleep(duration) + assert.True(t, msg.IsReadyForReceipt()) +} From bfccff2ea9a3def2a2d803ff3fda03050f6a3443 Mon Sep 17 00:00:00 2001 From: Andy Kuszyk Date: Mon, 30 Sep 2019 12:24:19 +0100 Subject: [PATCH 2/2] Moved random latency parameters into config --- app/common.go | 6 ++++++ app/conf/goaws.yaml | 3 +++ app/sqs.go | 17 ++++------------- app/sqs_test.go | 5 ++--- 4 files changed, 15 insertions(+), 16 deletions(-) diff --git a/app/common.go b/app/common.go index f2741b0d5..8e1e12b27 100644 --- a/app/common.go +++ b/app/common.go @@ -34,6 +34,7 @@ type Environment struct { Topics []EnvTopic Queues []EnvQueue QueueAttributeDefaults EnvQueueAttributes + RandomLatency RandomLatency } var CurrentEnvironment Environment @@ -54,3 +55,8 @@ type ErrorResult struct { type ErrorResponse struct { Result ErrorResult `xml:"Error"` } + +type RandomLatency struct { + Min int + Max int +} diff --git a/app/conf/goaws.yaml b/app/conf/goaws.yaml index 05fdcc860..a42759517 100644 --- a/app/conf/goaws.yaml +++ b/app/conf/goaws.yaml @@ -26,6 +26,9 @@ Local: # Environment name that can be passed on the Raw: true # Raw message delivery (true/false) #FilterPolicy: '{"foo": ["bar"]}' # Subscription's FilterPolicy, json object as a string - Name: local-topic2 # Topic name - no Subscriptions + RandomLatency: # Parameters for introducing random latency into message queuing + Min: 0 # Desired latency in milliseconds, if min and max are zero, no latency will be applied. + Max: 0 # Desired latency in milliseconds Dev: # Another environment Host: localhost diff --git a/app/sqs.go b/app/sqs.go index 644808ae7..c244db933 100644 --- a/app/sqs.go +++ b/app/sqs.go @@ -3,13 +3,12 @@ package app import ( "errors" "fmt" + log "github.com/sirupsen/logrus" "math/rand" - "os" "strconv" "strings" "sync" "time" - log "github.com/sirupsen/logrus" ) type SqsErrorType struct { @@ -50,19 +49,11 @@ func (m *Message) IsReadyForReceipt() bool { } func getRandomLatency() (time.Duration, error){ - minVar := os.Getenv("GOAWS_RANDOM_LATENCY_MIN") - maxVar := os.Getenv("GOAWS_RANDOM_LATENCY_MAX") - if minVar == "" || maxVar == "" { + min := CurrentEnvironment.RandomLatency.Min + max := CurrentEnvironment.RandomLatency.Max + if min == 0 && max == 0 { return time.Duration(0), nil } - min, err := strconv.Atoi(minVar) - if err != nil { - return time.Duration(0), errors.New(fmt.Sprintf("Invalid value for GOAWS_RANDOM_LATENCY_MIN: %s", minVar)) - } - max, err := strconv.Atoi(maxVar) - if err != nil { - return time.Duration(0), errors.New(fmt.Sprintf("Invalid value for GOAWS_RANDOM_LATENCY_MAX: %s", maxVar)) - } var randomLatencyValue int if max == min { randomLatencyValue = max diff --git a/app/sqs_test.go b/app/sqs_test.go index dd8298c68..73b31949c 100644 --- a/app/sqs_test.go +++ b/app/sqs_test.go @@ -2,14 +2,13 @@ package app import ( "github.com/stretchr/testify/assert" - "os" "testing" "time" ) func TestMessage_IsReadyForReceipt(t *testing.T) { - os.Setenv("GOAWS_RANDOM_LATENCY_MIN", "100") - os.Setenv("GOAWS_RANDOM_LATENCY_MAX", "100") + CurrentEnvironment.RandomLatency.Min = 100 + CurrentEnvironment.RandomLatency.Max = 100 msg := Message{ SentTime: time.Now(), }