/
processingreport.go
75 lines (59 loc) · 2.46 KB
/
processingreport.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package visor
import (
"context"
"time"
"github.com/filecoin-project/lily/metrics"
"github.com/filecoin-project/lily/model"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
)
const (
ProcessingStatusOK = "OK"
ProcessingStatusInfo = "INFO" // Processing was successful but the task reported information in the StatusInformation column
ProcessingStatusError = "ERROR" // one or more errors were encountered, data may be incomplete
ProcessingStatusSkip = "SKIP" // no processing was attempted, a reason may be given in the StatusInformation column
)
const (
// ProcessingStatusInformationNullRound is set byt the consensus task to indicate a null round
ProcessingStatusInformationNullRound = "NULL_ROUND" // used by consensus task to indicate a null round
// TODO this could likely be a status of its own, but the indexer isn't currently suited for tasks to set their own status.
)
type ProcessingReport struct {
//lint:ignore U1000 tableName is a convention used by go-pg
tableName struct{} `pg:"visor_processing_reports"`
Height int64 `pg:",pk,use_zero"`
StateRoot string `pg:",pk,notnull"`
// Reporter is the name of the instance that is reporting the result
Reporter string `pg:",pk,notnull"`
// Task is the name of the sub task that generated the report
Task string `pg:",pk,notnull"`
StartedAt time.Time `pg:",pk,use_zero"`
CompletedAt time.Time `pg:",use_zero"`
Status string `pg:",notnull"`
StatusInformation string
ErrorsDetected interface{} `pg:",type:jsonb"`
}
func (p *ProcessingReport) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "visor_processing_reports"))
stop := metrics.Timer(ctx, metrics.PersistDuration)
defer stop()
metrics.RecordCount(ctx, metrics.PersistModel, 1)
return s.PersistModel(ctx, p)
}
type ProcessingReportList []*ProcessingReport
func (pl ProcessingReportList) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error {
if len(pl) == 0 {
return nil
}
ctx, span := otel.Tracer("").Start(ctx, "ProcessingReportList.Persist")
if span.IsRecording() {
span.SetAttributes(attribute.Int("count", len(pl)))
}
defer span.End()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "visor_processing_reports"))
stop := metrics.Timer(ctx, metrics.PersistDuration)
defer stop()
metrics.RecordCount(ctx, metrics.PersistModel, len(pl))
return s.PersistModel(ctx, pl)
}