Skip to content

Commit

Permalink
Implemented MaximumMessageSize as a queue attribute
Browse files Browse the repository at this point in the history
  • Loading branch information
kukushkin committed Dec 18, 2020
1 parent c35d6e3 commit 46a9697
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 1 deletion.
2 changes: 2 additions & 0 deletions app/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ type EnvTopic struct {
type EnvQueue struct {
Name string
ReceiveMessageWaitTimeSeconds int
MaximumMessageSize int
}

type EnvQueueAttributes struct {
VisibilityTimeout int
ReceiveMessageWaitTimeSeconds int
MaximumMessageSize int
}

type Environment struct {
Expand Down
9 changes: 9 additions & 0 deletions app/conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func LoadYamlConfig(filename string, env string) []string {
app.CurrentEnvironment.QueueAttributeDefaults.VisibilityTimeout = 30
}

if app.CurrentEnvironment.QueueAttributeDefaults.MaximumMessageSize == 0 {
app.CurrentEnvironment.QueueAttributeDefaults.MaximumMessageSize = 262144 // 256K
}

if app.CurrentEnvironment.AccountID == "" {
app.CurrentEnvironment.AccountID = "queue"
}
Expand All @@ -86,13 +90,17 @@ func LoadYamlConfig(filename string, env string) []string {
if queue.ReceiveMessageWaitTimeSeconds == 0 {
queue.ReceiveMessageWaitTimeSeconds = app.CurrentEnvironment.QueueAttributeDefaults.ReceiveMessageWaitTimeSeconds
}
if queue.MaximumMessageSize == 0 {
queue.MaximumMessageSize = app.CurrentEnvironment.QueueAttributeDefaults.MaximumMessageSize
}

app.SyncQueues.Queues[queue.Name] = &app.Queue{
Name: queue.Name,
TimeoutSecs: app.CurrentEnvironment.QueueAttributeDefaults.VisibilityTimeout,
Arn: queueArn,
URL: queueUrl,
ReceiveWaitTimeSecs: queue.ReceiveMessageWaitTimeSeconds,
MaximumMessageSize: queue.MaximumMessageSize,
IsFIFO: app.HasFIFOQueueName(queue.Name),
}
}
Expand Down Expand Up @@ -155,6 +163,7 @@ func createSqsSubscription(configSubscription app.EnvSubsciption, topicArn strin
Arn: queueArn,
URL: queueUrl,
ReceiveWaitTimeSecs: app.CurrentEnvironment.QueueAttributeDefaults.ReceiveMessageWaitTimeSeconds,
MaximumMessageSize: app.CurrentEnvironment.QueueAttributeDefaults.MaximumMessageSize,
IsFIFO: app.HasFIFOQueueName(configSubscription.QueueName),
}
}
Expand Down
8 changes: 8 additions & 0 deletions app/conf/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,19 @@ func TestConfig_QueueAttributes(t *testing.T) {
if timeoutSecs != 10 {
t.Errorf("Expected local-queue1 Queue to be configured with VisibilityTimeout: 10 but got %d\n", timeoutSecs)
}
maximumMessageSize := app.SyncQueues.Queues["local-queue1"].MaximumMessageSize
if maximumMessageSize != 1024 {
t.Errorf("Expected local-queue1 Queue to be configured with MaximumMessageSize: 1024 but got %d\n", maximumMessageSize)
}

receiveWaitTime = app.SyncQueues.Queues["local-queue2"].ReceiveWaitTimeSecs
if receiveWaitTime != 20 {
t.Errorf("Expected local-queue2 Queue to be configured with ReceiveMessageWaitTimeSeconds: 20 but got %d\n", receiveWaitTime)
}
maximumMessageSize = app.SyncQueues.Queues["local-queue2"].MaximumMessageSize
if maximumMessageSize != 128 {
t.Errorf("Expected local-queue1 Queue to be configured with MaximumMessageSize: 128 but got %d\n", maximumMessageSize)
}
}

func TestConfig_NoQueueAttributeDefaults(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions app/conf/goaws.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Local: # Environment name that can be passed on the
QueueAttributeDefaults: # default attributes for all queues
VisibilityTimeout: 30 # message visibility timeout
ReceiveMessageWaitTimeSeconds: 0 # receive message max wait time
MaximumMessageSize: 262144 # maximum message size (bytes)
Queues: # List of queues to create at startup
- Name: local-queue1 # Queue name
- Name: local-queue2 # Queue name
Expand Down
2 changes: 2 additions & 0 deletions app/conf/mock-data/mock-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ Local: # Environment name that can be passed on the
QueueAttributeDefaults: # default attributes for all queues
VisibilityTimeout: 10 # message visibility timeout
ReceiveMessageWaitTimeSeconds: 10 # receive message max wait time
MaximumMessageSize: 1024 # maximum message size (bytes)
Queues: # List of queues to create at startup
- Name: local-queue1 # Queue name
- Name: local-queue2 # Queue name
ReceiveMessageWaitTimeSeconds: 20 # Queue receive message max wait time
MaximumMessageSize: 128 # Queue maximum message size (bytes)
- Name: local-queue3 # Queue name
Topics: # List of topic to create at startup
- Name: local-topic1 # Topic name - with some Subscriptions
Expand Down
12 changes: 11 additions & 1 deletion app/gosqs/gosqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"time"

log "github.com/sirupsen/logrus"

"github.com/gorilla/mux"
"github.com/p4tin/goaws/app"
"github.com/p4tin/goaws/app/common"
Expand Down Expand Up @@ -38,6 +38,8 @@ func init() {
app.SqsErrors["InvalidVisibilityTimeout"] = err8
err9 := app.SqsErrorType{HttpError: http.StatusBadRequest, Type: "MessageNotInFlight", Code: "AWS.SimpleQueueService.MessageNotInFlight", Message: "The message referred to isn't in flight."}
app.SqsErrors["MessageNotInFlight"] = err9
err10 := app.SqsErrorType{HttpError: http.StatusBadRequest, Type: "MessageTooBig", Code: "InvalidMessageContents", Message: "The message size exceeds the limit."}
app.SqsErrors["MessageTooBig"] = err10
app.SqsErrors[ErrInvalidParameterValue.Type] = *ErrInvalidParameterValue
app.SqsErrors[ErrInvalidAttributeValue.Type] = *ErrInvalidAttributeValue
}
Expand Down Expand Up @@ -123,6 +125,7 @@ func CreateQueue(w http.ResponseWriter, req *http.Request) {
Arn: queueArn,
TimeoutSecs: app.CurrentEnvironment.QueueAttributeDefaults.VisibilityTimeout,
ReceiveWaitTimeSecs: app.CurrentEnvironment.QueueAttributeDefaults.ReceiveMessageWaitTimeSeconds,
MaximumMessageSize: app.CurrentEnvironment.QueueAttributeDefaults.MaximumMessageSize,
IsFIFO: app.HasFIFOQueueName(queueName),
}
if err := validateAndSetQueueAttributes(queue, req.Form); err != nil {
Expand Down Expand Up @@ -165,6 +168,13 @@ func SendMessage(w http.ResponseWriter, req *http.Request) {
return
}

if (app.SyncQueues.Queues[queueName].MaximumMessageSize > 0 &&
len(messageBody) > app.SyncQueues.Queues[queueName].MaximumMessageSize) {
// Message size is too big
createErrorResponse(w, req, "MessageTooBig")
return
}

log.Println("Putting Message in Queue:", queueName)
msg := app.Message{MessageBody: []byte(messageBody)}
if len(messageAttributes) > 0 {
Expand Down
83 changes: 83 additions & 0 deletions app/gosqs/gosqs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ func TestCreateQueuehandler_POST_CreateQueue(t *testing.T) {
form.Add("QueueName", "UnitTestQueue1")
form.Add("Attribute.1.Name", "VisibilityTimeout")
form.Add("Attribute.1.Value", "60")
form.Add("Attribute.2.Name", "MaximumMessageSize")
form.Add("Attribute.2.Value", "2048")
req.PostForm = form

// We create a ResponseRecorder (which satisfies http.ResponseWriter) to record the response.
Expand Down Expand Up @@ -135,6 +137,7 @@ func TestCreateQueuehandler_POST_CreateQueue(t *testing.T) {
URL: "http://://" + queueName,
Arn: "arn:aws:sqs:::" + queueName,
TimeoutSecs: 60,
MaximumMessageSize: 2048,
}
actualQueue := app.SyncQueues.Queues[queueName]
if !reflect.DeepEqual(expectedQueue, actualQueue) {
Expand Down Expand Up @@ -189,6 +192,82 @@ func TestCreateFIFOQueuehandler_POST_CreateQueue(t *testing.T) {
}
}

func TestSendMessage_MaximumMessageSize_Success(t *testing.T) {
req, err := http.NewRequest("POST", "/", nil)
if err != nil {
t.Fatal(err)
}

app.SyncQueues.Queues["test_max_message_size"] =
&app.Queue{Name: "test_max_message_size", MaximumMessageSize: 100}

form := url.Values{}
form.Add("Action", "SendMessage")
form.Add("QueueUrl", "http://localhost:4100/queue/test_max_message_size")
form.Add("MessageBody", "test%20message%20body%201")
form.Add("Version", "2012-11-05")
req.PostForm = form

// We create a ResponseRecorder (which satisfies http.ResponseWriter) to record the response.
rr := httptest.NewRecorder()
handler := http.HandlerFunc(SendMessage)

// Our handlers satisfy http.Handler, so we can call their ServeHTTP method
// directly and pass in our Request and ResponseRecorder.
handler.ServeHTTP(rr, req)

// Check the status code is what we expect.
if status := rr.Code; status != http.StatusOK {
t.Errorf("handler returned wrong status code: got %v want %v",
status, http.StatusOK)
}

// Check the response body is what we expect.
expected := "MD5OfMessageBody"
if !strings.Contains(rr.Body.String(), expected) {
t.Errorf("handler returned unexpected body: got %v want %v",
rr.Body.String(), expected)
}
}

func TestSendMessage_MaximumMessageSize_MessageTooBig(t *testing.T) {
req, err := http.NewRequest("POST", "/", nil)
if err != nil {
t.Fatal(err)
}

app.SyncQueues.Queues["test_max_message_size"] =
&app.Queue{Name: "test_max_message_size", MaximumMessageSize: 10}

form := url.Values{}
form.Add("Action", "SendMessage")
form.Add("QueueUrl", "http://localhost:4100/queue/test_max_message_size")
form.Add("MessageBody", "test%20message%20body%201")
form.Add("Version", "2012-11-05")
req.PostForm = form

// We create a ResponseRecorder (which satisfies http.ResponseWriter) to record the response.
rr := httptest.NewRecorder()
handler := http.HandlerFunc(SendMessage)

// Our handlers satisfy http.Handler, so we can call their ServeHTTP method
// directly and pass in our Request and ResponseRecorder.
handler.ServeHTTP(rr, req)

// Check the status code is what we expect.
if status := rr.Code; status != http.StatusBadRequest {
t.Errorf("handler returned wrong status code: got %v want %v",
status, http.StatusBadRequest)
}

// Check the response body is what we expect.
expected := "MessageTooBig"
if !strings.Contains(rr.Body.String(), expected) {
t.Errorf("handler returned unexpected body: got %v want %v",
rr.Body.String(), expected)
}
}

func TestSendQueue_POST_NonExistant(t *testing.T) {
// Create a request to pass to our handler. We don't have any query parameters for now, so we'll
// pass 'nil' as the third parameter.
Expand Down Expand Up @@ -235,6 +314,10 @@ func TestSendMessageBatch_POST_QueueNotFound(t *testing.T) {
form := url.Values{}
form.Add("Action", "SendMessageBatch")
form.Add("QueueUrl", "http://localhost:4100/queue/testing")
form.Add("SendMessageBatchRequestEntry.1.Id", "test_msg_001")
form.Add("SendMessageBatchRequestEntry.1.MessageBody", "test%20message%20body%201")
form.Add("SendMessageBatchRequestEntry.2.Id", "test_msg_002")
form.Add("SendMessageBatchRequestEntry.2.MessageBody", "test%20message%20body%202")
form.Add("Version", "2012-11-05")
req.PostForm = form

Expand Down
4 changes: 4 additions & 0 deletions app/gosqs/queue_attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func validateAndSetQueueAttributes(q *app.Queue, u url.Values) error {
if receiveWaitTime != 0 {
q.ReceiveWaitTimeSecs = receiveWaitTime
}
maximumMessageSize, _ := strconv.Atoi(attr["MaximumMessageSize"])
if maximumMessageSize != 0 {
q.MaximumMessageSize = maximumMessageSize
}
strRedrivePolicy := attr["RedrivePolicy"]
if strRedrivePolicy != "" {
// support both int and string maxReceiveCount (Amazon clients use string)
Expand Down
1 change: 1 addition & 0 deletions app/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type Queue struct {
Arn string
TimeoutSecs int
ReceiveWaitTimeSecs int
MaximumMessageSize int
Messages []Message
DeadLetterQueue *Queue
MaxReceiveCount int
Expand Down

0 comments on commit 46a9697

Please sign in to comment.