Skip to content
This repository has been archived by the owner on Jan 23, 2019. It is now read-only.

Gearman monitoring #26

Merged
merged 9 commits into from
May 29, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 10 additions & 21 deletions cmd/gearmand/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,30 @@
package main

import (
_ "net/http/pprof"
"os"

gearmand "github.com/appscode/g2/pkg/server"
"github.com/appscode/g2/pkg/storage"
"github.com/appscode/g2/pkg/storage/leveldb"
"github.com/appscode/go/flags"
"github.com/appscode/go/runtime"
"github.com/appscode/log"
logs "github.com/appscode/log/golog"
"github.com/spf13/pflag"
)

var (
addr string
storageDir string
)

func main() {
pflag.StringVar(&addr, "addr", ":4730", "listening on, such as 0.0.0.0:4730")
pflag.StringVar(&storageDir, "storage-dir", os.TempDir(), "Directory where LevelDB file is stored.")
cfg := &gearmand.Config{}

pflag.StringVar(&cfg.ListenAddr, "addr", ":4730", "listening on, such as 0.0.0.0:4730")
pflag.StringVar(&cfg.Storage, "storage-dir", os.TempDir()+"/gearmand", "Directory where LevelDB file is stored.")
pflag.StringVar(&cfg.WebAddress, "web.addr", ":3000", "Server HTTP api Address")

defer runtime.HandleCrash()

flags.InitFlags()
logs.InitLogs()
defer logs.FlushLogs()
flags.DumpAll()

var store storage.Db
if storageDir != "" {
if s, err := leveldbq.New(storageDir); err == nil {
store = s
} else {
log.Info(err)
}
}
flags.DumpAll()

defer runtime.HandleCrash()
gearmand.NewServer(store).Start(addr)
gearmand.NewServer(cfg).Start()
}
58 changes: 39 additions & 19 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 8 additions & 3 deletions glide.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package: github.com/appscode/g2
import:
- package: gopkg.in/macaron.v1
version: v1.1.11
- package: github.com/mikespook/golib
- package: github.com/ngaut/stats
- package: github.com/syndtr/goleveldb
- package: github.com/appscode/go
- package: github.com/appscode/log
- package: github.com/spf13/pflag
- package: gopkg.in/robfig/cron.v2
version: v2
- package: github.com/mikespook/golib
- package: github.com/prometheus/client_golang
version: ^0.8.0
- package: github.com/spf13/pflag
testImport:
- package: github.com/stretchr/testify
subpackages:
- assert
24 changes: 24 additions & 0 deletions pkg/metrics/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package metrics

import "github.com/prometheus/client_golang/prometheus"

type collector struct {
metrics []*element
}

func (s *collector) Describe(ch chan<- *prometheus.Desc) {
for _, c := range s.metrics {
ch <- c.desc
}
}

func (s *collector) Collect(ch chan<- prometheus.Metric) {
for _, c := range s.metrics {
c.collect(c.desc, ch)
}
}

type element struct {
collect func(desc *prometheus.Desc, ch chan<- prometheus.Metric)
desc *prometheus.Desc
}
119 changes: 119 additions & 0 deletions pkg/metrics/server_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package metrics

import (
"github.com/prometheus/client_golang/prometheus"
)

const (
serverNamespace = "gearman_server"
)

type ServerData interface {
Stats() map[string]int
Clients() int
Workers() int
Jobs() int
RunningJobsByWorker() map[string]int
RunningJobsByFunction() map[string]int
}

// TODO: Add Some More Complex Matrics As Needed
func NewServerCollector(s ServerData) prometheus.Collector {
return &collector{
metrics: []*element{
{
desc: prometheus.NewDesc(
prometheus.BuildFQName(serverNamespace, "", "worker_count"),
"Count Connected Workers",
nil, nil,
),
collect: func(d *prometheus.Desc, ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
d,
prometheus.GaugeValue,
float64(s.Workers()),
)
},
},
{
desc: prometheus.NewDesc(
prometheus.BuildFQName(serverNamespace, "", "job_count"),
"Count jobs",
nil, nil,
),
collect: func(d *prometheus.Desc, ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
d,
prometheus.GaugeValue,
float64(s.Jobs()),
)
},
},
{
desc: prometheus.NewDesc(
prometheus.BuildFQName(serverNamespace, "", "client_count"),
"Count Connected Clients",
nil, nil,
),
collect: func(d *prometheus.Desc, ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
d,
prometheus.GaugeValue,
float64(s.Clients()),
)
},
},
{
desc: prometheus.NewDesc(
prometheus.BuildFQName(serverNamespace, "", "worker_running_job_count"),
"Running Job By workers",
[]string{"worker"}, nil,
),
collect: func(d *prometheus.Desc, ch chan<- prometheus.Metric) {
for k, v := range s.RunningJobsByWorker() {
ch <- prometheus.MustNewConstMetric(
d,
prometheus.GaugeValue,
float64(v),
k,
)
}
},
},
{
desc: prometheus.NewDesc(
prometheus.BuildFQName(serverNamespace, "", "function_running_job_count"),
"Running job count by functions",
[]string{"function"}, nil,
),
collect: func(d *prometheus.Desc, ch chan<- prometheus.Metric) {
for k, v := range s.RunningJobsByFunction() {
ch <- prometheus.MustNewConstMetric(
d,
prometheus.GaugeValue,
float64(v),
k,
)
}
},
},
{
desc: prometheus.NewDesc(
prometheus.BuildFQName(serverNamespace, "", "stats"),
"Running job count by functions",
[]string{"stats"}, nil,
),
collect: func(d *prometheus.Desc, ch chan<- prometheus.Metric) {
for k, v := range s.Stats() {
ch <- prometheus.MustNewConstMetric(
d,
prometheus.GaugeValue,
float64(v),
k,
)
}
},
},
},
}
}
51 changes: 51 additions & 0 deletions pkg/metrics/worker_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package metrics

import (
"github.com/prometheus/client_golang/prometheus"
)

const (
workerNamespace = "gearman_worker"
)

type WorkerData interface {
Running() (string, int)
Agents() int
}

func NewWorkerCollector(w WorkerData) prometheus.Collector {
return &collector{
metrics: []*element{
{
desc: prometheus.NewDesc(
prometheus.BuildFQName(workerNamespace, "", "worker_running"),
"Worker Running",
[]string{"worker"}, nil,
),
collect: func(d *prometheus.Desc, ch chan<- prometheus.Metric) {
id, running := w.Running()
ch <- prometheus.MustNewConstMetric(
d,
prometheus.GaugeValue,
float64(running),
id,
)
},
},
{
desc: prometheus.NewDesc(
prometheus.BuildFQName(workerNamespace, "", "agent_count"),
"Count Connected Agents",
nil, nil,
),
collect: func(d *prometheus.Desc, ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
d,
prometheus.GaugeValue,
float64(w.Agents()),
)
},
},
},
}
}
4 changes: 3 additions & 1 deletion pkg/runtime/protocol.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package runtime

import "fmt"
import (
"fmt"
)

/*
Binary Packet
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/pt_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading