Skip to content

Commit

Permalink
fix: App Labels node selector not getting attach in ci-workflow (#4084)
Browse files Browse the repository at this point in the history
* check for active ci_pipeline_material

* add appLabel nodeSelector and VolumeMountJson

* check for ci and job

* code review changes

* separate function for volume and volumeMounts

* move function to cicdconfig

* send address instead of value
  • Loading branch information
Ashish-devtron committed Oct 17, 2023
1 parent 4c2f42f commit 81e24a0
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 4 deletions.
44 changes: 44 additions & 0 deletions pkg/pipeline/CiCdConfig.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package pipeline

import (
"encoding/json"
"flag"
"fmt"
"github.com/caarlos0/env"
blob_storage "github.com/devtron-labs/common-lib/blob-storage"
"github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig"
"github.com/devtron-labs/devtron/pkg/pipeline/bean"
v12 "k8s.io/api/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"os/user"
Expand Down Expand Up @@ -420,3 +422,45 @@ func (impl *CiCdConfig) WorkflowRetriesEnabled() bool {
return false
}
}

func (impl *CiCdConfig) GetWorkflowVolumeAndVolumeMounts() ([]v12.Volume, []v12.VolumeMount, error) {
var volumes []v12.Volume
var volumeMounts []v12.VolumeMount
volumeMountsForCiJson := impl.VolumeMountsForCiJson
if len(volumeMountsForCiJson) > 0 {
var volumeMountsForCi []CiVolumeMount
// Unmarshal or Decode the JSON to the interface.
err := json.Unmarshal([]byte(volumeMountsForCiJson), &volumeMountsForCi)
if err != nil {
return nil, nil, err
}

for _, volumeMountForCi := range volumeMountsForCi {
volumes = append(volumes, getWorkflowVolume(volumeMountForCi))
volumeMounts = append(volumeMounts, getWorkflowVolumeMounts(volumeMountForCi))
}
}
return volumes, volumeMounts, nil
}

func getWorkflowVolume(volumeMountForCi CiVolumeMount) v12.Volume {
hostPathDirectoryOrCreate := v12.HostPathDirectoryOrCreate

return v12.Volume{
Name: volumeMountForCi.Name,
VolumeSource: v12.VolumeSource{
HostPath: &v12.HostPathVolumeSource{
Path: volumeMountForCi.HostMountPath,
Type: &hostPathDirectoryOrCreate,
},
},
}

}

func getWorkflowVolumeMounts(volumeMountForCi CiVolumeMount) v12.VolumeMount {
return v12.VolumeMount{
Name: volumeMountForCi.Name,
MountPath: volumeMountForCi.ContainerMountPath,
}
}
31 changes: 29 additions & 2 deletions pkg/pipeline/WorkflowService.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package pipeline

import (
"context"
"encoding/json"
"errors"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
v1alpha12 "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
Expand Down Expand Up @@ -137,10 +138,21 @@ func (impl *WorkflowServiceImpl) createWorkflowTemplate(workflowRequest *Workflo
workflowTemplate.Volumes = ExtractVolumesFromCmCs(workflowConfigMaps, workflowSecrets)

workflowRequest.AddNodeConstraintsFromConfig(&workflowTemplate, impl.ciCdConfig)
workflowMainContainer := workflowRequest.GetWorkflowMainContainer(impl.ciCdConfig, workflowJson, workflowTemplate, workflowConfigMaps, workflowSecrets)
workflowMainContainer, err := workflowRequest.GetWorkflowMainContainer(impl.ciCdConfig, workflowJson, &workflowTemplate, workflowConfigMaps, workflowSecrets)

if err != nil {
impl.Logger.Errorw("error occurred while getting workflow main container", "err", err)
return bean3.WorkflowTemplate{}, err
}

workflowTemplate.Containers = []v12.Container{workflowMainContainer}
impl.updateBlobStorageConfig(workflowRequest, &workflowTemplate)

if workflowRequest.Type == bean3.CI_WORKFLOW_PIPELINE_TYPE || workflowRequest.Type == bean3.JOB_WORKFLOW_PIPELINE_TYPE {
nodeSelector := impl.getAppLabelNodeSelector(workflowRequest)
if nodeSelector != nil {
workflowTemplate.NodeSelector = nodeSelector
}
}
if workflowRequest.Type == bean3.CD_WORKFLOW_PIPELINE_TYPE {
workflowTemplate.WfControllerInstanceID = impl.ciCdConfig.WfControllerInstanceID
workflowTemplate.TerminationGracePeriod = impl.ciCdConfig.TerminationGracePeriod
Expand Down Expand Up @@ -242,6 +254,21 @@ func (impl *WorkflowServiceImpl) updateBlobStorageConfig(workflowRequest *Workfl
workflowTemplate.CloudStorageKey = workflowRequest.BlobStorageLogsKey
}

func (impl *WorkflowServiceImpl) getAppLabelNodeSelector(workflowRequest *WorkflowRequest) map[string]string {
// node selector
if val, ok := workflowRequest.AppLabels[CI_NODE_SELECTOR_APP_LABEL_KEY]; ok && !(workflowRequest.CheckForJob() && workflowRequest.IsExtRun) {
var nodeSelectors map[string]string
// Unmarshal or Decode the JSON to the interface.
err := json.Unmarshal([]byte(val), &nodeSelectors)
if err != nil {
impl.Logger.Errorw("err in unmarshalling nodeSelectors", "err", err, "val", val)
return nil
}
return nodeSelectors
}
return nil
}

func (impl *WorkflowServiceImpl) getWorkflowExecutor(executorType pipelineConfig.WorkflowExecutorType) WorkflowExecutor {
if executorType == pipelineConfig.WORKFLOW_EXECUTOR_TYPE_AWF {
return impl.argoWorkflowExecutor
Expand Down
19 changes: 17 additions & 2 deletions pkg/pipeline/WorkflowUtils.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ func (workflowRequest *WorkflowRequest) getWorkflowImage() string {
return ""
}
}
func (workflowRequest *WorkflowRequest) GetWorkflowMainContainer(config *CiCdConfig, workflowJson []byte, workflowTemplate bean.WorkflowTemplate, workflowConfigMaps []bean2.ConfigSecretMap, workflowSecrets []bean2.ConfigSecretMap) v12.Container {
func (workflowRequest *WorkflowRequest) GetWorkflowMainContainer(config *CiCdConfig, workflowJson []byte, workflowTemplate *bean.WorkflowTemplate, workflowConfigMaps []bean2.ConfigSecretMap, workflowSecrets []bean2.ConfigSecretMap) (v12.Container, error) {
privileged := true
pvc := workflowRequest.getPVCForWorkflowRequest()
containerEnvVariables := workflowRequest.getContainerEnvVariables(config, workflowJson)
Expand All @@ -710,7 +710,12 @@ func (workflowRequest *WorkflowRequest) GetWorkflowMainContainer(config *CiCdCon
Name: "app-data",
ContainerPort: 9102,
}}
err := updateVolumeMountsForCi(config, workflowTemplate, &workflowMainContainer)
if err != nil {
return workflowMainContainer, err
}
}

if len(pvc) != 0 {
buildPvcCachePath := config.BuildPvcCachePath
buildxPvcCachePath := config.BuildxPvcCachePath
Expand Down Expand Up @@ -740,11 +745,21 @@ func (workflowRequest *WorkflowRequest) GetWorkflowMainContainer(config *CiCdCon
})
}
UpdateContainerEnvsFromCmCs(&workflowMainContainer, workflowConfigMaps, workflowSecrets)
return workflowMainContainer
return workflowMainContainer, nil
}

func CheckIfReTriggerRequired(status, message, workflowRunnerStatus string) bool {
return ((status == string(v1alpha1.NodeError) || status == string(v1alpha1.NodeFailed)) &&
message == POD_DELETED_MESSAGE) && workflowRunnerStatus != WorkflowCancel

}

func updateVolumeMountsForCi(config *CiCdConfig, workflowTemplate *bean.WorkflowTemplate, workflowMainContainer *v12.Container) error {
volume, volumeMounts, err := config.GetWorkflowVolumeAndVolumeMounts()
if err != nil {
return err
}
workflowTemplate.Volumes = volume
workflowMainContainer.VolumeMounts = volumeMounts
return nil
}

0 comments on commit 81e24a0

Please sign in to comment.