Skip to content

Commit

Permalink
Merge 890b15f into 237e011
Browse files Browse the repository at this point in the history
  • Loading branch information
corny committed Apr 18, 2017
2 parents 237e011 + 890b15f commit 5cc8e3f
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 54 deletions.
1 change: 1 addition & 0 deletions database/influxdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
)

const (
MeasurementLink = "link" // Measurement for per-link statistics
MeasurementNode = "node" // Measurement for per-node statistics
MeasurementGlobal = "global" // Measurement for summarized global statistics
CounterMeasurementFirmware = "firmware" // Measurement for firmware statistics
Expand Down
40 changes: 29 additions & 11 deletions database/influxdb/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,28 @@ import (
"github.com/FreifunkBremen/yanic/runtime"
)

// InsertNode implementation of database
func (conn *Connection) InsertNode(node *runtime.Node) {
tags, fields := buildNodeStats(node)
conn.addPoint(MeasurementNode, tags, fields, time.Now())
}

// PruneNodes prunes historical per-node data
func (conn *Connection) PruneNodes(deleteAfter time.Duration) {
query := fmt.Sprintf("delete from %s where time < now() - %ds", MeasurementNode, deleteAfter/time.Second)
conn.client.Query(client.NewQuery(query, conn.config.Database(), "m"))
for _, measurement := range []string{MeasurementNode, MeasurementLink} {
query := fmt.Sprintf("delete from %s where time < now() - %ds", measurement, deleteAfter/time.Second)
conn.client.Query(client.NewQuery(query, conn.config.Database(), "m"))
}

}

// returns tags and fields for InfluxDB
func buildNodeStats(node *runtime.Node) (tags models.Tags, fields models.Fields) {
// InsertNode stores statistics and neighbours in the database
func (conn *Connection) InsertNode(node *runtime.Node) {
stats := node.Statistics
time := node.Lastseen.GetTime()

if stats == nil {
return
}

tags := models.Tags{}
tags.SetString("nodeid", stats.NodeID)

fields = map[string]interface{}{
fields := models.Fields{
"load": stats.LoadAverage,
"time.up": int64(stats.Uptime),
"time.idle": int64(stats.Idletime),
Expand Down Expand Up @@ -77,6 +81,9 @@ func buildNodeStats(node *runtime.Node) (tags models.Tags, fields models.Fields)
batadv := 0
for _, batadvNeighbours := range neighbours.Batadv {
batadv += len(batadvNeighbours.Neighbours)
for neighbourID, link := range batadvNeighbours.Neighbours {
conn.insertLinkStatistics(stats.NodeID, neighbourID, link.Tq, time)
}
}
fields["neighbours.batadv"] = batadv

Expand Down Expand Up @@ -123,5 +130,16 @@ func buildNodeStats(node *runtime.Node) (tags models.Tags, fields models.Fields)
tags.SetString("frequency"+suffix, strconv.Itoa(int(airtime.Frequency)))
}

conn.addPoint(MeasurementNode, tags, fields, time)

return
}

// adds a link data point
func (conn *Connection) insertLinkStatistics(source string, target string, tq int, t time.Time) {
tags := models.Tags{}
tags.SetString("source", source)
tags.SetString("target", target)

conn.addPoint(MeasurementLink, tags, models.Fields{"tq": tq}, t)
}
92 changes: 71 additions & 21 deletions database/influxdb/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package influxdb
import (
"testing"

"github.com/influxdata/influxdb/client/v2"
"github.com/stretchr/testify/assert"

"github.com/FreifunkBremen/yanic/data"
Expand All @@ -14,7 +15,7 @@ func TestToInflux(t *testing.T) {

node := &runtime.Node{
Statistics: &data.Statistics{
NodeID: "foobar",
NodeID: "deadbeef",
LoadAverage: 0.5,
Wireless: data.WirelessStatistics{
&data.WirelessAirtime{Frequency: 5500},
Expand Down Expand Up @@ -58,31 +59,80 @@ func TestToInflux(t *testing.T) {
Batadv: map[string]data.BatadvNeighbours{
"a-interface": data.BatadvNeighbours{
Neighbours: map[string]data.BatmanLink{
"b-neigbourinterface": data.BatmanLink{},
"BAFF1E5": data.BatmanLink{
Tq: 75,
},
},
},
},
LLDP: map[string]data.LLDPNeighbours{},
},
}

tags, fields := buildNodeStats(node)

assert.Equal("foobar", tags.GetString("nodeid"))
assert.Equal("nobody", tags.GetString("owner"))
assert.Equal(0.5, fields["load"])
assert.Equal(0, fields["neighbours.lldp"])
assert.Equal(1, fields["neighbours.batadv"])
assert.Equal(1, fields["neighbours.vpn"])
assert.Equal(1, fields["neighbours.total"])

assert.Equal(uint32(3), fields["wireless.txpower24"])
assert.Equal(uint32(5500), fields["airtime11a.frequency"])
assert.Equal("", tags.GetString("frequency5500"))

assert.Equal(int64(1213), fields["traffic.rx.bytes"])
assert.Equal(float64(1321), fields["traffic.tx.dropped"])
assert.Equal(int64(1322), fields["traffic.forward.bytes"])
assert.Equal(int64(2331), fields["traffic.mgmt_rx.bytes"])
assert.Equal(float64(2327), fields["traffic.mgmt_tx.packets"])
points := testPoints(node)
var fields map[string]interface{}
var tags map[string]string

assert.Len(points, 2)

// first point contains the neighbour
nPoint := points[0]
tags = nPoint.Tags()
fields, _ = nPoint.Fields()
assert.EqualValues("link", nPoint.Name())
assert.EqualValues(map[string]string{"source": "deadbeef", "target": "BAFF1E5"}, tags)
assert.EqualValues(75, fields["tq"])

// second point contains the statistics
sPoint := points[1]
tags = sPoint.Tags()
fields, _ = sPoint.Fields()

assert.EqualValues("deadbeef", tags["nodeid"])
assert.EqualValues("nobody", tags["owner"])
assert.EqualValues(0.5, fields["load"])
assert.EqualValues(0, fields["neighbours.lldp"])
assert.EqualValues(1, fields["neighbours.batadv"])
assert.EqualValues(1, fields["neighbours.vpn"])
assert.EqualValues(1, fields["neighbours.total"])

assert.EqualValues(uint32(3), fields["wireless.txpower24"])
assert.EqualValues(uint32(5500), fields["airtime11a.frequency"])
assert.EqualValues("", tags["frequency5500"])

assert.EqualValues(int64(1213), fields["traffic.rx.bytes"])
assert.EqualValues(float64(1321), fields["traffic.tx.dropped"])
assert.EqualValues(int64(1322), fields["traffic.forward.bytes"])
assert.EqualValues(int64(2331), fields["traffic.mgmt_rx.bytes"])
assert.EqualValues(float64(2327), fields["traffic.mgmt_tx.packets"])
}

// Processes data and returns the InfluxDB points
func testPoints(nodes ...*runtime.Node) (points []*client.Point) {
// Create dummy client
influxClient, err := client.NewHTTPClient(client.HTTPConfig{Addr: "http://127.0.0.1"})
if err != nil {
panic(err)
}

// Create dummy connection
conn := &Connection{
points: make(chan *client.Point),
client: influxClient,
}

// Process data
go func() {
for _, node := range nodes {
conn.InsertNode(node)
}
conn.Close()
}()

// Read points
for point := range conn.points {
points = append(points, point)
}

return
}
14 changes: 12 additions & 2 deletions respond/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,23 @@ func (coll *Collector) saveResponse(addr net.UDPAddr, res *data.ResponseData) {
return
}

// Set fields to nil if nodeID is inconsistent
if res.Statistics != nil && res.Statistics.NodeID != nodeID {
res.Statistics = nil
}
if res.Neighbours != nil && res.Neighbours.NodeID != nodeID {
res.Neighbours = nil
}
if res.NodeInfo != nil && res.NodeInfo.NodeID != nodeID {
res.NodeInfo = nil
}

// Process the data and update IP address
node := coll.nodes.Update(nodeID, res)
node.Address = addr.IP

// Store statistics in database
if coll.db != nil && node.Statistics != nil {
node.Statistics.NodeID = nodeID
if coll.db != nil {
coll.db.InsertNode(node)
}
}
Expand Down
31 changes: 11 additions & 20 deletions runtime/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,30 +52,21 @@ func (nodes *Nodes) Update(nodeID string, res *data.ResponseData) *Node {
}
nodes.Unlock()

node.Lastseen = now
node.Online = true

// Update neighbours
if val := res.Neighbours; val != nil {
node.Neighbours = val
}

// Update nodeinfo
if val := res.NodeInfo; val != nil {
node.Nodeinfo = val
}

// Update statistics
if val := res.Statistics; val != nil {

// Update wireless statistics
if statistics := res.Statistics; statistics != nil {
// Update channel utilization if previous statistics are present
if node.Statistics != nil && node.Statistics.Wireless != nil && val.Wireless != nil {
val.Wireless.SetUtilization(node.Statistics.Wireless)
if node.Statistics != nil && node.Statistics.Wireless != nil && statistics.Wireless != nil {
statistics.Wireless.SetUtilization(node.Statistics.Wireless)
}

node.Statistics = val
}

// Update fields
node.Lastseen = now
node.Online = true
node.Neighbours = res.Neighbours
node.Nodeinfo = res.NodeInfo
node.Statistics = res.Statistics

return node
}

Expand Down

0 comments on commit 5cc8e3f

Please sign in to comment.