Skip to content

Commit d71a4d1

Browse files
dilyevskyclaude
andcommitted
[gateway] Implement Gateway status error handling (APO-387)
Ensure validation errors in the xDS pipeline bubble up to users via Gateway and Route status conditions. Key fixes: - Fix status updates always being bypassed due to in-place mutation comparison bug (deep copy obj before mutation) - Add status runner infrastructure for processing Gateway API resource status updates - Wire up status updater through gateway server and apiserver Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent c50bb30 commit d71a4d1

File tree

8 files changed

+357
-45
lines changed

8 files changed

+357
-45
lines changed

pkg/apiserver/manager.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"github.com/apoxy-dev/apoxy/pkg/apiserver/gateway"
4747
"github.com/apoxy-dev/apoxy/pkg/cryptoutils"
4848
"github.com/apoxy-dev/apoxy/pkg/gateway/message"
49+
statusrunner "github.com/apoxy-dev/apoxy/pkg/gateway/status/runner"
4950
"github.com/apoxy-dev/apoxy/pkg/log"
5051
apoxynet "github.com/apoxy-dev/apoxy/pkg/tunnel/net"
5152
tunnet "github.com/apoxy-dev/apoxy/pkg/tunnel/net"
@@ -523,6 +524,18 @@ func (m *Manager) Start(
523524
return fmt.Errorf("failed to wait for APIService %s: %v", corev1alpha2.GroupVersion.Group, err)
524525
}
525526

527+
// Start the Status Runner to write Gateway API resource statuses back to the API server.
528+
// This subscribes to status updates from the gateway translation pipeline and writes
529+
// them back to Kubernetes.
530+
log.Infof("Starting Gateway API Status Runner")
531+
statusRunner := statusrunner.New(&statusrunner.Config{
532+
Client: m.manager.GetClient(),
533+
ProviderResources: gwResources,
534+
})
535+
if err := statusRunner.Start(ctx); err != nil {
536+
return fmt.Errorf("failed to start status runner: %v", err)
537+
}
538+
526539
ctx, cancel := context.WithCancel(ctx)
527540
defer cancel()
528541
g, ctx := errgroup.WithContext(ctx)

pkg/backplane/controllers/gateway.go

Lines changed: 8 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ import (
66
"strconv"
77

88
"k8s.io/apimachinery/pkg/api/errors"
9-
"k8s.io/apimachinery/pkg/api/meta"
10-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
119
"k8s.io/apimachinery/pkg/types"
1210
ctrl "sigs.k8s.io/controller-runtime"
1311
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -154,48 +152,15 @@ func (r *GatewayReconciler) extractEnvoyListeners(gw *gatewayv1.Gateway) []*envo
154152
return listeners
155153
}
156154

157-
// updateGatewayStatus updates the Gateway status with conditions based on the Proxy state.
155+
// updateGatewayStatus updates the Gateway status based on the Proxy state.
156+
// Note: Gateway status conditions (Accepted, Programmed) are now managed by the
157+
// apiserver's status runner which subscribes to translation pipeline status updates.
158+
// This function is kept for potential future health-check based status updates.
158159
func (r *GatewayReconciler) updateGatewayStatus(ctx context.Context, gw *gatewayv1.Gateway, proxy *corev1alpha2.Proxy) error {
159-
acceptedCondition := metav1.Condition{
160-
Type: string(gwapiv1.GatewayConditionAccepted),
161-
Status: metav1.ConditionTrue,
162-
ObservedGeneration: gw.Generation,
163-
LastTransitionTime: metav1.Now(),
164-
Reason: string(gwapiv1.GatewayReasonAccepted),
165-
Message: "Gateway accepted and associated with Proxy",
166-
}
167-
168-
programmedCondition := metav1.Condition{
169-
Type: string(gwapiv1.GatewayConditionProgrammed),
170-
Status: metav1.ConditionTrue,
171-
ObservedGeneration: gw.Generation,
172-
LastTransitionTime: metav1.Now(),
173-
Reason: string(gwapiv1.GatewayReasonProgrammed),
174-
Message: "Gateway listeners are configured in Envoy",
175-
}
176-
177-
meta.SetStatusCondition(&gw.Status.Conditions, acceptedCondition)
178-
meta.SetStatusCondition(&gw.Status.Conditions, programmedCondition)
179-
180-
for i, l := range gw.Spec.Listeners {
181-
if i < len(gw.Status.Listeners) {
182-
gw.Status.Listeners[i].Name = l.Name
183-
gw.Status.Listeners[i].AttachedRoutes = 0 // This would need route counting logic
184-
185-
// Set listener conditions
186-
listenerAccepted := metav1.Condition{
187-
Type: string(gwapiv1.ListenerConditionAccepted),
188-
Status: metav1.ConditionTrue,
189-
ObservedGeneration: gw.Generation,
190-
LastTransitionTime: metav1.Now(),
191-
Reason: string(gwapiv1.ListenerReasonAccepted),
192-
Message: "Listener accepted",
193-
}
194-
meta.SetStatusCondition(&gw.Status.Listeners[i].Conditions, listenerAccepted)
195-
}
196-
}
197-
198-
return r.Status().Update(ctx, gw)
160+
// Status conditions are managed by the apiserver status runner.
161+
// The backplane should only update status based on health checks or
162+
// runtime information that isn't available at translation time.
163+
return nil
199164
}
200165

201166
// SetupWithManager sets up the controller with the Manager.

pkg/gateway/gatewayapi/contexts.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,39 @@ type GatewayContext struct {
2525
listeners []*ListenerContext
2626
}
2727

28+
// SetCondition sets a condition on the Gateway status.
29+
func (g *GatewayContext) SetCondition(conditionType gwapiv1.GatewayConditionType, status metav1.ConditionStatus, reason gwapiv1.GatewayConditionReason, message string) {
30+
cond := metav1.Condition{
31+
Type: string(conditionType),
32+
Status: status,
33+
Reason: string(reason),
34+
Message: message,
35+
ObservedGeneration: g.Generation,
36+
LastTransitionTime: metav1.NewTime(time.Now()),
37+
}
38+
39+
idx := -1
40+
for i, existing := range g.Status.Conditions {
41+
if existing.Type == cond.Type {
42+
// return early if the condition is unchanged
43+
if existing.Status == cond.Status &&
44+
existing.Reason == cond.Reason &&
45+
existing.Message == cond.Message &&
46+
existing.ObservedGeneration == cond.ObservedGeneration {
47+
return
48+
}
49+
idx = i
50+
break
51+
}
52+
}
53+
54+
if idx > -1 {
55+
g.Status.Conditions[idx] = cond
56+
} else {
57+
g.Status.Conditions = append(g.Status.Conditions, cond)
58+
}
59+
}
60+
2861
// ResetListeners resets the listener statuses and re-generates the GatewayContext
2962
// ListenerContexts from the Gateway spec.
3063
func (g *GatewayContext) ResetListeners() {

pkg/gateway/gatewayapi/runner/runner.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@ package runner
22

33
import (
44
"context"
5+
"fmt"
56

67
egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1"
8+
"k8s.io/apimachinery/pkg/api/meta"
9+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
710
"k8s.io/apimachinery/pkg/runtime/schema"
811
"k8s.io/apimachinery/pkg/types"
912
"k8s.io/apimachinery/pkg/util/sets"
@@ -103,6 +106,22 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) {
103106
if err := val.Validate(); err != nil {
104107
log.Error("unable to validate xds ir, skipped sending it", "error", err)
105108
errChan <- err
109+
110+
// Update Gateway status to reflect validation error
111+
for _, gateway := range result.Gateways {
112+
gwKey := utils.NamespacedName(gateway)
113+
meta.SetStatusCondition(&gateway.Status.Conditions, metav1.Condition{
114+
Type: string(gwapiv1.GatewayConditionProgrammed),
115+
Status: metav1.ConditionFalse,
116+
ObservedGeneration: gateway.Generation,
117+
LastTransitionTime: metav1.Now(),
118+
Reason: string(gwapiv1.GatewayReasonInvalid),
119+
Message: fmt.Sprintf("IR validation failed: %v", err),
120+
})
121+
r.ProviderResources.GatewayStatuses.Store(gwKey, &gateway.Status)
122+
delete(statusesToDelete.GatewayStatusKeys, gwKey)
123+
}
124+
106125
continue
107126
}
108127

pkg/gateway/gatewayapi/status/status.go

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ import (
2828
gwapiv1a2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
2929

3030
egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1"
31+
32+
gatewayv1 "github.com/apoxy-dev/apoxy/api/gateway/v1"
33+
gatewayv1alpha2 "github.com/apoxy-dev/apoxy/api/gateway/v1alpha2"
3134
)
3235

3336
// Update contains an all the information needed to update an object's status.
@@ -84,16 +87,20 @@ func (u *UpdateHandler) apply(update Update) {
8487
return err
8588
}
8689

90+
// Deep copy before mutation to preserve original state for comparison.
91+
// This is necessary because Mutate() modifies the object in-place.
92+
oldObj := obj.DeepCopyObject().(client.Object)
93+
8794
newObj := update.Mutator.Mutate(obj)
8895

89-
if isStatusEqual(obj, newObj) {
96+
if isStatusEqual(oldObj, newObj) {
9097
u.log.WithName(update.NamespacedName.Name).
9198
WithName(update.NamespacedName.Namespace).
9299
Info("status unchanged, bypassing update")
93100
return nil
94101
}
95102

96-
newObj.SetUID(obj.GetUID())
103+
newObj.SetUID(oldObj.GetUID())
97104

98105
return u.client.Status().Update(context.Background(), newObj)
99106
}); err != nil {
@@ -171,6 +178,7 @@ func (u *UpdateWriter) Send(update Update) {
171178
// ClientTrafficPolicy
172179
// SecurityPolicy
173180
// BackendTLSPolicy
181+
// Apoxy Gateway types (gatewayv1.Gateway, gatewayv1.HTTPRoute, etc.)
174182
func isStatusEqual(objA, objB interface{}) bool {
175183
opts := cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime")
176184
switch a := objA.(type) {
@@ -240,6 +248,43 @@ func isStatusEqual(objA, objB interface{}) bool {
240248
return true
241249
}
242250
}
251+
// Apoxy Gateway types
252+
case *gatewayv1.Gateway:
253+
if b, ok := objB.(*gatewayv1.Gateway); ok {
254+
if cmp.Equal(a.Status, b.Status, opts) {
255+
return true
256+
}
257+
}
258+
case *gatewayv1.HTTPRoute:
259+
if b, ok := objB.(*gatewayv1.HTTPRoute); ok {
260+
if cmp.Equal(a.Status, b.Status, opts) {
261+
return true
262+
}
263+
}
264+
case *gatewayv1.GRPCRoute:
265+
if b, ok := objB.(*gatewayv1.GRPCRoute); ok {
266+
if cmp.Equal(a.Status, b.Status, opts) {
267+
return true
268+
}
269+
}
270+
case *gatewayv1alpha2.TLSRoute:
271+
if b, ok := objB.(*gatewayv1alpha2.TLSRoute); ok {
272+
if cmp.Equal(a.Status, b.Status, opts) {
273+
return true
274+
}
275+
}
276+
case *gatewayv1alpha2.TCPRoute:
277+
if b, ok := objB.(*gatewayv1alpha2.TCPRoute); ok {
278+
if cmp.Equal(a.Status, b.Status, opts) {
279+
return true
280+
}
281+
}
282+
case *gatewayv1alpha2.UDPRoute:
283+
if b, ok := objB.(*gatewayv1alpha2.UDPRoute); ok {
284+
if cmp.Equal(a.Status, b.Status, opts) {
285+
return true
286+
}
287+
}
243288
}
244289

245290
return false

pkg/gateway/gatewayapi/translator.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package gatewayapi
77

88
import (
9+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
910
"k8s.io/apimachinery/pkg/runtime/schema"
1011
gwapiv1 "sigs.k8s.io/gateway-api/apis/v1"
1112

@@ -202,6 +203,10 @@ func (t *Translator) GetRelevantGateways(gateways []*gwapiv1.Gateway) []*Gateway
202203
}
203204
gc.ResetListeners()
204205

206+
// Set Accepted condition - Gateway is accepted by this controller
207+
gc.SetCondition(gwapiv1.GatewayConditionAccepted, metav1.ConditionTrue,
208+
gwapiv1.GatewayReasonAccepted, "Gateway accepted by controller")
209+
205210
relevant = append(relevant, gc)
206211
}
207212
}

pkg/gateway/server.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ import (
44
"context"
55

66
"google.golang.org/grpc/credentials"
7+
"sigs.k8s.io/controller-runtime/pkg/client"
78

89
gatewayapirunner "github.com/apoxy-dev/apoxy/pkg/gateway/gatewayapi/runner"
910
"github.com/apoxy-dev/apoxy/pkg/gateway/message"
11+
statusrunner "github.com/apoxy-dev/apoxy/pkg/gateway/status/runner"
1012
xdsserverrunner "github.com/apoxy-dev/apoxy/pkg/gateway/xds/server/runner"
1113
xdstranslator "github.com/apoxy-dev/apoxy/pkg/gateway/xds/translator"
1214
xdstranslatorrunner "github.com/apoxy-dev/apoxy/pkg/gateway/xds/translator/runner"
@@ -21,6 +23,16 @@ type serverOptions struct {
2123
extensionServerAddr string
2224
extensionServerCreds credentials.TransportCredentials
2325
extensionFailOpen bool
26+
client client.Client
27+
}
28+
29+
// WithClient sets the Kubernetes client for status updates.
30+
// If provided, a status runner will be started to write Gateway API
31+
// resource statuses back to the API server.
32+
func WithClient(c client.Client) ServerOption {
33+
return func(o *serverOptions) {
34+
o.client = c
35+
}
2436
}
2537

2638
// WithExtensionServer sets the extension server for the server options. If failOpen is true,
@@ -46,6 +58,19 @@ func RunServer(ctx context.Context, resources *message.ProviderResources, opts .
4658
opt(options)
4759
}
4860

61+
// Start the Status Runner if a client is provided.
62+
// It subscribes to Gateway API resource status updates and writes them
63+
// back to Kubernetes.
64+
if options.client != nil {
65+
statusRunner := statusrunner.New(&statusrunner.Config{
66+
Client: options.client,
67+
ProviderResources: resources,
68+
})
69+
if err := statusRunner.Start(ctx); err != nil {
70+
return err
71+
}
72+
}
73+
4974
xdsIR := new(message.XdsIR)
5075
// Start the GatewayAPI Translator Runner.
5176
// It subscribes to the provider resources, translates it to xDS IR

0 commit comments

Comments
 (0)