Skip to content

Commit

Permalink
Add support for worker timeouts. Copy message id and attributes to HT…
Browse files Browse the repository at this point in the history
…TP header (#8)
  • Loading branch information
flindskog authored and fterrag committed Jun 6, 2019
1 parent 46ecf65 commit b3adb53
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 13 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ $ docker run -e AWS_ACCESS_KEY_ID=your-access-id AWS_SECRET_ACCESS_KEY=your-secr
|`SQSD_HTTP_HMAC_HEADER`||no|The name of the HTTP header to send the HMAC hash with.|
|`SQSD_HMAC_SECRET_KEY`||no|Secret key to use when generating HMAC hash send to `SQSD_HTTP_URL`.|
|`SQSD_HTTP_HEALTH_PATH`||no|The path to a health check endpoint of your service. When provided, messages will not be processed until the health check returns a 200 for `HTTPHealthInterval` times |
|`SQSD_HTTP_HEALTH_WAIT`|`5`|no|How often to wait before starting health checks|
|`SQSD_HTTP_HEALTH_WAIT`|`5`|no|How long to wait before starting health checks|
|`SQSD_HTTP_HEALTH_INTERVAL`|`5`|no|How often to wait between health checks|
|`SQSD_HTTP_HEALTH_SUCCESS_COUNT`|`1`|no|How many successful health checks required in a row|
|`SQSD_HTTP_TIMEOUT`|`30`|no|Number of seconds to wait for a response from the worker|

## HMAC

Expand Down
24 changes: 17 additions & 7 deletions cmd/simplesqsd/simplesqsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type config struct {
HTTPMaxConns int
HTTPURL string
HTTPContentType string
HTTPTimeout int

AWSEndpoint string
HTTPHMACHeader string
Expand Down Expand Up @@ -51,6 +52,7 @@ func main() {
c.HTTPHealthWait = getEnvInt("SQSD_HTTP_HEALTH_WAIT", 5)
c.HTTPHealthInterval = getEnvInt("SQSD_HTTP_HEALTH_INTERVAL", 5)
c.HTTPHealthSucessCount = getEnvInt("SQSD_HTTP_HEALTH_SUCCESS_COUNT", 1)
c.HTTPTimeout = getEnvInt("SQSD_HTTP_TIMEOUT", 30)

c.AWSEndpoint = os.Getenv("SQSD_AWS_ENDPOINT")
c.HTTPHMACHeader = os.Getenv("SQSD_HTTP_HMAC_HEADER")
Expand Down Expand Up @@ -108,20 +110,19 @@ func main() {
log.Info("Health check succeeded. Starting message processing")
}

httpClient := &http.Client{
awsSess := session.Must(session.NewSessionWithOptions(session.Options{
SharedConfigState: session.SharedConfigEnable,
}))

sqsHttpClient := &http.Client{
Transport: &http.Transport{
MaxIdleConns: c.HTTPMaxConns,
MaxIdleConnsPerHost: c.HTTPMaxConns,
},
}

awsSess := session.Must(session.NewSessionWithOptions(session.Options{
SharedConfigState: session.SharedConfigEnable,
}))

sqsConfig := aws.NewConfig().
WithRegion(c.QueueRegion).
WithHTTPClient(httpClient)
WithHTTPClient(sqsHttpClient)

if len(c.AWSEndpoint) > 0 {
sqsConfig.WithEndpoint(c.AWSEndpoint)
Expand All @@ -141,6 +142,15 @@ func main() {
HMACSecretKey: c.HMACSecretKey,
}

httpClient := &http.Client{
Transport: &http.Transport{
MaxIdleConns: c.HTTPMaxConns,
MaxIdleConnsPerHost: c.HTTPMaxConns,

},
Timeout: time.Duration(c.HTTPTimeout) * time.Second,
}

s := supervisor.NewSupervisor(logger, sqsSvc, httpClient, wConf)
s.Start(c.HTTPMaxConns)
s.Wait()
Expand Down
20 changes: 15 additions & 5 deletions supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,10 @@ func (s *Supervisor) worker() {
}

recInput := &sqs.ReceiveMessageInput{
MaxNumberOfMessages: aws.Int64(int64(s.workerConfig.QueueMaxMessages)),
QueueUrl: aws.String(s.workerConfig.QueueURL),
WaitTimeSeconds: aws.Int64(int64(s.workerConfig.QueueWaitTime)),
MaxNumberOfMessages: aws.Int64(int64(s.workerConfig.QueueMaxMessages)),
QueueUrl: aws.String(s.workerConfig.QueueURL),
WaitTimeSeconds: aws.Int64(int64(s.workerConfig.QueueWaitTime)),
MessageAttributeNames: aws.StringSlice([]string{"All"}),
}

output, err := s.sqs.ReceiveMessage(recInput)
Expand All @@ -110,7 +111,7 @@ func (s *Supervisor) worker() {
changeVisibilityEntries := make([]*sqs.ChangeMessageVisibilityBatchRequestEntry, 0)

for _, msg := range output.Messages {
res, err := s.httpRequest(*msg.Body)
res, err := s.httpRequest(msg)
if err != nil {
s.logger.Errorf("Error making HTTP request: %s", err)
continue
Expand Down Expand Up @@ -170,8 +171,11 @@ func (s *Supervisor) worker() {
}
}

func (s *Supervisor) httpRequest(body string) (*http.Response, error) {
func (s *Supervisor) httpRequest(msg *sqs.Message) (*http.Response, error) {
body := *msg.Body
req, err := http.NewRequest("POST", s.workerConfig.HTTPURL, bytes.NewBufferString(body))
req.Header.Add("X-Aws-Sqsd-Msgid", *msg.MessageId)
s.addMessageAttributesToHeader(msg.MessageAttributes, req.Header)
if err != nil {
return nil, fmt.Errorf("Error while creating HTTP request: %s", err)
}
Expand Down Expand Up @@ -199,6 +203,12 @@ func (s *Supervisor) httpRequest(body string) (*http.Response, error) {
return res, nil
}

func (s *Supervisor) addMessageAttributesToHeader(attrs map[string]*sqs.MessageAttributeValue, header http.Header) {
for k, v := range attrs {
header.Add("X-Aws-Sqsd-Attr-" + k, *v.StringValue)
}
}

func makeHMAC(signature string, secretKey []byte) (string, error) {
mac := hmac.New(sha256.New, secretKey)

Expand Down

0 comments on commit b3adb53

Please sign in to comment.