Skip to content
This repository has been archived by the owner on Aug 17, 2020. It is now read-only.

Commit

Permalink
Implement batch mode read on SQS
Browse files Browse the repository at this point in the history
This improves the read/sync speed of the cli by a factor of 10.
  • Loading branch information
cwaltken-edrans committed Sep 18, 2017
1 parent e30335b commit e3d803f
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 20 deletions.
12 changes: 7 additions & 5 deletions infrastructure/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ func (infra *AwsInfrastructure) Receive(results chan *result.LambdaResults) {

timeoutStart := time.Now()
for {
lambdaResult := adaptor.Receive()
if lambdaResult != nil {
lambdaAggregate := &data.Lambdas[lambdaResult.RunnerID]
result.AddResult(lambdaAggregate, lambdaResult)
results <- data
lambdaResults := adaptor.Receive()
if lambdaResults != nil {
for _, lambdaResult := range lambdaResults {
lambdaAggregate := &data.Lambdas[lambdaResult.RunnerID]
result.AddResult(lambdaAggregate, lambdaResult)
results <- data
}
if data.AllLambdasFinished() {
break
}
Expand Down
37 changes: 23 additions & 14 deletions infrastructure/aws/sqsadapter/sqsadapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ func getClient(awsConfig *aws.Config) *sqs.SQS {
}

// Receive a result, or timeout in 1 second
func (adaptor Adapter) Receive() *api.RunnerResult {
func (adaptor Adapter) Receive() []*api.RunnerResult {
params := &sqs.ReceiveMessageInput{
QueueUrl: aws.String(adaptor.QueueURL),
MaxNumberOfMessages: aws.Int64(1),
MaxNumberOfMessages: aws.Int64(10),
VisibilityTimeout: aws.Int64(1),
WaitTimeSeconds: aws.Int64(1),
}
Expand All @@ -56,25 +56,34 @@ func (adaptor Adapter) Receive() *api.RunnerResult {
return nil
}

item := resp.Messages[0]
items := resp.Messages
results := make([]*api.RunnerResult, 0)
deleteEntries := make([]*sqs.DeleteMessageBatchRequestEntry, 0)
for _, item := range items {
result, jsonerr := resultFromJSON(*item.Body)
if jsonerr != nil {
fmt.Println(err.Error())
return nil
}
deleteEntries = append(deleteEntries, &sqs.DeleteMessageBatchRequestEntry{
Id: aws.String(*item.MessageId),
ReceiptHandle: aws.String(*item.ReceiptHandle),
})
results = append(results, result)
}

deleteParams := &sqs.DeleteMessageInput{
QueueUrl: aws.String(adaptor.QueueURL),
ReceiptHandle: aws.String(*item.ReceiptHandle),
deleteParams := &sqs.DeleteMessageBatchInput{
Entries: deleteEntries,
QueueUrl: aws.String(adaptor.QueueURL),
}
_, delerr := adaptor.Client.DeleteMessage(deleteParams)
_, delerr := adaptor.Client.DeleteMessageBatch(deleteParams)

if delerr != nil {
fmt.Println(err.Error())
fmt.Println(delerr.Error())
return nil
}

result, jsonerr := resultFromJSON(*item.Body)
if jsonerr != nil {
fmt.Println(err.Error())
return nil
}
return result
return results
}

func resultFromJSON(str string) (*api.RunnerResult, error) {
Expand Down
1 change: 0 additions & 1 deletion infrastructure/infrastructure.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ func InvokeLambdas(inf Infrastructure) {
}

func Aggregate(i Infrastructure) chan *result.LambdaResults {
fmt.Println("AGGREGATE")
results := make(chan *result.LambdaResults)
go i.Receive(results)
return results
Expand Down

0 comments on commit e3d803f

Please sign in to comment.