Skip to content

Commit

Permalink
Upgraded to Go 1.7.1 + Beats 5
Browse files Browse the repository at this point in the history
  • Loading branch information
consulthys committed Nov 23, 2016
1 parent 160ceaa commit 98bead2
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 70 deletions.
93 changes: 32 additions & 61 deletions beater/springbeat.go
Expand Up @@ -14,42 +14,34 @@ import (
"github.com/consulthys/springbeat/config"
)

const selector = "springbeat"

type Springbeat struct {
done chan struct{}
config config.Config
client publisher.Client

period time.Duration
urls []*url.URL

beatConfig *config.Config

done chan struct{}
client publisher.Client

metricsStats bool
healthStats bool
}

// Creates beater
func New() *Springbeat {
return &Springbeat{
done: make(chan struct{}),
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
config := config.DefaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("Error reading config file: %v", err)
}
}

/// *** Beater interface methods ***///

func (bt *Springbeat) Config(b *beat.Beat) error {

// Load beater beatConfig
err := b.RawConfig.Unpack(&bt.beatConfig)
if err != nil {
return fmt.Errorf("Error reading config file: %v", err)
bt := &Springbeat{
done: make(chan struct{}),
config: config,
}

//define default URL if none provided
var urlConfig []string
if bt.beatConfig.Springbeat.URLs != nil {
urlConfig = bt.beatConfig.Springbeat.URLs
if config.URLs != nil {
urlConfig = config.URLs
} else {
urlConfig = []string{"http://127.0.0.1"}
}
Expand All @@ -59,52 +51,34 @@ func (bt *Springbeat) Config(b *beat.Beat) error {
u, err := url.Parse(urlConfig[i])
if err != nil {
logp.Err("Invalid Spring Boot URL: %v", err)
return err
return nil, err
}
bt.urls[i] = u
}

if bt.beatConfig.Springbeat.Stats.Metrics != nil {
bt.metricsStats = *bt.beatConfig.Springbeat.Stats.Metrics
if config.Stats.Metrics != nil {
bt.metricsStats = *config.Stats.Metrics
} else {
bt.metricsStats = true
}

if bt.beatConfig.Springbeat.Stats.Health != nil {
bt.healthStats = *bt.beatConfig.Springbeat.Stats.Health
if config.Stats.Health != nil {
bt.healthStats = *config.Stats.Health
} else {
bt.healthStats = true
}

if !bt.metricsStats && !bt.metricsStats {
return errors.New("Invalid statistics configuration")
}

return nil
}

func (bt *Springbeat) Setup(b *beat.Beat) error {

// Setting default period if not set
if bt.beatConfig.Springbeat.Period == "" {
bt.beatConfig.Springbeat.Period = "10s"
return nil, errors.New("Invalid statistics configuration")
}

bt.client = b.Publisher.Connect()

var err error
bt.period, err = time.ParseDuration(bt.beatConfig.Springbeat.Period)
if err != nil {
return err
}
logp.Debug("springbeat", "Init springbeat")
logp.Debug("springbeat", "Period %v\n", bt.period)
logp.Debug("springbeat", "Watch %v", bt.urls)
logp.Debug("springbeat", "Metrics statistics %t\n", bt.metricsStats)
logp.Debug("springbeat", "Health statistics %t\n", bt.healthStats)

logp.Debug(selector, "Init springbeat")
logp.Debug(selector, "Period %v\n", bt.period)
logp.Debug(selector, "Watch %v", bt.urls)
logp.Debug(selector, "Metrics statistics %t\n", bt.metricsStats)
logp.Debug(selector, "Health statistics %t\n", bt.healthStats)

return nil
return bt, nil
}

func (bt *Springbeat) Run(b *beat.Beat) error {
Expand All @@ -113,7 +87,7 @@ func (bt *Springbeat) Run(b *beat.Beat) error {
for _, u := range bt.urls {
go func(u *url.URL) {

ticker := time.NewTicker(bt.period)
ticker := time.NewTicker(bt.config.Period)
counter := 1
for {
select {
Expand All @@ -125,13 +99,13 @@ func (bt *Springbeat) Run(b *beat.Beat) error {
timerStart := time.Now()

if bt.metricsStats {
logp.Debug(selector, "Metrics stats for url: %v", u)
logp.Debug("springbeat", "Metrics stats for url: %v", u)
metrics_stats, err := bt.GetMetricsStats(*u)

if err != nil {
logp.Err("Error reading Metrics stats: %v", err)
} else {
logp.Debug(selector, "Metrics stats detail: %+v", metrics_stats)
logp.Debug("springbeat", "Metrics stats detail: %+v", metrics_stats)

event := common.MapStr{
"@timestamp": common.Time(time.Now()),
Expand All @@ -147,13 +121,13 @@ func (bt *Springbeat) Run(b *beat.Beat) error {
}

if bt.healthStats {
logp.Debug(selector, "Health stats for url: %v", u)
logp.Debug("springbeat", "Health stats for url: %v", u)
health_stats, err := bt.GetHealthStats(*u)

if err != nil {
logp.Err("Error reading Health stats: %v", err)
} else {
logp.Debug(selector, "Health stats detail: %+v", health_stats)
logp.Debug("springbeat", "Health stats detail: %+v", health_stats)

event := common.MapStr{
"@timestamp": common.Time(time.Now()),
Expand Down Expand Up @@ -183,11 +157,8 @@ func (bt *Springbeat) Run(b *beat.Beat) error {
return nil
}

func (bt *Springbeat) Cleanup(b *beat.Beat) error {
return nil
}

func (bt *Springbeat) Stop() {
logp.Debug(selector, "Stop springbeat")
logp.Debug("springbeat", "Stop springbeat")
bt.client.Close()
close(bt.done)
}
17 changes: 9 additions & 8 deletions config/config.go
Expand Up @@ -3,17 +3,18 @@

package config

type Config struct {
Springbeat SpringbeatConfig
}

type SpringbeatConfig struct {
Period string `config:"period"`
import "time"

type Config struct {
Period time.Duration `config:"period"`
URLs []string

Stats struct {
Metrics *bool
Health *bool
Metrics *bool
Health *bool
}
}

var DefaultConfig = Config{
Period: 1 * time.Second,
}
2 changes: 1 addition & 1 deletion main.go
Expand Up @@ -9,7 +9,7 @@ import (
)

func main() {
err := beat.Run("springbeat", "", beater.New())
err := beat.Run("springbeat", "", beater.New)
if err != nil {
os.Exit(1)
}
Expand Down
6 changes: 6 additions & 0 deletions springbeat.template-es2x.json
@@ -0,0 +1,6 @@
{
"template": "springbeat-*",
"settings": {
"index.refresh_interval": "5s"
}
}
6 changes: 6 additions & 0 deletions springbeat.template.json
@@ -0,0 +1,6 @@
{
"template": "springbeat-*",
"settings": {
"index.refresh_interval": "5s"
}
}

0 comments on commit 98bead2

Please sign in to comment.