forked from harlow/kinesis-consumer
/
main.go
142 lines (123 loc) · 3.34 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package main
import (
"context"
"expvar"
"flag"
"fmt"
"log"
"net"
"net/http"
"os"
"os/signal"
alog "github.com/apex/log"
"github.com/apex/log/handlers/text"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/kinesis"
consumer "github.com/healiha/kinesis-consumer"
storage "github.com/healiha/kinesis-consumer/store/ddb"
)
// kick off a server for exposing scan metrics
func init() {
sock, err := net.Listen("tcp", "localhost:8080")
if err != nil {
log.Printf("net listen error: %v", err)
}
go func() {
fmt.Println("Metrics available at http://localhost:8080/debug/vars")
http.Serve(sock, nil)
}()
}
// A myLogger provides a minimalistic logger satisfying the Logger interface.
type myLogger struct {
logger alog.Logger
}
// Log logs the parameters to the stdlib logger. See log.Println.
func (l *myLogger) Log(args ...interface{}) {
l.logger.Infof("producer: %v", args...)
}
func main() {
// Wrap myLogger around apex logger
log := &myLogger{
logger: alog.Logger{
Handler: text.New(os.Stdout),
Level: alog.DebugLevel,
},
}
var (
app = flag.String("app", "", "Consumer app name")
stream = flag.String("stream", "", "Stream name")
table = flag.String("table", "", "Checkpoint table name")
kinesisEndpoint = flag.String("endpoint", "http://localhost:4567", "Kinesis endpoint")
awsRegion = flag.String("region", "us-west-2", "AWS Region")
)
flag.Parse()
// New Kinesis and DynamoDB clients (if you need custom config)
sess, err := session.NewSession(aws.NewConfig())
if err != nil {
log.Log("new session error: %v", err)
}
myDdbClient := dynamodb.New(sess)
var myKsis = kinesis.New(session.Must(session.NewSession(
aws.NewConfig().
WithEndpoint(*kinesisEndpoint).
WithRegion(*awsRegion).
WithLogLevel(3),
)))
// ddb persitance
ddb, err := storage.New(*app, *table, storage.WithDynamoClient(myDdbClient), storage.WithRetryer(&MyRetryer{}))
if err != nil {
log.Log("checkpoint error: %v", err)
}
// expvar counter
var counter = expvar.NewMap("counters")
// consumer
c, err := consumer.New(
*stream,
consumer.WithStore(ddb),
consumer.WithLogger(log),
consumer.WithCounter(counter),
consumer.WithClient(myKsis),
)
if err != nil {
log.Log("consumer error: %v", err)
}
// use cancel func to signal shutdown
ctx, cancel := context.WithCancel(context.Background())
// trap SIGINT, wait to trigger shutdown
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
go func() {
<-signals
cancel()
}()
// scan stream
err = c.Scan(ctx, func(r *consumer.Record) error {
fmt.Println(string(r.Data))
return nil // continue scanning
})
if err != nil {
log.Log("scan error: %v", err)
}
if err := ddb.Shutdown(); err != nil {
log.Log("storage shutdown error: %v", err)
}
}
// MyRetryer used for storage
type MyRetryer struct {
storage.Retryer
}
// ShouldRetry implements custom logic for when errors should retry
func (r *MyRetryer) ShouldRetry(err error) bool {
if awsErr, ok := err.(awserr.Error); ok {
switch awsErr.Code() {
case dynamodb.ErrCodeProvisionedThroughputExceededException, dynamodb.ErrCodeLimitExceededException:
return true
default:
return false
}
}
return false
}