From 8cad51d71cb2ea844e71caa7e965e4ba5af479a8 Mon Sep 17 00:00:00 2001 From: Martin/Geno Date: Thu, 24 Jan 2019 02:54:21 +0100 Subject: [PATCH] [TASK] add respond(d) daemon - WIP --- .gitignore | 1 + cmd/config.go | 31 ++------- cmd/config_test.go | 8 ++- cmd/import.go | 5 +- cmd/respondd.go | 40 ++++++++++++ cmd/serve.go | 20 ++++-- config-respondd_example.toml | 2 + data/response.go | 6 +- respond/collector.go | 15 ----- respond/daemon.go | 121 +++++++++++++++++++++++++++++++++++ respond/respond.go | 37 +++++++++++ 11 files changed, 232 insertions(+), 54 deletions(-) create mode 100644 cmd/respondd.go create mode 100644 config-respondd_example.toml create mode 100644 respond/daemon.go diff --git a/.gitignore b/.gitignore index 894526c2..d25e8646 100644 --- a/.gitignore +++ b/.gitignore @@ -24,4 +24,5 @@ _testmain.go *.prof webroot /config.toml +/config-respondd.toml /vendor diff --git a/cmd/config.go b/cmd/config.go index 21ab5dd9..17428ea4 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -1,54 +1,31 @@ package cmd import ( - "fmt" "io/ioutil" - "os" "github.com/naoina/toml" - "github.com/FreifunkBremen/yanic/database" "github.com/FreifunkBremen/yanic/respond" "github.com/FreifunkBremen/yanic/runtime" - "github.com/FreifunkBremen/yanic/webserver" ) -// Config represents the whole configuration -type Config struct { - Respondd respond.Config - Webserver webserver.Config - Nodes runtime.NodesConfig - Database database.Config -} - var ( configPath string collector *respond.Collector nodes *runtime.Nodes ) -func loadConfig() *Config { - config, err := ReadConfigFile(configPath) - if err != nil { - fmt.Fprintln(os.Stderr, "unable to load config file:", err) - os.Exit(2) - } - return config -} - // ReadConfigFile reads a config model from path of a yml file -func ReadConfigFile(path string) (config *Config, err error) { - config = &Config{} - +func ReadConfigFile(path string, config interface{}) error { file, err := ioutil.ReadFile(path) if err != nil { - return nil, err + return err } err = toml.Unmarshal(file, config) if err != nil { - return nil, err + return err } - return + return nil } diff --git a/cmd/config_test.go b/cmd/config_test.go index ce6cd61a..df296cec 100644 --- a/cmd/config_test.go +++ b/cmd/config_test.go @@ -10,7 +10,9 @@ import ( func TestReadConfig(t *testing.T) { assert := assert.New(t) - config, err := ReadConfigFile("../config_example.toml") + config := &ServeConfig{} + err := ReadConfigFile("../config_example.toml", config) + assert.NoError(err) assert.NotNil(config) @@ -40,11 +42,11 @@ func TestReadConfig(t *testing.T) { }, }, meshviewer) - _, err = ReadConfigFile("testdata/config_invalid.toml") + err = ReadConfigFile("testdata/config_invalid.toml", config) assert.Error(err, "not unmarshalable") assert.Contains(err.Error(), "invalid TOML syntax") - _, err = ReadConfigFile("testdata/adsa.toml") + err = ReadConfigFile("testdata/adsa.toml", config) assert.Error(err, "not found able") assert.Contains(err.Error(), "no such file or directory") } diff --git a/cmd/import.go b/cmd/import.go index 89ff203d..ef5edafd 100644 --- a/cmd/import.go +++ b/cmd/import.go @@ -19,7 +19,10 @@ var importCmd = &cobra.Command{ path := args[0] site := args[1] domain := args[2] - config := loadConfig() + config := &ServeConfig{} + if err := ReadConfigFile(configPath, config); err != nil { + log.Panicf("unable to load config file: %s", err) + } err := allDatabase.Start(config.Database) if err != nil { diff --git a/cmd/respondd.go b/cmd/respondd.go new file mode 100644 index 00000000..d7a7bb37 --- /dev/null +++ b/cmd/respondd.go @@ -0,0 +1,40 @@ +package cmd + +import ( + "os" + "os/signal" + "syscall" + + "github.com/bdlm/log" + "github.com/spf13/cobra" + + "github.com/FreifunkBremen/yanic/respond" +) + +// serveCmd represents the serve command +var responddCMD = &cobra.Command{ + Use: "respondd", + Short: "Runs a respond daemon", + Example: "yanic respondd --config /etc/respondd.toml", + Run: func(cmd *cobra.Command, args []string) { + daemon := &respond.Daemon{} + if err := ReadConfigFile(configPath, daemon); err != nil { + log.Panicf("unable to load config file: %s", err) + } + + go daemon.Start() + + log.Info("respondd daemon started") + // Wait for INT/TERM + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + sig := <-sigs + log.Infof("received %s", sig) + + }, +} + +func init() { + RootCmd.AddCommand(responddCMD) + responddCMD.Flags().StringVarP(&configPath, "config", "c", "config-respondd.toml", "Path to configuration file") +} diff --git a/cmd/serve.go b/cmd/serve.go index 2b1f6486..0096e748 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -9,6 +9,7 @@ import ( "github.com/bdlm/log" "github.com/spf13/cobra" + "github.com/FreifunkBremen/yanic/database" allDatabase "github.com/FreifunkBremen/yanic/database/all" allOutput "github.com/FreifunkBremen/yanic/output/all" "github.com/FreifunkBremen/yanic/respond" @@ -16,16 +17,26 @@ import ( "github.com/FreifunkBremen/yanic/webserver" ) +// Config represents the whole configuration +type ServeConfig struct { + Respondd respond.Config + Webserver webserver.Config + Nodes runtime.NodesConfig + Database database.Config +} + // serveCmd represents the serve command var serveCmd = &cobra.Command{ Use: "serve", Short: "Runs the yanic server", Example: "yanic serve --config /etc/yanic.toml", Run: func(cmd *cobra.Command, args []string) { - config := loadConfig() + config := &ServeConfig{} + if err := ReadConfigFile(configPath, config); err != nil { + log.Panicf("unable to load config file: %s", err) + } - err := allDatabase.Start(config.Database) - if err != nil { + if err := allDatabase.Start(config.Database); err != nil { log.Panicf("could not connect to database: %s", err) } defer allDatabase.Close() @@ -33,8 +44,7 @@ var serveCmd = &cobra.Command{ nodes = runtime.NewNodes(&config.Nodes) nodes.Start() - err = allOutput.Start(nodes, config.Nodes) - if err != nil { + if err := allOutput.Start(nodes, config.Nodes); err != nil { log.Panicf("error on init outputs: %s", err) } defer allOutput.Close() diff --git a/config-respondd_example.toml b/config-respondd_example.toml new file mode 100644 index 00000000..e89602f1 --- /dev/null +++ b/config-respondd_example.toml @@ -0,0 +1,2 @@ +multi_instance = false +data_interval = "3m" diff --git a/data/response.go b/data/response.go index b52874d5..af55cc27 100644 --- a/data/response.go +++ b/data/response.go @@ -2,7 +2,7 @@ package data // ResponseData struct type ResponseData struct { - Neighbours *Neighbours `json:"neighbours"` - Nodeinfo *Nodeinfo `json:"nodeinfo"` - Statistics *Statistics `json:"statistics"` + Nodeinfo *Nodeinfo `json:"nodeinfo" toml:"nodeinfo"` + Statistics *Statistics `json:"statistics" toml:"statistics"` + Neighbours *Neighbours `json:"neighbours" toml:"neighbours"` } diff --git a/respond/collector.go b/respond/collector.go index 929d58a5..1f6379cc 100644 --- a/respond/collector.go +++ b/respond/collector.go @@ -1,9 +1,6 @@ package respond import ( - "bytes" - "compress/flate" - "encoding/json" "fmt" "net" "time" @@ -245,18 +242,6 @@ func (coll *Collector) parser() { } } -func (res *Response) parse() (*data.ResponseData, error) { - // Deflate - deflater := flate.NewReader(bytes.NewReader(res.Raw)) - defer deflater.Close() - - // Unmarshal - rdata := &data.ResponseData{} - err := json.NewDecoder(deflater).Decode(rdata) - - return rdata, err -} - func (coll *Collector) saveResponse(addr *net.UDPAddr, res *data.ResponseData) { // Search for NodeID var nodeID string diff --git a/respond/daemon.go b/respond/daemon.go new file mode 100644 index 00000000..8321b7db --- /dev/null +++ b/respond/daemon.go @@ -0,0 +1,121 @@ +package respond + +import ( + "io/ioutil" + "net" + "os" + "strings" + "time" + + "github.com/bdlm/log" + + "github.com/FreifunkBremen/yanic/data" + "github.com/FreifunkBremen/yanic/lib/duration" +) + +type Daemon struct { + MultiInstance bool `toml:"multi_instance"` + DataInterval duration.Duration `toml:"data_interval"` + Data *data.ResponseData `toml:"data"` +} + +func (d *Daemon) Start() { + if d.Data == nil { + d.Data = &data.ResponseData{} + } + + d.updateData() + go d.updateWorker() + + socket, err := net.ListenMulticastUDP("udp6", nil, &net.UDPAddr{ + IP: net.ParseIP(multicastAddressDefault), + Port: port, + }) + if err != nil { + log.Fatal(err) + } + + socket.SetReadBuffer(maxDataGramSize) + + // Loop forever reading from the socket + for { + buf := make([]byte, maxDataGramSize) + n, src, err := socket.ReadFromUDP(buf) + if err != nil { + log.Errorf("ReadFromUDP failed: %s", err) + } + raw := make([]byte, n) + copy(raw, buf) + log.WithFields(map[string]interface{}{ + "bytes": n, + "data": string(raw), + "src": src.String(), + }).Debug("recieve request") + + // TODO handle single request + + res, err := NewRespone(d.Data, src) + if err != nil { + log.Errorf("Decode failed: %s", err) + } else { + n, err := socket.WriteToUDP(res.Raw, res.Address) + if err != nil { + log.Errorf("WriteToUDP failed: %s", err) + } else { + log.WithFields(map[string]interface{}{ + "bytes": n, + "dest": res.Address.String(), + }).Debug("send respond") + } + } + + } +} + +func trim(s string) string { + return strings.TrimSpace(strings.Trim(s, "\n")) +} + +func (d *Daemon) updateWorker() { + c := time.Tick(d.DataInterval.Duration) + + for range c { + d.updateData() + } +} + +func (d *Daemon) updateData() { + nodeID := "" + // Nodeinfo + if d.Data.Nodeinfo == nil { + d.Data.Nodeinfo = &data.Nodeinfo{} + } else { + nodeID = d.Data.Nodeinfo.NodeID + } + if d.Data.Nodeinfo.Hostname == "" { + d.Data.Nodeinfo.Hostname, _ = os.Hostname() + } + + // Statistics + if d.Data.Statistics == nil { + d.Data.Statistics = &data.Statistics{} + } else if nodeID == "" { + nodeID = d.Data.Statistics.NodeID + } + + // Neighbours + if d.Data.Neighbours == nil { + d.Data.Neighbours = &data.Neighbours{} + } else if nodeID == "" { + nodeID = d.Data.Neighbours.NodeID + } + + if nodeID == "" { + if v, err := ioutil.ReadFile("/etc/machine-id"); err == nil { + nodeID = trim(string(v))[:12] + } + } + d.Data.Nodeinfo.NodeID = nodeID + d.Data.Statistics.NodeID = nodeID + d.Data.Neighbours.NodeID = nodeID +} diff --git a/respond/respond.go b/respond/respond.go index 9564b8e6..c5ff87e4 100644 --- a/respond/respond.go +++ b/respond/respond.go @@ -1,7 +1,12 @@ package respond import ( + "bytes" + "compress/flate" + "encoding/json" "net" + + "github.com/FreifunkBremen/yanic/data" ) const ( @@ -20,3 +25,35 @@ type Response struct { Address *net.UDPAddr Raw []byte } + +func NewRespone(res *data.ResponseData, addr *net.UDPAddr) (*Response, error) { + buf := new(bytes.Buffer) + flater, err := flate.NewWriter(buf, flate.BestCompression) + if err != nil { + return nil, err + } + defer flater.Close() + + if err = json.NewEncoder(flater).Encode(res); err != nil { + return nil, err + } + + err = flater.Flush() + + return &Response{ + Raw: buf.Bytes(), + Address: addr, + }, err +} + +func (res *Response) parse() (*data.ResponseData, error) { + // Deflate + deflater := flate.NewReader(bytes.NewReader(res.Raw)) + defer deflater.Close() + + // Unmarshal + rdata := &data.ResponseData{} + err := json.NewDecoder(deflater).Decode(rdata) + + return rdata, err +}