Skip to content

Commit

Permalink
refactor telemetry backendRefs (#3293)
Browse files Browse the repository at this point in the history
* refactor telemetry backendRefs

Signed-off-by: zirain <zirain2009@gmail.com>

* update Gateway condition

Signed-off-by: zirain <zirain2009@gmail.com>

* lint

Signed-off-by: zirain <zirain2009@gmail.com>

* processParamsRef before processBackendRefs

Signed-off-by: zirain <zirain2009@gmail.com>

* fix test

Signed-off-by: zirain <zirain2009@gmail.com>

* address comments

Signed-off-by: zirain <zirain2009@gmail.com>

* use destinations

Signed-off-by: zirain <zirain2009@gmail.com>

* refactor

Signed-off-by: zirain <zirain2009@gmail.com>

* lint

Signed-off-by: zirain <zirain2009@gmail.com>

* yamllint

Signed-off-by: zirain <zirain2009@gmail.com>

* fix authority

Signed-off-by: zirain <zirain2009@gmail.com>

* address TODO

Signed-off-by: zirain <zirain2009@gmail.com>

* fix protocol

Signed-off-by: zirain <zirain2009@gmail.com>

* fix lint

Signed-off-by: zirain <zirain2009@gmail.com>

* update test

Signed-off-by: zirain <zirain2009@gmail.com>

* remove status.RemoveGatewayListenersNotValidCondition

Signed-off-by: zirain <zirain2009@gmail.com>

---------

Signed-off-by: zirain <zirain2009@gmail.com>
  • Loading branch information
zirain committed May 22, 2024
1 parent 101aa48 commit 5aded1e
Show file tree
Hide file tree
Showing 47 changed files with 1,648 additions and 482 deletions.
1 change: 1 addition & 0 deletions internal/gatewayapi/contexts.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ func GetRouteType(route RouteContext) gwapiv1.Kind {

// TODO: [v1alpha2-gwapiv1] This should not be required once all Route
// objects being implemented are of type gwapiv1.

// GetHostnames returns the hosts targeted by the Route object.
func GetHostnames(route RouteContext) []string {
rv := reflect.ValueOf(route).Elem()
Expand Down
149 changes: 119 additions & 30 deletions internal/gatewayapi/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,19 @@
package gatewayapi

import (
"errors"
"fmt"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
gwapiv1 "sigs.k8s.io/gateway-api/apis/v1"

egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1"
"github.com/envoyproxy/gateway/internal/gatewayapi/status"
"github.com/envoyproxy/gateway/internal/ir"
"github.com/envoyproxy/gateway/internal/utils"
"github.com/envoyproxy/gateway/internal/utils/naming"
"github.com/envoyproxy/gateway/internal/utils/net"
)

var _ ListenersTranslator = (*Translator)(nil)
Expand Down Expand Up @@ -44,7 +46,7 @@ func (t *Translator) ProcessListeners(gateways []*GatewayContext, xdsIR XdsIRMap
if resources.EnvoyProxy != nil {
infraIR[irKey].Proxy.Config = resources.EnvoyProxy
}
t.processProxyObservability(gateway.Gateway, xdsIR[irKey], infraIR[irKey].Proxy.Config)
t.processProxyObservability(gateway, xdsIR[irKey], infraIR[irKey].Proxy.Config, resources)

for _, listener := range gateway.listeners {
// Process protocol & supported kinds
Expand Down Expand Up @@ -143,10 +145,29 @@ func (t *Translator) ProcessListeners(gateways []*GatewayContext, xdsIR XdsIRMap
}
}

func (t *Translator) processProxyObservability(gw *gwapiv1.Gateway, xdsIR *ir.Xds, envoyProxy *egv1a1.EnvoyProxy) {
xdsIR.AccessLog = processAccessLog(envoyProxy)
xdsIR.Tracing = processTracing(gw, envoyProxy, t.MergeGateways)
xdsIR.Metrics = processMetrics(envoyProxy)
func (t *Translator) processProxyObservability(gwCtx *GatewayContext, xdsIR *ir.Xds, envoyProxy *egv1a1.EnvoyProxy, resources *Resources) {
var err error

xdsIR.AccessLog, err = t.processAccessLog(envoyProxy, resources)
if err != nil {
status.UpdateGatewayListenersNotValidCondition(gwCtx.Gateway, gwapiv1.GatewayReasonInvalid, metav1.ConditionFalse,
fmt.Sprintf("Invalid access log backendRefs: %v", err))
return
}

xdsIR.Tracing, err = t.processTracing(gwCtx.Gateway, envoyProxy, t.MergeGateways, resources)
if err != nil {
status.UpdateGatewayListenersNotValidCondition(gwCtx.Gateway, gwapiv1.GatewayReasonInvalid, metav1.ConditionFalse,
fmt.Sprintf("Invalid tracing backendRefs: %v", err))
return
}

xdsIR.Metrics, err = t.processMetrics(envoyProxy, resources)
if err != nil {
status.UpdateGatewayListenersNotValidCondition(gwCtx.Gateway, gwapiv1.GatewayReasonInvalid, metav1.ConditionFalse,
fmt.Sprintf("Invalid metrics backendRefs: %v", err))
return
}
}

func (t *Translator) processInfraIRListener(listener *ListenerContext, infraIR InfraIRMap, irKey string, servicePort *protocolPort, containerPort int32) {
Expand Down Expand Up @@ -179,7 +200,7 @@ func (t *Translator) processInfraIRListener(listener *ListenerContext, infraIR I
infraIR[irKey].Proxy.Listeners = append(infraIR[irKey].Proxy.Listeners, proxyListener)
}

func processAccessLog(envoyproxy *egv1a1.EnvoyProxy) *ir.AccessLog {
func (t *Translator) processAccessLog(envoyproxy *egv1a1.EnvoyProxy, resources *Resources) (*ir.AccessLog, error) {
if envoyproxy == nil ||
envoyproxy.Spec.Telemetry == nil ||
envoyproxy.Spec.Telemetry.AccessLog == nil ||
Expand All @@ -191,16 +212,16 @@ func processAccessLog(envoyproxy *egv1a1.EnvoyProxy) *ir.AccessLog {
Path: "/dev/stdout",
},
},
}
}, nil
}

if envoyproxy.Spec.Telemetry.AccessLog.Disable {
return nil
return nil, nil
}

irAccessLog := &ir.AccessLog{}
// translate the access log configuration to the IR
for _, accessLog := range envoyproxy.Spec.Telemetry.AccessLog.Settings {
for idx, accessLog := range envoyproxy.Spec.Telemetry.AccessLog.Settings {
for _, sink := range accessLog.Sinks {
switch sink.Type {
case egv1a1.ProxyAccessLogSinkTypeFile:
Expand Down Expand Up @@ -234,16 +255,28 @@ func processAccessLog(envoyproxy *egv1a1.EnvoyProxy) *ir.AccessLog {

// TODO: remove support for Host/Port in v1.2
al := &ir.OpenTelemetryAccessLog{
Port: uint32(sink.OpenTelemetry.Port),
Resources: sink.OpenTelemetry.Resources,
}

if sink.OpenTelemetry.Host != nil {
al.Host = *sink.OpenTelemetry.Host
// TODO: how to get authority from the backendRefs?
ds, err := t.processBackendRefs(sink.OpenTelemetry.BackendRefs, envoyproxy.Namespace, resources)
if err != nil {
return nil, err
}
al.Destination = ir.RouteDestination{
Name: fmt.Sprintf("accesslog-%d", idx), // TODO: rename this, so that we can share backend with tracing?
Settings: ds,
}

if len(sink.OpenTelemetry.BackendRefs) > 0 {
al.Host, al.Port = net.BackendHostAndPort(sink.OpenTelemetry.BackendRefs[0].BackendObjectReference, envoyproxy.Namespace)
if len(ds) == 0 {
// fallback to host and port
var host string
var port uint32
if sink.OpenTelemetry.Host != nil {
host, port = *sink.OpenTelemetry.Host, uint32(sink.OpenTelemetry.Port)
}
al.Destination.Settings = destinationSettingFromHostAndPort(host, port)
al.Authority = host
}

switch accessLog.Format.Type {
Expand All @@ -258,25 +291,35 @@ func processAccessLog(envoyproxy *egv1a1.EnvoyProxy) *ir.AccessLog {
}
}

return irAccessLog
return irAccessLog, nil
}

func processTracing(gw *gwapiv1.Gateway, envoyproxy *egv1a1.EnvoyProxy, mergeGateways bool) *ir.Tracing {
func (t *Translator) processTracing(gw *gwapiv1.Gateway, envoyproxy *egv1a1.EnvoyProxy, mergeGateways bool, resources *Resources) (*ir.Tracing, error) {
if envoyproxy == nil ||
envoyproxy.Spec.Telemetry == nil ||
envoyproxy.Spec.Telemetry.Tracing == nil {
return nil
return nil, nil
}
tracing := envoyproxy.Spec.Telemetry.Tracing

// TODO: remove support for Host/Port in v1.2
var host string
var port uint32
if tracing.Provider.Host != nil {
host, port = *tracing.Provider.Host, uint32(tracing.Provider.Port)
// TODO: how to get authority from the backendRefs?
ds, err := t.processBackendRefs(tracing.Provider.BackendRefs, envoyproxy.Namespace, resources)
if err != nil {
return nil, err
}
if len(tracing.Provider.BackendRefs) > 0 {
host, port = net.BackendHostAndPort(tracing.Provider.BackendRefs[0].BackendObjectReference, gw.Namespace)

var authority string

// fallback to host and port
// TODO: remove support for Host/Port in v1.2
if len(ds) == 0 {
var host string
var port uint32
if tracing.Provider.Host != nil {
host, port = *tracing.Provider.Host, uint32(tracing.Provider.Port)
}
ds = destinationSettingFromHostAndPort(host, port)
authority = host
}

samplingRate := 100.0
Expand All @@ -290,22 +333,68 @@ func processTracing(gw *gwapiv1.Gateway, envoyproxy *egv1a1.EnvoyProxy, mergeGat
}

return &ir.Tracing{
Authority: authority,
ServiceName: serviceName,
Host: host,
Port: port,
SamplingRate: samplingRate,
CustomTags: tracing.CustomTags,
}
Destination: ir.RouteDestination{
Name: "tracing", // TODO: rename this, so that we can share backend with accesslog?
Settings: ds,
},
}, nil
}

func processMetrics(envoyproxy *egv1a1.EnvoyProxy) *ir.Metrics {
func (t *Translator) processMetrics(envoyproxy *egv1a1.EnvoyProxy, resources *Resources) (*ir.Metrics, error) {
if envoyproxy == nil ||
envoyproxy.Spec.Telemetry == nil ||
envoyproxy.Spec.Telemetry.Metrics == nil {
return nil
return nil, nil
}

for _, sink := range envoyproxy.Spec.Telemetry.Metrics.Sinks {
if sink.OpenTelemetry == nil {
continue
}

_, err := t.processBackendRefs(sink.OpenTelemetry.BackendRefs, envoyproxy.Namespace, resources)
if err != nil {
return nil, err
}
}

return &ir.Metrics{
EnableVirtualHostStats: envoyproxy.Spec.Telemetry.Metrics.EnableVirtualHostStats,
EnablePerEndpointStats: envoyproxy.Spec.Telemetry.Metrics.EnablePerEndpointStats,
}, nil
}

func (t *Translator) processBackendRefs(backendRefs []egv1a1.BackendRef, namespace string, resources *Resources) ([]*ir.DestinationSetting, error) {
result := make([]*ir.DestinationSetting, 0, len(backendRefs))
for _, ref := range backendRefs {
ns := NamespaceDerefOr(ref.Namespace, namespace)
kind := KindDerefOr(ref.Kind, KindService)
if kind != KindService {
return nil, errors.New("only service kind is supported for backendRefs")
}
if err := validateBackendService(ref.BackendObjectReference, resources, ns, corev1.ProtocolTCP); err != nil {
return nil, err
}

ds := t.processServiceDestinationSetting(ref.BackendObjectReference, ns, ir.GRPC, resources)
result = append(result, ds)
}
if len(result) == 0 {
return nil, nil
}
return result, nil
}

func destinationSettingFromHostAndPort(host string, port uint32) []*ir.DestinationSetting {
return []*ir.DestinationSetting{
{
Weight: ptr.To[uint32](1),
Protocol: ir.GRPC,
Endpoints: []*ir.DestinationEndpoint{ir.NewDestEndpoint(host, port)},
},
}
}
Loading

0 comments on commit 5aded1e

Please sign in to comment.