Skip to content

Commit d7fb8d5

Browse files
dilyevskyclaude
andcommitted
[mirror] add coordination lease heartbeat and mirror protection types
Add coordination.k8s.io/v1 Lease resource type for the Apoxy apiserver and implement heartbeat lease renewal in the mirror controller. The kube-controller now renews a coordination Lease every 10s (30s duration) so the cloud-side GC controller can detect disconnected mirrors and clean up orphaned objects. All sync methods now stamp a heartbeat annotation as a secondary liveness signal. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 8d225f0 commit d7fb8d5

File tree

7 files changed

+883
-37
lines changed

7 files changed

+883
-37
lines changed

api/coordination/v1/doc.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
// +k8s:deepcopy-gen=package
2+
// +k8s:openapi-gen=true
3+
4+
// Package v1 contains the coordination.k8s.io/v1 Lease API type for the Apoxy apiserver.
5+
package v1

api/coordination/v1/register.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package v1
2+
3+
import (
4+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
5+
"k8s.io/apimachinery/pkg/runtime"
6+
"k8s.io/apimachinery/pkg/runtime/schema"
7+
)
8+
9+
// GroupName specifies the group name used to register the objects.
10+
const GroupName = "coordination.k8s.io"
11+
12+
// GroupVersion specifies the group and the version used to register the objects.
13+
var GroupVersion = metav1.GroupVersion{Group: GroupName, Version: "v1"}
14+
15+
// SchemeGroupVersion is group version used to register these objects.
16+
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1"}
17+
18+
// Resource takes an unqualified resource and returns a Group qualified GroupResource.
19+
func Resource(resource string) schema.GroupResource {
20+
return SchemeGroupVersion.WithResource(resource).GroupResource()
21+
}
22+
23+
var (
24+
// SchemeBuilder and AddToScheme will stay in k8s.io/kubernetes.
25+
SchemeBuilder runtime.SchemeBuilder
26+
localSchemeBuilder = &SchemeBuilder
27+
// AddToScheme adds the types in this group-version to the given scheme.
28+
AddToScheme = localSchemeBuilder.AddToScheme
29+
)
30+
31+
func init() {
32+
localSchemeBuilder.Register(addKnownTypes)
33+
}
34+
35+
// Adds the list of known types to Scheme.
36+
func addKnownTypes(scheme *runtime.Scheme) error {
37+
scheme.AddKnownTypes(SchemeGroupVersion,
38+
&Lease{},
39+
&LeaseList{},
40+
)
41+
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
42+
return nil
43+
}

api/coordination/v1/types.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package v1
2+
3+
import (
4+
coordinationv1 "k8s.io/api/coordination/v1"
5+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
6+
"k8s.io/apimachinery/pkg/runtime"
7+
"k8s.io/apimachinery/pkg/runtime/schema"
8+
"sigs.k8s.io/apiserver-runtime/pkg/builder/resource"
9+
)
10+
11+
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
12+
// +k8s:openapi-gen=true
13+
// +kubebuilder:object:root=true
14+
15+
// Lease is a wrapper around the standard coordination.k8s.io/v1 Lease that
16+
// implements the apiserver-runtime resource.Object interface. It reuses the
17+
// upstream LeaseSpec so clients can use the standard coordination API.
18+
type Lease struct {
19+
metav1.TypeMeta `json:",inline"`
20+
metav1.ObjectMeta `json:"metadata,omitempty"`
21+
22+
Spec coordinationv1.LeaseSpec `json:"spec,omitempty"`
23+
}
24+
25+
// Implement resource.Object interface for apiserver-runtime.
26+
var _ resource.Object = &Lease{}
27+
28+
func (l *Lease) GetObjectMeta() *metav1.ObjectMeta {
29+
return &l.ObjectMeta
30+
}
31+
32+
func (l *Lease) NamespaceScoped() bool {
33+
return true
34+
}
35+
36+
func (l *Lease) New() runtime.Object {
37+
return &Lease{}
38+
}
39+
40+
func (l *Lease) NewList() runtime.Object {
41+
return &LeaseList{}
42+
}
43+
44+
func (l *Lease) GetGroupVersionResource() schema.GroupVersionResource {
45+
return schema.GroupVersionResource{
46+
Group: SchemeGroupVersion.Group,
47+
Version: SchemeGroupVersion.Version,
48+
Resource: "leases",
49+
}
50+
}
51+
52+
func (l *Lease) IsStorageVersion() bool {
53+
return true
54+
}
55+
56+
func (l *Lease) GetSingularName() string {
57+
return "lease"
58+
}
59+
60+
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
61+
// +kubebuilder:object:root=true
62+
63+
// LeaseList contains a list of Lease objects.
64+
type LeaseList struct {
65+
metav1.TypeMeta `json:",inline"`
66+
metav1.ListMeta `json:"metadata,omitempty"`
67+
Items []Lease `json:"items"`
68+
}
69+
70+
var _ resource.ObjectList = &LeaseList{}
71+
72+
func (ll *LeaseList) GetListMeta() *metav1.ListMeta {
73+
return &ll.ListMeta
74+
}

api/coordination/v1/zz_generated.deepcopy.go

Lines changed: 68 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/cmd/run/mirror.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ import (
44
"context"
55
"fmt"
66

7+
"golang.org/x/sync/errgroup"
78
"k8s.io/apimachinery/pkg/runtime"
89
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
10+
coordinationclient "k8s.io/client-go/kubernetes/typed/coordination/v1"
911
"k8s.io/client-go/rest"
1012
ctrl "sigs.k8s.io/controller-runtime"
1113
gwapiv1 "sigs.k8s.io/gateway-api/apis/v1"
@@ -42,10 +44,18 @@ func runKubeMirror(ctx context.Context, cfg *configv1alpha1.Config, mc *configv1
4244
return fmt.Errorf("failed to create Apoxy client: %w", err)
4345
}
4446

45-
reconciler := controllers.NewMirrorReconciler(mgr.GetClient(), apoxyClient, mc)
47+
coordClient, err := coordinationclient.NewForConfig(kCluster)
48+
if err != nil {
49+
return fmt.Errorf("failed to create coordination client: %w", err)
50+
}
51+
52+
reconciler := controllers.NewMirrorReconciler(mgr.GetClient(), apoxyClient, coordClient, mc)
4653
if err := reconciler.SetupWithManager(ctx, mgr); err != nil {
4754
return fmt.Errorf("failed to setup mirror reconciler: %w", err)
4855
}
4956

50-
return mgr.Start(ctx)
57+
g, ctx := errgroup.WithContext(ctx)
58+
g.Go(func() error { return mgr.Start(ctx) })
59+
g.Go(func() error { return reconciler.RunHeartbeat(ctx, mc.Namespace) })
60+
return g.Wait()
5161
}

0 commit comments

Comments
 (0)