Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

add additional background job metrics #1451

Merged
merged 2 commits into from Mar 12, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/cleanup/cleanup.go
Expand Up @@ -101,7 +101,7 @@ func (h *exposureCleanupHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques
http.Error(w, "internal processing error", http.StatusInternalServerError)
return
}
stats.Record(ctx, mExposuresStatsDeleted.M(statsCount))
stats.Record(ctx, mExposuresStatsDeleted.M(statsCount), mExposuresCleanupRun.M(1))
logger.Infof("stats cleanup run complete, deleted %v records.", statsCount)

w.WriteHeader(http.StatusOK)
Expand Down Expand Up @@ -164,7 +164,7 @@ func (h *exportCleanupHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
return
}

stats.Record(ctx, mExportsDeleted.M(int64(count)))
stats.Record(ctx, mExportsDeleted.M(int64(count)), mExportsCleanupRun.M(1))
logger.Infof("cleanup run complete, deleted %v files.", count)
w.WriteHeader(http.StatusOK)
}
Expand Down
5 changes: 5 additions & 0 deletions internal/cleanup/metrics.go
Expand Up @@ -25,6 +25,11 @@ import (
var (
cleanupMetricsPrefix = metrics.MetricRoot + "cleanup"

mExposuresCleanupRun = stats.Int64(cleanupMetricsPrefix+"exposures_run",
"Number of times cleanup exposures has run", stats.UnitDimensionless)
mExportsCleanupRun = stats.Int64(cleanupMetricsPrefix+"exports_run",
"Number of times cleanup exports has run", stats.UnitDimensionless)

mExposuresSetupFailed = stats.Int64(cleanupMetricsPrefix+"exposures_setup_failed",
"Instances of exposures setup failures", stats.UnitDimensionless)
mExposuresCleanupBefore = stats.Int64(cleanupMetricsPrefix+"exposures_cleanup_before",
Expand Down
20 changes: 20 additions & 0 deletions internal/exportimport/importer.go
Expand Up @@ -24,6 +24,9 @@ import (
"github.com/google/exposure-notifications-server/internal/database"
"github.com/google/exposure-notifications-server/internal/exportimport/model"
"github.com/google/exposure-notifications-server/pkg/logging"

"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opencensus.io/trace"
)

Expand Down Expand Up @@ -52,6 +55,7 @@ func (s *Server) handleImport() http.Handler {
logger.Errorw("unable to read active configs", "error", err)
}

anyErrors := false
mikehelmick marked this conversation as resolved.
Show resolved Hide resolved
for _, config := range configs {
// Check how we're doing on max runtime.
if deadlinePassed(ctx) {
Expand All @@ -61,9 +65,14 @@ func (s *Server) handleImport() http.Handler {

if err := s.runImport(ctx, config); err != nil {
logger.Errorw("error running export-import", "config", config, "error", err)
anyErrors = true
}
}

if !anyErrors {
stats.Record(ctx, mImportCompletion.M(1))
}

w.WriteHeader(http.StatusOK)
})
}
Expand Down Expand Up @@ -107,6 +116,7 @@ func (s *Server) runImport(ctx context.Context, config *model.ExportImport) erro
logger.Debugw("allowed public keys for file", "publicKeys", keys)

errs := []error{}
var completedFiles, failedFiles int64
for _, file := range openFiles {
// Check how we're doing on max runtime.
if deadlinePassed(ctx) {
Expand Down Expand Up @@ -136,16 +146,26 @@ func (s *Server) runImport(ctx context.Context, config *model.ExportImport) erro

// Check the retries.
logger.Errorw(str, "exportImportID", config.ID, "filename", file.ZipFilename)
failedFiles++
}
// the not found error is passed through.
if result != nil {
completedFiles++
logger.Infow("completed file import", "inserted", result.insertedKeys, "revised", result.revisedKeys, "dropped", result.droppedKeys)
}

if err := s.exportImportDB.CompleteImportFile(ctx, file, status); err != nil {
logger.Errorw("failed to mark file completed", "file", file, "error", err)
}
}

tags := []tag.Mutator{
mikehelmick marked this conversation as resolved.
Show resolved Hide resolved
tag.Upsert(exportImportIDTagKey, fmt.Sprintf("%d", config.ID)),
}
if err := stats.RecordWithTags(ctx, tags, mFilesImported.M(completedFiles), mFilesFailed.M(failedFiles)); err != nil {
logger.Errorw("failed to export-import config completion", "error", err, "export-import-id", config.ID)
}

if len(errs) != 0 {
return fmt.Errorf("%d errors processing import file: %v", len(errs), errs)
}
Expand Down
81 changes: 81 additions & 0 deletions internal/exportimport/metrics.go
@@ -0,0 +1,81 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package exportimport

import (
"github.com/google/exposure-notifications-server/internal/metrics"
"github.com/google/exposure-notifications-server/pkg/observability"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)

// OpenCensus measures for the export-import background jobs. The matching
// views are registered in this package's init function.
var (
	exportImportMetricsPrefix = metrics.MetricRoot + "exportimport"

	// Configuration specific metrics.
	// Recorded with exportImportIDTagKey so counts can be broken down per
	// export-import configuration.
	mFilesScheduled = stats.Int64(exportImportMetricsPrefix+"files_scheduled",
		"Number of import files scheduled by ID", stats.UnitDimensionless)
	mFilesImported = stats.Int64(exportImportMetricsPrefix+"files_imported",
		"Number of import files completed by ID", stats.UnitDimensionless)
	mFilesFailed = stats.Int64(exportImportMetricsPrefix+"files_failed",
		"Number of import files failed by ID", stats.UnitDimensionless)

	// Job specific metrics.
	// Recorded once per run of the scheduler / importer, with no tags.
	mScheduleCompletion = stats.Int64(exportImportMetricsPrefix+"schedule_completion",
		"Number of times the export-import scheduler completes", stats.UnitDimensionless)
	mImportCompletion = stats.Int64(exportImportMetricsPrefix+"import_completion",
		"Number of times the export-import job completes", stats.UnitDimensionless)

	// exportImportIDTagKey tags per-configuration measures with the ID of the
	// export-import configuration being processed.
	exportImportIDTagKey = tag.MustNewKey("export_import_id")
)

func init() {
observability.CollectViews([]*view.View{
{
Name: metrics.MetricRoot + "exportimport_files_scheduled",
Description: "Total count of files scheduled, by configuration",
Measure: mFilesScheduled,
Aggregation: view.Sum(),
TagKeys: []tag.Key{exportImportIDTagKey},
},
{
Name: metrics.MetricRoot + "exportimport_files_imported",
Description: "Total count of files imported, by configuration",
Measure: mFilesImported,
Aggregation: view.Sum(),
TagKeys: []tag.Key{exportImportIDTagKey},
},
{
Name: metrics.MetricRoot + "exportimport_files_failed",
Description: "Total count of files failed, by configuration",
Measure: mFilesFailed,
Aggregation: view.Sum(),
TagKeys: []tag.Key{exportImportIDTagKey},
},
{
Name: metrics.MetricRoot + "exportimport_schedule_completion",
Description: "Total count of export-import schedule job completion",
Measure: mScheduleCompletion,
Aggregation: view.Sum(),
},
{
Name: metrics.MetricRoot + "exportimport_import_completion",
Description: "Total count of export-import import job completion",
Measure: mImportCompletion,
Aggregation: view.Sum(),
},
}...)
}
8 changes: 8 additions & 0 deletions internal/exportimport/scheduler.go
Expand Up @@ -30,6 +30,8 @@ import (
"github.com/google/exposure-notifications-server/internal/exportimport/model"
"github.com/google/exposure-notifications-server/internal/project"
"github.com/google/exposure-notifications-server/pkg/logging"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opencensus.io/trace"
)

Expand Down Expand Up @@ -105,12 +107,18 @@ func (s *Server) handleSchedule() http.Handler {
anyErrors = true
} else {
logger.Infow("import index sync result", "exportImportID", config.ID, "index", config.IndexFile, "newFiles", n, "failedFiles", f)
tags := []tag.Mutator{tag.Upsert(exportImportIDTagKey, fmt.Sprintf("%d", config.ID))}
if err := stats.RecordWithTags(ctx, tags, mFilesScheduled.M(int64(n))); err != nil {
logger.Errorw("recording schedule metrics", "exportImprotID", config.ID, "error", err, "scheduled", n)
}
}
}

status := http.StatusOK
if anyErrors {
status = http.StatusInternalServerError
} else {
stats.Record(ctx, mScheduleCompletion.M(1))
}
w.WriteHeader(status)
fmt.Fprint(w, http.StatusText(status))
Expand Down
41 changes: 41 additions & 0 deletions internal/mirror/metrics.go
@@ -0,0 +1,41 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mirror

import (
"github.com/google/exposure-notifications-server/internal/metrics"
"github.com/google/exposure-notifications-server/pkg/observability"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
)

// OpenCensus measures for the mirror background job.
var (
	mirrorMetricsPrefix = metrics.MetricRoot + "mirror"

	// mMirrorJobCompletion counts runs of the mirror job that finish without
	// errors; it is recorded once per successful run.
	//
	// Fix: the description previously read "Number of times cleanup exposures
	// has run" — copy-pasted from the cleanup metrics — which would have been
	// misleading in exported metric metadata.
	mMirrorJobCompletion = stats.Int64(mirrorMetricsPrefix+"completion",
		"Number of times the mirror job has completed successfully", stats.UnitDimensionless)
)

func init() {
observability.CollectViews([]*view.View{
{
Name: metrics.MetricRoot + "mirror_completion",
Description: "Total count of mirror job completion",
Measure: mMirrorJobCompletion,
Aggregation: view.Sum(),
},
}...)
}
3 changes: 3 additions & 0 deletions internal/mirror/mirror.go
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/google/exposure-notifications-server/internal/storage"
"github.com/google/exposure-notifications-server/pkg/logging"
"github.com/hashicorp/go-multierror"
"go.opencensus.io/stats"
"go.opencensus.io/trace"
)

Expand Down Expand Up @@ -127,6 +128,8 @@ func (s *Server) handleMirror() http.Handler {
status := http.StatusOK
if hasError {
status = http.StatusInternalServerError
} else {
stats.Record(ctx, mMirrorJobCompletion.M(1))
}
w.WriteHeader(status)
fmt.Fprint(w, string(b))
Expand Down