Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: App Labels node selector not getting attach in ci-workflow #4084

Merged
merged 12 commits into from
Oct 17, 2023
Merged
44 changes: 44 additions & 0 deletions pkg/pipeline/CiCdConfig.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package pipeline

import (
"encoding/json"
"flag"
"fmt"
"github.com/caarlos0/env"
blob_storage "github.com/devtron-labs/common-lib/blob-storage"
"github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig"
"github.com/devtron-labs/devtron/pkg/pipeline/bean"
v12 "k8s.io/api/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"os/user"
Expand Down Expand Up @@ -420,3 +422,45 @@ func (impl *CiCdConfig) WorkflowRetriesEnabled() bool {
return false
}
}

func (impl *CiCdConfig) GetWorkflowVolumeAndVolumeMounts() ([]v12.Volume, []v12.VolumeMount, error) {
var volumes []v12.Volume
var volumeMounts []v12.VolumeMount
volumeMountsForCiJson := impl.VolumeMountsForCiJson
if len(volumeMountsForCiJson) > 0 {
var volumeMountsForCi []CiVolumeMount
// Unmarshal or Decode the JSON to the interface.
err := json.Unmarshal([]byte(volumeMountsForCiJson), &volumeMountsForCi)
if err != nil {
return nil, nil, err
}

for _, volumeMountForCi := range volumeMountsForCi {
volumes = append(volumes, getWorkflowVolume(volumeMountForCi))
volumeMounts = append(volumeMounts, getWorkflowVolumeMounts(volumeMountForCi))
}
}
return volumes, volumeMounts, nil
}

func getWorkflowVolume(volumeMountForCi CiVolumeMount) v12.Volume {
hostPathDirectoryOrCreate := v12.HostPathDirectoryOrCreate

return v12.Volume{
Name: volumeMountForCi.Name,
VolumeSource: v12.VolumeSource{
HostPath: &v12.HostPathVolumeSource{
Path: volumeMountForCi.HostMountPath,
Type: &hostPathDirectoryOrCreate,
},
},
}

}

func getWorkflowVolumeMounts(volumeMountForCi CiVolumeMount) v12.VolumeMount {
return v12.VolumeMount{
Name: volumeMountForCi.Name,
MountPath: volumeMountForCi.ContainerMountPath,
}
}
31 changes: 29 additions & 2 deletions pkg/pipeline/WorkflowService.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package pipeline

import (
"context"
"encoding/json"
"errors"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
v1alpha12 "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
Expand Down Expand Up @@ -137,10 +138,21 @@ func (impl *WorkflowServiceImpl) createWorkflowTemplate(workflowRequest *Workflo
workflowTemplate.Volumes = ExtractVolumesFromCmCs(workflowConfigMaps, workflowSecrets)

workflowRequest.AddNodeConstraintsFromConfig(&workflowTemplate, impl.ciCdConfig)
workflowMainContainer := workflowRequest.GetWorkflowMainContainer(impl.ciCdConfig, workflowJson, workflowTemplate, workflowConfigMaps, workflowSecrets)
workflowMainContainer, err := workflowRequest.GetWorkflowMainContainer(impl.ciCdConfig, workflowJson, &workflowTemplate, workflowConfigMaps, workflowSecrets)

if err != nil {
impl.Logger.Errorw("error occurred while getting workflow main container", "err", err)
return bean3.WorkflowTemplate{}, err
}

workflowTemplate.Containers = []v12.Container{workflowMainContainer}
impl.updateBlobStorageConfig(workflowRequest, &workflowTemplate)

if workflowRequest.Type == bean3.CI_WORKFLOW_PIPELINE_TYPE || workflowRequest.Type == bean3.JOB_WORKFLOW_PIPELINE_TYPE {
nodeSelector := impl.getAppLabelNodeSelector(workflowRequest)
if nodeSelector != nil {
workflowTemplate.NodeSelector = nodeSelector
}
}
if workflowRequest.Type == bean3.CD_WORKFLOW_PIPELINE_TYPE {
workflowTemplate.WfControllerInstanceID = impl.ciCdConfig.WfControllerInstanceID
workflowTemplate.TerminationGracePeriod = impl.ciCdConfig.TerminationGracePeriod
Expand Down Expand Up @@ -242,6 +254,21 @@ func (impl *WorkflowServiceImpl) updateBlobStorageConfig(workflowRequest *Workfl
workflowTemplate.CloudStorageKey = workflowRequest.BlobStorageLogsKey
}

func (impl *WorkflowServiceImpl) getAppLabelNodeSelector(workflowRequest *WorkflowRequest) map[string]string {
// node selector
if val, ok := workflowRequest.AppLabels[CI_NODE_SELECTOR_APP_LABEL_KEY]; ok && !(workflowRequest.CheckForJob() && workflowRequest.IsExtRun) {
var nodeSelectors map[string]string
// Unmarshal or Decode the JSON to the interface.
err := json.Unmarshal([]byte(val), &nodeSelectors)
if err != nil {
impl.Logger.Errorw("err in unmarshalling nodeSelectors", "err", err, "val", val)
return nil
}
return nodeSelectors
}
return nil
}

func (impl *WorkflowServiceImpl) getWorkflowExecutor(executorType pipelineConfig.WorkflowExecutorType) WorkflowExecutor {
if executorType == pipelineConfig.WORKFLOW_EXECUTOR_TYPE_AWF {
return impl.argoWorkflowExecutor
Expand Down
19 changes: 17 additions & 2 deletions pkg/pipeline/WorkflowUtils.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ func (workflowRequest *WorkflowRequest) getWorkflowImage() string {
return ""
}
}
func (workflowRequest *WorkflowRequest) GetWorkflowMainContainer(config *CiCdConfig, workflowJson []byte, workflowTemplate bean.WorkflowTemplate, workflowConfigMaps []bean2.ConfigSecretMap, workflowSecrets []bean2.ConfigSecretMap) v12.Container {
func (workflowRequest *WorkflowRequest) GetWorkflowMainContainer(config *CiCdConfig, workflowJson []byte, workflowTemplate *bean.WorkflowTemplate, workflowConfigMaps []bean2.ConfigSecretMap, workflowSecrets []bean2.ConfigSecretMap) (v12.Container, error) {
privileged := true
pvc := workflowRequest.getPVCForWorkflowRequest()
containerEnvVariables := workflowRequest.getContainerEnvVariables(config, workflowJson)
Expand All @@ -710,7 +710,12 @@ func (workflowRequest *WorkflowRequest) GetWorkflowMainContainer(config *CiCdCon
Name: "app-data",
ContainerPort: 9102,
}}
err := updateVolumeMountsForCi(config, workflowTemplate, &workflowMainContainer)
if err != nil {
return workflowMainContainer, err
}
}

if len(pvc) != 0 {
buildPvcCachePath := config.BuildPvcCachePath
buildxPvcCachePath := config.BuildxPvcCachePath
Expand Down Expand Up @@ -740,11 +745,21 @@ func (workflowRequest *WorkflowRequest) GetWorkflowMainContainer(config *CiCdCon
})
}
UpdateContainerEnvsFromCmCs(&workflowMainContainer, workflowConfigMaps, workflowSecrets)
return workflowMainContainer
return workflowMainContainer, nil
}

func CheckIfReTriggerRequired(status, message, workflowRunnerStatus string) bool {
return ((status == string(v1alpha1.NodeError) || status == string(v1alpha1.NodeFailed)) &&
message == POD_DELETED_MESSAGE) && workflowRunnerStatus != WorkflowCancel

}

func updateVolumeMountsForCi(config *CiCdConfig, workflowTemplate *bean.WorkflowTemplate, workflowMainContainer *v12.Container) error {
volume, volumeMounts, err := config.GetWorkflowVolumeAndVolumeMounts()
if err != nil {
return err
}
workflowTemplate.Volumes = volume
workflowMainContainer.VolumeMounts = volumeMounts
return nil
}
Loading