Skip to content

Commit

Permalink
improve config structure (#345)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #345

Improve config structure to allow easier introduction of new options.

Reviewed By: abulimov

Differential Revision: D56253896

fbshipit-source-id: 35f4fa29eed8c75aa2b6b20609cce3e4656bf276
  • Loading branch information
leoleovich authored and facebook-github-bot committed Apr 18, 2024
1 parent b1396fb commit 0d7d069
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 61 deletions.
39 changes: 19 additions & 20 deletions cmd/ntpresponder/main.go
Expand Up @@ -40,26 +40,25 @@ func main() {
s := server.Server{}

var (
debugger bool
logLevel string
monitoringport int
debugger bool
logLevel string
)

flag.StringVar(&logLevel, "loglevel", "info", "Set a log level. Can be: debug, info, warning, error")
flag.StringVar(&s.ListenConfig.Iface, "interface", "lo", "Interface to add IPs to")
flag.StringVar(&s.RefID, "refid", "OLEG", "Reference ID of the server")
flag.IntVar(&s.ListenConfig.Port, "port", 123, "Port to run service on")
flag.IntVar(&monitoringport, "monitoringport", 0, "Port to run monitoring server on")
flag.IntVar(&s.Stratum, "stratum", 1, "Stratum of the server")
flag.IntVar(&s.Workers, "workers", runtime.NumCPU()*100, "How many workers (routines) to run")
flag.Var(&s.ListenConfig.IPs, "ip", fmt.Sprintf("IP to listen to. Repeat for multiple. Default: %s", server.DefaultServerIPs))
flag.StringVar(&s.Config.Iface, "interface", "lo", "Interface to add IPs to")
flag.StringVar(&s.Config.RefID, "refid", "OLEG", "Reference ID of the server")
flag.IntVar(&s.Config.Port, "port", 123, "Port to run service on")
flag.IntVar(&s.Config.MonitoringPort, "monitoringport", 0, "Port to run monitoring server on")
flag.IntVar(&s.Config.Stratum, "stratum", 1, "Stratum of the server")
flag.IntVar(&s.Config.Workers, "workers", runtime.NumCPU()*100, "How many workers (routines) to run")
flag.Var(&s.Config.IPs, "ip", fmt.Sprintf("IP to listen to. Repeat for multiple. Default: %s", server.DefaultServerIPs))
flag.BoolVar(&debugger, "pprof", false, "Enable pprof")
flag.BoolVar(&s.ListenConfig.ShouldAnnounce, "announce", false, "Advertize IPs")
flag.DurationVar(&s.ExtraOffset, "extraoffset", 0, "Extra offset to return to clients")
flag.BoolVar(&s.ManageLoopback, "manage-loopback", true, "Add/remove IPs. If false, these must be managed elsewhere")
flag.BoolVar(&s.Config.ShouldAnnounce, "announce", false, "Advertize IPs")
flag.DurationVar(&s.Config.ExtraOffset, "extraoffset", 0, "Extra offset to return to clients")
flag.BoolVar(&s.Config.ManageLoopback, "manage-loopback", true, "Add/remove IPs. If false, these must be managed elsewhere")

flag.Parse()
s.ListenConfig.IPs.SetDefault()
s.Config.IPs.SetDefault()

switch logLevel {
case "debug":
Expand All @@ -74,8 +73,8 @@ func main() {
log.Fatalf("Unrecognized log level: %v", logLevel)
}

if s.Workers < 1 {
log.Fatalf("Will not start without workers")
if err := s.Config.Validate(); err != nil {
log.Fatalf("Config is invalid: %v", err)
}

if debugger {
Expand All @@ -85,21 +84,21 @@ func main() {
}()
}

if s.ListenConfig.ShouldAnnounce {
if s.Config.ShouldAnnounce {
log.Warningf("Will announce VIPs")
}

// Monitoring
// Replace with your implementation of Stats
st := &stats.JSONStats{}
go st.Start(monitoringport)
go st.Start(s.Config.MonitoringPort)

// Replace with your implementation of Announce
s.Announce = &announce.NoopAnnounce{}

ch := &checker.SimpleChecker{
ExpectedListeners: int64(len(s.ListenConfig.IPs)),
ExpectedWorkers: int64(s.Workers),
ExpectedListeners: int64(len(s.Config.IPs)),
ExpectedWorkers: int64(s.Config.Workers),
}

// context is used in server in case work needs to be interrupted internally
Expand Down
21 changes: 18 additions & 3 deletions ntp/responder/server/config.go
Expand Up @@ -20,17 +20,24 @@ import (
"fmt"
"net"
"strings"
"time"
)

// DefaultServerIPs is a default list of IPs server will bind to if nothing else is specified
var DefaultServerIPs = MultiIPs{net.ParseIP("127.0.0.1"), net.ParseIP("::1")}

// ListenConfig is a wrapper around multiple IPs and Port to bind to
type ListenConfig struct {
// Config is a server config structure
type Config struct {
ExtraOffset time.Duration
Iface string
IPs MultiIPs
ManageLoopback bool
MonitoringPort int
Port int
RefID string
ShouldAnnounce bool
Iface string
Stratum int
Workers int
}

// MultiIPs is a wrapper allowing to set multiple IPs with flag parser
Expand Down Expand Up @@ -63,3 +70,11 @@ func (m *MultiIPs) SetDefault() {

*m = DefaultServerIPs
}

// Validate checks if config is valid
func (c *Config) Validate() error {
if c.Workers < 1 {
return fmt.Errorf("will not start without workers")
}
return nil
}
7 changes: 7 additions & 0 deletions ntp/responder/server/config_test.go
Expand Up @@ -61,3 +61,10 @@ func TestConfigSetDefault(t *testing.T) {

require.Equal(t, DefaultServerIPs, m)
}

func TestConfigValidate(t *testing.T) {
c := Config{Workers: 42}
require.NoError(t, c.Validate())
c.Workers = 0
require.Error(t, c.Validate())
}
16 changes: 8 additions & 8 deletions ntp/responder/server/ip.go
Expand Up @@ -31,27 +31,27 @@ const ipv6Mask = 64

// AddIPOnInterface adds ip to interface
func (s *Server) addIPToInterface(vip net.IP) error {
if !s.ManageLoopback {
if !s.Config.ManageLoopback {
return nil
}
log.Infof("Adding %s to %s", vip, s.ListenConfig.Iface)
log.Infof("Adding %s to %s", vip, s.Config.Iface)
// Add IPs to the interface
iface, err := net.InterfaceByName(s.ListenConfig.Iface)
iface, err := net.InterfaceByName(s.Config.Iface)
if err != nil {
return fmt.Errorf("failed to add IP to the %s interface: %w", s.ListenConfig.Iface, err)
return fmt.Errorf("failed to add IP to the %s interface: %w", s.Config.Iface, err)
}

return addIfaceIP(iface, &vip)
}

// deleteIPFromInterface deletes ip from interface
func (s *Server) deleteIPFromInterface(vip net.IP) error {
if !s.ManageLoopback {
if !s.Config.ManageLoopback {
return nil
}
log.Infof("Deleting %s to %s", vip, s.ListenConfig.Iface)
log.Infof("Deleting %s to %s", vip, s.Config.Iface)
// Delete IPs to the interface
iface, err := net.InterfaceByName(s.ListenConfig.Iface)
iface, err := net.InterfaceByName(s.Config.Iface)
if err != nil {
return err
}
Expand All @@ -61,7 +61,7 @@ func (s *Server) deleteIPFromInterface(vip net.IP) error {

// DeleteAllIPs deletes all IPs from interface specified in config
func (s *Server) DeleteAllIPs() {
for _, vip := range s.ListenConfig.IPs {
for _, vip := range s.Config.IPs {
if err := s.deleteIPFromInterface(vip); err != nil {
// Don't return error. Continue deleting
log.Errorf("[server]: %v", err)
Expand Down
10 changes: 4 additions & 6 deletions ntp/responder/server/ip_test.go
Expand Up @@ -26,20 +26,18 @@ import (
const testIP = "1.2.3.4"

func TestAddIPToInterfaceError(t *testing.T) {
lc := ListenConfig{Iface: "lol-does-not-exist"}
c := Config{Iface: "lol-does-not-exist", ManageLoopback: true}
s := &Server{
ListenConfig: lc,
ManageLoopback: true,
Config: c,
}
err := s.addIPToInterface(net.ParseIP(testIP))
require.NotNil(t, err)
}

func TestDeleteIPFromInterfaceError(t *testing.T) {
lc := ListenConfig{Iface: "lol-does-not-exist"}
c := Config{Iface: "lol-does-not-exist", ManageLoopback: true}
s := &Server{
ListenConfig: lc,
ManageLoopback: true,
Config: c,
}
err := s.deleteIPFromInterface(net.ParseIP(testIP))
require.NotNil(t, err)
Expand Down
39 changes: 17 additions & 22 deletions ntp/responder/server/server.go
Expand Up @@ -44,31 +44,26 @@ type task struct {

// Server is a type for UDP server which handles connections.
type Server struct {
ListenConfig ListenConfig
Workers int
Announce Announce
Stats Stats
Checker Checker
tasks chan task
ExtraOffset time.Duration
RefID string
Stratum int
ManageLoopback bool
Config Config
Announce Announce
Stats Stats
Checker Checker
tasks chan task
}

// Start UDP server.
func (s *Server) Start(ctx context.Context, cancelFunc context.CancelFunc) {
log.Infof("Creating %d goroutine workers", s.Workers)
s.tasks = make(chan task, s.Workers)
log.Infof("Creating %d goroutine workers", s.Config.Workers)
s.tasks = make(chan task, s.Config.Workers)
// Pre-create workers
for i := 0; i < s.Workers; i++ {
for i := 0; i < s.Config.Workers; i++ {
go s.startWorker()
}

log.Infof("Starting %d listener(s)", len(s.ListenConfig.IPs))
log.Infof("Starting %d listener(s)", len(s.Config.IPs))

for _, ip := range s.ListenConfig.IPs {
log.Infof("Starting listener on %s:%d", ip.String(), s.ListenConfig.Port)
for _, ip := range s.Config.IPs {
log.Infof("Starting listener on %s:%d", ip.String(), s.Config.Port)

go func(ip net.IP) {
s.Stats.IncListeners()
Expand All @@ -77,7 +72,7 @@ func (s *Server) Start(ctx context.Context, cancelFunc context.CancelFunc) {
log.Errorf("[server]: %v", err)
}

conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: ip, Port: s.ListenConfig.Port})
conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: ip, Port: s.Config.Port})
if err != nil {
log.Fatalf("listening error: %v", err)
}
Expand Down Expand Up @@ -109,10 +104,10 @@ func (s *Server) Start(ctx context.Context, cancelFunc context.CancelFunc) {
case <-ctx.Done():
return
case <-time.After(30 * time.Second):
if s.ListenConfig.ShouldAnnounce {
if s.Config.ShouldAnnounce {
// First run will be 30 seconds delayed
log.Debug("Requesting VIPs announce")
err := s.Announce.Advertise(s.ListenConfig.IPs)
err := s.Announce.Advertise(s.Config.IPs)
if err != nil {
log.Errorf("Error during announcement: %v", err)
s.Stats.ResetAnnounce()
Expand Down Expand Up @@ -188,7 +183,7 @@ func (s *Server) startWorker() {
s.Stats.IncWorkers()
for {
t := <-s.tasks
t.serve(response, s.ExtraOffset)
t.serve(response, s.Config.ExtraOffset)
}
}

Expand Down Expand Up @@ -220,14 +215,14 @@ func (t *task) serve(response *ntp.Packet, extraoffset time.Duration) {
// fillStaticHeaders pre-sets all the headers per worker which will never change
// numbers are taken from tcpdump.
func (s *Server) fillStaticHeaders(response *ntp.Packet) {
response.Stratum = uint8(s.Stratum)
response.Stratum = uint8(s.Config.Stratum)
response.Precision = -32
// Root delay. We pretend to be stratum 1
response.RootDelay = 0
// Root dispersion, big-endian 0.000152
response.RootDispersion = 10
// Reference ID ATOM. Only for stratum 1
response.ReferenceID = binary.BigEndian.Uint32([]byte(fmt.Sprintf("%-4s", s.RefID)))
response.ReferenceID = binary.BigEndian.Uint32([]byte(fmt.Sprintf("%-4s", s.Config.RefID)))
}

// generateResponse generates response NTP packet
Expand Down
4 changes: 2 additions & 2 deletions ntp/responder/server/server_test.go
Expand Up @@ -45,14 +45,14 @@ var ntpRequest = &ntp.Packet{

func TestFillStaticHeadersStratum(t *testing.T) {
stratum := 1
s := &Server{Stratum: stratum}
s := &Server{Config: Config{Stratum: stratum}}
response := &ntp.Packet{}
s.fillStaticHeaders(response)
require.Equal(t, uint8(stratum), response.Stratum)
}

func TestFillStaticHeadersReferenceID(t *testing.T) {
s := &Server{RefID: "CHANDLER"}
s := &Server{Config: Config{RefID: "CHANDLER"}}
response := &ntp.Packet{}

s.fillStaticHeaders(response)
Expand Down

0 comments on commit 0d7d069

Please sign in to comment.