Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: CD stage trigger is not working for external CI #4440

Merged
merged 9 commits into from
Jan 18, 2024
7 changes: 7 additions & 0 deletions api/restHandler/ExternalCiRestHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ func (impl ExternalCiRestHandlerImpl) HandleExternalCiWebhook(w http.ResponseWri
common.WriteJsonResp(w, err, nil, http.StatusInternalServerError)
return
}

err = impl.validator.Struct(ciArtifactReq)
if err != nil {
impl.logger.Errorw("validation err, HandleExternalCiWebhook", "err", err, "payload", ciArtifactReq)
common.WriteJsonResp(w, err, nil, http.StatusBadRequest)
return
}
_, err = impl.webhookService.HandleExternalCiWebhook(externalCiId, ciArtifactReq, impl.checkExternalCiDeploymentAuth)
if err != nil {
impl.logger.Errorw("service err, HandleExternalCiWebhook", "err", err, "payload", req)
Expand Down
37 changes: 30 additions & 7 deletions api/router/pubsub/CiEventHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
bean2 "github.com/devtron-labs/devtron/pkg/pipeline/bean"
"github.com/devtron-labs/devtron/util"
"go.uber.org/zap"
"gopkg.in/go-playground/validator.v9"
"time"
)

Expand All @@ -53,6 +54,7 @@ type CiEventHandlerImpl struct {
logger *zap.SugaredLogger
pubsubClient *pubsub.PubSubClientServiceImpl
webhookService pipeline.WebhookService
validator *validator.Validate
ciEventConfig *CiEventConfig
}

Expand Down Expand Up @@ -80,11 +82,12 @@ type CiCompleteEvent struct {
PluginArtifactStage string `json:"pluginArtifactStage"`
}

func NewCiEventHandlerImpl(logger *zap.SugaredLogger, pubsubClient *pubsub.PubSubClientServiceImpl, webhookService pipeline.WebhookService, ciEventConfig *CiEventConfig) *CiEventHandlerImpl {
func NewCiEventHandlerImpl(logger *zap.SugaredLogger, pubsubClient *pubsub.PubSubClientServiceImpl, webhookService pipeline.WebhookService, validator *validator.Validate, ciEventConfig *CiEventConfig) *CiEventHandlerImpl {
ciEventHandlerImpl := &CiEventHandlerImpl{
logger: logger,
pubsubClient: pubsubClient,
webhookService: webhookService,
validator: validator,
ciEventConfig: ciEventConfig,
}
err := ciEventHandlerImpl.Subscribe()
Expand Down Expand Up @@ -133,10 +136,8 @@ func (impl *CiEventHandlerImpl) Subscribe() error {
impl.logger.Error("Error while creating request for pipelineID", "pipelineId", ciCompleteEvent.PipelineId, "err", err)
return
}
resp, err := impl.webhookService.HandleCiSuccessEvent(ciCompleteEvent.PipelineId, request, detail.ImagePushedAt)
resp, err := impl.ValidateAndHandleCiSuccessEvent(ciCompleteEvent.PipelineId, request, detail.ImagePushedAt)
if err != nil {
impl.logger.Error("Error while sending event for CI success for pipelineID", "pipelineId",
ciCompleteEvent.PipelineId, "request", request, "err", err)
return
}
impl.logger.Debug("response of handle ci success event for multiple images from plugin", "resp", resp)
Expand All @@ -145,10 +146,8 @@ func (impl *CiEventHandlerImpl) Subscribe() error {

} else {
util.TriggerCIMetrics(ciCompleteEvent.Metrics, impl.ciEventConfig.ExposeCiMetrics, ciCompleteEvent.PipelineName, ciCompleteEvent.AppName)
resp, err := impl.webhookService.HandleCiSuccessEvent(ciCompleteEvent.PipelineId, req, &time.Time{})
resp, err := impl.ValidateAndHandleCiSuccessEvent(ciCompleteEvent.PipelineId, req, &time.Time{})
if err != nil {
impl.logger.Error("Error while sending event for CI success for pipelineID: ",
ciCompleteEvent.PipelineId, "request: ", req, "error: ", err)
return
}
impl.logger.Debug(resp)
Expand All @@ -162,6 +161,21 @@ func (impl *CiEventHandlerImpl) Subscribe() error {
return nil
}

func (impl *CiEventHandlerImpl) ValidateAndHandleCiSuccessEvent(ciPipelineId int, request *pipeline.CiArtifactWebhookRequest, imagePushedAt *time.Time) (int, error) {
validationErr := impl.validator.Struct(request)
if validationErr != nil {
impl.logger.Errorw("validation err, HandleCiSuccessEvent", "err", validationErr, "payload", request)
return 0, validationErr
}
buildArtifactId, err := impl.webhookService.HandleCiSuccessEvent(ciPipelineId, request, imagePushedAt)
if err != nil {
impl.logger.Error("Error while sending event for CI success for pipelineID",
ciPipelineId, "request", request, "error", err)
return 0, err
}
return buildArtifactId, nil
}

func (impl *CiEventHandlerImpl) BuildCiArtifactRequest(event CiCompleteEvent) (*pipeline.CiArtifactWebhookRequest, error) {
var ciMaterialInfos []repository.CiMaterialInfo
for _, p := range event.CiProjectDetails {
Expand Down Expand Up @@ -228,6 +242,9 @@ func (impl *CiEventHandlerImpl) BuildCiArtifactRequest(event CiCompleteEvent) (*
PluginRegistryArtifactDetails: event.PluginRegistryArtifactDetails,
PluginArtifactStage: event.PluginArtifactStage,
}
if request.DataSource == "" {
request.DataSource = repository.WEBHOOK
}
return request, nil
}

Expand All @@ -244,6 +261,9 @@ func (impl *CiEventHandlerImpl) BuildCIArtifactRequestForImageFromCR(imageDetail
WorkflowId: &workflowId,
IsArtifactUploaded: event.IsArtifactUploaded,
}
if request.DataSource == "" {
request.DataSource = repository.WEBHOOK
}
return request, nil
}

Expand Down Expand Up @@ -314,5 +334,8 @@ func (impl *CiEventHandlerImpl) BuildCiArtifactRequestForWebhook(event CiComplet
WorkflowId: event.WorkflowId,
IsArtifactUploaded: event.IsArtifactUploaded,
}
if request.DataSource == "" {
request.DataSource = repository.WEBHOOK
}
return request, nil
}
2 changes: 1 addition & 1 deletion internal/sql/repository/AppListingRepository.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func parseMaterialInfo(materialInfo string, source string) (json.RawMessage, err
fmt.Printf("PARSEMATERIALINFO_MATERIAL_RECOVER, materialInfo: %s, source: %s, err: %s \n", materialInfo, source, r)
}
}()
if source != "GOCD" && source != "CI-RUNNER" && source != "EXTERNAL" {
if source != GOCD && source != CI_RUNNER && source != WEBHOOK && source != DEPRICATED_EXT {
return nil, fmt.Errorf("datasource: %s not supported", source)
}
if materialInfo == "" {
Expand Down
38 changes: 27 additions & 11 deletions internal/sql/repository/CiArtifactRepository.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,22 @@ import (
)

type credentialsSource = string
type artifactsSourceType = string
type ArtifactsSourceType = string

const (
GLOBAL_CONTAINER_REGISTRY credentialsSource = "global_container_registry"
)

// List of possible DataSource Type for an artifact
const (
CI_RUNNER artifactsSourceType = "CI-RUNNER"
WEBHOOK artifactsSourceType = "EXTERNAL"
PRE_CD artifactsSourceType = "pre_cd"
POST_CD artifactsSourceType = "post_cd"
PRE_CI artifactsSourceType = "pre_ci"
POST_CI artifactsSourceType = "post_ci"
GOCD artifactsSourceType = "GOCD"
CI_RUNNER ArtifactsSourceType = "CI-RUNNER"
WEBHOOK ArtifactsSourceType = "EXTERNAL" // Currently in use instead of DEPRICATED_EXT
PRE_CD ArtifactsSourceType = "pre_cd"
POST_CD ArtifactsSourceType = "post_cd"
POST_CI ArtifactsSourceType = "post_ci"
GOCD ArtifactsSourceType = "GOCD"
DEPRICATED_EXT ArtifactsSourceType = "ext" // For backward compatibility
// PRE_CI is not a valid DataSource for an artifact
)

type CiArtifactWithExtraData struct {
Expand All @@ -63,7 +66,7 @@ type CiArtifact struct {
Image string `sql:"image,notnull"`
ImageDigest string `sql:"image_digest,notnull"`
MaterialInfo string `sql:"material_info"` //git material metadata json array string
DataSource string `sql:"data_source,notnull"` // possible values -> (CI_RUNNER,ext,post_ci,pre_cd,post_cd) CI_runner is for normal build ci
DataSource string `sql:"data_source,notnull"` // possible values -> (CI_RUNNER,EXTERNAL,post_ci,pre_cd,post_cd) CI_runner is for normal build ci
WorkflowId *int `sql:"ci_workflow_id"`
ParentCiArtifact int `sql:"parent_ci_artifact"`
ScanEnabled bool `sql:"scan_enabled,notnull"`
Expand All @@ -83,6 +86,9 @@ type CiArtifact struct {
type CiArtifactRepository interface {
Save(artifact *CiArtifact) error
Delete(artifact *CiArtifact) error

// Get returns the CiArtifact of the given id.
// Note: Use Get along with MigrateToWebHookDataSourceType. For webhook artifacts, migration is required for column DataSource from 'ext' to 'EXTERNAL'
Get(id int) (artifact *CiArtifact, err error)
GetArtifactParentCiAndWorkflowDetailsByIds(ids []int) ([]*CiArtifact, error)
GetByWfId(wfId int) (artifact *CiArtifact, err error)
Expand All @@ -106,6 +112,8 @@ type CiArtifactRepository interface {
GetArtifactsByDataSourceAndComponentId(dataSource string, componentId int) ([]CiArtifact, error)
FindCiArtifactByImagePaths(images []string) ([]CiArtifact, error)

// MigrateToWebHookDataSourceType is used for backward compatibility. It'll migrate the deprecated DataSource type
MigrateToWebHookDataSourceType(id int) error
UpdateLatestTimestamp(artifactIds []int) error
}

Expand All @@ -132,6 +140,14 @@ func (impl CiArtifactRepositoryImpl) SaveAll(artifacts []*CiArtifact) ([]*CiArti
return artifacts, err
}

func (impl CiArtifactRepositoryImpl) MigrateToWebHookDataSourceType(id int) error {
_, err := impl.dbConnection.Model(&CiArtifact{}).
Set("data_source = ?", WEBHOOK).
Where("id = ?", id).
Update()
return err
}

func (impl CiArtifactRepositoryImpl) UpdateLatestTimestamp(artifactIds []int) error {
if len(artifactIds) == 0 {
impl.logger.Debug("UpdateLatestTimestamp empty list of artifacts, not updating")
Expand Down Expand Up @@ -479,7 +495,7 @@ func (impl CiArtifactRepositoryImpl) GetArtifactsByCDPipelineAndRunnerType(cdPip

// return map of gitUrl:hash
func (info *CiArtifact) ParseMaterialInfo() (map[string]string, error) {
if info.DataSource != "GOCD" && info.DataSource != "CI-RUNNER" && info.DataSource != "EXTERNAL" {
if info.DataSource != GOCD && info.DataSource != CI_RUNNER && info.DataSource != WEBHOOK && info.DataSource != DEPRICATED_EXT {
return nil, fmt.Errorf("datasource: %s not supported", info.DataSource)
}
var ciMaterials []*CiMaterialInfo
Expand Down Expand Up @@ -590,7 +606,7 @@ func (impl CiArtifactRepositoryImpl) GetArtifactsByCDPipelineV2(cdPipelineId int
}

func GetCiMaterialInfo(materialInfo string, source string) ([]CiMaterialInfo, error) {
if source != "GOCD" && source != "CI-RUNNER" && source != "EXTERNAL" && source != "post_ci" && source != "pre_cd" && source != "post_cd" {
if source != GOCD && source != CI_RUNNER && source != WEBHOOK && source != POST_CI && source != PRE_CD && source != POST_CD && source != DEPRICATED_EXT {
return nil, fmt.Errorf("datasource: %s not supported", source)
}
var ciMaterials []CiMaterialInfo
Expand Down
2 changes: 1 addition & 1 deletion pkg/deploymentGroup/DeploymentGroupService.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ func (impl *DeploymentGroupServiceImpl) GetArtifactsByCiPipeline(ciPipelineId in
}

func (impl *DeploymentGroupServiceImpl) parseMaterialInfo(materialInfo json.RawMessage, source string) (json.RawMessage, error) {
if source != "GOCD" && source != "CI-RUNNER" && source != "EXTERNAL" {
if source != repository.GOCD && source != repository.CI_RUNNER && source != repository.WEBHOOK && source != repository.DEPRICATED_EXT {
return nil, fmt.Errorf("datasource: %s not supported", source)
}
var ciMaterials []repository.CiMaterialInfo
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/PipelineBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ type ConfigMapSecretsResponse struct {
}

func parseMaterialInfo(materialInfo json.RawMessage, source string) (json.RawMessage, error) {
if source != repository.GOCD && source != repository.CI_RUNNER && source != repository.WEBHOOK && source != repository.PRE_CD && source != repository.POST_CD && source != repository.POST_CI {
if source != repository.GOCD && source != repository.CI_RUNNER && source != repository.WEBHOOK && source != repository.DEPRICATED_EXT && source != repository.PRE_CD && source != repository.POST_CD && source != repository.POST_CI {
return nil, fmt.Errorf("datasource: %s not supported", source)
}
var ciMaterials []repository.CiMaterialInfo
Expand Down
32 changes: 11 additions & 21 deletions pkg/pipeline/WebhookService.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,17 @@ import (
)

type CiArtifactWebhookRequest struct {
Image string `json:"image"`
ImageDigest string `json:"imageDigest"`
MaterialInfo json.RawMessage `json:"materialInfo"`
DataSource string `json:"dataSource"`
PipelineName string `json:"pipelineName"`
WorkflowId *int `json:"workflowId"`
UserId int32 `json:"userId"`
IsArtifactUploaded bool `json:"isArtifactUploaded"`
FailureReason string `json:"failureReason"`
PluginRegistryArtifactDetails map[string][]string `json:"PluginRegistryArtifactDetails"` //map of registry and array of images generated by Copy container image plugin
PluginArtifactStage string `json:"pluginArtifactStage"` // at which stage of CI artifact was generated by plugin ("pre_ci/post_ci")
Image string `json:"image" validate:"required"`
ImageDigest string `json:"imageDigest"`
MaterialInfo json.RawMessage `json:"materialInfo"`
DataSource repository.ArtifactsSourceType `json:"dataSource" validate:"oneof=CI-RUNNER EXTERNAL pre_cd post_cd post_ci GOCD"`
PipelineName string `json:"pipelineName"`
WorkflowId *int `json:"workflowId"`
UserId int32 `json:"userId"`
IsArtifactUploaded bool `json:"isArtifactUploaded"`
FailureReason string `json:"failureReason"`
PluginRegistryArtifactDetails map[string][]string `json:"PluginRegistryArtifactDetails"` //map of registry and array of images generated by Copy container image plugin
PluginArtifactStage string `json:"pluginArtifactStage"` // at which stage of CI artifact was generated by plugin ("pre_ci/post_ci")
}

type WebhookService interface {
Expand Down Expand Up @@ -190,9 +190,6 @@ func (impl WebhookServiceImpl) HandleCiSuccessEvent(ciPipelineId int, request *C
if request.PipelineName == "" {
request.PipelineName = pipeline.Name
}
if request.DataSource == "" {
request.DataSource = "EXTERNAL"
}
if err != nil {
impl.logger.Errorw("unable to find pipeline", "name", request.PipelineName, "err", err)
return 0, err
Expand All @@ -213,10 +210,6 @@ func (impl WebhookServiceImpl) HandleCiSuccessEvent(ciPipelineId int, request *C
if !imagePushedAt.IsZero() {
createdOn = *imagePushedAt
}
if pipeline.PipelineType == bean.CI_JOB && request.Image == "" {
impl.logger.Errorw("empty image artifact found!", "request", request)
return 0, fmt.Errorf("empty image artifact found")
}
buildArtifact := &repository.CiArtifact{
Image: request.Image,
ImageDigest: request.ImageDigest,
Expand Down Expand Up @@ -376,9 +369,6 @@ func (impl WebhookServiceImpl) HandleExternalCiWebhook(externalCiId int, request
}

impl.logger.Infow("request of webhook external ci", "req", request)
if request.DataSource == "" {
request.DataSource = "EXTERNAL"
}
materialJson, err := request.MaterialInfo.MarshalJSON()
if err != nil {
impl.logger.Errorw("unable to marshal material metadata", "err", err)
Expand Down
Loading
Loading