Skip to content

Commit

Permalink
controllers/vmagent: properly handle merge serviceSpec for additional…
Browse files Browse the repository at this point in the history
…Ports

previously port name wasn't checked by operator, which leads to runtime errros during additional service creation.
now user could override port settings by it's name and obtain full controll over additional ingest ports (influx,graphite, opentsdb).
it's usefull for nodePorts
#531
  • Loading branch information
f41gh7 committed Sep 29, 2022
1 parent 4f3f5ea commit 05d332d
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 9 deletions.
17 changes: 13 additions & 4 deletions controllers/factory/additional_ports.go
Expand Up @@ -89,7 +89,16 @@ func buildAdditionalServicePorts(ip *victoriametricsv1beta1.InsertPorts, svc *co
if ip == nil || svc == nil {
return
}
if ip.GraphitePort != "" {
hasPortByName := func(name string) bool {
for _, port := range svc.Spec.Ports {
if port.Name == name {
return true
}
}
return false
}
// user want to use own definition for graphite
if ip.GraphitePort != "" && !hasPortByName("graphite-tcp") && !hasPortByName("graphite-udp") {
svc.Spec.Ports = append(svc.Spec.Ports,
corev1.ServicePort{
Name: "graphite-tcp",
Expand All @@ -105,7 +114,7 @@ func buildAdditionalServicePorts(ip *victoriametricsv1beta1.InsertPorts, svc *co
},
)
}
if ip.InfluxPort != "" {
if ip.InfluxPort != "" && !hasPortByName("influx-tcp") && !hasPortByName("influx-udp") {
svc.Spec.Ports = append(svc.Spec.Ports,
corev1.ServicePort{
Name: "influx-tcp",
Expand All @@ -121,7 +130,7 @@ func buildAdditionalServicePorts(ip *victoriametricsv1beta1.InsertPorts, svc *co
},
)
}
if ip.OpenTSDBPort != "" {
if ip.OpenTSDBPort != "" && !hasPortByName("opentsdb-tcp") && !hasPortByName("opentsdb-udp") {
svc.Spec.Ports = append(svc.Spec.Ports,
corev1.ServicePort{
Name: "opentsdb-tcp",
Expand All @@ -137,7 +146,7 @@ func buildAdditionalServicePorts(ip *victoriametricsv1beta1.InsertPorts, svc *co
},
)
}
if ip.OpenTSDBHTTPPort != "" {
if ip.OpenTSDBHTTPPort != "" && !hasPortByName("opentsdb-http") {
svc.Spec.Ports = append(svc.Spec.Ports,
corev1.ServicePort{
Name: "opentsdb-http",
Expand Down
89 changes: 84 additions & 5 deletions controllers/factory/vmagent_test.go
Expand Up @@ -751,11 +751,12 @@ func TestCreateOrUpdateVMAgentService(t *testing.T) {
c *config.BaseOperatorConf
}
tests := []struct {
name string
args args
want func(svc *corev1.Service) error
wantErr bool
predefinedObjects []runtime.Object
name string
args args
want func(svc *corev1.Service) error
wantAdditionalService func(svc *corev1.Service) error
wantErr bool
predefinedObjects []runtime.Object
}{
{
name: "base case",
Expand Down Expand Up @@ -829,6 +830,75 @@ func TestCreateOrUpdateVMAgentService(t *testing.T) {
return nil
},
},
{
name: "base case with ingestPorts and extra service",
args: args{
ctx: context.TODO(),
c: config.MustGetBaseConfig(),
cr: &victoriametricsv1beta1.VMAgent{
ObjectMeta: metav1.ObjectMeta{
Name: "base",
Namespace: "default",
},
Spec: victoriametricsv1beta1.VMAgentSpec{
InsertPorts: &victoriametricsv1beta1.InsertPorts{
InfluxPort: "8011",
},
ServiceSpec: &victoriametricsv1beta1.ServiceSpec{
EmbeddedObjectMetadata: victoriametricsv1beta1.EmbeddedObjectMetadata{Name: "extra-svc"},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeNodePort,
Ports: []corev1.ServicePort{
{
Name: "influx-udp",
NodePort: 8085,
Protocol: corev1.ProtocolUDP,
},
},
},
},
},
},
},
predefinedObjects: []runtime.Object{
&corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "extra-svc",
Namespace: "default",
Labels: map[string]string{
"app.kubernetes.io/name": "vmagent",
"app.kubernetes.io/instance": "base",
"app.kubernetes.io/component": "monitoring",
"managed-by": "vm-operator",
},
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
},
},
},
want: func(svc *corev1.Service) error {
if svc.Name != "vmagent-base" {
return fmt.Errorf("unexpected name for service: %v", svc.Name)
}
if len(svc.Spec.Ports) != 3 {
return fmt.Errorf("unexpcted count for ports, want 3, got: %v", len(svc.Spec.Ports))
}
return nil
},
wantAdditionalService: func(svc *corev1.Service) error {
if len(svc.Spec.Ports) != 1 {
return fmt.Errorf("unexpcted count for ports, want 1, got: %v", len(svc.Spec.Ports))
}
if svc.Spec.Ports[0].NodePort != 8085 {
return fmt.Errorf("unexpected port %v, want 8085", svc.Spec.Ports[0])
}
if svc.Spec.Ports[0].Protocol != corev1.ProtocolUDP {
return fmt.Errorf("unexpected protocol want udp, got: %v", svc.Spec.Ports[0].Protocol)
}
return nil
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -841,6 +911,15 @@ func TestCreateOrUpdateVMAgentService(t *testing.T) {
if err := tt.want(got); err != nil {
t.Errorf("CreateOrUpdateVMAgentService() unexpected error: %v", err)
}
if tt.wantAdditionalService != nil {
var additionalSvc corev1.Service
if err := cl.Get(tt.args.ctx, types.NamespacedName{Namespace: tt.args.cr.Namespace, Name: tt.args.cr.Spec.ServiceSpec.NameOrDefault(tt.args.cr.Name)}, &additionalSvc); err != nil {
t.Fatalf("unexpected error: %s", err)
}
if err := tt.wantAdditionalService(&additionalSvc); err != nil {
t.Fatalf("CreateOrUpdateVMAgentService validation failed for additional service: %s", err)
}
}
})
}
}
Expand Down

0 comments on commit 05d332d

Please sign in to comment.