Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #4694] Component initialization order adjustment, add resource constraints. #4693

Merged
merged 9 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions eventmesh-operator/api/v1/connectors_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ type ConnectorsSpec struct {
Size int `json:"size"`
// ConnectorContainers define some configuration
ConnectorContainers []corev1.Container `json:"connectorContainers"`
// HostNetwork can be true or false
HostNetwork bool `json:"hostNetwork,omitempty"`
// DNSPolicy Set DNS policy for the pod.
DNSPolicy corev1.DNSPolicy `json:"dnsPolicy,omitempty"`
// Volumes define the connector config
Volumes []corev1.Volume `json:"volumes"`
// ImagePullPolicy defines how the image is pulled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2266,6 +2266,12 @@ spec:
type: string
type: object
type: object
dnsPolicy:
description: DNSPolicy Set DNS policy for the pod.
type: string
hostNetwork:
description: HostNetwork can be true or false
type: boolean
imagePullPolicy:
description: ImagePullPolicy defines how the image is pulled
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ spec:
size: 1
# imagePullPolicy is the image pull policy
imagePullPolicy: Always
# set DNS policy for the pod
# dnsPolicy: ClusterFirstWithHostNet
# define the connector-rocketmq container.
connectorContainers:
- name: connector-rocketmq
Expand All @@ -81,6 +83,14 @@ spec:
volumeMounts:
- mountPath: "/data/app/eventmesh-connector-rocketmq/conf"
name: connector-rocketmq-config
# resources describes the compute resource requirements and limits
resources:
requests:
memory: "512Mi"
cpu: "250m"
limits:
memory: "1024Mi"
cpu: "500m"
# configuration file settings to be mounted.
volumes:
- name: connector-rocketmq-config
Expand Down
23 changes: 22 additions & 1 deletion eventmesh-operator/config/samples/eventmesh_v1_runtime.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ data:
rocketmq.properties: |
#######################rocketmq-client##################
eventMesh.server.rocketmq.namesrvAddr=127.0.0.1:9876;127.0.0.1:9876
eventMesh.server.rocketmq.cluster=DefaultCluster
eventMesh.server.rocketmq.accessKey=********
eventMesh.server.rocketmq.secretKey=********

eventmesh.properties: |
###############################EVNETMESH-runtime ENV#################################
eventMesh.server.idc=DEFAULT
Expand Down Expand Up @@ -254,7 +258,16 @@ spec:
runtimePodTemplate:
template:
spec:
# hostNetwork: true
# Host networking requested for this pod. Use the host's network namespace.
# If this option is set, the ports that will be used must be specified.
# Default to false.
hostNetwork: false
# Defaults to "ClusterFirst"
# Valid values are 'ClusterFirstWithHostNet', 'ClusterFirst', 'Default' or 'None'.
# DNS parameters given in DNSConfig will be merged with the policy selected with DNSPolicy.
# To have DNS options set along with hostNetwork, you have to specify DNS policy
# explicitly to 'ClusterFirstWithHostNet'.
dnsPolicy: ClusterFirstWithHostNet
# define the runtime container.
containers:
- name: eventmesh-runtime
Expand All @@ -271,6 +284,14 @@ spec:
name: runtime-config
# imagePullPolicy is the image pull policy
imagePullPolicy: Always
# resources describes the compute resource requirements and limits
resources:
requests:
memory: "1024Mi"
cpu: "250m"
limits:
memory: "2048Mi"
cpu: "500m"
Pil0tXia marked this conversation as resolved.
Show resolved Hide resolved
# configuration file settings to be mounted.
volumes:
- name: runtime-config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"fmt"
eventmeshoperatorv1 "github.com/apache/eventmesh/eventmesh-operator/api/v1"
"github.com/apache/eventmesh/eventmesh-operator/share"
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -123,6 +124,15 @@ func (r ConnectorsReconciler) Reconcile(ctx context.Context, req reconcile.Reque
return reconcile.Result{}, err
}

for {
if share.IsEventMeshRuntimeInitialized {
break
} else {
r.Logger.Info("connector Waiting for runtime ready...")
time.Sleep(time.Duration(share.WaitForRuntimePodNameReadyInSecond) * time.Second)
}
}

connectorStatefulSet := r.getConnectorStatefulSet(connector)
// Check if the statefulSet already exists, if not create a new one
found := &appsv1.StatefulSet{}
Expand Down Expand Up @@ -192,7 +202,7 @@ func (r ConnectorsReconciler) Reconcile(ctx context.Context, req reconcile.Reque
}

r.Logger.Info("Successful reconciliation!")
return reconcile.Result{}, nil
return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(share.RequeueAfterSecond) * time.Second}, nil
}

func (r ConnectorsReconciler) getConnectorStatefulSet(connector *eventmeshoperatorv1.Connectors) *appsv1.StatefulSet {
Expand All @@ -217,13 +227,16 @@ func (r ConnectorsReconciler) getConnectorStatefulSet(connector *eventmeshoperat
Labels: getLabels(),
},
Spec: corev1.PodSpec{
HostNetwork: connector.Spec.HostNetwork,
DNSPolicy: connector.Spec.DNSPolicy,
ServiceAccountName: connector.Spec.ServiceAccountName,
Affinity: connector.Spec.Affinity,
Tolerations: connector.Spec.Tolerations,
NodeSelector: connector.Spec.NodeSelector,
PriorityClassName: connector.Spec.PriorityClassName,
ImagePullSecrets: connector.Spec.ImagePullSecrets,
Containers: []corev1.Container{{
Resources: connector.Spec.ConnectorContainers[0].Resources,
Image: connector.Spec.ConnectorContainers[0].Image,
Name: connector.Spec.ConnectorContainers[0].Name,
SecurityContext: getConnectorContainerSecurityContext(connector),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"fmt"
eventmeshoperatorv1 "github.com/apache/eventmesh/eventmesh-operator/api/v1"
"github.com/apache/eventmesh/eventmesh-operator/share"
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -231,8 +232,24 @@ func (r *RuntimeReconciler) Reconcile(ctx context.Context, req reconcile.Request
r.Logger.Error(err, "Not found eventmesh runtime pods")
}

runningEventMeshRuntimeNum := getRunningRuntimeNum(podList.Items)
if runningEventMeshRuntimeNum == eventMeshRuntime.Spec.Size {
share.IsEventMeshRuntimeInitialized = true
}

r.Logger.Info("Successful reconciliation!")
return reconcile.Result{}, nil

return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(share.RequeueAfterSecond) * time.Second}, nil
}

func getRunningRuntimeNum(pods []corev1.Pod) int {
var num = 0
for _, pod := range pods {
if reflect.DeepEqual(pod.Status.Phase, corev1.PodRunning) {
num++
}
}
return num
}

func getRuntimePodNames(pods []corev1.Pod) []string {
Expand Down Expand Up @@ -273,13 +290,14 @@ func (r *RuntimeReconciler) getEventMeshRuntimeStatefulSet(runtime *eventmeshope
Labels: label,
},
Spec: corev1.PodSpec{
DNSPolicy: corev1.DNSClusterFirstWithHostNet,
DNSPolicy: runtime.Spec.RuntimePodTemplate.Template.Spec.DNSPolicy,
Affinity: runtime.Spec.RuntimePodTemplate.Template.Spec.Affinity,
Tolerations: runtime.Spec.RuntimePodTemplate.Template.Spec.Tolerations,
NodeSelector: runtime.Spec.RuntimePodTemplate.Template.Spec.NodeSelector,
PriorityClassName: runtime.Spec.RuntimePodTemplate.Template.Spec.PriorityClassName,
HostNetwork: runtime.Spec.RuntimePodTemplate.Template.Spec.HostNetwork,
Containers: []corev1.Container{{
Resources: runtime.Spec.RuntimePodTemplate.Template.Spec.Containers[0].Resources,
Image: runtime.Spec.RuntimePodTemplate.Template.Spec.Containers[0].Image,
Name: runtime.Spec.RuntimePodTemplate.Template.Spec.Containers[0].Name,
SecurityContext: getContainerSecurityContext(runtime),
Expand Down
30 changes: 30 additions & 0 deletions eventmesh-operator/share/share.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package share

var (
// IsEventMeshRuntimeInitialized is whether the runtime list is initialized
IsEventMeshRuntimeInitialized = false
)

const (
// WaitForRuntimePodNameReadyInSecond is the time connector sleep for waiting runtime ready in second
WaitForRuntimePodNameReadyInSecond = 1
// RequeueAfterSecond is a universal interval of the reconcile function
RequeueAfterSecond = 6
)
Loading