-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
remoteread.go
127 lines (111 loc) · 2.98 KB
/
remoteread.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package main
import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/cheggaaa/pb/v3"
)
// remoteReadProcessor moves data from a remote read client into a vm.Importer.
// Its run method splits the configured time interval into ranges and processes
// them with a pool of worker goroutines.
type remoteReadProcessor struct {
// filter holds the time boundaries and the chunk step used to split them into ranges.
filter remoteReadFilter
// dst receives every time series read from src.
dst *vm.Importer
// src is the remote read client queried once per time range.
src *remoteread.Client
// cc is the number of worker goroutines started by run; run raises values below 1 to 1.
cc int
}
// remoteReadFilter describes which part of the source data is migrated and
// how that interval is cut into separately processed ranges.
type remoteReadFilter struct {
	// timeStart and timeEnd bound the interval to migrate. A nil timeEnd is
	// replaced with the current time by remoteReadProcessor.run.
	timeStart, timeEnd *time.Time
	// chunk is the step passed to stepper.SplitDateRange when the interval is split.
	chunk string
}
// run migrates data from rrp.src to rrp.dst.
//
// The interval described by rrp.filter is split into ranges by
// stepper.SplitDateRange, and the ranges are handed to rrp.cc worker
// goroutines, each of which processes one range at a time through rrp.do.
//
// When silent is false, the user is asked to confirm the split through prompt
// and a progress bar is shown. verbose is forwarded to wrapErr when an import
// error is reported.
//
// run returns nil when the user declines the prompt or when every range was
// processed without error; otherwise it returns the first error it observes.
func (rrp *remoteReadProcessor) run(ctx context.Context, silent, verbose bool) error {
	rrp.dst.ResetStats()
	if rrp.filter.timeEnd == nil {
		// An open-ended filter stops at the current moment, expressed in the
		// same location as the start time.
		t := time.Now().In(rrp.filter.timeStart.Location())
		rrp.filter.timeEnd = &t
	}
	if rrp.cc < 1 {
		rrp.cc = 1
	}

	ranges, err := stepper.SplitDateRange(*rrp.filter.timeStart, *rrp.filter.timeEnd, rrp.filter.chunk)
	if err != nil {
		return fmt.Errorf("failed to create date ranges for the given time filters: %w", err)
	}

	question := fmt.Sprintf("Selected time range %q - %q will be split into %d ranges according to %q step. Continue?",
		rrp.filter.timeStart.String(), rrp.filter.timeEnd.String(), len(ranges), rrp.filter.chunk)
	if !silent && !prompt(question) {
		return nil
	}

	var bar *pb.ProgressBar
	if !silent {
		bar = barpool.AddWithTemplate(fmt.Sprintf(barTpl, "Processing ranges"), len(ranges))
		if err := barpool.Start(); err != nil {
			return err
		}
	}
	defer func() {
		if !silent {
			barpool.Stop()
		}
		log.Println("Import finished!")
		log.Print(rrp.dst.Stats())
	}()

	// Cancel the derived context on every return so that reads still in flight
	// after an early exit can stop (assuming src.Read honours ctx - TODO confirm).
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	rangeC := make(chan *remoteread.Filter)
	// Each worker sends at most one error before exiting, so a buffer of
	// rrp.cc guarantees that a failing worker never blocks on the send. With an
	// unbuffered channel a failure after the dispatch loop had finished would
	// block the worker forever and wg.Wait() below would never return.
	errCh := make(chan error, rrp.cc)

	var wg sync.WaitGroup
	wg.Add(rrp.cc)
	for i := 0; i < rrp.cc; i++ {
		go func() {
			defer wg.Done()
			for r := range rangeC {
				if err := rrp.do(ctx, r); err != nil {
					errCh <- fmt.Errorf("request failed for: %w", err)
					return
				}
				if bar != nil {
					bar.Increment()
				}
			}
		}()
	}

	// dispatch feeds the ranges to the workers and stops at the first error
	// reported either by a worker or by the importer.
	dispatch := func() error {
		// Closing rangeC on every exit path releases workers that are waiting
		// for another range, so they do not leak after an early return.
		defer close(rangeC)
		for _, r := range ranges {
			select {
			case infErr := <-errCh:
				return fmt.Errorf("remote read error: %w", infErr)
			case vmErr := <-rrp.dst.Errors():
				return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
			case rangeC <- &remoteread.Filter{
				StartTimestampMs: r[0].UnixMilli(),
				EndTimestampMs:   r[1].UnixMilli(),
			}:
			}
		}
		return nil
	}
	if err := dispatch(); err != nil {
		return err
	}

	wg.Wait()
	// The range loop below terminates only once the importer's error channel
	// is closed - presumably Close does that; TODO confirm against vm.Importer.
	rrp.dst.Close()
	// All workers have exited, so nothing can send on errCh any more.
	close(errCh)

	// drain import errors channel
	for vmErr := range rrp.dst.Errors() {
		if vmErr.Err != nil {
			return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
		}
	}
	// errCh is closed, so this receive never blocks; ok is false when no
	// worker reported a failure.
	if err, ok := <-errCh; ok {
		return fmt.Errorf("import process failed: %w", err)
	}
	return nil
}
// do reads the series matching filter from rrp.src and passes each of them to
// rrp.dst.Input. An error returned by Input is annotated with the boundaries
// of the range being processed and handed back to src.Read; do returns
// whatever src.Read returns.
func (rrp *remoteReadProcessor) do(ctx context.Context, filter *remoteread.Filter) error {
	// forward pushes a single series into the importer.
	forward := func(series *vm.TimeSeries) error {
		inputErr := rrp.dst.Input(series)
		if inputErr == nil {
			return nil
		}
		return fmt.Errorf("failed to read data for time range start: %d, end: %d, %s",
			filter.StartTimestampMs, filter.EndTimestampMs, inputErr)
	}
	return rrp.src.Read(ctx, filter, forward)
}