-
Notifications
You must be signed in to change notification settings - Fork 0
/
header_times_streamer.go
154 lines (121 loc) · 4.37 KB
/
header_times_streamer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package agent
import (
"regexp"
"strconv"
"sync"
"time"
"github.com/buildkite/agent/logger"
)
// If you change header parsing here make sure to change it in the
// buildkite.com frontend logic, too
var HeaderRegex = regexp.MustCompile("^(?:---|\\+\\+\\+|~~~)\\s(.+)?$")
var ANSIColorRegex = regexp.MustCompile(`\x1b\[([;\d]+)?[mK]`)
type HeaderTimesStreamer struct {
// The callback that will be called when a header time is ready for
// upload
UploadCallback func(int, int, map[string]string)
// The times that have found while scanning lines
times []string
timesMutex sync.Mutex
// Every time we get a new time, we increment the wait group, and
// decrement it after it has been uploaded.
uploadWaitGroup sync.WaitGroup
// Every time we go to scan a line, we increment the wait group, then
// decrement after it's finished scanning. That way when we stop the
// streamer, we can wait for all the lines to finish scanning first.
scanWaitGroup sync.WaitGroup
// A boolean to keep track if we're currently streaming header times
streaming bool
streamingMutex sync.Mutex
// We store the last index we uploaded to, so we don't have to keep
// uploading the same times
cursor int
}
func (h *HeaderTimesStreamer) Start() error {
h.streaming = true
go func() {
logger.Debug("[HeaderTimesStreamer] Streamer has started...")
for true {
// Break out of streaming if it's finished. We also
// need to aquire a read lock on the flag because it
// can be modified by other routines.
h.streamingMutex.Lock()
if !h.streaming {
break
}
h.streamingMutex.Unlock()
// Upload any pending header times
h.Upload()
// Sleep for a second and try upload some more later
time.Sleep(1 * time.Second)
}
logger.Debug("[HeaderTimesStreamer] Streamer has finished...")
}()
return nil
}
func (h *HeaderTimesStreamer) Scan(line string) {
// Keep track of how many line scans we need to do
h.scanWaitGroup.Add(1)
defer h.scanWaitGroup.Done()
if h.LineIsHeader(line) {
logger.Debug("[HeaderTimesStreamer] Found header %q", line)
// Aquire a lock on the times and then add the current time to
// our times slice.
h.timesMutex.Lock()
h.times = append(h.times, time.Now().UTC().Format(time.RFC3339Nano))
h.timesMutex.Unlock()
// Add the time to the wait group
h.uploadWaitGroup.Add(1)
}
}
func (h *HeaderTimesStreamer) Upload() {
// Store the current cursor value
c := h.cursor
// Grab only the times that we haven't uploaded yet. We need to aquire
// a lock since other routines may be adding to it.
h.timesMutex.Lock()
length := len(h.times)
times := h.times[h.cursor:length]
h.timesMutex.Unlock()
// Construct the payload to send to the server
payload := map[string]string{}
for index, time := range times {
payload[strconv.Itoa(h.cursor+index)] = time
}
// Save the cursor we're up to
h.cursor = length
// How many times are we uploading this time
timesToUpload := len(times)
// Do we even have some times to upload
if timesToUpload > 0 {
// Call our callback with the times for upload
logger.Debug("[HeaderTimesStreamer] Uploading header times %d..%d", c, length-1)
h.UploadCallback(c, length, payload)
logger.Debug("[HeaderTimesStreamer] Finished uploading header times %d..%d", c, length-1)
// Decrement the wait group for every time we've uploaded.
h.uploadWaitGroup.Add(timesToUpload * -1)
}
}
func (h *HeaderTimesStreamer) Stop() {
logger.Debug("[HeaderTimesStreamer] Waiting for all the lines to be scanned")
h.scanWaitGroup.Wait()
logger.Debug("[HeaderTimesStreamer] Waiting for all the header times to be uploaded")
h.uploadWaitGroup.Wait()
// Since we're modifying the waitGroup and the streaming flag, we need
// to aquire a write lock.
h.streamingMutex.Lock()
h.streaming = false
h.streamingMutex.Unlock()
}
func (h *HeaderTimesStreamer) LinePreProcessor(line string) string {
// Make sure all ANSI colors are removed from the string before we
// check to see if it's a header (sometimes a color escape sequence may
// be the first thing on the line, which will cause the regex to ignore
// it)
return ANSIColorRegex.ReplaceAllString(line, "")
}
func (h *HeaderTimesStreamer) LineIsHeader(line string) bool {
// To avoid running the regex over every single line, we'll first do a
// length check. Hopefully there are no heeaders over 500 characters!
return len(line) < 500 && HeaderRegex.MatchString(line)
}