diff --git a/.gitignore b/.gitignore index fac43360..e2b0117e 100644 --- a/.gitignore +++ b/.gitignore @@ -78,3 +78,7 @@ tags .idea* .idea/* # End of https://www.gitignore.io/api/go,vim,emacs,visualstudiocode + +# dev files +cluster.yaml +supervisor-spec.yaml \ No newline at end of file diff --git a/.tool-versions b/.tool-versions new file mode 100644 index 00000000..b51e846f --- /dev/null +++ b/.tool-versions @@ -0,0 +1 @@ +kubebuilder 3.9.1 diff --git a/Makefile b/Makefile index 291d3c3a..2b2f60cd 100644 --- a/Makefile +++ b/Makefile @@ -96,9 +96,19 @@ deploy-testjob: ## Run a wikipedia test pod .PHONY: helm-install-druid-operator helm-install-druid-operator: ## Helm install to deploy the druid operator + helm upgrade --install \ + --namespace zookeeper \ + --create-namespace \ + zookeeper bitnami/zookeeper helm upgrade --install \ --namespace ${NAMESPACE_DRUID_OPERATOR} \ --create-namespace \ + --set auth.username=druid \ + --set auth.password=druid \ + --set auth.database=druid \ + mysql bitnami/mysql + helm upgrade --install \ + --namespace ${NAMESPACE_DRUID_OPERATOR} \ ${NAMESPACE_DRUID_OPERATOR} chart/ \ --set image.repository=${IMG_KIND} \ --set image.tag=${IMG_TAG} diff --git a/PROJECT b/PROJECT index afbe4fc0..f8ab6c5f 100644 --- a/PROJECT +++ b/PROJECT @@ -18,4 +18,8 @@ resources: kind: Druid path: github.com/datainfrahq/druid-operator/apis/druid/v1alpha1 version: v1alpha1 +- group: druid + kind: SupervisorSpec + path: github.com/datainfrahq/druid-operator/apis/supervisorspec/v1alpha1 + version: v1alpha1 version: "3" diff --git a/README.md b/README.md index d355f622..3522c561 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,9 @@ Feel free to join Kubernetes slack and join [druid-operator](https://kubernetes. - The operator supports CR of type ```Druid```. - ```Druid``` CR belongs to api Group ```druid.apache.org``` and version ```v1alpha1``` +- The operator supports CR of type ```SupervisorSpec```. +- ```SupervisorSpec``` CR belongs to api Group ```druid.apache.org``` and version ```v1alpha1``` + ### Druid Operator Architecture ![Druid Operator](docs/images/druid-operator.png?raw=true "Druid Operator") diff --git a/apis/druid/v1alpha1/supervisorspec_types.go b/apis/druid/v1alpha1/supervisorspec_types.go new file mode 100644 index 00000000..875b3229 --- /dev/null +++ b/apis/druid/v1alpha1/supervisorspec_types.go @@ -0,0 +1,45 @@ +/* + + */ + +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! +// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. + +// SupervisorSpecSpec defines the desired state of SupervisorSpec +type SupervisorSpecSpec struct { + // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster + // Important: Run "make" to regenerate code after modifying this file + + // Foo is an example field of SupervisorSpec. Edit supervisorspec_types.go to remove/update + ClusterRef string `json:"clusterRef,omitempty"` + SupervisorSpec string `json:"supervisorSpec,omitempty"` +} + +//+kubebuilder:object:root=true + +// SupervisorSpec is the Schema for the supervisorspecs API +type SupervisorSpec struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec SupervisorSpecSpec `json:"spec,omitempty"` +} + +//+kubebuilder:object:root=true + +// SupervisorSpecList contains a list of SupervisorSpec +type SupervisorSpecList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []SupervisorSpec `json:"items"` +} + +func init() { + SchemeBuilder.Register(&SupervisorSpec{}, &SupervisorSpecList{}) +} diff --git a/apis/druid/v1alpha1/zz_generated.deepcopy.go b/apis/druid/v1alpha1/zz_generated.deepcopy.go index 4d413372..c85ec172 100644 --- a/apis/druid/v1alpha1/zz_generated.deepcopy.go +++ b/apis/druid/v1alpha1/zz_generated.deepcopy.go @@ -617,6 +617,79 @@ func (in *MetadataStoreSpec) DeepCopy() *MetadataStoreSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SupervisorSpec) DeepCopyInto(out *SupervisorSpec) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SupervisorSpec. +func (in *SupervisorSpec) DeepCopy() *SupervisorSpec { + if in == nil { + return nil + } + out := new(SupervisorSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *SupervisorSpec) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SupervisorSpecList) DeepCopyInto(out *SupervisorSpecList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]SupervisorSpec, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SupervisorSpecList. +func (in *SupervisorSpecList) DeepCopy() *SupervisorSpecList { + if in == nil { + return nil + } + out := new(SupervisorSpecList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *SupervisorSpecList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SupervisorSpecSpec) DeepCopyInto(out *SupervisorSpecSpec) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SupervisorSpecSpec. +func (in *SupervisorSpecSpec) DeepCopy() *SupervisorSpecSpec { + if in == nil { + return nil + } + out := new(SupervisorSpecSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ZookeeperSpec) DeepCopyInto(out *ZookeeperSpec) { *out = *in diff --git a/chart/templates/crds/druid.apache.org_supervisorspecs.yaml b/chart/templates/crds/druid.apache.org_supervisorspecs.yaml new file mode 100644 index 00000000..016940c6 --- /dev/null +++ b/chart/templates/crds/druid.apache.org_supervisorspecs.yaml @@ -0,0 +1,47 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.11.2 + creationTimestamp: null + name: supervisorspecs.druid.apache.org +spec: + group: druid.apache.org + names: + kind: SupervisorSpec + listKind: SupervisorSpecList + plural: supervisorspecs + singular: supervisorspec + scope: Namespaced + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: SupervisorSpec is the Schema for the supervisorspecs API + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: SupervisorSpecSpec defines the desired state of SupervisorSpec + properties: + clusterRef: + description: Foo is an example field of SupervisorSpec. Edit supervisorspec_types.go + to remove/update + type: string + supervisorSpec: + type: string + type: object + type: object + served: true + storage: true diff --git a/chart/templates/rbac_manager.yaml b/chart/templates/rbac_manager.yaml index 9136d9b8..8b5df99a 100644 --- a/chart/templates/rbac_manager.yaml +++ b/chart/templates/rbac_manager.yaml @@ -110,6 +110,7 @@ rules: - druid.apache.org resources: - druids + - supervisorspecs verbs: - create - delete @@ -122,6 +123,7 @@ rules: - druid.apache.org resources: - druids/status + - supervisorspecs/status verbs: - get - patch @@ -268,6 +270,7 @@ rules: - druid.apache.org resources: - druids + - supervisorspecs verbs: - create - delete @@ -280,6 +283,7 @@ rules: - druid.apache.org resources: - druids/status + - supervisorspecs/status verbs: - get - patch diff --git a/config/crd/bases/druid.apache.org_supervisorspecs.yaml b/config/crd/bases/druid.apache.org_supervisorspecs.yaml new file mode 100644 index 00000000..016940c6 --- /dev/null +++ b/config/crd/bases/druid.apache.org_supervisorspecs.yaml @@ -0,0 +1,47 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.11.2 + creationTimestamp: null + name: supervisorspecs.druid.apache.org +spec: + group: druid.apache.org + names: + kind: SupervisorSpec + listKind: SupervisorSpecList + plural: supervisorspecs + singular: supervisorspec + scope: Namespaced + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: SupervisorSpec is the Schema for the supervisorspecs API + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: SupervisorSpecSpec defines the desired state of SupervisorSpec + properties: + clusterRef: + description: Foo is an example field of SupervisorSpec. Edit supervisorspec_types.go + to remove/update + type: string + supervisorSpec: + type: string + type: object + type: object + served: true + storage: true diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml index d0c0e5e3..e75030af 100644 --- a/config/crd/kustomization.yaml +++ b/config/crd/kustomization.yaml @@ -3,17 +3,20 @@ # It should be run by config/default resources: - bases/druid.apache.org_druids.yaml +- bases/druid.apache.org_supervisorspecs.yaml #+kubebuilder:scaffold:crdkustomizeresource patchesStrategicMerge: # [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix. # patches here are for enabling the conversion webhook for each CRD #- patches/webhook_in_druids.yaml +#- patches/webhook_in_supervisorspecs.yaml #+kubebuilder:scaffold:crdkustomizewebhookpatch # [CERTMANAGER] To enable cert-manager, uncomment all the sections with [CERTMANAGER] prefix. # patches here are for enabling the CA injection for each CRD #- patches/cainjection_in_druids.yaml +#- patches/cainjection_in_supervisorspecs.yaml #+kubebuilder:scaffold:crdkustomizecainjectionpatch # the following config is for teaching kustomize how to do kustomization for CRDs. diff --git a/config/crd/patches/cainjection_in_supervisorspecs.yaml b/config/crd/patches/cainjection_in_supervisorspecs.yaml new file mode 100644 index 00000000..e57afb78 --- /dev/null +++ b/config/crd/patches/cainjection_in_supervisorspecs.yaml @@ -0,0 +1,8 @@ +# The following patch adds a directive for certmanager to inject CA into the CRD +# CRD conversion requires k8s 1.13 or later. +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + annotations: + cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME) + name: supervisorspecs.druid.apache.org diff --git a/config/crd/patches/webhook_in_supervisorspecs.yaml b/config/crd/patches/webhook_in_supervisorspecs.yaml new file mode 100644 index 00000000..a414780f --- /dev/null +++ b/config/crd/patches/webhook_in_supervisorspecs.yaml @@ -0,0 +1,17 @@ +# The following patch enables conversion webhook for CRD +# CRD conversion requires k8s 1.13 or later. +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: supervisorspecs.druid.apache.org +spec: + conversion: + strategy: Webhook + webhookClientConfig: + # this is "\n" used as a placeholder, otherwise it will be rejected by the apiserver for being blank, + # but we're going to set it later using the cert-manager (or potentially a patch if not using cert-manager) + caBundle: Cg== + service: + namespace: system + name: webhook-service + path: /convert diff --git a/config/druid.apache.org_supervisorspecs.yaml b/config/druid.apache.org_supervisorspecs.yaml new file mode 100644 index 00000000..835ee7fa --- /dev/null +++ b/config/druid.apache.org_supervisorspecs.yaml @@ -0,0 +1,49 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.11.2 + creationTimestamp: null + name: supervisorspecs.druid.apache.org +spec: + group: druid.apache.org + names: + kind: SupervisorSpec + listKind: SupervisorSpecList + plural: supervisorspecs + singular: supervisorspec + scope: Namespaced + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: SupervisorSpec is the Schema for the supervisorspecs API + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: SupervisorSpecSpec defines the desired state of SupervisorSpec + properties: + clusterRef: + description: Foo is an example field of SupervisorSpec. Edit supervisorspec_types.go + to remove/update + type: string + supervisorSpec: + type: string + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index ab6a4c2b..f614ca9b 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -119,6 +119,26 @@ rules: - get - patch - update +- apiGroups: + - druid.apache.org + resources: + - supervisorspecs + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - druid.apache.org + resources: + - supervisorspecs/status + verbs: + - get + - patch + - update - apiGroups: - networking.k8s.io resources: diff --git a/config/rbac/supervisorspec_editor_role.yaml b/config/rbac/supervisorspec_editor_role.yaml new file mode 100644 index 00000000..1a452adf --- /dev/null +++ b/config/rbac/supervisorspec_editor_role.yaml @@ -0,0 +1,24 @@ +# permissions for end users to edit supervisorspecs. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: supervisorspec-editor-role +rules: +- apiGroups: + - druid.apache.org + resources: + - supervisorspecs + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - druid.apache.org + resources: + - supervisorspecs/status + verbs: + - get diff --git a/config/rbac/supervisorspec_viewer_role.yaml b/config/rbac/supervisorspec_viewer_role.yaml new file mode 100644 index 00000000..b28a9edb --- /dev/null +++ b/config/rbac/supervisorspec_viewer_role.yaml @@ -0,0 +1,20 @@ +# permissions for end users to view supervisorspecs. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: supervisorspec-viewer-role +rules: +- apiGroups: + - druid.apache.org + resources: + - supervisorspecs + verbs: + - get + - list + - watch +- apiGroups: + - druid.apache.org + resources: + - supervisorspecs/status + verbs: + - get diff --git a/config/samples/druid_v1alpha1_supervisorspec.yaml b/config/samples/druid_v1alpha1_supervisorspec.yaml new file mode 100644 index 00000000..e11ee72e --- /dev/null +++ b/config/samples/druid_v1alpha1_supervisorspec.yaml @@ -0,0 +1,6 @@ +apiVersion: druid.apache.org/v1alpha1 +kind: SupervisorSpec +metadata: + name: supervisorspec-sample +spec: + # TODO(user): Add fields here diff --git a/controllers/druid/handler.go b/controllers/druid/handler.go index d2963f35..c360c655 100644 --- a/controllers/druid/handler.go +++ b/controllers/druid/handler.go @@ -33,8 +33,14 @@ import ( const ( druidOpResourceHash = "druidOpResourceHash" defaultCommonConfigMountPath = "/druid/conf/druid/_common" + defaultServicePortName = "service-port" toBeDeletedLabel = "toBeDeleted" deletionTSLabel = "deletionTS" + labelKeyComponent = "component" + labelKeyNodeSpecUniqueStr = "nodeSpecUniqueStr" + labelKeyDruidCr = "druid_cr" + labelKeyApp = "app" + nodeTypeRouter = "router" ) var logger = logf.Log.WithName("druid_operator_handler") @@ -842,7 +848,7 @@ func scalePVCForSts(ctx context.Context, sdk client.Client, nodeSpec *v1alpha1.D } pvcLabels := map[string]string{ - "component": nodeSpec.NodeType, + labelKeyComponent: nodeSpec.NodeType, } pvcList, err := readers.List(ctx, sdk, drd, pvcLabels, emitEvent, func() objectList { return &v1.PersistentVolumeClaimList{} }, func(listObj runtime.Object) []object { @@ -1022,7 +1028,7 @@ func makeService(svc *v1.Service, nodeSpec *v1alpha1.DruidNodeSpec, m *v1alpha1. if svc.Spec.Ports == nil { svc.Spec.Ports = []v1.ServicePort{ { - Name: "service-port", + Name: defaultServicePortName, Port: nodeSpec.DruidPort, TargetPort: intstr.FromInt(int(nodeSpec.DruidPort)), }, @@ -1483,10 +1489,10 @@ func makeLabelsForNodeSpec(nodeSpec *v1alpha1.DruidNodeSpec, m *v1alpha1.Druid, labels[k] = v } - labels["app"] = "druid" - labels["druid_cr"] = clusterName - labels["nodeSpecUniqueStr"] = nodeSpecUniqueStr - labels["component"] = nodeSpec.NodeType + labels[labelKeyApp] = "druid" + labels[labelKeyDruidCr] = clusterName + labels[labelKeyNodeSpecUniqueStr] = nodeSpecUniqueStr + labels[labelKeyComponent] = nodeSpec.NodeType return labels } diff --git a/controllers/druid/supervisorspec_controller.go b/controllers/druid/supervisorspec_controller.go new file mode 100644 index 00000000..19c625f3 --- /dev/null +++ b/controllers/druid/supervisorspec_controller.go @@ -0,0 +1,455 @@ +/* + + */ + +package druid + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + druidv1alpha1 "github.com/datainfrahq/druid-operator/apis/druid/v1alpha1" + "github.com/go-logr/logr" + "github.com/go-resty/resty/v2" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/uuid" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type DruidApiSupervisor struct { + Id string `json:"id"` + State string `json:"state"` + DetailedState string `json:"detailedState"` + Healthy bool `json:"healthy"` + Spec json.RawMessage `json:"spec"` + Suspended bool `json:"suspended"` +} + +type DruidApiSupervisorSpecMinDataSchema struct { + DataSource string `json:"dataSource"` +} + +type DruidApiSupervisorSpecMin struct { + DataSchema DruidApiSupervisorSpecMinDataSchema `json:"dataSchema"` +} + +type SupervisorSpecStateEntry struct { + Id string `json:"id"` +} + +const ( + ActionCreate = "create" + ActionUpdate = "update" + ActionDelete = "delete" + SupervisorSpecConfigMap = "supervisor-specs-controller" + SupervisorListUrlPattern = "%s/druid/indexer/v1/supervisor" + SupervisorTerminateUrlPattern = "%s/druid/indexer/v1/supervisor/%s/terminate" + UrlPrefixPattern = "http://%s:%d" +) + +// SupervisorSpecReconciler reconciles a SupervisorSpec object +type SupervisorSpecReconciler struct { + client.Client + Log logr.Logger + Scheme *runtime.Scheme +} + +//+kubebuilder:rbac:groups=druid.apache.org,resources=supervisorspecs,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=druid.apache.org,resources=supervisorspecs/status,verbs=get;update;patch + +// SetupWithManager sets up the controller with the Manager. +func (r *SupervisorSpecReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&druidv1alpha1.SupervisorSpec{}). + Complete(r) +} + +func (r *SupervisorSpecReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + start := time.Now() + log := r.Log.WithName(string(uuid.NewUUID())) + log = log.WithValues("supervisorspec", req.NamespacedName) + + log.Info(fmt.Sprintf("reconciling SupervisorSpec (%s)", req.NamespacedName.String())) + + action := ActionCreate + supervisorCr := &druidv1alpha1.SupervisorSpec{} + err := r.Client.Get(ctx, req.NamespacedName, supervisorCr) + if err != nil { + if apierrors.IsNotFound(err) { + // Request object not found, could have been deleted after reconcile request. + // Owned objects are automatically garbage collected. For additional cleanup logic use finalizers. + // Return and don't requeue + return ctrl.Result{}, nil + } + // Error reading the object - requeue the request. + return ctrl.Result{}, err + } + if supervisorCr.GetDeletionTimestamp() != nil { + action = ActionDelete + } + + supervisorSpec := supervisorCr.Spec + + urlPrefix, err := r.getDruidRouterUrlPrefix(ctx, log, req, supervisorSpec.ClusterRef) + if err != nil { + log.Info("failed to determine druid router url, will gracefully retry") + return ctrl.Result{ + Requeue: true, + RequeueAfter: LookupReconcileTime(), + }, nil + } + + requeue := false + switch action { + case ActionCreate: // and update + requeue, err = r.createOrUpdateSupervisorSpec(ctx, log, req, *urlPrefix, supervisorSpec) + if err != nil { + return ctrl.Result{}, err + } + if requeue { + return ctrl.Result{ + Requeue: true, + RequeueAfter: LookupReconcileTime(), + }, nil + } + case ActionDelete: + requeue, err = r.deleteSupervisorSpec(ctx, log, req, *urlPrefix) + if err != nil { + return ctrl.Result{}, err + } + if requeue { + return ctrl.Result{ + Requeue: true, + RequeueAfter: LookupReconcileTime(), + }, nil + } + default: + log.Error(fmt.Errorf("unexpected action: %s", action), "Error occurred") + } + + end := time.Now() + log.Info(fmt.Sprintf("reconciled SupervisorSpec (%s) in %s", req.NamespacedName.String(), end.Sub(start).String())) + + return ctrl.Result{}, err +} + +func objectKeyFromStringSlice(input []string, fallbackNamespace string) client.ObjectKey { + namespace := "" + name := "" + if len(input) < 2 { + namespace = "" + name = input[0] + } else { + namespace = input[0] + name = input[1] + } + if len(namespace) == 0 { + namespace = fallbackNamespace + } + + return client.ObjectKey{ + Namespace: namespace, + Name: name, + } +} + +func (r *SupervisorSpecReconciler) getSupervisorSpecStateEntry(ctx context.Context, log logr.Logger, req ctrl.Request) (*SupervisorSpecStateEntry, error) { + state := v1.ConfigMap{} + stateKey := objectKeyFromStringSlice([]string{req.Namespace, SupervisorSpecConfigMap}, "") + err := r.Client.Get(ctx, stateKey, &state) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, nil + } else { + log.Error(err, fmt.Sprintf("failed to get supervisor spec state configmap %s for reading", stateKey.String())) + return nil, err + } + } + + ssse := &SupervisorSpecStateEntry{} + + entry, ok := state.Data[req.Name] + if !ok { + return nil, nil + } + + err = json.Unmarshal([]byte(entry), ssse) + if err != nil { + log.Error(err, "failed to unmarshal supervisor spec state configmap") + return nil, err + } + + return ssse, nil +} + +func (r *SupervisorSpecReconciler) fetchDruidServicesWithNsList(ctx context.Context, log logr.Logger, clusterRefS string, specNamespace string) ([]string, bool, error) { + druid := &druidv1alpha1.Druid{} + clusterRef := strings.Split(clusterRefS, "/") + if len(clusterRef) < 1 { + log.Error(fmt.Errorf("clusterRef is empty"), "The cluster reference is invalid") + + return nil, false, nil + } + + druidObjectKey := objectKeyFromStringSlice(clusterRef, specNamespace) + err := r.Client.Get(ctx, druidObjectKey, druid) + if err != nil { + log.Error(err, "failed to get druid from k8s api") + } + + druidObjectServices := druid.Status.Services + if len(druidObjectServices) == 0 { + log.Error(fmt.Errorf("no services in druid spec"), "The services in the status field of the druid spec were empty") + + return nil, true, nil + } + + druidServices := make([]string, len(druidObjectServices), len(druidObjectServices)) + for i, service := range druidObjectServices { + druidServices[i] = fmt.Sprintf("%s.%s", service, specNamespace) + } + + return druidServices, false, nil +} + +func (r *SupervisorSpecReconciler) putSupervisorSpecStateEntry(ctx context.Context, log logr.Logger, req ctrl.Request, id string) error { + state := v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: SupervisorSpecConfigMap, + Namespace: req.Namespace, + }, + } + stateKey := objectKeyFromStringSlice([]string{req.Namespace, SupervisorSpecConfigMap}, "") + action := ActionUpdate + + err := r.Client.Get(ctx, stateKey, &state) + if err != nil { + if !apierrors.IsNotFound(err) { + log.Error(err, fmt.Sprintf("failed to get supervisor spec state configmap %s for update", stateKey.String())) + return err + } + + action = ActionCreate + } + + ssse := SupervisorSpecStateEntry{ + Id: id, + } + + entry, err := json.Marshal(ssse) + if err != nil { + log.Error(err, "failed to marshal supervisor spec state configmap") + return err + } + + if state.Data == nil { + state.Data = map[string]string{} + } + + state.Data[req.Name] = string(entry) + + if action == ActionCreate { + err = r.Client.Create(ctx, &state) + } else { + err = r.Client.Update(ctx, &state) + } + + if err != nil { + log.Error(err, "failed to set supervisor spec state configmap") + return err + } + + return nil +} + +func (r *SupervisorSpecReconciler) getDruidRouterUrlPrefix(ctx context.Context, log logr.Logger, req ctrl.Request, clusterRef string) (*string, error) { + opts := make([]client.ListOption, 0) + opts = append(opts, client.InNamespace(req.Namespace)) + opts = append(opts, client.MatchingLabels{ + labelKeyDruidCr: clusterRef, + labelKeyComponent: nodeTypeRouter, + }) + serviceList := v1.ServiceList{} + err := r.List(ctx, &serviceList, opts...) + if err != nil { + log.Error(err, "failed to fetch druid router pod") + return nil, err + } + + if len(serviceList.Items) != 1 { + log.Error(nil, fmt.Sprintf("found %d druid router services, but expected 1", len(serviceList.Items))) + return nil, fmt.Errorf("druid router pod not found") + } + + service := serviceList.Items[0] + // check spec for whether it needs an update + hostname := fmt.Sprintf("%s.%s", service.GetName(), service.GetNamespace()) + + port, err := getPortFromService(service) + if err != nil { + return nil, err + } + + prefix := fmt.Sprintf(UrlPrefixPattern, hostname, port) + url := fmt.Sprintf(SupervisorListUrlPattern, prefix) + + rst := resty.New() + druidResponse, err := rst.NewRequest(). + SetContext(ctx). + SetHeader("Accept", "application/json"). + Get(url) + if err != nil { + log.Error(err, fmt.Sprintf("request to druid router failed (%s)", url)) + return nil, err + } + + if druidResponse.StatusCode() > 299 { + log.Error(err, fmt.Sprintf("request to druid router failed (%s), unexpected status code", url)) + return nil, err + } + + return &prefix, nil +} + +func getPortFromService(s v1.Service) (int32, error) { + if len(s.Spec.Ports) == 1 { + return s.Spec.Ports[0].Port, nil + } + + for _, servicePort := range s.Spec.Ports { + if servicePort.Name == defaultServicePortName { + return servicePort.Port, nil + } + } + + return 0, fmt.Errorf("could not determine port for service") +} + +func (r *SupervisorSpecReconciler) getFullDruidSupervisorList(ctx context.Context, log logr.Logger, rst *resty.Client, urlPrefix string) ([]DruidApiSupervisor, error) { + url := fmt.Sprintf(SupervisorListUrlPattern, urlPrefix) + response, err := rst.NewRequest(). + SetContext(ctx). + SetHeader("Accept", "application/json"). + SetQueryParam("state", "true"). + SetQueryParam("full", "true"). + Get(url) + if err != nil { + log.Error(err, fmt.Sprintf("the request could not be successfully executed (%s)", url)) + return nil, err + } + + if response.StatusCode() > 399 { + return nil, nil + } + + responseBody := response.Body() + druidApiSupervisorList := make([]DruidApiSupervisor, 0) + + err = json.Unmarshal(responseBody, &druidApiSupervisorList) + if err != nil { + return nil, err + } + + return druidApiSupervisorList, nil +} + +func (r *SupervisorSpecReconciler) doCreateOrUpdateSupervisor(ctx context.Context, log logr.Logger, rst *resty.Client, urlPrefix string, spec string) (*string, error) { + url := fmt.Sprintf(SupervisorListUrlPattern, urlPrefix) + druidResponse, err := rst.NewRequest(). + SetContext(ctx). + SetHeader("Content-Type", "application/json"). + SetBody(spec). + Post(url) + if err != nil { + log.Error(err, fmt.Sprintf("the request could not be successfully executed (create: %s)", url)) + return nil, err + } + + v := map[string]string{} + err = json.Unmarshal(druidResponse.Body(), &v) + if err != nil { + log.Error(err, "failed to unmarshal supervisor spec creation response") + return nil, err + } + id := v["id"] + + return &id, nil +} + +func (r *SupervisorSpecReconciler) createOrUpdateSupervisorSpec(ctx context.Context, log logr.Logger, req ctrl.Request, urlPrefix string, k8sApiSupervisorSpec druidv1alpha1.SupervisorSpecSpec) (bool, error) { + // check spec for whether it needs an update + rst := resty.New() + + supervisorId, err := r.doCreateOrUpdateSupervisor(ctx, log, rst, urlPrefix, k8sApiSupervisorSpec.SupervisorSpec) + if err != nil { + return true, err + } + + err = r.putSupervisorSpecStateEntry(ctx, log, req, *supervisorId) + if err != nil { + log.Error(err, "failed to put supervisor spec state configmap") + return true, nil + } + + return false, nil +} + +func (r *SupervisorSpecReconciler) deleteSupervisorSpecStateRef(ctx context.Context, _ logr.Logger, req ctrl.Request) error { + state := v1.ConfigMap{} + stateKey := objectKeyFromStringSlice([]string{req.Namespace, SupervisorSpecConfigMap}, "") + err := r.Client.Get(ctx, stateKey, &state) + if err != nil { + return err + } + + delete(state.Data, req.Name) + + return r.Client.Update(ctx, &state) +} + +func (r *SupervisorSpecReconciler) deleteSupervisorSpec(ctx context.Context, log logr.Logger, req ctrl.Request, urlPrefix string) (bool, error) { + // check spec for whether it needs an update + rst := resty.New() + druidResponse := &resty.Response{} + url := "" + + state, err := r.getSupervisorSpecStateEntry(ctx, log, req) + if err != nil { + log.Error(err, "failed to get supervisor spec state configmap for delete action") + return false, nil + } + if state == nil { + log.Error(fmt.Errorf("state is nil"), "can not delete supervisor spec without reference in state") + return false, nil + } + + url = fmt.Sprintf(SupervisorTerminateUrlPattern, urlPrefix, state.Id) + druidResponse, err = rst.NewRequest(). + SetContext(ctx). + SetHeader("Content-Type", "application/json"). + Post(url) + + if err != nil { + log.Error(err, fmt.Sprintf("the request could not be successfully executed (delete: %s)", url)) + return true, nil + } + + if druidResponse.StatusCode() > 204 { + log.Error(fmt.Errorf("received status code %d", druidResponse.StatusCode()), "unexpected status code received") + } + + err = r.deleteSupervisorSpecStateRef(ctx, log, req) + if err != nil { + log.Error(err, "failed to delete supervisor spec state configmap for delete action") + return false, nil + } + + return false, nil +} diff --git a/go.mod b/go.mod index e652f54f..2389ae57 100644 --- a/go.mod +++ b/go.mod @@ -5,9 +5,9 @@ go 1.20 require ( github.com/ghodss/yaml v1.0.0 github.com/go-logr/logr v1.2.4 + github.com/go-resty/resty/v2 v2.7.0 github.com/onsi/ginkgo/v2 v2.6.0 github.com/onsi/gomega v1.24.1 - github.com/stretchr/testify v1.8.1 k8s.io/api v0.26.3 k8s.io/apimachinery v0.26.3 k8s.io/client-go v0.26.3 @@ -41,7 +41,6 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.14.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.37.0 // indirect diff --git a/go.sum b/go.sum index 86506220..a14f5fc6 100644 --- a/go.sum +++ b/go.sum @@ -97,6 +97,8 @@ github.com/go-openapi/jsonreference v0.20.0/go.mod h1:Ag74Ico3lPc+zR+qjn4XBUmXym github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= github.com/go-openapi/swag v0.19.14 h1:gm3vOOXfiuw5i9p5N9xJvfjvuofpyvLA9Wr6QfK5Fng= github.com/go-openapi/swag v0.19.14/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ= +github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY= +github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= @@ -257,18 +259,13 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -353,6 +350,7 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= diff --git a/main.go b/main.go index e0861689..1b65ba76 100644 --- a/main.go +++ b/main.go @@ -98,6 +98,15 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "Druid") os.Exit(1) } + + if err = (&druid.SupervisorSpecReconciler{ + Client: mgr.GetClient(), + Log: ctrl.Log.WithName("controllers").WithName("SupervisorSpec"), + Scheme: mgr.GetScheme(), + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "SupervisorSpec") + os.Exit(1) + } //+kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {