From 57db65b5746d6e095e259c176835f55520bfbe9b Mon Sep 17 00:00:00 2001 From: vishal Date: Mon, 10 May 2021 19:45:40 -0400 Subject: [PATCH 1/2] Add worker label to worker pods to only stream logs from worker pods --- pkg/crds/controllers/batch/batchjob_controller_helpers.go | 2 ++ pkg/operator/endpoints/logs_job.go | 2 +- pkg/operator/resources/job/taskapi/k8s_specs.go | 2 ++ 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/crds/controllers/batch/batchjob_controller_helpers.go b/pkg/crds/controllers/batch/batchjob_controller_helpers.go index badd2d9ad5..55aed8abf3 100644 --- a/pkg/crds/controllers/batch/batchjob_controller_helpers.go +++ b/pkg/crds/controllers/batch/batchjob_controller_helpers.go @@ -301,6 +301,7 @@ func (r *BatchJobReconciler) desiredWorkerJob(batchJob batch.BatchJob, apiSpec s "handlerID": apiSpec.HandlerID, "jobID": batchJob.Name, "cortex.dev/api": "true", + "worker": "true", }, PodSpec: k8s.PodSpec{ Labels: map[string]string{ @@ -311,6 +312,7 @@ func (r *BatchJobReconciler) desiredWorkerJob(batchJob batch.BatchJob, apiSpec s "handlerID": apiSpec.HandlerID, "jobID": batchJob.Name, "cortex.dev/api": "true", + "worker": "true", }, Annotations: map[string]string{ "traffic.sidecar.istio.io/excludeOutboundIPRanges": "0.0.0.0/0", diff --git a/pkg/operator/endpoints/logs_job.go b/pkg/operator/endpoints/logs_job.go index 28a6d9dd13..96dc2c552d 100644 --- a/pkg/operator/endpoints/logs_job.go +++ b/pkg/operator/endpoints/logs_job.go @@ -52,5 +52,5 @@ func ReadJobLogs(w http.ResponseWriter, r *http.Request) { } defer socket.Close() - operator.StreamLogsFromRandomPod(map[string]string{"apiName": apiName, "jobID": jobID}, socket) + operator.StreamLogsFromRandomPod(map[string]string{"apiName": apiName, "jobID": jobID, "worker": "true"}, socket) } diff --git a/pkg/operator/resources/job/taskapi/k8s_specs.go b/pkg/operator/resources/job/taskapi/k8s_specs.go index 1df9a94ad7..f634106533 100644 --- a/pkg/operator/resources/job/taskapi/k8s_specs.go +++ b/pkg/operator/resources/job/taskapi/k8s_specs.go @@ -82,6 +82,7 @@ func k8sJobSpec(api *spec.API, job *spec.TaskJob) *kbatch.Job { "jobID": job.ID, "apiKind": api.Kind.String(), "cortex.dev/api": "true", + "worker": "true", }, PodSpec: k8s.PodSpec{ Labels: map[string]string{ @@ -90,6 +91,7 @@ func k8sJobSpec(api *spec.API, job *spec.TaskJob) *kbatch.Job { "jobID": job.ID, "apiKind": api.Kind.String(), "cortex.dev/api": "true", + "worker": "true", }, Annotations: map[string]string{ "traffic.sidecar.istio.io/excludeOutboundIPRanges": "0.0.0.0/0", From 62cf2d8f83046bcd33b20a906d2a0ae1bf2ff9f7 Mon Sep 17 00:00:00 2001 From: vishal Date: Tue, 11 May 2021 10:36:44 -0400 Subject: [PATCH 2/2] Improve labels --- .../batch/batchjob_controller_helpers.go | 54 ++++++++++--------- pkg/operator/endpoints/logs_job.go | 8 ++- .../resources/job/taskapi/k8s_specs.go | 2 - 3 files changed, 35 insertions(+), 29 deletions(-) diff --git a/pkg/crds/controllers/batch/batchjob_controller_helpers.go b/pkg/crds/controllers/batch/batchjob_controller_helpers.go index 55aed8abf3..91d588a201 100644 --- a/pkg/crds/controllers/batch/batchjob_controller_helpers.go +++ b/pkg/crds/controllers/batch/batchjob_controller_helpers.go @@ -215,19 +215,21 @@ func (r *BatchJobReconciler) desiredEnqueuerJob(batchJob batch.BatchJob, queueUR Namespace: batchJob.Namespace, Parallelism: 1, Labels: map[string]string{ - "apiKind": userconfig.BatchAPIKind.String(), - "apiName": batchJob.Spec.APIName, - "apiID": batchJob.Spec.APIID, - "jobID": batchJob.Name, - "cortex.dev/api": "true", + "apiKind": userconfig.BatchAPIKind.String(), + "apiName": batchJob.Spec.APIName, + "apiID": batchJob.Spec.APIID, + "jobID": batchJob.Name, + "cortex.dev/api": "true", + "cortex.dev/batch": "enqueuer", }, PodSpec: k8s.PodSpec{ Labels: map[string]string{ - "apiKind": userconfig.BatchAPIKind.String(), - "apiName": batchJob.Spec.APIName, - "apiID": batchJob.Spec.APIID, - "jobID": batchJob.Name, - "cortex.dev/api": "true", + "apiKind": userconfig.BatchAPIKind.String(), + "apiName": batchJob.Spec.APIName, + "apiID": batchJob.Spec.APIID, + "jobID": batchJob.Name, + "cortex.dev/api": "true", + "cortex.dev/batch": "enqueuer", }, Annotations: map[string]string{ "traffic.sidecar.istio.io/excludeOutboundIPRanges": "0.0.0.0/0", @@ -294,25 +296,25 @@ func (r *BatchJobReconciler) desiredWorkerJob(batchJob batch.BatchJob, apiSpec s Namespace: batchJob.Namespace, Parallelism: batchJob.Spec.Workers, Labels: map[string]string{ - "apiKind": userconfig.BatchAPIKind.String(), - "apiName": batchJob.Spec.APIName, - "apiID": batchJob.Spec.APIID, - "specID": apiSpec.SpecID, - "handlerID": apiSpec.HandlerID, - "jobID": batchJob.Name, - "cortex.dev/api": "true", - "worker": "true", + "apiKind": userconfig.BatchAPIKind.String(), + "apiName": batchJob.Spec.APIName, + "apiID": batchJob.Spec.APIID, + "specID": apiSpec.SpecID, + "handlerID": apiSpec.HandlerID, + "jobID": batchJob.Name, + "cortex.dev/api": "true", + "cortex.dev/batch": "worker", }, PodSpec: k8s.PodSpec{ Labels: map[string]string{ - "apiKind": userconfig.BatchAPIKind.String(), - "apiName": batchJob.Spec.APIName, - "apiID": batchJob.Spec.APIID, - "specID": apiSpec.SpecID, - "handlerID": apiSpec.HandlerID, - "jobID": batchJob.Name, - "cortex.dev/api": "true", - "worker": "true", + "apiKind": userconfig.BatchAPIKind.String(), + "apiName": batchJob.Spec.APIName, + "apiID": batchJob.Spec.APIID, + "specID": apiSpec.SpecID, + "handlerID": apiSpec.HandlerID, + "jobID": batchJob.Name, + "cortex.dev/api": "true", + "cortex.dev/batch": "worker", }, Annotations: map[string]string{ "traffic.sidecar.istio.io/excludeOutboundIPRanges": "0.0.0.0/0", diff --git a/pkg/operator/endpoints/logs_job.go b/pkg/operator/endpoints/logs_job.go index 96dc2c552d..6232466569 100644 --- a/pkg/operator/endpoints/logs_job.go +++ b/pkg/operator/endpoints/logs_job.go @@ -52,5 +52,11 @@ func ReadJobLogs(w http.ResponseWriter, r *http.Request) { } defer socket.Close() - operator.StreamLogsFromRandomPod(map[string]string{"apiName": apiName, "jobID": jobID, "worker": "true"}, socket) + labels := map[string]string{"apiName": apiName, "jobID": jobID} + + if deployedResource.Kind == userconfig.BatchAPIKind { + labels["cortex.dev/batch"] = "worker" + } + + operator.StreamLogsFromRandomPod(labels, socket) } diff --git a/pkg/operator/resources/job/taskapi/k8s_specs.go b/pkg/operator/resources/job/taskapi/k8s_specs.go index f634106533..1df9a94ad7 100644 --- a/pkg/operator/resources/job/taskapi/k8s_specs.go +++ b/pkg/operator/resources/job/taskapi/k8s_specs.go @@ -82,7 +82,6 @@ func k8sJobSpec(api *spec.API, job *spec.TaskJob) *kbatch.Job { "jobID": job.ID, "apiKind": api.Kind.String(), "cortex.dev/api": "true", - "worker": "true", }, PodSpec: k8s.PodSpec{ Labels: map[string]string{ @@ -91,7 +90,6 @@ func k8sJobSpec(api *spec.API, job *spec.TaskJob) *kbatch.Job { "jobID": job.ID, "apiKind": api.Kind.String(), "cortex.dev/api": "true", - "worker": "true", }, Annotations: map[string]string{ "traffic.sidecar.istio.io/excludeOutboundIPRanges": "0.0.0.0/0",