Skip to content

Commit

Permalink
adding upgrade time for upgrade process
Browse files Browse the repository at this point in the history
Signed-off-by: Yashvi Jain <yashvi.jain@progress.com>
  • Loading branch information
Yashvi Jain committed Oct 18, 2022
1 parent 795f8b4 commit 2f82418
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 14 deletions.
11 changes: 8 additions & 3 deletions components/compliance-service/ingest/ingestic/ingestic.go
Original file line number Diff line number Diff line change
Expand Up @@ -1206,19 +1206,24 @@ func getNewControlStatus(controlStatus string, nodeStatus string) string {
}

//GetReportsDailyLatestTrue Get the Nodes where daily latest flag is true from past 90 days from current date for upgrading
func (backend *ESClient) GetReportsDailyLatestTrue(ctx context.Context, time90daysAgo time.Time) (map[string]string, map[string]bool, error) {
func (backend *ESClient) GetReportsDailyLatestTrue(ctx context.Context, upgradeTime time.Time) (map[string]string, map[string]bool, error) {
reportsMap := make(map[string]string)
nodesMap := make(map[string]relaxting.ReportId)
latestReportMap := make(map[string]bool)
indices, err := relaxting.IndexDates(relaxting.CompDailyRepIndexPrefix, time90daysAgo.Format(time.RFC3339), time.Now().Format(time.RFC3339))
indices, err := relaxting.IndexDates(relaxting.CompDailyRepIndexPrefix, upgradeTime.Format(time.RFC3339), time.Now().Format(time.RFC3339))
if err != nil {
return nil, nil, err
}

indicesSlice := strings.Split(indices, ",")

rangeQuery := elastic.NewRangeQuery("end_time").Gte(upgradeTime.Format(time.RFC3339))

boolQuery := elastic.NewBoolQuery().
Must(elastic.NewTermQuery("daily_latest", true))
Must(elastic.NewTermQuery("daily_latest", true)).Must(rangeQuery)

src, _ := boolQuery.Source()
logrus.Infof("Test Query For upgrade is noow : %v", src)

fsc := elastic.NewFetchSourceContext(true).Include(
"report_uuid",
Expand Down
10 changes: 6 additions & 4 deletions components/compliance-service/migrations/cereal_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,31 @@ package migrations
import (
"context"
"fmt"
"time"

"github.com/chef/automate/lib/cereal"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

type cerealInterface interface {
EnqueueWorkflowUpgrade() error
EnqueueWorkflowUpgrade(updateDate time.Time) error
}

type cerealService struct {
cerealManger *cereal.Manager
}

//EnqueueWorkflowUpgrade enqueue the work flow
func (u *cerealService) EnqueueWorkflowUpgrade() error {
func (u *cerealService) EnqueueWorkflowUpgrade(updateDate time.Time) error {
err := u.cerealManger.EnqueueWorkflow(context.TODO(), MigrationWorkflowName,
fmt.Sprintf("%s-%s", MigrationWorkflowName, UpgradeTaskName),
fmt.Sprintf("%s-%s-%s", MigrationWorkflowName, UpgradeTaskName, time.Now().Format(time.RFC3339)),
MigrationWorkflowParameters{
ControlIndexFlag: true,
UpgradeDate: updateDate,
})
if err != nil {
logrus.Debugf("Unable to Enqueue Workflow for Daily Latest Task")
logrus.Errorf("Unable to Enqueue Workflow for Daily Latest Task %v", err)
return errors.Wrapf(err, "Unable to Enqueue Workflow for Daily Latest Task")
}
return nil
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package migrations

import "github.com/pkg/errors"
import (
"github.com/pkg/errors"
"time"
)

type CerealInterfaceTest struct {
NeedError bool
NeedErrorForControl bool
}

func (c CerealInterfaceTest) EnqueueWorkflowUpgrade() error {
func (c CerealInterfaceTest) EnqueueWorkflowUpgrade(update time.Time) error {
if c.NeedError {
return errors.New("Unable to enqueue workflow for day latest flag")
}
Expand Down
13 changes: 9 additions & 4 deletions components/compliance-service/migrations/migration_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type MigrationWorkflow struct {

type MigrationWorkflowParameters struct {
ControlIndexFlag bool
UpgradeDate time.Time
}

type MigrationWorkflowPayload struct {
Expand Down Expand Up @@ -68,6 +69,7 @@ func (s *MigrationWorkflow) OnStart(w cereal.WorkflowInstance,

err = w.EnqueueTask(UpgradeTaskName, UpgradeParameters{
ControlFlag: workflowParams.ControlIndexFlag,
UpgradeDate: workflowParams.UpgradeDate,
})
if err != nil {
err = errors.Wrap(err, "failed to enqueue the migration-task")
Expand Down Expand Up @@ -115,6 +117,7 @@ type UpgradeParameters struct {
DayLatestFlag bool
ControlFlag bool
CompRunInfoFlag bool
UpgradeDate time.Time
}

func (t *UpgradeTask) Run(ctx context.Context, task cereal.Task) (interface{}, error) {
Expand All @@ -129,7 +132,7 @@ func (t *UpgradeTask) Run(ctx context.Context, task cereal.Task) (interface{}, e
logrus.Infof("Upgrade started at time %v", time.Now())
if job.ControlFlag {
logrus.Info("Inside the control flag")
if err := performActionForUpgrade(ctx, t.ESClient); err != nil {
if err := performActionForUpgrade(ctx, t.ESClient, job.UpgradeDate); err != nil {
logrus.WithError(err).Error("Unable to upgrade control index flag for latest record ")
}

Expand All @@ -154,10 +157,12 @@ type ControlIndexUpgradeTask struct {
UpgradesDB *pgdb.UpgradesDB
}

func performActionForUpgrade(ctx context.Context, esClient *ingestic.ESClient) error {
func performActionForUpgrade(ctx context.Context, esClient *ingestic.ESClient, upgradeTime time.Time) error {
mapping := mappings.ComplianceRepDate
time90DaysAgo := time.Now().Add(-24 * time.Hour * 90)
reportsMap, latestReportsMap, err := esClient.GetReportsDailyLatestTrue(ctx, time90DaysAgo)
if time.Now().Sub(upgradeTime)/24 > 90 {
upgradeTime = time.Now().Add(-24 * time.Hour * 90)
}
reportsMap, latestReportsMap, err := esClient.GetReportsDailyLatestTrue(ctx, upgradeTime)
if err != nil {
logrus.Errorf("Unable to Get Report IDs where daily latest true with err %v", err)
return err
Expand Down
3 changes: 2 additions & 1 deletion components/compliance-service/migrations/upgrades.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ func (u *Upgrade) PollForUpgradeFlagDayLatest(upgradeDate time.Time) error {
return err
}
if controlFlag.Status {
err = u.cerealInterface.EnqueueWorkflowUpgrade()
err = u.cerealInterface.EnqueueWorkflowUpgrade(upgradeDate)
if err != nil {
return errors.Wrapf(err, "Unable to enqueue the message in the flow for daily latest flag")
}
u.storage.UpdateControlFlagTimeStamp()
}

return nil
}

Expand Down

0 comments on commit 2f82418

Please sign in to comment.