From 890b15fe55fc3e691cb6ea9f8084c9eee1b3b695 Mon Sep 17 00:00:00 2001 From: Julian Kornberger Date: Tue, 18 Apr 2017 03:11:05 +0200 Subject: [PATCH] Add per-link statistics closes #23 --- database/influxdb/database.go | 1 + database/influxdb/node.go | 40 +++++++++++---- database/influxdb/node_test.go | 92 ++++++++++++++++++++++++++-------- respond/collector.go | 14 +++++- runtime/nodes.go | 31 ++++-------- 5 files changed, 124 insertions(+), 54 deletions(-) diff --git a/database/influxdb/database.go b/database/influxdb/database.go index 0ca036b8..2de6dac5 100644 --- a/database/influxdb/database.go +++ b/database/influxdb/database.go @@ -12,6 +12,7 @@ import ( ) const ( + MeasurementLink = "link" // Measurement for per-link statistics MeasurementNode = "node" // Measurement for per-node statistics MeasurementGlobal = "global" // Measurement for summarized global statistics CounterMeasurementFirmware = "firmware" // Measurement for firmware statistics diff --git a/database/influxdb/node.go b/database/influxdb/node.go index 5f23368f..8779d493 100644 --- a/database/influxdb/node.go +++ b/database/influxdb/node.go @@ -11,24 +11,28 @@ import ( "github.com/FreifunkBremen/yanic/runtime" ) -// InsertNode implementation of database -func (conn *Connection) InsertNode(node *runtime.Node) { - tags, fields := buildNodeStats(node) - conn.addPoint(MeasurementNode, tags, fields, time.Now()) -} - +// PruneNodes prunes historical per-node data func (conn *Connection) PruneNodes(deleteAfter time.Duration) { - query := fmt.Sprintf("delete from %s where time < now() - %ds", MeasurementNode, deleteAfter/time.Second) - conn.client.Query(client.NewQuery(query, conn.config.Database(), "m")) + for _, measurement := range []string{MeasurementNode, MeasurementLink} { + query := fmt.Sprintf("delete from %s where time < now() - %ds", measurement, deleteAfter/time.Second) + conn.client.Query(client.NewQuery(query, conn.config.Database(), "m")) + } + } -// returns tags and fields for InfluxDB -func buildNodeStats(node *runtime.Node) (tags models.Tags, fields models.Fields) { +// InsertNode stores statistics and neighbours in the database +func (conn *Connection) InsertNode(node *runtime.Node) { stats := node.Statistics + time := node.Lastseen.GetTime() + if stats == nil { + return + } + + tags := models.Tags{} tags.SetString("nodeid", stats.NodeID) - fields = map[string]interface{}{ + fields := models.Fields{ "load": stats.LoadAverage, "time.up": int64(stats.Uptime), "time.idle": int64(stats.Idletime), @@ -77,6 +81,9 @@ func buildNodeStats(node *runtime.Node) (tags models.Tags, fields models.Fields) batadv := 0 for _, batadvNeighbours := range neighbours.Batadv { batadv += len(batadvNeighbours.Neighbours) + for neighbourID, link := range batadvNeighbours.Neighbours { + conn.insertLinkStatistics(stats.NodeID, neighbourID, link.Tq, time) + } } fields["neighbours.batadv"] = batadv @@ -123,5 +130,16 @@ func buildNodeStats(node *runtime.Node) (tags models.Tags, fields models.Fields) tags.SetString("frequency"+suffix, strconv.Itoa(int(airtime.Frequency))) } + conn.addPoint(MeasurementNode, tags, fields, time) + return } + +// adds a link data point +func (conn *Connection) insertLinkStatistics(source string, target string, tq int, t time.Time) { + tags := models.Tags{} + tags.SetString("source", source) + tags.SetString("target", target) + + conn.addPoint(MeasurementLink, tags, models.Fields{"tq": tq}, t) +} diff --git a/database/influxdb/node_test.go b/database/influxdb/node_test.go index 3fa7f32d..0d76f360 100644 --- a/database/influxdb/node_test.go +++ b/database/influxdb/node_test.go @@ -3,6 +3,7 @@ package influxdb import ( "testing" + "github.com/influxdata/influxdb/client/v2" "github.com/stretchr/testify/assert" "github.com/FreifunkBremen/yanic/data" @@ -14,7 +15,7 @@ func TestToInflux(t *testing.T) { node := &runtime.Node{ Statistics: &data.Statistics{ - NodeID: "foobar", + NodeID: "deadbeef", LoadAverage: 0.5, Wireless: data.WirelessStatistics{ &data.WirelessAirtime{Frequency: 5500}, @@ -58,7 +59,9 @@ func TestToInflux(t *testing.T) { Batadv: map[string]data.BatadvNeighbours{ "a-interface": data.BatadvNeighbours{ Neighbours: map[string]data.BatmanLink{ - "b-neigbourinterface": data.BatmanLink{}, + "BAFF1E5": data.BatmanLink{ + Tq: 75, + }, }, }, }, @@ -66,23 +69,70 @@ func TestToInflux(t *testing.T) { }, } - tags, fields := buildNodeStats(node) - - assert.Equal("foobar", tags.GetString("nodeid")) - assert.Equal("nobody", tags.GetString("owner")) - assert.Equal(0.5, fields["load"]) - assert.Equal(0, fields["neighbours.lldp"]) - assert.Equal(1, fields["neighbours.batadv"]) - assert.Equal(1, fields["neighbours.vpn"]) - assert.Equal(1, fields["neighbours.total"]) - - assert.Equal(uint32(3), fields["wireless.txpower24"]) - assert.Equal(uint32(5500), fields["airtime11a.frequency"]) - assert.Equal("", tags.GetString("frequency5500")) - - assert.Equal(int64(1213), fields["traffic.rx.bytes"]) - assert.Equal(float64(1321), fields["traffic.tx.dropped"]) - assert.Equal(int64(1322), fields["traffic.forward.bytes"]) - assert.Equal(int64(2331), fields["traffic.mgmt_rx.bytes"]) - assert.Equal(float64(2327), fields["traffic.mgmt_tx.packets"]) + points := testPoints(node) + var fields map[string]interface{} + var tags map[string]string + + assert.Len(points, 2) + + // first point contains the neighbour + nPoint := points[0] + tags = nPoint.Tags() + fields, _ = nPoint.Fields() + assert.EqualValues("link", nPoint.Name()) + assert.EqualValues(map[string]string{"source": "deadbeef", "target": "BAFF1E5"}, tags) + assert.EqualValues(75, fields["tq"]) + + // second point contains the statistics + sPoint := points[1] + tags = sPoint.Tags() + fields, _ = sPoint.Fields() + + assert.EqualValues("deadbeef", tags["nodeid"]) + assert.EqualValues("nobody", tags["owner"]) + assert.EqualValues(0.5, fields["load"]) + assert.EqualValues(0, fields["neighbours.lldp"]) + assert.EqualValues(1, fields["neighbours.batadv"]) + assert.EqualValues(1, fields["neighbours.vpn"]) + assert.EqualValues(1, fields["neighbours.total"]) + + assert.EqualValues(uint32(3), fields["wireless.txpower24"]) + assert.EqualValues(uint32(5500), fields["airtime11a.frequency"]) + assert.EqualValues("", tags["frequency5500"]) + + assert.EqualValues(int64(1213), fields["traffic.rx.bytes"]) + assert.EqualValues(float64(1321), fields["traffic.tx.dropped"]) + assert.EqualValues(int64(1322), fields["traffic.forward.bytes"]) + assert.EqualValues(int64(2331), fields["traffic.mgmt_rx.bytes"]) + assert.EqualValues(float64(2327), fields["traffic.mgmt_tx.packets"]) +} + +// Processes data and returns the InfluxDB points +func testPoints(nodes ...*runtime.Node) (points []*client.Point) { + // Create dummy client + influxClient, err := client.NewHTTPClient(client.HTTPConfig{Addr: "http://127.0.0.1"}) + if err != nil { + panic(err) + } + + // Create dummy connection + conn := &Connection{ + points: make(chan *client.Point), + client: influxClient, + } + + // Process data + go func() { + for _, node := range nodes { + conn.InsertNode(node) + } + conn.Close() + }() + + // Read points + for point := range conn.points { + points = append(points, point) + } + + return } diff --git a/respond/collector.go b/respond/collector.go index b5406c19..ff53710c 100644 --- a/respond/collector.go +++ b/respond/collector.go @@ -204,13 +204,23 @@ func (coll *Collector) saveResponse(addr net.UDPAddr, res *data.ResponseData) { return } + // Set fields to nil if nodeID is inconsistent + if res.Statistics != nil && res.Statistics.NodeID != nodeID { + res.Statistics = nil + } + if res.Neighbours != nil && res.Neighbours.NodeID != nodeID { + res.Neighbours = nil + } + if res.NodeInfo != nil && res.NodeInfo.NodeID != nodeID { + res.NodeInfo = nil + } + // Process the data and update IP address node := coll.nodes.Update(nodeID, res) node.Address = addr.IP // Store statistics in database - if coll.db != nil && node.Statistics != nil { - node.Statistics.NodeID = nodeID + if coll.db != nil { coll.db.InsertNode(node) } } diff --git a/runtime/nodes.go b/runtime/nodes.go index 012c1094..68239404 100644 --- a/runtime/nodes.go +++ b/runtime/nodes.go @@ -52,30 +52,21 @@ func (nodes *Nodes) Update(nodeID string, res *data.ResponseData) *Node { } nodes.Unlock() - node.Lastseen = now - node.Online = true - - // Update neighbours - if val := res.Neighbours; val != nil { - node.Neighbours = val - } - - // Update nodeinfo - if val := res.NodeInfo; val != nil { - node.Nodeinfo = val - } - - // Update statistics - if val := res.Statistics; val != nil { - + // Update wireless statistics + if statistics := res.Statistics; statistics != nil { // Update channel utilization if previous statistics are present - if node.Statistics != nil && node.Statistics.Wireless != nil && val.Wireless != nil { - val.Wireless.SetUtilization(node.Statistics.Wireless) + if node.Statistics != nil && node.Statistics.Wireless != nil && statistics.Wireless != nil { + statistics.Wireless.SetUtilization(node.Statistics.Wireless) } - - node.Statistics = val } + // Update fields + node.Lastseen = now + node.Online = true + node.Neighbours = res.Neighbours + node.Nodeinfo = res.NodeInfo + node.Statistics = res.Statistics + return node }