Skip to content

Commit

Permalink
feat: If http-port is not specified, the http service is disabled to …
Browse files Browse the repository at this point in the history
…avoid security vulnerabilities
  • Loading branch information
YenchangChan committed May 23, 2024
1 parent 474034e commit 90b0b92
Showing 1 changed file with 69 additions and 67 deletions.
136 changes: 69 additions & 67 deletions cmd/clickhouse_sinker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,27 @@ func init() {

func main() {
util.Run("clickhouse_sinker", func() error {
// Initialize http server for metrics and debug
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`
httpHost := cmdOps.HTTPHost
if httpHost == "" {
ip, err := util.GetOutboundIP()
if err != nil {
return fmt.Errorf("failed to determine outbound ip: %w", err)
}
httpHost = ip.String()
}

httpPort := cmdOps.HTTPPort
if httpPort == 0 {
httpPort = util.HttpPortBase
}
httpPort = util.GetSpareTCPPort(httpPort)

// cmdOps.HTTPPort=0: disable the http server
if cmdOps.HTTPPort > 0 {
// Initialize http server for metrics and debug
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`
<html><head><title>ClickHouse Sinker</title></head>
<body>
<h1>ClickHouse Sinker</h1>
Expand All @@ -149,80 +166,65 @@ func main() {
<p><a href="/live?full=1">Live Full</a></p>
<p><a href="/debug/pprof/">pprof</a></p>
</body></html>`))
})
})

mux.HandleFunc("/state", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
if runner != nil && runner.GetCurrentConfig() != nil {
var stateLags map[string]cm.StateLag
var bs []byte
var err error
if stateLags, err = cm.GetTaskStateAndLags(runner.GetCurrentConfig()); err == nil {
if bs, err = json.Marshal(stateLags); err == nil {
_, _ = w.Write(bs)
mux.HandleFunc("/state", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
if runner != nil && runner.GetCurrentConfig() != nil {
var stateLags map[string]cm.StateLag
var bs []byte
var err error
if stateLags, err = cm.GetTaskStateAndLags(runner.GetCurrentConfig()); err == nil {
if bs, err = json.Marshal(stateLags); err == nil {
_, _ = w.Write(bs)
}
}
}
}
})
health.Health.AddLivenessCheck("task", func() error {
var err error
if runner != nil && runner.GetCurrentConfig() != nil {
var stateLags map[string]cm.StateLag
var count int
if stateLags, err = cm.GetTaskStateAndLags(runner.GetCurrentConfig()); err == nil {
for _, value := range stateLags {
if value.State == "Dead" {
count++
})
health.Health.AddLivenessCheck("task", func() error {
var err error
if runner != nil && runner.GetCurrentConfig() != nil {
var stateLags map[string]cm.StateLag
var count int
if stateLags, err = cm.GetTaskStateAndLags(runner.GetCurrentConfig()); err == nil {
for _, value := range stateLags {
if value.State == "Dead" {
count++
}
}
if count == len(stateLags) {
return fmt.Errorf("All task is Dead.")
}
} else {
return err
}
if count == len(stateLags) {
return fmt.Errorf("All task is Dead.")
}
} else {
return err
}
}
return nil
})
mux.Handle("/metrics", httpMetrics)
mux.HandleFunc("/ready", health.Health.ReadyEndpoint) // GET /ready?full=1
mux.HandleFunc("/live", health.Health.LiveEndpoint) // GET /live?full=1
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
mux.Handle("/debug/vars", http.DefaultServeMux)

// cmdOps.HTTPPort=0: let OS choose the listen port, and record the exact metrics URL to log.
httpPort := cmdOps.HTTPPort
if httpPort == 0 {
httpPort = util.HttpPortBase
}
httpPort = util.GetSpareTCPPort(httpPort)
return nil
})
mux.Handle("/metrics", httpMetrics)
mux.HandleFunc("/ready", health.Health.ReadyEndpoint) // GET /ready?full=1
mux.HandleFunc("/live", health.Health.LiveEndpoint) // GET /live?full=1
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
mux.Handle("/debug/vars", http.DefaultServeMux)

httpHost := cmdOps.HTTPHost
if httpHost == "" {
ip, err := util.GetOutboundIP()
httpAddr = fmt.Sprintf("%s:%d", httpHost, httpPort)
listener, err := net.Listen("tcp", httpAddr)
if err != nil {
return fmt.Errorf("failed to determine outbound ip: %w", err)
return fmt.Errorf("failed to listen on %q: %w", httpAddr, err)
}
httpHost = ip.String()
}

httpAddr = fmt.Sprintf("%s:%d", httpHost, httpPort)
listener, err := net.Listen("tcp", httpAddr)
if err != nil {
return fmt.Errorf("failed to listen on %q: %w", httpAddr, err)
}
util.Logger.Info(fmt.Sprintf("Run http server at http://%s/", httpAddr))

util.Logger.Info(fmt.Sprintf("Run http server at http://%s/", httpAddr))

go func() {
if err := http.Serve(listener, mux); err != nil {
util.Logger.Error("http.ListenAndServe failed", zap.Error(err))
}
}()
go func() {
if err := http.Serve(listener, mux); err != nil {
util.Logger.Error("http.ListenAndServe failed", zap.Error(err))
}
}()
}

var rcm cm.RemoteConfManager
var properties map[string]interface{}
Expand Down

0 comments on commit 90b0b92

Please sign in to comment.