Skip to content

Commit

Permalink
feat(telegraf): add default plugins for telegraf
Browse files Browse the repository at this point in the history
  • Loading branch information
Rory-Z committed Feb 28, 2022
1 parent fa4f755 commit e6d4fad
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 70 deletions.
10 changes: 10 additions & 0 deletions apis/apps/v1beta1/emqxbroker_webhook.go
Expand Up @@ -22,6 +22,7 @@ import (
"regexp"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -67,6 +68,15 @@ func (r *EmqxBroker) Default() {
r.Spec.Plugins = generatePlugins(r.Spec.Plugins)
r.Spec.Modules = generateEmqxBrokerModules(r.Spec.Modules)
r.Spec.Listener = generateListener(r.Spec.Listener)

if r.Spec.TelegrafTemplate != nil {
if containsPlugins(r.Spec.Plugins, "emqx_prometheus") == -1 {
r.Spec.Plugins = append(r.Spec.Plugins, Plugin{Name: "emqx_prometheus", Enable: true})
}
if containsEnv(r.Spec.Env, "EMQX_PROMETHEUS__PUSH__GATEWAY__SERVER") == -1 {
r.Spec.Env = append(r.Spec.Env, corev1.EnvVar{Name: "EMQX_PROMETHEUS__PUSH__GATEWAY__SERVER", Value: ""})
}
}
}

// TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation.
Expand Down
43 changes: 43 additions & 0 deletions apis/apps/v1beta1/emqxbroker_webhook_test.go
Expand Up @@ -196,6 +196,49 @@ func TestDefaultBroker(t *testing.T) {
assert.Equal(t, emqx.Spec.Listener.Type, corev1.ServiceType("ClusterIP"))
assert.Equal(t, emqx.Spec.Listener.Ports.MQTTS, int32(8885))
assert.Equal(t, emqx.Spec.Listener.Ports.API, int32(8081))

telegrafConf := `
[global_tags]
instanceID = "test"
[[inputs.http]]
urls = ["http://127.0.0.1:8081/api/v4/emqx_prometheus"]
method = "GET"
timeout = "5s"
username = "admin"
password = "public"
data_format = "json"
[[inputs.tail]]
files = ["/opt/emqx/log/emqx.log.[1-5]"]
from_beginning = false
max_undelivered_lines = 64
character_encoding = "utf-8"
data_format = "grok"
grok_patterns = ['^%{TIMESTAMP_ISO8601:timestamp:ts-"2006-01-02T15:04:05.999999999-07:00"} \[%{LOGLEVEL:level}\] (?m)%{GREEDYDATA:messages}$']
[[outputs.discard]]
`
emqx.Spec.TelegrafTemplate = &v1beta1.TelegrafTemplate{
Image: "telegraf:1.19.3",
Conf: &telegrafConf,
}
emqx.Default()
assert.Subset(t, emqx.Spec.Plugins,
[]v1beta1.Plugin{
{
Name: "emqx_prometheus",
Enable: true,
},
},
)
assert.Subset(t, emqx.Spec.Env,
[]corev1.EnvVar{
{
Name: "EMQX_PROMETHEUS__PUSH__GATEWAY__SERVER",
Value: "",
},
},
)
}

func TestValidateCreateBroker(t *testing.T) {
Expand Down
10 changes: 10 additions & 0 deletions apis/apps/v1beta1/emqxenterprise_webhook.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package v1beta1

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -62,6 +63,15 @@ func (r *EmqxEnterprise) Default() {
r.Spec.Plugins = generatePlugins(r.Spec.Plugins)
r.Spec.Modules = generateEmqxEnterpriseModules(r.Spec.Modules)
r.Spec.Listener = generateListener(r.Spec.Listener)

if r.Spec.TelegrafTemplate != nil {
if containsPlugins(r.Spec.Plugins, "emqx_prometheus") == -1 {
r.Spec.Plugins = append(r.Spec.Plugins, Plugin{Name: "emqx_prometheus", Enable: true})
}
if containsEnv(r.Spec.Env, "EMQX_PROMETHEUS__PUSH__GATEWAY__SERVER") == -1 {
r.Spec.Env = append(r.Spec.Env, corev1.EnvVar{Name: "EMQX_PROMETHEUS__PUSH__GATEWAY__SERVER", Value: ""})
}
}
}

// TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation.
Expand Down
19 changes: 9 additions & 10 deletions apis/apps/v1beta1/env_type.go
Expand Up @@ -5,25 +5,24 @@ import (
)

func generateEnv(emqx Emqx) []corev1.EnvVar {
contains := func(Env []corev1.EnvVar, Name string) int {
for index, value := range Env {
if value.Name == Name {
return index
}
}
return -1
}

env := emqx.GetEnv()
e := defaultEnv(emqx)
for _, value := range e {
r := contains(env, value.Name)
r := containsEnv(env, value.Name)
if r == -1 {
env = append(env, value)
}
}
return env
}

func containsEnv(Env []corev1.EnvVar, Name string) int {
for index, value := range Env {
if value.Name == Name {
return index
}
}
return -1
}

func defaultEnv(emqx Emqx) []corev1.EnvVar {
Expand Down
20 changes: 10 additions & 10 deletions apis/apps/v1beta1/plugins_type.go
Expand Up @@ -11,22 +11,22 @@ func generatePlugins(plugins []Plugin) []Plugin {
return defaultLoadedPlugins()
}

contains := func(plugins []Plugin) int {
for index, value := range plugins {
if value.Name == "emqx_management" {
return index
}
}
return -1
}

if contains(plugins) == -1 {
if containsPlugins(plugins, "emqx_management") == -1 {
plugins = append(plugins, Plugin{Name: "emqx_management", Enable: true})
}

return plugins
}

func containsPlugins(plugins []Plugin, name string) int {
for index, value := range plugins {
if value.Name == name {
return index
}
}
return -1
}

func defaultLoadedPlugins() []Plugin {
return []Plugin{
{
Expand Down
18 changes: 10 additions & 8 deletions config/samples/emqx/v1beta1/emqx-ee.yaml
Expand Up @@ -51,23 +51,25 @@ spec:
enable: true
- name: emqx_dashboard
enable: true
- name: emqx_prometheus
enable: true
# license: the license for EMQX Enterprise should be completed
telegrafTemplate:
image: "telegraf:1.19.3"
imagePullPolicy: "IfNotPresent"
conf: |
[global_tags]
instanceID = "test"
instanceID = "test"
[[inputs.http]]
urls = ["http://127.0.0.1:8081/api/v4/emqx_prometheus"]
method = "GET"
timeout = "5s"
username = "admin"
password = "public"
data_format = "json"
urls = ["http://127.0.0.1:8081/api/v4/emqx_prometheus"]
method = "GET"
timeout = "5s"
username = "admin"
password = "public"
data_format = "json"
[[inputs.tail]]
files = ["/opt/emqx/log/emqx.log.[1-9]"]
files = ["/opt/emqx/log/emqx.log.[1-5]"]
from_beginning = false
max_undelivered_lines = 64
character_encoding = "utf-8"
Expand Down
2 changes: 2 additions & 0 deletions config/samples/emqx/v1beta1/emqx.yaml
Expand Up @@ -135,6 +135,8 @@ spec:
plugins:
- name: emqx_management
enable: true
- name: emqx_prometheus
enable: true
- name: emqx_recon
enable: true
- name: emqx_retainer
Expand Down
86 changes: 44 additions & 42 deletions controllers_suite/suite_test.go
Expand Up @@ -273,26 +273,26 @@ func generateEmqxNamespace(namespace string) *corev1.Namespace {
func generateEmqxBroker(name, namespace string) *v1beta1.EmqxBroker {
storageClassName := "standard"
telegrafConf := `
[global_tags]
instanceID = "test"
[[inputs.http]]
urls = ["http://127.0.0.1:8081/api/v4/emqx_prometheus"]
method = "GET"
timeout = "5s"
username = "admin"
password = "public"
data_format = "json"
[[inputs.tail]]
files = ["/opt/emqx/log/emqx.log.[1-9]"]
from_beginning = false
max_undelivered_lines = 64
character_encoding = "utf-8"
data_format = "grok"
grok_patterns = ['^%{TIMESTAMP_ISO8601:timestamp:ts-"2006-01-02T15:04:05.999999999-07:00"} \[%{LOGLEVEL:level}\] (?m)%{GREEDYDATA:messages}$']
[[outputs.discard]]
`
[global_tags]
instanceID = "test"
[[inputs.http]]
urls = ["http://127.0.0.1:8081/api/v4/emqx_prometheus"]
method = "GET"
timeout = "5s"
username = "admin"
password = "public"
data_format = "json"
[[inputs.tail]]
files = ["/opt/emqx/log/emqx.log.[1-5]"]
from_beginning = false
max_undelivered_lines = 64
character_encoding = "utf-8"
data_format = "grok"
grok_patterns = ['^%{TIMESTAMP_ISO8601:timestamp:ts-"2006-01-02T15:04:05.999999999-07:00"} \[%{LOGLEVEL:level}\] (?m)%{GREEDYDATA:messages}$']
[[outputs.discard]]
`
emqx := &v1beta1.EmqxBroker{
TypeMeta: metav1.TypeMeta{
APIVersion: "apps.emqx.io/v1beta1",
Expand Down Expand Up @@ -361,26 +361,26 @@ func generateEmqxBroker(name, namespace string) *v1beta1.EmqxBroker {
// Slim
func generateEmqxEnterprise(name, namespace string) *v1beta1.EmqxEnterprise {
telegrafConf := `
[global_tags]
instanceID = "test"
[[inputs.http]]
urls = ["http://127.0.0.1:8081/api/v4/emqx_prometheus"]
method = "GET"
timeout = "5s"
username = "admin"
password = "public"
data_format = "json"
[[inputs.tail]]
files = ["/opt/emqx/log/emqx.log.[1-9]"]
from_beginning = false
max_undelivered_lines = 64
character_encoding = "utf-8"
data_format = "grok"
grok_patterns = ['^%{TIMESTAMP_ISO8601:timestamp:ts-"2006-01-02T15:04:05.999999999-07:00"} \[%{LOGLEVEL:level}\] (?m)%{GREEDYDATA:messages}$']
[[outputs.discard]]
`
[global_tags]
instanceID = "test"
[[inputs.http]]
urls = ["http://127.0.0.1:8081/api/v4/emqx_prometheus"]
method = "GET"
timeout = "5s"
username = "admin"
password = "public"
data_format = "json"
[[inputs.tail]]
files = ["/opt/emqx/log/emqx.log.[1-5]"]
from_beginning = false
max_undelivered_lines = 64
character_encoding = "utf-8"
data_format = "grok"
grok_patterns = ['^%{TIMESTAMP_ISO8601:timestamp:ts-"2006-01-02T15:04:05.999999999-07:00"} \[%{LOGLEVEL:level}\] (?m)%{GREEDYDATA:messages}$']
[[outputs.discard]]
`
emqx := &v1beta1.EmqxEnterprise{
TypeMeta: metav1.TypeMeta{
APIVersion: "apps.emqx.io/v1beta1",
Expand All @@ -395,7 +395,8 @@ func generateEmqxEnterprise(name, namespace string) *v1beta1.EmqxEnterprise {
},
Spec: v1beta1.EmqxEnterpriseSpec{
Image: "emqx/emqx-ee:4.4.0",
License: `-----BEGIN CERTIFICATE-----
License: `
-----BEGIN CERTIFICATE-----
MIIENzCCAx+gAwIBAgIDdMvVMA0GCSqGSIb3DQEBBQUAMIGDMQswCQYDVQQGEwJD
TjERMA8GA1UECAwIWmhlamlhbmcxETAPBgNVBAcMCEhhbmd6aG91MQwwCgYDVQQK
DANFTVExDDAKBgNVBAsMA0VNUTESMBAGA1UEAwwJKi5lbXF4LmlvMR4wHAYJKoZI
Expand All @@ -419,7 +420,8 @@ iL3a2tdZ4sq+h/Z1elIXD71JJBAImjr6BljTIdUCfVtNvxlE8M0D/rKSn2jwzsjI
UrW88THMtlz9sb56kmM3JIOoIJoep6xNEajIBnoChSGjtBYFNFwzdwSTCodYkgPu
JifqxTKSuwAGSlqxJUwhjWG8ulzL3/pCAYEwlWmd2+nsfotQdiANdaPnez7o0z0s
EujOCZMbK8qNfSbyo50q5iIXhz2ZIGl+4hdp
-----END CERTIFICATE-----`,
-----END CERTIFICATE-----
`,
TelegrafTemplate: &v1beta1.TelegrafTemplate{
Image: "telegraf:1.19.3",
Conf: &telegrafConf,
Expand Down

0 comments on commit e6d4fad

Please sign in to comment.