forked from shirou/prometheus_remote_kinesis
/
main.go
108 lines (92 loc) · 2.33 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package main
import (
"context"
"flag"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// logger is the package-wide zap logger; it is assigned by initLogger.
var logger *zap.Logger
// init sets up the package-level logger by delegating to initLogger, so the
// logger is available by the time main starts.
func init() { initLogger() }
// Server is an http.Handler that forwards every request to the ServeMux it
// wraps.
type Server struct {
	// mux holds the route table used to dispatch incoming requests.
	mux *http.ServeMux
}
func initLogger() {
level := zap.NewAtomicLevel()
level.SetLevel(zapcore.DebugLevel)
zapConfig := zap.Config{
Level: level,
Encoding: "json",
EncoderConfig: zapcore.EncoderConfig{
MessageKey: "msg",
TimeKey: "time",
EncodeTime: zapcore.ISO8601TimeEncoder,
LevelKey: "level",
EncodeLevel: zapcore.CapitalLevelEncoder,
},
OutputPaths: []string{"stdout"},
ErrorOutputPaths: []string{"stderr"},
}
l, err := zapConfig.Build()
if err != nil {
panic(err)
}
logger = l
}
// setup returns an *http.Server bound to addr whose handler is a Server
// routing "/receive" to writer.receive. The returned server is not started.
func setup(writer *kinesisWriter, addr string) *http.Server {
	mux := http.NewServeMux()
	mux.HandleFunc("/receive", writer.receive)

	return &http.Server{
		Addr:    addr,
		Handler: &Server{mux: mux},
	}
}
// ServeHTTP implements http.Handler by handing the request to the wrapped
// ServeMux.
func (srv *Server) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
	srv.mux.ServeHTTP(rw, req)
}
// main parses the command-line flags, builds the kinesis writer and the HTTP
// server returned by setup, serves until SIGINT/SIGTERM arrives (or the
// server fails on its own), and then shuts down in order: stop accepting
// requests, wait for in-flight requests, close the writer.
//
// Flags:
//
//	-stream-name     Kinesis stream name (required)
//	-listen-addr     address to listen on (default ":9501")
//	-region          AWS region name (default: $AWS_REGION)
//	-write-interval  interval between writes (default 10s)
func main() {
	var (
		streamName    = flag.String("stream-name", "", "Kinesis stream name")
		listenAddr    = flag.String("listen-addr", ":9501", "The address to listen on.")
		awsRegion     = flag.String("region", os.Getenv("AWS_REGION"), "AWS region name")
		writeInterval = flag.Duration("write-interval", 10*time.Second, "The interval between write.")
	)
	flag.Parse()

	if *streamName == "" {
		logger.Fatal("stream-name option is required")
	}

	config := Config{
		streamName:    *streamName,
		writeInterval: *writeInterval,
		AWSRegion:     *awsRegion,
	}
	logger.Info("starting prometheus_remote_kinesis", zap.String("stream-name", *streamName))

	// Register for signals before the server starts so none is missed.
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, os.Interrupt, syscall.SIGTERM)

	writer := newWriter(config)
	h := setup(writer, *listenAddr)

	// The serving goroutine only reports its result; every shutdown decision
	// is taken here in main so the ordering is explicit. The buffer of one
	// lets the goroutine finish even if nobody reads the value.
	serveErr := make(chan error, 1)
	go func() {
		logger.Info(fmt.Sprintf("start http server on port %s", *listenAddr))
		serveErr <- h.ListenAndServe()
	}()

	select {
	case err := <-serveErr:
		// Shutdown has not been requested yet, so whatever ListenAndServe
		// returned (e.g. a bind failure) is unexpected.
		logger.Warn("http server shutting down")
		logger.Fatal("closed unexpected error", zap.NamedError("error", err))
	case <-stop:
	}

	logger.Warn("http server shutting down")
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Shutdown stops accepting connections and waits, up to the timeout, for
	// in-flight requests. If it cannot finish in time, force-close whatever
	// is left instead of silently ignoring the error.
	if err := h.Shutdown(ctx); err != nil {
		logger.Warn("graceful shutdown did not complete, forcing close", zap.NamedError("error", err))
		if cerr := h.Close(); cerr != nil {
			logger.Warn("closing http server failed", zap.NamedError("error", cerr))
		}
	}

	// ListenAndServe returns as soon as Shutdown/Close is called; collect its
	// result so a genuine serve error racing with the signal is not lost.
	if err := <-serveErr; err != http.ErrServerClosed {
		logger.Warn("http server stopped with unexpected error", zap.NamedError("error", err))
	}

	// Close the writer only after the server has stopped handling requests,
	// so no handler can still be using it.
	writer.close()

	// NOTE(review): the grace period is kept from the original code; it is
	// not visible here whether writer.close() blocks until pending data is
	// written. Remove the sleep once that is confirmed.
	time.Sleep(1 * time.Second)
	logger.Warn("shutting down")
}