/
main.go
148 lines (115 loc) · 3.49 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
adapter "github.com/dc0d/sqstransport"
)
// this is a demonstrative example.
// a real world application will not be implemented like this.
// set:
// export AWS_REGION=...
// export AWS_ACCESS_KEY_ID=...
// export AWS_SECRET_ACCESS_KEY=...
// export QUEUE_NAME=...
// run:
// go run main.go
func main() {
ctx := createRootContext()
client := makeClient(ctx)
queueURL := getQueueURL(ctx, client)
sqsAdapter := adapter.New(
adapter.UseHandler(handler),
adapter.UseInputFactory(inputFactory(queueURL)),
adapter.UseDecodeRequest(decodeRequest),
adapter.UseResponseHandler(responseHandler(client, queueURL)...),
adapter.WithBaseContext(func() context.Context { return ctx }),
adapter.WithErrorHandler(errorHandlerFunc(errorHandler)),
)
go func() { _ = sqsAdapter.Serve(ctx, client) }()
<-ctx.Done()
}
func errorHandler(ctx context.Context, err error) {
log.Println(err)
}
func inputFactory(queueURL string) func() (params *sqs.ReceiveMessageInput, optFns []func(*sqs.Options)) {
return func() (params *sqs.ReceiveMessageInput, optFns []func(*sqs.Options)) {
return &sqs.ReceiveMessageInput{
QueueUrl: aws.String(queueURL),
AttributeNames: []types.QueueAttributeName{
"SentTimestamp",
},
MessageAttributeNames: []string{
"All",
},
WaitTimeSeconds: 20,
VisibilityTimeout: 10,
MaxNumberOfMessages: 10,
}, nil
}
}
func decodeRequest(ctx context.Context, msg types.Message) (request interface{}, err error) {
return *msg.Body, nil
}
// handler is the port/endpoint and should be inside a separate package.
func handler(ctx context.Context, request interface{}) (response interface{}, err error) {
log.Println(request)
return nil, nil
}
func responseHandler(
client responseHandlerClient,
queueURL string) []adapter.ResponseFunc {
return []adapter.ResponseFunc{
func(ctx context.Context, msg types.Message, response interface{}) context.Context {
log.Println("message processed successfully, deleting the message")
input := &sqs.DeleteMessageInput{
QueueUrl: &queueURL,
ReceiptHandle: aws.String(*msg.ReceiptHandle),
}
_, err := client.DeleteMessage(ctx, input)
if err != nil {
log.Println(err)
}
return ctx
},
}
}
//
func createRootContext() context.Context {
ctx, cancel := context.WithCancel(context.Background())
termSignal := make(chan os.Signal, 1)
signal.Notify(termSignal, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-termSignal
cancel()
}()
return ctx
}
func makeClient(ctx context.Context) *sqs.Client {
cfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(os.Getenv("AWS_REGION")))
if err != nil {
log.Fatalf("unable to load SDK config, %v", err)
}
return sqs.NewFromConfig(cfg)
}
func getQueueURL(ctx context.Context, client *sqs.Client) string {
gquInput := &sqs.GetQueueUrlInput{
QueueName: aws.String(os.Getenv("QUEUE_NAME")),
}
result, err := client.GetQueueUrl(ctx, gquInput)
if err != nil {
log.Fatal(err)
}
return *result.QueueUrl
}
type errorHandlerFunc func(ctx context.Context, err error)
func (fn errorHandlerFunc) Handle(ctx context.Context, err error) { fn(ctx, err) }
type responseHandlerClient interface {
DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error)
}