Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding upgrade time for upgrade process #7490

Merged
merged 2 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions components/compliance-service/ingest/ingestic/ingestic.go
Original file line number Diff line number Diff line change
Expand Up @@ -1206,19 +1206,21 @@ func getNewControlStatus(controlStatus string, nodeStatus string) string {
}

//GetReportsDailyLatestTrue Get the Nodes where daily latest flag is true from past 90 days from current date for upgrading
func (backend *ESClient) GetReportsDailyLatestTrue(ctx context.Context, time90daysAgo time.Time) (map[string]string, map[string]bool, error) {
func (backend *ESClient) GetReportsDailyLatestTrue(ctx context.Context, upgradeTime time.Time) (map[string]string, map[string]bool, error) {
reportsMap := make(map[string]string)
nodesMap := make(map[string]relaxting.ReportId)
latestReportMap := make(map[string]bool)
indices, err := relaxting.IndexDates(relaxting.CompDailyRepIndexPrefix, time90daysAgo.Format(time.RFC3339), time.Now().Format(time.RFC3339))
indices, err := relaxting.IndexDates(relaxting.CompDailyRepIndexPrefix, upgradeTime.Format(time.RFC3339), time.Now().Format(time.RFC3339))
if err != nil {
return nil, nil, err
}

indicesSlice := strings.Split(indices, ",")

rangeQuery := elastic.NewRangeQuery("end_time").Gte(upgradeTime.Format(time.RFC3339))

boolQuery := elastic.NewBoolQuery().
Must(elastic.NewTermQuery("daily_latest", true))
Must(elastic.NewTermQuery("daily_latest", true)).Must(rangeQuery)

fsc := elastic.NewFetchSourceContext(true).Include(
"report_uuid",
Expand Down
10 changes: 6 additions & 4 deletions components/compliance-service/migrations/cereal_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,31 @@ package migrations
import (
"context"
"fmt"
"time"

"github.com/chef/automate/lib/cereal"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

type cerealInterface interface {
EnqueueWorkflowUpgrade() error
EnqueueWorkflowUpgrade(updateDate time.Time) error
}

type cerealService struct {
cerealManger *cereal.Manager
}

//EnqueueWorkflowUpgrade enqueue the work flow
func (u *cerealService) EnqueueWorkflowUpgrade() error {
func (u *cerealService) EnqueueWorkflowUpgrade(updateDate time.Time) error {
err := u.cerealManger.EnqueueWorkflow(context.TODO(), MigrationWorkflowName,
fmt.Sprintf("%s-%s", MigrationWorkflowName, UpgradeTaskName),
fmt.Sprintf("%s-%s-%s", MigrationWorkflowName, UpgradeTaskName, time.Now().Format(time.RFC3339)),
MigrationWorkflowParameters{
ControlIndexFlag: true,
UpgradeDate: updateDate,
})
if err != nil {
logrus.Debugf("Unable to Enqueue Workflow for Daily Latest Task")
logrus.Errorf("Unable to Enqueue Workflow for Daily Latest Task %v", err)
return errors.Wrapf(err, "Unable to Enqueue Workflow for Daily Latest Task")
}
return nil
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package migrations

import "github.com/pkg/errors"
import (
"github.com/pkg/errors"
"time"
)

type CerealInterfaceTest struct {
NeedError bool
NeedErrorForControl bool
}

func (c CerealInterfaceTest) EnqueueWorkflowUpgrade() error {
func (c CerealInterfaceTest) EnqueueWorkflowUpgrade(update time.Time) error {
if c.NeedError {
return errors.New("Unable to enqueue workflow for day latest flag")
}
Expand Down
15 changes: 10 additions & 5 deletions components/compliance-service/migrations/migration_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type MigrationWorkflow struct {

type MigrationWorkflowParameters struct {
ControlIndexFlag bool
UpgradeDate time.Time
}

type MigrationWorkflowPayload struct {
Expand Down Expand Up @@ -68,6 +69,7 @@ func (s *MigrationWorkflow) OnStart(w cereal.WorkflowInstance,

err = w.EnqueueTask(UpgradeTaskName, UpgradeParameters{
ControlFlag: workflowParams.ControlIndexFlag,
UpgradeDate: workflowParams.UpgradeDate,
})
if err != nil {
err = errors.Wrap(err, "failed to enqueue the migration-task")
Expand Down Expand Up @@ -115,6 +117,7 @@ type UpgradeParameters struct {
DayLatestFlag bool
ControlFlag bool
CompRunInfoFlag bool
UpgradeDate time.Time
}

func (t *UpgradeTask) Run(ctx context.Context, task cereal.Task) (interface{}, error) {
Expand All @@ -125,11 +128,11 @@ func (t *UpgradeTask) Run(ctx context.Context, task cereal.Task) (interface{}, e
return nil, err
}

logrus.Info("Inside the upgrades flag flow")
logrus.Infof("Upgrade started at time %v", time.Now())
if job.ControlFlag {

logrus.Info("Inside the control flag")
if err := performActionForUpgrade(ctx, t.ESClient); err != nil {
if err := performActionForUpgrade(ctx, t.ESClient, job.UpgradeDate); err != nil {
logrus.WithError(err).Error("Unable to upgrade control index flag for latest record ")
}

Expand All @@ -154,10 +157,12 @@ type ControlIndexUpgradeTask struct {
UpgradesDB *pgdb.UpgradesDB
}

func performActionForUpgrade(ctx context.Context, esClient *ingestic.ESClient) error {
func performActionForUpgrade(ctx context.Context, esClient *ingestic.ESClient, upgradeTime time.Time) error {
mapping := mappings.ComplianceRepDate
time90DaysAgo := time.Now().Add(-24 * time.Hour * 90)
reportsMap, latestReportsMap, err := esClient.GetReportsDailyLatestTrue(ctx, time90DaysAgo)
if time.Now().Sub(upgradeTime).Hours()/24 > 90 {
upgradeTime = time.Now().Add(-24 * time.Hour * 90)
}
reportsMap, latestReportsMap, err := esClient.GetReportsDailyLatestTrue(ctx, upgradeTime)
if err != nil {
logrus.Errorf("Unable to Get Report IDs where daily latest true with err %v", err)
return err
Expand Down
4 changes: 2 additions & 2 deletions components/compliance-service/migrations/upgrades.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@ func NewService(pg *pgdb.UpgradesDB, cerealManger *cereal.Manager) *Upgrade {
}

//PollForUpgradeFlagDayLatest checks for the day latest flag value in upgrade flags
//TODO:: run the upgrade from current date to upgradeDate
func (u *Upgrade) PollForUpgradeFlagDayLatest(upgradeDate time.Time) error {
logrus.Infof("upgrade will run from %s to till now", upgradeDate.String())
controlFlag, err := u.getUpgradeFlag(pgdb.ControlIndexFlag)
if err != nil {
return err
}
if controlFlag.Status {
err = u.cerealInterface.EnqueueWorkflowUpgrade()
err = u.cerealInterface.EnqueueWorkflowUpgrade(upgradeDate)
if err != nil {
return errors.Wrapf(err, "Unable to enqueue the message in the flow for daily latest flag")
}
u.storage.UpdateControlFlagTimeStamp()
}

return nil
}

Expand Down