diff --git a/app/common.go b/app/common.go index 54f07227c..9ebe2471b 100644 --- a/app/common.go +++ b/app/common.go @@ -18,11 +18,13 @@ type EnvTopic struct { type EnvQueue struct { Name string ReceiveMessageWaitTimeSeconds int + MaximumMessageSize int } type EnvQueueAttributes struct { VisibilityTimeout int ReceiveMessageWaitTimeSeconds int + MaximumMessageSize int } type Environment struct { diff --git a/app/conf/config.go b/app/conf/config.go index 5004ce22b..9794b162c 100644 --- a/app/conf/config.go +++ b/app/conf/config.go @@ -63,6 +63,10 @@ func LoadYamlConfig(filename string, env string) []string { app.CurrentEnvironment.QueueAttributeDefaults.VisibilityTimeout = 30 } + if app.CurrentEnvironment.QueueAttributeDefaults.MaximumMessageSize == 0 { + app.CurrentEnvironment.QueueAttributeDefaults.MaximumMessageSize = 262144 // 256K + } + if app.CurrentEnvironment.AccountID == "" { app.CurrentEnvironment.AccountID = "queue" } @@ -86,6 +90,9 @@ func LoadYamlConfig(filename string, env string) []string { if queue.ReceiveMessageWaitTimeSeconds == 0 { queue.ReceiveMessageWaitTimeSeconds = app.CurrentEnvironment.QueueAttributeDefaults.ReceiveMessageWaitTimeSeconds } + if queue.MaximumMessageSize == 0 { + queue.MaximumMessageSize = app.CurrentEnvironment.QueueAttributeDefaults.MaximumMessageSize + } app.SyncQueues.Queues[queue.Name] = &app.Queue{ Name: queue.Name, @@ -93,6 +100,7 @@ func LoadYamlConfig(filename string, env string) []string { Arn: queueArn, URL: queueUrl, ReceiveWaitTimeSecs: queue.ReceiveMessageWaitTimeSeconds, + MaximumMessageSize: queue.MaximumMessageSize, IsFIFO: app.HasFIFOQueueName(queue.Name), } } @@ -155,6 +163,7 @@ func createSqsSubscription(configSubscription app.EnvSubsciption, topicArn strin Arn: queueArn, URL: queueUrl, ReceiveWaitTimeSecs: app.CurrentEnvironment.QueueAttributeDefaults.ReceiveMessageWaitTimeSeconds, + MaximumMessageSize: app.CurrentEnvironment.QueueAttributeDefaults.MaximumMessageSize, IsFIFO: app.HasFIFOQueueName(configSubscription.QueueName), } } diff --git a/app/conf/config_test.go b/app/conf/config_test.go index eec8f7d0e..6f1c6b9e4 100644 --- a/app/conf/config_test.go +++ b/app/conf/config_test.go @@ -73,11 +73,19 @@ func TestConfig_QueueAttributes(t *testing.T) { if timeoutSecs != 10 { t.Errorf("Expected local-queue1 Queue to be configured with VisibilityTimeout: 10 but got %d\n", timeoutSecs) } + maximumMessageSize := app.SyncQueues.Queues["local-queue1"].MaximumMessageSize + if maximumMessageSize != 1024 { + t.Errorf("Expected local-queue1 Queue to be configured with MaximumMessageSize: 1024 but got %d\n", maximumMessageSize) + } receiveWaitTime = app.SyncQueues.Queues["local-queue2"].ReceiveWaitTimeSecs if receiveWaitTime != 20 { t.Errorf("Expected local-queue2 Queue to be configured with ReceiveMessageWaitTimeSeconds: 20 but got %d\n", receiveWaitTime) } + maximumMessageSize = app.SyncQueues.Queues["local-queue2"].MaximumMessageSize + if maximumMessageSize != 128 { + t.Errorf("Expected local-queue1 Queue to be configured with MaximumMessageSize: 128 but got %d\n", maximumMessageSize) + } } func TestConfig_NoQueueAttributeDefaults(t *testing.T) { diff --git a/app/conf/goaws.yaml b/app/conf/goaws.yaml index 3c6e1b812..363dd71f1 100644 --- a/app/conf/goaws.yaml +++ b/app/conf/goaws.yaml @@ -13,6 +13,7 @@ Local: # Environment name that can be passed on the QueueAttributeDefaults: # default attributes for all queues VisibilityTimeout: 30 # message visibility timeout ReceiveMessageWaitTimeSeconds: 0 # receive message max wait time + MaximumMessageSize: 262144 # maximum message size (bytes) Queues: # List of queues to create at startup - Name: local-queue1 # Queue name - Name: local-queue2 # Queue name diff --git a/app/conf/mock-data/mock-config.yaml b/app/conf/mock-data/mock-config.yaml index fd54c7147..2c8bbb693 100644 --- a/app/conf/mock-data/mock-config.yaml +++ b/app/conf/mock-data/mock-config.yaml @@ -9,10 +9,12 @@ Local: # Environment name that can be passed on the QueueAttributeDefaults: # default attributes for all queues VisibilityTimeout: 10 # message visibility timeout ReceiveMessageWaitTimeSeconds: 10 # receive message max wait time + MaximumMessageSize: 1024 # maximum message size (bytes) Queues: # List of queues to create at startup - Name: local-queue1 # Queue name - Name: local-queue2 # Queue name ReceiveMessageWaitTimeSeconds: 20 # Queue receive message max wait time + MaximumMessageSize: 128 # Queue maximum message size (bytes) - Name: local-queue3 # Queue name Topics: # List of topic to create at startup - Name: local-topic1 # Topic name - with some Subscriptions diff --git a/app/gosqs/gosqs.go b/app/gosqs/gosqs.go index f7f828175..9436c85a5 100644 --- a/app/gosqs/gosqs.go +++ b/app/gosqs/gosqs.go @@ -10,7 +10,7 @@ import ( "time" log "github.com/sirupsen/logrus" - + "github.com/gorilla/mux" "github.com/p4tin/goaws/app" "github.com/p4tin/goaws/app/common" @@ -38,6 +38,8 @@ func init() { app.SqsErrors["InvalidVisibilityTimeout"] = err8 err9 := app.SqsErrorType{HttpError: http.StatusBadRequest, Type: "MessageNotInFlight", Code: "AWS.SimpleQueueService.MessageNotInFlight", Message: "The message referred to isn't in flight."} app.SqsErrors["MessageNotInFlight"] = err9 + err10 := app.SqsErrorType{HttpError: http.StatusBadRequest, Type: "MessageTooBig", Code: "InvalidMessageContents", Message: "The message size exceeds the limit."} + app.SqsErrors["MessageTooBig"] = err10 app.SqsErrors[ErrInvalidParameterValue.Type] = *ErrInvalidParameterValue app.SqsErrors[ErrInvalidAttributeValue.Type] = *ErrInvalidAttributeValue } @@ -123,6 +125,7 @@ func CreateQueue(w http.ResponseWriter, req *http.Request) { Arn: queueArn, TimeoutSecs: app.CurrentEnvironment.QueueAttributeDefaults.VisibilityTimeout, ReceiveWaitTimeSecs: app.CurrentEnvironment.QueueAttributeDefaults.ReceiveMessageWaitTimeSeconds, + MaximumMessageSize: app.CurrentEnvironment.QueueAttributeDefaults.MaximumMessageSize, IsFIFO: app.HasFIFOQueueName(queueName), } if err := validateAndSetQueueAttributes(queue, req.Form); err != nil { @@ -165,6 +168,13 @@ func SendMessage(w http.ResponseWriter, req *http.Request) { return } + if (app.SyncQueues.Queues[queueName].MaximumMessageSize > 0 && + len(messageBody) > app.SyncQueues.Queues[queueName].MaximumMessageSize) { + // Message size is too big + createErrorResponse(w, req, "MessageTooBig") + return + } + log.Println("Putting Message in Queue:", queueName) msg := app.Message{MessageBody: []byte(messageBody)} if len(messageAttributes) > 0 { diff --git a/app/gosqs/gosqs_test.go b/app/gosqs/gosqs_test.go index 64e81ba56..b3ba1178c 100644 --- a/app/gosqs/gosqs_test.go +++ b/app/gosqs/gosqs_test.go @@ -108,6 +108,8 @@ func TestCreateQueuehandler_POST_CreateQueue(t *testing.T) { form.Add("QueueName", "UnitTestQueue1") form.Add("Attribute.1.Name", "VisibilityTimeout") form.Add("Attribute.1.Value", "60") + form.Add("Attribute.2.Name", "MaximumMessageSize") + form.Add("Attribute.2.Value", "2048") req.PostForm = form // We create a ResponseRecorder (which satisfies http.ResponseWriter) to record the response. @@ -135,6 +137,7 @@ func TestCreateQueuehandler_POST_CreateQueue(t *testing.T) { URL: "http://://" + queueName, Arn: "arn:aws:sqs:::" + queueName, TimeoutSecs: 60, + MaximumMessageSize: 2048, } actualQueue := app.SyncQueues.Queues[queueName] if !reflect.DeepEqual(expectedQueue, actualQueue) { @@ -189,6 +192,82 @@ func TestCreateFIFOQueuehandler_POST_CreateQueue(t *testing.T) { } } +func TestSendMessage_MaximumMessageSize_Success(t *testing.T) { + req, err := http.NewRequest("POST", "/", nil) + if err != nil { + t.Fatal(err) + } + + app.SyncQueues.Queues["test_max_message_size"] = + &app.Queue{Name: "test_max_message_size", MaximumMessageSize: 100} + + form := url.Values{} + form.Add("Action", "SendMessage") + form.Add("QueueUrl", "http://localhost:4100/queue/test_max_message_size") + form.Add("MessageBody", "test%20message%20body%201") + form.Add("Version", "2012-11-05") + req.PostForm = form + + // We create a ResponseRecorder (which satisfies http.ResponseWriter) to record the response. + rr := httptest.NewRecorder() + handler := http.HandlerFunc(SendMessage) + + // Our handlers satisfy http.Handler, so we can call their ServeHTTP method + // directly and pass in our Request and ResponseRecorder. + handler.ServeHTTP(rr, req) + + // Check the status code is what we expect. + if status := rr.Code; status != http.StatusOK { + t.Errorf("handler returned wrong status code: got %v want %v", + status, http.StatusOK) + } + + // Check the response body is what we expect. + expected := "MD5OfMessageBody" + if !strings.Contains(rr.Body.String(), expected) { + t.Errorf("handler returned unexpected body: got %v want %v", + rr.Body.String(), expected) + } +} + +func TestSendMessage_MaximumMessageSize_MessageTooBig(t *testing.T) { + req, err := http.NewRequest("POST", "/", nil) + if err != nil { + t.Fatal(err) + } + + app.SyncQueues.Queues["test_max_message_size"] = + &app.Queue{Name: "test_max_message_size", MaximumMessageSize: 10} + + form := url.Values{} + form.Add("Action", "SendMessage") + form.Add("QueueUrl", "http://localhost:4100/queue/test_max_message_size") + form.Add("MessageBody", "test%20message%20body%201") + form.Add("Version", "2012-11-05") + req.PostForm = form + + // We create a ResponseRecorder (which satisfies http.ResponseWriter) to record the response. + rr := httptest.NewRecorder() + handler := http.HandlerFunc(SendMessage) + + // Our handlers satisfy http.Handler, so we can call their ServeHTTP method + // directly and pass in our Request and ResponseRecorder. + handler.ServeHTTP(rr, req) + + // Check the status code is what we expect. + if status := rr.Code; status != http.StatusBadRequest { + t.Errorf("handler returned wrong status code: got %v want %v", + status, http.StatusBadRequest) + } + + // Check the response body is what we expect. + expected := "MessageTooBig" + if !strings.Contains(rr.Body.String(), expected) { + t.Errorf("handler returned unexpected body: got %v want %v", + rr.Body.String(), expected) + } +} + func TestSendQueue_POST_NonExistant(t *testing.T) { // Create a request to pass to our handler. We don't have any query parameters for now, so we'll // pass 'nil' as the third parameter. @@ -235,6 +314,10 @@ func TestSendMessageBatch_POST_QueueNotFound(t *testing.T) { form := url.Values{} form.Add("Action", "SendMessageBatch") form.Add("QueueUrl", "http://localhost:4100/queue/testing") + form.Add("SendMessageBatchRequestEntry.1.Id", "test_msg_001") + form.Add("SendMessageBatchRequestEntry.1.MessageBody", "test%20message%20body%201") + form.Add("SendMessageBatchRequestEntry.2.Id", "test_msg_002") + form.Add("SendMessageBatchRequestEntry.2.MessageBody", "test%20message%20body%202") form.Add("Version", "2012-11-05") req.PostForm = form diff --git a/app/gosqs/queue_attributes.go b/app/gosqs/queue_attributes.go index e2d006539..81786d89d 100644 --- a/app/gosqs/queue_attributes.go +++ b/app/gosqs/queue_attributes.go @@ -39,6 +39,10 @@ func validateAndSetQueueAttributes(q *app.Queue, u url.Values) error { if receiveWaitTime != 0 { q.ReceiveWaitTimeSecs = receiveWaitTime } + maximumMessageSize, _ := strconv.Atoi(attr["MaximumMessageSize"]) + if maximumMessageSize != 0 { + q.MaximumMessageSize = maximumMessageSize + } strRedrivePolicy := attr["RedrivePolicy"] if strRedrivePolicy != "" { // support both int and string maxReceiveCount (Amazon clients use string) diff --git a/app/sqs.go b/app/sqs.go index c244db933..556d94544 100644 --- a/app/sqs.go +++ b/app/sqs.go @@ -80,6 +80,7 @@ type Queue struct { Arn string TimeoutSecs int ReceiveWaitTimeSecs int + MaximumMessageSize int Messages []Message DeadLetterQueue *Queue MaxReceiveCount int