diff --git a/components/compliance-service/ingest/ingestic/ingestic.go b/components/compliance-service/ingest/ingestic/ingestic.go index 883e041f27e..b06567566af 100644 --- a/components/compliance-service/ingest/ingestic/ingestic.go +++ b/components/compliance-service/ingest/ingestic/ingestic.go @@ -1206,19 +1206,24 @@ func getNewControlStatus(controlStatus string, nodeStatus string) string { } //GetReportsDailyLatestTrue Get the Nodes where daily latest flag is true from past 90 days from current date for upgrading -func (backend *ESClient) GetReportsDailyLatestTrue(ctx context.Context, time90daysAgo time.Time) (map[string]string, map[string]bool, error) { +func (backend *ESClient) GetReportsDailyLatestTrue(ctx context.Context, upgradeTime time.Time) (map[string]string, map[string]bool, error) { reportsMap := make(map[string]string) nodesMap := make(map[string]relaxting.ReportId) latestReportMap := make(map[string]bool) - indices, err := relaxting.IndexDates(relaxting.CompDailyRepIndexPrefix, time90daysAgo.Format(time.RFC3339), time.Now().Format(time.RFC3339)) + indices, err := relaxting.IndexDates(relaxting.CompDailyRepIndexPrefix, upgradeTime.Format(time.RFC3339), time.Now().Format(time.RFC3339)) if err != nil { return nil, nil, err } indicesSlice := strings.Split(indices, ",") + rangeQuery := elastic.NewRangeQuery("end_time").Gte(upgradeTime.Format(time.RFC3339)) + boolQuery := elastic.NewBoolQuery(). - Must(elastic.NewTermQuery("daily_latest", true)) + Must(elastic.NewTermQuery("daily_latest", true)).Must(rangeQuery) + + src, _ := boolQuery.Source() + logrus.Infof("Test Query For upgrade is noow : %v", src) fsc := elastic.NewFetchSourceContext(true).Include( "report_uuid", diff --git a/components/compliance-service/migrations/cereal_interface.go b/components/compliance-service/migrations/cereal_interface.go index 72bd7efa48d..9680e577add 100644 --- a/components/compliance-service/migrations/cereal_interface.go +++ b/components/compliance-service/migrations/cereal_interface.go @@ -3,6 +3,7 @@ package migrations import ( "context" "fmt" + "time" "github.com/chef/automate/lib/cereal" "github.com/pkg/errors" @@ -10,7 +11,7 @@ import ( ) type cerealInterface interface { - EnqueueWorkflowUpgrade() error + EnqueueWorkflowUpgrade(updateDate time.Time) error } type cerealService struct { @@ -18,14 +19,15 @@ type cerealService struct { } //EnqueueWorkflowUpgrade enqueue the work flow -func (u *cerealService) EnqueueWorkflowUpgrade() error { +func (u *cerealService) EnqueueWorkflowUpgrade(updateDate time.Time) error { err := u.cerealManger.EnqueueWorkflow(context.TODO(), MigrationWorkflowName, - fmt.Sprintf("%s-%s", MigrationWorkflowName, UpgradeTaskName), + fmt.Sprintf("%s-%s-%s", MigrationWorkflowName, UpgradeTaskName, time.Now().Format(time.RFC3339)), MigrationWorkflowParameters{ ControlIndexFlag: true, + UpgradeDate: updateDate, }) if err != nil { - logrus.Debugf("Unable to Enqueue Workflow for Daily Latest Task") + logrus.Errorf("Unable to Enqueue Workflow for Daily Latest Task %v", err) return errors.Wrapf(err, "Unable to Enqueue Workflow for Daily Latest Task") } return nil diff --git a/components/compliance-service/migrations/cererl_interface_test.go b/components/compliance-service/migrations/cererl_interface_test.go index 324ad3545b4..49d5e8173f3 100644 --- a/components/compliance-service/migrations/cererl_interface_test.go +++ b/components/compliance-service/migrations/cererl_interface_test.go @@ -1,13 +1,16 @@ package migrations -import "github.com/pkg/errors" +import ( + "github.com/pkg/errors" + "time" +) type CerealInterfaceTest struct { NeedError bool NeedErrorForControl bool } -func (c CerealInterfaceTest) EnqueueWorkflowUpgrade() error { +func (c CerealInterfaceTest) EnqueueWorkflowUpgrade(update time.Time) error { if c.NeedError { return errors.New("Unable to enqueue workflow for day latest flag") } diff --git a/components/compliance-service/migrations/migration_workflow.go b/components/compliance-service/migrations/migration_workflow.go index bb9a5bb8864..5a8399f0259 100644 --- a/components/compliance-service/migrations/migration_workflow.go +++ b/components/compliance-service/migrations/migration_workflow.go @@ -41,6 +41,7 @@ type MigrationWorkflow struct { type MigrationWorkflowParameters struct { ControlIndexFlag bool + UpgradeDate time.Time } type MigrationWorkflowPayload struct { @@ -68,6 +69,7 @@ func (s *MigrationWorkflow) OnStart(w cereal.WorkflowInstance, err = w.EnqueueTask(UpgradeTaskName, UpgradeParameters{ ControlFlag: workflowParams.ControlIndexFlag, + UpgradeDate: workflowParams.UpgradeDate, }) if err != nil { err = errors.Wrap(err, "failed to enqueue the migration-task") @@ -115,6 +117,7 @@ type UpgradeParameters struct { DayLatestFlag bool ControlFlag bool CompRunInfoFlag bool + UpgradeDate time.Time } func (t *UpgradeTask) Run(ctx context.Context, task cereal.Task) (interface{}, error) { @@ -129,7 +132,7 @@ func (t *UpgradeTask) Run(ctx context.Context, task cereal.Task) (interface{}, e logrus.Infof("Upgrade started at time %v", time.Now()) if job.ControlFlag { logrus.Info("Inside the control flag") - if err := performActionForUpgrade(ctx, t.ESClient); err != nil { + if err := performActionForUpgrade(ctx, t.ESClient, job.UpgradeDate); err != nil { logrus.WithError(err).Error("Unable to upgrade control index flag for latest record ") } @@ -154,10 +157,12 @@ type ControlIndexUpgradeTask struct { UpgradesDB *pgdb.UpgradesDB } -func performActionForUpgrade(ctx context.Context, esClient *ingestic.ESClient) error { +func performActionForUpgrade(ctx context.Context, esClient *ingestic.ESClient, upgradeTime time.Time) error { mapping := mappings.ComplianceRepDate - time90DaysAgo := time.Now().Add(-24 * time.Hour * 90) - reportsMap, latestReportsMap, err := esClient.GetReportsDailyLatestTrue(ctx, time90DaysAgo) + if time.Now().Sub(upgradeTime)/24 > 90 { + upgradeTime = time.Now().Add(-24 * time.Hour * 90) + } + reportsMap, latestReportsMap, err := esClient.GetReportsDailyLatestTrue(ctx, upgradeTime) if err != nil { logrus.Errorf("Unable to Get Report IDs where daily latest true with err %v", err) return err diff --git a/components/compliance-service/migrations/upgrades.go b/components/compliance-service/migrations/upgrades.go index 8bd0e5f8949..869d3e3d5a7 100644 --- a/components/compliance-service/migrations/upgrades.go +++ b/components/compliance-service/migrations/upgrades.go @@ -29,12 +29,13 @@ func (u *Upgrade) PollForUpgradeFlagDayLatest(upgradeDate time.Time) error { return err } if controlFlag.Status { - err = u.cerealInterface.EnqueueWorkflowUpgrade() + err = u.cerealInterface.EnqueueWorkflowUpgrade(upgradeDate) if err != nil { return errors.Wrapf(err, "Unable to enqueue the message in the flow for daily latest flag") } u.storage.UpdateControlFlagTimeStamp() } + return nil }