Skip to content

Commit

Permalink
Implement DataGather status conditions and status propagation (opensh…
Browse files Browse the repository at this point in the history
…ift#805)

* Implement DataGather status conditions

* propagate status to clusteroperator status

* propagate the failed job to the original status controller

* Improve as suggested in review
  • Loading branch information
tremes authored and JoaoFula committed Jan 23, 2024
1 parent 6210ad9 commit 11c54de
Show file tree
Hide file tree
Showing 7 changed files with 283 additions and 74 deletions.
131 changes: 84 additions & 47 deletions pkg/controller/gather_commands.go
Expand Up @@ -107,19 +107,16 @@ func (d *GatherJob) Gather(ctx context.Context, kubeConfig, protoKubeConfig *res
return gather.RecordArchiveMetadata(mapToArray(allFunctionReports), rec, anonymizer)
}

// GatherAndUpload runs a single gather and stores the generated archive, uploads it
// and waits for the corresponding Insights analysis report.
// 1. Creates the necessary configs/clients
// 2. Creates the configobserver
// 3. Initiates the recorder
// 4. Executes a Gather
// 5. Flushes the results
// 6. Get the latest archive
// 7. Uploads the archive
// 8. Waits for the corresponding Insights analysis download
// GatherAndUpload runs a single gather and stores the generated archive, uploads it.
// 1. Prepare the necessary kube configs
// 2. Get the corresponding "datagathers.insights.openshift.io" resource
// 3. Create all the gatherers
// 4. Run data gathering
// 5. Record the data into the Insights archive
// 6. Get the latest archive and upload it
// 7. Update the status of the corresponding "datagathers.insights.openshift.io" resource continuously
func (d *GatherJob) GatherAndUpload(kubeConfig, protoKubeConfig *rest.Config) error { // nolint: funlen, gocyclo
klog.Infof("Starting insights-operator %s", version.Get().String())
// these are operator clients
klog.Info("Starting data gathering")
kubeClient, err := kubernetes.NewForConfig(protoKubeConfig)
if err != nil {
return err
Expand All @@ -143,11 +140,8 @@ func (d *GatherJob) GatherAndUpload(kubeConfig, protoKubeConfig *rest.Config) er
klog.Error("failed to get coresponding DataGather custom resource: %v", err)
return err
}
updatedCR := dataGatherCR.DeepCopy()
updatedCR.Status.State = insightsv1alpha1.Running
updatedCR.Status.StartTime = metav1.Now()

dataGatherCR, err = insightClient.DataGathers().UpdateStatus(ctx, updatedCR, metav1.UpdateOptions{})
dataGatherCR, err = updateDataGatherStatus(ctx, *insightClient, dataGatherCR.DeepCopy(), insightsv1alpha1.Pending, nil)
if err != nil {
klog.Error("failed to update coresponding DataGather custom resource: %v", err)
return err
Expand Down Expand Up @@ -186,6 +180,11 @@ func (d *GatherJob) GatherAndUpload(kubeConfig, protoKubeConfig *rest.Config) er
)
uploader := insightsuploader.New(nil, insightsClient, configObserver, nil, nil, 0)

dataGatherCR, err = updateDataGatherStatus(ctx, *insightClient, dataGatherCR, insightsv1alpha1.Running, nil)
if err != nil {
klog.Error("failed to update coresponding DataGather custom resource: %v", err)
return err
}
allFunctionReports := make(map[string]gather.GathererFunctionReport)
for _, gatherer := range gatherers {
functionReports, err := gather.CollectAndRecordGatherer(ctx, gatherer, rec, dataGatherCR.Spec.Gatherers) // nolint: govet
Expand All @@ -197,50 +196,50 @@ func (d *GatherJob) GatherAndUpload(kubeConfig, protoKubeConfig *rest.Config) er
allFunctionReports[functionReports[i].FuncName] = functionReports[i]
}
}
err = gather.RecordArchiveMetadata(mapToArray(allFunctionReports), rec, anonymizer)
if err != nil {
klog.Error(err)
return err
}
err = rec.Flush()
if err != nil {
klog.Error(err)
return err

for k := range allFunctionReports {
fr := allFunctionReports[k]
// duration = 0 means the gatherer didn't run
if fr.Duration == 0 {
continue
}

gs := status.CreateDataGatherGathererStatus(&fr)
dataGatherCR.Status.Gatherers = append(dataGatherCR.Status.Gatherers, gs)
}
lastArchive, err := recdriver.LastArchive()

// record data
conditions := []metav1.Condition{}
lastArchive, err := record(mapToArray(allFunctionReports), rec, recdriver, anonymizer)
if err != nil {
klog.Error(err)
conditions = append(conditions, status.DataRecordedCondition(metav1.ConditionFalse, "RecordingFailed",
fmt.Sprintf("Failed to record data: %v", err)))
_, recErr := updateDataGatherStatus(ctx, *insightClient, dataGatherCR, insightsv1alpha1.Failed, conditions)
if recErr != nil {
klog.Error("data recording failed and the update of DataGaher resource status failed as well: %v", recErr)
}
return err
}
conditions = append(conditions, status.DataRecordedCondition(metav1.ConditionTrue, "AsExpected", ""))

// upload data
insightsRequestID, err := uploader.Upload(ctx, lastArchive)
if err != nil {
klog.Error(err)
conditions = append(conditions, status.DataUploadedCondition(metav1.ConditionFalse, "UploadFailed",
fmt.Sprintf("Failed to upload data: %v", err)))
_, updateErr := updateDataGatherStatus(ctx, *insightClient, dataGatherCR, insightsv1alpha1.Failed, conditions)
if updateErr != nil {
klog.Error("data upload failed and the update of DataGaher resource status failed as well: %v", updateErr)
}
return err
}
klog.Infof("Insights archive successfully uploaded with InsightsRequestID: %s", insightsRequestID)

dataGatherCR.Status.FinishTime = metav1.Now()
dataGatherCR.Status.State = insightsv1alpha1.Completed
dataGatherCR.Status.InsightsRequestID = insightsRequestID
dataGatherCR.Status.Conditions = []metav1.Condition{
{
Type: "DataUploaded",
Status: metav1.ConditionTrue,
Reason: "AsExpected",
LastTransitionTime: metav1.Now(),
},
}
for k := range allFunctionReports {
fr := allFunctionReports[k]
// duration = 0 means the gatherer didn't run
if fr.Duration == 0 {
continue
}
conditions = append(conditions, status.DataUploadedCondition(metav1.ConditionTrue, "AsExpected", ""))

gs := status.CreateDataGatherGathererStatus(&fr)
dataGatherCR.Status.Gatherers = append(dataGatherCR.Status.Gatherers, gs)
}
_, err = insightClient.DataGathers().UpdateStatus(ctx, dataGatherCR, metav1.UpdateOptions{})
_, err = updateDataGatherStatus(ctx, *insightClient, dataGatherCR, insightsv1alpha1.Completed, conditions)
if err != nil {
klog.Error(err)
return err
Expand All @@ -256,3 +255,41 @@ func mapToArray(m map[string]gather.GathererFunctionReport) []gather.GathererFun
}
return a
}

// record writes the archive metadata built from functionReports, flushes the
// recorder so all collected data lands on disk, and returns the most recent
// Insights archive together with any error encountered along the way.
func record(functionReports []gather.GathererFunctionReport,
	rec *recorder.Recorder, recdriver *diskrecorder.DiskRecorder, anonymizer *anonymization.Anonymizer) (*insightsclient.Source, error) {
	if err := gather.RecordArchiveMetadata(functionReports, rec, anonymizer); err != nil {
		return nil, err
	}
	if err := rec.Flush(); err != nil {
		return nil, err
	}
	return recdriver.LastArchive()
}

// updateDataGatherStatus stamps the state-transition timestamp, sets the new
// state and appends the provided conditions on the given DataGather resource,
// then persists the change through the status subresource and returns the
// updated resource.
//
// NOTE(review): conditions are appended as-is; a caller passing a condition
// type that is already present will create a duplicate entry — confirm whether
// deduplication (e.g. apimachinery's meta.SetStatusCondition) is intended.
func updateDataGatherStatus(ctx context.Context,
	insightsClient insightsv1alpha1cli.InsightsV1alpha1Client,
	dataGatherCR *insightsv1alpha1.DataGather,
	newState insightsv1alpha1.DataGatherState, conditions []metav1.Condition) (*insightsv1alpha1.DataGather, error) {
	switch newState {
	case insightsv1alpha1.Completed, insightsv1alpha1.Failed:
		// both terminal states close the gathering window
		dataGatherCR.Status.FinishTime = metav1.Now()
	case insightsv1alpha1.Running:
		dataGatherCR.Status.StartTime = metav1.Now()
	case insightsv1alpha1.Pending:
		// transition to Pending carries no timestamp update
	}
	dataGatherCR.Status.State = newState
	// append is a no-op for a nil/empty conditions slice, so no guard is needed
	dataGatherCR.Status.Conditions = append(dataGatherCR.Status.Conditions, conditions...)
	return insightsClient.DataGathers().UpdateStatus(ctx, dataGatherCR, metav1.UpdateOptions{})
}
3 changes: 2 additions & 1 deletion pkg/controller/operator.go
Expand Up @@ -197,6 +197,7 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller
reportRetriever := insightsreport.NewWithTechPreview(insightsClient, secretConfigObserver, operatorClient.InsightsOperators())
periodicGather = periodic.NewWithTechPreview(reportRetriever, secretConfigObserver,
insightsDataGatherObserver, gatherers, kubeClient, insightClient, operatorClient.InsightsOperators())
statusReporter.AddSources(periodicGather.Sources()...)
go periodicGather.PeriodicPrune(ctx)
}

Expand All @@ -209,7 +210,7 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller
initialDelay = wait.Jitter(baseInitialDelay, 0.5)
klog.Infof("Unable to check insights-operator pod status. Setting initial delay to %s", initialDelay)
}
go periodicGather.Run(ctx.Done(), initialDelay, insightsConfigAPIEnabled)
go periodicGather.Run(ctx.Done(), initialDelay)

if !insightsConfigAPIEnabled {
// upload results to the provided client - if no client is configured reporting
Expand Down
71 changes: 56 additions & 15 deletions pkg/controller/periodic/periodic.go
Expand Up @@ -52,6 +52,7 @@ type Controller struct {
image string
jobController *JobController
pruneInterval time.Duration
techPreview bool
}

func NewWithTechPreview(
Expand All @@ -64,6 +65,8 @@ func NewWithTechPreview(
insightsOperatorCLI operatorv1client.InsightsOperatorInterface,
) *Controller {
statuses := make(map[string]controllerstatus.StatusController)

statuses["insightsuploader"] = controllerstatus.New("insightsuploader")
jobController := NewJobController(kubeClient)
return &Controller{
reportRetriever: reportRetriever,
Expand All @@ -76,6 +79,7 @@ func NewWithTechPreview(
jobController: jobController,
insightsOperatorCLI: insightsOperatorCLI,
pruneInterval: 1 * time.Hour,
techPreview: true,
}
}

Expand Down Expand Up @@ -120,7 +124,7 @@ func (c *Controller) Sources() []controllerstatus.StatusController {
return sources
}

func (c *Controller) Run(stopCh <-chan struct{}, initialDelay time.Duration, techPreview bool) {
func (c *Controller) Run(stopCh <-chan struct{}, initialDelay time.Duration) {
defer utilruntime.HandleCrash()
defer klog.Info("Shutting down")

Expand All @@ -130,21 +134,21 @@ func (c *Controller) Run(stopCh <-chan struct{}, initialDelay time.Duration, tec
case <-stopCh:
return
case <-time.After(initialDelay):
if techPreview {
if c.techPreview {
c.GatherJob()
} else {
c.Gather()
}
}
} else {
if techPreview {
if c.techPreview {
c.GatherJob()
} else {
c.Gather()
}
}

go wait.Until(func() { c.periodicTrigger(stopCh, techPreview) }, time.Second, stopCh)
go wait.Until(func() { c.periodicTrigger(stopCh) }, time.Second, stopCh)

<-stopCh
}
Expand Down Expand Up @@ -217,7 +221,7 @@ func (c *Controller) Gather() {

// Periodically starts the gathering.
// If there is an initialDelay set then it waits that much for the first gather to happen.
func (c *Controller) periodicTrigger(stopCh <-chan struct{}, techPreview bool) {
func (c *Controller) periodicTrigger(stopCh <-chan struct{}) {
configCh, closeFn := c.secretConfigurator.ConfigChanged()
defer closeFn()

Expand All @@ -237,7 +241,7 @@ func (c *Controller) periodicTrigger(stopCh <-chan struct{}, techPreview bool) {
klog.Infof("Gathering cluster info every %s", interval)

case <-time.After(interval):
if techPreview {
if c.techPreview {
c.GatherJob()
} else {
c.Gather()
Expand All @@ -263,8 +267,8 @@ func (c *Controller) GatherJob() {
c.image = image
}

// create a new datagather.insights.openshift.io custom resource
disabledGatherers, dp := c.createDataGatherAttributeValues()
// create a new datagather.insights.openshift.io custom resource
dataGatherCR, err := c.createNewDataGatherCR(ctx, disabledGatherers, dp)
if err != nil {
klog.Errorf("Failed to create a new DataGather resource: %v", err)
Expand All @@ -288,8 +292,19 @@ func (c *Controller) GatherJob() {
klog.Error(err)
}
klog.Infof("Job completed %s", gj.Name)
dataGatherFinished, err := c.dataGatherClient.DataGathers().Get(ctx, dataGatherCR.Name, metav1.GetOptions{})
if err != nil {
klog.Errorf("Failed to get DataGather resource %s: %v", dataGatherCR.Name, err)
return
}
dataGatheredOK := c.wasDataGatherSuccessful(dataGatherFinished)
if !dataGatheredOK {
klog.Errorf("Last data gathering %v was not successful", dataGatherFinished.Name)
return
}

c.reportRetriever.RetrieveReport()
_, err = c.copyDataGatherStatusToOperatorStatus(ctx, dataGatherCR.Name)
_, err = c.copyDataGatherStatusToOperatorStatus(ctx, dataGatherFinished)
if err != nil {
klog.Errorf("Failed to copy the last DataGather status to \"cluster\" operator status: %v", err)
return
Expand All @@ -298,18 +313,14 @@ func (c *Controller) GatherJob() {
}

// copyDataGatherStatusToOperatorStatus gets the "cluster" "insightsoperator.operator.openshift.io" resource
// and updates its status with values from the provided "dgName" "datagather.insights.openshift.io" resource.
func (c *Controller) copyDataGatherStatusToOperatorStatus(ctx context.Context, dgName string) (*v1.InsightsOperator, error) {
// and updates its status with values from the provided "datagather.insights.openshift.io" resource.
func (c *Controller) copyDataGatherStatusToOperatorStatus(ctx context.Context,
dataGather *insightsv1alpha1.DataGather) (*v1.InsightsOperator, error) {
operator, err := c.insightsOperatorCLI.Get(ctx, "cluster", metav1.GetOptions{})
if err != nil {
return nil, err
}
statusToUpdate := operator.Status.DeepCopy()

dataGather, err := c.dataGatherClient.DataGathers().Get(ctx, dgName, metav1.GetOptions{})
if err != nil {
return nil, err
}
statusToUpdate.GatherStatus = status.DataGatherStatusToOperatorGatherStatus(&dataGather.Status)
operator.Status = *statusToUpdate

Expand Down Expand Up @@ -493,6 +504,36 @@ func (c *Controller) createDataGatherAttributeValues() ([]string, insightsv1alph
return disabledGatherers, dp
}

// wasDataGatherSuccessful inspects the status conditions of the provided
// "datagather.insights.openshift.io" resource, reports the upload outcome to
// the "insightsuploader" status controller, and returns true only when the
// data upload was successful.
func (c *Controller) wasDataGatherSuccessful(dataGather *insightsv1alpha1.DataGather) bool {
	var uploaded *metav1.Condition
	// walk backwards so the most recently appended DataUploaded condition wins,
	// matching the previous "last match" scan order
	for i := len(dataGather.Status.Conditions) - 1; i >= 0; i-- {
		if dataGather.Status.Conditions[i].Type == status.DataUploaded {
			uploaded = &dataGather.Status.Conditions[i]
			break
		}
	}
	summary := controllerstatus.Summary{
		Operation: controllerstatus.Uploading,
		Healthy:   true,
	}
	switch {
	case uploaded == nil:
		summary.Healthy = false
		summary.Count = 5
		summary.Reason = "DataUploadedConditionNotAvailable"
		summary.Message = fmt.Sprintf("did not find any %q condition in the %s dataGather resource",
			status.DataUploaded, dataGather.Name)
	case uploaded.Status == metav1.ConditionFalse:
		summary.Healthy = false
		summary.Count = 5
		summary.Reason = uploaded.Reason
		summary.Message = uploaded.Message
	}
	c.statuses["insightsuploader"].UpdateStatus(summary)
	return summary.Healthy
}

func mapToArray(m map[string]gather.GathererFunctionReport) []gather.GathererFunctionReport {
a := make([]gather.GathererFunctionReport, 0, len(m))
for _, v := range m {
Expand Down

0 comments on commit 11c54de

Please sign in to comment.