Skip to content

Commit

Permalink
Create a histogram to track time spent processing each line.
Browse files Browse the repository at this point in the history
Refactor the loader interface to let us pass in a nondefault Prometheus
registry.  Doing our own makes testing easier elswhere in the code.

Fixes: #214
  • Loading branch information
jaqx0r committed Mar 25, 2019
1 parent a3c2987 commit 1a5bbe6
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 8 deletions.
10 changes: 6 additions & 4 deletions internal/mtail/mtail.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ func (m *Server) StartTailing() error {

// initLoader constructs a new program loader and performs the initial load of program files in the program directory.
func (m *Server) initLoader() error {
opts := []func(*vm.Loader) error{}
opts := []func(*vm.Loader) error{
vm.PrometheusRegisterer(m.reg),
}
if m.compileOnly {
opts = append(opts, vm.CompileOnly)
if m.oneShot {
Expand Down Expand Up @@ -170,9 +172,6 @@ func (m *Server) initExporter() (err error) {
// internal/watcher/log_watcher.go
"log_watcher_error_count": prometheus.NewDesc("log_watcher_error_count", "number of errors received from fsnotify", nil, nil),
}
// Using a non-pedantic registry means we can be looser with metrics that
// are not fully specified at startup.
m.reg = prometheus.NewRegistry()
m.reg.MustRegister(m.e,
prometheus.NewGoCollector(),
prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
Expand Down Expand Up @@ -248,6 +247,9 @@ func New(store *metrics.Store, w watcher.Watcher, options ...func(*Server) error
webquit: make(chan struct{}),
closeQuit: make(chan struct{}),
h: &http.Server{},
// Using a non-pedantic registry means we can be looser with metrics that
// are not fully specified at startup.
reg: prometheus.NewRegistry(),
}
if err := m.SetOption(options...); err != nil {
return nil, err
Expand Down
19 changes: 16 additions & 3 deletions internal/vm/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/golang/glog"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/google/mtail/internal/logline"
"github.com/google/mtail/internal/metrics"
Expand Down Expand Up @@ -253,9 +254,10 @@ func nameToCode(name string) uint32 {
// managing the running virtual machines that receive input from the lines
// channel.
type Loader struct {
ms *metrics.Store // pointer to metrics.Store to pass to compiler
w watcher.Watcher // watches for program changes
programPath string // Path that contains mtail programs.
ms *metrics.Store // pointer to metrics.Store to pass to compiler
w watcher.Watcher // watches for program changes
reg prometheus.Registerer // plce to reg metrics
programPath string // Path that contains mtail programs.

eventsHandle int // record the handle with which to add programs to the watcher

Expand Down Expand Up @@ -328,6 +330,14 @@ func OmitMetricSource(l *Loader) error {
return nil
}

// PrometheusRegisterer passes in a registry for setting up exported metrics.
func PrometheusRegisterer(reg prometheus.Registerer) func(l *Loader) error {
return func(l *Loader) error {
l.reg = reg
return nil
}
}

// NewLoader creates a new program loader that reads programs from programPath.
func NewLoader(programPath string, store *metrics.Store, lines <-chan *logline.LogLine, w watcher.Watcher, options ...func(*Loader) error) (*Loader, error) {
if store == nil || lines == nil {
Expand All @@ -345,6 +355,9 @@ func NewLoader(programPath string, store *metrics.Store, lines <-chan *logline.L
if err := l.SetOption(options...); err != nil {
return nil, err
}
if l.reg != nil {
l.reg.MustRegister(lineProcessingDurations)
}
handle, eventsChan := l.w.Events()
l.eventsHandle = handle
go l.processEvents(eventsChan)
Expand Down
16 changes: 15 additions & 1 deletion internal/vm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,25 @@ import (
"time"

"github.com/golang/glog"
"github.com/golang/groupcache/lru"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/google/mtail/internal/logline"
"github.com/google/mtail/internal/metrics"
"github.com/google/mtail/internal/metrics/datum"
"github.com/google/mtail/internal/vm/code"
"github.com/google/mtail/internal/vm/object"
)

"github.com/golang/groupcache/lru"
var (
lineProcessingDurations = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "mtail",
Subsystem: "vm",
Name: "line_processing_duration_milliseconds",
Help: "VM line processing time distribution in milliseconds.",
Buckets: prometheus.DefBuckets,
}, []string{"prog"})
)

type thread struct {
Expand Down Expand Up @@ -719,6 +729,10 @@ func (v *VM) execute(t *thread, i code.Instr) {
// fetch-execute cycle on the VM bytecode with the line as input to the
// program, until termination.
func (v *VM) processLine(line *logline.LogLine) {
start := time.Now()
defer func() {
lineProcessingDurations.WithLabelValues(v.name).Observe(float64(time.Since(start).Nanoseconds()) / 1.e6)
}()
t := new(thread)
t.matched = false
v.t = t
Expand Down

0 comments on commit 1a5bbe6

Please sign in to comment.