Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(telegraf): add default plugins for telegraf #139

Merged
merged 1 commit into from Feb 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions apis/apps/v1beta1/emqxbroker_webhook.go
Expand Up @@ -22,6 +22,7 @@ import (
"regexp"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -67,6 +68,15 @@ func (r *EmqxBroker) Default() {
r.Spec.Plugins = generatePlugins(r.Spec.Plugins)
r.Spec.Modules = generateEmqxBrokerModules(r.Spec.Modules)
r.Spec.Listener = generateListener(r.Spec.Listener)

if r.Spec.TelegrafTemplate != nil {
if containsPlugins(r.Spec.Plugins, "emqx_prometheus") == -1 {
r.Spec.Plugins = append(r.Spec.Plugins, Plugin{Name: "emqx_prometheus", Enable: true})
}
if containsEnv(r.Spec.Env, "EMQX_PROMETHEUS__PUSH__GATEWAY__SERVER") == -1 {
r.Spec.Env = append(r.Spec.Env, corev1.EnvVar{Name: "EMQX_PROMETHEUS__PUSH__GATEWAY__SERVER", Value: ""})
}
}
}

// TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation.
Expand Down
43 changes: 43 additions & 0 deletions apis/apps/v1beta1/emqxbroker_webhook_test.go
Expand Up @@ -196,6 +196,49 @@ func TestDefaultBroker(t *testing.T) {
assert.Equal(t, emqx.Spec.Listener.Type, corev1.ServiceType("ClusterIP"))
assert.Equal(t, emqx.Spec.Listener.Ports.MQTTS, int32(8885))
assert.Equal(t, emqx.Spec.Listener.Ports.API, int32(8081))

telegrafConf := `
[global_tags]
instanceID = "test"
[[inputs.http]]
urls = ["http://127.0.0.1:8081/api/v4/emqx_prometheus"]
method = "GET"
timeout = "5s"
username = "admin"
password = "public"
data_format = "json"
[[inputs.tail]]
files = ["/opt/emqx/log/emqx.log.[1-5]"]
from_beginning = false
max_undelivered_lines = 64
character_encoding = "utf-8"
data_format = "grok"
grok_patterns = ['^%{TIMESTAMP_ISO8601:timestamp:ts-"2006-01-02T15:04:05.999999999-07:00"} \[%{LOGLEVEL:level}\] (?m)%{GREEDYDATA:messages}$']
[[outputs.discard]]
`
emqx.Spec.TelegrafTemplate = &v1beta1.TelegrafTemplate{
Image: "telegraf:1.19.3",
Conf: &telegrafConf,
}
emqx.Default()
assert.Subset(t, emqx.Spec.Plugins,
[]v1beta1.Plugin{
{
Name: "emqx_prometheus",
Enable: true,
},
},
)
assert.Subset(t, emqx.Spec.Env,
[]corev1.EnvVar{
{
Name: "EMQX_PROMETHEUS__PUSH__GATEWAY__SERVER",
Value: "",
},
},
)
}

func TestValidateCreateBroker(t *testing.T) {
Expand Down
10 changes: 10 additions & 0 deletions apis/apps/v1beta1/emqxenterprise_webhook.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package v1beta1

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -62,6 +63,15 @@ func (r *EmqxEnterprise) Default() {
r.Spec.Plugins = generatePlugins(r.Spec.Plugins)
r.Spec.Modules = generateEmqxEnterpriseModules(r.Spec.Modules)
r.Spec.Listener = generateListener(r.Spec.Listener)

if r.Spec.TelegrafTemplate != nil {
if containsPlugins(r.Spec.Plugins, "emqx_prometheus") == -1 {
r.Spec.Plugins = append(r.Spec.Plugins, Plugin{Name: "emqx_prometheus", Enable: true})
}
if containsEnv(r.Spec.Env, "EMQX_PROMETHEUS__PUSH__GATEWAY__SERVER") == -1 {
r.Spec.Env = append(r.Spec.Env, corev1.EnvVar{Name: "EMQX_PROMETHEUS__PUSH__GATEWAY__SERVER", Value: ""})
}
}
}

// TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation.
Expand Down
19 changes: 9 additions & 10 deletions apis/apps/v1beta1/env_type.go
Expand Up @@ -5,25 +5,24 @@ import (
)

func generateEnv(emqx Emqx) []corev1.EnvVar {
contains := func(Env []corev1.EnvVar, Name string) int {
for index, value := range Env {
if value.Name == Name {
return index
}
}
return -1
}

env := emqx.GetEnv()
e := defaultEnv(emqx)
for _, value := range e {
r := contains(env, value.Name)
r := containsEnv(env, value.Name)
if r == -1 {
env = append(env, value)
}
}
return env
}

func containsEnv(Env []corev1.EnvVar, Name string) int {
for index, value := range Env {
if value.Name == Name {
return index
}
}
return -1
}

func defaultEnv(emqx Emqx) []corev1.EnvVar {
Expand Down
20 changes: 10 additions & 10 deletions apis/apps/v1beta1/plugins_type.go
Expand Up @@ -11,22 +11,22 @@ func generatePlugins(plugins []Plugin) []Plugin {
return defaultLoadedPlugins()
}

contains := func(plugins []Plugin) int {
for index, value := range plugins {
if value.Name == "emqx_management" {
return index
}
}
return -1
}

if contains(plugins) == -1 {
if containsPlugins(plugins, "emqx_management") == -1 {
plugins = append(plugins, Plugin{Name: "emqx_management", Enable: true})
}

return plugins
}

func containsPlugins(plugins []Plugin, name string) int {
for index, value := range plugins {
if value.Name == name {
return index
}
}
return -1
}

func defaultLoadedPlugins() []Plugin {
return []Plugin{
{
Expand Down
18 changes: 10 additions & 8 deletions config/samples/emqx/v1beta1/emqx-ee.yaml
Expand Up @@ -51,23 +51,25 @@ spec:
enable: true
- name: emqx_dashboard
enable: true
- name: emqx_prometheus
enable: true
# license: the license for EMQX Enterprise should be completed
telegrafTemplate:
image: "telegraf:1.19.3"
imagePullPolicy: "IfNotPresent"
conf: |
[global_tags]
instanceID = "test"
instanceID = "test"
[[inputs.http]]
urls = ["http://127.0.0.1:8081/api/v4/emqx_prometheus"]
method = "GET"
timeout = "5s"
username = "admin"
password = "public"
data_format = "json"
urls = ["http://127.0.0.1:8081/api/v4/emqx_prometheus"]
method = "GET"
timeout = "5s"
username = "admin"
password = "public"
data_format = "json"
[[inputs.tail]]
files = ["/opt/emqx/log/emqx.log.[1-9]"]
files = ["/opt/emqx/log/emqx.log.[1-5]"]
from_beginning = false
max_undelivered_lines = 64
character_encoding = "utf-8"
Expand Down
2 changes: 2 additions & 0 deletions config/samples/emqx/v1beta1/emqx.yaml
Expand Up @@ -135,6 +135,8 @@ spec:
plugins:
- name: emqx_management
enable: true
- name: emqx_prometheus
enable: true
- name: emqx_recon
enable: true
- name: emqx_retainer
Expand Down
86 changes: 44 additions & 42 deletions controllers_suite/suite_test.go
Expand Up @@ -273,26 +273,26 @@ func generateEmqxNamespace(namespace string) *corev1.Namespace {
func generateEmqxBroker(name, namespace string) *v1beta1.EmqxBroker {
storageClassName := "standard"
telegrafConf := `
[global_tags]
instanceID = "test"
[[inputs.http]]
urls = ["http://127.0.0.1:8081/api/v4/emqx_prometheus"]
method = "GET"
timeout = "5s"
username = "admin"
password = "public"
data_format = "json"
[[inputs.tail]]
files = ["/opt/emqx/log/emqx.log.[1-9]"]
from_beginning = false
max_undelivered_lines = 64
character_encoding = "utf-8"
data_format = "grok"
grok_patterns = ['^%{TIMESTAMP_ISO8601:timestamp:ts-"2006-01-02T15:04:05.999999999-07:00"} \[%{LOGLEVEL:level}\] (?m)%{GREEDYDATA:messages}$']
[[outputs.discard]]
`
[global_tags]
instanceID = "test"
[[inputs.http]]
urls = ["http://127.0.0.1:8081/api/v4/emqx_prometheus"]
method = "GET"
timeout = "5s"
username = "admin"
password = "public"
data_format = "json"
[[inputs.tail]]
files = ["/opt/emqx/log/emqx.log.[1-5]"]
from_beginning = false
max_undelivered_lines = 64
character_encoding = "utf-8"
data_format = "grok"
grok_patterns = ['^%{TIMESTAMP_ISO8601:timestamp:ts-"2006-01-02T15:04:05.999999999-07:00"} \[%{LOGLEVEL:level}\] (?m)%{GREEDYDATA:messages}$']
[[outputs.discard]]
`
emqx := &v1beta1.EmqxBroker{
TypeMeta: metav1.TypeMeta{
APIVersion: "apps.emqx.io/v1beta1",
Expand Down Expand Up @@ -361,26 +361,26 @@ func generateEmqxBroker(name, namespace string) *v1beta1.EmqxBroker {
// Slim
func generateEmqxEnterprise(name, namespace string) *v1beta1.EmqxEnterprise {
telegrafConf := `
[global_tags]
instanceID = "test"
[[inputs.http]]
urls = ["http://127.0.0.1:8081/api/v4/emqx_prometheus"]
method = "GET"
timeout = "5s"
username = "admin"
password = "public"
data_format = "json"
[[inputs.tail]]
files = ["/opt/emqx/log/emqx.log.[1-9]"]
from_beginning = false
max_undelivered_lines = 64
character_encoding = "utf-8"
data_format = "grok"
grok_patterns = ['^%{TIMESTAMP_ISO8601:timestamp:ts-"2006-01-02T15:04:05.999999999-07:00"} \[%{LOGLEVEL:level}\] (?m)%{GREEDYDATA:messages}$']
[[outputs.discard]]
`
[global_tags]
instanceID = "test"
[[inputs.http]]
urls = ["http://127.0.0.1:8081/api/v4/emqx_prometheus"]
method = "GET"
timeout = "5s"
username = "admin"
password = "public"
data_format = "json"
[[inputs.tail]]
files = ["/opt/emqx/log/emqx.log.[1-5]"]
from_beginning = false
max_undelivered_lines = 64
character_encoding = "utf-8"
data_format = "grok"
grok_patterns = ['^%{TIMESTAMP_ISO8601:timestamp:ts-"2006-01-02T15:04:05.999999999-07:00"} \[%{LOGLEVEL:level}\] (?m)%{GREEDYDATA:messages}$']
[[outputs.discard]]
`
emqx := &v1beta1.EmqxEnterprise{
TypeMeta: metav1.TypeMeta{
APIVersion: "apps.emqx.io/v1beta1",
Expand All @@ -395,7 +395,8 @@ func generateEmqxEnterprise(name, namespace string) *v1beta1.EmqxEnterprise {
},
Spec: v1beta1.EmqxEnterpriseSpec{
Image: "emqx/emqx-ee:4.4.0",
License: `-----BEGIN CERTIFICATE-----
License: `
-----BEGIN CERTIFICATE-----
MIIENzCCAx+gAwIBAgIDdMvVMA0GCSqGSIb3DQEBBQUAMIGDMQswCQYDVQQGEwJD
TjERMA8GA1UECAwIWmhlamlhbmcxETAPBgNVBAcMCEhhbmd6aG91MQwwCgYDVQQK
DANFTVExDDAKBgNVBAsMA0VNUTESMBAGA1UEAwwJKi5lbXF4LmlvMR4wHAYJKoZI
Expand All @@ -419,7 +420,8 @@ iL3a2tdZ4sq+h/Z1elIXD71JJBAImjr6BljTIdUCfVtNvxlE8M0D/rKSn2jwzsjI
UrW88THMtlz9sb56kmM3JIOoIJoep6xNEajIBnoChSGjtBYFNFwzdwSTCodYkgPu
JifqxTKSuwAGSlqxJUwhjWG8ulzL3/pCAYEwlWmd2+nsfotQdiANdaPnez7o0z0s
EujOCZMbK8qNfSbyo50q5iIXhz2ZIGl+4hdp
-----END CERTIFICATE-----`,
-----END CERTIFICATE-----
`,
TelegrafTemplate: &v1beta1.TelegrafTemplate{
Image: "telegraf:1.19.3",
Conf: &telegrafConf,
Expand Down