Skip to content

Commit

Permalink
Upgraded to Go 1.7.1 + Beats 5
Browse files Browse the repository at this point in the history
  • Loading branch information
consulthys committed Nov 23, 2016
1 parent 4b30d23 commit 1c5e83c
Show file tree
Hide file tree
Showing 6 changed files with 278 additions and 68 deletions.
9 changes: 2 additions & 7 deletions README.md
Expand Up @@ -3,11 +3,7 @@
Welcome to Logstashbeat.

**Important Notes:**
1. this plugin will only work with Logstash 5.0.0-alpha1 and later as the Logstash Monitoring API (listening on port 5600) is only [available since that version](https://www.elastic.co/guide/en/logstash/5.0/alpha1.html).
2. this plugin will only work with Logstash 5.0.0-alpha5 and later if any of the following points holds true:

* you enable the `stats.pipeline` flag in the `logstashbeat.yml` configuration file.
* you specify a positive `hot_threads` number in the `logstashbeat.yml` configuration file.
This plugin will only work with Logstash 5.0.0 later as the Logstash Monitoring API (listening on port 5600) is only [available since that version](https://www.elastic.co/guide/en/logstash/5.0/alpha1.html).

Ensure that this folder is at the following location:
`${GOPATH}/github.com/consulthys`
Expand All @@ -16,8 +12,7 @@ Ensure that this folder is at the following location:

### Requirements

* [Golang](https://golang.org/dl/) 1.6
* [Glide](https://github.com/Masterminds/glide) >= 0.10.0
* [Golang](https://golang.org/dl/) 1.7.1

### Init Project
To get running with Logstashbeat, run the following command:
Expand Down
80 changes: 26 additions & 54 deletions beater/logstashbeat.go
Expand Up @@ -17,13 +17,11 @@ import (
const selector = "logstashbeat"

type Logstashbeat struct {
period time.Duration
urls []*url.URL

beatConfig *config.Config

done chan struct{}
client publisher.Client
config config.Config

urls []*url.URL

hotThreads int

Expand All @@ -33,26 +31,21 @@ type Logstashbeat struct {
}

// Creates beater
func New() *Logstashbeat {
return &Logstashbeat{
done: make(chan struct{}),
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
config := config.DefaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("Error reading config file: %v", err)
}
}

/// *** Beater interface methods ***///

func (bt *Logstashbeat) Config(b *beat.Beat) error {

// Load beater beatConfig
err := b.RawConfig.Unpack(&bt.beatConfig)
if err != nil {
return fmt.Errorf("Error reading config file: %v", err)
bt := &Logstashbeat{
done: make(chan struct{}),
config: config,
}

//define default URL if none provided
var urlConfig []string
if bt.beatConfig.Logstashbeat.URLs != nil {
urlConfig = bt.beatConfig.Logstashbeat.URLs
if config.URLs != nil {
urlConfig = config.URLs
} else {
urlConfig = []string{"http://127.0.0.1:9600"}
}
Expand All @@ -62,62 +55,44 @@ func (bt *Logstashbeat) Config(b *beat.Beat) error {
u, err := url.Parse(urlConfig[i])
if err != nil {
logp.Err("Invalid Logstash URL: %v", err)
return err
return nil, err
}
bt.urls[i] = u
}

bt.hotThreads = bt.beatConfig.Logstashbeat.Hot_threads
bt.hotThreads = config.Hot_threads

if bt.beatConfig.Logstashbeat.Stats.JVM != nil {
bt.jvmStats = *bt.beatConfig.Logstashbeat.Stats.JVM
if config.Stats.JVM != nil {
bt.jvmStats = *config.Stats.JVM
} else {
bt.jvmStats = true
}

if bt.beatConfig.Logstashbeat.Stats.Process != nil {
bt.processStats = *bt.beatConfig.Logstashbeat.Stats.Process
if config.Stats.Process != nil {
bt.processStats = *config.Stats.Process
} else {
bt.processStats = true
}

if bt.beatConfig.Logstashbeat.Stats.Pipeline != nil {
bt.pipelineStats = *bt.beatConfig.Logstashbeat.Stats.Pipeline
if config.Stats.Pipeline != nil {
bt.pipelineStats = *config.Stats.Pipeline
} else {
bt.pipelineStats = true
}

if bt.hotThreads == 0 && !bt.jvmStats && !bt.processStats && !bt.pipelineStats {
return errors.New("Invalid statistics configuration")
}

return nil
}

func (bt *Logstashbeat) Setup(b *beat.Beat) error {

// Setting default period if not set
if bt.beatConfig.Logstashbeat.Period == "" {
bt.beatConfig.Logstashbeat.Period = "10s"
}

bt.client = b.Publisher.Connect()

var err error
bt.period, err = time.ParseDuration(bt.beatConfig.Logstashbeat.Period)
if err != nil {
return err
return nil, errors.New("Invalid statistics configuration")
}

logp.Debug(selector, "Init logstashbeat")
logp.Debug(selector, "Period %v\n", bt.period)
logp.Debug(selector, "Period %v\n", bt.config.Period)
logp.Debug(selector, "Watch %v", bt.urls)
logp.Debug(selector, "Capture %v hot threads\n", bt.hotThreads)
logp.Debug(selector, "JVM statistics %t\n", bt.jvmStats)
logp.Debug(selector, "Process statistics %t\n", bt.processStats)
logp.Debug(selector, "Pipeline statistics %t\n", bt.pipelineStats)

return nil
return bt, nil
}

func (bt *Logstashbeat) Run(b *beat.Beat) error {
Expand All @@ -126,7 +101,7 @@ func (bt *Logstashbeat) Run(b *beat.Beat) error {
for _, u := range bt.urls {
go func(u *url.URL) {

ticker := time.NewTicker(bt.period)
ticker := time.NewTicker(bt.config.Period)
counter := 1
for {
select {
Expand Down Expand Up @@ -227,7 +202,7 @@ func (bt *Logstashbeat) Run(b *beat.Beat) error {

timerEnd := time.Now()
duration := timerEnd.Sub(timerStart)
if duration.Nanoseconds() > bt.period.Nanoseconds() {
if duration.Nanoseconds() > bt.config.Period.Nanoseconds() {
logp.Warn("Ignoring tick(s) due to processing taking longer than one period")
}
}
Expand All @@ -240,11 +215,8 @@ func (bt *Logstashbeat) Run(b *beat.Beat) error {
return nil
}

func (bt *Logstashbeat) Cleanup(b *beat.Beat) error {
return nil
}

func (bt *Logstashbeat) Stop() {
logp.Debug(selector, "Stop logstashbeat")
bt.client.Close()
close(bt.done)
}
13 changes: 7 additions & 6 deletions config/config.go
Expand Up @@ -3,13 +3,10 @@

package config

type Config struct {
Logstashbeat LogstashbeatConfig
}

type LogstashbeatConfig struct {
Period string `config:"period"`
import "time"

type Config struct {
Period time.Duration `config:"period"`
URLs []string

Hot_threads int `config:"hot_threads"`
Expand All @@ -20,3 +17,7 @@ type LogstashbeatConfig struct {
Pipeline *bool
}
}

var DefaultConfig = Config{
Period: 1 * time.Second,
}
121 changes: 121 additions & 0 deletions logstashbeat.template-es2x.json
@@ -0,0 +1,121 @@
{
"template": "logstashbeat-*",
"settings": {
"index.refresh_interval": "5s"
},
"mappings": {
"hot_threads": {
"properties": {
"hot_threads": {
"properties": {
"hot_threads": {
"properties": {
"threads": {
"type": "nested",
"properties": {
"percent_of_cpu_time": {
"type": "double"
},
"name": {
"type": "keyword"
},
"traces": {
"type": "text"
},
"state": {
"type": "keyword"
}
}
}
}
}
}
}
}
},
"pipeline": {
"properties": {
"pipeline": {
"properties": {
"plugins": {
"properties": {
"inputs": {
"type": "nested",
"properties": {
"name": {
"type": "keyword"
},
"id": {
"type": "keyword"
},
"events": {
"properties": {
"in": {
"type": "long"
},
"duration_in_millis": {
"type": "long"
},
"out": {
"type": "long"
}
}
}
}
},
"filters": {
"type": "nested",
"properties": {
"name": {
"type": "keyword"
},
"id": {
"type": "keyword"
},
"events": {
"properties": {
"in": {
"type": "long"
},
"duration_in_millis": {
"type": "long"
},
"out": {
"type": "long"
}
}
}
}
},
"outputs": {
"type": "nested",
"properties": {
"name": {
"type": "keyword"
},
"id": {
"type": "keyword"
},
"events": {
"properties": {
"in": {
"type": "long"
},
"duration_in_millis": {
"type": "long"
},
"out": {
"type": "long"
}
}
}
}
}
}
}
}
}
}
}
}
}

0 comments on commit 1c5e83c

Please sign in to comment.