Skip to content
Permalink
Browse files

Remove built-in InfluxDB support from the log processing tool

... all metrics are now exported via Prometheus at all times.
  • Loading branch information...
abh committed Aug 1, 2018
1 parent 2ba0ddd commit 489fdbcc5183d5bba352671564cbaf0fa46cc811
@@ -27,7 +27,7 @@ TARS=$(wildcard dist/geodns-*-*.tar)
push: $(TARS) tmp-install.sh
#rsync -avz tmp-install.sh $(TARS) x3.dev:webtmp/2018/04/
rsync --exclude publish tmp-install.sh $(TARS) $(DIST)/$(DISTSUB)/
$(DIST)/../push
$(DIST)/push

builds: linux-build linux-build-i386 freebsd-build push

4 build
@@ -16,8 +16,8 @@ set -ex
go build -o dist/geodns-$OS-$ARCH \
-ldflags "-X main.gitVersion=$REVISION -X main.buildTime=$BUILDTIME" \
-v && \
(cd geodns-influxdb && go build -v -o ../dist/geodns-influxdb-$OS-$ARCH && cd ..) && \
(cd geodns-logs && go build -v -o ../dist/geodns-logs-$OS-$ARCH && cd ..) && \
cd dist && \
tar -cvHf geodns-$OS-$ARCH.tar \
--exclude \*~ geodns-$OS-$ARCH \
geodns-influxdb-$OS-$ARCH service service-influxdb
geodns-logs-$OS-$ARCH service service-logs

This file was deleted.

(The diff for the deleted file could not be rendered on this page.)
@@ -9,7 +9,6 @@ import (
"os"
"strings"
"sync"
"time"

"github.com/hpcloud/tail"
"github.com/miekg/dns"
@@ -24,23 +23,24 @@ import (
// Add vendor yes/no
// add server region tag (identifier)?

const UserAgent = "geodns-logs/2.0"

func main() {

log.Printf("Starting %q", UserAgent)

tailFlag := flag.Bool("tail", false, "tail the log file instead of processing all arguments")
identifierFlag := flag.String("identifier", "", "identifier (hostname, pop name or similar)")
verboseFlag := flag.Bool("verbose", false, "verbose output")
// verboseFlag := flag.Bool("verbose", false, "verbose output")
flag.Parse()

var serverID string
var serverGroups []string
// var serverGroups []string

if len(*identifierFlag) > 0 {
ids := strings.Split(*identifierFlag, ",")
serverID = ids[0]
if len(ids) > 1 {
serverGroups = ids[1:]
// serverGroups = ids[1:]
}
}

@@ -80,64 +80,34 @@ func main() {
}
}()

influx := NewInfluxClient()
influx.URL = os.Getenv("INFLUXDB_URL")
influx.Username = os.Getenv("INFLUXDB_USERNAME")
influx.Password = os.Getenv("INFLUXDB_PASSWORD")
influx.Database = os.Getenv("INFLUXDB_DATABASE")

influx.ServerID = serverID
influx.ServerGroups = serverGroups
influx.Verbose = *verboseFlag

err := influx.Start()
if err != nil {
log.Printf("Could not start influxdb poster: %s", err)
os.Exit(2)
}

if len(flag.Args()) < 1 {
log.Printf("filename to process required")
os.Exit(2)
}

if *tailFlag {

filename := flag.Arg(0)

logf, err := tail.TailFile(filename, tail.Config{
// Location: &tail.SeekInfo{-1, 0},
Poll: true, // inotify is flaky on EL6, so try this ...
ReOpen: true,
MustExist: false,
Follow: true,
})
if err != nil {
log.Printf("Could not tail '%s': %s", filename, err)
}
filename := flag.Arg(0)

in := make(chan string)
logf, err := tail.TailFile(filename, tail.Config{
// Location: &tail.SeekInfo{-1, 0},
Poll: true, // inotify is flaky on EL6, so try this ...
ReOpen: true,
MustExist: false,
Follow: true,
})
if err != nil {
log.Printf("Could not tail '%s': %s", filename, err)
}

go processChan(in, influx.Channel, nil)
in := make(chan string)
go processChan(in, nil)

for line := range logf.Lines {
if line.Err != nil {
log.Printf("Error tailing file: %s", line.Err)
}
in <- line.Text
}
} else {
for _, file := range flag.Args() {
log.Printf("Log: %s", file)
err := processFile(file, influx.Channel)
if err != nil {
log.Printf("Error processing '%s': %s", file, err)
}
log.Printf("Done with %s", file)
for line := range logf.Lines {
if line.Err != nil {
log.Printf("Error tailing file: %s", line.Err)
}
in <- line.Text
}

influx.Close()
}

var extraValidLabels = map[string]struct{}{
@@ -190,15 +160,11 @@ func getPoolCC(label string) (string, bool) {
return "", false
}

func processChan(in chan string, out chan<- *Stats, wg *sync.WaitGroup) error {
func processChan(in chan string, wg *sync.WaitGroup) error {
e := querylog.Entry{}

// the grafana queries depend on this being one minute
submitInterval := time.Minute * 1

stats := NewStats()
i := 0
lastMinute := int64(0)

for line := range in {
err := json.Unmarshal([]byte(line), &e)
if err != nil {
@@ -207,38 +173,14 @@ func processChan(in chan string, out chan<- *Stats, wg *sync.WaitGroup) error {
}
e.Name = strings.ToLower(e.Name)

eMinute := ((e.Time - e.Time%int64(submitInterval)) / int64(time.Second))
e.Time = eMinute

if len(stats.Map) == 0 {
lastMinute = eMinute
// log.Printf("Last Minute: %d", lastMinute)
} else {
if eMinute > lastMinute {
// fmt.Printf("eMinute %d\nlastMin %d - should summarize\n", eMinute, lastMinute)
stats.Summarize()
out <- stats
stats = NewStats()
lastMinute = eMinute
}
}

// fmt.Printf("%s %s\n", e.Origin, e.Name)

err = stats.Add(&e)
if err != nil {
return err
}

if i%10000 == 0 {
// pretty.Println(stats)
}
// minute
}

if len(stats.Map) > 0 {
out <- stats
}
if wg != nil {
wg.Done()
}
@@ -255,7 +197,7 @@ func processFile(file string, out chan<- *Stats) error {

wg := sync.WaitGroup{}
wg.Add(1)
go processChan(in, out, &wg)
go processChan(in, &wg)

scanner := bufio.NewScanner(fh)

File renamed without changes.
File renamed without changes.

0 comments on commit 489fdbc

Please sign in to comment.
You can’t perform that action at this time.