Skip to content

Commit

Permalink
feat: add init containers to vertices. Closes numaproj#284
Browse files Browse the repository at this point in the history
Signed-off-by: David Seapy <dseapy@gmail.com>
  • Loading branch information
dseapy committed Oct 29, 2022
1 parent 88cf272 commit 03812de
Show file tree
Hide file tree
Showing 15 changed files with 14,166 additions and 6,515 deletions.
14 changes: 14 additions & 0 deletions api/json-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -17271,6 +17271,13 @@
"x-kubernetes-patch-merge-key": "name",
"x-kubernetes-patch-strategy": "merge"
},
"initContainers": {
"description": "List of init containers belonging to the pod. More info: https://kubernetes.io/docs/concepts/workloads/pods/init-containers/",
"items": {
"$ref": "#/definitions/io.k8s.api.core.v1.Container"
},
"type": "array"
},
"limits": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.VertexLimits",
"description": "Limits define the limitations such as buffer read batch size for all the vertices of a pipleine, will override pipeline level settings"
Expand Down Expand Up @@ -18701,6 +18708,13 @@
"x-kubernetes-patch-merge-key": "name",
"x-kubernetes-patch-strategy": "merge"
},
"initContainers": {
"description": "List of init containers belonging to the pod. More info: https://kubernetes.io/docs/concepts/workloads/pods/init-containers/",
"items": {
"$ref": "#/definitions/io.k8s.api.core.v1.Container"
},
"type": "array"
},
"interStepBufferServiceName": {
"type": "string"
},
Expand Down
14 changes: 14 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -17279,6 +17279,13 @@
"x-kubernetes-patch-merge-key": "name",
"x-kubernetes-patch-strategy": "merge"
},
"initContainers": {
"description": "List of init containers belonging to the pod. More info: https://kubernetes.io/docs/concepts/workloads/pods/init-containers/",
"type": "array",
"items": {
"$ref": "#/definitions/io.k8s.api.core.v1.Container"
}
},
"limits": {
"description": "Limits define the limitations such as buffer read batch size for all the vertices of a pipleine, will override pipeline level settings",
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.VertexLimits"
Expand Down Expand Up @@ -18683,6 +18690,13 @@
"x-kubernetes-patch-merge-key": "name",
"x-kubernetes-patch-strategy": "merge"
},
"initContainers": {
"description": "List of init containers belonging to the pod. More info: https://kubernetes.io/docs/concepts/workloads/pods/init-containers/",
"type": "array",
"items": {
"$ref": "#/definitions/io.k8s.api.core.v1.Container"
}
},
"interStepBufferServiceName": {
"type": "string"
},
Expand Down
1,270 changes: 1,270 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_pipelines.yaml

Large diffs are not rendered by default.

1,218 changes: 1,218 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_vertices.yaml

Large diffs are not rendered by default.

8,678 changes: 5,583 additions & 3,095 deletions config/install.yaml

Large diffs are not rendered by default.

8,678 changes: 5,583 additions & 3,095 deletions config/namespace-install.yaml

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,20 @@ Settings for autoscaling
</p>
</td>
</tr>
<tr>
<td>
<code>initContainers</code></br> <em>
<a href="https://v1-18.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#container-v1-core">
\[\]Kubernetes core/v1.Container </a> </em>
</td>
<td>
<em>(Optional)</em>
<p>
List of init containers belonging to the pod. More info:
<a href="https://kubernetes.io/docs/concepts/workloads/pods/init-containers/">https://kubernetes.io/docs/concepts/workloads/pods/init-containers/</a>
</p>
</td>
</tr>
</tbody>
</table>
<h3 id="numaflow.numaproj.io/v1alpha1.Authorization">
Expand Down
675 changes: 369 additions & 306 deletions pkg/apis/numaflow/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions pkg/apis/numaflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 30 additions & 2 deletions pkg/apis/numaflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 17 additions & 12 deletions pkg/apis/numaflow/v1alpha1/vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,28 +240,29 @@ func (v Vertex) GetPodSpec(req GetVertexPodSpecReq) (*corev1.PodSpec, error) {
Affinity: v.Spec.Affinity,
ServiceAccountName: v.Spec.ServiceAccountName,
Volumes: append(volumes, v.Spec.Volumes...),
InitContainers: []corev1.Container{
v.getInitContainer(req),
},
Containers: containers,
InitContainers: v.getInitContainers(req),
Containers: containers,
}
return spec, nil
}

func (v Vertex) getInitContainer(req GetVertexPodSpecReq) corev1.Container {
func (v Vertex) getInitContainers(req GetVertexPodSpecReq) []corev1.Container {
envVars := []corev1.EnvVar{
{Name: EnvPipelineName, Value: v.Spec.PipelineName},
{Name: "GODEBUG", Value: os.Getenv("GODEBUG")},
}
envVars = append(envVars, req.Env...)
return corev1.Container{
Name: CtrInit,
Env: envVars,
Image: req.Image,
ImagePullPolicy: req.PullPolicy,
Resources: standardResources,
Args: []string{"isbsvc-buffer-validate", "--isbsvc-type=" + string(req.ISBSvcType)},
initContainers := []corev1.Container{
{
Name: CtrInit,
Env: envVars,
Image: req.Image,
ImagePullPolicy: req.PullPolicy,
Resources: standardResources,
Args: []string{"isbsvc-buffer-validate", "--isbsvc-type=" + string(req.ISBSvcType)},
},
}
return append(initContainers, v.Spec.InitContainers...)
}
func (vs VertexSpec) WithOutReplicas() VertexSpec {
zero := int32(0)
Expand Down Expand Up @@ -398,6 +399,10 @@ type AbstractVertex struct {
// Settings for autoscaling
// +optional
Scale Scale `json:"scale,omitempty" protobuf:"bytes,18,opt,name=scale"`
// List of init containers belonging to the pod.
// More info: https://kubernetes.io/docs/concepts/workloads/pods/init-containers/
// +optional
InitContainers []corev1.Container `json:"initContainers,omitempty" protobuf:"bytes,19,rep,name=initContainers"`
}

type Scale struct {
Expand Down
16 changes: 11 additions & 5 deletions pkg/apis/numaflow/v1alpha1/vertex_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func Test_VertexIsSink(t *testing.T) {
assert.True(t, o.IsASink())
}

func Test_VertexGetInitContainer(t *testing.T) {
func Test_VertexGetInitContainers(t *testing.T) {
req := GetVertexPodSpecReq{
ISBSvcType: ISBSvcTypeRedis,
Image: testFlowImage,
Expand All @@ -337,13 +337,19 @@ func Test_VertexGetInitContainer(t *testing.T) {
}
o := testVertex.DeepCopy()
o.Spec.Sink = &Sink{}
s := o.getInitContainer(req)
assert.Equal(t, CtrInit, s.Name)
o.Spec.InitContainers = []corev1.Container{
{Name: "my-test-init", Image: "my-test-init-image"},
}
s := o.getInitContainers(req)
assert.Len(t, s, 2)
assert.Equal(t, CtrInit, s[0].Name)
assert.Equal(t, "my-test-init", s[1].Name)
assert.Equal(t, "my-test-init-image", s[1].Image)
a := []string{}
for _, env := range s.Env {
for _, env := range s[0].Env {
a = append(a, env.Name)
}
for _, env := range s.Env {
for _, env := range s[0].Env {
assert.Contains(t, a, env.Name)
}
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/numaflow/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions pkg/reconciler/pipeline/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,15 @@ func validateVertex(v dfv1.AbstractVertex) error {
if min > max {
return fmt.Errorf("vertex %q: max number of replicas should be greater than or equal to min", v.Name)
}
for _, ic := range v.InitContainers {
if ic.Name == dfv1.CtrInit ||
ic.Name == dfv1.CtrMain ||
ic.Name == "numa" ||
ic.Name == dfv1.CtrUdf ||
ic.Name == dfv1.CtrUdsink {
return fmt.Errorf("vertex %q: init container name %q is reserved for containers created by numaflow", v.Name, ic.Name)
}
}
if v.UDF != nil {
return validateUDF(*v.UDF)
}
Expand Down
22 changes: 22 additions & 0 deletions pkg/reconciler/pipeline/validate_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pipeline

import (
corev1 "k8s.io/api/core/v1"
"testing"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
Expand Down Expand Up @@ -205,6 +206,27 @@ func TestValidateVertex(t *testing.T) {
assert.Error(t, err)
assert.Contains(t, err.Error(), "or equal to")
})

t.Run("good init container", func(t *testing.T) {
v := dfv1.AbstractVertex{
InitContainers: []corev1.Container{
{Name: "my-test-image", Image: "my-image:latest"},
},
}
err := validateVertex(v)
assert.NoError(t, err)
})

t.Run("bad init container name", func(t *testing.T) {
v := dfv1.AbstractVertex{
InitContainers: []corev1.Container{
{Name: dfv1.CtrInit, Image: "my-image:latest"},
},
}
err := validateVertex(v)
assert.Error(t, err)
assert.Contains(t, err.Error(), "is reserved for containers created by numaflow")
})
}

func TestValidateUDF(t *testing.T) {
Expand Down

0 comments on commit 03812de

Please sign in to comment.