diff --git a/pkg/pipeline/CiCdConfig.go b/pkg/pipeline/CiCdConfig.go index af9d54d35ca..6cf984bef94 100644 --- a/pkg/pipeline/CiCdConfig.go +++ b/pkg/pipeline/CiCdConfig.go @@ -1,12 +1,14 @@ package pipeline import ( + "encoding/json" "flag" "fmt" "github.com/caarlos0/env" blob_storage "github.com/devtron-labs/common-lib/blob-storage" "github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig" "github.com/devtron-labs/devtron/pkg/pipeline/bean" + v12 "k8s.io/api/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "os/user" @@ -420,3 +422,45 @@ func (impl *CiCdConfig) WorkflowRetriesEnabled() bool { return false } } + +func (impl *CiCdConfig) GetWorkflowVolumeAndVolumeMounts() ([]v12.Volume, []v12.VolumeMount, error) { + var volumes []v12.Volume + var volumeMounts []v12.VolumeMount + volumeMountsForCiJson := impl.VolumeMountsForCiJson + if len(volumeMountsForCiJson) > 0 { + var volumeMountsForCi []CiVolumeMount + // Unmarshal or Decode the JSON to the interface. + err := json.Unmarshal([]byte(volumeMountsForCiJson), &volumeMountsForCi) + if err != nil { + return nil, nil, err + } + + for _, volumeMountForCi := range volumeMountsForCi { + volumes = append(volumes, getWorkflowVolume(volumeMountForCi)) + volumeMounts = append(volumeMounts, getWorkflowVolumeMounts(volumeMountForCi)) + } + } + return volumes, volumeMounts, nil +} + +func getWorkflowVolume(volumeMountForCi CiVolumeMount) v12.Volume { + hostPathDirectoryOrCreate := v12.HostPathDirectoryOrCreate + + return v12.Volume{ + Name: volumeMountForCi.Name, + VolumeSource: v12.VolumeSource{ + HostPath: &v12.HostPathVolumeSource{ + Path: volumeMountForCi.HostMountPath, + Type: &hostPathDirectoryOrCreate, + }, + }, + } + +} + +func getWorkflowVolumeMounts(volumeMountForCi CiVolumeMount) v12.VolumeMount { + return v12.VolumeMount{ + Name: volumeMountForCi.Name, + MountPath: volumeMountForCi.ContainerMountPath, + } +} diff --git a/pkg/pipeline/WorkflowService.go b/pkg/pipeline/WorkflowService.go index 223c52ecb86..9d25a19c434 100644 --- a/pkg/pipeline/WorkflowService.go +++ b/pkg/pipeline/WorkflowService.go @@ -19,6 +19,7 @@ package pipeline import ( "context" + "encoding/json" "errors" "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" v1alpha12 "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/typed/workflow/v1alpha1" @@ -137,10 +138,21 @@ func (impl *WorkflowServiceImpl) createWorkflowTemplate(workflowRequest *Workflo workflowTemplate.Volumes = ExtractVolumesFromCmCs(workflowConfigMaps, workflowSecrets) workflowRequest.AddNodeConstraintsFromConfig(&workflowTemplate, impl.ciCdConfig) - workflowMainContainer := workflowRequest.GetWorkflowMainContainer(impl.ciCdConfig, workflowJson, workflowTemplate, workflowConfigMaps, workflowSecrets) + workflowMainContainer, err := workflowRequest.GetWorkflowMainContainer(impl.ciCdConfig, workflowJson, &workflowTemplate, workflowConfigMaps, workflowSecrets) + + if err != nil { + impl.Logger.Errorw("error occurred while getting workflow main container", "err", err) + return bean3.WorkflowTemplate{}, err + } + workflowTemplate.Containers = []v12.Container{workflowMainContainer} impl.updateBlobStorageConfig(workflowRequest, &workflowTemplate) - + if workflowRequest.Type == bean3.CI_WORKFLOW_PIPELINE_TYPE || workflowRequest.Type == bean3.JOB_WORKFLOW_PIPELINE_TYPE { + nodeSelector := impl.getAppLabelNodeSelector(workflowRequest) + if nodeSelector != nil { + workflowTemplate.NodeSelector = nodeSelector + } + } if workflowRequest.Type == bean3.CD_WORKFLOW_PIPELINE_TYPE { workflowTemplate.WfControllerInstanceID = impl.ciCdConfig.WfControllerInstanceID workflowTemplate.TerminationGracePeriod = impl.ciCdConfig.TerminationGracePeriod @@ -242,6 +254,21 @@ func (impl *WorkflowServiceImpl) updateBlobStorageConfig(workflowRequest *Workfl workflowTemplate.CloudStorageKey = workflowRequest.BlobStorageLogsKey } +func (impl *WorkflowServiceImpl) getAppLabelNodeSelector(workflowRequest *WorkflowRequest) map[string]string { + // node selector + if val, ok := workflowRequest.AppLabels[CI_NODE_SELECTOR_APP_LABEL_KEY]; ok && !(workflowRequest.CheckForJob() && workflowRequest.IsExtRun) { + var nodeSelectors map[string]string + // Unmarshal or Decode the JSON to the interface. + err := json.Unmarshal([]byte(val), &nodeSelectors) + if err != nil { + impl.Logger.Errorw("err in unmarshalling nodeSelectors", "err", err, "val", val) + return nil + } + return nodeSelectors + } + return nil +} + func (impl *WorkflowServiceImpl) getWorkflowExecutor(executorType pipelineConfig.WorkflowExecutorType) WorkflowExecutor { if executorType == pipelineConfig.WORKFLOW_EXECUTOR_TYPE_AWF { return impl.argoWorkflowExecutor diff --git a/pkg/pipeline/WorkflowUtils.go b/pkg/pipeline/WorkflowUtils.go index 4263e408890..c020464564d 100644 --- a/pkg/pipeline/WorkflowUtils.go +++ b/pkg/pipeline/WorkflowUtils.go @@ -690,7 +690,7 @@ func (workflowRequest *WorkflowRequest) getWorkflowImage() string { return "" } } -func (workflowRequest *WorkflowRequest) GetWorkflowMainContainer(config *CiCdConfig, workflowJson []byte, workflowTemplate bean.WorkflowTemplate, workflowConfigMaps []bean2.ConfigSecretMap, workflowSecrets []bean2.ConfigSecretMap) v12.Container { +func (workflowRequest *WorkflowRequest) GetWorkflowMainContainer(config *CiCdConfig, workflowJson []byte, workflowTemplate *bean.WorkflowTemplate, workflowConfigMaps []bean2.ConfigSecretMap, workflowSecrets []bean2.ConfigSecretMap) (v12.Container, error) { privileged := true pvc := workflowRequest.getPVCForWorkflowRequest() containerEnvVariables := workflowRequest.getContainerEnvVariables(config, workflowJson) @@ -710,7 +710,12 @@ func (workflowRequest *WorkflowRequest) GetWorkflowMainContainer(config *CiCdCon Name: "app-data", ContainerPort: 9102, }} + err := updateVolumeMountsForCi(config, workflowTemplate, &workflowMainContainer) + if err != nil { + return workflowMainContainer, err + } } + if len(pvc) != 0 { buildPvcCachePath := config.BuildPvcCachePath buildxPvcCachePath := config.BuildxPvcCachePath @@ -740,7 +745,7 @@ func (workflowRequest *WorkflowRequest) GetWorkflowMainContainer(config *CiCdCon }) } UpdateContainerEnvsFromCmCs(&workflowMainContainer, workflowConfigMaps, workflowSecrets) - return workflowMainContainer + return workflowMainContainer, nil } func CheckIfReTriggerRequired(status, message, workflowRunnerStatus string) bool { @@ -748,3 +753,13 @@ func CheckIfReTriggerRequired(status, message, workflowRunnerStatus string) bool message == POD_DELETED_MESSAGE) && workflowRunnerStatus != WorkflowCancel } + +func updateVolumeMountsForCi(config *CiCdConfig, workflowTemplate *bean.WorkflowTemplate, workflowMainContainer *v12.Container) error { + volume, volumeMounts, err := config.GetWorkflowVolumeAndVolumeMounts() + if err != nil { + return err + } + workflowTemplate.Volumes = volume + workflowMainContainer.VolumeMounts = volumeMounts + return nil +}