Permalink
Browse files

Add missing geodns-influxdb files

  • Loading branch information...
abh committed Feb 13, 2017
1 parent 8d0d241 commit 7a0e8cc2dda958294b5a838ea791d83b87041291
Showing with 558 additions and 0 deletions.
  1. +130 −0 geodns-influxdb/influx.go
  2. +243 −0 geodns-influxdb/process-stats.go
  3. +120 −0 geodns-influxdb/stats.go
  4. +65 −0 geodns-influxdb/stats_test.go
View
@@ -0,0 +1,130 @@
package main
import (
"fmt"
"log"
"sync"
"time"
"github.com/influxdata/influxdb/client/v2"
"github.com/kr/pretty"
)
const UserAgent = "geodns-logs/1.1"
type influxClient struct {
ServerID string
ServerGroups []string
URL string
Username string
Password string
Database string
Verbose bool
Channel chan *Stats
wg sync.WaitGroup
hclient client.Client
}
func NewInfluxClient() *influxClient {
influx := &influxClient{}
influx.Channel = make(chan *Stats, 10)
return influx
}
func (influx *influxClient) Start() error {
if len(influx.URL) == 0 {
return fmt.Errorf("InfluxDB URL required")
}
if len(influx.Username) == 0 {
return fmt.Errorf("InfluxDB Username required")
}
if len(influx.Password) == 0 {
return fmt.Errorf("InfluxDB Password required")
}
if len(influx.Database) == 0 {
return fmt.Errorf("InfluxDB Databse required")
}
conf := client.HTTPConfig{
Addr: influx.URL,
Username: influx.Username,
Password: influx.Password,
UserAgent: UserAgent,
}
hclient, err := client.NewHTTPClient(conf)
if err != nil {
return fmt.Errorf("Could not setup http client: %s", err)
}
_, _, err = hclient.Ping(time.Second * 2)
if err != nil {
return fmt.Errorf("Could not ping %s: %s", conf.Addr, err)
}
influx.hclient = hclient
influx.wg.Add(1)
go influx.post()
return nil
}
func (influx *influxClient) Close() {
close(influx.Channel)
influx.wg.Wait()
}
func (influx *influxClient) post() {
hclient := influx.hclient
for stats := range influx.Channel {
if influx.Verbose {
pretty.Println("Sending", stats)
}
log.Printf("Sending %d stats points", len(stats.Map))
batch, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: "geodns_logs",
RetentionPolicy: "incoming",
})
if err != nil {
log.Printf("Could not setup batch points: %s", err)
continue
}
for _, s := range stats.Map {
pnt, err := client.NewPoint(
"log_stats",
map[string]string{
"Label": s.Label,
"Name": s.Name,
"Origin": s.Origin,
"PoolCC": s.PoolCC,
"Vendor": s.Vendor,
"Qtype": s.Qtype,
"Server": influx.ServerID,
},
map[string]interface{}{
"Count": s.Count,
},
time.Unix(s.Time, 0),
)
if err != nil {
log.Printf("Could not create a point from '%+v': %s", s, err)
continue
}
batch.AddPoint(pnt)
}
err = hclient.Write(batch)
if err != nil {
log.Printf("Error writing batch points: %s", err)
}
}
influx.wg.Done()
}
@@ -0,0 +1,243 @@
package main
import (
"bufio"
"encoding/json"
"flag"
"fmt"
"log"
"os"
"strings"
"sync"
"time"
"github.com/hpcloud/tail"
"github.com/miekg/dns"
"github.com/abh/geodns/countries"
"github.com/abh/geodns/querylog"
)
// TODO:
// Add vendor yes/no
// add server region tag (identifier)?
func main() {
tailFlag := flag.Bool("tail", false, "tail the log file instead of processing all arguments")
identifierFlag := flag.String("identifier", "", "identifier (hostname, pop name or similar)")
verboseFlag := flag.Bool("verbose", false, "verbose output")
flag.Parse()
var serverID string
var serverGroups []string
if len(*identifierFlag) > 0 {
ids := strings.Split(*identifierFlag, ",")
serverID = ids[0]
if len(ids) > 1 {
serverGroups = ids[1:]
}
}
if len(serverID) == 0 {
var err error
serverID, err = os.Hostname()
if err != nil {
log.Printf("Could not get hostname: %s", err)
os.Exit(2)
}
}
influx := NewInfluxClient()
influx.URL = os.Getenv("INFLUXDB_URL")
influx.Username = os.Getenv("INFLUXDB_USERNAME")
influx.Password = os.Getenv("INFLUXDB_PASSWORD")
influx.Database = os.Getenv("INFLUXDB_DATABASE")
influx.ServerID = serverID
influx.ServerGroups = serverGroups
influx.Verbose = *verboseFlag
err := influx.Start()
if err != nil {
log.Printf("Could not start influxdb poster: %s", err)
os.Exit(2)
}
if len(flag.Args()) < 1 {
log.Printf("filename to process required")
os.Exit(2)
}
if *tailFlag {
filename := flag.Arg(0)
logf, err := tail.TailFile(filename, tail.Config{
// Location: &tail.SeekInfo{-1, 0},
Poll: true, // inotify is flaky on EL6, so try this ...
ReOpen: true,
MustExist: false,
Follow: true,
})
if err != nil {
log.Printf("Could not tail '%s': %s", filename, err)
}
in := make(chan string)
go processChan(in, influx.Channel, nil)
for line := range logf.Lines {
if line.Err != nil {
log.Printf("Error tailing file: %s", line.Err)
}
in <- line.Text
}
} else {
for _, file := range flag.Args() {
log.Printf("Log: %s", file)
err := processFile(file, influx.Channel)
if err != nil {
log.Printf("Error processing '%s': %s", file, err)
}
log.Printf("Done with %s", file)
}
}
influx.Close()
}
var extraValidLabels = map[string]struct{}{
"uk": struct{}{},
"_status": struct{}{},
"www": struct{}{},
"nag-test": struct{}{},
}
func validCC(label string) bool {
if _, ok := countries.CountryContinent[label]; ok {
return true
}
if _, ok := countries.ContinentCountries[label]; ok {
return true
}
if _, ok := countries.RegionGroupRegions[label]; ok {
return true
}
if _, ok := countries.RegionGroups[label]; ok {
return true
}
if _, ok := extraValidLabels[label]; ok {
return true
}
return false
}
func getPoolCC(label string) (string, bool) {
l := dns.SplitDomainName(label)
// log.Printf("LABEL: %+v", l)
if len(l) == 0 {
return "", true
}
for _, cc := range l {
if validCC(cc) {
return cc, true
}
}
if len(l[0]) == 1 && strings.ContainsAny(l[0], "01234") {
if len(l) == 1 {
return "", true
}
}
// log.Printf("LABEL '%s' unhandled cc...", label)
return "", false
}
func processChan(in chan string, out chan<- *Stats, wg *sync.WaitGroup) error {
e := querylog.Entry{}
// the grafana queries depend on this being one minute
submitInterval := time.Minute * 1
stats := NewStats()
i := 0
lastMinute := int64(0)
for line := range in {
err := json.Unmarshal([]byte(line), &e)
if err != nil {
log.Printf("Can't unmarshal '%s': %s", line, err)
return err
}
eMinute := ((e.Time - e.Time%int64(submitInterval)) / int64(time.Second))
e.Time = eMinute
if len(stats.Map) == 0 {
lastMinute = eMinute
log.Printf("Last MInute: %d", lastMinute)
} else {
if eMinute > lastMinute {
fmt.Printf("eMinute %d\nlastMin %d - should summarize\n", eMinute, lastMinute)
stats.Summarize()
out <- stats
stats = NewStats()
lastMinute = eMinute
}
}
e.Name = strings.ToLower(e.Name)
// fmt.Printf("%s %s\n", e.Origin, e.Name)
err = stats.Add(&e)
if err != nil {
return err
}
if i%10000 == 0 {
// pretty.Println(stats)
}
// minute
}
if len(stats.Map) > 0 {
out <- stats
}
if wg != nil {
wg.Done()
}
return nil
}
func processFile(file string, out chan<- *Stats) error {
fh, err := os.Open(file)
if err != nil {
return err
}
in := make(chan string)
wg := sync.WaitGroup{}
wg.Add(1)
go processChan(in, out, &wg)
scanner := bufio.NewScanner(fh)
for scanner.Scan() {
in <- scanner.Text()
}
if err := scanner.Err(); err != nil {
log.Println("reading standard input:", err)
}
close(in)
wg.Wait()
return nil
}
Oops, something went wrong.

0 comments on commit 7a0e8cc

Please sign in to comment.