/
knative_service.go
164 lines (147 loc) · 6.29 KB
/
knative_service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package resourceprepares
import (
"context"
"fmt"
"reflect"
cappv1alpha1 "github.com/dana-team/container-app-operator/api/v1alpha1"
"github.com/dana-team/container-app-operator/internals/autoscale"
"github.com/dana-team/container-app-operator/internals/utils"
rclient "github.com/dana-team/container-app-operator/internals/wrappers"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
knativev1 "knative.dev/serving/pkg/apis/serving/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)
const (
	// danaAnnotationsPrefix is the prefix handed to utils.FilterKeysWithoutPrefix
	// when the Capp's annotations and labels are filtered for the KnativeService.
	danaAnnotationsPrefix = "rcs.dana.io"
	// cappDisabledState is the value isResumed compares against the state
	// recorded in the Capp's status.
	cappDisabledState = "disabled"
	// cappEnabledState is the Spec.State value for which IsRequired reports true.
	cappEnabledState = "enabled"
	// DefaultAutoScaleCM is the name of the ConfigMap whose data is passed to
	// autoscale.SetAutoScaler when the KnativeService is prepared.
	DefaultAutoScaleCM = "autoscale-defaults"
	// CappNS is the namespace from which the DefaultAutoScaleCM ConfigMap is fetched.
	CappNS = "capp-operator-system"
)
// KnativeServiceManager creates, updates and deletes the KnativeService
// that corresponds to a Capp.
type KnativeServiceManager struct {
	// Ctx is the context passed to the Kubernetes API calls made by the manager.
	Ctx context.Context
	// K8sclient is the client used to read the KnativeService and the
	// autoscale defaults ConfigMap.
	K8sclient client.Client
	// Log receives the manager's informational and error messages.
	Log logr.Logger
	// EventRecorder records events against the Capp when its KnativeService
	// is disabled, resumed, or fails to be created.
	EventRecorder record.EventRecorder
}
// prepareResource generates a Knative Service definition from a given Capp resource.
//
// The Service takes its name and namespace from the Capp. The Capp's
// annotations and labels are passed through utils.FilterKeysWithoutPrefix with
// danaAnnotationsPrefix; the filtered annotations are set on the Service and,
// merged with the result of autoscale.SetAutoScaler, on the revision template.
// The filtered labels plus CappResourceKey are set on the revision template,
// while the Service itself carries only the CappResourceKey label.
//
// The autoscale defaults ConfigMap is fetched on a best-effort basis: when the
// fetch fails the error is logged and autoscale.SetAutoScaler is called with
// the data of an empty ConfigMap.
//
// ctx is the context handed to the Knative SetDefaults calls.
func (k KnativeServiceManager) prepareResource(capp cappv1alpha1.Capp, ctx context.Context) knativev1.Service {
	knativeServiceAnnotations := utils.FilterKeysWithoutPrefix(capp.Annotations, danaAnnotationsPrefix)
	knativeServiceLabels := utils.FilterKeysWithoutPrefix(capp.Labels, danaAnnotationsPrefix)
	knativeServiceLabels[CappResourceKey] = capp.Name

	knativeService := knativev1.Service{
		TypeMeta: metav1.TypeMeta{},
		ObjectMeta: metav1.ObjectMeta{
			Name:      capp.Name,
			Namespace: capp.Namespace,
			Labels: map[string]string{
				CappResourceKey: capp.Name,
			},
			Annotations: knativeServiceAnnotations,
		},
		Spec: knativev1.ServiceSpec{
			ConfigurationSpec: capp.Spec.ConfigurationSpec,
		},
	}

	// Set defaults. EnableServiceLinks is pinned to an explicit false
	// (new(bool) is a pointer to false) before the Knative defaulting runs.
	knativeService.Spec.Template.Spec.EnableServiceLinks = new(bool)
	knativeService.Spec.ConfigurationSpec.SetDefaults(ctx)
	knativeService.Spec.RouteSpec.SetDefaults(ctx)
	knativeService.Spec.Template.Spec.SetDefaults(ctx)

	// Assigned after defaulting, so the Capp's route timeout (even when unset)
	// replaces any timeout the SetDefaults calls may have filled in.
	knativeService.Spec.ConfigurationSpec.Template.Spec.TimeoutSeconds = capp.Spec.RouteSpec.RouteTimeoutSeconds

	volumes := k.prepareVolumes(capp)
	knativeService.Spec.Template.Spec.Volumes = append(knativeService.Spec.Template.Spec.Volumes, volumes...)

	// Best effort: a failed fetch is logged and the empty ConfigMap data is used.
	defaultCM := corev1.ConfigMap{}
	defaultCMKey := types.NamespacedName{Namespace: CappNS, Name: DefaultAutoScaleCM}
	if err := k.K8sclient.Get(k.Ctx, defaultCMKey, &defaultCM); err != nil {
		k.Log.Error(err, fmt.Sprintf("could not fetch configMap %q in namespace %q", DefaultAutoScaleCM, CappNS))
	}

	knativeService.Spec.Template.ObjectMeta.Annotations = utils.MergeMaps(knativeServiceAnnotations,
		autoscale.SetAutoScaler(capp, defaultCM.Data))
	knativeService.Spec.Template.ObjectMeta.Labels = knativeServiceLabels
	return knativeService
}
// prepareVolumes builds the volume list for the Knative Service of a Capp.
// Each NFS volume in the Capp spec yields one volume that carries the NFS
// volume's name and references a PersistentVolumeClaim of that same name.
// The returned slice is empty, never nil, when the Capp declares no NFS volumes.
func (k KnativeServiceManager) prepareVolumes(capp cappv1alpha1.Capp) []corev1.Volume {
	nfsVolumes := capp.Spec.VolumesSpec.NFSVolumes
	result := make([]corev1.Volume, 0, len(nfsVolumes))
	for i := range nfsVolumes {
		name := nfsVolumes[i].Name
		claim := &corev1.PersistentVolumeClaimVolumeSource{ClaimName: name}
		result = append(result, corev1.Volume{
			Name:         name,
			VolumeSource: corev1.VolumeSource{PersistentVolumeClaim: claim},
		})
	}
	return result
}
// CleanUp deletes the KnativeService that shares the Capp's name and namespace.
// A successful deletion and a "not found" response both yield nil; any other
// failure is returned wrapped with context.
func (k KnativeServiceManager) CleanUp(capp cappv1alpha1.Capp) error {
	manager := rclient.ResourceBaseManagerClient{Ctx: k.Ctx, K8sclient: k.K8sclient, Log: k.Log}
	target := knativev1.Service{}

	err := manager.DeleteResource(&target, capp.Name, capp.Namespace)
	if err == nil || errors.IsNotFound(err) {
		// Nothing left to delete counts as success.
		return nil
	}
	return fmt.Errorf("unable to delete KnativeService of Capp: %w", err)
}
// IsRequired reports whether the Capp needs a Knative service (ksvc), which is
// the case exactly when the state in its spec equals cappEnabledState.
func (k KnativeServiceManager) IsRequired(capp cappv1alpha1.Capp) bool {
	desiredState := capp.Spec.State
	return desiredState == cappEnabledState
}
// isResumed reports whether the Capp moved from disabled to enabled: its status
// still records the disabled state, its spec now requires a Knative service,
// and the status carries a non-zero last-change time.
func (k KnativeServiceManager) isResumed(capp cappv1alpha1.Capp) bool {
	if capp.Status.StateStatus.State != cappDisabledState {
		return false
	}
	if !k.IsRequired(capp) {
		return false
	}
	return !capp.Status.StateStatus.LastChange.IsZero()
}
// CreateOrUpdateObject ensures a KnativeService resource exists based on the provided Capp.
//
// If the Capp doesn't require a KnativeService, a "disabled" event is recorded
// and CleanUp is triggered. Otherwise the desired KnativeService is prepared
// from the Capp and compared with the one in the cluster:
//   - when none exists it is created, and an "enabled" event is recorded if
//     the Capp was resumed from the disabled state;
//   - when one exists and its Spec differs from the desired Spec, the Spec is
//     replaced and the resource is updated.
//
// It returns a wrapped error when fetching, creating or updating the
// KnativeService fails, or whatever error CleanUp returns.
func (k KnativeServiceManager) CreateOrUpdateObject(capp cappv1alpha1.Capp) error {
	if !k.IsRequired(capp) {
		k.Log.Info("halting Capp")
		k.EventRecorder.Event(&capp, corev1.EventTypeNormal, eventCappDisabled,
			fmt.Sprintf("Capp %s state changed to disabled", capp.Name))
		return k.CleanUp(capp)
	}

	// Prepared only once the Capp is known to be enabled: preparing performs
	// an API read whose result would be discarded on the disabled path.
	knativeServiceFromCapp := k.prepareResource(capp, k.Ctx)
	resourceManager := rclient.ResourceBaseManagerClient{Ctx: k.Ctx, K8sclient: k.K8sclient, Log: k.Log}

	knativeService := knativev1.Service{}
	key := types.NamespacedName{Namespace: capp.Namespace, Name: capp.Name}
	if err := k.K8sclient.Get(k.Ctx, key, &knativeService); err != nil {
		if !errors.IsNotFound(err) {
			return fmt.Errorf("unable to get KnativeService of Capp: %w", err)
		}

		if err := resourceManager.CreateResource(&knativeServiceFromCapp); err != nil {
			// The fetched object is empty after a NotFound, so the name is
			// taken from the resource that was being created.
			k.EventRecorder.Event(&capp, corev1.EventTypeWarning, eventCappKnativeServiceCreationFailed,
				fmt.Sprintf("Failed to create KnativeService %s for Capp %s",
					knativeServiceFromCapp.Name, capp.Name))
			return fmt.Errorf("unable to create KnativeService for Capp: %w", err)
		}

		if k.isResumed(capp) {
			k.Log.Info("Capp resumed")
			k.EventRecorder.Event(&capp, corev1.EventTypeNormal, eventCappEnabled,
				fmt.Sprintf("Capp %q state changed to enabled", capp.Name))
		}
		return nil
	}

	// Only the Spec is reconciled; an update is sent only when it differs.
	if !reflect.DeepEqual(knativeService.Spec, knativeServiceFromCapp.Spec) {
		knativeService.Spec = knativeServiceFromCapp.Spec
		if err := resourceManager.UpdateResource(&knativeService); err != nil {
			return fmt.Errorf("unable to update KnativeService of Capp: %w", err)
		}
	}
	return nil
}