Skip to content

Commit

Permalink
dirty hack: top n hosts by bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
cha87de committed Sep 5, 2018
1 parent 9f79518 commit 2917129
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 25 deletions.
30 changes: 6 additions & 24 deletions kafka-listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ func runKafkaListener() {
}
}()

startPeriodicHostExport()

// handle kafka flow messages in foreground
for {
flow := <-kafkaConn.Messages()
Expand All @@ -33,30 +35,10 @@ func handleFlow(flow *flow.FlowMessage) {

if uint64(flow.GetCid()) == *filterCustomerID {
promExporter.Increment(flow)
ipDst := net.IP(flow.GetDstIP()).String()
ipSrc := net.IP(flow.GetSrcIP()).String()
countHostTraffic(ipDst, flow.GetBytes())
countHostTraffic(ipSrc, flow.GetBytes())
}

/*
srcIP := decodeIP("", flow.GetSrcIP())
dstIP := decodeIP("", flow.GetDstIP())
if srcIP == "134.60.30.246" || dstIP == "134.60.30.246" {
// fmt.Printf("flow: %v -> %v, dir: %v, cid: %v, norm: %v\n", net.IP(flow.SrcIP), net.IP(flow.DstIP), flow.Direction, flow.Cid, flow.Normalized)
// fmt.Printf(" %v - %v (%v) -> %v - %v (%v)\n", flow.SrcIfName, flow.SrcIfDesc, flow.SrcIfSpeed, flow.DstIfName, flow.DstIfDesc, flow.DstIfSpeed)
promExporter.Increment(flow)
}
*/

}

func decodeIP(name string, value []byte) string {
str := ""
ipconv := net.IP{}
if value != nil {
invvalue := make([]byte, len(value))
for i := range value {
invvalue[len(value)-i-1] = value[i]
}
ipconv = value
str += name + ipconv.String()
}
return str
}
108 changes: 108 additions & 0 deletions perhosttraffic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package main

import (
"fmt"
"runtime"
"sort"
"sync"
"time"
)

// TOPN defines how many hosts are exported
var TOPN = 50
var hostlist topHosts

var byHostBytes sync.Map // concurrent map of map[string]uint64
func countHostTraffic(ip string, bytes uint64) {
if value, ok := byHostBytes.Load(ip); ok {
currentBytes := value.(uint64)
bytes = bytes + currentBytes
}
byHostBytes.Store(ip, bytes)
}

func printTopHostList() {
length := 0
byHostBytes.Range(func(key, value interface{}) bool {
length++

// check if in top N
currentIP := key.(string)
currentBytes := value.(uint64)
hostlist.addToTopN(host{
ip: currentIP,
bytes: currentBytes,
})

return true
})
fmt.Printf("byHostBytes length: %d\n", length)

// push hostlist to promExporter
for _, host := range hostlist {
promExporter.TopHost(host.ip, host.bytes)
}
printMemUsage()
}

func startPeriodicHostExport() {

hostlist = make(topHosts, TOPN)

ticker := time.NewTicker(1 * time.Minute)
quit := make(chan struct{})
go func() {
for {
select {
case <-ticker.C:
// do stuff
printTopHostList()
case <-quit:
ticker.Stop()
return
}
}
}()
}

func printMemUsage() {
var m runtime.MemStats
runtime.ReadMemStats(&m)
// For info on each, see: https://golang.org/pkg/runtime/#MemStats
fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc))
fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc))
fmt.Printf("\tSys = %v MiB", bToMb(m.Sys))
fmt.Printf("\tNumGC = %v\n", m.NumGC)
}

func bToMb(b uint64) uint64 {
return b / 1024 / 1024
}

/////////
// see https://play.golang.org/p/e6SQCR4cu-1
/////////
type host struct {
ip string
bytes uint64
}

type topHosts []host

func (t topHosts) addToTopN(host host) {
if host.bytes <= t[0].bytes {
// Doesn't belong on the list
return
}

// Find the insertion position
pos := sort.Search(len(t), func(a int) bool {
return t[a].bytes > host.bytes
})

// Shift lower elements
copy(t[:pos-1], t[1:pos])

// Insert it
t[pos-1] = host
}
13 changes: 13 additions & 0 deletions prometheus/collectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,16 @@ var (
Help: "Number of Packets received across Flows.",
}, labels)
)

// TOP HOSTS
var (
hostlabels = []string{
"ip",
}

hostBytes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "host_bytes",
Help: "Number of Bytes for top N hosts.",
}, hostlabels)
)
2 changes: 1 addition & 1 deletion prometheus/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ func (exporter *Exporter) Initialize(addr string) {

func (exporter *Exporter) registerCollectors() {
// TODO make more dynamic
prometheus.MustRegister(flowNumber, flowBytes, flowPackets)
prometheus.MustRegister(flowNumber, flowBytes, flowPackets, hostBytes)
}
13 changes: 13 additions & 0 deletions prometheus/tophosts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package prometheus

import (
"github.com/prometheus/client_golang/prometheus"
)

// TopHost updates one entry for Top Hosts
func (exporter *Exporter) TopHost(ip string, bytes uint64) {
labels := prometheus.Labels{
"ip": ip,
}
hostBytes.With(labels).Add(float64(bytes))
}

0 comments on commit 2917129

Please sign in to comment.