Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Contract Pruning Metrics #770

Merged
merged 22 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
5a157a2
metrics: add contract pruning metrics
peterjan Nov 28, 2023
daaaa08
Merge branch 'pj/contract-pruning' into pj/contract-pruning-metrics
peterjan Nov 29, 2023
d6cd437
Merge branch 'pj/contract-pruning' into pj/contract-pruning-metrics
peterjan Nov 29, 2023
3ace7b6
Merge branch 'pj/contract-pruning' into pj/contract-pruning-metrics
peterjan Nov 29, 2023
085665a
Merge branch 'pj/contract-pruning' into pj/contract-pruning-metrics
peterjan Nov 29, 2023
379230e
worker: add prunable
peterjan Nov 29, 2023
c02e1f0
autopilot: update alert
peterjan Nov 29, 2023
2d74e9c
contractor: add host version
peterjan Nov 29, 2023
f9ff83b
autopilot: add stop ctx
peterjan Nov 29, 2023
0aa1500
stores: remove err from metric
peterjan Nov 29, 2023
aaa3cda
autopilot: add contract size
peterjan Nov 29, 2023
9e3bcca
stores: update migration
peterjan Nov 29, 2023
83a8b35
Merge branch 'pj/contract-pruning' into pj/contract-pruning-metrics
peterjan Nov 30, 2023
28189b5
Merge branch 'pj/contract-pruning' into pj/contract-pruning-metrics
peterjan Nov 30, 2023
c759623
autopilot: cleanup PR
peterjan Nov 30, 2023
f43beae
testing: add integration test
peterjan Nov 30, 2023
1f27915
testing: fix lint
peterjan Nov 30, 2023
d56150d
Merge branch 'pj/contract-pruning' into pj/contract-pruning-metrics
peterjan Dec 1, 2023
d47d7e6
Merge branch 'master' of https://github.com/SiaFoundation/renterd int…
peterjan Dec 1, 2023
499d619
Merge branch 'pj/contract-pruning' into pj/contract-pruning-metrics
peterjan Dec 4, 2023
c294b32
Merge branch 'pj/contract-pruning' into pj/contract-pruning-metrics
peterjan Dec 5, 2023
a0b5c81
testing: fix TestMetrics
peterjan Dec 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const (
ChurnDirAdded = "added"
ChurnDirRemoved = "removed"

MetricContractPrune = "contractprune"
MetricContractSet = "contractset"
MetricContractSetChurn = "churn"
MetricContract = "contract"
Expand Down Expand Up @@ -77,6 +78,24 @@ type (
HostKey types.PublicKey
}

ContractPruneMetric struct {
Timestamp time.Time `json:"timestamp"`

ContractID types.FileContractID `json:"contractID"`
HostKey types.PublicKey `json:"hostKey"`
HostVersion string `json:"hostVersion"`

Pruned uint64 `json:"pruned"`
Remaining uint64 `json:"remaining"`
Duration time.Duration `json:"duration"`
}

ContractPruneMetricsQueryOpts struct {
ContractID types.FileContractID
HostKey types.PublicKey
HostVersion string
}

WalletMetric struct {
Timestamp time.Time `json:"timestamp"`

Expand All @@ -89,6 +108,10 @@ type (
)

type (
ContractPruneMetricRequestPUT struct {
Metrics []ContractPruneMetric `json:"metrics"`
}

ContractSetChurnMetricRequestPUT struct {
Metrics []ContractSetChurnMetric `json:"metrics"`
}
Expand Down
75 changes: 38 additions & 37 deletions autopilot/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Bus interface {

// metrics
RecordContractSetChurnMetric(ctx context.Context, metrics ...api.ContractSetChurnMetric) error
RecordContractPruneMetric(ctx context.Context, metrics ...api.ContractPruneMetric) error

// objects
ObjectsBySlabKey(ctx context.Context, bucket string, key object.EncryptionKey) (objects []api.ObjectMetadata, err error)
Expand Down Expand Up @@ -127,6 +128,37 @@ type state struct {
period uint64
}

// New initializes an Autopilot.
func New(id string, bus Bus, workers []Worker, logger *zap.Logger, heartbeat time.Duration, scannerScanInterval time.Duration, scannerBatchSize, scannerNumThreads uint64, migrationHealthCutoff float64, accountsRefillInterval time.Duration, revisionSubmissionBuffer, migratorParallelSlabsPerWorker uint64, revisionBroadcastInterval time.Duration) (*Autopilot, error) {
ap := &Autopilot{
alerts: alerts.WithOrigin(bus, fmt.Sprintf("autopilot.%s", id)),
id: id,
bus: bus,
logger: logger.Sugar().Named(api.DefaultAutopilotID),
workers: newWorkerPool(workers),

tickerDuration: heartbeat,
}
scanner, err := newScanner(
ap,
scannerBatchSize,
scannerNumThreads,
scannerScanInterval,
scannerTimeoutInterval,
scannerTimeoutMinTimeout,
)
if err != nil {
return nil, err
}

ap.s = scanner
ap.c = newContractor(ap, revisionSubmissionBuffer, revisionBroadcastInterval)
ap.m = newMigrator(ap, migrationHealthCutoff, migratorParallelSlabsPerWorker)
ap.a = newAccounts(ap, ap.bus, ap.bus, ap.workers, ap.logger, accountsRefillInterval)

return ap, nil
}

// Handler returns an HTTP handler that serves the autopilot api.
func (ap *Autopilot) Handler() http.Handler {
return jape.Mux(tracing.TracedRoutes(api.DefaultAutopilotID, map[string]jape.Handler{
Expand Down Expand Up @@ -289,6 +321,12 @@ func (ap *Autopilot) Shutdown(_ context.Context) error {
return nil
}

func (ap *Autopilot) StartTime() time.Time {
ap.startStopMu.Lock()
defer ap.startStopMu.Unlock()
return ap.startTime
}

func (ap *Autopilot) State() state {
ap.stateMu.Lock()
defer ap.stateMu.Unlock()
Expand All @@ -307,12 +345,6 @@ func (ap *Autopilot) Trigger(forceScan bool) bool {
}
}

func (ap *Autopilot) StartTime() time.Time {
ap.startStopMu.Lock()
defer ap.startStopMu.Unlock()
return ap.startTime
}

func (ap *Autopilot) Uptime() (dur time.Duration) {
ap.startStopMu.Lock()
defer ap.startStopMu.Unlock()
Expand Down Expand Up @@ -555,37 +587,6 @@ func (ap *Autopilot) triggerHandlerPOST(jc jape.Context) {
})
}

// New initializes an Autopilot.
func New(id string, bus Bus, workers []Worker, logger *zap.Logger, heartbeat time.Duration, scannerScanInterval time.Duration, scannerBatchSize, scannerNumThreads uint64, migrationHealthCutoff float64, accountsRefillInterval time.Duration, revisionSubmissionBuffer, migratorParallelSlabsPerWorker uint64, revisionBroadcastInterval time.Duration) (*Autopilot, error) {
ap := &Autopilot{
alerts: alerts.WithOrigin(bus, fmt.Sprintf("autopilot.%s", id)),
id: id,
bus: bus,
logger: logger.Sugar().Named(api.DefaultAutopilotID),
workers: newWorkerPool(workers),

tickerDuration: heartbeat,
}
scanner, err := newScanner(
ap,
scannerBatchSize,
scannerNumThreads,
scannerScanInterval,
scannerTimeoutInterval,
scannerTimeoutMinTimeout,
)
if err != nil {
return nil, err
}

ap.s = scanner
ap.c = newContractor(ap, revisionSubmissionBuffer, revisionBroadcastInterval)
ap.m = newMigrator(ap, migrationHealthCutoff, migratorParallelSlabsPerWorker)
ap.a = newAccounts(ap, ap.bus, ap.bus, ap.workers, ap.logger, accountsRefillInterval)

return ap, nil
}

func (ap *Autopilot) hostHandlerGET(jc jape.Context) {
var hostKey types.PublicKey
if jc.DecodeParam("hostKey", &hostKey) != nil {
Expand Down
48 changes: 43 additions & 5 deletions autopilot/contract_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ var (

type (
pruneResult struct {
ts time.Time

fcid types.FileContractID
hk types.PublicKey
version string
Expand All @@ -33,6 +35,8 @@ type (

err error
}

pruneMetrics []api.ContractPruneMetric
)

func (pr pruneResult) String() string {
Expand All @@ -49,6 +53,14 @@ func (pr pruneResult) String() string {
return msg
}

func (pm pruneMetrics) String() string {
var total uint64
for _, m := range pm {
total += m.Pruned
}
return fmt.Sprintf("pruned %d (%s) from %v contracts", total, humanReadableSize(int(total)), len(pm))
}

func (pr pruneResult) toAlert() (id types.Hash256, alert *alerts.Alert) {
id = alertIDForContract(alertPruningID, pr.fcid)

Expand All @@ -63,6 +75,17 @@ func (pr pruneResult) toAlert() (id types.Hash256, alert *alerts.Alert) {
return
}

func (pr pruneResult) toMetric() api.ContractPruneMetric {
return api.ContractPruneMetric{
Timestamp: pr.ts,
ContractID: pr.fcid,
HostKey: pr.hk,
Pruned: pr.pruned,
Remaining: pr.remaining,
Duration: pr.duration,
}
}

func (c *contractor) fetchPrunableContracts() (prunable []api.ContractPrunableData, _ error) {
// use a sane timeout
ctx, cancel := context.WithTimeout(c.ap.stopCtx, time.Minute)
Expand Down Expand Up @@ -113,9 +136,14 @@ func (c *contractor) performContractPruning(wp *workerPool) {
// prune every contract individually, one at a time and for a maximum
// duration of 'timeoutPruneContract' to limit the amount of time we lock
// the contract as contracts on old hosts can take a long time to prune
var total uint64
var metrics pruneMetrics
wp.withWorker(func(w Worker) {
for _, contract := range prunable {
// return if we're stopped
if c.ap.isStopped() {
return
}

// prune contract
result := c.pruneContract(w, contract.ID)
if result.err != nil {
Expand All @@ -134,11 +162,19 @@ func (c *contractor) performContractPruning(wp *workerPool) {
cancel()

// handle metrics
total += result.pruned
metrics = append(metrics, result.toMetric())
}
})

c.logger.Infof("pruned %d (%s) from %v contracts", total, humanReadableSize(int(total)), len(prunable))
// record metrics
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
if err := c.ap.bus.RecordContractPruneMetric(ctx, metrics...); err != nil {
c.logger.Error(err)
}
cancel()

// log metrics
c.logger.Info(metrics)
}

func (c *contractor) pruneContract(w Worker, fcid types.FileContractID) pruneResult {
Expand All @@ -147,7 +183,7 @@ func (c *contractor) pruneContract(w Worker, fcid types.FileContractID) pruneRes
defer cancel()

// fetch the host
host, contract, err := c.hostForContract(ctx, fcid)
host, _, err := c.hostForContract(ctx, fcid)
if err != nil {
return pruneResult{fcid: fcid, err: err}
}
Expand All @@ -162,7 +198,9 @@ func (c *contractor) pruneContract(w Worker, fcid types.FileContractID) pruneRes
}

return pruneResult{
fcid: contract.ID,
ts: start,

fcid: fcid,
hk: host.PublicKey,
version: host.Settings.Version,

Expand Down
36 changes: 34 additions & 2 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ type (
MetricsStore interface {
ContractSetMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) ([]api.ContractSetMetric, error)

ContractPruneMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractPruneMetricsQueryOpts) ([]api.ContractPruneMetric, error)
RecordContractPruneMetric(ctx context.Context, metrics ...api.ContractPruneMetric) error

ContractMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractMetricsQueryOpts) ([]api.ContractMetric, error)
RecordContractMetric(ctx context.Context, metrics ...api.ContractMetric) error

Expand Down Expand Up @@ -1995,11 +1998,24 @@ func (b *bus) webhookHandlerPost(jc jape.Context) {
}

func (b *bus) metricsHandlerPUT(jc jape.Context) {
jc.Custom((*interface{})(nil), nil)

key := jc.PathParam("key")
switch key {
case api.MetricContractPrune:
// TODO: jape hack - remove once jape can handle decoding multiple different request types
var req api.ContractPruneMetricRequestPUT
if err := json.NewDecoder(jc.Request.Body).Decode(&req); err != nil {
jc.Error(fmt.Errorf("couldn't decode request type (%T): %w", req, err), http.StatusBadRequest)
return
} else if jc.Check("failed to record contract prune metric", b.mtrcs.RecordContractPruneMetric(jc.Request.Context(), req.Metrics...)) != nil {
return
}
case api.MetricContractSetChurn:
// TODO: jape hack - remove once jape can handle decoding multiple different request types
var req api.ContractSetChurnMetricRequestPUT
if jc.Decode(&req) != nil {
if err := json.NewDecoder(jc.Request.Body).Decode(&req); err != nil {
jc.Error(fmt.Errorf("couldn't decode request type (%T): %w", req, err), http.StatusBadRequest)
return
} else if jc.Check("failed to record contract churn metric", b.mtrcs.RecordContractSetChurnMetric(jc.Request.Context(), req.Metrics...)) != nil {
return
Expand Down Expand Up @@ -2050,6 +2066,20 @@ func (b *bus) metricsHandlerGET(jc jape.Context) {
jc.Encode(metrics)
return
}
case api.MetricContractPrune:
var opts api.ContractPruneMetricsQueryOpts
if jc.DecodeForm("contractID", &opts.ContractID) != nil {
return
} else if jc.DecodeForm("hostKey", &opts.HostKey) != nil {
return
} else if jc.DecodeForm("hostVersion", &opts.HostVersion) != nil {
return
} else if metrics, err := b.metrics(jc.Request.Context(), key, start, n, interval, opts); jc.Check("failed to get contract prune metrics", err) != nil {
return
} else {
jc.Encode(metrics)
return
}
case api.MetricContractSet:
var opts api.ContractSetMetricsQueryOpts
if jc.DecodeForm("name", &opts.Name) != nil {
Expand Down Expand Up @@ -2092,14 +2122,16 @@ func (b *bus) metrics(ctx context.Context, key string, start time.Time, n uint64
switch key {
case api.MetricContract:
return b.mtrcs.ContractMetrics(ctx, start, n, interval, opts.(api.ContractMetricsQueryOpts))
case api.MetricContractPrune:
return b.mtrcs.ContractPruneMetrics(ctx, start, n, interval, opts.(api.ContractPruneMetricsQueryOpts))
case api.MetricContractSet:
return b.mtrcs.ContractSetMetrics(ctx, start, n, interval, opts.(api.ContractSetMetricsQueryOpts))
case api.MetricContractSetChurn:
return b.mtrcs.ContractSetChurnMetrics(ctx, start, n, interval, opts.(api.ContractSetChurnMetricsQueryOpts))
case api.MetricWallet:
return b.mtrcs.WalletMetrics(ctx, start, n, interval, opts.(api.WalletMetricsQueryOpts))
}
return nil, nil
return nil, fmt.Errorf("unknown metric '%s'", key)
}

func (b *bus) multipartHandlerCreatePOST(jc jape.Context) {
Expand Down
Loading