Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 108 additions & 119 deletions controllers/gateway_controller.go

Large diffs are not rendered by default.

26 changes: 8 additions & 18 deletions controllers/gatewayclass_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,46 +28,36 @@ import (
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
gateway_api "sigs.k8s.io/gateway-api/apis/v1beta1"
gwv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
)

// GatewayClassReconciler reconciles a GatewayClass object
type GatewayClassReconciler struct {
type gatewayClassReconciler struct {
log gwlog.Logger
client client.Client
scheme *runtime.Scheme
latticeControllerEnabled bool
}

func RegisterGatewayClassController(log gwlog.Logger, mgr ctrl.Manager) error {
r := &GatewayClassReconciler{
r := &gatewayClassReconciler{
log: log,
client: mgr.GetClient(),
scheme: mgr.GetScheme(),
latticeControllerEnabled: false,
}
return ctrl.NewControllerManagedBy(mgr).
For(&gateway_api.GatewayClass{}).
For(&gwv1beta1.GatewayClass{}).
Complete(r)
}

//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gatewayclasses,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gatewayclasses/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gatewayclasses/finalizers,verbs=update

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the GatewayClass object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile
func (r *GatewayClassReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
func (r *gatewayClassReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
r.log.Infow("reconcile", "name", req.Name)

gwClass := &gateway_api.GatewayClass{}
gwClass := &gwv1beta1.GatewayClass{}
if err := r.client.Get(ctx, req.NamespacedName, gwClass); err != nil {
r.log.Debugw("gateway not found", "name", req.Name)
return ctrl.Result{}, nil
Expand All @@ -88,8 +78,8 @@ func (r *GatewayClassReconciler) Reconcile(ctx context.Context, req ctrl.Request
gwClass.Status.Conditions[0].LastTransitionTime = metav1.NewTime(time.Now())
gwClass.Status.Conditions[0].ObservedGeneration = gwClass.Generation
gwClass.Status.Conditions[0].Status = "True"
gwClass.Status.Conditions[0].Message = string(gateway_api.GatewayClassReasonAccepted)
gwClass.Status.Conditions[0].Reason = string(gateway_api.GatewayClassReasonAccepted)
gwClass.Status.Conditions[0].Message = string(gwv1beta1.GatewayClassReasonAccepted)
gwClass.Status.Conditions[0].Reason = string(gwv1beta1.GatewayClassReasonAccepted)

if err := r.client.Status().Patch(ctx, gwClass, client.MergeFrom(gwClassOld)); err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to update gatewayclass status")
Expand Down
10 changes: 0 additions & 10 deletions controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

// podReconciler reconciles a Pod object
type podReconciler struct {
log gwlog.Logger
client client.Client
Expand All @@ -49,15 +48,6 @@ func RegisterPodController(log gwlog.Logger, mgr ctrl.Manager) error {
//+kubebuilder:rbac:groups=core,resources=pods/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=core,resources=pods/finalizers,verbs=update

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the Pod object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile
func (r *podReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
pod := &corev1.Pod{}
if err := r.client.Get(ctx, req.NamespacedName, pod); err != nil {
Expand Down
131 changes: 54 additions & 77 deletions controllers/route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,9 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/source"
gateway_api_v1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gateway_api "sigs.k8s.io/gateway-api/apis/v1beta1"
gateway_api_v1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
mcs_api "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
gwv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gwv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
mcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"

"sigs.k8s.io/external-dns/endpoint"

Expand All @@ -52,7 +51,7 @@ import (
"github.com/aws/aws-application-networking-k8s/pkg/k8s"
"github.com/aws/aws-application-networking-k8s/pkg/latticestore"
"github.com/aws/aws-application-networking-k8s/pkg/model/core"
latticemodel "github.com/aws/aws-application-networking-k8s/pkg/model/lattice"
model "github.com/aws/aws-application-networking-k8s/pkg/model/lattice"
lattice_runtime "github.com/aws/aws-application-networking-k8s/pkg/runtime"
)

Expand All @@ -61,8 +60,7 @@ var routeTypeToFinalizer = map[core.RouteType]string{
core.GrpcRouteType: "grpcroute.k8s.aws/resources",
}

// RouteReconciler reconciles a HTTPRoute and GRPCRoute objects
type RouteReconciler struct {
type routeReconciler struct {
routeType core.RouteType
log gwlog.Logger
client client.Client
Expand Down Expand Up @@ -103,12 +101,12 @@ func RegisterAllRouteControllers(
routeType core.RouteType
gatewayApiType client.Object
}{
{core.HttpRouteType, &gateway_api_v1beta1.HTTPRoute{}},
{core.GrpcRouteType, &gateway_api_v1alpha2.GRPCRoute{}},
{core.HttpRouteType, &gwv1beta1.HTTPRoute{}},
{core.GrpcRouteType, &gwv1alpha2.GRPCRoute{}},
}

for _, routeInfo := range routeInfos {
reconciler := RouteReconciler{
reconciler := routeReconciler{
routeType: routeInfo.routeType,
log: log,
client: mgrClient,
Expand All @@ -126,9 +124,9 @@ func RegisterAllRouteControllers(

builder := ctrl.NewControllerManagedBy(mgr).
For(routeInfo.gatewayApiType).
Watches(&source.Kind{Type: &gateway_api_v1beta1.Gateway{}}, gwEventHandler).
Watches(&source.Kind{Type: &gwv1beta1.Gateway{}}, gwEventHandler).
Watches(&source.Kind{Type: &corev1.Service{}}, svcEventHandler.MapToRoute(routeInfo.routeType)).
Watches(&source.Kind{Type: &mcs_api.ServiceImport{}}, svcImportEventHandler.MapToRoute(routeInfo.routeType)).
Watches(&source.Kind{Type: &mcsv1alpha1.ServiceImport{}}, svcImportEventHandler.MapToRoute(routeInfo.routeType)).
Watches(&source.Kind{Type: &corev1.Endpoints{}}, svcEventHandler.MapToRoute(routeInfo.routeType))

if ok, err := k8s.IsGVKSupported(mgr, v1alpha1.GroupVersion.String(), v1alpha1.TargetGroupPolicyKind); ok {
Expand Down Expand Up @@ -162,20 +160,11 @@ func RegisterAllRouteControllers(
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=grpcroutes/status;httproutes/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=grpcroutes/finalizers;httproutes/finalizers,verbs=update

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the Route object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile
func (r *RouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
func (r *routeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
return lattice_runtime.HandleReconcileError(r.reconcile(ctx, req))
}

func (r *RouteReconciler) reconcile(ctx context.Context, req ctrl.Request) error {
func (r *routeReconciler) reconcile(ctx context.Context, req ctrl.Request) error {
route, err := r.getRoute(ctx, req)
if err != nil {
return client.IgnoreNotFound(err)
Expand All @@ -190,34 +179,29 @@ func (r *RouteReconciler) reconcile(ctx context.Context, req ctrl.Request) error
}

if !route.DeletionTimestamp().IsZero() {
r.log.Infow("reconcile, deleting", "name", req.Name)
r.eventRecorder.Event(route.K8sObject(), corev1.EventTypeNormal,
k8s.RouteEventReasonReconcile, "Deleting Reconcile")
if err := r.cleanupRouteResources(ctx, route); err != nil {
return fmt.Errorf("failed to cleanup GRPCRoute %v, %v: %w", route.Name(), route.Namespace(), err)
}
err = updateRouteListenerStatus(ctx, r.client, route)
if err != nil {
return err
}
err = r.finalizerManager.RemoveFinalizers(ctx, route.K8sObject(), routeTypeToFinalizer[r.routeType])
if err != nil {
return err
}

// TODO delete metrics
return nil
return r.reconcileDelete(ctx, req, route)
} else {
r.log.Infow("reconcile, adding or updating", "name", req.Name)
r.eventRecorder.Event(route.K8sObject(), corev1.EventTypeNormal,
k8s.RouteEventReasonReconcile, "Adding/Updating Reconcile")
err := r.reconcileRouteResource(ctx, route)
// TODO add/update metrics
return r.reconcileUpsert(ctx, req, route)
}
}

func (r *routeReconciler) reconcileDelete(ctx context.Context, req ctrl.Request, route core.Route) error {
r.log.Infow("reconcile, deleting", "name", req.Name)
r.eventRecorder.Event(route.K8sObject(), corev1.EventTypeNormal,
k8s.RouteEventReasonReconcile, "Deleting Reconcile")

if err := r.cleanupRouteResources(ctx, route); err != nil {
return fmt.Errorf("failed to cleanup GRPCRoute %v, %v: %w", route.Name(), route.Namespace(), err)
}

if err := updateRouteListenerStatus(ctx, r.client, route); err != nil {
return err
}

return r.finalizerManager.RemoveFinalizers(ctx, route.K8sObject(), routeTypeToFinalizer[r.routeType])
}

func (r *RouteReconciler) getRoute(ctx context.Context, req ctrl.Request) (core.Route, error) {
func (r *routeReconciler) getRoute(ctx context.Context, req ctrl.Request) (core.Route, error) {
switch r.routeType {
case core.HttpRouteType:
return core.GetHTTPRoute(ctx, r.client, req.NamespacedName)
Expand All @@ -229,7 +213,7 @@ func (r *RouteReconciler) getRoute(ctx context.Context, req ctrl.Request) (core.
}

func updateRouteListenerStatus(ctx context.Context, k8sClient client.Client, route core.Route) error {
gw := &gateway_api_v1beta1.Gateway{}
gw := &gwv1beta1.Gateway{}

gwNamespace := route.Namespace()
if route.Spec().ParentRefs()[0].Namespace != nil {
Expand All @@ -248,18 +232,18 @@ func updateRouteListenerStatus(ctx context.Context, k8sClient client.Client, rou
return UpdateGWListenerStatus(ctx, k8sClient, gw)
}

func (r *RouteReconciler) cleanupRouteResources(ctx context.Context, route core.Route) error {
func (r *routeReconciler) cleanupRouteResources(ctx context.Context, route core.Route) error {
_, _, err := r.buildAndDeployModel(ctx, route)
return err
}

func (r *RouteReconciler) isRouteRelevant(ctx context.Context, route core.Route) bool {
func (r *routeReconciler) isRouteRelevant(ctx context.Context, route core.Route) bool {
if len(route.Spec().ParentRefs()) == 0 {
r.log.Infof("Ignore Route which has no ParentRefs gateway %v ", route.Name())
return false
}

gw := &gateway_api_v1beta1.Gateway{}
gw := &gwv1beta1.Gateway{}

gwNamespace := route.Namespace()
if route.Spec().ParentRefs()[0].Namespace != nil {
Expand All @@ -277,7 +261,7 @@ func (r *RouteReconciler) isRouteRelevant(ctx context.Context, route core.Route)
}

// make sure gateway is an aws-vpc-lattice
gwClass := &gateway_api_v1beta1.GatewayClass{}
gwClass := &gwv1beta1.GatewayClass{}
gwClassName := types.NamespacedName{
Namespace: defaultNameSpace,
Name: string(gw.Spec.GatewayClassName),
Expand All @@ -297,10 +281,10 @@ func (r *RouteReconciler) isRouteRelevant(ctx context.Context, route core.Route)
return false
}

func (r *RouteReconciler) buildAndDeployModel(
func (r *routeReconciler) buildAndDeployModel(
ctx context.Context,
route core.Route,
) (core.Stack, *latticemodel.Service, error) {
) (core.Stack, *model.Service, error) {
stack, latticeService, err := r.modelBuilder.Build(ctx, route)

if err != nil {
Expand Down Expand Up @@ -332,7 +316,11 @@ func (r *RouteReconciler) buildAndDeployModel(
return stack, latticeService, err
}

func (r *RouteReconciler) reconcileRouteResource(ctx context.Context, route core.Route) error {
func (r *routeReconciler) reconcileUpsert(ctx context.Context, req ctrl.Request, route core.Route) error {
r.log.Infow("reconcile, adding or updating", "name", req.Name)
r.eventRecorder.Event(route.K8sObject(), corev1.EventTypeNormal,
k8s.RouteEventReasonReconcile, "Adding/Updating Reconcile")

if err := r.finalizerManager.AddFinalizers(ctx, route.K8sObject(), routeTypeToFinalizer[r.routeType]); err != nil {
r.eventRecorder.Event(route.K8sObject(), corev1.EventTypeWarning, k8s.RouteEventReasonFailedAddFinalizer, fmt.Sprintf("Failed add finalizer due to %v", err))
}
Expand All @@ -345,10 +333,10 @@ func (r *RouteReconciler) reconcileRouteResource(ctx context.Context, route core
route.Status().UpdateParentRefs(route.Spec().ParentRefs()[0], config.LatticeGatewayControllerName)

route.Status().UpdateRouteCondition(metav1.Condition{
Type: string(gateway_api.RouteConditionAccepted),
Type: string(gwv1beta1.RouteConditionAccepted),
Status: metav1.ConditionFalse,
ObservedGeneration: route.K8sObject().GetGeneration(),
Reason: string(gateway_api.RouteReasonUnsupportedValue),
Reason: string(gwv1beta1.RouteReasonUnsupportedValue),
Message: fmt.Sprintf("Dual stack Service is not supported"),
})

Expand Down Expand Up @@ -382,7 +370,7 @@ func (r *RouteReconciler) reconcileRouteResource(ctx context.Context, route core
return nil
}

func (r *RouteReconciler) updateRouteStatus(ctx context.Context, dns string, route core.Route) error {
func (r *routeReconciler) updateRouteStatus(ctx context.Context, dns string, route core.Route) error {
r.log.Debugf("Updating route %s-%s with DNS %s", route.Name(), route.Namespace(), dns)
routeOld := route.DeepCopy()

Expand All @@ -401,25 +389,25 @@ func (r *RouteReconciler) updateRouteStatus(ctx context.Context, dns string, rou
// Update listener Status
if err := updateRouteListenerStatus(ctx, r.client, route); err != nil {
route.Status().UpdateRouteCondition(metav1.Condition{
Type: string(gateway_api_v1beta1.RouteConditionAccepted),
Type: string(gwv1beta1.RouteConditionAccepted),
Status: metav1.ConditionFalse,
ObservedGeneration: route.K8sObject().GetGeneration(),
Reason: string(gateway_api_v1beta1.RouteReasonNoMatchingParent),
Reason: string(gwv1beta1.RouteReasonNoMatchingParent),
Message: fmt.Sprintf("Could not match gateway %s: %s", route.Spec().ParentRefs()[0].Name, err),
})
} else {
route.Status().UpdateRouteCondition(metav1.Condition{
Type: string(gateway_api_v1beta1.RouteConditionAccepted),
Type: string(gwv1beta1.RouteConditionAccepted),
Status: metav1.ConditionTrue,
ObservedGeneration: route.K8sObject().GetGeneration(),
Reason: string(gateway_api_v1beta1.RouteReasonAccepted),
Reason: string(gwv1beta1.RouteReasonAccepted),
Message: fmt.Sprintf("DNS Name: %s", dns),
})
route.Status().UpdateRouteCondition(metav1.Condition{
Type: string(gateway_api_v1beta1.RouteConditionResolvedRefs),
Type: string(gwv1beta1.RouteConditionResolvedRefs),
Status: metav1.ConditionTrue,
ObservedGeneration: route.K8sObject().GetGeneration(),
Reason: string(gateway_api_v1beta1.RouteReasonResolvedRefs),
Reason: string(gwv1beta1.RouteReasonResolvedRefs),
Message: fmt.Sprintf("DNS Name: %s", dns),
})
}
Expand All @@ -432,7 +420,7 @@ func (r *RouteReconciler) updateRouteStatus(ctx context.Context, dns string, rou
return nil
}

func (r *RouteReconciler) validateBackendRefsIpFamilies(ctx context.Context, route core.Route) error {
func (r *routeReconciler) validateBackendRefsIpFamilies(ctx context.Context, route core.Route) error {
rules := route.Spec().Rules()

for _, rule := range rules {
Expand All @@ -444,19 +432,8 @@ func (r *RouteReconciler) validateBackendRefsIpFamilies(ctx context.Context, rou
continue
}

svc := &corev1.Service{}

key := types.NamespacedName{
Name: string(backendRef.Name()),
}

if backendRef.Namespace() != nil {
key.Namespace = string(*backendRef.Namespace())
} else {
key.Namespace = route.Namespace()
}

if err := r.client.Get(ctx, key, svc); err != nil {
svc, err := gateway.GetServiceForBackendRef(ctx, r.client, route, backendRef)
if err != nil {
// Ignore error since Service might not be created yet
continue
}
Expand Down
Loading