This repository has been archived by the owner on Jul 28, 2022. It is now read-only.
/
count-distinct.go
70 lines (57 loc) · 1.57 KB
/
count-distinct.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// Package parser handles parsing log files based on the SQL execution type.
package parser
import (
"sync"
"github.com/busbud/tidalwave/logger"
"github.com/busbud/tidalwave/sqlquery"
"github.com/tidwall/gjson"
)
// distinctCountParse tallies, for a single log file, how many times each
// distinct value appears at query.AggrPath among the lines that match the
// query, then delivers the tally on resultsChan.
//
// It is intended to run as a goroutine: wg is marked done when parsing
// finishes (even on the fatal path below, via defer). A failure to read the
// log file is treated as fatal for the whole process.
func distinctCountParse(query *sqlquery.QueryParams, resultsChan chan<- map[string]int, logPath string, wg *sync.WaitGroup) {
	defer wg.Done()

	results := map[string]int{}
	err := readLines(logPath, func(line *[]byte) {
		if query.ProcessLine(line) {
			// Only count lines where the aggregation path actually resolves
			// to a value; gjson.Null is the zero Type returned for misses.
			if res := gjson.GetBytes(*line, query.AggrPath); res.Type != gjson.Null {
				results[res.String()]++
			}
		}
	})
	if err != nil {
		logger.Log.Fatal(err)
	}

	resultsChan <- results
}
// CountDistinct executes a COUNT(DISTINCT()) query over log results.
// SELECT COUNT(DISTINCT(line.cmd)) FROM testapp WHERE date > '2016-10-05'
//
// Each log path is parsed in its own goroutine, with concurrency bounded by
// tp.MaxParallelism. Per-file tallies are merged into a single map of
// value -> occurrence count.
func (tp *TidalwaveParser) CountDistinct() *map[string]int { //nolint:gocritic // Leave it alone.
	mergedResults := map[string]int{}

	logsLen := len(tp.LogPaths)
	if logsLen == 0 {
		// Nothing to parse; returning early also avoids deadlocking on a
		// WaitGroup that would never be signalled.
		return &mergedResults
	}

	// Buffered so every worker can deliver its result without a dedicated
	// collector goroutine (the previous design leaked one per call because
	// the channel was never closed).
	resultsChan := make(chan map[string]int, logsLen)

	var wg sync.WaitGroup
	wg.Add(logsLen)

	// Semaphore bounding the number of files parsed concurrently. A token is
	// acquired before launching each worker and released when it finishes.
	coreLimit := make(chan bool, tp.MaxParallelism)

	for _, logPath := range tp.LogPaths {
		coreLimit <- true
		go func(path string) {
			defer func() { <-coreLimit }()
			// distinctCountParse calls wg.Done itself.
			distinctCountParse(tp.Query, resultsChan, path, &wg)
		}(logPath)
	}

	// All workers have sent their result once the WaitGroup clears; closing
	// the channel lets the drain loop below terminate.
	wg.Wait()
	close(resultsChan)

	for res := range resultsChan {
		for key, val := range res {
			mergedResults[key] += val
		}
	}

	return &mergedResults
}