/
main.go
executable file
·111 lines (85 loc) · 2.39 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package main
import (
"os"
"os/signal"
"syscall"
"time"
"github.com/bygui86/go-kafka-segmentio/reader/commons"
"github.com/bygui86/go-kafka-segmentio/reader/config"
"github.com/bygui86/go-kafka-segmentio/reader/logging"
"github.com/bygui86/go-kafka-segmentio/reader/monitoring"
"github.com/bygui86/go-kafka-segmentio/reader/reader"
)
// Handles to the long-running components of the service. They are assigned
// in main and read by shutdownAndWait, which skips any that are still nil.
var (
// monitoringServer is assigned only when monitoring is enabled in the
// loaded configuration; otherwise it keeps its nil zero value.
monitoringServer *monitoring.Server
// kafkaReader holds the reader returned by startReader.
kafkaReader *reader.KafkaReader
)
// main wires the service together: it sets up logging, loads the
// configuration, optionally starts the monitoring server (and registers the
// custom metrics), starts the Kafka reader, then blocks until a termination
// signal arrives and finally shuts everything down.
func main() {
	// Timeout handed to shutdownAndWait once a termination signal arrives.
	const shutdownTimeout = 1

	initLogging()
	logging.SugaredLog.Infof("Start %s", commons.ServiceName)

	settings := loadConfig()

	// Custom metrics are only considered when monitoring itself is enabled.
	monitoringEnabled := settings.GetEnableMonitoring()
	if monitoringEnabled {
		monitoringServer = startMonitoringServer()
	}
	if monitoringEnabled && settings.GetEnableCustomMetrics() {
		registerCustomMetrics()
	}

	kafkaReader = startReader()
	logging.SugaredLog.Infof("%s up and running", commons.ServiceName)

	// Blocks here until the process is asked to stop.
	startSysCallChannel()
	shutdownAndWait(shutdownTimeout)
}
// initLogging initialises the global logger and terminates the process when
// that fails.
//
// The failure is written directly to standard error rather than through
// logging.SugaredLog: the logger whose setup just failed cannot be relied on
// to be usable at this point (presumably it is still unset — confirm against
// the logging package).
//
// NOTE(review): POSIX systems truncate exit statuses to 8 bits, so the 501
// used here is observed as 245 there. The value is kept unchanged for
// compatibility with anything that already depends on it.
func initLogging() {
	if err := logging.InitGlobalLogger(); err != nil {
		// Best effort: if stderr itself is unwritable there is nothing
		// further that can be reported before exiting.
		_, _ = os.Stderr.WriteString("Logging setup failed: " + err.Error() + "\n")
		os.Exit(501)
	}
}
// loadConfig emits a debug log line and returns the configuration produced
// by config.LoadConfig.
func loadConfig() *config.Config {
	logging.Log.Debug("Load configurations")

	loaded := config.LoadConfig()
	return loaded
}
// startMonitoringServer creates a monitoring server, starts it and returns
// it, logging a debug message before and after each step.
func startMonitoringServer() *monitoring.Server {
	logging.Log.Debug("Start monitoring")

	srv := monitoring.New()
	logging.Log.Debug("Monitoring server successfully created")

	srv.Start()
	logging.Log.Debug("Monitoring successfully started")

	return srv
}
// registerCustomMetrics logs a debug message and then calls
// monitoring.RegisterCustomMetrics. main invokes it only when both
// monitoring and custom metrics are enabled in the configuration.
func registerCustomMetrics() {
logging.Log.Debug("Register custom metrics")
monitoring.RegisterCustomMetrics()
}
// startReader creates a Kafka reader named after the service, starts it and
// returns it.
//
// Either failure is fatal: the error is logged and the process exits with
// code 501 when the reader cannot be created, or 502 when it cannot be
// started.
func startReader() *reader.KafkaReader {
	logging.Log.Debug("Start reader")

	kr, createErr := reader.New(commons.ServiceName)
	if createErr != nil {
		logging.SugaredLog.Errorf("Consumer setup failed: %s", createErr.Error())
		os.Exit(501)
	}
	logging.Log.Debug("Reader successfully created")

	if runErr := kr.Start(); runErr != nil {
		logging.SugaredLog.Errorf("Consumer start failed: %s", runErr.Error())
		os.Exit(502)
	}
	logging.Log.Debug("Reader successfully started")

	return kr
}
// startSysCallChannel blocks the calling goroutine until the process
// receives SIGTERM or SIGINT.
//
// The channel has a buffer of one because the signal package never blocks
// when sending to it: with an unbuffered channel a signal that arrives while
// no receiver is ready is dropped. go vet reports the unbuffered form for
// this reason.
func startSysCallChannel() {
	syscallCh := make(chan os.Signal, 1)
	// os.Interrupt is the same signal as syscall.SIGINT, so listing SIGINT
	// once is sufficient.
	signal.Notify(syscallCh, syscall.SIGTERM, syscall.SIGINT)
	<-syscallCh
}
// shutdownAndWait logs the termination warning, asks the Kafka reader and
// the monitoring server (whichever were started) to shut down with the given
// timeout, and then sleeps for timeout+1 seconds before returning.
//
// timeout is passed unchanged to each component's Shutdown method.
func shutdownAndWait(timeout int) {
	logging.SugaredLog.Warnf("Termination signal received! Timeout %d", timeout)

	// The reader is stopped first, then the monitoring server.
	if kafkaReader != nil {
		kafkaReader.Shutdown(timeout)
	}
	if monitoringServer != nil {
		monitoringServer.Shutdown(timeout)
	}

	// Wait one second longer than the timeout given to the components.
	gracePeriod := time.Duration(timeout+1) * time.Second
	time.Sleep(gracePeriod)
}