Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: workflow refactoring #3714

Merged
merged 78 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
78 commits
Select commit Hold shift + click to select a range
5ec094c
devtronContainerImageRepo varilable added
adi6859 Jul 6, 2023
effdf61
Merge remote-tracking branch 'refs/remotes/origin/main'
adi6859 Jul 11, 2023
e5abbfd
Merge remote-tracking branch 'refs/remotes/origin/main'
adi6859 Jul 18, 2023
cc82fa0
Add system workflow
Ashish-devtron Jul 26, 2023
2949a2a
Merge branch 'main' into workflow-refactoring
Ashish-devtron Jul 26, 2023
3a785b5
fix wiring
Ashish-devtron Jul 26, 2023
e1e8458
Merge remote-tracking branch 'refs/remotes/origin/main'
adi6859 Jul 26, 2023
1bff947
code refactored for ciconfig and cdconfig
adi6859 Jul 27, 2023
30a429a
terminate workflow
Ashish-devtron Jul 27, 2023
4daa212
Merge branch 'workflow-refactoring' into workflow-refactoring-2.0
adi6859 Jul 27, 2023
2cada3b
Main merge
Ashish-devtron Jul 27, 2023
a14d41a
Merge remote-tracking branch 'refs/remotes/origin/main'
adi6859 Jul 28, 2023
514a0eb
Merge branch 'main' into workflow-refactoring
adi6859 Jul 28, 2023
bb17538
merge workflow-refactoring in my branch
adi6859 Jul 28, 2023
1c10bbc
common Workflow Service
Ashish-devtron Jul 30, 2023
0606d02
common Workflow Service
Ashish-devtron Jul 30, 2023
4778514
common Workflow Service
Ashish-devtron Jul 30, 2023
d65cdaf
ciconfig and cdconfig merged along with refactoring in workflowservice
adi6859 Jul 30, 2023
0814ada
common Workflow Service
Ashish-devtron Jul 30, 2023
8bc9745
revert connection.go
Ashish-devtron Jul 30, 2023
fc73c7f
common Workflow Service
Ashish-devtron Jul 31, 2023
e219ff5
check for ciImage
Ashish-devtron Jul 31, 2023
7f4f043
getCiConfig and getCdConfig merged
adi6859 Jul 31, 2023
d37a5ff
existing cm cs for job
Ashish-devtron Jul 31, 2023
dc529c9
dev issue fixed
adi6859 Jul 31, 2023
58389af
unused error message removed
adi6859 Jul 31, 2023
54d4e2b
Merge remote-tracking branch 'refs/remotes/origin/main'
adi6859 Jul 31, 2023
c103dd1
Merge branch 'workflow-refactoring-2.0' into workflow-refactoring-final
adi6859 Jul 31, 2023
7a69f04
merge branches
Ashish-devtron Jul 31, 2023
07de6ab
wfClient code refactored
adi6859 Jul 31, 2023
6876f91
ci workflow type
Ashish-devtron Jul 31, 2023
6718d02
getWorkflow function removed
adi6859 Jul 31, 2023
8ed1abe
add TerminationGracePeriod in config
Ashish-devtron Jul 31, 2023
cb3b788
commented code removed
adi6859 Aug 1, 2023
0f75d75
Merge branch 'main' into workflow-refactoring-final
Ashish-devtron Aug 7, 2023
c2994c7
add environmentId to workflowRequest for job
Ashish-devtron Aug 8, 2023
5183e8e
Merge branch 'main' into workflow-refactoring-final
Ashish-devtron Aug 13, 2023
857b6a8
move struct to bean
Ashish-devtron Aug 13, 2023
9bb00b3
update workflow type
Ashish-devtron Aug 14, 2023
b3a4682
update workflow type
Ashish-devtron Aug 14, 2023
d93ea13
update workflow type
Ashish-devtron Aug 14, 2023
504c75c
update workflow type
Ashish-devtron Aug 14, 2023
d704ce0
function added in cicd config
adi6859 Aug 16, 2023
eb15c1c
add ciCdconfig
Ashish-devtron Aug 16, 2023
3b77b40
revert PG_PORT
Ashish-devtron Aug 16, 2023
670b908
Main merge
Ashish-devtron Aug 16, 2023
a20823f
generate wire_gen
Ashish-devtron Aug 16, 2023
ee91d3b
remove unwanted comments
Ashish-devtron Aug 16, 2023
cc0d31b
remove unwanted comments
Ashish-devtron Aug 16, 2023
7438f7b
change func name
Ashish-devtron Aug 16, 2023
fbcd12f
code review changes
Ashish-devtron Aug 17, 2023
c5e0554
config fix
Ashish-devtron Aug 18, 2023
3e61459
move ciCd stage to bean
Ashish-devtron Aug 18, 2023
b45053f
move ciCd stage to bean
Ashish-devtron Aug 18, 2023
419f32a
workflow refactoring
kripanshdevtron Aug 20, 2023
4157e99
check for job
Ashish-devtron Aug 21, 2023
50e586a
check for job
Ashish-devtron Aug 21, 2023
10958cf
add port for Ci and Job
Ashish-devtron Aug 21, 2023
12cfa03
add port for Ci and Job
Ashish-devtron Aug 21, 2023
eec5151
integration testing
Ashish-devtron Sep 4, 2023
6b7552d
integration testing for external job
Ashish-devtron Sep 4, 2023
8f4c54a
check for template length
Ashish-devtron Sep 6, 2023
e60e646
Main merge
Ashish-devtron Sep 6, 2023
11d0f06
revert unusual changes
Ashish-devtron Sep 6, 2023
169c9b8
blob check for ci and cd
Ashish-devtron Sep 6, 2023
75ae8bd
blob check for ci and cd
Ashish-devtron Sep 8, 2023
87fde2c
workflow IT
Ashish-devtron Sep 11, 2023
267b974
uncomment test
Ashish-devtron Sep 11, 2023
b0b3f89
Merge branch 'main' into workflow-refactoring-final
Ashish-devtron Sep 12, 2023
fc750b3
main merge
Ashish-devtron Sep 19, 2023
a261258
revert pg port
Ashish-devtron Sep 19, 2023
2af6848
Merge branch 'main' into workflow-refactoring-final
Ashish-devtron Sep 21, 2023
9ba32fe
scoped variable it test change
Ashish-devtron Sep 21, 2023
90e93cc
Main merge
Ashish-devtron Sep 25, 2023
b095cf4
Main merge
Ashish-devtron Sep 25, 2023
e16f100
main Merge
Ashish-devtron Sep 25, 2023
51c38dc
remove datas from IT
Ashish-devtron Sep 26, 2023
24edc2a
remove TODO comments
Ashish-devtron Sep 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions Wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ func InitializeApp() (*App, error) {
repository.NewNotificationSettingsRepositoryImpl,
wire.Bind(new(repository.NotificationSettingsRepository), new(*repository.NotificationSettingsRepositoryImpl)),
util.IntValidator,
pipeline.GetCiConfig,
pipeline.GetCiCdConfig,

pipeline.NewWorkflowServiceImpl,
wire.Bind(new(pipeline.WorkflowService), new(*pipeline.WorkflowServiceImpl)),
Expand Down Expand Up @@ -533,17 +533,13 @@ func InitializeApp() (*App, error) {
pipelineConfig.NewCdWorkflowRepositoryImpl,
wire.Bind(new(pipelineConfig.CdWorkflowRepository), new(*pipelineConfig.CdWorkflowRepositoryImpl)),

pipeline.NewCdWorkflowServiceImpl,
wire.Bind(new(pipeline.CdWorkflowService), new(*pipeline.CdWorkflowServiceImpl)),

pipeline.NewCdHandlerImpl,
wire.Bind(new(pipeline.CdHandler), new(*pipeline.CdHandlerImpl)),

pipeline.NewWorkflowDagExecutorImpl,
wire.Bind(new(pipeline.WorkflowDagExecutor), new(*pipeline.WorkflowDagExecutorImpl)),
appClone.NewAppCloneServiceImpl,
wire.Bind(new(appClone.AppCloneService), new(*appClone.AppCloneServiceImpl)),
pipeline.GetCdConfig,

router.NewDeploymentGroupRouterImpl,
wire.Bind(new(router.DeploymentGroupRouter), new(*router.DeploymentGroupRouterImpl)),
Expand Down Expand Up @@ -879,6 +875,8 @@ func InitializeApp() (*App, error) {
wire.Bind(new(appGroup2.AppGroupMappingRepository), new(*appGroup2.AppGroupMappingRepositoryImpl)),
pipeline.NewArgoWorkflowExecutorImpl,
wire.Bind(new(pipeline.ArgoWorkflowExecutor), new(*pipeline.ArgoWorkflowExecutorImpl)),
pipeline.NewSystemWorkflowExecutorImpl,
wire.Bind(new(pipeline.SystemWorkflowExecutor), new(*pipeline.SystemWorkflowExecutorImpl)),
repository5.NewManifestPushConfigRepository,
wire.Bind(new(repository5.ManifestPushConfigRepository), new(*repository5.ManifestPushConfigRepositoryImpl)),
app.NewGitOpsManifestPushServiceImpl,
Expand Down
4 changes: 2 additions & 2 deletions api/restHandler/PubSubClientRestHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ type PubSubClientRestHandler interface {
type PubSubClientRestHandlerImpl struct {
pubsubClient *pubsub.PubSubClientServiceImpl
logger *zap.SugaredLogger
cdConfig *pipeline.CdConfig
cdConfig *pipeline.CiCdConfig
}

type PublishRequest struct {
Topic string `json:"topic"`
Payload json.RawMessage `json:"payload"`
}

func NewPubSubClientRestHandlerImpl(pubsubClient *pubsub.PubSubClientServiceImpl, logger *zap.SugaredLogger, cdConfig *pipeline.CdConfig) *PubSubClientRestHandlerImpl {
func NewPubSubClientRestHandlerImpl(pubsubClient *pubsub.PubSubClientServiceImpl, logger *zap.SugaredLogger, cdConfig *pipeline.CiCdConfig) *PubSubClientRestHandlerImpl {
return &PubSubClientRestHandlerImpl{
pubsubClient: pubsubClient,
logger: logger,
Expand Down
27 changes: 14 additions & 13 deletions api/router/pubsub/CiEventHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/devtron-labs/devtron/internal/sql/repository"
"github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig"
"github.com/devtron-labs/devtron/pkg/pipeline"
bean2 "github.com/devtron-labs/devtron/pkg/pipeline/bean"
"github.com/devtron-labs/devtron/util"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -53,19 +54,19 @@ type CiEventHandlerImpl struct {
}

type CiCompleteEvent struct {
CiProjectDetails []pipeline.CiProjectDetails `json:"ciProjectDetails"`
DockerImage string `json:"dockerImage" validate:"required,image-validator"`
Digest string `json:"digest"`
PipelineId int `json:"pipelineId"`
WorkflowId *int `json:"workflowId"`
TriggeredBy int32 `json:"triggeredBy"`
PipelineName string `json:"pipelineName"`
DataSource string `json:"dataSource"`
MaterialType string `json:"materialType"`
Metrics util.CIMetrics `json:"metrics"`
AppName string `json:"appName"`
IsArtifactUploaded bool `json:"isArtifactUploaded"`
FailureReason string `json:"failureReason"`
CiProjectDetails []bean2.CiProjectDetails `json:"ciProjectDetails"`
DockerImage string `json:"dockerImage" validate:"required,image-validator"`
Digest string `json:"digest"`
PipelineId int `json:"pipelineId"`
WorkflowId *int `json:"workflowId"`
TriggeredBy int32 `json:"triggeredBy"`
PipelineName string `json:"pipelineName"`
DataSource string `json:"dataSource"`
MaterialType string `json:"materialType"`
Metrics util.CIMetrics `json:"metrics"`
AppName string `json:"appName"`
IsArtifactUploaded bool `json:"isArtifactUploaded"`
FailureReason string `json:"failureReason"`
}

func NewCiEventHandlerImpl(logger *zap.SugaredLogger, pubsubClient *pubsub.PubSubClientServiceImpl, webhookService pipeline.WebhookService, ciEventConfig *CiEventConfig) *CiEventHandlerImpl {
Expand Down
1 change: 0 additions & 1 deletion api/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ func NewMuxRouter(logger *zap.SugaredLogger, HelmRouter PipelineTriggerRouter, P
appGroupingRouter: appGroupingRouter,
rbacRoleRouter: rbacRoleRouter,
scopedVariableRouter: scopedVariableRouter,

}
return r
}
Expand Down
108 changes: 77 additions & 31 deletions pkg/pipeline/ArgoWorkflowExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,43 @@ package pipeline
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
v1alpha12 "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/workflow/util"
bean2 "github.com/devtron-labs/devtron/api/bean"
"github.com/devtron-labs/devtron/pkg/pipeline/bean"
"go.uber.org/zap"
v12 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/rest"
"net/url"
)

const (
STEP_NAME_REGEX = "create-env-%s-gb-%d"
TEMPLATE_NAME_REGEX = "%s-gb-%d"
WORKFLOW_MINIO_CRED = "workflow-minio-cred"
CRED_ACCESS_KEY = "accessKey"
CRED_SECRET_KEY = "secretKey"
STEP_NAME_REGEX = "create-env-%s-gb-%d"
TEMPLATE_NAME_REGEX = "%s-gb-%d"
WORKFLOW_MINIO_CRED = "workflow-minio-cred"
CRED_ACCESS_KEY = "accessKey"
CRED_SECRET_KEY = "secretKey"
S3_ENDPOINT_URL = "s3.amazonaws.com"
DEVTRON_WORKFLOW_LABEL_KEY = "devtron.ai/workflow-purpose"
DEVTRON_WORKFLOW_LABEL_VALUE = "cd"
WORKFLOW_GENERATE_NAME_REGEX = "%s-"
RESOURCE_CREATE_ACTION = "create"
)

var ACCESS_KEY_SELECTOR = &v12.SecretKeySelector{Key: CRED_ACCESS_KEY, LocalObjectReference: v12.LocalObjectReference{Name: WORKFLOW_MINIO_CRED}}
var SECRET_KEY_SELECTOR = &v12.SecretKeySelector{Key: CRED_SECRET_KEY, LocalObjectReference: v12.LocalObjectReference{Name: WORKFLOW_MINIO_CRED}}

type WorkflowExecutor interface {
ExecuteWorkflow(workflowTemplate bean.WorkflowTemplate) error
ExecuteWorkflow(workflowTemplate bean.WorkflowTemplate) (*unstructured.UnstructuredList, error)
TerminateWorkflow(workflowName string, namespace string, clusterConfig *rest.Config) error
}

type ArgoWorkflowExecutor interface {
Expand All @@ -41,37 +54,51 @@ func NewArgoWorkflowExecutorImpl(logger *zap.SugaredLogger) *ArgoWorkflowExecuto
return &ArgoWorkflowExecutorImpl{logger: logger}
}

func (impl *ArgoWorkflowExecutorImpl) ExecuteWorkflow(workflowTemplate bean.WorkflowTemplate) error {
func (impl *ArgoWorkflowExecutorImpl) TerminateWorkflow(workflowName string, namespace string, clusterConfig *rest.Config) error {
impl.logger.Debugw("terminating wf", "name", workflowName)
wfClient, err := impl.getClientInstance(namespace, clusterConfig)
if err != nil {
impl.logger.Errorw("cannot build wf client", "wfName", workflowName, "err", err)
return err
}
_, err = wfClient.Get(context.Background(), workflowName, v1.GetOptions{})
if err != nil {
impl.logger.Errorw("cannot find workflow", "name", workflowName, "err", err)
return errors.New("cannot find workflow " + workflowName)
}
err = util.TerminateWorkflow(context.Background(), wfClient, workflowName)
return err
}

func (impl *ArgoWorkflowExecutorImpl) ExecuteWorkflow(workflowTemplate bean.WorkflowTemplate) (*unstructured.UnstructuredList, error) {

entryPoint := CD_WORKFLOW_NAME
entryPoint := workflowTemplate.WorkflowType
// get cm and cs argo step templates
templates, err := impl.getArgoTemplates(workflowTemplate.ConfigMaps, workflowTemplate.Secrets)
templates, err := impl.getArgoTemplates(workflowTemplate.ConfigMaps, workflowTemplate.Secrets, workflowTemplate.WorkflowType == bean.CI_WORKFLOW_NAME)
if err != nil {
impl.logger.Errorw("error occurred while fetching argo templates and steps", "err", err)
return err
return nil, err
}
if len(templates) > 0 {
entryPoint = CD_WORKFLOW_WITH_STAGES
entryPoint = workflowTemplate.GetEntrypoint()
}

wfContainer := workflowTemplate.Containers[0]
cdTemplate := v1alpha1.Template{
Name: CD_WORKFLOW_NAME,
ciCdTemplate := v1alpha1.Template{
Name: workflowTemplate.WorkflowType,
Container: &wfContainer,
ActiveDeadlineSeconds: &intstr.IntOrString{
IntVal: int32(*workflowTemplate.ActiveDeadlineSeconds),
},
}
impl.updateBlobStorageConfig(workflowTemplate, &cdTemplate)
templates = append(templates, cdTemplate)
impl.updateBlobStorageConfig(workflowTemplate, &ciCdTemplate)
templates = append(templates, ciCdTemplate)

objectMeta := workflowTemplate.CreateObjectMetadata()

var (
cdWorkflow = v1alpha1.Workflow{
ObjectMeta: v1.ObjectMeta{
GenerateName: workflowTemplate.WorkflowNamePrefix + "-",
Annotations: map[string]string{"workflows.argoproj.io/controller-instanceid": workflowTemplate.WfControllerInstanceID},
Labels: map[string]string{"devtron.ai/workflow-purpose": "cd"},
},
ciCdWorkflow = v1alpha1.Workflow{
ObjectMeta: *objectMeta,
Spec: v1alpha1.WorkflowSpec{
ServiceAccountName: workflowTemplate.ServiceAccountName,
NodeSelector: workflowTemplate.NodeSelector,
Expand All @@ -86,26 +113,36 @@ func (impl *ArgoWorkflowExecutorImpl) ExecuteWorkflow(workflowTemplate bean.Work
}
)

wfTemplate, err := json.Marshal(cdWorkflow)
wfTemplate, err := json.Marshal(ciCdWorkflow)
if err != nil {
impl.logger.Errorw("error occurred while marshalling json", "err", err)
return err
return nil, err
}
impl.logger.Debugw("workflow request to submit", "wf", string(wfTemplate))

wfClient, err := impl.getClientInstance(workflowTemplate.Namespace, workflowTemplate.ClusterConfig)
if err != nil {
impl.logger.Errorw("cannot build wf client", "err", err)
return err
return nil, err
}

createdWf, err := wfClient.Create(context.Background(), &cdWorkflow, v1.CreateOptions{})
createdWf, err := wfClient.Create(context.Background(), &ciCdWorkflow, v1.CreateOptions{})
if err != nil {
impl.logger.Errorw("error in wf trigger", "err", err)
return err
return nil, err
}
impl.logger.Debugw("workflow submitted: ", "name", createdWf.Name)
return nil
return impl.convertToUnstructured(createdWf), nil
}

func (impl *ArgoWorkflowExecutorImpl) convertToUnstructured(cdWorkflow interface{}) *unstructured.UnstructuredList {
unstructedObjMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&cdWorkflow)
if err != nil {
return nil
}
unstructuredObj := unstructured.Unstructured{Object: unstructedObjMap}
unstructuredList := &unstructured.UnstructuredList{Items: []unstructured.Unstructured{unstructuredObj}}
return unstructuredList
}

func (impl *ArgoWorkflowExecutorImpl) updateBlobStorageConfig(workflowTemplate bean.WorkflowTemplate, cdTemplate *v1alpha1.Template) {
Expand Down Expand Up @@ -182,7 +219,7 @@ func (impl *ArgoWorkflowExecutorImpl) updateBlobStorageConfig(workflowTemplate b
}
}

func (impl *ArgoWorkflowExecutorImpl) getArgoTemplates(configMaps []bean2.ConfigSecretMap, secrets []bean2.ConfigSecretMap) ([]v1alpha1.Template, error) {
func (impl *ArgoWorkflowExecutorImpl) getArgoTemplates(configMaps []bean2.ConfigSecretMap, secrets []bean2.ConfigSecretMap, isCi bool) ([]v1alpha1.Template, error) {
var templates []v1alpha1.Template
var steps []v1alpha1.ParallelSteps
cmIndex := 0
Expand Down Expand Up @@ -211,18 +248,27 @@ func (impl *ArgoWorkflowExecutorImpl) getArgoTemplates(configMaps []bean2.Config
templates = append(templates, argoTemplate)
csIndex++
}
if len(templates) <= 0 {
return templates, nil
}
stepName := bean.CD_WORKFLOW_NAME
templateName := bean.CD_WORKFLOW_WITH_STAGES
if isCi {
stepName = bean.CI_WORKFLOW_NAME
templateName = bean.CI_WORKFLOW_WITH_STAGES
}

steps = append(steps, v1alpha1.ParallelSteps{
Steps: []v1alpha1.WorkflowStep{
{
Name: "run-wf",
Template: CD_WORKFLOW_NAME,
Template: stepName,
},
},
})

templates = append(templates, v1alpha1.Template{
Name: CD_WORKFLOW_WITH_STAGES,
Name: templateName,
Steps: steps,
})

Expand Down Expand Up @@ -271,7 +317,7 @@ func (impl *ArgoWorkflowExecutorImpl) createStepAndTemplate(isSecret bool, cmSec
argoTemplate := v1alpha1.Template{
Name: templateName,
Resource: &v1alpha1.ResourceTemplate{
Action: "create",
Action: RESOURCE_CREATE_ACTION,
SetOwnerReference: true,
Manifest: string(cmSecretJson),
},
Expand Down
Loading
Loading